[qpid-proton] 01/03: PROTON-2326: epoll proactor refactor - "schedule" instead of "wake", "task" instead of "context"

2021-01-24 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 38c2dab1083d1cada116360d7219192614467277
Author: Cliff Jansen 
AuthorDate: Sun Jan 24 10:54:21 2021 -0800

PROTON-2326: epoll proactor refactor - "schedule" instead of "wake", "task" 
instead of "context"
---
 c/src/proactor/epoll-internal.h   |   99 ++-
 c/src/proactor/epoll.c| 1184 +
 c/src/proactor/epoll_raw_connection.c |  126 ++--
 c/src/proactor/epoll_timer.c  |   88 +--
 4 files changed, 749 insertions(+), 748 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 66fb15e..21226a9 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -57,7 +57,7 @@ typedef pthread_mutex_t pmutex;
 typedef struct pni_timer_t pni_timer_t;
 
 typedef enum {
-  WAKE,   /* see if any work to do in proactor/psocket context */
+  EVENT_FD,   /* schedule() or pn_proactor_interrupt() */
   LISTENER_IO,
   PCONNECTION_IO,
   RAW_CONNECTION_IO,
@@ -67,7 +67,7 @@ typedef enum {
 // Data to use with epoll.
 typedef struct epoll_extended_t {
   int fd;
-  epoll_type_t type;   // io/timer/wakeup
+  epoll_type_t type;   // io/timer/eventfd
   uint32_t wanted; // events to poll for
   bool polling;
   pmutex barrier_mutex;
@@ -79,36 +79,36 @@ typedef enum {
   LISTENER,
   RAW_CONNECTION,
   TIMER_MANAGER
-} pcontext_type_t;
+} task_type_t;
 
-typedef struct pcontext_t {
+typedef struct task_t {
   pmutex mutex;
   pn_proactor_t *proactor;  /* Immutable */
-  pcontext_type_t type;
+  task_type_t type;
   bool working;
-  bool on_wake_list;
-  bool wake_pending; // unprocessed eventfd wake callback
-  struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
+  bool on_ready_list;
+  bool ready;// ready to run and on ready list.  Poller 
notified by eventfd.
+  struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex
   bool closing;
   // Next 4 are protected by the proactor mutex
-  struct pcontext_t* next;  /* Protected by proactor.mutex */
-  struct pcontext_t* prev;  /* Protected by proactor.mutex */
+  struct task_t* next;  /* Protected by proactor.mutex */
+  struct task_t* prev;  /* Protected by proactor.mutex */
   int disconnect_ops;   /* ops remaining before disconnect complete */
   bool disconnecting;   /* pn_proactor_disconnect */
   // Protected by schedule mutex
   tslot_t *runner __attribute__((aligned(64)));  /* designated or running 
thread */
   tslot_t *prev_runner;
-  bool sched_wake;
+  bool sched_ready;
   bool sched_pending;   /* If true, one or more unseen epoll or other 
events to process() */
-  bool runnable ;   /* in need of scheduling */
-} pcontext_t;
+  bool runnable ;   /* on one of the runnable lists */
+} task_t;
 
 typedef enum {
   NEW,
   UNUSED,   /* pn_proactor_done() called, may never come 
back */
   SUSPENDED,
-  PROCESSING,   /* Hunting for a context  */
-  BATCHING, /* Doing work on behalf of a context */
+  PROCESSING,   /* Hunting for a task  */
+  BATCHING, /* Doing work on behalf of a task */
   DELETING,
   POLLING
 } tslot_state;
@@ -121,8 +121,8 @@ struct tslot_t {
   bool suspended;
   volatile bool scheduled;
   tslot_state state;
-  pcontext_t *context;
-  pcontext_t *prev_context;
+  task_t *task;
+  task_t *prev_task;
   bool earmarked;
   tslot_t *suspend_list_prev;
   tslot_t *suspend_list_next;
@@ -131,7 +131,7 @@ struct tslot_t {
 };
 
 typedef struct pni_timer_manager_t {
-  pcontext_t context;
+  task_t task;
   epoll_extended_t epoll_timer;
   pmutex deletion_mutex;
   pni_timer_t *proactor_timer;
@@ -141,12 +141,12 @@ typedef struct pni_timer_manager_t {
 } pni_timer_manager_t;
 
 struct pn_proactor_t {
-  pcontext_t context;
+  task_t task;
   pni_timer_manager_t timer_manager;
-  epoll_extended_t epoll_wake;
+  epoll_extended_t epoll_schedule; /* ready list */
   epoll_extended_t epoll_interrupt;
   pn_event_batch_t batch;
-  pcontext_t *contexts; /* track in-use contexts for 
PN_PROACTOR_INACTIVE and disconnect */
+  task_t *tasks; /* track in-use tasks for PN_PROACTOR_INACTIVE and 
disconnect */
   pni_timer_t *timer;
   size_t disconnects_pending;   /* unfinished proactor disconnects*/
   // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next 
update_batch()
@@ -155,21 +155,21 @@ struct pn_proactor_t {
   bool need_timeout;
   bool timeout_set; /* timeout has been set by user and not yet cancelled or 
generated event */
   bool timeout_processed;  /* timeout event dispatched in the most recent 
event batch */
-  int context_count;
+  int task_count;
 
-  // wake subsystem
+  // ready list 

[qpid-proton] branch master updated (ea4dbf1 -> 4fc4c78)

2021-01-24 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.


from ea4dbf1  PROTON-2327: Fix example build breakage on cmake 2.8.12
 new 38c2dab  PROTON-2326: epoll proactor refactor - "schedule" instead of 
"wake", "task" instead of "context"
 new f6734ed  PROTON-2326: epoll proactor refactor - provide proactor as 
direct argument to notify_poller(), not indirect via task
 new 4fc4c78  PROTON-2326: epoll proactor refactor - make all tasks 
wakeable in generic manner.  This closes #290

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 c/src/proactor/epoll-internal.h   |  121 ++--
 c/src/proactor/epoll.c| 1192 -
 c/src/proactor/epoll_raw_connection.c |  142 ++--
 c/src/proactor/epoll_timer.c  |   88 +--
 4 files changed, 780 insertions(+), 763 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org



[qpid-proton] 02/03: PROTON-2326: epoll proactor refactor - provide proactor as direct argument to notify_poller(), not indirect via task

2021-01-24 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit f6734ed6cdab7ca7ff84d208774735a0edad7731
Author: Cliff Jansen 
AuthorDate: Wed Jan 20 22:33:34 2021 -0800

PROTON-2326: epoll proactor refactor - provide proactor as direct argument 
to notify_poller(), not indirect via task
---
 c/src/proactor/epoll-internal.h   |  2 +-
 c/src/proactor/epoll.c| 49 ---
 c/src/proactor/epoll_raw_connection.c | 19 ++
 c/src/proactor/epoll_timer.c  |  4 +--
 4 files changed, 34 insertions(+), 40 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 21226a9..0d63f90 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -352,7 +352,7 @@ static void task_finalize(task_t* tsk) {
 }
 
 bool schedule(task_t *tsk);
-void notify_poller(task_t *tsk);
+void notify_poller(pn_proactor_t *p);
 void schedule_done(task_t *tsk);
 
 void psocket_init(psocket_t* ps, epoll_type_t type);
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index ae0c37b..c4f028d 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -314,8 +314,7 @@ bool schedule(task_t *tsk) {
 }
 
 // part2: unblock epoll_wait().  Make OS call without lock held.
-void notify_poller(task_t *tsk) {
-  pn_proactor_t *p = tsk->proactor;
+void notify_poller(pn_proactor_t *p) {
   if (p->eventfd == -1)
 return;
   rearm(p, &p->epoll_schedule);
@@ -706,7 +705,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
 }
 else notify = schedule(>task);
 unlock(>task.mutex);
-if (notify) notify_poller(>task);
+if (notify) notify_poller(p);
 a = acceptor_list_next();
   }
 }
@@ -871,7 +870,7 @@ void pni_pconnection_timeout(pconnection_t  *pc) {
   }
   unlock(&pc->task.mutex);
   if (notify)
-notify_poller(&pc->task);
+notify_poller(pc->task.proactor);
 }
 
 static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
@@ -1018,7 +1017,7 @@ static void pconnection_done(pconnection_t *pc) {
   notify = unassign_thread(ts, UNUSED);
   unlock(>sched_mutex);
   if (notify)
-notify_poller(>task);
+notify_poller(p);
   return;
 }
   }
@@ -1033,7 +1032,7 @@ static void pconnection_done(pconnection_t *pc) {
   if (unassign_thread(ts, UNUSED))
 notify = true;
   unlock(>sched_mutex);
-  if (notify) notify_poller(>task);
+  if (notify) notify_poller(p);
   return;
 }
 
@@ -1399,7 +1398,6 @@ void pn_proactor_connect2(pn_proactor_t *p, 
pn_connection_t *c, pn_transport_t *
   pn_connection_open(pc->driver.connection); /* Auto-open */
 
   bool notify = false;
-  bool notify_proactor = false;
 
   if (pc->disconnected) {
 notify = schedule(&pc->task);/* Error during initialization */
@@ -1414,14 +1412,13 @@ void pn_proactor_connect2(pn_proactor_t *p, 
pn_connection_t *c, pn_transport_t *
   psocket_gai_error(&pc->psocket, gai_error, "connect to ");
   notify = schedule(&pc->task);
   lock(&p->task.mutex);
-  notify_proactor = schedule_if_inactive(p);
+  notify |= schedule_if_inactive(p);
   unlock(&p->task.mutex);
 }
   }
   /* We need to issue INACTIVE on immediate failure */
   unlock(&pc->task.mutex);
-  if (notify) notify_poller(&pc->task);
-  if (notify_proactor) notify_poller(&p->task);
+  if (notify) notify_poller(pc->task.proactor);
 }
 
 static void pconnection_tick(pconnection_t *pc) {
@@ -1449,7 +1446,7 @@ void pn_connection_wake(pn_connection_t* c) {
 }
 unlock(&pc->task.mutex);
   }
-  if (notify) notify_poller(&pc->task);
+  if (notify) notify_poller(pc->task.proactor);
 }
 
 void pn_proactor_release_connection(pn_connection_t *c) {
@@ -1463,7 +1460,7 @@ void pn_proactor_release_connection(pn_connection_t *c) {
 notify = schedule(&pc->task);
 unlock(&pc->task.mutex);
   }
-  if (notify) notify_poller(&pc->task);
+  if (notify) notify_poller(pc->task.proactor);
 }
 
 // 
@@ -1576,7 +1573,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t 
*l, const char *addr, in
   }
   proactor_add(>task);
   unlock(>task.mutex);
-  if (notify) notify_poller(>task);
+  if (notify) notify_poller(p);
   return;
 }
 
