On 2015/8/18 18:45, Dr. David Alan Gilbert wrote:
* zhanghailiang (zhang.zhanghaili...@huawei.com) wrote:
Hi Dave,

On 2015/6/16 18:26, Dr. David Alan Gilbert (git) wrote:
From: "Dr. David Alan Gilbert" <dgilb...@redhat.com>

Open a return path, and handle messages that are received upon it.

Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com>
---
  include/migration/migration.h |   8 ++
  migration/migration.c         | 177 +++++++++++++++++++++++++++++++++++++++++-
  trace-events                  |  12 +++
  3 files changed, 196 insertions(+), 1 deletion(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 36caab9..868f59a 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -77,6 +77,14 @@ struct MigrationState

      int state;
      MigrationParams params;
+
+    /* State related to return path */
+    struct {
+        QEMUFile     *file;

There is already a 'file' member in MigrationState. For plain migration
there is only one direction, from the source side to the destination side,
so that name is fine.

But for post-copy and COLO we need two-way communication,
so we can rename the original 'file' member of MigrationState to 'output_file'
and add a new 'input_file' member. For the MigrationIncomingState struct, rename
its original 'file' member to 'input_file', and add a new 'output_file'.
IMHO, this will make things clearer.

Would the following be clearer:


Yes, it is clearer and more graceful :)

   On the source make the existing migration file:
        QEMUFile  *to_dst_file;
   and for the return path
        QEMUFile  *from_dst_dile;
                            ^
                     from_dst_file

   and then on the destination, the incoming migration stream:
        QEMUFile  *from_src_file;
   and then the return path on the destination:
        QEMUFile  *to_src_file;

Dave

Thanks,
zhanghailiang


+        QemuThread    rp_thread;
+        bool          error;
+    } rp_state;
+
      double mbps;
      int64_t total_time;
      int64_t downtime;
diff --git a/migration/migration.c b/migration/migration.c
index afb19a1..fb2f491 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -278,6 +278,23 @@ MigrationParameters *qmp_query_migrate_parameters(Error 
**errp)
      return params;
  }

+/*
+ * Report whether a migration is currently in progress, i.e. whether
+ * @ms is in either the SETUP or the ACTIVE state.
+ */
+static bool migration_already_active(MigrationState *ms)
+{
+    int state = ms->state;
+
+    return state == MIGRATION_STATUS_SETUP ||
+           state == MIGRATION_STATUS_ACTIVE;
+}
+
  static void get_xbzrle_cache_stats(MigrationInfo *info)
  {
      if (migrate_use_xbzrle()) {
@@ -441,6 +458,21 @@ static void migrate_set_state(MigrationState *s, int 
old_state, int new_state)
      }
  }

