This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/master by this push: new d31ad96 PROTON-1496: epoll proactor - improved timers implementation with single timerfd kernel resource d31ad96 is described below commit d31ad9652a1a63f856ed10772e362b4a155ecbf4 Author: Cliff Jansen <cliffjan...@apache.org> AuthorDate: Sun Nov 8 11:56:58 2020 -0800 PROTON-1496: epoll proactor - improved timers implementation with single timerfd kernel resource --- c/CMakeLists.txt | 2 +- c/src/proactor/epoll-internal.h | 62 ++++-- c/src/proactor/epoll.c | 338 ++++++++---------------------- c/src/proactor/epoll_raw_connection.c | 1 + c/src/proactor/epoll_timer.c | 380 ++++++++++++++++++++++++++++++++++ 5 files changed, 510 insertions(+), 273 deletions(-) diff --git a/c/CMakeLists.txt b/c/CMakeLists.txt index 2967176..99e328b 100644 --- a/c/CMakeLists.txt +++ b/c/CMakeLists.txt @@ -340,7 +340,7 @@ if (PROACTOR STREQUAL "epoll" OR (NOT PROACTOR AND NOT BUILD_PROACTOR)) check_symbol_exists(epoll_wait "sys/epoll.h" HAVE_EPOLL) if (HAVE_EPOLL) set (PROACTOR_OK epoll) - set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c ${qpid-proton-proactor-common}) + set (qpid-proton-proactor src/proactor/epoll.c src/proactor/epoll_raw_connection.c src/proactor/epoll_timer.c ${qpid-proton-proactor-common}) set (PROACTOR_LIBS Threads::Threads ${TIME_LIB}) endif() endif() diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 1b8edd3..b14b485 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -54,14 +54,14 @@ extern "C" { typedef struct acceptor_t acceptor_t; typedef struct tslot_t tslot_t; typedef pthread_mutex_t pmutex; +typedef struct pni_timer_t pni_timer_t; typedef enum { WAKE, /* see if any work to do in proactor/psocket context */ - PCONNECTION_IO, - PCONNECTION_TIMER, LISTENER_IO, - PROACTOR_TIMER, - RAW_CONNECTION_IO + PCONNECTION_IO, + RAW_CONNECTION_IO, + TIMER } epoll_type_t; // Data to use with epoll. 
@@ -73,19 +73,12 @@ typedef struct epoll_extended_t { pmutex barrier_mutex; } epoll_extended_t; -typedef struct ptimer_t { - pmutex mutex; - epoll_extended_t epoll_io; - bool timer_active; - bool in_doubt; // 0 or 1 callbacks are possible - bool shutting_down; -} ptimer_t; - typedef enum { PROACTOR, PCONNECTION, LISTENER, - RAW_CONNECTION + RAW_CONNECTION, + TIMER_MANAGER } pcontext_type_t; typedef struct pcontext_t { @@ -137,13 +130,24 @@ struct tslot_t { unsigned int earmark_override_gen; }; +typedef struct pni_timer_manager_t { + pcontext_t context; + epoll_extended_t epoll_timer; + pmutex deletion_mutex; + pni_timer_t *proactor_timer; + pn_list_t *timers_heap; + uint64_t timerfd_deadline; + bool sched_timeout; +} pni_timer_manager_t; + struct pn_proactor_t { pcontext_t context; - ptimer_t timer; + pni_timer_manager_t timer_manager; epoll_extended_t epoll_wake; epoll_extended_t epoll_interrupt; pn_event_batch_t batch; pcontext_t *contexts; /* track in-use contexts for PN_PROACTOR_INACTIVE and disconnect */ + pni_timer_t *timer; size_t disconnects_pending; /* unfinished proactor disconnects*/ // need_xxx flags indicate we should generate PN_PROACTOR_XXX on the next update_batch() bool need_interrupt; @@ -151,7 +155,6 @@ struct pn_proactor_t { bool need_timeout; bool timeout_set; /* timeout has been set by user and not yet cancelled or generated event */ bool timeout_processed; /* timeout event dispatched in the most recent event batch */ - bool timer_armed; /* timer is armed in epoll */ int context_count; // wake subsystem @@ -167,7 +170,6 @@ struct pn_proactor_t { pmutex overflow_mutex; // Sched vars specific to proactor context. - bool sched_timeout; bool sched_interrupt; // Global scheduling/poller vars. 
@@ -220,13 +222,12 @@ typedef struct psocket_t { typedef struct pconnection_t { psocket_t psocket; pcontext_t context; - ptimer_t timer; // TODO: review one timerfd per connection + pni_timer_t *timer; const char *host, *port; uint32_t new_events; int wake_count; // TODO: protected by context.mutex so should be moved in there (also really bool) bool server; /* accept, not connect */ bool tick_pending; - bool timer_armed; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ pn_condition_t *disconnect_condition; // Following values only changed by (sole) working context: @@ -250,7 +251,7 @@ typedef struct pconnection_t { struct addrinfo *ai; /* Current connect address */ pmutex rearm_mutex; /* protects pconnection_rearm from out of order arming*/ bool io_doublecheck; /* callbacks made and new IO may have arrived */ - bool sched_timeout; + uint64_t expected_timeout; char addr_buf[1]; } pconnection_t; @@ -299,6 +300,16 @@ struct pn_listener_t { typedef char strerrorbuf[1024]; /* used for pstrerror message buffer */ void pstrerror(int err, strerrorbuf msg); +/* Internal error, no recovery */ +#define EPOLL_FATAL(EXPR, SYSERRNO) \ + do { \ + strerrorbuf msg; \ + pstrerror((SYSERRNO), msg); \ + fprintf(stderr, "epoll proactor failure in %s:%d: %s: %s\n", \ + __FILE__, __LINE__ , #EXPR, msg); \ + abort(); \ + } while (0) + // In general all locks to be held singly and shortly (possibly as spin locks). // See above about lock ordering. 
@@ -338,6 +349,10 @@ bool proactor_remove(pcontext_t *ctx); bool unassign_thread(tslot_t *ts, tslot_state new_state); void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p); +static void pcontext_finalize(pcontext_t* ctx) { + pmutex_finalize(&ctx->mutex); +} + bool wake(pcontext_t *ctx); void wake_notify(pcontext_t *ctx); void wake_done(pcontext_t *ctx); @@ -360,6 +375,15 @@ pcontext_t *pni_raw_connection_context(praw_connection_t *rc); praw_connection_t *pni_batch_raw_connection(pn_event_batch_t* batch); void pni_raw_connection_done(praw_connection_t *rc); +pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c); +void pni_timer_free(pni_timer_t *timer); +void pni_timer_set(pni_timer_t *timer, uint64_t deadline); +bool pni_timer_manager_init(pni_timer_manager_t *tm); +void pni_timer_manager_finalize(pni_timer_manager_t *tm); +pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool wake); +void pni_pconnection_timeout(pconnection_t *pc); +void pni_proactor_timeout(pn_proactor_t *p); + #ifdef __cplusplus } #endif diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 4c00d47..a2d0864 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -27,7 +27,7 @@ A serialized grouping of Proton events is a context (connection, listener, proactor). Each has multiple pollable fds that make it schedulable. E.g. a connection could have a - socket fd, timerfd, and (indirect) eventfd all signaled in a single epoll_wait(). + socket fd, (indirect) timerfd, and (indirect) eventfd all signaled in a single epoll_wait(). At the conclusion of each N = epoll_wait(..., N_MAX, timeout) @@ -80,7 +80,6 @@ #include <errno.h> #include <pthread.h> -#include <sys/timerfd.h> #include <sys/epoll.h> #include <unistd.h> #include <sys/socket.h> @@ -94,8 +93,7 @@ #include "./netaddr-internal.h" /* Include after socket/inet headers */ -// TODO: replace timerfd per connection with global lightweight timer mechanism. 
-// logging in general +// TODO: logging in general // SIGPIPE? // Can some of the mutexes be spinlocks (any benefit over adaptive pthread mutex)? // Maybe futex is even better? @@ -112,20 +110,6 @@ void pstrerror(int err, strerrorbuf msg) { if (e) snprintf(msg, sizeof(strerrorbuf), "unknown error %d", err); } -/* Internal error, no recovery */ -#define EPOLL_FATAL(EXPR, SYSERRNO) \ - do { \ - strerrorbuf msg; \ - pstrerror((SYSERRNO), msg); \ - fprintf(stderr, "epoll proactor failure in %s:%d: %s: %s\n", \ - __FILE__, __LINE__ , #EXPR, msg); \ - abort(); \ - } while (0) - -// ======================================================================== -// First define a proactor mutex (pmutex) and timer mechanism (ptimer) to taste. -// ======================================================================== - /* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory writes to struct epoll_extended_t in the EPOLL_ADD thread might not be visible to epoll_wait() thread. This function creates a memory barrier, @@ -137,116 +121,16 @@ static void memory_barrier(epoll_extended_t *ee) { unlock(&ee->barrier_mutex); } -/* - * This timerfd logic assumes EPOLLONESHOT and there never being two - * active timeout callbacks. There can be multiple (or zero) - * unclaimed expiries processed in a single callback. - * - * timerfd_set() documentation implies a crisp relationship between - * timer expiry count and oldt's return value, but a return value of - * zero is ambiguous. It can lead to no EPOLLIN, EPOLLIN + expected - * read, or - * - * event expiry (in kernel) -> EPOLLIN - * cancel/settime(0) (thread A) (number of expiries resets to zero) - * read(timerfd) -> -1, EAGAIN (thread B servicing epoll event) - * - * The original implementation with counters to track expiry counts - * was abandoned in favor of "in doubt" transitions and resolution - * at shutdown. - * - * TODO: review above in light of single poller thread. 
- */ - -static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) { - pmutex_init(&pt->mutex); - pt->timer_active = false; - pt->in_doubt = false; - pt->shutting_down = false; - epoll_type_t type = ps ? PCONNECTION_TIMER : PROACTOR_TIMER; - pt->epoll_io.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); - pt->epoll_io.type = type; - pt->epoll_io.wanted = EPOLLIN; - pt->epoll_io.polling = false; - return (pt->epoll_io.fd >= 0); -} - -// Call with ptimer lock held -static void ptimer_set_lh(ptimer_t *pt, uint64_t t_millis) { - struct itimerspec newt, oldt; - memset(&newt, 0, sizeof(newt)); - newt.it_value.tv_sec = t_millis / 1000; - newt.it_value.tv_nsec = (t_millis % 1000) * 1000000; - - timerfd_settime(pt->epoll_io.fd, 0, &newt, &oldt); - if (pt->timer_active && oldt.it_value.tv_nsec == 0 && oldt.it_value.tv_sec == 0) { - // EPOLLIN is possible but not assured - pt->in_doubt = true; - } - pt->timer_active = t_millis != 0; -} - -static void ptimer_set(ptimer_t *pt, uint64_t t_millis) { - // t_millis == 0 -> cancel - lock(&pt->mutex); - if ((t_millis == 0 && !pt->timer_active) || pt->shutting_down) { - unlock(&pt->mutex); - return; // nothing to do - } - ptimer_set_lh(pt, t_millis); - unlock(&pt->mutex); -} - -/* Read from a timer or event FD */ +/* Read from an event FD */ static uint64_t read_uint64(int fd) { uint64_t result = 0; ssize_t n = read(fd, &result, sizeof(result)); if (n != sizeof(result) && !(n < 0 && errno == EAGAIN)) { - EPOLL_FATAL("timerfd or eventfd read error", errno); + EPOLL_FATAL("eventfd read error", errno); } return result; } -// Callback bookkeeping. Return true if there is an expired timer. 
-static bool ptimer_callback(ptimer_t *pt) { - lock(&pt->mutex); - struct itimerspec current; - if (timerfd_gettime(pt->epoll_io.fd, &current) == 0) { - if (current.it_value.tv_nsec == 0 && current.it_value.tv_sec == 0) - pt->timer_active = false; - } - uint64_t u_exp_count = read_uint64(pt->epoll_io.fd); - if (!pt->timer_active) { - // Expiry counter just cleared, timer not set, timerfd not armed - pt->in_doubt = false; - } - unlock(&pt->mutex); - return u_exp_count > 0; -} - -// Return true if timerfd has and will have no pollable expiries in the current armed state -static bool ptimer_shutdown(ptimer_t *pt, bool currently_armed) { - lock(&pt->mutex); - if (currently_armed) { - ptimer_set_lh(pt, 0); - pt->shutting_down = true; - if (pt->in_doubt) - // Force at least one callback. If two, second cannot proceed with unarmed timerfd. - ptimer_set_lh(pt, 1); - } - else - pt->shutting_down = true; - bool rv = !pt->in_doubt; - unlock(&pt->mutex); - return rv; -} - -static void ptimer_finalize(ptimer_t *pt) { - if (pt->epoll_io.fd >= 0) close(pt->epoll_io.fd); - pmutex_finalize(&pt->mutex); -} - - // ======================================================================== // Proactor common code // ======================================================================== @@ -339,10 +223,6 @@ void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t *p) { ctx->type = t; } -static void pcontext_finalize(pcontext_t* ctx) { - pmutex_finalize(&ctx->mutex); -} - /* * Wake strategy with eventfd. 
* - wakees can be in the list only once @@ -688,7 +568,7 @@ static void set_pconnection(pn_connection_t* c, pconnection_t *pc) { unlock(&driver_ptr_mutex); } -static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool wake, bool topup); +static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_wake, bool topup); static void write_flush(pconnection_t *pc); static void listener_begin_close(pn_listener_t* l); static bool poller_do_epoll(struct pn_proactor_t* p, tslot_t *ts, bool can_block); @@ -853,6 +733,11 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con free(pc); return "pn_connection_driver_init failure"; } + if (!(pc->timer = pni_timer(&p->timer_manager, pc))) { + free(pc); + return "connection timer creation failure"; + } + pcontext_init(&pc->context, PCONNECTION, p); psocket_init(&pc->psocket, p, PCONNECTION_IO); @@ -860,7 +745,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con pc->new_events = 0; pc->wake_count = 0; pc->tick_pending = false; - pc->timer_armed = false; pc->queued_disconnect = false; pc->disconnect_condition = NULL; @@ -880,10 +764,6 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con pn_transport_set_server(pc->driver.transport); } - if (!ptimer_init(&pc->timer, &pc->psocket)) { - psocket_error(&pc->psocket, errno, "timer setup"); - pc->disconnected = true; /* Already failed */ - } pmutex_init(&pc->rearm_mutex); /* Set the pconnection_t backpointer last. @@ -895,10 +775,10 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con return NULL; } -// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled. +// Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), no pending timer. 
// Return true when all possible outstanding epoll events associated with this pconnection have been processed. static inline bool pconnection_is_final(pconnection_t *pc) { - return !pc->current_arm && !pc->timer_armed && !pc->context.wake_pending; + return !pc->current_arm && !pc->context.wake_pending && !pc->tick_pending; } static void pconnection_final_free(pconnection_t *pc) { @@ -915,6 +795,7 @@ static void pconnection_final_free(pconnection_t *pc) { pn_condition_free(pc->disconnect_condition); pn_connection_driver_destroy(&pc->driver); pcontext_finalize(&pc->context); + pni_timer_free(pc->timer); free(pc); } @@ -927,12 +808,6 @@ static void pconnection_cleanup(pconnection_t *pc) { if (fd != -1) pclosefd(pc->psocket.proactor, fd); - fd = pc->timer.epoll_io.fd; - stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd); - if (fd != -1) - pclosefd(pc->psocket.proactor, fd); - ptimer_finalize(&pc->timer); - lock(&pc->context.mutex); bool can_free = proactor_remove(&pc->context); unlock(&pc->context.mutex); @@ -960,19 +835,12 @@ static void ensure_wbuf(pconnection_t *pc) { static void pconnection_begin_close(pconnection_t *pc) { if (!pc->context.closing) { pc->context.closing = true; + pc->tick_pending = false; if (pc->current_arm) { // Force EPOLLHUP callback(s) shutdown(pc->psocket.epoll_io.fd, SHUT_RDWR); } - pn_connection_driver_close(&pc->driver); - if (ptimer_shutdown(&pc->timer, pc->timer_armed)) - pc->timer_armed = false; // disarmed in the sense that the timer will never fire again - else if (!pc->timer_armed) { - // In doubt. One last callback to collect - rearm(pc->psocket.proactor, &pc->timer.epoll_io); - pc->timer_armed = true; - } } } @@ -982,13 +850,30 @@ static void pconnection_forced_shutdown(pconnection_t *pc) { pc->new_events = 0; pconnection_begin_close(pc); // pconnection_process will never be called again. Zero everything. 
- pc->timer_armed = false; pc->context.wake_pending = 0; pn_collector_release(pc->driver.collector); assert(pconnection_is_final(pc)); pconnection_cleanup(pc); } +// Called from timer_manager with no locks. +void pni_pconnection_timeout(pconnection_t *pc) { + bool notify = false; + uint64_t now = pn_proactor_now_64(); + lock(&pc->context.mutex); + if (!pc->context.closing) { + // confirm no simultaneous timeout change from another thread. + if (pc->expected_timeout && now >= pc->expected_timeout) { + pc->tick_pending = true; + pc->expected_timeout = 0; + notify = wake(&pc->context); + } + } + unlock(&pc->context.mutex); + if (notify) + wake_notify(&pc->context); +} + static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { pconnection_t *pc = batch_pconnection(batch); if (!pc->driver.connection) return NULL; @@ -1001,14 +886,14 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { unlock(&p->sched_mutex); if (idle_threads && !pc->write_blocked && !pc->read_blocked) { write_flush(pc); // May generate transport event - pconnection_process(pc, 0, false, false, true); + pconnection_process(pc, 0, false, true); e = pn_connection_driver_next_event(&pc->driver); } else { write_flush(pc); // May generate transport event e = pn_connection_driver_next_event(&pc->driver); if (!e && pc->hog_count < HOG_MAX) { - pconnection_process(pc, 0, false, false, true); + pconnection_process(pc, 0, false, true); e = pn_connection_driver_next_event(&pc->driver); } } @@ -1017,7 +902,6 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { pc->output_drained = false; pc->current_event_type = pn_event_type(e); } - return e; } @@ -1030,15 +914,6 @@ static inline bool pconnection_wclosed(pconnection_t *pc) { return pn_connection_driver_write_closed(&pc->driver); } -// Call with pc context locked. 
-static void pconnection_rearm_timer(pconnection_t *pc) { - if (!pc->timer_armed && !pc->timer.shutting_down && - pc->timer.epoll_io.fd >= 0 && pc->timer.epoll_io.polling) { - pc->timer_armed = true; - rearm(pc->psocket.proactor, &pc->timer.epoll_io); - } -} - /* Call only from working context (no competitor for pc->current_arm or connection driver). If true returned, caller must do pconnection_rearm(). @@ -1050,7 +925,7 @@ static void pconnection_rearm_timer(pconnection_t *pc) { */ static int pconnection_rearm_check(pconnection_t *pc) { if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) { - return 0;; + return 0; } uint32_t wanted_now = (pc->read_blocked && !pconnection_rclosed(pc)) ? EPOLLIN : 0; if (!pconnection_wclosed(pc)) { @@ -1077,16 +952,9 @@ static inline void pconnection_rearm(pconnection_t *pc, int wanted_now) { /* Only call when context switch is imminent. Sched lock is highly contested. */ // Call with both context and sched locks. -static bool pconnection_sched_sync(pconnection_t *pc, bool *timerfd_fired) { +static bool pconnection_sched_sync(pconnection_t *pc) { uint32_t sync_events = 0; - uint32_t sync_args = 0; - *timerfd_fired = false; - if (pc->sched_timeout) { - *timerfd_fired = true;; - pc->timer_armed = false; - pc->sched_timeout = false; - sync_args |= (1 << 1); - } + uint32_t sync_args = pc->tick_pending << 1; if (pc->psocket.sched_io_events) { pc->new_events = pc->psocket.sched_io_events; pc->psocket.sched_io_events = 0; @@ -1102,7 +970,7 @@ static bool pconnection_sched_sync(pconnection_t *pc, bool *timerfd_fired) { if (sync_args || sync_events) { // Only replace if poller has found new work for us. - pc->process_args = sync_args; + pc->process_args = (1 << 2) | sync_args; pc->process_events = sync_events; } @@ -1132,14 +1000,10 @@ static void pconnection_done(pconnection_t *pc) { // working context while the lock is held. Need sched_sync too to drain possible stale wake. 
pc->hog_count = 0; bool has_event = pconnection_has_event(pc); - bool timerfd_fired; // Do as little as possible while holding the sched lock lock(&p->sched_mutex); - pconnection_sched_sync(pc, &timerfd_fired); + pconnection_sched_sync(pc); unlock(&p->sched_mutex); - if (timerfd_fired) - if (ptimer_callback(&pc->timer) != 0) - pc->tick_pending = true; if (has_event || pconnection_work_pending(pc)) { self_wake = true; @@ -1160,11 +1024,10 @@ static void pconnection_done(pconnection_t *pc) { if (self_wake) notify = wake(&pc->context); - pconnection_rearm_timer(pc); int wanted = pconnection_rearm_check(pc); unlock(&pc->context.mutex); - if (wanted) pconnection_rearm(pc, wanted); // May free pc on another thread. Return. + if (wanted) pconnection_rearm(pc, wanted); // May free pc on another thread. Return without touching pc again. lock(&p->sched_mutex); if (unassign_thread(ts, UNUSED)) notify = true; @@ -1242,37 +1105,22 @@ static void write_flush(pconnection_t *pc) { static void pconnection_connected_lh(pconnection_t *pc); static void pconnection_maybe_connect_lh(pconnection_t *pc); -static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool sched_wake, bool topup) { - bool rearm_timer = false; - bool timer_fired = false; +static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_wake, bool topup) { bool waking = false; bool tick_required = false; bool immediate_write = false; - if (!topup) { + lock(&pc->context.mutex); + if (!topup) { // Save some state in case of crash investigation. pc->process_events = events; - pc->process_args = (timeout << 1) | sched_wake; - } - // Don't touch data exclusive to working thread (yet). 
- if (timeout) { - rearm_timer = true; - timer_fired = ptimer_callback(&pc->timer) != 0; + pc->process_args = (pc->tick_pending << 1) | sched_wake; } - lock(&pc->context.mutex); - if (events) { pc->new_events = events; pc->current_arm = 0; events = 0; } - if (timer_fired) { - pc->tick_pending = true; - timer_fired = false; - } if (sched_wake) wake_done(&pc->context); - if (rearm_timer) - pc->timer_armed = false; - if (topup) { // Only called by the batch owner. Does not loop, just "tops up" // once. May be back depending on hog_count. @@ -1406,13 +1254,9 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } // Never stop working while work remains. hog_count exception to this rule is elsewhere. - bool timerfd_fired; lock(&pc->context.proactor->sched_mutex); - bool workers_free = pconnection_sched_sync(pc, &timerfd_fired); + bool workers_free = pconnection_sched_sync(pc); unlock(&pc->context.proactor->sched_mutex); - if (timerfd_fired) - if (ptimer_callback(&pc->timer) != 0) - pc->tick_pending = true; if (pconnection_work_pending(pc)) { goto retry; // TODO: get rid of goto without adding more locking @@ -1438,7 +1282,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, goto retry; } - pconnection_rearm_timer(pc); int wanted = pconnection_rearm_check(pc); // holds rearm_mutex until pconnection_rearm() below unlock(&pc->context.mutex); @@ -1472,9 +1315,6 @@ void pconnection_connected_lh(pconnection_t *pc) { /* multi-address connections may call pconnection_start multiple times with diffferent FDs */ static void pconnection_start(pconnection_t *pc, int fd) { int efd = pc->psocket.proactor->epollfd; - /* Start timer, a no-op if the timer has already started. 
*/ - start_polling(&pc->timer.epoll_io, efd); // TODO: check for error - /* Get the local socket name now, get the peer name in pconnection_connected */ socklen_t len = sizeof(pc->local.ss); (void)getsockname(fd, (struct sockaddr*)&pc->local.ss, &len); @@ -1586,11 +1426,13 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t * static void pconnection_tick(pconnection_t *pc) { pn_transport_t *t = pc->driver.transport; if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) { - ptimer_set(&pc->timer, 0); uint64_t now = pn_proactor_now_64(); uint64_t next = pn_transport_tick(t, now); if (next) { - ptimer_set(&pc->timer, next - now); + lock(&pc->context.mutex); + pc->expected_timeout = next; + unlock(&pc->context.mutex); + pni_timer_set(pc->timer, next); } } } @@ -2105,16 +1947,14 @@ pn_proactor_t *pn_proactor() { pmutex_init(&p->eventfd_mutex); pmutex_init(&p->sched_mutex); pmutex_init(&p->tslot_mutex); - ptimer_init(&p->timer, 0); if ((p->epollfd = epoll_create(1)) >= 0) { if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) { if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) { - if (p->timer.epoll_io.fd >= 0) + if (pni_timer_manager_init(&p->timer_manager)) if ((p->collector = pn_collector()) != NULL) { p->batch.next_event = &proactor_batch_next; - start_polling(&p->timer.epoll_io, p->epollfd); // TODO: check for error - p->timer_armed = true; + start_polling(&p->timer_manager.epoll_timer, p->epollfd); // TODO: check for error epoll_wake_init(&p->epoll_wake, p->eventfd, p->epollfd, true); epoll_wake_init(&p->epoll_interrupt, p->interruptfd, p->epollfd, false); p->tslot_map = pn_hash(PN_VOID, 0, 0.75); @@ -2127,7 +1967,7 @@ pn_proactor_t *pn_proactor() { if (p->epollfd >= 0) close(p->epollfd); if (p->eventfd >= 0) close(p->eventfd); if (p->interruptfd >= 0) close(p->interruptfd); - ptimer_finalize(&p->timer); + pni_timer_manager_finalize(&p->timer_manager); pmutex_finalize(&p->tslot_mutex); 
pmutex_finalize(&p->sched_mutex); pmutex_finalize(&p->eventfd_mutex); @@ -2146,7 +1986,7 @@ void pn_proactor_free(pn_proactor_t *p) { p->eventfd = -1; close(p->interruptfd); p->interruptfd = -1; - ptimer_finalize(&p->timer); + pni_timer_manager_finalize(&p->timer_manager); while (p->contexts) { pcontext_t *ctx = p->contexts; p->contexts = ctx->next; @@ -2233,8 +2073,7 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { return pni_log_event(p, e); } -static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout, bool interrupt, bool wake) { - bool timer_fired = timeout && ptimer_callback(&p->timer) != 0; +static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool interrupt, bool wake) { if (interrupt) { (void)read_uint64(p->interruptfd); rearm(p, &p->epoll_interrupt); @@ -2243,12 +2082,6 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout, bool i if (interrupt) { p->need_interrupt = true; } - if (timeout) { - p->timer_armed = false; - if (timer_fired && p->timeout_set) { - p->need_timeout = true; - } - } if (wake) { wake_done(&p->context); } @@ -2259,11 +2092,7 @@ static pn_event_batch_t *proactor_process(pn_proactor_t *p, bool timeout, bool i return &p->batch; } } - bool rearm_timer = !p->timer_armed && !p->timer.shutting_down; - p->timer_armed = true; unlock(&p->context.mutex); - if (rearm_timer) - rearm(p, &p->timer.epoll_io); return NULL; } @@ -2373,22 +2202,18 @@ static pn_event_batch_t *process(pcontext_t *ctx) { pn_event_batch_t* batch = NULL; switch (ctx->type) { case PROACTOR: { - bool timeout = p->sched_timeout; - if (timeout) p->sched_timeout = false; bool intr = p->sched_interrupt; if (intr) p->sched_interrupt = false; unlock(&p->sched_mutex); - batch = proactor_process(p, timeout, intr, ctx_wake); + batch = proactor_process(p, intr, ctx_wake); break; } case PCONNECTION: { pconnection_t *pc = pcontext_pconnection(ctx); uint32_t events = pc->psocket.sched_io_events; if (events) 
pc->psocket.sched_io_events = 0; - bool timeout = pc->sched_timeout; - if (timeout) pc->sched_timeout = false; unlock(&p->sched_mutex); - batch = pconnection_process(pc, events, timeout, ctx_wake, false); + batch = pconnection_process(pc, events, ctx_wake, false); break; } case LISTENER: { @@ -2412,6 +2237,14 @@ static pn_event_batch_t *process(pcontext_t *ctx) { batch = pni_raw_connection_process(ctx, ctx_wake); break; } + case TIMER_MANAGER: { + pni_timer_manager_t *tm = &p->timer_manager; + bool timeout = tm->sched_timeout; + if (timeout) tm->sched_timeout = false; + unlock(&p->sched_mutex); + batch = pni_timer_manager_process(tm, timeout, ctx_wake); + break; + } default: assert(NULL); } @@ -2454,21 +2287,6 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { unlock(&p->eventfd_mutex); } break; - - case PROACTOR_TIMER: - p->sched_timeout = true; - ctx = &p->context; - ctx->sched_pending = true; - break; - - case PCONNECTION_TIMER: { - pconnection_t *pc = containerof(containerof(ee, ptimer_t, epoll_io), pconnection_t, timer); - assert(pc); - ctx = &pc->context; - pc->sched_timeout = true;; - ctx->sched_pending = true; - break; - } case PCONNECTION_IO: { psocket_t *ps = containerof(ee, psocket_t, epoll_io); pconnection_t *pc = psocket_pconnection(ps); @@ -2494,6 +2312,13 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { ctx->sched_pending = true; break; } + case TIMER: { + pni_timer_manager_t *tm = &p->timer_manager; + ctx = &tm->context; + tm->sched_timeout = true; + ctx->sched_pending = true; + break; + } } if (ctx && !ctx->runnable && !ctx->runner) return ctx; @@ -2880,8 +2705,6 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { if (bp == p) { bool notify = false; lock(&p->context.mutex); - bool rearm_timer = !p->timer_armed && !p->shutting_down; - p->timer_armed = true; p->context.working = false; if (p->timeout_processed) { p->timeout_processed = false; @@ -2902,8 +2725,6 @@ void 
pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { if (notify) wake_notify(&p->context); - if (rearm_timer) - rearm(p, &p->timer.epoll_io); check_earmark_override(p, ts); return; } @@ -2922,11 +2743,11 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { lock(&p->context.mutex); p->timeout_set = true; if (t == 0) { - ptimer_set(&p->timer, 0); + pni_timer_set(p->timer, 0); p->need_timeout = true; notify = wake(&p->context); } else { - ptimer_set(&p->timer, t); + pni_timer_set(p->timer, t + pn_proactor_now_64()); } unlock(&p->context.mutex); if (notify) wake_notify(&p->context); @@ -2936,12 +2757,23 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) { lock(&p->context.mutex); p->timeout_set = false; p->need_timeout = false; - ptimer_set(&p->timer, 0); + pni_timer_set(p->timer, 0); bool notify = wake_if_inactive(p); unlock(&p->context.mutex); if (notify) wake_notify(&p->context); } +void pni_proactor_timeout(pn_proactor_t *p) { + bool notify = false; + lock(&p->context.mutex); + if (!p->context.closing) { + p->need_timeout = true; + notify = wake(&p->context); + } + unlock(&p->context.mutex); + if (notify) wake_notify(&p->context); +} + pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { pconnection_t *pc = get_pconnection(c); return pc ? 
pc->psocket.proactor : NULL; diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index 06d24eb..6f3e971 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -160,6 +160,7 @@ static void praw_connection_cleanup(praw_connection_t *prc) { bool can_free = proactor_remove(&prc->context); unlock(&prc->context.mutex); if (can_free) { + pcontext_finalize(&prc->context); free(prc); } // else proactor_disconnect logic owns prc and its final free diff --git a/c/src/proactor/epoll_timer.c b/c/src/proactor/epoll_timer.c new file mode 100644 index 0000000..58f0211 --- /dev/null +++ b/c/src/proactor/epoll_timer.c @@ -0,0 +1,380 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "epoll-internal.h" +#include "core/util.h" +#include <assert.h> +#include <string.h> +#include <errno.h> +#include <unistd.h> +#include <sys/timerfd.h> +#include <sys/epoll.h> + +/* + * Epoll proactor subsystem for timers. 
+ * + * Two types of timers: (1) connection timers, one per connection, active if at least one of the peers has set a heartbeat, + * timers move forward in time (but see replace_timer_deadline() note below), latency not critical; (2) a single proactor + * timer, can move forwards or backwards, can be canceled. + * + * A single timerfd is shared by all the timers. Connection timers are tracked on a heap ordered list. The proactor timer is + * tracked separately. The next timerfd_deadline is the earliest of all the timers, in this case the earliest of the first + * connection timer and the proactor timer. + * + * If a connection timer is changed to a later time, it is not moved. It is kept in place but marked with the new deadline. On + * expiry, depending on whether the deadline was extended, the decision is made to either generate a timeout or replace the + * timer on the ordered list. + * + * When a timerfd read event is generated, the proactor invokes pni_timer_manager_process() to generate timeouts for each + * expired timer and to do housekeeping on the rest. + * + * replace_timer_deadline(): a connection timer can go backwards in time at most once if: both peers have heartbeats and the + * second AMQP open frame results in a shorter periodic transport timer than the first open frame. In this case, the + * existing timer_deadline is immediately orphaned and a new one created for the rest of the connection's life. + * + * Lock ordering: tm->context.mutex --> tm->deletion_mutex. + */ + +static void timerfd_set(int fd, uint64_t t_millis) { + // t_millis == 0 -> cancel + struct itimerspec newt; + memset(&newt, 0, sizeof(newt)); + newt.it_value.tv_sec = t_millis / 1000; + newt.it_value.tv_nsec = (t_millis % 1000) * 1000000; + timerfd_settime(fd, 0, &newt, NULL); +} + +static void timerfd_drain(int fd) { + // Forget any old expired timers and only trigger an epoll read event for a subsequent expiry.
+ uint64_t result = 0; + ssize_t n = read(fd, &result, sizeof(result)); + if (n != sizeof(result) && !(n < 0 && errno == EAGAIN)) { + EPOLL_FATAL("timerfd read error", errno); + } +} + +// Struct to manage the ordering of timers on the heap ordered list and manage the lifecycle if +// the parent timer is self-deleting. +typedef struct timer_deadline_t { + uint64_t list_deadline; // Heap ordering deadline. Must not change while on list. + pni_timer_t *timer; // Parent timer. NULL means orphaned and to be deleted. + bool resequenced; // An out-of-order connection timeout caught and handled. +} timer_deadline_t; + +static void timer_deadline_initialize(void *object) { + timer_deadline_t *td = (timer_deadline_t *) object; + memset(td, 0 , sizeof(*td)); +} + +static void timer_deadline_finalize(void *object) { + assert(((timer_deadline_t *) object)->list_deadline == 0); +} + +static intptr_t timer_deadline_compare(void *oa, void *ob) { + timer_deadline_t *a = (timer_deadline_t *) oa; + timer_deadline_t *b = (timer_deadline_t *) ob; + return a->list_deadline - b->list_deadline; +} + +#define timer_deadline_inspect NULL +#define timer_deadline_hashcode NULL +#define CID_timer_deadline CID_pn_void + +static timer_deadline_t* pni_timer_deadline(void) { + static const pn_class_t timer_deadline_clazz = PN_CLASS(timer_deadline); + return (timer_deadline_t *) pn_class_new(&timer_deadline_clazz, sizeof(timer_deadline_t)); +} + + +struct pni_timer_t { + uint64_t deadline; + timer_deadline_t *timer_deadline; + pni_timer_manager_t *manager; + pconnection_t *connection; +}; + +pni_timer_t *pni_timer(pni_timer_manager_t *tm, pconnection_t *c) { + timer_deadline_t *td = NULL; + pni_timer_t *timer = NULL; + assert(c || !tm->context.proactor->timer); // Proactor timer. Can only be one. + timer = (pni_timer_t *) malloc(sizeof(pni_timer_t)); + if (!timer) return NULL; + if (c) { + // Connections are tracked on the timer_heap. Allocate the tracking struct. 
+ td = pni_timer_deadline(); + if (!td) { + free(timer); + return NULL; + } + } + + lock(&tm->context.mutex); + timer->connection = c; + timer->manager = tm; + timer->timer_deadline = td; + timer->deadline = 0; + if (c) + td->timer = timer; + unlock(&tm->context.mutex); + return timer; +} + +// Call with no locks. +void pni_timer_free(pni_timer_t *timer) { + timer_deadline_t *td = timer->timer_deadline; + bool can_free_td = false; + if (td) pni_timer_set(timer, 0); + pni_timer_manager_t *tm = timer->manager; + lock(&tm->context.mutex); + lock(&tm->deletion_mutex); + if (td) { + if (td->list_deadline) + td->timer = NULL; // Orphan. timer_manager does eventual pn_free() in process(). + else + can_free_td = true; + } + unlock(&tm->deletion_mutex); + unlock(&tm->context.mutex); + if (can_free_td) { + pn_free(td); + } + free(timer); +} + +static timer_deadline_t *replace_timer_deadline(pni_timer_manager_t *tm, pni_timer_t *timer); + +// Return true if initialization succeeds. Called once at proactor creation. +bool pni_timer_manager_init(pni_timer_manager_t *tm) { + tm->epoll_timer.fd = -1; + tm->timerfd_deadline = 0; + tm->timers_heap = NULL; + tm->proactor_timer = NULL; + pn_proactor_t *p = containerof(tm, pn_proactor_t, timer_manager); + pcontext_init(&tm->context, TIMER_MANAGER, p); + pmutex_init(&tm->deletion_mutex); + + // PN_VOID turns off ref counting for the elements in the list. + tm->timers_heap = pn_list(PN_VOID, 0); + if (!tm->timers_heap) + return false; + tm->proactor_timer = pni_timer(tm, NULL); + if (!tm->proactor_timer) + return false; + + p->timer = tm->proactor_timer; + epoll_extended_t *ee = &tm->epoll_timer; + ee->fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); + ee->type = TIMER; + ee->wanted = EPOLLIN; + ee->polling = false; + return (ee->fd >= 0); +} + +// Only call from proactor's destructor, when it is single threaded and scheduling has stopped. 
+void pni_timer_manager_finalize(pni_timer_manager_t *tm) { + lock(&tm->context.mutex); + unlock(&tm->context.mutex); // Memory barrier + if (tm->epoll_timer.fd >= 0) close(tm->epoll_timer.fd); + pni_timer_free(tm->proactor_timer); + if (tm->timers_heap) { + size_t sz = pn_list_size(tm->timers_heap); + // On teardown there is no need to preserve the heap. Traverse the list ignoring minpop(). + for (size_t idx = 0; idx < sz; idx++) { + timer_deadline_t *td = (timer_deadline_t *) pn_list_get(tm->timers_heap, idx); + td->list_deadline = 0; + pn_free(td); + } + pn_free(tm->timers_heap); + } + pmutex_finalize(&tm->deletion_mutex); + pcontext_finalize(&tm->context); +} + +// Call with timer_manager lock held. Return true if wake_notify required. +static bool adjust_deadline(pni_timer_manager_t *tm) { + // Make sure the timer_manager context will get a timeout in time for the earliest connection timeout. + if (tm->context.working) + return false; // timer_manager context will adjust the timer when it stops working + bool notify = false; + uint64_t new_deadline = tm->proactor_timer->deadline; + if (pn_list_size(tm->timers_heap)) { + // First element of timers_heap has earliest deadline on the heap. + timer_deadline_t *heap0 = (timer_deadline_t *) pn_list_get(tm->timers_heap, 0); + assert(heap0->list_deadline != 0); + new_deadline = new_deadline ? pn_min(new_deadline, heap0->list_deadline) : heap0->list_deadline; + } + // Only change target deadline if new_deadline is in future but earlier than old timerfd_deadline. + if (new_deadline) { + if (tm->timerfd_deadline == 0 || new_deadline < tm->timerfd_deadline) { + uint64_t now = pn_proactor_now_64(); + if (new_deadline <= now) { + // no need for a timer update. Wake the timer_manager. + notify = wake(&tm->context); + } + else { + timerfd_set(tm->epoll_timer.fd, new_deadline - now); + tm->timerfd_deadline = new_deadline; + } + } + } + return notify; +} + +// Call without context lock or timer_manager lock. 
+// Calls for connection timers are generated in the proactor and serialized per connection. +// Calls for the proactor timer can come from arbitrary user threads. +void pni_timer_set(pni_timer_t *timer, uint64_t deadline) { + pni_timer_manager_t *tm = timer->manager; + bool notify = false; + + lock(&tm->context.mutex); + if (deadline == timer->deadline) { + unlock(&tm->context.mutex); + return; // No change. + } + + if (timer == tm->proactor_timer) { + assert(!timer->connection); + timer->deadline = deadline; + } else { + // Connection + timer_deadline_t *td = timer->timer_deadline; + // A connection timer can go backwards at most once. Check here. + if (deadline && td->list_deadline && deadline < td->list_deadline) { + if (td->resequenced) + EPOLL_FATAL("idle timeout sequencing error", 0); // + else { + // replace drops the lock for malloc. Safe because there can be no competing call to + // the timer set function by the same pconnection from another thread. + td = replace_timer_deadline(tm, timer); + } + } + + timer->deadline = deadline; + // Put on list if not already there. + if (deadline && !td->list_deadline) { + td->list_deadline = deadline; + pn_list_minpush(tm->timers_heap, td); + } + } + + // Skip a cancelled timer (deadline == 0) since it doesn't change the timerfd deadline. + if (deadline) + notify = adjust_deadline(tm); + unlock(&tm->context.mutex); + + if (notify) + wake_notify(&tm->context); +} + +pn_event_batch_t *pni_timer_manager_process(pni_timer_manager_t *tm, bool timeout, bool wake) { + uint64_t now = pn_proactor_now_64(); + tm->context.working = true; + if (timeout) + tm->timerfd_deadline = 0; + if (wake) + wake_done(&tm->context); + + // First check for proactor timer expiry. 
+ uint64_t deadline = tm->proactor_timer->deadline; + if (deadline && deadline <= now) { + tm->proactor_timer->deadline = 0; + unlock(&tm->context.mutex); + pni_proactor_timeout(tm->context.proactor); + lock(&tm->context.mutex); + // If lower latency desired for the proactor timer, we could convert to the proactor context (if not working) and return + // here with the event batch, and wake the timer manager context to process the connection timers. + } + + // Next, find all expired connection timers at front of the ordered heap. + while (pn_list_size(tm->timers_heap)) { + timer_deadline_t *td = (timer_deadline_t *) pn_list_get(tm->timers_heap, 0); + if (td->list_deadline > now) + break; + + // Expired. Remove from list. + timer_deadline_t *min = (timer_deadline_t *) pn_list_minpop(tm->timers_heap); + assert (min == td); + min->list_deadline = 0; + + // Three possibilities to act on: + // timer expired -> pni_connection_timeout() + // timer deadline extended -> minpush back on list to new spot + // timer freed -> free the associated timer_deadline popped off the list + if (!td->timer) { + unlock(&tm->context.mutex); + pn_free(td); + lock(&tm->context.mutex); + } else { + uint64_t deadline = td->timer->deadline; + if (deadline && deadline <= now) { + td->timer->deadline = 0; + pconnection_t *pc = td->timer->connection; + lock(&tm->deletion_mutex); // Prevent connection from deleting itself when tm->context.mutex dropped. + unlock(&tm->context.mutex); + pni_pconnection_timeout(pc); + unlock(&tm->deletion_mutex); + lock(&tm->context.mutex); + } else { + td->list_deadline = td->timer->deadline; + pn_list_minpush(tm->timers_heap, td); + } + } + } + + if (timeout) { + // TODO: query whether perf gain by doing these system calls outside the lock, perhaps with additional set_reset_mutex. 
+ timerfd_drain(tm->epoll_timer.fd); + rearm_polling(&tm->epoll_timer, tm->context.proactor->epollfd); + } + tm->context.working = false; // must be false for adjust_deadline to do adjustment + bool notify = adjust_deadline(tm); + unlock(&tm->context.mutex); + + if (notify) + wake_notify(&tm->context); + // The timer_manager never has events to batch. + return NULL; + // TODO: perhaps become context of one of the timed out timers (if otherwise idle) and process() that context. +} + +// Call with timer_manager lock held. +// There can be no competing call to this and timer_set() from the same connection. +static timer_deadline_t *replace_timer_deadline(pni_timer_manager_t *tm, pni_timer_t *timer) { + assert(timer->connection); + timer_deadline_t *old_td = timer->timer_deadline; + assert(old_td); + // Mark old struct for deletion. No parent timer. + old_td->timer = NULL; + + unlock(&tm->context.mutex); + // Create replacement timer for life of connection. + timer_deadline_t *new_td = pni_timer_deadline(); + if (!new_td) + EPOLL_FATAL("replacement timer deadline allocation", errno); + lock(&tm->context.mutex); + + new_td->list_deadline = 0; + new_td->timer = timer; + new_td->resequenced = true; + timer->timer_deadline = new_td; + return new_td; +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org