This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new 36fe831ec PROTON-2818: Move epoll proactor connection logic to a task
thread.
36fe831ec is described below
commit 36fe831ec8df52dea56ae04eec02304f2572f13a
Author: Clifford Jansen <[email protected]>
AuthorDate: Sun May 12 11:04:19 2024 -0700
PROTON-2818: Move epoll proactor connection logic to a task thread.
---
c/src/proactor/epoll-internal.h | 1 +
c/src/proactor/epoll.c | 57 ++++++++++++++++++-----------
c/src/proactor/epoll_raw_connection.c | 69 ++++++++++++++++++++++++-----------
c/tests/raw_wake_test.cpp | 1 -
4 files changed, 84 insertions(+), 44 deletions(-)
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 8f765121e..550324ccd 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -238,6 +238,7 @@ typedef struct pconnection_t {
bool server; /* accept, not connect */
bool tick_pending;
bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
+ bool first_schedule;
pn_condition_t *disconnect_condition;
// Following values only changed by (sole) working task:
uint32_t current_arm; // active epoll io events
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 7abd884ef..7714c23fe 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -810,6 +810,7 @@ static const char *pconnection_setup(pconnection_t *pc,
pn_proactor_t *p, pn_con
pc->wbuf_current = NULL;
pc->hog_count = 0;
pc->batch.next_event = pconnection_batch_next;
+ pc->first_schedule = false;
if (server) {
pn_transport_set_server(pc->driver.transport);
@@ -1122,6 +1123,7 @@ static void write_flush(pconnection_t *pc) {
static void pconnection_connected_lh(pconnection_t *pc);
static void pconnection_maybe_connect_lh(pconnection_t *pc);
+static bool pconnection_first_connect_lh(pconnection_t *pc);
static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t
events, bool sched_ready, bool topup) {
bool waking = false;
@@ -1139,6 +1141,17 @@ static pn_event_batch_t
*pconnection_process(pconnection_t *pc, uint32_t events,
}
if (sched_ready) schedule_done(&pc->task);
+ if (pc->first_schedule) {
+ pc->first_schedule = false;
+ assert(!topup && !events);
+ if (!pc->queued_disconnect) {
+ if (pconnection_first_connect_lh(pc)) {
+ unlock(&pc->task.mutex);
+ return NULL;
+ }
+ }
+ }
+
if (topup) {
// Only called by the batch owner. Does not loop, just "tops up"
// once. May be back depending on hog_count.
@@ -1396,6 +1409,7 @@ static void pconnection_maybe_connect_lh(pconnection_t
*pc) {
int pgetaddrinfo(const char *host, const char *port, int flags, struct
addrinfo **res)
{
+ // NOTE: getaddrinfo can block on DNS lookup (PROTON-2812).
struct addrinfo hints = { 0 };
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
@@ -1416,7 +1430,27 @@ bool schedule_if_inactive(pn_proactor_t *p) {
return false;
}
+// Call from pconnection_process with task lock held.
+// Return true if the socket is connecting and there are no Proton events to
deliver.
+static bool pconnection_first_connect_lh(pconnection_t *pc) {
+ unlock(&pc->task.mutex);
+ // TODO: move this step to a separate worker thread that scales in response
to multiple blocking DNS lookups.
+ int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
+ lock(&pc->task.mutex);
+
+ if (!gai_error) {
+ pc->ai = pc->addrinfo;
+ pconnection_maybe_connect_lh(pc); /* Start connection attempts */
+ if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect &&
!pni_task_wake_pending(&pc->task))
+ return true;
+ } else {
+ psocket_gai_error(&pc->psocket, gai_error, "connect to ");
+ }
+ return false;
+}
+
void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t
*t, const char *addr) {
+ // Called from an arbitrary thread. Do setup prior to getaddrinfo, then
switch to a worker thread.
size_t addrlen = strlen(addr);
pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t)+addrlen);
assert(pc); // TODO: memory safety
@@ -1430,27 +1464,8 @@ void pn_proactor_connect2(pn_proactor_t *p,
pn_connection_t *c, pn_transport_t *
lock(&pc->task.mutex);
proactor_add(&pc->task);
pn_connection_open(pc->driver.connection); /* Auto-open */
-
- bool notify = false;
-
- if (pc->disconnected) {
- notify = schedule(&pc->task); /* Error during initialization */
- } else {
- int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
- if (!gai_error) {
- pn_connection_open(pc->driver.connection); /* Auto-open */
- pc->ai = pc->addrinfo;
- pconnection_maybe_connect_lh(pc); /* Start connection attempts */
- if (pc->disconnected) notify = schedule(&pc->task);
- } else {
- psocket_gai_error(&pc->psocket, gai_error, "connect to ");
- notify = schedule(&pc->task);
- lock(&p->task.mutex);
- notify |= schedule_if_inactive(p);
- unlock(&p->task.mutex);
- }
- }
- /* We need to issue INACTIVE on immediate failure */
+ pc->first_schedule = true; // Resume connection setup when next scheduled.
+ bool notify = schedule(&pc->task);
unlock(&pc->task.mutex);
if (notify) notify_poller(p);
}
diff --git a/c/src/proactor/epoll_raw_connection.c
b/c/src/proactor/epoll_raw_connection.c
index b7547f9f9..350c16ba8 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -52,6 +52,8 @@ struct praw_connection_t {
bool disconnected;
bool hup_detected;
bool read_check;
+ bool first_schedule;
+ char *taddr;
};
static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -145,6 +147,8 @@ static void praw_connection_init(praw_connection_t *prc,
pn_proactor_t *p, pn_ra
prc->connected = false;
prc->disconnected = false;
+ prc->first_schedule = false;
+ prc->taddr = NULL;
prc->batch.next_event = pni_raw_batch_next;
pmutex_init(&prc->rearm_mutex);
@@ -163,6 +167,7 @@ static void praw_connection_cleanup(praw_connection_t *prc)
{
task_finalize(&prc->task);
if (prc->addrinfo)
freeaddrinfo(prc->addrinfo);
+ free(prc->taddr);
free(prc);
}
// else proactor_disconnect logic owns prc and its final free
@@ -177,39 +182,48 @@ pn_raw_connection_t *pn_raw_connection(void) {
return &conn->raw_connection;
}
-void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const
char *addr) {
- assert(rc);
- praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
- praw_connection_init(prc, p, rc);
- // TODO: check case of proactor shutting down
-
- lock(&prc->task.mutex);
- proactor_add(&prc->task);
-
- bool notify = false;
-
+// Call from pni_raw_connection_process with task lock held.
+// Return true if the socket is connecting and there are no Proton events to
deliver.
+static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
const char *host;
const char *port;
- size_t addrlen = strlen(addr);
- char *addr_buf = (char*) alloca(addrlen+1);
- pni_parse_addr(addr, addr_buf, addrlen+1, &host, &port);
+ unlock(&prc->task.mutex);
+ size_t addrlen = strlen(prc->taddr);
+ char *addr_buf = (char*) alloca(addrlen+1);
+ pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
+ // TODO: move this step to a separate worker thread that scales in response
to multiple blocking DNS lookups.
int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
+ lock(&prc->task.mutex);
+
if (!gai_error) {
prc->ai = prc->addrinfo;
praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
- if (prc->disconnected) notify = schedule(&prc->task);
+ if (prc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&prc->task))
+ return true;
} else {
- psocket_gai_error(prc, gai_error, "connect to ", addr);
- prc->disconnected = true;
- notify = schedule(&prc->task);
- lock(&p->task.mutex);
- notify |= schedule_if_inactive(p);
- unlock(&p->task.mutex);
+ psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
}
+ return false;
+}
- /* We need to issue INACTIVE on immediate failure */
+void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const
char *addr) {
+ // Called from an arbitrary thread. Do setup prior to getaddrinfo, then
switch to a worker thread.
+ assert(rc);
+ praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
+ praw_connection_init(prc, p, rc);
+ // TODO: check case of proactor shutting down
+
+ lock(&prc->task.mutex);
+ size_t addrlen = strlen(addr);
+ prc->taddr = (char*) malloc(addrlen+1);
+ assert(prc->taddr); // TODO: memory safety
+ memcpy(prc->taddr, addr, addrlen+1);
+ prc->first_schedule = true; // Resume connection setup when next scheduled.
+ proactor_add(&prc->task);
+ bool notify = schedule(&prc->task);
unlock(&prc->task.mutex);
+
if (notify) notify_poller(p);
}
@@ -394,6 +408,16 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t,
uint32_t io_events, bool
}
int events = io_events;
int fd = rc->psocket.epoll_io.fd;
+
+ if (rc->first_schedule) {
+ rc->first_schedule = false;
+ assert(!events); // No socket yet.
+ assert(!rc->connected);
+ if (praw_connection_first_connect_lh(rc)) {
+ unlock(&rc->task.mutex);
+ return NULL;
+ }
+ }
if (!rc->connected) {
if (events & (EPOLLHUP | EPOLLERR)) {
praw_connection_maybe_connect_lh(rc);
@@ -413,6 +437,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t,
uint32_t io_events, bool
}
if (events & EPOLLOUT)
praw_connection_connected_lh(rc);
+
unlock(&rc->task.mutex);
return &rc->batch;
}
diff --git a/c/tests/raw_wake_test.cpp b/c/tests/raw_wake_test.cpp
index fee780b15..4f2f88257 100644
--- a/c/tests/raw_wake_test.cpp
+++ b/c/tests/raw_wake_test.cpp
@@ -275,7 +275,6 @@ TEST_CASE("proactor_raw_connection_wake") {
pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str());
- REQUIRE_RUN(p, PN_LISTENER_ACCEPT);
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS);
CHECK(pn_proactor_get(p) == NULL); /* idle */
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]