Author: brane Date: Tue Jul 15 20:59:40 2025 New Revision: 1927249 URL: http://svn.apache.org/viewvc?rev=1927249&view=rev Log: Add support for asynchronous address resolution in Serf, making it completely asynchronous. This was the only part of the API that would block on I/O during normal operation. Asynchronous address resolution is optional.
* CMakeLists.txt (SOURCES): Add src/resolve.c. * serf.h (serf_address_resolved_t): New; notification callback for resolved addresses. (serf_address_resolve_async): New; creates a new address resolution task. * serf_private.h: Include apr_thread_mutex.h. (serf__resolve_result_t): New; address resolution result. (serf_context_t): Add members resolve_guard and resolve_head to collect the results of asynchronous address resolution. (serf__process_async_resolve_results): New. * src/context.c (serf_context_create_ex): Initialize resolve_guard and resolve_head. (serf_context_prerun): Call serf__process_async_resolve_results to gather the asynchronously resolved addresses for this context. * src/resolve.c: New file; implements the asynchronous address resolver. * test/test_context.c (test_async_resolve): New test for asynchronous address resolution. (test_context): Register it. * test/test_serf.h (use_new_async_connection): New prototype. * test/test_util.c (async_reolved_baton, address_resolved, use_new_async_connection): Creates a new connection asynchronously. 
Added: serf/trunk/src/resolve.c (with props) Modified: serf/trunk/CMakeLists.txt serf/trunk/serf.h serf/trunk/serf_private.h serf/trunk/src/context.c serf/trunk/test/test_context.c serf/trunk/test/test_serf.h serf/trunk/test/test_util.c Modified: serf/trunk/CMakeLists.txt URL: http://svn.apache.org/viewvc/serf/trunk/CMakeLists.txt?rev=1927249&r1=1927248&r2=1927249&view=diff ============================================================================== --- serf/trunk/CMakeLists.txt (original) +++ serf/trunk/CMakeLists.txt Tue Jul 15 20:59:40 2025 @@ -137,6 +137,7 @@ list(APPEND SOURCES "src/outgoing.c" "src/outgoing_request.c" "src/pump.c" + "src/resolve.c" "src/ssltunnel.c" "auth/auth.c" "auth/auth_basic.c" Modified: serf/trunk/serf.h URL: http://svn.apache.org/viewvc/serf/trunk/serf.h?rev=1927249&r1=1927248&r2=1927249&view=diff ============================================================================== --- serf/trunk/serf.h (original) +++ serf/trunk/serf.h Tue Jul 15 20:59:40 2025 @@ -540,6 +540,58 @@ apr_status_t serf_connection_create3( apr_pool_t *pool); +/** + * Notification callback when an address has been resolved. + * + * The @a ctx and @a resolved_baton arguments are the same that were passed + * to serf_address_resolve_async(). + * + * @a status contains the result of the address resolution. If it is not + * @c APR_SUCCESS, then @a host_address is invalid and should be ignored. + * + * The resolved @a host_address is ephemeral, allocated in @a pool and lives + * only for the duration of the callback. If it is not consumed, it should be + * copied to a more permanent pool, using for example apr_sockaddr_info_copy(). + * + * All temporary allocations should be made in @a pool. + * + * @since New in 1.4. + */ +/* FIXME: EXPERIMENTAL */ +typedef void (*serf_address_resolved_t)( + serf_context_t *ctx, + void *resolved_baton, + apr_sockaddr_t *host_address, + apr_status_t status, + apr_pool_t *pool); + +/** + * Asynchronously resolve an address. 
+ * + * The address represented by @a host_info is intended to be used to create + * new connections in @a ctx; proxy configuration will be taken into account + * during resolution. See, for example, serf_connection_create3(). + * + * The @a resolved callback will be called during a subsequent call to + * serf_context_run() or serf_context_prerun() and will receive the same + * @a ctx and @a resolved_baton that are provided here. + * + * The lifetime of all function arguments except @a pool must extend until + * either @a resolved is called or an error is reported. + * + * All temporary allocations should be made in @a pool. + * + * @since New in 1.4. + */ +/* FIXME: EXPERIMENTAL */ +apr_status_t serf_address_resolve_async( + serf_context_t *ctx, + apr_uri_t host_info, + serf_address_resolved_t resolved, + void *resolved_baton, + apr_pool_t *pool); + + typedef apr_status_t (*serf_accept_client_t)( serf_context_t *ctx, serf_listener_t *l, Modified: serf/trunk/serf_private.h URL: http://svn.apache.org/viewvc/serf/trunk/serf_private.h?rev=1927249&r1=1927248&r2=1927249&view=diff ============================================================================== --- serf/trunk/serf_private.h (original) +++ serf/trunk/serf_private.h Tue Jul 15 20:59:40 2025 @@ -43,7 +43,8 @@ typedef int serf__bool_t; /* Not _Bool * #endif #endif -#include <apr.h> /* For __attribute__ */ +#include <apr.h> /* For __attribute__ and APR_HAS_THREADS */ +#include <apr_thread_mutex.h> /* For apr_thread_mutex_t */ /* Define a MAX macro if we don't already have one */ #ifndef MAX @@ -438,6 +439,18 @@ apr_status_t serf__config_store_remove_host(serf__config_store_t config_store, const char *hostname_port); + +typedef struct serf__resolve_result_t serf__resolve_result_t; +struct serf__resolve_result_t +{ + apr_sockaddr_t *host_address; + apr_status_t status; + serf_address_resolved_t resolved; + void *resolved_baton; + apr_pool_t *result_pool; + serf__resolve_result_t *next; +}; + struct serf_context_t 
{ /* the pool used for self and for other allocations */ apr_pool_t *pool; @@ -484,6 +497,12 @@ struct serf_context_t { serf_credentials_callback_t cred_cb; serf_config_t *config; + + /* The results of asynchronous address resolution. */ +#if APR_HAS_THREADS + apr_thread_mutex_t *resolve_guard; +#endif + serf__resolve_result_t *resolve_head; }; struct serf_listener_t { @@ -665,6 +684,12 @@ struct serf_connection_t { up buckets that may still reference buckets of this request */ void serf__connection_pre_cleanup(serf_connection_t *); +/* Called from serf_context_prerun() before handling the connections. + Processes the results of any asynchronously resolved addresses + that were initiated for CTX. */ +apr_status_t serf__process_async_resolve_results(serf_context_t *ctx); + + /*** Internal bucket functions ***/ /* Copies all data contained in vecs to *data, optionally telling how much was Modified: serf/trunk/src/context.c URL: http://svn.apache.org/viewvc/serf/trunk/src/context.c?rev=1927249&r1=1927248&r2=1927249&view=diff ============================================================================== --- serf/trunk/src/context.c (original) +++ serf/trunk/src/context.c Tue Jul 15 20:59:40 2025 @@ -192,6 +192,15 @@ serf_context_t *serf_context_create_ex( ctx->authn_types = SERF_AUTHN_ALL; ctx->server_authn_info = apr_hash_make(pool); + /* Initialize async resolver result queue. */ +#if APR_HAS_THREADS + /* FIXME: Ignore the status? 
*/ + apr_thread_mutex_create(&ctx->resolve_guard, + APR_THREAD_MUTEX_DEFAULT, + ctx->pool); +#endif + ctx->resolve_head = NULL; + /* Assume returned status is APR_SUCCESS */ serf__config_store_init(ctx); @@ -210,7 +219,14 @@ serf_context_t *serf_context_create(apr_ apr_status_t serf_context_prerun(serf_context_t *ctx) { - apr_status_t status = APR_SUCCESS; + apr_status_t status; + + /* Process async resolver results here, as that gives users a chance + to get their connections active in the same context run when the + result was made available. */ + if ((status = serf__process_async_resolve_results(ctx)) != APR_SUCCESS) + return status; + if ((status = serf__open_connections(ctx)) != APR_SUCCESS) return status; Added: serf/trunk/src/resolve.c URL: http://svn.apache.org/viewvc/serf/trunk/src/resolve.c?rev=1927249&view=auto ============================================================================== --- serf/trunk/src/resolve.c (added) +++ serf/trunk/src/resolve.c Tue Jul 15 20:59:40 2025 @@ -0,0 +1,329 @@ +/* ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * ==================================================================== + */ + +#include <apr.h> +#include <apr_errno.h> +#include <apr_network_io.h> +#include <apr_pools.h> +#include <apr_thread_mutex.h> +#include <apr_thread_pool.h> + +#include "serf.h" +#include "serf_private.h" + + +#define HAVE_ASYNC_RESOLVER (SERF_USE_ASYNC_RESOLVER || APR_HAS_THREADS) + +/* + * FIXME: EXPERIMENTAL + * TODO: + * - Add cleanup function for in-flight resolve tasks if their owning + * context is destroyed. This function should be called from the + * context's pool cleanup handler. + * - Figure out what to do if the lock/unlock calls return an error. + * This should not be possible unless we messed up the implementation, + * but there should be a way for clients to back out of this situation. + * Failed lock/unlock could potentially leave the context in an + * inconsistent state. + */ + + +#if HAVE_ASYNC_RESOLVER + +/* Pushes the result of a successful or failed address resolution + onto the context's result queue. */ +static void push_resolve_result(serf_context_t *ctx, + apr_sockaddr_t *host_address, + apr_status_t status, + serf_address_resolved_t resolved, + void *resolved_baton, + apr_pool_t *resolve_pool); + +/* This is the core of the asynchronous resolver implementation. */ +static apr_status_t resolve_address_async(serf_context_t *ctx, + apr_uri_t host_info, + serf_address_resolved_t resolved, + void *resolved_baton, + apr_pool_t *resolve_pool, + apr_pool_t *scratch_pool); + +/* Public API */ +apr_status_t serf_address_resolve_async(serf_context_t *ctx, + apr_uri_t host_info, + serf_address_resolved_t resolved, + void *resolved_baton, + apr_pool_t *pool) +{ + apr_pool_t *resolve_pool; + + apr_pool_create(&resolve_pool, ctx->pool); + + /* See serf_connection_create3(): if there's a proxy configured in the + context, don't resolve the host address, just register the result. 
*/ + if (ctx->proxy_address) + { + push_resolve_result(ctx, NULL, APR_SUCCESS, + resolved, resolved_baton, resolve_pool); + return APR_SUCCESS; + } + + return resolve_address_async(ctx, host_info, resolved, resolved_baton, + resolve_pool, pool); +} + +#else /* !HAVE_ASYNC_RESOLVER */ + +/* Public API */ +apr_status_t serf_address_resolve_async(serf_context_t *ctx, + apr_uri_t host_info, + serf_address_resolved_t resolved, + void *resolved_baton, + apr_pool_t *pool) +{ + /* We have no external asynchronous resolver library, nor threads, + therefore no async resolver at all. */ + return APR_ENOTIMPL; +} + +#endif /* !HAVE_ASYNC_RESOLVER */ + + +#if SERF_USE_ASYNC_RESOLVER + +/* TODO: Add implementation for one or more async resolver libraries. */ +#if 0 +static apr_status_t resolve_address_async(serf_context_t *ctx, + apr_uri_t host_info, + serf_address_resolved_t resolved, + void *resolved_baton, + apr_pool_t *resolve_pool, + apr_pool_t *scratch_pool) +{ + ... +} + +/* Some asynchronous resolved libraries use event loop to harvest results. + This function will be called from serf__process_async_resolve_results() + so, in effect, from serf_context_prerun(). */ +static void run_async_resolver_loop(void) +{ + ... +} +#endif + +#else /* !SERF_USE_ASYNC_RESOLVER */ +#if APR_HAS_THREADS + +/* This could be made configurable, but given that this is a fallback + implementation, it really shouldn't be necessary. */ +#define MAX_WORK_QUEUE_THREADS 50 +static apr_pool_t *work_pool = NULL; +static apr_thread_pool_t *work_queue = NULL; +static apr_status_t init_work_queue(void *baton) +{ + serf_context_t *ctx = baton; + apr_status_t status; + + apr_pool_create(&work_pool, NULL); + status = apr_thread_pool_create(&work_queue, + 1, MAX_WORK_QUEUE_THREADS, + work_pool); + + serf__log((status ? 
LOGLVL_ERROR : LOGLVL_DEBUG), + LOGCOMP_CONN, __FILE__, ctx->config, + "Init async resolve work queue, status %d\n", status); + return status; +} + + +/* Task data for the thred pool resolver. */ +typedef struct resolve_task_t resolve_task_t; +struct resolve_task_t +{ + serf_context_t *ctx; + apr_uri_t host_info; + serf_address_resolved_t resolved; + void *resolved_baton; + apr_pool_t *resolve_pool; +}; + + +static void *APR_THREAD_FUNC resolve(apr_thread_t *thread, void *baton) +{ + resolve_task_t *task = baton; + apr_sockaddr_t *host_address; + apr_status_t status; + + status = apr_sockaddr_info_get(&host_address, + task->host_info.hostname, + APR_UNSPEC, + task->host_info.port, + 0, task->resolve_pool); + push_resolve_result(task->ctx, host_address, status, + task->resolved, task->resolved_baton, + task->resolve_pool); + return NULL; +} + +static apr_status_t resolve_address_async(serf_context_t *ctx, + apr_uri_t host_info, + serf_address_resolved_t resolved, + void *resolved_baton, + apr_pool_t *resolve_pool, + apr_pool_t *scratch_pool) +{ + resolve_task_t *task; + apr_status_t status; + SERF__DECLARE_STATIC_INIT_ONCE_CONTEXT(init_ctx); + + status = serf__init_once(&init_ctx, init_work_queue, ctx); + if (status) + return status; + + task = apr_palloc(resolve_pool, sizeof(*task)); + task->ctx = ctx; + task->host_info = host_info; + task->resolved = resolved; + task->resolved_baton = resolved_baton; + task->resolve_pool = resolve_pool; + return apr_thread_pool_push(work_queue, resolve, task, + APR_THREAD_TASK_PRIORITY_NORMAL, + (void*)ctx); +} + +/* This is a no-op since we're using a thread pool that + does its own task queue management. */ +static void run_async_resolver_loop(void) {} + +#endif /* !APR_HAS_THREADS */ +#endif /* !SERF_USE_ASYNC_RESOLVER */ + + +/*******************************************************************/ +/* The result queue implementation. 
*/ +#if HAVE_ASYNC_RESOLVER + +static apr_status_t lock_results(serf_context_t *ctx) +{ +#if APR_HAS_THREADS + apr_status_t status = apr_thread_mutex_lock(ctx->resolve_guard); + if (status) { + /* TODO: ctx->error_callback... */ + char buffer[256]; + serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config, + "Lock async resolve results: %s\n", + apr_strerror(status, buffer, sizeof(buffer))); + } + return status; +#else + return APR_SUCCESS; +#endif +} + +static apr_status_t unlock_results(serf_context_t *ctx) +{ +#if APR_HAS_THREADS + apr_status_t status = apr_thread_mutex_unlock(ctx->resolve_guard); + if (status) { + /* TODO: ctx->error_callback... */ + char buffer[256]; + serf__log(LOGLVL_ERROR, LOGCOMP_CONN, __FILE__, ctx->config, + "Unlock async resolve results: %s\n", + apr_strerror(status, buffer, sizeof(buffer))); + } + return status; +#else + return APR_SUCCESS; +#endif +} + + +static void push_resolve_result(serf_context_t *ctx, + apr_sockaddr_t *host_address, + apr_status_t status, + serf_address_resolved_t resolved, + void *resolved_baton, + apr_pool_t *resolve_pool) +{ + serf__resolve_result_t *result; + apr_status_t lock_status; + + result = apr_palloc(resolve_pool, sizeof(*result)); + result->host_address = host_address; + result->status = status; + result->resolved = resolved; + result->resolved_baton = resolved_baton; + result->result_pool = resolve_pool; + + lock_status = lock_results(ctx); + if (!lock_status) + { + result->next = ctx->resolve_head; + ctx->resolve_head = result; + lock_status = unlock_results(ctx); + } + + /* TODO: if (lock_status) ... then what? 
*/ +} + + +/* Internal API */ +apr_status_t serf__process_async_resolve_results(serf_context_t *ctx) +{ + serf__resolve_result_t *result = NULL; + apr_status_t lock_status; + + run_async_resolver_loop(); + + lock_status = lock_results(ctx); + if (lock_status) + return lock_status; + + result = ctx->resolve_head; + ctx->resolve_head = NULL; + lock_status = unlock_results(ctx); + + /* TODO: if (lock_status) ... then what? Shouldn't be possible. */ + /* if (lock_status) */ + /* return lock_status; */ + + while (result) + { + serf__resolve_result_t *const next = result->next; + result->resolved(ctx, result->resolved_baton, + result->host_address, result->status, + result->result_pool); + apr_pool_destroy(result->result_pool); + result = next; + } + return APR_SUCCESS; +} + +#else /* !HAVE_ASYNC_RESOLVER */ + +/* Internal API */ +apr_status_t serf__process_async_resolve_results(serf_context_t *ctx) +{ + /* The fallback is a no-op, the context should just continue to + work without an asynchronous resolver. */ + return APR_SUCCESS; +} + +#endif /* !HAVE_ASYNC_RESOLVER */ Propchange: serf/trunk/src/resolve.c ------------------------------------------------------------------------------ svn:eol-style = native Modified: serf/trunk/test/test_context.c URL: http://svn.apache.org/viewvc/serf/trunk/test/test_context.c?rev=1927249&r1=1927248&r2=1927249&view=diff ============================================================================== --- serf/trunk/test/test_context.c (original) +++ serf/trunk/test/test_context.c Tue Jul 15 20:59:40 2025 @@ -1017,6 +1017,48 @@ static void test_outgoing_request_err(Cu CuAssertIntEquals(tc, 0, tb->handled_requests->nelts); } +/* Test that asynchronus name resolution happens. 
*/ +static void test_async_resolve(CuTest *tc) +{ + test_baton_t *tb = tc->testBaton; + apr_status_t status; + handler_baton_t handler_ctx[2]; + const int num_requests = sizeof(handler_ctx)/sizeof(handler_ctx[0]); + int i; + + /* Set up a test context with a server */ + setup_test_mock_server(tb); + status = setup_test_client_context(tb, NULL, tb->pool); + CuAssertIntEquals(tc, APR_SUCCESS, status); + + Given(tb->mh) + DefaultResponse(WithCode(200), WithRequestBody) + + GETRequest(URLEqualTo("/"), ChunkedBodyEqualTo("1")) + GETRequest(URLEqualTo("/"), ChunkedBodyEqualTo("2")) + EndGiven + + status = use_new_async_connection(tb, tb->pool); + CuAssertIntEquals(tc, APR_SUCCESS, status); + + while (!tb->connection && tb->user_status == APR_SUCCESS) { + status = serf_context_run(tb->context, 70, tb->pool); + if (!APR_STATUS_IS_TIMEUP(status)) + CuAssertIntEquals(tc, APR_SUCCESS, status); + } + CuAssertPtrNotNull(tc, tb->connection); + CuAssertIntEquals(tc, APR_SUCCESS, tb->user_status); + + /* Send some requests on the connections */ + for (i = 0 ; i < num_requests ; i++) { + create_new_request(tb, &handler_ctx[i], "GET", "/", i+1); + } + + run_client_and_mock_servers_loops_expect_ok(tc, tb, num_requests, + handler_ctx, tb->pool); +} + + /*****************************************************************************/ CuSuite *test_context(void) { @@ -1043,6 +1085,6 @@ CuSuite *test_context(void) SUITE_ADD_TEST(suite, test_connection_large_request); SUITE_ADD_TEST(suite, test_max_keepalive_requests); SUITE_ADD_TEST(suite, test_outgoing_request_err); - + SUITE_ADD_TEST(suite, test_async_resolve); return suite; } Modified: serf/trunk/test/test_serf.h URL: http://svn.apache.org/viewvc/serf/trunk/test/test_serf.h?rev=1927249&r1=1927248&r2=1927249&view=diff ============================================================================== --- serf/trunk/test/test_serf.h (original) +++ serf/trunk/test/test_serf.h Tue Jul 15 20:59:40 2025 @@ -120,6 +120,8 @@ apr_status_t 
default_https_conn_setup(ap apr_status_t use_new_connection(test_baton_t *tb, apr_pool_t *pool); +apr_status_t use_new_async_connection(test_baton_t *tb, + apr_pool_t *pool); void *test_setup(void *baton); void *test_teardown(void *baton); Modified: serf/trunk/test/test_util.c URL: http://svn.apache.org/viewvc/serf/trunk/test/test_util.c?rev=1927249&r1=1927248&r2=1927249&view=diff ============================================================================== --- serf/trunk/test/test_util.c (original) +++ serf/trunk/test/test_util.c Tue Jul 15 20:59:40 2025 @@ -142,6 +142,72 @@ apr_status_t use_new_connection(test_bat return status; } +struct async_reolved_baton +{ + test_baton_t *tb; + apr_uri_t url; +}; + +static void address_resolved(serf_context_t *ctx, + void *resolved_baton, + apr_sockaddr_t *host_address, + apr_status_t status, + apr_pool_t *unused_scratch_pool) +{ + struct async_reolved_baton *baton = resolved_baton; + test_baton_t *tb = baton->tb; + serf_connection_t *conn; + apr_pool_t *conn_pool = tb->pool; + + if (tb->context != ctx) + REPORT_TEST_SUITE_ERROR(); + + if (status == APR_SUCCESS) + { + status = apr_sockaddr_info_copy(&host_address, host_address, conn_pool); + if (status == APR_SUCCESS) + status = serf_connection_create3(&conn, ctx, + baton->url, + host_address, + tb->conn_setup, + tb, + default_closed_connection, + tb, + conn_pool); + if (status == APR_SUCCESS) + { + tb->connection = conn; + apr_pool_cleanup_register(conn_pool, tb->connection, cleanup_conn, + apr_pool_cleanup_null); + } + } + + tb->user_status = status; +} + +apr_status_t use_new_async_connection(test_baton_t *tb, + apr_pool_t *pool) +{ + apr_uri_t url; + apr_status_t status; + struct async_reolved_baton *baton; + + if (tb->connection) + cleanup_conn(tb->connection); + tb->connection = NULL; + + status = apr_uri_parse(pool, tb->serv_url, &url); + if (status != APR_SUCCESS) + return status; + + baton = apr_palloc(pool, sizeof(*baton)); + baton->tb = tb; + baton->url = url; + 
tb->user_status = APR_SUCCESS; + return serf_address_resolve_async(tb->context, url, + address_resolved, baton, pool); +} + static test_baton_t *initTestCtx(CuTest *tc, apr_pool_t *pool) { test_baton_t *tb; @@ -513,7 +579,7 @@ run_client_and_mock_servers_loops(test_b /* run server event loop */ err = mhRunServerLoop(mh); - /* Even if the mock server returned an error, it may have written + /* Even if the mock server returned an error, it may have written something to the client. So process that data first, handle the error later. */ @@ -534,7 +600,7 @@ run_client_and_mock_servers_loops(test_b return REPORT_TEST_SUITE_ERROR(); } apr_pool_destroy(iter_pool); - + return APR_SUCCESS; }