From: "Dr. David Alan Gilbert" <dgilb...@redhat.com>

Open a return path on the source side of an outgoing migration, and handle the messages (ACK, SHUT) that are received on it.
Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com>
---
 include/migration/migration.h |  11 ++++
 migration.c                   | 171 ++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 181 insertions(+), 1 deletion(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 12e640d..0c9055f 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -47,6 +47,15 @@ enum mig_rpcomm_cmd {
     MIG_RPCOMM_ACK,          /* data (seq: be32 ) */
     MIG_RPCOMM_AFTERLASTVALID
 };
+
+struct MigrationRetPathState {
+    uint16_t      header_com;   /* Headers of last (partially?) received cmd */
+    uint16_t      header_len;
+    uint32_t      latest_ack;
+    bool          error;        /* True if something bad happened on the RP */
+    QemuSemaphore finished;     /* When the RP co quits */
+};
+
 typedef struct MigrationState MigrationState;
 
 /* State for the incoming migration */
@@ -69,9 +78,11 @@ struct MigrationState
     QemuThread thread;
     QEMUBH *cleanup_bh;
     QEMUFile *file;
+    QEMUFile *return_path;
 
     int state;
     MigrationParams params;
+    struct MigrationRetPathState rp_state;
     double mbps;
     int64_t total_time;
     int64_t downtime;
diff --git a/migration.c b/migration.c
index 3e4e120..c54d59e 100644
--- a/migration.c
+++ b/migration.c
@@ -373,6 +373,15 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state)
     }
 }
 
