From: Or Goshen <ober...@gmail.com>

---
 aio-win32.c         | 285 ++++++++++++++++++++++++++++++++++++++++++---------
 block/Makefile.objs |   4 +-
 block/nbd-client.h  |   2 +-
 include/block/aio.h |   2 --
 include/block/nbd.h |   2 +-
 main-loop.c         |   2 --
 nbd.c               |   4 +-
 qemu-coroutine-io.c |   4 +-
 8 files changed, 245 insertions(+), 60 deletions(-)

diff --git a/aio-win32.c b/aio-win32.c
index 23f4e5b..7f716b1 100644
--- a/aio-win32.c
+++ b/aio-win32.c
@@ -22,12 +22,84 @@
 
 struct AioHandler {
     EventNotifier *e;
+    IOHandler *io_read;     /* socket nodes: run on G_IO_IN / FD_READ */
+    IOHandler *io_write;    /* socket nodes: run on G_IO_OUT / FD_WRITE */
     EventNotifierHandler *io_notify;
     GPollFD pfd;
     int deleted;
+    void *opaque;           /* argument passed to io_read/io_write */
     QLIST_ENTRY(AioHandler) node;
 };
 
+/*
+ * Register, update or (when both handlers are NULL) remove the read/write
+ * callbacks of socket @fd.  A registered socket owns one WSA event, kept
+ * in node->e, which aio_poll() adds to the handles it waits on.
+ */
+void aio_set_fd_handler(AioContext *ctx,
+                        int fd,
+                        IOHandler *io_read,
+                        IOHandler *io_write,
+                        void *opaque)
+{
+    /* fd is a SOCKET in our case */
+    AioHandler *node;
+
+    QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+        if (node->pfd.fd == fd && !node->deleted) {
+            break;
+        }
+    }
+
+    /* Are we deleting the fd handler? */
+    if (!io_read && !io_write) {
+        if (node) {
+            /* If the lock is held, just mark the node as deleted */
+            if (ctx->walking_handlers) {
+                node->deleted = 1;
+                node->pfd.revents = 0;
+            } else {
+                /* Otherwise, delete it for real.  We can't just mark it as
+                 * deleted because deleted nodes are only cleaned up after
+                 * releasing the walking_handlers lock.
+                 */
+                QLIST_REMOVE(node, node);
+                WSACloseEvent((HANDLE)node->e);
+                g_free(node);
+            }
+        }
+    } else {
+        long network_events = 0;
+
+        if (node == NULL) {
+            /* Alloc and insert if it's not already there */
+            node = g_malloc0(sizeof(AioHandler));
+            node->pfd.fd = fd;
+            /* One WSA event per socket, created once and reused on update */
+            node->e = (EventNotifier *)WSACreateEvent();
+            QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node);
+        }
+
+        /* Select on the handlers being installed, not the previous ones */
+        if (io_read) {
+            network_events |= FD_READ;
+        }
+        if (io_write) {
+            network_events |= FD_WRITE;
+        }
+        WSAEventSelect(node->pfd.fd, (HANDLE)node->e, network_events);
+
+        /* Update handler with latest information */
+        node->pfd.events = (io_read != NULL ? G_IO_IN : 0);
+        node->pfd.events |= (io_write != NULL ? G_IO_OUT : 0);
+        node->opaque = opaque;
+        node->io_read = io_read;
+        node->io_write = io_write;
+    }
+
+    aio_notify(ctx);
+}
+
 void aio_set_event_notifier(AioContext *ctx,
                             EventNotifier *e,
                             EventNotifierHandler *io_notify)
@@ -81,14 +153,96 @@ bool aio_pending(AioContext *ctx)
     AioHandler *node;
 
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+        int revents = node->pfd.revents & node->pfd.events;
+
+        /* Event notifier with a pending event? */
         if (node->pfd.revents && node->io_notify) {
             return true;
         }
+
+        /* Socket with a pending event one of its handlers wants? */
+        if ((revents & G_IO_IN) && node->io_read) {
+            return true;
+        }
+        if ((revents & G_IO_OUT) && node->io_write) {
+            return true;
+        }
     }
 
     return false;
 }
 
