[qpid-proton] 01/03: PROTON-2326: epoll proactor refactor - "schedule" instead of "wake", "task" instead of "context"
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 38c2dab1083d1cada116360d7219192614467277 Author: Cliff Jansen AuthorDate: Sun Jan 24 10:54:21 2021 -0800 PROTON-2326: epoll proactor refactor - "schedule" instead of "wake", "task" instead of "context" --- c/src/proactor/epoll-internal.h | 99 ++- c/src/proactor/epoll.c| 1184 + c/src/proactor/epoll_raw_connection.c | 126 ++-- c/src/proactor/epoll_timer.c | 88 +-- 4 files changed, 749 insertions(+), 748 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 66fb15e..21226a9 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -57,7 +57,7 @@ typedef pthread_mutex_t pmutex; typedef struct pni_timer_t pni_timer_t; typedef enum { - WAKE, /* see if any work to do in proactor/psocket context */ + EVENT_FD, /* schedule() or pn_proactor_interrupt() */ LISTENER_IO, PCONNECTION_IO, RAW_CONNECTION_IO, @@ -67,7 +67,7 @@ typedef enum { // Data to use with epoll. typedef struct epoll_extended_t { int fd; - epoll_type_t type; // io/timer/wakeup + epoll_type_t type; // io/timer/eventfd uint32_t wanted; // events to poll for bool polling; pmutex barrier_mutex; @@ -79,36 +79,36 @@ typedef enum { LISTENER, RAW_CONNECTION, TIMER_MANAGER -} pcontext_type_t; +} task_type_t; -typedef struct pcontext_t { +typedef struct task_t { pmutex mutex; pn_proactor_t *proactor; /* Immutable */ - pcontext_type_t type; + task_type_t type; bool working; - bool on_wake_list; - bool wake_pending; // unprocessed eventfd wake callback - struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex + bool on_ready_list; + bool ready;// ready to run and on ready list. Poller notified by eventfd. + struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex bool closing; // Next 4 are protected by the proactor mutex - struct pcontext_t* next; /* Protected by proactor.mutex */ - struct pcontext_t* prev; /* Protected by proactor.mutex */ + struct task_t* next; /* Protected by proactor.mutex */ + struct task_t* prev; /* Protected by proactor.mutex */ int disconnect_ops; /* ops remaining before disconnect complete */ bool disconnecting; /* pn_proactor_disconnect */ // Protected by schedule mutex tslot_t *runner __attribute__((aligned(64))); /* designated or running thread */ tslot_t *prev_runner; - bool sched_wake; + bool sched_ready; bool sched_pending; /* If true, one or more unseen epoll or other events to process() */ - bool runnable ; /* in need of scheduling */ -} pcontext_t; + bool runnable ; /* on one of the runnable lists */ +} task_t; typedef enum { NEW, UNUSED, /* pn_proactor_done() called, may never come back */ SUSPENDED, - PROCESSING, /* Hunting for a context */ - BATCHING, /* Doing work on behalf of a context */ + PROCESSING, /* Hunting for a task */ + BATCHING, /* Doing work on behalf of a task */ DELETING, POLLING } tslot_state; @@ -121,8 +121,8 @@ struct tslot_t { bool suspended; volatile bool scheduled; tslot_state state; - pcontext_t *context; - pcontext_t *prev_context; + task_t *task; + task_t *prev_task; bool earmarked; tslot_t *suspend_list_prev; tslot_t *suspend_list_next; @@ -131,7 +131,7 @@ struct tslot_t { }; typedef struct pni_timer_manager_t { - pcontext_t context; + task_t task; epoll_extended_t epoll_timer; pmutex deletion_mutex; pni_timer_t *proactor_timer; @@ -141,12 +141,12 @@ typedef struct pni_timer_manager_t { } pni_timer_manager_t; struct pn_proactor_t { - pcontext_t context; + task_t task; pni_timer_manager_t timer_manager; - epoll_extended_t epoll_wake; + epoll_extended_t epoll_schedule; /* ready list */ epoll_extended_t epoll_interrupt; pn_event_batch_t batch; - pcontext_t *contexts; /* track in-use contexts for PN_PROACTOR_INACTIVE and disconnect */ + task_t *tasks; /* track in-use tasks for PN_PROACTOR_INACTIVE and disconnect */ pni_timer_t *timer; size_t disconnects_pending; /* unfinished proactor disconnects*/ // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch() @@ -155,21 +155,21 @@ struct pn_proactor_t { bool need_timeout; bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */ bool timeout_processed; /* timeout event dispatched in the most recent event batch */ - int context_count; + int task_count; - // wake subsystem + // ready list
[qpid-proton] branch master updated (ea4dbf1 -> 4fc4c78)
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git. from ea4dbf1 PROTON-2327: Fix example build breakage on cmake 2.8.12 new 38c2dab PROTON-2326: epoll proactor refactor - "schedule" instead of "wake", "task" instead of "context" new f6734ed PROTON-2326: epoll proactor refactor - provide proactor as direct argument to notify_poller(), not indirect via task new 4fc4c78 PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner. This closes #290 The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: c/src/proactor/epoll-internal.h | 121 ++-- c/src/proactor/epoll.c| 1192 - c/src/proactor/epoll_raw_connection.c | 142 ++-- c/src/proactor/epoll_timer.c | 88 +-- 4 files changed, 780 insertions(+), 763 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org
[qpid-proton] 02/03: PROTON-2326: epoll proactor refactor - provide proactor as direct argument to notify_poller(), not indirect via task
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit f6734ed6cdab7ca7ff84d208774735a0edad7731 Author: Cliff Jansen AuthorDate: Wed Jan 20 22:33:34 2021 -0800 PROTON-2326: epoll proactor refactor - provide proactor as direct argument to notify_poller(), not indirect via task --- c/src/proactor/epoll-internal.h | 2 +- c/src/proactor/epoll.c| 49 --- c/src/proactor/epoll_raw_connection.c | 19 ++ c/src/proactor/epoll_timer.c | 4 +-- 4 files changed, 34 insertions(+), 40 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 21226a9..0d63f90 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -352,7 +352,7 @@ static void task_finalize(task_t* tsk) { } bool schedule(task_t *tsk); -void notify_poller(task_t *tsk); +void notify_poller(pn_proactor_t *p); void schedule_done(task_t *tsk); void psocket_init(psocket_t* ps, epoll_type_t type); diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index ae0c37b..c4f028d 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -314,8 +314,7 @@ bool schedule(task_t *tsk) { } // part2: unblock epoll_wait(). Make OS call without lock held. -void notify_poller(task_t *tsk) { - pn_proactor_t *p = tsk->proactor; +void notify_poller(pn_proactor_t *p) { if (p->eventfd == -1) return; rearm(p, >epoll_schedule); @@ -706,7 +705,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) { } else notify = schedule(>task); unlock(>task.mutex); -if (notify) notify_poller(>task); +if (notify) notify_poller(p); a = acceptor_list_next(); } } @@ -871,7 +870,7 @@ void pni_pconnection_timeout(pconnection_t *pc) { } unlock(>task.mutex); if (notify) -notify_poller(>task); +notify_poller(pc->task.proactor); } static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { @@ -1018,7 +1017,7 @@ static void pconnection_done(pconnection_t *pc) { notify = unassign_thread(ts, UNUSED); unlock(>sched_mutex); if (notify) -notify_poller(>task); +notify_poller(p); return; } } @@ -1033,7 +1032,7 @@ static void pconnection_done(pconnection_t *pc) { if (unassign_thread(ts, UNUSED)) notify = true; unlock(>sched_mutex); - if (notify) notify_poller(>task); + if (notify) notify_poller(p); return; } @@ -1399,7 +1398,6 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t * pn_connection_open(pc->driver.connection); /* Auto-open */ bool notify = false; - bool notify_proactor = false; if (pc->disconnected) { notify = schedule(>task);/* Error during initialization */ @@ -1414,14 +1412,13 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t * psocket_gai_error(>psocket, gai_error, "connect to "); notify = schedule(>task); lock(>task.mutex); - notify_proactor = schedule_if_inactive(p); + notify |= schedule_if_inactive(p); unlock(>task.mutex); } } /* We need to issue INACTIVE on immediate failure */ unlock(>task.mutex); - if (notify) notify_poller(>task); - if (notify_proactor) notify_poller(>task); + if (notify) notify_poller(pc->task.proactor); } static void pconnection_tick(pconnection_t *pc) { @@ -1449,7 +1446,7 @@ void pn_connection_wake(pn_connection_t* c) { } unlock(>task.mutex); } - if (notify) notify_poller(>task); + if (notify) notify_poller(pc->task.proactor); } void pn_proactor_release_connection(pn_connection_t *c) { @@ -1463,7 +1460,7 @@ void pn_proactor_release_connection(pn_connection_t *c) { notify = schedule(>task); unlock(>task.mutex); } - if (notify) notify_poller(>task); + if (notify) notify_poller(pc->task.proactor); } // @@ -1576,7 +1573,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in } proactor_add(>task); unlock(>task.mutex); - if (notify) notify_poller(>task); + if (notify) notify_poller(p); return; } @@ -1656,7 +1653,7 @@ void pn_listener_close(pn_listener_t* l) { notify = schedule(>task); } unlock(>task.mutex); - if (notify) notify_poller(>task); + if (notify) notify_poller(l->task.proactor); } static void listener_forced_shutdown(pn_listener_t *l) { @@ -1814,7 +1811,7 @@ static void listener_done(pn_listener_t *l) { notify = unassign_thread(ts, UNUSED); unlock(>sched_mutex); if (notify) - notify_poller(>task); + notify_poller(p); return; } else if (n_events || listener_has_event(l)) notify = schedule(>task); @@ -1824,7 +1821,7 @@ static void listener_done(pn_listener_t
[qpid-proton] 03/03: PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner. This closes #290
This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git commit 4fc4c7869f470c688777c4f040b0fc31219f4f42 Author: Cliff Jansen AuthorDate: Sun Jan 24 11:18:30 2021 -0800 PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner. This closes #290 --- c/src/proactor/epoll-internal.h | 26 -- c/src/proactor/epoll.c| 11 +-- c/src/proactor/epoll_raw_connection.c | 19 ++- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 0d63f90..e8c9f0a 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -86,8 +86,9 @@ typedef struct task_t { pn_proactor_t *proactor; /* Immutable */ task_type_t type; bool working; - bool on_ready_list; bool ready;// ready to run and on ready list. Poller notified by eventfd. + bool waking; + bool on_ready_list;// todo: protected by eventfd_mutex or sched mutex? needed? struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex bool closing; // Next 4 are protected by the proactor mutex @@ -223,7 +224,6 @@ typedef struct pconnection_t { pni_timer_t *timer; const char *host, *port; uint32_t new_events; - int wake_count; // TODO: protected by task.mutex so should be moved in there (also really bool) bool server;/* accept, not connect */ bool tick_pending; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ @@ -382,6 +382,28 @@ pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeou void pni_pconnection_timeout(pconnection_t *pc); void pni_proactor_timeout(pn_proactor_t *p); +// Generic wake primitives for a task. + +// Call with task lock held. Must call notify_poller() if returns true. +static inline bool pni_task_wake(task_t *tsk) { + if (!tsk->waking) { +tsk->waking = true; +return schedule(tsk); + } + return false; +} + +// Call with task lock held. +static inline bool pni_task_wake_pending(task_t *tsk) { + return tsk->waking; +} + +// Call with task lock held and only from the running task. +static inline void pni_task_wake_done(task_t *tsk) { + tsk->waking = false; +} + + #ifdef __cplusplus } #endif diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index c4f028d..7467683 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -49,6 +49,7 @@ TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt) transitions from "private to the scheduler" to "visible to the task". + TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch. */ @@ -742,7 +743,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con psocket_init(>psocket, PCONNECTION_IO); pni_parse_addr(addr, pc->addr_buf, addrlen+1, >host, >port); pc->new_events = 0; - pc->wake_count = 0; pc->tick_pending = false; pc->queued_disconnect = false; pc->disconnect_condition = NULL; @@ -980,7 +980,7 @@ static bool pconnection_sched_sync(pconnection_t *pc) { /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */ static inline bool pconnection_work_pending(pconnection_t *pc) { - if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect) + if (pc->new_events || pni_task_wake_pending(>task) || pc->tick_pending || pc->queued_disconnect) return true; if (!pc->read_blocked && !pconnection_rclosed(pc)) return true; @@ -1153,9 +1153,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, return >batch; } bool closed = pconnection_rclosed(pc) && pconnection_wclosed(pc); - if (pc->wake_count) { + if (pni_task_wake_pending(>task)) { waking = !closed; -pc->wake_count = 0; +pni_task_wake_done(>task); } if (pc->tick_pending) { pc->tick_pending = false; @@ -1441,8 +1441,7 @@ void pn_connection_wake(pn_connection_t* c) { if (pc) { lock(>task.mutex); if (!pc->task.closing) { - pc->wake_count++; - notify = schedule(>task); + notify = pni_task_wake(>task); } unlock(>task.mutex); } diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 3fd2b36..8722ff0 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -50,7 +50,6 @@ struct praw_connection_t { struct addrinfo *ai; /* Current connect address */ bool connected; bool disconnected; - bool waking; // TODO: This is actually protected by task.mutex so should be moved into task (pconnection too) }; static void psocket_error(praw_connection_t *rc, int err,