This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 667f78c6d2ac17256ea67da54ff5703d424328f1
Author: Cliff Jansen <cjan...@redhat.com>
AuthorDate: Thu Nov 7 00:55:50 2019 -0800

    PROTON-2130: epoll proactor changed to use serialized calls to epoll_wait for multiple events
---
 c/src/proactor/epoll.c    | 1627 ++++++++++++++++++++++++++++++++++-----------
 c/tests/proactor_test.cpp |    2 +-
 2 files changed, 1258 insertions(+), 371 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 5cc3b65..d0664d9 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -19,6 +19,37 @@
  *
  */
 
+/*
+ The epoll proactor works with multiple concurrent threads.  If there is no work to do,
+ one thread temporarily becomes the poller thread, while other inbound threads suspend
+ waiting for work.  The poller calls epoll_wait(), generates work lists, resumes suspended
+ threads, and grabs work for itself or polls again.
+
+ A serialized grouping of Proton events is a context (connection, listener, proactor).
+ Each has multiple pollable fds that make it schedulable.  E.g. a connection could have a
+ socket fd, timerfd, and (indirect) eventfd all signaled in a single epoll_wait().
+
+ At the conclusion of each
+      N = epoll_wait(..., N_MAX, timeout)
+
+ there will be N epoll events and M wakes on the wake list.  M can be very large in a
+ server with many active connections. The poller makes the contexts "runnable" if they are
+ not already running.  A running context can only be made runnable once until it completes
+ a chunk of work and calls unassign_thread().  (N + M - duplicates) contexts will be
+ scheduled.  A new poller will occur when next_runnable() returns NULL.
+
+ A running context, before it stops "working", must check to see if there were new incoming
+ events that the poller posted to the context, but could not make it runnable since it was
+ already running.  The context will know if it needs to put itself back on the wake list
+ to be runnable later to process the pending events.
+
+ Lock ordering - while holding a lock, only acquire locks to its right, never to its left:
+    context -> sched -> wake  
+    non-proactor-context -> proactor-context
+    tslot -> sched
+ */
+
+
 /* Enable POSIX features beyond c99 for modern pthread and standard 
strerror_r() */
 #ifndef _POSIX_C_SOURCE
 #define _POSIX_C_SOURCE 200809L
@@ -26,8 +57,9 @@
 /* Avoid GNU extensions, in particular the incompatible alternative 
strerror_r() */
 #undef _GNU_SOURCE
 
-#include "core/logger_private.h"
 #include "proactor-internal.h"
+#include "core/logger_private.h"
+#include "core/util.h"
 
 #include <proton/condition.h>
 #include <proton/connection_driver.h>
