This is an automated email from the ASF dual-hosted git repository.
cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new bbe9a4b15 PROTON-2832: proactor raw connection shutdown race with
simultaneous epoll event
bbe9a4b15 is described below
commit bbe9a4b15476060beccff32fde8cda667a204dc0
Author: Clifford Jansen <[email protected]>
AuthorDate: Thu Aug 1 10:03:49 2024 -0700
PROTON-2832: proactor raw connection shutdown race with simultaneous epoll
event
---
c/src/proactor/epoll_raw_connection.c | 42 ++++++++++++++++++++++++++---------
1 file changed, 32 insertions(+), 10 deletions(-)
diff --git a/c/src/proactor/epoll_raw_connection.c
b/c/src/proactor/epoll_raw_connection.c
index 350c16ba8..10870ffe7 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -48,6 +48,8 @@ struct praw_connection_t {
pn_event_batch_t batch;
struct addrinfo *addrinfo; /* Resolved address list */
struct addrinfo *ai; /* Current connect address */
+ int current_arm; /* Active epoll io events */
+ bool armed;
bool connected;
bool disconnected;
bool hup_detected;
@@ -101,7 +103,8 @@ static void praw_connection_start(praw_connection_t *prc,
int fd) {
pclosefd(prc->task.proactor, fd);
}
ee->fd = fd;
- ee->wanted = EPOLLIN | EPOLLOUT;
+ prc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT;
+ prc->armed = true;
start_polling(ee, efd); // TODO: check for error
}
@@ -148,6 +151,8 @@ static void praw_connection_init(praw_connection_t *prc,
pn_proactor_t *p, pn_ra
prc->connected = false;
prc->disconnected = false;
prc->first_schedule = false;
+ prc->armed = false;
+ prc->current_arm = 0;
prc->taddr = NULL;
prc->batch.next_event = pni_raw_batch_next;
@@ -173,6 +178,17 @@ static void praw_connection_cleanup(praw_connection_t
*prc) {
// else proactor_disconnect logic owns prc and its final free
}
+static void praw_initiate_cleanup(praw_connection_t *prc) {
+ if (prc->armed) {
+ // Still armed in epoll: an io event may be racing with this shutdown, so prc must not be freed yet.
+ // shutdown() forces an EPOLLHUP callback if one is not already pending; cleanup is retried from that callback.
+ shutdown(prc->psocket.epoll_io.fd, SHUT_RDWR);
+ return; // Deferred: pni_raw_connection_process clears armed on an io event and calls back in here.
+ }
+ pni_raw_finalize(&prc->raw_connection); // Not armed: no epoll event outstanding, finalize immediately.
+ praw_connection_cleanup(prc);
+}
+
pn_raw_connection_t *pn_raw_connection(void) {
praw_connection_t *conn = (praw_connection_t*) calloc(1,
sizeof(praw_connection_t));
if (!conn) return NULL;
@@ -400,10 +416,13 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t,
uint32_t io_events, bool
else
pni_task_wake_done(&rc->task); // Complete task wake without event.
}
+ if (io_events) {
+ rc->armed = false;
+ rc->current_arm = 0;
+ }
if (pni_raw_finished(&rc->raw_connection)) {
unlock(&rc->task.mutex);
- pni_raw_finalize(&rc->raw_connection);
- praw_connection_cleanup(rc);
+ praw_initiate_cleanup(rc);
return NULL;
}
int events = io_events;
@@ -492,8 +511,7 @@ void pni_raw_connection_done(praw_connection_t *rc) {
if (pni_raw_finished(raw) && !ready) {
// If raw connection has no more work to do and safe to free resources, do
so.
- pni_raw_finalize(raw);
- praw_connection_cleanup(rc);
+ praw_initiate_cleanup(rc);
} else if (ready) {
// Already scheduled to run. Skip poll. Remember if we want a read.
rc->read_check = pni_raw_can_read(raw);
@@ -509,12 +527,16 @@ void pni_raw_connection_done(praw_connection_t *rc) {
// If wanted == 0 and hup_detected, blocking not possible, so skip arming
until
// application provides read buffers.
if (wanted || !rc->hup_detected) {
- rc->psocket.epoll_io.wanted = wanted;
- rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for
error
+ // Arm only if there is a change.
+ if (!rc->armed || (wanted != rc->current_arm)) {
+ rc->psocket.epoll_io.wanted = rc->current_arm = wanted;
+ rc->armed = true;
+ rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for
error
+ }
}
}
- // praw_connection_cleanup() may have been called above. Can no longer touch
rc or raw.
+ // praw_initiate_cleanup() may have been called above. Can no longer touch
rc or raw.
lock(&p->sched_mutex);
tslot_t *resume_thread;
@@ -525,6 +547,6 @@ void pni_raw_connection_done(praw_connection_t *rc) {
}
void pni_raw_connection_forced_shutdown(praw_connection_t *rc) {
- pni_raw_finalize(&rc->raw_connection);
- praw_connection_cleanup(rc);
+ rc->armed = false; // Tear down. No epoll event callbacks, so clear armed to make cleanup below finalize now instead of deferring.
+ praw_initiate_cleanup(rc);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]