From: "Maciej S. Szmigiero" <maciej.szmigi...@oracle.com> A new function multifd_queue_device_state() is provided for device to queue its state for transmission via a multifd channel.
Signed-off-by: Maciej S. Szmigiero <maciej.szmigi...@oracle.com> --- include/migration/misc.h | 4 + migration/multifd-zlib.c | 2 +- migration/multifd-zstd.c | 2 +- migration/multifd.c | 181 +++++++++++++++++++++++++++++++++------ migration/multifd.h | 26 ++++-- 5 files changed, 182 insertions(+), 33 deletions(-) diff --git a/include/migration/misc.h b/include/migration/misc.h index bfadc5613bac..abf6f33eeae8 100644 --- a/include/migration/misc.h +++ b/include/migration/misc.h @@ -111,4 +111,8 @@ bool migration_in_bg_snapshot(void); /* migration/block-dirty-bitmap.c */ void dirty_bitmap_mig_init(void); +/* migration/multifd.c */ +int multifd_queue_device_state(char *idstr, uint32_t instance_id, + char *data, size_t len); + #endif diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c index 737a9645d2fe..424547aa5be0 100644 --- a/migration/multifd-zlib.c +++ b/migration/multifd-zlib.c @@ -177,7 +177,7 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp) out: p->flags |= MULTIFD_FLAG_ZLIB; - multifd_send_fill_packet(p); + multifd_send_fill_packet_ram(p); return 0; } diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c index 256858df0a0a..89ef21898485 100644 --- a/migration/multifd-zstd.c +++ b/migration/multifd-zstd.c @@ -166,7 +166,7 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp) out: p->flags |= MULTIFD_FLAG_ZSTD; - multifd_send_fill_packet(p); + multifd_send_fill_packet_ram(p); return 0; } diff --git a/migration/multifd.c b/migration/multifd.c index daa34172bf24..6a7e5d659925 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -12,6 +12,7 @@ #include "qemu/osdep.h" #include "qemu/cutils.h" +#include "qemu/iov.h" #include "qemu/rcu.h" #include "exec/target_page.h" #include "sysemu/sysemu.h" @@ -19,6 +20,7 @@ #include "qemu/error-report.h" #include "qapi/error.h" #include "file.h" +#include "migration/misc.h" #include "migration.h" #include "migration-stats.h" #include "savevm.h" @@ -49,9 +51,12 @@ typedef struct { } __attribute__((packed)) MultiFDInit_t; struct { + QemuMutex queue_job_mutex; + MultiFDSendParams *params; - /* array of pages to sent */ + /* array of pages or device state to be sent */ MultiFDPages_t *pages; + MultiFDDeviceState_t *device_state; /* * Global number of generated multifd packets. * @@ -168,7 +173,7 @@ static void multifd_send_prepare_iovs(MultiFDSendParams *p) } /** - * nocomp_send_prepare: prepare date to be able to send + * nocomp_send_prepare_ram: prepare RAM data for sending * * For no compression we just have to calculate the size of the * packet. @@ -178,7 +183,7 @@ static void multifd_send_prepare_iovs(MultiFDSendParams *p) * @p: Params for the channel that we are using * @errp: pointer to an error */ -static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) +static int nocomp_send_prepare_ram(MultiFDSendParams *p, Error **errp) { bool use_zero_copy_send = migrate_zero_copy_send(); int ret; @@ -197,13 +202,13 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) * Only !zerocopy needs the header in IOV; zerocopy will * send it separately. */ - multifd_send_prepare_header(p); + multifd_send_prepare_header_ram(p); } multifd_send_prepare_iovs(p); p->flags |= MULTIFD_FLAG_NOCOMP; - multifd_send_fill_packet(p); + multifd_send_fill_packet_ram(p); if (use_zero_copy_send) { /* Send header first, without zerocopy */ @@ -217,6 +222,56 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) return 0; } +static void multifd_send_fill_packet_device_state(MultiFDSendParams *p) +{ + MultiFDPacketDeviceState_t *packet = p->packet_device_state; + + packet->hdr.flags = cpu_to_be32(p->flags); + strncpy(packet->idstr, p->device_state->idstr, sizeof(packet->idstr)); + packet->instance_id = cpu_to_be32(p->device_state->instance_id); + packet->next_packet_size = cpu_to_be32(p->next_packet_size); +} + +/** + * nocomp_send_prepare_device_state: prepare device state data for sending + * + * Returns 0 for success or -1 for error + * + * @p: Params for the channel that we are using + * @errp: pointer to an error + */ +static int nocomp_send_prepare_device_state(MultiFDSendParams *p, + Error **errp) +{ + multifd_send_prepare_header_device_state(p); + + assert(!(p->flags & MULTIFD_FLAG_SYNC)); + + p->next_packet_size = p->device_state->buf_len; + if (p->next_packet_size > 0) { + p->iov[p->iovs_num].iov_base = p->device_state->buf; + p->iov[p->iovs_num].iov_len = p->next_packet_size; + p->iovs_num++; + } + + p->flags |= MULTIFD_FLAG_NOCOMP | MULTIFD_FLAG_DEVICE_STATE; + + multifd_send_fill_packet_device_state(p); + + return 0; +} + +static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) +{ + if (p->is_device_state_job) { + return nocomp_send_prepare_device_state(p, errp); + } else { + return nocomp_send_prepare_ram(p, errp); + } + + g_assert_not_reached(); +} + /** * nocomp_recv_setup: setup receive side * @@ -397,7 +452,18 @@ static void multifd_pages_clear(MultiFDPages_t *pages) g_free(pages); } -void multifd_send_fill_packet(MultiFDSendParams *p) +static void multifd_device_state_free(MultiFDDeviceState_t *device_state) +{ + if (!device_state) { + return; + } + + g_clear_pointer(&device_state->idstr, g_free); + g_clear_pointer(&device_state->buf, g_free); + g_free(device_state); +} + +void multifd_send_fill_packet_ram(MultiFDSendParams *p) { MultiFDPacket_t *packet = p->packet; MultiFDPages_t *pages = p->pages; @@ -585,7 +651,8 @@ static void multifd_send_kick_main(MultiFDSendParams *p) } /* - * How we use multifd_send_state->pages and channel->pages? + * How we use multifd_send_state->pages + channel->pages + * and multifd_send_state->device_state + channel->device_state? * * We create a pages for each channel, and a main one. Each time that * we need to send a batch of pages we interchange the ones between @@ -601,14 +668,15 @@ static void multifd_send_kick_main(MultiFDSendParams *p) * have to had finish with its own, otherwise pending_job can't be * false. * + * 'device_state' struct has similar handling. + * * Returns true if succeed, false otherwise. */ -static bool multifd_send_pages(void) +static bool multifd_send_queue_job(bool is_device_state) { int i; static int next_channel; MultiFDSendParams *p = NULL; /* make happy gcc */ - MultiFDPages_t *pages = multifd_send_state->pages; if (multifd_send_should_exit()) { return false; @@ -645,7 +713,7 @@ static bool multifd_send_pages(void) * Lockless read to p->pending_job is safe, because only multifd * sender thread can clear it. */ - if (qatomic_read(&p->pending_job) == false) { + if (qatomic_cmpxchg(&p->pending_job_preparing, false, true) == false) { break; } } @@ -655,12 +723,30 @@ static bool multifd_send_pages(void) * qatomic_store_release() in multifd_send_thread(). */ smp_mb_acquire(); - assert(!p->pages->num); - multifd_send_state->pages = p->pages; - p->pages = pages; + + if (!is_device_state) { + assert(!p->pages->num); + } else { + assert(!p->device_state->buf); + } + + p->is_device_state_job = is_device_state; + + if (!is_device_state) { + MultiFDPages_t *pages = multifd_send_state->pages; + + multifd_send_state->pages = p->pages; + p->pages = pages; + } else { + MultiFDDeviceState_t *device_state = multifd_send_state->device_state; + + multifd_send_state->device_state = p->device_state; + p->device_state = device_state; + } + /* - * Making sure p->pages is setup before marking pending_job=true. Pairs - * with the qatomic_load_acquire() in multifd_send_thread(). + * Making sure p->pages or p->device state is setup before marking + * pending_job=true. Pairs with the qatomic_load_acquire() in multifd_send_thread(). */ qatomic_store_release(&p->pending_job, true); qemu_sem_post(&p->sem); @@ -707,7 +793,7 @@ retry: * After flush, always retry. */ if (pages->block != block || multifd_queue_full(pages)) { - if (!multifd_send_pages()) { + if (!multifd_send_queue_job(false)) { return false; } goto retry; @@ -718,6 +804,28 @@ retry: return true; } +int multifd_queue_device_state(char *idstr, uint32_t instance_id, + char *data, size_t len) +{ + /* Device state submissions can come from multiple threads */ + QEMU_LOCK_GUARD(&multifd_send_state->queue_job_mutex); + MultiFDDeviceState_t *device_state = multifd_send_state->device_state; + + assert(!device_state->buf); + device_state->idstr = g_strdup(idstr); + device_state->instance_id = instance_id; + device_state->buf = g_memdup2(data, len); + device_state->buf_len = len; + + if (!multifd_send_queue_job(true)) { + g_clear_pointer(&device_state->idstr, g_free); + g_clear_pointer(&device_state->buf, g_free); + return -1; + } + + return 0; +} + /* Multifd send side hit an error; remember it and prepare to quit */ static void multifd_send_set_error(Error *err) { @@ -822,10 +930,12 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) multifd_pages_clear(p->pages); p->pages = NULL; p->packet_len = 0; + g_clear_pointer(&p->packet_device_state, g_free); g_free(p->packet); p->packet = NULL; g_free(p->iov); p->iov = NULL; + g_clear_pointer(&p->device_state, multifd_device_state_free); multifd_send_state->ops->send_cleanup(p, errp); return *errp == NULL; @@ -840,7 +950,9 @@ static void multifd_send_cleanup_state(void) g_free(multifd_send_state->params); multifd_send_state->params = NULL; multifd_pages_clear(multifd_send_state->pages); + g_clear_pointer(&multifd_send_state->device_state, multifd_device_state_free); multifd_send_state->pages = NULL; + qemu_mutex_destroy(&multifd_send_state->queue_job_mutex); g_free(multifd_send_state); multifd_send_state = NULL; } @@ -894,10 +1006,11 @@ int multifd_send_sync_main(void) return 0; } if (multifd_send_state->pages->num) { - if (!multifd_send_pages()) { + if (!multifd_send_queue_job(false)) { error_report("%s: multifd_send_pages fail", __func__); return -1; } + assert(!multifd_send_state->pages->num); } flush_zero_copy = migrate_zero_copy_send(); @@ -973,17 +1086,22 @@ static void *multifd_send_thread(void *opaque) */ if (qatomic_load_acquire(&p->pending_job)) { MultiFDPages_t *pages = p->pages; + bool is_device_state = p->is_device_state_job; + size_t total_size; p->flags = 0; p->iovs_num = 0; - assert(pages->num); + assert(is_device_state || pages->num); ret = multifd_send_state->ops->send_prepare(p, &local_err); if (ret != 0) { break; } + total_size = iov_size(p->iov, p->iovs_num); if (migrate_mapped_ram()) { + assert(!is_device_state); + ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, p->pages->block, &local_err); } else { @@ -996,12 +1114,18 @@ static void *multifd_send_thread(void *opaque) break; } - stat64_add(&mig_stats.multifd_bytes, - p->next_packet_size + p->packet_len); - stat64_add(&mig_stats.normal_pages, pages->normal_num); - stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num); + stat64_add(&mig_stats.multifd_bytes, total_size); + if (!is_device_state) { + stat64_add(&mig_stats.normal_pages, pages->normal_num); + stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num); + } - multifd_pages_reset(p->pages); + if (is_device_state) { + g_clear_pointer(&p->device_state->idstr, g_free); + g_clear_pointer(&p->device_state->buf, g_free); + } else { + multifd_pages_reset(p->pages); + } p->next_packet_size = 0; /* @@ -1010,6 +1134,7 @@ static void *multifd_send_thread(void *opaque) * multifd_send_pages(). */ qatomic_store_release(&p->pending_job, false); + qatomic_store_release(&p->pending_job_preparing, false); } else { /* * If not a normal job, must be a sync request. Note that @@ -1020,7 +1145,7 @@ static void *multifd_send_thread(void *opaque) if (use_packets) { p->flags = MULTIFD_FLAG_SYNC; - multifd_send_fill_packet(p); + multifd_send_fill_packet_ram(p); ret = qio_channel_write_all(p->c, (void *)p->packet, p->packet_len, &local_err); if (ret != 0) { @@ -1199,9 +1324,11 @@ bool multifd_send_setup(void) thread_count = migrate_multifd_channels(); multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); + qemu_mutex_init(&multifd_send_state->queue_job_mutex); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); multifd_send_state->pages = multifd_pages_init(page_count); qemu_sem_init(&multifd_send_state->channels_created, 0); + multifd_send_state->device_state = g_malloc0(sizeof(*multifd_send_state->device_state)); qemu_sem_init(&multifd_send_state->channels_ready, 0); qatomic_set(&multifd_send_state->exiting, 0); multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; @@ -1215,11 +1342,15 @@ bool multifd_send_setup(void) p->pages = multifd_pages_init(page_count); if (use_packets) { + p->device_state = g_malloc0(sizeof(*p->device_state)); + p->packet_len = sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count; p->packet = g_malloc0(p->packet_len); p->packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); p->packet->hdr.version = cpu_to_be32(MULTIFD_VERSION); + p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state)); + p->packet_device_state->hdr = p->packet->hdr; /* We need one extra place for the packet header */ p->iov = g_new0(struct iovec, page_count + 1); @@ -1786,7 +1917,7 @@ bool multifd_send_prepare_common(MultiFDSendParams *p) return false; } - multifd_send_prepare_header(p); + multifd_send_prepare_header_ram(p); return true; } diff --git a/migration/multifd.h b/migration/multifd.h index 40ee613dd88a..655bec110f87 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -156,18 +156,25 @@ typedef struct { * cleared by the multifd sender threads. */ bool pending_job; + bool pending_job_preparing; bool pending_sync; - /* array of pages to sent. - * The owner of 'pages' depends of 'pending_job' value: + + /* Whether the pending job is pages (false) or device state (true) */ + bool is_device_state_job; + + /* Array of pages or device state to be sent (depending on the flag above). + * The owner of these depends of 'pending_job' value: * pending_job == 0 -> migration_thread can use it. * pending_job != 0 -> multifd_channel can use it. */ MultiFDPages_t *pages; + MultiFDDeviceState_t *device_state; /* thread local variables. No locking required */ - /* pointer to the packet */ + /* pointers to the possible packet types */ MultiFDPacket_t *packet; + MultiFDPacketDeviceState_t *packet_device_state; /* size of the next packet that contains pages */ uint32_t next_packet_size; /* packets sent through this channel */ @@ -267,18 +274,25 @@ typedef struct { } MultiFDMethods; void multifd_register_ops(int method, MultiFDMethods *ops); -void multifd_send_fill_packet(MultiFDSendParams *p); +void multifd_send_fill_packet_ram(MultiFDSendParams *p); bool multifd_send_prepare_common(MultiFDSendParams *p); void multifd_send_zero_page_detect(MultiFDSendParams *p); void multifd_recv_zero_page_process(MultiFDRecvParams *p); -static inline void multifd_send_prepare_header(MultiFDSendParams *p) +void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc); + +static inline void multifd_send_prepare_header_ram(MultiFDSendParams *p) { p->iov[0].iov_len = p->packet_len; p->iov[0].iov_base = p->packet; p->iovs_num++; } -void multifd_channel_connect(MultiFDSendParams *p, QIOChannel *ioc); +static inline void multifd_send_prepare_header_device_state(MultiFDSendParams *p) +{ + p->iov[0].iov_len = sizeof(*p->packet_device_state); + p->iov[0].iov_base = p->packet_device_state; + p->iovs_num++; +} #endif