+/*
+ * Close the source side of the return path, if it is still open.
+ *
+ * When stuff goes wrong (e.g. failing destination) on the rp, this can be
+ * called from a few threads.  Atomically swapping the pointer with NULL
+ * hands whatever file is stored to exactly one caller, which closes it;
+ * every other caller sees NULL and does nothing.  (A plain read followed
+ * by cmpxchg could close a file that the failed cmpxchg left in place.)
+ */
+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+    QEMUFile *rp = atomic_xchg(&ms->rp_state.file, NULL);
+
+    if (rp) {
+        trace_migrate_fd_cleanup_src_rp();
+        qemu_fclose(rp);
+    }
+}
+
  static void migrate_fd_cleanup(void *opaque)
  {
      MigrationState *s = opaque;
@@ -448,6 +480,8 @@ static void migrate_fd_cleanup(void *opaque)
      qemu_bh_delete(s->cleanup_bh);
      s->cleanup_bh = NULL;

+    migrate_fd_cleanup_src_rp(s);
+
      if (s->file) {
          trace_migrate_fd_cleanup();
          qemu_mutex_unlock_iothread();
@@ -487,6 +521,11 @@ static void migrate_fd_cancel(MigrationState *s)
      QEMUFile *f = migrate_get_current()->file;
      trace_migrate_fd_cancel();

+    if (s->rp_state.file) {
+        /* shutdown the rp socket, so causing the rp thread to shutdown */
+        qemu_file_shutdown(s->rp_state.file);
+    }
+
      do {
          old_state = s->state;
          if (old_state != MIGRATION_STATUS_SETUP &&
@@ -801,8 +840,144 @@ int64_t migrate_xbzrle_cache_size(void)
      return s->xbzrle_cache_size;
  }

-/* migration thread support */
+/*
+ * Record that the return path has failed, then release it.
+ * Nothing is printed here; callers report the reason themselves.
+ */
+static void source_return_path_bad(MigrationState *s)
+{
+    /* Latched flag, later returned by await_return_path_close_on_source() */
+    s->rp_state.error = true;
+
+    migrate_fd_cleanup_src_rp(s);
+}
+
+/*
+ * Handles messages sent on the return path towards the source VM.
+ *
+ * Thread body: loops while the RP stream has no error and the migration
+ * is in SETUP/ACTIVE.  Each message is a be16 type, a be16 length and
+ * then 'length' bytes of payload.  A malformed message or stream error
+ * marks the RP bad via source_return_path_bad(); a SHUT message ends the
+ * loop (marking the RP bad only if the destination reported an error).
+ */
+static void *source_return_path_thread(void *opaque)
+{
+    MigrationState *ms = opaque;
+    QEMUFile *rp = ms->rp_state.file;
+    uint16_t expected_len, header_len, header_type;
+    /* Fixed size (not a VLA); comfortably larger than any RP payload */
+    uint8_t buf[512];
+    uint32_t tmp32;
+    int res;
+
+    trace_source_return_path_thread_entry();
+    while (rp && !qemu_file_get_error(rp) &&
+        migration_already_active(ms)) {
+        trace_source_return_path_thread_loop_top();
+        header_type = qemu_get_be16(rp);
+        header_len = qemu_get_be16(rp);
+
+        /*
+         * If the header read failed (e.g. the destination went away) the
+         * fields are meaningless; leave via the stream-error path below
+         * instead of reporting a bogus "invalid message".
+         */
+        if (qemu_file_get_error(rp)) {
+            break;
+        }
+
+        switch (header_type) {
+        case MIG_RP_MSG_SHUT:
+        case MIG_RP_MSG_PONG:
+            expected_len = 4;
+            break;
+
+        default:
+            error_report("RP: Received invalid message 0x%04x length 0x%04x",
+                    header_type, header_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /*
+         * The length must match exactly: a short message would leave part
+         * of buf uninitialised when the payload is decoded below.
+         */
+        if (header_len != expected_len) {
+            error_report("RP: Received message 0x%04x with "
+                    "incorrect length %d expecting %d",
+                    header_type, header_len,
+                    expected_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* We know we've got a valid header by this point */
+        res = qemu_get_buffer(rp, buf, header_len);
+        if (res != header_len) {
+            trace_source_return_path_thread_failed_read_cmd_data();
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /*
+         * OK, we have the message and the data.  ldl_be_p() is used rather
+         * than casting buf to uint32_t *, which may be misaligned and
+         * violates strict aliasing.
+         */
+        switch (header_type) {
+        case MIG_RP_MSG_SHUT:
+            tmp32 = ldl_be_p(buf);
+            trace_source_return_path_thread_shut(tmp32);
+            if (tmp32) {
+                error_report("RP: Sibling indicated error %d", tmp32);
+                source_return_path_bad(ms);
+            }
+            /*
+             * We'll let the main thread deal with closing the RP
+             * we could do a shutdown(2) on it, but we're the only user
+             * anyway, so there's nothing gained.
+             */
+            goto out;
+
+        case MIG_RP_MSG_PONG:
+            tmp32 = ldl_be_p(buf);
+            trace_source_return_path_thread_pong(tmp32);
+            break;
+
+        default:
+            break;
+        }
+    }
+    if (rp && qemu_file_get_error(rp)) {
+        trace_source_return_path_thread_bad_end();
+        source_return_path_bad(ms);
+    }
+
+    trace_source_return_path_thread_end();
+out:
+    return NULL;
+}
+
+/*
+ * Obtain the return path paired with ms->file and start the thread that
+ * services it.  Returns 0 on success, or -1 if no return path was available.
+ */
+__attribute__ (( unused )) /* Until later in patch series */
+static int open_return_path_on_source(MigrationState *ms)
+{
+    QEMUFile *rp = qemu_file_get_return_path(ms->file);
+
+    ms->rp_state.file = rp;
+    if (rp == NULL) {
+        return -1;
+    }
+
+    trace_open_return_path_on_source();
+    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
+                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
+    trace_open_return_path_on_source_continue();
+
+    return 0;
+}
+
+/*
+ * Wait for the return-path thread to finish.
+ * Returns 0 if the RP was ok, otherwise there was an error on the RP.
+ *
+ * On a normal exit the destination sends a SHUT and the rp_thread leaves
+ * by itself.  If the outgoing stream has an error, shut the RP down so the
+ * thread exits too.  (Cancelling must also shut it down, so that we cannot
+ * get stuck here if the destination died at just the wrong place.)
+ */
+__attribute__ (( unused )) /* Until later in patch series */
+static int await_return_path_close_on_source(MigrationState *ms)
+{
+    bool outgoing_failed = qemu_file_get_error(ms->file) != 0;
+
+    if (outgoing_failed && ms->rp_state.file) {
+        qemu_file_shutdown(ms->rp_state.file);
+    }
+
+    trace_await_return_path_close_on_source_joining();
+    qemu_thread_join(&ms->rp_state.rp_thread);
+    trace_await_return_path_close_on_source_close();
+
+    return ms->rp_state.error ? 1 : 0;
+}
+
+/*
+ * Master migration thread on the source VM.
+ * It drives the migration and pumps the data down the outgoing channel.
+ */
  static void *migration_thread(void *opaque)
  {
      MigrationState *s = opaque;
diff --git a/trace-events b/trace-events
index 5738e3f..282cde1 100644
--- a/trace-events
+++ b/trace-events
@@ -1394,12 +1394,24 @@ flic_no_device_api(int err) "flic: no Device Contral API 
support %d"
  flic_reset_failed(int err) "flic: reset failed %d"

  # migration.c
+await_return_path_close_on_source_close(void) ""
+await_return_path_close_on_source_joining(void) ""
  migrate_set_state(int new_state) "new state %d"
  migrate_fd_cleanup(void) ""
+migrate_fd_cleanup_src_rp(void) ""
  migrate_fd_error(void) ""
  migrate_fd_cancel(void) ""
  migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max %" 
PRIu64
  migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d"
+open_return_path_on_source(void) ""
+open_return_path_on_source_continue(void) ""
+source_return_path_thread_bad_end(void) ""
+source_return_path_thread_end(void) ""
+source_return_path_thread_entry(void) ""
+source_return_path_thread_failed_read_cmd_data(void) ""
+source_return_path_thread_loop_top(void) ""
+source_return_path_thread_pong(uint32_t val) "%x"
+source_return_path_thread_shut(uint32_t val) "%x"
  migrate_transferred(uint64_t tranferred, uint64_t time_spent, double bandwidth, uint64_t size) 
"transferred %" PRIu64 " time_spent %" PRIu64 " bandwidth %g max_size %" PRId64

  # migration/rdma.c



--
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK

.




Reply via email to