This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 996b9b114fdb4682c8114ad700705446ff3a24fd Author: Cliff Jansen <cliffjan...@apache.org> AuthorDate: Wed Dec 15 17:54:51 2021 -0800 PROTON-2362: epoll proactor: remove ready list sneak peek optimization causing early task deletion --- c/src/proactor/epoll.c | 68 +++++++------------------------------------------- 1 file changed, 9 insertions(+), 59 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 6a5beb4..6207267 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -991,35 +991,6 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) { // Return immediately. pc may have just been freed by another thread. } -/* Only call when context switch is imminent. Sched lock is highly contested. */ -// Call with both task and sched locks. -static bool pconnection_sched_sync(pconnection_t *pc) { - uint32_t sync_events = 0; - uint32_t sync_args = pc->tick_pending << 1; - if (pc->psocket.sched_io_events) { - pc->new_events = pc->psocket.sched_io_events; - pc->psocket.sched_io_events = 0; - pc->current_arm = 0; // or outside lock? - sync_events = pc->new_events; - } - if (pc->task.sched_ready) { - pc->task.sched_ready = false; - schedule_done(&pc->task); - sync_args |= 1; - } - pc->task.sched_pending = false; - - if (sync_args || sync_events) { - // Only replace if poller has found new work for us. - pc->process_args = (1 << 2) | sync_args; - pc->process_events = sync_events; - } - - // Indicate if there are free proactor threads - pn_proactor_t *p = pc->task.proactor; - return p->poller_suspended || p->suspend_list_head; -} - /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */ static inline bool pconnection_work_pending(pconnection_t *pc) { if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect) @@ -1038,14 +1009,9 @@ static void pconnection_done(pconnection_t *pc) { bool self_sched = false; lock(&pc->task.mutex); pc->task.working = false; // So we can schedule() ourself if necessary. We remain the de facto - // working task instance while the lock is held. Need sched_sync too to drain - // a possible stale sched_ready. + // working task instance while the lock is held. pc->hog_count = 0; bool has_event = pconnection_has_event(pc); - // Do as little as possible while holding the sched lock - lock(&p->sched_mutex); - pconnection_sched_sync(pc); - unlock(&p->sched_mutex); if (has_event || pconnection_work_pending(pc)) { self_sched = true; @@ -1295,9 +1261,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } // Never stop working while work remains. hog_count exception to this rule is elsewhere. - lock(&pc->task.proactor->sched_mutex); - bool workers_free = pconnection_sched_sync(pc); - unlock(&pc->task.proactor->sched_mutex); if (pconnection_work_pending(pc)) { goto retry; // TODO: get rid of goto without adding more locking @@ -1314,8 +1277,13 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } } + // check one last time for new io before context switch + bool workers_free; + pn_proactor_t *p = pc->task.proactor; + lock(&p->sched_mutex); + workers_free = (p->suspend_list_head != NULL); + unlock(&p->sched_mutex); if (workers_free && !pc->task.closing && !pc->io_doublecheck) { - // check one last time for new io before context switch pc->io_doublecheck = true; pc->read_blocked = false; pc->write_blocked = false; @@ -1831,25 +1799,7 @@ static void listener_done(pn_listener_t *l) { bool notify = false; l->task.working = false; - lock(&p->sched_mutex); - int n_events = 0; - for (size_t i = 0; i < l->acceptors_size; i++) { - psocket_t *ps = &l->acceptors[i].psocket; - if (ps->sched_io_events) { - ps->working_io_events = ps->sched_io_events; - ps->sched_io_events = 0; - } - if (ps->working_io_events) - n_events++; - } - - if (l->task.sched_ready) { - l->task.sched_ready = false; - schedule_done(&l->task); - } - unlock(&p->sched_mutex); - - if (!n_events && listener_can_free(l)) { + if (listener_can_free(l)) { unlock(&l->task.mutex); pn_listener_free(l); lock(&p->sched_mutex); @@ -1859,7 +1809,7 @@ static void listener_done(pn_listener_t *l) { if (notify) notify_poller(p); if (resume_thread) resume(p, resume_thread); return; - } else if (n_events || listener_has_event(l)) + } else if (listener_has_event(l)) notify = schedule(&l->task); unlock(&l->task.mutex); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org