+/*
+ * Run the callbacks of every live handler that has pending events, then
+ * run the context's timers.  Returns true if progress was made; servicing
+ * the context's own notifier does not count as progress.
+ */
+static bool aio_dispatch(AioContext *ctx)
+{
+    AioHandler *node;
+    bool progress = false;
+
+    /*
+     * We have to walk very carefully in case qemu_aio_set_fd_handler is
+     * called while we're walking.
+     */
+    node = QLIST_FIRST(&ctx->aio_handlers);
+    while (node) {
+        AioHandler *tmp = node;
+        int revents;
+
+        ctx->walking_handlers++;
+
+        if (!node->deleted) {
+            /* Event notifier */
+            if (node->pfd.revents && node->io_notify) {
+                node->pfd.revents = 0;
+                node->io_notify(node->e);
+
+                /* aio_notify() does not count as progress */
+                if (node->e != &ctx->notifier) {
+                    progress = true;
+                }
+            }
+
+            /* Socket */
+            revents = node->pfd.revents & node->pfd.events;
+            node->pfd.revents = 0;
+
+            if ((revents & G_IO_IN) && node->io_read) {
+                node->io_read(node->opaque);
+
+                /* aio_notify() does not count as progress */
+                if (node->opaque != &ctx->notifier) {
+                    progress = true;
+                }
+            }
+            if (!node->deleted && (revents & G_IO_OUT) && node->io_write) {
+                node->io_write(node->opaque);
+                progress = true;
+            }
+        }
+
+        node = QLIST_NEXT(node, node);
+
+        ctx->walking_handlers--;
+
+        if (!ctx->walking_handlers && tmp->deleted) {
+            QLIST_REMOVE(tmp, node);
+            if (tmp->io_read || tmp->io_write) {
+                /* Socket node: release the WSA event it owns */
+                WSACloseEvent((HANDLE)tmp->e);
+            }
+            g_free(tmp);
+        }
+    }
+
+    /* Run our timers */
+    progress |= timerlistgroup_run_timers(&ctx->tlg);
+
+    return progress;
+}
+
 bool aio_poll(AioContext *ctx, bool blocking)
 {
     AioHandler *node;
@@ -96,6 +250,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
     bool progress;
     int count;
     int timeout;
+    fd_set rfds, wfds;
+    struct timeval tv0 = { .tv_sec = 0, .tv_usec = 0 };
 
     progress = false;
 
@@ -109,41 +265,8 @@ bool aio_poll(AioContext *ctx, bool blocking)
         progress = true;
     }
 
-    /* Run timers */
-    progress |= timerlistgroup_run_timers(&ctx->tlg);
-
-    /*
-     * Then dispatch any pending callbacks from the GSource.
-     *
-     * We have to walk very carefully in case qemu_aio_set_fd_handler is
-     * called while we're walking.
-     */
-    node = QLIST_FIRST(&ctx->aio_handlers);
-    while (node) {
-        AioHandler *tmp;
-
-        ctx->walking_handlers++;
-
-        if (node->pfd.revents && node->io_notify) {
-            node->pfd.revents = 0;
-            node->io_notify(node->e);
-
-            /* aio_notify() does not count as progress */
-            if (node->e != &ctx->notifier) {
-                progress = true;
-            }
-        }
-
-        tmp = node;
-        node = QLIST_NEXT(node, node);
-
-        ctx->walking_handlers--;
-
-        if (!ctx->walking_handlers && tmp->deleted) {
-            QLIST_REMOVE(tmp, node);
-            g_free(tmp);
-        }
-    }
+    /* Accumulate, so progress already recorded above is not lost */
+    progress |= aio_dispatch(ctx);
 
     if (progress && !blocking) {
         return true;
@@ -151,12 +274,51 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
     ctx->walking_handlers++;
 
-    /* fill fd sets */
+    /* Fill the wait array and the fd sets */
+    FD_ZERO(&rfds);
+    FD_ZERO(&wfds);
     count = 0;
     QLIST_FOREACH(node, &ctx->aio_handlers, node) {
-        if (!node->deleted && node->io_notify) {
+        if (node->deleted) {
+            continue;
+        }
+
+        if (node->io_notify) {
+            /* Event notifier: wait on its own handle */
             events[count++] = event_notifier_get_handle(node->e);
-        }
+        } else if (node->io_read || node->io_write) {
+            /* Socket: poll it below and wait on its WSA event */
+            if (node->io_read) {
+                FD_SET((SOCKET)node->pfd.fd, &rfds);
+            }
+            if (node->io_write) {
+                FD_SET((SOCKET)node->pfd.fd, &wfds);
+            }
+
+            events[count++] = (HANDLE)node->e;
+        }
+    }
+
+    /* Zero-timeout poll: flag sockets that are already readable/writable */
+    if (select(0, &rfds, &wfds, NULL, &tv0) > 0) {
+        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+            /* Only look at nodes that were put in the fd sets above */
+            if (node->deleted || node->io_notify ||
+                !(node->io_read || node->io_write)) {
+                continue;
+            }
+
+            node->pfd.revents = 0;
+            if (FD_ISSET(node->pfd.fd, &rfds)) {
+                node->pfd.revents |= G_IO_IN;
+                blocking = false;
+            }
+
+            if (FD_ISSET(node->pfd.fd, &wfds)) {
+                node->pfd.revents |= G_IO_OUT;
+                blocking = false;
+            }
+        }
     }
 
     ctx->walking_handlers--;
