On 6/21/2024 5:21, Fabiano Rosas wrote:
> Multifd currently has a simple scheduling mechanism that distributes
> work to the various channels by providing the client (producer) with a
> memory slot and swapping that slot with free slot from the next idle
> channel (consumer). Or graphically:
> 
> []       <-- multifd_send_state->pages
> [][][][] <-- channels' p->pages pointers
> 
> 1) client fills the empty slot with data:
>   [a]
>   [][][][]
> 
> 2) multifd_send_pages() finds an idle channel and swaps the pointers:
>   [a]
>   [][][][]
>   ^idle
> 
>   []
>   [a][][][]
> 
> 3) client can immediately fill new slot with more data:
>   [b]
>   [a][][][]
> 
> 4) channel processes the data, the channel slot is now free to use
>    again:
>   [b]
>   [][][][]
> 
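To make the old flow concrete, here is a minimal, self-contained model
of that swap. Names are simplified and all locking/barriers are left
out, so it illustrates only the pointer exchange, not the actual QEMU
code:

#include <stdio.h>

#define NUM_CHANNELS 4

typedef struct { int num; char tag; } Pages;  /* stand-in for MultiFDPages_t */
typedef struct { Pages *pages; } Channel;     /* stand-in for MultiFDSendParams */

static Pages *shared;                         /* multifd_send_state->pages */
static Channel channels[NUM_CHANNELS];

/* step 2: swap the filled shared slot with the idle channel's slot */
static void send_pages(int idle)
{
    Pages *tmp = channels[idle].pages;
    channels[idle].pages = shared;            /* channel now owns the data */
    shared = tmp;                             /* producer gets a free slot back */
}

int main(void)
{
    static Pages slots[NUM_CHANNELS + 1];

    shared = &slots[0];
    for (int i = 0; i < NUM_CHANNELS; i++) {
        channels[i].pages = &slots[i + 1];
    }

    shared->tag = 'a';                        /* step 1: client fills the slot */
    shared->num = 1;
    send_pages(0);                            /* step 2: swap with idle channel 0 */

    /* step 3: the shared slot is empty again and can take more data */
    printf("channel 0 holds '%c'; shared num=%d\n",
           channels[0].pages->tag, shared->num);
    return 0;
}
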
> This works just fine, except that it doesn't allow different types of
> payloads to be processed at the same time in different channels,
> i.e. the data type of multifd_send_state->pages needs to be the same
> as p->pages. For each new data type different from MultiFDPage_t that
> is to be handled, this logic needs to be duplicated by adding new
> fields to multifd_send_state and to the channels.
> 
> The core of the issue here is that we're using the channel parameters
> (MultiFDSendParams) to hold the storage space on behalf of the multifd
> client (currently ram.c). This is cumbersome because it forces us to
> change multifd_send_pages() to check the data type being handled
> before deciding which field to use.
> 
> One way to solve this is to detach the storage space from the multifd
> channel and put it somewhere else, in control of the multifd
> client. That way, multifd_send_pages() can operate on an opaque
> pointer without needing to be adapted to each new data type. Implement
> this logic with a new "slots" abstraction:
> 
> struct MultiFDSendData {
>     void *opaque;
>     size_t size;
> }
> 
> struct MultiFDSlots {
>     MultiFDSendData **free;   <-- what used to be p->pages
>     MultiFDSendData *active;  <-- what used to be multifd_send_state->pages
> };
> 
> Each multifd client now gets one set of slots to use. The slots are
> passed into multifd_send_pages() (renamed to multifd_send). The
> channels now only hold a pointer to the generic MultiFDSendData, and
> after it's processed that reference can be dropped.
> 
> Or graphically:
> 
> 1) client fills the active slot with data. Channels point to nothing
>    at this point:
>   [a]      <-- active slot
>   [][][][] <-- free slots, one per-channel
> 
>   [][][][] <-- channels' p->data pointers
> 
> 2) multifd_send() swaps the pointers inside the client slot. Channels
>    still point to nothing:
>   []
>   [a][][][]
> 
>   [][][][]
> 
> 3) multifd_send() finds an idle channel and updates its pointer:

It seems the action "finds an idle channel" actually happens in step 2
rather than step 3, i.e. the free slot is selected based on the id of
the channel that was found. Am I understanding this correctly?

>   []
>   [a][][][]
> 
>   [a][][][]
>   ^idle
> 
> 4) a second client calls multifd_send(), but with its own slots:
>   []          [b]
>   [a][][][]   [][][][]
> 
>         [a][][][]
> 
> 5) multifd_send() does steps 2 and 3 again:
>   []          []
>   [a][][][]   [][b][][]
> 
>         [a][b][][]
>            ^idle
> 
> 6) The channels continue processing the data and lose/acquire the
> references as multifd_send() updates them. The free lists of each
> client are not affected.
> 
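Assuming I read the series right, a minimal model of steps 2 and 3
above (simplified types, no synchronization; an illustration only, not
the patch itself):

#include <assert.h>
#include <stddef.h>

#define NUM_CHANNELS 4

typedef struct { void *opaque; size_t size; } SendData;   /* MultiFDSendData */
typedef struct {
    SendData *free[NUM_CHANNELS];
    SendData *active;
} Slots;                                                  /* MultiFDSlots */
typedef struct { SendData *data; } Channel;               /* p->data */

static void do_send(Slots *slots, Channel *ch, int id)
{
    SendData *active = slots->active;

    assert(slots->free[id]->size == 0);

    /* step 2: client-side swap, the client gets a fresh slot to fill */
    slots->active = slots->free[id];

    /* step 3: point the idle channel at the filled slot; it goes back
     * on the free list now, to be reset by the channel after sending */
    ch->data = active;
    slots->free[id] = active;
}

int main(void)
{
    static SendData data[NUM_CHANNELS + 1];
    Slots slots = { .active = &data[0] };
    Channel ch = { 0 };

    for (int i = 0; i < NUM_CHANNELS; i++) {
        slots.free[i] = &data[i + 1];
    }

    slots.active->size = 4096;       /* step 1: client filled the active slot */
    do_send(&slots, &ch, 0);         /* channel 0 is idle */

    assert(ch.data->size == 4096);   /* the channel received the payload */
    assert(slots.active->size == 0); /* the client has a fresh slot */
    return 0;
}
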
> Signed-off-by: Fabiano Rosas <faro...@suse.de>
> ---
>  migration/multifd.c | 119 +++++++++++++++++++++++++++++++-------------
>  migration/multifd.h |  17 +++++++
>  migration/ram.c     |   1 +
>  3 files changed, 102 insertions(+), 35 deletions(-)
> 
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 6fe339b378..f22a1c2e84 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -97,6 +97,30 @@ struct {
>      MultiFDMethods *ops;
>  } *multifd_recv_state;
>  
> +MultiFDSlots *multifd_allocate_slots(void *(*alloc_fn)(void),
> +                                     void (*reset_fn)(void *),
> +                                     void (*cleanup_fn)(void *))
> +{
> +    int thread_count = migrate_multifd_channels();
> +    MultiFDSlots *slots = g_new0(MultiFDSlots, 1);
> +
> +    slots->active = g_new0(MultiFDSendData, 1);
> +    slots->free = g_new0(MultiFDSendData *, thread_count);
> +
> +    slots->active->opaque = alloc_fn();
> +    slots->active->reset = reset_fn;
> +    slots->active->cleanup = cleanup_fn;
> +
> +    for (int i = 0; i < thread_count; i++) {
> +        slots->free[i] = g_new0(MultiFDSendData, 1);
> +        slots->free[i]->opaque = alloc_fn();
> +        slots->free[i]->reset = reset_fn;
> +        slots->free[i]->cleanup = cleanup_fn;
> +    }
> +
> +    return slots;
> +}
> +
>  static bool multifd_use_packets(void)
>  {
>      return !migrate_mapped_ram();
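
As a sanity check on the API, here is how I imagine a second client
would hook in. DeviceBuf, the callbacks and DEVICE_BUF_MAX are all
invented for illustration; only multifd_allocate_slots() comes from
this patch:

/* sketch only; assumes the usual qemu/osdep.h + multifd.h context */
typedef struct {
    uint8_t *buf;
    size_t len;
} DeviceBuf;

static void *device_buf_alloc(void)
{
    DeviceBuf *d = g_new0(DeviceBuf, 1);

    d->buf = g_malloc0(DEVICE_BUF_MAX);   /* hypothetical size constant */
    return d;
}

static void device_buf_reset(void *opaque)
{
    DeviceBuf *d = opaque;

    d->len = 0;                           /* the buffer itself is reused as-is */
}

static void device_buf_cleanup(void *opaque)
{
    DeviceBuf *d = opaque;

    g_free(d->buf);
    d->buf = NULL;
}

/* one set of slots per client, independent of the RAM client's slots */
static MultiFDSlots *device_send_slots;

static void device_save_setup(void)
{
    device_send_slots = multifd_allocate_slots(device_buf_alloc,
                                               device_buf_reset,
                                               device_buf_cleanup);
}
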
> @@ -313,8 +337,10 @@ void multifd_register_ops(int method, MultiFDMethods *ops)
>  }
>  
>  /* Reset a MultiFDPages_t* object for the next use */
> -static void multifd_pages_reset(MultiFDPages_t *pages)
> +static void multifd_pages_reset(void *opaque)
>  {
> +    MultiFDPages_t *pages = opaque;
> +
>      /*
>       * We don't need to touch offset[] array, because it will be
>       * overwritten later when reused.
> @@ -388,8 +414,9 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
>      return msg.id;
>  }
>  
> -static MultiFDPages_t *multifd_pages_init(uint32_t n)
> +static void *multifd_pages_init(void)
>  {
> +    uint32_t n = MULTIFD_PACKET_SIZE / qemu_target_page_size();
>      MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
>  
>      pages->allocated = n;
> @@ -398,13 +425,24 @@ static MultiFDPages_t *multifd_pages_init(uint32_t n)
>      return pages;
>  }
>  
> -static void multifd_pages_clear(MultiFDPages_t *pages)
> +static void multifd_pages_clear(void *opaque)
>  {
> +    MultiFDPages_t *pages = opaque;
> +
>      multifd_pages_reset(pages);
>      pages->allocated = 0;
>      g_free(pages->offset);
>      pages->offset = NULL;
> -    g_free(pages);
> +}
> +
> +/* TODO: move these to multifd-ram.c */
> +MultiFDSlots *multifd_ram_send_slots;
> +
> +void multifd_ram_save_setup(void)
> +{
> +    multifd_ram_send_slots = multifd_allocate_slots(multifd_pages_init,
> +                                                    multifd_pages_reset,
> +                                                    multifd_pages_clear);
>  }
>  
>  static void multifd_ram_fill_packet(MultiFDSendParams *p)
> @@ -617,13 +655,12 @@ static void multifd_send_kick_main(MultiFDSendParams *p)
>   *
>   * Returns true if succeed, false otherwise.
>   */
> -static bool multifd_send_pages(void)
> +static bool multifd_send(MultiFDSlots *slots)
>  {
>      int i;
>      static int next_channel;
>      MultiFDSendParams *p = NULL; /* make happy gcc */
> -    MultiFDPages_t *channel_pages;
> -    MultiFDSendData *data = multifd_send_state->data;
> +    MultiFDSendData *active_slot;
>  
>      if (multifd_send_should_exit()) {
>          return false;
> @@ -659,11 +696,24 @@ static bool multifd_send_pages(void)
>       */
>      smp_mb_acquire();
>  
> -    channel_pages = p->data->opaque;
> -    assert(!channel_pages->num);
> +    assert(!slots->free[p->id]->size);
> +
> +    /*
> +     * Swap the slots. The client gets a free slot to fill up for the
> +     * next iteration and the channel gets the active slot for
> +     * processing.
> +     */
> +    active_slot = slots->active;
> +    slots->active = slots->free[p->id];
> +    p->data = active_slot;
> +
> +    /*
> +     * By the next time we arrive here, the channel will certainly
> +     * have consumed the active slot. Put it back on the free list
> +     * now.
> +     */
> +    slots->free[p->id] = active_slot;
>  
> -    multifd_send_state->data = p->data;
> -    p->data = data;
>      /*
>       * Making sure p->data is setup before marking pending_job=true. Pairs
>       * with the qatomic_load_acquire() in multifd_send_thread().
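
The pairing here looks like the standard release/acquire handoff. A
self-contained model with C11 atomics instead of the qatomic_*
wrappers, just to spell out the ordering (not QEMU code):

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct {
    void *data;              /* payload slot, written by multifd_send() */
    atomic_bool pending_job; /* handoff flag */
} Chan;

/* sender side: publish the payload, then set the flag with release
 * semantics so the thread's acquire load observes the payload write */
void chan_publish(Chan *c, void *payload)
{
    c->data = payload;
    atomic_store_explicit(&c->pending_job, true, memory_order_release);
}

/* thread side: the acquire load orders the payload read after the flag */
void *chan_take(Chan *c)
{
    if (atomic_load_explicit(&c->pending_job, memory_order_acquire)) {
        return c->data;
    }
    return NULL;
}
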
> @@ -687,6 +737,7 @@ static inline bool multifd_queue_full(MultiFDPages_t *pages)
>  static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
>  {
>      pages->offset[pages->num++] = offset;
> +    multifd_ram_send_slots->active->size += qemu_target_page_size();
>  }
>  
>  /* Returns true if enqueue successful, false otherwise */
> @@ -695,7 +746,7 @@ bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
>      MultiFDPages_t *pages;
>  
>  retry:
> -    pages = multifd_send_state->data->opaque;
> +    pages = multifd_ram_send_slots->active->opaque;
>  
>      /* If the queue is empty, we can already enqueue now */
>      if (multifd_queue_empty(pages)) {
> @@ -713,7 +764,7 @@ retry:
>       * After flush, always retry.
>       */
>      if (pages->block != block || multifd_queue_full(pages)) {
> -        if (!multifd_send_pages()) {
> +        if (!multifd_send(multifd_ram_send_slots)) {
>              return false;
>          }
>          goto retry;
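
For reference, the control flow of this retry loop reduces to roughly
the following (a compact model, not the actual code):

#include <stdbool.h>
#include <stddef.h>

#define QUEUE_MAX 128

typedef struct { size_t num; int block; long offsets[QUEUE_MAX]; } Queue;

static bool flush(Queue *q)   /* stands in for multifd_send() */
{
    q->num = 0;               /* pretend a channel consumed the queue */
    return true;
}

bool queue_page(Queue *q, int block, long offset)
{
    for (;;) {
        /* queue empty, or same block with space left: enqueue, done */
        if ((q->num == 0 || q->block == block) && q->num < QUEUE_MAX) {
            q->block = block;
            q->offsets[q->num++] = offset;
            return true;
        }
        /* block changed or queue full: flush to a channel, then retry */
        if (!flush(q)) {
            return false;
        }
    }
}
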
> @@ -825,10 +876,12 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
>      qemu_sem_destroy(&p->sem_sync);
>      g_free(p->name);
>      p->name = NULL;
> -    multifd_pages_clear(p->data->opaque);
> -    p->data->opaque = NULL;
> -    g_free(p->data);
> -    p->data = NULL;
> +    if (p->data) {
> +        p->data->cleanup(p->data->opaque);
> +        p->data->opaque = NULL;
> +        /* p->data was not allocated by us, just clear the pointer */
> +        p->data = NULL;
> +    }
>      p->packet_len = 0;
>      g_free(p->packet);
>      p->packet = NULL;
> @@ -845,10 +898,6 @@ static void multifd_send_cleanup_state(void)
>      qemu_sem_destroy(&multifd_send_state->channels_ready);
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> -    multifd_pages_clear(multifd_send_state->data->opaque);
> -    multifd_send_state->data->opaque = NULL;
> -    g_free(multifd_send_state->data);
> -    multifd_send_state->data = NULL;
>      g_free(multifd_send_state);
>      multifd_send_state = NULL;
>  }
> @@ -897,14 +946,13 @@ int multifd_send_sync_main(void)
>  {
>      int i;
>      bool flush_zero_copy;
> -    MultiFDPages_t *pages;
>  
>      if (!migrate_multifd()) {
>          return 0;
>      }
> -    pages = multifd_send_state->data->opaque;
> -    if (pages->num) {
> -        if (!multifd_send_pages()) {
> +
> +    if (multifd_ram_send_slots->active->size) {
> +        if (!multifd_send(multifd_ram_send_slots)) {
>              error_report("%s: multifd_send_pages fail", __func__);
>              return -1;
>          }
> @@ -979,13 +1027,11 @@ static void *multifd_send_thread(void *opaque)
>  
>          /*
>           * Read pending_job flag before p->data.  Pairs with the
> -         * qatomic_store_release() in multifd_send_pages().
> +         * qatomic_store_release() in multifd_send().
>           */
>          if (qatomic_load_acquire(&p->pending_job)) {
> -            MultiFDPages_t *pages = p->data->opaque;
> -
>              p->iovs_num = 0;
> -            assert(pages->num);
> +            assert(p->data->size);
>  
>              ret = multifd_send_state->ops->send_prepare(p, &local_err);
>              if (ret != 0) {
> @@ -1008,13 +1054,20 @@ static void *multifd_send_thread(void *opaque)
>              stat64_add(&mig_stats.multifd_bytes,
>                         p->next_packet_size + p->packet_len);
>  
> -            multifd_pages_reset(pages);
>              p->next_packet_size = 0;
>  
> +            /*
> +             * The data has now been sent. Since multifd_send()
> +             * already put this slot on the free list, reset the
> +             * entire slot before releasing the barrier below.
> +             */
> +            p->data->size = 0;
> +            p->data->reset(p->data->opaque);
> +
>              /*
>               * Making sure p->data is published before saying "we're
>               * free".  Pairs with the smp_mb_acquire() in
> -             * multifd_send_pages().
> +             * multifd_send().
>               */
>              qatomic_store_release(&p->pending_job, false);
>          } else {
> @@ -1208,8 +1261,6 @@ bool multifd_send_setup(void)
>      thread_count = migrate_multifd_channels();
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> -    multifd_send_state->data = g_new0(MultiFDSendData, 1);
> -    multifd_send_state->data->opaque = multifd_pages_init(page_count);
>      qemu_sem_init(&multifd_send_state->channels_created, 0);
>      qemu_sem_init(&multifd_send_state->channels_ready, 0);
>      qatomic_set(&multifd_send_state->exiting, 0);
> @@ -1221,8 +1272,6 @@ bool multifd_send_setup(void)
>          qemu_sem_init(&p->sem, 0);
>          qemu_sem_init(&p->sem_sync, 0);
>          p->id = i;
> -        p->data = g_new0(MultiFDSendData, 1);
> -        p->data->opaque = multifd_pages_init(page_count);
>  
>          if (use_packets) {
>              p->packet_len = sizeof(MultiFDPacket_t)
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 2029bfd80a..5230729077 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -17,6 +17,10 @@
>  
>  typedef struct MultiFDRecvData MultiFDRecvData;
>  typedef struct MultiFDSendData MultiFDSendData;
> +typedef struct MultiFDSlots MultiFDSlots;
> +
> +typedef void *(multifd_data_alloc_cb)(void);
> +typedef void (multifd_data_cleanup_cb)(void *);
>  
>  bool multifd_send_setup(void);
>  void multifd_send_shutdown(void);
> @@ -93,8 +97,21 @@ struct MultiFDRecvData {
>  struct MultiFDSendData {
>      void *opaque;
>      size_t size;
> +    /* reset the slot for reuse after successful transfer */
> +    void (*reset)(void *);
> +    void (*cleanup)(void *);
>  };
>  
> +struct MultiFDSlots {
> +    MultiFDSendData **free;
> +    MultiFDSendData *active;
> +};
> +
> +MultiFDSlots *multifd_allocate_slots(void *(*alloc_fn)(void),
> +                                     void (*reset_fn)(void *),
> +                                     void (*cleanup_fn)(void *));
> +void multifd_ram_save_setup(void);
> +
>  typedef struct {
>      /* Fields are only written at creating/deletion time */
>      /* No lock required for them, they are read only */
> diff --git a/migration/ram.c b/migration/ram.c
> index ceea586b06..c33a9dcf3f 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -3058,6 +3058,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque, Error **errp)
>      migration_ops = g_malloc0(sizeof(MigrationOps));
>  
>      if (migrate_multifd()) {
> +        multifd_ram_save_setup();
>          migration_ops->ram_save_target_page = ram_save_target_page_multifd;
>      } else {
>          migration_ops->ram_save_target_page = ram_save_target_page_legacy;
