This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 4f8ac48c48e66d1650308bc1be694dae3215fc9a Author: Cliff Jansen <cliffjan...@apache.org> AuthorDate: Wed Dec 15 17:43:30 2021 -0800 PROTON-2362: epoll proactor: prevent opportunistic warm task from running twice --- c/src/proactor/epoll-internal.h | 4 ++-- c/src/proactor/epoll.c | 46 +++++++++++++++++++++++++++-------------- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 8e9e1b2..155277b 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -102,7 +102,7 @@ typedef struct task_t { tslot_t *prev_runner; bool sched_ready; bool sched_pending; /* If true, one or more unseen epoll or other events to process() */ - bool runnable ; /* on one of the runnable lists */ + int runnables_idx; /* 0 means unset, idx-1 is array position */ } task_t; typedef enum { @@ -198,7 +198,7 @@ struct pn_proactor_t { task_t *resched_cutoff; // last resched task of current poller work snapshot. TODO: superseded by polled_resched_count? task_t *resched_next; unsigned int resched_count; - unsigned int polled_resched_count; + unsigned int polled_resched_count; pmutex tslot_mutex; int earmark_count; bool earmark_drain; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index ea2e25a..2363f48 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -47,6 +47,8 @@ non-proactor-task -> proactor-task tslot -> sched + TODO: doc new work: warm (assigned), earmarked (assigned), runnables (unordered), sched_ready + list (ordered), resched list (ordered). TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt) transitions from "private to the scheduler" to "visible to the task". TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch. @@ -442,7 +444,7 @@ static void assign_thread(tslot_t *ts, task_t *tsk) { assert(!tsk->runner); tsk->runner = ts; tsk->prev_runner = NULL; - tsk->runnable = false; + tsk->runnables_idx = 0; ts->task = tsk; ts->prev_task = NULL; } @@ -539,10 +541,9 @@ static void remove_earmark(tslot_t *ts) { static void make_runnable(task_t *tsk) { pn_proactor_t *p = tsk->proactor; assert(p->n_runnables <= p->runnables_capacity); - assert(!tsk->runnable); + assert(!tsk->runnables_idx); if (tsk->runner) return; - tsk->runnable = true; // Track it as normal or warm or earmarked if (pni_warm_sched) { tslot_t *ts = tsk->prev_runner; @@ -552,8 +553,11 @@ static void make_runnable(task_t *tsk) { p->warm_runnables[p->n_warm_runnables++] = tsk; assign_thread(ts, tsk); } - else - p->runnables[p->n_runnables++] = tsk; + else { + p->runnables[p->n_runnables] = tsk; + tsk->runnables_idx = p->n_runnables + 1; // off by one accounting + p->n_runnables++; + } return; } if (ts->state == UNUSED && !p->earmark_drain) { @@ -563,7 +567,9 @@ static void make_runnable(task_t *tsk) { } } } - p->runnables[p->n_runnables++] = tsk; + p->runnables[p->n_runnables] = tsk; + tsk->runnables_idx = p->n_runnables + 1; // off by one accounting + p->n_runnables++; } @@ -2339,7 +2345,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { break; } } - if (tsk && !tsk->runnable && !tsk->runner && !on_sched_ready_list(tsk, p)) + if (tsk && !tsk->runnables_idx && !tsk->runner && !on_sched_ready_list(tsk, p)) return tsk; return NULL; } @@ -2348,7 +2354,7 @@ static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { static inline task_t *post_ready(pn_proactor_t *p, task_t *tsk) { tsk->sched_ready = true; tsk->sched_pending = true; - if (!tsk->runnable && !tsk->runner) + if (!tsk->runnables_idx && !tsk->runner) return tsk; return NULL; } @@ -2386,33 +2392,38 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) { return ts->task; } - // warm pairing ? + // Take any runnables task if it results in a warm pairing. task_t *tsk = ts->prev_task; - if (tsk && (tsk->runnable)) { + if (tsk && (tsk->runnables_idx)) { + // A task can self delete, so don't allow it to run twice. + task_t **runnables_slot = &p->runnables[tsk->runnables_idx -1]; + assert(*runnables_slot == tsk); + *runnables_slot = NULL; assign_thread(ts, tsk); return tsk; } - // check for an unassigned runnable task or ready list task + // check for remaining runnable tasks if (p->n_runnables) { // Any unclaimed runnable? while (p->n_runnables) { tsk = p->runnables[p->next_runnable++]; if (p->n_runnables == p->next_runnable) p->n_runnables = 0; - if (tsk->runnable) { + if (tsk) { assign_thread(ts, tsk); return tsk; } } } + // rest of sched_ready list while (p->sched_ready_count) { tsk = p->sched_ready_current; assert(tsk->ready); // eventfd_mutex required post ready set and pre move to sched_ready_list if (post_ready(p, tsk)) { pop_ready_task(tsk); // updates sched_ready_current - assert(!tsk->runnable && !tsk->runner); + assert(!tsk->runnables_idx && !tsk->runner); assign_thread(ts, tsk); return tsk; } else { @@ -2420,10 +2431,11 @@ static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) { } } + // the resched list if (p->polled_resched_count) { // Unprocessed resched tasks remain. tsk = resched_pop_front(p); - assert(tsk->sched_pending && !tsk->runnable && tsk->runner == RESCHEDULE_PLACEHOLDER); + assert(tsk->sched_pending && !tsk->runnables_idx && tsk->runner == RESCHEDULE_PLACEHOLDER); tsk->runner = NULL; assign_thread(ts, tsk); return tsk; @@ -2582,9 +2594,11 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block break; } - // We have unpolled work or at least one new epoll event + // We have unpolled work or at least one new epoll event. + // Remember tasks that together constitute new work. See note at beginning about duplicates. lock(&p->eventfd_mutex); + // Longest hold of eventfd_mutex. The following must be quick with no external calls: // post_event(), make_runnable(), assign_thread(), earmark_thread(). for (int i = 0; i < n_events; i++) { @@ -2620,7 +2634,7 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block while (p->resched_cutoff && p->n_runnables < max_runnables && warm_tries) { ctsk = resched_pop_front(p); - assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnable); + assert(ctsk->runner == RESCHEDULE_PLACEHOLDER && !ctsk->runnables_idx); ctsk->runner = NULL; // Allow task to run again. warm_tries--; make_runnable(ctsk); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org