@@ -184,17 +346,46 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
             ctx->walking_handlers++;
 
-            if (!node->deleted &&
-                event_notifier_get_handle(node->e) == events[ret - WAIT_OBJECT_0] &&
-                node->io_notify) {
+            /*
+             * Event notifier?  Check io_notify first: a socket node keeps
+             * a WSA event handle in node->e rather than an EventNotifier
+             * pointer, so it must not reach event_notifier_get_handle().
+             */
+            if (!node->deleted && node->io_notify &&
+                event_notifier_get_handle(node->e) == events[ret - WAIT_OBJECT_0]) {
                 node->io_notify(node->e);
 
                 /* aio_notify() does not count as progress */
                 if (node->e != &ctx->notifier) {
                     progress = true;
                 }
             }
 
+            /* Socket whose WSA event was signalled? */
+            if (!node->deleted &&
+                (HANDLE)node->e == events[ret - WAIT_OBJECT_0]) {
+                WSANETWORKEVENTS ev;
+
+                /* Fetch (and reset) the pending network events; dispatch
+                 * nothing if the query fails.
+                 */
+                if (WSAEnumNetworkEvents(node->pfd.fd, (HANDLE)node->e,
+                                         &ev) != 0) {
+                    ev.lNetworkEvents = 0;
+                }
+
+                if ((ev.lNetworkEvents & FD_READ) != 0 && node->io_read) {
+                    node->io_read(node->opaque);
+                    progress = true;
+                }
+
+                if (!node->deleted &&
+                    (ev.lNetworkEvents & FD_WRITE) != 0 && node->io_write) {
+                    node->io_write(node->opaque);
+                    progress = true;
+                }
+            }
+
             tmp = node;
             node = QLIST_NEXT(node, node);
 
@@ -210,14 +401,10 @@ bool aio_poll(AioContext *ctx, bool blocking)
         events[ret - WAIT_OBJECT_0] = events[--count];
     }
 
-    if (blocking) {
-        /* Run the timers a second time. We do this because otherwise aio_wait
-         * will not note progress - and will stop a drain early - if we have
-         * a timer that was not ready to run entering g_poll but is ready
-         * after g_poll. This will only do anything if a timer has expired.
-         */
-        progress |= timerlistgroup_run_timers(&ctx->tlg);
-    }
+    /* Run dispatch even if there were no readable fds to run timers */
+    if (aio_dispatch(ctx)) {
+        progress = true;
+    }
 
     return progress;
 }
diff --git a/block/Makefile.objs b/block/Makefile.objs
index 4e8c91e..e28f916 100644
--- a/block/Makefile.objs
+++ b/block/Makefile.objs
@@ -1,4 +1,4 @@
-block-obj-y += raw_bsd.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
+block-obj-y += raw_bsd.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o nbd.o nbd-client.o
 block-obj-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o qcow2-cache.o
 block-obj-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-obj-y += qed-check.o
@@ -10,7 +10,7 @@ block-obj-$(CONFIG_POSIX) += raw-posix.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 
 ifeq ($(CONFIG_POSIX),y)
-block-obj-y += nbd.o nbd-client.o sheepdog.o
+block-obj-y += sheepdog.o
 block-obj-$(CONFIG_LIBISCSI) += iscsi.o
 block-obj-$(CONFIG_CURL) += curl.o
 block-obj-$(CONFIG_RBD) += rbd.o
