This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 7b000f12448a93a7aa1168fbca416a036547a92a
Author: Cliff Jansen <cjan...@redhat.com>
AuthorDate: Wed Dec 11 10:19:08 2019 -0800

    PROTON-2130: epoll proactor race/deadlock fixes
---
 c/src/proactor/epoll.c | 66 +++++++++++++++++++++++++++++++-------------------
 1 file changed, 41 insertions(+), 25 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 21c611f..9390595 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -87,6 +87,7 @@
 #include <sys/eventfd.h>
 #include <limits.h>
 #include <time.h>
+#include <alloca.h>
 
 #include "./netaddr-internal.h" /* Include after socket/inet headers */
 
@@ -940,6 +941,8 @@ static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
 static void proactor_add(pcontext_t *ctx);
 static bool proactor_remove(pcontext_t *ctx);
+static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
+
 
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
   return ps->listener ? NULL : (pconnection_t*)ps;
@@ -2928,14 +2931,36 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
         }
       }
 
-      // Create a list of available threads to put to work.
+      poller_done(p, ts); // put suspended threads to work.
+      // p->poller has been released, so a new poller may already be running.
+    } else if (!can_block) {
+      ts->state = UNUSED;
+      unlock(&p->sched_mutex);
+      return NULL;
+    } else {
+      // TODO: loop while !poller_suspended, since new work coming
+      suspend(p, ts);
+    }
+  } // while
+}
 
-      int resume_list_count = 0;
+// Call with sched lock, but only from poller context.
+static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
+  // Create a list of available threads to put to work.
+  // ts is the poller thread
+  int resume_list_count = 0;
+  tslot_t **resume_list2 = NULL;
+
+  if (p->suspend_list_count) {
+    int max_resumes = p->n_warm_runnables + p->n_runnables;
+    max_resumes = pn_min(max_resumes, p->suspend_list_count);
+    if (max_resumes) {
+      resume_list2 = (tslot_t **) alloca(max_resumes * sizeof(tslot_t *));
       for (int i = 0; i < p->n_warm_runnables ; i++) {
-        ctx = p->warm_runnables[i];
+        pcontext_t *ctx = p->warm_runnables[i];
         tslot_t *tsp = ctx->runner;
         if (tsp->state == SUSPENDED) {
-          p->resume_list[resume_list_count++] = tsp;
+          resume_list2[resume_list_count++] = tsp;
           LL_REMOVE(p, suspend_list, tsp);
           p->suspend_list_count--;
           tsp->state = PROCESSING;
@@ -2956,34 +2981,25 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
       for (int i = 0; i < new_runners; i++) {
         tslot_t *tsp = p->suspend_list_head;
         assert(tsp);
-        p->resume_list[resume_list_count++] = tsp;
+        resume_list2[resume_list_count++] = tsp;
         LL_REMOVE(p, suspend_list, tsp);
         p->suspend_list_count--;
         tsp->state = PROCESSING;
       }
+    }
+  }
+  p->poller = NULL;
 
-      p->poller = NULL;
-      // New poller may run concurrently. Touch only this thread's stack for rest of block.
-
-      if (resume_list_count) {
-        unlock(&p->sched_mutex);
-        for (int i = 0; i < resume_list_count; i++) {
-          resume(p, p->resume_list[i]);
-        }
-        lock(&p->sched_mutex);
-      }
-    } else if (!can_block) {
-      ts->state = UNUSED;
-      unlock(&p->sched_mutex);
-      return NULL;
-    } else {
-      // TODO: loop while !poller_suspended, since new work coming
-      suspend(p, ts);
+  if (resume_list_count) {
+    // Allows a new poller to run concurrently. Touch only stack vars.
+    unlock(&p->sched_mutex);
+    for (int i = 0; i < resume_list_count; i++) {
+      resume(p, resume_list2[i]);
     }
-  } // while
+    lock(&p->sched_mutex);
+  }
 }
 
-
 pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   return proactor_do_epoll(p, true);
 }
@@ -3042,12 +3058,12 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
       if (wake(&p->context))
         notify = true;
 
+    unlock(&p->context.mutex);
     lock(&p->sched_mutex);
     tslot_t *ts = p->context.runner;
     if (unassign_thread(ts, UNUSED))
       notify = true;
     unlock(&p->sched_mutex);
-    unlock(&p->context.mutex);
     if (notify)
       wake_notify(&p->context);
 
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org