This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 239a39eb2d04f0588081975a4f173bb5f121d1fa Author: Cliff Jansen <cliffjan...@apache.org> AuthorDate: Sun Nov 21 12:37:32 2021 -0800 PROTON-2362: epoll proactor fix for tsan_tr1.txt. Check earmark edge case at same time and with same lock as for unassign_thread. --- c/src/proactor/epoll-internal.h | 3 +- c/src/proactor/epoll.c | 114 ++++++++++++++++++---------------- c/src/proactor/epoll_raw_connection.c | 4 +- 3 files changed, 65 insertions(+), 56 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 5f7ed9b..f0f57af 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -348,7 +348,7 @@ int pclosefd(pn_proactor_t *p, int fd); void proactor_add(task_t *tsk); bool proactor_remove(task_t *tsk); -bool unassign_thread(tslot_t *ts, tslot_state new_state); +bool unassign_thread(pn_proactor_t *p, tslot_t *ts, tslot_state new_state, tslot_t **resume_thread); void task_init(task_t *tsk, task_type_t t, pn_proactor_t *p); static void task_finalize(task_t* tsk) { @@ -385,6 +385,7 @@ void pni_timer_manager_finalize(pni_timer_manager_t *tm); pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool sched_ready); void pni_pconnection_timeout(pconnection_t *pc); void pni_proactor_timeout(pn_proactor_t *p); +void pni_resume(pn_proactor_t *p, tslot_t *ts); // Generic wake primitives for a task. 
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 1adaeb3..c481274 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -406,7 +406,26 @@ static void resume(pn_proactor_t *p, tslot_t *ts) { pthread_cond_signal(&ts->cond); } unlock(&ts->mutex); +} + +// Call with no lock +void pni_resume(pn_proactor_t *p, tslot_t *ts) { + resume(p, ts); +} + +// Call with shed_lock held +// Caller must resume() return value if not null +static tslot_t *resume_one_thread(pn_proactor_t *p) { + // If pn_proactor_get has an early return, we need to resume one suspended thread (if any) + // to be the new poller. + tslot_t *ts = p->suspend_list_head; + if (ts) { + LL_REMOVE(p, suspend_list, ts); + p->suspend_list_count--; + ts->state = PROCESSING; + } + return ts; } // Call with sched lock @@ -445,11 +464,14 @@ static bool reschedule(task_t *tsk) { return notify; } -// Call with sched lock -bool unassign_thread(tslot_t *ts, tslot_state new_state) { +// Call with sched lock. +// If true returned, caller must call notify_poller() after releasing the lock. +// If resume_thread is set, the caller must call resume() after releasing the lock. +bool unassign_thread(pn_proactor_t *p, tslot_t *ts, tslot_state new_state, tslot_t **resume_thread) { task_t *tsk = ts->task; bool notify = false; bool deleting = (ts->state == DELETING); + *resume_thread = NULL; ts->task = NULL; ts->state = new_state; if (tsk) { @@ -460,7 +482,6 @@ bool unassign_thread(tslot_t *ts, tslot_state new_state) { // Check if unseen events or schedule() calls occurred while task was working. if (tsk && !deleting) { - pn_proactor_t *p = tsk->proactor; ts->prev_task = tsk; if (tsk->sched_pending) { // Make sure the task is already scheduled or put it on the ready list @@ -482,6 +503,19 @@ bool unassign_thread(tslot_t *ts, tslot_state new_state) { } } } + + // Earmark drain accounting. + if (ts->earmark_override) { + // This thread "stole" the task previously assigned to thread ts->earmark_override. 
+ if (ts->earmark_override->generation == ts->earmark_override_gen) { + // Other (overridden) thread not seen since this thread completed the pending work on the task. + // Thread is perhaps gone forever, which may leave us short of a poller thread and hanging. + // Find a thread to resume if available. Worst case is a spurious resume/suspend by an idle thread. + *resume_thread = resume_one_thread(p); + } + ts->earmark_override = NULL; + } + return notify; } @@ -1014,10 +1048,11 @@ static void pconnection_done(pconnection_t *pc) { pconnection_cleanup(pc); // pc may be undefined now lock(&p->sched_mutex); - notify = unassign_thread(ts, UNUSED); + tslot_t *resume_thread; + notify = unassign_thread(p, ts, UNUSED, &resume_thread); unlock(&p->sched_mutex); - if (notify) - notify_poller(p); + if (notify) notify_poller(p); + if (resume_thread) resume(p, resume_thread); return; } } @@ -1029,10 +1064,12 @@ static void pconnection_done(pconnection_t *pc) { if (wanted) pconnection_rearm(pc, wanted); // May free pc on another thread. Return without touching pc again. 
lock(&p->sched_mutex); - if (unassign_thread(ts, UNUSED)) + tslot_t *resume_thread; + if (unassign_thread(p, ts, UNUSED, &resume_thread)) notify = true; unlock(&p->sched_mutex); if (notify) notify_poller(p); + if (resume_thread) resume(p, resume_thread); return; } @@ -1803,20 +1840,23 @@ static void listener_done(pn_listener_t *l) { unlock(&l->task.mutex); pn_listener_free(l); lock(&p->sched_mutex); - notify = unassign_thread(ts, UNUSED); + tslot_t *resume_thread; + notify = unassign_thread(p, ts, UNUSED, &resume_thread); unlock(&p->sched_mutex); - if (notify) - notify_poller(p); + if (notify) notify_poller(p); + if (resume_thread) resume(p, resume_thread); return; } else if (n_events || listener_has_event(l)) notify = schedule(&l->task); unlock(&l->task.mutex); lock(&p->sched_mutex); - if (unassign_thread(ts, UNUSED)) + tslot_t *resume_thread; + if (unassign_thread(p, ts, UNUSED, &resume_thread)) notify = true; unlock(&p->sched_mutex); if (notify) notify_poller(p); + if (resume_thread) resume(p, resume_thread); } pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { @@ -2167,21 +2207,6 @@ static tslot_t *find_tslot(pn_proactor_t *p) { return ts; } -// Call with shed_lock held -// Caller must resume() return value if not null -static tslot_t *resume_one_thread(pn_proactor_t *p) { - // If pn_proactor_get has an early return, we need to resume one suspended thread (if any) - // to be the new poller. - - tslot_t *ts = p->suspend_list_head; - if (ts) { - LL_REMOVE(p, suspend_list, ts); - p->suspend_list_count--; - ts->state = PROCESSING; - } - return ts; -} - // Called with sched lock, returns with sched lock still held. 
static pn_event_batch_t *process(task_t *tsk) { bool tsk_ready = false; @@ -2421,10 +2446,12 @@ static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) { unlock(&p->sched_mutex); return batch; } - bool notify = unassign_thread(ts, PROCESSING); - if (notify) { + tslot_t *resume_thread; + bool notify = unassign_thread(p, ts, PROCESSING, &resume_thread); + if (notify || resume_thread) { unlock(&p->sched_mutex); - notify_poller(p); + if (notify) notify_poller(p); + if (resume_thread) resume(p, resume_thread); lock(&p->sched_mutex); } continue; // Long time may have passed. Back to beginning. @@ -2654,44 +2681,23 @@ pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) { return next_event_batch(p, false); } -// Call with no locks -static inline void check_earmark_override(pn_proactor_t *p, tslot_t *ts) { - if (!ts || !ts->earmark_override) - return; - if (ts->earmark_override->generation == ts->earmark_override_gen) { - // Other (overridden) thread not seen since this thread started and finished the event batch. 
- // Thread is perhaps gone forever, which may leave us short of a poller thread - lock(&p->sched_mutex); - tslot_t *res_ts = resume_one_thread(p); - unlock(&p->sched_mutex); - if (res_ts) resume(p, res_ts); - } - ts->earmark_override = NULL; -} - void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { pconnection_t *pc = batch_pconnection(batch); if (pc) { - tslot_t *ts = pc->task.runner; pconnection_done(pc); // pc possibly freed/invalid - check_earmark_override(p, ts); return; } pn_listener_t *l = batch_listener(batch); if (l) { - tslot_t *ts = l->task.runner; listener_done(l); // l possibly freed/invalid - check_earmark_override(p, ts); return; } praw_connection_t *rc = pni_batch_raw_connection(batch); if (rc) { - tslot_t *ts = pni_raw_connection_task(rc)->runner; pni_raw_connection_done(rc); // rc possibly freed/invalid - check_earmark_override(p, ts); return; } pn_proactor_t *bp = batch_proactor(batch); @@ -2712,13 +2718,13 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { unlock(&p->task.mutex); lock(&p->sched_mutex); tslot_t *ts = p->task.runner; - if (unassign_thread(ts, UNUSED)) + tslot_t *resume_thread; + if (unassign_thread(p, ts, UNUSED, &resume_thread)) notify = true; unlock(&p->sched_mutex); - if (notify) - notify_poller(p); - check_earmark_override(p, ts); + if (notify) notify_poller(p); + if (resume_thread) resume(p, resume_thread); return; } } diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 99c5dd9..94d6460 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -421,7 +421,9 @@ void pni_raw_connection_done(praw_connection_t *rc) { } lock(&p->sched_mutex); - notify |= unassign_thread(ts, UNUSED); + tslot_t *resume_thread; + notify |= unassign_thread(p, ts, UNUSED, &resume_thread); unlock(&p->sched_mutex); if (notify) notify_poller(p); + if (resume_thread) pni_resume(p, resume_thread); } 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org