This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new 36fe831ec PROTON-2818: Move epoll proactor connection logic to a task thread.
36fe831ec is described below

commit 36fe831ec8df52dea56ae04eec02304f2572f13a
Author: Clifford Jansen <cliffjan...@apache.org>
AuthorDate: Sun May 12 11:04:19 2024 -0700

    PROTON-2818: Move epoll proactor connection logic to a task thread.
---
 c/src/proactor/epoll-internal.h       |  1 +
 c/src/proactor/epoll.c                | 57 ++++++++++++++++++-----------
 c/src/proactor/epoll_raw_connection.c | 69 ++++++++++++++++++++++++-----------
 c/tests/raw_wake_test.cpp             |  1 -
 4 files changed, 84 insertions(+), 44 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 8f765121e..550324ccd 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -238,6 +238,7 @@ typedef struct pconnection_t {
   bool server;                /* accept, not connect */
   bool tick_pending;
   bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
+  bool first_schedule;
   pn_condition_t *disconnect_condition;
   // Following values only changed by (sole) working task:
   uint32_t current_arm;  // active epoll io events
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 7abd884ef..7714c23fe 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -810,6 +810,7 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
   pc->wbuf_current = NULL;
   pc->hog_count = 0;
   pc->batch.next_event = pconnection_batch_next;
+  pc->first_schedule = false;
 
   if (server) {
     pn_transport_set_server(pc->driver.transport);
@@ -1122,6 +1123,7 @@ static void write_flush(pconnection_t *pc) {
 
 static void pconnection_connected_lh(pconnection_t *pc);
 static void pconnection_maybe_connect_lh(pconnection_t *pc);
+static bool pconnection_first_connect_lh(pconnection_t *pc);
 
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t 
events, bool sched_ready, bool topup) {
   bool waking = false;
@@ -1139,6 +1141,17 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
   }
   if (sched_ready) schedule_done(&pc->task);
 
+  if (pc->first_schedule) {
+    pc->first_schedule = false;
+    assert(!topup && !events);
+    if (!pc->queued_disconnect) {
+      if (pconnection_first_connect_lh(pc)) {
+        unlock(&pc->task.mutex);
+        return NULL;
+      }
+    }
+  }
+
   if (topup) {
     // Only called by the batch owner.  Does not loop, just "tops up"
     // once.  May be back depending on hog_count.
@@ -1396,6 +1409,7 @@ static void pconnection_maybe_connect_lh(pconnection_t 
*pc) {
 
 int pgetaddrinfo(const char *host, const char *port, int flags, struct 
addrinfo **res)
 {
+  // NOTE: getaddrinfo can block on DNS lookup (PROTON-2812).
   struct addrinfo hints = { 0 };
   hints.ai_family = AF_UNSPEC;
   hints.ai_socktype = SOCK_STREAM;
@@ -1416,7 +1430,27 @@ bool schedule_if_inactive(pn_proactor_t *p) {
   return false;
 }
 
+// Call from pconnection_process with task lock held.  The lock is released around the blocking pgetaddrinfo() call and re-acquired before returning.
+// Return true if the socket is connecting and there are no Proton events to deliver.
+static bool pconnection_first_connect_lh(pconnection_t *pc) {
+  unlock(&pc->task.mutex);
+  // TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
+  int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
+  lock(&pc->task.mutex);
+
+  if (!gai_error) {
+    pc->ai = pc->addrinfo;
+    pconnection_maybe_connect_lh(pc); /* Start connection attempts */
+    if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && 
!pni_task_wake_pending(&pc->task))
+      return true;
+  } else {
+    psocket_gai_error(&pc->psocket, gai_error, "connect to ");
+  }
+  return false;
+}
+
 void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t 
*t, const char *addr) {
+  // Called from an arbitrary thread.  Do setup prior to getaddrinfo, then switch to a worker thread.
   size_t addrlen = strlen(addr);
   pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t)+addrlen);
   assert(pc); // TODO: memory safety
@@ -1430,27 +1464,8 @@ void pn_proactor_connect2(pn_proactor_t *p, 
pn_connection_t *c, pn_transport_t *
   lock(&pc->task.mutex);
   proactor_add(&pc->task);
   pn_connection_open(pc->driver.connection); /* Auto-open */
-
-  bool notify = false;
-
-  if (pc->disconnected) {
-    notify = schedule(&pc->task);    /* Error during initialization */
-  } else {
-    int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
-    if (!gai_error) {
-      pn_connection_open(pc->driver.connection); /* Auto-open */
-      pc->ai = pc->addrinfo;
-      pconnection_maybe_connect_lh(pc); /* Start connection attempts */
-      if (pc->disconnected) notify = schedule(&pc->task);
-    } else {
-      psocket_gai_error(&pc->psocket, gai_error, "connect to ");
-      notify = schedule(&pc->task);
-      lock(&p->task.mutex);
-      notify |= schedule_if_inactive(p);
-      unlock(&p->task.mutex);
-    }
-  }
-  /* We need to issue INACTIVE on immediate failure */
+  pc->first_schedule = true; // Resume connection setup when next scheduled.
+  bool notify = schedule(&pc->task);
   unlock(&pc->task.mutex);
   if (notify) notify_poller(p);
 }
diff --git a/c/src/proactor/epoll_raw_connection.c 
b/c/src/proactor/epoll_raw_connection.c
index b7547f9f9..350c16ba8 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -52,6 +52,8 @@ struct praw_connection_t {
   bool disconnected;
   bool hup_detected;
   bool read_check;
+  bool first_schedule;
+  char *taddr;
 };
 
 static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -145,6 +147,8 @@ static void praw_connection_init(praw_connection_t *prc, 
pn_proactor_t *p, pn_ra
 
   prc->connected = false;
   prc->disconnected = false;
+  prc->first_schedule = false;
+  prc->taddr = NULL;
   prc->batch.next_event = pni_raw_batch_next;
 
   pmutex_init(&prc->rearm_mutex);
@@ -163,6 +167,7 @@ static void praw_connection_cleanup(praw_connection_t *prc) 
{
     task_finalize(&prc->task);
     if (prc->addrinfo)
       freeaddrinfo(prc->addrinfo);
+    free(prc->taddr);
     free(prc);
   }
   // else proactor_disconnect logic owns prc and its final free
@@ -177,39 +182,48 @@ pn_raw_connection_t *pn_raw_connection(void) {
   return &conn->raw_connection;
 }
 
-void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const 
char *addr) {
-  assert(rc);
-  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
-  praw_connection_init(prc, p, rc);
-  // TODO: check case of proactor shutting down
-
-  lock(&prc->task.mutex);
-  proactor_add(&prc->task);
-
-  bool notify = false;
-
+// Call from pni_raw_connection_process with task lock held.
+// Return true if the socket is connecting and there are no Proton events to deliver.
+static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
   const char *host;
   const char *port;
-  size_t addrlen = strlen(addr);
-  char *addr_buf = (char*) alloca(addrlen+1);
-  pni_parse_addr(addr, addr_buf, addrlen+1, &host, &port);
 
+  unlock(&prc->task.mutex);
+  size_t addrlen = strlen(prc->taddr);
+  char *addr_buf = (char*) alloca(addrlen+1);
+  pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
+  // TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
   int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
+  lock(&prc->task.mutex);
+
   if (!gai_error) {
     prc->ai = prc->addrinfo;
     praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
-    if (prc->disconnected) notify = schedule(&prc->task);
+    if (prc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&prc->task))
+      return true;
   } else {
-    psocket_gai_error(prc, gai_error, "connect to ", addr);
-    prc->disconnected = true;
-    notify = schedule(&prc->task);
-    lock(&p->task.mutex);
-    notify |= schedule_if_inactive(p);
-    unlock(&p->task.mutex);
+    psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
   }
+  return false;
+}
 
-  /* We need to issue INACTIVE on immediate failure */
+void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const 
char *addr) {
+  // Called from an arbitrary thread.  Do setup prior to getaddrinfo, then switch to a worker thread.
+  assert(rc);
+  praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+  praw_connection_init(prc, p, rc);
+  // TODO: check case of proactor shutting down
+
+  lock(&prc->task.mutex);
+  size_t addrlen = strlen(addr);
+  prc->taddr = (char*) malloc(addrlen+1);
+  assert(prc->taddr); // TODO: memory safety
+  memcpy(prc->taddr, addr, addrlen+1);
+  prc->first_schedule = true; // Resume connection setup when next scheduled.
+  proactor_add(&prc->task);
+  bool notify = schedule(&prc->task);
   unlock(&prc->task.mutex);
+
   if (notify) notify_poller(p);
 }
 
