It is possible that one of the multifd channels fails to be created at
multifd_new_send_channel_async() while the rest of the channel
creation tasks are still in flight.

This could lead to multifd_save_cleanup() executing the
qemu_thread_join() loop too early and not waiting for the threads
which haven't been created yet, leading to the freeing of resources
that the newly created threads will try to access and crash.

Add a synchronization point after which there will be no attempts at
thread creation and therefore calling multifd_save_cleanup() past that
point will ensure it properly waits for the threads.

A note about performance: Prior to this patch, if a channel took too
long to be established, other channels could finish connecting first
and already start taking load. Now we're bounded by the
slowest-connecting channel.

Signed-off-by: Fabiano Rosas <faro...@suse.de>
---
 migration/multifd.c | 67 +++++++++++++++++++++++++--------------------
 1 file changed, 37 insertions(+), 30 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 1851206352..888ac8b05d 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -360,6 +360,11 @@ struct {
     MultiFDPages_t *pages;
     /* global number of generated multifd packets */
     uint64_t packet_num;
+    /*
+     * Synchronization point past which no more channels will be
+     * created.
+     */
+    QemuSemaphore channels_created;
     /* send channels ready */
     QemuSemaphore channels_ready;
     /*
@@ -561,6 +566,7 @@ void multifd_save_cleanup(void)
             error_free(local_err);
         }
     }
+    qemu_sem_destroy(&multifd_send_state->channels_created);
     qemu_sem_destroy(&multifd_send_state->channels_ready);
     g_free(multifd_send_state->params);
     multifd_send_state->params = NULL;
@@ -787,13 +793,6 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
     trace_multifd_tls_outgoing_handshake_error(ioc, error_get_pretty(err));
 
     migrate_set_error(migrate_get_current(), err);
-    /*
-     * Error happen, mark multifd_send_thread status as 'quit' although it
-     * is not created, and then tell who pay attention to me.
-     */
-    p->quit = true;
-    qemu_sem_post(&multifd_send_state->channels_ready);
-    qemu_sem_post(&p->sem_sync);
     error_free(err);
 }
 
@@ -862,39 +861,37 @@ static bool multifd_channel_connect(MultiFDSendParams *p,
     return true;
 }
 
-static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
-                                             QIOChannel *ioc, Error *err)
-{
-     migrate_set_error(migrate_get_current(), err);
-     /* Error happen, we need to tell who pay attention to me */
-     qemu_sem_post(&multifd_send_state->channels_ready);
-     qemu_sem_post(&p->sem_sync);
-     /*
-      * Although multifd_send_thread is not created, but main migration
-      * thread need to judge whether it is running, so we need to mark
-      * its status.
-      */
-     p->quit = true;
-     object_unref(OBJECT(ioc));
-     error_free(err);
-}
-
 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 {
     MultiFDSendParams *p = opaque;
     QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
     Error *local_err = NULL;
+    bool ret;
 
     trace_multifd_new_send_channel_async(p->id);
-    if (!qio_task_propagate_error(task, &local_err)) {
-        qio_channel_set_delay(ioc, false);
-        if (multifd_channel_connect(p, ioc, &local_err)) {
-            return;
-        }
+
+    if (qio_task_propagate_error(task, &local_err)) {
+        ret = false;
+        goto out;
+    }
+
+    qio_channel_set_delay(ioc, false);
+    ret = multifd_channel_connect(p, ioc, &local_err);
+
+out:
+    /*
+     * Here we're not interested whether creation succeeded, only that
+     * it happened at all.
+     */
+    qemu_sem_post(&multifd_send_state->channels_created);
+    if (ret) {
+        return;
     }
 
     trace_multifd_new_send_channel_async_error(p->id, local_err);
-    multifd_new_send_channel_cleanup(p, ioc, local_err);
+    migrate_set_error(migrate_get_current(), local_err);
+    object_unref(OBJECT(ioc));
+    error_free(local_err);
 }
 
 static void multifd_new_send_channel_create(gpointer opaque)
@@ -918,6 +915,7 @@ bool multifd_save_setup(void)
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     multifd_send_state->pages = multifd_pages_init(page_count);
+    qemu_sem_init(&multifd_send_state->channels_created, 0);
     qemu_sem_init(&multifd_send_state->channels_ready, 0);
     qatomic_set(&multifd_send_state->exiting, 0);
     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
@@ -953,6 +951,15 @@ bool multifd_save_setup(void)
         multifd_new_send_channel_create(p);
     }
 
+    /*
+     * Wait until channel creation has started for all channels. The
+     * creation can still fail, but no more channels will be created
+     * past this point.
+     */
+    for (i = 0; i < thread_count; i++) {
+        qemu_sem_wait(&multifd_send_state->channels_created);
+    }
+
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
-- 
2.35.3


Reply via email to