diff --git a/block/nbd-client.h b/block/nbd-client.h
index f2a6337..d02acc1 100644
--- a/block/nbd-client.h
+++ b/block/nbd-client.h
@@ -19,7 +19,7 @@
 typedef struct NbdClientSession {
     int sock;
     uint32_t nbdflags;
-    off_t size;
+    uint64_t size;
     size_t blocksize;
 
     CoMutex send_mutex;
diff --git a/include/block/aio.h b/include/block/aio.h
index 2efdf41..effc8c2 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -199,7 +199,6 @@ bool aio_pending(AioContext *ctx);
  */
 bool aio_poll(AioContext *ctx, bool blocking);
 
-#ifdef CONFIG_POSIX
 /* Register a file descriptor and associated callbacks.  Behaves very similarly
  * to qemu_set_fd_handler2.  Unlike qemu_set_fd_handler2, these callbacks will
  * be invoked when using qemu_aio_wait().
@@ -212,7 +211,6 @@ void aio_set_fd_handler(AioContext *ctx,
                         IOHandler *io_read,
                         IOHandler *io_write,
                         void *opaque);
-#endif
 
 /* Register an event notifier and associated callbacks.  Behaves very similarly
  * to event_notifier_set_handler.  Unlike event_notifier_set_handler, these callbacks
diff --git a/include/block/nbd.h b/include/block/nbd.h
index c90f5e4..7a84882 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -69,7 +69,7 @@ int unix_socket_outgoing(const char *path);
 int unix_socket_incoming(const char *path);
 
 int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
-                          off_t *size, size_t *blocksize);
+                          uint64_t *size, size_t *blocksize);
 int nbd_init(int fd, int csock, uint32_t flags, off_t size, size_t blocksize);
 ssize_t nbd_send_request(int csock, struct nbd_request *request);
 ssize_t nbd_receive_reply(int csock, struct nbd_reply *reply);
diff --git a/main-loop.c b/main-loop.c
index c3c9c28..0c82193 100644
--- a/main-loop.c
+++ b/main-loop.c
@@ -503,7 +503,6 @@ bool qemu_aio_wait(void)
     return aio_poll(qemu_aio_context, true);
 }
 
-#ifdef CONFIG_POSIX
 void qemu_aio_set_fd_handler(int fd,
                              IOHandler *io_read,
                              IOHandler *io_write,
@@ -511,7 +510,6 @@ void qemu_aio_set_fd_handler(int fd,
 {
     aio_set_fd_handler(qemu_aio_context, fd, io_read, io_write, opaque);
 }
-#endif
 
 void qemu_aio_set_event_notifier(EventNotifier *notifier,
                                  EventNotifierHandler *io_read)
diff --git a/nbd.c b/nbd.c
index 030f56b..475503d 100644
--- a/nbd.c
+++ b/nbd.c
@@ -149,7 +149,7 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
             err = socket_error();
 
             /* recoverable error */
-            if (err == EINTR || (offset > 0 && err == EAGAIN)) {
+            if (err == EINTR || (offset > 0 && (err == EAGAIN || err == EWOULDBLOCK))) {
                 continue;
             }
 
@@ -434,7 +434,7 @@ fail:
 }
 
 int nbd_receive_negotiate(int csock, const char *name, uint32_t *flags,
-                          off_t *size, size_t *blocksize)
+                          uint64_t *size, size_t *blocksize)
 {
     char buf[256];
     uint64_t magic, s;
diff --git a/qemu-coroutine-io.c b/qemu-coroutine-io.c
index 054ca70..eb89817 100644
--- a/qemu-coroutine-io.c
+++ b/qemu-coroutine-io.c
@@ -34,13 +34,15 @@ qemu_co_sendv_recvv(int sockfd, struct iovec *iov, unsigned iov_cnt,
 {
     size_t done = 0;
     ssize_t ret;
+    int err;
     while (done < bytes) {
         ret = iov_send_recv(sockfd, iov, iov_cnt,
                             offset + done, bytes - done, do_send);
         if (ret > 0) {
             done += ret;
         } else if (ret < 0) {
-            if (errno == EAGAIN) {
+            err = socket_error();
+            if (err == EAGAIN || err == EWOULDBLOCK) {
                 qemu_coroutine_yield();
             } else if (done == 0) {
                 return -1;
-- 
1.7.9