This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push:
     new 34ca28f  NO-JIRA: Split out epoll proactor poller logic to separate routine for readability
34ca28f is described below

commit 34ca28fae4a880151440c33312ca7f723441c2b2
Author:     Cliff Jansen <cliffjan...@apache.org>
AuthorDate: Tue Oct 20 00:04:13 2020 -0700

    NO-JIRA: Split out epoll proactor poller logic to separate routine for readability
---
 c/src/proactor/epoll.c | 292 +++++++++++++++++++++++++------------------------
 1 file changed, 151 insertions(+), 141 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 01d9db8..4c00d47 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -101,7 +101,7 @@
 // Maybe futex is even better?
 // See other "TODO" in code.
 //
-// Consider case of large number of wakes: proactor_do_epoll() could start by
+// Consider case of large number of wakes: next_event_batch() could start by
 // looking for pending wakes before a kernel call to epoll_wait(), or there
 // could be several eventfds with random assignment of wakeables.

@@ -691,6 +691,7 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup);
 static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
+static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block);
 static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
 
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
@@ -2580,7 +2581,7 @@ static pcontext_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
   return NULL;
 }
 
-static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
+static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) {
   lock(&p->tslot_mutex);
   tslot_t * ts = find_tslot(p);
   unlock(&p->tslot_mutex);
@@ -2591,9 +2592,9 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
   assert(ts->state == UNUSED || ts->state == NEW);
   ts->state = PROCESSING;
 
+  // Process outstanding epoll events until we get a batch or need to block.
   while (true) {
-    // Process outstanding epoll events until we get a batch or need to block.
-
+    // First see if there are any contexts waiting to run and perhaps generate new Proton events,
     pcontext_t *ctx = next_runnable(p, ts);
     if (ctx) {
       ts->state = BATCHING;
@@ -2611,155 +2612,164 @@ static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
       continue; // Long time may have passed. Back to beginning.
     }
 
-    // poll or wait for a runnable context
+    // Poll or wait for a runnable context
     if (p->poller == NULL) {
+      bool return_immediately;
       p->poller = ts;
-      // As poller with lots to do, be mindful of hogging the sched lock. Release when making kernel calls.
-      assert(p->n_runnables == 0);
-      if (p->thread_count > p->thread_capacity)
-        grow_poller_bufs(p);
-      p->next_runnable = 0;
-      p->n_warm_runnables = 0;
-      p->last_earmark = NULL;
-
-      bool unfinished_earmarks = p->earmark_count > 0;
-      bool new_wakes = false;
-      bool epoll_immediate = unfinished_earmarks || !can_block;
-      assert(!p->sched_wake_first);
-      if (!epoll_immediate) {
-        lock(&p->eventfd_mutex);
-        if (p->wake_list_first) {
-          epoll_immediate = true;
-          new_wakes = true;
-        } else {
-          p->wakes_in_progress = false;
-        }
-        unlock(&p->eventfd_mutex);
+      // Get new epoll events (if any) and mark the relevant contexts as runnable
+      return_immediately = poller_do_epoll(p, ts, can_block);
+      p->poller = NULL;
+      if (return_immediately) {
+        // Check if another thread is available to continue epoll-ing.
+        tslot_t *res_ts = resume_one_thread(p);
+        ts->state = UNUSED;
+        unlock(&p->sched_mutex);
+        if (res_ts) resume(p, res_ts);
+        return NULL;
       }
-      int timeout = (epoll_immediate) ? 0 : -1;
-      p->poller_suspended = (timeout == -1);
+      poller_done(p, ts); // put suspended threads to work.
+    } else if (!can_block) {
+      ts->state = UNUSED;
       unlock(&p->sched_mutex);
+      return NULL;
+    } else {
+      // TODO: loop while !poller_suspended, since new work coming
+      suspend(p, ts);
+    }
+  } // while
+}
 
-      int n = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
-
-      lock(&p->sched_mutex);
-      p->poller_suspended = false;
+// Call with sched lock. Return true if !can_block and no new events to process.
+static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block) {
+  // As poller with lots to do, be mindful of hogging the sched lock. Release when making kernel calls.
+  int n_events;
+  pcontext_t *ctx;
 
-      bool unpolled_work = false;
-      if (p->earmark_count > 0) {
-        p->earmark_drain = true;
-        unpolled_work = true;
-      }
-      if (new_wakes) {
-        lock(&p->eventfd_mutex);
-        schedule_wake_list(p);
-        unlock(&p->eventfd_mutex);
-        unpolled_work = true;
+  while (true) {
+    assert(p->n_runnables == 0);
+    if (p->thread_count > p->thread_capacity)
+      grow_poller_bufs(p);
+    p->next_runnable = 0;
+    p->n_warm_runnables = 0;
+    p->last_earmark = NULL;
+
+    bool unfinished_earmarks = p->earmark_count > 0;
+    bool new_wakes = false;
+    bool epoll_immediate = unfinished_earmarks || !can_block;
+    assert(!p->sched_wake_first);
+    if (!epoll_immediate) {
+      lock(&p->eventfd_mutex);
+      if (p->wake_list_first) {
+        epoll_immediate = true;
+        new_wakes = true;
+      } else {
+        p->wakes_in_progress = false;
       }
+      unlock(&p->eventfd_mutex);
+    }
+    int timeout = (epoll_immediate) ? 0 : -1;
+    p->poller_suspended = (timeout == -1);
+    unlock(&p->sched_mutex);
 
-      if (n < 0) {
-        if (errno != EINTR)
-          perror("epoll_wait"); // TODO: proper log
-        if (!can_block && !unpolled_work) {
-          p->poller = NULL;
-          tslot_t *res_ts = resume_one_thread(p);
-          ts->state = UNUSED;
-          unlock(&p->sched_mutex);
-          if (res_ts) resume(p, res_ts);
-          return NULL;
-        }
-        else {
-          p->poller = NULL;
-          continue;
-        }
-      } else if (n == 0) {
-        if (!can_block && !unpolled_work) {
-          p->poller = NULL;
-          tslot_t *res_ts = resume_one_thread(p);
-          ts->state = UNUSED;
-          unlock(&p->sched_mutex);
-          if (res_ts) resume(p, res_ts);
-          return NULL;
-        }
-        else {
-          if (!epoll_immediate)
-            perror("epoll_wait unexpected timeout"); // TODO: proper log
-          if (!unpolled_work) {
-            p->poller = NULL;
-            continue;
-          }
-        }
-      }
+    n_events = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
 
-      for (int i = 0; i < n; i++) {
-        ctx = post_event(p, &p->kevents[i]);
-        if (ctx)
-          make_runnable(ctx);
-      }
-      if (n > 0)
-        memset(p->kevents, 0, sizeof(struct epoll_event) * n);
-
-      // The list of pending wakes can be very long. Traverse part of it looking for warm pairings.
-      pcontext_t *wctx = p->sched_wake_current;
-      int max_runnables = p->runnables_capacity;
-      while (wctx && p->n_runnables < max_runnables) {
-        if (wctx->runner == REWAKE_PLACEHOLDER)
-          wctx->runner = NULL; // Allow context to run again.
-        ctx = post_wake(p, wctx);
-        if (ctx)
-          make_runnable(ctx);
-        pop_wake(wctx);
-        wctx = wctx->wake_next;
-      }
-      p->sched_wake_current = wctx;
-      // More wakes than places on the runnables list
-      while (wctx) {
-        if (wctx->runner == REWAKE_PLACEHOLDER)
-          wctx->runner = NULL; // Allow context to run again.
-        wctx->sched_wake = true;
-        wctx->sched_pending = true;
-        if (wctx->runnable || wctx->runner)
-          pop_wake(wctx);
-        wctx = wctx->wake_next;
-      }
+    lock(&p->sched_mutex);
+    p->poller_suspended = false;
 
-      if (pni_immediate && !ts->context) {
-        // Poller gets to run if possible
-        pcontext_t *pctx;
-        if (p->n_runnables) {
-          assert(p->next_runnable == 0);
-          pctx = p->runnables[0];
-          if (++p->next_runnable == p->n_runnables)
-            p->n_runnables = 0;
-        } else if (p->n_warm_runnables) {
-          pctx = p->warm_runnables[--p->n_warm_runnables];
-          tslot_t *ts2 = pctx->runner;
-          ts2->prev_context = ts2->context = NULL;
-          pctx->runner = NULL;
-        } else if (p->last_earmark) {
-          pctx = p->last_earmark->context;
-          remove_earmark(p->last_earmark);
-          if (p->earmark_count == 0)
-            p->earmark_drain = false;
-        } else {
-          pctx = NULL;
-        }
-        if (pctx) {
-          assign_thread(ts, pctx);
-        }
+    bool unpolled_work = false;
+    if (p->earmark_count > 0) {
+      p->earmark_drain = true;
+      unpolled_work = true;
+    }
+    if (new_wakes) {
+      lock(&p->eventfd_mutex);
+      schedule_wake_list(p);
+      unlock(&p->eventfd_mutex);
+      unpolled_work = true;
+    }
+
+    if (n_events < 0) {
+      if (errno != EINTR)
+        perror("epoll_wait"); // TODO: proper log
+      if (!can_block && !unpolled_work)
+        return true;
+      else
+        continue;
+    } else if (n_events == 0) {
+      if (!can_block && !unpolled_work)
+        return true;
+      else {
+        if (!epoll_immediate)
+          perror("epoll_wait unexpected timeout"); // TODO: proper log
+        if (!unpolled_work)
+          continue;
      }
+    }
 
-      poller_done(p, ts); // put suspended threads to work.
-      // p->poller has been released, so a new poller may already be running.
-    } else if (!can_block) {
-      ts->state = UNUSED;
-      unlock(&p->sched_mutex);
-      return NULL;
+    break;
+  }
+
+  // We have unpolled work or at least one new epoll event
+
+
+  for (int i = 0; i < n_events; i++) {
+    ctx = post_event(p, &p->kevents[i]);
+    if (ctx)
+      make_runnable(ctx);
+  }
+  if (n_events > 0)
+    memset(p->kevents, 0, sizeof(struct epoll_event) * n_events);
+
+  // The list of pending wakes can be very long. Traverse part of it looking for warm pairings.
+  pcontext_t *wctx = p->sched_wake_current;
+  int max_runnables = p->runnables_capacity;
+  while (wctx && p->n_runnables < max_runnables) {
+    if (wctx->runner == REWAKE_PLACEHOLDER)
+      wctx->runner = NULL; // Allow context to run again.
+    ctx = post_wake(p, wctx);
+    if (ctx)
+      make_runnable(ctx);
+    pop_wake(wctx);
+    wctx = wctx->wake_next;
+  }
+  p->sched_wake_current = wctx;
+  // More wakes than places on the runnables list
+  while (wctx) {
+    if (wctx->runner == REWAKE_PLACEHOLDER)
+      wctx->runner = NULL; // Allow context to run again.
+    wctx->sched_wake = true;
+    wctx->sched_pending = true;
+    if (wctx->runnable || wctx->runner)
+      pop_wake(wctx);
+    wctx = wctx->wake_next;
+  }
+
+  if (pni_immediate && !ts->context) {
+    // Poller gets to run if possible
+    pcontext_t *pctx;
+    if (p->n_runnables) {
+      assert(p->next_runnable == 0);
+      pctx = p->runnables[0];
+      if (++p->next_runnable == p->n_runnables)
+        p->n_runnables = 0;
+    } else if (p->n_warm_runnables) {
+      pctx = p->warm_runnables[--p->n_warm_runnables];
+      tslot_t *ts2 = pctx->runner;
+      ts2->prev_context = ts2->context = NULL;
+      pctx->runner = NULL;
+    } else if (p->last_earmark) {
+      pctx = p->last_earmark->context;
+      remove_earmark(p->last_earmark);
+      if (p->earmark_count == 0)
+        p->earmark_drain = false;
    } else {
-      // TODO: loop while !poller_suspended, since new work coming
-      suspend(p, ts);
+      pctx = NULL;
    }
-  } // while
+    if (pctx) {
+      assign_thread(ts, pctx);
+    }
+  }
+  return false;
 }
 
 // Call with sched lock, but only from poller context.
@@ -2819,11 +2829,11 @@ static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
 }
 
 pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
-  return proactor_do_epoll(p, true);
+  return next_event_batch(p, true);
 }
 
 pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
-  return proactor_do_epoll(p, false);
+  return next_event_batch(p, false);
 }
 
 // Call with no locks
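
For readers skimming the patch, the resulting division of labor reduces to the minimal standalone C sketch below. This is an illustration only, not the Proton sources: the proactor_t/batch_t types and every helper other than next_event_batch() and poller_do_epoll()'s role are invented stand-ins for the scheduler machinery in c/src/proactor/epoll.c.

#include <stdbool.h>
#include <stddef.h>

typedef struct proactor_t proactor_t;  /* stand-in for pn_proactor_t */
typedef struct batch_t batch_t;        /* stand-in for pn_event_batch_t */

/* Hypothetical helpers standing in for the real scheduler machinery. */
batch_t *try_runnable_context(proactor_t *p);           /* next_runnable() + batching */
bool     try_become_poller(proactor_t *p);              /* the p->poller == NULL test */
bool     poll_for_new_work(proactor_t *p, bool can_block); /* poller_do_epoll()'s job */
void     release_poller(proactor_t *p);                 /* p->poller = NULL */
void     resume_one_peer(proactor_t *p);                /* resume_one_thread() */
void     finish_polling(proactor_t *p);                 /* poller_done() */
void     wait_for_work(proactor_t *p);                  /* suspend() */

/* Simplified next_event_batch(): after the split, only this loop remains. */
batch_t *next_event_batch_sketch(proactor_t *p, bool can_block) {
  while (true) {
    /* 1. Prefer a context that is already runnable. */
    batch_t *batch = try_runnable_context(p);
    if (batch)
      return batch;

    /* 2. One thread at a time becomes the poller and gathers new epoll
     *    events; the helper returns true exactly when a non-blocking
     *    caller should give up its thread slot (no new work found). */
    if (try_become_poller(p)) {
      bool return_immediately = poll_for_new_work(p, can_block);
      release_poller(p);
      if (return_immediately) {
        resume_one_peer(p);  /* hand epoll-ing duty to another thread */
        return NULL;         /* pn_proactor_get() with nothing to do */
      }
      finish_polling(p);     /* put suspended threads to work */
      continue;              /* new contexts marked runnable; retry step 1 */
    }

    /* 3. Every other thread either bails out (pn_proactor_get) or blocks
     *    until the poller produces work (pn_proactor_wait). */
    if (!can_block)
      return NULL;
    wait_for_work(p);
  }
}

As the comments in the patch note, the split changes readability rather than the locking discipline: poller_do_epoll() is entered and exited with the sched lock held, releasing it only around the epoll_wait() kernel call.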