The AioContext event loop uses ppoll(2) or epoll_wait(2) to monitor file
descriptors, blocking until an fd becomes ready or a timer expires.  For
event sources like virtqueues, Linux AIO, and ThreadPool it is also
possible to wait for events by polling (i.e. continuously checking for
events without blocking).
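
As an illustration (not part of this patch), a poll handler for a
virtqueue might look roughly like the sketch below.  The signature
matches the AioPollFn type added by this patch; vq_has_new_buffers() is
a hypothetical helper standing in for the real check of the guest's
avail index.

    /* Hypothetical sketch of an AioPollFn for a virtqueue.  The key
     * point is that the check is a plain read of guest-visible ring
     * state in shared memory, not a syscall.
     */
    static bool virtqueue_poll_cb(void *opaque)
    {
        VirtQueue *vq = opaque;

        return vq_has_new_buffers(vq);  /* hypothetical helper */
    }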

Polling can be faster than blocking syscalls because it bypasses file
descriptors, the process scheduler, and the system call path entirely.

The main disadvantage of polling is that it increases CPU utilization.
In a classic polling configuration a full host CPU thread might run at
100% to respond to events as quickly as possible.  This patch implements
a timeout so that we fall back to blocking syscalls if polling detects
no activity.  After the timeout expires, no CPU cycles are wasted on
polling until the next event loop iteration.
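
The control flow can be sketched as follows (this is an illustration,
not the patch code itself; now_ns(), poll_handlers_made_progress(), and
block_on_fds() are assumed helpers standing in for the real calls in
aio_poll() below):

    /* Spin on the poll handlers until one reports progress or the
     * deadline passes, then fall back to blocking fd monitoring.
     */
    int64_t deadline = now_ns() + aio_poll_max_ns;

    while (!poll_handlers_made_progress(ctx)) {
        if (now_ns() >= deadline) {
            block_on_fds(ctx);  /* ppoll(2)/epoll_wait(2) */
            break;
        }
    }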

This patch implements an experimental polling mode that can be
controlled with the QEMU_AIO_POLL_MAX_NS=<nanoseconds> environment
variable.  When the variable is set to a non-zero value, the aio_poll()
event loop function attempts to poll for up to that many nanoseconds
before falling back to blocking syscalls.  For example,
QEMU_AIO_POLL_MAX_NS=16384 (a value chosen purely for illustration)
allows up to ~16 microseconds of polling per event loop iteration.

The run_poll_handlers_begin() and run_poll_handlers_end() trace events
are added to aid performance analysis and troubleshooting.  To find out
whether polling mode is actually being used, enable these trace events
(e.g. via QEMU's -trace option, in builds with tracing support) and
check for their output.

Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
---
 aio-posix.c         | 104 +++++++++++++++++++++++++++++++++++++++++++++++++++-
 async.c             |  11 +++++-
 include/block/aio.h |   3 ++
 trace-events        |   4 ++
 4 files changed, 120 insertions(+), 2 deletions(-)

diff --git a/aio-posix.c b/aio-posix.c
index 4379c13..2f76825 100644
--- a/aio-posix.c
+++ b/aio-posix.c
@@ -18,6 +18,8 @@
 #include "block/block.h"
 #include "qemu/queue.h"
 #include "qemu/sockets.h"
+#include "qemu/cutils.h"
+#include "trace.h"
 #ifdef CONFIG_EPOLL_CREATE1
 #include <sys/epoll.h>
 #endif
@@ -27,12 +29,16 @@ struct AioHandler
     GPollFD pfd;
     IOHandler *io_read;
     IOHandler *io_write;
+    AioPollFn *io_poll;
     int deleted;
     void *opaque;
     bool is_external;
     QLIST_ENTRY(AioHandler) node;
 };
 
+/* How long to poll AioPollHandlers before monitoring file descriptors */
+static int64_t aio_poll_max_ns;
+
 #ifdef CONFIG_EPOLL_CREATE1
 
 /* The fd number threshold to switch to epoll */
@@ -206,11 +212,18 @@ void aio_set_fd_handler(AioContext *ctx,
     AioHandler *node;
     bool is_new = false;
     bool deleted = false;
+    int poll_disable_cnt;
 
     node = find_aio_handler(ctx, fd);
 
+    if (node) {
+        poll_disable_cnt = !io_poll - !node->io_poll;
+    } else {
+        poll_disable_cnt = !io_poll;
+    }
+
     /* Are we deleting the fd handler? */
-    if (!io_read && !io_write) {
+    if (!io_read && !io_write && !io_poll) {
         if (node == NULL) {
             return;
         }
@@ -239,9 +252,11 @@ void aio_set_fd_handler(AioContext *ctx,
             g_source_add_poll(&ctx->source, &node->pfd);
             is_new = true;
         }
+
         /* Update handler with latest information */
         node->io_read = io_read;
         node->io_write = io_write;
+        node->io_poll = io_poll;
         node->opaque = opaque;
         node->is_external = is_external;
 
@@ -251,6 +266,9 @@ void aio_set_fd_handler(AioContext *ctx,
 
     aio_epoll_update(ctx, node, is_new);
     aio_notify(ctx);
+
+    ctx->poll_disable_cnt += poll_disable_cnt;
+
     if (deleted) {
         g_free(node);
     }
@@ -268,6 +286,7 @@ void aio_set_event_notifier(AioContext *ctx,
 
 bool aio_prepare(AioContext *ctx)
 {
+    /* TODO run poll handlers? */
     return false;
 }
 
@@ -402,6 +421,50 @@ static void add_pollfd(AioHandler *node)
     npfd++;
 }
 
+/* run_poll_handlers:
+ * @ctx: the AioContext
+ * @max_ns: maximum time to poll for, in nanoseconds
+ *
+ * Polls for a given time.
+ *
+ * Note that ctx->notify_me must be non-zero so this function can detect
+ * aio_notify().
+ *
+ * Note that the caller must have incremented ctx->walking_handlers.
+ *
+ * Returns: true if progress was made, false otherwise
+ */
+static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
+{
+    bool progress = false;
+    int64_t end_time;
+
+    assert(ctx->notify_me);
+    assert(ctx->walking_handlers > 0);
+    assert(ctx->poll_disable_cnt == 0);
+
+    trace_run_poll_handlers_begin(ctx, max_ns);
+
+    end_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + max_ns;
+
+    do {
+        AioHandler *node;
+
+        QLIST_FOREACH(node, &ctx->aio_handlers, node) {
+            if (!node->deleted && node->io_poll &&
+                node->io_poll(node->opaque)) {
+                progress = true;
+            }
+
+            /* Caller handles freeing deleted nodes.  Don't do it here. */
+        }
+    } while (!progress && qemu_clock_get_ns(QEMU_CLOCK_REALTIME) < end_time);
+
+    trace_run_poll_handlers_end(ctx, progress);
+
+    return progress;
+}
+
 bool aio_poll(AioContext *ctx, bool blocking)
 {
     AioHandler *node;
@@ -425,6 +488,29 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
     ctx->walking_handlers++;
 
+    if (blocking && aio_poll_max_ns && ctx->poll_disable_cnt == 0) {
+        /* See qemu_soonest_timeout() uint64_t hack */
+        int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx),
+                             (uint64_t)aio_poll_max_ns);
+
+        if (max_ns && run_poll_handlers(ctx, max_ns)) {
+            /* Skip file descriptor monitoring if polling succeeded.  Just run
+             * BHs and timers.
+             */
+
+            atomic_sub(&ctx->notify_me, 2);
+            aio_notify_accept(ctx);
+
+            ctx->walking_handlers--;
+
+            aio_bh_poll(ctx);
+            timerlistgroup_run_timers(&ctx->tlg);
+
+            aio_context_release(ctx);
+            return true;
+        }
+    }
+
     assert(npfd == 0);
 
     /* fill pollfds */
@@ -486,6 +572,22 @@ bool aio_poll(AioContext *ctx, bool blocking)
 
 void aio_context_setup(AioContext *ctx)
 {
+    if (!aio_poll_max_ns) {
+        int64_t val;
+        const char *env_str = getenv("QEMU_AIO_POLL_MAX_NS");
+
+        if (!env_str) {
+            env_str = "0";
+        }
+
+        if (!qemu_strtoll(env_str, NULL, 10, &val)) {
+            aio_poll_max_ns = val;
+        } else {
+            fprintf(stderr, "Unable to parse QEMU_AIO_POLL_MAX_NS "
+                            "environment variable\n");
+        }
+    }
+
 #ifdef CONFIG_EPOLL_CREATE1
     assert(!ctx->epollfd);
     ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
diff --git a/async.c b/async.c
index c8fbd63..aad2c8a 100644
--- a/async.c
+++ b/async.c
@@ -349,6 +349,15 @@ static void event_notifier_dummy_cb(EventNotifier *e)
 {
 }
 
+/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
+static bool event_notifier_poll(void *opaque)
+{
+    EventNotifier *e = opaque;
+    AioContext *ctx = container_of(e, AioContext, notifier);
+
+    return atomic_read(&ctx->notified);
+}
+
 AioContext *aio_context_new(Error **errp)
 {
     int ret;
@@ -367,7 +376,7 @@ AioContext *aio_context_new(Error **errp)
                            false,
                            (EventNotifierHandler *)
                            event_notifier_dummy_cb,
-                           NULL);
+                           event_notifier_poll);
 #ifdef CONFIG_LINUX_AIO
     ctx->linux_aio = NULL;
 #endif
diff --git a/include/block/aio.h b/include/block/aio.h
index 1fac404..8aa5219 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -131,6 +131,9 @@ struct AioContext {
 
     int external_disable_cnt;
 
+    /* Number of AioHandlers without .io_poll() */
+    int poll_disable_cnt;
+
     /* epoll(7) state used when built with CONFIG_EPOLL */
     int epollfd;
     bool epoll_enabled;
diff --git a/trace-events b/trace-events
index f74e1d3..7fe3a1b 100644
--- a/trace-events
+++ b/trace-events
@@ -25,6 +25,10 @@
 #
 # The <format-string> should be a sprintf()-compatible format string.
 
+# aio-posix.c
+run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64
+run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d"
+
 # thread-pool.c
 thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
 thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"
-- 
2.7.4

