We make the locking and the transfer of information explicit, even
though we still receive everything through the main thread.
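
For reference, the handshake between the main (migration) thread and a
channel thread runs in both directions: the channel thread posts
"ready" whenever it is idle, and the main thread posts "sem" after
handing a page group over under the mutex. A condensed sketch of the
main-thread side, taken from the hunks below (error handling omitted):

    /* p = multifd_recv_state->params[fd_num],
     * pages = &multifd_recv_state->pages */
    qemu_sem_wait(&p->ready);          /* wait until the channel is idle */
    qemu_mutex_lock(&p->mutex);
    p->done = false;
    iov_copy(p->pages.iov, pages->num, pages->iov, pages->num, 0,
             iov_size(pages->iov, pages->num));   /* hand the iovec over */
    p->pages.num = pages->num;
    pages->num = 0;
    qemu_mutex_unlock(&p->mutex);
    qemu_sem_post(&p->sem);            /* wake the channel thread */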

Signed-off-by: Juan Quintela <quint...@redhat.com>

--

We split the creation of the main channel from the start of the main
migration processing, so that we can wait for the creation of the other
channel threads first.
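
In rough outline, the incoming path now looks like this (simplified
from the hunks below; only the shape matters):

    void migration_ioc_process_incoming(QIOChannel *ioc)
    {
        if (!mis->from_src_file) {             /* first connection */
            QEMUFile *f = qemu_fopen_channel_input(ioc);
            mis->from_src_file = f;
            migration_incoming_setup(f);       /* set up, don't run yet */
            return;
        }
        multifd_new_channel(ioc);              /* extra multifd channel */
    }

    /* ...and the listener only starts the migration coroutine once
     * every expected channel has connected: */
    if (migration_has_all_channels()) {
        qio_channel_close(ioc, NULL);
        migration_incoming_process();
    }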

Use multifd_clear_group() to release the page groups on cleanup.
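
multifd_clear_group() was introduced earlier in this series and is not
shown here; for orientation only, its job is the inverse of
multifd_init_group(), so roughly along these lines (a guess at the
shape, not the actual code):

    static void multifd_clear_group(multifd_pages_t *pages)
    {
        g_free(pages->iov);    /* release the iovec array */
        pages->iov = NULL;
        pages->num = 0;
    }
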
---
 migration/migration.c |  7 ++++---
 migration/migration.h |  1 +
 migration/ram.c       | 55 +++++++++++++++++++++++++++++++++++++++++++++++----
 migration/socket.c    |  2 +-
 4 files changed, 57 insertions(+), 8 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index 8e9505a..b78dffc 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -381,7 +381,7 @@ static void migration_incoming_setup(QEMUFile *f)
     qemu_file_set_blocking(f, false);
 }
 
-static void migration_incoming_process(void)
+void migration_incoming_process(void)
 {
     Coroutine *co = qemu_coroutine_create(process_incoming_migration_co, NULL);
     qemu_coroutine_enter(co);
@@ -400,9 +400,10 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
     if (!mis->from_src_file) {
         QEMUFile *f = qemu_fopen_channel_input(ioc);
         mis->from_src_file = f;
-        migration_fd_process_incoming(f);
+        migration_incoming_setup(f);
+        return;
     }
-    /* We still only have a single channel.  Nothing to do here yet */
+    multifd_new_channel(ioc);
 }
 
 /**
diff --git a/migration/migration.h b/migration/migration.h
index ddf8c89..1442b33 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -154,6 +154,7 @@ void migrate_set_state(int *state, int old_state, int new_state);
 
 void migration_fd_process_incoming(QEMUFile *f);
 void migration_ioc_process_incoming(QIOChannel *ioc);
+void migration_incoming_process(void);
 
 bool  migration_has_all_channels(void);
 
diff --git a/migration/ram.c b/migration/ram.c
index 169cfc9..eb0015e 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -572,12 +572,17 @@ static uint16_t multifd_send_page(uint8_t *address, bool last_page)
 }
 
 struct MultiFDRecvParams {
+    /* not changed after the thread is created */
     uint8_t id;
     QemuThread thread;
     QIOChannel *c;
+    QemuSemaphore ready;
     QemuSemaphore sem;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
+    multifd_pages_t pages;
+    bool done;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;
 
@@ -587,6 +592,7 @@ struct {
     int count;
     /* Should we finish */
     bool quit;
+    multifd_pages_t pages;
 } *multifd_recv_state;
 
 static void terminate_multifd_recv_threads(void)
@@ -602,6 +608,7 @@ static void terminate_multifd_recv_threads(void)
         p->quit = true;
         qemu_sem_post(&p->sem);
         qemu_mutex_unlock(&p->mutex);
+        multifd_clear_group(&p->pages);
     }
 }
 
@@ -625,6 +632,7 @@ void multifd_load_cleanup(void)
     }
     g_free(multifd_recv_state->params);
     multifd_recv_state->params = NULL;
+    multifd_clear_group(&multifd_recv_state->pages);
     g_free(multifd_recv_state);
     multifd_recv_state = NULL;
 }
@@ -633,12 +641,20 @@ static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
 
+    qemu_sem_post(&p->ready);
     while (true) {
         qemu_mutex_lock(&p->mutex);
         if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
+        if (p->pages.num) {
+            p->pages.num = 0;
+            p->done = true;
+            qemu_mutex_unlock(&p->mutex);
+            qemu_sem_post(&p->ready);
+            continue;
+        }
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_wait(&p->sem);
     }
@@ -682,8 +698,11 @@ void multifd_new_channel(QIOChannel *ioc)
     }
     qemu_mutex_init(&p->mutex);
     qemu_sem_init(&p->sem, 0);
+    qemu_sem_init(&p->ready, 0);
     p->quit = false;
     p->id = id;
+    p->done = false;
+    multifd_init_group(&p->pages);
     p->c = ioc;
     atomic_set(&multifd_recv_state->params[id], p);
     multifd_recv_state->count++;
@@ -703,6 +722,7 @@ int multifd_load_setup(void)
     multifd_recv_state->params = g_new0(MultiFDRecvParams *, thread_count);
     multifd_recv_state->count = 0;
     multifd_recv_state->quit = false;
+    multifd_init_group(&multifd_recv_state->pages);
     return 0;
 }
 
@@ -711,6 +731,36 @@ int multifd_created_threads(void)
     return multifd_recv_state->count;
 }
 
+static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
+{
+    int thread_count;
+    MultiFDRecvParams *p;
+    multifd_pages_t *pages = &multifd_recv_state->pages;
+
+    pages->iov[pages->num].iov_base = address;
+    pages->iov[pages->num].iov_len = TARGET_PAGE_SIZE;
+    pages->num++;
+
+    if (fd_num == MULTIFD_CONTINUE) {
+        return;
+    }
+
+    thread_count = migrate_multifd_threads();
+    assert(fd_num < thread_count);
+    p = multifd_recv_state->params[fd_num];
+
+    qemu_sem_wait(&p->ready);
+
+    qemu_mutex_lock(&p->mutex);
+    p->done = false;
+    iov_copy(p->pages.iov, pages->num, pages->iov, pages->num, 0,
+             iov_size(pages->iov, pages->num));
+    p->pages.num = pages->num;
+    pages->num = 0;
+    qemu_mutex_unlock(&p->mutex);
+    qemu_sem_post(&p->sem);
+}
+
 /**
  * save_page_header: write page header to wire
  *
@@ -3022,10 +3072,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
 
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
-            if (fd_num != 0) {
-                /* this is yet an unused variable, changed later */
-                fd_num = fd_num;
-            }
+            multifd_recv_page(host, fd_num);
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
 
diff --git a/migration/socket.c b/migration/socket.c
index 5dd6f42..3af9f7c 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -183,12 +183,12 @@ static gboolean socket_accept_incoming_migration(QIOChannel *ioc,
 
     qio_channel_set_name(QIO_CHANNEL(sioc), "migration-socket-incoming");
     migration_channel_process_incoming(QIO_CHANNEL(sioc));
-    object_unref(OBJECT(sioc));
 
 out:
     if (migration_has_all_channels()) {
        /* Close listening socket as it's no longer needed */
         qio_channel_close(ioc, NULL);
+        migration_incoming_process();
         return G_SOURCE_REMOVE;
     } else {
         return G_SOURCE_CONTINUE;
-- 
2.9.4

