Repository: qpid-proton
Updated Branches:
  refs/heads/master 10df4133e -> fa598eab4


PROTON-1520: C proactor epoll performance.  Defer writes until after event 
batch processed,
 defer rearm until after write


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fa598eab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fa598eab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fa598eab

Branch: refs/heads/master
Commit: fa598eab40163d23d082d3ed111b38f31a4cb303
Parents: 10df413
Author: Clifford Jansen <cliffjan...@apache.org>
Authored: Wed Jul 19 17:39:14 2017 -0700
Committer: Clifford Jansen <cliffjan...@apache.org>
Committed: Wed Jul 19 17:39:14 2017 -0700

----------------------------------------------------------------------
 proton-c/src/proactor/epoll.c | 53 ++++++++++++++++++++------------------
 1 file changed, 28 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fa598eab/proton-c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c
index 28a689c..9ece597 100644
--- a/proton-c/src/proactor/epoll.c
+++ b/proton-c/src/proactor/epoll.c
@@ -537,6 +537,7 @@ struct pn_listener_t {
 
 
 static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t 
events, bool timeout, bool topup);
+static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
 static void proactor_add(pcontext_t *ctx);
 static bool proactor_remove(pcontext_t *ctx);
@@ -800,9 +801,13 @@ static void pconnection_forced_shutdown(pconnection_t *pc) 
{
 static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
   pconnection_t *pc = batch_pconnection(batch);
   pn_event_t *e = pn_connection_driver_next_event(&pc->driver);
-  if (!e && pc->hog_count < HOG_MAX) {
-    if (pconnection_process(pc, 0, false, true)) {
-      e = pn_connection_driver_next_event(&pc->driver);
+  if (!e) {
+    write_flush(pc);  // May generate transport event
+    e = pn_connection_driver_next_event(&pc->driver);
+    if (!e && pc->hog_count < HOG_MAX) {
+      if (pconnection_process(pc, 0, false, true)) {
+        e = pn_connection_driver_next_event(&pc->driver);
+      }
     }
   }
   return e;
@@ -903,6 +908,23 @@ static bool pconnection_write(pconnection_t *pc, 
pn_bytes_t wbuf) {
   return true;
 }
 
+static void write_flush(pconnection_t *pc) {
+  if (!pc->write_blocked && !pconnection_wclosed(pc)) {
+    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
+    if (wbuf.size > 0) {
+      if (!pconnection_write(pc, wbuf)) {
+        psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : 
"on write to");
+      }
+    }
+    else {
+      if (pn_connection_driver_write_closed(&pc->driver)) {
+        shutdown(pc->psocket.sockfd, SHUT_WR);
+        pc->write_blocked = true;
+      }
+    }
+  }
+}
+
 static void pconnection_connected_lh(pconnection_t *pc);
 static void pconnection_maybe_connect_lh(pconnection_t *pc);
 
@@ -1001,7 +1023,6 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
     pc->current_arm = 0;
     pc->new_events = 0;
   }
-  bool unarmed = (pc->current_arm == 0);
 
   if (pc->context.closing && pconnection_is_final(pc)) {
     unlock(&pc->context.mutex);
@@ -1058,36 +1079,17 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
     tick_required = false;
   }
 
-  while (!pc->write_blocked && !pconnection_wclosed(pc)) {
-    pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-    if (wbuf.size > 0) {
-      if (!pconnection_write(pc, wbuf)) {
-        psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : 
"on write to");
-      }
-    }
-    else {
-      if (pn_connection_driver_write_closed(&pc->driver)) {
-        shutdown(pc->psocket.sockfd, SHUT_WR);
-        pc->write_blocked = true;
-      }
-      else
-        break;  /* nothing to write until next read/wake/timeout */
-    }
-  }
-
   if (topup) {
     // If there was anything new to topup, we have it by now.
-    if (unarmed && pconnection_rearm_check(pc))
-      pconnection_rearm(pc);
     return NULL;  // caller already owns the batch
   }
 
   if (pconnection_has_event(pc)) {
-    if (unarmed && pconnection_rearm_check(pc))
-      pconnection_rearm(pc);
     return &pc->batch;
   }
 
+  write_flush(pc);
+
   lock(&pc->context.mutex);
   if (pc->context.closing && pconnection_is_final(pc)) {
     unlock(&pc->context.mutex);
@@ -1608,6 +1610,7 @@ pn_proactor_t *pn_proactor() {
 
 void pn_proactor_free(pn_proactor_t *p) {
   //  No competing threads, not even a pending timer
+  p->shutting_down = true;
   close(p->epollfd);
   p->epollfd = -1;
   close(p->eventfd);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to