Author: brane
Date: Mon Jan 5 16:44:26 2026
New Revision: 1931128
Log:
[SERF-211] Finalize the asynchronous resolver API. This adds a private
interface for waking the context from a poll, so that the resolver can
signal when results are available.
* serf.h
(serf_context_create_ex): Make the docstring a docstring.
(serf_address_resolved_t,
serf_address_resolve_async,
serf_connection_created_t,
serf_connection_create_async): Remove the experimental comments
and fix some typos in the docstrings.
* serf_private.h
(SERF_IO_WAKEUP_PIPE): New constant for the io baton type.
(serf_context_t::wakeup): New member.
(serf__context_wakeup): New prototype.
* src/context.c: Include <apr_atomic.h>
(WAKEUP_LOOPBACK, WAKEUP_FAMILY): New constants.
(serf__context_wakeup_t): New struct for the self-pinging wakeup socket.
(init_wakeup, process_wakeup): New private helper functions.
(serf__context_wakeup): Implement here.
(serf_context_create_ex): Initialize the wakeup socket.
(serf_event_trigger): Process the wakeup signal.
* src/resolve.c: Remove the experimental/todo top-level comment.
(resolve): Tweak log message.
(push_resolve_result): Wake the context when a new result is available.
(serf__process_async_resolve_results): Return immediately if the async
resolver was not properly initialized. Add debug logging.
Modified:
serf/trunk/serf.h
serf/trunk/serf_private.h
serf/trunk/src/context.c
serf/trunk/src/resolve.c
Modified: serf/trunk/serf.h
==============================================================================
--- serf/trunk/serf.h Mon Jan 5 15:58:09 2026 (r1931127)
+++ serf/trunk/serf.h Mon Jan 5 16:44:26 2026 (r1931128)
@@ -331,7 +331,8 @@ typedef apr_status_t (*serf_socket_remov
apr_pollfd_t *pfd,
void *serf_baton);
-/* Create a new context for serf operations.
+/**
+ * Create a new context for serf operations.
*
* Use this function to make serf not use its internal control loop, but
* instead rely on an external event loop. Serf will use the @a addf and @a rmf
@@ -650,7 +651,6 @@ apr_status_t serf_connection_create2(
*
* @since New in 1.5.
*/
-/* FIXME: EXPERIMENTAL */
typedef void (*serf_address_resolved_t)(
serf_context_t *ctx,
void *resolved_baton,
@@ -668,18 +668,17 @@ typedef void (*serf_address_resolved_t)(
* address resolution, use serf_connection_create_async(), which does take
* proxy configuration into account.
*
- * The @a resolve callback will be called during a subsequent call to
+ * The @a resolved callback will be called during a subsequent call to
* serf_context_run() or serf_context_prerun() and will receive the same
* @a ctx and @a resolved_baton that are provided here.
*
* The lifetime of all function arguments except @a pool must extend until
- * either @a resolve is called or an error is reported.
+ * either @a resolved is called or an error is reported.
*
* All temporary allocations should be made in @a pool.
*
* @since New in 1.5.
*/
-/* FIXME: EXPERIMENTAL */
apr_status_t serf_address_resolve_async(
serf_context_t *ctx,
apr_uri_t host_info,
@@ -704,7 +703,6 @@ apr_status_t serf_address_resolve_async(
*
* @since New in 1.5.
*/
-/* FIXME: EXPERIMENTAL */
typedef void (*serf_connection_created_t)(
serf_context_t *ctx,
void *created_baton,
@@ -728,7 +726,6 @@ typedef void (*serf_connection_created_t
*
* @since New in 1.5.
*/
-/* FIXME: EXPERIMENTAL */
apr_status_t serf_connection_create_async(
serf_context_t *ctx,
apr_uri_t host_info,
Modified: serf/trunk/serf_private.h
==============================================================================
--- serf/trunk/serf_private.h Mon Jan 5 15:58:09 2026 (r1931127)
+++ serf/trunk/serf_private.h Mon Jan 5 16:44:26 2026 (r1931128)
@@ -89,6 +89,7 @@ typedef int serf__bool_t; /* Not _Bool *
#define SERF_IO_CLIENT (1)
#define SERF_IO_CONN (2)
#define SERF_IO_LISTENER (3)
+#define SERF_IO_WAKEUP_PIPE (4)
/*** Narrowing conversions ***/
@@ -544,6 +545,9 @@ struct serf_context_t {
serf_config_t *config;
+ /* The wakeup socket */
+ struct serf__context_wakeup_t *wakeup;
+
/* Support for asynchronous address resolution. */
void *volatile resolve_head;
apr_status_t resolve_init_status;
@@ -741,6 +745,9 @@ struct serf_connection_t {
up buckets that may still reference buckets of this request */
void serf__connection_pre_cleanup(serf_connection_t *);
+/* Called when an asynchronous event should wake up the context's pollset. */
+void serf__context_wakeup(serf_context_t *ctx);
+
/* Called from serf_context_create_ex() to set up the context-specific
asynchronous address resolver context. */
apr_status_t serf__create_resolve_context(serf_context_t *ctx);
Modified: serf/trunk/src/context.c
==============================================================================
--- serf/trunk/src/context.c Mon Jan 5 15:58:09 2026 (r1931127)
+++ serf/trunk/src/context.c Mon Jan 5 16:44:26 2026 (r1931128)
@@ -18,6 +18,7 @@
* ====================================================================
*/
+#include <apr_atomic.h>
#include <apr_pools.h>
#include <apr_poll.h>
#include <apr_version.h>
@@ -27,6 +28,145 @@
#include "serf_private.h"
+
+/* APR has wakeable pollsets, but we can't use them: the main reason is that
+ the context may not own the pollset, or indeed there may not even be a
+ pollset but a different event source.
+
+ Instead, we create a UDP socket bound to the loopback address and add it
+ to the context. The socket pings itself to wake up. */
+
+#if APR_HAVE_IPV6
+/* Use the IPv6 loopback address when APR was built with IPv6 support.
   This is a compile-time choice only; nothing here falls back to IPv4. */
+#define WAKEUP_LOOPBACK "::1"
+#define WAKEUP_FAMILY APR_INET6
+#else
+#define WAKEUP_LOOPBACK "127.0.0.1" /* IPv4 loopback address. */
+#define WAKEUP_FAMILY APR_INET
+#endif
+
+/* The socket that we'll use to wake the context's pollset. */
+struct serf__context_wakeup_t
+{
+ apr_sockaddr_t *addr; /* Local address of skt; each ping is sent to it. */
+ apr_socket_t *skt; /* UDP socket that both sends and receives the ping. */
+ apr_pollfd_t pfd; /* Poll descriptor for skt, registered for APR_POLLIN. */
+ serf_io_baton_t io_baton; /* Baton of type SERF_IO_WAKEUP_PIPE given to pollset_add. */
+ volatile apr_uint32_t pending; /* Set to 1 before a ping is sent, back to 0 once it is received. */
+};
+
+/* Set up the context's self-pinging wakeup socket.
+
+   Resolves the loopback address, creates a UDP socket bound to a random
+   port on it, and registers the socket for APR_POLLIN through the
+   context's pollset_add callback.
+
+   On success, ctx->wakeup points to the new wakeup state. On any failure
+   the error is logged exactly once (naming the step that failed), a
+   socket that was already created is closed again, and ctx->wakeup stays
+   NULL, which turns serf__context_wakeup() and process_wakeup() into
+   no-ops. */
+static void init_wakeup(serf_context_t *ctx)
+{
+    struct serf__context_wakeup_t *wakeup;
+    apr_status_t status;
+
+    ctx->wakeup = NULL;
+    wakeup = apr_pcalloc(ctx->pool, sizeof(*wakeup));
+
+    /* Get the loopback address. This shouldn't block. */
+    status = apr_sockaddr_info_get(&wakeup->addr,
+                                   WAKEUP_LOOPBACK, WAKEUP_FAMILY,
+                                   0, 0, ctx->pool);
+    if (status) {
+        serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p init wakeup: <%d> resolve %s\n",
+                  ctx, status, WAKEUP_LOOPBACK);
+        return;
+    }
+
+    /* Create and bind the socket to a random port. */
+    status = apr_socket_create(&wakeup->skt, WAKEUP_FAMILY,
+                               SOCK_DGRAM, APR_PROTO_UDP, ctx->pool);
+    if (status) {
+        serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p init wakeup: <%d> socket\n",
+                  ctx, status);
+        return;
+    }
+
+    status = apr_socket_bind(wakeup->skt, wakeup->addr);
+    if (!status) {
+        /* Replace addr with the socket's own local address, so that the
+           ping is sent to the port the socket was actually bound to. */
+        status = apr_socket_addr_get(&wakeup->addr, APR_LOCAL,
+                                     wakeup->skt);
+    }
+    if (status) {
+        serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p init wakeup: <%d> socket\n",
+                  ctx, status);
+        /* The wakeup is abandoned; don't keep the descriptor open. */
+        apr_socket_close(wakeup->skt);
+        return;
+    }
+
+    /* Register the socket with the context's event source. */
+    wakeup->pfd.desc_type = APR_POLL_SOCKET;
+    wakeup->pfd.desc.s = wakeup->skt;
+    wakeup->pfd.reqevents = APR_POLLIN;
+    wakeup->io_baton.type = SERF_IO_WAKEUP_PIPE;
+    wakeup->io_baton.ctx = ctx;
+    wakeup->io_baton.reqevents = wakeup->pfd.reqevents;
+    status = ctx->pollset_add(ctx->pollset_baton, &wakeup->pfd,
+                              &wakeup->io_baton);
+    if (status) {
+        serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p init wakeup: <%d> pollset add\n",
+                  ctx, status);
+        /* The wakeup is abandoned; don't keep the descriptor open. */
+        apr_socket_close(wakeup->skt);
+        return;
+    }
+
+    wakeup->pending = 0;
+    ctx->wakeup = wakeup;
+    serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
+              "context 0x%p init wakeup done\n", ctx);
+}
+
+/* Consume a wakeup ping that arrived on the context's wakeup socket.
+
+   Called from serf_event_trigger() for SERF_IO_WAKEUP_PIPE events. Reads
+   one datagram from the socket and clears the `pending' flag so that
+   serf__context_wakeup() may send the next ping.
+
+   Returns APR_SUCCESS when the context has no wakeup socket or when the
+   read succeeded; otherwise returns the status reported by
+   apr_socket_recvfrom(). */
+static apr_status_t process_wakeup(serf_context_t *ctx)
+{
+    struct serf__context_wakeup_t *const wakeup = ctx->wakeup;
+    apr_status_t status;
+    apr_sockaddr_t from;
+    char buffer[3];
+    apr_size_t length = sizeof(buffer);
+
+    if (!wakeup)
+        return APR_SUCCESS;
+
+    status = apr_socket_recvfrom(&from, wakeup->skt, 0, buffer, &length);
+
+    /* Clear the flag whether or not the read worked. If it stayed set
+       after a failed read, serf__context_wakeup() would never send
+       another ping. The worst outcome of clearing it too early is one
+       redundant ping. */
+    apr_atomic_set32(&wakeup->pending, 0);
+
+    if (status) {
+        serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p received wakeup: <%d> [%" APR_SIZE_T_FMT "]\n",
+                  ctx, status, length);
+        return status;
+    }
+    /* TODO: Should we check if the socket actually did ping itself? */
+
+    serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
+              "context 0x%p received wakeup: [%" APR_SIZE_T_FMT "]\n",
+              ctx, length);
+    return APR_SUCCESS;
+}
+
+/* Ask the context's poll loop to wake up.
+
+   Sends a one-byte datagram from the wakeup socket to its own address.
+   That makes the socket readable, so serf_event_trigger() ends up calling
+   process_wakeup(). Does nothing if the context has no wakeup socket
+   (ctx->wakeup is NULL).
+
+   At most one ping is kept in flight: the `pending' flag is raised with
+   an atomic compare-and-swap before sending and lowered by
+   process_wakeup() after receiving. The flag is updated atomically,
+   presumably because callers such as the resolver run on other threads.
+
+   Errors are logged, not returned. */
+void serf__context_wakeup(serf_context_t *ctx)
+{
+    struct serf__context_wakeup_t *const wakeup = ctx->wakeup;
+    apr_size_t length = 1;
+    apr_status_t status;
+
+    if (!wakeup)
+        return;
+
+    /* Don't signal a new wakeup before the previous one has been processed. */
+    if (apr_atomic_cas32(&wakeup->pending, 1, 0) != 0)
+        return;
+
+    status = apr_socket_sendto(wakeup->skt, wakeup->addr, 0,
+                               "\b", &length);
+    if (status) {
+        /* Nothing was sent, so process_wakeup() will not run to clear the
+           flag. Clear it here, otherwise every later wakeup request would
+           be silently dropped. */
+        apr_atomic_set32(&wakeup->pending, 0);
+        serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p wakeup: <%d> [%" APR_SIZE_T_FMT "]\n",
+                  ctx, status, length);
+    }
+    else {
+        serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
+                  "context 0x%p wakeup: [%" APR_SIZE_T_FMT "]\n",
+                  ctx, length);
+    }
+}
+
+
/**
* Callback function (implements serf_progress_t). Takes a number of bytes
* read @a read and bytes written @a written, adds those to the total for this
@@ -114,7 +254,6 @@ static apr_status_t pollset_rm(void *use
return apr_pollset_remove(s->pollset, pfd);
}
-
void serf_config_proxy(serf_context_t *ctx,
apr_sockaddr_t *address)
{
@@ -194,6 +333,13 @@ serf_context_t *serf_context_create_ex(
ctx->authn_types = SERF_AUTHN_ALL;
ctx->server_authn_info = apr_hash_make(pool);
+ /* Assume returned status is APR_SUCCESS */
+ serf__config_store_init(ctx);
+
+ serf__config_store_create_ctx_config(ctx, &ctx->config);
+
+ serf__log_init(ctx);
+
/* Initialize async resolver result queue. */
ctx->resolve_head = NULL;
ctx->resolve_init_status = APR_SUCCESS;
@@ -201,13 +347,12 @@ serf_context_t *serf_context_create_ex(
if (ctx->resolve_init_status != APR_SUCCESS) {
ctx->resolve_context = NULL;
}
-
- /* Assume returned status is APR_SUCCESS */
- serf__config_store_init(ctx);
-
- serf__config_store_create_ctx_config(ctx, &ctx->config);
-
- serf__log_init(ctx);
+ else {
+ /* Initialize the context's wakeup event. */
+ /* FIXME: For now, we only use this from the asynchronous resolver.
+ We could expose wakeable contexts in the public API. */
+ init_wakeup(ctx);
+ }
return ctx;
}
@@ -282,6 +427,9 @@ apr_status_t serf_event_trigger(
return status;
}
}
+ else if (io->type == SERF_IO_WAKEUP_PIPE) {
+ status = process_wakeup(io->ctx);
+ }
return status;
}
Modified: serf/trunk/src/resolve.c
==============================================================================
--- serf/trunk/src/resolve.c Mon Jan 5 15:58:09 2026 (r1931127)
+++ serf/trunk/src/resolve.c Mon Jan 5 16:44:26 2026 (r1931128)
@@ -67,13 +67,6 @@
#define HAVE_ASYNC_RESOLVER (SERF_HAVE_ASYNC_RESOLVER || APR_HAS_THREADS)
-/*
- * FIXME: EXPERIMENTAL
- * TODO:
- * - Wake the poll/select in serf_context_run() when new resolve
- * results are available.
- */
-
#if HAVE_ASYNC_RESOLVER
@@ -770,7 +763,7 @@ static void *APR_THREAD_FUNC resolve(apr
if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), addr)) {
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN,
__FILE__, task->ctx->config,
- "apr async resolve: %s: %s\n", addr->hostname, buf);
+ "thread pool resolve: %s: %s\n", addr->hostname,
buf);
}
addr = addr->next;
}
@@ -869,6 +862,8 @@ static void push_resolve_result(serf_con
result->next = head;
head = apr_atomic_casptr(&ctx->resolve_head, result, head);
} while(head != result->next);
+
+ serf__context_wakeup(ctx);
}
@@ -884,12 +879,24 @@ apr_status_t serf__process_async_resolve
{
resolve_result_t *result;
apr_status_t status;
+ unsigned counter;
+
+ if (ctx->resolve_init_status != APR_SUCCESS) {
+ /* The async resolver initialization failed, so just return. */
+ serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
+ "context 0x%p async resolver is not initialized\n", ctx);
+ return APR_SUCCESS;
+ }
status = run_async_resolver_loop(ctx);
- if (status)
+ if (status) {
+ serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config,
+ "context 0x%p async resolve: <%d>\n", ctx, status);
return status;
+ }
/* Grab the whole stack, leaving it empty, and process the contents. */
+ counter = 0;
result = apr_atomic_xchgptr(&ctx->resolve_head, NULL);
while (result)
{
@@ -899,6 +906,13 @@ apr_status_t serf__process_async_resolve
result->result_pool);
apr_pool_destroy(result->result_pool);
result = next;
+ ++counter;
+ }
+
+ if (counter > 0) {
+ serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, ctx->config,
+ "context 0x%p async resolve: %d event%s\n",
+ ctx, counter, counter == 1 ? "" : "s");
}
return APR_SUCCESS;
}