This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new d31ad96  PROTON-1496: epoll proactor - improved timers implementation with single timerfd kernel resource
d31ad96 is described below

commit d31ad9652a1a63f856ed10772e362b4a155ecbf4
Author: Cliff Jansen <cliffjan...@apache.org>
AuthorDate: Sun Nov 8 11:56:58 2020 -0800

    PROTON-1496: epoll proactor - improved timers implementation with single timerfd kernel resource
---
 c/CMakeLists.txt                      |   2 +-
 c/src/proactor/epoll-internal.h       |  62 ++++--
 c/src/proactor/epoll.c                | 338 ++++++++----------------------
 c/src/proactor/epoll_raw_connection.c |   1 +
 c/src/proactor/epoll_timer.c          | 380 ++++++++++++++++++++++++++++++++++
 5 files changed, 510 insertions(+), 273 deletions(-)

diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt
index 2967176..99e328b 100644
--- a/c/CMakeLists.txt
+++ b/c/CMakeLists.txt
@@ -340,7 +340,7 @@ if (PROACTOR STREQUAL "epoll" OR (NOT PROACTOR AND NOT 
BUILD_PROACTOR))
   check_symbol_exists(epoll_wait "sys/epoll.h" HAVE_EPOLL)
   if (HAVE_EPOLL)
     set (PROACTOR_OK epoll)
-    set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c ${qpid-proton-proactor-common})
+    set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c src/proactor/epoll_timer.c ${qpid-proton-proactor-common})
     set (PROACTOR_LIBS Threads::Threads ${TIME_LIB})
   endif()
 endif()
diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 1b8edd3..b14b485 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -54,14 +54,14 @@ extern "C" {
 typedef struct acceptor_t acceptor_t;
 typedef struct tslot_t tslot_t;
 typedef pthread_mutex_t pmutex;
+typedef struct pni_timer_t pni_timer_t;
 
 typedef enum {
   WAKE,   /* see if any work to do in proactor/psocket context */
-  PCONNECTION_IO,
-  PCONNECTION_TIMER,
   LISTENER_IO,
-  PROACTOR_TIMER,
-  RAW_CONNECTION_IO
+  PCONNECTION_IO,
+  RAW_CONNECTION_IO,
+  TIMER
 } epoll_type_t;
 
 // Data to use with epoll.
@@ -73,19 +73,12 @@ typedef struct epoll_extended_t {
   pmutex barrier_mutex;
 } epoll_extended_t;
 
-typedef struct ptimer_t {
-  pmutex mutex;
-  epoll_extended_t epoll_io;
-  bool timer_active;
-  bool in_doubt;  // 0 or 1 callbacks are possible
-  bool shutting_down;
-} ptimer_t;
-
 typedef enum {
   PROACTOR,
   PCONNECTION,
   LISTENER,
-  RAW_CONNECTION
+  RAW_CONNECTION,
+  TIMER_MANAGER
 } pcontext_type_t;
 
 typedef struct pcontext_t {
@@ -137,13 +130,24 @@ struct tslot_t {
   unsigned int earmark_override_gen;
 };
 
+typedef struct pni_timer_manager_t {
+  pcontext_t context;
+  epoll_extended_t epoll_timer;
+  pmutex deletion_mutex;
+  pni_timer_t *proactor_timer;
+  pn_list_t *timers_heap;
+  uint64_t timerfd_deadline;
+  bool sched_timeout;
+} pni_timer_manager_t;
+
 struct pn_proactor_t {
   pcontext_t context;
-  ptimer_t timer;
+  pni_timer_manager_t timer_manager;
   epoll_extended_t epoll_wake;
   epoll_extended_t epoll_interrupt;
   pn_event_batch_t batch;
   pcontext_t *contexts;         /* track in-use contexts for 
PN_PROACTOR_INACTIVE and disconnect */
+  pni_timer_t *timer;
   size_t disconnects_pending;   /* unfinished proactor disconnects*/
   // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next 
update_batch()
   bool need_interrupt;
@@ -151,7 +155,6 @@ struct pn_proactor_t {
   bool need_timeout;
   bool timeout_set; /* timeout has been set by user and not yet cancelled or 
generated event */
   bool timeout_processed;  /* timeout event dispatched in the most recent 
event batch */
-  bool timer_armed; /* timer is armed in epoll */
   int context_count;
 
   // wake subsystem
@@ -167,7 +170,6 @@ struct pn_proactor_t {
   pmutex overflow_mutex;
 
   // Sched vars specific to proactor context.
-  bool sched_timeout;
   bool sched_interrupt;
 
   // Global scheduling/poller vars.
@@ -220,13 +222,12 @@ typedef struct psocket_t {
 typedef struct pconnection_t {
   psocket_t psocket;
   pcontext_t context;
-  ptimer_t timer;  // TODO: review one timerfd per connection
+  pni_timer_t *timer;
   const char *host, *port;
   uint32_t new_events;
   int wake_count; // TODO: protected by context.mutex so should be moved in 
there (also really bool)
   bool server;                /* accept, not connect */
   bool tick_pending;
-  bool timer_armed;
   bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
   pn_condition_t *disconnect_condition;
   // Following values only changed by (sole) working context:
@@ -250,7 +251,7 @@ typedef struct pconnection_t {
   struct addrinfo *ai;               /* Current connect address */
   pmutex rearm_mutex;                /* protects pconnection_rearm from out of 
order arming*/
   bool io_doublecheck;               /* callbacks made and new IO may have 
arrived */
-  bool sched_timeout;
+  uint64_t expected_timeout;
   char addr_buf[1];
 } pconnection_t;
 
@@ -299,6 +300,16 @@ struct pn_listener_t {
 typedef char strerrorbuf[1024];      /* used for pstrerror message buffer */
 void pstrerror(int err, strerrorbuf msg);
 
+/* Internal error, no recovery */
+#define EPOLL_FATAL(EXPR, SYSERRNO)                                     \
+  do {                                                                  \
+    strerrorbuf msg;                                                    \
+    pstrerror((SYSERRNO), msg);                                         \
+    fprintf(stderr, "epoll proactor failure in %s:%d: %s: %s\n",        \
+            __FILE__, __LINE__ , #EXPR, msg);                           \
+    abort();                                                            \
+  } while (0)
+
 // In general all locks to be held singly and shortly (possibly as spin locks).
 // See above about lock ordering.
 
@@ -338,6 +349,10 @@ bool proactor_remove(pcontext_t *ctx);
 bool unassign_thread(tslot_t *ts, tslot_state new_state);
 
 void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p);
+static void pcontext_finalize(pcontext_t* ctx) {
+  pmutex_finalize(&ctx->mutex);
+}
+
 bool wake(pcontext_t *ctx);
 void wake_notify(pcontext_t *ctx);
 void wake_done(pcontext_t *ctx);
@@ -360,6 +375,15 @@ pcontext_t *pni_raw_connection_context(praw_connection_t 
*rc);
 praw_connection_t *pni_batch_raw_connection(pn_event_batch_t* batch);
 void pni_raw_connection_done(praw_connection_t *rc);
 
+pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c);
+void pni_timer_free(pni_timer_t *timer);
+void pni_timer_set(pni_timer_t *timer, uint64_t deadline);
+bool pni_timer_manager_init(pni_timer_manager_t *tm);
+void pni_timer_manager_finalize(pni_timer_manager_t *tm);
+pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool wake);
+void pni_pconnection_timeout(pconnection_t *pc);
+void pni_proactor_timeout(pn_proactor_t *p);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 4c00d47..a2d0864 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -27,7 +27,7 @@
 
  A serialized grouping of Proton events is a context (connection, listener, 
proactor).
  Each has multiple pollable fds that make it schedulable.  E.g. a connection 
could have a
- socket fd, timerfd, and (indirect) eventfd all signaled in a single epoll_wait().
+ socket fd, (indirect) timerfd, and (indirect) eventfd all signaled in a single epoll_wait().
 
  At the conclusion of each
       N = epoll_wait(..., N_MAX, timeout)
@@ -80,7 +80,6 @@
 
 #include <errno.h>
 #include <pthread.h>
-#include <sys/timerfd.h>
 #include <sys/epoll.h>
 #include <unistd.h>
 #include <sys/socket.h>
@@ -94,8 +93,7 @@
 
 #include "./netaddr-internal.h" /* Include after socket/inet headers */
 
-// TODO: replace timerfd per connection with global lightweight timer mechanism.
-// logging in general
+// TODO: logging in general
 // SIGPIPE?
 // Can some of the mutexes be spinlocks (any benefit over adaptive pthread 
mutex)?
 //   Maybe futex is even better?
@@ -112,20 +110,6 @@ void pstrerror(int err, strerrorbuf msg) {
   if (e) snprintf(msg, sizeof(strerrorbuf), "unknown error %d", err);
 }
 
-/* Internal error, no recovery */
-#define EPOLL_FATAL(EXPR, SYSERRNO)                                     \
-  do {                                                                  \
-    strerrorbuf msg;                                                    \
-    pstrerror((SYSERRNO), msg);                                         \
-    fprintf(stderr, "epoll proactor failure in %s:%d: %s: %s\n",        \
-            __FILE__, __LINE__ , #EXPR, msg);                           \
-    abort();                                                            \
-  } while (0)
-
-// ========================================================================
-// First define a proactor mutex (pmutex) and timer mechanism (ptimer) to 
taste.
-// ========================================================================
-
 /* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory
    writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
    visible to epoll_wait() thread. This function creates a memory barrier,
@@ -137,116 +121,16 @@ static void memory_barrier(epoll_extended_t *ee) {
   unlock(&ee->barrier_mutex);
 }
 
-/*
- * This timerfd logic assumes EPOLLONESHOT and there never being two
- * active timeout callbacks.  There can be multiple (or zero)
- * unclaimed expiries processed in a single callback.
- *
- * timerfd_set() documentation implies a crisp relationship between
- * timer expiry count and oldt's return value, but a return value of
- * zero is ambiguous.  It can lead to no EPOLLIN, EPOLLIN + expected
- * read, or
- *
- *   event expiry (in kernel) -> EPOLLIN
- *   cancel/settime(0) (thread A) (number of expiries resets to zero)
- *   read(timerfd) -> -1, EAGAIN  (thread B servicing epoll event)
- *
- * The original implementation with counters to track expiry counts
- * was abandoned in favor of "in doubt" transitions and resolution
- * at shutdown.
- *
- * TODO: review above in light of single poller thread.
- */
-
-static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
-  pmutex_init(&pt->mutex);
-  pt->timer_active = false;
-  pt->in_doubt = false;
-  pt->shutting_down = false;
-  epoll_type_t type = ps ? PCONNECTION_TIMER : PROACTOR_TIMER;
-  pt->epoll_io.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
-  pt->epoll_io.type = type;
-  pt->epoll_io.wanted = EPOLLIN;
-  pt->epoll_io.polling = false;
-  return (pt->epoll_io.fd >= 0);
-}
-
-// Call with ptimer lock held
-static void ptimer_set_lh(ptimer_t *pt, uint64_t t_millis) {
-  struct itimerspec newt, oldt;
-  memset(&newt, 0, sizeof(newt));
-  newt.it_value.tv_sec = t_millis / 1000;
-  newt.it_value.tv_nsec = (t_millis % 1000) * 1000000;
-
-  timerfd_settime(pt->epoll_io.fd, 0, &newt, &oldt);
-  if (pt->timer_active && oldt.it_value.tv_nsec == 0 && oldt.it_value.tv_sec 
== 0) {
-    // EPOLLIN is possible but not assured
-    pt->in_doubt = true;
-  }
-  pt->timer_active = t_millis != 0;
-}
-
-static void ptimer_set(ptimer_t *pt, uint64_t t_millis) {
-  // t_millis == 0 -> cancel
-  lock(&pt->mutex);
-  if ((t_millis == 0 && !pt->timer_active) || pt->shutting_down) {
-    unlock(&pt->mutex);
-    return;  // nothing to do
-  }
-  ptimer_set_lh(pt, t_millis);
-  unlock(&pt->mutex);
-}
-
-/* Read from a timer or event FD */
+/* Read from an event FD */
 static uint64_t read_uint64(int fd) {
   uint64_t result = 0;
   ssize_t n = read(fd, &result, sizeof(result));
   if (n != sizeof(result) && !(n < 0 && errno == EAGAIN)) {
-    EPOLL_FATAL("timerfd or eventfd read error", errno);
+    EPOLL_FATAL("eventfd read error", errno);
   }
   return result;
 }
 
-// Callback bookkeeping. Return true if there is an expired timer.
-static bool ptimer_callback(ptimer_t *pt) {
-  lock(&pt->mutex);
-  struct itimerspec current;
-  if (timerfd_gettime(pt->epoll_io.fd, &current) == 0) {
-    if (current.it_value.tv_nsec == 0 && current.it_value.tv_sec == 0)
-      pt->timer_active = false;
-  }
-  uint64_t u_exp_count = read_uint64(pt->epoll_io.fd);
-  if (!pt->timer_active) {
-    // Expiry counter just cleared, timer not set, timerfd not armed
-    pt->in_doubt = false;
-  }
-  unlock(&pt->mutex);
-  return u_exp_count > 0;
-}
-
-// Return true if timerfd has and will have no pollable expiries in the 
current armed state
-static bool ptimer_shutdown(ptimer_t *pt, bool currently_armed) {
-  lock(&pt->mutex);
-  if (currently_armed) {
-    ptimer_set_lh(pt, 0);
-    pt->shutting_down = true;
-    if (pt->in_doubt)
-      // Force at least one callback.  If two, second cannot proceed with 
unarmed timerfd.
-      ptimer_set_lh(pt, 1);
-  }
-  else
-    pt->shutting_down = true;
-  bool rv = !pt->in_doubt;
-  unlock(&pt->mutex);
-  return rv;
-}
-
-static void ptimer_finalize(ptimer_t *pt) {
-  if (pt->epoll_io.fd >= 0) close(pt->epoll_io.fd);
-  pmutex_finalize(&pt->mutex);
-}
-
-
 // ========================================================================
 // Proactor common code
 // ========================================================================
@@ -339,10 +223,6 @@ void pcontext_init(pcontext_t *ctx, pcontext_type_t t, 
pn_proactor_t *p) {
   ctx->type = t;
 }
 
-static void pcontext_finalize(pcontext_t* ctx) {
-  pmutex_finalize(&ctx->mutex);
-}
-
 /*
  * Wake strategy with eventfd.
  *  - wakees can be in the list only once
@@ -688,7 +568,7 @@ static void set_pconnection(pn_connection_t* c, 
pconnection_t *pc) {
   unlock(&driver_ptr_mutex);
 }
 
-static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup);
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_wake, bool topup);
 static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
 static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool 
can_block);
@@ -853,6 +733,11 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
     free(pc);
     return "pn_connection_driver_init failure";
   }
+  if (!(pc->timer = pni_timer(&p->timer_manager, pc))) {
+    free(pc);
+    return "connection timer creation failure";
+  }
+
 
   pcontext_init(&pc->context, PCONNECTION, p);
   psocket_init(&pc->psocket, p, PCONNECTION_IO);
@@ -860,7 +745,6 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
   pc->new_events = 0;
   pc->wake_count = 0;
   pc->tick_pending = false;
-  pc->timer_armed = false;
   pc->queued_disconnect = false;
   pc->disconnect_condition = NULL;
 
@@ -880,10 +764,6 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
     pn_transport_set_server(pc->driver.transport);
   }
 
-  if (!ptimer_init(&pc->timer, &pc->psocket)) {
-    psocket_error(&pc->psocket, errno, "timer setup");
-    pc->disconnected = true;    /* Already failed */
-  }
   pmutex_init(&pc->rearm_mutex);
 
   /* Set the pconnection_t backpointer last.
@@ -895,10 +775,10 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
   return NULL;
 }
 
-// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled.
+// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), no pending timer.
 // Return true when all possible outstanding epoll events associated with this pconnection have been processed.
 static inline bool pconnection_is_final(pconnection_t *pc) {
-  return !pc->current_arm && !pc->timer_armed && !pc->context.wake_pending;
+  return !pc->current_arm && !pc->context.wake_pending && !pc->tick_pending;
 }
 
 static void pconnection_final_free(pconnection_t *pc) {
@@ -915,6 +795,7 @@ static void pconnection_final_free(pconnection_t *pc) {
   pn_condition_free(pc->disconnect_condition);
   pn_connection_driver_destroy(&pc->driver);
   pcontext_finalize(&pc->context);
+  pni_timer_free(pc->timer);
   free(pc);
 }
 
@@ -927,12 +808,6 @@ static void pconnection_cleanup(pconnection_t *pc) {
   if (fd != -1)
     pclosefd(pc->psocket.proactor, fd);
 
-  fd = pc->timer.epoll_io.fd;
-  stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
-  if (fd != -1)
-    pclosefd(pc->psocket.proactor, fd);
-  ptimer_finalize(&pc->timer);
-
   lock(&pc->context.mutex);
   bool can_free = proactor_remove(&pc->context);
   unlock(&pc->context.mutex);
@@ -960,19 +835,12 @@ static void ensure_wbuf(pconnection_t *pc) {
 static void pconnection_begin_close(pconnection_t *pc) {
   if (!pc->context.closing) {
     pc->context.closing = true;
+    pc->tick_pending = false;
     if (pc->current_arm) {
       // Force EPOLLHUP callback(s)
       shutdown(pc->psocket.epoll_io.fd, SHUT_RDWR);
     }
-
     pn_connection_driver_close(&pc->driver);
-    if (ptimer_shutdown(&pc->timer, pc->timer_armed))
-      pc->timer_armed = false;  // disarmed in the sense that the timer will 
never fire again
-    else if (!pc->timer_armed) {
-      // In doubt.  One last callback to collect
-      rearm(pc->psocket.proactor, &pc->timer.epoll_io);
-      pc->timer_armed = true;
-    }
   }
 }
 
@@ -982,13 +850,30 @@ static void pconnection_forced_shutdown(pconnection_t 
*pc) {
   pc->new_events = 0;
   pconnection_begin_close(pc);
   // pconnection_process will never be called again.  Zero everything.
-  pc->timer_armed = false;
   pc->context.wake_pending = 0;
   pn_collector_release(pc->driver.collector);
   assert(pconnection_is_final(pc));
   pconnection_cleanup(pc);
 }
 
+// Called from timer_manager with no locks.
+void pni_pconnection_timeout(pconnection_t  *pc) {
+  bool notify = false;
+  uint64_t now = pn_proactor_now_64();
+  lock(&pc->context.mutex);
+  if (!pc->context.closing) {
+    // confirm no simultaneous timeout change from another thread.
+    if (pc->expected_timeout && now >= pc->expected_timeout) {
+      pc->tick_pending = true;
+      pc->expected_timeout = 0;
+      notify = wake(&pc->context);
+    }
+  }
+  unlock(&pc->context.mutex);
+  if (notify)
+    wake_notify(&pc->context);
+}
+
 static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
   if (!pc->driver.connection) return NULL;
@@ -1001,14 +886,14 @@ static pn_event_t 
*pconnection_batch_next(pn_event_batch_t *batch) {
     unlock(&p->sched_mutex);
     if (idle_threads && !pc->write_blocked && !pc->read_blocked) {
       write_flush(pc);  // May generate transport event
-      pconnection_process(pc, 0, false, false, true);
+      pconnection_process(pc, 0, false, true);
       e = pn_connection_driver_next_event(&pc->driver);
     }
     else {
       write_flush(pc);  // May generate transport event
       e = pn_connection_driver_next_event(&pc->driver);
       if (!e && pc->hog_count < HOG_MAX) {
-        pconnection_process(pc, 0, false, false, true);
+        pconnection_process(pc, 0, false, true);
         e = pn_connection_driver_next_event(&pc->driver);
       }
     }
@@ -1017,7 +902,6 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t 
*batch) {
     pc->output_drained = false;
     pc->current_event_type = pn_event_type(e);
   }
-
   return e;
 }
 
@@ -1030,15 +914,6 @@ static inline bool pconnection_wclosed(pconnection_t  
*pc) {
   return pn_connection_driver_write_closed(&pc->driver);
 }
 
-// Call with pc context locked.
-static void pconnection_rearm_timer(pconnection_t *pc) {
-  if (!pc->timer_armed && !pc->timer.shutting_down &&
-      pc->timer.epoll_io.fd >= 0 && pc->timer.epoll_io.polling) {
-    pc->timer_armed = true;
-    rearm(pc->psocket.proactor, &pc->timer.epoll_io);
-  }
-}
-
 /* Call only from working context (no competitor for pc->current_arm or
    connection driver).  If true returned, caller must do
    pconnection_rearm().
@@ -1050,7 +925,7 @@ static void pconnection_rearm_timer(pconnection_t *pc) {
 */
 static int pconnection_rearm_check(pconnection_t *pc) {
   if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
-    return 0;;
+    return 0;
   }
   uint32_t wanted_now = (pc->read_blocked && !pconnection_rclosed(pc)) ? 
EPOLLIN : 0;
   if (!pconnection_wclosed(pc)) {
@@ -1077,16 +952,9 @@ static inline void pconnection_rearm(pconnection_t *pc, 
int wanted_now) {
 
 /* Only call when context switch is imminent.  Sched lock is highly contested. 
*/
 // Call with both context and sched locks.
-static bool pconnection_sched_sync(pconnection_t *pc, bool *timerfd_fired) {
+static bool pconnection_sched_sync(pconnection_t *pc) {
   uint32_t sync_events = 0;
-  uint32_t sync_args = 0;
-  *timerfd_fired = false;
-  if (pc->sched_timeout) {
-    *timerfd_fired = true;;
-    pc->timer_armed = false;
-    pc->sched_timeout = false;
-    sync_args |= (1 << 1);
-  }
+  uint32_t sync_args = pc->tick_pending << 1;
   if (pc->psocket.sched_io_events) {
     pc->new_events = pc->psocket.sched_io_events;
     pc->psocket.sched_io_events = 0;
@@ -1102,7 +970,7 @@ static bool pconnection_sched_sync(pconnection_t *pc, bool 
*timerfd_fired) {
 
   if (sync_args || sync_events) {
     // Only replace if poller has found new work for us.
-    pc->process_args = sync_args;
+    pc->process_args = (1 << 2) | sync_args;
     pc->process_events = sync_events;
   }
 
@@ -1132,14 +1000,10 @@ static void pconnection_done(pconnection_t *pc) {
                                 // working context while the lock is held.  
Need sched_sync too to drain possible stale wake.
   pc->hog_count = 0;
   bool has_event = pconnection_has_event(pc);
-  bool timerfd_fired;
   // Do as little as possible while holding the sched lock
   lock(&p->sched_mutex);
-  pconnection_sched_sync(pc, &timerfd_fired);
+  pconnection_sched_sync(pc);
   unlock(&p->sched_mutex);
-  if (timerfd_fired)
-    if (ptimer_callback(&pc->timer) != 0)
-      pc->tick_pending = true;
 
   if (has_event || pconnection_work_pending(pc)) {
     self_wake = true;
@@ -1160,11 +1024,10 @@ static void pconnection_done(pconnection_t *pc) {
   if (self_wake)
     notify = wake(&pc->context);
 
-  pconnection_rearm_timer(pc);
   int wanted = pconnection_rearm_check(pc);
   unlock(&pc->context.mutex);
 
-  if (wanted) pconnection_rearm(pc, wanted);  // May free pc on another thread.  Return.
+  if (wanted) pconnection_rearm(pc, wanted);  // May free pc on another thread.  Return without touching pc again.
   lock(&p->sched_mutex);
   if (unassign_thread(ts, UNUSED))
     notify = true;
@@ -1242,37 +1105,22 @@ static void write_flush(pconnection_t *pc) {
 static void pconnection_connected_lh(pconnection_t *pc);
 static void pconnection_maybe_connect_lh(pconnection_t *pc);
 
-static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool sched_wake, bool topup) {
-  bool rearm_timer = false;
-  bool timer_fired = false;
+static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_wake, bool topup) {
   bool waking = false;
   bool tick_required = false;
   bool immediate_write = false;
-  if (!topup) {
+  lock(&pc->context.mutex);
+  if (!topup) { // Save some state in case of crash investigation.
     pc->process_events = events;
-    pc->process_args = (timeout << 1) | sched_wake;
-  }
-  // Don't touch data exclusive to working thread (yet).
-  if (timeout) {
-    rearm_timer = true;
-    timer_fired = ptimer_callback(&pc->timer) != 0;
+    pc->process_args = (pc->tick_pending << 1) | sched_wake;
   }
-  lock(&pc->context.mutex);
-
   if (events) {
     pc->new_events = events;
     pc->current_arm = 0;
     events = 0;
   }
-  if (timer_fired) {
-    pc->tick_pending = true;
-    timer_fired = false;
-  }
   if (sched_wake) wake_done(&pc->context);
 
-  if (rearm_timer)
-    pc->timer_armed = false;
-
   if (topup) {
     // Only called by the batch owner.  Does not loop, just "tops up"
     // once.  May be back depending on hog_count.
@@ -1406,13 +1254,9 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
   }
 
   // Never stop working while work remains.  hog_count exception to this rule 
is elsewhere.
-  bool timerfd_fired;
   lock(&pc->context.proactor->sched_mutex);
-  bool workers_free = pconnection_sched_sync(pc, &timerfd_fired);
+  bool workers_free = pconnection_sched_sync(pc);
   unlock(&pc->context.proactor->sched_mutex);
-  if (timerfd_fired)
-    if (ptimer_callback(&pc->timer) != 0)
-      pc->tick_pending = true;
 
   if (pconnection_work_pending(pc)) {
     goto retry;  // TODO: get rid of goto without adding more locking
@@ -1438,7 +1282,6 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
     goto retry;
   }
 
-  pconnection_rearm_timer(pc);
   int wanted = pconnection_rearm_check(pc);  // holds rearm_mutex until 
pconnection_rearm() below
 
   unlock(&pc->context.mutex);
@@ -1472,9 +1315,6 @@ void pconnection_connected_lh(pconnection_t *pc) {
 /* multi-address connections may call pconnection_start multiple times with 
diffferent FDs  */
 static void pconnection_start(pconnection_t *pc, int fd) {
   int efd = pc->psocket.proactor->epollfd;
-  /* Start timer, a no-op if the timer has already started. */
-  start_polling(&pc->timer.epoll_io, efd);  // TODO: check for error
-
   /* Get the local socket name now, get the peer name in pconnection_connected 
*/
   socklen_t len = sizeof(pc->local.ss);
   (void)getsockname(fd, (struct sockaddr*)&pc->local.ss, &len);
@@ -1586,11 +1426,13 @@ void pn_proactor_connect2(pn_proactor_t *p, 
pn_connection_t *c, pn_transport_t *
 static void pconnection_tick(pconnection_t *pc) {
   pn_transport_t *t = pc->driver.transport;
   if (pn_transport_get_idle_timeout(t) || 
pn_transport_get_remote_idle_timeout(t)) {
-    ptimer_set(&pc->timer, 0);
     uint64_t now = pn_proactor_now_64();
     uint64_t next = pn_transport_tick(t, now);
     if (next) {
-      ptimer_set(&pc->timer, next - now);
+      lock(&pc->context.mutex);
+      pc->expected_timeout = next;
+      unlock(&pc->context.mutex);
+      pni_timer_set(pc->timer, next);
     }
   }
 }
@@ -2105,16 +1947,14 @@ pn_proactor_t *pn_proactor() {
   pmutex_init(&p->eventfd_mutex);
   pmutex_init(&p->sched_mutex);
   pmutex_init(&p->tslot_mutex);
-  ptimer_init(&p->timer, 0);
 
   if ((p->epollfd = epoll_create(1)) >= 0) {
     if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
       if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
-        if (p->timer.epoll_io.fd >= 0)
+        if (pni_timer_manager_init(&p->timer_manager))
           if ((p->collector = pn_collector()) != NULL) {
             p->batch.next_event = &proactor_batch_next;
-            start_polling(&p->timer.epoll_io, p->epollfd);  // TODO: check for error
-            p->timer_armed = true;
+            start_polling(&p->timer_manager.epoll_timer, p->epollfd);  // TODO: check for error
             epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd, true);
             epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, 
false);
             p->tslot_map = pn_hash(PN_VOID, 0, 0.75);
@@ -2127,7 +1967,7 @@ pn_proactor_t *pn_proactor() {
   if (p->epollfd >= 0) close(p->epollfd);
   if (p->eventfd >= 0) close(p->eventfd);
   if (p->interruptfd >= 0) close(p->interruptfd);
-  ptimer_finalize(&p->timer);
+  pni_timer_manager_finalize(&p->timer_manager);
   pmutex_finalize(&p->tslot_mutex);
   pmutex_finalize(&p->sched_mutex);
   pmutex_finalize(&p->eventfd_mutex);
@@ -2146,7 +1986,7 @@ void pn_proactor_free(pn_proactor_t *p) {
   p->eventfd = -1;
   close(p->interruptfd);
   p->interruptfd = -1;
-  ptimer_finalize(&p->timer);
+  pni_timer_manager_finalize(&p->timer_manager);
   while (p->contexts) {
     pcontext_t *ctx = p->contexts;
     p->contexts = ctx->next;
@@ -2233,8 +2073,7 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t 
*batch) {
   return pni_log_event(p, e);
 }
 
-static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout, bool interrupt, bool wake) {
-  bool timer_fired = timeout && ptimer_callback(&p->timer) != 0;
+static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool interrupt, bool wake) {
   if (interrupt) {
     (void)read_uint64(p->interruptfd);
     rearm(p, &p->epoll_interrupt);
@@ -2243,12 +2082,6 @@ static pn_event_batch_t *proactor_process(pn_proactor_t 
*p, bool timeout, bool i
   if (interrupt) {
     p->need_interrupt = true;
   }
-  if (timeout) {
-    p->timer_armed = false;
-    if (timer_fired && p->timeout_set) {
-      p->need_timeout = true;
-    }
-  }
   if (wake) {
     wake_done(&p->context);
   }
@@ -2259,11 +2092,7 @@ static pn_event_batch_t *proactor_process(pn_proactor_t 
*p, bool timeout, bool i
       return &p->batch;
     }
   }
-  bool rearm_timer = !p->timer_armed && !p->timer.shutting_down;
-  p->timer_armed = true;
   unlock(&p->context.mutex);
-  if (rearm_timer)
-    rearm(p, &p->timer.epoll_io);
   return NULL;
 }
 
@@ -2373,22 +2202,18 @@ static pn_event_batch_t *process(pcontext_t *ctx) {
   pn_event_batch_t* batch = NULL;
   switch (ctx->type) {
   case PROACTOR: {
-    bool timeout = p->sched_timeout;
-    if (timeout) p->sched_timeout = false;
     bool intr = p->sched_interrupt;
     if (intr) p->sched_interrupt = false;
     unlock(&p->sched_mutex);
-    batch = proactor_process(p, timeout, intr, ctx_wake);
+    batch = proactor_process(p, intr, ctx_wake);
     break;
   }
   case PCONNECTION: {
     pconnection_t *pc = pcontext_pconnection(ctx);
     uint32_t events = pc->psocket.sched_io_events;
     if (events) pc->psocket.sched_io_events = 0;
-    bool timeout = pc->sched_timeout;
-    if (timeout) pc->sched_timeout = false;
     unlock(&p->sched_mutex);
-    batch = pconnection_process(pc, events, timeout, ctx_wake, false);
+    batch = pconnection_process(pc, events, ctx_wake, false);
     break;
   }
   case LISTENER: {
@@ -2412,6 +2237,14 @@ static pn_event_batch_t *process(pcontext_t *ctx) {
     batch = pni_raw_connection_process(ctx, ctx_wake);
     break;
   }
+  case TIMER_MANAGER: {
+    pni_timer_manager_t *tm = &p->timer_manager;
+    bool timeout = tm->sched_timeout;
+    if (timeout) tm->sched_timeout = false;
+    unlock(&p->sched_mutex);
+    batch = pni_timer_manager_process(tm, timeout, ctx_wake);
+    break;
+  }
   default:
     assert(NULL);
   }
@@ -2454,21 +2287,6 @@ static pcontext_t *post_event(pn_proactor_t *p, struct 
epoll_event *evp) {
       unlock(&p->eventfd_mutex);
     }
     break;
-
-  case PROACTOR_TIMER:
-    p->sched_timeout = true;
-    ctx = &p->context;
-    ctx->sched_pending = true;
-    break;
-
-  case PCONNECTION_TIMER: {
-    pconnection_t *pc = containerof(containerof(ee, ptimer_t, epoll_io), 
pconnection_t, timer);
-    assert(pc);
-    ctx = &pc->context;
-    pc->sched_timeout = true;;
-    ctx->sched_pending = true;
-    break;
-  }
   case PCONNECTION_IO: {
     psocket_t *ps = containerof(ee, psocket_t, epoll_io);
     pconnection_t *pc = psocket_pconnection(ps);
@@ -2494,6 +2312,13 @@ static pcontext_t *post_event(pn_proactor_t *p, struct 
epoll_event *evp) {
     ctx->sched_pending = true;
     break;
   }
+  case TIMER: {
+    pni_timer_manager_t *tm = &p->timer_manager;
+    ctx = &tm->context;
+    tm->sched_timeout = true;
+    ctx->sched_pending = true;
+    break;
+  }
   }
   if (ctx && !ctx->runnable && !ctx->runner)
     return ctx;
@@ -2880,8 +2705,6 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t 
*batch) {
   if (bp == p) {
     bool notify = false;
     lock(&p->context.mutex);
-    bool rearm_timer = !p->timer_armed && !p->shutting_down;
-    p->timer_armed = true;
     p->context.working = false;
     if (p->timeout_processed) {
       p->timeout_processed = false;
@@ -2902,8 +2725,6 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t 
*batch) {
 
     if (notify)
       wake_notify(&p->context);
-    if (rearm_timer)
-      rearm(p, &p->timer.epoll_io);
     check_earmark_override(p, ts);
     return;
   }
@@ -2922,11 +2743,11 @@ void pn_proactor_set_timeout(pn_proactor_t *p, 
pn_millis_t t) {
   lock(&p->context.mutex);
   p->timeout_set = true;
   if (t == 0) {
-    ptimer_set(&p->timer, 0);
+    pni_timer_set(p->timer, 0);
     p->need_timeout = true;
     notify = wake(&p->context);
   } else {
-    ptimer_set(&p->timer, t);
+    pni_timer_set(p->timer, t + pn_proactor_now_64());
   }
   unlock(&p->context.mutex);
   if (notify) wake_notify(&p->context);
@@ -2936,12 +2757,23 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) {
   lock(&p->context.mutex);
   p->timeout_set = false;
   p->need_timeout = false;
-  ptimer_set(&p->timer, 0);
+  pni_timer_set(p->timer, 0);
   bool notify = wake_if_inactive(p);
   unlock(&p->context.mutex);
   if (notify) wake_notify(&p->context);
 }
 
+// Record expiry of the proactor timer: unless the proactor context is
+// closing, set need_timeout and wake the proactor context.
+// Called from pni_timer_manager_process() with the timer_manager lock
+// released.  Takes p->context.mutex itself; wake_notify() is issued only
+// after that mutex has been dropped.
+void pni_proactor_timeout(pn_proactor_t *p) {
+  bool notify = false;
+  lock(&p->context.mutex);
+  if (!p->context.closing) {
+    p->need_timeout = true;
+    notify = wake(&p->context);
+  }
+  unlock(&p->context.mutex);
+  if (notify) wake_notify(&p->context);
+}
+
 pn_proactor_t *pn_connection_proactor(pn_connection_t* c) {
   pconnection_t *pc = get_pconnection(c);
   return pc ? pc->psocket.proactor : NULL;
diff --git a/c/src/proactor/epoll_raw_connection.c 
b/c/src/proactor/epoll_raw_connection.c
index 06d24eb..6f3e971 100644
--- a/c/src/proactor/epoll_raw_connection.c
+++ b/c/src/proactor/epoll_raw_connection.c
@@ -160,6 +160,7 @@ static void praw_connection_cleanup(praw_connection_t *prc) 
{
   bool can_free = proactor_remove(&prc->context);
   unlock(&prc->context.mutex);
   if (can_free) {
+    pcontext_finalize(&prc->context);
     free(prc);
   }
   // else proactor_disconnect logic owns prc and its final free
diff --git a/c/src/proactor/epoll_timer.c b/c/src/proactor/epoll_timer.c
new file mode 100644
index 0000000..58f0211
--- /dev/null
+++ b/c/src/proactor/epoll_timer.c
@@ -0,0 +1,380 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "epoll-internal.h"
+#include "core/util.h"
+#include <assert.h>
+#include <string.h>
+#include <errno.h>
+#include <unistd.h>
+#include <sys/timerfd.h>
+#include <sys/epoll.h>
+
+/*
+ * Epoll proactor subsystem for timers.
+ *
+ * Two types of timers: (1) connection timers, one per connection, active if 
at least one of the peers has set a heartbeat,
+ * timers move forward in time (but see replace_timer_deadline() note below), 
latency not critical; (2) a single proactor
+ * timer, can move forwards or backwards, can be canceled.
+ *
+ * A single timerfd is shared by all the timers.  Connection timers are 
tracked on a heap ordered list.  The proactor timer is
+ * tracked separately.  The next timerfd_deadline is the earliest of all the 
timers, in this case the earliest of the first
+ * connection timer and the proactor timer.
+ *
+ * If a connection timer is changed to a later time, it is not moved.  It is 
kept in place but marked with the new deadline.  On
+ * expiry, depending on whether the deadline was extended, the decision is 
made to either generate a timeout or replace the
+ * timer on the ordered list.
+ *
+ * When a timerfd read event is generated, the proactor invokes 
pni_timer_manager_process() to generate timeouts for each
+ * expired timer and to do housekeeping on the rest.
+ *
+ * replace_timer_deadline(): a connection timer can go backwards in time at 
most once if: both peers have heartbeats and the
+ * second AMQP open frame results in a shorter periodic transport timer than 
the first open frame.  In this case, the
+ * existing timer_deadline is immediately orphaned and a new one created for 
the rest of the connection's life.
+ *
+ * Lock ordering: tm->context.mutex --> tm->deletion_mutex.
+ */
+
+// Arm or disarm a timerfd as a one-shot relative timer.
+//   fd:       the timerfd to program.
+//   t_millis: expiry in milliseconds from now; 0 disarms (cancels) the timer.
+// A failed timerfd_settime() would leave the timer unarmed or stale, so
+// expected timeouts could be silently missed.  It is therefore treated as
+// fatal, the same way timerfd_drain() treats a failed read.
+static void timerfd_set(int fd, uint64_t t_millis) {
+  struct itimerspec newt;
+  memset(&newt, 0, sizeof(newt));  // it_interval stays zero: never periodic
+  newt.it_value.tv_sec = t_millis / 1000;
+  newt.it_value.tv_nsec = (t_millis % 1000) * 1000000;
+  if (timerfd_settime(fd, 0, &newt, NULL) != 0)  // flags 0: relative expiry
+    EPOLL_FATAL("timerfd settime error", errno);
+}
+
+// Consume any pending expiration count from the timerfd with a single read().
+// A full 8-byte read, or EAGAIN (the fd is created with TFD_NONBLOCK in
+// pni_timer_manager_init()), counts as success; anything else is fatal.
+static void timerfd_drain(int fd) {
+  // Forget any old expired timers and only trigger an epoll read event for a 
subsequent expiry.
+  uint64_t result = 0;
+  ssize_t n = read(fd, &result, sizeof(result));
+  if (n != sizeof(result) && !(n < 0 && errno == EAGAIN)) {
+    EPOLL_FATAL("timerfd read error", errno);
+  }
+}
+
+// Struct to manage the ordering of timers on the heap ordered list and manage 
the lifecycle if
+// the parent timer is self-deleting.
+// A list_deadline of 0 doubles as the "not on timers_heap" marker: it is
+// zeroed when the entry is popped and tested before a push or a free.
+typedef struct timer_deadline_t {
+  uint64_t list_deadline;      // Heap ordering deadline.  Must not change 
while on list.
+  pni_timer_t *timer;          // Parent timer.  NULL means orphaned and to be 
deleted.
+  bool resequenced;            // An out-of-order connection timeout caught 
and handled.
+} timer_deadline_t;
+
+// Initializer hook for the timer_deadline class (presumably picked up by
+// PN_CLASS(timer_deadline)): zero the whole struct, i.e. list_deadline 0,
+// timer NULL, resequenced false.
+static void timer_deadline_initialize(void *object) {
+  timer_deadline_t *td = (timer_deadline_t *) object;
+  memset(td, 0 , sizeof(*td));
+}
+
+// Finalizer hook for the timer_deadline class.  Releases nothing; it only
+// asserts that the entry being freed has list_deadline == 0, the value used
+// for entries that are not on the heap.
+static void timer_deadline_finalize(void *object) {
+  assert(((timer_deadline_t *) object)->list_deadline == 0);
+}
+
+// Comparison hook for the timer_deadline class: orders two timer_deadline_t
+// by list_deadline.  Presumably this is what orders entries on timers_heap;
+// confirm against pn_list_minpush()/pn_class_compare().
+// Returns a negative, zero or positive value when oa's deadline is earlier
+// than, equal to, or later than ob's.
+// The deadlines are compared, not subtracted: the difference of two uint64_t
+// values does not fit an intptr_t in general (it is truncated where intptr_t
+// is 32 bits), which could report the wrong order.
+static intptr_t timer_deadline_compare(void *oa, void *ob) {
+  const timer_deadline_t *a = (const timer_deadline_t *) oa;
+  const timer_deadline_t *b = (const timer_deadline_t *) ob;
+  return (a->list_deadline > b->list_deadline) - (a->list_deadline < b->list_deadline);
+}
+
+// The timer_deadline class has no inspect or hashcode hook and reuses the
+// void class id.  These names are presumably consumed by the
+// PN_CLASS(timer_deadline) expansion below.
+#define timer_deadline_inspect NULL
+#define timer_deadline_hashcode NULL
+#define CID_timer_deadline CID_pn_void
+
+// Allocate a new timer_deadline_t through pn_class_new().
+// Callers treat a NULL return as allocation failure.  Entries are released
+// with pn_free().
+static timer_deadline_t* pni_timer_deadline(void) {
+  static const pn_class_t timer_deadline_clazz = PN_CLASS(timer_deadline);
+  return (timer_deadline_t *) pn_class_new(&timer_deadline_clazz, 
sizeof(timer_deadline_t));
+}
+
+
+// One timer: either a connection timer or the single proactor timer.
+// Fields are read and written under manager->context.mutex.
+struct pni_timer_t {
+  uint64_t deadline;                 // Requested expiry, compared against pn_proactor_now_64(); 0 = not set / cancelled.
+  timer_deadline_t *timer_deadline;  // Heap tracking entry; NULL for the proactor timer.
+  pni_timer_manager_t *manager;      // Timer manager this timer belongs to.
+  pconnection_t *connection;         // Connection to time out; NULL for the proactor timer.
+};
+
+// Create a timer belonging to timer manager tm.
+//   c != NULL: connection timer; also allocates the timer_deadline_t that
+//              tracks it on the heap.
+//   c == NULL: the proactor timer (asserted to be created only once); it has
+//              no timer_deadline_t.
+// Returns NULL if either allocation fails.  The new timer starts inactive
+// (deadline 0).  Release with pni_timer_free().
+pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c) {
+  timer_deadline_t *td = NULL;
+  pni_timer_t *timer = NULL;
+  assert(c || !tm->context.proactor->timer);  // Proactor timer.  Can only be 
one.
+  timer = (pni_timer_t *) malloc(sizeof(pni_timer_t));
+  if (!timer) return NULL;
+  if (c) {
+    // Connections are tracked on the timer_heap.  Allocate the tracking 
struct.
+    td = pni_timer_deadline();
+    if (!td) {
+      free(timer);
+      return NULL;
+    }
+  }
+
+  // Both allocations were done above, before the lock is taken.
+  lock(&tm->context.mutex);
+  timer->connection = c;
+  timer->manager = tm;
+  timer->timer_deadline = td;
+  timer->deadline = 0;
+  if (c)
+    td->timer = timer;
+  unlock(&tm->context.mutex);
+  return timer;
+}
+
+// Cancel and destroy a timer.  timer must not be NULL (it is dereferenced
+// unconditionally).
+// A connection timer's timer_deadline_t is freed here only when it is not on
+// the heap (list_deadline == 0).  Otherwise it is orphaned (td->timer = NULL)
+// and stays on the heap until pni_timer_manager_process() pops and frees it.
+// Call with no locks.
+void pni_timer_free(pni_timer_t *timer) {
+  timer_deadline_t *td = timer->timer_deadline;
+  bool can_free_td = false;
+  if (td) pni_timer_set(timer, 0);
+  pni_timer_manager_t *tm = timer->manager;
+  lock(&tm->context.mutex);
+  // pni_timer_manager_process() holds deletion_mutex across
+  // pni_pconnection_timeout(), so this waits for a timeout callback in
+  // progress before the timer is released.
+  lock(&tm->deletion_mutex);
+  if (td) {
+    if (td->list_deadline)
+      td->timer = NULL;  // Orphan.  timer_manager does eventual pn_free() in 
process().
+    else
+      can_free_td = true;
+  }
+  unlock(&tm->deletion_mutex);
+  unlock(&tm->context.mutex);
+  if (can_free_td) {
+    pn_free(td);
+  }
+  free(timer);
+}
+
+static timer_deadline_t *replace_timer_deadline(pni_timer_manager_t *tm, 
pni_timer_t *timer);
+
+// Set up the timer manager embedded in a proactor: its context and
+// deletion mutex, the timers heap, the proactor timer (also stored in
+// p->timer) and the shared CLOCK_MONOTONIC, non-blocking timerfd.
+// NOTE(review): on a false return, whatever was already created (heap,
+// proactor timer) is not released here; confirm the caller cleans up.
+// Return true if initialization succeeds.  Called once at proactor creation.
+bool pni_timer_manager_init(pni_timer_manager_t *tm) {
+  tm->epoll_timer.fd = -1;
+  tm->timerfd_deadline = 0;
+  tm->timers_heap = NULL;
+  tm->proactor_timer = NULL;
+  pn_proactor_t *p = containerof(tm, pn_proactor_t, timer_manager);
+  pcontext_init(&tm->context, TIMER_MANAGER, p);
+  pmutex_init(&tm->deletion_mutex);
+
+  // PN_VOID turns off ref counting for the elements in the list.
+  tm->timers_heap = pn_list(PN_VOID, 0);
+  if (!tm->timers_heap)
+    return false;
+  tm->proactor_timer = pni_timer(tm, NULL);
+  if (!tm->proactor_timer)
+    return false;
+
+  p->timer = tm->proactor_timer;
+  epoll_extended_t *ee = &tm->epoll_timer;
+  ee->fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+  ee->type = TIMER;
+  ee->wanted = EPOLLIN;
+  ee->polling = false;  // not registered with epoll by this function
+  return (ee->fd >= 0);
+}
+
+// Release everything owned by the timer manager: the timerfd, the proactor
+// timer, every timer_deadline_t still on the heap, the heap itself, and the
+// manager's deletion mutex and context.
+// Only call from proactor's destructor, when it is single threaded and
+// scheduling has stopped.
+// Tolerates the partially initialized state a failed
+// pni_timer_manager_init() leaves behind: fd < 0, NULL heap, NULL proactor
+// timer.
+void pni_timer_manager_finalize(pni_timer_manager_t *tm) {
+  lock(&tm->context.mutex);
+  unlock(&tm->context.mutex);  // Memory barrier
+  if (tm->epoll_timer.fd >= 0) close(tm->epoll_timer.fd);
+  // pni_timer_free() dereferences its argument, so skip it if the proactor
+  // timer was never allocated.  It must run before the mutexes are
+  // finalized below because it locks both of them.
+  if (tm->proactor_timer) pni_timer_free(tm->proactor_timer);
+  if (tm->timers_heap) {
+    size_t sz = pn_list_size(tm->timers_heap);
+    // On teardown there is no need to preserve the heap.  Traverse the list
+    // ignoring minpop().
+    for (size_t idx = 0; idx < sz; idx++) {
+      timer_deadline_t *td = (timer_deadline_t *) pn_list_get(tm->timers_heap, idx);
+      td->list_deadline = 0;  // timer_deadline_finalize() asserts this
+      pn_free(td);
+    }
+    pn_free(tm->timers_heap);
+  }
+  pmutex_finalize(&tm->deletion_mutex);
+  pcontext_finalize(&tm->context);
+}
+
+// Re-target the shared timerfd at the earliest pending deadline: the
+// proactor timer's deadline or the head of timers_heap, whichever is sooner
+// (0 means "none").  The timerfd is only ever moved earlier here, never
+// later.  If the earliest deadline has already passed, the timer_manager
+// context is woken instead of programming the fd.
+// Does nothing while the timer_manager context is working.
+// Call with timer_manager lock held.  Return true if wake_notify required.
+static bool adjust_deadline(pni_timer_manager_t *tm) {
+  // Make sure the timer_manager context will get a timeout in time for the 
earliest connection timeout.
+  if (tm->context.working)
+    return false;  // timer_manager context will adjust the timer when it 
stops working
+  bool notify = false;
+  uint64_t new_deadline = tm->proactor_timer->deadline;
+  if (pn_list_size(tm->timers_heap)) {
+    // First element of timers_heap has earliest deadline on the heap.
+    timer_deadline_t *heap0 = (timer_deadline_t *) 
pn_list_get(tm->timers_heap, 0);
+    assert(heap0->list_deadline != 0);
+    new_deadline = new_deadline ? pn_min(new_deadline, heap0->list_deadline) : 
heap0->list_deadline;
+  }
+  // Only change target deadline if new_deadline is in future but earlier than 
old timerfd_deadline.
+  if (new_deadline) {
+    if (tm->timerfd_deadline == 0 || new_deadline < tm->timerfd_deadline) {
+      uint64_t now = pn_proactor_now_64();
+      if (new_deadline <= now) {
+        // no need for a timer update.  Wake the timer_manager.
+        notify = wake(&tm->context);
+      }
+      else {
+        // timerfd_set() takes a delay relative to now, not an absolute time.
+        timerfd_set(tm->epoll_timer.fd, new_deadline - now);
+        tm->timerfd_deadline = new_deadline;
+      }
+    }
+  }
+  return notify;
+}
+
+// Set, move or cancel a timer.
+//   timer:    a connection timer or the proactor timer.
+//   deadline: absolute expiry, compared against pn_proactor_now_64();
+//             0 cancels.  Setting the current value again is a no-op.
+// A connection timer's heap entry is pushed only if it is not already on
+// the heap.  A later deadline leaves the entry where it is, and a first
+// earlier deadline replaces the entry via replace_timer_deadline().
+// NOTE(review): pn_proactor_set_timeout()/pn_proactor_cancel_timeout() in
+// this same commit call this while holding p->context.mutex, so "context
+// lock" below presumably means the timer's connection context; confirm.
+// Call without context lock or timer_manager lock.
+// Calls for connection timers are generated in the proactor and serialized 
per connection.
+// Calls for the proactor timer can come from arbitrary user threads.
+void pni_timer_set(pni_timer_t *timer, uint64_t deadline) {
+  pni_timer_manager_t *tm = timer->manager;
+  bool notify = false;
+
+  lock(&tm->context.mutex);
+  if (deadline == timer->deadline) {
+    unlock(&tm->context.mutex);
+    return;  // No change.
+  }
+
+  if (timer == tm->proactor_timer) {
+    assert(!timer->connection);
+    timer->deadline = deadline;
+  } else {
+    // Connection
+    timer_deadline_t *td = timer->timer_deadline;
+    // A connection timer can go backwards at most once.  Check here.
+    if (deadline && td->list_deadline && deadline < td->list_deadline) {
+      if (td->resequenced)
+        EPOLL_FATAL("idle timeout sequencing error", 0);  //
+      else {
+        // replace drops the lock for malloc.  Safe because there can be no 
competing call to 
+        // the timer set function by the same pconnection from another thread.
+        td = replace_timer_deadline(tm, timer);
+      }
+    }
+
+    timer->deadline = deadline;
+    // Put on list if not already there.
+    if (deadline && !td->list_deadline) {
+      td->list_deadline = deadline;
+      pn_list_minpush(tm->timers_heap, td);
+    }
+  }
+
+  // Skip a cancelled timer (deadline == 0) since it doesn't change the 
timerfd deadline.
+  if (deadline)
+    notify = adjust_deadline(tm);
+  unlock(&tm->context.mutex);
+
+  if (notify)
+    wake_notify(&tm->context);
+}
+
+// Run the timer manager context: deliver timeouts for all expired timers,
+// tidy the heap, then re-target the timerfd at the next deadline.
+//   tm:      the proactor's timer manager.
+//   timeout: true if the timerfd fired since the last run; the fd is then
+//            drained and its epoll registration re-armed.
+//   wake:    true if the context was woken; acknowledged with wake_done().
+// Entered with tm->context.mutex held (it is unlocked here without first
+// being locked) and returns with it released.  The mutex is dropped around
+// every callback into the proactor or a connection.
+// Always returns NULL: the timer manager never has events to batch.
+pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool wake) {
+  uint64_t now = pn_proactor_now_64();
+  tm->context.working = true;  // adjust_deadline() is a no-op while set
+  if (timeout)
+    tm->timerfd_deadline = 0;  // the timerfd has fired; nothing is armed
+  if (wake)
+    wake_done(&tm->context);
+
+  // First check for proactor timer expiry.
+  uint64_t proactor_deadline = tm->proactor_timer->deadline;
+  if (proactor_deadline && proactor_deadline <= now) {
+    tm->proactor_timer->deadline = 0;
+    unlock(&tm->context.mutex);
+    pni_proactor_timeout(tm->context.proactor);
+    lock(&tm->context.mutex);
+    // If lower latency desired for the proactor timer, we could convert to
+    // the proactor context (if not working) and return here with the event
+    // batch, and wake the timer manager context to process the connection
+    // timers.
+  }
+
+  // Next, find all expired connection timers at front of the ordered heap.
+  while (pn_list_size(tm->timers_heap)) {
+    timer_deadline_t *td = (timer_deadline_t *) pn_list_get(tm->timers_heap, 0);
+    if (td->list_deadline > now)
+      break;
+
+    // Expired. Remove from list.
+    timer_deadline_t *min = (timer_deadline_t *) pn_list_minpop(tm->timers_heap);
+    assert (min == td);
+    min->list_deadline = 0;
+
+    // Four possibilities to act on:
+    //   timer freed -> free the associated timer_deadline popped off the list
+    //   timer cancelled (deadline 0) -> leave off the list
+    //   timer expired -> pni_pconnection_timeout()
+    //   timer deadline extended -> minpush back on list to new spot
+    if (!td->timer) {
+      unlock(&tm->context.mutex);
+      pn_free(td);
+      lock(&tm->context.mutex);
+    } else {
+      uint64_t timer_deadline = td->timer->deadline;
+      if (timer_deadline == 0) {
+        // Cancelled while still on the heap.  Do not push it back: an entry
+        // with list_deadline == 0 on the heap would be popped and re-pushed
+        // by this loop forever and would trip the assert in
+        // adjust_deadline().  pni_timer_set() pushes the entry again when a
+        // new non-zero deadline is set.
+      } else if (timer_deadline <= now) {
+        td->timer->deadline = 0;
+        pconnection_t *pc = td->timer->connection;
+        // Prevent connection from deleting itself when tm->context.mutex
+        // dropped.
+        lock(&tm->deletion_mutex);
+        unlock(&tm->context.mutex);
+        pni_pconnection_timeout(pc);
+        unlock(&tm->deletion_mutex);
+        lock(&tm->context.mutex);
+      } else {
+        td->list_deadline = timer_deadline;
+        pn_list_minpush(tm->timers_heap, td);
+      }
+    }
+  }
+
+  if (timeout) {
+    // TODO: query whether perf gain by doing these system calls outside the
+    // lock, perhaps with additional set_reset_mutex.
+    timerfd_drain(tm->epoll_timer.fd);
+    rearm_polling(&tm->epoll_timer, tm->context.proactor->epollfd);
+  }
+  tm->context.working = false;  // must be false for adjust_deadline to do adjustment
+  bool notify = adjust_deadline(tm);
+  unlock(&tm->context.mutex);
+
+  if (notify)
+    wake_notify(&tm->context);
+  // The timer_manager never has events to batch.
+  return NULL;
+  // TODO: perhaps become context of one of the timed out timers (if
+  // otherwise idle) and process() that context.
+}
+
+// Give a connection timer a fresh heap tracking entry so that it can be
+// queued at an earlier deadline than its current entry allows.
+// The old entry is orphaned (timer = NULL) and left where it is; it is
+// freed later by pni_timer_manager_process() when popped, or at finalize.
+// tm->context.mutex is released around the allocation and re-acquired
+// before returning.  Allocation failure is fatal.
+// Returns the new entry: not yet on the heap (list_deadline 0), linked to
+// timer, and marked resequenced so that a second backwards move is caught
+// in pni_timer_set().
+// Call with timer_manager lock held.  
+// There can be no competing call to this and timer_set() from the same 
connection.
+static timer_deadline_t *replace_timer_deadline(pni_timer_manager_t *tm, 
pni_timer_t *timer) {
+  assert(timer->connection);
+  timer_deadline_t *old_td = timer->timer_deadline;
+  assert(old_td);
+  // Mark old struct for deletion.  No parent timer.
+  old_td->timer = NULL;
+
+  unlock(&tm->context.mutex);
+  // Create replacement timer for life of connection.
+  timer_deadline_t *new_td = pni_timer_deadline();
+  if (!new_td)
+    EPOLL_FATAL("replacement timer deadline allocation", errno);
+  lock(&tm->context.mutex);
+
+  new_td->list_deadline = 0;
+  new_td->timer = timer;
+  new_td->resequenced = true;
+  timer->timer_deadline = new_td;
+  return new_td;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to