This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new bbe9a4b15 PROTON-2832: proactor raw connection shutdown race with 
simultaneous epoll event
bbe9a4b15 is described below

commit bbe9a4b15476060beccff32fde8cda667a204dc0
Author: Clifford Jansen <[email protected]>
AuthorDate: Thu Aug 1 10:03:49 2024 -0700

    PROTON-2832: proactor raw connection shutdown race with simultaneous epoll 
event
---
 c/src/proactor/epoll_raw_connection.c | 42 ++++++++++++++++++++++++++---------
 1 file changed, 32 insertions(+), 10 deletions(-)

diff --git a/c/src/proactor/epoll_raw_connection.c 
b/c/src/proactor/epoll_raw_connection.c
index 350c16ba8..10870ffe7 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -48,6 +48,8 @@ struct praw_connection_t {
   pn_event_batch_t batch;
   struct addrinfo *addrinfo;         /* Resolved address list */
   struct addrinfo *ai;               /* Current connect address */
+  int current_arm;                   /* Active epoll io events */
+  bool armed;
   bool connected;
   bool disconnected;
   bool hup_detected;
@@ -101,7 +103,8 @@ static void praw_connection_start(praw_connection_t *prc, 
int fd) {
     pclosefd(prc->task.proactor, fd);
   }
   ee->fd = fd;
-  ee->wanted = EPOLLIN | EPOLLOUT;
+  prc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT;
+  prc->armed = true;
   start_polling(ee, efd);  // TODO: check for error
 }
 
@@ -148,6 +151,8 @@ static void praw_connection_init(praw_connection_t *prc, 
pn_proactor_t *p, pn_ra
   prc->connected = false;
   prc->disconnected = false;
   prc->first_schedule = false;
+  prc->armed = false;
+  prc->current_arm = 0;
   prc->taddr = NULL;
   prc->batch.next_event = pni_raw_batch_next;
 
@@ -173,6 +178,17 @@ static void praw_connection_cleanup(praw_connection_t 
*prc) {
   // else proactor_disconnect logic owns prc and its final free
 }
 
+static void praw_initiate_cleanup(praw_connection_t *prc) {
+  if (prc->armed) {
+    // Possible race with epoll event.  Wait for it to clear.
+    // Force EPOLLHUP callback if not already pending.
+    shutdown(prc->psocket.epoll_io.fd, SHUT_RDWR);
+    return;
+  }
+  pni_raw_finalize(&prc->raw_connection);
+  praw_connection_cleanup(prc);
+}
+
 pn_raw_connection_t *pn_raw_connection(void) {
   praw_connection_t *conn = (praw_connection_t*) calloc(1, 
sizeof(praw_connection_t));
   if (!conn) return NULL;
@@ -400,10 +416,13 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, 
uint32_t io_events, bool
     else
       pni_task_wake_done(&rc->task);  // Complete task wake without event.
   }
+  if (io_events) {
+    rc->armed = false;
+    rc->current_arm = 0;
+  }
   if (pni_raw_finished(&rc->raw_connection)) {
     unlock(&rc->task.mutex);
-    pni_raw_finalize(&rc->raw_connection);
-    praw_connection_cleanup(rc);
+    praw_initiate_cleanup(rc);
     return NULL;
   }
   int events = io_events;
@@ -492,8 +511,7 @@ void pni_raw_connection_done(praw_connection_t *rc) {
 
   if (pni_raw_finished(raw) && !ready) {
     // If raw connection has no more work to do and safe to free resources, do 
so.
-    pni_raw_finalize(raw);
-    praw_connection_cleanup(rc);
+    praw_initiate_cleanup(rc);
   } else if (ready) {
     // Already scheduled to run.  Skip poll.  Remember if we want a read.
     rc->read_check = pni_raw_can_read(raw);
@@ -509,12 +527,16 @@ void pni_raw_connection_done(praw_connection_t *rc) {
     // If wanted == 0 and hup_detected, blocking not possible, so skip arming 
until
     // application provides read buffers.
     if (wanted || !rc->hup_detected) {
-      rc->psocket.epoll_io.wanted = wanted;
-      rearm_polling(&rc->psocket.epoll_io, p->epollfd);  // TODO: check for 
error
+      // Arm only if there is a change.
+      if (!rc->armed || (wanted != rc->current_arm)) {
+        rc->psocket.epoll_io.wanted = rc->current_arm = wanted;
+        rc->armed = true;
+        rearm_polling(&rc->psocket.epoll_io, p->epollfd);  // TODO: check for 
error
+      }
     }
   }
 
-  // praw_connection_cleanup() may have been called above. Can no longer touch 
rc or raw.
+  // praw_initiate_cleanup() may have been called above. Can no longer touch 
rc or raw.
 
   lock(&p->sched_mutex);
   tslot_t *resume_thread;
@@ -525,6 +547,6 @@ void pni_raw_connection_done(praw_connection_t *rc) {
 }
 
 void pni_raw_connection_forced_shutdown(praw_connection_t *rc) {
-  pni_raw_finalize(&rc->raw_connection);
-  praw_connection_cleanup(rc);
+  rc->armed = false;  // Tear down. No epoll event callbacks.
+  praw_initiate_cleanup(rc);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to