On 6/21/2024 5:21, Fabiano Rosas wrote:
> Multifd currently has a simple scheduling mechanism that distributes
> work to the various channels by providing the client (producer) with a
> memory slot and swapping that slot with a free slot from the next idle
> channel (consumer). Or graphically:
>
> []       <-- multifd_send_state->pages
> [][][][] <-- channels' p->pages pointers
>
> 1) client fills the empty slot with data:
> [a]
> [][][][]
>
> 2) multifd_send_pages() finds an idle channel and swaps the pointers:
> [a]
> [][][][]
>  ^idle
>
> []
> [a][][][]
>
> 3) client can immediately fill new slot with more data:
> [b]
> [a][][][]
>
> 4) channel processes the data, the channel slot is now free to use
> again:
> [b]
> [][][][]
>
> This works just fine, except that it doesn't allow different types of
> payloads to be processed at the same time in different channels,
> i.e. the data type of multifd_send_state->pages needs to be the same
> as p->pages. For each new data type different from MultiFDPage_t that
> is to be handled, this logic needs to be duplicated by adding new
> fields to multifd_send_state and to the channels.
>
> The core of the issue here is that we're using the channel parameters
> (MultiFDSendParams) to hold the storage space on behalf of the multifd
> client (currently ram.c). This is cumbersome because it forces us to
> change multifd_send_pages() to check the data type being handled
> before deciding which field to use.
>
> One way to solve this is to detach the storage space from the multifd
> channel and put it somewhere else, in control of the multifd
> client. That way, multifd_send_pages() can operate on an opaque
> pointer without needing to be adapted to each new data type. Implement
> this logic with a new "slots" abstraction:
>
> struct MultiFDSendData {
>     void *opaque;
>     size_t size;
> }
>
> struct MultiFDSlots {
>     MultiFDSendData **free;  <-- what used to be p->pages
>     MultiFDSendData *active; <-- what used to be multifd_send_state->pages
> };
>
> Each multifd client now gets one set of slots to use. The slots are
> passed into multifd_send_pages() (renamed to multifd_send). The
> channels now only hold a pointer to the generic MultiFDSendData, and
> after it's processed that reference can be dropped.
>
> Or graphically:
>
> 1) client fills the active slot with data. Channels point to nothing
> at this point:
> [a]      <-- active slot
> [][][][] <-- free slots, one per-channel
>
> [][][][] <-- channels' p->data pointers
>
> 2) multifd_send() swaps the pointers inside the client slot. Channels
> still point to nothing:
> []
> [a][][][]
>
> [][][][]
>
> 3) multifd_send() finds an idle channel and updates its pointer:
It seems the action "finds an idle channel" actually happens in step 2
rather than step 3, which would mean the free slot is selected based on
the id of the channel found. Am I understanding correctly?

> []
> [a][][][]
>
> [a][][][]
>  ^idle
>
> 4) a second client calls multifd_send(), but with its own slots:
> []        [b]
> [a][][][] [][][][]
>
> [a][][][]
>
> 5) multifd_send() does steps 2 and 3 again:
> []        []
> [a][][][] [][b][][]
>
> [a][b][][]
>     ^idle
>
> 6) The channels continue processing the data and lose/acquire the
> references as multifd_send() updates them. The free lists of each
> client are not affected.
>
> Signed-off-by: Fabiano Rosas <faro...@suse.de>
> ---
>  migration/multifd.c | 119 +++++++++++++++++++++++++++++++-------------
>  migration/multifd.h |  17 +++++++
>  migration/ram.c     |   1 +
>  3 files changed, 102 insertions(+), 35 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 6fe339b378..f22a1c2e84 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -97,6 +97,30 @@ struct {
>      MultiFDMethods *ops;
>  } *multifd_recv_state;
>
> +MultiFDSlots *multifd_allocate_slots(void *(*alloc_fn)(void),
> +                                     void (*reset_fn)(void *),
> +                                     void (*cleanup_fn)(void *))
> +{
> +    int thread_count = migrate_multifd_channels();
> +    MultiFDSlots *slots = g_new0(MultiFDSlots, 1);
> +
> +    slots->active = g_new0(MultiFDSendData, 1);
> +    slots->free = g_new0(MultiFDSendData *, thread_count);
> +
> +    slots->active->opaque = alloc_fn();
> +    slots->active->reset = reset_fn;
> +    slots->active->cleanup = cleanup_fn;
> +
> +    for (int i = 0; i < thread_count; i++) {
> +        slots->free[i] = g_new0(MultiFDSendData, 1);
> +        slots->free[i]->opaque = alloc_fn();
> +        slots->free[i]->reset = reset_fn;
> +        slots->free[i]->cleanup = cleanup_fn;
> +    }
> +
> +    return slots;
> +}
> +
>  static bool multifd_use_packets(void)
>  {
>      return !migrate_mapped_ram();
> @@ -313,8 +337,10 @@ void multifd_register_ops(int method, MultiFDMethods *ops)
>  }
>
>  /* Reset a MultiFDPages_t* object for the next use */
> -static void multifd_pages_reset(MultiFDPages_t *pages)
> +static void multifd_pages_reset(void *opaque)
>  {
> +    MultiFDPages_t *pages = opaque;
> +
>      /*
>       * We don't need to touch offset[] array, because it will be
>       * overwritten later when reused.
> @@ -388,8 +414,9 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
>      return msg.id;
>  }
>
> -static MultiFDPages_t *multifd_pages_init(uint32_t n)
> +static void *multifd_pages_init(void)
>  {
> +    uint32_t n = MULTIFD_PACKET_SIZE / qemu_target_page_size();
>      MultiFDPages_t *pages = g_new0(MultiFDPages_t, 1);
>
>      pages->allocated = n;
> @@ -398,13 +425,24 @@ static MultiFDPages_t *multifd_pages_init(uint32_t n)
>      return pages;
>  }
>
> -static void multifd_pages_clear(MultiFDPages_t *pages)
> +static void multifd_pages_clear(void *opaque)
>  {
> +    MultiFDPages_t *pages = opaque;
> +
>      multifd_pages_reset(pages);
>      pages->allocated = 0;
>      g_free(pages->offset);
>      pages->offset = NULL;
> -    g_free(pages);
> +}
> +
> +/* TODO: move these to multifd-ram.c */
> +MultiFDSlots *multifd_ram_send_slots;
> +
> +void multifd_ram_save_setup(void)
> +{
> +    multifd_ram_send_slots = multifd_allocate_slots(multifd_pages_init,
> +                                                    multifd_pages_reset,
> +                                                    multifd_pages_clear);
>  }
>
>  static void multifd_ram_fill_packet(MultiFDSendParams *p)
> @@ -617,13 +655,12 @@ static void multifd_send_kick_main(MultiFDSendParams *p)
>   *
>   * Returns true if succeed, false otherwise.
>   */
> -static bool multifd_send_pages(void)
> +static bool multifd_send(MultiFDSlots *slots)
>  {
>      int i;
>      static int next_channel;
>      MultiFDSendParams *p = NULL; /* make happy gcc */
> -    MultiFDPages_t *channel_pages;
> -    MultiFDSendData *data = multifd_send_state->data;
> +    MultiFDSendData *active_slot;
>
>      if (multifd_send_should_exit()) {
>          return false;
> @@ -659,11 +696,24 @@
>       */
>      smp_mb_acquire();
>
> -    channel_pages = p->data->opaque;
> -    assert(!channel_pages->num);
> +    assert(!slots->free[p->id]->size);
> +
> +    /*
> +     * Swap the slots. The client gets a free slot to fill up for the
> +     * next iteration and the channel gets the active slot for
> +     * processing.
> +     */
> +    active_slot = slots->active;
> +    slots->active = slots->free[p->id];
> +    p->data = active_slot;
> +
> +    /*
> +     * By the next time we arrive here, the channel will certainly
> +     * have consumed the active slot. Put it back on the free list
> +     * now.
> +     */
> +    slots->free[p->id] = active_slot;
>
> -    multifd_send_state->data = p->data;
> -    p->data = data;
>      /*
>       * Making sure p->data is setup before marking pending_job=true. Pairs
>       * with the qatomic_load_acquire() in multifd_send_thread().
> @@ -687,6 +737,7 @@ static inline bool multifd_queue_full(MultiFDPages_t *pages)
>  static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset)
>  {
>      pages->offset[pages->num++] = offset;
> +    multifd_ram_send_slots->active->size += qemu_target_page_size();
>  }
>
>  /* Returns true if enqueue successful, false otherwise */
> @@ -695,7 +746,7 @@ bool multifd_queue_page(RAMBlock *block, ram_addr_t offset)
>  {
>      MultiFDPages_t *pages;
>
>  retry:
> -    pages = multifd_send_state->data->opaque;
> +    pages = multifd_ram_send_slots->active->opaque;
>
>      /* If the queue is empty, we can already enqueue now */
>      if (multifd_queue_empty(pages)) {
> @@ -713,7 +764,7 @@ retry:
>       * After flush, always retry.
>       */
>      if (pages->block != block || multifd_queue_full(pages)) {
> -        if (!multifd_send_pages()) {
> +        if (!multifd_send(multifd_ram_send_slots)) {
>              return false;
>          }
>          goto retry;
> @@ -825,10 +876,12 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
>      qemu_sem_destroy(&p->sem_sync);
>      g_free(p->name);
>      p->name = NULL;
> -    multifd_pages_clear(p->data->opaque);
> -    p->data->opaque = NULL;
> -    g_free(p->data);
> -    p->data = NULL;
> +    if (p->data) {
> +        p->data->cleanup(p->data->opaque);
> +        p->data->opaque = NULL;
> +        /* p->data was not allocated by us, just clear the pointer */
> +        p->data = NULL;
> +    }
>      p->packet_len = 0;
>      g_free(p->packet);
>      p->packet = NULL;
> @@ -845,10 +898,6 @@ static void multifd_send_cleanup_state(void)
>      qemu_sem_destroy(&multifd_send_state->channels_ready);
>      g_free(multifd_send_state->params);
>      multifd_send_state->params = NULL;
> -    multifd_pages_clear(multifd_send_state->data->opaque);
> -    multifd_send_state->data->opaque = NULL;
> -    g_free(multifd_send_state->data);
> -    multifd_send_state->data = NULL;
>      g_free(multifd_send_state);
>      multifd_send_state = NULL;
>  }
> @@ -897,14 +946,13 @@ int multifd_send_sync_main(void)
>  {
>      int i;
>      bool flush_zero_copy;
> -    MultiFDPages_t *pages;
>
>      if (!migrate_multifd()) {
>          return 0;
>      }
> -    pages = multifd_send_state->data->opaque;
> -    if (pages->num) {
> -        if (!multifd_send_pages()) {
> +
> +    if (multifd_ram_send_slots->active->size) {
> +        if (!multifd_send(multifd_ram_send_slots)) {
>              error_report("%s: multifd_send_pages fail", __func__);
>              return -1;
>          }
> @@ -979,13 +1027,11 @@ static void *multifd_send_thread(void *opaque)
>
>          /*
>           * Read pending_job flag before p->data. Pairs with the
> -         * qatomic_store_release() in multifd_send_pages().
> +         * qatomic_store_release() in multifd_send().
>           */
>          if (qatomic_load_acquire(&p->pending_job)) {
> -            MultiFDPages_t *pages = p->data->opaque;
> -
>              p->iovs_num = 0;
> -            assert(pages->num);
> +            assert(p->data->size);
>
>              ret = multifd_send_state->ops->send_prepare(p, &local_err);
>              if (ret != 0) {
> @@ -1008,13 +1054,20 @@ static void *multifd_send_thread(void *opaque)
>              stat64_add(&mig_stats.multifd_bytes,
>                         p->next_packet_size + p->packet_len);
>
> -            multifd_pages_reset(pages);
>              p->next_packet_size = 0;
>
> +            /*
> +             * The data has now been sent. Since multifd_send()
> +             * already put this slot on the free list, reset the
> +             * entire slot before releasing the barrier below.
> +             */
> +            p->data->size = 0;
> +            p->data->reset(p->data->opaque);
> +
>              /*
>               * Making sure p->data is published before saying "we're
>               * free". Pairs with the smp_mb_acquire() in
> -             * multifd_send_pages().
> +             * multifd_send().
>               */
>              qatomic_store_release(&p->pending_job, false);
>          } else {
> @@ -1208,8 +1261,6 @@ bool multifd_send_setup(void)
>      thread_count = migrate_multifd_channels();
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> -    multifd_send_state->data = g_new0(MultiFDSendData, 1);
> -    multifd_send_state->data->opaque = multifd_pages_init(page_count);
>      qemu_sem_init(&multifd_send_state->channels_created, 0);
>      qemu_sem_init(&multifd_send_state->channels_ready, 0);
>      qatomic_set(&multifd_send_state->exiting, 0);
> @@ -1221,8 +1272,6 @@
>          qemu_sem_init(&p->sem, 0);
>          qemu_sem_init(&p->sem_sync, 0);
>          p->id = i;
> -        p->data = g_new0(MultiFDSendData, 1);
> -        p->data->opaque = multifd_pages_init(page_count);
>
>          if (use_packets) {
>              p->packet_len = sizeof(MultiFDPacket_t)
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 2029bfd80a..5230729077 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -17,6 +17,10 @@
>
>  typedef struct MultiFDRecvData MultiFDRecvData;
>  typedef struct MultiFDSendData MultiFDSendData;
> +typedef struct MultiFDSlots MultiFDSlots;
> +
> +typedef void *(multifd_data_alloc_cb)(void);
> +typedef void (multifd_data_cleanup_cb)(void *);
>
>  bool multifd_send_setup(void);
>  void multifd_send_shutdown(void);
> @@ -93,8 +97,21 @@ struct MultiFDRecvData {
>  struct MultiFDSendData {
>      void *opaque;
>      size_t size;
> +    /* reset the slot for reuse after successful transfer */
> +    void (*reset)(void *);
> +    void (*cleanup)(void *);
>  };
>
> +struct MultiFDSlots {
> +    MultiFDSendData **free;
> +    MultiFDSendData *active;
> +};
> +
> +MultiFDSlots *multifd_allocate_slots(void *(*alloc_fn)(void),
> +                                     void (*reset_fn)(void *),
> +                                     void (*cleanup_fn)(void *));
> +void multifd_ram_save_setup(void);
> +
>  typedef struct {
>      /* Fields are only written at creating/deletion time */
>      /* No lock required for them, they are read only */
> diff --git a/migration/ram.c b/migration/ram.c
> index ceea586b06..c33a9dcf3f 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -3058,6 +3058,7 @@ static int ram_save_setup(QEMUFile *f, void *opaque, Error **errp)
>      migration_ops = g_malloc0(sizeof(MigrationOps));
>
>      if (migrate_multifd()) {
> +        multifd_ram_save_setup();
>          migration_ops->ram_save_target_page = ram_save_target_page_multifd;
>      } else {
>          migration_ops->ram_save_target_page = ram_save_target_page_legacy;
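
To check my own reading of the ownership rules, I put together a minimal
standalone model of the swap in multifd_send(). Everything in it is my
simplification, not code from this patch: SendData/Slots/Channel stand in
for MultiFDSendData/MultiFDSlots/MultiFDSendParams, the payload is just a
size counter, do_send() keeps only the pointer dance, and
fake_channel_consume() replaces the real send thread, so none of the
pending_job flags or memory barriers are modeled:

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

#define N_CHANNELS 4

typedef struct {
    void *opaque;  /* client-owned payload (stands in for MultiFDPages_t) */
    size_t size;   /* bytes queued in this slot */
} SendData;

typedef struct {
    SendData *free[N_CHANNELS];  /* one spare slot per channel */
    SendData *active;            /* slot the client is currently filling */
} Slots;

typedef struct {
    int id;
    SendData *data;  /* borrowed reference, storage owned by Slots */
} Channel;

/* The swap: exchange the active slot with the idle channel's free slot */
static void do_send(Slots *s, Channel *ch)
{
    SendData *active = s->active;

    assert(s->free[ch->id]->size == 0);
    s->active = s->free[ch->id];  /* client refills this one next */
    ch->data = active;            /* channel borrows the filled slot */
    s->free[ch->id] = active;     /* already back on the free list */
}

/* Stand-in for the send thread consuming the slot and resetting it */
static void fake_channel_consume(Channel *ch)
{
    printf("channel %d sent %zu bytes\n", ch->id, ch->data->size);
    ch->data->size = 0;  /* reset before going idle */
    ch->data = NULL;     /* drop the borrowed reference */
}

int main(void)
{
    Slots s;
    Channel chans[N_CHANNELS];

    s.active = calloc(1, sizeof(SendData));
    for (int i = 0; i < N_CHANNELS; i++) {
        s.free[i] = calloc(1, sizeof(SendData));
        chans[i].id = i;
        chans[i].data = NULL;
    }

    s.active->size = 4096;           /* client fills the active slot */
    do_send(&s, &chans[1]);          /* swap with channel 1's free slot */
    s.active->size = 8192;           /* client can refill immediately */
    fake_channel_consume(&chans[1]); /* channel drains and resets the slot */
    do_send(&s, &chans[2]);          /* same dance with another channel */
    fake_channel_consume(&chans[2]);

    return 0;
}

If this model is right, the idle channel is picked before the slots are
touched, and the swap then happens against that channel's free slot,
which is exactly the ordering my question above is about.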