[qpid-proton] 03/09: PROTON-2130: epoll proactor io bytes accounting fix for shutdown and error

2020-04-22 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 664e436e36267ce3e7354c8de0949dacfb13d485
Author: Cliff Jansen 
AuthorDate: Mon Dec 2 09:42:59 2019 -0800

PROTON-2130: epoll proactor io bytes accounting fix for shutdown and error
---
 c/src/proactor/epoll.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 1283d91..21c611f 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -1417,6 +1417,7 @@ static bool pconnection_write(pconnection_t *pc) {
   } else if (errno == EWOULDBLOCK) {
 pc->write_blocked = true;
   } else if (!(errno == EAGAIN || errno == EINTR)) {
+pc->wbuf_remaining = 0;
 return false;
   }
   return true;
@@ -1586,6 +1587,7 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
   pc->read_blocked = true;
   }
   else if (n == 0) {
+pc->read_blocked = true;
 pn_connection_driver_read_close(&pc->driver);
   }
   else if (errno == EWOULDBLOCK)
@@ -1594,6 +1596,8 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
 psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : 
"on read from");
   }
 }
+  } else {
+pc->read_blocked = true;
   }
 
   if (tick_required) {


-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org



[qpid-proton] 06/09: PROTON-2130: epoll reworking: - Only keep fd in epoll_extended_t remove from ptimer_t, psocket_t - Remove backpointers and consistently use structure embedding to go from: psocket

2020-04-22 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 9e1990b1345f9917648659399f198e74fba7d5d7
Author: Andrew Stitcher 
AuthorDate: Thu Mar 19 23:35:36 2020 -0400

PROTON-2130: epoll reworking:
- Only keep fd in epoll_extended_t remove from ptimer_t, psocket_t
- Remove backpointers and consistently use structure embedding to go from:
  psocket->pconnection;
  psocket->pn_listener;
  psocket->acceptor;
  pcontext->pconnection;
  pcontext->pn_listener;
  pn_event_batch->pn_proactor;
  pn_batch_event->pn_listener;
  pn_batch_event->pconnection;
- Move address string from being stored in psocket to being stored in 
pconnection and
  pn_listener - saves strings for multiple listening sockets
- Rationalise post_event by switching on event types instead of the 
previous ad hoc
  event type detection.
---
 c/src/proactor/epoll-internal.h |  37 +
 c/src/proactor/epoll.c  | 167 +---
 2 files changed, 107 insertions(+), 97 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 6a13e7f..e639e48 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -57,7 +57,6 @@ typedef enum {
 
 // Data to use with epoll.
 typedef struct epoll_extended_t {
-  struct psocket_t *psocket;  // pconnection, listener, or NULL -> proactor
   int fd;
   epoll_type_t type;   // io/timer/wakeup
   uint32_t wanted; // events to poll for
@@ -67,7 +66,6 @@ typedef struct epoll_extended_t {
 
 typedef struct ptimer_t {
   pmutex mutex;
-  int timerfd;
   epoll_extended_t epoll_io;
   bool timer_active;
   bool in_doubt;  // 0 or 1 callbacks are possible
@@ -131,19 +129,6 @@ struct tslot_t {
   unsigned int earmark_override_gen;
 };
 
-/* common to connection and listener */
-typedef struct psocket_t {
-  pn_proactor_t *proactor;
-  // Remaining protected by the pconnection/listener mutex
-  int sockfd;
-  epoll_extended_t epoll_io;
-  pn_listener_t *listener;  /* NULL for a connection socket */
-  char addr_buf[PN_MAX_ADDR];
-  const char *host, *port;
-  uint32_t sched_io_events;
-  uint32_t working_io_events;
-} psocket_t;
-
 struct pn_proactor_t {
   pcontext_t context;
   ptimer_t timer;
@@ -213,9 +198,21 @@ struct pn_proactor_t {
   bool shutting_down;
 };
 
+/* common to connection and listener */
+typedef struct psocket_t {
+  pn_proactor_t *proactor;
+  // Remaining protected by the pconnection/listener mutex
+  epoll_extended_t epoll_io;
+  uint32_t sched_io_events;
+  uint32_t working_io_events;
+} psocket_t;
+
 typedef struct pconnection_t {
   psocket_t psocket;
   pcontext_t context;
+  ptimer_t timer;  // TODO: review one timerfd per connection
+  char addr_buf[PN_MAX_ADDR];
+  const char *host, *port;
   uint32_t new_events;
   int wake_count;
   bool server;/* accept, not connect */
@@ -223,7 +220,6 @@ typedef struct pconnection_t {
   bool timer_armed;
   bool queued_disconnect; /* deferred from pn_proactor_disconnect() */
   pn_condition_t *disconnect_condition;
-  ptimer_t timer;  // TODO: review one timerfd per connection
   // Following values only changed by (sole) working context:
   uint32_t current_arm;  // active epoll io events
   bool connected;
@@ -256,18 +252,21 @@ typedef struct pconnection_t {
 
 struct acceptor_t{
   psocket_t psocket;
+  struct pn_netaddr_t addr;  /* listening address */
+  pn_listener_t *listener;
+  acceptor_t *next;  /* next listener list member */
   int accepted_fd;
   bool armed;
   bool overflowed;
-  acceptor_t *next;  /* next listener list member */
-  struct pn_netaddr_t addr;  /* listening address */
 };
 
 struct pn_listener_t {
+  pcontext_t context;
   acceptor_t *acceptors;  /* Array of listening sockets */
   size_t acceptors_size;
+  char addr_buf[PN_MAX_ADDR];
+  const char *host, *port;
   int active_count;   /* Number of listener sockets registered 
with epoll */
-  pcontext_t context;
   pn_condition_t *condition;
   pn_collector_t *collector;
   pn_event_batch_t batch;
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index f8ebbf4..b891133 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -177,18 +177,16 @@ static void memory_barrier(epoll_extended_t *ee) {
  */
 
 static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
-  pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
   pmutex_init(&pt->mutex);
   pt->timer_active = false;
   pt->in_doubt = false;
   pt->shutting_down = false;
   epoll_type_t type = ps ? PCONNECTION_TIMER : PROACTOR_TIMER;
-  pt->epoll_io.psocket = ps;
-  pt->epoll_io.fd = pt->timerfd;
+  pt->epoll_io.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
   pt->epoll_io.type = type;
   pt->epoll_io.wanted = EPOLLIN;
   

[qpid-proton] 05/09: PROTON-2130: Split out structs from epoll.c to make it easier to mess with them

2020-04-22 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit b860538ba698a6d6a85cc47a44e9e522d4825d47
Author: Andrew Stitcher 
AuthorDate: Mon Mar 16 13:09:05 2020 -0400

PROTON-2130: Split out structs from epoll.c to make it easier to mess with 
them
---
 c/src/proactor/epoll-internal.h | 288 
 c/src/proactor/epoll.c  | 238 +
 2 files changed, 289 insertions(+), 237 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
new file mode 100644
index 0000000..6a13e7f
--- /dev/null
+++ b/c/src/proactor/epoll-internal.h
@@ -0,0 +1,288 @@
+#ifndef PROACTOR_EPOLL_INTERNAL_H
+#define PROACTOR_EPOLL_INTERNAL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include 
+#include 
+#include 
+
+#include 
+#include 
+#include 
+
+
+#include 
+#include 
+
+#include "netaddr-internal.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+//typedef struct pn_proactor_t pn_proactor_t;
+//typedef struct pn_listener_t pn_listener_t;
+//typedef struct pn_connection_driver_t pn_connection_driver_t;
+typedef struct acceptor_t acceptor_t;
+typedef struct tslot_t tslot_t;
+typedef pthread_mutex_t pmutex;
+
+typedef enum {
+  WAKE,   /* see if any work to do in proactor/psocket context */
+  PCONNECTION_IO,
+  PCONNECTION_TIMER,
+  LISTENER_IO,
+  PROACTOR_TIMER
+} epoll_type_t;
+
+// Data to use with epoll.
+typedef struct epoll_extended_t {
+  struct psocket_t *psocket;  // pconnection, listener, or NULL -> proactor
+  int fd;
+  epoll_type_t type;   // io/timer/wakeup
+  uint32_t wanted; // events to poll for
+  bool polling;
+  pmutex barrier_mutex;
+} epoll_extended_t;
+
+typedef struct ptimer_t {
+  pmutex mutex;
+  int timerfd;
+  epoll_extended_t epoll_io;
+  bool timer_active;
+  bool in_doubt;  // 0 or 1 callbacks are possible
+  bool shutting_down;
+} ptimer_t;
+
+typedef enum {
+  PROACTOR,
+  PCONNECTION,
+  LISTENER,
+  WAKEABLE
+} pcontext_type_t;
+
+typedef struct pcontext_t {
+  pmutex mutex;
+  pn_proactor_t *proactor;  /* Immutable */
+  void *owner;  /* Instance governed by the context */
+  pcontext_type_t type;
+  bool working;
+  bool on_wake_list;
+  bool wake_pending; // unprocessed eventfd wake callback (convert 
to bool?)
+  struct pcontext_t *wake_next; // wake list, guarded by proactor eventfd_mutex
+  bool closing;
+  // Next 4 are protected by the proactor mutex
+  struct pcontext_t* next;  /* Protected by proactor.mutex */
+  struct pcontext_t* prev;  /* Protected by proactor.mutex */
+  int disconnect_ops;   /* ops remaining before disconnect complete */
+  bool disconnecting;   /* pn_proactor_disconnect */
+  // Protected by schedule mutex
+  tslot_t *runner __attribute__((aligned(64)));  /* designated or running 
thread */
+  tslot_t *prev_runner;
+  bool sched_wake;
+  bool sched_pending;   /* If true, one or more unseen epoll or other 
events to process() */
+  bool runnable ;   /* in need of scheduling */
+} pcontext_t;
+
+typedef enum {
+  NEW,
+  UNUSED,   /* pn_proactor_done() called, may never come 
back */
+  SUSPENDED,
+  PROCESSING,   /* Hunting for a context  */
+  BATCHING, /* Doing work on behalf of a context */
+  DELETING,
+  POLLING
+} tslot_state;
+
+// Epoll proactor's concept of a worker thread provided by the application.
+struct tslot_t {
+  pmutex mutex;  // suspend and resume
+  pthread_cond_t cond;
+  unsigned int generation;
+  bool suspended;
+  volatile bool scheduled;
+  tslot_state state;
+  pcontext_t *context;
+  pcontext_t *prev_context;
+  bool earmarked;
+  tslot_t *suspend_list_prev;
+  tslot_t *suspend_list_next;
+  tslot_t *earmark_override;   // on earmark_drain, which thread was unassigned
+  unsigned int earmark_override_gen;
+};
+
+/* common to connection and listener */
+typedef struct psocket_t {
+  pn_proactor_t *proactor;
+  // Remaining protected by the pconnection/listener mutex
+  int sockfd;
+  epoll_extended_t 

[qpid-proton] 04/09: PROTON-2130: epoll proactor race/deadlock fixes

2020-04-22 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 7b000f12448a93a7aa1168fbca416a036547a92a
Author: Cliff Jansen 
AuthorDate: Wed Dec 11 10:19:08 2019 -0800

PROTON-2130: epoll proactor race/deadlock fixes
---
 c/src/proactor/epoll.c | 66 +++---
 1 file changed, 41 insertions(+), 25 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 21c611f..9390595 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -87,6 +87,7 @@
 #include 
 #include 
 #include 
+#include 
 
 #include "./netaddr-internal.h" /* Include after socket/inet headers */
 
@@ -940,6 +941,8 @@ static void write_flush(pconnection_t *pc);
 static void listener_begin_close(pn_listener_t* l);
 static void proactor_add(pcontext_t *ctx);
 static bool proactor_remove(pcontext_t *ctx);
+static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
+
 
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
   return ps->listener ? NULL : (pconnection_t*)ps;
@@ -2928,14 +2931,36 @@ static pn_event_batch_t 
*proactor_do_epoll(pn_proactor_t* p, bool can_block) {
 }
   }
 
-  // Create a list of available threads to put to work.
+  poller_done(p, ts);  // put suspended threads to work.
+  // p->poller has been released, so a new poller may already be running.
+} else if (!can_block) {
+  ts->state = UNUSED;
+  unlock(&p->sched_mutex);
+  return NULL;
+} else {
+  // TODO: loop while !poller_suspended, since new work coming
+  suspend(p, ts);
+}
+  } // while
+}
 
-  int resume_list_count = 0;
+// Call with sched lock, but only from poller context.
+static void poller_done(struct pn_proactor_t* p, tslot_t *ts) {
+  // Create a list of available threads to put to work.
+  // ts is the poller thread
+  int resume_list_count = 0;
+  tslot_t **resume_list2 = NULL;
+
+  if (p->suspend_list_count) {
+int max_resumes = p->n_warm_runnables + p->n_runnables;
+max_resumes = pn_min(max_resumes, p->suspend_list_count);
+if (max_resumes) {
+  resume_list2 = (tslot_t **) alloca(max_resumes * sizeof(tslot_t *));
   for (int i = 0; i < p->n_warm_runnables ; i++) {
-ctx = p->warm_runnables[i];
+pcontext_t *ctx = p->warm_runnables[i];
 tslot_t *tsp = ctx->runner;
 if (tsp->state == SUSPENDED) {
-  p->resume_list[resume_list_count++] = tsp;
+  resume_list2[resume_list_count++] = tsp;
   LL_REMOVE(p, suspend_list, tsp);
   p->suspend_list_count--;
   tsp->state = PROCESSING;
@@ -2956,34 +2981,25 @@ static pn_event_batch_t 
*proactor_do_epoll(pn_proactor_t* p, bool can_block) {
   for (int i = 0; i < new_runners; i++) {
 tslot_t *tsp = p->suspend_list_head;
 assert(tsp);
-p->resume_list[resume_list_count++] = tsp;
+resume_list2[resume_list_count++] = tsp;
 LL_REMOVE(p, suspend_list, tsp);
 p->suspend_list_count--;
 tsp->state = PROCESSING;
   }
+}
+  }
+  p->poller = NULL;
 
-  p->poller = NULL;
-  // New poller may run concurrently.  Touch only this thread's stack for 
rest of block.
-
-  if (resume_list_count) {
-unlock(&p->sched_mutex);
-for (int i = 0; i < resume_list_count; i++) {
-  resume(p, p->resume_list[i]);
-}
-lock(&p->sched_mutex);
-  }
-} else if (!can_block) {
-  ts->state = UNUSED;
-  unlock(&p->sched_mutex);
-  return NULL;
-} else {
-  // TODO: loop while !poller_suspended, since new work coming
-  suspend(p, ts);
+  if (resume_list_count) {
+// Allows a new poller to run concurrently.  Touch only stack vars.
+unlock(&p->sched_mutex);
+for (int i = 0; i < resume_list_count; i++) {
+  resume(p, resume_list2[i]);
 }
-  } // while
+lock(&p->sched_mutex);
+  }
 }
 
-
 pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) {
   return proactor_do_epoll(p, true);
 }
@@ -3042,12 +3058,12 @@ void pn_proactor_done(pn_proactor_t *p, 
pn_event_batch_t *batch) {
   if (wake(&p->context))
 notify = true;
 
+unlock(&p->context.mutex);
 lock(&p->sched_mutex);
 tslot_t *ts = p->context.runner;
 if (unassign_thread(ts, UNUSED))
   notify = true;
 unlock(&p->sched_mutex);
-unlock(&p->context.mutex);
 
 if (notify)
   wake_notify(&p->context);


-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org



[qpid-proton] 09/09: PROTON-2130: swap include file ordering for PROTON-2195/2196, trim trailing whitespace

2020-04-22 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 72635e3e882266be5a9c479bb4cfc1fb92b9d694
Author: Cliff Jansen 
AuthorDate: Wed Apr 22 22:21:21 2020 -0700

PROTON-2130: swap include file ordering for PROTON-2195/2196, trim trailing 
whitespace
---
 c/src/proactor/epoll.c | 10 +++---
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 2eb1ac2..baf1638 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -43,8 +43,8 @@
  already running.  The context will know if it needs to put itself back on the 
wake list
  to be runnable later to process the pending events.
 
- Lock ordering - never add locks right to left: 
-context -> sched -> wake  
+ Lock ordering - never add locks right to left:
+context -> sched -> wake
 non-proactor-context -> proactor-context
 tslot -> sched
  */
@@ -57,8 +57,8 @@
 /* Avoid GNU extensions, in particular the incompatible alternative 
strerror_r() */
 #undef _GNU_SOURCE
 
-#include "epoll-internal.h"
 #include "proactor-internal.h"
+#include "epoll-internal.h"
 #include "core/engine-internal.h"
 #include "core/logger_private.h"
 #include "core/util.h"
@@ -700,9 +700,6 @@ static void proactor_add(pcontext_t *ctx);
 static bool proactor_remove(pcontext_t *ctx);
 static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
 
-// Type safe version of containerof
-#define containerof(ptr, type, member) ((type *)((char *)(1 ? (ptr) : &((type 
*)0)->member) - offsetof(type, member)))
-
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
   return ps->epoll_io.type == PCONNECTION_IO ? containerof(ps, pconnection_t, 
psocket) : NULL;
 }
@@ -2815,7 +2812,6 @@ pn_event_batch_t *pn_proactor_get(struct pn_proactor_t* 
p) {
 
 // Call with no locks
 static inline void check_earmark_override(pn_proactor_t *p, tslot_t *ts) {
-  
   if (!ts || !ts->earmark_override)
 return;
   if (ts->earmark_override->generation == ts->earmark_override_gen) {


-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org



[qpid-proton] branch master updated (b569a4f -> 72635e3)

2020-04-22 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git.


from b569a4f  PROTON-2196: Small proactor tidy ups
 new 667f78c  PROTON-2130: epoll proactor changed to use serialized calls 
to epoll_wait for multiple events
 new b3d1004  PROTON-2130: epoll proactor: fix unwritten output bytes, pick 
up PROTON-2030 and PROTON-2131
 new 664e436  PROTON-2130: epoll proactor io bytes accounting fix for 
shutdown and error
 new 7b000f1  PROTON-2130: epoll proactor race/deadlock fixes
 new b860538  PROTON-2130: Split out structs from epoll.c to make it easier 
to mess with them
 new 9e1990b  PROTON-2130: epoll reworking: - Only keep fd in 
epoll_extended_t remove from ptimer_t, psocket_t - Remove backpointers and 
consistently use structure embedding to go from:   psocket->pconnection;   
psocket->pn_listener;   psocket->acceptor;   pcontext->pconnection;   
pcontext->pn_listener;   pn_event_batch->pn_proactor;   
pn_batch_event->pn_listener;   pn_batch_event->pconnection; - Move address 
string from being stored in psocket to being stored in pconnection and   pn_ 
[...]
 new ce6d021  PROTON-2130: Substantially reduce memory use for proactor 
connections
 new 4281367  PROTON-2130: more epoll reworking: - Rework queued accepts so 
that we do multiple at once - This should allow app to accept new connections a 
little   more efficiently. - We limit the number of accepted connections to the 
specified backlog - If the app doesn't accept all the connections in a single 
batch   we don't rearm the listener until they do, as a form of accept flow 
control.
 new 72635e3  PROTON-2130: swap include file ordering for PROTON-2195/2196, 
trim trailing whitespace

The 9 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 c/src/proactor/epoll-internal.h |  286 ++
 c/src/proactor/epoll.c  | 1838 +++
 c/tests/proactor_test.cpp   |2 +-
 3 files changed, 1554 insertions(+), 572 deletions(-)
 create mode 100644 c/src/proactor/epoll-internal.h


-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org



[qpid-proton] 01/09: PROTON-2130: epoll proactor changed to use serialized calls to epoll_wait for multiple events

2020-04-22 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 667f78c6d2ac17256ea67da54ff5703d424328f1
Author: Cliff Jansen 
AuthorDate: Thu Nov 7 00:55:50 2019 -0800

PROTON-2130: epoll proactor changed to use serialized calls to epoll_wait 
for multiple events
---
 c/src/proactor/epoll.c| 1627 ++---
 c/tests/proactor_test.cpp |2 +-
 2 files changed, 1258 insertions(+), 371 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 5cc3b65..d0664d9 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -19,6 +19,37 @@
  *
  */
 
+/*
+ The epoll proactor works with multiple concurrent threads.  If there is no 
work to do,
+ one thread temporarily becomes the poller thread, while other inbound threads 
suspend
+ waiting for work.  The poller calls epoll_wait(), generates work lists, 
resumes suspended
+ threads, and grabs work for itself or polls again.
+
+ A serialized grouping of Proton events is a context (connection, listener, 
proactor).
+ Each has multiple pollable fds that make it schedulable.  E.g. a connection 
could have a
+ socket fd, timerfd, and (indirect) eventfd all signaled in a single 
epoll_wait().
+
+ At the conclusion of each
+  N = epoll_wait(..., N_MAX, timeout)
+
+ there will be N epoll events and M wakes on the wake list.  M can be very 
large in a
+ server with many active connections. The poller makes the contexts "runnable" 
if they are
+ not already running.  A running context can only be made runnable once until 
it completes
+ a chunk of work and calls unassign_thread().  (N + M - duplicates) contexts 
will be
+ scheduled.  A new poller will occur when next_runnable() returns NULL.
+
+ A running context, before it stops "working" must check to see if there were 
new incoming
+ events that the poller posted to the context, but could not make it runnable 
since it was
+ already running.  The context will know if it needs to put itself back on the 
wake list
+ to be runnable later to process the pending events.
+
+ Lock ordering - never add locks right to left: 
+context -> sched -> wake  
+non-proactor-context -> proactor-context
+tslot -> sched
+ */
+
+
 /* Enable POSIX features beyond c99 for modern pthread and standard 
strerror_r() */
 #ifndef _POSIX_C_SOURCE
 #define _POSIX_C_SOURCE 200809L
@@ -26,8 +57,9 @@
 /* Avoid GNU extensions, in particular the incompatible alternative 
strerror_r() */
 #undef _GNU_SOURCE
 
-#include "core/logger_private.h"
 #include "proactor-internal.h"
+#include "core/logger_private.h"
+#include "core/util.h"
 
 #include 
 #include 
@@ -92,9 +124,7 @@ static void pstrerror(int err, strerrorbuf msg) {
 // 
 
 // In general all locks to be held singly and shortly (possibly as spin locks).
-// Exception: psockets+proactor for pn_proactor_disconnect (convention: acquire
-// psocket first to avoid deadlock).  TODO: revisit the exception and its
-// awkwardness in the code (additional mutex? different type?).
+// See above about lock ordering.
 
 typedef pthread_mutex_t pmutex;
 static void pmutex_init(pthread_mutex_t *pm){
@@ -113,14 +143,13 @@ static inline void lock(pmutex *m) { 
pthread_mutex_lock(m); }
 static inline void unlock(pmutex *m) { pthread_mutex_unlock(m); }
 
 typedef struct acceptor_t acceptor_t;
+typedef struct tslot_t tslot_t;
 
 typedef enum {
   WAKE,   /* see if any work to do in proactor/psocket context */
   PCONNECTION_IO,
-  PCONNECTION_IO_2,
   PCONNECTION_TIMER,
   LISTENER_IO,
-  CHAINED_EPOLL,
   PROACTOR_TIMER } epoll_type_t;
 
 // Data to use with epoll.
@@ -161,6 +190,8 @@ static void memory_barrier(epoll_extended_t *ee) {
  * The original implementation with counters to track expiry counts
  * was abandoned in favor of "in doubt" transitions and resolution
  * at shutdown.
+ *
+ * TODO: review above in light of single poller thread.
  */
 
 typedef struct ptimer_t {
@@ -262,6 +293,14 @@ static void ptimer_finalize(ptimer_t *pt) {
   pmutex_finalize(&pt->mutex);
 }
 
+pn_timestamp_t pn_i_now2(void)
+{
+  struct timespec now;
+  clock_gettime(CLOCK_REALTIME, &now);
+  return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 100);
+}
+
+
 // 
 // Proactor common code
 // 
@@ -330,30 +369,6 @@ static void stop_polling(epoll_extended_t *ee, int 
epollfd) {
  * eventfd to allow a lock-free pn_proactor_interrupt() implementation.
  */
 
-/*
- *  epollfd and epollfd_2 
- *
- * This implementation allows multiple threads to call epoll_wait()
- * concurrently (as opposed to having a single thread call
- * epoll_wait() and feed work to helper threads).  Unfortunately
- * with this 

[qpid-proton] 02/09: PROTON-2130: epoll proactor: fix unwritten output bytes, pick up PROTON-2030 and PROTON-2131

2020-04-22 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit b3d1004800b569f69ebea58366021f7ddcd64692
Author: Cliff Jansen 
AuthorDate: Mon Dec 2 09:27:22 2019 -0800

PROTON-2130: epoll proactor: fix unwritten output bytes, pick up 
PROTON-2030 and PROTON-2131
---
 c/src/proactor/epoll.c | 130 ++---
 1 file changed, 59 insertions(+), 71 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index d0664d9..1283d91 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -58,6 +58,7 @@
 #undef _GNU_SOURCE
 
 #include "proactor-internal.h"
+#include "core/engine-internal.h"
 #include "core/logger_private.h"
 #include "core/util.h"
 
@@ -293,13 +294,6 @@ static void ptimer_finalize(ptimer_t *pt) {
   pmutex_finalize(&pt->mutex);
 }
 
-pn_timestamp_t pn_i_now2(void)
-{
-  struct timespec now;
-  clock_gettime(CLOCK_REALTIME, &now);
-  return ((pn_timestamp_t)now.tv_sec) * 1000 + (now.tv_nsec / 100);
-}
-
 
 // 
 // Proactor common code
@@ -542,7 +536,7 @@ typedef struct pconnection_t {
   int hog_count; // thread hogging limiter
   pn_event_batch_t batch;
   pn_connection_driver_t driver;
-  bool wbuf_valid;
+  bool output_drained;
   const char *wbuf_current;
   size_t wbuf_remaining;
   size_t wbuf_completed;
@@ -555,7 +549,7 @@ typedef struct pconnection_t {
 } pconnection_t;
 
 /*
- * A listener can have mutiple sockets (as specified in the addrinfo).  They
+ * A listener can have multiple sockets (as specified in the addrinfo).  They
  * are armed separately.  The individual psockets can be part of at most one
  * list: the global proactor overflow retry list or the per-listener list of
  * pending accepts (valid inbound socket obtained, but pn_listener_accept not
@@ -1128,7 +1122,7 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
   pc->read_blocked = true;
   pc->write_blocked = true;
   pc->disconnected = false;
-  pc->wbuf_valid = false;
+  pc->output_drained = false;
   pc->wbuf_completed = 0;
   pc->wbuf_remaining = 0;
   pc->wbuf_current = NULL;
@@ -1195,26 +1189,19 @@ static void pconnection_cleanup(pconnection_t *pc) {
   // else proactor_disconnect logic owns psocket and its final free
 }
 
-static void invalidate_wbuf(pconnection_t *pc) {
-  if (pc->wbuf_valid) {
-if (pc->wbuf_completed)
-  pn_connection_driver_write_done(&pc->driver, pc->wbuf_completed);
-pc->wbuf_completed = 0;
-pc->wbuf_remaining = 0;
-pc->wbuf_valid = false;
-  }
+static void set_wbuf(pconnection_t *pc, const char *start, size_t sz) {
+  pc->wbuf_completed = 0;
+  pc->wbuf_current = start;
+  pc->wbuf_remaining = sz;
 }
 
 // Never call with any locks held.
 static void ensure_wbuf(pconnection_t *pc) {
-  if (!pc->wbuf_valid) {
-// next connection_driver call is the expensive output generator
-pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
-pc->wbuf_completed = 0;
-pc->wbuf_remaining = wbuf.size;
-pc->wbuf_current = wbuf.start;
-pc->wbuf_valid = true;
-  }
+  // next connection_driver call is the expensive output generator
+  pn_bytes_t bytes = pn_connection_driver_write_buffer(&pc->driver);
+  set_wbuf(pc, bytes.start, bytes.size);
+  if (bytes.size == 0)
+pc->output_drained = true;
 }
 
 // Call with lock held or from forced_shutdown
@@ -1260,9 +1247,8 @@ static pn_event_t 
*pconnection_batch_next(pn_event_batch_t *batch) {
 lock(&p->sched_mutex);
 idle_threads = (p->suspend_list_head != NULL);
 unlock(&p->sched_mutex);
-if (idle_threads) {
+if (idle_threads && !pc->write_blocked && !pc->read_blocked) {
   write_flush(pc);  // May generate transport event
-  pc->read_blocked = pc->write_blocked = false;
   pconnection_process(pc, 0, false, false, true);
   e = pn_connection_driver_next_event(&pc->driver);
 }
@@ -1275,7 +1261,7 @@ static pn_event_t 
*pconnection_batch_next(pn_event_batch_t *batch) {
   }
 }
   }
-  if (e) invalidate_wbuf(pc);
+  if (e) pc->output_drained = false;
 
   return e;
 }
@@ -1299,7 +1285,6 @@ static inline bool pconnection_wclosed(pconnection_t  
*pc) {
close/shutdown.  Let read()/write() return 0 or -1 to trigger cleanup logic.
 */
 static bool pconnection_rearm_check(pconnection_t *pc) {
-  assert(pc->wbuf_valid);
   if (pconnection_rclosed(pc) && pconnection_wclosed(pc)) {
 return false;
   }
@@ -1352,7 +1337,6 @@ static bool pconnection_sched_sync(pconnection_t *pc) {
 
 /* Call with context lock and having done a write_flush() to "know" the value 
of wbuf_remaining */
 static inline bool pconnection_work_pending(pconnection_t *pc) {
-  assert(pc->wbuf_valid);
   if (pc->new_events || pc->wake_count || pc->tick_pending || 
pc->queued_disconnect)
 return true;
   if (!pc->read_blocked 

[qpid-proton] 07/09: PROTON-2130: Substantially reduce memory use for proactor connections

2020-04-22 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit ce6d0219b4fcc839db0e53b555ec378a391eff3d
Author: Andrew Stitcher 
AuthorDate: Tue Apr 7 14:12:42 2020 -0400

PROTON-2130: Substantially reduce memory use for proactor connections
---
 c/src/proactor/epoll-internal.h |  2 +-
 c/src/proactor/epoll.c  | 13 +++--
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index e639e48..3cdeef3 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -211,7 +211,6 @@ typedef struct pconnection_t {
   psocket_t psocket;
   pcontext_t context;
   ptimer_t timer;  // TODO: review one timerfd per connection
-  char addr_buf[PN_MAX_ADDR];
   const char *host, *port;
   uint32_t new_events;
   int wake_count;
@@ -239,6 +238,7 @@ typedef struct pconnection_t {
   pmutex rearm_mutex;/* protects pconnection_rearm from out of 
order arming*/
   bool io_doublecheck;   /* callbacks made and new IO may have 
arrived */
   bool sched_timeout;
+  char addr_buf[1];
 } pconnection_t;
 
 /*
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index b891133..a173b9b 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -866,7 +866,7 @@ static int pclosefd(pn_proactor_t *p, int fd) {
 
 static void pconnection_tick(pconnection_t *pc);
 
-static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, 
pn_connection_t *c, pn_transport_t *t, bool server, const char *addr)
+static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, 
pn_connection_t *c, pn_transport_t *t, bool server, const char *addr, size_t 
addrlen)
 {
   memset(pc, 0, sizeof(*pc));
 
@@ -877,7 +877,7 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
 
   pcontext_init(&pc->context, PCONNECTION, p, pc);
   psocket_init(&pc->psocket, p, NULL);
-  pni_parse_addr(addr, pc->addr_buf, sizeof(pc->addr_buf), &pc->host, 
&pc->port);
+  pni_parse_addr(addr, pc->addr_buf, addrlen+1, &pc->host, &pc->port);
   pc->new_events = 0;
   pc->wake_count = 0;
   pc->tick_pending = false;
@@ -1531,9 +1531,10 @@ static bool wake_if_inactive(pn_proactor_t *p) {
 }
 
 void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t 
*t, const char *addr) {
-  pconnection_t *pc = (pconnection_t*) calloc(1, sizeof(pconnection_t));
+  size_t addrlen = strlen(addr);
+  pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t)+addrlen);
   assert(pc); // TODO: memory safety
-  const char *err = pconnection_setup(pc, p, c, t, false, addr);
+  const char *err = pconnection_setup(pc, p, c, t, false, addr, addrlen);
   if (err) {/* TODO aconway 2017-09-13: errors must be reported as events 
*/
 PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_ERROR, "pn_proactor_connect 
failure: %s", err);
 return;
@@ -1977,9 +1978,9 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) {
 }
 
 void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t 
*t) {
-  pconnection_t *pc = (pconnection_t*) calloc(1, sizeof(pconnection_t));
+  pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t));
   assert(pc); // TODO: memory safety
-  const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, t, true, 
"");
+  const char *err = pconnection_setup(pc, pn_listener_proactor(l), c, t, true, 
"", 0);
   if (err) {
 PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_ERROR, "pn_listener_accept 
failure: %s", err);
 return;


-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org



[qpid-proton] 08/09: PROTON-2130: more epoll reworking: - Rework queued accepts so that we do multiple at once - This should allow app to accept new connections a little more efficiently. - We limit t

2020-04-22 Thread cliffjansen
This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 4281367e77b403b5b18cc82a038ef3a6d4f92238
Author: Andrew Stitcher 
AuthorDate: Fri Mar 27 18:31:32 2020 -0400

PROTON-2130: more epoll reworking:
- Rework queued accepts so that we do multiple at once
- This should allow app to accept new connections a little
  more efficiently.
- We limit the number of accepted connections to the specified backlog
- If the app doesn't accept all the connections in a single batch
  we don't rearm the listener until they do, as a form of accept flow 
control.
---
 c/src/proactor/epoll-internal.h |  17 +++--
 c/src/proactor/epoll.c  | 147 
 2 files changed, 96 insertions(+), 68 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 3cdeef3..3dfe956 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -40,9 +40,6 @@
 extern "C" {
 #endif
 
-//typedef struct pn_proactor_t pn_proactor_t;
-//typedef struct pn_listener_t pn_listener_t;
-//typedef struct pn_connection_driver_t pn_connection_driver_t;
 typedef struct acceptor_t acceptor_t;
 typedef struct tslot_t tslot_t;
 typedef pthread_mutex_t pmutex;
@@ -82,7 +79,6 @@ typedef enum {
 typedef struct pcontext_t {
   pmutex mutex;
   pn_proactor_t *proactor;  /* Immutable */
-  void *owner;  /* Instance governed by the context */
   pcontext_type_t type;
   bool working;
   bool on_wake_list;
@@ -255,11 +251,14 @@ struct acceptor_t{
   struct pn_netaddr_t addr;  /* listening address */
   pn_listener_t *listener;
   acceptor_t *next;  /* next listener list member */
-  int accepted_fd;
   bool armed;
   bool overflowed;
 };
 
+typedef struct accepted_t{
+  int accepted_fd;
+} accepted_t;
+
 struct pn_listener_t {
   pcontext_t context;
   acceptor_t *acceptors;  /* Array of listening sockets */
@@ -272,10 +271,10 @@ struct pn_listener_t {
   pn_event_batch_t batch;
   pn_record_t *attachments;
   void *listener_context;
-  acceptor_t *pending_acceptors;  /* list of those with a valid inbound fd*/
-  int pending_count;
-  bool unclaimed; /* attach event dispatched but no 
pn_listener_attach() call yet */
-  size_t backlog;
+  accepted_t *pending_accepteds;  /* array of accepted connections */
+  size_t pending_first;  /* index of first pending connection */
+  size_t pending_count;  /* number of pending accepted connections 
*/
+  size_t backlog; /* size of pending accepted array */
   bool close_dispatched;
   pmutex rearm_mutex; /* orders rearms/disarms, nothing else */
   uint32_t sched_io_events;
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index a173b9b..2eb1ac2 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -337,11 +337,10 @@ static void stop_polling(epoll_extended_t *ee, int 
epollfd) {
 // Fake thread for temporarily disabling the scheduling of a context.
 static struct tslot_t *REWAKE_PLACEHOLDER = (struct tslot_t*) -1;
 
-static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t 
*p, void *o) {
+static void pcontext_init(pcontext_t *ctx, pcontext_type_t t, pn_proactor_t 
*p) {
   memset(ctx, 0, sizeof(*ctx));
   pmutex_init(&ctx->mutex);
   ctx->proactor = p;
-  ctx->owner = o;
   ctx->type = t;
 }
 
@@ -659,10 +658,10 @@ static void make_runnable(pcontext_t *ctx) {
 
 
 
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t 
*listener)
+static void psocket_init(psocket_t* ps, pn_proactor_t* p, epoll_type_t type)
 {
   ps->epoll_io.fd = -1;
-  ps->epoll_io.type = listener ? LISTENER_IO : PCONNECTION_IO;
+  ps->epoll_io.type = type;
   ps->epoll_io.wanted = 0;
   ps->epoll_io.polling = false;
   ps->proactor = p;
@@ -748,7 +747,7 @@ static inline bool pconnection_has_event(pconnection_t *pc) 
{
 }
 
 static inline bool listener_has_event(pn_listener_t *l) {
-  return pn_collector_peek(l->collector) || (l->pending_count && 
!l->unclaimed);
+  return pn_collector_peek(l->collector) || (l->pending_count);
 }
 
 static inline bool proactor_has_event(pn_proactor_t *p) {
@@ -791,7 +790,21 @@ static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
 EPOLL_FATAL("arming polled file descriptor", errno);
 }
 
-static void listener_list_append(acceptor_t **start, acceptor_t *item) {
+static void listener_accepted_append(pn_listener_t *listener, accepted_t item) 
{
+  if (listener->pending_first+listener->pending_count >= listener->backlog) 
return;
+
+  listener->pending_accepteds[listener->pending_first+listener->pending_count] 
= item;
+  listener->pending_count++;
+}
+
+static accepted_t *listener_accepted_next(pn_listener_t *listener) {
+  if (!listener->pending_count) return NULL;
+
+  listener->pending_count--;
+  

[qpid-proton] branch master updated: PROTON-2196: Small proactor tidy ups

2020-04-22 Thread astitcher
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
 new b569a4f  PROTON-2196: Small proactor tidy ups
b569a4f is described below

commit b569a4f9db094558f29f05a8825385583bbb8902
Author: Andrew Stitcher 
AuthorDate: Wed Apr 22 22:25:29 2020 -0400

PROTON-2196: Small proactor tidy ups
---
 c/src/proactor/epoll.c | 14 ++
 c/src/proactor/libuv.c |  8 
 c/src/proactor/proactor-internal.h |  3 +++
 c/src/proactor/win_iocp.c  | 14 +++---
 4 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 9008489..5cc3b65 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -571,7 +571,7 @@ static pconnection_t *get_pconnection(pn_connection_t* c) {
   pn_connection_driver_t *d = *pn_connection_driver_ptr(c);
   unlock(&driver_ptr_mutex);
   if (!d) return NULL;
-  return (pconnection_t*)((char*)d-offsetof(pconnection_t, driver));
+  return containerof(d, pconnection_t, driver);
 }
 
 static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
@@ -635,13 +635,11 @@ static inline acceptor_t *psocket_acceptor(psocket_t* ps) 
{
 }
 
 static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
-  return c->type == PCONNECTION ?
-(pconnection_t*)((char*)c - offsetof(pconnection_t, context)) : NULL;
+  return c->type == PCONNECTION ? containerof(c, pconnection_t, context) : 
NULL;
 }
 
 static inline pn_listener_t *pcontext_listener(pcontext_t *c) {
-  return c->type == LISTENER ?
-(pn_listener_t*)((char*)c - offsetof(pn_listener_t, context)) : NULL;
+  return c->type == LISTENER ? containerof(c, pn_listener_t, context) : NULL;
 }
 
 static pn_event_t *listener_batch_next(pn_event_batch_t *batch);
@@ -650,17 +648,17 @@ static pn_event_t 
*pconnection_batch_next(pn_event_batch_t *batch);
 
 static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
   return (batch->next_event == proactor_batch_next) ?
-(pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
+containerof(batch, pn_proactor_t, batch) : NULL;
 }
 
 static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
   return (batch->next_event == listener_batch_next) ?
-(pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
+containerof(batch, pn_listener_t, batch) : NULL;
 }
 
 static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
   return (batch->next_event == pconnection_batch_next) ?
-(pconnection_t*)((char*)batch - offsetof(pconnection_t, batch)) : NULL;
+containerof(batch, pconnection_t, batch) : NULL;
 }
 
 static inline bool pconnection_has_event(pconnection_t *pc) {
diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index ef2cd7c..4779aba 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -316,7 +316,7 @@ static pconnection_t *get_pconnection(pn_connection_t* c) {
   pn_connection_driver_t *d = *pn_connection_driver_ptr(c);
   uv_mutex_unlock(&driver_ptr_mutex);
   if (!d) return NULL;
-  return (pconnection_t*)((char*)d-offsetof(pconnection_t, driver));
+  return containerof(d, pconnection_t, driver);
 }
 
 static void set_pconnection(pn_connection_t* c, pconnection_t *pc) {
@@ -360,17 +360,17 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t 
*batch);
 
 static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) {
   return (batch->next_event == proactor_batch_next) ?
-(pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL;
+containerof(batch, pn_proactor_t, batch) : NULL;
 }
 
 static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) {
   return (batch->next_event == listener_batch_next) ?
-(pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL;
+containerof(batch, pn_listener_t, batch) : NULL;
 }
 
 static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) {
   return (batch->next_event == pconnection_batch_next) ?
-(pconnection_t*)((char*)batch - offsetof(pconnection_t, batch)) : NULL;
+containerof(batch, pconnection_t, batch) : NULL;
 }
 
 static inline work_t *batch_work(pn_event_batch_t *batch) {
diff --git a/c/src/proactor/proactor-internal.h 
b/c/src/proactor/proactor-internal.h
index afa615a..d368a0c 100644
--- a/c/src/proactor/proactor-internal.h
+++ b/c/src/proactor/proactor-internal.h
@@ -27,6 +27,9 @@
 
 #include "core/logger_private.h"
 
+// Type safe version of containerof used to find parent structs from contained 
structs
+#define containerof(ptr, type, member) ((type *)((char *)(1 ? (ptr) : &((type 
*)0)->member) - offsetof(type, member)))
+
 #ifdef __cplusplus
 extern "C" {
 #endif
diff --git a/c/src/proactor/win_iocp.c b/c/src/proactor/win_iocp.c
index 0c738f4..ae5a369 

[qpid-proton] branch master updated: PROTON-2196: Small proactor tidy ups

2020-04-22 Thread astitcher
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
 new c856de2  PROTON-2196: Small proactor tidy ups
c856de2 is described below

commit c856de20b15e53bf7a2b351375bb4100f962
Author: Andrew Stitcher 
AuthorDate: Wed Apr 22 22:25:29 2020 -0400

PROTON-2196: Small proactor tidy ups
---
 c/src/proactor/epoll.c | 11 ++-
 c/src/proactor/libuv.c | 11 ++-
 c/src/proactor/proactor-internal.h |  9 +
 c/src/proactor/win_iocp.c  |  7 ---
 4 files changed, 13 insertions(+), 25 deletions(-)

diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index e68e7c8..9008489 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -675,13 +675,6 @@ static inline bool proactor_has_event(pn_proactor_t *p) {
   return pn_collector_peek(p->collector);
 }
 
-static pn_event_t *log_event(void* p, pn_event_t *e) {
-  if (e) {
-PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_DEBUG, "[%p]:(%s)", (void*)p, 
pn_event_type_name(pn_event_type(e)));
-  }
-  return e;
-}
-
 static void psocket_error_str(psocket_t *ps, const char *msg, const char* 
what) {
   if (!ps->listener) {
 pn_connection_driver_t *driver = _pconnection(ps)->driver;
@@ -1746,7 +1739,7 @@ static pn_event_t *listener_batch_next(pn_event_batch_t 
*batch) {
   if (e && pn_event_type(e) == PN_LISTENER_CLOSE)
 l->close_dispatched = true;
   unlock(&l->context.mutex);
-  return log_event(l, e);
+  return pni_log_event(l, e);
 }
 
 static void listener_done(pn_listener_t *l) {
@@ -1974,7 +1967,7 @@ static pn_event_t *proactor_batch_next(pn_event_batch_t 
*batch) {
   if (e && pn_event_type(e) == PN_PROACTOR_TIMEOUT)
 p->timeout_processed = true;
   unlock(&p->context.mutex);
-  return log_event(p, e);
+  return pni_log_event(p, e);
 }
 
 static pn_event_batch_t *proactor_process(pn_proactor_t *p, pn_event_type_t 
event) {
diff --git a/c/src/proactor/libuv.c b/c/src/proactor/libuv.c
index fb27cd7..ef2cd7c 100644
--- a/c/src/proactor/libuv.c
+++ b/c/src/proactor/libuv.c
@@ -823,25 +823,18 @@ static pn_event_batch_t *proactor_batch_lh(pn_proactor_t 
*p, pn_event_type_t t)
   return &p->batch;
 }
 
-static pn_event_t *log_event(void* p, pn_event_t *e) {
-  if (e) {
-PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_DEBUG, "[%p]:(%s)", (void*)p, 
pn_event_type_name(pn_event_type(e)));
-  }
-  return e;
-}
-
 static pn_event_t *listener_batch_next(pn_event_batch_t *batch) {
   pn_listener_t *l = batch_listener(batch);
   uv_mutex_lock(&l->lock);
   pn_event_t *e = pn_collector_next(l->collector);
   uv_mutex_unlock(&l->lock);
-  return log_event(l, e);
+  return pni_log_event(l, e);
 }
 
 static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) {
   pn_proactor_t *p = batch_proactor(batch);
   assert(p->batch_working);
-  return log_event(p, pn_collector_next(p->collector));
+  return pni_log_event(p, pn_collector_next(p->collector));
 }
 
 static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
diff --git a/c/src/proactor/proactor-internal.h 
b/c/src/proactor/proactor-internal.h
index 88e61c0..afa615a 100644
--- a/c/src/proactor/proactor-internal.h
+++ b/c/src/proactor/proactor-internal.h
@@ -25,6 +25,8 @@
 #include 
 #include 
 
+#include "core/logger_private.h"
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -60,6 +62,13 @@ struct pn_event_batch_t {
   pn_event_t *(*next_event)(pn_event_batch_t *batch);
 };
 
+static inline pn_event_t *pni_log_event(void* p, pn_event_t *e) {
+  if (e) {
+PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_DEBUG, "[%p]:(%s)", (void*)p, 
pn_event_type_name(pn_event_type(e)));
+  }
+  return e;
+}
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/c/src/proactor/win_iocp.c b/c/src/proactor/win_iocp.c
index 07a3a78..0c738f4 100644
--- a/c/src/proactor/win_iocp.c
+++ b/c/src/proactor/win_iocp.c
@@ -2017,13 +2017,6 @@ static inline bool proactor_has_event(pn_proactor_t *p) {
   return pn_collector_peek(p->collector);
 }
 
-static pn_event_t *log_event(void* p, pn_event_t *e) {
-  if (e) {
-PN_LOG_DEFAULT(PN_SUBSYSTEM_EVENT, PN_LEVEL_DEBUG, "[%p]:(%s)", (void*)p, 
pn_event_type_name(pn_event_type(e)));
-  }
-  return e;
-}
-
 static void psocket_error_str(psocket_t *ps, const char *msg, const char* 
what) {
   if (ps->is_reaper)
 return;


-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org



[qpid-proton] branch master updated: PROTON-2195: Finalise the proactor API around pn_event_batch_t - Move pn_event_batch_t entirely into libqpid-proton-proactor - Make the pn_event_batch_t implementa

2020-04-22 Thread astitcher
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
 new ae06b41  PROTON-2195: Finalise the proactor API around 
pn_event_batch_t - Move pn_event_batch_t entirely into libqpid-proton-proactor 
- Make the pn_event_batch_t implementation private - Removed 
pn_event_batch_connection_driver() API as it is not used at all   and can't be 
used for anything.
ae06b41 is described below

commit ae06b4180e924b392e1b816f97aada8591255b5a
Author: Andrew Stitcher 
AuthorDate: Wed Apr 22 13:34:51 2020 -0400

PROTON-2195: Finalise the proactor API around pn_event_batch_t
- Move pn_event_batch_t entirely into libqpid-proton-proactor
- Make the pn_event_batch_t implementation private
- Removed pn_event_batch_connection_driver() API as it is not used at all
  and can't be used for anything.
---
 c/include/proton/connection_driver.h |  7 ---
 c/include/proton/event.h | 32 
 c/include/proton/proactor.h  | 12 
 c/include/proton/types.h | 13 +
 c/src/core/connection_driver.c   | 18 ++
 c/src/core/event.c   |  4 
 c/src/proactor/libuv.c   | 15 ---
 c/src/proactor/proactor-internal.c   |  4 
 c/src/proactor/proactor-internal.h   |  8 
 9 files changed, 51 insertions(+), 62 deletions(-)

diff --git a/c/include/proton/connection_driver.h 
b/c/include/proton/connection_driver.h
index 06b80ee..aa703b5 100644
--- a/c/include/proton/connection_driver.h
+++ b/c/include/proton/connection_driver.h
@@ -91,7 +91,6 @@ typedef struct pn_connection_driver_t {
   pn_connection_t *connection;
   pn_transport_t *transport;
   pn_collector_t *collector;
-  pn_event_batch_t batch;
 } pn_connection_driver_t;
 
 /**
@@ -234,12 +233,6 @@ PN_EXTERN void 
pn_connection_driver_errorf(pn_connection_driver_t *d, const char
 PN_EXTERN void pn_connection_driver_verrorf(pn_connection_driver_t *d, const 
char *name, const char *fmt, va_list);
 
 /**
- * If batch is part of a connection_driver, return the connection_driver 
address,
- * else return NULL
- */
-PN_EXTERN pn_connection_driver_t* 
pn_event_batch_connection_driver(pn_event_batch_t *batch);
-
-/**
  * The write side of the transport is closed, it will no longer produce bytes 
to write to
  * external IO. Synonym for PN_TRANSPORT_HEAD_CLOSED
  */
diff --git a/c/include/proton/event.h b/c/include/proton/event.h
index 8d62d6b..8e5fba2 100644
--- a/c/include/proton/event.h
+++ b/c/include/proton/event.h
@@ -539,38 +539,6 @@ PN_EXTERN pn_record_t *pn_event_attachments(pn_event_t 
*event);
  */
 PN_EXTERN struct pn_condition_t *pn_event_condition(pn_event_t *event);
 
-/**
- * **Unsettled API** - A batch of events that must be handled in sequence.
- * Call pn_event_batch_next() in a loop until it returns NULL to extract
- * the events.
- */
-typedef struct pn_event_batch_t pn_event_batch_t;
-
-/* NOTE: there is deliberately no peek(), more() or other look-ahead on an 
event
- * batch. We want to know exactly which events have been handled, next() only
- * allows the user to get each event exactly once, in order.
- */
-
-/**
- * **Unsettled API** - Remove the next event from the batch and return
- *  it. NULL means the batch is empty. The returned event pointer is
- *  valid until pn_event_batch_next() is called again on the same
- *  batch.
- */
-PN_EXTERN pn_event_t *pn_event_batch_next(pn_event_batch_t *batch);
-
-/**
- * @cond INTERNAL
- *
- * pn_event_batch_next() can be re-implemented for different behaviors in 
different contexts.
- */
-struct pn_event_batch_t {
-  pn_event_t *(*next_event)(pn_event_batch_t *batch);
-};
-/**
- * @endcond
- */
-
 #ifdef __cplusplus
 }
 #endif
diff --git a/c/include/proton/proactor.h b/c/include/proton/proactor.h
index 88544bc..b0303c9 100644
--- a/c/include/proton/proactor.h
+++ b/c/include/proton/proactor.h
@@ -206,6 +206,18 @@ PNP_EXTERN pn_event_batch_t 
*pn_proactor_wait(pn_proactor_t *proactor);
 PNP_EXTERN pn_event_batch_t *pn_proactor_get(pn_proactor_t *proactor);
 
 /**
+ * Remove the next event from the batch and return
+ * it. NULL means the batch is empty. The returned event pointer is
+ * valid until pn_event_batch_next() is called again on the same
+ * batch.
+ *
+ * @note there is deliberately no peek(), more() or other look-ahead on an 
event
+ * batch. We want to know exactly which events have been handled, next() only
+ * allows the user to get each event exactly once, in order.
+ */
+PNP_EXTERN pn_event_t *pn_event_batch_next(pn_event_batch_t *batch);
+
+/**
  * Call when finished handling a batch of events.
  *
  * Must be called exactly once to match each call to pn_proactor_wait().
diff --git a/c/include/proton/types.h b/c/include/proton/types.h
index 

[qpid-proton] branch master updated: Passing pointer value instead of address to LocalFree.

2020-04-22 Thread astitcher
This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/master by this push:
 new 06c680b  Passing pointer value instead of address to LocalFree.
06c680b is described below

commit 06c680b39fe3ef2dfe1215cb1221e9360d3261bf
Author: Attila Kun 
AuthorDate: Wed Apr 15 20:30:38 2020 +0100

Passing pointer value instead of address to LocalFree.
---
 c/src/ssl/schannel.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/c/src/ssl/schannel.c b/c/src/ssl/schannel.c
index 5938857..18073f8 100644
--- a/c/src/ssl/schannel.c
+++ b/c/src/ssl/schannel.c
@@ -2075,7 +2075,7 @@ static bool server_name_matches(const char *server_name, 
CERT_EXTENSION *alt_nam
 }
   }
 }
-LocalFree(&alt_name_info);
+LocalFree(alt_name_info);
   }
 
   if (!matched) {


-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org



[qpid-jms] branch master updated: QPIDJMS-499 Update to netty 4.1.49.Final

2020-04-22 Thread tabish
This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
 new f8e79b7  QPIDJMS-499 Update to netty 4.1.49.Final
f8e79b7 is described below

commit f8e79b71948b9f9ce201457ce3e95bf4afa8a9cf
Author: Timothy Bish 
AuthorDate: Wed Apr 22 09:29:30 2020 -0400

QPIDJMS-499 Update to netty 4.1.49.Final
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index adf3927..98546f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
   
 
 0.33.4
-4.1.48.Final
+4.1.49.Final
 1.7.30
 1.0-alpha-2
 


-
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org