From: "Dr. David Alan Gilbert" <dgilb...@redhat.com> Implement migration over RDMA using the 'rsocket' library. The code appears to work for guests with < 4GB RAM, but above that it hits what appear to be internal library limitations (riowrite always returns EAGAIN as soon as I register more RAM). Note also that the library doesn't provide zero copy on the send side.
The code has a few other hacks and incompletenesses, but I thought I'd release the source anyway for anyone else interested in investigating. Very lightly tested, but it did manage to migrate a 'stressapptest' run on a 3.5GB guest successfully over 10Gb ROCE. Note it needs the 'qemu_ram_foreach_block: pass up error value, and down the ramblock name' patch from my postcopy world. Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com> --- arch_init.c | 3 +- include/migration/migration.h | 4 + include/qemu/iov.h | 18 +- include/qemu/sockets.h | 4 + migration/Makefile.objs | 2 +- migration/migration.c | 4 + migration/rsocket.c | 964 ++++++++++++++++++++++++++++++++++++++++++ qemu-coroutine-io.c | 3 +- trace-events | 35 ++ util/iov.c | 14 +- util/qemu-sockets.c | 8 +- 11 files changed, 1042 insertions(+), 17 deletions(-) create mode 100644 migration/rsocket.c diff --git a/arch_init.c b/arch_init.c index 7680d28..5aaa51b 100644 --- a/arch_init.c +++ b/arch_init.c @@ -847,10 +847,10 @@ static int ram_save_setup(QEMUFile *f, void *opaque) qemu_put_be64(f, block->length); } - qemu_mutex_unlock_ramlist(); ram_control_before_iterate(f, RAM_CONTROL_SETUP); ram_control_after_iterate(f, RAM_CONTROL_SETUP); + qemu_mutex_unlock_ramlist(); qemu_put_be64(f, RAM_SAVE_FLAG_EOS); @@ -1103,6 +1103,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) total_ram_bytes -= length; } + ram_control_before_iterate(f, RAM_CONTROL_SETUP); break; case RAM_SAVE_FLAG_COMPRESS: host = host_from_stream_offset(f, addr, flags); diff --git a/include/migration/migration.h b/include/migration/migration.h index 3cb5ba8..a54dd99 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -94,6 +94,10 @@ void rdma_start_outgoing_migration(void *opaque, const char *host_port, Error ** void rdma_start_incoming_migration(const char *host_port, Error **errp); +void rsocket_start_outgoing_migration(MigrationState *s, const char *host_port, Error **errp); + +void
rsocket_start_incoming_migration(const char *host_port, Error **errp); + void migrate_fd_error(MigrationState *s); void migrate_fd_connect(MigrationState *s); diff --git a/include/qemu/iov.h b/include/qemu/iov.h index 68d25f2..7bd9935 100644 --- a/include/qemu/iov.h +++ b/include/qemu/iov.h @@ -58,6 +58,17 @@ size_t iov_memset(const struct iovec *iov, const unsigned int iov_cnt, size_t offset, int fillc, size_t bytes); /* + * Helper function for iov_send_recv for standard FDs + */ +ssize_t iov_send_recv_fd(int sockfd, struct iovec *iov, unsigned iov_cnt, + bool do_send); + +/* + * Type of iov_send_recv_fd and similar helper functions. + */ +typedef ssize_t (*iov_send_recv_func)(int, struct iovec *, unsigned, bool); + +/* * Send/recv data from/to iovec buffers directly * * `offset' bytes in the beginning of iovec buffer are skipped and @@ -76,11 +87,12 @@ size_t iov_memset(const struct iovec *iov, const unsigned int iov_cnt, * should be within the iovec, not only beginning of it. */ ssize_t iov_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt, - size_t offset, size_t bytes, bool do_send); + size_t offset, size_t bytes, bool do_send, + iov_send_recv_func helper); #define iov_recv(sockfd, iov, iov_cnt, offset, bytes) \ - iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, false) + iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, false, iov_send_recv_fd) #define iov_send(sockfd, iov, iov_cnt, offset, bytes) \ - iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, true) + iov_send_recv(sockfd, iov, iov_cnt, offset, bytes, true, iov_send_recv_fd) /** * Produce a text hexdump of iovec `iov' with `iov_cnt' number of elements diff --git a/include/qemu/sockets.h b/include/qemu/sockets.h index f47dae6..8611d8f 100644 --- a/include/qemu/sockets.h +++ b/include/qemu/sockets.h @@ -60,8 +60,11 @@ int inet_nonblocking_connect(const char *str, NonBlockingConnectHandler *callback, void *opaque, Error **errp); +void inet_addr_to_opts(QemuOpts *opts, const InetSocketAddress
*addr); int inet_dgram_opts(QemuOpts *opts, Error **errp); NetworkAddressFamily inet_netfamily(int family); +int inet_getport(struct addrinfo *e); +void inet_setport(struct addrinfo *e, int port); int unix_listen_opts(QemuOpts *opts, Error **errp); int unix_listen(const char *path, char *ostr, int olen, Error **errp); @@ -71,6 +74,7 @@ int unix_connect(const char *path, Error **errp); int unix_nonblocking_connect(const char *str, NonBlockingConnectHandler *callback, void *opaque, Error **errp); +struct addrinfo *inet_parse_connect_opts(QemuOpts *opts, Error **errp); SocketAddress *socket_parse(const char *str, Error **errp); int socket_connect(SocketAddress *addr, Error **errp, diff --git a/migration/Makefile.objs b/migration/Makefile.objs index d929e96..02fe66f 100644 --- a/migration/Makefile.objs +++ b/migration/Makefile.objs @@ -3,7 +3,7 @@ common-obj-y += vmstate.o common-obj-y += qemu-file.o qemu-file-buf.o qemu-file-unix.o qemu-file-stdio.o common-obj-y += xbzrle.o -common-obj-$(CONFIG_RDMA) += rdma.o +common-obj-$(CONFIG_RDMA) += rdma.o rsocket.o common-obj-$(CONFIG_POSIX) += exec.o unix.o fd.o common-obj-y += block.o diff --git a/migration/migration.c b/migration/migration.c index c49a05a..d264400 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -74,6 +74,8 @@ void qemu_start_incoming_migration(const char *uri, Error **errp) #ifdef CONFIG_RDMA else if (strstart(uri, "rdma:", &p)) rdma_start_incoming_migration(p, errp); + else if (strstart(uri, "rsocket:", &p)) + rsocket_start_incoming_migration(p, errp); #endif #if !defined(WIN32) else if (strstart(uri, "exec:", &p)) @@ -442,6 +444,8 @@ void qmp_migrate(const char *uri, bool has_blk, bool blk, #ifdef CONFIG_RDMA } else if (strstart(uri, "rdma:", &p)) { rdma_start_outgoing_migration(s, p, &local_err); + } else if (strstart(uri, "rsocket:", &p)) { + rsocket_start_outgoing_migration(s, p, &local_err); #endif #if !defined(WIN32) } else if (strstart(uri, "exec:", &p)) { diff --git
a/migration/rsocket.c b/migration/rsocket.c new file mode 100644 index 0000000..59fca29 --- /dev/null +++ b/migration/rsocket.c @@ -0,0 +1,964 @@ +/* + * QEMU live migration over RDMA using rsocket + * + * Copyright 2015 Red Hat, Inc. and/or its affiliates + * + * Authors: + * David Gilbert <dgilb...@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + * (Based on migration/tcp.c) + */ + +#include <string.h> + +#include "qemu-common.h" +#include "qemu/error-report.h" +#include "qemu/iov.h" +#include "qemu/sockets.h" +#include "qemu/thread.h" +#include "migration/migration.h" +#include "migration/qemu-file.h" +#include "block/block.h" +#include "qemu/main-loop.h" +#include "trace.h" + +#include "rdma/rsocket.h" /* rsocket(), rpoll(), riomap(), riowrite(), ... */ + + +typedef struct Rsocket_handler_thread_data { + int rfd; /* rsocket that handler_thread rpoll()s */ + short events; /* rpoll() event mask to wait for */ + IOHandler *callback; /* run from the main-loop fd handler once the event fires */ + void *callback_data; /* opaque argument passed to callback */ + + QemuThread thread; /* runs handler_thread(); joined by the fd callback */ + int pipe_fds[2]; /* thread writes [1]; the main loop watches [0] */ +} Rsocket_handler_thread_data; + +/* + * Called via qemu_set_fd_handler in the main thread, after + * handler_thread has kicked the pipe on receipt of the event.
+ */ +static void rsocket_handler_fdcallback(void *opaque) +{ + Rsocket_handler_thread_data *rhtd = opaque; + IOHandler *callback = rhtd->callback; + void *callback_data = rhtd->callback_data; + + trace_rsocket_handler_fdcallback(); + /* Tear down the one-shot helper (thread, pipe, state) before running the callback */ + qemu_thread_join(&rhtd->thread); + qemu_set_fd_handler(rhtd->pipe_fds[0], NULL, NULL, NULL); + close(rhtd->pipe_fds[0]); + close(rhtd->pipe_fds[1]); + g_free(rhtd); + + callback(callback_data); +} + +/* Created by rsocket_set_handler: rpoll()s rhtd->rfd once, then kicks the pipe */ +static void *handler_thread(void *opaque) +{ + Rsocket_handler_thread_data *rhtd = opaque; + + struct pollfd pollfd; + pollfd.fd = rhtd->rfd; + pollfd.events = rhtd->events; + pollfd.revents = 0; + + trace_rsocket_handler_thread_top(); + rpoll(&pollfd, 1, -1 /* Forever - hmm */); + + trace_rsocket_handler_thread_after_poll(); + /* + * Kick the real handler: rsocket_handler_fdcallback will now be called + * and clean up (NOTE(review): if this write fails it never runs and rhtd leaks) + */ + return (void *)(intptr_t)write(rhtd->pipe_fds[1], "K", 1); /* joiner ignores this value */ +} + +/* + * The 'rfd' isn't a real fd, and so we can't give it to qemu_set_fd_handler, + * so spawn a dummy thread that waits in an rpoll, kicks a real fd which + * then ends up calling callback.
+ */ +static int rsocket_set_handler(int rfd, short events, const char *name, + IOHandler *callback, void *callback_data) +{ + Rsocket_handler_thread_data *rhtd = g_malloc0(sizeof(*rhtd)); + + trace_rsocket_set_handler(); + rhtd->rfd = rfd; + rhtd->callback = callback; + rhtd->callback_data = callback_data; + rhtd->events = events; + if (qemu_pipe(rhtd->pipe_fds)) { + g_free(rhtd); + return -1; + } + qemu_set_fd_handler(rhtd->pipe_fds[0], rsocket_handler_fdcallback, NULL, + rhtd); + qemu_thread_create(&rhtd->thread, name, handler_thread, rhtd, + QEMU_THREAD_JOINABLE); + return 0; /* rhtd is now freed by rsocket_handler_fdcallback */ +} + +/* - - - Replacements for util/qemu-sockets.c functions - - - - - - - - - - */ + +/* Struct to store connect state for non blocking connect */ +typedef struct RConnectState { + int fd; + struct addrinfo *addr_list; + struct addrinfo *current_addr; + NonBlockingConnectHandler *callback; + void *opaque; +} RConnectState; + +static void rsocket_set_nonblock(int rfd) +{ + long f; /* rsocket uses va_arg(..,long) to read this!
*/ + trace_rsocket_set_nonblock(rfd); + /* Take care, rsocket's rfcntl is very basic */ + f = rfcntl(rfd, F_GETFL); + rfcntl(rfd, F_SETFL, f | O_NONBLOCK); +} + +/* + * rsocket_inet_connect_addr/rsocket_wait_for_connect/rsocket_inet_connect_opts + * deal with multiple addresses and connections that take a while to connect + */ +static int rsocket_inet_connect_addr(struct addrinfo *addr, bool *in_progress, + RConnectState *connect_state, Error **errp); +static void rsocket_wait_for_connect(void *opaque) +{ + RConnectState *s = opaque; + int val = 0, rc = 0; + socklen_t valsize = sizeof(val); + bool in_progress; + Error *err = NULL; + + trace_rsocket_wait_for_connect(); + do { + rc = rgetsockopt(s->fd, SOL_SOCKET, SO_ERROR, &val, &valsize); + } while (rc == -1 && errno == EINTR); + + /* update rc to contain error */ + if (!rc && val) { + rc = -1; + errno = val; + } + + /* connect error */ + if (rc < 0) { + error_setg_errno(&err, errno, "Error connecting to rsocket"); + rclose(s->fd); + s->fd = rc; + } + + /* try to connect to the next address on the list */ + if (s->current_addr) { + while (s->current_addr->ai_next != NULL && s->fd < 0) { + s->current_addr = s->current_addr->ai_next; + s->fd = rsocket_inet_connect_addr(s->current_addr, &in_progress, s, + NULL); + if (s->fd < 0) { + error_free(err); + err = NULL; + error_setg_errno(&err, errno, + "Unable to start rsocket connect"); + } + /* connect in progress: s is reused by the next rsocket_wait_for_connect */ + if (in_progress) { + goto out; + } + } + + freeaddrinfo(s->addr_list); + } + + if (s->callback) { + s->callback(s->fd, err, s->opaque); + } + g_free(s); +out: + error_free(err); +} + +static int rsocket_inet_connect_addr(struct addrinfo *addr, bool *in_progress, + RConnectState *connect_state, Error **errp) +{ + int sock, rc, tmp = 1; /* tmp: value passed to SO_REUSEADDR below */ + + trace_rsocket_inet_connect_addr(); + *in_progress = false; + + sock = rsocket(addr->ai_family, addr->ai_socktype, addr->ai_protocol); + if (sock < 0) { + error_setg_errno(errp, errno, "Failed to create socket"); + return -1; +
} + rsetsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (const char *)&tmp, sizeof(tmp)); + /* connect to peer */ + do { + rc = 0; + if (rconnect(sock, addr->ai_addr, addr->ai_addrlen) < 0) { + rc = -errno; + } + } while (rc == -EINTR); + + if (connect_state != NULL && rc == -EINPROGRESS) { + connect_state->fd = sock; + rsocket_set_handler(sock, POLLOUT | POLLERR, "rsocketconnect", + rsocket_wait_for_connect, connect_state); /* NOTE(review): a -1 return is ignored */ + *in_progress = true; + } else if (rc < 0) { + error_setg_errno(errp, errno, "Failed to connect socket"); + rclose(sock); + return -1; + } + return sock; +} + +static int rsocket_inet_connect_opts(QemuOpts *opts, Error **errp, + NonBlockingConnectHandler *callback, void *opaque) +{ + Error *local_err = NULL; + struct addrinfo *res, *e; + int sock = -1; + bool in_progress; + RConnectState *connect_state = NULL; + + trace_rsocket_inet_connect_opts(); + res = inet_parse_connect_opts(opts, errp); + if (!res) { + return -1; + } + + if (callback != NULL) { + connect_state = g_malloc0(sizeof(*connect_state)); + connect_state->addr_list = res; + connect_state->callback = callback; + connect_state->opaque = opaque; + } + + for (e = res; e != NULL; e = e->ai_next) { + error_free(local_err); + local_err = NULL; + if (connect_state != NULL) { + connect_state->current_addr = e; + } + sock = rsocket_inet_connect_addr(e, &in_progress, connect_state, + &local_err); + if (sock >= 0) { + break; + } + } + + if (sock < 0) { + error_propagate(errp, local_err); + } else if (in_progress) { + /* rsocket_wait_for_connect() will do the rest */ + return sock; + } else { + if (callback) { + callback(sock, NULL, opaque); + } + } + g_free(connect_state); + freeaddrinfo(res); + return sock; +} + +static int rsocket_inet_listen_opts(QemuOpts *opts, int port_offset, + Error **errp) +{ + struct addrinfo ai,*res,*e; + const char *addr; + char port[33]; + char uaddr[INET6_ADDRSTRLEN+1]; + char uport[33]; + int slisten; + int rc, to, port_min, port_max, p; +
trace_rsocket_inet_listen_opts(); + memset(&ai,0, sizeof(ai)); + ai.ai_flags = AI_PASSIVE | AI_ADDRCONFIG; + ai.ai_family = PF_UNSPEC; + ai.ai_socktype = SOCK_STREAM; + + if ((qemu_opt_get(opts, "host") == NULL) || + (qemu_opt_get(opts, "port") == NULL)) { + error_setg(errp, "host and/or port not specified"); + return -1; + } + pstrcpy(port, sizeof(port), qemu_opt_get(opts, "port")); + addr = qemu_opt_get(opts, "host"); + + to = qemu_opt_get_number(opts, "to", 0); + if (qemu_opt_get_bool(opts, "ipv4", 0)) + ai.ai_family = PF_INET; + if (qemu_opt_get_bool(opts, "ipv6", 0)) + ai.ai_family = PF_INET6; + + /* lookup */ + if (port_offset) { + unsigned long long baseport; + if (parse_uint_full(port, &baseport, 10) < 0) { + error_setg(errp, "can't convert to a number: %s", port); + return -1; + } + if (baseport > 65535 || + baseport + port_offset > 65535) { + error_setg(errp, "port %s out of range", port); + return -1; + } + snprintf(port, sizeof(port), "%d", (int)baseport + port_offset); + } + rc = getaddrinfo(strlen(addr) ? addr : NULL, port, &ai, &res); + if (rc != 0) { + error_setg(errp, "address resolution failed for %s:%s: %s", addr, port, + gai_strerror(rc)); + return -1; + } + + /* create socket + bind; the first address that binds wins */ + for (e = res; e != NULL; e = e->ai_next) { + int tmp; + getnameinfo((struct sockaddr*)e->ai_addr,e->ai_addrlen, + uaddr,INET6_ADDRSTRLEN,uport,32, + NI_NUMERICHOST | NI_NUMERICSERV); + slisten = rsocket(e->ai_family, e->ai_socktype, e->ai_protocol); + if (slisten < 0) { + if (!e->ai_next) { + error_setg_errno(errp, errno, "Failed to create socket"); + } + continue; + } + + tmp = 1; + rsetsockopt(slisten, SOL_SOCKET, SO_REUSEADDR, + (const char *)&tmp, sizeof(tmp)); +#ifdef IPV6_V6ONLY + if (e->ai_family == PF_INET6) { + /* listen on both ipv4 and ipv6 */ + const int off = 0; + + rsetsockopt(slisten, IPPROTO_IPV6, IPV6_V6ONLY, &off, + sizeof(off)); + } +#endif + + port_min = inet_getport(e); + port_max = to ?
to + port_offset : port_min; + for (p = port_min; p <= port_max; p++) { + inet_setport(e, p); + if (rbind(slisten, e->ai_addr, e->ai_addrlen) == 0) { + goto listen; + } + if (p == port_max) { + if (!e->ai_next) { + error_setg_errno(errp, errno, "Failed to bind socket"); + } + } + } + rclose(slisten); + } + freeaddrinfo(res); + return -1; + +listen: + if (rlisten(slisten,1) != 0) { + error_setg_errno(errp, errno, "Failed to listen on socket"); + rclose(slisten); + freeaddrinfo(res); + return -1; + } + snprintf(uport, sizeof(uport), "%d", inet_getport(e) - port_offset); + qemu_opt_set(opts, "host", uaddr); + qemu_opt_set(opts, "port", uport); + qemu_opt_set(opts, "ipv6", (e->ai_family == PF_INET6) ? "on" : "off"); + qemu_opt_set(opts, "ipv4", (e->ai_family != PF_INET6) ? "on" : "off"); + freeaddrinfo(res); + return slisten; +} + +static int rsocket_inet_listen(const char *str, + int socktype, int port_offset, Error **errp) +{ + QemuOpts *opts; + int sock = -1; + InetSocketAddress *addr; + + trace_rsocket_inet_listen(); + addr = inet_parse(str, errp); + if (addr != NULL) { + opts = qemu_opts_create(&socket_optslist, NULL, 0, &error_abort); + inet_addr_to_opts(opts, addr); + qapi_free_InetSocketAddress(addr); + sock = rsocket_inet_listen_opts(opts, port_offset, errp); + qemu_opts_del(opts); + } + return sock; +} + +/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ +typedef struct RAMBlockMapEntry { + void *host_addr; /* local host address of the RAMBlock */ + off_t rsocket_offset; /* riomap() offset of the block on the destination */ + size_t len; /* RAMBlock length in bytes */ +} RAMBlockMapEntry; + +typedef struct QEMURsocket { + int rfd; /* the connected rsocket */ + QEMUFile *file; /* QEMUFile wrapping this state */ + bool isSource; /* true when opened for writing ("w") */ + + /* Used by the 'yield_thread' that waits on an rsocket for data */ + QemuThread thread; + QemuSemaphore sem; + int pipe_fds[2]; + short poll_event; /* POLLIN/POLLOUT; 0 asks yield_thread to exit */ + + /* A mapping from ram_addr_t (block_offset) to RAMBlockMapEntry */ + GHashTable *RAMBlockMap; + + /* Only used for get_fd */ + int dummy_fd; +} QEMURsocket; + +/* The yield thread is used once the rsocket is open to wait for data + * or the
space to write data by using 'rpoll'; it then uses a pipe + * to wake a waiting coroutine. + * The thread waits for a semaphore and when it receives it waits on the + * channel. + */ +static void *yield_thread(void *opaque) +{ + QEMURsocket *rs = opaque; + struct pollfd pfd; + + while (1) { + trace_rsocket_yield_thread_top(); + /* Wait until something tries to yield */ + qemu_sem_wait(&rs->sem); + /* Now wait for the rsocket */ + pfd.fd = rs->rfd; + pfd.events = atomic_fetch_add(&rs->poll_event, 0); + if (!pfd.events) { + trace_rsocket_yield_thread_requested_exit(); + /* Exit: stop_yield_thread cleared poll_event */ + break; + } + pfd.events |= POLLERR | POLLHUP | POLLNVAL; + rpoll(&pfd, 1, -1 /* Hmm */); + + /* Kick the waiting coroutine */ + if (write(rs->pipe_fds[1], "K", 1) != 1) { + break; + } + } + trace_rsocket_yield_thread_exit(); + return NULL; +} + +static int start_yield_thread(QEMURsocket *rs, short poll_event) +{ + trace_rsocket_start_yield_thread(); + if (qemu_pipe(rs->pipe_fds)) { + return -errno; + } + rs->poll_event = poll_event; + qemu_sem_init(&rs->sem, 0); + + qemu_thread_create(&rs->thread, "rsocketyield", yield_thread, rs, + QEMU_THREAD_JOINABLE); + + return 0; +} + +static void stop_yield_thread(QEMURsocket *rs) +{ + trace_rsocket_stop_yield_thread(); + /* Tell the thread to exit */ + atomic_and(&rs->poll_event, 0); + qemu_sem_post(&rs->sem); + + qemu_thread_join(&rs->thread); + close(rs->pipe_fds[0]); + close(rs->pipe_fds[1]); + qemu_sem_destroy(&rs->sem); +} + +static int rsocket_yield(QEMURsocket *rs) +{ + char dummy; + + trace_rsocket_yield(); + /* Ask the yield_thread to wait on the rsocket */ + qemu_sem_post(&rs->sem); + /* And then wait for it to kick us */ + yield_until_fd_readable(rs->pipe_fds[0]); + /* Consume the dummy character on the pipe */ + return read(rs->pipe_fds[0], &dummy, 1); +} + +/* helper function for iov_send_recv as used by rsocket_writev_buffer */ +static ssize_t rsocket_send(int rsockfd, struct iovec *iov, unsigned iov_cnt, + bool do_send) +{ + ssize_t
ret; + struct msghdr msg; + + trace_rsocket_send(); + assert(do_send); + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = iov; + msg.msg_iovlen = iov_cnt; + do { + ret = rsendmsg(rsockfd, &msg, 0); + } while (ret < 0 && errno == EINTR); + return ret; +} + +static ssize_t rsocket_writev_buffer(void *opaque, struct iovec *iov, int iovcnt, + int64_t pos) +{ + QEMURsocket *rs = opaque; + ssize_t len; + ssize_t size = iov_size(iov, iovcnt); + + trace_rsocket_writev_buffer(size); + len = iov_send_recv(rs->rfd, iov, iovcnt, 0 /* offset */, size, true, rsocket_send); + if (len < size) { + len = -socket_error(); + } + trace_rsocket_writev_buffer_end(size, len); + return len; +} + +static int rsocket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size) +{ + QEMURsocket *rs = opaque; + ssize_t len; + + trace_rsocket_get_buffer(size); + for (;;) { + len = rrecv(rs->rfd, buf, size, 0); + if (len != -1) { + break; + } + if (errno == EAGAIN) { + rsocket_yield(rs); + } else if (errno != EINTR) { + break; + } + } + + if (len == -1) { + len = -socket_error(); + } + trace_rsocket_get_buffer_exit(size, len); + return len; +} + +static gboolean rsocket_close_RAMBlock_func(gpointer key, gpointer value, + gpointer user_data) +{ + QEMURsocket *rs = user_data; + /* key is the RAMBlock's block_offset (ram_addr_t); value is its RAMBlockMapEntry */ + RAMBlockMapEntry *rbme = value; + + trace_rsocket_close_RAMBlock_func(rbme->host_addr, rs->isSource); + + if (rs->isSource) { + /* rsocket segs if I don't unmap before close. NOTE(review): riomap() is only called on the destination (rsocket_dest_ramblock_reg); confirm this should not be !rs->isSource */ + riounmap(rs->rfd, rbme->host_addr, rbme->len); + } + g_free(rbme); + + return TRUE; /* Delete from hash */ +} + +static int rsocket_close(void *opaque) +{ + QEMURsocket *rs = opaque; + trace_rsocket_close(); + stop_yield_thread(rs); + g_hash_table_foreach_remove(rs->RAMBlockMap, rsocket_close_RAMBlock_func, + rs); + g_hash_table_destroy(rs->RAMBlockMap); + /* rclose(rs->rfd); - HACK!
I'm getting a seg in the rsocket code here; even with the close above */ + close(rs->dummy_fd); + g_free(rs); + + return 0; +} + +static int rsocket_get_fd(void *opaque) +{ + QEMURsocket *rs = opaque; + + /* Hack! get_fd is used for one thing in the general migration code, and + * that's for marking it as non-blocking; that needs fixing since we don't + * have a real fd we can return. + */ + return rs->dummy_fd; +} + +static int rsocket_dest_ramblock_reg(const char *block_name, void *host_addr, + ram_addr_t block_offset, ram_addr_t length, void *opaque) +{ + QEMURsocket *rs = opaque; + uint64_t rsocket_offset; + uint8_t block_name_len = strlen(block_name); /* NOTE(review): assumes names < 256 bytes (receiver buffer is 256) */ + RAMBlockMapEntry *rbme; + + /* Register every RAMBlock so that we can RDMA into it */ + rsocket_offset = riomap(rs->rfd, host_addr, length, PROT_WRITE, + 0 /* flags? */, -1 /* Offset */); + if (rsocket_offset == -1) { + error_report("%s: riomap for %s", strerror(errno), block_name); + return -1; + } + rbme = g_new0(RAMBlockMapEntry, 1); /* only allocated once riomap succeeded */ + rbme->rsocket_offset = rsocket_offset; + rbme->len = length; + rbme->host_addr = host_addr; + g_hash_table_insert(rs->RAMBlockMap, (gpointer)block_offset, rbme); + trace_rsocket_dest_ramblock_reg(block_name, block_offset, rsocket_offset); + + /* Send the block name and the key to the other side */ + if (rwrite(rs->rfd, &block_name_len, 1)!=1) { + error_report("%s: block_name_len write for %s", strerror(errno), block_name); + return -1; + } + if (rwrite(rs->rfd, block_name, block_name_len)!=block_name_len) { + error_report("%s: block_name write for %s", strerror(errno), block_name); + return -1; + } + if (rwrite(rs->rfd, &rsocket_offset, 8)!=8) { + error_report("%s: rsocket_offset write for %s", strerror(errno), block_name); + return -1; + } + return 0; +} + +/* + * Register all the RAMBlocks as places we might want to RDMA into.
+ */ +static int rsocket_dest_ramblock_setup(QEMURsocket *rs) +{ + char zero = 0; + + /* + * Now register each RAMBlock and get an 'offset' that the src can pass + * to Riowrite; a zero length byte terminates the list. + */ + if (qemu_ram_foreach_block(rsocket_dest_ramblock_reg, rs)) { + error_report("Failed to map rsocket buffers"); + return -1; + } + + if (rwrite(rs->rfd, &zero, 1)!=1) { + error_report("%s: terminator write RAMBlock list", strerror(errno)); + return -1; + } + return 0; +} + +/* Effectively this is just a RAMBlock but we don't have access to it */ +typedef struct RAMBlockSourceData { + void *host_addr; + ram_addr_t block_offset; + ram_addr_t length; +} RAMBlockSourceData; + +static int source_ramblock_name_mapfunc(const char *block_name, void *host_addr, + ram_addr_t block_offset, ram_addr_t length, void *opaque) +{ + GHashTable *map = opaque; + RAMBlockSourceData *rbsd = g_new(RAMBlockSourceData, 1); + + rbsd->host_addr = host_addr; + rbsd->block_offset = block_offset; + rbsd->length = length; + g_hash_table_insert(map, (gpointer)(intptr_t)g_quark_from_string(block_name), rbsd); + return 0; +} + +/* + * Read the list of rsocket keys sent by the destination. + */ +static int rsocket_source_ramblock_setup(QEMUFile *f, QEMURsocket *rs) +{ + char ram_block_name[256]; + uint8_t block_name_len; + uint64_t rsocket_key; + GHashTable *block_name_map; + int ret = -1; + + /* Ensure the previous data sent gets to the destination, because + * only then will it send this response. + */ + qemu_fflush(f); + + /* Map RAMBlock name (as a GQuark) to its RAMBlockSourceData; values are g_free()d with the table */ + block_name_map = g_hash_table_new_full(NULL, NULL, NULL, g_free); + if (qemu_ram_foreach_block(source_ramblock_name_mapfunc, block_name_map)) { + error_report("Failed to make source RAMBlock map"); + goto err; + } + + /* + * We're sent a list of RAMBlocks of the form: + * byte - length of RAMBlock name + * byte[] - The RAMBlock name + * uint64_t - The rsocket 'offset' or key for the block + * + * If the length is 0 it's the end of the list.
+ */ + do { + RAMBlockMapEntry *rbme; + if (rread(rs->rfd, &block_name_len, 1) != 1) { + error_report("%s: block_name_len read", strerror(errno)); + goto err; + } + if (block_name_len) { + RAMBlockSourceData* rbsd; + if (rread(rs->rfd, ram_block_name, block_name_len) != + block_name_len) { + error_report("%s: block_name read", strerror(errno)); + goto err; + } + ram_block_name[block_name_len] = 0; + if (rread(rs->rfd, &rsocket_key, 8) != 8) { + error_report("%s: rsocket_key read", strerror(errno)); + goto err; + } + rbsd = g_hash_table_lookup(block_name_map, + (gpointer)(intptr_t)g_quark_from_string(ram_block_name)); + if (!rbsd) { + error_report("No matching RAMBlock for %s", ram_block_name); + goto err; + } + rbme = g_new0(RAMBlockMapEntry, 1); + rbme->rsocket_offset = rsocket_key; + rbme->len = rbsd->length; + rbme->host_addr = rbsd->host_addr; + g_hash_table_insert(rs->RAMBlockMap, (gpointer)rbsd->block_offset, rbme); + trace_rsocket_source_ramblock_setup(ram_block_name, rbsd->block_offset, rsocket_key); + } + } while (block_name_len); + + ret = 0; /* Good */ +err: + /* Destroying the map also frees its RAMBlockSourceData values */ + g_hash_table_destroy(block_name_map); + return ret; +} + +static int rsocket_dest_before_ram_iterate(QEMUFile *f, void *opaque, + uint64_t flags) +{ + QEMURsocket *rs = opaque; + + trace_rsocket_dest_before_ram_iterate(flags); + switch (flags) { + case RAM_CONTROL_SETUP: + /* + * Called after we've loaded the list of RAMBlocks from the source and + * checked them + */ + return rsocket_dest_ramblock_setup(rs); + break; + + } + + return 0; +} + +static int rsocket_source_before_ram_iterate(QEMUFile *f, void *opaque, + uint64_t flags) +{ + QEMURsocket *rs = opaque; + + trace_rsocket_source_before_ram_iterate(flags); + switch (flags) { + case RAM_CONTROL_SETUP: + /* + * Called after we've sent the list of RAMBlocks + */ + return rsocket_source_ramblock_setup(f, rs); + break; + + } + + return 0; +} + +static size_t rsocket_save_page(QEMUFile *f, void *opaque, +
ram_addr_t block_offset, ram_addr_t offset, + size_t size, int *bytes_sent) +{ + QEMURsocket *rs = opaque; + RAMBlockMapEntry *rbme; + ssize_t ret; + + rbme = g_hash_table_lookup(rs->RAMBlockMap, (gpointer)block_offset); + if (!rbme) { + error_report("Unable to find matching RSocket key for block " RAM_ADDR_FMT, block_offset); + return -1; + } + trace_rsocket_save_page(block_offset, offset, rbme->rsocket_offset); + + do { /* NOTE(review): only a 0 return is retried; riowrite() presumably reports EAGAIN as -1, which falls through to the error below - confirm */ + ret=riowrite(rs->rfd, rbme->host_addr+offset, size, rbme->rsocket_offset+offset, 0); + } while (ret==0 && errno == EAGAIN); + if (ret != (ssize_t)size) { + error_report("riowrite: %s (%zu/%zd)", strerror(errno), size, ret); + return -1; + } + *bytes_sent = size; + + return RAM_SAVE_CONTROL_DELAYED; +} + + +static const QEMUFileOps rsocket_read_ops = { + .get_fd = rsocket_get_fd, + .get_buffer = rsocket_get_buffer, + .close = rsocket_close, + .before_ram_iterate = rsocket_dest_before_ram_iterate, +}; + +static const QEMUFileOps rsocket_write_ops = { + .writev_buffer = rsocket_writev_buffer, + .close = rsocket_close, + .before_ram_iterate = rsocket_source_before_ram_iterate, + .save_page = rsocket_save_page, +}; + +static QEMUFile *qemu_fopen_rsocket(int rfd, const char *mode) +{ + QEMURsocket *s; + + trace_qemu_fopen_rsocket(rfd, mode); + if (qemu_file_mode_is_not_valid(mode)) { + return NULL; + } + + s = g_malloc0(sizeof(QEMURsocket)); + s->rfd = rfd; + s->dummy_fd = open("/dev/null", O_RDWR); + if (s->dummy_fd == -1) { + error_report("Failed to open dummy_fd (%s)", strerror(errno)); + goto err; + } + if (start_yield_thread(s, (mode[0] == 'w')?POLLOUT:POLLIN)) { + goto errfd; + } + s->RAMBlockMap = g_hash_table_new(NULL, NULL); /* created last so the error paths need not free it */ + if (mode[0] == 'w') { + s->file = qemu_fopen_ops(s, &rsocket_write_ops); + s->isSource = true; + } else { + s->file = qemu_fopen_ops(s, &rsocket_read_ops); + s->isSource = false; + rsocket_set_nonblock(rfd); + } + + return s->file; + +errfd: + close(s->dummy_fd); +err: + rclose(rfd); + g_free(s); + return NULL; +} + +static void
rsocket_have_connect(int rfd, Error *err, void *opaque) +{ + MigrationState *s = opaque; + + trace_rsocket_have_connect(rfd); + + /* qemu_fopen_rsocket() returns NULL on failure; never connect a NULL file */ + s->file = (rfd < 0) ? NULL : qemu_fopen_rsocket(rfd, "wb"); + if (s->file == NULL) { + migrate_fd_error(s); + } else { + migrate_fd_connect(s); + } +} + +void rsocket_start_outgoing_migration(MigrationState *s, const char *host_port, + Error **errp) +{ + QemuOpts *opts; + InetSocketAddress *addr; + + trace_rsocket_start_outgoing_migration(); + addr = inet_parse(host_port, errp); + if (addr != NULL) { + opts = qemu_opts_create(&socket_optslist, NULL, 0, &error_abort); + inet_addr_to_opts(opts, addr); + qapi_free_InetSocketAddress(addr); + rsocket_inet_connect_opts(opts, errp, rsocket_have_connect, s); + qemu_opts_del(opts); + } +} + +static void rsocket_accept_incoming_migration(void *opaque) +{ + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + int rfd_s = (intptr_t)opaque; + int rfd_c; + QEMUFile *f; + int err; + + trace_rsocket_accept_incoming_migration(); + do { + rfd_c = raccept(rfd_s, (struct sockaddr *)&addr, &addrlen); + err = errno; + } while (rfd_c < 0 && err == EINTR); + rclose(rfd_s); + + if (rfd_c < 0) { + error_report("could not accept migration connection (%s)", + strerror(err)); + return; + } + + f = qemu_fopen_rsocket(rfd_c, "rb"); + if (f == NULL) { + /* + * qemu_fopen_rsocket() has already rclose()d rfd_c on failure, + * so it must not be closed again here. + */ + error_report("could not qemu_fopen rsocket"); + return; + } + + process_incoming_migration(f); +} + +void rsocket_start_incoming_migration(const char *host_port, Error **errp) +{ + int rfd; + + trace_rsocket_start_incoming_migration(); + rfd = rsocket_inet_listen(host_port, SOCK_STREAM, 0, errp); + if (rfd < 0) { + return; + } + + rsocket_set_handler(rfd, POLLIN | POLLERR, "rsocketlisten", + rsocket_accept_incoming_migration, + (void *)(intptr_t)rfd); /* NOTE(review): a -1 return is ignored */ +} diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c index d404926..9569d17 100644 --- a/qemu-coroutine-io.c +++ b/qemu-coroutine-io.c @@ -37,7 +37,8 @@ qemu_co_sendv_recvv(int
sockfd, struct iovec *iov, unsigned iov_cnt, int err; while (done < bytes) { ret = iov_send_recv(sockfd, iov, iov_cnt, - offset + done, bytes - done, do_send); + offset + done, bytes - done, do_send, + iov_send_recv_fd); if (ret > 0) { done += ret; } else if (ret < 0) { diff --git a/trace-events b/trace-events index b5722ea..92a6dff 100644 --- a/trace-events +++ b/trace-events @@ -1149,6 +1149,41 @@ vmstate_load_field_error(const char *field, int ret) "field \"%s\" load failed, # qemu-file.c qemu_file_fclose(void) "" +rsocket_handler_fdcallback(void) "" +rsocket_handler_thread_top(void) "" +rsocket_handler_thread_after_poll(void) "" +rsocket_set_handler(void) "" +rsocket_wait_for_connect(void) "" +rsocket_inet_connect_addr(void) "" +rsocket_inet_connect_opts(void) "" +rsocket_inet_listen_opts(void) "" +rsocket_inet_listen(void) "" +rsocket_yield_thread_top(void) "" +rsocket_yield_thread_requested_exit(void) "" +rsocket_yield_thread_exit(void) "" +rsocket_set_nonblock(int rfd) "%d" +rsocket_start_yield_thread(void) "" +rsocket_stop_yield_thread(void) "" +rsocket_yield(void) "" +rsocket_send(void) "" +rsocket_writev_buffer(ssize_t size) "%zd" +rsocket_writev_buffer_end(ssize_t size, ssize_t len) "size=%zd / return %zd" +rsocket_get_buffer(int size) "size=%d" +rsocket_get_buffer_exit(int size, ssize_t len) "size=%d return %zd" +rsocket_close(void) "" +qemu_fopen_rsocket(int rfd, const char *mode) "rfd %d mode %s" +rsocket_have_connect(int rfd) "rfd %d" +rsocket_start_outgoing_migration(void) "" +rsocket_accept_incoming_migration(void) "" +rsocket_start_incoming_migration(void) "" +rsocket_dest_ramblock_reg(const char *block_name, uint64_t block_offset, uint64_t rsock_key) "%s %" PRIx64 "->%" PRIx64 +rsocket_source_ramblock_setup(const char *block_name, uint64_t block_offset, uint64_t rsock_key) "%s %" PRIx64 "->%" PRIx64 +rsocket_dest_before_ram_iterate(uint64_t flags) "%" PRIx64 +rsocket_save_page(uint64_t block_offset, uint64_t offset, uint64_t rsocket_key)
"block=%" PRIx64 " offset=%" PRIx64 " rsocket_key=%" PRIx64 +rsocket_source_before_ram_iterate(uint64_t flags) "%" PRIx64 +rsocket_close_RAMBlock_func(void *host_addr, bool isSource) "%p %u" +rsocket_save_page_riowrite_eagain(uint64_t offset) "%" PRIx64 + # arch_init.c migration_bitmap_sync_start(void) "" migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64"" diff --git a/util/iov.c b/util/iov.c index 2fb18e6..6dbf62e 100644 --- a/util/iov.c +++ b/util/iov.c @@ -88,9 +88,9 @@ size_t iov_size(const struct iovec *iov, const unsigned int iov_cnt) return len; } -/* helper function for iov_send_recv() */ -static ssize_t -do_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt, bool do_send) +/* helper function for iov_send_recv() for normal FDs */ +ssize_t iov_send_recv_fd(int sockfd, struct iovec *iov, unsigned iov_cnt, + bool do_send) { #ifdef CONFIG_POSIX ssize_t ret; @@ -134,8 +134,8 @@ do_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt, bool do_send) } ssize_t iov_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt, - size_t offset, size_t bytes, - bool do_send) + size_t offset, size_t bytes, bool do_send, + iov_send_recv_func helper) { ssize_t total = 0; ssize_t ret; @@ -174,11 +174,11 @@ ssize_t iov_send_recv(int sockfd, struct iovec *iov, unsigned iov_cnt, assert(iov[niov].iov_len > tail); orig_len = iov[niov].iov_len; iov[niov++].iov_len = tail; - ret = do_send_recv(sockfd, iov, niov, do_send); + ret = helper(sockfd, iov, niov, do_send); /* Undo the changes above before checking for errors */ iov[niov-1].iov_len = orig_len; } else { - ret = do_send_recv(sockfd, iov, niov, do_send); + ret = helper(sockfd, iov, niov, do_send); } if (offset) { iov[0].iov_base -= offset; diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c index a76bb3c..2d0fa60 100644 --- a/util/qemu-sockets.c +++ b/util/qemu-sockets.c @@ -58,7 +58,7 @@ QemuOptsList socket_optslist = { }, }; -static int inet_getport(struct addrinfo *e) +int
inet_getport(struct addrinfo *e) { struct sockaddr_in *i4; struct sockaddr_in6 *i6; @@ -75,7 +75,7 @@ static int inet_getport(struct addrinfo *e) } } -static void inet_setport(struct addrinfo *e, int port) +void inet_setport(struct addrinfo *e, int port) { struct sockaddr_in *i4; struct sockaddr_in6 *i6; @@ -319,7 +319,7 @@ static int inet_connect_addr(struct addrinfo *addr, bool *in_progress, return sock; } -static struct addrinfo *inet_parse_connect_opts(QemuOpts *opts, Error **errp) +struct addrinfo *inet_parse_connect_opts(QemuOpts *opts, Error **errp) { struct addrinfo ai, *res; int rc; @@ -574,7 +574,7 @@ fail: return NULL; } -static void inet_addr_to_opts(QemuOpts *opts, const InetSocketAddress *addr) +void inet_addr_to_opts(QemuOpts *opts, const InetSocketAddress *addr) { bool ipv4 = addr->ipv4 || !addr->has_ipv4; bool ipv6 = addr->ipv6 || !addr->has_ipv6; -- 2.1.0