@@ -394,6 +408,16 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, 
uint32_t io_events, bool
   }
   int events = io_events;
   int fd = rc->psocket.epoll_io.fd;
+
+  if (rc->first_schedule) {
+    rc->first_schedule = false;
+    assert(!events); // No socket yet.
+    assert(!rc->connected);
+    if (praw_connection_first_connect_lh(rc)) {
+      unlock(&rc->task.mutex);
+      return NULL;
+    }
+  }
   if (!rc->connected) {
     if (events & (EPOLLHUP | EPOLLERR)) {
       praw_connection_maybe_connect_lh(rc);
@@ -413,6 +437,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, 
uint32_t io_events, bool
     }
     if (events & EPOLLOUT)
       praw_connection_connected_lh(rc);
+
     unlock(&rc->task.mutex);
     return &rc->batch;
   }
diff --git a/c/tests/raw_wake_test.cpp b/c/tests/raw_wake_test.cpp
index fee780b15..4f2f88257 100644
--- a/c/tests/raw_wake_test.cpp
+++ b/c/tests/raw_wake_test.cpp
@@ -275,7 +275,6 @@ TEST_CASE("proactor_raw_connection_wake") {
   pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str());
 
 
-  REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
   REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
   REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
   CHECK(pn_proactor_get(p) == NULL); /* idle */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to