This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 38c2dab1083d1cada116360d7219192614467277 Author: Cliff Jansen <cliffjan...@apache.org> AuthorDate: Sun Jan 24 10:54:21 2021 -0800 PROTON-2326: epoll proactor refactor - "schedule" instead of "wake", "task" instead of "context" --- c/src/proactor/epoll-internal.h | 99 ++- c/src/proactor/epoll.c | 1184 +++++++++++++++++---------------- c/src/proactor/epoll_raw_connection.c | 126 ++-- c/src/proactor/epoll_timer.c | 88 +-- 4 files changed, 749 insertions(+), 748 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 66fb15e..21226a9 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -57,7 +57,7 @@ typedef pthread_mutex_t pmutex; typedef struct pni_timer_t pni_timer_t; typedef enum { - WAKE, /* see if any work to do in proactor/psocket context */ + EVENT_FD, /* schedule() or pn_proactor_interrupt() */ LISTENER_IO, PCONNECTION_IO, RAW_CONNECTION_IO, @@ -67,7 +67,7 @@ typedef enum { // Data to use with epoll. typedef struct epoll_extended_t { int fd; - epoll_type_t type; // io/timer/wakeup + epoll_type_t type; // io/timer/eventfd uint32_t wanted; // events to poll for bool polling; pmutex barrier_mutex; @@ -79,36 +79,36 @@ typedef enum { LISTENER, RAW_CONNECTION, TIMER_MANAGER -} pcontext_type_t; +} task_type_t; -typedef struct pcontext_t { +typedef struct task_t { pmutex mutex; pn_proactor_t *proactor; /* Immutable */ - pcontext_type_t type; + task_type_t type; bool working; - bool on_wake_list; - bool wake_pending; // unprocessed eventfd wake callback - struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex + bool on_ready_list; + bool ready; // ready to run and on ready list. Poller notified by eventfd. 
+ struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex bool closing; // Next 4 are protected by the proactor mutex - struct pcontext_t* next; /* Protected by proactor.mutex */ - struct pcontext_t* prev; /* Protected by proactor.mutex */ + struct task_t* next; /* Protected by proactor.mutex */ + struct task_t* prev; /* Protected by proactor.mutex */ int disconnect_ops; /* ops remaining before disconnect complete */ bool disconnecting; /* pn_proactor_disconnect */ // Protected by schedule mutex tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */ tslot_t *prev_runner; - bool sched_wake; + bool sched_ready; bool sched_pending; /* If true, one or more unseen epoll or other events to process() */ - bool runnable ; /* in need of scheduling */ -} pcontext_t; + bool runnable ; /* on one of the runnable lists */ +} task_t; typedef enum { NEW, UNUSED, /* pn_proactor_done() called, may never come back */ SUSPENDED, - PROCESSING, /* Hunting for a context */ - BATCHING, /* Doing work on behalf of a context */ + PROCESSING, /* Hunting for a task */ + BATCHING, /* Doing work on behalf of a task */ DELETING, POLLING } tslot_state; @@ -121,8 +121,8 @@ struct tslot_t { bool suspended; volatile bool scheduled; tslot_state state; - pcontext_t *context; - pcontext_t *prev_context; + task_t *task; + task_t *prev_task; bool earmarked; tslot_t *suspend_list_prev; tslot_t *suspend_list_next; @@ -131,7 +131,7 @@ struct tslot_t { }; typedef struct pni_timer_manager_t { - pcontext_t context; + task_t task; epoll_extended_t epoll_timer; pmutex deletion_mutex; pni_timer_t *proactor_timer; @@ -141,12 +141,12 @@ typedef struct pni_timer_manager_t { } pni_timer_manager_t; struct pn_proactor_t { - pcontext_t context; + task_t task; pni_timer_manager_t timer_manager; - epoll_extended_t epoll_wake; + epoll_extended_t epoll_schedule; /* ready list */ epoll_extended_t epoll_interrupt; pn_event_batch_t batch; - pcontext_t *contexts; /* track in-use 
contexts for PN_PROACTOR_INACTIVE and disconnect */ + task_t *tasks; /* track in-use tasks for PN_PROACTOR_INACTIVE and disconnect */ pni_timer_t *timer; size_t disconnects_pending; /* unfinished proactor disconnects*/ // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch() @@ -155,21 +155,21 @@ struct pn_proactor_t { bool need_timeout; bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */ bool timeout_processed; /* timeout event dispatched in the most recent event batch */ - int context_count; + int task_count; - // wake subsystem + // ready list subsystem int eventfd; pmutex eventfd_mutex; - bool wakes_in_progress; - pcontext_t *wake_list_first; - pcontext_t *wake_list_last; + bool ready_list_active; + task_t *ready_list_first; + task_t *ready_list_last; // Interrupts have a dedicated eventfd because they must be async-signal safe. int interruptfd; // If the process runs out of file descriptors, disarm listening sockets temporarily and save them here. acceptor_t *overflow; pmutex overflow_mutex; - // Sched vars specific to proactor context. + // Sched vars specific to proactor task. bool sched_interrupt; // Global scheduling/poller vars. @@ -185,20 +185,19 @@ struct pn_proactor_t { tslot_t *poller; bool poller_suspended; tslot_t *last_earmark; - pcontext_t *sched_wake_first; - pcontext_t *sched_wake_last; - pcontext_t *sched_wake_current; + task_t *sched_ready_first; + task_t *sched_ready_last; + task_t *sched_ready_current; pmutex tslot_mutex; int earmark_count; bool earmark_drain; - bool sched_wakes_pending; // For debugging help for core dumps with optimized code. 
pn_event_type_t current_event_type; // Mostly read only: after init or once thread_count stabilizes pn_collector_t *collector __attribute__((aligned(64))); - pcontext_t **warm_runnables; - pcontext_t **runnables; + task_t **warm_runnables; + task_t **runnables; tslot_t **resume_list; pn_hash_t *tslot_map; struct epoll_event *kevents; @@ -219,17 +218,17 @@ typedef struct psocket_t { } psocket_t; typedef struct pconnection_t { - pcontext_t context; + task_t task; psocket_t psocket; pni_timer_t *timer; const char *host, *port; uint32_t new_events; - int wake_count; // TODO: protected by context.mutex so should be moved in there (also really bool) + int wake_count; // TODO: protected by task.mutex so should be moved in there (also really bool) bool server; /* accept, not connect */ bool tick_pending; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ pn_condition_t *disconnect_condition; - // Following values only changed by (sole) working context: + // Following values only changed by (sole) working task: uint32_t current_arm; // active epoll io events bool connected; bool read_blocked; @@ -277,7 +276,7 @@ typedef struct accepted_t{ } accepted_t; struct pn_listener_t { - pcontext_t context; + task_t task; acceptor_t *acceptors; /* Array of listening sockets */ size_t acceptors_size; char addr_buf[PN_MAX_ADDR]; @@ -339,22 +338,22 @@ static inline bool proactor_has_event(pn_proactor_t *p) { return pn_collector_peek(p->collector); } -bool wake_if_inactive(pn_proactor_t *p); +bool schedule_if_inactive(pn_proactor_t *p); int pclosefd(pn_proactor_t *p, int fd); -void proactor_add(pcontext_t *ctx); -bool proactor_remove(pcontext_t *ctx); +void proactor_add(task_t *tsk); +bool proactor_remove(task_t *tsk); bool unassign_thread(tslot_t *ts, tslot_state new_state); -void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p); -static void pcontext_finalize(pcontext_t* ctx) { - pmutex_finalize(&ctx->mutex); +void task_init(task_t *tsk, task_type_t 
t, pn_proactor_t *p); +static void task_finalize(task_t* tsk) { + pmutex_finalize(&tsk->mutex); } -bool wake(pcontext_t *ctx); -void wake_notify(pcontext_t *ctx); -void wake_done(pcontext_t *ctx); +bool schedule(task_t *tsk); +void notify_poller(task_t *tsk); +void schedule_done(task_t *tsk); void psocket_init(psocket_t* ps, epoll_type_t type); bool start_polling(epoll_extended_t *ee, int epollfd); @@ -366,11 +365,11 @@ void configure_socket(int sock); accepted_t *listener_accepted_next(pn_listener_t *listener); -pcontext_t *pni_psocket_raw_context(psocket_t *ps); -pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake); +task_t *pni_psocket_raw_task(psocket_t *ps); +pn_event_batch_t *pni_raw_connection_process(task_t *t, bool sched_ready); typedef struct praw_connection_t praw_connection_t; -pcontext_t *pni_raw_connection_context(praw_connection_t *rc); +task_t *pni_raw_connection_task(praw_connection_t *rc); praw_connection_t *pni_batch_raw_connection(pn_event_batch_t* batch); void pni_raw_connection_done(praw_connection_t *rc); @@ -379,7 +378,7 @@ void pni_timer_free(pni_timer_t *timer); void pni_timer_set(pni_timer_t *timer, uint64_t deadline); bool pni_timer_manager_init(pni_timer_manager_t *tm); void pni_timer_manager_finalize(pni_timer_manager_t *tm); -pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool wake); +pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool sched_ready); void pni_pconnection_timeout(pconnection_t *pc); void pni_proactor_timeout(pn_proactor_t *p); diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 30ef5f1..ae0c37b 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -25,28 +25,30 @@ waiting for work. The poller calls epoll_wait(), generates work lists, resumes suspended threads, and grabs work for itself or polls again. - A serialized grouping of Proton events is a context (connection, listener, proactor). 
+ A serialized grouping of Proton events is a task (connection, listener, proactor). Each has multiple pollable fds that make it schedulable. E.g. a connection could have a socket fd, (indirect) timerfd, and (indirect) eventfd all signaled in a single epoll_wait(). At the conclusion of each N = epoll_wait(..., N_MAX, timeout) - there will be N epoll events and M wakes on the wake list. M can be very large in a - server with many active connections. The poller makes the contexts "runnable" if they are - not already running. A running context can only be made runnable once until it completes - a chunk of work and calls unassign_thread(). (N + M - duplicates) contexts will be + there will be N epoll events and M tasks on a ready list. M can be very large in a + server with many active connections. The poller makes the tasks "runnable" if they are + not already running. A running task cannot be made runnable again until it completes + a chunk of work and calls unassign_thread(). (N + M - duplicates) tasks will be scheduled. A new poller will occur when next_runnable() returns NULL. - A running context, before it stops "working" must check to see if there were new incoming - events that the poller posted to the context, but could not make it runnable since it was - already running. The context will know if it needs to put itself back on the wake list - to be runnable later to process the pending events. + A task may have its own dedicated kernel file descriptor (socket, timerfd) which can be seen + by the poller to make the task runnable. A task may also be scheduled to run by placing it + on a ready queue which is monitored by the poller via two eventfds. Lock ordering - never add locks right to left: - context -> sched -> wake - non-proactor-context -> proactor-context + task -> sched -> ready + non-proactor-task -> proactor-task tslot -> sched + + TODO: document role of sched_pending and how sched_XXX (i.e. 
sched_interrupt) + transitions from "private to the scheduler" to "visible to the task". */ @@ -99,8 +101,8 @@ // Maybe futex is even better? // See other "TODO" in code. // -// Consider case of large number of wakes: next_event_batch() could start by -// looking for pending wakes before a kernel call to epoll_wait(), or there +// Consider case of large number of ready tasks: next_event_batch() could start by +// looking for ready tasks before a kernel call to epoll_wait(), or there // could be several eventfds with random assignment of wakeables. @@ -190,119 +192,118 @@ static void rearm(pn_proactor_t *p, epoll_extended_t *ee) { } /* - * The proactor maintains a number of serialization contexts: each + * The proactor maintains a number of serialization tasks: each * connection, each listener, the proactor itself. The serialization * is presented to the application via each associated event batch. * - * Multiple threads can be trying to do work on a single context - * (i.e. socket IO is ready and wakeup at same time). Mutexes are used - * to manage contention. Some vars are only ever touched by one - * "working" thread and are accessed without holding the mutex. + * A task will only ever run in a single thread at a time. + * + * Other threads of execution (including user threads) can interact with a + * particular task (i.e. connection wake or proactor interrupt). Mutexes are + * needed here for shared access to task data. schedule()/notify_poller() are + * used to ensure a task will run to act on the changed data. * - * Currently internal wakeups (via wake()/wake_notify()) are used to - * force a context to check if it has work to do. To minimize trips - * through the kernel, wake() is a no-op if the context has a working - * thread. Conversely, a thread must never stop working without - * checking if it has newly arrived work. + * To minimize trips through the kernel, schedule() is a no-op if the task is + * already running or about to run. 
Conversely, a task must never stop working + * without checking state that may have been recently changed by another thread. * - * External wake operations, like pn_connection_wake() are built on top of - * the internal wake mechanism. + * External wake operations, like pn_connection_wake() or expired timers are + * built on top of this schedule() mechanism. * * pn_proactor_interrupt() must be async-signal-safe so it has a dedicated * eventfd to allow a lock-free pn_proactor_interrupt() implementation. */ -// Fake thread for temporarily disabling the scheduling of a context. -static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1; +// Fake thread for temporarily disabling the scheduling of a task. +static struct tslot_t *RESCHEDULE_PLACEHOLDER = (struct tslot_t*) -1; -void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p) { - memset(ctx, 0, sizeof(*ctx)); - pmutex_init(&ctx->mutex); - ctx->proactor = p; - ctx->type = t; +void task_init(task_t *tsk, task_type_t t, pn_proactor_t *p) { + memset(tsk, 0, sizeof(*tsk)); + pmutex_init(&tsk->mutex); + tsk->proactor = p; + tsk->type = t; } /* - * Wake strategy with eventfd. - * - wakees can be in the list only once - * - wakers only use the eventfd if wakes_in_progress is false - * There is a single rearm between wakes > 0 and wakes == 0 + * schedule() strategy with eventfd: + * - tasks can be in the ready list only once + * - a scheduling thread will only activate the eventfd if ready_list_active is false + * There is a single rearm between ready list empty and non-empty * - * There can potentially be many contexts with wakes pending. + * There can potentially be many tasks with work pending. * - * The wake list is in two parts. The front is the chunk the - * scheduler will process until the next epoll_wait(). sched_wake - * indicates which chunk it is on. The ctx may already be running or + * The ready list is in two parts. 
The front is the chunk the + * poller will process until the next epoll_wait(). sched_ready + * indicates which chunk it is on. The task may already be running or * scheduled to run. * - * The ctx must be actually running to absorb ctx->wake_pending. + * The task must be actually running to absorb task_t->ready. * - * The wake list can keep growing while popping wakes. The list between - * sched_wake_first and sched_wake_last are protected by the sched - * lock (for pop operations), sched_wake_last to wake_list_last are + * The ready list can keep growing while popping ready tasks. The list between + * sched_ready_first and sched_ready_last are protected by the sched + * lock (for pop operations), sched_ready_last to ready_list_last are * protected by the eventfd mutex (for add operations). Both locks * are needed to cross or reconcile the two portions of the list. */ // Call with sched lock held. -static void pop_wake(pcontext_t *ctx) { - // every context on the sched_wake_list is either currently running, - // or to be scheduled. wake() will not "see" any of the wake_next - // pointers until wake_pending and working have transitioned to 0 - // and false, when a context stops working. +static void pop_ready_task(task_t *tsk) { + // every task on the sched_ready_list is either currently running, + // or to be scheduled. schedule() will not "see" any of the ready_next + // pointers until ready and working have transitioned to 0 + // and false, when a task stops working. // - // every context must transition as: + // every task must transition as: // - // !wake_pending .. wake() .. on wake_list .. on sched_wake_list .. working context .. !sched_wake && !wake_pending + // !ready .. schedule() .. on ready_list .. on sched_ready_list .. working task .. !sched_ready && !ready // - // Intervening locks at each transition ensures wake_next has memory coherence throughout the wake cycle. 
- pn_proactor_t *p = ctx->proactor; - if (ctx == p->sched_wake_current) - p->sched_wake_current = ctx->wake_next; - if (ctx == p->sched_wake_first) { + // Intervening locks at each transition ensures ready_next has memory coherence throughout the ready task scheduling cycle. + pn_proactor_t *p = tsk->proactor; + if (tsk == p->sched_ready_current) + p->sched_ready_current = tsk->ready_next; + if (tsk == p->sched_ready_first) { // normal code path - if (ctx == p->sched_wake_last) { - p->sched_wake_first = p->sched_wake_last = NULL; + if (tsk == p->sched_ready_last) { + p->sched_ready_first = p->sched_ready_last = NULL; } else { - p->sched_wake_first = ctx->wake_next; + p->sched_ready_first = tsk->ready_next; } - if (!p->sched_wake_first) - p->sched_wake_last = NULL; + if (!p->sched_ready_first) + p->sched_ready_last = NULL; } else { - // ctx is not first in a multi-element list - pcontext_t *prev = NULL; - for (pcontext_t *i = p->sched_wake_first; i != ctx; i = i->wake_next) + // tsk is not first in a multi-element list + task_t *prev = NULL; + for (task_t *i = p->sched_ready_first; i != tsk; i = i->ready_next) prev = i; - prev->wake_next = ctx->wake_next; - if (ctx == p->sched_wake_last) - p->sched_wake_last = prev; + prev->ready_next = tsk->ready_next; + if (tsk == p->sched_ready_last) + p->sched_ready_last = prev; } - ctx->on_wake_list = false; + tsk->on_ready_list = false; } -// part1: call with ctx->owner lock held, return true if notify required by caller -// Note that this will return false if either there is a pending wake OR if we are already -// in the connection context that is to be woken (as we don't have to wake it up) -bool wake(pcontext_t *ctx) { +// part1: call with tsk->owner lock held, return true if notify_poller required by caller. +// Nothing to do if the task is currently at work or work is already pending. 
+bool schedule(task_t *tsk) { bool notify = false; - if (!ctx->wake_pending) { - if (!ctx->working) { - ctx->wake_pending = true; - pn_proactor_t *p = ctx->proactor; + if (!tsk->ready) { + if (!tsk->working) { + tsk->ready = true; + pn_proactor_t *p = tsk->proactor; lock(&p->eventfd_mutex); - ctx->wake_next = NULL; - ctx->on_wake_list = true; - if (!p->wake_list_first) { - p->wake_list_first = p->wake_list_last = ctx; + tsk->ready_next = NULL; + tsk->on_ready_list = true; + if (!p->ready_list_first) { + p->ready_list_first = p->ready_list_last = tsk; } else { - p->wake_list_last->wake_next = ctx; - p->wake_list_last = ctx; + p->ready_list_last->ready_next = tsk; + p->ready_list_last = tsk; } - if (!p->wakes_in_progress) { - // force a wakeup via the eventfd - p->wakes_in_progress = true; + if (!p->ready_list_active) { + // unblock poller via the eventfd + p->ready_list_active = true; notify = true; } unlock(&p->eventfd_mutex); @@ -312,18 +313,18 @@ bool wake(pcontext_t *ctx) { return notify; } -// part2: make OS call without lock held -void wake_notify(pcontext_t *ctx) { - pn_proactor_t *p = ctx->proactor; +// part2: unblock epoll_wait(). Make OS call without lock held. +void notify_poller(task_t *tsk) { + pn_proactor_t *p = tsk->proactor; if (p->eventfd == -1) return; - rearm(p, &p->epoll_wake); + rearm(p, &p->epoll_schedule); } -// call with owner lock held, once for each pop from the wake list -void wake_done(pcontext_t *ctx) { -// assert(ctx->wake_pending > 0); - ctx->wake_pending = false; +// call with task lock held from xxx_process(). 
+void schedule_done(task_t *tsk) { +// assert(tsk->ready > 0); + tsk->ready = false; } @@ -409,35 +410,35 @@ static void resume(pn_proactor_t *p, tslot_t *ts) { } // Call with sched lock -static void assign_thread(tslot_t *ts, pcontext_t *ctx) { - assert(!ctx->runner); - ctx->runner = ts; - ctx->prev_runner = NULL; - ctx->runnable = false; - ts->context = ctx; - ts->prev_context = NULL; +static void assign_thread(tslot_t *ts, task_t *tsk) { + assert(!tsk->runner); + tsk->runner = ts; + tsk->prev_runner = NULL; + tsk->runnable = false; + ts->task = tsk; + ts->prev_task = NULL; } // call with sched lock -static bool rewake(pcontext_t *ctx) { - // Special case wake() where context is unassigned and a popped wake needs to be put back on the list. - // Should be rare. +static bool reschedule(task_t *tsk) { + // Special case schedule() where task is done/unassigned but sched_pending work has arrived. + // Should be an infrequent corner case. bool notify = false; - pn_proactor_t *p = ctx->proactor; + pn_proactor_t *p = tsk->proactor; lock(&p->eventfd_mutex); - assert(ctx->wake_pending); - assert(!ctx->on_wake_list); - ctx->wake_next = NULL; - ctx->on_wake_list = true; - if (!p->wake_list_first) { - p->wake_list_first = p->wake_list_last = ctx; + assert(tsk->ready); + assert(!tsk->on_ready_list); + tsk->ready_next = NULL; + tsk->on_ready_list = true; + if (!p->ready_list_first) { + p->ready_list_first = p->ready_list_last = tsk; } else { - p->wake_list_last->wake_next = ctx; - p->wake_list_last = ctx; + p->ready_list_last->ready_next = tsk; + p->ready_list_last = tsk; } - if (!p->wakes_in_progress) { - // force a wakeup via the eventfd - p->wakes_in_progress = true; + if (!p->ready_list_active) { + // unblock the poller via the eventfd + p->ready_list_active = true; notify = true; } unlock(&p->eventfd_mutex); @@ -446,37 +447,37 @@ static bool rewake(pcontext_t *ctx) { // Call with sched lock bool unassign_thread(tslot_t *ts, tslot_state new_state) { - pcontext_t *ctx = 
ts->context; + task_t *tsk = ts->task; bool notify = false; bool deleting = (ts->state == DELETING); - ts->context = NULL; + ts->task = NULL; ts->state = new_state; - if (ctx) { - ctx->runner = NULL; - ctx->prev_runner = ts; + if (tsk) { + tsk->runner = NULL; + tsk->prev_runner = ts; } - // Check if context has unseen events/wake that need processing. + // Check if unseen events or schedule() calls occurred while task was working. - if (ctx && !deleting) { - pn_proactor_t *p = ctx->proactor; - ts->prev_context = ts->context; - if (ctx->sched_pending) { - // Need a new wake - if (ctx->sched_wake) { - if (!ctx->on_wake_list) { + if (tsk && !deleting) { + pn_proactor_t *p = tsk->proactor; + ts->prev_task = ts->task; + if (tsk->sched_pending) { + // Make sure the task is already scheduled or put it on the ready list + if (tsk->sched_ready) { + if (!tsk->on_ready_list) { // Remember it for next poller - ctx->sched_wake = false; - notify = rewake(ctx); // back on wake list for poller to see + tsk->sched_ready = false; + notify = reschedule(tsk); // back on ready list for poller to see } // else already scheduled } else { - // bad corner case. Block ctx from being scheduled again until a later post_wake() - ctx->runner = REWAKE_PLACEHOLDER; + // bad corner case. 
Block tsk from being scheduled again until a later post_ready() + tsk->runner = RESCHEDULE_PLACEHOLDER; unlock(&p->sched_mutex); - lock(&ctx->mutex); - notify = wake(ctx); - unlock(&ctx->mutex); + lock(&tsk->mutex); + notify = schedule(tsk); + unlock(&tsk->mutex); lock(&p->sched_mutex); } } @@ -485,50 +486,50 @@ bool unassign_thread(tslot_t *ts, tslot_state new_state) { } // Call with sched lock -static void earmark_thread(tslot_t *ts, pcontext_t *ctx) { - assign_thread(ts, ctx); +static void earmark_thread(tslot_t *ts, task_t *tsk) { + assign_thread(ts, tsk); ts->earmarked = true; - ctx->proactor->earmark_count++; + tsk->proactor->earmark_count++; } // Call with sched lock static void remove_earmark(tslot_t *ts) { - pcontext_t *ctx = ts->context; - ts->context = NULL; - ctx->runner = NULL; + task_t *tsk = ts->task; + ts->task = NULL; + tsk->runner = NULL; ts->earmarked = false; - ctx->proactor->earmark_count--; + tsk->proactor->earmark_count--; } // Call with sched lock -static void make_runnable(pcontext_t *ctx) { - pn_proactor_t *p = ctx->proactor; +static void make_runnable(task_t *tsk) { + pn_proactor_t *p = tsk->proactor; assert(p->n_runnables <= p->runnables_capacity); - assert(!ctx->runnable); - if (ctx->runner) return; + assert(!tsk->runnable); + if (tsk->runner) return; - ctx->runnable = true; + tsk->runnable = true; // Track it as normal or warm or earmarked if (pni_warm_sched) { - tslot_t *ts = ctx->prev_runner; - if (ts && ts->prev_context == ctx) { + tslot_t *ts = tsk->prev_runner; + if (ts && ts->prev_task == tsk) { if (ts->state == SUSPENDED || ts->state == PROCESSING) { if (p->n_warm_runnables < p->thread_capacity) { - p->warm_runnables[p->n_warm_runnables++] = ctx; - assign_thread(ts, ctx); + p->warm_runnables[p->n_warm_runnables++] = tsk; + assign_thread(ts, tsk); } else - p->runnables[p->n_runnables++] = ctx; + p->runnables[p->n_runnables++] = tsk; return; } if (ts->state == UNUSED && !p->earmark_drain) { - earmark_thread(ts, ctx); + 
earmark_thread(ts, tsk); p->last_earmark = ts; return; } } } - p->runnables[p->n_runnables++] = ctx; + p->runnables[p->n_runnables++] = tsk; } @@ -567,7 +568,7 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) { unlock(&driver_ptr_mutex); } -static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_wake, bool topup); +static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_ready, bool topup); static void write_flush(pconnection_t *pc); static void listener_begin_close(pn_listener_t* l); static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block); @@ -585,12 +586,12 @@ static inline acceptor_t *psocket_acceptor(psocket_t* ps) { return ps->epoll_io.type == LISTENER_IO ? containerof(ps, acceptor_t, psocket) : NULL; } -static inline pconnection_t *pcontext_pconnection(pcontext_t *c) { - return c->type == PCONNECTION ? containerof(c, pconnection_t, context) : NULL; +static inline pconnection_t *task_pconnection(task_t *t) { + return t->type == PCONNECTION ? containerof(t, pconnection_t, task) : NULL; } -static inline pn_listener_t *pcontext_listener(pcontext_t *c) { - return c->type == LISTENER ? containerof(c, pn_listener_t, context) : NULL; +static inline pn_listener_t *task_listener(task_t *t) { + return t->type == LISTENER ? containerof(t, pn_listener_t, task) : NULL; } static pn_event_t *listener_batch_next(pn_event_batch_t *batch); @@ -671,10 +672,10 @@ static acceptor_t *acceptor_list_next(acceptor_t **start) { return item; } -// Add an overflowing acceptor to the overflow list. Called with listener context lock held. +// Add an overflowing acceptor to the overflow list. Called with listener task lock held. 
static void acceptor_set_overflow(acceptor_t *a) { a->overflowed = true; - pn_proactor_t *p = a->listener->context.proactor; + pn_proactor_t *p = a->listener->task.proactor; lock(&p->overflow_mutex); acceptor_list_append(&p->overflow, a); unlock(&p->overflow_mutex); @@ -693,8 +694,8 @@ static void proactor_rearm_overflow(pn_proactor_t *p) { acceptor_t *a = acceptor_list_next(&ovflw); while (a) { pn_listener_t *l = a->listener; - lock(&l->context.mutex); - bool rearming = !l->context.closing; + lock(&l->task.mutex); + bool rearming = !l->task.closing; bool notify = false; assert(!a->armed); assert(a->overflowed); @@ -703,9 +704,9 @@ static void proactor_rearm_overflow(pn_proactor_t *p) { rearm(p, &a->psocket.epoll_io); a->armed = true; } - else notify = wake(&l->context); - unlock(&l->context.mutex); - if (notify) wake_notify(&l->context); + else notify = schedule(&l->task); + unlock(&l->task.mutex); + if (notify) notify_poller(&l->task); a = acceptor_list_next(&ovflw); } } @@ -738,7 +739,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con } - pcontext_init(&pc->context, PCONNECTION, p); + task_init(&pc->task, PCONNECTION, p); psocket_init(&pc->psocket, PCONNECTION_IO); pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port); pc->new_events = 0; @@ -777,7 +778,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con // Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), no pending timer. // Return true when all possible outstanding epoll events associated with this pconnection have been processed. 
static inline bool pconnection_is_final(pconnection_t *pc) { - return !pc->current_arm && !pc->context.wake_pending && !pc->tick_pending; + return !pc->current_arm && !pc->task.ready && !pc->tick_pending; } static void pconnection_final_free(pconnection_t *pc) { @@ -793,7 +794,7 @@ static void pconnection_final_free(pconnection_t *pc) { pmutex_finalize(&pc->rearm_mutex); pn_condition_free(pc->disconnect_condition); pn_connection_driver_destroy(&pc->driver); - pcontext_finalize(&pc->context); + task_finalize(&pc->task); pni_timer_free(pc->timer); free(pc); } @@ -803,13 +804,13 @@ static void pconnection_final_free(pconnection_t *pc) { static void pconnection_cleanup(pconnection_t *pc) { assert(pconnection_is_final(pc)); int fd = pc->psocket.epoll_io.fd; - stop_polling(&pc->psocket.epoll_io, pc->context.proactor->epollfd); + stop_polling(&pc->psocket.epoll_io, pc->task.proactor->epollfd); if (fd != -1) - pclosefd(pc->context.proactor, fd); + pclosefd(pc->task.proactor, fd); - lock(&pc->context.mutex); - bool can_free = proactor_remove(&pc->context); - unlock(&pc->context.mutex); + lock(&pc->task.mutex); + bool can_free = proactor_remove(&pc->task); + unlock(&pc->task.mutex); if (can_free) pconnection_final_free(pc); // else proactor_disconnect logic owns psocket and its final free @@ -832,8 +833,8 @@ static void ensure_wbuf(pconnection_t *pc) { // Call with lock held or from forced_shutdown static void pconnection_begin_close(pconnection_t *pc) { - if (!pc->context.closing) { - pc->context.closing = true; + if (!pc->task.closing) { + pc->task.closing = true; pc->tick_pending = false; if (pc->current_arm) { // Force EPOLLHUP callback(s) @@ -849,7 +850,7 @@ static void pconnection_forced_shutdown(pconnection_t *pc) { pc->new_events = 0; pconnection_begin_close(pc); // pconnection_process will never be called again. Zero everything. 
- pc->context.wake_pending = 0; + pc->task.ready = 0; pn_collector_release(pc->driver.collector); assert(pconnection_is_final(pc)); pconnection_cleanup(pc); @@ -859,18 +860,18 @@ static void pconnection_forced_shutdown(pconnection_t *pc) { void pni_pconnection_timeout(pconnection_t *pc) { bool notify = false; uint64_t now = pn_proactor_now_64(); - lock(&pc->context.mutex); - if (!pc->context.closing) { + lock(&pc->task.mutex); + if (!pc->task.closing) { // confirm no simultaneous timeout change from another thread. if (pc->expected_timeout && now >= pc->expected_timeout) { pc->tick_pending = true; pc->expected_timeout = 0; - notify = wake(&pc->context); + notify = schedule(&pc->task); } } - unlock(&pc->context.mutex); + unlock(&pc->task.mutex); if (notify) - wake_notify(&pc->context); + notify_poller(&pc->task); } static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { @@ -878,7 +879,7 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { if (!pc->driver.connection) return NULL; pn_event_t *e = pn_connection_driver_next_event(&pc->driver); if (!e) { - pn_proactor_t *p = pc->context.proactor; + pn_proactor_t *p = pc->task.proactor; bool idle_threads; lock(&p->sched_mutex); idle_threads = (p->suspend_list_head != NULL); @@ -913,7 +914,7 @@ static inline bool pconnection_wclosed(pconnection_t *pc) { return pn_connection_driver_write_closed(&pc->driver); } -/* Call only from working context (no competitor for pc->current_arm or +/* Call only from working task (no competitor for pc->current_arm or connection driver). If true returned, caller must do pconnection_rearm(). 
@@ -944,13 +945,13 @@ static int pconnection_rearm_check(pconnection_t *pc) { static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) { lock(&pc->rearm_mutex); pc->current_arm = pc->psocket.epoll_io.wanted = wanted_now; - rearm(pc->context.proactor, &pc->psocket.epoll_io); + rearm(pc->task.proactor, &pc->psocket.epoll_io); unlock(&pc->rearm_mutex); // Return immediately. pc may have just been freed by another thread. } /* Only call when context switch is imminent. Sched lock is highly contested. */ -// Call with both context and sched locks. +// Call with both task and sched locks. static bool pconnection_sched_sync(pconnection_t *pc) { uint32_t sync_events = 0; uint32_t sync_args = pc->tick_pending << 1; @@ -960,12 +961,12 @@ static bool pconnection_sched_sync(pconnection_t *pc) { pc->current_arm = 0; // or outside lock? sync_events = pc->new_events; } - if (pc->context.sched_wake) { - pc->context.sched_wake = false; - wake_done(&pc->context); + if (pc->task.sched_ready) { + pc->task.sched_ready = false; + schedule_done(&pc->task); sync_args |= 1; } - pc->context.sched_pending = false; + pc->task.sched_pending = false; if (sync_args || sync_events) { // Only replace if poller has found new work for us. @@ -974,11 +975,11 @@ static bool pconnection_sched_sync(pconnection_t *pc) { } // Indicate if there are free proactor threads - pn_proactor_t *p = pc->context.proactor; + pn_proactor_t *p = pc->task.proactor; return p->poller_suspended || p->suspend_list_head; } -/* Call with context lock and having done a write_flush() to "know" the value of wbuf_remaining */ +/* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */ static inline bool pconnection_work_pending(pconnection_t *pc) { if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect) return true; @@ -989,14 +990,15 @@ static inline bool pconnection_work_pending(pconnection_t *pc) { /* Call with no locks. 
*/ static void pconnection_done(pconnection_t *pc) { - pn_proactor_t *p = pc->context.proactor; - tslot_t *ts = pc->context.runner; + pn_proactor_t *p = pc->task.proactor; + tslot_t *ts = pc->task.runner; write_flush(pc); bool notify = false; - bool self_wake = false; - lock(&pc->context.mutex); - pc->context.working = false; // So we can wake() ourself if necessary. We remain the de facto - // working context while the lock is held. Need sched_sync too to drain possible stale wake. + bool self_sched = false; + lock(&pc->task.mutex); + pc->task.working = false; // So we can schedule() ourself if necessary. We remain the de facto + // working task instance while the lock is held. Need sched_sync too to drain + // a possible stale sched_ready. pc->hog_count = 0; bool has_event = pconnection_has_event(pc); // Do as little as possible while holding the sched lock @@ -1005,33 +1007,33 @@ static void pconnection_done(pconnection_t *pc) { unlock(&p->sched_mutex); if (has_event || pconnection_work_pending(pc)) { - self_wake = true; + self_sched = true; } else if (pn_connection_driver_finished(&pc->driver)) { pconnection_begin_close(pc); if (pconnection_is_final(pc)) { - unlock(&pc->context.mutex); + unlock(&pc->task.mutex); pconnection_cleanup(pc); // pc may be undefined now lock(&p->sched_mutex); notify = unassign_thread(ts, UNUSED); unlock(&p->sched_mutex); if (notify) - wake_notify(&p->context); + notify_poller(&p->task); return; } } - if (self_wake) - notify = wake(&pc->context); + if (self_sched) + notify = schedule(&pc->task); int wanted = pconnection_rearm_check(pc); - unlock(&pc->context.mutex); + unlock(&pc->task.mutex); if (wanted) pconnection_rearm(pc, wanted); // May free pc on another thread. Return without touching pc again. 
lock(&p->sched_mutex); if (unassign_thread(ts, UNUSED)) notify = true; unlock(&p->sched_mutex); - if (notify) wake_notify(&p->context); + if (notify) notify_poller(&p->task); return; } @@ -1104,33 +1106,33 @@ static void write_flush(pconnection_t *pc) { static void pconnection_connected_lh(pconnection_t *pc); static void pconnection_maybe_connect_lh(pconnection_t *pc); -static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_wake, bool topup) { +static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_ready, bool topup) { bool waking = false; bool tick_required = false; bool immediate_write = false; - lock(&pc->context.mutex); + lock(&pc->task.mutex); if (!topup) { // Save some state in case of crash investigation. pc->process_events = events; - pc->process_args = (pc->tick_pending << 1) | sched_wake; + pc->process_args = (pc->tick_pending << 1) | sched_ready; } if (events) { pc->new_events = events; pc->current_arm = 0; events = 0; } - if (sched_wake) wake_done(&pc->context); + if (sched_ready) schedule_done(&pc->task); if (topup) { // Only called by the batch owner. Does not loop, just "tops up" // once. May be back depending on hog_count. - assert(pc->context.working); + assert(pc->task.working); } else { - if (pc->context.working) { - // Another thread is the working context. Should be impossible with new scheduler. + if (pc->task.working) { + // Another thread is the working task. Should be impossible with new scheduler. EPOLL_FATAL("internal epoll proactor error: two worker threads", 0); } - pc->context.working = true; + pc->task.working = true; } // Confirmed as working thread. Review state and unlock ASAP. 
@@ -1139,7 +1141,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, if (pc->queued_disconnect) { // From pn_proactor_disconnect() pc->queued_disconnect = false; - if (!pc->context.closing) { + if (!pc->task.closing) { if (pc->disconnect_condition) { pn_condition_copy(pn_transport_condition(pc->driver.transport), pc->disconnect_condition); } @@ -1148,7 +1150,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } if (pconnection_has_event(pc)) { - unlock(&pc->context.mutex); + unlock(&pc->task.mutex); return &pc->batch; } bool closed = pconnection_rclosed(pc) && pconnection_wclosed(pc); @@ -1165,7 +1167,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, uint32_t update_events = pc->new_events; pc->current_arm = 0; pc->new_events = 0; - if (!pc->context.closing) { + if (!pc->task.closing) { if ((update_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) && !pconnection_wclosed(pc)) pconnection_maybe_connect_lh(pc); else @@ -1180,14 +1182,14 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } } - if (pc->context.closing && pconnection_is_final(pc)) { - unlock(&pc->context.mutex); + if (pc->task.closing && pconnection_is_final(pc)) { + unlock(&pc->task.mutex); pconnection_cleanup(pc); return NULL; } - unlock(&pc->context.mutex); - pc->hog_count++; // working context doing work + unlock(&pc->task.mutex); + pc->hog_count++; // working task doing work if (waking) { pn_connection_t *c = pc->driver.connection; @@ -1245,45 +1247,45 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, write_flush(pc); - lock(&pc->context.mutex); - if (pc->context.closing && pconnection_is_final(pc)) { - unlock(&pc->context.mutex); + lock(&pc->task.mutex); + if (pc->task.closing && pconnection_is_final(pc)) { + unlock(&pc->task.mutex); pconnection_cleanup(pc); return NULL; } // Never stop working while work remains. 
hog_count exception to this rule is elsewhere. - lock(&pc->context.proactor->sched_mutex); + lock(&pc->task.proactor->sched_mutex); bool workers_free = pconnection_sched_sync(pc); - unlock(&pc->context.proactor->sched_mutex); + unlock(&pc->task.proactor->sched_mutex); if (pconnection_work_pending(pc)) { goto retry; // TODO: get rid of goto without adding more locking } - pc->context.working = false; + pc->task.working = false; pc->hog_count = 0; if (pn_connection_driver_finished(&pc->driver)) { pconnection_begin_close(pc); if (pconnection_is_final(pc)) { - unlock(&pc->context.mutex); + unlock(&pc->task.mutex); pconnection_cleanup(pc); return NULL; } } - if (workers_free && !pc->context.closing && !pc->io_doublecheck) { + if (workers_free && !pc->task.closing && !pc->io_doublecheck) { // check one last time for new io before context switch pc->io_doublecheck = true; pc->read_blocked = false; pc->write_blocked = false; - pc->context.working = true; + pc->task.working = true; goto retry; } int wanted = pconnection_rearm_check(pc); // holds rearm_mutex until pconnection_rearm() below - unlock(&pc->context.mutex); + unlock(&pc->task.mutex); if (wanted) pconnection_rearm(pc, wanted); // May free pc on another thread. Return right away. 
return NULL; } @@ -1297,7 +1299,7 @@ void configure_socket(int sock) { (void)setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void*) &tcp_nodelay, sizeof(tcp_nodelay)); } -/* Called with context.lock held */ +/* Called with task.lock held */ void pconnection_connected_lh(pconnection_t *pc) { if (!pc->connected) { pc->connected = true; @@ -1313,7 +1315,7 @@ void pconnection_connected_lh(pconnection_t *pc) { /* multi-address connections may call pconnection_start multiple times with diffferent FDs */ static void pconnection_start(pconnection_t *pc, int fd) { - int efd = pc->context.proactor->epollfd; + int efd = pc->task.proactor->epollfd; /* Get the local socket name now, get the peer name in pconnection_connected */ socklen_t len = sizeof(pc->local.ss); (void)getsockname(fd, (struct sockaddr*)&pc->local.ss, &len); @@ -1322,7 +1324,7 @@ static void pconnection_start(pconnection_t *pc, int fd) { if (ee->polling) { /* This is not the first attempt, stop polling and close the old FD */ int fd = ee->fd; /* Save fd, it will be set to -1 by stop_polling */ stop_polling(ee, efd); - pclosefd(pc->context.proactor, fd); + pclosefd(pc->task.proactor, fd); } ee->fd = fd; pc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT; @@ -1369,14 +1371,14 @@ int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo } static inline bool is_inactive(pn_proactor_t *p) { - return (!p->contexts && !p->disconnects_pending && !p->timeout_set && !p->shutting_down); + return (!p->tasks && !p->disconnects_pending && !p->timeout_set && !p->shutting_down); } -/* If inactive set need_inactive and return true if the proactor needs a wakeup */ -bool wake_if_inactive(pn_proactor_t *p) { +/* If inactive set need_inactive and return true if poller needs to be unblocked */ +bool schedule_if_inactive(pn_proactor_t *p) { if (is_inactive(p)) { p->need_inactive = true; - return wake(&p->context); + return schedule(&p->task); } return false; } @@ -1392,34 +1394,34 @@ void 
pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t * } // TODO: check case of proactor shutting down - lock(&pc->context.mutex); - proactor_add(&pc->context); + lock(&pc->task.mutex); + proactor_add(&pc->task); pn_connection_open(pc->driver.connection); /* Auto-open */ bool notify = false; bool notify_proactor = false; if (pc->disconnected) { - notify = wake(&pc->context); /* Error during initialization */ + notify = schedule(&pc->task); /* Error during initialization */ } else { int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo); if (!gai_error) { pn_connection_open(pc->driver.connection); /* Auto-open */ pc->ai = pc->addrinfo; pconnection_maybe_connect_lh(pc); /* Start connection attempts */ - if (pc->disconnected) notify = wake(&pc->context); + if (pc->disconnected) notify = schedule(&pc->task); } else { psocket_gai_error(&pc->psocket, gai_error, "connect to "); - notify = wake(&pc->context); - lock(&p->context.mutex); - notify_proactor = wake_if_inactive(p); - unlock(&p->context.mutex); + notify = schedule(&pc->task); + lock(&p->task.mutex); + notify_proactor = schedule_if_inactive(p); + unlock(&p->task.mutex); } } /* We need to issue INACTIVE on immediate failure */ - unlock(&pc->context.mutex); - if (notify) wake_notify(&pc->context); - if (notify_proactor) wake_notify(&p->context); + unlock(&pc->task.mutex); + if (notify) notify_poller(&pc->task); + if (notify_proactor) notify_poller(&p->task); } static void pconnection_tick(pconnection_t *pc) { @@ -1428,9 +1430,9 @@ static void pconnection_tick(pconnection_t *pc) { uint64_t now = pn_proactor_now_64(); uint64_t next = pn_transport_tick(t, now); if (next) { - lock(&pc->context.mutex); + lock(&pc->task.mutex); pc->expected_timeout = next; - unlock(&pc->context.mutex); + unlock(&pc->task.mutex); pni_timer_set(pc->timer, next); } } @@ -1440,14 +1442,14 @@ void pn_connection_wake(pn_connection_t* c) { bool notify = false; pconnection_t *pc = get_pconnection(c); if (pc) { - 
lock(&pc->context.mutex); - if (!pc->context.closing) { + lock(&pc->task.mutex); + if (!pc->task.closing) { pc->wake_count++; - notify = wake(&pc->context); + notify = schedule(&pc->task); } - unlock(&pc->context.mutex); + unlock(&pc->task.mutex); } - if (notify) wake_notify(&pc->context); + if (notify) notify_poller(&pc->task); } void pn_proactor_release_connection(pn_connection_t *c) { @@ -1455,13 +1457,13 @@ void pn_proactor_release_connection(pn_connection_t *c) { pconnection_t *pc = get_pconnection(c); if (pc) { set_pconnection(c, NULL); - lock(&pc->context.mutex); + lock(&pc->task.mutex); pn_connection_driver_release_connection(&pc->driver); pconnection_begin_close(pc); - notify = wake(&pc->context); - unlock(&pc->context.mutex); + notify = schedule(&pc->task); + unlock(&pc->task.mutex); } - if (notify) wake_notify(&pc->context); + if (notify) notify_poller(&pc->task); } // ======================================================================== @@ -1484,7 +1486,7 @@ pn_listener_t *pn_listener() { return NULL; } pn_proactor_t *unknown = NULL; // won't know until pn_proactor_listen - pcontext_init(&l->context, LISTENER, unknown); + task_init(&l->task, LISTENER, unknown); } return l; } @@ -1492,8 +1494,8 @@ pn_listener_t *pn_listener() { void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, int backlog) { // TODO: check listener not already listening for this or another proactor - lock(&l->context.mutex); - l->context.proactor = p; + lock(&l->task.mutex); + l->task.proactor = p; l->pending_accepteds = (accepted_t*)calloc(backlog, sizeof(accepted_t)); assert(l->pending_accepteds); @@ -1544,7 +1546,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in ps->epoll_io.fd = fd; ps->epoll_io.wanted = EPOLLIN; ps->epoll_io.polling = false; - start_polling(&ps->epoll_io, l->context.proactor->epollfd); // TODO: check for error + start_polling(&ps->epoll_io, l->task.proactor->epollfd); // TODO: check for error 
l->active_count++; acceptor->armed = true; } else { @@ -1556,7 +1558,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in if (addrinfo) { freeaddrinfo(addrinfo); } - bool notify = wake(&l->context); + bool notify = schedule(&l->task); if (l->acceptors_size == 0) { /* All failed, create dummy socket with an error */ l->acceptors = (acceptor_t*)realloc(l->acceptors, sizeof(acceptor_t)); @@ -1572,19 +1574,19 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in } else { pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_OPEN); } - proactor_add(&l->context); - unlock(&l->context.mutex); - if (notify) wake_notify(&l->context); + proactor_add(&l->task); + unlock(&l->task.mutex); + if (notify) notify_poller(&l->task); return; } -// call with lock held and context.working false +// call with lock held and task.working false static inline bool listener_can_free(pn_listener_t *l) { - return l->context.closing && l->close_dispatched && !l->context.wake_pending && !l->active_count; + return l->task.closing && l->close_dispatched && !l->task.ready && !l->active_count; } static inline void listener_final_free(pn_listener_t *l) { - pcontext_finalize(&l->context); + task_finalize(&l->task); free(l->acceptors); free(l->pending_accepteds); free(l); @@ -1599,11 +1601,11 @@ void pn_listener_free(pn_listener_t *l) { if (l->collector) pn_collector_free(l->collector); if (l->condition) pn_condition_free(l->condition); if (l->attachments) pn_free(l->attachments); - lock(&l->context.mutex); - if (l->context.proactor) { - can_free = proactor_remove(&l->context); + lock(&l->task.mutex); + if (l->task.proactor) { + can_free = proactor_remove(&l->task); } - unlock(&l->context.mutex); + unlock(&l->task.mutex); if (can_free) listener_final_free(l); } @@ -1611,8 +1613,8 @@ void pn_listener_free(pn_listener_t *l) { /* Always call with lock held so it can be unlocked around overflow processing. 
*/ static void listener_begin_close(pn_listener_t* l) { - if (!l->context.closing) { - l->context.closing = true; + if (!l->task.closing) { + l->task.closing = true; /* Close all listening sockets */ for (size_t i = 0; i < l->acceptors_size; ++i) { @@ -1623,7 +1625,7 @@ static void listener_begin_close(pn_listener_t* l) { shutdown(ps->epoll_io.fd, SHUT_RD); // Force epoll event and callback } else { int fd = ps->epoll_io.fd; - stop_polling(&ps->epoll_io, l->context.proactor->epollfd); + stop_polling(&ps->epoll_io, l->task.proactor->epollfd); close(fd); l->active_count--; } @@ -1638,39 +1640,39 @@ static void listener_begin_close(pn_listener_t* l) { } assert(!l->pending_count); - unlock(&l->context.mutex); + unlock(&l->task.mutex); /* Remove all acceptors from the overflow list. closing flag prevents re-insertion.*/ proactor_rearm_overflow(pn_listener_proactor(l)); - lock(&l->context.mutex); + lock(&l->task.mutex); pn_collector_put(l->collector, PN_CLASSCLASS(pn_listener), l, PN_LISTENER_CLOSE); } } void pn_listener_close(pn_listener_t* l) { bool notify = false; - lock(&l->context.mutex); - if (!l->context.closing) { + lock(&l->task.mutex); + if (!l->task.closing) { listener_begin_close(l); - notify = wake(&l->context); + notify = schedule(&l->task); } - unlock(&l->context.mutex); - if (notify) wake_notify(&l->context); + unlock(&l->task.mutex); + if (notify) notify_poller(&l->task); } static void listener_forced_shutdown(pn_listener_t *l) { // Called by proactor_free, no competing threads, no epoll activity. - lock(&l->context.mutex); // needed because of interaction with proactor_rearm_overflow + lock(&l->task.mutex); // needed because of interaction with proactor_rearm_overflow listener_begin_close(l); - unlock(&l->context.mutex); + unlock(&l->task.mutex); // pconnection_process will never be called again. Zero everything. 
- l->context.wake_pending = 0; + l->task.ready = 0; l->close_dispatched = true; l->active_count = 0; assert(listener_can_free(l)); pn_listener_free(l); } -/* Accept a connection as part of listener_process(). Called with listener context lock held. */ +/* Accept a connection as part of listener_process(). Called with listener task lock held. */ /* Keep on accepting until we fill the backlog, would block or get an error */ static void listener_accept_lh(psocket_t *ps) { pn_listener_t *l = psocket_listener(ps); @@ -1693,22 +1695,22 @@ static void listener_accept_lh(psocket_t *ps) { } /* Process a listening socket */ -static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool wake) { +static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool tsk_ready) { // TODO: some parallelization of the accept mechanism. // pn_listener_t *l = psocket_listener(ps); // acceptor_t *a = psocket_acceptor(ps); - lock(&l->context.mutex); + lock(&l->task.mutex); if (n_events) { for (size_t i = 0; i < l->acceptors_size; i++) { psocket_t *ps = &l->acceptors[i].psocket; if (ps->working_io_events) { uint32_t events = ps->working_io_events; ps->working_io_events = 0; - if (l->context.closing) { + if (l->task.closing) { l->acceptors[i].armed = false; int fd = ps->epoll_io.fd; - stop_polling(&ps->epoll_io, l->context.proactor->epollfd); + stop_polling(&ps->epoll_io, l->task.proactor->epollfd); close(fd); l->active_count--; } else { @@ -1716,37 +1718,37 @@ static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool w if (events & EPOLLRDHUP) { /* Calls listener_begin_close which closes all the listener's sockets */ psocket_error(ps, errno, "listener epoll"); - } else if (!l->context.closing && events & EPOLLIN) { + } else if (!l->task.closing && events & EPOLLIN) { listener_accept_lh(ps); } } } } } - if (wake) { - wake_done(&l->context); // callback accounting + if (tsk_ready) { + schedule_done(&l->task); // callback accounting } 
pn_event_batch_t *lb = NULL; - if (!l->context.working) { - l->context.working = true; + if (!l->task.working) { + l->task.working = true; if (listener_has_event(l)) lb = &l->batch; else { - l->context.working = false; + l->task.working = false; if (listener_can_free(l)) { - unlock(&l->context.mutex); + unlock(&l->task.mutex); pn_listener_free(l); return NULL; } } } - unlock(&l->context.mutex); + unlock(&l->task.mutex); return lb; } static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { pn_listener_t *l = batch_listener(batch); - lock(&l->context.mutex); + lock(&l->task.mutex); pn_event_t *e = pn_collector_next(l->collector); if (!e && l->pending_count) { // empty collector means pn_collector_put() will not coalesce @@ -1755,21 +1757,21 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { } if (e && pn_event_type(e) == PN_LISTENER_CLOSE) l->close_dispatched = true; - unlock(&l->context.mutex); + unlock(&l->task.mutex); return pni_log_event(l, e); } static void listener_done(pn_listener_t *l) { - pn_proactor_t *p = l->context.proactor; - tslot_t *ts = l->context.runner; - lock(&l->context.mutex); + pn_proactor_t *p = l->task.proactor; + tslot_t *ts = l->task.runner; + lock(&l->task.mutex); // Just in case the app didn't accept all the pending accepts // Shuffle the list back to start at 0 memmove(&l->pending_accepteds[0], &l->pending_accepteds[l->pending_first], l->pending_count * sizeof(accepted_t)); l->pending_first = 0; - if (!l->context.closing) { + if (!l->task.closing) { for (size_t i = 0; i < l->acceptors_size; i++) { acceptor_t *a = &l->acceptors[i]; psocket_t *ps = &a->psocket; @@ -1777,7 +1779,7 @@ static void listener_done(pn_listener_t *l) { // Rearm acceptor when appropriate if (ps->epoll_io.polling && l->pending_count==0 && !a->overflowed) { if (!a->armed) { - rearm(l->context.proactor, &ps->epoll_io); + rearm(l->task.proactor, &ps->epoll_io); a->armed = true; } } @@ -1785,7 +1787,7 @@ static void listener_done(pn_listener_t 
*l) { } bool notify = false; - l->context.working = false; + l->task.working = false; lock(&p->sched_mutex); int n_events = 0; @@ -1799,34 +1801,34 @@ static void listener_done(pn_listener_t *l) { n_events++; } - if (l->context.sched_wake) { - l->context.sched_wake = false; - wake_done(&l->context); + if (l->task.sched_ready) { + l->task.sched_ready = false; + schedule_done(&l->task); } unlock(&p->sched_mutex); if (!n_events && listener_can_free(l)) { - unlock(&l->context.mutex); + unlock(&l->task.mutex); pn_listener_free(l); lock(&p->sched_mutex); notify = unassign_thread(ts, UNUSED); unlock(&p->sched_mutex); if (notify) - wake_notify(&p->context); + notify_poller(&p->task); return; } else if (n_events || listener_has_event(l)) - notify = wake(&l->context); - unlock(&l->context.mutex); + notify = schedule(&l->task); + unlock(&l->task.mutex); lock(&p->sched_mutex); if (unassign_thread(ts, UNUSED)) notify = true; unlock(&p->sched_mutex); - if (notify) wake_notify(&l->context); + if (notify) notify_poller(&l->task); } pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { - return l ? l->context.proactor : NULL; + return l ? 
l->task.proactor : NULL; } pn_condition_t* pn_listener_condition(pn_listener_t* l) { @@ -1858,8 +1860,8 @@ void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t int err2 = 0; int fd = -1; bool notify = false; - lock(&l->context.mutex); - if (l->context.closing) + lock(&l->task.mutex); + if (l->task.closing) err2 = EBADF; else { accepted_t *a = listener_accepted_next(l); @@ -1870,8 +1872,8 @@ void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t else err2 = EWOULDBLOCK; } - proactor_add(&pc->context); - lock(&pc->context.mutex); + proactor_add(&pc->task); + lock(&pc->task.mutex); if (fd >= 0) { configure_socket(fd); pconnection_start(pc, fd); @@ -1879,11 +1881,11 @@ void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t } else psocket_error(&pc->psocket, err2, "pn_listener_accept"); - if (!l->context.working && listener_has_event(l)) - notify = wake(&l->context); - unlock(&pc->context.mutex); - unlock(&l->context.mutex); - if (notify) wake_notify(&l->context); + if (!l->task.working && listener_has_event(l)) + notify = schedule(&l->task); + unlock(&pc->task.mutex); + unlock(&l->task.mutex); + if (notify) notify_poller(&l->task); } @@ -1899,7 +1901,7 @@ static void grow_poller_bufs(pn_proactor_t* p) { p->thread_capacity += 8; } while (p->thread_count > p->thread_capacity); - p->warm_runnables = (pcontext_t **) realloc(p->warm_runnables, p->thread_capacity * sizeof(pcontext_t *)); + p->warm_runnables = (task_t **) realloc(p->warm_runnables, p->thread_capacity * sizeof(task_t *)); p->resume_list = (tslot_t **) realloc(p->resume_list, p->thread_capacity * sizeof(tslot_t *)); int old_cap = p->runnables_capacity; @@ -1908,7 +1910,7 @@ static void grow_poller_bufs(pn_proactor_t* p) { else if (p->runnables_capacity < p->thread_capacity) p->runnables_capacity = p->thread_capacity; if (p->runnables_capacity != old_cap) { - p->runnables = (pcontext_t **) realloc(p->runnables, p->runnables_capacity 
* sizeof(pcontext_t *)); + p->runnables = (task_t **) realloc(p->runnables, p->runnables_capacity * sizeof(task_t *)); p->kevents_capacity = p->runnables_capacity; size_t sz = p->kevents_capacity * sizeof(struct epoll_event); p->kevents = (struct epoll_event *) realloc(p->kevents, sz); @@ -1916,10 +1918,10 @@ static void grow_poller_bufs(pn_proactor_t* p) { } } -/* Set up an epoll_extended_t to be used for wakeup or interrupts */ - static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd, bool always_set) { +/* Set up an epoll_extended_t to be used for ready list schedule() or interrupts */ + static void epoll_eventfd_init(epoll_extended_t *ee, int eventfd, int epollfd, bool always_set) { ee->fd = eventfd; - ee->type = WAKE; + ee->type = EVENT_FD; if (always_set) { uint64_t increment = 1; if (write(eventfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t)) @@ -1942,7 +1944,7 @@ pn_proactor_t *pn_proactor() { pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p)); if (!p) return NULL; p->epollfd = p->eventfd = -1; - pcontext_init(&p->context, PROACTOR, p); + task_init(&p->task, PROACTOR, p); pmutex_init(&p->eventfd_mutex); pmutex_init(&p->sched_mutex); pmutex_init(&p->tslot_mutex); @@ -1954,8 +1956,8 @@ pn_proactor_t *pn_proactor() { if ((p->collector = pn_collector()) != NULL) { p->batch.next_event = &proactor_batch_next; start_polling(&p->timer_manager.epoll_timer, p->epollfd); // TODO: check for error - epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd, true); - epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false); + epoll_eventfd_init(&p->epoll_schedule, p->eventfd, p->epollfd, true); + epoll_eventfd_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false); p->tslot_map = pn_hash(PN_VOID, 0, 0.75); grow_poller_bufs(p); return p; @@ -1985,15 +1987,15 @@ void pn_proactor_free(pn_proactor_t *p) { p->eventfd = -1; close(p->interruptfd); p->interruptfd = -1; - while (p->contexts) { - pcontext_t *ctx = p->contexts; - 
p->contexts = ctx->next; - switch (ctx->type) { + while (p->tasks) { + task_t *tsk = p->tasks; + p->tasks = tsk->next; + switch (tsk->type) { case PCONNECTION: - pconnection_forced_shutdown(pcontext_pconnection(ctx)); + pconnection_forced_shutdown(task_pconnection(tsk)); break; case LISTENER: - listener_forced_shutdown(pcontext_listener(ctx)); + listener_forced_shutdown(task_listener(tsk)); break; default: break; @@ -2005,7 +2007,7 @@ void pn_proactor_free(pn_proactor_t *p) { pmutex_finalize(&p->tslot_mutex); pmutex_finalize(&p->sched_mutex); pmutex_finalize(&p->eventfd_mutex); - pcontext_finalize(&p->context); + task_finalize(&p->task); for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry; entry = pn_hash_next(p->tslot_map, entry)) { tslot_t *ts = (tslot_t *) pn_hash_value(p->tslot_map, entry); pmutex_finalize(&ts->mutex); @@ -2022,7 +2024,7 @@ void pn_proactor_free(pn_proactor_t *p) { pn_proactor_t *pn_event_proactor(pn_event_t *e) { if (pn_event_class(e) == PN_CLASSCLASS(pn_proactor)) return (pn_proactor_t*)pn_event_context(e); pn_listener_t *l = pn_event_listener(e); - if (l) return l->context.proactor; + if (l) return l->task.proactor; pn_connection_t *c = pn_event_connection(e); if (c) return pn_connection_proactor(c); return NULL; @@ -2060,7 +2062,7 @@ static bool proactor_update_batch(pn_proactor_t *p) { static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { pn_proactor_t *p = batch_proactor(batch); - lock(&p->context.mutex); + lock(&p->task.mutex); proactor_update_batch(p); pn_event_t *e = pn_collector_next(p->collector); if (e) { @@ -2068,91 +2070,91 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { if (p->current_event_type == PN_PROACTOR_TIMEOUT) p->timeout_processed = true; } - unlock(&p->context.mutex); + unlock(&p->task.mutex); return pni_log_event(p, e); } -static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool interrupt, bool wake) { +static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool 
interrupt, bool tsk_ready) { if (interrupt) { (void)read_uint64(p->interruptfd); rearm(p, &p->epoll_interrupt); } - lock(&p->context.mutex); + lock(&p->task.mutex); if (interrupt) { p->need_interrupt = true; } - if (wake) { - wake_done(&p->context); + if (tsk_ready) { + schedule_done(&p->task); } - if (!p->context.working) { /* Can generate proactor events */ + if (!p->task.working) { /* Can generate proactor events */ if (proactor_update_batch(p)) { - p->context.working = true; - unlock(&p->context.mutex); + p->task.working = true; + unlock(&p->task.mutex); return &p->batch; } } - unlock(&p->context.mutex); + unlock(&p->task.mutex); return NULL; } -void proactor_add(pcontext_t *ctx) { - pn_proactor_t *p = ctx->proactor; - lock(&p->context.mutex); - if (p->contexts) { - p->contexts->prev = ctx; - ctx->next = p->contexts; +void proactor_add(task_t *tsk) { + pn_proactor_t *p = tsk->proactor; + lock(&p->task.mutex); + if (p->tasks) { + p->tasks->prev = tsk; + tsk->next = p->tasks; } - p->contexts = ctx; - p->context_count++; - unlock(&p->context.mutex); + p->tasks = tsk; + p->task_count++; + unlock(&p->task.mutex); } // call with psocket's mutex held // return true if safe for caller to free psocket -bool proactor_remove(pcontext_t *ctx) { - pn_proactor_t *p = ctx->proactor; - // Disassociate this context from scheduler +bool proactor_remove(task_t *tsk) { + pn_proactor_t *p = tsk->proactor; + // Disassociate this task from scheduler if (!p->shutting_down) { lock(&p->sched_mutex); - ctx->runner->state = DELETING; + tsk->runner->state = DELETING; for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry; entry = pn_hash_next(p->tslot_map, entry)) { tslot_t *ts = (tslot_t *) pn_hash_value(p->tslot_map, entry); - if (ts->context == ctx) - ts->context = NULL; - if (ts->prev_context == ctx) - ts->prev_context = NULL; + if (ts->task == tsk) + ts->task = NULL; + if (ts->prev_task == tsk) + ts->prev_task = NULL; } unlock(&p->sched_mutex); } - lock(&p->context.mutex); + 
lock(&p->task.mutex); bool can_free = true; - if (ctx->disconnecting) { - // No longer on contexts list + if (tsk->disconnecting) { + // No longer on tasks list --p->disconnects_pending; - if (--ctx->disconnect_ops != 0) { + if (--tsk->disconnect_ops != 0) { // procator_disconnect() does the free can_free = false; } } else { // normal case - if (ctx->prev) - ctx->prev->next = ctx->next; + if (tsk->prev) + tsk->prev->next = tsk->next; else { - p->contexts = ctx->next; - ctx->next = NULL; - if (p->contexts) - p->contexts->prev = NULL; + p->tasks = tsk->next; + tsk->next = NULL; + if (p->tasks) + p->tasks->prev = NULL; } - if (ctx->next) { - ctx->next->prev = ctx->prev; + if (tsk->next) { + tsk->next->prev = tsk->prev; } - p->context_count--; + p->task_count--; } - bool notify = wake_if_inactive(p); - unlock(&p->context.mutex); - if (notify) wake_notify(&p->context); + bool notify = schedule_if_inactive(p); + unlock(&p->task.mutex); + if (notify) notify_poller(&p->task); return can_free; } @@ -2189,34 +2191,34 @@ static tslot_t *resume_one_thread(pn_proactor_t *p) { } // Called with sched lock, returns with sched lock still held. 
-static pn_event_batch_t *process(pcontext_t *ctx) { - bool ctx_wake = false; - ctx->sched_pending = false; - if (ctx->sched_wake) { - // update the wake status before releasing the sched_mutex - ctx->sched_wake = false; - ctx_wake = true; - } - pn_proactor_t *p = ctx->proactor; +static pn_event_batch_t *process(task_t *tsk) { + bool tsk_ready = false; + tsk->sched_pending = false; + if (tsk->sched_ready) { + // update the ready status before releasing the sched_mutex + tsk->sched_ready = false; + tsk_ready = true; + } + pn_proactor_t *p = tsk->proactor; pn_event_batch_t* batch = NULL; - switch (ctx->type) { + switch (tsk->type) { case PROACTOR: { bool intr = p->sched_interrupt; if (intr) p->sched_interrupt = false; unlock(&p->sched_mutex); - batch = proactor_process(p, intr, ctx_wake); + batch = proactor_process(p, intr, tsk_ready); break; } case PCONNECTION: { - pconnection_t *pc = pcontext_pconnection(ctx); + pconnection_t *pc = task_pconnection(tsk); uint32_t events = pc->psocket.sched_io_events; if (events) pc->psocket.sched_io_events = 0; unlock(&p->sched_mutex); - batch = pconnection_process(pc, events, ctx_wake, false); + batch = pconnection_process(pc, events, tsk_ready, false); break; } case LISTENER: { - pn_listener_t *l = pcontext_listener(ctx); + pn_listener_t *l = task_listener(tsk); int n_events = 0; for (size_t i = 0; i < l->acceptors_size; i++) { psocket_t *ps = &l->acceptors[i].psocket; @@ -2228,12 +2230,12 @@ static pn_event_batch_t *process(pcontext_t *ctx) { n_events++; } unlock(&p->sched_mutex); - batch = listener_process(l, n_events, ctx_wake); + batch = listener_process(l, n_events, tsk_ready); break; } case RAW_CONNECTION: { unlock(&p->sched_mutex); - batch = pni_raw_connection_process(ctx, ctx_wake); + batch = pni_raw_connection_process(tsk, tsk_ready); break; } case TIMER_MANAGER: { @@ -2241,7 +2243,7 @@ static pn_event_batch_t *process(pcontext_t *ctx) { bool timeout = tm->sched_timeout; if (timeout) tm->sched_timeout = false; 
unlock(&p->sched_mutex); - batch = pni_timer_manager_process(tm, timeout, ctx_wake); + batch = pni_timer_manager_process(tm, timeout, tsk_ready); break; } default: @@ -2252,37 +2254,37 @@ static pn_event_batch_t *process(pcontext_t *ctx) { } -// Call with both sched and wake locks -static void schedule_wake_list(pn_proactor_t *p) { - // append wake_list_first..wake_list_last to end of sched_wake_last - if (p->wake_list_first) { - if (p->sched_wake_last) - p->sched_wake_last->wake_next = p->wake_list_first; // join them - if (!p->sched_wake_first) - p->sched_wake_first = p->wake_list_first; - p->sched_wake_last = p->wake_list_last; - if (!p->sched_wake_current) - p->sched_wake_current = p->sched_wake_first; - p->wake_list_first = p->wake_list_last = NULL; +// Call with both sched_mutex and eventfd_mutex held +static void schedule_ready_list(pn_proactor_t *p) { + // append ready_list_first..ready_list_last to end of sched_ready_last + if (p->ready_list_first) { + if (p->sched_ready_last) + p->sched_ready_last->ready_next = p->ready_list_first; // join them + if (!p->sched_ready_first) + p->sched_ready_first = p->ready_list_first; + p->sched_ready_last = p->ready_list_last; + if (!p->sched_ready_current) + p->sched_ready_current = p->sched_ready_first; + p->ready_list_first = p->ready_list_last = NULL; } } // Call with schedule lock held. Called only by poller thread. 
-static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { +static task_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { epoll_extended_t *ee = (epoll_extended_t *) evp->data.ptr; - pcontext_t *ctx = NULL; + task_t *tsk = NULL; switch (ee->type) { - case WAKE: + case EVENT_FD: if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */ p->sched_interrupt = true; - ctx = &p->context; - ctx->sched_pending = true; + tsk = &p->task; + tsk->sched_pending = true; } else { - // main eventfd wake + // main ready tasks eventfd lock(&p->eventfd_mutex); - schedule_wake_list(p); - ctx = p->sched_wake_current; + schedule_ready_list(p); + tsk = p->sched_ready_current; unlock(&p->eventfd_mutex); } break; @@ -2290,51 +2292,51 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { psocket_t *ps = containerof(ee, psocket_t, epoll_io); pconnection_t *pc = psocket_pconnection(ps); assert(pc); - ctx = &pc->context; + tsk = &pc->task; ps->sched_io_events = evp->events; - ctx->sched_pending = true; + tsk->sched_pending = true; break; } case LISTENER_IO: { psocket_t *ps = containerof(ee, psocket_t, epoll_io); pn_listener_t *l = psocket_listener(ps); assert(l); - ctx = &l->context; + tsk = &l->task; ps->sched_io_events = evp->events; - ctx->sched_pending = true; + tsk->sched_pending = true; break; } case RAW_CONNECTION_IO: { psocket_t *ps = containerof(ee, psocket_t, epoll_io); - ctx = pni_psocket_raw_context(ps); + tsk = pni_psocket_raw_task(ps); ps->sched_io_events = evp->events; - ctx->sched_pending = true; + tsk->sched_pending = true; break; } case TIMER: { pni_timer_manager_t *tm = &p->timer_manager; - ctx = &tm->context; + tsk = &tm->task; tm->sched_timeout = true; - ctx->sched_pending = true; + tsk->sched_pending = true; break; } } - if (ctx && !ctx->runnable && !ctx->runner) - return ctx; + if (tsk && !tsk->runnable && !tsk->runner) + return tsk; return NULL; } -static pcontext_t *post_wake(pn_proactor_t *p, 
pcontext_t *ctx) { - ctx->sched_wake = true; - ctx->sched_pending = true; - if (!ctx->runnable && !ctx->runner) - return ctx; +static task_t *post_ready(pn_proactor_t *p, task_t *tsk) { + tsk->sched_ready = true; + tsk->sched_pending = true; + if (!tsk->runnable && !tsk->runner) + return tsk; return NULL; } // call with sched_lock held -static pcontext_t *next_drain(pn_proactor_t *p, tslot_t *ts) { +static task_t *next_drain(pn_proactor_t *p, tslot_t *ts) { // This should be called seldomly, best case once per thread removal on shutdown. // TODO: how to reduce? Instrumented near 5 percent of earmarks, 1 in 2000 calls to do_epoll(). @@ -2342,12 +2344,12 @@ static pcontext_t *next_drain(pn_proactor_t *p, tslot_t *ts) { tslot_t *ts2 = (tslot_t *) pn_hash_value(p->tslot_map, entry); if (ts2->earmarked) { // undo the old assign thread and earmark. ts2 may never come back - pcontext_t *switch_ctx = ts2->context; + task_t *switch_tsk = ts2->task; remove_earmark(ts2); - assign_thread(ts, switch_ctx); + assign_thread(ts, switch_tsk); ts->earmark_override = ts2; ts->earmark_override_gen = ts2->generation; - return switch_ctx; + return switch_tsk; } } assert(false); @@ -2355,51 +2357,51 @@ static pcontext_t *next_drain(pn_proactor_t *p, tslot_t *ts) { } // call with sched_lock held -static pcontext_t *next_runnable(pn_proactor_t *p, tslot_t *ts) { - if (ts->context) { +static task_t *next_runnable(pn_proactor_t *p, tslot_t *ts) { + if (ts->task) { // Already assigned if (ts->earmarked) { ts->earmarked = false; if (--p->earmark_count == 0) p->earmark_drain = false; } - return ts->context; + return ts->task; } // warm pairing ? - pcontext_t *ctx = ts->prev_context; - if (ctx && (ctx->runnable)) { // or ctx->sched_wake too? - assign_thread(ts, ctx); - return ctx; + task_t *tsk = ts->prev_task; + if (tsk && (tsk->runnable)) { // or tsk->sched_ready too? 
+ assign_thread(ts, tsk); + return tsk; } - // check for an unassigned runnable context or unprocessed wake + // check for an unassigned runnable task or ready list task if (p->n_runnables) { // Any unclaimed runnable? while (p->n_runnables) { - ctx = p->runnables[p->next_runnable++]; + tsk = p->runnables[p->next_runnable++]; if (p->n_runnables == p->next_runnable) p->n_runnables = 0; - if (ctx->runnable) { - assign_thread(ts, ctx); - return ctx; + if (tsk->runnable) { + assign_thread(ts, tsk); + return tsk; } } } - if (p->sched_wake_current) { - ctx = p->sched_wake_current; - pop_wake(ctx); // updates sched_wake_current - assert(!ctx->runnable && !ctx->runner); - assign_thread(ts, ctx); - return ctx; + if (p->sched_ready_current) { + tsk = p->sched_ready_current; + pop_ready_task(tsk); // updates sched_ready_current + assert(!tsk->runnable && !tsk->runner); + assign_thread(ts, tsk); + return tsk; } if (p->earmark_drain) { - ctx = next_drain(p, ts); + tsk = next_drain(p, ts); if (p->earmark_count == 0) p->earmark_drain = false; - return ctx; + return tsk; } return NULL; @@ -2412,17 +2414,17 @@ static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) { ts->generation++; // wrapping OK. Just looking for any change lock(&p->sched_mutex); - assert(ts->context == NULL || ts->earmarked); + assert(ts->task == NULL || ts->earmarked); assert(ts->state == UNUSED || ts->state == NEW); ts->state = PROCESSING; // Process outstanding epoll events until we get a batch or need to block. 
while (true) { - // First see if there are any contexts waiting to run and perhaps generate new Proton events, - pcontext_t *ctx = next_runnable(p, ts); - if (ctx) { + // First see if there are any tasks waiting to run and perhaps generate new Proton events, + task_t *tsk = next_runnable(p, ts); + if (tsk) { ts->state = BATCHING; - pn_event_batch_t *batch = process(ctx); + pn_event_batch_t *batch = process(tsk); if (batch) { unlock(&p->sched_mutex); return batch; @@ -2430,17 +2432,17 @@ static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) { bool notify = unassign_thread(ts, PROCESSING); if (notify) { unlock(&p->sched_mutex); - wake_notify(&p->context); + notify_poller(&p->task); lock(&p->sched_mutex); } continue; // Long time may have passed. Back to beginning. } - // Poll or wait for a runnable context + // Poll or wait for a runnable task if (p->poller == NULL) { bool return_immediately; p->poller = ts; - // Get new epoll events (if any) and mark the relevant contexts as runnable + // Get new epoll events (if any) and mark the relevant tasks as runnable return_immediately = poller_do_epoll(p, ts, can_block); p->poller = NULL; if (return_immediately) { @@ -2467,7 +2469,7 @@ static pn_event_batch_t *next_event_batch(pn_proactor_t* p, bool can_block) { static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block) { // As poller with lots to do, be mindful of hogging the sched lock. Release when making kernel calls. 
int n_events; - pcontext_t *ctx; + task_t *tsk; while (true) { assert(p->n_runnables == 0); @@ -2478,16 +2480,16 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block p->last_earmark = NULL; bool unfinished_earmarks = p->earmark_count > 0; - bool new_wakes = false; + bool new_ready_tasks = false; bool epoll_immediate = unfinished_earmarks || !can_block; - assert(!p->sched_wake_first); + assert(!p->sched_ready_first); if (!epoll_immediate) { lock(&p->eventfd_mutex); - if (p->wake_list_first) { + if (p->ready_list_first) { epoll_immediate = true; - new_wakes = true; + new_ready_tasks = true; } else { - p->wakes_in_progress = false; + p->ready_list_active = false; } unlock(&p->eventfd_mutex); } @@ -2505,9 +2507,9 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block p->earmark_drain = true; unpolled_work = true; } - if (new_wakes) { + if (new_ready_tasks) { lock(&p->eventfd_mutex); - schedule_wake_list(p); + schedule_ready_list(p); unlock(&p->eventfd_mutex); unpolled_work = true; } @@ -2537,66 +2539,66 @@ static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block for (int i = 0; i < n_events; i++) { - ctx = post_event(p, &p->kevents[i]); - if (ctx) - make_runnable(ctx); + tsk = post_event(p, &p->kevents[i]); + if (tsk) + make_runnable(tsk); } if (n_events > 0) memset(p->kevents, 0, sizeof(struct epoll_event) * n_events); - // The list of pending wakes can be very long. Traverse part of it looking for warm pairings. - pcontext_t *wctx = p->sched_wake_current; + // The list of ready tasks can be very long. Traverse part of it looking for warm pairings. + task_t *ctsk = p->sched_ready_current; int max_runnables = p->runnables_capacity; - while (wctx && p->n_runnables < max_runnables) { - if (wctx->runner == REWAKE_PLACEHOLDER) - wctx->runner = NULL; // Allow context to run again. 
- ctx = post_wake(p, wctx); - if (ctx) - make_runnable(ctx); - pop_wake(wctx); - wctx = wctx->wake_next; - } - p->sched_wake_current = wctx; - // More wakes than places on the runnables list - while (wctx) { - if (wctx->runner == REWAKE_PLACEHOLDER) - wctx->runner = NULL; // Allow context to run again. - wctx->sched_wake = true; - wctx->sched_pending = true; - if (wctx->runnable || wctx->runner) - pop_wake(wctx); - wctx = wctx->wake_next; - } - - if (pni_immediate && !ts->context) { + while (ctsk && p->n_runnables < max_runnables) { + if (ctsk->runner == RESCHEDULE_PLACEHOLDER) + ctsk->runner = NULL; // Allow task to run again. + tsk = post_ready(p, ctsk); + if (tsk) + make_runnable(tsk); + pop_ready_task(ctsk); + ctsk = ctsk->ready_next; + } + p->sched_ready_current = ctsk; + // More ready tasks than places on the runnables list + while (ctsk) { + if (ctsk->runner == RESCHEDULE_PLACEHOLDER) + ctsk->runner = NULL; // Allow task to run again. + ctsk->sched_ready = true; + ctsk->sched_pending = true; + if (ctsk->runnable || ctsk->runner) + pop_ready_task(ctsk); + ctsk = ctsk->ready_next; + } + + if (pni_immediate && !ts->task) { // Poller gets to run if possible - pcontext_t *pctx; + task_t *ptsk; if (p->n_runnables) { assert(p->next_runnable == 0); - pctx = p->runnables[0]; + ptsk = p->runnables[0]; if (++p->next_runnable == p->n_runnables) p->n_runnables = 0; } else if (p->n_warm_runnables) { - pctx = p->warm_runnables[--p->n_warm_runnables]; - tslot_t *ts2 = pctx->runner; - ts2->prev_context = ts2->context = NULL; - pctx->runner = NULL; + ptsk = p->warm_runnables[--p->n_warm_runnables]; + tslot_t *ts2 = ptsk->runner; + ts2->prev_task = ts2->task = NULL; + ptsk->runner = NULL; } else if (p->last_earmark) { - pctx = p->last_earmark->context; + ptsk = p->last_earmark->task; remove_earmark(p->last_earmark); if (p->earmark_count == 0) p->earmark_drain = false; } else { - pctx = NULL; + ptsk = NULL; } - if (pctx) { - assign_thread(ts, pctx); + if (ptsk) { + 
assign_thread(ts, ptsk); } } return false; } -// Call with sched lock, but only from poller context. +// Call with sched lock, but only as poller. static void poller_done(struct pn_proactor_t* p, tslot_t *ts) { // Create a list of available threads to put to work. // ts is the poller thread @@ -2609,8 +2611,8 @@ static void poller_done(struct pn_proactor_t* p, tslot_t *ts) { if (max_resumes) { resume_list2 = (tslot_t **) alloca(max_resumes * sizeof(tslot_t *)); for (int i = 0; i < p->n_warm_runnables ; i++) { - pcontext_t *ctx = p->warm_runnables[i]; - tslot_t *tsp = ctx->runner; + task_t *tsk = p->warm_runnables[i]; + tslot_t *tsp = tsk->runner; if (tsp->state == SUSPENDED) { resume_list2[resume_list_count++] = tsp; LL_REMOVE(p, suspend_list, tsp); @@ -2620,11 +2622,11 @@ static void poller_done(struct pn_proactor_t* p, tslot_t *ts) { } int can_use = p->suspend_list_count; - if (!ts->context) + if (!ts->task) can_use++; - // Run as many unpaired runnable contexts as possible and allow for a new poller. + // Run as many unpaired runnable tasks as possible and allow for a new poller. int new_runners = pn_min(p->n_runnables + 1, can_use); - if (!ts->context) + if (!ts->task) new_runners--; // poller available and does not need resume // Rare corner case on startup. New inbound threads can make the suspend_list too big for resume list. 
@@ -2678,7 +2680,7 @@ static inline void check_earmark_override(pn_proactor_t *p, tslot_t *ts) { void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { pconnection_t *pc = batch_pconnection(batch); if (pc) { - tslot_t *ts = pc->context.runner; + tslot_t *ts = pc->task.runner; pconnection_done(pc); // pc possibly freed/invalid check_earmark_override(p, ts); @@ -2686,7 +2688,7 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { } pn_listener_t *l = batch_listener(batch); if (l) { - tslot_t *ts = l->context.runner; + tslot_t *ts = l->task.runner; listener_done(l); // l possibly freed/invalid check_earmark_override(p, ts); @@ -2694,7 +2696,7 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { } praw_connection_t *rc = pni_batch_raw_connection(batch); if (rc) { - tslot_t *ts = pni_raw_connection_context(rc)->runner; + tslot_t *ts = pni_raw_connection_task(rc)->runner; pni_raw_connection_done(rc); // rc possibly freed/invalid check_earmark_override(p, ts); @@ -2703,27 +2705,27 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { pn_proactor_t *bp = batch_proactor(batch); if (bp == p) { bool notify = false; - lock(&p->context.mutex); - p->context.working = false; + lock(&p->task.mutex); + p->task.working = false; if (p->timeout_processed) { p->timeout_processed = false; - if (wake_if_inactive(p)) + if (schedule_if_inactive(p)) notify = true; } proactor_update_batch(p); if (proactor_has_event(p)) - if (wake(&p->context)) + if (schedule(&p->task)) notify = true; - unlock(&p->context.mutex); + unlock(&p->task.mutex); lock(&p->sched_mutex); - tslot_t *ts = p->context.runner; + tslot_t *ts = p->task.runner; if (unassign_thread(ts, UNUSED)) notify = true; unlock(&p->sched_mutex); if (notify) - wake_notify(&p->context); + notify_poller(&p->task); check_earmark_override(p, ts); return; } @@ -2739,84 +2741,84 @@ void pn_proactor_interrupt(pn_proactor_t *p) { void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t 
t) { bool notify = false; - lock(&p->context.mutex); + lock(&p->task.mutex); p->timeout_set = true; if (t == 0) { pni_timer_set(p->timer, 0); p->need_timeout = true; - notify = wake(&p->context); + notify = schedule(&p->task); } else { pni_timer_set(p->timer, t + pn_proactor_now_64()); } - unlock(&p->context.mutex); - if (notify) wake_notify(&p->context); + unlock(&p->task.mutex); + if (notify) notify_poller(&p->task); } void pn_proactor_cancel_timeout(pn_proactor_t *p) { - lock(&p->context.mutex); + lock(&p->task.mutex); p->timeout_set = false; p->need_timeout = false; pni_timer_set(p->timer, 0); - bool notify = wake_if_inactive(p); - unlock(&p->context.mutex); - if (notify) wake_notify(&p->context); + bool notify = schedule_if_inactive(p); + unlock(&p->task.mutex); + if (notify) notify_poller(&p->task); } void pni_proactor_timeout(pn_proactor_t *p) { bool notify = false; - lock(&p->context.mutex); - if (!p->context.closing) { + lock(&p->task.mutex); + if (!p->task.closing) { p->need_timeout = true; - notify = wake(&p->context); + notify = schedule(&p->task); } - unlock(&p->context.mutex); - if (notify) wake_notify(&p->context); + unlock(&p->task.mutex); + if (notify) notify_poller(&p->task); } pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { pconnection_t *pc = get_pconnection(c); - return pc ? pc->context.proactor : NULL; + return pc ? pc->task.proactor : NULL; } void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { bool notify = false; - lock(&p->context.mutex); - // Move the whole contexts list into a disconnecting state - pcontext_t *disconnecting_pcontexts = p->contexts; - p->contexts = NULL; - // First pass: mark each pcontext as disconnecting and update global pending count. - pcontext_t *ctx = disconnecting_pcontexts; - while (ctx) { - ctx->disconnecting = true; - ctx->disconnect_ops = 2; // Second pass below and proactor_remove(), in any order. 
+ lock(&p->task.mutex); + // Move the whole tasks list into a disconnecting state + task_t *disconnecting_tasks = p->tasks; + p->tasks = NULL; + // First pass: mark each task as disconnecting and update global pending count. + task_t *tsk = disconnecting_tasks; + while (tsk) { + tsk->disconnecting = true; + tsk->disconnect_ops = 2; // Second pass below and proactor_remove(), in any order. p->disconnects_pending++; - ctx = ctx->next; - p->context_count--; + tsk = tsk->next; + p->task_count--; } - notify = wake_if_inactive(p); - unlock(&p->context.mutex); - if (!disconnecting_pcontexts) { - if (notify) wake_notify(&p->context); + notify = schedule_if_inactive(p); + unlock(&p->task.mutex); + if (!disconnecting_tasks) { + if (notify) notify_poller(&p->task); return; } - // Second pass: different locking, close the pcontexts, free them if !disconnect_ops - pcontext_t *next = disconnecting_pcontexts; + // Second pass: different locking, close the tasks, free them if !disconnect_ops + task_t *next = disconnecting_tasks; while (next) { - ctx = next; - next = ctx->next; /* Save next pointer in case we free ctx */ + tsk = next; + next = tsk->next; /* Save next pointer in case we free tsk */ bool do_free = false; - bool ctx_notify = false; - pmutex *ctx_mutex = NULL; + bool tsk_notify = false; + pmutex *tsk_mutex = NULL; // TODO: Need to extend this for raw connections too - pconnection_t *pc = pcontext_pconnection(ctx); + pconnection_t *pc = task_pconnection(tsk); if (pc) { - ctx_mutex = &pc->context.mutex; - lock(ctx_mutex); - if (!ctx->closing) { - ctx_notify = true; - if (ctx->working) { + tsk_mutex = &pc->task.mutex; + lock(tsk_mutex); + if (!tsk->closing) { + tsk_notify = true; + if (tsk->working) { // Must defer pc->queued_disconnect = true; if (cond) { @@ -2826,7 +2828,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { } } else { - // No conflicting working context. + // No conflicting working task. 
if (cond) { pn_condition_copy(pn_transport_condition(pc->driver.transport), cond); } @@ -2834,12 +2836,12 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { } } } else { - pn_listener_t *l = pcontext_listener(ctx); + pn_listener_t *l = task_listener(tsk); assert(l); - ctx_mutex = &l->context.mutex; - lock(ctx_mutex); - if (!ctx->closing) { - ctx_notify = true; + tsk_mutex = &l->task.mutex; + lock(tsk_mutex); + if (!tsk->closing) { + tsk_notify = true; if (cond) { pn_condition_copy(pn_listener_condition(l), cond); } @@ -2847,29 +2849,29 @@ void pn_proactor_disconnect(pn_proactor_t *p, pn_condition_t *cond) { } } - lock(&p->context.mutex); - if (--ctx->disconnect_ops == 0) { + lock(&p->task.mutex); + if (--tsk->disconnect_ops == 0) { do_free = true; - ctx_notify = false; - notify = wake_if_inactive(p); + tsk_notify = false; + notify = schedule_if_inactive(p); } else { - // If initiating the close, wake the pcontext to do the free. - if (ctx_notify) - ctx_notify = wake(ctx); - if (ctx_notify) - wake_notify(ctx); + // If initiating the close, schedule the task to do the free. 
+ if (tsk_notify) + tsk_notify = schedule(tsk); + if (tsk_notify) + notify_poller(tsk); } - unlock(&p->context.mutex); - unlock(ctx_mutex); + unlock(&p->task.mutex); + unlock(tsk_mutex); - // Unsafe to touch ctx after lock release, except if we are the designated final_free + // Unsafe to touch tsk after lock release, except if we are the designated final_free if (do_free) { if (pc) pconnection_final_free(pc); - else listener_final_free(pcontext_listener(ctx)); + else listener_final_free(task_listener(tsk)); } } if (notify) - wake_notify(&p->context); + notify_poller(&p->task); } const pn_netaddr_t *pn_transport_local_addr(pn_transport_t *t) { diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index c10afe8..0195c57 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -40,7 +40,7 @@ /* epoll specific raw connection struct */ struct praw_connection_t { - pcontext_t context; + task_t task; struct pn_raw_connection_t raw_connection; psocket_t psocket; struct pn_netaddr_t local, remote; /* Actual addresses */ @@ -50,7 +50,7 @@ struct praw_connection_t { struct addrinfo *ai; /* Current connect address */ bool connected; bool disconnected; - bool waking; // TODO: This is actually protected by context.mutex so should be moved into context (pconnection too) + bool waking; // TODO: This is actually protected by task.mutex so should be moved into task (pconnection too) }; static void psocket_error(praw_connection_t *rc, int err, const char* msg) { @@ -85,7 +85,7 @@ static void praw_connection_connected_lh(praw_connection_t *prc) { /* multi-address connections may call pconnection_start multiple times with diffferent FDs */ static void praw_connection_start(praw_connection_t *prc, int fd) { - int efd = prc->context.proactor->epollfd; + int efd = prc->task.proactor->epollfd; /* Get the local socket name now, get the peer name in pconnection_connected */ socklen_t len = sizeof(prc->local.ss); 
@@ -95,7 +95,7 @@ static void praw_connection_start(praw_connection_t *prc, int fd) { if (ee->polling) { /* This is not the first attempt, stop polling and close the old FD */ int fd = ee->fd; /* Save fd, it will be set to -1 by stop_polling */ stop_polling(ee, efd); - pclosefd(prc->context.proactor, fd); + pclosefd(prc->task.proactor, fd); } ee->fd = fd; ee->wanted = EPOLLIN | EPOLLOUT; @@ -139,7 +139,7 @@ static void praw_connection_maybe_connect_lh(praw_connection_t *prc) { static pn_event_t * pni_raw_batch_next(pn_event_batch_t *batch); static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_raw_connection_t *rc) { - pcontext_init(&prc->context, RAW_CONNECTION, p); + task_init(&prc->task, RAW_CONNECTION, p); psocket_init(&prc->psocket, RAW_CONNECTION_IO); prc->connected = false; @@ -152,15 +152,15 @@ static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_ra static void praw_connection_cleanup(praw_connection_t *prc) { int fd = prc->psocket.epoll_io.fd; - stop_polling(&prc->psocket.epoll_io, prc->context.proactor->epollfd); + stop_polling(&prc->psocket.epoll_io, prc->task.proactor->epollfd); if (fd != -1) - pclosefd(prc->context.proactor, fd); + pclosefd(prc->task.proactor, fd); - lock(&prc->context.mutex); - bool can_free = proactor_remove(&prc->context); - unlock(&prc->context.mutex); + lock(&prc->task.mutex); + bool can_free = proactor_remove(&prc->task); + unlock(&prc->task.mutex); if (can_free) { - pcontext_finalize(&prc->context); + task_finalize(&prc->task); free(prc); } // else proactor_disconnect logic owns prc and its final free @@ -181,8 +181,8 @@ void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const ch praw_connection_init(prc, p, rc); // TODO: check case of proactor shutting down - lock(&prc->context.mutex); - proactor_add(&prc->context); + lock(&prc->task.mutex); + proactor_add(&prc->task); bool notify = false; bool notify_proactor = false; @@ -197,20 +197,20 @@ void 
pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const ch if (!gai_error) { prc->ai = prc->addrinfo; praw_connection_maybe_connect_lh(prc); /* Start connection attempts */ - if (prc->disconnected) notify = wake(&prc->context); + if (prc->disconnected) notify = schedule(&prc->task); } else { psocket_gai_error(prc, gai_error, "connect to ", addr); prc->disconnected = true; - notify = wake(&prc->context); - lock(&p->context.mutex); - notify_proactor = wake_if_inactive(p); - unlock(&p->context.mutex); + notify = schedule(&prc->task); + lock(&p->task.mutex); + notify_proactor = schedule_if_inactive(p); + unlock(&p->task.mutex); } /* We need to issue INACTIVE on immediate failure */ - unlock(&prc->context.mutex); - if (notify) wake_notify(&prc->context); - if (notify_proactor) wake_notify(&p->context); + unlock(&prc->task.mutex); + if (notify) notify_poller(&prc->task); + if (notify_proactor) notify_poller(&p->task); } void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) { @@ -222,8 +222,8 @@ void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) { int err = 0; int fd = -1; bool notify = false; - lock(&l->context.mutex); - if (l->context.closing) + lock(&l->task.mutex); + if (l->task.closing) err = EBADF; else { accepted_t *a = listener_accepted_next(l); @@ -234,9 +234,9 @@ void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) { else err = EWOULDBLOCK; } - proactor_add(&prc->context); + proactor_add(&prc->task); - lock(&prc->context.mutex); + lock(&prc->task.mutex); if (fd >= 0) { configure_socket(fd); praw_connection_start(prc, fd); @@ -245,12 +245,12 @@ void pn_listener_raw_accept(pn_listener_t *l, pn_raw_connection_t *rc) { psocket_error(prc, err, "pn_listener_accept"); } - if (!l->context.working && listener_has_event(l)) { - notify = wake(&l->context); + if (!l->task.working && listener_has_event(l)) { + notify = schedule(&l->task); } - unlock(&prc->context.mutex); - unlock(&l->context.mutex); - if 
(notify) wake_notify(&l->context); + unlock(&prc->task.mutex); + unlock(&l->task.mutex); + if (notify) notify_poller(&l->task); } const pn_netaddr_t *pn_raw_connection_local_addr(pn_raw_connection_t *rc) { @@ -268,20 +268,20 @@ const pn_netaddr_t *pn_raw_connection_remote_addr(pn_raw_connection_t *rc) { void pn_raw_connection_wake(pn_raw_connection_t *rc) { bool notify = false; praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection); - lock(&prc->context.mutex); - if (!prc->context.closing) { + lock(&prc->task.mutex); + if (!prc->task.closing) { prc->waking = true; - notify = wake(&prc->context); + notify = schedule(&prc->task); } - unlock(&prc->context.mutex); - if (notify) wake_notify(&prc->context); + unlock(&prc->task.mutex); + if (notify) notify_poller(&prc->task); } void pn_raw_connection_close(pn_raw_connection_t *rc) { praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection); - lock(&prc->context.mutex); - prc->context.closing = true; - unlock(&prc->context.mutex); + lock(&prc->task.mutex); + prc->task.closing = true; + unlock(&prc->task.mutex); pni_raw_close(rc); } @@ -291,17 +291,17 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { // Check wake status every event processed bool waking = false; - lock(&rc->context.mutex); + lock(&rc->task.mutex); waking = rc->waking; rc->waking = false; - unlock(&rc->context.mutex); + unlock(&rc->task.mutex); if (waking) pni_raw_wake(raw); return pni_raw_event_next(raw); } -pcontext_t *pni_psocket_raw_context(psocket_t* ps) { - return &containerof(ps, praw_connection_t, psocket)->context; +task_t *pni_psocket_raw_task(psocket_t* ps) { + return &containerof(ps, praw_connection_t, psocket)->task; } praw_connection_t *pni_batch_raw_connection(pn_event_batch_t *batch) { @@ -309,8 +309,8 @@ praw_connection_t *pni_batch_raw_connection(pn_event_batch_t *batch) { containerof(batch, praw_connection_t, batch) : NULL; } -pcontext_t *pni_raw_connection_context(praw_connection_t 
*rc) { - return &rc->context; +task_t *pni_raw_connection_task(praw_connection_t *rc) { + return &rc->task; } static long snd(int fd, const void* b, size_t s) { @@ -325,8 +325,8 @@ static void set_error(pn_raw_connection_t *conn, const char *msg, int err) { psocket_error(containerof(conn, praw_connection_t, raw_connection), err, msg); } -pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake) { - praw_connection_t *rc = containerof(c, praw_connection_t, context); +pn_event_batch_t *pni_raw_connection_process(task_t *t, bool sched_ready) { + praw_connection_t *rc = containerof(t, praw_connection_t, task); int events = rc->psocket.sched_io_events; int fd = rc->psocket.epoll_io.fd; if (!rc->connected) { @@ -344,14 +344,14 @@ pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake) { } bool wake = false; - lock(&c->mutex); - c->working = true; - if (sched_wake) { - wake_done(c); + lock(&t->mutex); + t->working = true; + if (sched_ready) { + schedule_done(t); wake = rc->waking; rc->waking = false; } - unlock(&c->mutex); + unlock(&t->mutex); if (wake) pni_raw_wake(&rc->raw_connection); if (events & EPOLLIN) pni_raw_read(&rc->raw_connection, fd, rcv, set_error); @@ -361,17 +361,17 @@ pn_event_batch_t *pni_raw_connection_process(pcontext_t *c, bool sched_wake) { void pni_raw_connection_done(praw_connection_t *rc) { bool self_notify = false; - bool wake_pending = false; - lock(&rc->context.mutex); - pn_proactor_t *p = rc->context.proactor; - tslot_t *ts = rc->context.runner; - rc->context.working = false; - self_notify = rc->waking && wake(&rc->context); - // There could be a scheduler wake pending even if we've got no raw connection + bool ready = false; + lock(&rc->task.mutex); + pn_proactor_t *p = rc->task.proactor; + tslot_t *ts = rc->task.runner; + rc->task.working = false; + self_notify = rc->waking && schedule(&rc->task); + // The task may be in the ready state even if we've got no raw connection // wakes outstanding because 
we dealt with it already in pni_raw_batch_next() - wake_pending = rc->context.wake_pending; - unlock(&rc->context.mutex); - if (self_notify) wake_notify(&rc->context); + ready = rc->task.ready; + unlock(&rc->task.mutex); + if (self_notify) notify_poller(&rc->task); pn_raw_connection_t *raw = &rc->raw_connection; int wanted = @@ -381,7 +381,7 @@ void pni_raw_connection_done(praw_connection_t *rc) { rc->psocket.epoll_io.wanted = wanted; rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error } else { - bool finished_disconnect = raw->rclosed && raw->wclosed && !wake_pending && !raw->disconnectpending; + bool finished_disconnect = raw->rclosed && raw->wclosed && !ready && !raw->disconnectpending; if (finished_disconnect) { // If we're closed and we've sent the disconnect then close pni_raw_finalize(raw); @@ -392,5 +392,5 @@ void pni_raw_connection_done(praw_connection_t *rc) { lock(&p->sched_mutex); bool notify = unassign_thread(ts, UNUSED); unlock(&p->sched_mutex); - if (notify) wake_notify(&p->context); + if (notify) notify_poller(&p->task); } diff --git a/c/src/proactor/epoll_timer.c b/c/src/proactor/epoll_timer.c index 6d288e1..a882128 100644 --- a/c/src/proactor/epoll_timer.c +++ b/c/src/proactor/epoll_timer.c @@ -50,7 +50,7 @@ * second AMQP open frame results in a shorter periodic transport timer than the first open frame. In this case, the * existing timer_deadline is immediately orphaned and a new one created for the rest of the connection's life. * - * Lock ordering: tm->context_mutex --> tm->deletion_mutex. + * Lock ordering: tm->task_mutex --> tm->deletion_mutex. */ static void timerfd_set(int fd, uint64_t t_millis) { @@ -114,7 +114,7 @@ struct pni_timer_t { pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c) { timer_deadline_t *td = NULL; pni_timer_t *timer = NULL; - assert(c || !tm->context.proactor->timer); // Proactor timer. Can only be one. + assert(c || !tm->task.proactor->timer); // Proactor timer. Can only be one. 
timer = (pni_timer_t *) malloc(sizeof(pni_timer_t)); if (!timer) return NULL; if (c) { @@ -126,14 +126,14 @@ pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c) { } } - lock(&tm->context.mutex); + lock(&tm->task.mutex); timer->connection = c; timer->manager = tm; timer->timer_deadline = td; timer->deadline = 0; if (c) td->timer = timer; - unlock(&tm->context.mutex); + unlock(&tm->task.mutex); return timer; } @@ -143,7 +143,7 @@ void pni_timer_free(pni_timer_t *timer) { bool can_free_td = false; if (td) pni_timer_set(timer, 0); pni_timer_manager_t *tm = timer->manager; - lock(&tm->context.mutex); + lock(&tm->task.mutex); lock(&tm->deletion_mutex); if (td) { if (td->list_deadline) @@ -152,7 +152,7 @@ void pni_timer_free(pni_timer_t *timer) { can_free_td = true; } unlock(&tm->deletion_mutex); - unlock(&tm->context.mutex); + unlock(&tm->task.mutex); if (can_free_td) { pn_free(td); } @@ -168,7 +168,7 @@ bool pni_timer_manager_init(pni_timer_manager_t *tm) { tm->timers_heap = NULL; tm->proactor_timer = NULL; pn_proactor_t *p = containerof(tm, pn_proactor_t, timer_manager); - pcontext_init(&tm->context, TIMER_MANAGER, p); + task_init(&tm->task, TIMER_MANAGER, p); pmutex_init(&tm->deletion_mutex); // PN_VOID turns off ref counting for the elements in the list. @@ -190,8 +190,8 @@ bool pni_timer_manager_init(pni_timer_manager_t *tm) { // Only call from proactor's destructor, when it is single threaded and scheduling has stopped. 
void pni_timer_manager_finalize(pni_timer_manager_t *tm) { - lock(&tm->context.mutex); - unlock(&tm->context.mutex); // Memory barrier + lock(&tm->task.mutex); + unlock(&tm->task.mutex); // Memory barrier if (tm->epoll_timer.fd >= 0) close(tm->epoll_timer.fd); pni_timer_free(tm->proactor_timer); if (tm->timers_heap) { @@ -205,14 +205,14 @@ void pni_timer_manager_finalize(pni_timer_manager_t *tm) { pn_free(tm->timers_heap); } pmutex_finalize(&tm->deletion_mutex); - pcontext_finalize(&tm->context); + task_finalize(&tm->task); } -// Call with timer_manager lock held. Return true if wake_notify required. +// Call with timer_manager lock held. Return true if notify_poller required. static bool adjust_deadline(pni_timer_manager_t *tm) { - // Make sure the timer_manager context will get a timeout in time for the earliest connection timeout. - if (tm->context.working) - return false; // timer_manager context will adjust the timer when it stops working + // Make sure the timer_manager task will get a timeout in time for the earliest connection timeout. + if (tm->task.working) + return false; // timer_manager task will adjust the timer when it stops working bool notify = false; uint64_t new_deadline = tm->proactor_timer->deadline; if (pn_list_size(tm->timers_heap)) { @@ -227,7 +227,7 @@ static bool adjust_deadline(pni_timer_manager_t *tm) { uint64_t now = pn_proactor_now_64(); if (new_deadline <= now) { // no need for a timer update. Wake the timer_manager. - notify = wake(&tm->context); + notify = schedule(&tm->task); } else { timerfd_set(tm->epoll_timer.fd, new_deadline - now); @@ -238,16 +238,16 @@ static bool adjust_deadline(pni_timer_manager_t *tm) { return notify; } -// Call without context lock or timer_manager lock. +// Call without task lock or timer_manager lock. // Calls for connection timers are generated in the proactor and serialized per connection. // Calls for the proactor timer can come from arbitrary user threads. 
void pni_timer_set(pni_timer_t *timer, uint64_t deadline) { pni_timer_manager_t *tm = timer->manager; bool notify = false; - lock(&tm->context.mutex); + lock(&tm->task.mutex); if (deadline == timer->deadline) { - unlock(&tm->context.mutex); + unlock(&tm->task.mutex); return; // No change. } @@ -262,7 +262,7 @@ void pni_timer_set(pni_timer_t *timer, uint64_t deadline) { if (td->resequenced) EPOLL_FATAL("idle timeout sequencing error", 0); // else { - // replace drops the lock for malloc. Safe because there can be no competing call to + // replace drops the lock for malloc. Safe because there can be no competing call to // the timer set function by the same pconnection from another thread. td = replace_timer_deadline(tm, timer); } @@ -279,30 +279,30 @@ void pni_timer_set(pni_timer_t *timer, uint64_t deadline) { // Skip a cancelled timer (deadline == 0) since it doesn't change the timerfd deadline. if (deadline) notify = adjust_deadline(tm); - unlock(&tm->context.mutex); + unlock(&tm->task.mutex); if (notify) - wake_notify(&tm->context); + notify_poller(&tm->task); } -pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool wake) { +pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool sched_ready) { uint64_t now = pn_proactor_now_64(); - lock(&tm->context.mutex); - tm->context.working = true; + lock(&tm->task.mutex); + tm->task.working = true; if (timeout) tm->timerfd_deadline = 0; - if (wake) - wake_done(&tm->context); + if (sched_ready) + schedule_done(&tm->task); // First check for proactor timer expiry. 
uint64_t deadline = tm->proactor_timer->deadline; if (deadline && deadline <= now) { tm->proactor_timer->deadline = 0; - unlock(&tm->context.mutex); - pni_proactor_timeout(tm->context.proactor); - lock(&tm->context.mutex); - // If lower latency desired for the proactor timer, we could convert to the proactor context (if not working) and return - // here with the event batch, and wake the timer manager context to process the connection timers. + unlock(&tm->task.mutex); + pni_proactor_timeout(tm->task.proactor); + lock(&tm->task.mutex); + // If lower latency desired for the proactor timer, we could convert to the proactor task (if not working) and return + // here with the event batch, and schedule the timer manager task to process the connection timers. } // Next, find all expired connection timers at front of the ordered heap. @@ -321,20 +321,20 @@ pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeou // timer deadline extended -> minpush back on list to new spot // timer freed -> free the associated timer_deadline popped off the list if (!td->timer) { - unlock(&tm->context.mutex); + unlock(&tm->task.mutex); pn_free(td); - lock(&tm->context.mutex); + lock(&tm->task.mutex); } else { uint64_t deadline = td->timer->deadline; if (deadline) { if (deadline <= now) { td->timer->deadline = 0; pconnection_t *pc = td->timer->connection; - lock(&tm->deletion_mutex); // Prevent connection from deleting itself when tm->context.mutex dropped. - unlock(&tm->context.mutex); + lock(&tm->deletion_mutex); // Prevent connection from deleting itself when tm->task.mutex dropped. 
+ unlock(&tm->task.mutex); pni_pconnection_timeout(pc); unlock(&tm->deletion_mutex); - lock(&tm->context.mutex); + lock(&tm->task.mutex); } else { td->list_deadline = deadline; pn_list_minpush(tm->timers_heap, td); @@ -346,20 +346,20 @@ pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeou if (timeout) { // TODO: query whether perf gain by doing these system calls outside the lock, perhaps with additional set_reset_mutex. timerfd_drain(tm->epoll_timer.fd); - rearm_polling(&tm->epoll_timer, tm->context.proactor->epollfd); + rearm_polling(&tm->epoll_timer, tm->task.proactor->epollfd); } - tm->context.working = false; // must be false for adjust_deadline to do adjustment + tm->task.working = false; // must be false for adjust_deadline to do adjustment bool notify = adjust_deadline(tm); - unlock(&tm->context.mutex); + unlock(&tm->task.mutex); if (notify) - wake_notify(&tm->context); + notify_poller(&tm->task); // The timer_manager never has events to batch. return NULL; - // TODO: perhaps become context of one of the timed out timers (if otherwise idle) and process() that context. + // TODO: perhaps become task of one of the timed out timers (if otherwise idle) and process() that task. } -// Call with timer_manager lock held. +// Call with timer_manager lock held. // There can be no competing call to this and timer_set() from the same connection. static timer_deadline_t *replace_timer_deadline(pni_timer_manager_t *tm, pni_timer_t *timer) { assert(timer->connection); @@ -368,12 +368,12 @@ static timer_deadline_t *replace_timer_deadline(pni_timer_manager_t *tm, pni_tim // Mark old struct for deletion. No parent timer. old_td->timer = NULL; - unlock(&tm->context.mutex); + unlock(&tm->task.mutex); // Create replacement timer for life of connection. 
timer_deadline_t *new_td = pni_timer_deadline(); if (!new_td) EPOLL_FATAL("replacement timer deadline allocation", errno); - lock(&tm->context.mutex); + lock(&tm->task.mutex); new_td->list_deadline = 0; new_td->timer = timer; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org