This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new ea0985f64 PROTON-2812: Implement async name lookup with c-ares
ea0985f64 is described below
commit ea0985f643c6019050a40f69753f4cceaeccb9b2
Author: Andrew Stitcher <[email protected]>
AuthorDate: Fri Jan 30 02:15:32 2026 -0500
PROTON-2812: Implement async name lookup with c-ares
* TODO: Would be useful to implement a specific async name lookup call
with a specific proactor event signalling the completed lookup for use
in code that just provides us with fds.
---
.github/workflows/build.yml | 2 +-
azure-pipelines/azure-pipelines.yml | 2 +-
c/CMakeLists.txt | 8 +
c/src/proactor/epoll-internal.h | 16 +-
c/src/proactor/epoll.c | 105 +++++---
c/src/proactor/epoll_name_lookup.h | 57 +++++
c/src/proactor/epoll_name_lookup_async.c | 397 +++++++++++++++++++++++++++++++
c/src/proactor/epoll_name_lookup_sync.c | 60 +++++
c/src/proactor/epoll_raw_connection.c | 40 +++-
c/tests/proactor_test.cpp | 76 +++++-
10 files changed, 706 insertions(+), 57 deletions(-)
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 5db098808..fc2d00b22 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -45,7 +45,7 @@ jobs:
- name: Install Linux dependencies
if: runner.os == 'Linux'
run: |
- sudo apt install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev
softhsm2 opensc
+ sudo apt install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev
libc-ares-dev softhsm2 opensc
- name: Install Windows dependencies
if: runner.os == 'Windows'
run: |
diff --git a/azure-pipelines/azure-pipelines.yml
b/azure-pipelines/azure-pipelines.yml
index e106971f1..f3ebda958 100644
--- a/azure-pipelines/azure-pipelines.yml
+++ b/azure-pipelines/azure-pipelines.yml
@@ -36,7 +36,7 @@ jobs:
steps:
- script: |
sudo apt-get update
- sudo apt-get install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev
+ sudo apt-get install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev
libc-ares-dev
name: InstallExtraStuff
- template: steps.yml
- job: MacOS
diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index b214126a2..33930c3c3 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -368,6 +368,14 @@ if (PROACTOR STREQUAL "epoll" OR (NOT PROACTOR AND NOT
BUILD_PROACTOR))
set (PROACTOR_OK epoll)
set (qpid-proton-proactor src/proactor/epoll.c
src/proactor/epoll_raw_connection.c src/proactor/epoll_timer.c
${qpid-proton-proactor-common})
set (PROACTOR_LIBS Threads::Threads ${TIME_LIB})
+ find_package(c-ares 1.16 CONFIG)
+ option(ENABLE_ASYNC_DNS "Enable async DNS lookups (Using c-ares)"
${c-ares_FOUND})
+ if (ENABLE_ASYNC_DNS)
+ list(APPEND qpid-proton-proactor src/proactor/epoll_name_lookup_async.c)
+ list(APPEND PROACTOR_LIBS c-ares::cares)
+ else()
+ list(APPEND qpid-proton-proactor src/proactor/epoll_name_lookup_sync.c)
+ endif()
endif()
endif()
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 550324ccd..5a6a37ac3 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -61,7 +61,8 @@ typedef enum {
LISTENER_IO,
PCONNECTION_IO,
RAW_CONNECTION_IO,
- TIMER
+ TIMER,
+ NAME_LOOKUP_EPOLL /* Inner epoll for async name lookup (e.g. c-ares) */
} epoll_type_t;
// Data to use with epoll.
@@ -78,7 +79,8 @@ typedef enum {
PCONNECTION,
LISTENER,
RAW_CONNECTION,
- TIMER_MANAGER
+ TIMER_MANAGER,
+ NAME_LOOKUP
} task_type_t;
typedef struct task_t {
@@ -142,9 +144,16 @@ typedef struct pni_timer_manager_t {
bool sched_timeout;
} pni_timer_manager_t;
+typedef struct pname_lookup_t {
+ task_t task;
+ epoll_extended_t epoll_name_lookup;
+ void *impl; /* NULL for sync; for async: implementation context */
+} pname_lookup_t;
+
struct pn_proactor_t {
task_t task;
pni_timer_manager_t timer_manager;
+ pname_lookup_t name_lookup;
epoll_extended_t epoll_schedule; /* ready list */
epoll_extended_t epoll_interrupt;
pn_event_batch_t batch;
@@ -364,7 +373,7 @@ bool proactor_remove(task_t *tsk);
bool unassign_thread(pn_proactor_t *p, tslot_t *ts, tslot_state new_state,
tslot_t **resume_thread);
void task_init(task_t *tsk, task_type_t t, pn_proactor_t *p);
-static void task_finalize(task_t* tsk) {
+static inline void task_finalize(task_t* tsk) {
pmutex_finalize(&tsk->mutex);
}
@@ -377,7 +386,6 @@ bool start_polling(epoll_extended_t *ee, int epollfd);
void stop_polling(epoll_extended_t *ee, int epollfd);
void rearm_polling(epoll_extended_t *ee, int epollfd);
-int pgetaddrinfo(const char *host, const char *port, int flags, struct
addrinfo **res);
void configure_socket(int sock);
accepted_t *listener_accepted_next(pn_listener_t *listener);
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 52068c59a..6cc0d38f0 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -63,6 +63,7 @@
#undef _GNU_SOURCE
#include "epoll-internal.h"
+#include "epoll_name_lookup.h"
#include "proactor-internal.h"
#include "core/engine-internal.h"
#include "core/logger_private.h"
@@ -632,6 +633,10 @@ static inline pn_listener_t *task_listener(task_t *t) {
return t->type == LISTENER ? containerof(t, pn_listener_t, task) : NULL;
}
+static inline pname_lookup_t *task_name_lookup(task_t *t) {
+ return t->type == NAME_LOOKUP ? containerof(t, pname_lookup_t, task) : NULL;
+}
+
static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch);
@@ -1408,15 +1413,6 @@ static void pconnection_maybe_connect_lh(pconnection_t
*pc) {
pc->disconnected = true;
}
-int pgetaddrinfo(const char *host, const char *port, int flags, struct
addrinfo **res)
-{
- // NOTE: getaddrinfo can block on DNS lookup (PROTON-2812).
- struct addrinfo hints = { 0 };
- hints.ai_family = AF_UNSPEC;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
- return getaddrinfo(host, port, &hints, res);
-}
static inline bool is_inactive(pn_proactor_t *p) {
return (!p->tasks && !p->disconnects_pending && !p->timeout_set &&
!p->shutting_down);
@@ -1431,23 +1427,39 @@ bool schedule_if_inactive(pn_proactor_t *p) {
return false;
}
+/* Called when connection name lookup completes (from name_lookup done_cb).
Call with task lock held. */
+static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai,
int gai_error) {
+ pn_proactor_t *p = pc->task.proactor;
+ bool notify = false;
+ if (gai_error) {
+ psocket_gai_error(&pc->psocket, gai_error, "connect to ");
+ } else if (ai) {
+ pc->addrinfo = ai;
+ pc->ai = ai;
+ pconnection_maybe_connect_lh(pc);
+ if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect &&
!pni_task_wake_pending(&pc->task)) {
+ return;
+ }
+ }
+ notify = schedule(&pc->task);
+ if (notify) notify_poller(p);
+}
+
+static void connection_done_cb(void *user_data, struct addrinfo *ai, int
gai_error) {
+ pconnection_t *pc = (pconnection_t *)user_data;
+ lock(&pc->task.mutex);
+ connection_lookup_done_lh(pc, ai, gai_error);
+ unlock(&pc->task.mutex);
+}
+
// Call from pconnection_process with task lock held.
// Return true if the socket is connecting and there are no Proton events to
deliver.
static bool pconnection_first_connect_lh(pconnection_t *pc) {
+ pn_proactor_t *p = pc->task.proactor;
unlock(&pc->task.mutex);
- // TODO: move this step to a separate worker thread that scales in response
to multiple blocking DNS lookups.
- int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
+ bool rc = pni_name_lookup_start(&p->name_lookup, pc->host, pc->port, pc,
connection_done_cb);
lock(&pc->task.mutex);
-
- if (!gai_error) {
- pc->ai = pc->addrinfo;
- pconnection_maybe_connect_lh(pc); /* Start connection attempts */
- if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect &&
!pni_task_wake_pending(&pc->task))
- return true;
- } else {
- psocket_gai_error(&pc->psocket, gai_error, "connect to ");
- }
- return false;
+ return rc;
}
void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t
*t, const char *addr) {
@@ -1579,7 +1591,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t
*l, const char *addr, in
pni_parse_addr(addr, l->addr_buf, sizeof(l->addr_buf), &l->host, &l->port);
struct addrinfo *addrinfo = NULL;
- int gai_err = pgetaddrinfo(l->host, l->port, AI_PASSIVE | AI_ALL, &addrinfo);
+ int gai_err = pni_name_lookup_blocking(l->host, l->port, AI_PASSIVE |
AI_ALL, &addrinfo);
if (!gai_err) {
/* Count addresses, allocate enough space for sockets */
size_t len = 0;
@@ -2021,23 +2033,27 @@ pn_proactor_t *pn_proactor(void) {
if ((p->epollfd = epoll_create(1)) >= 0) {
if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
- if (pni_timer_manager_init(&p->timer_manager))
- if ((p->collector = pn_collector()) != NULL) {
- p->batch.next_event = &proactor_batch_next;
- start_polling(&p->timer_manager.epoll_timer, p->epollfd); //
TODO: check for error
- epoll_eventfd_init(&p->epoll_schedule, p->eventfd, p->epollfd,
true);
- epoll_eventfd_init(&p->epoll_interrupt, p->interruptfd,
p->epollfd, false);
- p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
- grow_poller_bufs(p);
- p->ready_list_generation = 1;
- return p;
+ if (pni_timer_manager_init(&p->timer_manager)) {
+ if (pni_name_lookup_init(&p->name_lookup, p)) {
+ if ((p->collector = pn_collector()) != NULL) {
+ p->batch.next_event = &proactor_batch_next;
+ start_polling(&p->timer_manager.epoll_timer, p->epollfd); //
TODO: check for error
+ epoll_eventfd_init(&p->epoll_schedule, p->eventfd, p->epollfd,
true);
+ epoll_eventfd_init(&p->epoll_interrupt, p->interruptfd,
p->epollfd, false);
+ p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
+ grow_poller_bufs(p);
+ p->ready_list_generation = 1;
+ return p;
+ }
}
+ }
}
}
}
if (p->epollfd >= 0) close(p->epollfd);
if (p->eventfd >= 0) close(p->eventfd);
if (p->interruptfd >= 0) close(p->interruptfd);
+ pni_name_lookup_cleanup(&p->name_lookup, p);
pni_timer_manager_finalize(&p->timer_manager);
pmutex_finalize(&p->timeout_mutex);
pmutex_finalize(&p->tslot_mutex);
@@ -2071,11 +2087,15 @@ void pn_proactor_free(pn_proactor_t *p) {
case RAW_CONNECTION:
pni_raw_connection_forced_shutdown(pni_task_raw_connection(tsk));
break;
+ case NAME_LOOKUP:
+ pni_name_lookup_forced_shutdown(task_name_lookup(tsk));
+ break;
default:
break;
}
}
+ pni_name_lookup_cleanup(&p->name_lookup, p);
pni_timer_manager_finalize(&p->timer_manager);
pn_collector_free(p->collector);
pmutex_finalize(&p->timeout_mutex);
@@ -2309,6 +2329,11 @@ static pn_event_batch_t *process(task_t *tsk) {
batch = pni_timer_manager_process(tm, timeout, tsk_ready);
break;
}
+ case NAME_LOOKUP:
+ unlock(&p->sched_mutex);
+ pni_name_lookup_process_events(&p->name_lookup);
+ batch = NULL;
+ break;
default:
assert(NULL);
}
@@ -2350,6 +2375,11 @@ static task_t *post_event(pn_proactor_t *p, struct
epoll_event *evp) {
}
// else if (ee->fd == p->eventfd)... schedule_ready_list already performed
by poller task.
break;
+ case NAME_LOOKUP_EPOLL: {
+ tsk = &p->name_lookup.task;
+ tsk->sched_pending = true;
+ break;
+ }
case PCONNECTION_IO: {
psocket_t *ps = containerof(ee, psocket_t, epoll_io);
pconnection_t *pc = psocket_pconnection(ps);
@@ -2581,11 +2611,10 @@ static bool poller_do_epoll(struct pn_proactor_t* p,
tslot_t *ts, bool can_block
unlock(&p->eventfd_mutex);
}
- int timeout = (epoll_immediate) ? 0 : -1;
- p->poller_suspended = (timeout == -1);
+ p->poller_suspended = !epoll_immediate;
unlock(&p->sched_mutex);
- n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity,
timeout);
+ n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity,
epoll_immediate ? 0 : -1);
lock(&p->sched_mutex);
p->poller_suspended = false;
@@ -2612,8 +2641,9 @@ static bool poller_do_epoll(struct pn_proactor_t* p,
tslot_t *ts, bool can_block
unlock(&p->eventfd_mutex);
if (n_events < 0) {
- if (errno != EINTR)
+ if (errno != EINTR) {
perror("epoll_wait"); // TODO: proper log
+ }
if (!can_block && !unpolled_work)
return true;
else
@@ -2622,8 +2652,9 @@ static bool poller_do_epoll(struct pn_proactor_t* p,
tslot_t *ts, bool can_block
if (!can_block && !unpolled_work)
return true;
else {
- if (!epoll_immediate)
+ if (!epoll_immediate) {
perror("epoll_wait unexpected timeout"); // TODO: proper log
+ }
if (!unpolled_work)
continue;
}
diff --git a/c/src/proactor/epoll_name_lookup.h
b/c/src/proactor/epoll_name_lookup.h
new file mode 100644
index 000000000..032fd77e3
--- /dev/null
+++ b/c/src/proactor/epoll_name_lookup.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef PROACTOR_NAME_LOOKUP_H
+#define PROACTOR_NAME_LOOKUP_H
+
+#include "epoll-internal.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Blocking name lookup - used by listeners and as fallback for connections.
+ * Same signature as getaddrinfo.
+ */
+int pni_name_lookup_blocking(const char *host, const char *port, int flags,
struct addrinfo **res);
+
+/* Initialize name lookup subsystem. Returns false if error. */
+bool pni_name_lookup_init(pname_lookup_t *nl, pn_proactor_t *p);
+
+/* Cleanup name lookup subsystem. */
+void pni_name_lookup_cleanup(pname_lookup_t *nl, pn_proactor_t *p);
+
+/* Callback when lookup completes. Called with (user_data, addrinfo or NULL,
gai_error). */
+typedef void (*pni_nl_done_cb)(void *user_data, struct addrinfo *ai, int
gai_error);
+
+/* Start a name lookup. Returns true if started successfully. */
+bool pni_name_lookup_start(pname_lookup_t *nl, const char *host, const char
*port, void *user_data, pni_nl_done_cb done_cb);
+
+/* Clear lookup state when forcing shutdown. */
+void pni_name_lookup_forced_shutdown(pname_lookup_t *nl);
+
+/* Process events. */
+void pni_name_lookup_process_events(pname_lookup_t *nl);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PROACTOR_NAME_LOOKUP_H */
diff --git a/c/src/proactor/epoll_name_lookup_async.c
b/c/src/proactor/epoll_name_lookup_async.c
new file mode 100644
index 000000000..c7a51abcd
--- /dev/null
+++ b/c/src/proactor/epoll_name_lookup_async.c
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Asynchronous name lookup using c-ares. */
+
+#include "epoll_name_lookup.h"
+
+#include <limits.h>
+#include <netdb.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/epoll.h>
+// Ares 1.27 doesn't include sys/select.h in ares.h but uses fd_set
+// So even though we don't use that we still need to include it here
+// before we include ares.h
+#include <sys/select.h>
+#include <sys/timerfd.h>
+#include <unistd.h>
+
+#include <ares.h>
+
+// 0x012200 corresponds to version 1.34 of c-ares which added ares_process_fds
+#if ARES_VERSION >= 0x012200
+# define CARES_HAVE_PROCESS_FDS 1
+#else
+# define CARES_HAVE_PROCESS_FDS 0
+#endif
+
+
+/* Opaque context for async name lookup. */
+typedef struct pni_nl_async_ctx {
+ pmutex pending_mutex;
+ struct ares_channeldata *channel;
+ struct pni_nl_request *pending_head;
+ int timerfd; /* In inner epoll; fires for ares timeout, -1 when not in use
*/
+} pni_nl_async_ctx_t;
+
+static void nl_timerfd_set(int fd, uint64_t t_millis)
+{
+ struct itimerspec newt;
+ memset(&newt, 0, sizeof(newt));
+ if (t_millis > 0) {
+ newt.it_value.tv_sec = t_millis / 1000;
+ newt.it_value.tv_nsec = (t_millis % 1000) * 1000000;
+ }
+ timerfd_settime(fd, 0, &newt, NULL);
+}
+
+static void nl_timerfd_drain(int fd)
+{
+ uint64_t result = 0;
+ __attribute__((unused)) ssize_t r = read(fd, &result, sizeof(result));
+}
+
+/* Set timerfd from ares next deadline; call after start or process_events. */
+static void nl_update_ares_timer(pname_lookup_t *nl)
+{
+ pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)nl->impl;
+ if (!ctx || ctx->timerfd < 0) return;
+ struct timeval tv;
+ if (ares_timeout(ctx->channel, NULL, &tv) != NULL) {
+ long ares_ms = (long)tv.tv_sec * 1000 + (int)(tv.tv_usec / 1000);
+ if (ares_ms > 0 && ares_ms <= INT_MAX)
+ nl_timerfd_set(ctx->timerfd, (uint64_t)ares_ms);
+ else
+ nl_timerfd_set(ctx->timerfd, 0);
+ } else {
+ nl_timerfd_set(ctx->timerfd, 0);
+ }
+}
+
+typedef struct pni_nl_request {
+ pni_nl_async_ctx_t *ctx;
+ void *user_data;
+ pni_nl_done_cb done_cb;
+ struct pni_nl_request *next;
+} pni_nl_request_t;
+
+static void nl_sock_state_cb(void *data, ares_socket_t socket_fd, int
readable, int writable)
+{
+ pname_lookup_t *nl = (pname_lookup_t *)data;
+ int epfd = nl->epoll_name_lookup.fd;
+ if (epfd < 0) return;
+ struct epoll_event ev;
+ ev.data.fd = socket_fd;
+ int events = 0;
+ if (readable) events |= EPOLLIN;
+ if (writable) events |= EPOLLOUT;
+
+ if (readable || writable) {
+ ev.events = events;
+ if (epoll_ctl(epfd, EPOLL_CTL_MOD, socket_fd, &ev) < 0) {
+ epoll_ctl(epfd, EPOLL_CTL_ADD, socket_fd, &ev);
+ }
+ } else {
+ epoll_ctl(epfd, EPOLL_CTL_DEL, socket_fd, NULL);
+ }
+}
+
+static struct addrinfo *nl_ares_addrinfo_to_addrinfo(struct ares_addrinfo
*result)
+{
+ struct addrinfo *ai_head = NULL;
+ struct addrinfo *ai_prev = NULL;
+ for (struct ares_addrinfo_node *node = result->nodes; node != NULL; node =
node->ai_next) {
+ struct addrinfo *ai = (struct addrinfo *)calloc(1, sizeof(struct
addrinfo)+node->ai_addrlen);
+ if (!ai) break;
+ ai->ai_family = node->ai_family;
+ ai->ai_socktype = node->ai_socktype;
+ ai->ai_protocol = node->ai_protocol;
+ ai->ai_flags = node->ai_flags;
+ ai->ai_addrlen = node->ai_addrlen;
+ ai->ai_addr = (struct sockaddr *)(ai + 1);
+ memcpy(ai->ai_addr, node->ai_addr, node->ai_addrlen);
+ ai->ai_next = NULL;
+ if (ai_prev) {
+ ai_prev->ai_next = ai;
+ } else {
+ ai_head = ai;
+ }
+ ai_prev = ai;
+ }
+ return ai_head;
+}
+
+static int nl_ares_status_to_gai(int status)
+{
+ switch (status) {
+ case ARES_ENOTFOUND:
+ case ARES_ESERVICE:
+ return EAI_NONAME;
+ case ARES_ETIMEOUT:
+ return EAI_AGAIN;
+ case ARES_ESERVFAIL:
+ case ARES_ECANCELLED:
+ case ARES_EDESTRUCTION:
+ return EAI_FAIL;
+ case ARES_ENOMEM:
+ return EAI_MEMORY;
+ default:
+ return EAI_NONAME;
+ }
+}
+
+static void nl_ares_lookup_cb(void *data, int status, int timeouts, struct
ares_addrinfo *result)
+{
+ pni_nl_request_t *req = (pni_nl_request_t *)data;
+ pni_nl_async_ctx_t *ctx = req->ctx;
+
+ lock(&ctx->pending_mutex);
+ /* Unlink from pending list */
+ {
+ pni_nl_request_t **pp = &ctx->pending_head;
+ while (*pp && *pp != req) pp = &(*pp)->next;
+ if (*pp) *pp = req->next;
+ }
+ unlock(&ctx->pending_mutex);
+
+ struct addrinfo *ai = NULL;
+ int gai_error = EAI_NONAME;
+ if (status == ARES_SUCCESS && result) {
+ ai = nl_ares_addrinfo_to_addrinfo(result);
+ gai_error = ai ? 0 : EAI_MEMORY;
+ } else {
+ gai_error = nl_ares_status_to_gai(status);
+ }
+ if (result) {
+ ares_freeaddrinfo(result);
+ }
+ if (status != ARES_ECANCELLED && status != ARES_EDESTRUCTION) {
+ req->done_cb(req->user_data, ai, gai_error);
+ }
+ free(req);
+}
+
+int pni_name_lookup_blocking(const char *host, const char *port, int flags,
struct addrinfo **res)
+{
+ struct addrinfo hints = { 0 };
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
+ return getaddrinfo(host, port, &hints, res);
+}
+
+bool pni_name_lookup_init(pname_lookup_t *nl, pn_proactor_t *p)
+{
+ task_init(&nl->task, NAME_LOOKUP, p);
+ pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)calloc(1, sizeof(*ctx));
+ if (!ctx) return false;
+ nl->impl = ctx;
+ pmutex_init(&ctx->pending_mutex);
+ ctx->pending_head = NULL;
+
+ if (ares_library_init(ARES_LIB_INIT_ALL) != ARES_SUCCESS) {
+ free(ctx);
+ nl->impl = NULL;
+ return false;
+ }
+ if ((nl->epoll_name_lookup.fd = epoll_create1(0)) < 0) {
+ ares_library_cleanup();
+ return false;
+ }
+ struct ares_options options = {
+ .sock_state_cb = nl_sock_state_cb,
+ .sock_state_cb_data = nl
+ };
+ if (ares_init_options(&ctx->channel, &options, ARES_OPT_SOCK_STATE_CB) !=
ARES_SUCCESS) {
+ close(nl->epoll_name_lookup.fd);
+ nl->epoll_name_lookup.fd = -1;
+ ares_library_cleanup();
+ return false;
+ }
+ ctx->timerfd = -1;
+ int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+ if (tfd >= 0) {
+ struct epoll_event ev = { .events = EPOLLIN, .data.fd = tfd };
+ if (epoll_ctl(nl->epoll_name_lookup.fd, EPOLL_CTL_ADD, tfd, &ev) == 0) {
+ ctx->timerfd = tfd;
+ } else {
+ close(tfd);
+ return false;
+ }
+ } else {
+ return false;
+ }
+ /* Register inner epoll with main proactor epoll (fd already in
nl->epoll_name_lookup) */
+ nl->epoll_name_lookup.type = NAME_LOOKUP_EPOLL;
+ nl->epoll_name_lookup.wanted = EPOLLIN;
+ nl->epoll_name_lookup.polling = false;
+ pmutex_init(&nl->epoll_name_lookup.barrier_mutex);
+ if (!start_polling(&nl->epoll_name_lookup, p->epollfd)) {
+ if (ctx->timerfd >= 0) {
+ close(ctx->timerfd);
+ ctx->timerfd = -1;
+ }
+ ares_destroy(ctx->channel);
+ pmutex_finalize(&nl->epoll_name_lookup.barrier_mutex);
+ close(nl->epoll_name_lookup.fd);
+ nl->epoll_name_lookup.fd = -1;
+ ares_library_cleanup();
+ free(ctx);
+ nl->impl = NULL;
+ return false;
+ }
+ nl_update_ares_timer(nl);
+ return true;
+}
+
+void pni_name_lookup_cleanup(pname_lookup_t *nl, pn_proactor_t *p)
+{
+ pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)nl->impl;
+ if (!ctx) return;
+ if (nl->epoll_name_lookup.fd >= 0) {
+ stop_polling(&nl->epoll_name_lookup, p->epollfd);
+ if (ctx->timerfd >= 0) {
+ close(ctx->timerfd);
+ ctx->timerfd = -1;
+ }
+ ares_cancel(ctx->channel);
+ ares_destroy(ctx->channel);
+ pmutex_finalize(&nl->epoll_name_lookup.barrier_mutex);
+ close(nl->epoll_name_lookup.fd);
+ nl->epoll_name_lookup.fd = -1;
+ ares_library_cleanup();
+ }
+ /* Free requests still on pending list */
+ lock(&ctx->pending_mutex);
+ while (ctx->pending_head) {
+ pni_nl_request_t *req = ctx->pending_head;
+ ctx->pending_head = req->next;
+ free(req);
+ }
+ unlock(&ctx->pending_mutex);
+ free(ctx);
+ nl->impl = NULL;
+}
+
+bool pni_name_lookup_start(pname_lookup_t *nl, const char *host, const char
*port, void *user_data, pni_nl_done_cb done_cb)
+{
+ pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)nl->impl;
+
+ if (!ctx || nl->epoll_name_lookup.fd < 0 || !done_cb)
+ return false;
+ /* ares can't cope with a NULL host; do a blocking lookup for that case. */
+ if (host == NULL) {
+ struct addrinfo *res = NULL;
+ int gai_error = pni_name_lookup_blocking(host, port, 0, &res);
+ done_cb(user_data, res, gai_error);
+ return gai_error == 0;
+ }
+ pni_nl_request_t *req = (pni_nl_request_t *)malloc(sizeof(*req));
+ if (!req) {
+ done_cb(user_data, NULL, EAI_MEMORY);
+ return false;
+ }
+
+ lock(&ctx->pending_mutex);
+ *req = (pni_nl_request_t){
+ .ctx = ctx,
+ .user_data = user_data,
+ .done_cb = done_cb,
+ .next = ctx->pending_head,
+ };
+ ctx->pending_head = req;
+ unlock(&ctx->pending_mutex);
+
+ struct ares_addrinfo_hints hints = {
+ .ai_flags = ARES_AI_V4MAPPED | ARES_AI_ADDRCONFIG,
+ .ai_family = AF_UNSPEC,
+ .ai_socktype = SOCK_STREAM,
+ };
+ ares_getaddrinfo(ctx->channel, host, port, &hints, nl_ares_lookup_cb, req);
+ nl_update_ares_timer(nl);
+ return true;
+}
+
+void pni_name_lookup_forced_shutdown(pname_lookup_t *nl)
+{
+ pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)nl->impl;
+ if (!ctx) return;
+ lock(&ctx->pending_mutex);
+ pni_nl_request_t **pp = &ctx->pending_head;
+ while (*pp) {
+ pni_nl_request_t *req = *pp;
+ *pp = req->next;
+ free(req);
+ pp = &(*pp)->next;
+ }
+ unlock(&ctx->pending_mutex);
+}
+
+void pni_name_lookup_process_events(pname_lookup_t *nl)
+{
+ int epfd = nl->epoll_name_lookup.fd;
+ if (epfd < 0) return;
+ pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)nl->impl;
+ if (!ctx) return;
+ struct epoll_event events[10];
+ int n = epoll_wait(epfd, events, 10, 0);
+ if (n < 0) return;
+ int n_fds = 0;
+ bool timer_fired = false;
+#if CARES_HAVE_PROCESS_FDS
+ ares_fd_events_t fd_events[10];
+ for (int j = 0; j < n && n_fds < 10; j++) {
+ int fd = events[j].data.fd;
+ if (fd == ctx->timerfd) {
+ nl_timerfd_drain(ctx->timerfd);
+ timer_fired = true;
+ } else {
+ fd_events[n_fds].fd = fd;
+ fd_events[n_fds].events = 0;
+ if (events[j].events & EPOLLIN) fd_events[n_fds].events |=
ARES_FD_EVENT_READ;
+ if (events[j].events & EPOLLOUT) fd_events[n_fds].events |=
ARES_FD_EVENT_WRITE;
+ n_fds++;
+ }
+ }
+ if (n_fds > 0)
+ ares_process_fds(ctx->channel, fd_events, n_fds, 0);
+ else if (timer_fired)
+ ares_process_fds(ctx->channel, NULL, 0, 0);
+#else
+ for (int j = 0; j < n; j++) {
+ int fd = events[j].data.fd;
+ if (fd == ctx->timerfd) {
+ nl_timerfd_drain(ctx->timerfd);
+ timer_fired = true;
+ } else {
+ ares_socket_t r = (events[j].events & EPOLLIN) ? fd : ARES_SOCKET_BAD;
+ ares_socket_t w = (events[j].events & EPOLLOUT) ? fd : ARES_SOCKET_BAD;
+ ares_process_fd(ctx->channel, r, w);
+ n_fds++;
+ }
+ }
+ if (timer_fired && n_fds == 0)
+ ares_process_fd(ctx->channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
+#endif
+ nl_update_ares_timer(nl);
+ if (nl->epoll_name_lookup.polling)
+ rearm_polling(&nl->epoll_name_lookup, nl->task.proactor->epollfd);
+}
diff --git a/c/src/proactor/epoll_name_lookup_sync.c
b/c/src/proactor/epoll_name_lookup_sync.c
new file mode 100644
index 000000000..ad2c2ee74
--- /dev/null
+++ b/c/src/proactor/epoll_name_lookup_sync.c
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Synchronous (blocking) name lookup using getaddrinfo. */
+
+#include "epoll_name_lookup.h"
+
+#include <netdb.h>
+#include <string.h>
+
+int pni_name_lookup_blocking(const char *host, const char *port, int flags,
struct addrinfo **res)
+{
+ struct addrinfo hints = { 0 };
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
+ return getaddrinfo(host, port, &hints, res);
+}
+
+bool pni_name_lookup_init(pname_lookup_t *nl, pn_proactor_t *p)
+{
+ return true;
+}
+
+void pni_name_lookup_cleanup(pname_lookup_t *nl, pn_proactor_t *p)
+{
+}
+
+bool pni_name_lookup_start(pname_lookup_t *nl, const char *host, const char
*port, void *user_data, pni_nl_done_cb done_cb)
+{
+ struct addrinfo *res = NULL;
+ int gai_error = pni_name_lookup_blocking(host, port, 0, &res);
+ done_cb(user_data, res, gai_error);
+ return gai_error == 0;
+}
+
+void pni_name_lookup_forced_shutdown(pname_lookup_t *nl)
+{
+}
+
+void pni_name_lookup_process_events(pname_lookup_t *nl)
+{
+}
diff --git a/c/src/proactor/epoll_raw_connection.c
b/c/src/proactor/epoll_raw_connection.c
index 10870ffe7..1f056c85c 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -22,6 +22,7 @@
/* This is currently epoll implementation specific - and will need changing
for the other proactors */
#include "epoll-internal.h"
+#include "epoll_name_lookup.h"
#include "proactor-internal.h"
#include "raw_connection-internal.h"
@@ -139,6 +140,31 @@ static void
praw_connection_maybe_connect_lh(praw_connection_t *prc) {
prc->disconnected = true;
}
+/* Called when raw connection name lookup completes (from name_lookup
done_cb). Call with task lock held. */
+static void raw_connection_lookup_done_lh(praw_connection_t *prc, struct
addrinfo *ai, int gai_error) {
+ pn_proactor_t *p = prc->task.proactor;
+ bool notify = false;
+ if (gai_error) {
+ psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
+ } else if (ai) {
+ prc->addrinfo = ai;
+ prc->ai = ai;
+ praw_connection_maybe_connect_lh(prc);
+ if (prc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&prc->task)) {
+ return;
+ }
+ }
+ notify = schedule(&prc->task);
+ if (notify) notify_poller(p);
+}
+
+static void raw_connection_done_cb(void *user_data, struct addrinfo *ai, int
gai_error) {
+ praw_connection_t *prc = (praw_connection_t *)user_data;
+ lock(&prc->task.mutex);
+ raw_connection_lookup_done_lh(prc, ai, gai_error);
+ unlock(&prc->task.mutex);
+}
+
//
// Raw socket API
//
@@ -203,24 +229,16 @@ pn_raw_connection_t *pn_raw_connection(void) {
static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
const char *host;
const char *port;
+ pn_proactor_t *p = prc->task.proactor;
unlock(&prc->task.mutex);
size_t addrlen = strlen(prc->taddr);
char *addr_buf = (char*) alloca(addrlen+1);
pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
- // TODO: move this step to a separate worker thread that scales in response
to multiple blocking DNS lookups.
- int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
+ bool rc = pni_name_lookup_start(&p->name_lookup, host, port, prc,
raw_connection_done_cb);
lock(&prc->task.mutex);
- if (!gai_error) {
- prc->ai = prc->addrinfo;
- praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
- if (prc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&prc->task))
- return true;
- } else {
- psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
- }
- return false;
+ return rc;
}
void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const
char *addr) {
diff --git a/c/tests/proactor_test.cpp b/c/tests/proactor_test.cpp
index f4c075fb6..8c1e4c9d7 100644
--- a/c/tests/proactor_test.cpp
+++ b/c/tests/proactor_test.cpp
@@ -33,7 +33,6 @@
#include <proton/transport.h>
#include <string.h>
-
#include <filesystem>
using namespace pn_test;
@@ -145,6 +144,77 @@ TEST_CASE("proactor_connect") {
REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
}
+/* Connect using hostname "localhost" explicitly - exercises name resolution.
*/
+TEST_CASE("proactor_connect_localhost") {
+ close_on_open_handler h;
+ proactor p(&h);
+ pn_listener_t *l = p.listen(":0", &h);
+ REQUIRE_RUN(p, PN_LISTENER_OPEN);
+ p.connect("localhost:" + listening_port(l));
+ REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+ REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+}
+
+/* Close a connection before it has opened - verify clean shutdown. */
+TEST_CASE("proactor_connect_close_before_open") {
+ struct handler : public close_on_open_handler {
+ bool transport_closed_ = false;
+ bool listener_close_ = false;
+ bool proactor_inactive_ = false;
+ bool handle(pn_event_t *e) override {
+ // Override the parent handler so we can note the events
+ // but just run to the end of the batch.
+ switch (pn_event_type(e)) {
+ case PN_TRANSPORT_ERROR:
+ return false;
+ case PN_TRANSPORT_CLOSED:
+ transport_closed_ = true;
+ return false;
+ case PN_LISTENER_CLOSE:
+ listener_close_ = true;
+ return false;
+ case PN_PROACTOR_INACTIVE:
+ proactor_inactive_ = true;
+ return false;
+ default:
+ return common_handler::handle(e);
+ }
+ }
+ } h;
+ proactor p(&h);
+ pn_listener_t *l = p.listen();
+ REQUIRE_RUN(p, PN_LISTENER_OPEN);
+ pn_connection_t *c = p.connect("localhost:" + listening_port(l), &h);
+ REQUIRE(c != nullptr);
+ pn_connection_close(c);
+ pn_listener_close(l);
+ REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+ REQUIRE(h.transport_closed_);
+ REQUIRE(h.listener_close_);
+ REQUIRE(h.proactor_inactive_);
+}
+
+/* Failing name lookup - connect to invalid hostname. */
+TEST_CASE("proactor_name_lookup_fails") {
+ common_handler h;
+ proactor p(&h);
+ p.connect("nosuch.example.invalid:");
+ REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+ CHECK_THAT(*h.last_condition, cond_matches("proton:io", "nosuch"));
+ REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+}
+
+/* Interrupt a connection during a lookup that will fail - verify clean
shutdown. */
+TEST_CASE("proactor_name_lookup_interrupted") {
+ common_handler h;
+ proactor p(&h);
+ pn_connection_t *c = p.connect("nosuch.example.com:", &h);
+ REQUIRE(c != nullptr);
+ pn_connection_close(c);
+ REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+ REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+}
+
namespace {
/* Return on connection open, close and return on wake */
struct close_on_wake_handler : public common_handler {
@@ -318,13 +388,13 @@ TEST_CASE("proactor_errors") {
REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
/* Invalid connect/listen host name */
- p.connect("nosuch.example.com:");
+ p.connect("nosuch.example.invalid:");
REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
CHECK_THAT(*h.last_condition, cond_matches("proton:io", "nosuch"));
REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
- pn_proactor_listen(p, pn_listener(), "nosuch.example.com:", 1);
+ pn_proactor_listen(p, pn_listener(), "nosuch.example.invalid:", 1);
REQUIRE_RUN(p, PN_LISTENER_CLOSE);
CHECK_THAT(*h.last_condition, cond_matches("proton:io", "nosuch"));
REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]