This patch creates a separate thread for guest migration on the source side.
All exits from the migration thread (on completion or error) are handled by a
bottom-half handler, which is called from the iothread.

Signed-off-by: Umesh Deshpande <udesh...@redhat.com>
---
 buffered_file.c     |   75 +++++++++++++++++--------------
 migration.c         |  122 +++++++++++++++++++++++++++++---------------------
 migration.h         |    9 ++++
 qemu-thread-posix.c |   10 ++++
 qemu-thread.h       |    1 +
 savevm.c            |    5 --
 6 files changed, 132 insertions(+), 90 deletions(-)

diff --git a/buffered_file.c b/buffered_file.c
index 41b42c3..0d94baa 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -16,6 +16,8 @@
 #include "qemu-timer.h"
 #include "qemu-char.h"
 #include "buffered_file.h"
+#include "migration.h"
+#include "qemu-thread.h"
 
 //#define DEBUG_BUFFERED_FILE
 
@@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered
     void *opaque;
     QEMUFile *file;
     int has_error;
+    int closed;
     int freeze_output;
     size_t bytes_xfer;
     size_t xfer_limit;
     uint8_t *buffer;
     size_t buffer_size;
     size_t buffer_capacity;
-    QEMUTimer *timer;
+    QemuThread thread;
 } QEMUFileBuffered;
 
 #ifdef DEBUG_BUFFERED_FILE
@@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in
         offset = size;
     }
 
-    if (pos == 0 && size == 0) {
-        DPRINTF("file is ready\n");
-        if (s->bytes_xfer <= s->xfer_limit) {
-            DPRINTF("notifying client\n");
-            s->put_ready(s->opaque);
-        }
-    }
-
     return offset;
 }
 
@@ -173,22 +168,25 @@ static int buffered_close(void *opaque)
 
     DPRINTF("closing\n");
 
-    while (!s->has_error && s->buffer_size) {
-        buffered_flush(s);
-        if (s->freeze_output)
-            s->wait_for_unfreeze(s);
-    }
+    s->closed = 1;
 
-    ret = s->close(s->opaque);
+    qemu_mutex_unlock_migthread();
+    qemu_mutex_unlock_iothread();
+
+    qemu_thread_join(&s->thread);
+    /* Waits for the completion of the migration thread */
 
-    qemu_del_timer(s->timer);
-    qemu_free_timer(s->timer);
+    qemu_mutex_lock_iothread();
+    qemu_mutex_lock_migthread();
+
+    ret = s->close(s->opaque);
     qemu_free(s->buffer);
     qemu_free(s);
 
     return ret;
 }
 
+
 static int buffered_rate_limit(void *opaque)
 {
     QEMUFileBuffered *s = opaque;
@@ -228,26 +226,36 @@ static int64_t buffered_get_rate_limit(void *opaque)
     return s->xfer_limit;
 }
 
-static void buffered_rate_tick(void *opaque)
+static void *migrate_vm(void *opaque)
 {
     QEMUFileBuffered *s = opaque;
+    int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100;
+    struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};
 
-    if (s->has_error) {
-        buffered_close(s);
-        return;
-    }
-
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    while (!s->has_error && (!s->closed || s->buffer_size)) {
+        if (s->freeze_output) {
+            s->wait_for_unfreeze(s);
+            s->freeze_output = 0;
+            continue;
+        }
 
-    if (s->freeze_output)
-        return;
+        current_time = qemu_get_clock_ms(rt_clock);
+        if (!s->closed && (expire_time > current_time)) {
+            tv.tv_usec = 1000 * (expire_time - current_time);
+            select(0, NULL, NULL, NULL, &tv);
+            continue;
+        }
 
-    s->bytes_xfer = 0;
+        s->bytes_xfer = 0;
+        buffered_flush(s);
 
-    buffered_flush(s);
+        expire_time = qemu_get_clock_ms(rt_clock) + 100;
+        if (!s->closed) {
+            s->put_ready(s->opaque);
+        }
+    }
 
-    /* Add some checks around this */
-    s->put_ready(s->opaque);
+    return NULL;
 }
 
 QEMUFile *qemu_fopen_ops_buffered(void *opaque,
@@ -267,15 +275,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
     s->put_ready = put_ready;
     s->wait_for_unfreeze = wait_for_unfreeze;
     s->close = close;
+    s->closed = 0;
 
     s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
                              buffered_close, buffered_rate_limit,
                              buffered_set_rate_limit,
-                            buffered_get_rate_limit);
-
-    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+                             buffered_get_rate_limit);
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_thread_create(&s->thread, migrate_vm, s);
 
     return s->file;
 }
