On 2014/10/4 1:47, Dr. David Alan Gilbert (git) wrote:
From: "Dr. David Alan Gilbert" <dgilb...@redhat.com>

Open a return path, and handle messages that are received upon it.

Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com>
---
  include/migration/migration.h |  10 +++
  migration.c                   | 181 +++++++++++++++++++++++++++++++++++++++++-
  2 files changed, 190 insertions(+), 1 deletion(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 12e640d..b87c289 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -47,6 +47,14 @@ enum mig_rpcomm_cmd {
      MIG_RPCOMM_ACK,          /* data (seq: be32 ) */
      MIG_RPCOMM_AFTERLASTVALID
  };
+
+/* Source-side return path (RP) state, embedded in MigrationState */
+struct MigrationRetPathState {
+    uint32_t      latest_ack;  /* last ACK value received on the RP */
+    QemuThread    rp_thread;   /* RP reader thread, created joinable */
+    bool          error;       /* set once the RP stream went bad */
+};
+
  typedef struct MigrationState MigrationState;

  /* State for the incoming migration */
@@ -69,9 +77,11 @@ struct MigrationState
      QemuThread thread;
      QEMUBH *cleanup_bh;
      QEMUFile *file;
+    QEMUFile *return_path;

      int state;
      MigrationParams params;
+    struct MigrationRetPathState rp_state;
      double mbps;
      int64_t total_time;
      int64_t downtime;
diff --git a/migration.c b/migration.c
index 5ba8f3e..ee6db1d 100644
--- a/migration.c
+++ b/migration.c
@@ -246,6 +246,23 @@ MigrationCapabilityStatusList 
*qmp_query_migrate_capabilities(Error **errp)
      return head;
  }

+/*
+ * Return true if @ms is in the middle of a migration, i.e. its state is
+ * MIG_STATE_SETUP or MIG_STATE_ACTIVE; return false for any other state.
+ */
+static bool migration_already_active(MigrationState *ms)
+{
+    switch (ms->state) {
+    case MIG_STATE_ACTIVE:
+    case MIG_STATE_SETUP:
+        return true;
+
+    default:
+        return false;
+
+    }
+}
+
  static void get_xbzrle_cache_stats(MigrationInfo *info)
  {
      if (migrate_use_xbzrle()) {
@@ -371,6 +388,21 @@ static void migrate_set_state(MigrationState *s, int 
old_state, int new_state)
      }
  }

