This is an automated email from the ASF dual-hosted git repository.
asf-gitbox-commits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new 13275ca27 PROTON-2931: epoll proactor thread races using async c-ares name resolver library
13275ca27 is described below
commit 13275ca2710d4f659bda540e8358f22b0a10be24
Author: Cliff Jansen <[email protected]>
AuthorDate: Fri May 1 11:52:47 2026 -0700
PROTON-2931: epoll proactor thread races using async c-ares name resolver library
---
c/src/proactor/epoll-internal.h | 1 +
c/src/proactor/epoll.c | 30 ++++++++++++---
c/src/proactor/epoll_raw_connection.c | 72 ++++++++++++++++++++++++++---------
3 files changed, 80 insertions(+), 23 deletions(-)
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 7ae86f136..967464e4e 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -271,6 +271,7 @@ typedef struct pconnection_t {
pmutex rearm_mutex; /* protects pconnection_rearm from out of
order arming*/
bool io_doublecheck; /* callbacks made and new IO may have
arrived */
uint64_t expected_timeout;
+ bool name_lookup_pending;
char addr_buf[1];
} pconnection_t;
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index df1bde8b2..0479c5718 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -836,7 +836,7 @@ static const char *pconnection_setup(pconnection_t *pc,
pn_proactor_t *p, pn_con
// Call with lock held and closing == true (i.e.
pn_connection_driver_finished() == true), no pending timer.
// Return true when all possible outstanding epoll events associated with this
pconnection have been processed.
static inline bool pconnection_is_final(pconnection_t *pc) {
- return !pc->current_arm && !pc->task.ready && !pc->tick_pending;
+ return !pc->current_arm && !pc->task.ready && !pc->tick_pending &&
!pc->name_lookup_pending;
}
static void pconnection_final_free(pconnection_t *pc) {
@@ -1400,6 +1400,7 @@ static void pconnection_start(pconnection_t *pc, int fd) {
}
/* Called on initial connect, and if connection fails to try another address */
+/* May be called within the pconnection task or from an external name_lookup
task */
static void pconnection_maybe_connect_lh(pconnection_t *pc) {
errno = 0;
if (!pc->connected) { /* Not yet connected */
@@ -1445,8 +1446,7 @@ bool schedule_if_inactive(pn_proactor_t *p) {
/* Called when connection name lookup completes (from name_lookup done_cb).
Call with task lock held. */
static void connection_lookup_done_lh(pconnection_t *pc, struct addrinfo *ai,
int gai_error) {
- pn_proactor_t *p = pc->task.proactor;
- bool notify = false;
+ pc->name_lookup_pending = false;
if (gai_error) {
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
} else if (ai) {
@@ -1457,8 +1457,8 @@ static void connection_lookup_done_lh(pconnection_t *pc,
struct addrinfo *ai, in
return;
}
}
- notify = schedule(&pc->task);
- if (notify) notify_poller(p);
+ bool notify = schedule(&pc->task);
+ if (notify) notify_poller(pc->task.proactor);
}
static void connection_done_cb(void *user_data, struct addrinfo *ai, int
gai_error) {
@@ -1472,10 +1472,28 @@ static void connection_done_cb(void *user_data, struct
addrinfo *ai, int gai_err
// Return true if the socket is connecting and there are no Proton events to
deliver.
static bool pconnection_first_connect_lh(pconnection_t *pc) {
pn_proactor_t *p = pc->task.proactor;
+ pn_transport_t *tp = pc->driver.transport;
+ pc->name_lookup_pending = true;
+
unlock(&pc->task.mutex);
bool rc = pni_name_lookup_start(&p->name_lookup, pc->host, pc->port, pc,
connection_done_cb);
lock(&pc->task.mutex);
- return rc;
+
+ if (!rc) {
+ // Either the callback was synchronous or no callback was possible
+ if (pc->name_lookup_pending) {
+ // Clean up since there will be no callback.
+ pc->name_lookup_pending = false;
+ psocket_error(&pc->psocket, EAI_FAIL, "internal error on connect");
+ }
+ return false;
+ }
+ // Name lookup started. Callback may have already completed and failed.
+ if (!pc->name_lookup_pending) {
+ if (pn_condition_is_set(pn_transport_condition(tp)))
+ return false;
+ }
+ return true;
}
void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t
*t, const char *addr) {
diff --git a/c/src/proactor/epoll_raw_connection.c
b/c/src/proactor/epoll_raw_connection.c
index 1f056c85c..b073bee1b 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -56,6 +56,7 @@ struct praw_connection_t {
bool hup_detected;
bool read_check;
bool first_schedule;
+ bool name_lookup_pending;
char *taddr;
};
@@ -110,8 +111,14 @@ static void praw_connection_start(praw_connection_t *prc,
int fd) {
}
/* Called on initial connect, and if connection fails to try another address */
+/* May be called within the praw_connection task or from an external
name_lookup task */
static void praw_connection_maybe_connect_lh(praw_connection_t *prc) {
+ if (prc->task.closing) {
+ return;
+ }
+ int err = 0; /* Initialized in case while loop has zero
iterations */
while (prc->ai) { /* Have an address */
+ err = 0;
struct addrinfo *ai = prc->ai;
prc->ai = prc->ai->ai_next; /* Move to next address in case this fails */
int fd = socket(ai->ai_family, SOCK_STREAM, 0);
@@ -125,14 +132,19 @@ static void
praw_connection_maybe_connect_lh(praw_connection_t *prc) {
praw_connection_start(prc, fd);
return; /* Async connection started */
} else {
+ err = errno;
close(fd);
}
+ } else {
+ err = errno;
}
/* connect failed immediately, go round the loop to try the next addr */
}
- int err;
- socklen_t errlen = sizeof(err);
- getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err,
&errlen);
+
+ if (err == 0 && prc->psocket.epoll_io.fd >= 0) {
+ socklen_t errlen = sizeof(err);
+ getsockopt(prc->psocket.epoll_io.fd, SOL_SOCKET, SO_ERROR, (void *)&err,
&errlen);
+ }
psocket_error(prc, err, "on connect");
freeaddrinfo(prc->addrinfo);
@@ -144,6 +156,8 @@ static void
praw_connection_maybe_connect_lh(praw_connection_t *prc) {
static void raw_connection_lookup_done_lh(praw_connection_t *prc, struct
addrinfo *ai, int gai_error) {
pn_proactor_t *p = prc->task.proactor;
bool notify = false;
+
+ prc->name_lookup_pending = false;
if (gai_error) {
psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
} else if (ai) {
@@ -224,21 +238,29 @@ pn_raw_connection_t *pn_raw_connection(void) {
return &conn->raw_connection;
}
-// Call from pconnection_process with task lock held.
-// Return true if the socket is connecting and there are no Proton events to
deliver.
-static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
- const char *host;
- const char *port;
+// Call from pconnection_process with no locks.
+// Callback may complete before pni_name_lookup_start returns.
+static void praw_connection_first_connect(praw_connection_t *prc) {
pn_proactor_t *p = prc->task.proactor;
-
- unlock(&prc->task.mutex);
size_t addrlen = strlen(prc->taddr);
char *addr_buf = (char*) alloca(addrlen+1);
+ const char *host;
+ const char *port;
pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
bool rc = pni_name_lookup_start(&p->name_lookup, host, port, prc,
raw_connection_done_cb);
- lock(&prc->task.mutex);
-
- return rc;
+ if (!rc) {
+ // Either the callback was synchronous or no callback was possible
+ bool notify = false;
+ lock(&prc->task.mutex);
+ if (prc->name_lookup_pending) {
+ // Clean up since there will be no callback.
+ prc->name_lookup_pending = false;
+ psocket_error(prc, EAI_FAIL, "internal error on connect");
+ notify = schedule(&prc->task);
+ }
+ unlock(&prc->task.mutex);
+ if (notify) notify_poller(p);
+ }
}
void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const
char *addr) {
@@ -438,11 +460,17 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t,
uint32_t io_events, bool
rc->armed = false;
rc->current_arm = 0;
}
- if (pni_raw_finished(&rc->raw_connection)) {
+ if (pni_raw_finished(&rc->raw_connection) && !rc->name_lookup_pending) {
+ t->working = false;
unlock(&rc->task.mutex);
praw_initiate_cleanup(rc);
return NULL;
}
+ if (rc->task.closing) {
+ // rclosed and wclosed. Allow final events to be processed.
+ unlock(&rc->task.mutex);
+ return &rc->batch;
+ }
int events = io_events;
int fd = rc->psocket.epoll_io.fd;
@@ -450,10 +478,19 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t,
uint32_t io_events, bool
rc->first_schedule = false;
assert(!events); // No socket yet.
assert(!rc->connected);
- if (praw_connection_first_connect_lh(rc)) {
+ bool wake_event = pni_task_wake_pending(&rc->task);
+
+ t->working = false;
+ rc->name_lookup_pending = true;
+ unlock(&rc->task.mutex);
+ praw_connection_first_connect(rc);
+ if (wake_event) {
+ lock(&rc->task.mutex);
+ t->working = true;
unlock(&rc->task.mutex);
- return NULL;
+ return &rc->batch;
}
+ return NULL;
}
if (!rc->connected) {
if (events & (EPOLLHUP | EPOLLERR)) {
@@ -525,9 +562,10 @@ void pni_raw_connection_done(praw_connection_t *rc) {
// wakes outstanding because we dealt with it already in pni_raw_batch_next()
notify = (wake_pending || have_event) && schedule(&rc->task);
ready = rc->task.ready; // No need to poll. Already scheduled.
+ bool praw_finished = pni_raw_finished(&rc->raw_connection) &&
!rc->name_lookup_pending;
unlock(&rc->task.mutex);
- if (pni_raw_finished(raw) && !ready) {
+ if (praw_finished && !ready) {
// If raw connection has no more work to do and safe to free resources, do
so.
praw_initiate_cleanup(rc);
} else if (ready) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]