Create the multifd send and receive threads for RDMA migration. The threads do not transfer any data yet; they only wait until they are told to quit.
Signed-off-by: Zhimin Feng <fengzhim...@huawei.com> --- migration/multifd.c | 33 +++++++++++++--- migration/multifd.h | 2 + migration/qemu-file.c | 5 +++ migration/qemu-file.h | 1 + migration/rdma.c | 88 ++++++++++++++++++++++++++++++++++++++++++- migration/rdma.h | 3 ++ 6 files changed, 125 insertions(+), 7 deletions(-) diff --git a/migration/multifd.c b/migration/multifd.c index b3e8ae9bcc..63678d7fdd 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -424,7 +424,7 @@ void multifd_send_sync_main(QEMUFile *f) { int i; - if (!migrate_use_multifd()) { + if (!migrate_use_multifd() || migrate_use_rdma()) { return; } if (multifd_send_state->pages->used) { @@ -562,6 +562,20 @@ out: return NULL; } +static void rdma_send_channel_create(MultiFDSendParams *p) +{ + Error *local_err = NULL; + + if (p->quit) { + error_setg(&local_err, "multifd: send id %d already quit", p->id); + return ; + } + p->running = true; + + qemu_thread_create(&p->thread, p->name, multifd_rdma_send_thread, p, + QEMU_THREAD_JOINABLE); +} + static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) { MultiFDSendParams *p = opaque; @@ -621,7 +635,11 @@ int multifd_save_setup(Error **errp) p->packet->magic = cpu_to_be32(MULTIFD_MAGIC); p->packet->version = cpu_to_be32(MULTIFD_VERSION); p->name = g_strdup_printf("multifdsend_%d", i); - socket_send_channel_create(multifd_new_send_channel_async, p); + if (!migrate_use_rdma()) { + socket_send_channel_create(multifd_new_send_channel_async, p); + } else { + rdma_send_channel_create(p); + } } return 0; } @@ -720,7 +738,7 @@ void multifd_recv_sync_main(void) { int i; - if (!migrate_use_multifd()) { + if (!migrate_use_multifd() || migrate_use_rdma()) { return; } for (i = 0; i < migrate_multifd_channels(); i++) { @@ -890,8 +908,13 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) p->num_packets = 1; p->running = true; - qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, - QEMU_THREAD_JOINABLE); + if 
(!migrate_use_rdma()) { + qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p, + QEMU_THREAD_JOINABLE); + } else { + qemu_thread_create(&p->thread, p->name, multifd_rdma_recv_thread, p, + QEMU_THREAD_JOINABLE); + } atomic_inc(&multifd_recv_state->count); return atomic_read(&multifd_recv_state->count) == migrate_multifd_channels(); diff --git a/migration/multifd.h b/migration/multifd.h index d8b0205977..c9c11ad140 100644 --- a/migration/multifd.h +++ b/migration/multifd.h @@ -13,6 +13,8 @@ #ifndef QEMU_MIGRATION_MULTIFD_H #define QEMU_MIGRATION_MULTIFD_H +#include "migration/rdma.h" + int multifd_save_setup(Error **errp); void multifd_save_cleanup(void); int multifd_load_setup(Error **errp); diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 1c3a358a14..f0ed8f1381 100644 --- a/migration/qemu-file.c +++ b/migration/qemu-file.c @@ -248,6 +248,11 @@ void qemu_fflush(QEMUFile *f) f->iovcnt = 0; } +void *getQIOChannel(QEMUFile *f) +{ + return f->opaque; +} + void ram_control_before_iterate(QEMUFile *f, uint64_t flags) { int ret = 0; diff --git a/migration/qemu-file.h b/migration/qemu-file.h index a9b6d6ccb7..fc656a3b72 100644 --- a/migration/qemu-file.h +++ b/migration/qemu-file.h @@ -161,6 +161,7 @@ int qemu_file_shutdown(QEMUFile *f); QEMUFile *qemu_file_get_return_path(QEMUFile *f); void qemu_fflush(QEMUFile *f); void qemu_file_set_blocking(QEMUFile *f, bool block); +void *getQIOChannel(QEMUFile *f); void ram_control_before_iterate(QEMUFile *f, uint64_t flags); void ram_control_after_iterate(QEMUFile *f, uint64_t flags); diff --git a/migration/rdma.c b/migration/rdma.c index 2379b8345b..f086ab5a82 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -34,6 +34,7 @@ #include <arpa/inet.h> #include <rdma/rdma_cma.h> #include "trace.h" +#include "multifd.h" /* * Print and error on both the Monitor and the Log file. 
@@ -3975,6 +3976,34 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) return rioc->file; } +static void migration_rdma_process_incoming(QEMUFile *f, Error **errp) +{ + MigrationIncomingState *mis = migration_incoming_get_current(); + Error *local_err = NULL; + QIOChannel *ioc = NULL; + bool start_migration; + + if (!mis->from_src_file) { + mis->from_src_file = f; + qemu_file_set_blocking(f, false); + + start_migration = migrate_use_multifd(); + } else { + ioc = QIO_CHANNEL(getQIOChannel(f)); + /* Multiple connections */ + assert(migrate_use_multifd()); + start_migration = multifd_recv_new_channel(ioc, &local_err); + if (local_err) { + error_propagate(errp, local_err); + return; + } + } + + if (start_migration) { + migration_incoming_process(); + } +} + static void rdma_accept_incoming_migration(void *opaque) { RDMAContext *rdma = opaque; @@ -4003,8 +4032,12 @@ static void rdma_accept_incoming_migration(void *opaque) return; } - rdma->migration_started_on_destination = 1; - migration_fd_process_incoming(f, errp); + if (migrate_use_multifd()) { + migration_rdma_process_incoming(f, errp); + } else { + rdma->migration_started_on_destination = 1; + migration_fd_process_incoming(f, errp); + } } void rdma_start_incoming_migration(const char *host_port, Error **errp) @@ -4048,6 +4081,15 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp) qemu_rdma_return_path_dest_init(rdma_return_path, rdma); } + if (multifd_load_setup(&local_err) != 0) { + /* + * We haven't been able to create multifd threads + * nothing better to do + */ + error_report_err(local_err); + goto err; + } + qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration, NULL, (void *)(intptr_t)rdma); return; @@ -4118,3 +4160,45 @@ err: g_free(rdma); g_free(rdma_return_path); } + +void *multifd_rdma_recv_thread(void *opaque) +{ + MultiFDRecvParams *p = opaque; + + while (true) { + qemu_mutex_lock(&p->mutex); + if (p->quit) { + qemu_mutex_unlock(&p->mutex); 
+ break; + } + qemu_mutex_unlock(&p->mutex); + qemu_sem_wait(&p->sem_sync); + } + + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + + return NULL; +} + +void *multifd_rdma_send_thread(void *opaque) +{ + MultiFDSendParams *p = opaque; + + while (true) { + qemu_mutex_lock(&p->mutex); + if (p->quit) { + qemu_mutex_unlock(&p->mutex); + break; + } + qemu_mutex_unlock(&p->mutex); + qemu_sem_wait(&p->sem); + } + + qemu_mutex_lock(&p->mutex); + p->running = false; + qemu_mutex_unlock(&p->mutex); + + return NULL; +} diff --git a/migration/rdma.h b/migration/rdma.h index de2ba09dc5..3a00573083 100644 --- a/migration/rdma.h +++ b/migration/rdma.h @@ -17,6 +17,9 @@ #ifndef QEMU_MIGRATION_RDMA_H #define QEMU_MIGRATION_RDMA_H +void *multifd_rdma_recv_thread(void *opaque); +void *multifd_rdma_send_thread(void *opaque); + void rdma_start_outgoing_migration(void *opaque, const char *host_port, Error **errp); -- 2.19.1