From: "Michael R. Hines" <mrhi...@us.ibm.com> This compiles with and without --enable-rdma.
Signed-off-by: Michael R. Hines <mrhi...@us.ibm.com> --- include/migration/qemu-file.h | 10 +++ savevm.c | 172 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 172 insertions(+), 10 deletions(-) diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h index df81261..9046751 100644 --- a/include/migration/qemu-file.h +++ b/include/migration/qemu-file.h @@ -51,23 +51,33 @@ typedef int (QEMUFileCloseFunc)(void *opaque); */ typedef int (QEMUFileGetFD)(void *opaque); +/* + * 'drain' from a QEMUFile perspective means + * to flush the outbound send buffer + * (if one exists). (Only used by RDMA right now) + */ +typedef int (QEMUFileDrainFunc)(void *opaque); + typedef struct QEMUFileOps { QEMUFilePutBufferFunc *put_buffer; QEMUFileGetBufferFunc *get_buffer; QEMUFileCloseFunc *close; QEMUFileGetFD *get_fd; + QEMUFileDrainFunc *drain; } QEMUFileOps; QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops); QEMUFile *qemu_fopen(const char *filename, const char *mode); QEMUFile *qemu_fdopen(int fd, const char *mode); QEMUFile *qemu_fopen_socket(int fd, const char *mode); +QEMUFile *qemu_fopen_rdma(void *opaque, const char *mode); QEMUFile *qemu_popen_cmd(const char *command, const char *mode); int qemu_get_fd(QEMUFile *f); int qemu_fclose(QEMUFile *f); int64_t qemu_ftell(QEMUFile *f); void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, int size); void qemu_put_byte(QEMUFile *f, int v); +int qemu_drain(QEMUFile *f); static inline void qemu_put_ubyte(QEMUFile *f, unsigned int v) { diff --git a/savevm.c b/savevm.c index 35c8d1e..9b90b7f 100644 --- a/savevm.c +++ b/savevm.c @@ -32,6 +32,7 @@ #include "qemu/timer.h" #include "audio/audio.h" #include "migration/migration.h" +#include "migration/rdma.h" #include "qemu/sockets.h" #include "qemu/queue.h" #include "sysemu/cpus.h" @@ -143,6 +144,13 @@ typedef struct QEMUFileSocket QEMUFile *file; } QEMUFileSocket; +typedef struct QEMUFileRDMA +{ + void *rdma; + size_t len; + QEMUFile *file; +} 
QEMUFileRDMA; + typedef struct { Coroutine *co; int fd; @@ -178,6 +186,66 @@ static int socket_get_fd(void *opaque) return s->fd; } +/* + * SEND messages for non-live state only. + * pc.ram is handled elsewhere... + */ +static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size) +{ + QEMUFileRDMA *r = opaque; + size_t remaining = size; + uint8_t * data = (void *) buf; + + /* + * Although we're sending non-live + * state here, push out any writes that + * we've queued up for pc.ram anyway. + */ + if (qemu_rdma_write_flush(r->rdma) < 0) + return -EIO; + + while(remaining) { + r->len = MIN(remaining, RDMA_SEND_INCREMENT); + remaining -= r->len; + + if(qemu_rdma_exchange_send(r->rdma, data, r->len) < 0) + return -EINVAL; + + data += r->len; + } + + return size; +} + +/* + * RDMA links don't use bytestreams, so we have to + * return bytes to QEMUFile opportunistically. + */ +static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size) +{ + QEMUFileRDMA *r = opaque; + + /* + * First, we hold on to the last SEND message we + * were given and dish out the bytes until we run + * out of bytes. + */ + if((r->len = qemu_rdma_fill(r->rdma, buf, size))) + return r->len; + + /* + * Once we run out, we block and wait for another + * SEND message to arrive. + */ + if(qemu_rdma_exchange_recv(r->rdma) < 0) + return -EINVAL; + + /* + * SEND was received with new bytes, now try again. 
+ */ + return qemu_rdma_fill(r->rdma, buf, size); +} + static int socket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size) { QEMUFileSocket *s = opaque; @@ -390,16 +458,24 @@ static const QEMUFileOps socket_write_ops = { .close = socket_close }; -QEMUFile *qemu_fopen_socket(int fd, const char *mode) +static bool qemu_mode_is_not_valid(const char * mode) { - QEMUFileSocket *s = g_malloc0(sizeof(QEMUFileSocket)); - if (mode == NULL || (mode[0] != 'r' && mode[0] != 'w') || mode[1] != 'b' || mode[2] != 0) { fprintf(stderr, "qemu_fopen: Argument validity check failed\n"); - return NULL; + return true; } + + return false; +} + +QEMUFile *qemu_fopen_socket(int fd, const char *mode) +{ + QEMUFileSocket *s = g_malloc0(sizeof(QEMUFileSocket)); + + if(qemu_mode_is_not_valid(mode)) + return NULL; s->fd = fd; if (mode[0] == 'w') { @@ -411,16 +487,66 @@ QEMUFile *qemu_fopen_socket(int fd, const char *mode) return s->file; } +static int qemu_rdma_close(void *opaque) +{ + QEMUFileRDMA *r = opaque; + if(r->rdma) { + qemu_rdma_cleanup(r->rdma); + g_free(r->rdma); + } + g_free(r); + return 0; +} + +void * migrate_use_rdma(QEMUFile *f) +{ + QEMUFileRDMA *r = f->opaque; + + return qemu_rdma_enabled(r->rdma) ? 
r->rdma : NULL; +} + +static int qemu_rdma_drain_completion(void *opaque) +{ + QEMUFileRDMA *r = opaque; + r->len = 0; + return qemu_rdma_drain_cq(r->rdma); +} + +static const QEMUFileOps rdma_read_ops = { + .get_buffer = qemu_rdma_get_buffer, + .close = qemu_rdma_close, +}; + +static const QEMUFileOps rdma_write_ops = { + .put_buffer = qemu_rdma_put_buffer, + .close = qemu_rdma_close, + .drain = qemu_rdma_drain_completion, +}; + +QEMUFile *qemu_fopen_rdma(void *opaque, const char * mode) +{ + QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA)); + + if(qemu_mode_is_not_valid(mode)) + return NULL; + + r->rdma = opaque; + + if (mode[0] == 'w') { + r->file = qemu_fopen_ops(r, &rdma_write_ops); + } else { + r->file = qemu_fopen_ops(r, &rdma_read_ops); + } + + return r->file; +} + QEMUFile *qemu_fopen(const char *filename, const char *mode) { QEMUFileStdio *s; - if (mode == NULL || - (mode[0] != 'r' && mode[0] != 'w') || - mode[1] != 'b' || mode[2] != 0) { - fprintf(stderr, "qemu_fopen: Argument validity check failed\n"); - return NULL; - } + if(qemu_mode_is_not_valid(mode)) + return NULL; s = g_malloc0(sizeof(QEMUFileStdio)); @@ -497,6 +623,24 @@ static void qemu_file_set_error(QEMUFile *f, int ret) } } +/* + * Called only for RDMA right now at the end + * of each live iteration of memory. + * + * 'drain' from a QEMUFile perspective means + * to flush the outbound send buffer + * (if one exists). + * + * For RDMA, this means to make sure we've + * received completion queue (CQ) messages + * successfully for all of the RDMA writes + * that we requested. + */ +int qemu_drain(QEMUFile *f) +{ + return f->ops->drain ? 
f->ops->drain(f->opaque) : 0; +} + /** Flushes QEMUFile buffer * */ @@ -723,6 +867,8 @@ int qemu_get_byte(QEMUFile *f) int64_t qemu_ftell(QEMUFile *f) { qemu_fflush(f); + if(migrate_use_rdma(f)) + return delta_norm_mig_bytes_transferred(); return f->pos; } @@ -1737,6 +1883,12 @@ void qemu_savevm_state_complete(QEMUFile *f) } } + if ((ret = qemu_drain(f)) < 0) { + fprintf(stderr, "failed to drain RDMA first!\n"); + qemu_file_set_error(f, ret); + return; + } + QTAILQ_FOREACH(se, &savevm_handlers, entry) { int len; -- 1.7.10.4