* zhanghailiang (zhang.zhanghaili...@huawei.com) wrote: > Hi Dave, > > On 2015/6/16 18:26, Dr. David Alan Gilbert (git) wrote: > >From: "Dr. David Alan Gilbert" <dgilb...@redhat.com> > > > >Open a return path, and handle messages that are received upon it. > > > >Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com> > >--- > > include/migration/migration.h | 8 ++ > > migration/migration.c | 177 > > +++++++++++++++++++++++++++++++++++++++++- > > trace-events | 12 +++ > > 3 files changed, 196 insertions(+), 1 deletion(-) > > > >diff --git a/include/migration/migration.h b/include/migration/migration.h > >index 36caab9..868f59a 100644 > >--- a/include/migration/migration.h > >+++ b/include/migration/migration.h > >@@ -77,6 +77,14 @@ struct MigrationState > > > > int state; > > MigrationParams params; > >+ > >+ /* State related to return path */ > >+ struct { > >+ QEMUFile *file; > > There is already a 'file' member in MigrationState, > and since for migration, there is only one path direction, just from source > side > to destination side, so it is ok to use that name. > > But for post-copy and COLO, we need two-way communication, > So we can rename the original 'file' member of MigrationState to 'output_file', > and add a new 'input_file' member. For MigrationIncomingState struct, rename > its original > 'file' member to 'input_file', and add a new 'output_file'. > IMHO, this will make things more clear.
Would the following be clearer: On the source make the existing migration file: QEMUFile *to_dst_file; and for the return path QEMUFile *from_dst_file; and then on the destination, the incoming migration stream: QEMUFile *from_src_file; and then the return path on the destination: QEMUFile *to_src_file; Dave > Thanks, > zhanghailiang > > > >+ QemuThread rp_thread; > >+ bool error; > >+ } rp_state; > >+ > > double mbps; > > int64_t total_time; > > int64_t downtime; > >diff --git a/migration/migration.c b/migration/migration.c > >index afb19a1..fb2f491 100644 > >--- a/migration/migration.c > >+++ b/migration/migration.c > >@@ -278,6 +278,23 @@ MigrationParameters *qmp_query_migrate_parameters(Error > >**errp) > > return params; > > } > > > >+/* > >+ * Return true if we're already in the middle of a migration > >+ * (i.e. any of the active or setup states) > >+ */ > >+static bool migration_already_active(MigrationState *ms) > >+{ > >+ switch (ms->state) { > >+ case MIGRATION_STATUS_ACTIVE: > >+ case MIGRATION_STATUS_SETUP: > >+ return true; > >+ > >+ default: > >+ return false; > >+ > >+ } > >+} > >+ > > static void get_xbzrle_cache_stats(MigrationInfo *info) > > { > > if (migrate_use_xbzrle()) { > >@@ -441,6 +458,21 @@ static void migrate_set_state(MigrationState *s, int > >old_state, int new_state) > > } > > } > > > >+static void migrate_fd_cleanup_src_rp(MigrationState *ms) > >+{ > >+ QEMUFile *rp = ms->rp_state.file; > >+ > >+ /* > >+ * When stuff goes wrong (e.g. 
failing destination) on the rp, it can > >get > >+ * cleaned up from a few threads; make sure not to do it twice in > >parallel > >+ */ > >+ rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL); > >+ if (rp) { > >+ trace_migrate_fd_cleanup_src_rp(); > >+ qemu_fclose(rp); > >+ } > >+} > >+ > > static void migrate_fd_cleanup(void *opaque) > > { > > MigrationState *s = opaque; > >@@ -448,6 +480,8 @@ static void migrate_fd_cleanup(void *opaque) > > qemu_bh_delete(s->cleanup_bh); > > s->cleanup_bh = NULL; > > > >+ migrate_fd_cleanup_src_rp(s); > >+ > > if (s->file) { > > trace_migrate_fd_cleanup(); > > qemu_mutex_unlock_iothread(); > >@@ -487,6 +521,11 @@ static void migrate_fd_cancel(MigrationState *s) > > QEMUFile *f = migrate_get_current()->file; > > trace_migrate_fd_cancel(); > > > >+ if (s->rp_state.file) { > >+ /* shutdown the rp socket, so causing the rp thread to shutdown */ > >+ qemu_file_shutdown(s->rp_state.file); > >+ } > >+ > > do { > > old_state = s->state; > > if (old_state != MIGRATION_STATUS_SETUP && > >@@ -801,8 +840,144 @@ int64_t migrate_xbzrle_cache_size(void) > > return s->xbzrle_cache_size; > > } > > > >-/* migration thread support */ > >+/* > >+ * Something bad happened to the RP stream, mark an error > >+ * The caller shall print something to indicate why > >+ */ > >+static void source_return_path_bad(MigrationState *s) > >+{ > >+ s->rp_state.error = true; > >+ migrate_fd_cleanup_src_rp(s); > >+} > >+ > >+/* > >+ * Handles messages sent on the return path towards the source VM > >+ * > >+ */ > >+static void *source_return_path_thread(void *opaque) > >+{ > >+ MigrationState *ms = opaque; > >+ QEMUFile *rp = ms->rp_state.file; > >+ uint16_t expected_len, header_len, header_type; > >+ const int max_len = 512; > >+ uint8_t buf[max_len]; > >+ uint32_t tmp32; > >+ int res; > >+ > >+ trace_source_return_path_thread_entry(); > >+ while (rp && !qemu_file_get_error(rp) && > >+ migration_already_active(ms)) { > >+ trace_source_return_path_thread_loop_top(); > 
>+ header_type = qemu_get_be16(rp); > >+ header_len = qemu_get_be16(rp); > >+ > >+ switch (header_type) { > >+ case MIG_RP_MSG_SHUT: > >+ case MIG_RP_MSG_PONG: > >+ expected_len = 4; > >+ break; > >+ > >+ default: > >+ error_report("RP: Received invalid message 0x%04x length > >0x%04x", > >+ header_type, header_len); > >+ source_return_path_bad(ms); > >+ goto out; > >+ } > > > >+ if (header_len > expected_len) { > >+ error_report("RP: Received message 0x%04x with" > >+ "incorrect length %d expecting %d", > >+ header_type, header_len, > >+ expected_len); > >+ source_return_path_bad(ms); > >+ goto out; > >+ } > >+ > >+ /* We know we've got a valid header by this point */ > >+ res = qemu_get_buffer(rp, buf, header_len); > >+ if (res != header_len) { > >+ trace_source_return_path_thread_failed_read_cmd_data(); > >+ source_return_path_bad(ms); > >+ goto out; > >+ } > >+ > >+ /* OK, we have the message and the data */ > >+ switch (header_type) { > >+ case MIG_RP_MSG_SHUT: > >+ tmp32 = be32_to_cpup((uint32_t *)buf); > >+ trace_source_return_path_thread_shut(tmp32); > >+ if (tmp32) { > >+ error_report("RP: Sibling indicated error %d", tmp32); > >+ source_return_path_bad(ms); > >+ } > >+ /* > >+ * We'll let the main thread deal with closing the RP > >+ * we could do a shutdown(2) on it, but we're the only user > >+ * anyway, so there's nothing gained. 
> >+ */ > >+ goto out; > >+ > >+ case MIG_RP_MSG_PONG: > >+ tmp32 = be32_to_cpup((uint32_t *)buf); > >+ trace_source_return_path_thread_pong(tmp32); > >+ break; > >+ > >+ default: > >+ break; > >+ } > >+ } > >+ if (rp && qemu_file_get_error(rp)) { > >+ trace_source_return_path_thread_bad_end(); > >+ source_return_path_bad(ms); > >+ } > >+ > >+ trace_source_return_path_thread_end(); > >+out: > >+ return NULL; > >+} > >+ > >+__attribute__ (( unused )) /* Until later in patch series */ > >+static int open_return_path_on_source(MigrationState *ms) > >+{ > >+ > >+ ms->rp_state.file = qemu_file_get_return_path(ms->file); > >+ if (!ms->rp_state.file) { > >+ return -1; > >+ } > >+ > >+ trace_open_return_path_on_source(); > >+ qemu_thread_create(&ms->rp_state.rp_thread, "return path", > >+ source_return_path_thread, ms, QEMU_THREAD_JOINABLE); > >+ > >+ trace_open_return_path_on_source_continue(); > >+ > >+ return 0; > >+} > >+ > >+__attribute__ (( unused )) /* Until later in patch series */ > >+/* Returns 0 if the RP was ok, otherwise there was an error on the RP */ > >+static int await_return_path_close_on_source(MigrationState *ms) > >+{ > >+ /* > >+ * If this is a normal exit then the destination will send a SHUT and > >the > >+ * rp_thread will exit, however if there's an error we need to cause > >+ * it to exit, which we can do by a shutdown. > >+ * (canceling must also shutdown to stop us getting stuck here if > >+ * the destination died at just the wrong place) > >+ */ > >+ if (qemu_file_get_error(ms->file) && ms->rp_state.file) { > >+ qemu_file_shutdown(ms->rp_state.file); > >+ } > >+ trace_await_return_path_close_on_source_joining(); > >+ qemu_thread_join(&ms->rp_state.rp_thread); > >+ trace_await_return_path_close_on_source_close(); > >+ return ms->rp_state.error; > >+} > >+ > >+/* > >+ * Master migration thread on the source VM. > >+ * It drives the migration and pumps the data down the outgoing channel. 
> >+ */ > > static void *migration_thread(void *opaque) > > { > > MigrationState *s = opaque; > >diff --git a/trace-events b/trace-events > >index 5738e3f..282cde1 100644 > >--- a/trace-events > >+++ b/trace-events > >@@ -1394,12 +1394,24 @@ flic_no_device_api(int err) "flic: no Device Contral > >API support %d" > > flic_reset_failed(int err) "flic: reset failed %d" > > > > # migration.c > >+await_return_path_close_on_source_close(void) "" > >+await_return_path_close_on_source_joining(void) "" > > migrate_set_state(int new_state) "new state %d" > > migrate_fd_cleanup(void) "" > >+migrate_fd_cleanup_src_rp(void) "" > > migrate_fd_error(void) "" > > migrate_fd_cancel(void) "" > > migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max > > %" PRIu64 > > migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d" > >+open_return_path_on_source(void) "" > >+open_return_path_on_source_continue(void) "" > >+source_return_path_thread_bad_end(void) "" > >+source_return_path_thread_end(void) "" > >+source_return_path_thread_entry(void) "" > >+source_return_path_thread_failed_read_cmd_data(void) "" > >+source_return_path_thread_loop_top(void) "" > >+source_return_path_thread_pong(uint32_t val) "%x" > >+source_return_path_thread_shut(uint32_t val) "%x" > > migrate_transferred(uint64_t tranferred, uint64_t time_spent, double > > bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " > > bandwidth %g max_size %" PRId64 > > > > # migration/rdma.c > > > > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK