Now that all callers are converted to use I/O channels for
initial connection setup, it is possible to switch the core
NBD protocol handling code over to use the QIOChannel APIs
for the actual socket I/O.

Signed-off-by: Daniel P. Berrange <berra...@redhat.com>
---
 block/nbd-client.c  |  19 +++----
 blockdev-nbd.c      |   6 +--
 include/block/nbd.h |  20 ++++---
 nbd/client.c        |  40 +++++++-------
 nbd/common.c        | 112 ++++++++++++++++++++++++++++++---------
 nbd/nbd-internal.h  |  18 +++----
 nbd/server.c        | 150 +++++++++++++++++++++++++++++-----------------------
 qemu-nbd.c          |  10 ++--
 8 files changed, 226 insertions(+), 149 deletions(-)

diff --git a/block/nbd-client.c b/block/nbd-client.c
index bf5b0a3..ed69d4b 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -71,7 +71,7 @@ static void nbd_reply_ready(void *opaque)
          * that another thread has done the same thing in parallel, so
          * the socket is not readable anymore.
          */
-        ret = nbd_receive_reply(s->sioc->fd, &s->reply);
+        ret = nbd_receive_reply(s->ioc, &s->reply);
         if (ret == -EAGAIN) {
             return;
         }
@@ -122,6 +122,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
         }
     }
 
+    g_assert(qemu_in_coroutine());
     assert(i < MAX_NBD_REQUESTS);
     request->handle = INDEX_TO_HANDLE(s, i);
     s->send_coroutine = qemu_coroutine_self();
@@ -131,17 +132,17 @@ static int nbd_co_send_request(BlockDriverState *bs,
                        nbd_reply_ready, nbd_restart_write, bs);
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
-        rc = nbd_send_request(s->sioc->fd, request);
+        rc = nbd_send_request(s->ioc, request);
         if (rc >= 0) {
-            ret = qemu_co_sendv(s->sioc->fd, qiov->iov, qiov->niov,
-                                offset, request->len);
+            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
+                               offset, request->len, 0);
             if (ret != request->len) {
                 rc = -EIO;
             }
         }
         qio_channel_set_cork(s->ioc, false);
     } else {
-        rc = nbd_send_request(s->sioc->fd, request);
+        rc = nbd_send_request(s->ioc, request);
     }
     aio_set_fd_handler(aio_context, s->sioc->fd, false,
                        nbd_reply_ready, NULL, bs);
@@ -164,8 +165,8 @@ static void nbd_co_receive_reply(NbdClientSession *s,
         reply->error = EIO;
     } else {
         if (qiov && reply->error == 0) {
-            ret = qemu_co_recvv(s->sioc->fd, qiov->iov, qiov->niov,
-                                offset, request->len);
+            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
+                               offset, request->len, 1);
             if (ret != request->len) {
                 reply->error = EIO;
             }
@@ -372,7 +373,7 @@ void nbd_client_close(BlockDriverState *bs)
         return;
     }
 
-    nbd_send_request(client->sioc->fd, &request);
+    nbd_send_request(client->ioc, &request);
 
     nbd_teardown_connection(bs);
 }
@@ -387,7 +388,7 @@ int nbd_client_init(BlockDriverState *bs, QIOChannelSocket 
*sioc,
     logout("session init %s\n", export);
     qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
 
-    ret = nbd_receive_negotiate(sioc->fd, export,
+    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
                                 &client->nbdflags, &client->size, errp);
     if (ret < 0) {
         logout("Failed to negotiate with the NBD server\n");
diff --git a/blockdev-nbd.c b/blockdev-nbd.c
index 94b653d..7d4c415 100644
--- a/blockdev-nbd.c
+++ b/blockdev-nbd.c
@@ -26,7 +26,6 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition 
condition,
                            gpointer opaque)
 {
     QIOChannelSocket *cioc;
-    int fd;
 
     cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
                                      NULL);
@@ -34,10 +33,7 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition 
condition,
         return TRUE;
     }
 
-    fd = dup(cioc->fd);
-    if (fd >= 0) {
-        nbd_client_new(NULL, fd, nbd_client_put);
-    }
+    nbd_client_new(NULL, cioc, nbd_client_put);
     object_unref(OBJECT(cioc));
     return TRUE;
 }
diff --git a/include/block/nbd.h b/include/block/nbd.h
index 7eccb41..1080ef8 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -23,6 +23,7 @@
 
 #include "qemu-common.h"
 #include "qemu/option.h"
