This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new ea0985f64 PROTON-2812: Implement async name lookup with c-ares
ea0985f64 is described below

commit ea0985f643c6019050a40f69753f4cceaeccb9b2
Author: Andrew Stitcher <[email protected]>
AuthorDate: Fri Jan 30 02:15:32 2026 -0500

    PROTON-2812: Implement async name lookup with c-ares
    
    * TODO: Would be useful to implement a specific async name lookup call
      with a specific proactor event signalling the completed lookup for use
      in code that just provides us with fds.
---
 .github/workflows/build.yml              |   2 +-
 azure-pipelines/azure-pipelines.yml      |   2 +-
 c/CMakeLists.txt                         |   8 +
 c/src/proactor/epoll-internal.h          |  16 +-
 c/src/proactor/epoll.c                   | 105 +++++---
 c/src/proactor/epoll_name_lookup.h       |  57 +++++
 c/src/proactor/epoll_name_lookup_async.c | 397 +++++++++++++++++++++++++++++++
 c/src/proactor/epoll_name_lookup_sync.c  |  60 +++++
 c/src/proactor/epoll_raw_connection.c    |  40 +++-
 c/tests/proactor_test.cpp                |  76 +++++-
 10 files changed, 706 insertions(+), 57 deletions(-)
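
For orientation, the new internal API that the diff below adds in c/src/proactor/epoll_name_lookup.h is callback-based: the caller hands pni_name_lookup_start() a host, a port and a completion callback, and the proactor invokes the callback with a getaddrinfo-style result (or a gai error code). The following is a minimal illustrative sketch, not part of the commit; the my_context and my_lookup_done names are hypothetical.

    /* Sketch only: a completion callback matching the pni_nl_done_cb typedef
     * declared in epoll_name_lookup.h. */
    #include <netdb.h>
    #include <stdio.h>

    struct my_context { struct addrinfo *addrinfo; };

    /* Signature matches pni_nl_done_cb: (user_data, addrinfo list or NULL, gai_error). */
    static void my_lookup_done(void *user_data, struct addrinfo *ai, int gai_error) {
      struct my_context *c = (struct my_context *)user_data;
      if (gai_error) {
        fprintf(stderr, "lookup failed: %s\n", gai_strerror(gai_error));
        return;
      }
      c->addrinfo = ai;   /* the receiver keeps ownership of the returned list */
    }

    /* Starting a lookup (p being the owning pn_proactor_t):
     *
     *   struct my_context ctx = {0};
     *   pni_name_lookup_start(&p->name_lookup, "example.org", "5672",
     *                         &ctx, my_lookup_done);
     *
     * With the c-ares build the callback fires later, driven from
     * pni_name_lookup_process_events(); with the synchronous fallback it fires
     * before pni_name_lookup_start() returns. */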

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 5db098808..fc2d00b22 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -45,7 +45,7 @@ jobs:
     - name: Install Linux dependencies
       if: runner.os == 'Linux'
       run: |
-        sudo apt install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev softhsm2 opensc
+        sudo apt install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev libc-ares-dev softhsm2 opensc
     - name: Install Windows dependencies
       if: runner.os == 'Windows'
       run: |
diff --git a/azure-pipelines/azure-pipelines.yml b/azure-pipelines/azure-pipelines.yml
index e106971f1..f3ebda958 100644
--- a/azure-pipelines/azure-pipelines.yml
+++ b/azure-pipelines/azure-pipelines.yml
@@ -36,7 +36,7 @@ jobs:
   steps:
   - script: |
       sudo apt-get update
-      sudo apt-get install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev
+      sudo apt-get install -y swig libpython3-dev libsasl2-dev libjsoncpp-dev libc-ares-dev
     name: InstallExtraStuff
   - template: steps.yml
 - job: MacOS
diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index b214126a2..33930c3c3 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -368,6 +368,14 @@ if (PROACTOR STREQUAL "epoll" OR (NOT PROACTOR AND NOT BUILD_PROACTOR))
     set (PROACTOR_OK epoll)
     set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c src/proactor/epoll_timer.c ${qpid-proton-proactor-common})
     set (PROACTOR_LIBS Threads::Threads ${TIME_LIB})
+    find_package(c-ares 1.16 CONFIG)
+    option(ENABLE_ASYNC_DNS "Enable async DNS lookups (Using c-ares)" ${c-ares_FOUND})
+    if (ENABLE_ASYNC_DNS)
+      list(APPEND qpid-proton-proactor src/proactor/epoll_name_lookup_async.c)
+      list(APPEND PROACTOR_LIBS c-ares::cares)
+    else()
+      list(APPEND qpid-proton-proactor src/proactor/epoll_name_lookup_sync.c)
+    endif()
   endif()
 endif()
 
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 550324ccd..5a6a37ac3 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -61,7 +61,8 @@ typedef enum {
   LISTENER_IO,
   PCONNECTION_IO,
   RAW_CONNECTION_IO,
-  TIMER
+  TIMER,
+  NAME_LOOKUP_EPOLL  /* Inner epoll for async name lookup (e.g. c-ares) */
 } epoll_type_t;
 
 // Data to use with epoll.
@@ -78,7 +79,8 @@ typedef enum {
   PCONNECTION,
   LISTENER,
   RAW_CONNECTION,
-  TIMER_MANAGER
+  TIMER_MANAGER,
+  NAME_LOOKUP
 } task_type_t;
 
 typedef struct task_t {
@@ -142,9 +144,16 @@ typedef struct pni_timer_manager_t {
   bool sched_timeout;
 } pni_timer_manager_t;
 
+typedef struct pname_lookup_t {
+  task_t task;
+  epoll_extended_t epoll_name_lookup;
+  void *impl;   /* NULL for sync; for async: implementation context */
+} pname_lookup_t;
+
 struct pn_proactor_t {
   task_t task;
   pni_timer_manager_t timer_manager;
+  pname_lookup_t name_lookup;
   epoll_extended_t epoll_schedule;     /* ready list */
   epoll_extended_t epoll_interrupt;
   pn_event_batch_t batch;
@@ -364,7 +373,7 @@ bool proactor_remove(task_t *tsk);
 bool unassign_thread(pn_proactor_t *p, tslot_t *ts, tslot_state new_state, tslot_t **resume_thread);
 
 void task_init(task_t *tsk, task_type_t t, pn_proactor_t *p);
-static void task_finalize(task_t* tsk) {
+static inline void task_finalize(task_t* tsk) {
   pmutex_finalize(&tsk->mutex);
 }
 
@@ -377,7 +386,6 @@ bool start_polling(epoll_extended_t *ee, int epollfd);
 void stop_polling(epoll_extended_t *ee, int epollfd);
 void rearm_polling(epoll_extended_t *ee, int epollfd);
 
-int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res);
 void configure_socket(int sock);
 
 accepted_t *listener_accepted_next(pn_listener_t *listener);
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 52068c59a..6cc0d38f0 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -63,6 +63,7 @@
 #undef _GNU_SOURCE
 
 #include "epoll-internal.h"
+#include "epoll_name_lookup.h"
 #include "proactor-internal.h"
 #include "core/engine-internal.h"
 #include "core/logger_private.h"
@@ -632,6 +633,10 @@ static inline pn_listener_t *task_listener(task_t *t) {
   return t->type == LISTENER ? containerof(t, pn_listener_t, task) : NULL;
 }
 
+static inline pname_lookup_t *task_name_lookup(task_t *t) {
+  return t->type == NAME_LOOKUP ? containerof(t, pname_lookup_t, task) : NULL;
+}
+
 static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
 static pn_event_t *proactor_batch_next(pn_event_batch_t *batch);
 static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch);
@@ -1408,15 +1413,6 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) {
   pc->disconnected = true;
 }
 
-int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res)
-{
-  // NOTE: getaddrinfo can block on DNS lookup (PROTON-2812).
-  struct addrinfo hints = { 0 };
-  hints.ai_family = AF_UNSPEC;
-  hints.ai_socktype = SOCK_STREAM;
-  hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
-  return getaddrinfo(host, port, &hints, res);
-}
 
 static inline bool is_inactive(pn_proactor_t *p) {
   return (!p->tasks && !p->disconnects_pending && !p->timeout_set && !p->shutting_down);
@@ -1431,23 +1427,39 @@ bool schedule_if_inactive(pn_proactor_t *p) {
   return false;
 }
 
+/* Called when connection name lookup completes (from name_lookup done_cb). Call with task lock held. */
+static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai, int gai_error) {
+  pn_proactor_t *p = pc->task.proactor;
+  bool notify = false;
+  if (gai_error) {
+    psocket_gai_error(&pc->psocket, gai_error, "connect to ");
+  } else if (ai) {
+    pc->addrinfo = ai;
+    pc->ai = ai;
+    pconnection_maybe_connect_lh(pc);
+    if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && !pni_task_wake_pending(&pc->task)) {
+      return;
+    }
+  }
+  notify = schedule(&pc->task);
+  if (notify) notify_poller(p);
+}
+
+static void connection_done_cb(void *user_data, struct addrinfo *ai, int gai_error) {
+  pconnection_t *pc = (pconnection_t *)user_data;
+  lock(&pc->task.mutex);
+  connection_lookup_done_lh(pc, ai, gai_error);
+  unlock(&pc->task.mutex);
+}
+
 // Call from pconnection_process with task lock held.
 // Return true if the socket is connecting and there are no Proton events to deliver.
 static bool pconnection_first_connect_lh(pconnection_t *pc) {
+  pn_proactor_t *p = pc->task.proactor;
   unlock(&pc->task.mutex);
-  // TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
-  int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
+  bool rc = pni_name_lookup_start(&p->name_lookup, pc->host, pc->port, pc, connection_done_cb);
   lock(&pc->task.mutex);
-
-  if (!gai_error) {
-    pc->ai = pc->addrinfo;
-    pconnection_maybe_connect_lh(pc); /* Start connection attempts */
-    if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && !pni_task_wake_pending(&pc->task))
-      return true;
-  } else {
-    psocket_gai_error(&pc->psocket, gai_error, "connect to ");
-  }
-  return false;
+  return rc;
 }
 
 void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
@@ -1579,7 +1591,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in
   pni_parse_addr(addr, l->addr_buf, sizeof(l->addr_buf), &l->host, &l->port);
 
   struct addrinfo *addrinfo = NULL;
-  int gai_err = pgetaddrinfo(l->host, l->port, AI_PASSIVE | AI_ALL, &addrinfo);
+  int gai_err = pni_name_lookup_blocking(l->host, l->port, AI_PASSIVE | AI_ALL, &addrinfo);
   if (!gai_err) {
     /* Count addresses, allocate enough space for sockets */
     size_t len = 0;
@@ -2021,23 +2033,27 @@ pn_proactor_t *pn_proactor(void) {
   if ((p->epollfd = epoll_create(1)) >= 0) {
     if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
       if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
-        if (pni_timer_manager_init(&p->timer_manager))
-          if ((p->collector = pn_collector()) != NULL) {
-            p->batch.next_event = &proactor_batch_next;
-            start_polling(&p->timer_manager.epoll_timer, p->epollfd);  // TODO: check for error
-            epoll_eventfd_init(&p->epoll_schedule, p->eventfd, p->epollfd, true);
-            epoll_eventfd_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false);
-            p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
-            grow_poller_bufs(p);
-            p->ready_list_generation = 1;
-            return p;
+        if (pni_timer_manager_init(&p->timer_manager)) {
+          if (pni_name_lookup_init(&p->name_lookup, p)) {
+            if ((p->collector = pn_collector()) != NULL) {
+              p->batch.next_event = &proactor_batch_next;
+              start_polling(&p->timer_manager.epoll_timer, p->epollfd);  // TODO: check for error
+              epoll_eventfd_init(&p->epoll_schedule, p->eventfd, p->epollfd, true);
+              epoll_eventfd_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false);
+              p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
+              grow_poller_bufs(p);
+              p->ready_list_generation = 1;
+              return p;
+            }
           }
+        }
       }
     }
   }
   if (p->epollfd >= 0) close(p->epollfd);
   if (p->eventfd >= 0) close(p->eventfd);
   if (p->interruptfd >= 0) close(p->interruptfd);
+  pni_name_lookup_cleanup(&p->name_lookup, p);
   pni_timer_manager_finalize(&p->timer_manager);
   pmutex_finalize(&p->timeout_mutex);
   pmutex_finalize(&p->tslot_mutex);
@@ -2071,11 +2087,15 @@ void pn_proactor_free(pn_proactor_t *p) {
      case RAW_CONNECTION:
       pni_raw_connection_forced_shutdown(pni_task_raw_connection(tsk));
       break;
+     case NAME_LOOKUP:
+      pni_name_lookup_forced_shutdown(task_name_lookup(tsk));
+      break;
      default:
       break;
     }
   }
 
+  pni_name_lookup_cleanup(&p->name_lookup, p);
   pni_timer_manager_finalize(&p->timer_manager);
   pn_collector_free(p->collector);
   pmutex_finalize(&p->timeout_mutex);
@@ -2309,6 +2329,11 @@ static pn_event_batch_t *process(task_t *tsk) {
     batch = pni_timer_manager_process(tm, timeout, tsk_ready);
     break;
   }
+  case NAME_LOOKUP:
+    unlock(&p->sched_mutex);
+    pni_name_lookup_process_events(&p->name_lookup);
+    batch = NULL;
+    break;
   default:
     assert(NULL);
   }
@@ -2350,6 +2375,11 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
     }
     // else if (ee->fd == p->eventfd)... schedule_ready_list already performed by poller task.
     break;
+  case NAME_LOOKUP_EPOLL: {
+    tsk = &p->name_lookup.task;
+    tsk->sched_pending = true;
+    break;
+  }
   case PCONNECTION_IO: {
     psocket_t *ps = containerof(ee, psocket_t, epoll_io);
     pconnection_t *pc = psocket_pconnection(ps);
@@ -2581,11 +2611,10 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
       unlock(&p->eventfd_mutex);
     }
 
-    int timeout = (epoll_immediate) ? 0 : -1;
-    p->poller_suspended = (timeout == -1);
+    p->poller_suspended = !epoll_immediate;
     unlock(&p->sched_mutex);
 
-    n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
+    n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, epoll_immediate ? 0 : -1);
 
     lock(&p->sched_mutex);
     p->poller_suspended = false;
@@ -2612,8 +2641,9 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
     unlock(&p->eventfd_mutex);
 
     if (n_events < 0) {
-      if (errno != EINTR)
+      if (errno != EINTR) {
         perror("epoll_wait"); // TODO: proper log
+      }
       if (!can_block && !unpolled_work)
         return true;
       else
@@ -2622,8 +2652,9 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block
       if (!can_block && !unpolled_work)
         return true;
       else {
-        if (!epoll_immediate)
+        if (!epoll_immediate) {
           perror("epoll_wait unexpected timeout"); // TODO: proper log
+        }
         if (!unpolled_work)
           continue;
       }
diff --git a/c/src/proactor/epoll_name_lookup.h b/c/src/proactor/epoll_name_lookup.h
new file mode 100644
index 000000000..032fd77e3
--- /dev/null
+++ b/c/src/proactor/epoll_name_lookup.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#ifndef PROACTOR_NAME_LOOKUP_H
+#define PROACTOR_NAME_LOOKUP_H
+
+#include "epoll-internal.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Blocking name lookup - used by listeners and as fallback for connections.
+ * Same signature as getaddrinfo.
+ */
+int pni_name_lookup_blocking(const char *host, const char *port, int flags, struct addrinfo **res);
+
+/* Initialize name lookup subsystem. Returns false if error. */
+bool pni_name_lookup_init(pname_lookup_t *nl, pn_proactor_t *p);
+
+/* Cleanup name lookup subsystem. */
+void pni_name_lookup_cleanup(pname_lookup_t *nl, pn_proactor_t *p);
+
+/* Callback when lookup completes. Called with (user_data, addrinfo or NULL, gai_error). */
+typedef void (*pni_nl_done_cb)(void *user_data, struct addrinfo *ai, int gai_error);
+
+/* Start a name lookup. Returns true if started successfully. */
+bool pni_name_lookup_start(pname_lookup_t *nl, const char *host, const char *port, void *user_data, pni_nl_done_cb done_cb);
+
+/* Clear lookup state when forcing shutdown. */
+void pni_name_lookup_forced_shutdown(pname_lookup_t *nl);
+
+/* Process events. */
+void pni_name_lookup_process_events(pname_lookup_t *nl);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* PROACTOR_NAME_LOOKUP_H */
diff --git a/c/src/proactor/epoll_name_lookup_async.c b/c/src/proactor/epoll_name_lookup_async.c
new file mode 100644
index 000000000..c7a51abcd
--- /dev/null
+++ b/c/src/proactor/epoll_name_lookup_async.c
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Asynchronous name lookup using c-ares. */
+
+#include "epoll_name_lookup.h"
+
+#include <limits.h>
+#include <netdb.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/epoll.h>
+// Ares 1.27 doesn't include sys/select.h in ares.h but uses fd_set
+// So even though we don't use that we still need to include it here
+// before we include ares.h
+#include <sys/select.h>
+#include <sys/timerfd.h>
+#include <unistd.h>
+
+#include <ares.h>
+
+// 0x012200 corresponds to version 1.34 of c-ares which added ares_process_fds
+#if ARES_VERSION >= 0x012200
+#  define CARES_HAVE_PROCESS_FDS 1
+#else
+#  define CARES_HAVE_PROCESS_FDS 0
+#endif
+
+
+/* Opaque context for async name lookup. */
+typedef struct pni_nl_async_ctx {
+  pmutex pending_mutex;
+  struct ares_channeldata *channel;
+  struct pni_nl_request *pending_head;
+  int timerfd;  /* In inner epoll; fires for ares timeout, -1 when not in use */
+} pni_nl_async_ctx_t;
+
+static void nl_timerfd_set(int fd, uint64_t t_millis)
+{
+  struct itimerspec newt;
+  memset(&newt, 0, sizeof(newt));
+  if (t_millis > 0) {
+    newt.it_value.tv_sec = t_millis / 1000;
+    newt.it_value.tv_nsec = (t_millis % 1000) * 1000000;
+  }
+  timerfd_settime(fd, 0, &newt, NULL);
+}
+
+static void nl_timerfd_drain(int fd)
+{
+  uint64_t result = 0;
+  __attribute__((unused)) ssize_t r = read(fd, &result, sizeof(result));
+}
+
+/* Set timerfd from ares next deadline; call after start or process_events. */
+static void nl_update_ares_timer(pname_lookup_t *nl)
+{
+  pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)nl->impl;
+  if (!ctx || ctx->timerfd < 0) return;
+  struct timeval tv;
+  if (ares_timeout(ctx->channel, NULL, &tv) != NULL) {
+    long ares_ms = (long)tv.tv_sec * 1000 + (int)(tv.tv_usec / 1000);
+    if (ares_ms > 0 && ares_ms <= INT_MAX)
+      nl_timerfd_set(ctx->timerfd, (uint64_t)ares_ms);
+    else
+      nl_timerfd_set(ctx->timerfd, 0);
+  } else {
+    nl_timerfd_set(ctx->timerfd, 0);
+  }
+}
+
+typedef struct pni_nl_request {
+  pni_nl_async_ctx_t *ctx;
+  void *user_data;
+  pni_nl_done_cb done_cb;
+  struct pni_nl_request *next;
+} pni_nl_request_t;
+
+static void nl_sock_state_cb(void *data, ares_socket_t socket_fd, int readable, int writable)
+{
+  pname_lookup_t *nl = (pname_lookup_t *)data;
+  int epfd = nl->epoll_name_lookup.fd;
+  if (epfd < 0) return;
+  struct epoll_event ev;
+  ev.data.fd = socket_fd;
+  int events = 0;
+  if (readable) events |= EPOLLIN;
+  if (writable) events |= EPOLLOUT;
+
+  if (readable || writable) {
+    ev.events = events;
+    if (epoll_ctl(epfd, EPOLL_CTL_MOD, socket_fd, &ev) < 0) {
+      epoll_ctl(epfd, EPOLL_CTL_ADD, socket_fd, &ev);
+    }
+  } else {
+    epoll_ctl(epfd, EPOLL_CTL_DEL, socket_fd, NULL);
+  }
+}
+
+static struct addrinfo *nl_ares_addrinfo_to_addrinfo(struct ares_addrinfo *result)
+{
+  struct addrinfo *ai_head = NULL;
+  struct addrinfo *ai_prev = NULL;
+  for (struct ares_addrinfo_node *node = result->nodes; node != NULL; node = node->ai_next) {
+    struct addrinfo *ai = (struct addrinfo *)calloc(1, sizeof(struct addrinfo)+node->ai_addrlen);
+    if (!ai) break;
+    ai->ai_family = node->ai_family;
+    ai->ai_socktype = node->ai_socktype;
+    ai->ai_protocol = node->ai_protocol;
+    ai->ai_flags = node->ai_flags;
+    ai->ai_addrlen = node->ai_addrlen;
+    ai->ai_addr = (struct sockaddr *)(ai + 1);
+    memcpy(ai->ai_addr, node->ai_addr, node->ai_addrlen);
+    ai->ai_next = NULL;
+    if (ai_prev) {
+      ai_prev->ai_next = ai;
+    } else {
+      ai_head = ai;
+    }
+    ai_prev = ai;
+  }
+  return ai_head;
+}
+
+static int nl_ares_status_to_gai(int status)
+{
+  switch (status) {
+  case ARES_ENOTFOUND:
+  case ARES_ESERVICE:
+    return EAI_NONAME;
+  case ARES_ETIMEOUT:
+    return EAI_AGAIN;
+  case ARES_ESERVFAIL:
+  case ARES_ECANCELLED:
+  case ARES_EDESTRUCTION:
+    return EAI_FAIL;
+  case ARES_ENOMEM:
+    return EAI_MEMORY;
+  default:
+    return EAI_NONAME;
+  }
+}
+
+static void nl_ares_lookup_cb(void *data, int status, int timeouts, struct ares_addrinfo *result)
+{
+  pni_nl_request_t *req = (pni_nl_request_t *)data;
+  pni_nl_async_ctx_t *ctx = req->ctx;
+
+  lock(&ctx->pending_mutex);
+  /* Unlink from pending list */
+  {
+    pni_nl_request_t **pp = &ctx->pending_head;
+    while (*pp && *pp != req) pp = &(*pp)->next;
+    if (*pp) *pp = req->next;
+  }
+  unlock(&ctx->pending_mutex);
+
+  struct addrinfo *ai = NULL;
+  int gai_error = EAI_NONAME;
+  if (status == ARES_SUCCESS && result) {
+    ai = nl_ares_addrinfo_to_addrinfo(result);
+    gai_error = ai ? 0 : EAI_MEMORY;
+  } else {
+    gai_error = nl_ares_status_to_gai(status);
+  }
+  if (result) {
+    ares_freeaddrinfo(result);
+  }
+  if (status != ARES_ECANCELLED && status != ARES_EDESTRUCTION) {
+    req->done_cb(req->user_data, ai, gai_error);
+  }
+  free(req);
+}
+
+int pni_name_lookup_blocking(const char *host, const char *port, int flags, struct addrinfo **res)
+{
+  struct addrinfo hints = { 0 };
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
+  return getaddrinfo(host, port, &hints, res);
+}
+
+bool pni_name_lookup_init(pname_lookup_t *nl, pn_proactor_t *p)
+{
+  task_init(&nl->task, NAME_LOOKUP, p);
+  pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)calloc(1, sizeof(*ctx));
+  if (!ctx) return false;
+  nl->impl = ctx;
+  pmutex_init(&ctx->pending_mutex);
+  ctx->pending_head = NULL;
+
+  if (ares_library_init(ARES_LIB_INIT_ALL) != ARES_SUCCESS) {
+    free(ctx);
+    nl->impl = NULL;
+    return false;
+  }
+  if ((nl->epoll_name_lookup.fd = epoll_create1(0)) < 0) {
+    ares_library_cleanup();
+    return false;
+  }
+  struct ares_options options = {
+    .sock_state_cb = nl_sock_state_cb,
+    .sock_state_cb_data = nl
+  };
+  if (ares_init_options(&ctx->channel, &options, ARES_OPT_SOCK_STATE_CB) != ARES_SUCCESS) {
+    close(nl->epoll_name_lookup.fd);
+    nl->epoll_name_lookup.fd = -1;
+    ares_library_cleanup();
+    return false;
+  }
+  ctx->timerfd = -1;
+  int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+  if (tfd >= 0) {
+    struct epoll_event ev = { .events = EPOLLIN, .data.fd = tfd };
+    if (epoll_ctl(nl->epoll_name_lookup.fd, EPOLL_CTL_ADD, tfd, &ev) == 0) {
+      ctx->timerfd = tfd;
+    } else {
+      close(tfd);
+      return false;
+    }
+  } else {
+    return false;
+  }
+  /* Register inner epoll with main proactor epoll (fd already in nl->epoll_name_lookup) */
+  nl->epoll_name_lookup.type = NAME_LOOKUP_EPOLL;
+  nl->epoll_name_lookup.wanted = EPOLLIN;
+  nl->epoll_name_lookup.polling = false;
+  pmutex_init(&nl->epoll_name_lookup.barrier_mutex);
+  if (!start_polling(&nl->epoll_name_lookup, p->epollfd)) {
+    if (ctx->timerfd >= 0) {
+      close(ctx->timerfd);
+      ctx->timerfd = -1;
+    }
+    ares_destroy(ctx->channel);
+    pmutex_finalize(&nl->epoll_name_lookup.barrier_mutex);
+    close(nl->epoll_name_lookup.fd);
+    nl->epoll_name_lookup.fd = -1;
+    ares_library_cleanup();
+    free(ctx);
+    nl->impl = NULL;
+    return false;
+  }
+  nl_update_ares_timer(nl);
+  return true;
+}
+
+void pni_name_lookup_cleanup(pname_lookup_t *nl, pn_proactor_t *p)
+{
+  pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)nl->impl;
+  if (!ctx) return;
+  if (nl->epoll_name_lookup.fd >= 0) {
+    stop_polling(&nl->epoll_name_lookup, p->epollfd);
+    if (ctx->timerfd >= 0) {
+      close(ctx->timerfd);
+      ctx->timerfd = -1;
+    }
+    ares_cancel(ctx->channel);
+    ares_destroy(ctx->channel);
+    pmutex_finalize(&nl->epoll_name_lookup.barrier_mutex);
+    close(nl->epoll_name_lookup.fd);
+    nl->epoll_name_lookup.fd = -1;
+    ares_library_cleanup();
+  }
+  /* Free requests still on pending list */
+  lock(&ctx->pending_mutex);
+  while (ctx->pending_head) {
+    pni_nl_request_t *req = ctx->pending_head;
+    ctx->pending_head = req->next;
+    free(req);
+  }
+  unlock(&ctx->pending_mutex);
+  free(ctx);
+  nl->impl = NULL;
+}
+
+bool pni_name_lookup_start(pname_lookup_t *nl, const char *host, const char *port, void *user_data, pni_nl_done_cb done_cb)
+{
+  pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)nl->impl;
+
+  if (!ctx || nl->epoll_name_lookup.fd < 0 || !done_cb)
+    return false;
+  /* ares can't cope with a NULL host; do a blocking lookup for that case. */
+  if (host == NULL) {
+    struct addrinfo *res = NULL;
+    int gai_error = pni_name_lookup_blocking(host, port, 0, &res);
+    done_cb(user_data, res, gai_error);
+    return gai_error == 0;
+  }
+  pni_nl_request_t *req = (pni_nl_request_t *)malloc(sizeof(*req));
+  if (!req) {
+    done_cb(user_data, NULL, EAI_MEMORY);
+    return false;
+  }
+
+  lock(&ctx->pending_mutex);
+  *req = (pni_nl_request_t){
+    .ctx = ctx,
+    .user_data = user_data,
+    .done_cb = done_cb,
+    .next = ctx->pending_head,
+  };
+  ctx->pending_head = req;
+  unlock(&ctx->pending_mutex);
+
+  struct ares_addrinfo_hints hints = {
+    .ai_flags = ARES_AI_V4MAPPED | ARES_AI_ADDRCONFIG,
+    .ai_family = AF_UNSPEC,
+    .ai_socktype = SOCK_STREAM,
+  };
+  ares_getaddrinfo(ctx->channel, host, port, &hints, nl_ares_lookup_cb, req);
+  nl_update_ares_timer(nl);
+  return true;
+}
+
+void pni_name_lookup_forced_shutdown(pname_lookup_t *nl)
+{
+  pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)nl->impl;
+  if (!ctx) return;
+  lock(&ctx->pending_mutex);
+  pni_nl_request_t **pp = &ctx->pending_head;
+  while (*pp) {
+    pni_nl_request_t *req = *pp;
+    *pp = req->next;
+    free(req);
+    pp = &(*pp)->next;
+  }
+  unlock(&ctx->pending_mutex);
+}
+
+void pni_name_lookup_process_events(pname_lookup_t *nl)
+{
+  int epfd = nl->epoll_name_lookup.fd;
+  if (epfd < 0) return;
+  pni_nl_async_ctx_t *ctx = (pni_nl_async_ctx_t *)nl->impl;
+  if (!ctx) return;
+  struct epoll_event events[10];
+  int n = epoll_wait(epfd, events, 10, 0);
+  if (n < 0) return;
+  int n_fds = 0;
+  bool timer_fired = false;
+#if CARES_HAVE_PROCESS_FDS
+  ares_fd_events_t fd_events[10];
+  for (int j = 0; j < n && n_fds < 10; j++) {
+    int fd = events[j].data.fd;
+    if (fd == ctx->timerfd) {
+      nl_timerfd_drain(ctx->timerfd);
+      timer_fired = true;
+    } else {
+      fd_events[n_fds].fd = fd;
+      fd_events[n_fds].events = 0;
+      if (events[j].events & EPOLLIN) fd_events[n_fds].events |= ARES_FD_EVENT_READ;
+      if (events[j].events & EPOLLOUT) fd_events[n_fds].events |= ARES_FD_EVENT_WRITE;
+      n_fds++;
+    }
+  }
+  if (n_fds > 0)
+    ares_process_fds(ctx->channel, fd_events, n_fds, 0);
+  else if (timer_fired)
+    ares_process_fds(ctx->channel, NULL, 0, 0);
+#else
+  for (int j = 0; j < n; j++) {
+    int fd = events[j].data.fd;
+    if (fd == ctx->timerfd) {
+      nl_timerfd_drain(ctx->timerfd);
+      timer_fired = true;
+    } else {
+      ares_socket_t r = (events[j].events & EPOLLIN) ? fd : ARES_SOCKET_BAD;
+      ares_socket_t w = (events[j].events & EPOLLOUT) ? fd : ARES_SOCKET_BAD;
+      ares_process_fd(ctx->channel, r, w);
+      n_fds++;
+    }
+  }
+  if (timer_fired && n_fds == 0)
+    ares_process_fd(ctx->channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
+#endif
+  nl_update_ares_timer(nl);
+  if (nl->epoll_name_lookup.polling)
+    rearm_polling(&nl->epoll_name_lookup, nl->task.proactor->epollfd);
+}
diff --git a/c/src/proactor/epoll_name_lookup_sync.c b/c/src/proactor/epoll_name_lookup_sync.c
new file mode 100644
index 000000000..ad2c2ee74
--- /dev/null
+++ b/c/src/proactor/epoll_name_lookup_sync.c
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Synchronous (blocking) name lookup using getaddrinfo. */
+
+#include "epoll_name_lookup.h"
+
+#include <netdb.h>
+#include <string.h>
+
+int pni_name_lookup_blocking(const char *host, const char *port, int flags, struct addrinfo **res)
+{
+  struct addrinfo hints = { 0 };
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | flags;
+  return getaddrinfo(host, port, &hints, res);
+}
+
+bool pni_name_lookup_init(pname_lookup_t *nl, pn_proactor_t *p)
+{
+  return true;
+}
+
+void pni_name_lookup_cleanup(pname_lookup_t *nl, pn_proactor_t *p)
+{
+}
+
+bool pni_name_lookup_start(pname_lookup_t *nl, const char *host, const char *port, void *user_data, pni_nl_done_cb done_cb)
+{
+  struct addrinfo *res = NULL;
+  int gai_error = pni_name_lookup_blocking(host, port, 0, &res);
+  done_cb(user_data, res, gai_error);
+  return gai_error == 0;
+}
+
+void pni_name_lookup_forced_shutdown(pname_lookup_t *nl)
+{
+}
+
+void pni_name_lookup_process_events(pname_lookup_t *nl)
+{
+}
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 10870ffe7..1f056c85c 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -22,6 +22,7 @@
 /* This is currently epoll implementation specific - and will need changing for the other proactors */
 
 #include "epoll-internal.h"
+#include "epoll_name_lookup.h"
 #include "proactor-internal.h"
 #include "raw_connection-internal.h"
 
@@ -139,6 +140,31 @@ static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
   prc->disconnected = true;
 }
 
+/* Called when raw connection name lookup completes (from name_lookup done_cb). Call with task lock held. */
+static void raw_connection_lookup_done_lh(praw_connection_t *prc, struct addrinfo *ai, int gai_error) {
+  pn_proactor_t *p = prc->task.proactor;
+  bool notify = false;
+  if (gai_error) {
+    psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
+  } else if (ai) {
+    prc->addrinfo = ai;
+    prc->ai = ai;
+    praw_connection_maybe_connect_lh(prc);
+    if (prc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&prc->task)) {
+      return;
+    }
+  }
+  notify = schedule(&prc->task);
+  if (notify) notify_poller(p);
+}
+
+static void raw_connection_done_cb(void *user_data, struct addrinfo *ai, int gai_error) {
+  praw_connection_t *prc = (praw_connection_t *)user_data;
+  lock(&prc->task.mutex);
+  raw_connection_lookup_done_lh(prc, ai, gai_error);
+  unlock(&prc->task.mutex);
+}
+
 //
 // Raw socket API
 //
@@ -203,24 +229,16 @@ pn_raw_connection_t *pn_raw_connection(void) {
 static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
   const char *host;
   const char *port;
+  pn_proactor_t *p = prc->task.proactor;
 
   unlock(&prc->task.mutex);
   size_t addrlen = strlen(prc->taddr);
   char *addr_buf = (char*) alloca(addrlen+1);
   pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
-  // TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
-  int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
+  bool rc = pni_name_lookup_start(&p->name_lookup, host, port, prc, raw_connection_done_cb);
   lock(&prc->task.mutex);
 
-  if (!gai_error) {
-    prc->ai = prc->addrinfo;
-    praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
-    if (prc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&prc->task))
-      return true;
-  } else {
-    psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
-  }
-  return false;
+  return rc;
 }
 
 void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
diff --git a/c/tests/proactor_test.cpp b/c/tests/proactor_test.cpp
index f4c075fb6..8c1e4c9d7 100644
--- a/c/tests/proactor_test.cpp
+++ b/c/tests/proactor_test.cpp
@@ -33,7 +33,6 @@
 #include <proton/transport.h>
 
 #include <string.h>
-
 #include <filesystem>
 
 using namespace pn_test;
@@ -145,6 +144,77 @@ TEST_CASE("proactor_connect") {
   REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
 }
 
+/* Connect using hostname "localhost" explicitly - exercises name resolution. */
+TEST_CASE("proactor_connect_localhost") {
+  close_on_open_handler h;
+  proactor p(&h);
+  pn_listener_t *l = p.listen(":0", &h);
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  p.connect("localhost:" + listening_port(l));
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+}
+
+/* Close a connection before it has opened - verify clean shutdown. */
+TEST_CASE("proactor_connect_close_before_open") {
+  struct handler : public close_on_open_handler {
+    bool transport_closed_ = false;
+    bool listener_close_ = false;
+    bool proactor_inactive_ = false;
+    bool handle(pn_event_t *e) override {
+      // Override the parent handler so we can note the events
+      // but just run to the end of the batch.
+      switch (pn_event_type(e)) {
+      case PN_TRANSPORT_ERROR:
+        return false;
+      case PN_TRANSPORT_CLOSED:
+        transport_closed_ = true;
+        return false;
+      case PN_LISTENER_CLOSE:
+        listener_close_ = true;
+        return false;
+      case PN_PROACTOR_INACTIVE:
+        proactor_inactive_ = true;
+        return false;
+      default:
+        return common_handler::handle(e);
+      }
+    }
+  } h;
+  proactor p(&h);
+  pn_listener_t *l = p.listen();
+  REQUIRE_RUN(p, PN_LISTENER_OPEN);
+  pn_connection_t *c = p.connect("localhost:" + listening_port(l), &h);
+  REQUIRE(c != nullptr);
+  pn_connection_close(c);
+  pn_listener_close(l);
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+  REQUIRE(h.transport_closed_);
+  REQUIRE(h.listener_close_);
+  REQUIRE(h.proactor_inactive_);
+}
+
+/* Failing name lookup - connect to invalid hostname. */
+TEST_CASE("proactor_name_lookup_fails") {
+  common_handler h;
+  proactor p(&h);
+  p.connect("nosuch.example.invalid:");
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  CHECK_THAT(*h.last_condition, cond_matches("proton:io", "nosuch"));
+  REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
+}
+
+/* Interrupt a connection during a lookup that will fail - verify clean shutdown. */
+TEST_CASE("proactor_name_lookup_interrupted") {
+  common_handler h;
+  proactor p(&h);
+  pn_connection_t *c = p.connect("nosuch.example.com:", &h);
+  REQUIRE(c != nullptr);
+  pn_connection_close(c);
+  REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
+  REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
+}
+
 namespace {
 /* Return on connection open, close and return on wake */
 struct close_on_wake_handler : public common_handler {
@@ -318,13 +388,13 @@ TEST_CASE("proactor_errors") {
   REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
 
   /* Invalid connect/listen host name */
-  p.connect("nosuch.example.com:");
+  p.connect("nosuch.example.invalid:");
   REQUIRE_RUN(p, PN_TRANSPORT_ERROR);
   CHECK_THAT(*h.last_condition, cond_matches("proton:io", "nosuch"));
   REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
   REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);
 
-  pn_proactor_listen(p, pn_listener(), "nosuch.example.com:", 1);
+  pn_proactor_listen(p, pn_listener(), "nosuch.example.invalid:", 1);
   REQUIRE_RUN(p, PN_LISTENER_CLOSE);
   CHECK_THAT(*h.last_condition, cond_matches("proton:io", "nosuch"));
   REQUIRE_RUN(p, PN_PROACTOR_INACTIVE);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

