From: Umesh Deshpande <udesh...@redhat.com>

This patch moves guest migration on the source side into a separate thread.
That thread runs the timers of the migration clock, which drive the migration routine.

Signed-off-by: Umesh Deshpande <udesh...@redhat.com>
---
 arch_init.c      |   14 ++++++++++--
 buffered_file.c  |   12 ++++++----
 exec.c           |    4 +++
 migration-tcp.c  |   18 ++++++++--------
 migration-unix.c |    7 ++---
 migration.c      |   59 ++++++++++++++++++++++++++++++-----------------------
 migration.h      |    4 +-
 7 files changed, 69 insertions(+), 49 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 484b39d..cd545bc 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -110,7 +110,7 @@ static int is_dup_page(uint8_t *page, uint8_t ch)
 static RAMBlock *last_block;
 static ram_addr_t last_offset;
 
-static int ram_save_block(QEMUFile *f)
+static int ram_save_block(QEMUFile *f, int stage)
 {
     RAMBlock *block = last_block;
     ram_addr_t offset = last_offset;
@@ -131,6 +131,10 @@ static int ram_save_block(QEMUFile *f)
                                             current_addr + TARGET_PAGE_SIZE,
                                             MIGRATION_DIRTY_FLAG);
 
+            if (stage != 3) {
+                qemu_mutex_unlock_iothread();
+            }
+
             p = block->host + offset;
 
             if (is_dup_page(p, *p)) {
@@ -153,6 +157,10 @@ static int ram_save_block(QEMUFile *f)
                 bytes_sent = TARGET_PAGE_SIZE;
             }
 
+            if (stage != 3) {
+                qemu_mutex_lock_iothread();
+            }
+
             break;
         }
 
@@ -301,7 +309,7 @@ int ram_save_live(Monitor *mon, QEMUFile *f, int stage, 
void *opaque)
     while (!qemu_file_rate_limit(f)) {
         int bytes_sent;
 
-        bytes_sent = ram_save_block(f);
+        bytes_sent = ram_save_block(f, stage);
         bytes_transferred += bytes_sent;
         if (bytes_sent == 0) { /* no more blocks */
             break;
@@ -322,7 +330,7 @@ int ram_save_live(Monitor *mon, QEMUFile *f, int stage, 
void *opaque)
         int bytes_sent;
 
         /* flush all remaining blocks regardless of rate limiting */
-        while ((bytes_sent = ram_save_block(f)) != 0) {
+        while ((bytes_sent = ram_save_block(f, stage)) != 0) {
             bytes_transferred += bytes_sent;
         }
         cpu_physical_memory_set_dirty_tracking(0);
diff --git a/buffered_file.c b/buffered_file.c
index 41b42c3..73e9666 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -237,7 +237,7 @@ static void buffered_rate_tick(void *opaque)
         return;
     }
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_mod_timer(s->timer, qemu_get_clock_ms(migration_clock) + 100);
 
     if (s->freeze_output)
         return;
@@ -246,8 +246,10 @@ static void buffered_rate_tick(void *opaque)
 
     buffered_flush(s);
 
-    /* Add some checks around this */
     s->put_ready(s->opaque);
+    qemu_mutex_unlock_iothread();
+    usleep(qemu_timer_difference(s->timer, migration_clock) * 1000);
+    qemu_mutex_lock_iothread();
 }
 
 QEMUFile *qemu_fopen_ops_buffered(void *opaque,
@@ -271,11 +273,11 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
     s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
                              buffered_close, buffered_rate_limit,
                              buffered_set_rate_limit,
-                            buffered_get_rate_limit);
+                             buffered_get_rate_limit);
 
-    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+    s->timer = qemu_new_timer_ms(migration_clock, buffered_rate_tick, s);
 
-    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+    qemu_mod_timer(s->timer, qemu_get_clock_ms(migration_clock) + 100);
 
     return s->file;
 }