diff --git a/migration.c b/migration.c
index af3a1f2..17d866a 100644
--- a/migration.c
+++ b/migration.c
@@ -149,10 +149,12 @@ int do_migrate_set_speed(Monitor *mon, const QDict *qdict, QObject **ret_data)
     }
     max_throttle = d;
 
+    qemu_mutex_lock_migthread();
     s = migrate_to_fms(current_migration);
     if (s && s->file) {
         qemu_file_set_rate_limit(s->file, max_throttle);
     }
+    qemu_mutex_unlock_migthread();
 
     return 0;
 }
@@ -284,8 +286,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
 {
     int ret = 0;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
     if (s->file) {
         DPRINTF("closing file\n");
         if (qemu_fclose(s->file) != 0) {
@@ -307,14 +307,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
     return ret;
 }
 
-void migrate_fd_put_notify(void *opaque)
-{
-    FdMigrationState *s = opaque;
-
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-    qemu_file_put_notify(s->file);
-}
-
 ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
 {
     FdMigrationState *s = opaque;
@@ -327,76 +319,91 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
     if (ret == -1)
         ret = -(s->get_error(s));
 
-    if (ret == -EAGAIN) {
-        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
-    } else if (ret < 0) {
-        if (s->mon) {
-            monitor_resume(s->mon);
+    return ret;
+}
+
+static void migrate_fd_terminate(void *opaque)
+{
+    FdMigrationState *s = opaque;
+
+    qemu_mutex_lock_migthread();
+
+    if (s->code == COMPLETE) {
+        if (migrate_fd_cleanup(s) < 0) {
+            if (s->old_vm_running) {
+                vm_start();
+            }
+            s->state = MIG_STATE_ERROR;
+        } else {
+            s->state = MIG_STATE_COMPLETED;
         }
-        s->state = MIG_STATE_ERROR;
         notifier_list_notify(&migration_state_notifiers);
+    } else if (s->code == RESUME) {
+        if (s->old_vm_running) {
+            vm_start();
+        }
+        migrate_fd_error(s);
+    } else if (s->code == ERROR) {
+        migrate_fd_error(s);
     }
 
-    return ret;
+    qemu_mutex_unlock_migthread();
 }
 
 void migrate_fd_connect(FdMigrationState *s)
 {
-    int ret;
-
+    s->code = START;
+    s->bh = qemu_bh_new(migrate_fd_terminate, s);
     s->file = qemu_fopen_ops_buffered(s,
                                       s->bandwidth_limit,
                                       migrate_fd_put_buffer,
                                       migrate_fd_put_ready,
                                       migrate_fd_wait_for_unfreeze,
                                       migrate_fd_close);
-
-    DPRINTF("beginning savevm\n");
-    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
-                                  s->mig_state.shared);
-    if (ret < 0) {
-        DPRINTF("failed, %d\n", ret);
-        migrate_fd_error(s);
-        return;
-    }
-    
-    migrate_fd_put_ready(s);
 }
 
 void migrate_fd_put_ready(void *opaque)
 {
     FdMigrationState *s = opaque;
+    int ret;
 
-    if (s->state != MIG_STATE_ACTIVE) {
+    qemu_mutex_lock_iothread();
+    if (s->code != ACTIVE && s->code != START) {
         DPRINTF("put_ready returning because of non-active state\n");
+        qemu_mutex_unlock_iothread();
         return;
     }
 
+    if (!s->code) {
+        DPRINTF("beginning savevm\n");
+        ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
+                s->mig_state.shared);
+        if (ret < 0) {
+            DPRINTF("failed, %d\n", ret);
+            s->code = ERROR;
+            qemu_bh_schedule(s->bh);
+            qemu_mutex_unlock_iothread();
+            return;
+        }
+        s->code = ACTIVE;
+    }
+
     DPRINTF("iterate\n");
     if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
-        int state;
-        int old_vm_running = vm_running;
+        s->old_vm_running = vm_running;
 
         DPRINTF("done iterating\n");
         vm_stop(VMSTOP_MIGRATE);
 
         if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) {
-            if (old_vm_running) {
-                vm_start();
-            }
-            state = MIG_STATE_ERROR;
+            s->code = RESUME;
         } else {
-            state = MIG_STATE_COMPLETED;
-        }
-        if (migrate_fd_cleanup(s) < 0) {
-            if (old_vm_running) {
-                vm_start();
-            }
-            state = MIG_STATE_ERROR;
+            s->code = COMPLETE;
         }
-        s->state = state;
-        notifier_list_notify(&migration_state_notifiers);
+
+        qemu_bh_schedule(s->bh);
     }
+    qemu_mutex_unlock_iothread();
 }
 
 int migrate_fd_get_status(MigrationState *mig_state)
@@ -416,9 +423,11 @@ void migrate_fd_cancel(MigrationState *mig_state)
 
     s->state = MIG_STATE_CANCELLED;
     notifier_list_notify(&migration_state_notifiers);
-    qemu_savevm_state_cancel(s->mon, s->file);
 
+    qemu_mutex_lock_migthread();
+    qemu_savevm_state_cancel(s->mon, s->file);
     migrate_fd_cleanup(s);
+    qemu_mutex_unlock_migthread();
 }
 
 void migrate_fd_release(MigrationState *mig_state)
