Re: [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work

2018-01-23 Thread Dr. David Alan Gilbert
* Juan Quintela (quint...@redhat.com) wrote:
> We create new channels for each new thread created. We send through
> them in a packed struct.  This way we can check we connect the right
> channels in both sides.
> 
> Signed-off-by: Juan Quintela 

General comment; given it's reasonably large, it would be nice to split
this into a send and a receive patch.
Some tracing wouldn't do any harm - it's going to be fun debugging
failures where some sockets connect and then fail, so tracing would
help.


> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destwroy, we were destroying send channels.
> This was very interesting, because we were using an unreferred object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> Use an struct to send all fields
> Use default uuid
> Fix local_err = NULL (dave)
> Make lines 80 lines long (checkpatch)
> Move multifd_new_channel() and multifd_recv_thread() to later patches
> when used.
> Add __attribute__(packad)
> Use UUIDs are opaques isntead of the ASCII represantation
> rename migrate_new_channel_async to migrate_new_send_channel_async
> rename recv_channel_destroy to _unref.  And create the pairing _ref.
> ---
>  migration/migration.c |   7 +++-
>  migration/ram.c   | 114 
> +++---
>  migration/ram.h   |   3 ++
>  migration/socket.c|  39 -
>  migration/socket.h|  10 +
>  5 files changed, 137 insertions(+), 36 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index e506b9c2c6..77fc17f723 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -426,7 +426,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>  QEMUFile *f = qemu_fopen_channel_input(ioc);
>  migration_fd_process_incoming(f);
>  }
> -/* We still only have a single channel.  Nothing to do here yet */
> +multifd_recv_new_channel(ioc);

OK, that looks a little odd at the moment - do we grow the
recv_new_channel and still leave the first one in there?

