This is an automated email from the ASF dual-hosted git repository. asf-gitbox-commits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 0643d7c769744515d88f2eec629513ae308c52ad Author: Cliff Jansen <[email protected]> AuthorDate: Sun Apr 12 11:06:49 2026 -0700 PROTON-2928: epoll proactor: fix amqp connection task scheduling if Proton events are pending --- c/src/proactor/epoll-internal.h | 4 ++-- c/src/proactor/epoll.c | 22 +++++++++++----------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 5a6a37ac3..7ae86f136 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -243,7 +243,7 @@ typedef struct pconnection_t { psocket_t psocket; pni_timer_t *timer; const char *host, *port; - uint32_t new_events; + uint32_t new_os_events; bool server; /* accept, not connect */ bool tick_pending; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ @@ -352,7 +352,7 @@ static inline void pmutex_finalize(pthread_mutex_t *m) { pthread_mutex_destroy(m static inline void lock(pmutex *m) { pthread_mutex_lock(m); } static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); } -static inline bool pconnection_has_event(pconnection_t *pc) { +static inline bool pconnection_has_pn_event(pconnection_t *pc) { return pn_connection_driver_has_event(&pc->driver); } diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 6cc0d38f0..79662b239 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -800,7 +800,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con task_init(&pc->task, PCONNECTION, p); psocket_init(&pc->psocket, PCONNECTION_IO); pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port); - pc->new_events = 0; + pc->new_os_events = 0; pc->tick_pending = false; pc->queued_disconnect = false; pc->disconnect_condition = NULL; @@ -905,7 +905,7 @@ static void pconnection_begin_close(pconnection_t *pc) { static void pconnection_forced_shutdown(pconnection_t *pc) { // Called by proactor_free, no competing threads, no epoll activity. pc->current_arm = 0; - pc->new_events = 0; + pc->new_os_events = 0; pconnection_begin_close(pc); // pconnection_process will never be called again. Zero everything. pc->task.ready = 0; @@ -1011,7 +1011,8 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) { /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */ static inline bool pconnection_work_pending(pconnection_t *pc) { - if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect) + if (pc->new_os_events || pni_task_wake_pending(&pc->task) || pconnection_has_pn_event(pc) || + pc->tick_pending || pc->queued_disconnect) return true; if (!pc->read_blocked && !pconnection_rclosed(pc)) return true; @@ -1029,9 +1030,8 @@ static void pconnection_done(pconnection_t *pc) { pc->task.working = false; // So we can schedule() ourself if necessary. We remain the de facto // working task instance while the lock is held. pc->hog_count = 0; - bool has_event = pconnection_has_event(pc); - if (has_event || pconnection_work_pending(pc)) { + if (pconnection_work_pending(pc)) { self_sched = true; } else if (pn_connection_driver_finished(&pc->driver)) { pconnection_begin_close(pc); @@ -1141,7 +1141,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, pc->process_args = (pc->tick_pending << 1) | sched_ready; } if (events) { - pc->new_events = events; + pc->new_os_events = events; pc->current_arm = 0; events = 0; } @@ -1185,7 +1185,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } } - if (pconnection_has_event(pc)) { + if (pconnection_has_pn_event(pc)) { unlock(&pc->task.mutex); return &pc->batch; } @@ -1199,10 +1199,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, tick_required = !closed; } - if (pc->new_events) { - uint32_t update_events = pc->new_events; + if (pc->new_os_events) { + uint32_t update_events = pc->new_os_events; pc->current_arm = 0; - pc->new_events = 0; + pc->new_os_events = 0; if (!pc->task.closing) { if ((update_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc)) pconnection_maybe_connect_lh(pc); @@ -1290,7 +1290,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, return NULL; // caller already owns the batch } - if (pconnection_has_event(pc)) { + if (pconnection_has_pn_event(pc)) { pc->output_drained = false; return &pc->batch; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
