This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
     new 6353ad9  PROTON-2334: fix libuv proactor PN_PROACTOR_INACTIVE event generation
6353ad9 is described below

commit 6353ad99c23d3a9861ddcc4642df88c68e62698c
Author: Cliff Jansen <cliffjan...@apache.org>
AuthorDate: Mon Feb 22 09:41:04 2021 -0800

    PROTON-2334: fix libuv proactor PN_PROACTOR_INACTIVE event generation
---
 c/src/proactor/libuv.c | 27 +++++++++++++++++++--------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index 31f1e7c..b06b1eb 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -253,6 +253,7 @@ struct pn_proactor_t {
   bool batch_working;          /* batch is being processed in a worker thread */
   bool need_interrupt;         /* Need a PN_PROACTOR_INTERRUPT event */
   bool need_inactive;          /* need INACTIVE event */
+  bool timeout_processed;
 };
 
 
@@ -383,6 +384,13 @@ static inline work_t *batch_work(pn_event_batch_t *batch) {
   return NULL;
 }
 
+static void check_for_inactive(pn_proactor_t *p) {
+  /* No future events: no active socket io, no pending timer, no
+     current event processing. */
+  if (!p->batch_working && !p->active && !p->need_interrupt && p->timeout_state == TM_NONE)
+    p->need_inactive = true;
+}
+
 /* Total count of listener and connections for PN_PROACTOR_INACTIVE */
 static void add_active(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
@@ -393,7 +401,7 @@ static void add_active(pn_proactor_t *p) {
 static void remove_active_lh(pn_proactor_t *p) {
   assert(p->active > 0);
   if (--p->active == 0) {
-    p->need_inactive = true;
+    check_for_inactive(p);
   }
 }
 
@@ -503,9 +511,7 @@ static int pconnection_init(pconnection_t *pc) {
       uv_close((uv_handle_t*)&pc->tcp, NULL);
     }
   }
-  if (!err) {
-    add_active(pc->work.proactor);
-  } else {
+  if (err) {
     pconnection_error(pc, err, "initialization");
   }
   return err;
@@ -856,7 +862,7 @@ static pn_event_batch_t *get_batch_lh(pn_proactor_t *p) {
     }
     if (p->timeout_state == TM_FIRED) {
       p->timeout_state = TM_NONE;
-      remove_active_lh(p);
+      p->timeout_processed = true;
       return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT);
     }
   }
@@ -1084,6 +1090,10 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) {
   pn_proactor_t *bp = batch_proactor(batch); /* Proactor events */
   if (bp == p) {
     p->batch_working = false;
+    if (p->timeout_processed) {
+      p->timeout_processed = false;
+      check_for_inactive(p);
+    }
   }
   uv_mutex_unlock(&p->lock);
   notify(p);
@@ -1141,7 +1151,6 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) {
   uv_mutex_lock(&p->lock);
   p->timeout = t;
   // This timeout *replaces* any existing timeout
-  if (p->timeout_state == TM_NONE) ++p->active;
   p->timeout_state = TM_REQUEST;
   uv_mutex_unlock(&p->lock);
   notify(p);
@@ -1151,7 +1160,7 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) {
   uv_mutex_lock(&p->lock);
   if (p->timeout_state != TM_NONE) {
     p->timeout_state = TM_NONE;
-    remove_active_lh(p);
+    check_for_inactive(p);
     notify(p);
   }
   uv_mutex_unlock(&p->lock);
@@ -1160,6 +1169,7 @@ void pn_proactor_cancel_timeout(pn_proactor_t *p) {
 void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
   pconnection_t *pc = pconnection(p, c, t, false);
   assert(pc);                                  /* TODO aconway 2017-03-31: memory safety */
+  add_active(p);
   pn_connection_open(pc->driver.connection);   /* Auto-open */
   parse_addr(&pc->addr, addr);
   work_start(&pc->work);
@@ -1314,9 +1324,10 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 }
 
 void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t) {
-  uv_mutex_lock(&l->lock);
   pconnection_t *pc = pconnection(l->work.proactor, c, t, true);
   assert(pc);
+  add_active(l->work.proactor);
+  uv_mutex_lock(&l->lock);
   /* Get the socket from the accept event that we are processing */
   pn_event_t *e = pn_collector_prev(l->collector);
   assert(pn_event_type(e) == PN_LISTENER_ACCEPT);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to