This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 4fc4c7869f470c688777c4f040b0fc31219f4f42 Author: Cliff Jansen <cliffjan...@apache.org> AuthorDate: Sun Jan 24 11:18:30 2021 -0800 PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner. This closes #290 --- c/src/proactor/epoll-internal.h | 26 ++++++++++++++++++++++++-- c/src/proactor/epoll.c | 11 +++++------ c/src/proactor/epoll_raw_connection.c | 19 ++++++++++--------- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 0d63f90..e8c9f0a 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -86,8 +86,9 @@ typedef struct task_t { pn_proactor_t *proactor; /* Immutable */ task_type_t type; bool working; - bool on_ready_list; bool ready; // ready to run and on ready list. Poller notified by eventfd. + bool waking; + bool on_ready_list; // todo: protected by eventfd_mutex or sched mutex? needed? struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex bool closing; // Next 4 are protected by the proactor mutex @@ -223,7 +224,6 @@ typedef struct pconnection_t { pni_timer_t *timer; const char *host, *port; uint32_t new_events; - int wake_count; // TODO: protected by task.mutex so should be moved in there (also really bool) bool server; /* accept, not connect */ bool tick_pending; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ @@ -382,6 +382,28 @@ pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeou void pni_pconnection_timeout(pconnection_t *pc); void pni_proactor_timeout(pn_proactor_t *p); +// Generic wake primitives for a task. + +// Call with task lock held. Must call notify_poller() if returns true. +static inline bool pni_task_wake(task_t *tsk) { + if (!tsk->waking) { + tsk->waking = true; + return schedule(tsk); + } + return false; +} + +// Call with task lock held. +static inline bool pni_task_wake_pending(task_t *tsk) { + return tsk->waking; +} + +// Call with task lock held and only from the running task. +static inline void pni_task_wake_done(task_t *tsk) { + tsk->waking = false; +} + + #ifdef __cplusplus } #endif diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index c4f028d..7467683 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -49,6 +49,7 @@ TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt) transitions from "private to the scheduler" to "visible to the task". + TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch. */ @@ -742,7 +743,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con psocket_init(&pc->psocket, PCONNECTION_IO); pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port); pc->new_events = 0; - pc->wake_count = 0; pc->tick_pending = false; pc->queued_disconnect = false; pc->disconnect_condition = NULL; @@ -980,7 +980,7 @@ static bool pconnection_sched_sync(pconnection_t *pc) { /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */ static inline bool pconnection_work_pending(pconnection_t *pc) { - if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect) + if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending || pc->queued_disconnect) return true; if (!pc->read_blocked && !pconnection_rclosed(pc)) return true; @@ -1153,9 +1153,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, return &pc->batch; } bool closed = pconnection_rclosed(pc) && pconnection_wclosed(pc); - if (pc->wake_count) { + if (pni_task_wake_pending(&pc->task)) { waking = !closed; - pc->wake_count = 0; + pni_task_wake_done(&pc->task); } if (pc->tick_pending) { pc->tick_pending = false; @@ -1441,8 +1441,7 @@ void pn_connection_wake(pn_connection_t* c) { if (pc) { lock(&pc->task.mutex); if (!pc->task.closing) { - pc->wake_count++; - notify = schedule(&pc->task); + notify = pni_task_wake(&pc->task); } unlock(&pc->task.mutex); } diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 3fd2b36..8722ff0 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -50,7 +50,6 @@ struct praw_connection_t { struct addrinfo *ai; /* Current connect address */ bool connected; bool disconnected; - bool waking; // TODO: This is actually protected by task.mutex so should be moved into task (pconnection too) }; static void psocket_error(praw_connection_t *rc, int err, const char* msg) { @@ -144,7 +143,6 @@ static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_ra prc->connected = false; prc->disconnected = false; - prc->waking = false; prc->batch.next_event = pni_raw_batch_next; pmutex_init(&prc->rearm_mutex); @@ -268,8 +266,7 @@ void pn_raw_connection_wake(pn_raw_connection_t *rc) { praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection); lock(&prc->task.mutex); if (!prc->task.closing) { - prc->waking = true; - notify = schedule(&prc->task); + notify = pni_task_wake(&prc->task); } unlock(&prc->task.mutex); if (notify) notify_poller(prc->task.proactor); @@ -290,8 +287,10 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) { // Check wake status every event processed bool waking = false; lock(&rc->task.mutex); - waking = rc->waking; - rc->waking = false; + if (pni_task_wake_pending(&rc->task)) { + waking = true; + pni_task_wake_done(&rc->task); + } unlock(&rc->task.mutex); if (waking) pni_raw_wake(raw); @@ -346,8 +345,10 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, bool sched_ready) { t->working = true; if (sched_ready) { schedule_done(t); - wake = rc->waking; - rc->waking = false; + if (pni_task_wake_pending(&rc->task)) { + wake = true; + pni_task_wake_done(&rc->task); + } } unlock(&t->mutex); @@ -364,7 +365,7 @@ void pni_raw_connection_done(praw_connection_t *rc) { pn_proactor_t *p = rc->task.proactor; tslot_t *ts = rc->task.runner; rc->task.working = false; - notify = rc->waking && schedule(&rc->task); + notify = pni_task_wake_pending(&rc->task) && schedule(&rc->task); // The task may be in the ready state even if we've got no raw connection // wakes outstanding because we dealt with it already in pni_raw_batch_next() ready = rc->task.ready; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org