From: "Dr. David Alan Gilbert" <dgilb...@redhat.com>

Open a return path, and handle messages that are received upon it.
Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com> --- include/migration/migration.h | 10 +++ migration.c | 142 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 151 insertions(+), 1 deletion(-) diff --git a/include/migration/migration.h b/include/migration/migration.h index 375efec..f722f06 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -46,6 +46,14 @@ enum mig_rpcomm_cmd { MIG_RPCOMM_ACK, /* data (seq: be32 ) */ MIG_RPCOMM_AFTERLASTVALID }; + +struct MigrationRetPathState { + uint16_t header_com; /* Headers of last (partially?) received command */ + uint16_t header_len; + uint32_t latest_ack; + bool error; /* True if something bad happened on the RP */ +}; + typedef struct MigrationState MigrationState; /* State for the incoming migration */ @@ -67,9 +75,11 @@ struct MigrationState QemuThread thread; QEMUBH *cleanup_bh; QEMUFile *file; + QEMUFile *return_path; int state; MigrationParams params; + struct MigrationRetPathState rp_state; double mbps; int64_t total_time; int64_t downtime; diff --git a/migration.c b/migration.c index 74bffbc..e69a49e 100644 --- a/migration.c +++ b/migration.c @@ -351,6 +351,17 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state) } } +static void migrate_fd_cleanup_src_rp(MigrationState *ms) +{ + if (ms->return_path) { + DPRINTF("cleaning up return path\n"); + int return_fd = qemu_get_fd(ms->return_path); + qemu_set_fd_handler2(return_fd, NULL, NULL, NULL, ms); + qemu_fclose(ms->return_path); + ms->return_path = NULL; + } +} + static void migrate_fd_cleanup(void *opaque) { MigrationState *s = opaque; @@ -358,6 +369,8 @@ static void migrate_fd_cleanup(void *opaque) qemu_bh_delete(s->cleanup_bh); s->cleanup_bh = NULL; + migrate_fd_cleanup_src_rp(s); + if (s->file) { trace_migrate_fd_cleanup(); qemu_mutex_unlock_iothread(); @@ -635,8 +648,135 @@ int64_t migrate_xbzrle_cache_size(void) return s->xbzrle_cache_size; } -/* migration thread support */ +/* + * 
Something bad happened to the RP stream, mark an error + * The caller shall print something to indicate why + */ +static void source_return_path_bad(MigrationState *s) +{ + s->rp_state.error = true; + migrate_fd_cleanup_src_rp(s); +} + +/* + * 'can read handler' for the fd callback + * stops the data handler being called if it's gone into + * error. + * Note: Currently we don't provide a recovery mechanism, + * if we do then we'll have to call qemu_notify_event + */ +static int source_return_path_can_read_handler(void *opaque) +{ + MigrationState *s = opaque; + return !s->rp_state.error; +} + +/* + * Handles messages sent on the return path towards the source VM + * + * We might be called multiple times for the same command if the data + * wasn't fully available when we were first called. When that + * happens we stash the command/length as soon as we get it. + */ +static void source_return_path_handler(void *opaque) +{ + MigrationState *s = opaque; + QEMUFile *rp = qemu_file_get_return_path(s->file); + const int max_len = 512; + uint8_t buf[max_len]; + uint32_t tmp32; + int res; + + DPRINTF("RP: Receive\n"); + if (!rp || qemu_file_get_error(rp)) { + source_return_path_bad(s); + return; + } + + if (s->rp_state.header_com == MIG_RPCOMM_INVALID) { + uint16_t expected_len; + + /* No command stored, so we're expecting a new header */ + res = qemu_peek_buffer(rp, buf, 4, 0); + + /* If we couldn't get all of our header, give up and + * we should be called back again when the rest arrives + */ + if (res != 4) { + return; + } + qemu_file_skip(rp, 4); + s->rp_state.header_com = be16_to_cpup((uint16_t *)buf); + s->rp_state.header_len = be16_to_cpup((uint16_t *)(buf + 2)); + + switch (s->rp_state.header_com) { + case MIG_RPCOMM_ACK: + expected_len = 4; + break; + + default: + DPRINTF("RP: Received invalid cmd 0x%04x length 0x%04x\n", + s->rp_state.header_com, s->rp_state.header_len); + source_return_path_bad(s); + return; + } + + if (s->rp_state.header_len > expected_len) { + 
DPRINTF("RP: Received command 0x%04x with" + "incorrect length %d expecting %d\n", + s->rp_state.header_com, s->rp_state.header_len, + expected_len); + source_return_path_bad(s); + return; + } + } + + /* We know we've got a valid header by this point */ + res = qemu_peek_buffer(rp, buf, s->rp_state.header_len, 0); + + /* If we haven't got all the data yet, just try again later */ + if (res != s->rp_state.header_len) { + return; + } + qemu_file_skip(rp, s->rp_state.header_len); + + /* OK, we have the command and the data */ + switch (s->rp_state.header_com) { + case MIG_RPCOMM_ACK: + tmp32 = be32_to_cpup((uint32_t *)buf); + DPRINTF("RP: Received ACK 0x%x\n", tmp32); + atomic_xchg(&s->rp_state.latest_ack, tmp32); + break; + + default: + /* This shouldn't happen because we should catch this above */ + DPRINTF("RP: Bad header_com in dispatch\n"); + } + /* Latest command processed, now leave a gap for the next one */ + s->rp_state.header_com = MIG_RPCOMM_INVALID; +} + +static int open_outgoing_return_path(MigrationState *ms) +{ + int return_fd; + + ms->return_path = qemu_file_get_return_path(ms->file); + if (!ms->return_path) { + return -1; + } + + return_fd = qemu_get_fd(ms->return_path); + qemu_set_fd_handler2(return_fd, source_return_path_can_read_handler, + source_return_path_handler, NULL, ms); + + return 0; +} + +/* + * Master migration thread on the source VM. + * It drives the migration and pumps the data down the outgoing channel. + */ static void *migration_thread(void *opaque) { MigrationState *s = opaque; -- 1.9.3