This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 667f78c6d2ac17256ea67da54ff5703d424328f1 Author: Cliff Jansen <cjan...@redhat.com> AuthorDate: Thu Nov 7 00:55:50 2019 -0800 PROTON-2130: epoll proactor changed to use serialized calls to epoll_wait for multiple events --- c/src/proactor/epoll.c | 1627 ++++++++++++++++++++++++++++++++++----------- c/tests/proactor_test.cpp | 2 +- 2 files changed, 1258 insertions(+), 371 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 5cc3b65..d0664d9 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -19,6 +19,37 @@ * */ +/* + The epoll proactor works with multiple concurrent threads. If there is no work to do, + one thread temporarily becomes the poller thread, while other inbound threads suspend + waiting for work. The poller calls epoll_wait(), generates work lists, resumes suspended + threads, and grabs work for itself or polls again. + + A serialized grouping of Proton events is a context (connection, listener, proactor). + Each has multiple pollable fds that make it schedulable. E.g. a connection could have a + socket fd, timerfd, and (indirect) eventfd all signaled in a single epoll_wait(). + + At the conclusion of each + N = epoll_wait(..., N_MAX, timeout) + + there will be N epoll events and M wakes on the wake list. M can be very large in a + server with many active connections. The poller makes the contexts "runnable" if they are + not already running. A running context can only be made runnable once until it completes + a chunk of work and calls unassign_thread(). (N + M - duplicates) contexts will be + scheduled. A new poller will occur when next_runnable() returns NULL. + + A running context, before it stops "working" must check to see if there were new incoming + events that the poller posted to the context, but could not make it runnable since it was + already running. The context will know if it needs to put itself back on the wake list + to be runnable later to process the pending events. + + Lock ordering - never add locks right to left: + context -> sched -> wake + non-proactor-context -> proactor-context + tslot -> sched + */ + + /* Enable POSIX features beyond c99 for modern pthread and standard strerror_r() */ #ifndef _POSIX_C_SOURCE #define _POSIX_C_SOURCE 200809L @@ -26,8 +57,9 @@ /* Avoid GNU extensions, in particular the incompatible alternative strerror_r() */ #undef _GNU_SOURCE -#include "core/logger_private.h" #include "proactor-internal.h" +#include "core/logger_private.h" +#include "core/util.h" #include <proton/condition.h> #include <proton/connection_driver.h> @@ -92,9 +124,7 @@ static void pstrerror(int err, strerrorbuf msg) { // ======================================================================== // In general all locks to be held singly and shortly (possibly as spin locks). -// Exception: psockets+proactor for pn_proactor_disconnect (convention: acquire -// psocket first to avoid deadlock). TODO: revisit the exception and its -// awkwardness in the code (additional mutex? different type?). +// See above about lock ordering. 
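To make the "context -> sched -> wake" lock ordering above concrete, here is a
small editorial sketch (not part of the commit; the demo_* names are
hypothetical stand-ins for the patch's pcontext_t/pn_proactor_t):

#include <pthread.h>
#include <stdbool.h>

typedef struct {
  pthread_mutex_t ctx_mutex;     /* per-context lock: acquire first */
  pthread_mutex_t sched_mutex;   /* scheduler lock: acquire second */
  pthread_mutex_t eventfd_mutex; /* wake-list lock: acquire last */
  bool sched_pending;            /* events posted while the context ran */
} demo_proactor_t;

/* A context that stops working must re-check for events the poller
 * posted while it ran, taking the locks strictly left to right. */
static bool demo_finish_work(demo_proactor_t *d) {
  bool rewake;
  pthread_mutex_lock(&d->ctx_mutex);
  pthread_mutex_lock(&d->sched_mutex);
  rewake = d->sched_pending;     /* unseen work: must be rescheduled */
  d->sched_pending = false;
  pthread_mutex_unlock(&d->sched_mutex);
  pthread_mutex_unlock(&d->ctx_mutex);
  return rewake;
}

When the reverse order is needed, the patch drops and retakes instead:
unassign_thread() releases sched_mutex, takes the context mutex to call
wake(), then reacquires sched_mutex.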
typedef pthread_mutex_t pmutex; static void pmutex_init(pthread_mutex_t *pm){ @@ -113,14 +143,13 @@ static inline void lock(pmutex *m) { pthread_mutex_lock(m); } static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); } typedef struct acceptor_t acceptor_t; +typedef struct tslot_t tslot_t; typedef enum { WAKE, /* see if any work to do in proactor/psocket context */ PCONNECTION_IO, - PCONNECTION_IO_2, PCONNECTION_TIMER, LISTENER_IO, - CHAINED_EPOLL, PROACTOR_TIMER } epoll_type_t; // Data to use with epoll. @@ -161,6 +190,8 @@ static void memory_barrier(epoll_extended_t *ee) { * The original implementation with counters to track expiry counts * was abandoned in favor of "in doubt" transitions and resolution * at shutdown. + * + * TODO: review above in light of single poller thread. */ typedef struct ptimer_t { @@ -262,6 +293,14 @@ static void ptimer_finalize(ptimer_t *pt) { pmutex_finalize(&pt->mutex); } +pn_timestamp_t pn_i_now2(void) +{ + struct timespec now; + clock_gettime(CLOCK_REALTIME, &now); + return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000); +} + + // ======================================================================== // Proactor common code // ======================================================================== @@ -330,30 +369,6 @@ static void stop_polling(epoll_extended_t *ee, int epollfd) { * eventfd to allow a lock-free pn_proactor_interrupt() implementation. */ -/* - * **** epollfd and epollfd_2 **** - * - * This implementation allows multiple threads to call epoll_wait() - * concurrently (as opposed to having a single thread call - * epoll_wait() and feed work to helper threads). Unfortunately - * with this approach, it is not possible to change the event - * mask in one thread and be certain if zero or one callbacks occurred - * on the previous event mask. This can greatly complicate ordered - * shutdown. (See PROTON-1842) - * - * Currently, only pconnection sockets change between EPOLLIN, - * EPOLLOUT, or both. The rest use a constant EPOLLIN event mask. - * Instead of trying to change the event mask for pconnection sockets, - * if there is a missing attribute, it is added (EPOLLIN or EPOLLOUT) - * as an event mask on the secondary or chained epollfd_2. epollfd_2 - * is part of the epollfd fd set, so active events in epollfd_2 are - * also seen in epollfd (but require a separate epoll_wait() and - * rearm() to extract). - * - * Using this method and EPOLLONESHOT, it is possible to wait for all - * outstanding armings on a socket to "resolve" via epoll_wait() - * callbacks before freeing resources. - */ typedef enum { PROACTOR, PCONNECTION, @@ -366,7 +381,8 @@ typedef struct pcontext_t { void *owner; /* Instance governed by the context */ pcontext_type_t type; bool working; - int wake_ops; // unprocessed eventfd wake callback (convert to bool?) + bool on_wake_list; + bool wake_pending; // unprocessed eventfd wake callback (convert to bool?) 
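+  // Wake lifecycle (see pop_wake() below): !wake_pending .. wake() ..
+  // on wake_list .. on sched_wake list .. absorbed by the working
+  // context via wake_done().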
struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex bool closing; // Next 4 are protected by the proactor mutex @@ -374,8 +390,43 @@ typedef struct pcontext_t { struct pcontext_t* prev; /* Protected by proactor.mutex */ int disconnect_ops; /* ops remaining before disconnect complete */ bool disconnecting; /* pn_proactor_disconnect */ + // Protected by schedule mutex + tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */ + tslot_t *prev_runner; + bool sched_wake; + bool sched_pending; /* If true, one or more unseen epoll or other events to process() */ + bool runnable ; /* in need of scheduling */ } pcontext_t; +typedef enum { + NEW, + UNUSED, /* pn_proactor_done() called, may never come back */ + SUSPENDED, + PROCESSING, /* Hunting for a context */ + BATCHING, /* Doing work on behalf of a context */ + DELETING, + POLLING } tslot_state; + +// Epoll proactor's concept of a worker thread provided by the application. +struct tslot_t { + pmutex mutex; // suspend and resume + pthread_cond_t cond; + unsigned int generation; + bool suspended; + volatile bool scheduled; + tslot_state state; + pcontext_t *context; + pcontext_t *prev_context; + bool earmarked; + tslot_t *suspend_list_prev; + tslot_t *suspend_list_next; + tslot_t *earmark_override; // on earmark_drain, which thread was unassigned + unsigned int earmark_override_gen; +}; + +// Fake thread for temporarily disabling the scheduling of a context. +static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1; + static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p, void *o) { memset(ctx, 0, sizeof(*ctx)); pmutex_init(&ctx->mutex); @@ -388,6 +439,7 @@ static void pcontext_finalize(pcontext_t* ctx) { pmutex_finalize(&ctx->mutex); } + /* common to connection and listener */ typedef struct psocket_t { pn_proactor_t *proactor; @@ -397,19 +449,17 @@ typedef struct psocket_t { pn_listener_t *listener; /* NULL for a connection socket */ char addr_buf[PN_MAX_ADDR]; const char *host, *port; + uint32_t sched_io_events; + uint32_t working_io_events; } psocket_t; struct pn_proactor_t { pcontext_t context; - int epollfd; - int epollfd_2; ptimer_t timer; - pn_collector_t *collector; - pcontext_t *contexts; /* in-use contexts for PN_PROACTOR_INACTIVE and cleanup */ epoll_extended_t epoll_wake; epoll_extended_t epoll_interrupt; - epoll_extended_t epoll_secondary; pn_event_batch_t batch; + pcontext_t *contexts; /* track in-use contexts for PN_PROACTOR_INACTIVE and disconnect */ size_t disconnects_pending; /* unfinished proactor disconnects*/ // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch() bool need_interrupt; @@ -418,7 +468,8 @@ struct pn_proactor_t { bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */ bool timeout_processed; /* timeout event dispatched in the most recent event batch */ bool timer_armed; /* timer is armed in epoll */ - bool shutting_down; + int context_count; + // wake subsystem int eventfd; pmutex eventfd_mutex; @@ -430,30 +481,188 @@ struct pn_proactor_t { // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here. acceptor_t *overflow; pmutex overflow_mutex; + + // Sched vars specific to proactor context. + bool sched_timeout; + bool sched_interrupt; + + // Global scheduling/poller vars. + // Warm runnables have assigned or earmarked tslots and can run right away. + // Other runnables are run as tslots come available. 
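+  // Example of a warm pairing: if context C last ran on thread T and T is
+  // now suspended or hunting for work, a new event for C goes on
+  // warm_runnables and C is handed straight back to T (see
+  // make_runnable()/assign_thread() below), preserving cache warmth.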
+ pmutex sched_mutex; + int n_runnables; + int next_runnable; + int n_warm_runnables; + tslot_t *suspend_list_head; + tslot_t *suspend_list_tail; + int suspend_list_count; + tslot_t *poller; + bool poller_suspended; + tslot_t *last_earmark; + pcontext_t *sched_wake_first; + pcontext_t *sched_wake_last; + pcontext_t *sched_wake_current; + pmutex tslot_mutex; + int earmark_count; + bool earmark_drain; + bool sched_wakes_pending; + + // Mostly read only: after init or once thread_count stabilizes + pn_collector_t *collector __attribute__((aligned(64))); + pcontext_t **warm_runnables; + pcontext_t **runnables; + tslot_t **resume_list; + pn_hash_t *tslot_map; + struct epoll_event *kevents; + int epollfd; + int thread_count; + int thread_capacity; + int runnables_capacity; + int kevents_capacity; + bool shutting_down; +}; + +typedef struct pconnection_t { + psocket_t psocket; + pcontext_t context; + uint32_t new_events; + int wake_count; + bool server; /* accept, not connect */ + bool tick_pending; + bool timer_armed; + bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ + pn_condition_t *disconnect_condition; + ptimer_t timer; // TODO: review one timerfd per connection + // Following values only changed by (sole) working context: + uint32_t current_arm; // active epoll io events + bool connected; + bool read_blocked; + bool write_blocked; + bool disconnected; + int hog_count; // thread hogging limiter + pn_event_batch_t batch; + pn_connection_driver_t driver; + bool wbuf_valid; + const char *wbuf_current; + size_t wbuf_remaining; + size_t wbuf_completed; + struct pn_netaddr_t local, remote; /* Actual addresses */ + struct addrinfo *addrinfo; /* Resolved address list */ + struct addrinfo *ai; /* Current connect address */ + pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/ + bool io_doublecheck; /* callbacks made and new IO may have arrived */ + bool sched_timeout; +} pconnection_t; + +/* + * A listener can have mutiple sockets (as specified in the addrinfo). They + * are armed separately. The individual psockets can be part of at most one + * list: the global proactor overflow retry list or the per-listener list of + * pending accepts (valid inbound socket obtained, but pn_listener_accept not + * yet called by the application). These lists will be small and quick to + * traverse. + */ + +struct acceptor_t{ + psocket_t psocket; + int accepted_fd; + bool armed; + bool overflowed; + acceptor_t *next; /* next listener list member */ + struct pn_netaddr_t addr; /* listening address */ +}; + +struct pn_listener_t { + acceptor_t *acceptors; /* Array of listening sockets */ + size_t acceptors_size; + int active_count; /* Number of listener sockets registered with epoll */ + pcontext_t context; + pn_condition_t *condition; + pn_collector_t *collector; + pn_event_batch_t batch; + pn_record_t *attachments; + void *listener_context; + acceptor_t *pending_acceptors; /* list of those with a valid inbound fd*/ + int pending_count; + bool unclaimed; /* attach event dispatched but no pn_listener_attach() call yet */ + size_t backlog; + bool close_dispatched; + pmutex rearm_mutex; /* orders rearms/disarms, nothing else */ + uint32_t sched_io_events; }; + static void rearm(pn_proactor_t *p, epoll_extended_t *ee); /* * Wake strategy with eventfd. 
* - wakees can be in the list only once - * - wakers only write() if wakes_in_progress is false - * - wakees only read() if about to set wakes_in_progress to false - * When multiple wakes are pending, the kernel cost is a single rearm(). - * Otherwise it is the trio of write/read/rearm. - * Only the writes and reads need to be carefully ordered. + * - wakers only use the eventfd if wakes_in_progress is false + * There is a single rearm between wakes > 0 and wakes == 0 + * + * There can potentially be many contexts with wakes pending. * - * Multiple eventfds could be used and shared amongst the pcontext_t's. + * The wake list is in two parts. The front is the chunk the + * scheduler will process until the next epoll_wait(). sched_wake + * indicates which chunk it is on. The ctx may already be running or + * scheduled to run. + * + * The ctx must be actually running to absorb ctx->wake_pending. + * + * The wake list can keep growing while popping wakes. The list between + * sched_wake_first and sched_wake_last are protected by the sched + * lock (for pop operations), sched_wake_last to wake_list_last are + * protected by the eventfd mutex (for add operations). Both locks + * are needed to cross or reconcile the two portions of the list. */ +// Call with sched lock held. +static void pop_wake(pcontext_t *ctx) { + // every context on the sched_wake_list is either currently running, + // or to be scheduled. wake() will not "see" any of the wake_next + // pointers until wake_pending and working have transitioned to 0 + // and false, when a context stops working. + // + // every context must transition as: + // + // !wake_pending .. wake() .. on wake_list .. on sched_wake_list .. working context .. !sched_wake && !wake_pending + // + // Intervening locks at each transition ensures wake_next has memory coherence throughout the wake cycle. 
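+  //
+  // For example, with contexts A..D on the list at one instant:
+  //
+  //   A -> B -> C -> D
+  //   ^    ^         ^
+  //   |    |         wake_list_last (links C,D added under eventfd_mutex)
+  //   |    sched_wake_last (A,B popped under the sched lock)
+  //   sched_wake_first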
+ pn_proactor_t *p = ctx->proactor; + if (ctx == p->sched_wake_current) + p->sched_wake_current = ctx->wake_next; + if (ctx == p->sched_wake_first) { + // normal code path + if (ctx == p->sched_wake_last) { + p->sched_wake_first = p->sched_wake_last = NULL; + } else { + p->sched_wake_first = ctx->wake_next; + } + if (!p->sched_wake_first) + p->sched_wake_last = NULL; + } else { + // ctx is not first in a multi-element list + pcontext_t *prev = NULL; + for (pcontext_t *i = p->sched_wake_first; i != ctx; i = i->wake_next) + prev = i; + prev->wake_next = ctx->wake_next; + if (ctx == p->sched_wake_last) + p->sched_wake_last = prev; + } + ctx->on_wake_list = false; +} + // part1: call with ctx->owner lock held, return true if notify required by caller static bool wake(pcontext_t *ctx) { bool notify = false; - if (!ctx->wake_ops) { + + if (!ctx->wake_pending) { if (!ctx->working) { - ctx->wake_ops++; + ctx->wake_pending = true; pn_proactor_t *p = ctx->proactor; lock(&p->eventfd_mutex); + ctx->wake_next = NULL; + ctx->on_wake_list = true; if (!p->wake_list_first) { p->wake_list_first = p->wake_list_last = ctx; } else { @@ -468,50 +677,231 @@ static bool wake(pcontext_t *ctx) { unlock(&p->eventfd_mutex); } } + return notify; } // part2: make OS call without lock held static inline void wake_notify(pcontext_t *ctx) { - if (ctx->proactor->eventfd == -1) + pn_proactor_t *p = ctx->proactor; + if (p->eventfd == -1) return; - uint64_t increment = 1; - if (write(ctx->proactor->eventfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t)) - EPOLL_FATAL("setting eventfd", errno); + rearm(p, &p->epoll_wake); } -// call with no locks -static pcontext_t *wake_pop_front(pn_proactor_t *p) { - pcontext_t *ctx = NULL; - lock(&p->eventfd_mutex); - assert(p->wakes_in_progress); - if (p->wake_list_first) { - ctx = p->wake_list_first; - p->wake_list_first = ctx->wake_next; - if (!p->wake_list_first) p->wake_list_last = NULL; - ctx->wake_next = NULL; - - if (!p->wake_list_first) { - /* Reset the eventfd until a future write. - * Can the read system call be made without holding the lock? - * Note that if the reads/writes happen out of order, the wake - * mechanism will hang. 
*/ - (void)read_uint64(p->eventfd); - p->wakes_in_progress = false; +// call with owner lock held, once for each pop from the wake list +static inline void wake_done(pcontext_t *ctx) { +// assert(ctx->wake_pending > 0); + ctx->wake_pending = false; +} + + +/* + * Scheduler/poller +*/ + +// Internal use only +/* How long to defer suspending */ +static int pni_spins = 0; +/* Prefer immediate running by poller over warm running by suspended thread */ +static bool pni_immediate = false; +/* Toggle use of warm scheduling */ +static int pni_warm_sched = true; + + +// Call with sched lock +static void suspend_list_add_tail(pn_proactor_t *p, tslot_t *ts) { + LL_ADD(p, suspend_list, ts); +} + +// Call with sched lock +static void suspend_list_insert_head(pn_proactor_t *p, tslot_t *ts) { + ts->suspend_list_next = p->suspend_list_head; + ts->suspend_list_prev = NULL; + if (p->suspend_list_head) + p->suspend_list_head->suspend_list_prev = ts; + else + p->suspend_list_tail = ts; + p->suspend_list_head = ts; +} + +// Call with sched lock +static void suspend(pn_proactor_t *p, tslot_t *ts) { + if (ts->state == NEW) + suspend_list_add_tail(p, ts); + else + suspend_list_insert_head(p, ts); + p->suspend_list_count++; + ts->state = SUSPENDED; + ts->scheduled = false; + unlock(&p->sched_mutex); + + lock(&ts->mutex); + if (pni_spins && !ts->scheduled) { + // Medium length spinning tried here. Raises cpu dramatically, + // unclear throughput or latency benefit (not seen where most + // expected, modest at other times). + bool locked = true; + for (volatile int i = 0; i < pni_spins; i++) { + if (locked) { + unlock(&ts->mutex); + locked = false; + } + if ((i % 1000) == 0) { + locked = (pthread_mutex_trylock(&ts->mutex) == 0); + } + if (ts->scheduled) break; } + if (!locked) + lock(&ts->mutex); + } + + ts->suspended = true; + while (!ts->scheduled) { + pthread_cond_wait(&ts->cond, &ts->mutex); + } + ts->suspended = false; + unlock(&ts->mutex); + lock(&p->sched_mutex); + assert(ts->state == PROCESSING); +} + +// Call with no lock +static void resume(pn_proactor_t *p, tslot_t *ts) { + lock(&ts->mutex); + ts->scheduled = true; + if (ts->suspended) { + pthread_cond_signal(&ts->cond); + } + unlock(&ts->mutex); + +} + +// Call with sched lock +static void assign_thread(tslot_t *ts, pcontext_t *ctx) { + assert(!ctx->runner); + ctx->runner = ts; + ctx->prev_runner = NULL; + ctx->runnable = false; + ts->context = ctx; + ts->prev_context = NULL; +} + +// call with sched lock +static bool rewake(pcontext_t *ctx) { + // Special case wake() where context is unassigned and a popped wake needs to be put back on the list. + // Should be rare. 
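+  // (Happens when unassign_thread() finds sched_wake still set for a
+  // context the poller has already popped from the wake list.)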
+ bool notify = false; + pn_proactor_t *p = ctx->proactor; + lock(&p->eventfd_mutex); + assert(ctx->wake_pending); + assert(!ctx->on_wake_list); + ctx->wake_next = NULL; + ctx->on_wake_list = true; + if (!p->wake_list_first) { + p->wake_list_first = p->wake_list_last = ctx; + } else { + p->wake_list_last->wake_next = ctx; + p->wake_list_last = ctx; + } + if (!p->wakes_in_progress) { + // force a wakeup via the eventfd + p->wakes_in_progress = true; + notify = true; } unlock(&p->eventfd_mutex); - rearm(p, &p->epoll_wake); - return ctx; + return notify; } -// call with owner lock held, once for each pop from the wake list -static inline void wake_done(pcontext_t *ctx) { - assert(ctx->wake_ops > 0); - ctx->wake_ops--; +// Call with sched lock +static bool unassign_thread(tslot_t *ts, tslot_state new_state) { + pcontext_t *ctx = ts->context; + bool notify = false; + bool deleting = (ts->state == DELETING); + ts->context = NULL; + ts->state = new_state; + if (ctx) { + ctx->runner = NULL; + ctx->prev_runner = ts; + } + + // Check if context has unseen events/wake that need processing. + + if (ctx && !deleting) { + pn_proactor_t *p = ctx->proactor; + ts->prev_context = ts->context; + if (ctx->sched_pending) { + // Need a new wake + if (ctx->sched_wake) { + if (!ctx->on_wake_list) { + // Remember it for next poller + ctx->sched_wake = false; + notify = rewake(ctx); // back on wake list for poller to see + } + // else already scheduled + } else { + // bad corner case. Block ctx from being scheduled again until a later post_wake() + ctx->runner = REWAKE_PLACEHOLDER; + unlock(&p->sched_mutex); + lock(&ctx->mutex); + notify = wake(ctx); + unlock(&ctx->mutex); + lock(&p->sched_mutex); + } + } + } + return notify; +} + +// Call with sched lock +static void earmark_thread(tslot_t *ts, pcontext_t *ctx) { + assign_thread(ts, ctx); + ts->earmarked = true; + ctx->proactor->earmark_count++; +} + +// Call with sched lock +static void remove_earmark(tslot_t *ts) { + pcontext_t *ctx = ts->context; + ts->context = NULL; + ctx->runner = NULL; + ts->earmarked = false; + ctx->proactor->earmark_count--; +} + +// Call with sched lock +static void make_runnable(pcontext_t *ctx) { + pn_proactor_t *p = ctx->proactor; + assert(p->n_runnables <= p->runnables_capacity); + assert(!ctx->runnable); + if (ctx->runner) return; + + ctx->runnable = true; + // Track it as normal or warm or earmarked + if (pni_warm_sched) { + tslot_t *ts = ctx->prev_runner; + if (ts && ts->prev_context == ctx) { + if (ts->state == SUSPENDED || ts->state == PROCESSING) { + if (p->n_warm_runnables < p->thread_capacity) { + p->warm_runnables[p->n_warm_runnables++] = ctx; + assign_thread(ts, ctx); + } + else + p->runnables[p->n_runnables++] = ctx; + return; + } + if (ts->state == UNUSED && !p->earmark_drain) { + earmark_thread(ts, ctx); + p->last_earmark = ts; + return; + } + } + } + p->runnables[p->n_runnables++] = ctx; } + static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listener, const char *addr) { ps->epoll_io.psocket = ps; @@ -525,35 +915,6 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listene pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port); } -typedef struct pconnection_t { - psocket_t psocket; - pcontext_t context; - uint32_t new_events; - uint32_t new_events_2; - int wake_count; - bool server; /* accept, not connect */ - bool tick_pending; - bool timer_armed; - bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ - pn_condition_t 
*disconnect_condition; - ptimer_t timer; // TODO: review one timerfd per connection - // Following values only changed by (sole) working context: - uint32_t current_arm; // active epoll io events - uint32_t current_arm_2; // secondary active epoll io events - bool connected; - bool read_blocked; - bool write_blocked; - bool disconnected; - int hog_count; // thread hogging limiter - pn_event_batch_t batch; - pn_connection_driver_t driver; - struct pn_netaddr_t local, remote; /* Actual addresses */ - struct addrinfo *addrinfo; /* Resolved address list */ - struct addrinfo *ai; /* Current connect address */ - pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/ - epoll_extended_t epoll_io_2; - epoll_extended_t *rearm_target; /* main or secondary epollfd */ -} pconnection_t; /* Protects read/update of pn_connection_t pointer to it's pconnection_t * @@ -580,43 +941,7 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) { unlock(&driver_ptr_mutex); } -/* - * A listener can have multiple sockets (as specified in the addrinfo). They - * are armed separately. The individual psockets can be part of at most one - * list: the global proactor overflow retry list or the per-listener list of - * pending accepts (valid inbound socket obtained, but pn_listener_accept not - * yet called by the application). These lists will be small and quick to - * traverse. - */ - -struct acceptor_t{ - psocket_t psocket; - int accepted_fd; - bool armed; - bool overflowed; - acceptor_t *next; /* next listener list member */ - struct pn_netaddr_t addr; /* listening address */ -}; - -struct pn_listener_t { - acceptor_t *acceptors; /* Array of listening sockets */ - size_t acceptors_size; - int active_count; /* Number of listener sockets registered with epoll */ - pcontext_t context; - pn_condition_t *condition; - pn_collector_t *collector; - pn_event_batch_t batch; - pn_record_t *attachments; - void *listener_context; - acceptor_t *pending_acceptors; /* list of those with a valid inbound fd*/ - int pending_count; - bool unclaimed; /* attach event dispatched but no pn_listener_attach() call yet */ - size_t backlog; - bool close_dispatched; - pmutex rearm_mutex; /* orders rearms/disarms, nothing else */ -}; - -static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup, bool is_io_2); +static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup); static void write_flush(pconnection_t *pc); static void listener_begin_close(pn_listener_t* l); static void proactor_add(pcontext_t *ctx); @@ -705,24 +1030,6 @@ static void rearm(pn_proactor_t *p, epoll_extended_t *ee) { EPOLL_FATAL("arming polled file descriptor", errno); } -// Only used by pconnection_t if two separate epoll interests in play -static void rearm_2(pn_proactor_t *p, epoll_extended_t *ee) { - // Delay registration until first use. It's not OK to register or arm - // with an event mask of 0 (documented below). It is OK to leave a - // disabled event registered until the next EPOLLONESHOT. 
- if (!ee->polling) { - ee->fd = ee->psocket->sockfd; - start_polling(ee, p->epollfd_2); - } else { - struct epoll_event ev = {0}; - ev.data.ptr = ee; - ev.events = ee->wanted | EPOLLONESHOT; - memory_barrier(ee); - if (epoll_ctl(p->epollfd_2, EPOLL_CTL_MOD, ee->fd, &ev) == -1) - EPOLL_FATAL("arming polled file descriptor (secondary)", errno); - } -} - static void listener_list_append(acceptor_t **start, acceptor_t *item) { assert(item->next == NULL); if (*start) { @@ -810,7 +1117,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con pcontext_init(&pc->context, PCONNECTION, p, pc); psocket_init(&pc->psocket, p, NULL, addr); pc->new_events = 0; - pc->new_events_2 = 0; pc->wake_count = 0; pc->tick_pending = false; pc->timer_armed = false; @@ -818,11 +1124,14 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con pc->disconnect_condition = NULL; pc->current_arm = 0; - pc->current_arm_2 = 0; pc->connected = false; pc->read_blocked = true; pc->write_blocked = true; pc->disconnected = false; + pc->wbuf_valid = false; + pc->wbuf_completed = 0; + pc->wbuf_remaining = 0; + pc->wbuf_current = NULL; pc->hog_count = 0; pc->batch.next_event = pconnection_batch_next; @@ -836,13 +1145,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con } pmutex_init(&pc->rearm_mutex); - epoll_extended_t *ee = &pc->epoll_io_2; - ee->psocket = &pc->psocket; - ee->fd = -1; - ee->type = PCONNECTION_IO_2; - ee->wanted = 0; - ee->polling = false; - /* Set the pconnection_t backpointer last. Connections that were released by pn_proactor_release_connection() must not reveal themselves to be re-associated with a proactor till setup is complete. @@ -855,7 +1157,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con // Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled. // Return true when all possible outstanding epoll events associated with this pconnection have been processed. static inline bool pconnection_is_final(pconnection_t *pc) { - return !pc->current_arm && !pc->current_arm_2 && !pc->timer_armed && !pc->context.wake_ops; + return !pc->current_arm && !pc->timer_armed && !pc->context.wake_pending; } static void pconnection_final_free(pconnection_t *pc) { @@ -875,13 +1177,16 @@ static void pconnection_final_free(pconnection_t *pc) { free(pc); } -// call without lock, but only if pconnection_is_final() is true + +// call without lock static void pconnection_cleanup(pconnection_t *pc) { + assert(pconnection_is_final(pc)); stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd); if (pc->psocket.sockfd != -1) pclosefd(pc->psocket.proactor, pc->psocket.sockfd); stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd); ptimer_finalize(&pc->timer); + lock(&pc->context.mutex); bool can_free = proactor_remove(&pc->context); unlock(&pc->context.mutex); @@ -890,11 +1195,33 @@ static void pconnection_cleanup(pconnection_t *pc) { // else proactor_disconnect logic owns psocket and its final free } +static void invalidate_wbuf(pconnection_t *pc) { + if (pc->wbuf_valid) { + if (pc->wbuf_completed) + pn_connection_driver_write_done(&pc->driver, pc->wbuf_completed); + pc->wbuf_completed = 0; + pc->wbuf_remaining = 0; + pc->wbuf_valid = false; + } +} + +// Never call with any locks held. 
+static void ensure_wbuf(pconnection_t *pc) { + if (!pc->wbuf_valid) { + // next connection_driver call is the expensive output generator + pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); + pc->wbuf_completed = 0; + pc->wbuf_remaining = wbuf.size; + pc->wbuf_current = wbuf.start; + pc->wbuf_valid = true; + } +} + // Call with lock held or from forced_shutdown static void pconnection_begin_close(pconnection_t *pc) { if (!pc->context.closing) { pc->context.closing = true; - if (pc->current_arm || pc->current_arm_2) { + if (pc->current_arm) { // Force EPOLLHUP callback(s) shutdown(pc->psocket.sockfd, SHUT_RDWR); } @@ -914,12 +1241,10 @@ static void pconnection_forced_shutdown(pconnection_t *pc) { // Called by proactor_free, no competing threads, no epoll activity. pc->current_arm = 0; pc->new_events = 0; - pc->current_arm_2 = 0; - pc->new_events_2 = 0; pconnection_begin_close(pc); // pconnection_process will never be called again. Zero everything. pc->timer_armed = false; - pc->context.wake_ops = 0; + pc->context.wake_pending = 0; pn_collector_release(pc->driver.collector); assert(pconnection_is_final(pc)); pconnection_cleanup(pc); @@ -930,14 +1255,28 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { if (!pc->driver.connection) return NULL; pn_event_t *e = pn_connection_driver_next_event(&pc->driver); if (!e) { - write_flush(pc); // May generate transport event - e = pn_connection_driver_next_event(&pc->driver); - if (!e && pc->hog_count < HOG_MAX) { - if (pconnection_process(pc, 0, false, true, false)) { + pn_proactor_t *p = pc->context.proactor; + bool idle_threads; + lock(&p->sched_mutex); + idle_threads = (p->suspend_list_head != NULL); + unlock(&p->sched_mutex); + if (idle_threads) { + write_flush(pc); // May generate transport event + pc->read_blocked = pc->write_blocked = false; + pconnection_process(pc, 0, false, false, true); + e = pn_connection_driver_next_event(&pc->driver); + } + else { + write_flush(pc); // May generate transport event + e = pn_connection_driver_next_event(&pc->driver); + if (!e && pc->hog_count < HOG_MAX) { + pconnection_process(pc, 0, false, false, true); e = pn_connection_driver_next_event(&pc->driver); } } } + if (e) invalidate_wbuf(pc); + return e; } @@ -960,7 +1299,7 @@ static inline bool pconnection_wclosed(pconnection_t *pc) { close/shutdown. Let read()/write() return 0 or -1 to trigger cleanup logic. */ static bool pconnection_rearm_check(pconnection_t *pc) { - if (pc->current_arm && pc->current_arm_2) return false; // Maxed out + assert(pc->wbuf_valid); if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) { return false; } @@ -969,79 +1308,121 @@ static bool pconnection_rearm_check(pconnection_t *pc) { if (pc->write_blocked) wanted_now |= EPOLLOUT; else { - pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); - if (wbuf.size > 0) + if (pc->wbuf_remaining > 0) wanted_now |= EPOLLOUT; } } if (!wanted_now) return false; - - uint32_t have_now = pc->current_arm ? pc->current_arm : pc->current_arm_2; - uint32_t needed = wanted_now & ~have_now; - if (!needed) return false; + if (wanted_now == pc->current_arm) return false; lock(&pc->rearm_mutex); /* unlocked in pconnection_rearm... 
*/ - // Always favour main epollfd - if (!pc->current_arm) { - pc->current_arm = pc->psocket.epoll_io.wanted = needed; - pc->rearm_target = &pc->psocket.epoll_io; - } else { - pc->current_arm_2 = pc->epoll_io_2.wanted = needed; - pc->rearm_target = &pc->epoll_io_2; - } + pc->current_arm = pc->psocket.epoll_io.wanted = wanted_now; return true; /* ... so caller MUST call pconnection_rearm */ } /* Call without lock */ static inline void pconnection_rearm(pconnection_t *pc) { - if (pc->rearm_target == &pc->psocket.epoll_io) { - rearm(pc->psocket.proactor, pc->rearm_target); - } else { - rearm_2(pc->psocket.proactor, pc->rearm_target); - } - pc->rearm_target = NULL; + rearm(pc->psocket.proactor, &pc->psocket.epoll_io); unlock(&pc->rearm_mutex); // Return immediately. pc may have just been freed by another thread. } +/* Only call when context switch is imminent. Sched lock is highly contested. */ +// Call with both context and sched locks. +static bool pconnection_sched_sync(pconnection_t *pc) { + if (pc->sched_timeout) { + pc->tick_pending = true; + pc->sched_timeout = false; + } + if (pc->psocket.sched_io_events) { + pc->new_events = pc->psocket.sched_io_events; + pc->psocket.sched_io_events = 0; + pc->current_arm = 0; // or outside lock? + } + if (pc->context.sched_wake) { + pc->context.sched_wake = false; + wake_done(&pc->context); + } + pc->context.sched_pending = false; + + // Indicate if there are free proactor threads + pn_proactor_t *p = pc->context.proactor; + return p->poller_suspended || p->suspend_list_head; +} + +/* Call with context lock and having done a write_flush() to "know" the value of wbuf_remaining */ static inline bool pconnection_work_pending(pconnection_t *pc) { - if (pc->new_events || pc->new_events_2 || pc->wake_count || pc->tick_pending || pc->queued_disconnect) + assert(pc->wbuf_valid); + if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect) return true; if (!pc->read_blocked && !pconnection_rclosed(pc)) return true; - pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); - return (wbuf.size > 0 && !pc->write_blocked); + return (pc->wbuf_remaining > 0 && !pc->write_blocked); } +/* Call with no locks. */ static void pconnection_done(pconnection_t *pc) { + pn_proactor_t *p = pc->context.proactor; + tslot_t *ts = pc->context.runner; + write_flush(pc); bool notify = false; + bool self_wake = false; lock(&pc->context.mutex); pc->context.working = false; // So we can wake() ourself if necessary. We remain the de facto - // working context while the lock is held. + // working context while the lock is held. Need sched_sync too to drain possible stale wake. 
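+  // pconnection_sched_sync() below transfers any events the poller posted
+  // while this thread was the working context, so none are lost across
+  // the context switch.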
pc->hog_count = 0; - if (pconnection_has_event(pc) || pconnection_work_pending(pc)) { - notify = wake(&pc->context); + bool has_event = pconnection_has_event(pc); + // Do as little as possible while holding the sched lock + lock(&p->sched_mutex); + pconnection_sched_sync(pc); + unlock(&p->sched_mutex); + + if (has_event || pconnection_work_pending(pc)) { + self_wake = true; } else if (pn_connection_driver_finished(&pc->driver)) { pconnection_begin_close(pc); if (pconnection_is_final(pc)) { unlock(&pc->context.mutex); pconnection_cleanup(pc); + // pc may be undefined now + lock(&p->sched_mutex); + notify = unassign_thread(ts, UNUSED); + unlock(&p->sched_mutex); + if (notify) + wake_notify(&p->context); return; } } + if (self_wake) + notify = wake(&pc->context); + bool rearm = pconnection_rearm_check(pc); unlock(&pc->context.mutex); - if (notify) wake_notify(&pc->context); + if (rearm) pconnection_rearm(pc); // May free pc on another thread. Return. + lock(&p->sched_mutex); + if (unassign_thread(ts, UNUSED)) + notify = true; + unlock(&p->sched_mutex); + if (notify) wake_notify(&p->context); return; } // Return true unless error -static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) { - ssize_t n = send(pc->psocket.sockfd, wbuf.start, wbuf.size, MSG_NOSIGNAL); + static bool pconnection_write(pconnection_t *pc) { + size_t wbuf_size = pc->wbuf_remaining; + ssize_t n = send(pc->psocket.sockfd, pc->wbuf_current, wbuf_size, MSG_NOSIGNAL); if (n > 0) { - pn_connection_driver_write_done(&pc->driver, n); - if ((size_t) n < wbuf.size) pc->write_blocked = true; + pc->wbuf_completed += n; + pc->wbuf_remaining -= n; + pc->io_doublecheck = false; + if (pc->wbuf_remaining) + pc->write_blocked = true; + else { + // No need to aggregate multiple writes + pn_connection_driver_write_done(&pc->driver, pc->wbuf_completed); + pc->wbuf_completed = 0; + } } else if (errno == EWOULDBLOCK) { pc->write_blocked = true; } else if (!(errno == EAGAIN || errno == EINTR)) { @@ -1050,11 +1431,12 @@ static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) { return true; } +// Never call with any locks held. static void write_flush(pconnection_t *pc) { + ensure_wbuf(pc); if (!pc->write_blocked && !pconnection_wclosed(pc)) { - pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); - if (wbuf.size > 0) { - if (!pconnection_write(pc, wbuf)) { + if (pc->wbuf_remaining > 0) { + if (!pconnection_write(pc)) { psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on write to"); } } @@ -1070,24 +1452,14 @@ static void write_flush(pconnection_t *pc) { static void pconnection_connected_lh(pconnection_t *pc); static void pconnection_maybe_connect_lh(pconnection_t *pc); -/* - * May be called concurrently from multiple threads: - * pn_event_batch_t loop (topup is true) - * timer (timeout is true) - * socket io (events != 0) from PCONNECTION_IO - * and PCONNECTION_IO_2 event masks (possibly simultaneously) - * one or more wake() - * Only one thread becomes (or always was) the working thread. - */ -static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup, bool is_io_2) { - bool inbound_wake = !(events | timeout | topup); +static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool sched_wake, bool topup) { + bool inbound_wake = sched_wake; bool rearm_timer = false; bool timer_fired = false; bool waking = false; bool tick_required = false; // Don't touch data exclusive to working thread (yet). 
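+  // (Calling ptimer_callback() before owning the context is safe: ptimer
+  // state is guarded by the timer's own mutex.)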
- if (timeout) { rearm_timer = true; timer_fired = ptimer_callback(&pc->timer) != 0; @@ -1095,17 +1467,15 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, lock(&pc->context.mutex); if (events) { - if (is_io_2) - pc->new_events_2 = events; - else - pc->new_events = events; + pc->new_events = events; + pc->current_arm = 0; events = 0; } - else if (timer_fired) { + if (timer_fired) { pc->tick_pending = true; timer_fired = false; } - else if (inbound_wake) { + if (inbound_wake) { wake_done(&pc->context); inbound_wake = false; } @@ -1120,9 +1490,8 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } else { if (pc->context.working) { - // Another thread is the working context. - unlock(&pc->context.mutex); - return NULL; + // Another thread is the working context. Should be impossible with new scheduler. + EPOLL_FATAL("internal epoll proactor error: two worker threads", 0); } pc->context.working = true; } @@ -1155,18 +1524,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, tick_required = !closed; } - uint32_t update_events = 0; if (pc->new_events) { - update_events = pc->new_events; + uint32_t update_events = pc->new_events; pc->current_arm = 0; pc->new_events = 0; - } - if (pc->new_events_2) { - update_events |= pc->new_events_2; - pc->current_arm_2 = 0; - pc->new_events_2 = 0; - } - if (update_events) { if (!pc->context.closing) { if ((update_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc)) pconnection_maybe_connect_lh(pc); @@ -1201,11 +1562,12 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); if (rbuf.size > 0 && !pc->read_blocked) { ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size); - if (n > 0) { pn_connection_driver_read_done(&pc->driver, n); + invalidate_wbuf(pc); pconnection_tick(pc); /* check for tick changes. */ tick_required = false; + pc->io_doublecheck = false; if (!pn_connection_driver_read_closed(&pc->driver) && (size_t)n < rbuf.size) pc->read_blocked = true; } @@ -1223,6 +1585,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, if (tick_required) { pconnection_tick(pc); /* check for tick changes. */ tick_required = false; + invalidate_wbuf(pc); } if (topup) { @@ -1231,6 +1594,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } if (pconnection_has_event(pc)) { + invalidate_wbuf(pc); return &pc->batch; } @@ -1244,8 +1608,13 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } // Never stop working while work remains. hog_count exception to this rule is elsewhere. 
- if (pconnection_work_pending(pc)) + lock(&pc->context.proactor->sched_mutex); + bool workers_free = pconnection_sched_sync(pc); + unlock(&pc->context.proactor->sched_mutex); + + if (pconnection_work_pending(pc)) { goto retry; // TODO: get rid of goto without adding more locking + } pc->context.working = false; pc->hog_count = 0; @@ -1258,6 +1627,15 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } } + if (workers_free && !pc->context.closing && !pc->io_doublecheck) { + // check one last time for new io before context switch + pc->io_doublecheck = true; + pc->read_blocked = false; + pc->write_blocked = false; + pc->context.working = true; + goto retry; + } + if (!pc->timer_armed && !pc->timer.shutting_down && pc->timer.timerfd >= 0) { pc->timer_armed = true; rearm(pc->psocket.proactor, &pc->timer.epoll_io); @@ -1564,7 +1942,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in // call with lock held and context.working false static inline bool listener_can_free(pn_listener_t *l) { - return l->context.closing && l->close_dispatched && !l->context.wake_ops && !l->active_count; + return l->context.closing && l->close_dispatched && !l->context.wake_pending && !l->active_count; } static inline void listener_final_free(pn_listener_t *l) { @@ -1651,7 +2029,7 @@ static void listener_forced_shutdown(pn_listener_t *l) { listener_begin_close(l); unlock(&l->context.mutex); // pconnection_process will never be called again. Zero everything. - l->context.wake_ops = 0; + l->context.wake_pending = 0; l->close_dispatched = true; l->active_count = 0; assert(listener_can_free(l)); @@ -1679,30 +2057,39 @@ static void listener_accept_lh(psocket_t *ps) { } /* Process a listening socket */ -static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) { +static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool wake) { // TODO: some parallelization of the accept mechanism. 
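+  // Under the single-poller scheduler the listener context is dispatched
+  // once per pass and sweeps working_io_events across all of its acceptor
+  // sockets, instead of one psocket per callback as before.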
- pn_listener_t *l = psocket_listener(ps); - acceptor_t *a = psocket_acceptor(ps); +// pn_listener_t *l = psocket_listener(ps); +// acceptor_t *a = psocket_acceptor(ps); + lock(&l->context.mutex); - if (events) { - a->armed = false; - if (l->context.closing) { - lock(&l->rearm_mutex); - stop_polling(&ps->epoll_io, ps->proactor->epollfd); - unlock(&l->rearm_mutex); - close(ps->sockfd); - ps->sockfd = -1; - l->active_count--; - } - else { - if (events & EPOLLRDHUP) { - /* Calls listener_begin_close which closes all the listener's sockets */ - psocket_error(ps, errno, "listener epoll"); - } else if (!l->context.closing && events & EPOLLIN) { - listener_accept_lh(ps); + if (n_events) { + for (size_t i = 0; i < l->acceptors_size; i++) { + psocket_t *ps = &l->acceptors[i].psocket; + if (ps->working_io_events) { + uint32_t events = ps->working_io_events; + ps->working_io_events = 0; + l->acceptors[i].armed = false; + if (l->context.closing) { + lock(&l->rearm_mutex); + stop_polling(&ps->epoll_io, ps->proactor->epollfd); + unlock(&l->rearm_mutex); + close(ps->sockfd); + ps->sockfd = -1; + l->active_count--; + } + else { + if (events & EPOLLRDHUP) { + /* Calls listener_begin_close which closes all the listener's sockets */ + psocket_error(ps, errno, "listener epoll"); + } else if (!l->context.closing && events & EPOLLIN) { + listener_accept_lh(ps); + } + } } } - } else { + } + if (wake) { wake_done(&l->context); // callback accounting } pn_event_batch_t *lb = NULL; @@ -1741,17 +2128,46 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { } static void listener_done(pn_listener_t *l) { + pn_proactor_t *p = l->context.proactor; + tslot_t *ts = l->context.runner; bool notify = false; lock(&l->context.mutex); l->context.working = false; - if (listener_can_free(l)) { + lock(&p->sched_mutex); + int n_events = 0; + for (size_t i = 0; i < l->acceptors_size; i++) { + psocket_t *ps = &l->acceptors[i].psocket; + if (ps->sched_io_events) { + ps->working_io_events = ps->sched_io_events; + ps->sched_io_events = 0; + } + if (ps->working_io_events) + n_events++; + } + if (l->context.sched_wake) { + l->context.sched_wake = false; + wake_done(&l->context); + } + unlock(&p->sched_mutex); + + if (!n_events && listener_can_free(l)) { unlock(&l->context.mutex); pn_listener_free(l); + lock(&p->sched_mutex); + notify = unassign_thread(ts, UNUSED); + unlock(&p->sched_mutex); + if (notify) + wake_notify(&p->context); return; - } else if (listener_has_event(l)) + } else if (n_events || listener_has_event(l)) notify = wake(&l->context); unlock(&l->context.mutex); + + lock(&p->sched_mutex); + if (unassign_thread(ts, UNUSED)) + notify = true; + unlock(&p->sched_mutex); if (notify) wake_notify(&l->context); } @@ -1831,35 +2247,65 @@ void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t // proactor // ======================================================================== +// Call with sched_mutex. Alloc calls are expensive but only used when thread_count changes. 
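+// (For example, registering a 17th thread grows thread_capacity from 16
+// to 24, which pulls runnables/kevents capacity up from 16 to 24 too.)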
+static void grow_poller_bufs(pn_proactor_t* p) { + // call if p->thread_count > p->thread_capacity + assert(p->thread_count == 0 || p->thread_count > p->thread_capacity); + do { + p->thread_capacity += 8; + } while (p->thread_count > p->thread_capacity); + + p->warm_runnables = (pcontext_t **) realloc(p->warm_runnables, p->thread_capacity * sizeof(pcontext_t *)); + p->resume_list = (tslot_t **) realloc(p->resume_list, p->thread_capacity * sizeof(tslot_t *)); + + int old_cap = p->runnables_capacity; + if (p->runnables_capacity == 0) + p->runnables_capacity = 16; + else if (p->runnables_capacity < p->thread_capacity) + p->runnables_capacity = p->thread_capacity; + if (p->runnables_capacity != old_cap) { + p->runnables = (pcontext_t **) realloc(p->runnables, p->runnables_capacity * sizeof(pcontext_t *)); + p->kevents_capacity = p->runnables_capacity; + size_t sz = p->kevents_capacity * sizeof(struct epoll_event); + p->kevents = (struct epoll_event *) realloc(p->kevents, sz); + memset(p->kevents, 0, sz); + } +} + /* Set up an epoll_extended_t to be used for wakeup or interrupts */ -static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) { + static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd, bool always_set) { ee->psocket = NULL; ee->fd = eventfd; ee->type = WAKE; - ee->wanted = EPOLLIN; - ee->polling = false; - start_polling(ee, epollfd); // TODO: check for error -} - -/* Set up the epoll_extended_t to be used for secondary socket events */ -static void epoll_secondary_init(epoll_extended_t *ee, int epoll_fd_2, int epollfd) { - ee->psocket = NULL; - ee->fd = epoll_fd_2; - ee->type = CHAINED_EPOLL; - ee->wanted = EPOLLIN; + if (always_set) { + uint64_t increment = 1; + if (write(eventfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t)) + EPOLL_FATAL("setting eventfd", errno); + // eventfd is set forever. No reads, just rearms as needed. 
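+    // (A never-read eventfd with a nonzero count stays readable; with
+    // level-triggered EPOLLIN | EPOLLONESHOT each rearm() delivers exactly
+    // one wakeup, so wake_notify() is a single epoll_ctl(EPOLL_CTL_MOD),
+    // with no write()/read() pair per wake.)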
+ ee->wanted = 0; + } else { + ee->wanted = EPOLLIN; + } ee->polling = false; start_polling(ee, epollfd); // TODO: check for error + if (always_set) + ee->wanted = EPOLLIN; // for all subsequent rearms } pn_proactor_t *pn_proactor() { + if (getenv("PNI_EPOLL_NOWARM")) pni_warm_sched = false; + if (getenv("PNI_EPOLL_IMMEDIATE")) pni_immediate = true; + if (getenv("PNI_EPOLL_SPINS")) pni_spins = atoi(getenv("PNI_EPOLL_SPINS")); pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p)); if (!p) return NULL; p->epollfd = p->eventfd = p->timer.timerfd = -1; pcontext_init(&p->context, PROACTOR, p, p); pmutex_init(&p->eventfd_mutex); + pmutex_init(&p->sched_mutex); + pmutex_init(&p->tslot_mutex); ptimer_init(&p->timer, 0); - if ((p->epollfd = epoll_create(1)) >= 0 && (p->epollfd_2 = epoll_create(1)) >= 0) { + if ((p->epollfd = epoll_create(1)) >= 0) { if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) { if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) { if (p->timer.timerfd >= 0) @@ -1867,20 +2313,24 @@ pn_proactor_t *pn_proactor() { p->batch.next_event = &proactor_batch_next; start_polling(&p->timer.epoll_io, p->epollfd); // TODO: check for error p->timer_armed = true; - epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd); - epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd); - epoll_secondary_init(&p->epoll_secondary, p->epollfd_2, p->epollfd); + epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd, true); + epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false); + p->tslot_map = pn_hash(PN_VOID, 0, 0.75); + grow_poller_bufs(p); return p; } } } } if (p->epollfd >= 0) close(p->epollfd); - if (p->epollfd_2 >= 0) close(p->epollfd_2); if (p->eventfd >= 0) close(p->eventfd); if (p->interruptfd >= 0) close(p->interruptfd); ptimer_finalize(&p->timer); + pmutex_finalize(&p->tslot_mutex); + pmutex_finalize(&p->sched_mutex); + pmutex_finalize(&p->eventfd_mutex); if (p->collector) pn_free(p->collector); + assert(p->thread_count == 0); free (p); return NULL; } @@ -1890,8 +2340,6 @@ void pn_proactor_free(pn_proactor_t *p) { p->shutting_down = true; close(p->epollfd); p->epollfd = -1; - close(p->epollfd_2); - p->epollfd_2 = -1; close(p->eventfd); p->eventfd = -1; close(p->interruptfd); @@ -1913,8 +2361,20 @@ void pn_proactor_free(pn_proactor_t *p) { } pn_collector_free(p->collector); + pmutex_finalize(&p->tslot_mutex); + pmutex_finalize(&p->sched_mutex); pmutex_finalize(&p->eventfd_mutex); pcontext_finalize(&p->context); + for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry; entry = pn_hash_next(p->tslot_map, entry)) { + tslot_t *ts = (tslot_t *) pn_hash_value(p->tslot_map, entry); + pmutex_finalize(&ts->mutex); + free(ts); + } + pn_free(p->tslot_map); + free(p->kevents); + free(p->runnables); + free(p->warm_runnables); + free(p->resume_list); free(p); } @@ -1968,17 +2428,23 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { return pni_log_event(p, e); } -static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t event) { - bool timer_fired = (event == PN_PROACTOR_TIMEOUT) && ptimer_callback(&p->timer) != 0; +static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout, bool interrupt, bool wake) { + bool timer_fired = timeout && ptimer_callback(&p->timer) != 0; + if (interrupt) { + (void)read_uint64(p->interruptfd); + rearm(p, &p->epoll_interrupt); + } lock(&p->context.mutex); - if (event == PN_PROACTOR_INTERRUPT) { + if (interrupt) { p->need_interrupt = true; - } else if (event == PN_PROACTOR_TIMEOUT) { + } + if 
(timeout) { p->timer_armed = false; if (timer_fired && p->timeout_set) { p->need_timeout = true; } - } else { + } + if (wake) { wake_done(&p->context); } if (!p->context.working) { /* Can generate proactor events */ @@ -1996,26 +2462,6 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t even return NULL; } -static pn_event_batch_t *proactor_chained_epoll_wait(pn_proactor_t *p) { - // process one ready pconnection socket event from the secondary/chained epollfd_2 - struct epoll_event ev = {0}; - int n = epoll_wait(p->epollfd_2, &ev, 1, 0); - if (n < 0) { - if (errno != EINTR) - perror("epoll_wait"); // TODO: proper log - } else if (n > 0) { - assert(n == 1); - rearm(p, &p->epoll_secondary); - epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr; - memory_barrier(ee); - assert(ee->type == PCONNECTION_IO_2); - pconnection_t *pc = psocket_pconnection(ee->psocket); - return pconnection_process(pc, ev.events, false, false, true); - } - rearm(p, &p->epoll_secondary); - return NULL; -} - static void proactor_add(pcontext_t *ctx) { pn_proactor_t *p = ctx->proactor; lock(&p->context.mutex); @@ -2024,6 +2470,7 @@ static void proactor_add(pcontext_t *ctx) { ctx->next = p->contexts; } p->contexts = ctx; + p->context_count++; unlock(&p->context.mutex); } @@ -2031,6 +2478,20 @@ static void proactor_add(pcontext_t *ctx) { // return true if safe for caller to free psocket static bool proactor_remove(pcontext_t *ctx) { pn_proactor_t *p = ctx->proactor; + // Disassociate this context from scheduler + if (!p->shutting_down) { + lock(&p->sched_mutex); + ctx->runner->state = DELETING; + for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry; entry = pn_hash_next(p->tslot_map, entry)) { + tslot_t *ts = (tslot_t *) pn_hash_value(p->tslot_map, entry); + if (ts->context == ctx) + ts->context = NULL; + if (ts->prev_context == ctx) + ts->prev_context = NULL; + } + unlock(&p->sched_mutex); + } + lock(&p->context.mutex); bool can_free = true; if (ctx->disconnecting) { @@ -2054,6 +2515,7 @@ static bool proactor_remove(pcontext_t *ctx) { if (ctx->next) { ctx->next->prev = ctx->prev; } + p->context_count--; } bool notify = wake_if_inactive(p); unlock(&p->context.mutex); @@ -2061,81 +2523,449 @@ static bool proactor_remove(pcontext_t *ctx) { return can_free; } -static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) { - if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */ - (void)read_uint64(p->interruptfd); - rearm(p, &p->epoll_interrupt); - return proactor_process(p, PN_PROACTOR_INTERRUPT); +static tslot_t *find_tslot(pn_proactor_t *p) { + pthread_t tid = pthread_self(); + void *v = pn_hash_get(p->tslot_map, (uintptr_t) tid); + if (v) + return (tslot_t *) v; + tslot_t *ts = (tslot_t *) calloc(1, sizeof(tslot_t)); + ts->state = NEW; + pmutex_init(&ts->mutex); + + lock(&p->sched_mutex); + // keep important tslot related info thread-safe when holding either the sched or tslot mutex + p->thread_count++; + pn_hash_put(p->tslot_map, (uintptr_t) tid, ts); + unlock(&p->sched_mutex); + return ts; +} + +// Call with shed_lock held +// Caller must resume() return value if not null +static tslot_t *resume_one_thread(pn_proactor_t *p) { + // If pn_proactor_get has an early return, we need to resume one suspended thread (if any) + // to be the new poller. + + tslot_t *ts = p->suspend_list_head; + if (ts) { + LL_REMOVE(p, suspend_list, ts); + p->suspend_list_count--; + ts->state = PROCESSING; + } + return ts; +} + +// Call with sched lock. 
+static pn_event_batch_t *process(pcontext_t *ctx) { + bool ctx_wake = false; + ctx->sched_pending = false; + if (ctx->sched_wake) { + // update the wake status before releasing the sched_mutex + ctx->sched_wake = false; + ctx_wake = true; + } + + if (ctx->type == PROACTOR) { + pn_proactor_t *p = ctx->proactor; + bool timeout = p->sched_timeout; + if (timeout) p->sched_timeout = false; + bool intr = p->sched_interrupt; + if (intr) p->sched_interrupt = false; + unlock(&p->sched_mutex); + return proactor_process(p, timeout, intr, ctx_wake); + } + pconnection_t *pc = pcontext_pconnection(ctx); + if (pc) { + uint32_t events = pc->psocket.sched_io_events; + if (events) pc->psocket.sched_io_events = 0; + bool timeout = pc->sched_timeout; + if (timeout) pc->sched_timeout = false; + unlock(&ctx->proactor->sched_mutex); + return pconnection_process(pc, events, timeout, ctx_wake, false); + } + pn_listener_t *l = pcontext_listener(ctx); + int n_events = 0; + for (size_t i = 0; i < l->acceptors_size; i++) { + psocket_t *ps = &l->acceptors[i].psocket; + if (ps->sched_io_events) { + ps->working_io_events = ps->sched_io_events; + ps->sched_io_events = 0; + } + if (ps->working_io_events) + n_events++; } - pcontext_t *ctx = wake_pop_front(p); - if (ctx) { - switch (ctx->type) { - case PROACTOR: - return proactor_process(p, PN_EVENT_NONE); - case PCONNECTION: - return pconnection_process((pconnection_t *) ctx->owner, 0, false, false, false); - case LISTENER: - return listener_process(&((pn_listener_t *) ctx->owner)->acceptors[0].psocket, 0); - default: - assert(ctx->type == WAKEABLE); // TODO: implement or remove + unlock(&ctx->proactor->sched_mutex); + return listener_process(l, n_events, ctx_wake); +} + + +// Call with both sched and wake locks +static void schedule_wake_list(pn_proactor_t *p) { + // append wake_list_first..wake_list_last to end of sched_wake_last + if (p->wake_list_first) { + if (p->sched_wake_last) + p->sched_wake_last->wake_next = p->wake_list_first; // join them + if (!p->sched_wake_first) + p->sched_wake_first = p->wake_list_first; + p->sched_wake_last = p->wake_list_last; + if (!p->sched_wake_current) + p->sched_wake_current = p->sched_wake_first; + p->wake_list_first = p->wake_list_last = NULL; + } +} + +// Call with schedule lock held. Called only by poller thread. 
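+// Returns the context only if it is not already runnable or running, so a
+// context is scheduled at most once per poller pass -- the
+// "(N + M - duplicates)" accounting in the overview comment.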
+// Call with sched lock held. Called only by the poller thread.
+static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
+  epoll_extended_t *ee = (epoll_extended_t *) evp->data.ptr;
+  pcontext_t *ctx = NULL;
+
+  if (ee->type == WAKE) {
+    if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */
+      p->sched_interrupt = true;
+      ctx = &p->context;
+      ctx->sched_pending = true;
+    } else {
+      // main eventfd wake
+      lock(&p->eventfd_mutex);
+      schedule_wake_list(p);
+      ctx = p->sched_wake_current;
+      unlock(&p->eventfd_mutex);
+    }
+  } else if (ee->type == PROACTOR_TIMER) {
+    p->sched_timeout = true;
+    ctx = &p->context;
+    ctx->sched_pending = true;
+  } else {
+    pconnection_t *pc = psocket_pconnection(ee->psocket);
+    if (pc) {
+      ctx = &pc->context;
+      if (ee->type == PCONNECTION_IO) {
+        ee->psocket->sched_io_events = evp->events;
+      } else {
+        pc->sched_timeout = true;
+      }
+      ctx->sched_pending = true;
+    }
+    else {
+      pn_listener_t *l = psocket_listener(ee->psocket);
+      assert(l);
+      ctx = &l->context;
+      ee->psocket->sched_io_events = evp->events;
+      ctx->sched_pending = true;
     }
   }
+  if (ctx && !ctx->runnable && !ctx->runner)
+    return ctx;
   return NULL;
 }
 
-static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_block) {
-  int timeout = can_block ? -1 : 0;
-  while(true) {
-    pn_event_batch_t *batch = NULL;
-    struct epoll_event ev = {0};
-    int n = epoll_wait(p->epollfd, &ev, 1, timeout);
-    if (n < 0) {
-      if (errno != EINTR)
-        perror("epoll_wait"); // TODO: proper log
-      if (!can_block)
-        return NULL;
-      else
-        continue;
-    } else if (n == 0) {
-      if (!can_block)
-        return NULL;
-      else {
-        perror("epoll_wait unexpected timeout"); // TODO: proper log
-        continue;
+static pcontext_t *post_wake(pn_proactor_t *p, pcontext_t *ctx) {
+  ctx->sched_wake = true;
+  ctx->sched_pending = true;
+  if (!ctx->runnable && !ctx->runner)
+    return ctx;
+  return NULL;
+}
+
+// Call with sched_lock held.
+static pcontext_t *next_drain(pn_proactor_t *p, tslot_t *ts) {
+  // This should be called rarely, at best once per thread removal on shutdown.
+  // TODO: how to reduce?  Instrumentation showed it near 5 percent of earmarks, 1 in 2000 calls to do_epoll().
+
+  for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry; entry = pn_hash_next(p->tslot_map, entry)) {
+    tslot_t *ts2 = (tslot_t *) pn_hash_value(p->tslot_map, entry);
+    if (ts2->earmarked) {
+      // Undo the old thread assignment and earmark; ts2 may never come back.
+      pcontext_t *switch_ctx = ts2->context;
+      remove_earmark(ts2);
+      assign_thread(ts, switch_ctx);
+      ts->earmark_override = ts2;
+      ts->earmark_override_gen = ts2->generation;
+      return switch_ctx;
+    }
+  }
+  assert(false);
+  return NULL;
+}
+
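The TODO above is about keeping next_drain() rare, since it walks the whole tslot map under the sched lock. As an aid to reasoning about that walk, here is a hypothetical debug-style helper (not in the patch; it assumes this file's internal types) expressing the invariant the earmark bookkeeping is expected to maintain:

    /* Hypothetical debug helper, not part of the patch.  With sched_mutex
       held, earmark_count should equal the number of earmarked slots. */
    static bool earmark_count_consistent(pn_proactor_t *p) {
      size_t n = 0;
      for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry;
           entry = pn_hash_next(p->tslot_map, entry)) {
        tslot_t *ts = (tslot_t *) pn_hash_value(p->tslot_map, entry);
        if (ts->earmarked)
          n++;
      }
      return n == (size_t) p->earmark_count;
    }

next_runnable() below visibly keeps the count in step when a thread consumes its own earmark; remove_earmark() is presumably responsible for the same on the drain path.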
+// Call with sched_lock held.
+static pcontext_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
+  if (ts->context) {
+    // Already assigned.
+    if (ts->earmarked) {
+      ts->earmarked = false;
+      if (--p->earmark_count == 0)
+        p->earmark_drain = false;
+    }
+    return ts->context;
+  }
+
+  // Warm pairing?
+  pcontext_t *ctx = ts->prev_context;
+  if (ctx && (ctx->runnable)) { // or ctx->sched_wake too?
+    assign_thread(ts, ctx);
+    return ctx;
+  }
+
+  // Check for an unassigned runnable context or unprocessed wake.
+  if (p->n_runnables) {
+    // Any unclaimed runnable?
+    while (p->n_runnables) {
+      ctx = p->runnables[p->next_runnable++];
+      if (p->n_runnables == p->next_runnable)
+        p->n_runnables = 0;
+      if (ctx->runnable) {
+        assign_thread(ts, ctx);
+        return ctx;
      }
    }
-    assert(n == 1);
-    epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
-    memory_barrier(ee);
-
-    if (ee->type == WAKE) {
-      batch = process_inbound_wake(p, ee);
-    } else if (ee->type == PROACTOR_TIMER) {
-      batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
-    } else if (ee->type == CHAINED_EPOLL) {
-      batch = proactor_chained_epoll_wait(p); // expect a PCONNECTION_IO_2
-    } else {
-      pconnection_t *pc = psocket_pconnection(ee->psocket);
-      if (pc) {
-        if (ee->type == PCONNECTION_IO) {
-          batch = pconnection_process(pc, ev.events, false, false, false);
+  }
+
+  if (p->sched_wake_current) {
+    ctx = p->sched_wake_current;
+    pop_wake(ctx); // updates sched_wake_current
+    assert(!ctx->runnable && !ctx->runner);
+    assign_thread(ts, ctx);
+    return ctx;
+  }
+
+  if (p->earmark_drain) {
+    ctx = next_drain(p, ts);
+    if (p->earmark_count == 0)
+      p->earmark_drain = false;
+    return ctx;
+  }
+
+  return NULL;
+}
+
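Everything below funnels through proactor_do_epoll(), the engine behind the public pn_proactor_wait() and pn_proactor_get() calls. For orientation, a typical multi-threaded driver written against that public API looks roughly like this (standard proactor usage, not new to this patch):

    #include <stdbool.h>
    #include <stddef.h>
    #include <proton/event.h>
    #include <proton/proactor.h>

    /* Each worker thread repeatedly claims a serialized event batch, handles
       it, and hands it back; inside the library, one such caller at a time
       becomes the poller thread described in this patch. */
    static void *worker(void *arg) {
      pn_proactor_t *p = (pn_proactor_t *) arg;
      bool done = false;
      while (!done) {
        pn_event_batch_t *batch = pn_proactor_wait(p);   /* may suspend */
        pn_event_t *e;
        while ((e = pn_event_batch_next(batch))) {
          if (pn_event_type(e) == PN_PROACTOR_INACTIVE)
            done = true;             /* no more work will arrive */
          /* ... dispatch other event types here ... */
        }
        pn_proactor_done(p, batch);  /* may hand work to another thread */
      }
      return NULL;
    }

The scheduling below decides which of these callers polls, which run batches, and which park on the suspend list.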
+static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
+  lock(&p->tslot_mutex);
+  tslot_t *ts = find_tslot(p);
+  unlock(&p->tslot_mutex);
+  ts->generation++;  // Wrapping OK.  Just looking for any change.
+  pn_event_batch_t *batch = NULL;
+
+  lock(&p->sched_mutex);
+  assert(ts->context == NULL || ts->earmarked);
+  assert(ts->state == UNUSED || ts->state == NEW);
+  ts->state = PROCESSING;
+
+  while (true) {
+    // Process outstanding epoll events until we get a batch or need to block.
+
+    pcontext_t *ctx = next_runnable(p, ts);
+    if (ctx) {
+      ts->state = BATCHING;
+      batch = process(ctx);  // Unlocks sched_lock before returning.
+      if (batch) {
+        return batch;
+      }
+      lock(&p->sched_mutex);
+      bool notify = unassign_thread(ts, PROCESSING);
+      if (notify) {
+        unlock(&p->sched_mutex);
+        wake_notify(&p->context);
+        lock(&p->sched_mutex);
+      }
+      continue;  // A long time may have passed.  Back to the beginning.
+    }
+
+    // Poll or wait for a runnable context.
+    if (p->poller == NULL) {
+      p->poller = ts;
+      // As the poller with lots to do, be mindful of hogging the sched lock.  Release it when making kernel calls.
+      assert(p->n_runnables == 0);
+      if (p->thread_count > p->thread_capacity)
+        grow_poller_bufs(p);
+      p->next_runnable = 0;
+      p->n_warm_runnables = 0;
+      p->last_earmark = NULL;
+
+      bool unfinished_earmarks = p->earmark_count > 0;
+      bool new_wakes = false;
+      bool epoll_immediate = unfinished_earmarks || !can_block;
+      assert(!p->sched_wake_first);
+      if (!epoll_immediate) {
+        lock(&p->eventfd_mutex);
+        if (p->wake_list_first) {
+          epoll_immediate = true;
+          new_wakes = true;
        } else {
-          assert(ee->type == PCONNECTION_TIMER);
-          batch = pconnection_process(pc, 0, true, false, false);
+          p->wakes_in_progress = false;
        }
+        unlock(&p->eventfd_mutex);
      }
-      else {
-        // TODO: can any of the listener processing be parallelized like IOCP?
-        batch = listener_process(ee->psocket, ev.events);
+      int timeout = (epoll_immediate) ? 0 : -1;
+      p->poller_suspended = (timeout == -1);
+      unlock(&p->sched_mutex);
+
+      int n = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
+
+      lock(&p->sched_mutex);
+      p->poller_suspended = false;
+
+      bool unpolled_work = false;
+      if (p->earmark_count > 0) {
+        p->earmark_drain = true;
+        unpolled_work = true;
+      }
+      if (new_wakes) {
+        lock(&p->eventfd_mutex);
+        schedule_wake_list(p);
+        unlock(&p->eventfd_mutex);
+        unpolled_work = true;
      }
-    }
-    if (batch) return batch;
-    // No Proton event generated. epoll_wait() again.
-  }
+
+      if (n < 0) {
+        if (errno != EINTR)
+          perror("epoll_wait"); // TODO: proper log
+        if (!can_block && !unpolled_work) {
+          p->poller = NULL;
+          tslot_t *res_ts = resume_one_thread(p);
+          ts->state = UNUSED;
+          unlock(&p->sched_mutex);
+          if (res_ts) resume(p, res_ts);
+          return NULL;
+        }
+        else {
+          p->poller = NULL;
+          continue;
+        }
+      } else if (n == 0) {
+        if (!can_block && !unpolled_work) {
+          p->poller = NULL;
+          tslot_t *res_ts = resume_one_thread(p);
+          ts->state = UNUSED;
+          unlock(&p->sched_mutex);
+          if (res_ts) resume(p, res_ts);
+          return NULL;
+        }
+        else {
+          if (!epoll_immediate)
+            perror("epoll_wait unexpected timeout"); // TODO: proper log
+          if (!unpolled_work) {
+            p->poller = NULL;
+            continue;
+          }
+        }
+      }
+
+      for (int i = 0; i < n; i++) {
+        ctx = post_event(p, &p->kevents[i]);
+        if (ctx)
+          make_runnable(ctx);
+      }
+      if (n > 0)
+        memset(p->kevents, 0, sizeof(struct epoll_event) * n);
+
+      // The list of pending wakes can be very long.  Traverse part of it looking for warm pairings.
+      pcontext_t *wctx = p->sched_wake_current;
+      int max_runnables = p->runnables_capacity;
+      while (wctx && p->n_runnables < max_runnables) {
+        if (wctx->runner == REWAKE_PLACEHOLDER)
+          wctx->runner = NULL;  // Allow context to run again.
+        ctx = post_wake(p, wctx);
+        if (ctx)
+          make_runnable(ctx);
+        pop_wake(wctx);
+        wctx = wctx->wake_next;
+      }
+      p->sched_wake_current = wctx;
+      // More wakes than places on the runnables list.
+      while (wctx) {
+        if (wctx->runner == REWAKE_PLACEHOLDER)
+          wctx->runner = NULL;  // Allow context to run again.
+        wctx->sched_wake = true;
+        wctx->sched_pending = true;
+        if (wctx->runnable || wctx->runner)
+          pop_wake(wctx);
+        wctx = wctx->wake_next;
+      }
+
+      if (pni_immediate && !ts->context) {
+        // Poller gets to run if possible.
+        pcontext_t *pctx;
+        if (p->n_runnables) {
+          assert(p->next_runnable == 0);
+          pctx = p->runnables[0];
+          if (++p->next_runnable == p->n_runnables)
+            p->n_runnables = 0;
+        } else if (p->n_warm_runnables) {
+          pctx = p->warm_runnables[--p->n_warm_runnables];
+          tslot_t *ts2 = pctx->runner;
+          ts2->prev_context = ts2->context = NULL;
+          pctx->runner = NULL;
+        } else if (p->last_earmark) {
+          pctx = p->last_earmark->context;
+          remove_earmark(p->last_earmark);
+          if (p->earmark_count == 0)
+            p->earmark_drain = false;
+        } else {
+          pctx = NULL;
+        }
+        if (pctx) {
+          assign_thread(ts, pctx);
+        }
+      }
+
+      // Create a list of available threads to put to work.
+
+      int resume_list_count = 0;
+      for (int i = 0; i < p->n_warm_runnables; i++) {
+        ctx = p->warm_runnables[i];
+        tslot_t *tsp = ctx->runner;
+        if (tsp->state == SUSPENDED) {
+          p->resume_list[resume_list_count++] = tsp;
+          LL_REMOVE(p, suspend_list, tsp);
+          p->suspend_list_count--;
+          tsp->state = PROCESSING;
+        }
+      }
+
+      int can_use = p->suspend_list_count;
+      if (!ts->context)
+        can_use++;
+      // Run as many unpaired runnable contexts as possible and allow for a new poller.
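      /* Worked example with illustrative numbers (editorial comment, not
       * patch code): suppose 4 unclaimed runnables, 3 threads parked on the
       * suspend_list, and the poller thread itself unassigned.  Then
       * can_use = 3 + 1 = 4, new_runners = min(4 + 1, 4) = 4, minus 1
       * because the unassigned poller needs no resume, so 3 suspended
       * threads are resumed and the poller can take the remaining context
       * or become the next poller. */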
+      int new_runners = pn_min(p->n_runnables + 1, can_use);
+      if (!ts->context)
+        new_runners--;  // Poller available and does not need a resume.
+
+      // Rare corner case on startup.  New inbound threads can make the suspend_list too big for the resume list.
+      new_runners = pn_min(new_runners, p->thread_capacity - resume_list_count);
+
+      for (int i = 0; i < new_runners; i++) {
+        tslot_t *tsp = p->suspend_list_head;
+        assert(tsp);
+        p->resume_list[resume_list_count++] = tsp;
+        LL_REMOVE(p, suspend_list, tsp);
+        p->suspend_list_count--;
+        tsp->state = PROCESSING;
+      }
+
+      p->poller = NULL;
+      // A new poller may run concurrently.  Touch only this thread's stack for the rest of the block.
+
+      if (resume_list_count) {
+        unlock(&p->sched_mutex);
+        for (int i = 0; i < resume_list_count; i++) {
+          resume(p, p->resume_list[i]);
+        }
+        lock(&p->sched_mutex);
+      }
+    } else if (!can_block) {
+      ts->state = UNUSED;
+      unlock(&p->sched_mutex);
+      return NULL;
+    } else {
+      // TODO: loop while !poller_suspended, since new work may be coming.
+      suspend(p, ts);
+    }
+  } // while
 }
+
 pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   return proactor_do_epoll(p, true);
 }
@@ -2144,21 +2974,68 @@ pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* p) {
   return proactor_do_epoll(p, false);
 }
 
+// Call with no locks held.
+static inline void check_earmark_override(pn_proactor_t *p, tslot_t *ts) {
+  if (!ts || !ts->earmark_override)
+    return;
+  if (ts->earmark_override->generation == ts->earmark_override_gen) {
+    // The other (overridden) thread has not been seen since this thread started and finished the event batch.
+    // That thread may be gone forever, which could leave us short of a poller thread.
+    lock(&p->sched_mutex);
+    tslot_t *res_ts = resume_one_thread(p);
+    unlock(&p->sched_mutex);
+    if (res_ts) resume(p, res_ts);
+  }
+  ts->earmark_override = NULL;
+}
+
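check_earmark_override() leans on the per-slot generation counter: proactor_do_epoll() bumps it on every entry, and next_drain() snapshots the victim's value when stealing its earmarked work, so an unchanged value here means the overridden thread never came back. A self-contained toy model of that handshake (hypothetical types, not patch code):

    #include <stdbool.h>
    #include <stdint.h>

    /* Toy slot with just the generation field; wrap-around is harmless since
       only change-vs-no-change is tested, never ordering. */
    typedef struct { uint32_t generation; } slot_t;

    /* The stealing thread records the victim's generation at steal time... */
    static uint32_t snapshot(const slot_t *victim) { return victim->generation; }

    /* ...and later asks whether the victim has re-entered the scheduler
       (each entry does victim->generation++). */
    static bool returned_since(const slot_t *victim, uint32_t snap) {
      return victim->generation != snap;
    }

If the victim has not returned, the stealer resumes one suspended thread so the pool is not left without a potential poller.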
 void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
   if (pc) {
+    tslot_t *ts = pc->context.runner;
     pconnection_done(pc);
+    // pc possibly freed/invalid
+    check_earmark_override(p, ts);
     return;
   }
   pn_listener_t *l = batch_listener(batch);
   if (l) {
+    tslot_t *ts = l->context.runner;
     listener_done(l);
+    // l possibly freed/invalid
+    check_earmark_override(p, ts);
     return;
   }
   pn_proactor_t *bp = batch_proactor(batch);
   if (bp == p) {
     bool notify = false;
+    bool rearm_interrupt = false;
     lock(&p->context.mutex);
+    lock(&p->sched_mutex);
+
+    bool timeout = p->sched_timeout;
+    if (timeout) p->sched_timeout = false;
+    bool intr = p->sched_interrupt;
+    if (intr) {
+      p->sched_interrupt = false;
+      rearm_interrupt = true;
+      p->need_interrupt = true;
+    }
+    if (p->context.sched_wake) {
+      p->context.sched_wake = false;
+      wake_done(&p->context);
+    }
+
+    // ptimer_callback is slow.  Revisit the timer cancel code in light of the change to a single poller thread.
+    bool timer_fired = timeout && ptimer_callback(&p->timer) != 0;
+    if (timeout) {
+      p->timer_armed = false;
+      if (timer_fired && p->timeout_set) {
+        p->need_timeout = true;
+      }
+    }
+
     bool rearm_timer = !p->timer_armed && !p->shutting_down;
     p->timer_armed = true;
     p->context.working = false;
@@ -2171,11 +3048,20 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
     if (proactor_has_event(p))
       if (wake(&p->context))
         notify = true;
+    tslot_t *ts = p->context.runner;
+    if (unassign_thread(ts, UNUSED))
+      notify = true;
+    unlock(&p->sched_mutex);
     unlock(&p->context.mutex);
     if (notify)
       wake_notify(&p->context);
     if (rearm_timer)
       rearm(p, &p->timer.epoll_io);
+    if (rearm_interrupt) {
+      (void)read_uint64(p->interruptfd);
+      rearm(p, &p->epoll_interrupt);
+    }
+    check_earmark_override(p, ts);
     return;
   }
 }
@@ -2232,6 +3118,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) {
     ctx->disconnect_ops = 2;   // Second pass below and proactor_remove(), in any order.
     p->disconnects_pending++;
     ctx = ctx->next;
+    p->context_count--;
   }
   notify = wake_if_inactive(p);
   unlock(&p->context.mutex);
diff --git a/c/tests/proactor_test.cpp b/c/tests/proactor_test.cpp
index 3e4365a..aa0ab57 100644
--- a/c/tests/proactor_test.cpp
+++ b/c/tests/proactor_test.cpp
@@ -173,7 +173,7 @@ TEST_CASE("proactor_connection_wake") {
   REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
   REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
-  CHECK(!pn_proactor_get(p)); /* Should be idle */
+  while (p.flush().first != 0);
   pn_connection_wake(c);
   REQUIRE_RUN(p, PN_CONNECTION_WAKE);
   REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org