diff --git a/exec.c b/exec.c
index 0e2ce57..97f60c6 100644
--- a/exec.c
+++ b/exec.c
@@ -2106,6 +2106,10 @@ void cpu_physical_memory_reset_dirty(ram_addr_t start, 
ram_addr_t end,
         abort();
     }
 
+    if (kvm_enabled()) {
+        return;
+    }
+
     for(env = first_cpu; env != NULL; env = env->next_cpu) {
         int mmu_idx;
         for (mmu_idx = 0; mmu_idx < NB_MMU_MODES; mmu_idx++) {
diff --git a/migration-tcp.c b/migration-tcp.c
index d3d80c9..5840668 100644
--- a/migration-tcp.c
+++ b/migration-tcp.c
@@ -65,11 +65,9 @@ static void tcp_wait_for_connect(void *opaque)
         return;
     }
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
-    if (val == 0)
+    if (val == 0) {
         migrate_fd_connect(s);
-    else {
+    } else {
         DPRINTF("error connecting %d\n", val);
         migrate_fd_error(s);
     }
@@ -79,8 +77,8 @@ MigrationState *tcp_start_outgoing_migration(Monitor *mon,
                                              const char *host_port,
                                              int64_t bandwidth_limit,
                                              int detach,
-                                            int blk,
-                                            int inc)
+                                             int blk,
+                                             int inc)
 {
     struct sockaddr_in addr;
     FdMigrationState *s;
@@ -121,15 +119,17 @@ MigrationState *tcp_start_outgoing_migration(Monitor *mon,
         if (ret == -1)
             ret = -(s->get_error(s));
 
-        if (ret == -EINPROGRESS || ret == -EWOULDBLOCK)
-            qemu_set_fd_handler2(s->fd, NULL, NULL, tcp_wait_for_connect, s);
     } while (ret == -EINTR);
 
     if (ret < 0 && ret != -EINPROGRESS && ret != -EWOULDBLOCK) {
         DPRINTF("connect failed\n");
         migrate_fd_error(s);
-    } else if (ret >= 0)
+    } else if (ret >= 0) {
         migrate_fd_connect(s);
+    } else {
+        migrate_fd_wait_for_unfreeze(s);
+        tcp_wait_for_connect(s);
+    }
 
     return &s->mig_state;
 }
