This patch implements bi-directional RDMA QIOChannel. Because different threads may access RDMAQIOChannel concurrently, this patch use RCU to protect it.
Signed-off-by: Lidong Chen <lidongc...@tencent.com> --- migration/rdma.c | 162 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 146 insertions(+), 16 deletions(-) diff --git a/migration/rdma.c b/migration/rdma.c index f5c1d02..0652224 100644 --- a/migration/rdma.c +++ b/migration/rdma.c @@ -86,6 +86,7 @@ static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL; " to abort!"); \ rdma->error_reported = 1; \ } \ + rcu_read_unlock(); \ return rdma->error_state; \ } \ } while (0) @@ -405,6 +406,7 @@ struct QIOChannelRDMA { RDMAContext *rdma; QEMUFile *file; bool blocking; /* XXX we don't actually honour this yet */ + QemuMutex lock; }; /* @@ -2635,12 +2637,29 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); QEMUFile *f = rioc->file; - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; int ret; ssize_t done = 0; size_t i; size_t len = 0; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdma); + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + + if (rdma->listen_id) { + rdma = rdma->return_path; + } + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + CHECK_ERROR_STATE(); /* @@ -2650,6 +2669,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, ret = qemu_rdma_write_flush(f, rdma); if (ret < 0) { rdma->error_state = ret; + rcu_read_unlock(); return ret; } @@ -2669,6 +2689,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, if (ret < 0) { rdma->error_state = ret; + rcu_read_unlock(); return ret; } @@ -2677,6 +2698,7 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, } } + rcu_read_unlock(); return done; } @@ -2710,12 +2732,29 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, Error **errp) { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; RDMAControlHeader head; int ret = 0; ssize_t i; size_t done = 0; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdma); + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + + if (!rdma->listen_id) { + rdma = rdma->return_path; + } + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + CHECK_ERROR_STATE(); for (i = 0; i < niov; i++) { @@ -2727,7 +2766,7 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, * were given and dish out the bytes until we run * out of bytes. */ - ret = qemu_rdma_fill(rioc->rdma, data, want, 0); + ret = qemu_rdma_fill(rdma, data, want, 0); done += ret; want -= ret; /* Got what we needed, so go to next iovec */ @@ -2749,25 +2788,28 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, if (ret < 0) { rdma->error_state = ret; + rcu_read_unlock(); return ret; } /* * SEND was received with new bytes, now try again. */ - ret = qemu_rdma_fill(rioc->rdma, data, want, 0); + ret = qemu_rdma_fill(rdma, data, want, 0); done += ret; want -= ret; /* Still didn't get enough, so lets just return */ if (want) { if (done == 0) { + rcu_read_unlock(); return QIO_CHANNEL_ERR_BLOCK; } else { break; } } } + rcu_read_unlock(); return done; } @@ -2823,6 +2865,16 @@ qio_channel_rdma_source_prepare(GSource *source, GIOCondition cond = 0; *timeout = -1; + if ((rdma->listen_id && rsource->condition == G_IO_OUT) || + (!rdma->listen_id && rsource->condition == G_IO_IN)) { + rdma = rdma->return_path; + } + + if (!rdma) { + error_report("RDMAContext is NULL when prepare Gsource"); + return FALSE; + } + if (rdma->wr_data[0].control_len) { cond |= G_IO_IN; } @@ -2838,6 +2890,16 @@ qio_channel_rdma_source_check(GSource *source) RDMAContext *rdma = rsource->rioc->rdma; GIOCondition cond = 0; + if ((rdma->listen_id && rsource->condition == G_IO_OUT) || + (!rdma->listen_id && rsource->condition == G_IO_IN)) { + rdma = rdma->return_path; + } + + if (!rdma) { + error_report("RDMAContext is NULL when check Gsource"); + return FALSE; + } + if (rdma->wr_data[0].control_len) { cond |= G_IO_IN; } @@ -2856,6 +2918,16 @@ qio_channel_rdma_source_dispatch(GSource *source, RDMAContext *rdma = rsource->rioc->rdma; GIOCondition cond = 0; + if ((rdma->listen_id && rsource->condition == G_IO_OUT) || + (!rdma->listen_id && rsource->condition == G_IO_IN)) { + rdma = rdma->return_path; + } + + if (!rdma) { + error_report("RDMAContext is NULL when dispatch Gsource"); + return FALSE; + } + if (rdma->wr_data[0].control_len) { cond |= G_IO_IN; } @@ -2905,15 +2977,29 @@ static int qio_channel_rdma_close(QIOChannel *ioc, Error **errp) { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); + RDMAContext *rdma; trace_qemu_rdma_close(); - if (rioc->rdma) { - if (!rioc->rdma->error_state) { - rioc->rdma->error_state = qemu_file_get_error(rioc->file); - } - qemu_rdma_cleanup(rioc->rdma); - g_free(rioc->rdma); - rioc->rdma = NULL; + + qemu_mutex_lock(&rioc->lock); + rdma = rioc->rdma; + if (!rdma) { + qemu_mutex_unlock(&rioc->lock); + return 0; + } + atomic_rcu_set(&rioc->rdma, NULL); + qemu_mutex_unlock(&rioc->lock); + + if (!rdma->error_state) { + rdma->error_state = qemu_file_get_error(rioc->file); + } + qemu_rdma_cleanup(rdma); + + if (rdma->return_path) { + qemu_rdma_cleanup(rdma->return_path); + g_free(rdma->return_path); } + + g_free(rdma); return 0; } @@ -2956,12 +3042,21 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, size_t size, uint64_t *bytes_sent) { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; int ret; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdma); + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + CHECK_ERROR_STATE(); if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) { + rcu_read_unlock(); return RAM_SAVE_CONTROL_NOT_SUPP; } @@ -3046,9 +3141,11 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque, } } + rcu_read_unlock(); return RAM_SAVE_CONTROL_DELAYED; err: rdma->error_state = ret; + rcu_read_unlock(); return ret; } @@ -3224,8 +3321,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT, .repeat = 1 }; QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); - RDMAContext *rdma = rioc->rdma; - RDMALocalBlocks *local = &rdma->local_ram_blocks; + RDMAContext *rdma; + RDMALocalBlocks *local; RDMAControlHeader head; RDMARegister *reg, *registers; RDMACompress *comp; @@ -3238,8 +3335,17 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque) int count = 0; int i = 0; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdma); + + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + CHECK_ERROR_STATE(); + local = &rdma->local_ram_blocks; do { trace_qemu_rdma_registration_handle_wait(); @@ -3469,6 +3575,7 @@ out: if (ret < 0) { rdma->error_state = ret; } + rcu_read_unlock(); return ret; } @@ -3525,11 +3632,19 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, uint64_t flags, void *data) { QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; + + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdma); + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } CHECK_ERROR_STATE(); if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) { + rcu_read_unlock(); return 0; } @@ -3537,6 +3652,7 @@ static int qemu_rdma_registration_start(QEMUFile *f, void *opaque, qemu_put_be64(f, RAM_SAVE_FLAG_HOOK); qemu_fflush(f); + rcu_read_unlock(); return 0; } @@ -3549,13 +3665,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, { Error *local_err = NULL, **errp = &local_err; QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque); - RDMAContext *rdma = rioc->rdma; + RDMAContext *rdma; RDMAControlHeader head = { .len = 0, .repeat = 1 }; int ret = 0; + rcu_read_lock(); + rdma = atomic_rcu_read(&rioc->rdma); + if (!rdma) { + rcu_read_unlock(); + return -EIO; + } + CHECK_ERROR_STATE(); if (migrate_get_current()->state == MIGRATION_STATUS_POSTCOPY_ACTIVE) { + rcu_read_unlock(); return 0; } @@ -3587,6 +3711,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, qemu_rdma_reg_whole_ram_blocks : NULL); if (ret < 0) { ERROR(errp, "receiving remote info!"); + rcu_read_unlock(); return ret; } @@ -3610,6 +3735,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, "not identical on both the source and destination.", local->nb_blocks, nb_dest_blocks); rdma->error_state = -EINVAL; + rcu_read_unlock(); return -EINVAL; } @@ -3626,6 +3752,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, local->block[i].length, rdma->dest_blocks[i].length); rdma->error_state = -EINVAL; + rcu_read_unlock(); return -EINVAL; } local->block[i].remote_host_addr = @@ -3643,9 +3770,11 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque, goto err; } + rcu_read_unlock(); return 0; err: rdma->error_state = ret; + rcu_read_unlock(); return ret; } @@ -3707,6 +3836,7 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode) rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA)); rioc->rdma = rdma; + qemu_mutex_init(&rioc->lock); if (mode[0] == 'w') { rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc)); -- 1.8.3.1