@@ -426,22 +435,34 @@ void migrate_fd_release(MigrationState *mig_state)
     FdMigrationState *s = migrate_to_fms(mig_state);
 
     DPRINTF("releasing state\n");
-   
+
     if (s->state == MIG_STATE_ACTIVE) {
         s->state = MIG_STATE_CANCELLED;
         notifier_list_notify(&migration_state_notifiers);
+        qemu_mutex_lock_migthread();
         migrate_fd_cleanup(s);
+        qemu_mutex_unlock_migthread();
+    }
+
+    if (s->bh) {
+        qemu_bh_delete(s->bh);
     }
+
     qemu_free(s);
 }
 
 void migrate_fd_wait_for_unfreeze(void *opaque)
 {
     FdMigrationState *s = opaque;
-    int ret;
+    int ret, state;
 
     DPRINTF("wait for unfreeze\n");
-    if (s->state != MIG_STATE_ACTIVE)
+
+    qemu_mutex_lock_iothread();
+    state = s->state;
+    qemu_mutex_unlock_iothread();
+
+    if (state != MIG_STATE_ACTIVE)
         return;
 
     do {
@@ -458,7 +479,6 @@ int migrate_fd_close(void *opaque)
 {
     FdMigrationState *s = opaque;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
     return s->close(s);
 }
 
diff --git a/migration.h b/migration.h
index 050c56c..abbf9e4 100644
--- a/migration.h
+++ b/migration.h
@@ -23,6 +23,12 @@
 #define MIG_STATE_CANCELLED    1
 #define MIG_STATE_ACTIVE       2
 
+#define START       0
+#define ACTIVE      1
+#define COMPLETE    2
+#define ERROR       3
+#define RESUME      4
+
 typedef struct MigrationState MigrationState;
 
 struct MigrationState
@@ -45,10 +51,13 @@ struct FdMigrationState
     int fd;
     Monitor *mon;
     int state;
+    int code;
+    int old_vm_running;
     int (*get_error)(struct FdMigrationState*);
     int (*close)(struct FdMigrationState*);
     int (*write)(struct FdMigrationState*, const void *, size_t);
     void *opaque;
+    QEMUBH *bh;
 };
 
 void process_incoming_migration(QEMUFile *f);
diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c
index 2bd02ef..6a275be 100644
--- a/qemu-thread-posix.c
+++ b/qemu-thread-posix.c
@@ -115,6 +115,16 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex)
         error_exit(err, __func__);
 }
 
+void qemu_thread_join(QemuThread *thread)
+{
+    int err;
+
+    err = pthread_join(thread->thread, NULL);
+    if (err) {
+        error_exit(err, __func__);
+    }
+}
+
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
                        void *arg)
diff --git a/qemu-thread.h b/qemu-thread.h
index 0a73d50..d5b99d5 100644
--- a/qemu-thread.h
+++ b/qemu-thread.h
@@ -30,6 +30,7 @@ void qemu_cond_destroy(QemuCond *cond);
 void qemu_cond_signal(QemuCond *cond);
 void qemu_cond_broadcast(QemuCond *cond);
 void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex);
+void qemu_thread_join(QemuThread *thread);
 
 void qemu_thread_create(QemuThread *thread,
                        void *(*start_routine)(void*),
diff --git a/savevm.c b/savevm.c
index 8139bc7..f54f555 100644
--- a/savevm.c
+++ b/savevm.c
@@ -481,11 +481,6 @@ int qemu_fclose(QEMUFile *f)
     return ret;
 }
 
-void qemu_file_put_notify(QEMUFile *f)
-{
-    f->put_buffer(f->opaque, NULL, 0, 0);
-}
-
 void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size)
 {
     int l;
-- 
1.7.4.1


Reply via email to