* Juan Quintela (quint...@redhat.com) wrote: > The function still don't use multifd, but we have simplified > ram_save_page, xbzrle and RDMA stuff is gone. We have added a new > counter and a new flag for this type of pages. > > Signed-off-by: Juan Quintela <quint...@redhat.com> > --- > hmp.c | 2 ++ > migration/migration.c | 1 + > migration/ram.c | 90 > ++++++++++++++++++++++++++++++++++++++++++++++++++- > qapi-schema.json | 5 ++- > 4 files changed, 96 insertions(+), 2 deletions(-) > > diff --git a/hmp.c b/hmp.c > index b01605a..eeb308b 100644 > --- a/hmp.c > +++ b/hmp.c > @@ -234,6 +234,8 @@ void hmp_info_migrate(Monitor *mon, const QDict *qdict) > monitor_printf(mon, "postcopy request count: %" PRIu64 "\n", > info->ram->postcopy_requests); > } > + monitor_printf(mon, "multifd: %" PRIu64 " pages\n", > + info->ram->multifd); > } > > if (info->has_disk) { > diff --git a/migration/migration.c b/migration/migration.c > index e1c79d5..d9d5415 100644 > --- a/migration/migration.c > +++ b/migration/migration.c > @@ -528,6 +528,7 @@ static void populate_ram_info(MigrationInfo *info, > MigrationState *s) > info->ram->dirty_sync_count = ram_counters.dirty_sync_count; > info->ram->postcopy_requests = ram_counters.postcopy_requests; > info->ram->page_size = qemu_target_page_size(); > + info->ram->multifd = ram_counters.multifd; > > if (migrate_use_xbzrle()) { > info->has_xbzrle_cache = true; > diff --git a/migration/ram.c b/migration/ram.c > index b80f511..2bf3fa7 100644 > --- a/migration/ram.c > +++ b/migration/ram.c > @@ -68,6 +68,7 @@ > #define RAM_SAVE_FLAG_XBZRLE 0x40 > /* 0x80 is reserved in migration.h start with 0x100 next */ > #define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100 > +#define RAM_SAVE_FLAG_MULTIFD_PAGE 0x200 > > static inline bool is_zero_range(uint8_t *p, uint64_t size) > { > @@ -362,12 +363,17 @@ static void compress_threads_save_setup(void) > /* Multiple fd's */ > > struct MultiFDSendParams { > + /* not changed */ > uint8_t id; > QemuThread thread; > QIOChannel 
*c; > QemuSemaphore sem; > QemuMutex mutex; > + /* protected by param mutex */ > bool quit;
Should probably add a comment saying which address space 'address' is in — it's really a QEMU host pointer, and that's why we can treat 0 as special? > + uint8_t *address; > + /* protected by multifd mutex */ > + bool done; 'done' needs a comment to explain what it is, because it sounds similar to 'quit'; I think 'done' means the thread is idle, having done what was asked? > }; > typedef struct MultiFDSendParams MultiFDSendParams; > > @@ -375,6 +381,8 @@ struct { > MultiFDSendParams *params; > /* number of created threads */ > int count; > + QemuMutex mutex; > + QemuSemaphore sem; > } *multifd_send_state; > > static void terminate_multifd_send_threads(void) > @@ -443,6 +451,7 @@ static void *multifd_send_thread(void *opaque) > } else { > qio_channel_write(p->c, string, MULTIFD_UUID_MSG, &error_abort); > } > + qemu_sem_post(&multifd_send_state->sem); > > while (!exit) { > qemu_mutex_lock(&p->mutex); > @@ -450,6 +459,15 @@ static void *multifd_send_thread(void *opaque) > qemu_mutex_unlock(&p->mutex); > break; > } > + if (p->address) { > + p->address = 0; > + qemu_mutex_unlock(&p->mutex); > + qemu_mutex_lock(&multifd_send_state->mutex); > + p->done = true; > + qemu_mutex_unlock(&multifd_send_state->mutex); > + qemu_sem_post(&multifd_send_state->sem); > + continue; > + } > qemu_mutex_unlock(&p->mutex); > qemu_sem_wait(&p->sem); > } > @@ -469,6 +487,8 @@ int multifd_save_setup(void) > multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); > multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); > multifd_send_state->count = 0; > + qemu_mutex_init(&multifd_send_state->mutex); > + qemu_sem_init(&multifd_send_state->sem, 0); > for (i = 0; i < thread_count; i++) { > char thread_name[16]; > MultiFDSendParams *p = &multifd_send_state->params[i]; > @@ -477,6 +497,8 @@ int multifd_save_setup(void) > qemu_sem_init(&p->sem, 0); > p->quit = false; > p->id = i; > + p->done = true; > + p->address = 0; > p->c = socket_send_channel_create(); > if (!p->c) { > 
error_report("Error creating a send channel"); > @@ -491,6 +513,30 @@ int multifd_save_setup(void) > return 0; > } > > +static int multifd_send_page(uint8_t *address) > +{ > + int i; > + MultiFDSendParams *p = NULL; /* make happy gcc */ > + > + qemu_sem_wait(&multifd_send_state->sem); > + qemu_mutex_lock(&multifd_send_state->mutex); > + for (i = 0; i < multifd_send_state->count; i++) { > + p = &multifd_send_state->params[i]; > + > + if (p->done) { > + p->done = false; > + break; > + } > + } > + qemu_mutex_unlock(&multifd_send_state->mutex); > + qemu_mutex_lock(&p->mutex); > + p->address = address; > + qemu_mutex_unlock(&p->mutex); > + qemu_sem_post(&p->sem); My feeling, without having fully thought it through, is that the locking around 'address' can be simplified; especially if the sending-thread never actually changes it. http://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap04.html#tag_04_11 defines that most of the pthread_ functions act as barriers; including the sem_post and pthread_cond_signal that qemu_sem_post uses. 
> + return 0; > +} > + > struct MultiFDRecvParams { > uint8_t id; > QemuThread thread; > @@ -537,6 +583,7 @@ void multifd_load_cleanup(void) > qemu_sem_destroy(&p->sem); > socket_recv_channel_destroy(p->c); > g_free(p); > + multifd_recv_state->params[i] = NULL; > } > g_free(multifd_recv_state->params); > multifd_recv_state->params = NULL; > @@ -1058,6 +1105,32 @@ static int ram_save_page(RAMState *rs, > PageSearchStatus *pss, bool last_stage) > return pages; > } > > +static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss, > + bool last_stage) > +{ > + int pages; > + uint8_t *p; > + RAMBlock *block = pss->block; > + ram_addr_t offset = pss->page << TARGET_PAGE_BITS; > + > + p = block->host + offset; > + > + pages = save_zero_page(rs, block, offset, p); > + if (pages == -1) { > + ram_counters.transferred += > + save_page_header(rs, rs->f, block, > + offset | RAM_SAVE_FLAG_MULTIFD_PAGE); > + qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE); > + multifd_send_page(p); > + ram_counters.transferred += TARGET_PAGE_SIZE; > + pages = 1; > + ram_counters.normal++; > + ram_counters.multifd++; > + } > + > + return pages; > +} > + > static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, > ram_addr_t offset) > { > @@ -1486,6 +1559,8 @@ static int ram_save_target_page(RAMState *rs, > PageSearchStatus *pss, > if (migrate_use_compression() && > (rs->ram_bulk_stage || !migrate_use_xbzrle())) { > res = ram_save_compressed_page(rs, pss, last_stage); > + } else if (migrate_use_multifd()) { > + res = ram_multifd_page(rs, pss, last_stage); It's a pity we can't wire this up with compression, but I understand why you simplify that. I'll see how the multiple-pages stuff works below; but the interesting thing here is we've already split up host-pages, which seems like a bad idea. 
> } else { > res = ram_save_page(rs, pss, last_stage); > } > @@ -2778,6 +2853,10 @@ static int ram_load(QEMUFile *f, void *opaque, int > version_id) > if (!migrate_use_compression()) { > invalid_flags |= RAM_SAVE_FLAG_COMPRESS_PAGE; > } > + > + if (!migrate_use_multifd()) { > + invalid_flags |= RAM_SAVE_FLAG_MULTIFD_PAGE; > + } > /* This RCU critical section can be very long running. > * When RCU reclaims in the code start to become numerous, > * it will be necessary to reduce the granularity of this > @@ -2802,13 +2881,17 @@ static int ram_load(QEMUFile *f, void *opaque, int > version_id) > if (flags & invalid_flags & RAM_SAVE_FLAG_COMPRESS_PAGE) { > error_report("Received an unexpected compressed page"); > } > + if (flags & invalid_flags & RAM_SAVE_FLAG_MULTIFD_PAGE) { > + error_report("Received an unexpected multifd page"); > + } > > ret = -EINVAL; > break; > } > > if (flags & (RAM_SAVE_FLAG_ZERO | RAM_SAVE_FLAG_PAGE | > - RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE)) { > + RAM_SAVE_FLAG_COMPRESS_PAGE | RAM_SAVE_FLAG_XBZRLE | > + RAM_SAVE_FLAG_MULTIFD_PAGE)) { > RAMBlock *block = ram_block_from_stream(f, flags); > > host = host_from_ram_block_offset(block, addr); > @@ -2896,6 +2979,11 @@ static int ram_load(QEMUFile *f, void *opaque, int > version_id) > break; > } > break; > + > + case RAM_SAVE_FLAG_MULTIFD_PAGE: > + qemu_get_buffer(f, host, TARGET_PAGE_SIZE); > + break; > + > case RAM_SAVE_FLAG_EOS: > /* normal exit */ > break; > diff --git a/qapi-schema.json b/qapi-schema.json > index 5b3733e..f708782 100644 > --- a/qapi-schema.json > +++ b/qapi-schema.json > @@ -601,6 +601,8 @@ > # @page-size: The number of bytes per page for the various page-based > # statistics (since 2.10) > # > +# @multifd: number of pages sent with multifd (since 2.10) Hopeful! 
Dave > # Since: 0.14.0 > ## > { 'struct': 'MigrationStats', > @@ -608,7 +610,8 @@ > 'duplicate': 'int', 'skipped': 'int', 'normal': 'int', > 'normal-bytes': 'int', 'dirty-pages-rate' : 'int', > 'mbps' : 'number', 'dirty-sync-count' : 'int', > - 'postcopy-requests' : 'int', 'page-size' : 'int' } } > + 'postcopy-requests' : 'int', 'page-size' : 'int', > + 'multifd' : 'int'} } > > ## > # @XBZRLECacheStats: > -- > 2.9.4 > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK