Author: brane
Date: Mon Jan  5 16:44:26 2026
New Revision: 1931128

Log:
[SERF-211] Finalize the asynchronous resolver API. This adds a private
interface for waking the context from a poll, so that the resolver can
signal when results are available.

* serf.h
  (serf_context_create_ex): Make the docstring a docstring.
  (serf_address_resolved_t,
   serf_address_resolve_async,
   serf_connection_created_t,
   serf_connection_create_async): Remove the experimental comments
    and fix some typos in the docstrings.

* serf_private.h
  (SERF_IO_WAKEUP_PIPE): New constant for the io baton type.
  (serf_context_t::wakeup): New member.
  (serf__context_wakeup): New prototype.

* src/context.c: Include <apr_atomic.h>
  (WAKEUP_LOOPBACK, WAKEUP_FAMILY): New constants.
  (serf__context_wakeup_t): New struct for the self-pinging wakeup socket.
  (init_wakeup, process_wakeup): New private helper functions.
  (serf__context_wakeup): Implement here.
  (serf_context_create_ex): Initialize the wakeup socket.
  (serf_event_trigger): Process the wakeup signal.

* src/resolve.c: Remove the experimental/todo top-level comment.
  (resolve): Tweak log message.
  (push_resolve_result): Wake the context when a new result is available.
  (serf__process_async_resolve_results): Return immediately if the async
   resolver was not properly initialized. Add debug logging.

Modified:
   serf/trunk/serf.h
   serf/trunk/serf_private.h
   serf/trunk/src/context.c
   serf/trunk/src/resolve.c

Modified: serf/trunk/serf.h
==============================================================================
--- serf/trunk/serf.h   Mon Jan  5 15:58:09 2026        (r1931127)
+++ serf/trunk/serf.h   Mon Jan  5 16:44:26 2026        (r1931128)
@@ -331,7 +331,8 @@ typedef apr_status_t (*serf_socket_remov
     apr_pollfd_t *pfd,
     void *serf_baton);
 
-/* Create a new context for serf operations.
+/**
+ * Create a new context for serf operations.
  *
  * Use this function to make serf not use its internal control loop, but
  * instead rely on an external event loop. Serf will use the @a addf and @a rmf
@@ -650,7 +651,6 @@ apr_status_t serf_connection_create2(
  *
  * @since New in 1.5.
  */
-/* FIXME: EXPERIMENTAL */
 typedef void (*serf_address_resolved_t)(
     serf_context_t *ctx,
     void *resolved_baton,
@@ -668,18 +668,17 @@ typedef void (*serf_address_resolved_t)(
  * address resolution, use serf_connection_create_async(), which does take
  * proxy configuration into account.
  *
- * The @a resolve callback will be called during a subsequent call to
+ * The @a resolved callback will be called during a subsequent call to
  * serf_context_run() or serf_context_prerun() and will receive the same
  * @a ctx and @a resolved_baton that are provided here.
  *
  * The lifetime of all function arguments except @a pool must extend until
- * either @a resolve is called or an error is reported.
+ * either @a resolved is called or an error is reported.
  *
  * All temporary allocations should be made in @a pool.
  *
  * @since New in 1.5.
  */
-/* FIXME: EXPERIMENTAL */
 apr_status_t serf_address_resolve_async(
     serf_context_t *ctx,
     apr_uri_t host_info,
@@ -704,7 +703,6 @@ apr_status_t serf_address_resolve_async(
  *
  * @since New in 1.5.
  */
-/* FIXME: EXPERIMENTAL */
 typedef void (*serf_connection_created_t)(
     serf_context_t *ctx,
     void *created_baton,
@@ -728,7 +726,6 @@ typedef void (*serf_connection_created_t
  *
  * @since New in 1.5.
  */
-/* FIXME: EXPERIMENTAL */
 apr_status_t serf_connection_create_async(
     serf_context_t *ctx,
     apr_uri_t host_info,

Modified: serf/trunk/serf_private.h
==============================================================================
--- serf/trunk/serf_private.h   Mon Jan  5 15:58:09 2026        (r1931127)
+++ serf/trunk/serf_private.h   Mon Jan  5 16:44:26 2026        (r1931128)
@@ -89,6 +89,7 @@ typedef int serf__bool_t; /* Not _Bool *
 #define SERF_IO_CLIENT (1)
 #define SERF_IO_CONN (2)
 #define SERF_IO_LISTENER (3)
+#define SERF_IO_WAKEUP_PIPE (4)
 
 /*** Narrowing conversions ***/
 
@@ -544,6 +545,9 @@ struct serf_context_t {
 
     serf_config_t *config;
 
+    /* The wakeup socket */
+    struct serf__context_wakeup_t *wakeup;
+
     /* Support for asynchronous address resolution. */
     void *volatile resolve_head;
     apr_status_t resolve_init_status;
@@ -741,6 +745,9 @@ struct serf_connection_t {
    up buckets that may still reference buckets of this request */
 void serf__connection_pre_cleanup(serf_connection_t *);
 
+/* Called when an asynchronous event should wake up the context's pollset.  */
+void serf__context_wakeup(serf_context_t *ctx);
+
 /* Called from serf_context_create_ex() to set up the context-specific
    asynchronous address resolver context. */
 apr_status_t serf__create_resolve_context(serf_context_t *ctx);

Modified: serf/trunk/src/context.c
==============================================================================
--- serf/trunk/src/context.c    Mon Jan  5 15:58:09 2026        (r1931127)
+++ serf/trunk/src/context.c    Mon Jan  5 16:44:26 2026        (r1931128)
@@ -18,6 +18,7 @@
  * ====================================================================
  */
 
+#include <apr_atomic.h>
 #include <apr_pools.h>
 #include <apr_poll.h>
 #include <apr_version.h>
@@ -27,6 +28,145 @@
 
 #include "serf_private.h"
 
+
+/* APR has wakeable pollsets, but we can't use them: the main reason is that
+   the context may not own the pollset, or indeed there may not even be a
+   pollset but a different event source.
+
+   Instead, we create a UDP socket bound to the loopback address and add it
+   to the context. The socket pings itself to wake up. */
+
+#if APR_HAVE_IPV6
+/* Bind to the IPv6 loopback address, if supported. */
+#define WAKEUP_LOOPBACK "::1"
+#define WAKEUP_FAMILY   APR_INET6
+#else
+#define WAKEUP_LOOPBACK "127.0.0.1"
+#define WAKEUP_FAMILY   APR_INET
+#endif
+
+/* The socket that we'll use to wake the context's pollset. */
+struct serf__context_wakeup_t
+{
+    apr_sockaddr_t *addr;
+    apr_socket_t *skt;
+    apr_pollfd_t pfd;
+    serf_io_baton_t io_baton;
+    volatile apr_uint32_t pending;
+};
+
+static void init_wakeup(serf_context_t *ctx)
+{
+    struct serf__context_wakeup_t *wakeup;
+    apr_status_t status;
+
+    ctx->wakeup = NULL;
+    wakeup = apr_pcalloc(ctx->pool, sizeof(*wakeup));
+
+    /* Get the loopback address. This shouldn't block. */
+    status = apr_sockaddr_info_get(&wakeup->addr,
+                                   WAKEUP_LOOPBACK, WAKEUP_FAMILY,
+                                   0, 0, ctx->pool);
+    if (status) {
+        serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p init wakeup: <%d> resolve %s\n",
+                  ctx, status, WAKEUP_LOOPBACK);
+    }
+
+    /* Create and bind the socket to a random port. */
+    if (!status) {
+        status = apr_socket_create(&wakeup->skt, WAKEUP_FAMILY,
+                                   SOCK_DGRAM, APR_PROTO_UDP, ctx->pool);
+        if (!status)
+            status = apr_socket_bind(wakeup->skt, wakeup->addr);
+        if (!status)
+            status = apr_socket_addr_get(&wakeup->addr, APR_LOCAL, wakeup->skt);
+
+        if (status) {
+            serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                      "context 0x%p init wakeup: <%d> socket\n",
+                      ctx, status);
+        }
+
+        if (!status) {
+            wakeup->pfd.desc_type = APR_POLL_SOCKET;
+            wakeup->pfd.desc.s = wakeup->skt;
+            wakeup->pfd.reqevents = APR_POLLIN;
+            wakeup->io_baton.type = SERF_IO_WAKEUP_PIPE;
+            wakeup->io_baton.ctx = ctx;
+            wakeup->io_baton.reqevents = wakeup->pfd.reqevents;
+            status = ctx->pollset_add(ctx->pollset_baton, &wakeup->pfd,
+                                      &wakeup->io_baton);
+        }
+
+        if (status) {
+            serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                      "context 0x%p init wakeup: <%d> pollset add\n",
+                      ctx, status);
+        }
+    }
+
+    if (!status) {
+        wakeup->pending = 0;
+        ctx->wakeup = wakeup;
+        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p init wakeup done\n", ctx);
+    }
+}
+
+static apr_status_t process_wakeup(serf_context_t *ctx)
+{
+    struct serf__context_wakeup_t *const wakeup = ctx->wakeup;
+    apr_status_t status;
+    apr_sockaddr_t from;
+    apr_size_t length = 3;
+    char buffer[3];
+
+    if (!wakeup)
+        return APR_SUCCESS;
+
+    status = apr_socket_recvfrom(&from, wakeup->skt, 0, buffer, &length);
+    if (status) {
+        serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p received wakeup: <%d> [%" APR_SIZE_T_FMT "]\n",
+                  ctx, status, length);
+        return status;
+    }
+    /* TODO: Should we check if the socket actually did ping itself? */
+
+    serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
+              "context 0x%p received wakeup: [%" APR_SIZE_T_FMT "]\n",
+              ctx, length);
+    apr_atomic_set32(&wakeup->pending, 0);
+    return APR_SUCCESS;
+}
+
+void serf__context_wakeup(serf_context_t *ctx)
+{
+    struct serf__context_wakeup_t *const wakeup = ctx->wakeup;
+
+    if (!wakeup)
+        return;
+
+    /* Don't signal a new wakeup before the previous one has been processed. */
+    if (apr_atomic_cas32(&wakeup->pending, 1, 0) == 0) {
+        apr_size_t length = 1;
+        apr_status_t status = apr_socket_sendto(wakeup->skt, wakeup->addr, 0,
+                                                "\b", &length);
+        if (status) {
+            serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                      "context 0x%p wakeup: <%d> [%" APR_SIZE_T_FMT "]\n",
+                      ctx, status, length);
+        }
+        else {
+            serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
+                      "context 0x%p wakeup: [%" APR_SIZE_T_FMT "]\n",
+                      ctx, length);
+        }
+    }
+}
+
+
 /**
  * Callback function (implements serf_progress_t). Takes a number of bytes
  * read @a read and bytes written @a written, adds those to the total for this
@@ -114,7 +254,6 @@ static apr_status_t pollset_rm(void *use
     return apr_pollset_remove(s->pollset, pfd);
 }
 
-
 void serf_config_proxy(serf_context_t *ctx,
                        apr_sockaddr_t *address)
 {
@@ -194,6 +333,13 @@ serf_context_t *serf_context_create_ex(
     ctx->authn_types = SERF_AUTHN_ALL;
     ctx->server_authn_info = apr_hash_make(pool);
 
+    /* Assume returned status is APR_SUCCESS */
+    serf__config_store_init(ctx);
+
+    serf__config_store_create_ctx_config(ctx, &ctx->config);
+
+    serf__log_init(ctx);
+
     /* Initialize async resolver result queue. */
     ctx->resolve_head = NULL;
     ctx->resolve_init_status = APR_SUCCESS;
@@ -201,13 +347,12 @@ serf_context_t *serf_context_create_ex(
     if (ctx->resolve_init_status != APR_SUCCESS) {
         ctx->resolve_context = NULL;
     }
-
-    /* Assume returned status is APR_SUCCESS */
-    serf__config_store_init(ctx);
-
-    serf__config_store_create_ctx_config(ctx, &ctx->config);
-
-    serf__log_init(ctx);
+    else {
+        /* Initialize the context's wakeup event. */
+        /* FIXME: For now, we only use this from the asynchronous resolver.
+                  We could expose awakable contexts in the public API. */
+        init_wakeup(ctx);
+    }
 
     return ctx;
 }
@@ -282,6 +427,9 @@ apr_status_t serf_event_trigger(
             return status;
         }
     }
+    else if (io->type == SERF_IO_WAKEUP_PIPE) {
+        status = process_wakeup(io->ctx);
+    }
     return status;
 }
 

Modified: serf/trunk/src/resolve.c
==============================================================================
--- serf/trunk/src/resolve.c    Mon Jan  5 15:58:09 2026        (r1931127)
+++ serf/trunk/src/resolve.c    Mon Jan  5 16:44:26 2026        (r1931128)
@@ -67,13 +67,6 @@
 
 #define HAVE_ASYNC_RESOLVER (SERF_HAVE_ASYNC_RESOLVER || APR_HAS_THREADS)
 
-/*
- * FIXME: EXPERIMENTAL
- * TODO:
- *  - Wake the poll/select in serf_context_run() when new resolve
- *    results are available.
- */
-
 
 #if HAVE_ASYNC_RESOLVER
 
@@ -770,7 +763,7 @@ static void *APR_THREAD_FUNC resolve(apr
             if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), addr)) {
                 serf__log(LOGLVL_DEBUG, LOGCOMP_CONN,
                           __FILE__, task->ctx->config,
-                          "apr async resolve: %s: %s\n", addr->hostname, buf);
+                          "thread pool resolve: %s: %s\n", addr->hostname, buf);
             }
             addr = addr->next;
         }
@@ -869,6 +862,8 @@ static void push_resolve_result(serf_con
         result->next = head;
         head = apr_atomic_casptr(&ctx->resolve_head, result, head);
     } while(head != result->next);
+
+    serf__context_wakeup(ctx);
 }
 
 
@@ -884,12 +879,24 @@ apr_status_t serf__process_async_resolve
 {
     resolve_result_t *result;
     apr_status_t status;
+    unsigned counter;
+
+    if (ctx->resolve_init_status != APR_SUCCESS) {
+        /* The async resolver initialization failed, so just return. */
+        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p async resolver is not initialized\n", ctx);
+        return APR_SUCCESS;
+    }
 
     status = run_async_resolver_loop(ctx);
-    if (status)
+    if (status) {
+        serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p async resolve: <%d>\n", ctx, status);
         return status;
+    }
 
     /* Grab the whole stack, leaving it empty, and process the contents. */
+    counter = 0;
     result = apr_atomic_xchgptr(&ctx->resolve_head, NULL);
     while (result)
     {
@@ -899,6 +906,13 @@ apr_status_t serf__process_async_resolve
                          result->result_pool);
         apr_pool_destroy(result->result_pool);
         result = next;
+        ++counter;
+    }
+
+    if (counter > 0) {
+        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p async resolve: %d event%s\n",
+                  ctx, counter, counter == 1 ? "" : "s");
     }
     return APR_SUCCESS;
 }

Reply via email to