+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+    QEMUFile *rp = ms->return_path;
+
+    /*
+     * Both migrate_fd_cleanup and the RP thread's error path may get here,
+     * possibly in parallel; only the caller whose cmpxchg wins closes the rp.
+     */
+    rp = atomic_cmpxchg(&ms->return_path, rp, NULL);
+    if (rp) {
+        DPRINTF("cleaning up return path\n");
+        qemu_fclose(rp);
+    }
+}
+
  static void migrate_fd_cleanup(void *opaque)
  {
      MigrationState *s = opaque;
@@ -378,6 +410,8 @@ static void migrate_fd_cleanup(void *opaque)
      qemu_bh_delete(s->cleanup_bh);
      s->cleanup_bh = NULL;

+    migrate_fd_cleanup_src_rp(s);
+
      if (s->file) {
          trace_migrate_fd_cleanup();
          qemu_mutex_unlock_iothread();
@@ -414,6 +448,11 @@ static void migrate_fd_cancel(MigrationState *s)
      int old_state ;
      trace_migrate_fd_cancel();

+    if (s->return_path) {
+        /* shutdown the rp socket, so causing the rp thread to shutdown */
+        qemu_file_shutdown(s->return_path);
+    }
+
      do {
          old_state = s->state;
          if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) {
@@ -655,8 +694,148 @@ int64_t migrate_xbzrle_cache_size(void)
      return s->xbzrle_cache_size;
  }

-/* migration thread support */
+/*
+ * Something bad happened to the RP stream: flag it in rp_state.error and
+ * close the return path.  The caller is responsible for reporting why.
+ */
+static void source_return_path_bad(MigrationState *s)
+{
+    s->rp_state.error = true;
+    migrate_fd_cleanup_src_rp(s);
+}

+/*
+ * Reads and dispatches messages arriving on the return path from the
+ * destination until SHUT, an RP error, or the migration stops being active.
+ */
+static void *source_return_path_thread(void *opaque)
+{
+    MigrationState *ms = opaque;
+    QEMUFile *rp = ms->return_path;
+    uint16_t expected_len, header_len, header_com;
+    const int max_len = 512;
+    uint8_t buf[max_len];
+    uint32_t tmp32;
+    int res;
+
+    DPRINTF("RP: %s entry\n", __func__);
+    while (rp && !qemu_file_get_error(rp) &&
+        migration_already_active(ms)) {
+        DPRINTF("RP: %s top of loop\n", __func__);
+        header_com = qemu_get_be16(rp);
+        header_len = qemu_get_be16(rp);
+
+        switch (header_com) {
+        case MIG_RPCOMM_SHUT:
+        case MIG_RPCOMM_ACK:
+            expected_len = 4;
+            break;
+
+        default:
+            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
+                    header_com, header_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        if (header_len != expected_len) {
+            error_report("RP: Received command 0x%04x with "
+                    "incorrect length %d expecting %d",
+                    header_com, header_len,
+                    expected_len);
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* Valid header: all expected_len payload bytes will be in buf */
+        res = qemu_get_buffer(rp, buf, header_len);
+        if (res != header_len) {
+            DPRINTF("RP: Failed to read command data\n");
+            source_return_path_bad(ms);
+            goto out;
+        }
+
+        /* OK, we have the command and the data */
+        switch (header_com) {
+        case MIG_RPCOMM_SHUT:
+            tmp32 = ldl_be_p(buf);
+            if (tmp32) {
+                error_report("RP: Sibling indicated error %" PRIu32, tmp32);
+                source_return_path_bad(ms);
+            } else {
+                DPRINTF("RP: SHUT received\n");
+            }
+            /*
+             * We'll let the main thread deal with closing the RP
+             * we could do a shutdown(2) on it, but we're the only user
+             * anyway, so there's nothing gained.
+             */
+            goto out;
+
+        case MIG_RPCOMM_ACK:
+            tmp32 = ldl_be_p(buf);
+            DPRINTF("RP: Received ACK 0x%" PRIx32 "\n", tmp32);
+            atomic_xchg(&ms->rp_state.latest_ack, tmp32);

I didn't see *ms->rp_state.latest_ack* been used elsewhere, what's it used 
for?;)

+            break;
+
+        default:
+            /* This shouldn't happen because we should catch this above */
+            DPRINTF("RP: Bad header_com in dispatch\n");
+        }
+        /* Latest command processed, now leave a gap for the next one */
+        header_com = MIG_RPCOMM_INVALID;
+    }
+    if (rp && qemu_file_get_error(rp)) {
+        DPRINTF("%s: rp bad at end\n", __func__);
+        source_return_path_bad(ms);
+    }
+
+    DPRINTF("%s: Bottom exit\n", __func__);
+
+out:
+    return NULL;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+static int open_outgoing_return_path(MigrationState *ms)
+{
+    /* Returns 0 with the RP reader thread running, -1 if there's no RP */
+    ms->return_path = qemu_file_get_return_path(ms->file);
+    if (!ms->return_path) {
+        return -1;
+    }
+
+    DPRINTF("%s: starting thread\n", __func__);
+    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
+                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
+
+    DPRINTF("%s: continuing\n", __func__);
+
+    return 0;
+}
+
+__attribute__ (( unused )) /* Until later in patch series */
+static void await_outgoing_return_path_close(MigrationState *ms)
+{
+    /*
+     * Normally the destination sends SHUT and the rp_thread exits on its own;
+     * on an error we force it out by shutting the RP down.  Cancelling does
+     * the same, so we can't get stuck here if the destination died at just
+     * the wrong place.  NOTE(review): the rp thread may close return_path
+     * between the check below and the shutdown - confirm that can't race.
+     */
+    if (qemu_file_get_error(ms->file) && ms->return_path) {
+        qemu_file_shutdown(ms->return_path);
+    }
+    DPRINTF("%s: Joining\n", __func__);
+    qemu_thread_join(&ms->rp_state.rp_thread);
+    DPRINTF("%s: Exit\n", __func__);
+}
+
+/*
+ * Master migration thread on the source VM.
+ * It drives the migration and pumps the data down the outgoing channel.
+ */
  static void *migration_thread(void *opaque)
  {
      MigrationState *s = opaque;




Reply via email to