Now that all callers are converted to use I/O channels for initial connection setup, it is possible to switch the core NBD protocol handling over to use QIOChannel APIs for actual socket I/O.
Signed-off-by: Daniel P. Berrange <berra...@redhat.com> --- block/nbd-client.c | 19 +++---- blockdev-nbd.c | 6 +-- include/block/nbd.h | 20 ++++--- nbd/client.c | 40 +++++++------- nbd/common.c | 112 ++++++++++++++++++++++++++++++--------- nbd/nbd-internal.h | 18 +++---- nbd/server.c | 150 +++++++++++++++++++++++++++++----------------------- qemu-nbd.c | 10 ++-- 8 files changed, 226 insertions(+), 149 deletions(-) diff --git a/block/nbd-client.c b/block/nbd-client.c index bf5b0a3..ed69d4b 100644 --- a/block/nbd-client.c +++ b/block/nbd-client.c @@ -71,7 +71,7 @@ static void nbd_reply_ready(void *opaque) * that another thread has done the same thing in parallel, so * the socket is not readable anymore. */ - ret = nbd_receive_reply(s->sioc->fd, &s->reply); + ret = nbd_receive_reply(s->ioc, &s->reply); if (ret == -EAGAIN) { return; } @@ -122,6 +122,7 @@ static int nbd_co_send_request(BlockDriverState *bs, } } + g_assert(qemu_in_coroutine()); assert(i < MAX_NBD_REQUESTS); request->handle = INDEX_TO_HANDLE(s, i); s->send_coroutine = qemu_coroutine_self(); @@ -131,17 +132,17 @@ static int nbd_co_send_request(BlockDriverState *bs, nbd_reply_ready, nbd_restart_write, bs); if (qiov) { qio_channel_set_cork(s->ioc, true); - rc = nbd_send_request(s->sioc->fd, request); + rc = nbd_send_request(s->ioc, request); if (rc >= 0) { - ret = qemu_co_sendv(s->sioc->fd, qiov->iov, qiov->niov, - offset, request->len); + ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov, + offset, request->len, 0); if (ret != request->len) { rc = -EIO; } } qio_channel_set_cork(s->ioc, false); } else { - rc = nbd_send_request(s->sioc->fd, request); + rc = nbd_send_request(s->ioc, request); } aio_set_fd_handler(aio_context, s->sioc->fd, false, nbd_reply_ready, NULL, bs); @@ -164,8 +165,8 @@ static void nbd_co_receive_reply(NbdClientSession *s, reply->error = EIO; } else { if (qiov && reply->error == 0) { - ret = qemu_co_recvv(s->sioc->fd, qiov->iov, qiov->niov, - offset, request->len); + ret = 
nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov, + offset, request->len, 1); if (ret != request->len) { reply->error = EIO; } @@ -372,7 +373,7 @@ void nbd_client_close(BlockDriverState *bs) return; } - nbd_send_request(client->sioc->fd, &request); + nbd_send_request(client->ioc, &request); nbd_teardown_connection(bs); } @@ -387,7 +388,7 @@ int nbd_client_init(BlockDriverState *bs, QIOChannelSocket *sioc, logout("session init %s\n", export); qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL); - ret = nbd_receive_negotiate(sioc->fd, export, + ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export, &client->nbdflags, &client->size, errp); if (ret < 0) { logout("Failed to negotiate with the NBD server\n"); diff --git a/blockdev-nbd.c b/blockdev-nbd.c index 94b653d..7d4c415 100644 --- a/blockdev-nbd.c +++ b/blockdev-nbd.c @@ -26,7 +26,6 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition condition, gpointer opaque) { QIOChannelSocket *cioc; - int fd; cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc), NULL); @@ -34,10 +33,7 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition condition, return TRUE; } - fd = dup(cioc->fd); - if (fd >= 0) { - nbd_client_new(NULL, fd, nbd_client_put); - } + nbd_client_new(NULL, cioc, nbd_client_put); object_unref(OBJECT(cioc)); return TRUE; } diff --git a/include/block/nbd.h b/include/block/nbd.h index 7eccb41..1080ef8 100644 --- a/include/block/nbd.h +++ b/include/block/nbd.h @@ -23,6 +23,7 @@ #include "qemu-common.h" #include "qemu/option.h" +#include "io/channel-socket.h" struct nbd_request { uint32_t magic; @@ -73,12 +74,17 @@ enum { /* Maximum size of a single READ/WRITE data buffer */ #define NBD_MAX_BUFFER_SIZE (32 * 1024 * 1024) -ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read); -int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, +ssize_t nbd_wr_syncv(QIOChannel *ioc, + struct iovec *iov, + size_t niov, + size_t offset, + size_t length, + bool do_read); +int 
nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags, off_t *size, Error **errp); -int nbd_init(int fd, int csock, uint32_t flags, off_t size); -ssize_t nbd_send_request(int csock, struct nbd_request *request); -ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply); +int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size); +ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request); +ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply); int nbd_client(int fd); int nbd_disconnect(int fd); @@ -98,7 +104,9 @@ NBDExport *nbd_export_find(const char *name); void nbd_export_set_name(NBDExport *exp, const char *name); void nbd_export_close_all(void); -void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *)); +void nbd_client_new(NBDExport *exp, + QIOChannelSocket *sioc, + void (*close)(NBDClient *)); void nbd_client_get(NBDClient *client); void nbd_client_put(NBDClient *client); diff --git a/nbd/client.c b/nbd/client.c index 83df7ba..07e1fed 100644 --- a/nbd/client.c +++ b/nbd/client.c @@ -70,7 +70,7 @@ static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports); */ -int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, +int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags, off_t *size, Error **errp) { char buf[256]; @@ -82,7 +82,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, rc = -EINVAL; - if (read_sync(csock, buf, 8) != 8) { + if (read_sync(ioc, buf, 8) != 8) { error_setg(errp, "Failed to read data"); goto fail; } @@ -108,7 +108,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, goto fail; } - if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { + if (read_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) { error_setg(errp, "Failed to read magic"); goto fail; } @@ -129,35 +129,35 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, } 
goto fail; } - if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { + if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) { error_setg(errp, "Failed to read server flags"); goto fail; } *flags = be16_to_cpu(tmp) << 16; /* reserved for future use */ - if (write_sync(csock, &reserved, sizeof(reserved)) != + if (write_sync(ioc, &reserved, sizeof(reserved)) != sizeof(reserved)) { error_setg(errp, "Failed to read reserved field"); goto fail; } /* write the export name */ magic = cpu_to_be64(magic); - if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) { + if (write_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) { error_setg(errp, "Failed to send export name magic"); goto fail; } opt = cpu_to_be32(NBD_OPT_EXPORT_NAME); - if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) { + if (write_sync(ioc, &opt, sizeof(opt)) != sizeof(opt)) { error_setg(errp, "Failed to send export name option number"); goto fail; } namesize = cpu_to_be32(strlen(name)); - if (write_sync(csock, &namesize, sizeof(namesize)) != + if (write_sync(ioc, &namesize, sizeof(namesize)) != sizeof(namesize)) { error_setg(errp, "Failed to send export name length"); goto fail; } - if (write_sync(csock, (char*)name, strlen(name)) != strlen(name)) { + if (write_sync(ioc, (char *)name, strlen(name)) != strlen(name)) { error_setg(errp, "Failed to send export name"); goto fail; } @@ -174,7 +174,7 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, } } - if (read_sync(csock, &s, sizeof(s)) != sizeof(s)) { + if (read_sync(ioc, &s, sizeof(s)) != sizeof(s)) { error_setg(errp, "Failed to read export length"); goto fail; } @@ -182,19 +182,19 @@ int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags, TRACE("Size is %" PRIu64, *size); if (!name) { - if (read_sync(csock, flags, sizeof(*flags)) != sizeof(*flags)) { + if (read_sync(ioc, flags, sizeof(*flags)) != sizeof(*flags)) { error_setg(errp, "Failed to read export flags"); goto fail; } *flags = 
be32_to_cpup(flags); } else { - if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { + if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) { error_setg(errp, "Failed to read export flags"); goto fail; } *flags |= be16_to_cpu(tmp); } - if (read_sync(csock, &buf, 124) != 124) { + if (read_sync(ioc, &buf, 124) != 124) { error_setg(errp, "Failed to read reserved block"); goto fail; } @@ -205,11 +205,11 @@ fail: } #ifdef __linux__ -int nbd_init(int fd, int csock, uint32_t flags, off_t size) +int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size) { TRACE("Setting NBD socket"); - if (ioctl(fd, NBD_SET_SOCK, csock) < 0) { + if (ioctl(fd, NBD_SET_SOCK, sioc->fd) < 0) { int serrno = errno; LOG("Failed to set NBD socket"); return -serrno; @@ -282,7 +282,7 @@ int nbd_client(int fd) return ret; } #else -int nbd_init(int fd, int csock, uint32_t flags, off_t size) +int nbd_init(int fd, QIOChannelSocket *ioc, uint32_t flags, off_t size) { return -ENOTSUP; } @@ -293,7 +293,7 @@ int nbd_client(int fd) } #endif -ssize_t nbd_send_request(int csock, struct nbd_request *request) +ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request) { uint8_t buf[NBD_REQUEST_SIZE]; ssize_t ret; @@ -308,7 +308,7 @@ ssize_t nbd_send_request(int csock, struct nbd_request *request) "{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}", request->from, request->len, request->handle, request->type); - ret = write_sync(csock, buf, sizeof(buf)); + ret = write_sync(ioc, buf, sizeof(buf)); if (ret < 0) { return ret; } @@ -320,13 +320,13 @@ ssize_t nbd_send_request(int csock, struct nbd_request *request) return 0; } -ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply) +ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply) { uint8_t buf[NBD_REPLY_SIZE]; uint32_t magic; ssize_t ret; - ret = read_sync(csock, buf, sizeof(buf)); + ret = read_sync(ioc, buf, sizeof(buf)); if (ret < 0) { return ret; } diff --git a/nbd/common.c b/nbd/common.c 
index 7b089b0..9f89372 100644 --- a/nbd/common.c +++ b/nbd/common.c @@ -18,47 +18,107 @@ #include "nbd-internal.h" -ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read) +/** + * @skip: number of bytes to advance head of @iov + * @iov: pointer to iov array, updated on success + * @niov: number of elements in @iov, updated on success + * @oldiov: pointer single element to hold old info from @iov + * + * This will update @iov so that its head is advanced + * by @skip bytes. To do this, zero or more complete + * elements of @iov will be skipped over. The new head + * of @iov will then have its base & len updated to + * skip the remaining number of bytes. @oldiov will + * hold the original data from the new head of @iov. + */ +static void nbd_skip_iov(size_t skip, + struct iovec **iov, + int *niov, + struct iovec *oldiov) { - size_t offset = 0; - int err; - - if (qemu_in_coroutine()) { - if (do_read) { - return qemu_co_recv(fd, buffer, size); + ssize_t done = 0; + size_t i; + for (i = 0; i < *niov; i++) { + if ((*iov)[i].iov_len <= skip) { + done += (*iov)[i].iov_len; + skip -= (*iov)[i].iov_len; } else { - return qemu_co_send(fd, buffer, size); + done += skip; + oldiov->iov_base = (*iov)[i].iov_base; + oldiov->iov_len = (*iov)[i].iov_len; + (*iov)[i].iov_len -= skip; + (*iov)[i].iov_base += skip; + *iov = *iov + i; + *niov = *niov - i; + break; } } +} + +ssize_t nbd_wr_syncv(QIOChannel *ioc, + struct iovec *iov, + size_t niov, + size_t offset, + size_t length, + bool do_read) +{ + ssize_t done = 0; + size_t iovlen = iov_size(iov, niov); + struct iovec oldiov = { NULL, 0 }; + Error *local_err = NULL; + + /* Shouldn't happen but just to be sure */ + if (offset > iovlen) { + return -EINVAL; + } + iovlen -= offset; + if (length > iovlen) { + return -EINVAL; + } - while (offset < size) { + while (done < length) { ssize_t len; + struct iovec *cur = iov; + int curcnt = niov; + + nbd_skip_iov(offset + done, &cur, &curcnt, &oldiov); if (do_read) { - len = 
qemu_recv(fd, buffer + offset, size - offset, 0); + len = qio_channel_readv(ioc, cur, curcnt, &local_err); } else { - len = send(fd, buffer + offset, size - offset, 0); + len = qio_channel_writev(ioc, cur, curcnt, &local_err); } - - if (len < 0) { - err = socket_error(); - - /* recoverable error */ - if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) { - continue; + if (oldiov.iov_base) { + /* Restore original caller's info in @iov */ + cur[0].iov_base = oldiov.iov_base; + cur[0].iov_len = oldiov.iov_len; + oldiov.iov_base = NULL; + oldiov.iov_len = 0; + } + if (len == QIO_CHANNEL_ERR_BLOCK) { + if (qemu_in_coroutine()) { + /* XXX see about using qio_channel_yield() in + * future if we can use normal watches instead + * of the AIO handlers */ + qemu_coroutine_yield(); + } else { + qio_channel_wait(ioc, + do_read ? G_IO_IN : G_IO_OUT); } - - /* unrecoverable error */ - return -err; + continue; + } + if (len < 0) { + TRACE("I/O error: %s", error_get_pretty(local_err)); + error_free(local_err); + /* XXX handle Error objects */ + return -EIO; } - /* eof */ - if (len == 0) { + if (do_read && len == 0) { break; } - offset += len; + done += len; } - - return offset; + return done; } diff --git a/nbd/nbd-internal.h b/nbd/nbd-internal.h index c0a6575..9960034 100644 --- a/nbd/nbd-internal.h +++ b/nbd/nbd-internal.h @@ -13,6 +13,7 @@ #include "sysemu/block-backend.h" #include "qemu/coroutine.h" +#include "qemu/iov.h" #include <errno.h> #include <string.h> @@ -29,7 +30,6 @@ #include <linux/fs.h> #endif -#include "qemu/sockets.h" #include "qemu/queue.h" #include "qemu/main-loop.h" @@ -90,24 +90,22 @@ #define NBD_EINVAL 22 #define NBD_ENOSPC 28 -static inline ssize_t read_sync(int fd, void *buffer, size_t size) +static inline ssize_t read_sync(QIOChannel *ioc, void *buffer, size_t size) { + struct iovec iov = { .iov_base = buffer, .iov_len = size }; /* Sockets are kept in blocking mode in the negotiation phase. 
After * that, a non-readable socket simply means that another thread stole * our request/reply. Synchronization is done with recv_coroutine, so * that this is coroutine-safe. */ - return nbd_wr_sync(fd, buffer, size, true); + return nbd_wr_syncv(ioc, &iov, 1, 0, size, true); } -static inline ssize_t write_sync(int fd, void *buffer, size_t size) +static inline ssize_t write_sync(QIOChannel *ioc, void *buffer, size_t size) { - int ret; - do { - /* For writes, we do expect the socket to be writable. */ - ret = nbd_wr_sync(fd, buffer, size, false); - } while (ret == -EAGAIN); - return ret; + struct iovec iov = { .iov_base = buffer, .iov_len = size }; + + return nbd_wr_syncv(ioc, &iov, 1, 0, size, false); } #endif diff --git a/nbd/server.c b/nbd/server.c index c29ba5f..929db9c 100644 --- a/nbd/server.c +++ b/nbd/server.c @@ -73,7 +73,8 @@ struct NBDClient { void (*close)(NBDClient *client); NBDExport *exp; - int sock; + QIOChannelSocket *sioc; /* The underlying data channel */ + QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */ Coroutine *recv_coroutine; @@ -93,45 +94,56 @@ static void nbd_set_handlers(NBDClient *client); static void nbd_unset_handlers(NBDClient *client); static void nbd_update_can_read(NBDClient *client); -static void nbd_negotiate_continue(void *opaque) +static gboolean nbd_negotiate_continue(QIOChannel *ioc, + GIOCondition condition, + void *opaque) { qemu_coroutine_enter(opaque, NULL); + return TRUE; } -static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size) +static ssize_t nbd_negotiate_read(QIOChannel *ioc, void *buffer, size_t size) { ssize_t ret; + guint watch; assert(qemu_in_coroutine()); /* Negotiation are always in main loop. 
*/ - qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL, - qemu_coroutine_self()); - ret = read_sync(fd, buffer, size); - qemu_set_fd_handler(fd, NULL, NULL, NULL); + watch = qio_channel_add_watch(ioc, + G_IO_IN, + nbd_negotiate_continue, + qemu_coroutine_self(), + NULL); + ret = read_sync(ioc, buffer, size); + g_source_remove(watch); return ret; } -static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size) +static ssize_t nbd_negotiate_write(QIOChannel *ioc, void *buffer, size_t size) { ssize_t ret; + guint watch; assert(qemu_in_coroutine()); /* Negotiation are always in main loop. */ - qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue, - qemu_coroutine_self()); - ret = write_sync(fd, buffer, size); - qemu_set_fd_handler(fd, NULL, NULL, NULL); + watch = qio_channel_add_watch(ioc, + G_IO_OUT, + nbd_negotiate_continue, + qemu_coroutine_self(), + NULL); + ret = write_sync(ioc, buffer, size); + g_source_remove(watch); return ret; } -static ssize_t nbd_negotiate_drop_sync(int fd, size_t size) +static ssize_t nbd_negotiate_drop_sync(QIOChannel *ioc, size_t size) { ssize_t ret, dropped = size; uint8_t *buffer = g_malloc(MIN(65536, size)); while (size > 0) { - ret = nbd_negotiate_read(fd, buffer, MIN(65536, size)); + ret = nbd_negotiate_read(ioc, buffer, MIN(65536, size)); if (ret < 0) { g_free(buffer); return ret; @@ -172,66 +184,66 @@ static ssize_t nbd_negotiate_drop_sync(int fd, size_t size) */ -static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt) +static int nbd_negotiate_send_rep(QIOChannel *ioc, uint32_t type, uint32_t opt) { uint64_t magic; uint32_t len; magic = cpu_to_be64(NBD_REP_MAGIC); - if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) { + if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) { LOG("write failed (rep magic)"); return -EINVAL; } opt = cpu_to_be32(opt); - if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) { + if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) 
!= sizeof(opt)) { LOG("write failed (rep opt)"); return -EINVAL; } type = cpu_to_be32(type); - if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) { + if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) { LOG("write failed (rep type)"); return -EINVAL; } len = cpu_to_be32(0); - if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) { + if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) { LOG("write failed (rep data length)"); return -EINVAL; } return 0; } -static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp) +static int nbd_negotiate_send_rep_list(QIOChannel *ioc, NBDExport *exp) { uint64_t magic, name_len; uint32_t opt, type, len; name_len = strlen(exp->name); magic = cpu_to_be64(NBD_REP_MAGIC); - if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) { + if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) { LOG("write failed (magic)"); return -EINVAL; } opt = cpu_to_be32(NBD_OPT_LIST); - if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) { + if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) { LOG("write failed (opt)"); return -EINVAL; } type = cpu_to_be32(NBD_REP_SERVER); - if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) { + if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) { LOG("write failed (reply type)"); return -EINVAL; } len = cpu_to_be32(name_len + sizeof(len)); - if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) { + if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) { LOG("write failed (length)"); return -EINVAL; } len = cpu_to_be32(name_len); - if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) { + if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) { LOG("write failed (length)"); return -EINVAL; } - if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) { + if (nbd_negotiate_write(ioc, exp->name, name_len) != name_len) { 
LOG("write failed (buffer)"); return -EINVAL; } @@ -240,30 +252,29 @@ static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp) static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length) { - int csock; NBDExport *exp; - csock = client->sock; if (length) { - if (nbd_negotiate_drop_sync(csock, length) != length) { + if (nbd_negotiate_drop_sync(client->ioc, length) != length) { return -EIO; } - return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, NBD_OPT_LIST); + return nbd_negotiate_send_rep(client->ioc, + NBD_REP_ERR_INVALID, NBD_OPT_LIST); } /* For each export, send a NBD_REP_SERVER reply. */ QTAILQ_FOREACH(exp, &exports, next) { - if (nbd_negotiate_send_rep_list(csock, exp)) { + if (nbd_negotiate_send_rep_list(client->ioc, exp)) { return -EINVAL; } } /* Finish with a NBD_REP_ACK. */ - return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST); + return nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK, NBD_OPT_LIST); } static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length) { - int rc = -EINVAL, csock = client->sock; + int rc = -EINVAL; char name[256]; /* Client sends: @@ -274,7 +285,7 @@ static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length) LOG("Bad length received"); goto fail; } - if (nbd_negotiate_read(csock, name, length) != length) { + if (nbd_negotiate_read(client->ioc, name, length) != length) { LOG("read failed"); goto fail; } @@ -295,7 +306,6 @@ fail: static int nbd_negotiate_options(NBDClient *client) { - int csock = client->sock; uint32_t flags; /* Client sends: @@ -312,7 +322,8 @@ static int nbd_negotiate_options(NBDClient *client) ... 
Rest of request */ - if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) { + if (nbd_negotiate_read(client->ioc, &flags, sizeof(flags)) != + sizeof(flags)) { LOG("read failed"); return -EIO; } @@ -328,7 +339,8 @@ static int nbd_negotiate_options(NBDClient *client) uint32_t tmp, length; uint64_t magic; - if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) { + if (nbd_negotiate_read(client->ioc, &magic, sizeof(magic)) != + sizeof(magic)) { LOG("read failed"); return -EINVAL; } @@ -338,13 +350,13 @@ static int nbd_negotiate_options(NBDClient *client) return -EINVAL; } - if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) { + if (nbd_negotiate_read(client->ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) { LOG("read failed"); return -EINVAL; } - if (nbd_negotiate_read(csock, &length, - sizeof(length)) != sizeof(length)) { + if (nbd_negotiate_read(client->ioc, &length, sizeof(length)) != + sizeof(length)) { LOG("read failed"); return -EINVAL; } @@ -368,7 +380,7 @@ static int nbd_negotiate_options(NBDClient *client) default: tmp = be32_to_cpu(tmp); LOG("Unsupported option 0x%x", tmp); - nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp); + nbd_negotiate_send_rep(client->ioc, NBD_REP_ERR_UNSUP, tmp); return -EINVAL; } } @@ -382,7 +394,6 @@ typedef struct { static coroutine_fn int nbd_negotiate(NBDClientNewData *data) { NBDClient *client = data->client; - int csock = client->sock; char buf[8 + 8 + 8 + 128]; int rc; const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM | @@ -407,6 +418,7 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData *data) [28 .. 
151] reserved (0) */ + qio_channel_set_blocking(client->ioc, false, NULL); rc = -EINVAL; TRACE("Beginning negotiation."); @@ -423,12 +435,12 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData *data) } if (client->exp) { - if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) { + if (nbd_negotiate_write(client->ioc, buf, sizeof(buf)) != sizeof(buf)) { LOG("write failed"); goto fail; } } else { - if (nbd_negotiate_write(csock, buf, 18) != 18) { + if (nbd_negotiate_write(client->ioc, buf, 18) != 18) { LOG("write failed"); goto fail; } @@ -441,8 +453,8 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData *data) assert ((client->exp->nbdflags & ~65535) == 0); cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size); cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags); - if (nbd_negotiate_write(csock, buf + 18, - sizeof(buf) - 18) != sizeof(buf) - 18) { + if (nbd_negotiate_write(client->ioc, buf + 18, sizeof(buf) - 18) != + sizeof(buf) - 18) { LOG("write failed"); goto fail; } @@ -472,13 +484,13 @@ int nbd_disconnect(int fd) } #endif -static ssize_t nbd_receive_request(int csock, struct nbd_request *request) +static ssize_t nbd_receive_request(QIOChannel *ioc, struct nbd_request *request) { uint8_t buf[NBD_REQUEST_SIZE]; uint32_t magic; ssize_t ret; - ret = read_sync(csock, buf, sizeof(buf)); + ret = read_sync(ioc, buf, sizeof(buf)); if (ret < 0) { return ret; } @@ -513,7 +525,7 @@ static ssize_t nbd_receive_request(int csock, struct nbd_request *request) return 0; } -static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply) +static ssize_t nbd_send_reply(QIOChannel *ioc, struct nbd_reply *reply) { uint8_t buf[NBD_REPLY_SIZE]; ssize_t ret; @@ -531,7 +543,7 @@ static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply) TRACE("Sending response to client"); - ret = write_sync(csock, buf, sizeof(buf)); + ret = write_sync(ioc, buf, sizeof(buf)); if (ret < 0) { return ret; } @@ -559,8 +571,8 @@ void nbd_client_put(NBDClient 
*client) assert(client->closing); nbd_unset_handlers(client); - close(client->sock); - client->sock = -1; + object_unref(OBJECT(client->sioc)); + object_unref(OBJECT(client->ioc)); if (client->exp) { QTAILQ_REMOVE(&client->exp->clients, client, next); nbd_export_put(client->exp); @@ -580,7 +592,8 @@ static void client_close(NBDClient *client) /* Force requests to finish. They will drop their own references, * then we'll close the socket and free the NBDClient. */ - shutdown(client->sock, 2); + qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, + NULL); /* Also tell the client, so that they release their reference. */ if (client->close) { @@ -773,25 +786,25 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, int len) { NBDClient *client = req->client; - int csock = client->sock; ssize_t rc, ret; + g_assert(qemu_in_coroutine()); qemu_co_mutex_lock(&client->send_lock); client->send_coroutine = qemu_coroutine_self(); nbd_set_handlers(client); if (!len) { - rc = nbd_send_reply(csock, reply); + rc = nbd_send_reply(client->ioc, reply); } else { - socket_set_cork(csock, 1); - rc = nbd_send_reply(csock, reply); + qio_channel_set_cork(client->ioc, true); + rc = nbd_send_reply(client->ioc, reply); if (rc >= 0) { - ret = qemu_co_send(csock, req->data, len); + ret = write_sync(client->ioc, req->data, len); if (ret != len) { rc = -EIO; } } - socket_set_cork(csock, 0); + qio_channel_set_cork(client->ioc, false); } client->send_coroutine = NULL; @@ -803,14 +816,14 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply, static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request) { NBDClient *client = req->client; - int csock = client->sock; uint32_t command; ssize_t rc; + g_assert(qemu_in_coroutine()); client->recv_coroutine = qemu_coroutine_self(); nbd_update_can_read(client); - rc = nbd_receive_request(csock, request); + rc = nbd_receive_request(client->ioc, request); if (rc < 0) { if (rc != 
-EAGAIN) { rc = -EIO; @@ -845,7 +858,7 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque if (command == NBD_CMD_WRITE) { TRACE("Reading %u byte(s)", request->len); - if (qemu_co_recv(csock, req->data, request->len) != request->len) { + if (read_sync(client->ioc, req->data, request->len) != request->len) { LOG("reading from socket failed"); rc = -EIO; goto out; @@ -1040,7 +1053,7 @@ static void nbd_restart_write(void *opaque) static void nbd_set_handlers(NBDClient *client) { if (client->exp && client->exp->ctx) { - aio_set_fd_handler(client->exp->ctx, client->sock, + aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, client->can_read ? nbd_read : NULL, client->send_coroutine ? nbd_restart_write : NULL, @@ -1051,7 +1064,7 @@ static void nbd_set_handlers(NBDClient *client) static void nbd_unset_handlers(NBDClient *client) { if (client->exp && client->exp->ctx) { - aio_set_fd_handler(client->exp->ctx, client->sock, + aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, NULL, NULL, NULL); } } @@ -1093,7 +1106,9 @@ out: g_free(data); } -void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *)) +void nbd_client_new(NBDExport *exp, + QIOChannelSocket *sioc, + void (*close_fn)(NBDClient *)) { NBDClient *client; NBDClientNewData *data = g_new(NBDClientNewData, 1); @@ -1101,7 +1116,10 @@ void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *)) client = g_malloc0(sizeof(NBDClient)); client->refcount = 1; client->exp = exp; - client->sock = csock; + client->sioc = sioc; + object_ref(OBJECT(client->sioc)); + client->ioc = QIO_CHANNEL(sioc); + object_ref(OBJECT(client->ioc)); client->can_read = true; client->close = close_fn; diff --git a/qemu-nbd.c b/qemu-nbd.c index 1c9e3ea..2346e99 100644 --- a/qemu-nbd.c +++ b/qemu-nbd.c @@ -252,7 +252,7 @@ static void *nbd_client_thread(void *arg) goto out; } - ret = nbd_receive_negotiate(sioc->fd, NULL, &nbdflags, + ret = 
nbd_receive_negotiate(QIO_CHANNEL(sioc), NULL, &nbdflags, &size, &local_error); if (ret < 0) { if (local_error) { @@ -268,7 +268,7 @@ static void *nbd_client_thread(void *arg) goto out_socket; } - ret = nbd_init(fd, sioc->fd, nbdflags, size); + ret = nbd_init(fd, sioc, nbdflags, size); if (ret < 0) { goto out_fd; } @@ -328,7 +328,6 @@ static void nbd_client_closed(NBDClient *client) static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque) { QIOChannelSocket *cioc; - int fd; cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc), NULL); @@ -343,10 +342,7 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque) nb_fds++; nbd_update_server_watch(); - fd = dup(cioc->fd); - if (fd >= 0) { - nbd_client_new(exp, fd, nbd_client_closed); - } + nbd_client_new(exp, cioc, nbd_client_closed); object_unref(OBJECT(cioc)); return TRUE; -- 2.5.0