diff --git a/migration-unix.c b/migration-unix.c
index c8625c7..e0a8f6f 100644
--- a/migration-unix.c
+++ b/migration-unix.c
@@ -64,8 +64,6 @@ static void unix_wait_for_connect(void *opaque)
         return;
     }
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
     if (val == 0)
         migrate_fd_connect(s);
     else {
@@ -116,13 +114,14 @@ MigrationState *unix_start_outgoing_migration(Monitor 
*mon,
         if (ret == -1)
            ret = -(s->get_error(s));
 
-        if (ret == -EINPROGRESS || ret == -EWOULDBLOCK)
-           qemu_set_fd_handler2(s->fd, NULL, NULL, unix_wait_for_connect, s);
     } while (ret == -EINTR);
 
     if (ret < 0 && ret != -EINPROGRESS && ret != -EWOULDBLOCK) {
         DPRINTF("connect failed\n");
         goto err_after_open;
+    } else if (ret == -EINPROGRESS || ret == -EWOULDBLOCK) {
+        migrate_fd_wait_for_unfreeze(s);
+        unix_wait_for_connect(s);
     }
 
     if (!detach) {
diff --git a/migration.c b/migration.c
index af3a1f2..4649691 100644
--- a/migration.c
+++ b/migration.c
@@ -12,6 +12,8 @@
  */
 
 #include "qemu-common.h"
+#include "qemu-thread.h"
+#include "qemu-timer.h"
 #include "migration.h"
 #include "monitor.h"
 #include "buffered_file.h"
@@ -35,6 +37,7 @@
 static int64_t max_throttle = (32 << 20);
 
 static MigrationState *current_migration;
+char host_port[50];
 
 static NotifierList migration_state_notifiers =
     NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
@@ -284,8 +287,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
 {
     int ret = 0;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
     if (s->file) {
         DPRINTF("closing file\n");
         if (qemu_fclose(s->file) != 0) {
@@ -307,14 +308,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
     return ret;
 }
 
-void migrate_fd_put_notify(void *opaque)
-{
-    FdMigrationState *s = opaque;
-
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-    qemu_file_put_notify(s->file);
-}
-
 ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
 {
     FdMigrationState *s = opaque;
@@ -327,9 +320,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void 
*data, size_t size)
     if (ret == -1)
         ret = -(s->get_error(s));
 
-    if (ret == -EAGAIN) {
-        qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
-    } else if (ret < 0) {
+    if (ret < 0 && ret != -EAGAIN) {
         if (s->mon) {
             monitor_resume(s->mon);
         }
@@ -340,10 +331,34 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void 
*data, size_t size)
     return ret;
 }
 
-void migrate_fd_connect(FdMigrationState *s)
+void *migrate_run_timers(void *arg)
 {
+    FdMigrationState *s = arg; /* per-migration state handed to the thread */
     int ret;
 
+    qemu_mutex_lock_iothread();
+    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
+            s->mig_state.shared);
+    if (ret < 0) {
+        DPRINTF("failed, %d\n", ret);
+        migrate_fd_error(s);
+        qemu_mutex_unlock_iothread(); /* do not exit the thread holding it */
+        return NULL;
+    }
+
+    migrate_fd_put_ready(s);
+
+    while (s->state == MIG_STATE_ACTIVE) { /* drive migration_clock timers */
+        qemu_run_timers(migration_clock);
+    }
+
+    qemu_mutex_unlock_iothread();
+    return NULL;
+}
+
+void migrate_fd_connect(FdMigrationState *s)
+{
+    struct QemuThread migrate_thread;
     s->file = qemu_fopen_ops_buffered(s,
                                       s->bandwidth_limit,
                                       migrate_fd_put_buffer,
@@ -352,15 +367,7 @@ void migrate_fd_connect(FdMigrationState *s)
                                       migrate_fd_close);
 
     DPRINTF("beginning savevm\n");
-    ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
-                                  s->mig_state.shared);
-    if (ret < 0) {
-        DPRINTF("failed, %d\n", ret);
-        migrate_fd_error(s);
-        return;
-    }
-    
-    migrate_fd_put_ready(s);
+    qemu_thread_create(&migrate_thread, migrate_run_timers, s);
 }
 
 void migrate_fd_put_ready(void *opaque)
@@ -376,8 +383,6 @@ void migrate_fd_put_ready(void *opaque)
     if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
         int state;
         int old_vm_running = vm_running;
-
-        DPRINTF("done iterating\n");
         vm_stop(VMSTOP_MIGRATE);
 
         if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) {
@@ -396,6 +401,9 @@ void migrate_fd_put_ready(void *opaque)
         }
         s->state = state;
         notifier_list_notify(&migration_state_notifiers);
+    } else {
+        migrate_fd_wait_for_unfreeze(s);
+        qemu_file_put_notify(s->file);
     }
 }
 
@@ -458,7 +466,6 @@ int migrate_fd_close(void *opaque)
 {
     FdMigrationState *s = opaque;
 
-    qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
     return s->close(s);
 }
 
diff --git a/migration.h b/migration.h
index 050c56c..8d8b7c9 100644
--- a/migration.h
+++ b/migration.h
@@ -72,6 +72,8 @@ void do_info_migrate(Monitor *mon, QObject **ret_data);
 
 int exec_start_incoming_migration(const char *host_port);
 
+void *migrate_run_timers(void *);
+
 MigrationState *exec_start_outgoing_migration(Monitor *mon,
                                               const char *host_port,
                                              int64_t bandwidth_limit,
@@ -112,8 +114,6 @@ void migrate_fd_error(FdMigrationState *s);
 
 int migrate_fd_cleanup(FdMigrationState *s);
 
-void migrate_fd_put_notify(void *opaque);
-
 ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size);
 
 void migrate_fd_connect(FdMigrationState *s);
-- 
1.7.4.1


Reply via email to