This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 0643d7c769744515d88f2eec629513ae308c52ad
Author: Cliff Jansen <[email protected]>
AuthorDate: Sun Apr 12 11:06:49 2026 -0700

    PROTON-2928: epoll proactor: fix amqp connection task scheduling if Proton events are pending
---
 c/src/proactor/epoll-internal.h |  4 ++--
 c/src/proactor/epoll.c          | 22 +++++++++++-----------
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 5a6a37ac3..7ae86f136 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -243,7 +243,7 @@ typedef struct pconnection_t {
   psocket_t psocket;
   pni_timer_t *timer;
   const char *host, *port;
-  uint32_t new_events;
+  uint32_t new_os_events;
   bool server;                /* accept, not connect */
   bool tick_pending;
   bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
@@ -352,7 +352,7 @@ static inline void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m
 static inline void lock(pmutex *m) { pthread_mutex_lock(m); }
 static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
 
-static inline bool pconnection_has_event(pconnection_t *pc) {
+static inline bool pconnection_has_pn_event(pconnection_t *pc) {
   return pn_connection_driver_has_event(&pc->driver);
 }
 
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 6cc0d38f0..79662b239 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -800,7 +800,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
   task_init(&pc->task, PCONNECTION, p);
   psocket_init(&pc->psocket, PCONNECTION_IO);
   pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port);
-  pc->new_events = 0;
+  pc->new_os_events = 0;
   pc->tick_pending = false;
   pc->queued_disconnect = false;
   pc->disconnect_condition = NULL;
@@ -905,7 +905,7 @@ static void pconnection_begin_close(pconnection_t *pc) {
 static void pconnection_forced_shutdown(pconnection_t *pc) {
   // Called by proactor_free, no competing threads, no epoll activity.
   pc->current_arm = 0;
-  pc->new_events = 0;
+  pc->new_os_events = 0;
   pconnection_begin_close(pc);
   // pconnection_process will never be called again.  Zero everything.
   pc->task.ready = 0;
@@ -1011,7 +1011,8 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) {
 
 /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect)
+  if (pc->new_os_events || pni_task_wake_pending(&pc->task) || pconnection_has_pn_event(pc) ||
+      pc->tick_pending || pc->queued_disconnect)
     return true;
   if (!pc->read_blocked && !pconnection_rclosed(pc))
     return true;
@@ -1029,9 +1030,8 @@ static void pconnection_done(pconnection_t *pc) {
   pc->task.working = false;  // So we can schedule() ourself if necessary.  We remain the de facto
                              // working task instance while the lock is held.
   pc->hog_count = 0;
-  bool has_event = pconnection_has_event(pc);
 
-  if (has_event || pconnection_work_pending(pc)) {
+  if (pconnection_work_pending(pc)) {
     self_sched = true;
   } else if (pn_connection_driver_finished(&pc->driver)) {
     pconnection_begin_close(pc);
@@ -1141,7 +1141,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     pc->process_args = (pc->tick_pending << 1) | sched_ready;
   }
   if (events) {
-    pc->new_events = events;
+    pc->new_os_events = events;
     pc->current_arm = 0;
     events = 0;
   }
@@ -1185,7 +1185,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     }
   }
 
-  if (pconnection_has_event(pc)) {
+  if (pconnection_has_pn_event(pc)) {
     unlock(&pc->task.mutex);
     return &pc->batch;
   }
@@ -1199,10 +1199,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     tick_required = !closed;
   }
 
-  if (pc->new_events) {
-    uint32_t update_events = pc->new_events;
+  if (pc->new_os_events) {
+    uint32_t update_events = pc->new_os_events;
     pc->current_arm = 0;
-    pc->new_events = 0;
+    pc->new_os_events = 0;
     if (!pc->task.closing) {
       if ((update_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc))
         pconnection_maybe_connect_lh(pc);
@@ -1290,7 +1290,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     return NULL;  // caller already owns the batch
   }
 
-  if (pconnection_has_event(pc)) {
+  if (pconnection_has_pn_event(pc)) {
     pc->output_drained = false;
     return &pc->batch;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to