The function still don't use multifd, but we have simplified ram_save_page, xbzrle and RDMA stuff is gone. We have added a new counter.
Signed-off-by: Juan Quintela <quint...@redhat.com> -- Add last_page parameter Add commets for done and address Remove multifd field, it is the same than normal pages Merge next patch, now we send multiple pages at a time Remove counter for multifd pages, it is identical to normal pages Use iovec's instead of creating the equivalent. Clear memory used by pages (dave) Use g_new0(danp) define MULTIFD_CONTINUE now pages member is a pointer Fix off-by-one in number of pages in one packet Remove RAM_SAVE_FLAG_MULTIFD_PAGE s/multifd_pages_t/MultiFDPages_t/ --- migration/ram.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/migration/ram.c b/migration/ram.c index 398cb0af3b..862ec53d32 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -54,6 +54,7 @@ #include "migration/block.h" #include "sysemu/sysemu.h" #include "qemu/uuid.h" +#include "qemu/iov.h" /***********************************************************/ /* ram save/restore */ @@ -692,8 +693,65 @@ struct { QemuSemaphore sem_sync; /* global number of generated multifd packets */ uint32_t seq; + /* send channels ready */ + QemuSemaphore channels_ready; } *multifd_send_state; +static void multifd_send_pages(void) +{ + int i; + static int next_channel; + MultiFDSendParams *p = NULL; /* make happy gcc */ + MultiFDPages_t *pages = multifd_send_state->pages; + + qemu_sem_wait(&multifd_send_state->channels_ready); + for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { + p = &multifd_send_state->params[i]; + + qemu_mutex_lock(&p->mutex); + if (!p->pending_job) { + p->pending_job++; + next_channel = (i + 1) % migrate_multifd_channels(); + break; + } + qemu_mutex_unlock(&p->mutex); + } + p->pages->used = 0; + multifd_send_state->seq++; + p->seq = multifd_send_state->seq; + p->pages->block = NULL; + multifd_send_state->pages = p->pages; + p->pages = pages; + qemu_mutex_unlock(&p->mutex); + qemu_sem_post(&p->sem); +} + +static void multifd_queue_page(RAMBlock *block, ram_addr_t offset) +{ + MultiFDPages_t *pages = multifd_send_state->pages; + + if (!pages->block) { + pages->block = block; + } + + if (pages->block == block) { + pages->offset[pages->used] = offset; + pages->iov[pages->used].iov_base = block->host + offset; + pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE; + pages->used++; + + if (pages->used < pages->allocated) { + return; + } + } + + multifd_send_pages(); + + if (pages->block != block) { + multifd_queue_page(block, offset); + } +} + static void multifd_send_terminate_threads(Error *err) { int i; @@ -746,6 +804,7 @@ int multifd_save_cleanup(Error **errp) g_free(p->packet); p->packet = NULL; } + qemu_sem_destroy(&multifd_send_state->channels_ready); qemu_sem_destroy(&multifd_send_state->sem_sync); g_free(multifd_send_state->params); multifd_send_state->params = NULL; @@ -763,12 +822,17 @@ static void multifd_send_sync_main(void) if (!migrate_use_multifd()) { return; } + if (multifd_send_state->pages->used) { + multifd_send_pages(); + } for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; trace_multifd_send_sync_main_signal(p->id); qemu_mutex_lock(&p->mutex); + multifd_send_state->seq++; + p->seq = multifd_send_state->seq; p->flags |= MULTIFD_FLAG_SYNC; p->pending_job++; qemu_mutex_unlock(&p->mutex); @@ -824,6 +888,7 @@ static void *multifd_send_thread(void *opaque) if (flags & MULTIFD_FLAG_SYNC) { qemu_sem_post(&multifd_send_state->sem_sync); } + qemu_sem_post(&multifd_send_state->channels_ready); } else if (p->quit) { qemu_mutex_unlock(&p->mutex); break; @@ -883,6 +948,7 @@ int multifd_save_setup(void) atomic_set(&multifd_send_state->count, 0); multifd_pages_init(&multifd_send_state->pages, page_count); qemu_sem_init(&multifd_send_state->sem_sync, 0); + qemu_sem_init(&multifd_send_state->channels_ready, 0); for (i = 0; i < thread_count; i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; @@ -1576,6 +1642,31 @@ static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage) return pages; } +static int ram_multifd_page(RAMState *rs, PageSearchStatus *pss, + bool last_stage) +{ + int pages; + uint8_t *p; + RAMBlock *block = pss->block; + ram_addr_t offset = pss->page << TARGET_PAGE_BITS; + + p = block->host + offset; + + pages = save_zero_page(rs, block, offset); + if (pages == -1) { + ram_counters.transferred += + save_page_header(rs, rs->f, block, + offset | RAM_SAVE_FLAG_PAGE); + multifd_queue_page(block, offset); + qemu_put_buffer(rs->f, p, TARGET_PAGE_SIZE); + ram_counters.transferred += TARGET_PAGE_SIZE; + pages = 1; + ram_counters.normal++; + } + + return pages; +} + static int do_compress_ram_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset) { @@ -2004,6 +2095,8 @@ static int ram_save_target_page(RAMState *rs, PageSearchStatus *pss, if (migrate_use_compression() && (rs->ram_bulk_stage || !migrate_use_xbzrle())) { res = ram_save_compressed_page(rs, pss, last_stage); + } else if (migrate_use_multifd()) { + res = ram_multifd_page(rs, pss, last_stage); } else { res = ram_save_page(rs, pss, last_stage); } -- 2.17.0