* zhanghailiang (zhang.zhanghaili...@huawei.com) wrote:
> Hi Dave,
> 
> On 2015/6/16 18:26, Dr. David Alan Gilbert (git) wrote:
> >From: "Dr. David Alan Gilbert" <dgilb...@redhat.com>
> >
> >Open a return path, and handle messages that are received upon it.
> >
> >Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com>
> >---
> >  include/migration/migration.h |   8 ++
> >  migration/migration.c         | 177 
> > +++++++++++++++++++++++++++++++++++++++++-
> >  trace-events                  |  12 +++
> >  3 files changed, 196 insertions(+), 1 deletion(-)
> >
> >diff --git a/include/migration/migration.h b/include/migration/migration.h
> >index 36caab9..868f59a 100644
> >--- a/include/migration/migration.h
> >+++ b/include/migration/migration.h
> >@@ -77,6 +77,14 @@ struct MigrationState
> >
> >      int state;
> >      MigrationParams params;
> >+
> >+    /* State related to return path */
> >+    struct {
> >+        QEMUFile     *file;
> 
> There is already a 'file' member in MigrationState,
> and since for migration, there is only one path direction, just from source 
> side
> to destination side, so it is ok to use that name.
> 
> But for post-copy and COLO, we need two-way communication,
> So we can rename the original 'file' member of MigrationState to 'ouput_file',
> and add a new 'input_file' member. For MigrationIncomingState struct, rename 
> its original
> 'file' member to 'input_file',and add a new 'output_file'.
> IMHO, this will make things more clear.

Would the following be clearer:

  On the source make the existing migration file:
       QEMUFile  *to_dst_file;
  and for the return path
       QEMUFile  *from_dst_dile;

  and then on the destination, the incoming migration stream:
       QEMUFile  *from_src_file;
  and then the return path on the destionation:
       QEMUFile  *to_src_file;

Dave

> Thanks,
> zhanghailiang
> 
> 
> >+        QemuThread    rp_thread;
> >+        bool          error;
> >+    } rp_state;
> >+
> >      double mbps;
> >      int64_t total_time;
> >      int64_t downtime;
> >diff --git a/migration/migration.c b/migration/migration.c
> >index afb19a1..fb2f491 100644
> >--- a/migration/migration.c
> >+++ b/migration/migration.c
> >@@ -278,6 +278,23 @@ MigrationParameters *qmp_query_migrate_parameters(Error 
> >**errp)
> >      return params;
> >  }
> >
> >+/*
> >+ * Return true if we're already in the middle of a migration
> >+ * (i.e. any of the active or setup states)
> >+ */
> >+static bool migration_already_active(MigrationState *ms)
> >+{
> >+    switch (ms->state) {
> >+    case MIGRATION_STATUS_ACTIVE:
> >+    case MIGRATION_STATUS_SETUP:
> >+        return true;
> >+
> >+    default:
> >+        return false;
> >+
> >+    }
> >+}
> >+
> >  static void get_xbzrle_cache_stats(MigrationInfo *info)
> >  {
> >      if (migrate_use_xbzrle()) {
> >@@ -441,6 +458,21 @@ static void migrate_set_state(MigrationState *s, int 
> >old_state, int new_state)
> >      }
> >  }
> >
> >+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
> >+{
> >+    QEMUFile *rp = ms->rp_state.file;
> >+
> >+    /*
> >+     * When stuff goes wrong (e.g. failing destination) on the rp, it can 
> >get
> >+     * cleaned up from a few threads; make sure not to do it twice in 
> >parallel
> >+     */
> >+    rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL);
> >+    if (rp) {
> >+        trace_migrate_fd_cleanup_src_rp();
> >+        qemu_fclose(rp);
> >+    }
> >+}
> >+
> >  static void migrate_fd_cleanup(void *opaque)
> >  {
> >      MigrationState *s = opaque;
> >@@ -448,6 +480,8 @@ static void migrate_fd_cleanup(void *opaque)
> >      qemu_bh_delete(s->cleanup_bh);
> >      s->cleanup_bh = NULL;
> >
> >+    migrate_fd_cleanup_src_rp(s);
> >+
> >      if (s->file) {
> >          trace_migrate_fd_cleanup();
> >          qemu_mutex_unlock_iothread();
> >@@ -487,6 +521,11 @@ static void migrate_fd_cancel(MigrationState *s)
> >      QEMUFile *f = migrate_get_current()->file;
> >      trace_migrate_fd_cancel();
> >
> >+    if (s->rp_state.file) {
> >+        /* shutdown the rp socket, so causing the rp thread to shutdown */
> >+        qemu_file_shutdown(s->rp_state.file);
> >+    }
> >+
> >      do {
> >          old_state = s->state;
> >          if (old_state != MIGRATION_STATUS_SETUP &&
> >@@ -801,8 +840,144 @@ int64_t migrate_xbzrle_cache_size(void)
> >      return s->xbzrle_cache_size;
> >  }
> >
> >-/* migration thread support */
> >+/*
> >+ * Something bad happened to the RP stream, mark an error
> >+ * The caller shall print something to indicate why
> >+ */
> >+static void source_return_path_bad(MigrationState *s)
> >+{
> >+    s->rp_state.error = true;
> >+    migrate_fd_cleanup_src_rp(s);
> >+}
> >+
> >+/*
> >+ * Handles messages sent on the return path towards the source VM
> >+ *
> >+ */
> >+static void *source_return_path_thread(void *opaque)
> >+{
> >+    MigrationState *ms = opaque;
> >+    QEMUFile *rp = ms->rp_state.file;
> >+    uint16_t expected_len, header_len, header_type;
> >+    const int max_len = 512;
> >+    uint8_t buf[max_len];
> >+    uint32_t tmp32;
> >+    int res;
> >+
> >+    trace_source_return_path_thread_entry();
> >+    while (rp && !qemu_file_get_error(rp) &&
> >+        migration_already_active(ms)) {
> >+        trace_source_return_path_thread_loop_top();
> >+        header_type = qemu_get_be16(rp);
> >+        header_len = qemu_get_be16(rp);
> >+
> >+        switch (header_type) {
> >+        case MIG_RP_MSG_SHUT:
> >+        case MIG_RP_MSG_PONG:
> >+            expected_len = 4;
> >+            break;
> >+
> >+        default:
> >+            error_report("RP: Received invalid message 0x%04x length 
> >0x%04x",
> >+                    header_type, header_len);
> >+            source_return_path_bad(ms);
> >+            goto out;
> >+        }
> >
> >+        if (header_len > expected_len) {
> >+            error_report("RP: Received message 0x%04x with"
> >+                    "incorrect length %d expecting %d",
> >+                    header_type, header_len,
> >+                    expected_len);
> >+            source_return_path_bad(ms);
> >+            goto out;
> >+        }
> >+
> >+        /* We know we've got a valid header by this point */
> >+        res = qemu_get_buffer(rp, buf, header_len);
> >+        if (res != header_len) {
> >+            trace_source_return_path_thread_failed_read_cmd_data();
> >+            source_return_path_bad(ms);
> >+            goto out;
> >+        }
> >+
> >+        /* OK, we have the message and the data */
> >+        switch (header_type) {
> >+        case MIG_RP_MSG_SHUT:
> >+            tmp32 = be32_to_cpup((uint32_t *)buf);
> >+            trace_source_return_path_thread_shut(tmp32);
> >+            if (tmp32) {
> >+                error_report("RP: Sibling indicated error %d", tmp32);
> >+                source_return_path_bad(ms);
> >+            }
> >+            /*
> >+             * We'll let the main thread deal with closing the RP
> >+             * we could do a shutdown(2) on it, but we're the only user
> >+             * anyway, so there's nothing gained.
> >+             */
> >+            goto out;
> >+
> >+        case MIG_RP_MSG_PONG:
> >+            tmp32 = be32_to_cpup((uint32_t *)buf);
> >+            trace_source_return_path_thread_pong(tmp32);
> >+            break;
> >+
> >+        default:
> >+            break;
> >+        }
> >+    }
> >+    if (rp && qemu_file_get_error(rp)) {
> >+        trace_source_return_path_thread_bad_end();
> >+        source_return_path_bad(ms);
> >+    }
> >+
> >+    trace_source_return_path_thread_end();
> >+out:
> >+    return NULL;
> >+}
> >+
> >+__attribute__ (( unused )) /* Until later in patch series */
> >+static int open_return_path_on_source(MigrationState *ms)
> >+{
> >+
> >+    ms->rp_state.file = qemu_file_get_return_path(ms->file);
> >+    if (!ms->rp_state.file) {
> >+        return -1;
> >+    }
> >+
> >+    trace_open_return_path_on_source();
> >+    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
> >+                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
> >+
> >+    trace_open_return_path_on_source_continue();
> >+
> >+    return 0;
> >+}
> >+
> >+__attribute__ (( unused )) /* Until later in patch series */
> >+/* Returns 0 if the RP was ok, otherwise there was an error on the RP */
> >+static int await_return_path_close_on_source(MigrationState *ms)
> >+{
> >+    /*
> >+     * If this is a normal exit then the destination will send a SHUT and 
> >the
> >+     * rp_thread will exit, however if there's an error we need to cause
> >+     * it to exit, which we can do by a shutdown.
> >+     * (canceling must also shutdown to stop us getting stuck here if
> >+     * the destination died at just the wrong place)
> >+     */
> >+    if (qemu_file_get_error(ms->file) && ms->rp_state.file) {
> >+        qemu_file_shutdown(ms->rp_state.file);
> >+    }
> >+    trace_await_return_path_close_on_source_joining();
> >+    qemu_thread_join(&ms->rp_state.rp_thread);
> >+    trace_await_return_path_close_on_source_close();
> >+    return ms->rp_state.error;
> >+}
> >+
> >+/*
> >+ * Master migration thread on the source VM.
> >+ * It drives the migration and pumps the data down the outgoing channel.
> >+ */
> >  static void *migration_thread(void *opaque)
> >  {
> >      MigrationState *s = opaque;
> >diff --git a/trace-events b/trace-events
> >index 5738e3f..282cde1 100644
> >--- a/trace-events
> >+++ b/trace-events
> >@@ -1394,12 +1394,24 @@ flic_no_device_api(int err) "flic: no Device Contral 
> >API support %d"
> >  flic_reset_failed(int err) "flic: reset failed %d"
> >
> >  # migration.c
> >+await_return_path_close_on_source_close(void) ""
> >+await_return_path_close_on_source_joining(void) ""
> >  migrate_set_state(int new_state) "new state %d"
> >  migrate_fd_cleanup(void) ""
> >+migrate_fd_cleanup_src_rp(void) ""
> >  migrate_fd_error(void) ""
> >  migrate_fd_cancel(void) ""
> >  migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max 
> > %" PRIu64
> >  migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d"
> >+open_return_path_on_source(void) ""
> >+open_return_path_on_source_continue(void) ""
> >+source_return_path_thread_bad_end(void) ""
> >+source_return_path_thread_end(void) ""
> >+source_return_path_thread_entry(void) ""
> >+source_return_path_thread_failed_read_cmd_data(void) ""
> >+source_return_path_thread_loop_top(void) ""
> >+source_return_path_thread_pong(uint32_t val) "%x"
> >+source_return_path_thread_shut(uint32_t val) "%x"
> >  migrate_transferred(uint64_t tranferred, uint64_t time_spent, double 
> > bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " 
> > bandwidth %g max_size %" PRId64
> >
> >  # migration/rdma.c
> >
> 
> 
--
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK

Reply via email to