This patch creates a separate thread for guest migration on the source side. All exits (on completion/error) from the migration thread are handled by a bottom-half (BH) handler, which is called from the iothread.
Signed-off-by: Umesh Deshpande <udesh...@redhat.com> --- buffered_file.c | 75 +++++++++++++++++-------------- migration.c | 122 +++++++++++++++++++++++++++++--------------------- migration.h | 9 ++++ qemu-thread-posix.c | 10 ++++ qemu-thread.h | 1 + savevm.c | 5 -- 6 files changed, 132 insertions(+), 90 deletions(-) diff --git a/buffered_file.c b/buffered_file.c index 41b42c3..0d94baa 100644 --- a/buffered_file.c +++ b/buffered_file.c @@ -16,6 +16,8 @@ #include "qemu-timer.h" #include "qemu-char.h" #include "buffered_file.h" +#include "migration.h" +#include "qemu-thread.h" //#define DEBUG_BUFFERED_FILE @@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered void *opaque; QEMUFile *file; int has_error; + int closed; int freeze_output; size_t bytes_xfer; size_t xfer_limit; uint8_t *buffer; size_t buffer_size; size_t buffer_capacity; - QEMUTimer *timer; + QemuThread thread; } QEMUFileBuffered; #ifdef DEBUG_BUFFERED_FILE @@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, in offset = size; } - if (pos == 0 && size == 0) { - DPRINTF("file is ready\n"); - if (s->bytes_xfer <= s->xfer_limit) { - DPRINTF("notifying client\n"); - s->put_ready(s->opaque); - } - } - return offset; } @@ -173,22 +168,25 @@ static int buffered_close(void *opaque) DPRINTF("closing\n"); - while (!s->has_error && s->buffer_size) { - buffered_flush(s); - if (s->freeze_output) - s->wait_for_unfreeze(s); - } + s->closed = 1; - ret = s->close(s->opaque); + qemu_mutex_unlock_migthread(); + qemu_mutex_unlock_iothread(); + + qemu_thread_join(&s->thread); + /* Waits for the completion of the migration thread */ - qemu_del_timer(s->timer); - qemu_free_timer(s->timer); + qemu_mutex_lock_iothread(); + qemu_mutex_lock_migthread(); + + ret = s->close(s->opaque); qemu_free(s->buffer); qemu_free(s); return ret; } + static int buffered_rate_limit(void *opaque) { QEMUFileBuffered *s = opaque; @@ -228,26 +226,36 @@ static int64_t buffered_get_rate_limit(void *opaque) 
return s->xfer_limit; } -static void buffered_rate_tick(void *opaque) +static void *migrate_vm(void *opaque) { QEMUFileBuffered *s = opaque; + int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100; + struct timeval tv = { .tv_sec = 0, .tv_usec = 100000}; - if (s->has_error) { - buffered_close(s); - return; - } - - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + while (!s->has_error && (!s->closed || s->buffer_size)) { + if (s->freeze_output) { + s->wait_for_unfreeze(s); + s->freeze_output = 0; + continue; + } - if (s->freeze_output) - return; + current_time = qemu_get_clock_ms(rt_clock); + if (!s->closed && (expire_time > current_time)) { + tv.tv_usec = 1000 * (expire_time - current_time); + select(0, NULL, NULL, NULL, &tv); + continue; + } - s->bytes_xfer = 0; + s->bytes_xfer = 0; + buffered_flush(s); - buffered_flush(s); + expire_time = qemu_get_clock_ms(rt_clock) + 100; + if (!s->closed) { + s->put_ready(s->opaque); + } + } - /* Add some checks around this */ - s->put_ready(s->opaque); + return NULL; } QEMUFile *qemu_fopen_ops_buffered(void *opaque, @@ -267,15 +275,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque, s->put_ready = put_ready; s->wait_for_unfreeze = wait_for_unfreeze; s->close = close; + s->closed = 0; s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, buffered_close, buffered_rate_limit, buffered_set_rate_limit, - buffered_get_rate_limit); - - s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s); + buffered_get_rate_limit); - qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100); + qemu_thread_create(&s->thread, migrate_vm, s); return s->file; } diff --git a/migration.c b/migration.c index af3a1f2..17d866a 100644 --- a/migration.c +++ b/migration.c @@ -149,10 +149,12 @@ int do_migrate_set_speed(Monitor *mon, const QDict *qdict, QObject **ret_data) } max_throttle = d; + qemu_mutex_lock_migthread(); s = migrate_to_fms(current_migration); if (s && s->file) { qemu_file_set_rate_limit(s->file, 
max_throttle); } + qemu_mutex_unlock_migthread(); return 0; } @@ -284,8 +286,6 @@ int migrate_fd_cleanup(FdMigrationState *s) { int ret = 0; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - if (s->file) { DPRINTF("closing file\n"); if (qemu_fclose(s->file) != 0) { @@ -307,14 +307,6 @@ int migrate_fd_cleanup(FdMigrationState *s) return ret; } -void migrate_fd_put_notify(void *opaque) -{ - FdMigrationState *s = opaque; - - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); - qemu_file_put_notify(s->file); -} - ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) { FdMigrationState *s = opaque; @@ -327,76 +319,91 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size) if (ret == -1) ret = -(s->get_error(s)); - if (ret == -EAGAIN) { - qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s); - } else if (ret < 0) { - if (s->mon) { - monitor_resume(s->mon); + return ret; +} + +static void migrate_fd_terminate(void *opaque) +{ + FdMigrationState *s = opaque; + + qemu_mutex_lock_migthread(); + + if (s->code == COMPLETE) { + if (migrate_fd_cleanup(s) < 0) { + if (s->old_vm_running) { + vm_start(); + } + s->state = MIG_STATE_ERROR; + } else { + s->state = MIG_STATE_COMPLETED; } - s->state = MIG_STATE_ERROR; notifier_list_notify(&migration_state_notifiers); + } else if (s->code == RESUME) { + if (s->old_vm_running) { + vm_start(); + } + migrate_fd_error(s); + } else if (s->code == ERROR) { + migrate_fd_error(s); } - return ret; + qemu_mutex_unlock_migthread(); } void migrate_fd_connect(FdMigrationState *s) { - int ret; - + s->code = START; + s->bh = qemu_bh_new(migrate_fd_terminate, s); s->file = qemu_fopen_ops_buffered(s, s->bandwidth_limit, migrate_fd_put_buffer, migrate_fd_put_ready, migrate_fd_wait_for_unfreeze, migrate_fd_close); - - DPRINTF("beginning savevm\n"); - ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, - s->mig_state.shared); - if (ret < 0) { - DPRINTF("failed, %d\n", ret); - 
migrate_fd_error(s); - return; - } - - migrate_fd_put_ready(s); } void migrate_fd_put_ready(void *opaque) { FdMigrationState *s = opaque; + int ret; - if (s->state != MIG_STATE_ACTIVE) { + qemu_mutex_lock_iothread(); + if (s->code != ACTIVE && s->code != START) { DPRINTF("put_ready returning because of non-active state\n"); + qemu_mutex_unlock_iothread(); return; } + if (!s->code) { + DPRINTF("beginning savevm\n"); + ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk, + s->mig_state.shared); + if (ret < 0) { + DPRINTF("failed, %d\n", ret); + s->code = ERROR; + qemu_bh_schedule(s->bh); + qemu_mutex_unlock_iothread(); + return; + } + s->code = ACTIVE; + } + DPRINTF("iterate\n"); if (qemu_savevm_state_iterate(s->mon, s->file) == 1) { - int state; - int old_vm_running = vm_running; + s->old_vm_running = vm_running; DPRINTF("done iterating\n"); vm_stop(VMSTOP_MIGRATE); if ((qemu_savevm_state_complete(s->mon, s->file)) < 0) { - if (old_vm_running) { - vm_start(); - } - state = MIG_STATE_ERROR; + s->code = RESUME; } else { - state = MIG_STATE_COMPLETED; - } - if (migrate_fd_cleanup(s) < 0) { - if (old_vm_running) { - vm_start(); - } - state = MIG_STATE_ERROR; + s->code = COMPLETE; } - s->state = state; - notifier_list_notify(&migration_state_notifiers); + + qemu_bh_schedule(s->bh); } + qemu_mutex_unlock_iothread(); } int migrate_fd_get_status(MigrationState *mig_state) @@ -416,9 +423,11 @@ void migrate_fd_cancel(MigrationState *mig_state) s->state = MIG_STATE_CANCELLED; notifier_list_notify(&migration_state_notifiers); - qemu_savevm_state_cancel(s->mon, s->file); + qemu_mutex_lock_migthread(); + qemu_savevm_state_cancel(s->mon, s->file); migrate_fd_cleanup(s); + qemu_mutex_unlock_migthread(); } void migrate_fd_release(MigrationState *mig_state) @@ -426,22 +435,34 @@ void migrate_fd_release(MigrationState *mig_state) FdMigrationState *s = migrate_to_fms(mig_state); DPRINTF("releasing state\n"); - + if (s->state == MIG_STATE_ACTIVE) { s->state = 
MIG_STATE_CANCELLED; notifier_list_notify(&migration_state_notifiers); + qemu_mutex_lock_migthread(); migrate_fd_cleanup(s); + qemu_mutex_unlock_migthread(); + } + + if (s->bh) { + qemu_bh_delete(s->bh); } + qemu_free(s); } void migrate_fd_wait_for_unfreeze(void *opaque) { FdMigrationState *s = opaque; - int ret; + int ret, state; DPRINTF("wait for unfreeze\n"); - if (s->state != MIG_STATE_ACTIVE) + + qemu_mutex_lock_iothread(); + state = s->state; + qemu_mutex_unlock_iothread(); + + if (state != MIG_STATE_ACTIVE) return; do { @@ -458,7 +479,6 @@ int migrate_fd_close(void *opaque) { FdMigrationState *s = opaque; - qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL); return s->close(s); } diff --git a/migration.h b/migration.h index 050c56c..abbf9e4 100644 --- a/migration.h +++ b/migration.h @@ -23,6 +23,12 @@ #define MIG_STATE_CANCELLED 1 #define MIG_STATE_ACTIVE 2 +#define START 0 +#define ACTIVE 1 +#define COMPLETE 2 +#define ERROR 3 +#define RESUME 4 + typedef struct MigrationState MigrationState; struct MigrationState @@ -45,10 +51,13 @@ struct FdMigrationState int fd; Monitor *mon; int state; + int code; + int old_vm_running; int (*get_error)(struct FdMigrationState*); int (*close)(struct FdMigrationState*); int (*write)(struct FdMigrationState*, const void *, size_t); void *opaque; + QEMUBH *bh; }; void process_incoming_migration(QEMUFile *f); diff --git a/qemu-thread-posix.c b/qemu-thread-posix.c index 2bd02ef..6a275be 100644 --- a/qemu-thread-posix.c +++ b/qemu-thread-posix.c @@ -115,6 +115,16 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex) error_exit(err, __func__); } +void qemu_thread_join(QemuThread *thread) +{ + int err; + + err = pthread_join(thread->thread, NULL); + if (err) { + error_exit(err, __func__); + } +} + void qemu_thread_create(QemuThread *thread, void *(*start_routine)(void*), void *arg) diff --git a/qemu-thread.h b/qemu-thread.h index 0a73d50..d5b99d5 100644 --- a/qemu-thread.h +++ b/qemu-thread.h @@ -30,6 +30,7 @@ void 
qemu_cond_destroy(QemuCond *cond); void qemu_cond_signal(QemuCond *cond); void qemu_cond_broadcast(QemuCond *cond); void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex); +void qemu_thread_join(QemuThread *thread); void qemu_thread_create(QemuThread *thread, void *(*start_routine)(void*), diff --git a/savevm.c b/savevm.c index 8139bc7..f54f555 100644 --- a/savevm.c +++ b/savevm.c @@ -481,11 +481,6 @@ int qemu_fclose(QEMUFile *f) return ret; } -void qemu_file_put_notify(QEMUFile *f) -{ - f->put_buffer(f->opaque, NULL, 0, 0); -} - void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size) { int l; -- 1.7.4.1