@@ -92,9 +124,7 @@ static void pstrerror(int err, strerrorbuf msg) {
 // ========================================================================
 
 // In general all locks to be held singly and shortly (possibly as spin locks).
-// Exception: psockets+proactor for pn_proactor_disconnect (convention: acquire
-// psocket first to avoid deadlock).  TODO: revisit the exception and its
-// awkwardness in the code (additional mutex? different type?).
+// See above about lock ordering.
 
 typedef pthread_mutex_t pmutex;
 static void pmutex_init(pthread_mutex_t *pm){
@@ -113,14 +143,13 @@ static inline void lock(pmutex *m) { 
pthread_mutex_lock(m); }
 static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
 
 typedef struct acceptor_t acceptor_t;
+typedef struct tslot_t tslot_t;
 
 typedef enum {
   WAKE,   /* see if any work to do in proactor/psocket context */
   PCONNECTION_IO,
-  PCONNECTION_IO_2,
   PCONNECTION_TIMER,
   LISTENER_IO,
-  CHAINED_EPOLL,
   PROACTOR_TIMER } epoll_type_t;
 
 // Data to use with epoll.
@@ -161,6 +190,8 @@ static void memory_barrier(epoll_extended_t *ee) {
  * The original implementation with counters to track expiry counts
  * was abandoned in favor of "in doubt" transitions and resolution
  * at shutdown.
+ *
+ * TODO: review above in light of single poller thread.
  */
 
 typedef struct ptimer_t {
@@ -262,6 +293,14 @@ static void ptimer_finalize(ptimer_t *pt) {
   pmutex_finalize(&pt->mutex);
 }
 
+// Current CLOCK_REALTIME time expressed in milliseconds.
+// The return value of clock_gettime() is not checked.
+pn_timestamp_t pn_i_now2(void)
+{
+  struct timespec ts;
+  clock_gettime(CLOCK_REALTIME, &ts);
+  pn_timestamp_t whole_seconds_ms = ((pn_timestamp_t)ts.tv_sec) * 1000;
+  return whole_seconds_ms + (ts.tv_nsec / 1000000);
+}
+
+
 // ========================================================================
 // Proactor common code
 // ========================================================================
@@ -330,30 +369,6 @@ static void stop_polling(epoll_extended_t *ee, int 
epollfd) {
  * eventfd to allow a lock-free pn_proactor_interrupt() implementation.
  */
 
-/*
- * **** epollfd and epollfd_2 ****
- *
- * This implementation allows multiple threads to call epoll_wait()
- * concurrently (as opposed to having a single thread call
- * epoll_wait() and feed work to helper threads).  Unfortunately
- * with this approach, it is not possible to change the event
- * mask in one thread and be certain if zero or one callbacks occurred
- * on the previous event mask.  This can greatly complicate ordered
- * shutdown.  (See PROTON-1842)
- *
- * Currently, only pconnection sockets change between EPOLLIN,
- * EPOLLOUT, or both.  The rest use a constant EPOLLIN event mask.
- * Instead of trying to change the event mask for pconnection sockets,
- * if there is a missing attribute, it is added (EPOLLIN or EPOLLOUT)
- * as an event mask on the secondary or chained epollfd_2.  epollfd_2
- * is part of the epollfd fd set, so active events in epollfd_2 are
- * also seen in epollfd (but require a separate epoll_wait() and
- * rearm() to extract).
- *
- * Using this method and EPOLLONESHOT, it is possible to wait for all
- * outstanding armings on a socket to "resolve" via epoll_wait()
- * callbacks before freeing resources.
- */
 typedef enum {
   PROACTOR,
   PCONNECTION,
@@ -366,7 +381,8 @@ typedef struct pcontext_t {
   void *owner;              /* Instance governed by the context */
   pcontext_type_t type;
   bool working;
-  int wake_ops;             // unprocessed eventfd wake callback (convert to 
bool?)
+  bool on_wake_list;
+  bool wake_pending;             // unprocessed eventfd wake callback (convert 
to bool?)
   struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
   bool closing;
   // Next 4 are protected by the proactor mutex
@@ -374,8 +390,43 @@ typedef struct pcontext_t {
   struct pcontext_t* prev;  /* Protected by proactor.mutex */
   int disconnect_ops;           /* ops remaining before disconnect complete */
   bool disconnecting;           /* pn_proactor_disconnect */
+  // Protected by schedule mutex
+  tslot_t *runner __attribute__((aligned(64)));  /* designated or running 
thread */
+  tslot_t *prev_runner;
+  bool sched_wake;
+  bool sched_pending;           /* If true, one or more unseen epoll or other 
events to process() */
+  bool runnable ;               /* in need of scheduling */
 } pcontext_t;
 
+// State of a worker-thread slot; stored in tslot_t.state.
+typedef enum {
+  NEW,                          /* suspend() queues a NEW slot at the tail of the suspend list */
+  UNUSED,                       /* pn_proactor_done() called, may never come back */
+  SUSPENDED,                    /* set by suspend() before the thread blocks */
+  PROCESSING,                   /* Hunting for a context  */
+  BATCHING,                     /* Doing work on behalf of a context */
+  DELETING,                     /* unassign_thread() skips its pending-event check in this state */
+  POLLING } tslot_state;
+
+// Epoll proactor's concept of a worker thread provided by the application.
+struct tslot_t {
+  pmutex mutex;  // suspend and resume
+  pthread_cond_t cond;          // waited on in suspend(), signaled by resume()
+  unsigned int generation;
+  bool suspended;               // true only while suspend() is about to block / blocked on cond; accessed under mutex
+  volatile bool scheduled;      // set by resume(), cleared by suspend(); also polled by suspend()'s spin loop
+  tslot_state state;
+  pcontext_t *context;          // set by assign_thread(); cleared by unassign_thread() and remove_earmark()
+  pcontext_t *prev_context;     // compared with the context in make_runnable() to pick a "warm" thread
+  bool earmarked;               // set by earmark_thread(), cleared by remove_earmark()
+  tslot_t *suspend_list_prev;   // links for the proactor's suspend list
+  tslot_t *suspend_list_next;
+  tslot_t *earmark_override;   // on earmark_drain, which thread was unassigned
+  unsigned int earmark_override_gen;
+};
+
+// Fake thread for temporarily disabling the scheduling of a context.
+// This is a sentinel address (-1), not a real tslot_t; unassign_thread() stores
+// it in ctx->runner so that make_runnable() sees the context as already taken.
+static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1;
+
 static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t 
*p, void *o) {
   memset(ctx, 0, sizeof(*ctx));
   pmutex_init(&ctx->mutex);
@@ -388,6 +439,7 @@ static void pcontext_finalize(pcontext_t* ctx) {
   pmutex_finalize(&ctx->mutex);
 }
 
+
 /* common to connection and listener */
 typedef struct psocket_t {
   pn_proactor_t *proactor;
@@ -397,19 +449,17 @@ typedef struct psocket_t {
   pn_listener_t *listener;      /* NULL for a connection socket */
   char addr_buf[PN_MAX_ADDR];
   const char *host, *port;
+  uint32_t sched_io_events;
+  uint32_t working_io_events;
 } psocket_t;
 
 struct pn_proactor_t {
   pcontext_t context;
-  int epollfd;
-  int epollfd_2;
   ptimer_t timer;
-  pn_collector_t *collector;
-  pcontext_t *contexts;         /* in-use contexts for PN_PROACTOR_INACTIVE 
and cleanup */
   epoll_extended_t epoll_wake;
   epoll_extended_t epoll_interrupt;
-  epoll_extended_t epoll_secondary;
   pn_event_batch_t batch;
+  pcontext_t *contexts;         /* track in-use contexts for 
PN_PROACTOR_INACTIVE and disconnect */
   size_t disconnects_pending;   /* unfinished proactor disconnects*/
   // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next 
update_batch()
   bool need_interrupt;
@@ -418,7 +468,8 @@ struct pn_proactor_t {
   bool timeout_set; /* timeout has been set by user and not yet cancelled or 
generated event */
   bool timeout_processed;  /* timeout event dispatched in the most recent 
event batch */
   bool timer_armed; /* timer is armed in epoll */
-  bool shutting_down;
+  int context_count;
+
   // wake subsystem
   int eventfd;
   pmutex eventfd_mutex;
@@ -430,30 +481,188 @@ struct pn_proactor_t {
   // If the process runs out of file descriptors, disarm listening sockets 
temporarily and save them here.
   acceptor_t *overflow;
   pmutex overflow_mutex;
+
+  // Sched vars specific to proactor context.
+  bool sched_timeout;
+  bool sched_interrupt;
+
+  // Global scheduling/poller vars.
+  // Warm runnables have assigned or earmarked tslots and can run right away.
+  // Other runnables are run as tslots come available.
+  pmutex sched_mutex;
+  int n_runnables;
+  int next_runnable;
+  int n_warm_runnables;
+  tslot_t *suspend_list_head;
+  tslot_t *suspend_list_tail;
+  int suspend_list_count;
+  tslot_t *poller;
+  bool poller_suspended;
+  tslot_t *last_earmark;
+  pcontext_t *sched_wake_first;
+  pcontext_t *sched_wake_last;
+  pcontext_t *sched_wake_current;
+  pmutex tslot_mutex;
+  int earmark_count;
+  bool earmark_drain;
+  bool sched_wakes_pending;
+
+  // Mostly read only: after init or once thread_count stabilizes
+  pn_collector_t *collector  __attribute__((aligned(64)));
+  pcontext_t **warm_runnables;
+  pcontext_t **runnables;
+  tslot_t **resume_list;
+  pn_hash_t *tslot_map;
+  struct epoll_event *kevents;
+  int epollfd;
+  int thread_count;
+  int thread_capacity;
+  int runnables_capacity;
+  int kevents_capacity;
+  bool shutting_down;
+};
+
+// Per-connection proactor state: the socket, the scheduling context, a timer,
+// the connection driver and its event batch.
+typedef struct pconnection_t {
+  psocket_t psocket;
+  pcontext_t context;
+  uint32_t new_events;
+  int wake_count;
+  bool server;                /* accept, not connect */
+  bool tick_pending;
+  bool timer_armed;
+  bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
+  pn_condition_t *disconnect_condition;
+  ptimer_t timer;  // TODO: review one timerfd per connection
+  // Following values only changed by (sole) working context:
+  uint32_t current_arm;  // active epoll io events
+  bool connected;
+  bool read_blocked;
+  bool write_blocked;
+  bool disconnected;
+  int hog_count; // thread hogging limiter
+  pn_event_batch_t batch;
+  pn_connection_driver_t driver;
+  // Cached view of the driver's write buffer; filled by ensure_wbuf() and
+  // released by invalidate_wbuf().
+  bool wbuf_valid;            /* true while the three fields below describe a fetched buffer */
+  const char *wbuf_current;   /* start of the fetched buffer (wbuf.start) */
+  size_t wbuf_remaining;      /* initialised to the fetched buffer's size */
+  size_t wbuf_completed;      /* byte count passed to pn_connection_driver_write_done() on invalidate */
+  struct pn_netaddr_t local, remote; /* Actual addresses */
+  struct addrinfo *addrinfo;         /* Resolved address list */
+  struct addrinfo *ai;               /* Current connect address */
+  pmutex rearm_mutex;                /* protects pconnection_rearm from out of order arming*/
+  bool io_doublecheck;               /* callbacks made and new IO may have arrived */
+  bool sched_timeout;
+} pconnection_t;
+
+/*
+ * A listener can have multiple sockets (as specified in the addrinfo).  They
+ * are armed separately.  The individual psockets can be part of at most one
+ * list: the global proactor overflow retry list or the per-listener list of
+ * pending accepts (valid inbound socket obtained, but pn_listener_accept not
+ * yet called by the application).  These lists will be small and quick to
+ * traverse.
+ */
+
+/* One listening socket of a pn_listener_t. */
+struct acceptor_t{
+  psocket_t psocket;
+  int accepted_fd;
+  bool armed;
+  bool overflowed;
+  acceptor_t *next;              /* next listener list member */
+  struct pn_netaddr_t addr;      /* listening address */
+};
+
+/* Listener state: its array of acceptors, its scheduling context, and its event batch. */
+struct pn_listener_t {
+  acceptor_t *acceptors;          /* Array of listening sockets */
+  size_t acceptors_size;
+  int active_count;               /* Number of listener sockets registered with epoll */
+  pcontext_t context;
+  pn_condition_t *condition;
+  pn_collector_t *collector;
+  pn_event_batch_t batch;
+  pn_record_t *attachments;
+  void *listener_context;
+  acceptor_t *pending_acceptors;  /* list of those with a valid inbound fd*/
+  int pending_count;
+  bool unclaimed;                 /* attach event dispatched but no pn_listener_attach() call yet */
+  size_t backlog;
+  bool close_dispatched;
+  pmutex rearm_mutex;             /* orders rearms/disarms, nothing else */
+  uint32_t sched_io_events;
 };
 
+
 static void rearm(pn_proactor_t *p, epoll_extended_t *ee);
 
 /*
  * Wake strategy with eventfd.
  *  - wakees can be in the list only once
- *  - wakers only write() if wakes_in_progress is false
- *  - wakees only read() if about to set wakes_in_progress to false
- * When multiple wakes are pending, the kernel cost is a single rearm().
- * Otherwise it is the trio of write/read/rearm.
- * Only the writes and reads need to be carefully ordered.
+ *  - wakers only use the eventfd if wakes_in_progress is false
+ * There is a single rearm between wakes > 0 and wakes == 0
+ *
+ * There can potentially be many contexts with wakes pending.
  *
- * Multiple eventfds could be used and shared amongst the pcontext_t's.
+ * The wake list is in two parts.  The front is the chunk the
+ * scheduler will process until the next epoll_wait().  sched_wake
+ * indicates which chunk it is on. The ctx may already be running or
+ * scheduled to run.
+ *
+ * The ctx must be actually running to absorb ctx->wake_pending.
+ *
+ * The wake list can keep growing while popping wakes.  The portion between
+ * sched_wake_first and sched_wake_last is protected by the sched
+ * lock (for pop operations); the portion from sched_wake_last to
+ * wake_list_last is protected by the eventfd mutex (for add operations).
+ * Both locks are needed to cross or reconcile the two portions of the list.
  */
 
+// Call with sched lock held.
+// Unlink ctx from the scheduler's portion of the wake list
+// (sched_wake_first .. sched_wake_last) and clear ctx->on_wake_list.
+// ctx must be on that list.
+static void pop_wake(pcontext_t *ctx) {
+  // Every context on the sched_wake_list is either currently running,
+  // or to be scheduled.  wake() will not "see" any of the wake_next
+  // pointers until wake_pending and working have transitioned to 0
+  // and false, when a context stops working.
+  //
+  // Every context must transition as:
+  //
+  // !wake_pending .. wake() .. on wake_list .. on sched_wake_list .. working context .. !sched_wake && !wake_pending
+  //
+  // Intervening locks at each transition ensure wake_next has memory coherence throughout the wake cycle.
+  pn_proactor_t *p = ctx->proactor;
+  pcontext_t *successor = ctx->wake_next;
+
+  // Keep the scheduler's cursor valid if it currently points at ctx.
+  if (p->sched_wake_current == ctx)
+    p->sched_wake_current = successor;
+
+  if (p->sched_wake_first != ctx) {
+    // ctx is not first in a multi-element list: find its predecessor.
+    pcontext_t *before = p->sched_wake_first;
+    while (before->wake_next != ctx)
+      before = before->wake_next;
+    before->wake_next = successor;
+    if (p->sched_wake_last == ctx)
+      p->sched_wake_last = before;
+  } else {
+    // normal code path: ctx is the head
+    p->sched_wake_first = (p->sched_wake_last == ctx) ? NULL : successor;
+    if (p->sched_wake_first == NULL)
+      p->sched_wake_last = NULL;
+  }
+  ctx->on_wake_list = false;
+}
+
 // part1: call with ctx->owner lock held, return true if notify required by 
caller
 static bool wake(pcontext_t *ctx) {
   bool notify = false;
-  if (!ctx->wake_ops) {
+
+  if (!ctx->wake_pending) {
     if (!ctx->working) {
-      ctx->wake_ops++;
+      ctx->wake_pending = true;
       pn_proactor_t *p = ctx->proactor;
       lock(&p->eventfd_mutex);
+      ctx->wake_next = NULL;
+      ctx->on_wake_list = true;
       if (!p->wake_list_first) {
         p->wake_list_first = p->wake_list_last = ctx;
       } else {
@@ -468,50 +677,231 @@ static bool wake(pcontext_t *ctx) {
       unlock(&p->eventfd_mutex);
     }
   }
+
   return notify;
 }
 
 // part2: make OS call without lock held
 static inline void wake_notify(pcontext_t *ctx) {
-  if (ctx->proactor->eventfd == -1)
+  pn_proactor_t *p = ctx->proactor;
+  if (p->eventfd == -1)
     return;
-  uint64_t increment = 1;
-  if (write(ctx->proactor->eventfd, &increment, sizeof(uint64_t)) != 
sizeof(uint64_t))
-    EPOLL_FATAL("setting eventfd", errno);
+  rearm(p, &p->epoll_wake);
 }
 
-// call with no locks
-static pcontext_t *wake_pop_front(pn_proactor_t *p) {
-  pcontext_t *ctx = NULL;
-  lock(&p->eventfd_mutex);
-  assert(p->wakes_in_progress);
-  if (p->wake_list_first) {
-    ctx = p->wake_list_first;
-    p->wake_list_first = ctx->wake_next;
-    if (!p->wake_list_first) p->wake_list_last = NULL;
-    ctx->wake_next = NULL;
-
-    if (!p->wake_list_first) {
-      /* Reset the eventfd until a future write.
-       * Can the read system call be made without holding the lock?
-       * Note that if the reads/writes happen out of order, the wake
-       * mechanism will hang. */
-      (void)read_uint64(p->eventfd);
-      p->wakes_in_progress = false;
+// call with owner lock held, once for each pop from the wake list
+// Clears the context's wake_pending flag so that a later wake() can queue it again.
+static inline void wake_done(pcontext_t *ctx) {
+//  assert(ctx->wake_pending > 0);   (disabled check, carried over from the former wake_ops counter)
+  ctx->wake_pending = false;
+}
+
+
+/*
+ * Scheduler/poller
+*/
+
+// Internal use only
+/* How long to defer suspending: iteration count of the spin loop in suspend(); 0 skips spinning */
+static int pni_spins = 0;
+/* Prefer immediate running by poller over warm running by suspended thread */
+static bool pni_immediate = false;
+/* Toggle use of warm scheduling; tested in make_runnable() */
+/* NOTE(review): declared int but initialised with true - confirm whether bool was intended */
+static int pni_warm_sched = true;
+
+
+// Call with sched lock
+// Adds ts to the proactor's suspend list through the LL_ADD list macro
+// (presumably appending at the tail, as the name says - the macro is defined elsewhere).
+static void suspend_list_add_tail(pn_proactor_t *p, tslot_t *ts) {
+  LL_ADD(p, suspend_list, ts);
+}
+
+// Call with sched lock
+// Push ts onto the front of the proactor's doubly linked suspend list.
+static void suspend_list_insert_head(pn_proactor_t *p, tslot_t *ts) {
+  tslot_t *old_head = p->suspend_list_head;
+  ts->suspend_list_prev = NULL;
+  ts->suspend_list_next = old_head;
+  if (old_head == NULL)
+    p->suspend_list_tail = ts;    // list was empty: ts is also the tail
+  else
+    old_head->suspend_list_prev = ts;
+  p->suspend_list_head = ts;
+}
+
+// Call with sched lock
+// Park the calling thread's slot until resume() marks it scheduled.
+// The sched lock is released while the thread waits and is re-acquired
+// before returning.
+static void suspend(pn_proactor_t *p, tslot_t *ts) {
+  // NEW slots go to the tail of the suspend list, all others to the head.
+  if (ts->state == NEW)
+    suspend_list_add_tail(p, ts);
+  else
+    suspend_list_insert_head(p, ts);
+  p->suspend_list_count++;
+  ts->state = SUSPENDED;
+  ts->scheduled = false;
+  unlock(&p->sched_mutex);
+
+  lock(&ts->mutex);
+  if (pni_spins && !ts->scheduled) {
+    // Medium length spinning tried here.  Raises cpu dramatically,
+    // unclear throughput or latency benefit (not seen where most
+    // expected, modest at other times).
+    // 'locked' tracks whether this thread currently owns ts->mutex.
+    bool locked = true;
+    for (volatile int i = 0; i < pni_spins; i++) {
+      if (locked) {
+        unlock(&ts->mutex);
+        locked = false;
+      }
+      // Every 1000th iteration, try (without blocking) to take the mutex back.
+      if ((i % 1000) == 0) {
+        locked = (pthread_mutex_trylock(&ts->mutex) == 0);
+      }
+      // This read may happen without holding ts->mutex; scheduled is volatile.
+      if (ts->scheduled) break;
     }
+    // Make sure the mutex is held again before touching ts->suspended.
+    if (!locked)
+      lock(&ts->mutex);
+  }
+
+  // suspended tells resume() that a condition signal is needed.
+  ts->suspended = true;
+  while (!ts->scheduled) {
+    pthread_cond_wait(&ts->cond, &ts->mutex);
+  }
+  ts->suspended = false;
+  unlock(&ts->mutex);
+  lock(&p->sched_mutex);
+  // The state is not changed here after SUSPENDED; presumably whoever resumes
+  // the slot sets PROCESSING first - confirm against the callers of resume().
+  assert(ts->state == PROCESSING);
+}
+
+// Call with no lock
+// Mark ts as scheduled and, if its thread is blocked in suspend(), signal it.
+static void resume(pn_proactor_t *p, tslot_t *ts) {
+  lock(&ts->mutex);
+  bool needs_signal = ts->suspended;
+  ts->scheduled = true;
+  if (needs_signal)
+    pthread_cond_signal(&ts->cond);
+  unlock(&ts->mutex);
+}
+
+// Call with sched lock
+// Bind context ctx to thread slot ts.  ctx must not already have a runner.
+// Both sides drop their "previous partner" record and ctx stops being runnable.
+static void assign_thread(tslot_t *ts, pcontext_t *ctx) {
+  assert(!ctx->runner);
+  // thread side
+  ts->prev_context = NULL;
+  ts->context = ctx;
+  // context side
+  ctx->runnable = false;
+  ctx->prev_runner = NULL;
+  ctx->runner = ts;
+}
+
+// call with sched lock
+static bool rewake(pcontext_t *ctx) {
+  // Special case wake() where context is unassigned and a popped wake needs 
to be put back on the list.
+  // Should be rare.
+  bool notify = false;
+  pn_proactor_t *p = ctx->proactor;
+  lock(&p->eventfd_mutex);
+  assert(ctx->wake_pending);
+  assert(!ctx->on_wake_list);
+  ctx->wake_next = NULL;
+  ctx->on_wake_list = true;
+  if (!p->wake_list_first) {
+    p->wake_list_first = p->wake_list_last = ctx;
+  } else {
+    p->wake_list_last->wake_next = ctx;
+    p->wake_list_last = ctx;
+  }
+  if (!p->wakes_in_progress) {
+    // force a wakeup via the eventfd
+    p->wakes_in_progress = true;
+    notify = true;
   }
   unlock(&p->eventfd_mutex);
-  rearm(p, &p->epoll_wake);
-  return ctx;
+  return notify;
 }
 
-// call with owner lock held, once for each pop from the wake list
-static inline void wake_done(pcontext_t *ctx) {
-  assert(ctx->wake_ops > 0);
-  ctx->wake_ops--;
+// Call with sched lock
+// Detach thread slot ts from its current context (if any) and move ts to
+// new_state.  Unless ts was DELETING, the released context is remembered in
+// ts->prev_context and, if it still has unseen scheduled events
+// (sched_pending), it is put back on the wake list.
+// Returns true if a wake was queued that the caller still has to notify
+// (the value returned by rewake()/wake()).
+// Note: in the corner case below the sched lock is dropped and re-acquired.
+static bool unassign_thread(tslot_t *ts, tslot_state new_state) {
+  pcontext_t *ctx = ts->context;
+  bool notify = false;
+  bool deleting = (ts->state == DELETING);
+  ts->context = NULL;
+  ts->state = new_state;
+  if (ctx) {
+    ctx->runner = NULL;
+    ctx->prev_runner = ts;
+  }
+
+  // Check if context has unseen events/wake that need processing.
+
+  if (ctx && !deleting) {
+    pn_proactor_t *p = ctx->proactor;
+    // Remember the context just released.  ts->context was already cleared
+    // above, so the saved ctx must be used here; make_runnable() compares
+    // prev_context with the context to select a warm thread.
+    ts->prev_context = ctx;
+    if (ctx->sched_pending) {
+      // Need a new wake
+      if (ctx->sched_wake) {
+        if (!ctx->on_wake_list) {
+          // Remember it for next poller
+          ctx->sched_wake = false;
+          notify = rewake(ctx);     // back on wake list for poller to see
+        }
+        // else already scheduled
+      } else {
+        // bad corner case.  Block ctx from being scheduled again until a later post_wake()
+        ctx->runner = REWAKE_PLACEHOLDER;
+        // wake() must be called with the context lock held and, per the file's
+        // lock ordering (context -> sched), not while holding the sched lock.
+        unlock(&p->sched_mutex);
+        lock(&ctx->mutex);
+        notify = wake(ctx);
+        unlock(&ctx->mutex);
+        lock(&p->sched_mutex);
+      }
+    }
+  }
+  return notify;
+}
+
+// Call with sched lock
+// Assign ctx to ts and flag the slot as earmarked, counting it in the
+// proactor's earmark_count.
+static void earmark_thread(tslot_t *ts, pcontext_t *ctx) {
+  pn_proactor_t *p = ctx->proactor;
+  assign_thread(ts, ctx);
+  p->earmark_count += 1;
+  ts->earmarked = true;
+}
+
+// Call with sched lock
+// Undo earmark_thread(): break the link between ts and its context and
+// decrement the proactor's earmark_count.  ts must have a context.
+static void remove_earmark(tslot_t *ts) {
+  pcontext_t *earmarked_ctx = ts->context;
+  pn_proactor_t *p = earmarked_ctx->proactor;
+  earmarked_ctx->runner = NULL;
+  ts->context = NULL;
+  ts->earmarked = false;
+  p->earmark_count -= 1;
+}
+
+// Call with sched lock
+// Queue ctx for execution unless it already has a runner.  With warm
+// scheduling enabled, a context whose previous thread still remembers it is
+// handed back to that thread (warm list or earmark); otherwise it is appended
+// to the ordinary runnables array.
+static void make_runnable(pcontext_t *ctx) {
+  pn_proactor_t *p = ctx->proactor;
+  assert(p->n_runnables <= p->runnables_capacity);
+  assert(!ctx->runnable);
+  if (ctx->runner) return;
+
+  ctx->runnable = true;
+
+  // Candidate warm thread: the previous runner, provided it still points back at ctx.
+  tslot_t *warm = pni_warm_sched ? ctx->prev_runner : NULL;
+  if (warm && warm->prev_context != ctx)
+    warm = NULL;
+
+  if (warm) {
+    switch (warm->state) {
+    case SUSPENDED:
+    case PROCESSING:
+      if (p->n_warm_runnables < p->thread_capacity) {
+        p->warm_runnables[p->n_warm_runnables++] = ctx;
+        assign_thread(warm, ctx);
+        return;
+      }
+      break;    // warm list full: track it as a normal runnable
+    case UNUSED:
+      if (!p->earmark_drain) {
+        earmark_thread(warm, ctx);
+        p->last_earmark = warm;
+        return;
+      }
+      break;
+    default:
+      break;
+    }
+  }
+  p->runnables[p->n_runnables++] = ctx;
 }
 
 
+
 static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t 
*listener, const char *addr)
 {
   ps->epoll_io.psocket = ps;
@@ -525,35 +915,6 @@ static void psocket_init(psocket_t* ps, pn_proactor_t* p, 
pn_listener_t *listene
   pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, 
&ps->port);
 }
 
-typedef struct pconnection_t {
-  psocket_t psocket;
-  pcontext_t context;
-  uint32_t new_events;
-  uint32_t new_events_2;
-  int wake_count;
-  bool server;                /* accept, not connect */
-  bool tick_pending;
-  bool timer_armed;
-  bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
-  pn_condition_t *disconnect_condition;
-  ptimer_t timer;  // TODO: review one timerfd per connection
-  // Following values only changed by (sole) working context:
-  uint32_t current_arm;  // active epoll io events
-  uint32_t current_arm_2;  // secondary active epoll io events
-  bool connected;
-  bool read_blocked;
-  bool write_blocked;
-  bool disconnected;
-  int hog_count; // thread hogging limiter
-  pn_event_batch_t batch;
-  pn_connection_driver_t driver;
-  struct pn_netaddr_t local, remote; /* Actual addresses */
-  struct addrinfo *addrinfo;         /* Resolved address list */
-  struct addrinfo *ai;               /* Current connect address */
-  pmutex rearm_mutex;                /* protects pconnection_rearm from out of 
order arming*/
-  epoll_extended_t epoll_io_2;
-  epoll_extended_t *rearm_target;    /* main or secondary epollfd */
-} pconnection_t;
 
 /* Protects read/update of pn_connection_t pointer to it's pconnection_t
  *
@@ -580,43 +941,7 @@ static void set_pconnection(pn_connection_t* c, 
pconnection_t *pc) {
   unlock(&driver_ptr_mutex);
 }
 
-/*
- * A listener can have multiple sockets (as specified in the addrinfo).  They
- * are armed separately.  The individual psockets can be part of at most one
- * list: the global proactor overflow retry list or the per-listener list of
- * pending accepts (valid inbound socket obtained, but pn_listener_accept not
- * yet called by the application).  These lists will be small and quick to
- * traverse.
- */
-
-struct acceptor_t{
-  psocket_t psocket;
-  int accepted_fd;
-  bool armed;
-  bool overflowed;
-  acceptor_t *next;              /* next listener list member */
-  struct pn_netaddr_t addr;      /* listening address */
-};
-
-struct pn_listener_t {
-  acceptor_t *acceptors;          /* Array of listening sockets */
-  size_t acceptors_size;
-  int active_count;               /* Number of listener sockets registered 
with epoll */
-  pcontext_t context;
-  pn_condition_t *condition;
-  pn_collector_t *collector;
-  pn_event_batch_t batch;
-  pn_record_t *attachments;
-  void *listener_context;
-  acceptor_t *pending_acceptors;  /* list of those with a valid inbound fd*/
-  int pending_count;
-  bool unclaimed;                 /* attach event dispatched but no 
pn_listener_attach() call yet */
-  size_t backlog;
-  bool close_dispatched;
-  pmutex rearm_mutex;             /* orders rearms/disarms, nothing else */
-};
-
-static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t 
events, bool timeout, bool topup, bool is_io_2);
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t 
events, bool timeout, bool wake, bool topup);
 static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
 static void proactor_add(pcontext_t *ctx);
@@ -705,24 +1030,6 @@ static void rearm(pn_proactor_t *p, epoll_extended_t *ee) 
{
     EPOLL_FATAL("arming polled file descriptor", errno);
 }
 
-// Only used by pconnection_t if two separate epoll interests in play
-static void rearm_2(pn_proactor_t *p, epoll_extended_t *ee) {
-  // Delay registration until first use.  It's not OK to register or arm
-  // with an event mask of 0 (documented below).  It is OK to leave a
-  // disabled event registered until the next EPOLLONESHOT.
-  if (!ee->polling) {
-    ee->fd = ee->psocket->sockfd;
-    start_polling(ee, p->epollfd_2);
-  } else {
-    struct epoll_event ev = {0};
-    ev.data.ptr = ee;
-    ev.events = ee->wanted | EPOLLONESHOT;
-    memory_barrier(ee);
-    if (epoll_ctl(p->epollfd_2, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
-      EPOLL_FATAL("arming polled file descriptor (secondary)", errno);
-  }
-}
-
 static void listener_list_append(acceptor_t **start, acceptor_t *item) {
   assert(item->next == NULL);
   if (*start) {
@@ -810,7 +1117,6 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
   pcontext_init(&pc->context, PCONNECTION, p, pc);
   psocket_init(&pc->psocket, p, NULL, addr);
   pc->new_events = 0;
-  pc->new_events_2 = 0;
   pc->wake_count = 0;
   pc->tick_pending = false;
   pc->timer_armed = false;
@@ -818,11 +1124,14 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
   pc->disconnect_condition = NULL;
 
   pc->current_arm = 0;
-  pc->current_arm_2 = 0;
   pc->connected = false;
   pc->read_blocked = true;
   pc->write_blocked = true;
   pc->disconnected = false;
+  pc->wbuf_valid = false;
+  pc->wbuf_completed = 0;
+  pc->wbuf_remaining = 0;
+  pc->wbuf_current = NULL;
   pc->hog_count = 0;
   pc->batch.next_event = pconnection_batch_next;
 
@@ -836,13 +1145,6 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
   }
   pmutex_init(&pc->rearm_mutex);
 
-  epoll_extended_t *ee = &pc->epoll_io_2;
-  ee->psocket = &pc->psocket;
-  ee->fd = -1;
-  ee->type = PCONNECTION_IO_2;
-  ee->wanted = 0;
-  ee->polling = false;
-
   /* Set the pconnection_t backpointer last.
      Connections that were released by pn_proactor_release_connection() must 
not reveal themselves
      to be re-associated with a proactor till setup is complete.
@@ -855,7 +1157,7 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
 // Call with lock held and closing == true (i.e. 
pn_connection_driver_finished() == true), timer cancelled.
 // Return true when all possible outstanding epoll events associated with this 
pconnection have been processed.
 static inline bool pconnection_is_final(pconnection_t *pc) {
-  return !pc->current_arm && !pc->current_arm_2 && !pc->timer_armed && 
!pc->context.wake_ops;
+  return !pc->current_arm && !pc->timer_armed && !pc->context.wake_pending;
 }
 
 static void pconnection_final_free(pconnection_t *pc) {
@@ -875,13 +1177,16 @@ static void pconnection_final_free(pconnection_t *pc) {
   free(pc);
 }
 
-// call without lock, but only if pconnection_is_final() is true
+
+// call without lock
 static void pconnection_cleanup(pconnection_t *pc) {
+  assert(pconnection_is_final(pc));
   stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
   if (pc->psocket.sockfd != -1)
     pclosefd(pc->psocket.proactor, pc->psocket.sockfd);
   stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
   ptimer_finalize(&pc->timer);
+
   lock(&pc->context.mutex);
   bool can_free = proactor_remove(&pc->context);
   unlock(&pc->context.mutex);
@@ -890,11 +1195,33 @@ static void pconnection_cleanup(pconnection_t *pc) {
   // else proactor_disconnect logic owns psocket and its final free
 }
 
+// Drop the cached write-buffer view.  Bytes recorded in wbuf_completed are
+// first reported to the driver via pn_connection_driver_write_done().
+// Does nothing when no buffer is cached.
+static void invalidate_wbuf(pconnection_t *pc) {
+  if (!pc->wbuf_valid)
+    return;
+  size_t done = pc->wbuf_completed;
+  if (done != 0)
+    pn_connection_driver_write_done(&pc->driver, done);
+  pc->wbuf_completed = 0;
+  pc->wbuf_remaining = 0;
+  pc->wbuf_valid = false;
+}
+
+// Never call with any locks held.
+// Make sure a write-buffer view is cached, fetching one from the driver if
+// none is valid.
+static void ensure_wbuf(pconnection_t *pc) {
+  if (pc->wbuf_valid)
+    return;
+  // next connection_driver call is the expensive output generator
+  pn_bytes_t fetched = pn_connection_driver_write_buffer(&pc->driver);
+  pc->wbuf_current = fetched.start;
+  pc->wbuf_remaining = fetched.size;
+  pc->wbuf_completed = 0;
+  pc->wbuf_valid = true;
+}
+
 // Call with lock held or from forced_shutdown
 static void pconnection_begin_close(pconnection_t *pc) {
   if (!pc->context.closing) {
     pc->context.closing = true;
-    if (pc->current_arm || pc->current_arm_2) {
+    if (pc->current_arm) {
       // Force EPOLLHUP callback(s)
       shutdown(pc->psocket.sockfd, SHUT_RDWR);
     }
@@ -914,12 +1241,10 @@ static void pconnection_forced_shutdown(pconnection_t 
*pc) {
   // Called by proactor_free, no competing threads, no epoll activity.
   pc->current_arm = 0;
   pc->new_events = 0;
-  pc->current_arm_2 = 0;
-  pc->new_events_2 = 0;
   pconnection_begin_close(pc);
   // pconnection_process will never be called again.  Zero everything.
   pc->timer_armed = false;
-  pc->context.wake_ops = 0;
+  pc->context.wake_pending = 0;
   pn_collector_release(pc->driver.collector);
   assert(pconnection_is_final(pc));
   pconnection_cleanup(pc);
@@ -930,14 +1255,28 @@ static pn_event_t 
*pconnection_batch_next(pn_event_batch_t *batch) {
   if (!pc->driver.connection) return NULL;
   pn_event_t *e = pn_connection_driver_next_event(&pc->driver);
   if (!e) {
-    write_flush(pc);  // May generate transport event
-    e = pn_connection_driver_next_event(&pc->driver);
-    if (!e && pc->hog_count < HOG_MAX) {
-      if (pconnection_process(pc, 0, false, true, false)) {
+    pn_proactor_t *p = pc->context.proactor;
+    bool idle_threads;
+    lock(&p->sched_mutex);
+    idle_threads = (p->suspend_list_head != NULL);
+    unlock(&p->sched_mutex);
+    if (idle_threads) {
+      write_flush(pc);  // May generate transport event
+      pc->read_blocked = pc->write_blocked = false;
+      pconnection_process(pc, 0, false, false, true);
+      e = pn_connection_driver_next_event(&pc->driver);
+    }
+    else {
+      write_flush(pc);  // May generate transport event
+      e = pn_connection_driver_next_event(&pc->driver);
+      if (!e && pc->hog_count < HOG_MAX) {
+        pconnection_process(pc, 0, false, false, true);
         e = pn_connection_driver_next_event(&pc->driver);
       }
     }
   }
+  if (e) invalidate_wbuf(pc);
+
   return e;
 }
 
@@ -960,7 +1299,7 @@ static inline bool pconnection_wclosed(pconnection_t  *pc) 
{
    close/shutdown.  Let read()/write() return 0 or -1 to trigger cleanup logic.
 */
 static bool pconnection_rearm_check(pconnection_t *pc) {
-  if (pc->current_arm && pc->current_arm_2) return false;  // Maxed out
+  assert(pc->wbuf_valid);
   if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
     return false;
   }
@@ -969,79 +1308,121 @@ static bool pconnection_rearm_check(pconnection_t *pc) {
     if (pc->write_blocked)
       wanted_now |= EPOLLOUT;
     else {
-      pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-      if (wbuf.size > 0)
+      if (pc->wbuf_remaining > 0)
         wanted_now |= EPOLLOUT;
     }
   }
   if (!wanted_now) return false;
-
-  uint32_t have_now = pc->current_arm ?  pc->current_arm : pc->current_arm_2;
-  uint32_t needed = wanted_now & ~have_now;
-  if (!needed) return false;
+  if (wanted_now == pc->current_arm) return false;
 
   lock(&pc->rearm_mutex);      /* unlocked in pconnection_rearm... */
-  // Always favour main epollfd
-  if (!pc->current_arm) {
-    pc->current_arm = pc->psocket.epoll_io.wanted = needed;
-    pc->rearm_target = &pc->psocket.epoll_io;
-  } else {
-    pc->current_arm_2 = pc->epoll_io_2.wanted = needed;
-    pc->rearm_target = &pc->epoll_io_2;
-  }
+  pc->current_arm = pc->psocket.epoll_io.wanted = wanted_now;
   return true;                     /* ... so caller MUST call pconnection_rearm */
 }
 
 /* Call without lock */
 static inline void pconnection_rearm(pconnection_t *pc) {
-  if (pc->rearm_target == &pc->psocket.epoll_io) {
-    rearm(pc->psocket.proactor, pc->rearm_target);
-  } else {
-    rearm_2(pc->psocket.proactor, pc->rearm_target);
-  }
-  pc->rearm_target = NULL;
+  rearm(pc->psocket.proactor, &pc->psocket.epoll_io);
   unlock(&pc->rearm_mutex);
   // Return immediately.  pc may have just been freed by another thread.
 }
 
+/* Only call when context switch is imminent.  Sched lock is highly contested. */
+// Call with both context and sched locks.
+static bool pconnection_sched_sync(pconnection_t *pc) {
+  if (pc->sched_timeout) {
+    pc->tick_pending = true;
+    pc->sched_timeout = false;
+  }
+  if (pc->psocket.sched_io_events) {
+    pc->new_events = pc->psocket.sched_io_events;
+    pc->psocket.sched_io_events = 0;
+    pc->current_arm = 0;  // or outside lock?
+  }
+  if (pc->context.sched_wake) {
+    pc->context.sched_wake = false;
+    wake_done(&pc->context);
+  }
+  pc->context.sched_pending = false;
+
+  // Indicate if there are free proactor threads
+  pn_proactor_t *p = pc->context.proactor;
+  return p->poller_suspended || p->suspend_list_head;
+}
+
+/* Call with context lock and having done a write_flush() to "know" the value of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  if (pc->new_events || pc->new_events_2 || pc->wake_count || pc->tick_pending 
|| pc->queued_disconnect)
+  assert(pc->wbuf_valid);
+  if (pc->new_events || pc->wake_count || pc->tick_pending || 
pc->queued_disconnect)
     return true;
   if (!pc->read_blocked && !pconnection_rclosed(pc))
     return true;
-  pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-  return (wbuf.size > 0 && !pc->write_blocked);
+  return (pc->wbuf_remaining > 0 && !pc->write_blocked);
 }
 
+/* Call with no locks. */
 static void pconnection_done(pconnection_t *pc) {
+  pn_proactor_t *p = pc->context.proactor;
+  tslot_t *ts = pc->context.runner;
+  write_flush(pc);
   bool notify = false;
+  bool self_wake = false;
   lock(&pc->context.mutex);
   pc->context.working = false;  // So we can wake() ourself if necessary.  We remain the de facto
-                                // working context while the lock is held.
+                                // working context while the lock is held.  Need sched_sync too to drain possible stale wake.
   pc->hog_count = 0;
-  if (pconnection_has_event(pc) || pconnection_work_pending(pc)) {
-    notify = wake(&pc->context);
+  bool has_event = pconnection_has_event(pc);
+  // Do as little as possible while holding the sched lock
+  lock(&p->sched_mutex);
+  pconnection_sched_sync(pc);
+  unlock(&p->sched_mutex);
+
+  if (has_event || pconnection_work_pending(pc)) {
+    self_wake = true;
   } else if (pn_connection_driver_finished(&pc->driver)) {
     pconnection_begin_close(pc);
     if (pconnection_is_final(pc)) {
       unlock(&pc->context.mutex);
       pconnection_cleanup(pc);
+      // pc may be undefined now
+      lock(&p->sched_mutex);
+      notify = unassign_thread(ts, UNUSED);
+      unlock(&p->sched_mutex);
+      if (notify)
+        wake_notify(&p->context);
       return;
     }
   }
+  if (self_wake)
+    notify = wake(&pc->context);
+
   bool rearm = pconnection_rearm_check(pc);
   unlock(&pc->context.mutex);
-  if (notify) wake_notify(&pc->context);
+
   if (rearm) pconnection_rearm(pc);  // May free pc on another thread.  Return.
+  lock(&p->sched_mutex);
+  if (unassign_thread(ts, UNUSED))
+    notify = true;
+  unlock(&p->sched_mutex);
+  if (notify) wake_notify(&p->context);
   return;
 }
 
 // Return true unless error
-static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) {
-  ssize_t n = send(pc->psocket.sockfd, wbuf.start, wbuf.size, MSG_NOSIGNAL);
+ static bool pconnection_write(pconnection_t *pc) {
+  size_t wbuf_size = pc->wbuf_remaining;
+  ssize_t n = send(pc->psocket.sockfd, pc->wbuf_current, wbuf_size, 
MSG_NOSIGNAL);
   if (n > 0) {
-    pn_connection_driver_write_done(&pc->driver, n);
-    if ((size_t) n < wbuf.size) pc->write_blocked = true;
+    pc->wbuf_completed += n;
+    pc->wbuf_remaining -= n;
+    pc->io_doublecheck = false;
+    if (pc->wbuf_remaining)
+      pc->write_blocked = true;
+    else {
+      // No need to aggregate multiple writes
+      pn_connection_driver_write_done(&pc->driver, pc->wbuf_completed);
+      pc->wbuf_completed = 0;
+    }
   } else if (errno == EWOULDBLOCK) {
     pc->write_blocked = true;
   } else if (!(errno == EAGAIN || errno == EINTR)) {
@@ -1050,11 +1431,12 @@ static bool pconnection_write(pconnection_t *pc, 
pn_bytes_t wbuf) {
   return true;
 }
 
+// Never call with any locks held.
 static void write_flush(pconnection_t *pc) {
+  ensure_wbuf(pc);
   if (!pc->write_blocked && !pconnection_wclosed(pc)) {
-    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-    if (wbuf.size > 0) {
-      if (!pconnection_write(pc, wbuf)) {
+    if (pc->wbuf_remaining > 0) {
+      if (!pconnection_write(pc)) {
         psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : 
"on write to");
       }
     }
@@ -1070,24 +1452,14 @@ static void write_flush(pconnection_t *pc) {
 static void pconnection_connected_lh(pconnection_t *pc);
 static void pconnection_maybe_connect_lh(pconnection_t *pc);
 
-/*
- * May be called concurrently from multiple threads:
- *   pn_event_batch_t loop (topup is true)
- *   timer (timeout is true)
- *   socket io (events != 0) from PCONNECTION_IO
- *      and PCONNECTION_IO_2 event masks (possibly simultaneously)
- *   one or more wake()
- * Only one thread becomes (or always was) the working thread.
- */
-static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t 
events, bool timeout, bool topup, bool is_io_2) {
-  bool inbound_wake = !(events | timeout | topup);
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t 
events, bool timeout, bool sched_wake, bool topup) {
+  bool inbound_wake = sched_wake;
   bool rearm_timer = false;
   bool timer_fired = false;
   bool waking = false;
   bool tick_required = false;
 
   // Don't touch data exclusive to working thread (yet).
-
   if (timeout) {
     rearm_timer = true;
     timer_fired = ptimer_callback(&pc->timer) != 0;
@@ -1095,17 +1467,15 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
   lock(&pc->context.mutex);
 
   if (events) {
-    if (is_io_2)
-      pc->new_events_2 = events;
-    else
-      pc->new_events = events;
+    pc->new_events = events;
+    pc->current_arm = 0;
     events = 0;
   }
-  else if (timer_fired) {
+  if (timer_fired) {
     pc->tick_pending = true;
     timer_fired = false;
   }
-  else if (inbound_wake) {
+  if (inbound_wake) {
     wake_done(&pc->context);
     inbound_wake = false;
   }
@@ -1120,9 +1490,8 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
   }
   else {
     if (pc->context.working) {
-      // Another thread is the working context.
-      unlock(&pc->context.mutex);
-      return NULL;
+      // Another thread is the working context.  Should be impossible with new scheduler.
+      EPOLL_FATAL("internal epoll proactor error: two worker threads", 0);
     }
     pc->context.working = true;
   }
@@ -1155,18 +1524,10 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
     tick_required = !closed;
   }
 
-  uint32_t update_events = 0;
   if (pc->new_events) {
-    update_events = pc->new_events;
+    uint32_t update_events = pc->new_events;
     pc->current_arm = 0;
     pc->new_events = 0;
-  }
-  if (pc->new_events_2) {
-    update_events |= pc->new_events_2;
-    pc->current_arm_2 = 0;
-    pc->new_events_2 = 0;
-  }
-  if (update_events) {
     if (!pc->context.closing) {
       if ((update_events & (EPOLLHUP | EPOLLERR)) && !pconnection_rclosed(pc) 
&& !pconnection_wclosed(pc))
         pconnection_maybe_connect_lh(pc);
@@ -1201,11 +1562,12 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
     pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
     if (rbuf.size > 0 && !pc->read_blocked) {
       ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size);
-
       if (n > 0) {
         pn_connection_driver_read_done(&pc->driver, n);
+        invalidate_wbuf(pc);
         pconnection_tick(pc);         /* check for tick changes. */
         tick_required = false;
+        pc->io_doublecheck = false;
         if (!pn_connection_driver_read_closed(&pc->driver) && (size_t)n < 
rbuf.size)
           pc->read_blocked = true;
       }
@@ -1223,6 +1585,7 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
   if (tick_required) {
     pconnection_tick(pc);         /* check for tick changes. */
     tick_required = false;
+    invalidate_wbuf(pc);
   }
 
   if (topup) {
@@ -1231,6 +1594,7 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   if (pconnection_has_event(pc)) {
+    invalidate_wbuf(pc);
     return &pc->batch;
   }
 
@@ -1244,8 +1608,13 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   // Never stop working while work remains.  hog_count exception to this rule is elsewhere.
-  if (pconnection_work_pending(pc))
+  lock(&pc->context.proactor->sched_mutex);
+  bool workers_free = pconnection_sched_sync(pc);
+  unlock(&pc->context.proactor->sched_mutex);
+
+  if (pconnection_work_pending(pc)) {
     goto retry;  // TODO: get rid of goto without adding more locking
+  }
 
   pc->context.working = false;
   pc->hog_count = 0;
@@ -1258,6 +1627,15 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
     }
   }
 
+  if (workers_free && !pc->context.closing && !pc->io_doublecheck) {
+    // check one last time for new io before context switch
+    pc->io_doublecheck = true;
+    pc->read_blocked = false;
+    pc->write_blocked = false;
+    pc->context.working = true;
+    goto retry;
+  }
+
   if (!pc->timer_armed && !pc->timer.shutting_down && pc->timer.timerfd >= 0) {
     pc->timer_armed = true;
     rearm(pc->psocket.proactor, &pc->timer.epoll_io);
@@ -1564,7 +1942,7 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t 
*l, const char *addr, in
 
 // call with lock held and context.working false
 static inline bool listener_can_free(pn_listener_t *l) {
-  return l->context.closing && l->close_dispatched && !l->context.wake_ops && 
!l->active_count;
+  return l->context.closing && l->close_dispatched && !l->context.wake_pending 
&& !l->active_count;
 }
 
 static inline void listener_final_free(pn_listener_t *l) {
@@ -1651,7 +2029,7 @@ static void listener_forced_shutdown(pn_listener_t *l) {
   listener_begin_close(l);
   unlock(&l->context.mutex);
   // pconnection_process will never be called again.  Zero everything.
-  l->context.wake_ops = 0;
+  l->context.wake_pending = 0;
   l->close_dispatched = true;
   l->active_count = 0;
   assert(listener_can_free(l));
@@ -1679,30 +2057,39 @@ static void listener_accept_lh(psocket_t *ps) {
 }
 
 /* Process a listening socket */
-static pn_event_batch_t *listener_process(psocket_t *ps, uint32_t events) {
+static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool 
wake) {
   // TODO: some parallelization of the accept mechanism.
-  pn_listener_t *l = psocket_listener(ps);
-  acceptor_t *a = psocket_acceptor(ps);
+//  pn_listener_t *l = psocket_listener(ps);
+//  acceptor_t *a = psocket_acceptor(ps);
+
   lock(&l->context.mutex);
-  if (events) {
-    a->armed = false;
-    if (l->context.closing) {
-      lock(&l->rearm_mutex);
-      stop_polling(&ps->epoll_io, ps->proactor->epollfd);
-      unlock(&l->rearm_mutex);
-      close(ps->sockfd);
-      ps->sockfd = -1;
-      l->active_count--;
-    }
-    else {
-      if (events & EPOLLRDHUP) {
-        /* Calls listener_begin_close which closes all the listener's sockets 
*/
-        psocket_error(ps, errno, "listener epoll");
-      } else if (!l->context.closing && events & EPOLLIN) {
-        listener_accept_lh(ps);
+  if (n_events) {
+    for (size_t i = 0; i < l->acceptors_size; i++) {
+      psocket_t *ps = &l->acceptors[i].psocket;
+      if (ps->working_io_events) {
+        uint32_t events = ps->working_io_events;
+        ps->working_io_events = 0;
+        l->acceptors[i].armed = false;
+        if (l->context.closing) {
+          lock(&l->rearm_mutex);
+          stop_polling(&ps->epoll_io, ps->proactor->epollfd);
+          unlock(&l->rearm_mutex);
+          close(ps->sockfd);
+          ps->sockfd = -1;
+          l->active_count--;
+        }
+        else {
+          if (events & EPOLLRDHUP) {
+            /* Calls listener_begin_close which closes all the listener's sockets */
+            psocket_error(ps, errno, "listener epoll");
+          } else if (!l->context.closing && events & EPOLLIN) {
+            listener_accept_lh(ps);
+          }
+        }
       }
     }
-  } else {
+  }
+  if (wake) {
     wake_done(&l->context); // callback accounting
   }
   pn_event_batch_t *lb = NULL;
@@ -1741,17 +2128,46 @@ static pn_event_t *listener_batch_next(pn_event_batch_t 
*batch) {
 }
 
 static void listener_done(pn_listener_t *l) {
+  pn_proactor_t *p = l->context.proactor;
+  tslot_t *ts = l->context.runner;
   bool notify = false;
   lock(&l->context.mutex);
   l->context.working = false;
 
-  if (listener_can_free(l)) {
+  lock(&p->sched_mutex);
+  int n_events = 0;
+  for (size_t i = 0; i < l->acceptors_size; i++) {
+    psocket_t *ps = &l->acceptors[i].psocket;
+    if (ps->sched_io_events) {
+      ps->working_io_events = ps->sched_io_events;
+      ps->sched_io_events = 0;
+    }
+    if (ps->working_io_events)
+      n_events++;
+  }
+  if (l->context.sched_wake) {
+    l->context.sched_wake = false;
+    wake_done(&l->context);
+  }
+  unlock(&p->sched_mutex);
+
+  if (!n_events && listener_can_free(l)) {
     unlock(&l->context.mutex);
     pn_listener_free(l);
+    lock(&p->sched_mutex);
+    notify = unassign_thread(ts, UNUSED);
+    unlock(&p->sched_mutex);
+    if (notify)
+      wake_notify(&p->context);
     return;
-  } else if (listener_has_event(l))
+  } else if (n_events || listener_has_event(l))
     notify = wake(&l->context);
   unlock(&l->context.mutex);
+
+  lock(&p->sched_mutex);
+  if (unassign_thread(ts, UNUSED))
+    notify = true;
+  unlock(&p->sched_mutex);
   if (notify) wake_notify(&l->context);
 }
 
@@ -1831,35 +2247,65 @@ void pn_listener_accept2(pn_listener_t *l, 
pn_connection_t *c, pn_transport_t *t
 // proactor
 // ========================================================================
 
+// Call with sched_mutex. Alloc calls are expensive but only used when thread_count changes.
+static void grow_poller_bufs(pn_proactor_t* p) {
+  // call if p->thread_count > p->thread_capacity
+  assert(p->thread_count == 0 || p->thread_count > p->thread_capacity);
+  do {
+    p->thread_capacity += 8;
+  } while (p->thread_count > p->thread_capacity);
+
+  p->warm_runnables = (pcontext_t **) realloc(p->warm_runnables, 
p->thread_capacity * sizeof(pcontext_t *));
+  p->resume_list = (tslot_t **) realloc(p->resume_list, p->thread_capacity * 
sizeof(tslot_t *));
+
+  int old_cap = p->runnables_capacity;
+  if (p->runnables_capacity == 0)
+    p->runnables_capacity = 16;
+  else if (p->runnables_capacity < p->thread_capacity)
+    p->runnables_capacity = p->thread_capacity;
+  if (p->runnables_capacity != old_cap) {
+    p->runnables = (pcontext_t **) realloc(p->runnables, p->runnables_capacity 
* sizeof(pcontext_t *));
+    p->kevents_capacity = p->runnables_capacity;
+    size_t sz = p->kevents_capacity * sizeof(struct epoll_event);
+    p->kevents = (struct epoll_event *) realloc(p->kevents, sz);
+    memset(p->kevents, 0, sz);
+  }
+}
+
 /* Set up an epoll_extended_t to be used for wakeup or interrupts */
-static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
+ static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd, 
bool always_set) {
   ee->psocket = NULL;
   ee->fd = eventfd;
   ee->type = WAKE;
-  ee->wanted = EPOLLIN;
-  ee->polling = false;
-  start_polling(ee, epollfd);  // TODO: check for error
-}
-
-/* Set up the epoll_extended_t to be used for secondary socket events */
-static void epoll_secondary_init(epoll_extended_t *ee, int epoll_fd_2, int 
epollfd) {
-  ee->psocket = NULL;
-  ee->fd = epoll_fd_2;
-  ee->type = CHAINED_EPOLL;
-  ee->wanted = EPOLLIN;
+  if (always_set) {
+    uint64_t increment = 1;
+    if (write(eventfd, &increment, sizeof(uint64_t)) != sizeof(uint64_t))
+      EPOLL_FATAL("setting eventfd", errno);
+    // eventfd is set forever.  No reads, just rearms as needed.
+    ee->wanted = 0;
+  } else {
+    ee->wanted = EPOLLIN;
+  }
   ee->polling = false;
   start_polling(ee, epollfd);  // TODO: check for error
+  if (always_set)
+    ee->wanted = EPOLLIN;      // for all subsequent rearms
 }
 
 pn_proactor_t *pn_proactor() {
+  if (getenv("PNI_EPOLL_NOWARM")) pni_warm_sched = false;
+  if (getenv("PNI_EPOLL_IMMEDIATE")) pni_immediate = true;
+  if (getenv("PNI_EPOLL_SPINS")) pni_spins = atoi(getenv("PNI_EPOLL_SPINS"));
   pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
   if (!p) return NULL;
   p->epollfd = p->eventfd = p->timer.timerfd = -1;
   pcontext_init(&p->context, PROACTOR, p, p);
   pmutex_init(&p->eventfd_mutex);
+  pmutex_init(&p->sched_mutex);
+  pmutex_init(&p->tslot_mutex);
   ptimer_init(&p->timer, 0);
 
-  if ((p->epollfd = epoll_create(1)) >= 0 && (p->epollfd_2 = epoll_create(1)) 
>= 0) {
+  if ((p->epollfd = epoll_create(1)) >= 0) {
     if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
       if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
         if (p->timer.timerfd >= 0)
@@ -1867,20 +2313,24 @@ pn_proactor_t *pn_proactor() {
             p->batch.next_event = &proactor_batch_next;
             start_polling(&p->timer.epoll_io, p->epollfd);  // TODO: check for 
error
             p->timer_armed = true;
-            epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd);
-            epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd);
-            epoll_secondary_init(&p->epoll_secondary, p->epollfd_2, 
p->epollfd);
+            epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd, true);
+            epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, 
false);
+            p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
+            grow_poller_bufs(p);
             return p;
           }
       }
     }
   }
   if (p->epollfd >= 0) close(p->epollfd);
-  if (p->epollfd_2 >= 0) close(p->epollfd_2);
   if (p->eventfd >= 0) close(p->eventfd);
   if (p->interruptfd >= 0) close(p->interruptfd);
   ptimer_finalize(&p->timer);
+  pmutex_finalize(&p->tslot_mutex);
+  pmutex_finalize(&p->sched_mutex);
+  pmutex_finalize(&p->eventfd_mutex);
   if (p->collector) pn_free(p->collector);
+  assert(p->thread_count == 0);
   free (p);
   return NULL;
 }
@@ -1890,8 +2340,6 @@ void pn_proactor_free(pn_proactor_t *p) {
   p->shutting_down = true;
   close(p->epollfd);
   p->epollfd = -1;
-  close(p->epollfd_2);
-  p->epollfd_2 = -1;
   close(p->eventfd);
   p->eventfd = -1;
   close(p->interruptfd);
@@ -1913,8 +2361,20 @@ void pn_proactor_free(pn_proactor_t *p) {
   }
 
   pn_collector_free(p->collector);
+  pmutex_finalize(&p->tslot_mutex);
+  pmutex_finalize(&p->sched_mutex);
   pmutex_finalize(&p->eventfd_mutex);
   pcontext_finalize(&p->context);
+  for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry; entry = 
pn_hash_next(p->tslot_map, entry)) {
+    tslot_t *ts = (tslot_t *) pn_hash_value(p->tslot_map, entry);
+    pmutex_finalize(&ts->mutex);
+    free(ts);
+  }
+  pn_free(p->tslot_map);
+  free(p->kevents);
+  free(p->runnables);
+  free(p->warm_runnables);
+  free(p->resume_list);
   free(p);
 }
 
@@ -1968,17 +2428,23 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t 
*batch) {
   return pni_log_event(p, e);
 }
 
-static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t 
event) {
-  bool timer_fired = (event == PN_PROACTOR_TIMEOUT) && 
ptimer_callback(&p->timer) != 0;
+static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout, bool 
interrupt, bool wake) {
+  bool timer_fired = timeout && ptimer_callback(&p->timer) != 0;
+  if (interrupt) {
+    (void)read_uint64(p->interruptfd);
+    rearm(p, &p->epoll_interrupt);
+  }
   lock(&p->context.mutex);
-  if (event == PN_PROACTOR_INTERRUPT) {
+  if (interrupt) {
     p->need_interrupt = true;
-  } else if (event == PN_PROACTOR_TIMEOUT) {
+  }
+  if (timeout) {
     p->timer_armed = false;
     if (timer_fired && p->timeout_set) {
       p->need_timeout = true;
     }
-  } else {
+  }
+  if (wake) {
     wake_done(&p->context);
   }
   if (!p->context.working) {       /* Can generate proactor events */
@@ -1996,26 +2462,6 @@ static pn_event_batch_t *proactor_process(pn_proactor_t 
*p, pn_event_type_t even
   return NULL;
 }
 
-static pn_event_batch_t *proactor_chained_epoll_wait(pn_proactor_t *p) {
-  // process one ready pconnection socket event from the secondary/chained 
epollfd_2
-  struct epoll_event ev = {0};
-  int n = epoll_wait(p->epollfd_2, &ev, 1, 0);
-  if (n < 0) {
-    if (errno != EINTR)
-      perror("epoll_wait"); // TODO: proper log
-  } else if (n > 0) {
-    assert(n == 1);
-    rearm(p, &p->epoll_secondary);
-    epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
-    memory_barrier(ee);
-    assert(ee->type == PCONNECTION_IO_2);
-    pconnection_t *pc = psocket_pconnection(ee->psocket);
-    return pconnection_process(pc, ev.events, false, false, true);
-  }
-  rearm(p, &p->epoll_secondary);
-  return NULL;
-}
-
 static void proactor_add(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
   lock(&p->context.mutex);
@@ -2024,6 +2470,7 @@ static void proactor_add(pcontext_t *ctx) {
     ctx->next = p->contexts;
   }
   p->contexts = ctx;
+  p->context_count++;
   unlock(&p->context.mutex);
 }
 
@@ -2031,6 +2478,20 @@ static void proactor_add(pcontext_t *ctx) {
 // return true if safe for caller to free psocket
 static bool proactor_remove(pcontext_t *ctx) {
   pn_proactor_t *p = ctx->proactor;
+  // Disassociate this context from scheduler
+  if (!p->shutting_down) {
+    lock(&p->sched_mutex);
+    ctx->runner->state = DELETING;
+    for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry; entry = 
pn_hash_next(p->tslot_map, entry)) {
+      tslot_t *ts = (tslot_t *) pn_hash_value(p->tslot_map, entry);
+      if (ts->context == ctx)
+        ts->context = NULL;
+      if (ts->prev_context == ctx)
+        ts->prev_context = NULL;
+    }
+    unlock(&p->sched_mutex);
+  }
+
   lock(&p->context.mutex);
   bool can_free = true;
   if (ctx->disconnecting) {
@@ -2054,6 +2515,7 @@ static bool proactor_remove(pcontext_t *ctx) {
     if (ctx->next) {
       ctx->next->prev = ctx->prev;
     }
+    p->context_count--;
   }
   bool notify = wake_if_inactive(p);
   unlock(&p->context.mutex);
@@ -2061,81 +2523,449 @@ static bool proactor_remove(pcontext_t *ctx) {
   return can_free;
 }
 
-static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, 
epoll_extended_t *ee) {
-  if  (ee->fd == p->interruptfd) {        /* Interrupts have their own 
dedicated eventfd */
-    (void)read_uint64(p->interruptfd);
-    rearm(p, &p->epoll_interrupt);
-    return proactor_process(p, PN_PROACTOR_INTERRUPT);
+// Return the tslot_t registered in p->tslot_map for the calling thread (keyed by
+// pthread_self()).  On a thread's first call, allocate a zeroed tslot, mark it NEW,
+// initialize its mutex, and register it under sched_mutex while bumping thread_count.
+// NOTE(review): the calloc() result is dereferenced without a NULL check.
+// NOTE(review): the pn_hash_get() lookup happens before sched_mutex is taken; the
+// caller visible in this patch, proactor_do_epoll(), holds p->tslot_mutex around the call.
+static tslot_t *find_tslot(pn_proactor_t *p) {
+  pthread_t tid = pthread_self();
+  void *v = pn_hash_get(p->tslot_map, (uintptr_t) tid);
+  if (v)
+    return (tslot_t *) v;
+  tslot_t *ts = (tslot_t *) calloc(1, sizeof(tslot_t));
+  ts->state = NEW;
+  pmutex_init(&ts->mutex);
+
+  lock(&p->sched_mutex);
+  // keep important tslot related info thread-safe when holding either the sched or tslot mutex
+  p->thread_count++;
+  pn_hash_put(p->tslot_map, (uintptr_t) tid, ts);
+  unlock(&p->sched_mutex);
+  return ts;
+}
+
+// Take the thread at the head of the suspend list (if any) off that list.
+// Call with sched_mutex held.
+// Returns the removed tslot with its state set to PROCESSING, or NULL when the
+// suspend list is empty.  Caller must resume() the return value if not null.
+static tslot_t *resume_one_thread(pn_proactor_t *p) {
+  // If pn_proactor_get has an early return, we need to resume one suspended thread (if any)
+  // to be the new poller.
+
+  tslot_t *ts = p->suspend_list_head;
+  if (ts) {
+    LL_REMOVE(p, suspend_list, ts);
+    p->suspend_list_count--;
+    ts->state = PROCESSING;
+  }
+  return ts;
+}
+
+// Run one scheduled context and return the event batch (possibly NULL) produced by
+// its type-specific processing function.
+// Call with sched lock.  The sched_mutex is released on every path, just before
+// proactor_process(), pconnection_process() or listener_process() is called.
+// The sched_* flags that the poller posted on the context are consumed (cleared)
+// here while the lock is still held and are passed on as arguments.
+static pn_event_batch_t *process(pcontext_t *ctx) {
+  bool ctx_wake = false;
+  ctx->sched_pending = false;
+  if (ctx->sched_wake) {
+    // update the wake status before releasing the sched_mutex
+    ctx->sched_wake = false;
+    ctx_wake = true;
+  }
+
+  if (ctx->type == PROACTOR) {
+    pn_proactor_t *p = ctx->proactor;
+    bool timeout = p->sched_timeout;
+    if (timeout) p->sched_timeout = false;
+    bool intr = p->sched_interrupt;
+    if (intr) p->sched_interrupt = false;
+    unlock(&p->sched_mutex);
+    return proactor_process(p, timeout, intr, ctx_wake);
+  }
+  pconnection_t *pc = pcontext_pconnection(ctx);
+  if (pc) {
+    uint32_t events = pc->psocket.sched_io_events;
+    if (events) pc->psocket.sched_io_events = 0;
+    bool timeout = pc->sched_timeout;
+    if (timeout) pc->sched_timeout = false;
+    unlock(&ctx->proactor->sched_mutex);
+    return pconnection_process(pc, events, timeout, ctx_wake, false);
+  }
+  // Neither the proactor nor a pconnection: handled as a listener.
+  pn_listener_t *l = pcontext_listener(ctx);
+  int n_events = 0;
+  // Move newly posted io events of each acceptor to working_io_events and count the
+  // acceptors that have working events outstanding.
+  for (size_t i = 0; i < l->acceptors_size; i++) {
+    psocket_t *ps = &l->acceptors[i].psocket;
+    if (ps->sched_io_events) {
+      ps->working_io_events = ps->sched_io_events;
+      ps->sched_io_events = 0;
+    }
+    if (ps->working_io_events)
+      n_events++;
   }
-  pcontext_t *ctx = wake_pop_front(p);
-  if (ctx) {
-    switch (ctx->type) {
-     case PROACTOR:
-      return proactor_process(p, PN_EVENT_NONE);
-     case PCONNECTION:
-      return pconnection_process((pconnection_t *) ctx->owner, 0, false, 
false, false);
-     case LISTENER:
-      return listener_process(&((pn_listener_t *) 
ctx->owner)->acceptors[0].psocket, 0);
-     default:
-      assert(ctx->type == WAKEABLE); // TODO: implement or remove
+  unlock(&ctx->proactor->sched_mutex);
+  return listener_process(l, n_events, ctx_wake);
+}
+
+
+// Move the whole pending wake list onto the tail of the scheduler's wake list and
+// leave the pending list empty.  No-op when wake_list_first is NULL.
+// Call with both sched and wake locks (the callers in this patch hold sched_mutex
+// and eventfd_mutex).
+static void schedule_wake_list(pn_proactor_t *p) {
+  // append wake_list_first..wake_list_last to end of sched_wake_last
+  if (p->wake_list_first) {
+    if (p->sched_wake_last)
+      p->sched_wake_last->wake_next = p->wake_list_first;  // join them
+    if (!p->sched_wake_first)
+      p->sched_wake_first = p->wake_list_first;
+    p->sched_wake_last = p->wake_list_last;
+    // With no current entry, restart traversal from the head of the scheduled list.
+    if (!p->sched_wake_current)
+      p->sched_wake_current = p->sched_wake_first;
+    p->wake_list_first = p->wake_list_last = NULL;
+  }
+}
+
+// Record one epoll event on the context that owns the signaled fd.
+// Call with schedule lock held.  Called only by poller thread.
+// Sets the matching sched_* field (sched_interrupt, sched_timeout or sched_io_events)
+// and sched_pending; for the main wake eventfd it instead merges the pending wake
+// list into the scheduled wake list and selects sched_wake_current.
+// Returns the context when it is neither runnable nor assigned to a runner (the
+// caller in this patch then passes it to make_runnable()), otherwise NULL.
+static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) {
+  epoll_extended_t *ee = (epoll_extended_t *) evp->data.ptr;
+  pcontext_t *ctx = NULL;
+
+  if (ee->type == WAKE) {
+    if  (ee->fd == p->interruptfd) {        /* Interrupts have their own dedicated eventfd */
+      p->sched_interrupt = true;
+      ctx = &p->context;
+      ctx->sched_pending = true;
+    } else {
+      // main eventfd wake
+      lock(&p->eventfd_mutex);
+      schedule_wake_list(p);
+      ctx = p->sched_wake_current;
+      unlock(&p->eventfd_mutex);
+    }
+  } else if (ee->type == PROACTOR_TIMER) {
+    p->sched_timeout = true;
+    ctx = &p->context;
+    ctx->sched_pending = true;
+  } else {
+    pconnection_t *pc = psocket_pconnection(ee->psocket);
+    if (pc) {
+      ctx = &pc->context;
+      if (ee->type == PCONNECTION_IO) {
+        ee->psocket->sched_io_events = evp->events;
+      } else {
+        // NOTE(review): stray second ';' on the next line (harmless empty statement).
+        pc->sched_timeout = true;;
+      }
+      ctx->sched_pending = true;
+    }
+    else {
+      // Not a pconnection socket, so it must belong to a listener.
+      pn_listener_t *l = psocket_listener(ee->psocket);
+      assert(l);
+      ctx = &l->context;
+      ee->psocket->sched_io_events = evp->events;
+      ctx->sched_pending = true;
     }
   }
+  if (ctx && !ctx->runnable && !ctx->runner)
+    return ctx;
   return NULL;
 }
 
-static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool 
can_block) {
-  int timeout = can_block ? -1 : 0;
-  while(true) {
-    pn_event_batch_t *batch = NULL;
-    struct epoll_event ev = {0};
-    int n = epoll_wait(p->epollfd, &ev, 1, timeout);
 
-    if (n < 0) {
-      if (errno != EINTR)
-        perror("epoll_wait"); // TODO: proper log
-      if (!can_block)
-        return NULL;
-      else
-        continue;
-    } else if (n == 0) {
-      if (!can_block)
-        return NULL;
-      else {
-        perror("epoll_wait unexpected timeout"); // TODO: proper log
-        continue;
+// Mark ctx as having a wake to process (sets sched_wake and sched_pending).
+// Returns ctx when it is neither runnable nor assigned to a runner, otherwise NULL.
+// The p argument is not used in the body.
+static pcontext_t *post_wake(pn_proactor_t *p, pcontext_t *ctx) {
+  ctx->sched_wake = true;
+  ctx->sched_pending = true;
+  if (!ctx->runnable && !ctx->runner)
+    return ctx;
+  return NULL;
+}
+
+// Take over the context of some other thread slot that is still earmarked.
+// call with sched_lock held
+// Scans tslot_map for the first earmarked tslot, removes its earmark, assigns its
+// context to ts, and records the overridden tslot and its generation in
+// ts->earmark_override / ts->earmark_override_gen.  Returns that context.
+// Asserts if no earmarked tslot exists (returns NULL when asserts are compiled out).
+static pcontext_t *next_drain(pn_proactor_t *p, tslot_t *ts) {
+  // This should be called seldom, best case once per thread removal on shutdown.
+  // TODO: how to reduce?  Instrumented near 5 percent of earmarks, 1 in 2000 calls to do_epoll().
+
+  for (pn_handle_t entry = pn_hash_head(p->tslot_map); entry; entry = 
pn_hash_next(p->tslot_map, entry)) {
+    tslot_t *ts2 = (tslot_t *) pn_hash_value(p->tslot_map, entry);
+    if (ts2->earmarked) {
+      // undo the old assign thread and earmark.  ts2 may never come back
+      pcontext_t *switch_ctx = ts2->context;
+      remove_earmark(ts2);
+      assign_thread(ts, switch_ctx);
+      ts->earmark_override = ts2;
+      ts->earmark_override_gen = ts2->generation;
+      return switch_ctx;
+    }
+  }
+  assert(false);
+  return NULL;
+}
+
+// Choose the context the calling thread (ts) should run next, or NULL if none.
+// call with sched_lock held
+// Sources are tried in this order:
+//   1. a context already assigned to ts (any earmark on ts is cleared),
+//   2. ts->prev_context, if it is runnable,
+//   3. the next entry of p->runnables that is still runnable,
+//   4. the current entry of the scheduled wake list,
+//   5. an earmarked context taken over via next_drain(), when earmark_drain is set.
+static pcontext_t *next_runnable(pn_proactor_t *p, tslot_t *ts) {
+  if (ts->context) {
+    // Already assigned
+    if (ts->earmarked) {
+      ts->earmarked = false;
+      if (--p->earmark_count == 0)
+        p->earmark_drain = false;
+    }
+    return ts->context;
+  }
+
+  // warm pairing ?
+  pcontext_t *ctx = ts->prev_context;
+  if (ctx && (ctx->runnable)) { // or ctx->sched_wake too?
+    assign_thread(ts, ctx);
+    return ctx;
+  }
+
+  // check for an unassigned runnable context or unprocessed wake
+  if (p->n_runnables) {
+    // Any unclaimed runnable?
+    // n_runnables is reset to 0 once next_runnable reaches it, which ends the loop.
+    while (p->n_runnables) {
+      ctx = p->runnables[p->next_runnable++];
+      if (p->n_runnables == p->next_runnable)
+        p->n_runnables = 0;
+      if (ctx->runnable) {
+        assign_thread(ts, ctx);
+        return ctx;
       }
     }
-    assert(n == 1);
-    epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
-    memory_barrier(ee);
-
-    if (ee->type == WAKE) {
-      batch = process_inbound_wake(p, ee);
-    } else if (ee->type == PROACTOR_TIMER) {
-      batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
-    } else if (ee->type == CHAINED_EPOLL) {
-      batch = proactor_chained_epoll_wait(p);  // expect a PCONNECTION_IO_2
-    } else {
-      pconnection_t *pc = psocket_pconnection(ee->psocket);
-      if (pc) {
-        if (ee->type == PCONNECTION_IO) {
-          batch = pconnection_process(pc, ev.events, false, false, false);
+  }
+
+  if (p->sched_wake_current) {
+    ctx = p->sched_wake_current;
+    pop_wake(ctx);  // updates sched_wake_current
+    assert(!ctx->runnable && !ctx->runner);
+    assign_thread(ts, ctx);
+    return ctx;
+  }
+
+  if (p->earmark_drain) {
+    ctx = next_drain(p, ts);
+    if (p->earmark_count == 0)
+      p->earmark_drain = false;
+    return ctx;
+  }
+
+  return NULL;
+}
+
+// Common implementation of pn_proactor_wait() (can_block == true) and
+// pn_proactor_get() (can_block == false).  Returns an event batch, or NULL when
+// !can_block and there is nothing to return without blocking or suspending.
+// Each pass of the loop does one of:
+//   - run the context chosen by next_runnable() and return its batch if it has one;
+//   - if no thread is currently the poller, become the poller: call epoll_wait(),
+//     post the resulting events and pending wakes to their contexts, and resume
+//     suspended threads to run them;
+//   - otherwise suspend until resumed (or return NULL if !can_block).
+static pn_event_batch_t *proactor_do_epoll(pn_proactor_t* p, bool can_block) {
+  lock(&p->tslot_mutex);
+  tslot_t * ts = find_tslot(p);
+  unlock(&p->tslot_mutex);
+  ts->generation++;  // wrapping OK.  Just looking for any change
+  pn_event_batch_t *batch = NULL;
+
+  lock(&p->sched_mutex);
+  assert(ts->context == NULL || ts->earmarked);
+  assert(ts->state == UNUSED || ts->state == NEW);
+  ts->state = PROCESSING;
+
+  while (true) {
+    // Process outstanding epoll events until we get a batch or need to block.
+
+    pcontext_t *ctx = next_runnable(p, ts);
+    if (ctx) {
+      ts->state = BATCHING;
+      batch = process(ctx);  // unlocks sched_lock before returning
+      if (batch) {
+        return batch;
+      }
+      lock(&p->sched_mutex);
+      bool notify = unassign_thread(ts, PROCESSING);
+      if (notify) {
+        // wake_notify() is called with the sched lock dropped.
+        unlock(&p->sched_mutex);
+        wake_notify(&p->context);
+        lock(&p->sched_mutex);
+      }
+      continue;  // Long time may have passed.  Back to beginning.
+    }
+
+    // poll or wait for a runnable context
+    if (p->poller == NULL) {
+      p->poller = ts;
+      // As poller with lots to do, be mindful of hogging the sched lock.  Release when making kernel calls.
+      assert(p->n_runnables == 0);
+      if (p->thread_count > p->thread_capacity)
+        grow_poller_bufs(p);
+      p->next_runnable = 0;
+      p->n_warm_runnables = 0;
+      p->last_earmark = NULL;
+
+      // epoll_wait() must not block if earmarks are outstanding, the caller cannot
+      // block, or wakes are already queued on the wake list.
+      bool unfinished_earmarks = p->earmark_count > 0;
+      bool new_wakes = false;
+      bool epoll_immediate = unfinished_earmarks || !can_block;
+      assert(!p->sched_wake_first);
+      if (!epoll_immediate) {
+        lock(&p->eventfd_mutex);
+        if (p->wake_list_first) {
+          epoll_immediate = true;
+          new_wakes = true;
         } else {
-          assert(ee->type == PCONNECTION_TIMER);
-          batch = pconnection_process(pc, 0, true, false, false);
+          p->wakes_in_progress = false;
         }
+        unlock(&p->eventfd_mutex);
       }
-      else {
-        // TODO: can any of the listener processing be parallelized like IOCP?
-        batch = listener_process(ee->psocket, ev.events);
+      int timeout = (epoll_immediate) ? 0 : -1;
+      p->poller_suspended = (timeout == -1);
+      unlock(&p->sched_mutex);
+
+      int n = epoll_wait(p->epollfd, p->kevents, p->kevents_capacity, timeout);
+
+      lock(&p->sched_mutex);
+      p->poller_suspended = false;
+
+      // unpolled_work: there is something to schedule regardless of what
+      // epoll_wait() returned (earmarks to drain or wakes seen before polling).
+      bool unpolled_work = false;
+      if (p->earmark_count > 0) {
+        p->earmark_drain = true;
+        unpolled_work = true;
+      }
+      if (new_wakes) {
+        lock(&p->eventfd_mutex);
+        schedule_wake_list(p);
+        unlock(&p->eventfd_mutex);
+        unpolled_work = true;
       }
-    }
 
-    if (batch) return batch;
-    // No Proton event generated.  epoll_wait() again.
-  }
+      if (n < 0) {
+        if (errno != EINTR)
+          perror("epoll_wait"); // TODO: proper log
+        if (!can_block && !unpolled_work) {
+          // Give up the poller role and hand it to a suspended thread, if any.
+          p->poller = NULL;
+          tslot_t *res_ts = resume_one_thread(p);
+          ts->state = UNUSED;
+          unlock(&p->sched_mutex);
+          if (res_ts) resume(p, res_ts);
+          return NULL;
+        }
+        else {
+          p->poller = NULL;
+          continue;
+        }
+      } else if (n == 0) {
+        if (!can_block && !unpolled_work) {
+          p->poller = NULL;
+          tslot_t *res_ts = resume_one_thread(p);
+          ts->state = UNUSED;
+          unlock(&p->sched_mutex);
+          if (res_ts) resume(p, res_ts);
+          return NULL;
+        }
+        else {
+          if (!epoll_immediate)
+            perror("epoll_wait unexpected timeout"); // TODO: proper log
+          if (!unpolled_work) {
+            p->poller = NULL;
+            continue;
+          }
+        }
+      }
+
+      // Post every returned epoll event to its context; contexts that can start
+      // running are made runnable.
+      for (int i = 0; i < n; i++) {
+        ctx = post_event(p, &p->kevents[i]);
+        if (ctx)
+          make_runnable(ctx);
+      }
+      if (n > 0)
+        memset(p->kevents, 0, sizeof(struct epoll_event) * n);
+
+      // The list of pending wakes can be very long.  Traverse part of it looking for warm pairings.
+      pcontext_t *wctx = p->sched_wake_current;
+      int max_runnables = p->runnables_capacity;
+      while (wctx && p->n_runnables < max_runnables) {
+        if (wctx->runner == REWAKE_PLACEHOLDER)
+          wctx->runner = NULL;  // Allow context to run again.
+        ctx = post_wake(p, wctx);
+        if (ctx)
+          make_runnable(ctx);
+        pop_wake(wctx);
+        wctx = wctx->wake_next;
+      }
+      p->sched_wake_current = wctx;
+      // More wakes than places on the runnables list
+      while (wctx) {
+        if (wctx->runner == REWAKE_PLACEHOLDER)
+          wctx->runner = NULL;  // Allow context to run again.
+        wctx->sched_wake = true;
+        wctx->sched_pending = true;
+        if (wctx->runnable || wctx->runner)
+          pop_wake(wctx);
+        wctx = wctx->wake_next;
+      }
+
+      if (pni_immediate && !ts->context) {
+        // Poller gets to run if possible
+        pcontext_t *pctx;
+        if (p->n_runnables) {
+          assert(p->next_runnable == 0);
+          pctx = p->runnables[0];
+          if (++p->next_runnable == p->n_runnables)
+            p->n_runnables = 0;
+        } else if (p->n_warm_runnables) {
+          // Take the last warm runnable away from the thread it was paired with.
+          pctx = p->warm_runnables[--p->n_warm_runnables];
+          tslot_t *ts2 = pctx->runner;
+          ts2->prev_context = ts2->context = NULL;
+          pctx->runner = NULL;
+        } else if (p->last_earmark) {
+          pctx = p->last_earmark->context;
+          remove_earmark(p->last_earmark);
+          if (p->earmark_count == 0)
+            p->earmark_drain = false;
+        } else {
+          pctx = NULL;
+        }
+        if (pctx) {
+          assign_thread(ts, pctx);
+        }
+      }
+
+      // Create a list of available threads to put to work.
+
+      // First the suspended threads that already own a warm runnable context.
+      int resume_list_count = 0;
+      for (int i = 0; i < p->n_warm_runnables ; i++) {
+        ctx = p->warm_runnables[i];
+        tslot_t *tsp = ctx->runner;
+        if (tsp->state == SUSPENDED) {
+          p->resume_list[resume_list_count++] = tsp;
+          LL_REMOVE(p, suspend_list, tsp);
+          p->suspend_list_count--;
+          tsp->state = PROCESSING;
+        }
+      }
+
+      int can_use = p->suspend_list_count;
+      if (!ts->context)
+        can_use++;
+      // Run as many unpaired runnable contexts as possible and allow for a new poller.
+      int new_runners = pn_min(p->n_runnables + 1, can_use);
+      if (!ts->context)
+        new_runners--;  // poller available and does not need resume
+
+      // Rare corner case on startup.  New inbound threads can make the suspend_list too big for resume list.
+      new_runners = pn_min(new_runners, p->thread_capacity - 
resume_list_count);
+
+      for (int i = 0; i < new_runners; i++) {
+        tslot_t *tsp = p->suspend_list_head;
+        assert(tsp);
+        p->resume_list[resume_list_count++] = tsp;
+        LL_REMOVE(p, suspend_list, tsp);
+        p->suspend_list_count--;
+        tsp->state = PROCESSING;
+      }
+
+      p->poller = NULL;
+      // New poller may run concurrently.  Touch only this thread's stack for rest of block.
+
+      if (resume_list_count) {
+        unlock(&p->sched_mutex);
+        for (int i = 0; i < resume_list_count; i++) {
+          resume(p, p->resume_list[i]);
+        }
+        lock(&p->sched_mutex);
+      }
+    } else if (!can_block) {
+      // Another thread is the poller and this caller may not wait.
+      ts->state = UNUSED;
+      unlock(&p->sched_mutex);
+      return NULL;
+    } else {
+      // TODO: loop while !poller_suspended, since new work coming
+      suspend(p, ts);
+    }
+  } // while
 }
 
+
 pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   return proactor_do_epoll(p, true);
 }
@@ -2144,21 +2974,68 @@ pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* 
p) {
   return proactor_do_epoll(p, false);
 }
 
+// After finishing a batch, check whether this thread took over a context that had
+// been earmarked for another thread (ts->earmark_override, set in next_drain()).
+// If that other thread's generation counter has not changed since the takeover,
+// resume one suspended thread.  The override record is cleared in either case.
+// Does nothing when ts is NULL or no override is recorded.
+// Call with no locks
+static inline void check_earmark_override(pn_proactor_t *p, tslot_t *ts) {
+  
+  if (!ts || !ts->earmark_override)
+    return;
+  if (ts->earmark_override->generation == ts->earmark_override_gen) {
+    // Other (overridden) thread not seen since this thread started and finished the event batch.
+    // Thread is perhaps gone forever, which may leave us short of a poller thread
+    lock(&p->sched_mutex);
+    tslot_t *res_ts = resume_one_thread(p);
+    unlock(&p->sched_mutex);
+    if (res_ts) resume(p, res_ts);
+  }
+  ts->earmark_override = NULL;
+}
+
 void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
   if (pc) {
+    tslot_t *ts = pc->context.runner;
     pconnection_done(pc);
+    // pc possibly freed/invalid
+    check_earmark_override(p, ts);
     return;
   }
   pn_listener_t *l = batch_listener(batch);
   if (l) {
+    tslot_t *ts = l->context.runner;
     listener_done(l);
+    // l possibly freed/invalid
+    check_earmark_override(p, ts);
     return;
   }
   pn_proactor_t *bp = batch_proactor(batch);
   if (bp == p) {
     bool notify = false;
+    bool rearm_interrupt = false;
     lock(&p->context.mutex);
+    lock(&p->sched_mutex);
+
+    bool timeout = p->sched_timeout;
+    if (timeout) p->sched_timeout = false;
+    bool intr = p->sched_interrupt;
+    if (intr) {
+      p->sched_interrupt = false;
+      rearm_interrupt = true;
+      p->need_interrupt = true;
+    }
+    if (p->context.sched_wake) {
+      p->context.sched_wake = false;
+      wake_done(&p->context);
+    }
+
+    // ptimer_callback is slow.  Revisit timer cancel code in light of change to single poller thread.
+    bool timer_fired = timeout && ptimer_callback(&p->timer) != 0;
+    if (timeout) {
+      p->timer_armed = false;
+      if (timer_fired && p->timeout_set) {
+        p->need_timeout = true;
+      }
+    }
+
     bool rearm_timer = !p->timer_armed && !p->shutting_down;
     p->timer_armed = true;
     p->context.working = false;
@@ -2171,11 +3048,20 @@ void pn_proactor_done(pn_proactor_t *p, 
pn_event_batch_t *batch) {
     if (proactor_has_event(p))
       if (wake(&p->context))
         notify = true;
+    tslot_t *ts = p->context.runner;
+    if (unassign_thread(ts, UNUSED))
+      notify = true;
+    unlock(&p->sched_mutex);
     unlock(&p->context.mutex);
     if (notify)
       wake_notify(&p->context);
     if (rearm_timer)
       rearm(p, &p->timer.epoll_io);
+    if (rearm_interrupt) {
+      (void)read_uint64(p->interruptfd);
+      rearm(p, &p->epoll_interrupt);
+    }
+    check_earmark_override(p, ts);
     return;
   }
 }
@@ -2232,6 +3118,7 @@ void pn_proactor_disconnect(pn_proactor_t *p, 
pn_condition_t *cond) {
     ctx->disconnect_ops = 2;   // Second pass below and proactor_remove(), in 
any order.
     p->disconnects_pending++;
     ctx = ctx->next;
+    p->context_count--;
   }
   notify = wake_if_inactive(p);
   unlock(&p->context.mutex);
diff --git a/c/tests/proactor_test.cpp b/c/tests/proactor_test.cpp
index 3e4365a..aa0ab57 100644
--- a/c/tests/proactor_test.cpp
+++ b/c/tests/proactor_test.cpp
@@ -173,7 +173,7 @@ TEST_CASE("proactor_connection_wake") {
 
   REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
   REQUIRE_RUN(p, PN_CONNECTION_REMOTE_OPEN);
-  CHECK(!pn_proactor_get(p)); /* Should be idle */
+  while (p.flush().first != 0);
   pn_connection_wake(c);
   REQUIRE_RUN(p, PN_CONNECTION_WAKE);
   REQUIRE_RUN(p, PN_TRANSPORT_CLOSED);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to