Repository: qpid-proton Updated Branches: refs/heads/master 10df4133e -> fa598eab4
PROTON-1520: C proactor epoll performance. Defer writes until after event batch processed, defer rearm until after write Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fa598eab Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fa598eab Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fa598eab Branch: refs/heads/master Commit: fa598eab40163d23d082d3ed111b38f31a4cb303 Parents: 10df413 Author: Clifford Jansen <cliffjan...@apache.org> Authored: Wed Jul 19 17:39:14 2017 -0700 Committer: Clifford Jansen <cliffjan...@apache.org> Committed: Wed Jul 19 17:39:14 2017 -0700 ---------------------------------------------------------------------- proton-c/src/proactor/epoll.c | 53 ++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fa598eab/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index 28a689c..9ece597 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -537,6 +537,7 @@ struct pn_listener_t { static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup); +static void write_flush(pconnection_t *pc); static void listener_begin_close(pn_listener_t* l); static void proactor_add(pcontext_t *ctx); static bool proactor_remove(pcontext_t *ctx); @@ -800,9 +801,13 @@ static void pconnection_forced_shutdown(pconnection_t *pc) { static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) { pconnection_t *pc = batch_pconnection(batch); pn_event_t *e = pn_connection_driver_next_event(&pc->driver); - if (!e && pc->hog_count < HOG_MAX) { - if (pconnection_process(pc, 0, false, true)) { - e = pn_connection_driver_next_event(&pc->driver); + if (!e) { + write_flush(pc); // May generate transport event + e = pn_connection_driver_next_event(&pc->driver); + if (!e && pc->hog_count < HOG_MAX) { + if (pconnection_process(pc, 0, false, true)) { + e = pn_connection_driver_next_event(&pc->driver); + } } } return e; @@ -903,6 +908,23 @@ static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) { return true; } +static void write_flush(pconnection_t *pc) { + if (!pc->write_blocked && !pconnection_wclosed(pc)) { + pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); + if (wbuf.size > 0) { + if (!pconnection_write(pc, wbuf)) { + psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on write to"); + } + } + else { + if (pn_connection_driver_write_closed(&pc->driver)) { + shutdown(pc->psocket.sockfd, SHUT_WR); + pc->write_blocked = true; + } + } + } +} + static void pconnection_connected_lh(pconnection_t *pc); static void pconnection_maybe_connect_lh(pconnection_t *pc); @@ -1001,7 +1023,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, pc->current_arm = 0; pc->new_events = 0; } - bool unarmed = (pc->current_arm == 0); if (pc->context.closing && pconnection_is_final(pc)) { unlock(&pc->context.mutex); @@ -1058,36 +1079,17 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, tick_required = false; } - while (!pc->write_blocked && !pconnection_wclosed(pc)) { - pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); - if (wbuf.size > 0) { - if (!pconnection_write(pc, wbuf)) { - psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on write to"); - } - } - else { - if (pn_connection_driver_write_closed(&pc->driver)) { - shutdown(pc->psocket.sockfd, SHUT_WR); - pc->write_blocked = true; - } - else - break; /* nothing to write until next read/wake/timeout */ - } - } - if (topup) { // If there was anything new to topup, we have it by now. - if (unarmed && pconnection_rearm_check(pc)) - pconnection_rearm(pc); return NULL; // caller already owns the batch } if (pconnection_has_event(pc)) { - if (unarmed && pconnection_rearm_check(pc)) - pconnection_rearm(pc); return &pc->batch; } + write_flush(pc); + lock(&pc->context.mutex); if (pc->context.closing && pconnection_is_final(pc)) { unlock(&pc->context.mutex); @@ -1608,6 +1610,7 @@ pn_proactor_t *pn_proactor() { void pn_proactor_free(pn_proactor_t *p) { // No competing threads, not even a pending timer + p->shutting_down = true; close(p->epollfd); p->epollfd = -1; close(p->eventfd); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org