@@ -1656,7 +1653,7 @@ void pn_listener_close(pn_listener_t* l) {
 notify = schedule(&l->task);
   }
   unlock(&l->task.mutex);
-  if (notify) notify_poller(&l->task);
+  if (notify) notify_poller(l->task.proactor);
 }
 
 static void listener_forced_shutdown(pn_listener_t *l) {
@@ -1814,7 +1811,7 @@ static void listener_done(pn_listener_t *l) {
 notify = unassign_thread(ts, UNUSED);
 unlock(>sched_mutex);
 if (notify)
-  notify_poller(>task);
+  notify_poller(p);
 return;
   } else if (n_events || listener_has_event(l))
 notify = schedule(>task);
@@ -1824,7 +1821,7 @@ static void listener_done(pn_listener_t 

[qpid-proton] 03/03: PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic manner. This closes #290

2021-01-24 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 4fc4c7869f470c688777c4f040b0fc31219f4f42
Author: Cliff Jansen 
AuthorDate: Sun Jan 24 11:18:30 2021 -0800

PROTON-2326: epoll proactor refactor - make all tasks wakeable in generic 
manner.  This closes #290
---
 c/src/proactor/epoll-internal.h   | 26 --
 c/src/proactor/epoll.c| 11 +--
 c/src/proactor/epoll_raw_connection.c | 19 ++-
 3 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 0d63f90..e8c9f0a 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -86,8 +86,9 @@ typedef struct task_t {
   pn_proactor_t *proactor;  /* Immutable */
   task_type_t type;
   bool working;
-  bool on_ready_list;
   bool ready;// ready to run and on ready list.  Poller 
notified by eventfd.
+  bool waking;
+  bool on_ready_list;// todo: protected by eventfd_mutex or sched 
mutex?  needed?
   struct task_t *ready_next; // ready list, guarded by proactor eventfd_mutex
   bool closing;
   // Next 4 are protected by the proactor mutex
@@ -223,7 +224,6 @@ typedef struct pconnection_t {
   pni_timer_t *timer;
   const char *host, *port;
   uint32_t new_events;
-  int wake_count;   // TODO: protected by task.mutex so should be moved in 
there (also really bool)
   bool server;/* accept, not connect */
   bool tick_pending;
   bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
@@ -382,6 +382,28 @@ pn_event_batch_t 
*pni_timer_manager_process(pni_timer_manager_t *tm, bool timeou
 void pni_pconnection_timeout(pconnection_t *pc);
 void pni_proactor_timeout(pn_proactor_t *p);
 
+// Generic wake primitives for a task.
+
+// Call with task lock held.  Must call notify_poller() if returns true.
+static inline bool pni_task_wake(task_t *tsk) {
+  if (!tsk->waking) {
+tsk->waking = true;
+return schedule(tsk);
+  }
+  return false;
+}
+
+// Call with task lock held.
+static inline bool pni_task_wake_pending(task_t *tsk) {
+  return tsk->waking;
+}
+
+// Call with task lock held and only from the running task.
+static inline void pni_task_wake_done(task_t *tsk) {
+  tsk->waking = false;
+}
+
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index c4f028d..7467683 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -49,6 +49,7 @@
 
  TODO: document role of sched_pending and how sched_XXX (i.e. sched_interrupt)
  transitions from "private to the scheduler" to "visible to the task".
+ TODO: document task.working duration can be long: from xxx_process() to 
xxx_done() or null batch.
  */
 
 
@@ -742,7 +743,6 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
   psocket_init(&pc->psocket, PCONNECTION_IO);
   pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port);
   pc->new_events = 0;
-  pc->wake_count = 0;
   pc->tick_pending = false;
   pc->queued_disconnect = false;
   pc->disconnect_condition = NULL;
@@ -980,7 +980,7 @@ static bool pconnection_sched_sync(pconnection_t *pc) {
 
 /* Call with task lock and having done a write_flush() to "know" the value of 
wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  if (pc->new_events || pc->wake_count || pc->tick_pending || 
pc->queued_disconnect)
+  if (pc->new_events || pni_task_wake_pending(&pc->task) || pc->tick_pending 
|| pc->queued_disconnect)
 return true;
   if (!pc->read_blocked && !pconnection_rclosed(pc))
 return true;
@@ -1153,9 +1153,9 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
 return &pc->batch;
   }
   bool closed = pconnection_rclosed(pc) && pconnection_wclosed(pc);
-  if (pc->wake_count) {
+  if (pni_task_wake_pending(&pc->task)) {
 waking = !closed;
-pc->wake_count = 0;
+pni_task_wake_done(&pc->task);
   }
   if (pc->tick_pending) {
 pc->tick_pending = false;
@@ -1441,8 +1441,7 @@ void pn_connection_wake(pn_connection_t* c) {
   if (pc) {
 lock(&pc->task.mutex);
 if (!pc->task.closing) {
-  pc->wake_count++;
-  notify = schedule(&pc->task);
+  notify = pni_task_wake(&pc->task);
 }
 unlock(&pc->task.mutex);
   }
diff --git a/c/src/proactor/epoll_raw_connection.c 
b/c/src/proactor/epoll_raw_connection.c
index 3fd2b36..8722ff0 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -50,7 +50,6 @@ struct praw_connection_t {
   struct addrinfo *ai;   /* Current connect address */
   bool connected;
   bool disconnected;
-  bool waking; // TODO: This is actually protected by task.mutex so should be 
moved into task (pconnection too)
 };
 
 static void psocket_error(praw_connection_t *rc, int err,