>  }
>  
>  /**
> @@ -437,6 +437,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>   */
>  bool migration_has_all_channels(void)
>  {
> +if (migrate_use_multifd()) {
> +int thread_count = migrate_multifd_channels();
> +
> +return thread_count == multifd_created_channels();
> +}
>  return true;
>  }
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index 5a109efeda..aef5a323f3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
>  #include "xbzrle.h"
>  #include "ram.h"
>  #include "migration.h"
> +#include "socket.h"
>  #include "migration/register.h"
>  #include "migration/misc.h"
>  #include "qemu-file.h"
> @@ -49,6 +50,8 @@
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
>  #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***/
>  /* ram save/restore */
> @@ -396,6 +399,7 @@ struct MultiFDSendParams {
>  uint8_t id;
>  char *name;
>  QemuThread thread;
> +QIOChannel *c;
>  QemuSemaphore sem;
>  QemuMutex mutex;
>  bool quit;
> @@ -412,6 +416,15 @@ static void terminate_multifd_send_threads(Error *errp)
>  {
>  int i;
>  
> +if (errp) {
> +MigrationState *s = migrate_get_current();
> +migrate_set_error(s, errp);
> +if (s->state == MIGRATION_STATUS_SETUP ||
> +s->state == MIGRATION_STATUS_ACTIVE) {
> +migrate_set_state(>state, s->state,
> +  MIGRATION_STATUS_FAILED);
> +}
> +}
>  for (i = 0; i < multifd_send_state->count; i++) {
>  MultiFDSendParams *p = _send_state->params[i];
>  
> @@ -437,6 +450,7 @@ int multifd_save_cleanup(Error **errp)
>  qemu_thread_join(>thread);
>  qemu_mutex_destroy(>mutex);
>  qemu_sem_destroy(>sem);
> +socket_send_channel_destroy(p->c);
>  g_free(p->name);
>  p->name = NULL;
>  }
> @@ -447,9 +461,27 @@ int multifd_save_cleanup(Error **errp)
>  return ret;
>  }
>  
> +typedef struct {
> +uint32_t version;
> +unsigned char uuid[16]; /* QemuUUID */
> +uint8_t id;
> +} __attribute__((packed)) MultiFDInit_t;
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>  MultiFDSendParams *p = opaque;
> +

Re: [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work

2018-01-21 Thread Peter Xu
On Wed, Jan 10, 2018 at 01:47:13PM +0100, Juan Quintela wrote:
> We create new channels for each new thread created. We send through
> them in a packed struct.  This way we can check we connect the right
> channels in both sides.
> 
> Signed-off-by: Juan Quintela 
> 
> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destwroy, we were destroying send channels.
> This was very interesting, because we were using an unreferred object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> Use an struct to send all fields
> Use default uuid
> Fix local_err = NULL (dave)
> Make lines 80 lines long (checkpatch)
> Move multifd_new_channel() and multifd_recv_thread() to later patches
> when used.
> Add __attribute__(packad)
> Use UUIDs are opaques isntead of the ASCII represantation
> rename migrate_new_channel_async to migrate_new_send_channel_async
> rename recv_channel_destroy to _unref.  And create the pairing _ref.
> ---
>  migration/migration.c |   7 +++-
>  migration/ram.c   | 114 
> +++---
>  migration/ram.h   |   3 ++
>  migration/socket.c|  39 -
>  migration/socket.h|  10 +
>  5 files changed, 137 insertions(+), 36 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index e506b9c2c6..77fc17f723 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -426,7 +426,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>  QEMUFile *f = qemu_fopen_channel_input(ioc);
>  migration_fd_process_incoming(f);
>  }
> -/* We still only have a single channel.  Nothing to do here yet */
> +multifd_recv_new_channel(ioc);
>  }
>  
>  /**
> @@ -437,6 +437,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>   */
>  bool migration_has_all_channels(void)
>  {
> +if (migrate_use_multifd()) {
> +int thread_count = migrate_multifd_channels();
> +
> +return thread_count == multifd_created_channels();
> +}
>  return true;
>  }
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index 5a109efeda..aef5a323f3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
>  #include "xbzrle.h"
>  #include "ram.h"
>  #include "migration.h"
> +#include "socket.h"
>  #include "migration/register.h"
>  #include "migration/misc.h"
>  #include "qemu-file.h"
> @@ -49,6 +50,8 @@
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
>  #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***/
>  /* ram save/restore */
> @@ -396,6 +399,7 @@ struct MultiFDSendParams {
>  uint8_t id;
>  char *name;
>  QemuThread thread;
> +QIOChannel *c;
>  QemuSemaphore sem;
>  QemuMutex mutex;
>  bool quit;
> @@ -412,6 +416,15 @@ static void terminate_multifd_send_threads(Error *errp)
>  {
>  int i;
>  
> +if (errp) {
> +MigrationState *s = migrate_get_current();
> +migrate_set_error(s, errp);
> +if (s->state == MIGRATION_STATUS_SETUP ||
> +s->state == MIGRATION_STATUS_ACTIVE) {
> +migrate_set_state(>state, s->state,
> +  MIGRATION_STATUS_FAILED);

I'm fine with it, but could I ask why we explicitly pass the error
into this function and handle the state machine transition here?
Asked since IMHO we know the error already when calling
terminate_multifd_send_threads() because we passed it in as parameter,
then why not we do that there?

[1]

> +}
> +}
>  for (i = 0; i < multifd_send_state->count; i++) {
>  MultiFDSendParams *p = _send_state->params[i];
>  
> @@ -437,6 +450,7 @@ int multifd_save_cleanup(Error **errp)
>  qemu_thread_join(>thread);
>  qemu_mutex_destroy(>mutex);
>  qemu_sem_destroy(>sem);
> +socket_send_channel_destroy(p->c);
>  g_free(p->name);
>  p->name = NULL;
>  }
> @@ -447,9 +461,27 @@ int multifd_save_cleanup(Error **errp)
>  return ret;
>  }
>  
> +typedef struct {
> +uint32_t version;
> +unsigned char uuid[16]; /* QemuUUID */
> +uint8_t id;
> +} __attribute__((packed)) MultiFDInit_t;
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>  MultiFDSendParams *p = opaque;
> +MultiFDInit_t msg;
> +Error *local_err = NULL;
> +