From: "Maciej S. Szmigiero" <maciej.szmigi...@oracle.com> A new function, multifd_queue_device_state(), is provided for a device to queue its state for transmission via a multifd channel.
Signed-off-by: Maciej S. Szmigiero <maciej.szmigi...@oracle.com> --- include/migration/misc.h | 4 + migration/multifd-zlib.c | 2 +- migration/multifd-zstd.c | 2 +- migration/multifd.c | 244 ++++++++++++++++++++++++++++++++++----- migration/multifd.h | 30 +++-- 5 files changed, 244 insertions(+), 38 deletions(-) diff --git a/include/migration/misc.h b/include/migration/misc.h index c9e200f4eb8f..25968e31247b 100644 --- a/include/migration/misc.h +++ b/include/migration/misc.h @@ -117,4 +117,8 @@ bool migration_in_bg_snapshot(void); /* migration/block-dirty-bitmap.c */ void dirty_bitmap_mig_init(void); +/* migration/multifd.c */ +int multifd_queue_device_state(char *idstr, uint32_t instance_id, + char *data, size_t len); + #endif diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c index 99821cd4d5ef..e20c1de6033d 100644 --- a/migration/multifd-zlib.c +++ b/migration/multifd-zlib.c @@ -177,7 +177,7 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp) out: p->flags |= MULTIFD_FLAG_ZLIB; - multifd_send_fill_packet(p); + multifd_send_fill_packet_ram(p); return 0; } diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c index 02112255adcc..37cebd006921 100644 --- a/migration/multifd-zstd.c +++ b/migration/multifd-zstd.c @@ -166,7 +166,7 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp) out: p->flags |= MULTIFD_FLAG_ZSTD; - multifd_send_fill_packet(p); + multifd_send_fill_packet_ram(p); return 0; } diff --git a/migration/multifd.c b/migration/multifd.c index 878ff7d9f9f0..d8ce01539a05 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -12,6 +12,7 @@ #include "qemu/osdep.h" #include "qemu/cutils.h" +#include "qemu/iov.h" #include "qemu/rcu.h" #include "exec/target_page.h" #include "sysemu/sysemu.h" @@ -20,6 +21,7 @@ #include "qapi/error.h" #include "channel.h" #include "file.h" +#include "migration/misc.h" #include "migration.h" #include "migration-stats.h" #include "savevm.h" @@ -50,9 +52,17 @@ typedef 
struct { } __attribute__((packed)) MultiFDInit_t; struct { + /* + * Are there some device state dedicated channels (true) or + * should device state be sent via any available channel (false)? + */ + bool device_state_dedicated_channels; + GMutex queue_job_mutex; + MultiFDSendParams *params; - /* array of pages to sent */ + /* array of pages or device state to be sent */ MultiFDPages_t *pages; + MultiFDDeviceState_t *device_state; /* * Global number of generated multifd packets. * @@ -169,7 +179,7 @@ static void multifd_send_prepare_iovs(MultiFDSendParams *p) } /** - * nocomp_send_prepare: prepare date to be able to send + * nocomp_send_prepare_ram: prepare RAM data for sending * * For no compression we just have to calculate the size of the * packet. @@ -179,7 +189,7 @@ static void multifd_send_prepare_iovs(MultiFDSendParams *p) * @p: Params for the channel that we are using * @errp: pointer to an error */ -static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) +static int nocomp_send_prepare_ram(MultiFDSendParams *p, Error **errp) { bool use_zero_copy_send = migrate_zero_copy_send(); int ret; @@ -198,13 +208,13 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) * Only !zerocopy needs the header in IOV; zerocopy will * send it separately. 
*/ - multifd_send_prepare_header(p); + multifd_send_prepare_header_ram(p); } multifd_send_prepare_iovs(p); p->flags |= MULTIFD_FLAG_NOCOMP; - multifd_send_fill_packet(p); + multifd_send_fill_packet_ram(p); if (use_zero_copy_send) { /* Send header first, without zerocopy */ @@ -218,6 +228,59 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) return 0; } +static void multifd_send_fill_packet_device_state(MultiFDSendParams *p) +{ + MultiFDPacketDeviceState_t *packet = p->packet_device_state; + + packet->hdr.flags = cpu_to_be32(p->flags); + strncpy(packet->idstr, p->device_state->idstr, sizeof(packet->idstr)); + packet->instance_id = cpu_to_be32(p->device_state->instance_id); + packet->next_packet_size = cpu_to_be32(p->next_packet_size); +} + +/** + * nocomp_send_prepare_device_state: prepare device state data for sending + * + * Returns 0 for success or -1 for error + * + * @p: Params for the channel that we are using + * @errp: pointer to an error + */ +static int nocomp_send_prepare_device_state(MultiFDSendParams *p, + Error **errp) +{ + assert(!multifd_send_state->device_state_dedicated_channels || + p->is_device_state_dedicated); + + multifd_send_prepare_header_device_state(p); + + assert(!(p->flags & MULTIFD_FLAG_SYNC)); + + p->next_packet_size = p->device_state->buf_len; + if (p->next_packet_size > 0) { + p->iov[p->iovs_num].iov_base = p->device_state->buf; + p->iov[p->iovs_num].iov_len = p->next_packet_size; + p->iovs_num++; + } + + p->flags |= MULTIFD_FLAG_NOCOMP | MULTIFD_FLAG_DEVICE_STATE; + + multifd_send_fill_packet_device_state(p); + + return 0; +} + +static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp) +{ + if (p->is_device_state_job) { + return nocomp_send_prepare_device_state(p, errp); + } else { + return nocomp_send_prepare_ram(p, errp); + } + + g_assert_not_reached(); +} + /** * nocomp_recv_setup: setup receive side * @@ -397,7 +460,18 @@ static void multifd_pages_clear(MultiFDPages_t *pages) g_free(pages); } -void 
multifd_send_fill_packet(MultiFDSendParams *p) +static void multifd_device_state_free(MultiFDDeviceState_t *device_state) +{ + if (!device_state) { + return; + } + + g_clear_pointer(&device_state->idstr, g_free); + g_clear_pointer(&device_state->buf, g_free); + g_free(device_state); +} + +void multifd_send_fill_packet_ram(MultiFDSendParams *p) { MultiFDPacket_t *packet = p->packet; MultiFDPages_t *pages = p->pages; @@ -585,7 +659,8 @@ static void multifd_send_kick_main(MultiFDSendParams *p) } /* - * How we use multifd_send_state->pages and channel->pages? + * How we use multifd_send_state->pages + channel->pages + * and multifd_send_state->device_state + channel->device_state? * * We create a pages for each channel, and a main one. Each time that * we need to send a batch of pages we interchange the ones between @@ -601,14 +676,15 @@ static void multifd_send_kick_main(MultiFDSendParams *p) * have to had finish with its own, otherwise pending_job can't be * false. * + * 'device_state' struct has similar handling. + * * Returns true if succeed, false otherwise. */ -static bool multifd_send_pages(void) +static bool multifd_send_queue_job(bool is_device_state) { int i; static int next_channel; MultiFDSendParams *p = NULL; /* make happy gcc */ - MultiFDPages_t *pages = multifd_send_state->pages; if (multifd_send_should_exit()) { return false; @@ -632,7 +708,9 @@ static bool multifd_send_pages(void) * Lockless read to p->pending_job is safe, because only multifd * sender thread can clear it. */ - if (qatomic_read(&p->pending_job) == false) { + if ((!multifd_send_state->device_state_dedicated_channels || + p->is_device_state_dedicated == is_device_state) && + qatomic_read(&p->pending_job) == false) { qatomic_store_release(&next_channel, (i + 1) % migrate_multifd_channels()); break; @@ -644,12 +722,30 @@ static bool multifd_send_pages(void) * qatomic_store_release() in multifd_send_thread(). 
*/ smp_mb_acquire(); - assert(!p->pages->num); - multifd_send_state->pages = p->pages; - p->pages = pages; + + if (!is_device_state) { + assert(!p->pages->num); + } else { + assert(!p->device_state->buf); + } + + p->is_device_state_job = is_device_state; + + if (!is_device_state) { + MultiFDPages_t *pages = multifd_send_state->pages; + + multifd_send_state->pages = p->pages; + p->pages = pages; + } else { + MultiFDDeviceState_t *device_state = multifd_send_state->device_state; + + multifd_send_state->device_state = p->device_state; + p->device_state = device_state; + } + /* - * Making sure p->pages is setup before marking pending_job=true. Pairs - * with the qatomic_load_acquire() in multifd_send_thread(). + * Making sure p->pages or p->device state is setup before marking + * pending_job=true. Pairs with the qatomic_load_acquire() in multifd_send_thread(). */ qatomic_store_release(&p->pending_job, true); qemu_sem_post(&p->sem); @@ -673,7 +769,7 @@ static inline void multifd_enqueue(MultiFDPages_t *pages, ram_addr_t offset) } /* Returns true if enqueue successful, false otherwise */ -bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) +static bool multifd_queue_page_locked(RAMBlock *block, ram_addr_t offset) { MultiFDPages_t *pages; @@ -696,7 +792,7 @@ retry: * After flush, always retry. */ if (pages->block != block || multifd_queue_full(pages)) { - if (!multifd_send_pages()) { + if (!multifd_send_queue_job(false)) { return false; } goto retry; @@ -707,6 +803,45 @@ retry: return true; } +bool multifd_queue_page(RAMBlock *block, ram_addr_t offset) +{ + g_autoptr(GMutexLocker) locker = NULL; + + /* + * Device state submissions for shared channels can come + * from multiple threads and conflict with page submissions + * with respect to multifd_send_state access. 
+ */ + if (!multifd_send_state->device_state_dedicated_channels) { + locker = g_mutex_locker_new(&multifd_send_state->queue_job_mutex); + } + + return multifd_queue_page_locked(block, offset); +} + +int multifd_queue_device_state(char *idstr, uint32_t instance_id, + char *data, size_t len) +{ + /* Device state submissions can came from multiple threads */ + g_autoptr(GMutexLocker) locker = + g_mutex_locker_new(&multifd_send_state->queue_job_mutex); + MultiFDDeviceState_t *device_state = multifd_send_state->device_state; + + assert(!device_state->buf); + device_state->idstr = g_strdup(idstr); + device_state->instance_id = instance_id; + device_state->buf = g_memdup2(data, len); + device_state->buf_len = len; + + if (!multifd_send_queue_job(true)) { + g_clear_pointer(&device_state->idstr, g_free); + g_clear_pointer(&device_state->buf, g_free); + return -1; + } + + return 0; +} + /* Multifd send side hit an error; remember it and prepare to quit */ static void multifd_send_set_error(Error *err) { @@ -811,10 +946,12 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp) multifd_pages_clear(p->pages); p->pages = NULL; p->packet_len = 0; + g_clear_pointer(&p->packet_device_state, g_free); g_free(p->packet); p->packet = NULL; g_free(p->iov); p->iov = NULL; + g_clear_pointer(&p->device_state, multifd_device_state_free); multifd_send_state->ops->send_cleanup(p, errp); return *errp == NULL; @@ -829,7 +966,9 @@ static void multifd_send_cleanup_state(void) g_free(multifd_send_state->params); multifd_send_state->params = NULL; multifd_pages_clear(multifd_send_state->pages); + g_clear_pointer(&multifd_send_state->device_state, multifd_device_state_free); multifd_send_state->pages = NULL; + g_mutex_clear(&multifd_send_state->queue_job_mutex); g_free(multifd_send_state); multifd_send_state = NULL; } @@ -876,17 +1015,28 @@ static int multifd_zero_copy_flush(QIOChannel *c) int multifd_send_sync_main(void) { + g_autoptr(GMutexLocker) locker = NULL; int i; 
bool flush_zero_copy; if (!migrate_multifd()) { return 0; } + + /* + * Page SYNC can conflict with device state submissions for shared channels + * with respect to multifd_send_state access. + */ + if (!multifd_send_state->device_state_dedicated_channels) { + locker = g_mutex_locker_new(&multifd_send_state->queue_job_mutex); + } + if (multifd_send_state->pages->num) { - if (!multifd_send_pages()) { + if (!multifd_send_queue_job(false)) { error_report("%s: multifd_send_pages fail", __func__); return -1; } + assert(!multifd_send_state->pages->num); } flush_zero_copy = migrate_zero_copy_send(); @@ -898,6 +1048,11 @@ int multifd_send_sync_main(void) return -1; } + if (p->is_device_state_dedicated) { + assert(multifd_send_state->device_state_dedicated_channels); + continue; + } + trace_multifd_send_sync_main_signal(p->id); /* @@ -915,6 +1070,10 @@ int multifd_send_sync_main(void) return -1; } + if (p->is_device_state_dedicated) { + continue; + } + qemu_sem_wait(&multifd_send_state->channels_ready); trace_multifd_send_sync_main_wait(p->id); qemu_sem_wait(&p->sem_sync); @@ -962,17 +1121,22 @@ static void *multifd_send_thread(void *opaque) */ if (qatomic_load_acquire(&p->pending_job)) { MultiFDPages_t *pages = p->pages; + bool is_device_state = p->is_device_state_job; + size_t total_size; p->flags = 0; p->iovs_num = 0; - assert(pages->num); + assert(is_device_state || pages->num); ret = multifd_send_state->ops->send_prepare(p, &local_err); if (ret != 0) { break; } + total_size = iov_size(p->iov, p->iovs_num); if (migrate_mapped_ram()) { + assert(!is_device_state); + ret = file_write_ramblock_iov(p->c, p->iov, p->iovs_num, p->pages->block, &local_err); } else { @@ -985,12 +1149,18 @@ static void *multifd_send_thread(void *opaque) break; } - stat64_add(&mig_stats.multifd_bytes, - p->next_packet_size + p->packet_len); - stat64_add(&mig_stats.normal_pages, pages->normal_num); - stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num); + 
stat64_add(&mig_stats.multifd_bytes, total_size); + if (!is_device_state) { + stat64_add(&mig_stats.normal_pages, pages->normal_num); + stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num); + } - multifd_pages_reset(p->pages); + if (is_device_state) { + g_clear_pointer(&p->device_state->idstr, g_free); + g_clear_pointer(&p->device_state->buf, g_free); + } else { + multifd_pages_reset(p->pages); + } p->next_packet_size = 0; /* @@ -1009,7 +1179,7 @@ static void *multifd_send_thread(void *opaque) if (use_packets) { p->flags = MULTIFD_FLAG_SYNC; - multifd_send_fill_packet(p); + multifd_send_fill_packet_ram(p); ret = qio_channel_write_all(p->c, (void *)p->packet, p->packet_len, &local_err); if (ret != 0) { @@ -1223,7 +1393,12 @@ static bool multifd_new_send_channel_create(MultiFDSendParams *p, Error **errp) g_autoptr(MFDSendChannelConnectData) data = NULL; MigChannelHeader header = {}; - header.channel_type = MIG_CHANNEL_TYPE_MULTIFD; + if (!p->is_device_state_dedicated) { + header.channel_type = MIG_CHANNEL_TYPE_MULTIFD; + } else { + header.channel_type = MIG_CHANNEL_TYPE_MULTIFD_DEVICE_STATE; + } + data = mfd_send_channel_connect_data_new(p, &header); if (!multifd_use_packets()) { @@ -1239,7 +1414,7 @@ bool multifd_send_setup(void) { MigrationState *s = migrate_get_current(); Error *local_err = NULL; - int thread_count, ret = 0; + int thread_count, device_state_thread_count, ret = 0; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); bool use_packets = multifd_use_packets(); uint8_t i; @@ -1249,10 +1424,16 @@ bool multifd_send_setup(void) } thread_count = migrate_multifd_channels(); + device_state_thread_count = migrate_multifd_channels_device_state(); + assert(device_state_thread_count < thread_count); + multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); + multifd_send_state->device_state_dedicated_channels = device_state_thread_count >= 1; + g_mutex_init(&multifd_send_state->queue_job_mutex); multifd_send_state->params = 
g_new0(MultiFDSendParams, thread_count); multifd_send_state->pages = multifd_pages_init(page_count); qemu_sem_init(&multifd_send_state->channels_created, 0); + multifd_send_state->device_state = g_malloc0(sizeof(*multifd_send_state->device_state)); qemu_sem_init(&multifd_send_state->channels_ready, 0); qatomic_set(&multifd_send_state->exiting, 0); multifd_send_state->ops = multifd_ops[migrate_multifd_compression()]; @@ -1260,21 +1441,28 @@ bool multifd_send_setup(void) for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; + p->is_device_state_dedicated = i >= thread_count - device_state_thread_count; qemu_sem_init(&p->sem, 0); qemu_sem_init(&p->sem_sync, 0); p->id = i; p->pages = multifd_pages_init(page_count); if (use_packets) { + p->device_state = g_malloc0(sizeof(*p->device_state)); + p->packet_len = sizeof(MultiFDPacket_t) + sizeof(uint64_t) * page_count; p->packet = g_malloc0(p->packet_len); p->packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC); p->packet->hdr.version = cpu_to_be32(MULTIFD_VERSION); + p->packet_device_state = g_malloc0(sizeof(*p->packet_device_state)); + p->packet_device_state->hdr = p->packet->hdr; /* We need one extra place for the packet header */ p->iov = g_new0(struct iovec, page_count + 1); } else { + assert(!p->is_device_state_dedicated); + p->iov = g_new0(struct iovec, page_count); } p->name = g_strdup_printf("multifdsend_%d", i); @@ -1858,7 +2046,7 @@ bool multifd_send_prepare_common(MultiFDSendParams *p) return false; } - multifd_send_prepare_header(p); + multifd_send_prepare_header_ram(p); return true; } diff --git a/migration/multifd.h b/migration/multifd.h index b5fa56b791af..53cf80c66f98 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -138,6 +138,7 @@ typedef struct { uint32_t page_count; /* multifd flags for sending ram */ int write_flags; + bool is_device_state_dedicated; /* sem where to wait for more work */ QemuSemaphore sem; @@ -157,17 +158,23 @@ typedef struct { */ bool 
pending_job; bool pending_sync; - /* array of pages to sent. - * The owner of 'pages' depends of 'pending_job' value: + + /* Whether the pending job is pages (false) or device state (true) */ + bool is_device_state_job; + + /* Array of pages or device state to be sent (depending on the flag above). + * The owner of these depends of 'pending_job' value: * pending_job == 0 -> migration_thread can use it. * pending_job != 0 -> multifd_channel can use it. */ MultiFDPages_t *pages; + MultiFDDeviceState_t *device_state; /* thread local variables. No locking required */ - /* pointer to the packet */ + /* pointers to the possible packet types */ MultiFDPacket_t *packet; + MultiFDPacketDeviceState_t *packet_device_state; /* size of the next packet that contains pages */ uint32_t next_packet_size; /* packets sent through this channel */ @@ -268,20 +275,27 @@ typedef struct { } MultiFDMethods; void multifd_register_ops(int method, MultiFDMethods *ops); -void multifd_send_fill_packet(MultiFDSendParams *p); +void multifd_send_fill_packet_ram(MultiFDSendParams *p); bool multifd_send_prepare_common(MultiFDSendParams *p); void multifd_send_zero_page_detect(MultiFDSendParams *p); void multifd_recv_zero_page_process(MultiFDRecvParams *p); -static inline void multifd_send_prepare_header(MultiFDSendParams *p) +struct MFDSendChannelConnectData; +typedef struct MFDSendChannelConnectData MFDSendChannelConnectData; +bool multifd_channel_connect(MFDSendChannelConnectData *data, QIOChannel *ioc, Error **errp); + +static inline void multifd_send_prepare_header_ram(MultiFDSendParams *p) { p->iov[0].iov_len = p->packet_len; p->iov[0].iov_base = p->packet; p->iovs_num++; } -struct MFDSendChannelConnectData; -typedef struct MFDSendChannelConnectData MFDSendChannelConnectData; -bool multifd_channel_connect(MFDSendChannelConnectData *data, QIOChannel *ioc, Error **errp); +static inline void multifd_send_prepare_header_device_state(MultiFDSendParams *p) +{ + p->iov[0].iov_len = 
sizeof(*p->packet_device_state); + p->iov[0].iov_base = p->packet_device_state; + p->iovs_num++; +} #endif