+#include "io/channel-socket.h"
 
 struct nbd_request {
     uint32_t magic;
@@ -73,12 +74,17 @@ enum {
 /* Maximum size of a single READ/WRITE data buffer */
 #define NBD_MAX_BUFFER_SIZE (32 * 1024 * 1024)
 
-ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read);
-int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
+ssize_t nbd_wr_syncv(QIOChannel *ioc,
+                     struct iovec *iov,
+                     size_t niov,
+                     size_t offset,
+                     size_t length,
+                     bool do_read);
+int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags,
                           off_t *size, Error **errp);
-int nbd_init(int fd, int csock, uint32_t flags, off_t size);
-ssize_t nbd_send_request(int csock, struct nbd_request *request);
-ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply);
+int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size);
+ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request);
+ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply);
 int nbd_client(int fd);
 int nbd_disconnect(int fd);
 
@@ -98,7 +104,9 @@ NBDExport *nbd_export_find(const char *name);
 void nbd_export_set_name(NBDExport *exp, const char *name);
 void nbd_export_close_all(void);
 
-void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *));
+void nbd_client_new(NBDExport *exp,
+                    QIOChannelSocket *sioc,
+                    void (*close)(NBDClient *));
 void nbd_client_get(NBDClient *client);
 void nbd_client_put(NBDClient *client);
 
diff --git a/nbd/client.c b/nbd/client.c
index 83df7ba..07e1fed 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -70,7 +70,7 @@ static QTAILQ_HEAD(, NBDExport) exports = 
QTAILQ_HEAD_INITIALIZER(exports);
 
 */
 
