This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 4fc4c7869f470c688777c4f040b0fc31219f4f42
Author: Cliff Jansen <cliffjan...@apache.org>
AuthorDate: Sun Jan 24 11:18:30 2021 -0800

    PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner.  This closes #290
---
 c/src/proactor/epoll-internal.h       | 26 ++++++++++++++++++++++++--
 c/src/proactor/epoll.c                | 11 +++++------
 c/src/proactor/epoll_raw_connection.c | 19 ++++++++++---------
 3 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 0d63f90..e8c9f0a 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -86,8 +86,9 @@ typedef struct task_t {
   pn_proactor_t *proactor;  /* Immutable */
   task_type_t type;
   bool working;
-  bool on_ready_list;
   bool ready;                // ready to run and on ready list.  Poller notified by eventfd.
+  bool waking;
+  bool on_ready_list;        // todo: protected by eventfd_mutex or sched mutex?  needed?
   struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex
   bool closing;
   // Next 4 are protected by the proactor mutex
@@ -223,7 +224,6 @@ typedef struct pconnection_t {
   pni_timer_t *timer;
   const char *host, *port;
   uint32_t new_events;
-  int wake_count;   // TODO: protected by task.mutex so should be moved in there (also really bool)
   bool server;                /* accept, not connect */
   bool tick_pending;
   bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
@@ -382,6 +382,28 @@ pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeou
 void pni_pconnection_timeout(pconnection_t *pc);
 void pni_proactor_timeout(pn_proactor_t *p);
 
+// Generic wake primitives for a task.
+
+// Call with task lock held.  Must call notify_poller() if returns true.
+static inline bool pni_task_wake(task_t *tsk) {
+  if (!tsk->waking) {
+    tsk->waking = true;
+    return schedule(tsk);
+  }
+  return false;
+}
+
+// Call with task lock held.
+static inline bool pni_task_wake_pending(task_t *tsk) {
+  return tsk->waking;
+}
+
+// Call with task lock held and only from the running task.
+static inline void pni_task_wake_done(task_t *tsk) {
+  tsk->waking = false;
+}
+
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index c4f028d..7467683 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -49,6 +49,7 @@
 
  TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
  transitions from "private to the scheduler" to "visible to the task".
+ TODO: document task.working duration can be long: from xxx_process() to xxx_done() or null batch.
  */
 
 
@@ -742,7 +743,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con
   psocket_init(&pc->psocket, PCONNECTION_IO);
   pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port);
   pc->new_events = 0;
-  pc->wake_count = 0;
   pc->tick_pending = false;
   pc->queued_disconnect = false;
   pc->disconnect_condition = NULL;
@@ -980,7 +980,7 @@ static bool pconnection_sched_sync(pconnection_t *pc) {
 
 /* Call with task lock and having done a write_flush() to "know" the value of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  if (pc->new_events || pc->wake_count || pc->tick_pending || 
pc->queued_disconnect)
+  if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending 
|| pc->queued_disconnect)
     return true;
   if (!pc->read_blocked && !pconnection_rclosed(pc))
     return true;
@@ -1153,9 +1153,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
     return &pc->batch;
   }
   bool closed = pconnection_rclosed(pc) && pconnection_wclosed(pc);
-  if (pc->wake_count) {
+  if (pni_task_wake_pending(&pc->task)) {
     waking = !closed;
-    pc->wake_count = 0;
+    pni_task_wake_done(&pc->task);
   }
   if (pc->tick_pending) {
     pc->tick_pending = false;
@@ -1441,8 +1441,7 @@ void pn_connection_wake(pn_connection_t* c) {
   if (pc) {
     lock(&pc->task.mutex);
     if (!pc->task.closing) {
-      pc->wake_count++;
-      notify = schedule(&pc->task);
+      notify = pni_task_wake(&pc->task);
     }
     unlock(&pc->task.mutex);
   }
diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c
index 3fd2b36..8722ff0 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -50,7 +50,6 @@ struct praw_connection_t {
   struct addrinfo *ai;               /* Current connect address */
   bool connected;
   bool disconnected;
-  bool waking; // TODO: This is actually protected by task.mutex so should be moved into task (pconnection too)
 };
 
 static void psocket_error(praw_connection_t *rc, int err, const char* msg) {
@@ -144,7 +143,6 @@ static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_ra
 
   prc->connected = false;
   prc->disconnected = false;
-  prc->waking = false;
   prc->batch.next_event = pni_raw_batch_next;
 
   pmutex_init(&prc->rearm_mutex);
@@ -268,8 +266,7 @@ void pn_raw_connection_wake(pn_raw_connection_t *rc) {
   praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection);
   lock(&prc->task.mutex);
   if (!prc->task.closing) {
-    prc->waking = true;
-    notify = schedule(&prc->task);
+    notify = pni_task_wake(&prc->task);
   }
   unlock(&prc->task.mutex);
   if (notify) notify_poller(prc->task.proactor);
@@ -290,8 +287,10 @@ static pn_event_t *pni_raw_batch_next(pn_event_batch_t *batch) {
   // Check wake status every event processed
   bool waking = false;
   lock(&rc->task.mutex);
-  waking = rc->waking;
-  rc->waking = false;
+  if (pni_task_wake_pending(&rc->task)) {
+    waking = true;
+    pni_task_wake_done(&rc->task);
+  }
   unlock(&rc->task.mutex);
   if (waking) pni_raw_wake(raw);
 
@@ -346,8 +345,10 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, bool sched_ready) {
   t->working = true;
   if (sched_ready) {
     schedule_done(t);
-    wake = rc->waking;
-    rc->waking = false;
+    if (pni_task_wake_pending(&rc->task)) {
+      wake = true;
+      pni_task_wake_done(&rc->task);
+    }
   }
   unlock(&t->mutex);
 
@@ -364,7 +365,7 @@ void pni_raw_connection_done(praw_connection_t *rc) {
   pn_proactor_t *p = rc->task.proactor;
   tslot_t *ts = rc->task.runner;
   rc->task.working = false;
-  notify = rc->waking && schedule(&rc->task);
+  notify = pni_task_wake_pending(&rc->task) && schedule(&rc->task);
   // The task may be in the ready state even if we've got no raw connection
   // wakes outstanding because we dealt with it already in pni_raw_batch_next()
   ready = rc->task.ready;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to