+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+    if (ms->return_path) {
+        DPRINTF("cleaning up return path\n");
+        qemu_fclose(ms->return_path);
+        ms->return_path = NULL;
+    }
+}
+
 static void migrate_fd_cleanup(void *opaque)
 {
     MigrationState *s = opaque;
@@ -380,6 +389,8 @@ static void migrate_fd_cleanup(void *opaque)
     qemu_bh_delete(s->cleanup_bh);
     s->cleanup_bh = NULL;
 
+    migrate_fd_cleanup_src_rp(s);
+
     if (s->file) {
         trace_migrate_fd_cleanup();
         qemu_mutex_unlock_iothread();
@@ -657,8 +668,166 @@ int64_t migrate_xbzrle_cache_size(void)
     return s->xbzrle_cache_size;
 }
 
-/* migration thread support */
+/*
+ * Something bad happened to the RP stream: mark an error and close it.
+ * The caller shall print something to indicate why.
+ */
+static void source_return_path_bad(MigrationState *s)
+{
+    s->rp_state.error = true;
+    migrate_fd_cleanup_src_rp(s);
+}
+
+/*
+ * Handles messages sent on the return path towards the source VM.
+ *
+ * This is a coroutine that sits around listening for messages as
+ * long as the return-path exists.  Each message is a be16 command,
+ * a be16 payload length and then exactly that many payload bytes.
+ *
+ * On any failure (unknown command, wrong length, short read, file
+ * error or a SHUT carrying a non-zero status) rp_state.error is set
+ * and the return path is closed.  rp_state.finished is posted on
+ * every exit so await_outgoing_return_path_close() can return.
+ */
+static void source_return_path_co(void *opaque)
+{
+    MigrationState *ms = opaque;
+    QEMUFile *rp = ms->return_path;
+    /* Fixed-size (not a VLA); payload lengths are validated before use */
+    uint8_t buf[512];
+    uint32_t tmp32;
+    int res;
+
+    DPRINTF("RP: source_return_path_co entry\n");
+    while (rp && !qemu_file_get_error(rp)) {
+        DPRINTF("RP: source_return_path_co top of loop\n");
+        ms->rp_state.header_com = qemu_get_be16(rp);
+        ms->rp_state.header_len = qemu_get_be16(rp);
+
+        uint16_t expected_len;
+
+        switch (ms->rp_state.header_com) {
+        case MIG_RPCOMM_SHUT:
+        case MIG_RPCOMM_ACK:
+            expected_len = 4;
+            break;
+
+        default:
+            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
+                    ms->rp_state.header_com, ms->rp_state.header_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+        /*
+         * Require an exact match: a short payload would leave part of
+         * buf uninitialised when it is decoded below.
+         */
+        if (ms->rp_state.header_len != expected_len) {
+            error_report("RP: Received command 0x%04x with "
+                    "incorrect length %d expecting %d",
+                    ms->rp_state.header_com, ms->rp_state.header_len,
+                    expected_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* We know we've got a valid header by this point */
+        res = qemu_get_buffer(rp, buf, ms->rp_state.header_len);
+        if (res != ms->rp_state.header_len) {
+            DPRINTF("RP: Failed to read command data\n");
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* OK, we have the command and the data */
+        switch (ms->rp_state.header_com) {
+        case MIG_RPCOMM_SHUT:
+            /* ldl_be_p() copes with buf not being 4-byte aligned */
+            tmp32 = ldl_be_p(buf);
+            if (tmp32) {
+                error_report("RP: Sibling indicated error %d", (int)tmp32);
+                source_return_path_bad(ms);
+            } else {
+                DPRINTF("RP: SHUT received\n");
+            }
+            /*
+             * On a clean SHUT we let the main thread deal with closing
+             * the RP; we could do a shutdown(2) on it, but we're the only
+             * user anyway, so there's nothing gained.
+             */
+            goto out;
+
+        case MIG_RPCOMM_ACK:
+            tmp32 = ldl_be_p(buf);
+            DPRINTF("RP: Received ACK 0x%x\n", tmp32);
+            atomic_xchg(&ms->rp_state.latest_ack, tmp32);
+            break;
+
+        default:
+            /* This shouldn't happen because we should catch this above */
+            DPRINTF("RP: Bad header_com in dispatch\n");
+            break;
+        }
+        /* Latest command processed, now leave a gap for the next one */
+        ms->rp_state.header_com = MIG_RPCOMM_INVALID;
+    }
+    if (rp && qemu_file_get_error(rp)) {
+        DPRINTF("source_return_path_co: rp bad at end\n");
+        source_return_path_bad(ms);
+    }
+
+    DPRINTF("source_return_path_co: Bottom exit\n");
+
+out:
+    /* For await_outgoing_return_path_close */
+    qemu_sem_post(&ms->rp_state.finished);
+}
+
+/*
+ * Opens the return path of the outgoing stream and starts
+ * source_return_path_co() listening on it.
+ *
+ * Returns 0 on success, -1 if ms->file provides no return path.
+ * rp_state.finished is initialised in both cases.
+ */
+static int open_outgoing_return_path(MigrationState *ms)
+{
+    Coroutine *co;
+
+    qemu_sem_init(&ms->rp_state.finished, 0);
+
+    ms->return_path = qemu_file_get_return_path(ms->file);
+    if (!ms->return_path) {
+        return -1;
+    }
+
+    /* Create the coroutine only once it is certain to be entered */
+    co = qemu_coroutine_create(source_return_path_co);
+    DPRINTF("open_outgoing_return_path starting co\n");
+    qemu_coroutine_enter(co, ms);
+    DPRINTF("open_outgoing_return_path continuing\n");
+
+    return 0;
+}
+
+/*
+ * Waits for source_return_path_co() to exit.  Only call this after
+ * open_outgoing_return_path() succeeded: otherwise nothing posts
+ * rp_state.finished and this blocks forever.
+ */
+static void await_outgoing_return_path_close(MigrationState *ms)
+{
+    /* TODO: once the _co becomes a process we can replace this by a join */
+    DPRINTF("%s: Waiting\n", __func__);
+    qemu_sem_wait(&ms->rp_state.finished);
+    DPRINTF("%s: Exit\n", __func__);
+}
 
+/*
+ * Master migration thread on the source VM.
+ * It drives the migration and pumps the data down the outgoing channel.
+ */
 static void *migration_thread(void *opaque)
 {
     MigrationState *s = opaque;
-- 
1.9.3