-int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
+int nbd_receive_negotiate(QIOChannel *ioc, const char *name, uint32_t *flags,
                           off_t *size, Error **errp)
 {
     char buf[256];
@@ -82,7 +82,7 @@ int nbd_receive_negotiate(int csock, const char *name, 
uint32_t *flags,
 
     rc = -EINVAL;
 
-    if (read_sync(csock, buf, 8) != 8) {
+    if (read_sync(ioc, buf, 8) != 8) {
         error_setg(errp, "Failed to read data");
         goto fail;
     }
@@ -108,7 +108,7 @@ int nbd_receive_negotiate(int csock, const char *name, 
uint32_t *flags,
         goto fail;
     }
 
-    if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (read_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
         error_setg(errp, "Failed to read magic");
         goto fail;
     }
@@ -129,35 +129,35 @@ int nbd_receive_negotiate(int csock, const char *name, 
uint32_t *flags,
             }
             goto fail;
         }
-        if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+        if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
             error_setg(errp, "Failed to read server flags");
             goto fail;
         }
         *flags = be16_to_cpu(tmp) << 16;
         /* reserved for future use */
-        if (write_sync(csock, &reserved, sizeof(reserved)) !=
+        if (write_sync(ioc, &reserved, sizeof(reserved)) !=
             sizeof(reserved)) {
             error_setg(errp, "Failed to read reserved field");
             goto fail;
         }
         /* write the export name */
         magic = cpu_to_be64(magic);
-        if (write_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+        if (write_sync(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
             error_setg(errp, "Failed to send export name magic");
             goto fail;
         }
         opt = cpu_to_be32(NBD_OPT_EXPORT_NAME);
-        if (write_sync(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+        if (write_sync(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
             error_setg(errp, "Failed to send export name option number");
             goto fail;
         }
         namesize = cpu_to_be32(strlen(name));
-        if (write_sync(csock, &namesize, sizeof(namesize)) !=
+        if (write_sync(ioc, &namesize, sizeof(namesize)) !=
             sizeof(namesize)) {
             error_setg(errp, "Failed to send export name length");
             goto fail;
         }
-        if (write_sync(csock, (char*)name, strlen(name)) != strlen(name)) {
+        if (write_sync(ioc, (char *)name, strlen(name)) != strlen(name)) {
             error_setg(errp, "Failed to send export name");
             goto fail;
         }
@@ -174,7 +174,7 @@ int nbd_receive_negotiate(int csock, const char *name, 
uint32_t *flags,
         }
     }
 
-    if (read_sync(csock, &s, sizeof(s)) != sizeof(s)) {
+    if (read_sync(ioc, &s, sizeof(s)) != sizeof(s)) {
         error_setg(errp, "Failed to read export length");
         goto fail;
     }
@@ -182,19 +182,19 @@ int nbd_receive_negotiate(int csock, const char *name, 
uint32_t *flags,
     TRACE("Size is %" PRIu64, *size);
 
     if (!name) {
-        if (read_sync(csock, flags, sizeof(*flags)) != sizeof(*flags)) {
+        if (read_sync(ioc, flags, sizeof(*flags)) != sizeof(*flags)) {
             error_setg(errp, "Failed to read export flags");
             goto fail;
         }
         *flags = be32_to_cpup(flags);
     } else {
-        if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+        if (read_sync(ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) {
             error_setg(errp, "Failed to read export flags");
             goto fail;
         }
         *flags |= be16_to_cpu(tmp);
     }
-    if (read_sync(csock, &buf, 124) != 124) {
+    if (read_sync(ioc, &buf, 124) != 124) {
         error_setg(errp, "Failed to read reserved block");
         goto fail;
     }
@@ -205,11 +205,11 @@ fail:
 }
 
 #ifdef __linux__
-int nbd_init(int fd, int csock, uint32_t flags, off_t size)
+int nbd_init(int fd, QIOChannelSocket *sioc, uint32_t flags, off_t size)
 {
     TRACE("Setting NBD socket");
 
-    if (ioctl(fd, NBD_SET_SOCK, csock) < 0) {
+    if (ioctl(fd, NBD_SET_SOCK, sioc->fd) < 0) {
         int serrno = errno;
         LOG("Failed to set NBD socket");
         return -serrno;
@@ -282,7 +282,7 @@ int nbd_client(int fd)
     return ret;
 }
 #else
-int nbd_init(int fd, int csock, uint32_t flags, off_t size)
+int nbd_init(int fd, QIOChannelSocket *ioc, uint32_t flags, off_t size)
 {
     return -ENOTSUP;
 }
@@ -293,7 +293,7 @@ int nbd_client(int fd)
 }
 #endif
 
-ssize_t nbd_send_request(int csock, struct nbd_request *request)
+ssize_t nbd_send_request(QIOChannel *ioc, struct nbd_request *request)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     ssize_t ret;
@@ -308,7 +308,7 @@ ssize_t nbd_send_request(int csock, struct nbd_request 
*request)
           "{ .from = %" PRIu64", .len = %u, .handle = %" PRIu64", .type=%i}",
           request->from, request->len, request->handle, request->type);
 
-    ret = write_sync(csock, buf, sizeof(buf));
+    ret = write_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
@@ -320,13 +320,13 @@ ssize_t nbd_send_request(int csock, struct nbd_request 
*request)
     return 0;
 }
 
-ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply)
+ssize_t nbd_receive_reply(QIOChannel *ioc, struct nbd_reply *reply)
 {
     uint8_t buf[NBD_REPLY_SIZE];
     uint32_t magic;
     ssize_t ret;
 
-    ret = read_sync(csock, buf, sizeof(buf));
+    ret = read_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
diff --git a/nbd/common.c b/nbd/common.c
index 7b089b0..9f89372 100644
--- a/nbd/common.c
+++ b/nbd/common.c
@@ -18,47 +18,107 @@
 
 #include "nbd-internal.h"
 
-ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
+/**
+ * @skip: number of bytes to advance head of @iov
+ * @iov: pointer to iov array, updated on success
+ * @niov: number of elements in @iov, updated on success
+ * @oldiov: pointer to a single element to hold old info from @iov
+ *
+ * This will update @iov so that its head is advanced
+ * by @skip bytes. To do this, zero or more complete
+ * elements of @iov will be skipped over. The new head
+ * of @iov will then have its base & len updated to
+ * skip the remaining number of bytes. @oldiov will
+ * hold the original data from the new head of @iov.
+ */
+static void nbd_skip_iov(size_t skip,
+                         struct iovec **iov,
+                         int *niov,
+                         struct iovec *oldiov)
 {
-    size_t offset = 0;
-    int err;
-
-    if (qemu_in_coroutine()) {
-        if (do_read) {
-            return qemu_co_recv(fd, buffer, size);
+    ssize_t done = 0;
+    size_t i;
+    for (i = 0; i < *niov; i++) {
+        if ((*iov)[i].iov_len <= skip) {
+            done += (*iov)[i].iov_len;
+            skip -= (*iov)[i].iov_len;
         } else {
-            return qemu_co_send(fd, buffer, size);
+            done += skip;
+            oldiov->iov_base = (*iov)[i].iov_base;
+            oldiov->iov_len = (*iov)[i].iov_len;
+            (*iov)[i].iov_len -= skip;
+            (*iov)[i].iov_base += skip;
+            *iov = *iov + i;
+            *niov = *niov - i;
+            break;
         }
     }
+}
+
+ssize_t nbd_wr_syncv(QIOChannel *ioc,
+                     struct iovec *iov,
+                     size_t niov,
+                     size_t offset,
+                     size_t length,
+                     bool do_read)
+{
+    ssize_t done = 0;
+    size_t iovlen = iov_size(iov, niov);
+    struct iovec oldiov = { NULL, 0 };
+    Error *local_err = NULL;
+
+    /* Shouldn't happen but just to be sure */
+    if (offset > iovlen) {
+        return -EINVAL;
+    }
+    iovlen -= offset;
+    if (length > iovlen) {
+        return -EINVAL;
+    }
 
-    while (offset < size) {
+    while (done < length) {
         ssize_t len;
+        struct iovec *cur = iov;
+        int curcnt = niov;
+
+        nbd_skip_iov(offset + done, &cur, &curcnt, &oldiov);
 
         if (do_read) {
-            len = qemu_recv(fd, buffer + offset, size - offset, 0);
+            len = qio_channel_readv(ioc, cur, curcnt, &local_err);
         } else {
-            len = send(fd, buffer + offset, size - offset, 0);
+            len = qio_channel_writev(ioc, cur, curcnt, &local_err);
         }
-
-        if (len < 0) {
-            err = socket_error();
-
-            /* recoverable error */
-            if (err == EINTR || (offset > 0 && (err == EAGAIN || err == 
EWOULDBLOCK))) {
-                continue;
+        if (oldiov.iov_base) {
+            /* Restore original caller's info in @iov */
+            cur[0].iov_base = oldiov.iov_base;
+            cur[0].iov_len = oldiov.iov_len;
+            oldiov.iov_base = NULL;
+            oldiov.iov_len = 0;
+        }
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            if (qemu_in_coroutine()) {
+                /* XXX see about using qio_channel_yield() in
+                 * future if we can use normal watches instead
+                 * of the AIO handlers */
+                qemu_coroutine_yield();
+            } else {
+                qio_channel_wait(ioc,
+                                 do_read ? G_IO_IN : G_IO_OUT);
             }
-
-            /* unrecoverable error */
-            return -err;
+            continue;
+        }
+        if (len < 0) {
+            TRACE("I/O error: %s", error_get_pretty(local_err));
+            error_free(local_err);
+            /* XXX handle Error objects */
+            return -EIO;
         }
 
-        /* eof */
-        if (len == 0) {
+        if (do_read && len == 0) {
             break;
         }
 
-        offset += len;
+        done += len;
     }
-
-    return offset;
+    return done;
 }
diff --git a/nbd/nbd-internal.h b/nbd/nbd-internal.h
index c0a6575..9960034 100644
--- a/nbd/nbd-internal.h
+++ b/nbd/nbd-internal.h
@@ -13,6 +13,7 @@
 #include "sysemu/block-backend.h"
 
 #include "qemu/coroutine.h"
+#include "qemu/iov.h"
 
 #include <errno.h>
 #include <string.h>
@@ -29,7 +30,6 @@
 #include <linux/fs.h>
 #endif
 
-#include "qemu/sockets.h"
 #include "qemu/queue.h"
 #include "qemu/main-loop.h"
 
@@ -90,24 +90,22 @@
 #define NBD_EINVAL     22
 #define NBD_ENOSPC     28
 
-static inline ssize_t read_sync(int fd, void *buffer, size_t size)
+static inline ssize_t read_sync(QIOChannel *ioc, void *buffer, size_t size)
 {
+    struct iovec iov = { .iov_base = buffer, .iov_len = size };
     /* Sockets are kept in blocking mode in the negotiation phase.  After
      * that, a non-readable socket simply means that another thread stole
      * our request/reply.  Synchronization is done with recv_coroutine, so
      * that this is coroutine-safe.
      */
-    return nbd_wr_sync(fd, buffer, size, true);
+    return nbd_wr_syncv(ioc, &iov, 1, 0, size, true);
 }
 
-static inline ssize_t write_sync(int fd, void *buffer, size_t size)
+static inline ssize_t write_sync(QIOChannel *ioc, void *buffer, size_t size)
 {
-    int ret;
-    do {
-        /* For writes, we do expect the socket to be writable.  */
-        ret = nbd_wr_sync(fd, buffer, size, false);
-    } while (ret == -EAGAIN);
-    return ret;
+    struct iovec iov = { .iov_base = buffer, .iov_len = size };
+
+    return nbd_wr_syncv(ioc, &iov, 1, 0, size, false);
 }
 
 #endif
diff --git a/nbd/server.c b/nbd/server.c
index c29ba5f..929db9c 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -73,7 +73,8 @@ struct NBDClient {
     void (*close)(NBDClient *client);
 
     NBDExport *exp;
-    int sock;
+    QIOChannelSocket *sioc; /* The underlying data channel */
+    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
 
     Coroutine *recv_coroutine;
 
@@ -93,45 +94,56 @@ static void nbd_set_handlers(NBDClient *client);
 static void nbd_unset_handlers(NBDClient *client);
 static void nbd_update_can_read(NBDClient *client);
 
-static void nbd_negotiate_continue(void *opaque)
+static gboolean nbd_negotiate_continue(QIOChannel *ioc,
+                                       GIOCondition condition,
+                                       void *opaque)
 {
     qemu_coroutine_enter(opaque, NULL);
+    return TRUE;
 }
 
-static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
+static ssize_t nbd_negotiate_read(QIOChannel *ioc, void *buffer, size_t size)
 {
     ssize_t ret;
+    guint watch;
 
     assert(qemu_in_coroutine());
     /* Negotiation are always in main loop. */
-    qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
-                        qemu_coroutine_self());
-    ret = read_sync(fd, buffer, size);
-    qemu_set_fd_handler(fd, NULL, NULL, NULL);
+    watch = qio_channel_add_watch(ioc,
+                                  G_IO_IN,
+                                  nbd_negotiate_continue,
+                                  qemu_coroutine_self(),
+                                  NULL);
+    ret = read_sync(ioc, buffer, size);
+    g_source_remove(watch);
     return ret;
 
 }
 
-static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
+static ssize_t nbd_negotiate_write(QIOChannel *ioc, void *buffer, size_t size)
 {
     ssize_t ret;
+    guint watch;
 
     assert(qemu_in_coroutine());
     /* Negotiation are always in main loop. */
-    qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
-                        qemu_coroutine_self());
-    ret = write_sync(fd, buffer, size);
-    qemu_set_fd_handler(fd, NULL, NULL, NULL);
+    watch = qio_channel_add_watch(ioc,
+                                  G_IO_OUT,
+                                  nbd_negotiate_continue,
+                                  qemu_coroutine_self(),
+                                  NULL);
+    ret = write_sync(ioc, buffer, size);
+    g_source_remove(watch);
     return ret;
 }
 
-static ssize_t nbd_negotiate_drop_sync(int fd, size_t size)
+static ssize_t nbd_negotiate_drop_sync(QIOChannel *ioc, size_t size)
 {
     ssize_t ret, dropped = size;
     uint8_t *buffer = g_malloc(MIN(65536, size));
 
     while (size > 0) {
-        ret = nbd_negotiate_read(fd, buffer, MIN(65536, size));
+        ret = nbd_negotiate_read(ioc, buffer, MIN(65536, size));
         if (ret < 0) {
             g_free(buffer);
             return ret;
@@ -172,66 +184,66 @@ static ssize_t nbd_negotiate_drop_sync(int fd, size_t 
size)
 
 */
 
-static int nbd_negotiate_send_rep(int csock, uint32_t type, uint32_t opt)
+static int nbd_negotiate_send_rep(QIOChannel *ioc, uint32_t type, uint32_t opt)
 {
     uint64_t magic;
     uint32_t len;
 
     magic = cpu_to_be64(NBD_REP_MAGIC);
-    if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
         LOG("write failed (rep magic)");
         return -EINVAL;
     }
     opt = cpu_to_be32(opt);
-    if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+    if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
         LOG("write failed (rep opt)");
         return -EINVAL;
     }
     type = cpu_to_be32(type);
-    if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
+    if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) {
         LOG("write failed (rep type)");
         return -EINVAL;
     }
     len = cpu_to_be32(0);
-    if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (rep data length)");
         return -EINVAL;
     }
     return 0;
 }
 
-static int nbd_negotiate_send_rep_list(int csock, NBDExport *exp)
+static int nbd_negotiate_send_rep_list(QIOChannel *ioc, NBDExport *exp)
 {
     uint64_t magic, name_len;
     uint32_t opt, type, len;
 
     name_len = strlen(exp->name);
     magic = cpu_to_be64(NBD_REP_MAGIC);
-    if (nbd_negotiate_write(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+    if (nbd_negotiate_write(ioc, &magic, sizeof(magic)) != sizeof(magic)) {
         LOG("write failed (magic)");
         return -EINVAL;
      }
     opt = cpu_to_be32(NBD_OPT_LIST);
-    if (nbd_negotiate_write(csock, &opt, sizeof(opt)) != sizeof(opt)) {
+    if (nbd_negotiate_write(ioc, &opt, sizeof(opt)) != sizeof(opt)) {
         LOG("write failed (opt)");
         return -EINVAL;
     }
     type = cpu_to_be32(NBD_REP_SERVER);
-    if (nbd_negotiate_write(csock, &type, sizeof(type)) != sizeof(type)) {
+    if (nbd_negotiate_write(ioc, &type, sizeof(type)) != sizeof(type)) {
         LOG("write failed (reply type)");
         return -EINVAL;
     }
     len = cpu_to_be32(name_len + sizeof(len));
-    if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (length)");
         return -EINVAL;
     }
     len = cpu_to_be32(name_len);
-    if (nbd_negotiate_write(csock, &len, sizeof(len)) != sizeof(len)) {
+    if (nbd_negotiate_write(ioc, &len, sizeof(len)) != sizeof(len)) {
         LOG("write failed (length)");
         return -EINVAL;
     }
-    if (nbd_negotiate_write(csock, exp->name, name_len) != name_len) {
+    if (nbd_negotiate_write(ioc, exp->name, name_len) != name_len) {
         LOG("write failed (buffer)");
         return -EINVAL;
     }
@@ -240,30 +252,29 @@ static int nbd_negotiate_send_rep_list(int csock, 
NBDExport *exp)
 
 static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length)
 {
-    int csock;
     NBDExport *exp;
 
-    csock = client->sock;
     if (length) {
-        if (nbd_negotiate_drop_sync(csock, length) != length) {
+        if (nbd_negotiate_drop_sync(client->ioc, length) != length) {
             return -EIO;
         }
-        return nbd_negotiate_send_rep(csock, NBD_REP_ERR_INVALID, 
NBD_OPT_LIST);
+        return nbd_negotiate_send_rep(client->ioc,
+                                      NBD_REP_ERR_INVALID, NBD_OPT_LIST);
     }
 
     /* For each export, send a NBD_REP_SERVER reply. */
     QTAILQ_FOREACH(exp, &exports, next) {
-        if (nbd_negotiate_send_rep_list(csock, exp)) {
+        if (nbd_negotiate_send_rep_list(client->ioc, exp)) {
             return -EINVAL;
         }
     }
     /* Finish with a NBD_REP_ACK. */
-    return nbd_negotiate_send_rep(csock, NBD_REP_ACK, NBD_OPT_LIST);
+    return nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK, NBD_OPT_LIST);
 }
 
 static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length)
 {
-    int rc = -EINVAL, csock = client->sock;
+    int rc = -EINVAL;
     char name[256];
 
     /* Client sends:
@@ -274,7 +285,7 @@ static int nbd_negotiate_handle_export_name(NBDClient 
*client, uint32_t length)
         LOG("Bad length received");
         goto fail;
     }
-    if (nbd_negotiate_read(csock, name, length) != length) {
+    if (nbd_negotiate_read(client->ioc, name, length) != length) {
         LOG("read failed");
         goto fail;
     }
@@ -295,7 +306,6 @@ fail:
 
 static int nbd_negotiate_options(NBDClient *client)
 {
-    int csock = client->sock;
     uint32_t flags;
 
     /* Client sends:
@@ -312,7 +322,8 @@ static int nbd_negotiate_options(NBDClient *client)
         ...           Rest of request
     */
 
-    if (nbd_negotiate_read(csock, &flags, sizeof(flags)) != sizeof(flags)) {
+    if (nbd_negotiate_read(client->ioc, &flags, sizeof(flags)) !=
+        sizeof(flags)) {
         LOG("read failed");
         return -EIO;
     }
@@ -328,7 +339,8 @@ static int nbd_negotiate_options(NBDClient *client)
         uint32_t tmp, length;
         uint64_t magic;
 
-        if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) 
{
+        if (nbd_negotiate_read(client->ioc, &magic, sizeof(magic)) !=
+            sizeof(magic)) {
             LOG("read failed");
             return -EINVAL;
         }
@@ -338,13 +350,13 @@ static int nbd_negotiate_options(NBDClient *client)
             return -EINVAL;
         }
 
-        if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+        if (nbd_negotiate_read(client->ioc, &tmp, sizeof(tmp)) != sizeof(tmp)) 
{
             LOG("read failed");
             return -EINVAL;
         }
 
-        if (nbd_negotiate_read(csock, &length,
-                               sizeof(length)) != sizeof(length)) {
+        if (nbd_negotiate_read(client->ioc, &length, sizeof(length)) !=
+            sizeof(length)) {
             LOG("read failed");
             return -EINVAL;
         }
@@ -368,7 +380,7 @@ static int nbd_negotiate_options(NBDClient *client)
         default:
             tmp = be32_to_cpu(tmp);
             LOG("Unsupported option 0x%x", tmp);
-            nbd_negotiate_send_rep(client->sock, NBD_REP_ERR_UNSUP, tmp);
+            nbd_negotiate_send_rep(client->ioc, NBD_REP_ERR_UNSUP, tmp);
             return -EINVAL;
         }
     }
@@ -382,7 +394,6 @@ typedef struct {
 static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
 {
     NBDClient *client = data->client;
-    int csock = client->sock;
     char buf[8 + 8 + 8 + 128];
     int rc;
     const int myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
@@ -407,6 +418,7 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData 
*data)
         [28 .. 151]   reserved     (0)
      */
 
+    qio_channel_set_blocking(client->ioc, false, NULL);
     rc = -EINVAL;
 
     TRACE("Beginning negotiation.");
@@ -423,12 +435,12 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData 
*data)
     }
 
     if (client->exp) {
-        if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
+        if (nbd_negotiate_write(client->ioc, buf, sizeof(buf)) != sizeof(buf)) 
{
             LOG("write failed");
             goto fail;
         }
     } else {
-        if (nbd_negotiate_write(csock, buf, 18) != 18) {
+        if (nbd_negotiate_write(client->ioc, buf, 18) != 18) {
             LOG("write failed");
             goto fail;
         }
@@ -441,8 +453,8 @@ static coroutine_fn int nbd_negotiate(NBDClientNewData 
*data)
         assert ((client->exp->nbdflags & ~65535) == 0);
         cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
-        if (nbd_negotiate_write(csock, buf + 18,
-                                sizeof(buf) - 18) != sizeof(buf) - 18) {
+        if (nbd_negotiate_write(client->ioc, buf + 18, sizeof(buf) - 18) !=
+            sizeof(buf) - 18) {
             LOG("write failed");
             goto fail;
         }
@@ -472,13 +484,13 @@ int nbd_disconnect(int fd)
 }
 #endif
 
-static ssize_t nbd_receive_request(int csock, struct nbd_request *request)
+static ssize_t nbd_receive_request(QIOChannel *ioc, struct nbd_request 
*request)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     uint32_t magic;
     ssize_t ret;
 
-    ret = read_sync(csock, buf, sizeof(buf));
+    ret = read_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
@@ -513,7 +525,7 @@ static ssize_t nbd_receive_request(int csock, struct 
nbd_request *request)
     return 0;
 }
 
-static ssize_t nbd_send_reply(int csock, struct nbd_reply *reply)
+static ssize_t nbd_send_reply(QIOChannel *ioc, struct nbd_reply *reply)
 {
     uint8_t buf[NBD_REPLY_SIZE];
     ssize_t ret;
@@ -531,7 +543,7 @@ static ssize_t nbd_send_reply(int csock, struct nbd_reply 
*reply)
 
     TRACE("Sending response to client");
 
-    ret = write_sync(csock, buf, sizeof(buf));
+    ret = write_sync(ioc, buf, sizeof(buf));
     if (ret < 0) {
         return ret;
     }
@@ -559,8 +571,8 @@ void nbd_client_put(NBDClient *client)
         assert(client->closing);
 
         nbd_unset_handlers(client);
-        close(client->sock);
-        client->sock = -1;
+        object_unref(OBJECT(client->sioc));
+        object_unref(OBJECT(client->ioc));
         if (client->exp) {
             QTAILQ_REMOVE(&client->exp->clients, client, next);
             nbd_export_put(client->exp);
@@ -580,7 +592,8 @@ static void client_close(NBDClient *client)
     /* Force requests to finish.  They will drop their own references,
      * then we'll close the socket and free the NBDClient.
      */
-    shutdown(client->sock, 2);
+    qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
+                         NULL);
 
     /* Also tell the client, so that they release their reference.  */
     if (client->close) {
@@ -773,25 +786,25 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct 
nbd_reply *reply,
                                  int len)
 {
     NBDClient *client = req->client;
-    int csock = client->sock;
     ssize_t rc, ret;
 
+    g_assert(qemu_in_coroutine());
     qemu_co_mutex_lock(&client->send_lock);
     client->send_coroutine = qemu_coroutine_self();
     nbd_set_handlers(client);
 
     if (!len) {
-        rc = nbd_send_reply(csock, reply);
+        rc = nbd_send_reply(client->ioc, reply);
     } else {
-        socket_set_cork(csock, 1);
-        rc = nbd_send_reply(csock, reply);
+        qio_channel_set_cork(client->ioc, true);
+        rc = nbd_send_reply(client->ioc, reply);
         if (rc >= 0) {
-            ret = qemu_co_send(csock, req->data, len);
+            ret = write_sync(client->ioc, req->data, len);
             if (ret != len) {
                 rc = -EIO;
             }
         }
-        socket_set_cork(csock, 0);
+        qio_channel_set_cork(client->ioc, false);
     }
 
     client->send_coroutine = NULL;
@@ -803,14 +816,14 @@ static ssize_t nbd_co_send_reply(NBDRequest *req, struct nbd_reply *reply,
 static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *request)
 {
     NBDClient *client = req->client;
-    int csock = client->sock;
     uint32_t command;
     ssize_t rc;
 
+    g_assert(qemu_in_coroutine());
     client->recv_coroutine = qemu_coroutine_self();
     nbd_update_can_read(client);
 
-    rc = nbd_receive_request(csock, request);
+    rc = nbd_receive_request(client->ioc, request);
     if (rc < 0) {
         if (rc != -EAGAIN) {
             rc = -EIO;
@@ -845,7 +858,7 @@ static ssize_t nbd_co_receive_request(NBDRequest *req, struct nbd_request *reque
     if (command == NBD_CMD_WRITE) {
         TRACE("Reading %u byte(s)", request->len);
 
-        if (qemu_co_recv(csock, req->data, request->len) != request->len) {
+        if (read_sync(client->ioc, req->data, request->len) != request->len) {
             LOG("reading from socket failed");
             rc = -EIO;
             goto out;
@@ -1040,7 +1053,7 @@ static void nbd_restart_write(void *opaque)
 static void nbd_set_handlers(NBDClient *client)
 {
     if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sock,
+        aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
                            true,
                            client->can_read ? nbd_read : NULL,
                            client->send_coroutine ? nbd_restart_write : NULL,
@@ -1051,7 +1064,7 @@ static void nbd_set_handlers(NBDClient *client)
 static void nbd_unset_handlers(NBDClient *client)
 {
     if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sock,
+        aio_set_fd_handler(client->exp->ctx, client->sioc->fd,
                            true, NULL, NULL, NULL);
     }
 }
@@ -1093,7 +1106,9 @@ out:
     g_free(data);
 }
 
-void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
+void nbd_client_new(NBDExport *exp,
+                    QIOChannelSocket *sioc,
+                    void (*close_fn)(NBDClient *))
 {
     NBDClient *client;
     NBDClientNewData *data = g_new(NBDClientNewData, 1);
@@ -1101,7 +1116,10 @@ void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
     client = g_malloc0(sizeof(NBDClient));
     client->refcount = 1;
     client->exp = exp;
-    client->sock = csock;
+    client->sioc = sioc;
+    object_ref(OBJECT(client->sioc));
+    client->ioc = QIO_CHANNEL(sioc);
+    object_ref(OBJECT(client->ioc));
     client->can_read = true;
     client->close = close_fn;
 
diff --git a/qemu-nbd.c b/qemu-nbd.c
index 1c9e3ea..2346e99 100644
--- a/qemu-nbd.c
+++ b/qemu-nbd.c
@@ -252,7 +252,7 @@ static void *nbd_client_thread(void *arg)
         goto out;
     }
 
-    ret = nbd_receive_negotiate(sioc->fd, NULL, &nbdflags,
+    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), NULL, &nbdflags,
                                 &size, &local_error);
     if (ret < 0) {
         if (local_error) {
@@ -268,7 +268,7 @@ static void *nbd_client_thread(void *arg)
         goto out_socket;
     }
 
-    ret = nbd_init(fd, sioc->fd, nbdflags, size);
+    ret = nbd_init(fd, sioc, nbdflags, size);
     if (ret < 0) {
         goto out_fd;
     }
@@ -328,7 +328,6 @@ static void nbd_client_closed(NBDClient *client)
 static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
 {
     QIOChannelSocket *cioc;
-    int fd;
 
     cioc = qio_channel_socket_accept(QIO_CHANNEL_SOCKET(ioc),
                                      NULL);
@@ -343,10 +342,7 @@ static gboolean nbd_accept(QIOChannel *ioc, GIOCondition cond, gpointer opaque)
 
     nb_fds++;
     nbd_update_server_watch();
-    fd = dup(cioc->fd);
-    if (fd >= 0) {
-        nbd_client_new(exp, fd, nbd_client_closed);
-    }
+    nbd_client_new(exp, cioc, nbd_client_closed);
     object_unref(OBJECT(cioc));
 
     return TRUE;
-- 
2.5.0


Reply via email to