On Wed, Feb 25, 2015 at 04:51:35PM +0000, Dr. David Alan Gilbert (git) wrote:
> From: "Dr. David Alan Gilbert" <dgilb...@redhat.com>
> 
> Open a return path, and handle messages that are received upon it.
> 
> Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com>
> ---
>  include/migration/migration.h |   8 ++
>  migration/migration.c         | 178 +++++++++++++++++++++++++++++++++++++++++-
>  trace-events                  |  13 +++
>  3 files changed, 198 insertions(+), 1 deletion(-)
> 
> diff --git a/include/migration/migration.h b/include/migration/migration.h
> index 6775747..5242ead 100644
> --- a/include/migration/migration.h
> +++ b/include/migration/migration.h
> @@ -73,6 +73,14 @@ struct MigrationState
>  
>      int state;
>      MigrationParams params;
> +
> +    /* State related to return path */
> +    struct {
> +        QEMUFile     *file;
> +        QemuThread    rp_thread;
> +        bool          error;
> +    } rp_state;
> +
>      double mbps;
>      int64_t total_time;
>      int64_t downtime;
> diff --git a/migration/migration.c b/migration/migration.c
> index 80d234c..34cd4fe 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -237,6 +237,23 @@ MigrationCapabilityStatusList *qmp_query_migrate_capabilities(Error **errp)
>      return head;
>  }
>  
> +/*
> + * Return true if we're already in the middle of a migration
> + * (i.e. any of the active or setup states)
> + */
> +static bool migration_already_active(MigrationState *ms)
> +{
> +    switch (ms->state) {
> +    case MIG_STATE_ACTIVE:
> +    case MIG_STATE_SETUP:
> +        return true;
> +
> +    default:
> +        return false;
> +
> +    }
> +}
> +
>  static void get_xbzrle_cache_stats(MigrationInfo *info)
>  {
>      if (migrate_use_xbzrle()) {
> @@ -362,6 +379,21 @@ static void migrate_set_state(MigrationState *s, int old_state, int new_state)
>      }
>  }
>  
> +static void migrate_fd_cleanup_src_rp(MigrationState *ms)
> +{
> +    QEMUFile *rp = ms->rp_state.file;
> +
> +    /*
> +     * When stuff goes wrong (e.g. failing destination) on the rp, it can get
> +     * cleaned up from a few threads; make sure not to do it twice in parallel
> +     */
> +    rp = atomic_cmpxchg(&ms->rp_state.file, rp, NULL);

A cmpxchg seems dangerously subtle for such a basic and infrequent
operation (a plain lock would be easier to reason about), but ok.

> +    if (rp) {
> +        trace_migrate_fd_cleanup_src_rp();
> +        qemu_fclose(rp);
> +    }
> +}
> +
>  static void migrate_fd_cleanup(void *opaque)
>  {
>      MigrationState *s = opaque;
> @@ -369,6 +401,8 @@ static void migrate_fd_cleanup(void *opaque)
>      qemu_bh_delete(s->cleanup_bh);
>      s->cleanup_bh = NULL;
>  
> +    migrate_fd_cleanup_src_rp(s);
> +
>      if (s->file) {
>          trace_migrate_fd_cleanup();
>          qemu_mutex_unlock_iothread();
> @@ -406,6 +440,11 @@ static void migrate_fd_cancel(MigrationState *s)
>      QEMUFile *f = migrate_get_current()->file;
>      trace_migrate_fd_cancel();
>  
> +    if (s->rp_state.file) {
> +        /* shutdown the rp socket, so causing the rp thread to shutdown */
> +        qemu_file_shutdown(s->rp_state.file);

I couldn't find where qemu_file_shutdown() is implemented.  Does this
introduce a leftover dependency on the transport being a socket?

> +    }
> +
>      do {
>          old_state = s->state;
>          if (old_state != MIG_STATE_SETUP && old_state != MIG_STATE_ACTIVE) {
> @@ -658,8 +697,145 @@ int64_t migrate_xbzrle_cache_size(void)
>      return s->xbzrle_cache_size;
>  }
>  
> -/* migration thread support */
> +/*
> + * Something bad happened to the RP stream, mark an error
> + * The caller shall print something to indicate why
> + */
> +static void source_return_path_bad(MigrationState *s)
> +{
> +    s->rp_state.error = true;
> +    migrate_fd_cleanup_src_rp(s);
> +}
> +
> +/*
> + * Handles messages sent on the return path towards the source VM
> + *
> + */
> +static void *source_return_path_thread(void *opaque)
> +{
> +    MigrationState *ms = opaque;
> +    QEMUFile *rp = ms->rp_state.file;
> +    uint16_t expected_len, header_len, header_com;
> +    const int max_len = 512;
> +    uint8_t buf[max_len];
> +    uint32_t tmp32;
> +    int res;
> +
> +    trace_source_return_path_thread_entry();
> +    while (rp && !qemu_file_get_error(rp) &&
> +        migration_already_active(ms)) {
> +        trace_source_return_path_thread_loop_top();
> +        header_com = qemu_get_be16(rp);
> +        header_len = qemu_get_be16(rp);
> +
> +        switch (header_com) {
> +        case MIG_RP_CMD_SHUT:
> +        case MIG_RP_CMD_PONG:
> +            expected_len = 4;

Could the knowledge of expected lengths be folded into the second
switch below?  Switching twice on the same value is a bit icky.

> +            break;
> +
> +        default:
> +            error_report("RP: Received invalid cmd 0x%04x length 0x%04x",
> +                    header_com, header_len);
> +            source_return_path_bad(ms);
> +            goto out;
> +        }
>  
> +        if (header_len > expected_len) {
> +            error_report("RP: Received command 0x%04x with"
> +                    "incorrect length %d expecting %d",
> +                    header_com, header_len,
> +                    expected_len);
> +            source_return_path_bad(ms);
> +            goto out;
> +        }
> +
> +        /* We know we've got a valid header by this point */
> +        res = qemu_get_buffer(rp, buf, header_len);
> +        if (res != header_len) {
> +            trace_source_return_path_thread_failed_read_cmd_data();
> +            source_return_path_bad(ms);
> +            goto out;
> +        }
> +
> +        /* OK, we have the command and the data */
> +        switch (header_com) {
> +        case MIG_RP_CMD_SHUT:
> +            tmp32 = be32_to_cpup((uint32_t *)buf);
> +            trace_source_return_path_thread_shut(tmp32);
> +            if (tmp32) {
> +                error_report("RP: Sibling indicated error %d", tmp32);
> +                source_return_path_bad(ms);
> +            }
> +            /*
> +             * We'll let the main thread deal with closing the RP
> +             * we could do a shutdown(2) on it, but we're the only user
> +             * anyway, so there's nothing gained.
> +             */
> +            goto out;
> +
> +        case MIG_RP_CMD_PONG:
> +            tmp32 = be32_to_cpup((uint32_t *)buf);
> +            trace_source_return_path_thread_pong(tmp32);
> +            break;
> +
> +        default:
> +            /* This shouldn't happen because we should catch this above */
> +            trace_source_return_path_bad_header_com();
> +        }
> +        /* Latest command processed, now leave a gap for the next one */
> +        header_com = MIG_RP_CMD_INVALID;

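This assignment is dead: header_com is always overwritten by
qemu_get_be16() at the top of the next iteration, and it isn't read
after the loop exits.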

> +    }
> +    if (rp && qemu_file_get_error(rp)) {
> +        trace_source_return_path_thread_bad_end();
> +        source_return_path_bad(ms);
> +    }
> +
> +    trace_source_return_path_thread_end();
> +out:
> +    return NULL;
> +}
> +
> +__attribute__ (( unused )) /* Until later in patch series */
> +static int open_outgoing_return_path(MigrationState *ms)

Uh... surely this should be open_incoming_return_path(); AFAICT it's
designed to be used on the source side, where the return path is incoming.

> +{
> +
> +    ms->rp_state.file = qemu_file_get_return_path(ms->file);
> +    if (!ms->rp_state.file) {
> +        return -1;
> +    }
> +
> +    trace_open_outgoing_return_path();
> +    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
> +                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
> +
> +    trace_open_outgoing_return_path_continue();
> +
> +    return 0;
> +}
> +
> +__attribute__ (( unused )) /* Until later in patch series */
> +static void await_outgoing_return_path_close(MigrationState *ms)

Likewise "incoming" here, surely.

> +{
> +    /*
> +     * If this is a normal exit then the destination will send a SHUT and the
> +     * rp_thread will exit, however if there's an error we need to cause
> +     * it to exit, which we can do by a shutdown.
> +     * (canceling must also shutdown to stop us getting stuck here if
> +     * the destination died at just the wrong place)
> +     */
> +    if (qemu_file_get_error(ms->file) && ms->rp_state.file) {
> +        qemu_file_shutdown(ms->rp_state.file);
> +    }
> +    trace_await_outgoing_return_path_joining();
> +    qemu_thread_join(&ms->rp_state.rp_thread);
> +    trace_await_outgoing_return_path_close();
> +}
> +
> +/*
> + * Master migration thread on the source VM.
> + * It drives the migration and pumps the data down the outgoing channel.
> + */
>  static void *migration_thread(void *opaque)
>  {
>      MigrationState *s = opaque;
> diff --git a/trace-events b/trace-events
> index 4f3eff8..1951b25 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -1374,12 +1374,25 @@ flic_no_device_api(int err) "flic: no Device Contral 
> API support %d"
>  flic_reset_failed(int err) "flic: reset failed %d"
>  
>  # migration.c
> +await_outgoing_return_path_close(void) ""
> +await_outgoing_return_path_joining(void) ""
>  migrate_set_state(int new_state) "new state %d"
>  migrate_fd_cleanup(void) ""
> +migrate_fd_cleanup_src_rp(void) ""
>  migrate_fd_error(void) ""
>  migrate_fd_cancel(void) ""
>  migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max %" PRIu64
>  migrate_send_rp_message(int cmd, uint16_t len) "cmd=%d, len=%d"
> +open_outgoing_return_path(void) ""
> +open_outgoing_return_path_continue(void) ""
> +source_return_path_thread_bad_end(void) ""
> +source_return_path_bad_header_com(void) ""
> +source_return_path_thread_end(void) ""
> +source_return_path_thread_entry(void) ""
> +source_return_path_thread_failed_read_cmd_data(void) ""
> +source_return_path_thread_loop_top(void) ""
> +source_return_path_thread_pong(uint32_t val) "%x"
> +source_return_path_thread_shut(uint32_t val) "%x"
>  migrate_transferred(uint64_t tranferred, uint64_t time_spent, double bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " bandwidth %g max_size %" PRId64
>  
>  # migration/rdma.c

-- 
David Gibson                    | I'll have my music baroque, and my code
david AT gibson.dropbear.id.au  | minimalist, thank you.  NOT _the_ _other_
                                | _way_ _around_!
http://www.ozlabs.org/~dgibson

Attachment: pgpyzHwbgs10m.pgp
Description: PGP signature

Reply via email to