This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit b3d1004800b569f69ebea58366021f7ddcd64692 Author: Cliff Jansen <cjan...@redhat.com> AuthorDate: Mon Dec 2 09:27:22 2019 -0800 PROTON-2130: epoll proactor: fix unwritten output bytes, pick up PROTON-2030 and PROTON-2131 --- c/src/proactor/epoll.c | 130 ++++++++++++++++++++++--------------------------- 1 file changed, 59 insertions(+), 71 deletions(-) diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index d0664d9..1283d91 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -58,6 +58,7 @@ #undef _GNU_SOURCE #include "proactor-internal.h" +#include "core/engine-internal.h" #include "core/logger_private.h" #include "core/util.h" @@ -293,13 +294,6 @@ static void ptimer_finalize(ptimer_t *pt) { pmutex_finalize(&pt->mutex); } -pn_timestamp_t pn_i_now2(void) -{ - struct timespec now; - clock_gettime(CLOCK_REALTIME, &now); - return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 1000000); -} - // ======================================================================== // Proactor common code @@ -542,7 +536,7 @@ typedef struct pconnection_t { int hog_count; // thread hogging limiter pn_event_batch_t batch; pn_connection_driver_t driver; - bool wbuf_valid; + bool output_drained; const char *wbuf_current; size_t wbuf_remaining; size_t wbuf_completed; @@ -555,7 +549,7 @@ typedef struct pconnection_t { } pconnection_t; /* - * A listener can have mutiple sockets (as specified in the addrinfo). They + * A listener can have multiple sockets (as specified in the addrinfo). They * are armed separately. The individual psockets can be part of at most one * list: the global proactor overflow retry list or the per-listener list of * pending accepts (valid inbound socket obtained, but pn_listener_accept not @@ -1128,7 +1122,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con pc->read_blocked = true; pc->write_blocked = true; pc->disconnected = false; - pc->wbuf_valid = false; + pc->output_drained = false; pc->wbuf_completed = 0; pc->wbuf_remaining = 0; pc->wbuf_current = NULL; @@ -1195,26 +1189,19 @@ static void pconnection_cleanup(pconnection_t *pc) { // else proactor_disconnect logic owns psocket and its final free } -static void invalidate_wbuf(pconnection_t *pc) { - if (pc->wbuf_valid) { - if (pc->wbuf_completed) - pn_connection_driver_write_done(&pc->driver, pc->wbuf_completed); - pc->wbuf_completed = 0; - pc->wbuf_remaining = 0; - pc->wbuf_valid = false; - } +static void set_wbuf(pconnection_t *pc, const char *start, size_t sz) { + pc->wbuf_completed = 0; + pc->wbuf_current = start; + pc->wbuf_remaining = sz; } // Never call with any locks held. static void ensure_wbuf(pconnection_t *pc) { - if (!pc->wbuf_valid) { - // next connection_driver call is the expensive output generator - pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); - pc->wbuf_completed = 0; - pc->wbuf_remaining = wbuf.size; - pc->wbuf_current = wbuf.start; - pc->wbuf_valid = true; - } + // next connection_driver call is the expensive output generator + pn_bytes_t bytes = pn_connection_driver_write_buffer(&pc->driver); + set_wbuf(pc, bytes.start, bytes.size); + if (bytes.size == 0) + pc->output_drained = true; } // Call with lock held or from forced_shutdown @@ -1260,9 +1247,8 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { lock(&p->sched_mutex); idle_threads = (p->suspend_list_head != NULL); unlock(&p->sched_mutex); - if (idle_threads) { + if (idle_threads && !pc->write_blocked && !pc->read_blocked) { write_flush(pc); // May generate transport event - pc->read_blocked = pc->write_blocked = false; pconnection_process(pc, 0, false, false, true); e = pn_connection_driver_next_event(&pc->driver); } @@ -1275,7 +1261,7 @@ static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { } } } - if (e) invalidate_wbuf(pc); + if (e) pc->output_drained = false; return e; } @@ -1299,7 +1285,6 @@ static inline bool pconnection_wclosed(pconnection_t *pc) { close/shutdown. Let read()/write() return 0 or -1 to trigger cleanup logic. */ static bool pconnection_rearm_check(pconnection_t *pc) { - assert(pc->wbuf_valid); if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) { return false; } @@ -1352,7 +1337,6 @@ static bool pconnection_sched_sync(pconnection_t *pc) { /* Call with context lock and having done a write_flush() to "know" the value of wbuf_remaining */ static inline bool pconnection_work_pending(pconnection_t *pc) { - assert(pc->wbuf_valid); if (pc->new_events || pc->wake_count || pc->tick_pending || pc->queued_disconnect) return true; if (!pc->read_blocked && !pconnection_rclosed(pc)) @@ -1409,19 +1393,26 @@ static void pconnection_done(pconnection_t *pc) { } // Return true unless error - static bool pconnection_write(pconnection_t *pc) { +static bool pconnection_write(pconnection_t *pc) { size_t wbuf_size = pc->wbuf_remaining; ssize_t n = send(pc->psocket.sockfd, pc->wbuf_current, wbuf_size, MSG_NOSIGNAL); if (n > 0) { pc->wbuf_completed += n; pc->wbuf_remaining -= n; pc->io_doublecheck = false; - if (pc->wbuf_remaining) + if (pc->wbuf_remaining) { pc->write_blocked = true; + pc->wbuf_current += n; + } else { - // No need to aggregate multiple writes + // write_done also calls pn_transport_pending(), so the transport knows all current output pn_connection_driver_write_done(&pc->driver, pc->wbuf_completed); pc->wbuf_completed = 0; + pn_transport_t *t = pc->driver.transport; + set_wbuf(pc, t->output_buf, t->output_pending); + if (t->output_pending == 0) + pc->output_drained = true; + // TODO: revise transport API to allow similar efficient access to transport output } } else if (errno == EWOULDBLOCK) { pc->write_blocked = true; @@ -1433,12 +1424,29 @@ static void pconnection_done(pconnection_t *pc) { // Never call with any locks held. static void write_flush(pconnection_t *pc) { - ensure_wbuf(pc); - if (!pc->write_blocked && !pconnection_wclosed(pc)) { + size_t prev_wbuf_remaining = 0; + + while(!pc->write_blocked && !pc->output_drained && !pconnection_wclosed(pc)) { + if (pc->wbuf_remaining == 0) { + ensure_wbuf(pc); + if (pc->wbuf_remaining == 0) + pc->output_drained = true; + } else { + // Check if we are doing multiple small writes in a row, possibly worth growing the transport output buffer. + if (prev_wbuf_remaining + && prev_wbuf_remaining == pc->wbuf_remaining // two max outputs in a row + && pc->wbuf_remaining < 131072) { + ensure_wbuf(pc); // second call -> unchanged wbuf or transport buffer size doubles and more bytes added + prev_wbuf_remaining = 0; + } else { + prev_wbuf_remaining = pc->wbuf_remaining; + } + } if (pc->wbuf_remaining > 0) { if (!pconnection_write(pc)) { psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on write to"); } + // pconnection_write side effect: wbuf may be replenished, and if not, output_drained may be set. } else { if (pn_connection_driver_write_closed(&pc->driver)) { @@ -1458,6 +1466,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timer_fired = false; bool waking = false; bool tick_required = false; + bool immediate_write = false; // Don't touch data exclusive to working thread (yet). if (timeout) { @@ -1533,8 +1542,11 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, pconnection_maybe_connect_lh(pc); else pconnection_connected_lh(pc); /* Non error event means we are connected */ - if (update_events & EPOLLOUT) + if (update_events & EPOLLOUT) { pc->write_blocked = false; + if (pc->wbuf_remaining > 0) + immediate_write = true; + } if (update_events & EPOLLIN) pc->read_blocked = false; } @@ -1555,8 +1567,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, waking = false; } - // read... tick... write - // perhaps should be: write_if_recent_EPOLLOUT... read... tick... write + if (immediate_write) { + immediate_write = false; + write_flush(pc); + } if (!pconnection_rclosed(pc)) { pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); @@ -1564,7 +1578,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size); if (n > 0) { pn_connection_driver_read_done(&pc->driver, n); - invalidate_wbuf(pc); + pc->output_drained = false; pconnection_tick(pc); /* check for tick changes. */ tick_required = false; pc->io_doublecheck = false; @@ -1585,7 +1599,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, if (tick_required) { pconnection_tick(pc); /* check for tick changes. */ tick_required = false; - invalidate_wbuf(pc); + pc->output_drained = false; } if (topup) { @@ -1594,7 +1608,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } if (pconnection_has_event(pc)) { - invalidate_wbuf(pc); + pc->output_drained = false; return &pc->batch; } @@ -3010,32 +3024,7 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { pn_proactor_t *bp = batch_proactor(batch); if (bp == p) { bool notify = false; - bool rearm_interrupt = false; lock(&p->context.mutex); - lock(&p->sched_mutex); - - bool timeout = p->sched_timeout; - if (timeout) p->sched_timeout = false; - bool intr = p->sched_interrupt; - if (intr) { - p->sched_interrupt = false; - rearm_interrupt = true; - p->need_interrupt = true; - } - if (p->context.sched_wake) { - p->context.sched_wake = false; - wake_done(&p->context); - } - - // ptimer_callback is slow. Revisit timer cancel code in light of change to single poller thread. - bool timer_fired = timeout && ptimer_callback(&p->timer) != 0; - if (timeout) { - p->timer_armed = false; - if (timer_fired && p->timeout_set) { - p->need_timeout = true; - } - } - bool rearm_timer = !p->timer_armed && !p->shutting_down; p->timer_armed = true; p->context.working = false; @@ -3048,19 +3037,18 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { if (proactor_has_event(p)) if (wake(&p->context)) notify = true; + + lock(&p->sched_mutex); tslot_t *ts = p->context.runner; if (unassign_thread(ts, UNUSED)) notify = true; unlock(&p->sched_mutex); unlock(&p->context.mutex); + if (notify) wake_notify(&p->context); if (rearm_timer) rearm(p, &p->timer.epoll_io); - if (rearm_interrupt) { - (void)read_uint64(p->interruptfd); - rearm(p, &p->epoll_interrupt); - } check_earmark_override(p, ts); return; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org