This is an automated email from the ASF dual-hosted git repository.

cliffjansen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 9e1990b1345f9917648659399f198e74fba7d5d7
Author: Andrew Stitcher <astitc...@apache.org>
AuthorDate: Thu Mar 19 23:35:36 2020 -0400

    PROTON-2130: epoll reworking:
    - Only keep fd in epoll_extended_t; remove it from ptimer_t and psocket_t
    - Remove backpointers and consistently use structure embedding to go from:
      psocket->pconnection;
      psocket->pn_listener;
      psocket->acceptor;
      pcontext->pconnection;
      pcontext->pn_listener;
      pn_event_batch->pn_proactor;
      pn_event_batch->pn_listener;
      pn_event_batch->pconnection;
    - Move address string from being stored in psocket to being stored in
      pconnection and pn_listener - saves strings for multiple listening sockets
    - Rationalise post_event by switching on event types instead of the
      previous ad hoc event type detection.
---
 c/src/proactor/epoll-internal.h |  37 +++++----
 c/src/proactor/epoll.c          | 167 +++++++++++++++++++++-------------------
 2 files changed, 107 insertions(+), 97 deletions(-)

diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h
index 6a13e7f..e639e48 100644
--- a/c/src/proactor/epoll-internal.h
+++ b/c/src/proactor/epoll-internal.h
@@ -57,7 +57,6 @@ typedef enum {
 
 // Data to use with epoll.
 typedef struct epoll_extended_t {
-  struct psocket_t *psocket;  // pconnection, listener, or NULL -> proactor
   int fd;
   epoll_type_t type;   // io/timer/wakeup
   uint32_t wanted;     // events to poll for
@@ -67,7 +66,6 @@ typedef struct epoll_extended_t {
 
 typedef struct ptimer_t {
   pmutex mutex;
-  int timerfd;
   epoll_extended_t epoll_io;
   bool timer_active;
   bool in_doubt;  // 0 or 1 callbacks are possible
@@ -131,19 +129,6 @@ struct tslot_t {
   unsigned int earmark_override_gen;
 };
 
-/* common to connection and listener */
-typedef struct psocket_t {
-  pn_proactor_t *proactor;
-  // Remaining protected by the pconnection/listener mutex
-  int sockfd;
-  epoll_extended_t epoll_io;
-  pn_listener_t *listener;      /* NULL for a connection socket */
-  char addr_buf[PN_MAX_ADDR];
-  const char *host, *port;
-  uint32_t sched_io_events;
-  uint32_t working_io_events;
-} psocket_t;
-
 struct pn_proactor_t {
   pcontext_t context;
   ptimer_t timer;
@@ -213,9 +198,21 @@ struct pn_proactor_t {
   bool shutting_down;
 };
 
+/* common to connection and listener */
+typedef struct psocket_t {
+  pn_proactor_t *proactor;
+  // Remaining protected by the pconnection/listener mutex
+  epoll_extended_t epoll_io;
+  uint32_t sched_io_events;
+  uint32_t working_io_events;
+} psocket_t;
+
 typedef struct pconnection_t {
   psocket_t psocket;
   pcontext_t context;
+  ptimer_t timer;  // TODO: review one timerfd per connection
+  char addr_buf[PN_MAX_ADDR];
+  const char *host, *port;
   uint32_t new_events;
   int wake_count;
   bool server;                /* accept, not connect */
@@ -223,7 +220,6 @@ typedef struct pconnection_t {
   bool timer_armed;
   bool queued_disconnect;     /* deferred from pn_proactor_disconnect() */
   pn_condition_t *disconnect_condition;
-  ptimer_t timer;  // TODO: review one timerfd per connection
   // Following values only changed by (sole) working context:
   uint32_t current_arm;  // active epoll io events
   bool connected;
@@ -256,18 +252,21 @@ typedef struct pconnection_t {
 
 struct acceptor_t{
   psocket_t psocket;
+  struct pn_netaddr_t addr;      /* listening address */
+  pn_listener_t *listener;
+  acceptor_t *next;              /* next listener list member */
   int accepted_fd;
   bool armed;
   bool overflowed;
-  acceptor_t *next;              /* next listener list member */
-  struct pn_netaddr_t addr;      /* listening address */
 };
 
 struct pn_listener_t {
+  pcontext_t context;
   acceptor_t *acceptors;          /* Array of listening sockets */
   size_t acceptors_size;
+  char addr_buf[PN_MAX_ADDR];
+  const char *host, *port;
   int active_count;               /* Number of listener sockets registered 
with epoll */
-  pcontext_t context;
   pn_condition_t *condition;
   pn_collector_t *collector;
   pn_event_batch_t batch;
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index f8ebbf4..b891133 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -177,18 +177,16 @@ static void memory_barrier(epoll_extended_t *ee) {
  */
 
 static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) {
-  pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
   pmutex_init(&pt->mutex);
   pt->timer_active = false;
   pt->in_doubt = false;
   pt->shutting_down = false;
   epoll_type_t type = ps ? PCONNECTION_TIMER : PROACTOR_TIMER;
-  pt->epoll_io.psocket = ps;
-  pt->epoll_io.fd = pt->timerfd;
+  pt->epoll_io.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
   pt->epoll_io.type = type;
   pt->epoll_io.wanted = EPOLLIN;
   pt->epoll_io.polling = false;
-  return (pt->timerfd >= 0);
+  return (pt->epoll_io.fd >= 0);
 }
 
 // Call with ptimer lock held
@@ -198,7 +196,7 @@ static void ptimer_set_lh(ptimer_t *pt, uint64_t t_millis) {
   newt.it_value.tv_sec = t_millis / 1000;
   newt.it_value.tv_nsec = (t_millis % 1000) * 1000000;
 
-  timerfd_settime(pt->timerfd, 0, &newt, &oldt);
+  timerfd_settime(pt->epoll_io.fd, 0, &newt, &oldt);
   if (pt->timer_active && oldt.it_value.tv_nsec == 0 && oldt.it_value.tv_sec 
== 0) {
     // EPOLLIN is possible but not assured
     pt->in_doubt = true;
@@ -231,11 +229,11 @@ static uint64_t read_uint64(int fd) {
 static bool ptimer_callback(ptimer_t *pt) {
   lock(&pt->mutex);
   struct itimerspec current;
-  if (timerfd_gettime(pt->timerfd, &current) == 0) {
+  if (timerfd_gettime(pt->epoll_io.fd, &current) == 0) {
     if (current.it_value.tv_nsec == 0 && current.it_value.tv_sec == 0)
       pt->timer_active = false;
   }
-  uint64_t u_exp_count = read_uint64(pt->timerfd);
+  uint64_t u_exp_count = read_uint64(pt->epoll_io.fd);
   if (!pt->timer_active) {
     // Expiry counter just cleared, timer not set, timerfd not armed
     pt->in_doubt = false;
@@ -262,7 +260,7 @@ static bool ptimer_shutdown(ptimer_t *pt, bool 
currently_armed) {
 }
 
 static void ptimer_finalize(ptimer_t *pt) {
-  if (pt->timerfd >= 0) close(pt->timerfd);
+  if (pt->epoll_io.fd >= 0) close(pt->epoll_io.fd);
   pmutex_finalize(&pt->mutex);
 }
 
@@ -661,17 +659,13 @@ static void make_runnable(pcontext_t *ctx) {
 
 
 
-static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t 
*listener, const char *addr)
+static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t 
*listener)
 {
-  ps->epoll_io.psocket = ps;
   ps->epoll_io.fd = -1;
   ps->epoll_io.type = listener ? LISTENER_IO : PCONNECTION_IO;
   ps->epoll_io.wanted = 0;
   ps->epoll_io.polling = false;
   ps->proactor = p;
-  ps->listener = listener;
-  ps->sockfd = -1;
-  pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, 
&ps->port);
 }
 
 
@@ -707,17 +701,19 @@ static void proactor_add(pcontext_t *ctx);
 static bool proactor_remove(pcontext_t *ctx);
 static void poller_done(struct pn_proactor_t* p, tslot_t *ts);
 
+// Type safe version of containerof
+#define containerof(ptr, type, member) ((type *)((char *)(1 ? (ptr) : &((type 
*)0)->member) - offsetof(type, member)))
 
 static inline pconnection_t *psocket_pconnection(psocket_t* ps) {
-  return ps->listener ? NULL : (pconnection_t*)ps;
+  return ps->epoll_io.type == PCONNECTION_IO ? containerof(ps, pconnection_t, 
psocket) : NULL;
 }
 
 static inline pn_listener_t *psocket_listener(psocket_t* ps) {
-  return ps->listener;
+  return ps->epoll_io.type == LISTENER_IO ? containerof(ps, acceptor_t, 
psocket)->listener : NULL;
 }
 
 static inline acceptor_t *psocket_acceptor(psocket_t* ps) {
-  return !ps->listener ? NULL : (acceptor_t *)ps;
+  return ps->epoll_io.type == LISTENER_IO ? containerof(ps, acceptor_t, 
psocket) : NULL;
 }
 
 static inline pconnection_t *pcontext_pconnection(pcontext_t *c) {
@@ -760,15 +756,19 @@ static inline bool proactor_has_event(pn_proactor_t *p) {
 }
 
 static void psocket_error_str(psocket_t *ps, const char *msg, const char* 
what) {
-  if (!ps->listener) {
-    pn_connection_driver_t *driver = &psocket_pconnection(ps)->driver;
+  pconnection_t *pc = psocket_pconnection(ps);
+  if (pc) {
+    pn_connection_driver_t *driver = &pc->driver;
     pn_connection_driver_bind(driver); /* Bind so errors will be reported */
-    pni_proactor_set_cond(pn_transport_condition(driver->transport), what, 
ps->host, ps->port, msg);
+    pni_proactor_set_cond(pn_transport_condition(driver->transport), what, 
pc->host, pc->port, msg);
     pn_connection_driver_close(driver);
-  } else {
-    pn_listener_t *l = psocket_listener(ps);
-    pni_proactor_set_cond(l->condition, what, ps->host, ps->port, msg);
+    return;
+  }
+  pn_listener_t *l = psocket_listener(ps);
+  if (l) {
+    pni_proactor_set_cond(l->condition, what, l->host, l->port, msg);
     listener_begin_close(l);
+    return;
   }
 }
 
@@ -830,7 +830,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) {
   unlock(&p->overflow_mutex);
   acceptor_t *a = listener_list_next(&ovflw);
   while (a) {
-    pn_listener_t *l = a->psocket.listener;
+    pn_listener_t *l = a->listener;
     lock(&l->context.mutex);
     bool rearming = !l->context.closing;
     bool notify = false;
@@ -876,7 +876,8 @@ static const char *pconnection_setup(pconnection_t *pc, 
pn_proactor_t *p, pn_con
   }
 
   pcontext_init(&pc->context, PCONNECTION, p, pc);
-  psocket_init(&pc->psocket, p, NULL, addr);
+  psocket_init(&pc->psocket, p, NULL);
+  pni_parse_addr(addr, pc->addr_buf, sizeof(pc->addr_buf), &pc->host, 
&pc->port);
   pc->new_events = 0;
   pc->wake_count = 0;
   pc->tick_pending = false;
@@ -942,9 +943,10 @@ static void pconnection_final_free(pconnection_t *pc) {
 // call without lock
 static void pconnection_cleanup(pconnection_t *pc) {
   assert(pconnection_is_final(pc));
+  int fd = pc->psocket.epoll_io.fd;
   stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd);
-  if (pc->psocket.sockfd != -1)
-    pclosefd(pc->psocket.proactor, pc->psocket.sockfd);
+  if (fd != -1)
+    pclosefd(pc->psocket.proactor, fd);
   stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd);
   ptimer_finalize(&pc->timer);
 
@@ -977,7 +979,7 @@ static void pconnection_begin_close(pconnection_t *pc) {
     pc->context.closing = true;
     if (pc->current_arm) {
       // Force EPOLLHUP callback(s)
-      shutdown(pc->psocket.sockfd, SHUT_RDWR);
+      shutdown(pc->psocket.epoll_io.fd, SHUT_RDWR);
     }
 
     pn_connection_driver_close(&pc->driver);
@@ -1162,7 +1164,7 @@ static void pconnection_done(pconnection_t *pc) {
 // Return true unless error
 static bool pconnection_write(pconnection_t *pc) {
   size_t wbuf_size = pc->wbuf_remaining;
-  ssize_t n = send(pc->psocket.sockfd, pc->wbuf_current, wbuf_size, 
MSG_NOSIGNAL);
+  ssize_t n = send(pc->psocket.epoll_io.fd, pc->wbuf_current, wbuf_size, 
MSG_NOSIGNAL);
   if (n > 0) {
     pc->wbuf_completed += n;
     pc->wbuf_remaining -= n;
@@ -1218,7 +1220,7 @@ static void write_flush(pconnection_t *pc) {
     }
     else {
       if (pn_connection_driver_write_closed(&pc->driver)) {
-        shutdown(pc->psocket.sockfd, SHUT_WR);
+        shutdown(pc->psocket.epoll_io.fd, SHUT_WR);
         pc->write_blocked = true;
       }
     }
@@ -1343,7 +1345,7 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
   if (!pconnection_rclosed(pc)) {
     pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver);
     if (rbuf.size > 0 && !pc->read_blocked) {
-      ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size);
+      ssize_t n = read(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size);
       if (n > 0) {
         pn_connection_driver_read_done(&pc->driver, n);
         pc->output_drained = false;
@@ -1421,7 +1423,7 @@ static pn_event_batch_t 
*pconnection_process(pconnection_t *pc, uint32_t events,
     goto retry;
   }
 
-  if (!pc->timer_armed && !pc->timer.shutting_down && pc->timer.timerfd >= 0) {
+  if (!pc->timer_armed && !pc->timer.shutting_down && pc->timer.epoll_io.fd >= 
0) {
     pc->timer_armed = true;
     rearm(pc->psocket.proactor, &pc->timer.epoll_io);
   }
@@ -1451,19 +1453,19 @@ void pconnection_connected_lh(pconnection_t *pc) {
     }
     pc->ai = NULL;
     socklen_t len = sizeof(pc->remote.ss);
-    (void)getpeername(pc->psocket.sockfd, (struct sockaddr*)&pc->remote.ss, 
&len);
+    (void)getpeername(pc->psocket.epoll_io.fd, (struct 
sockaddr*)&pc->remote.ss, &len);
   }
 }
 
 /* multi-address connections may call pconnection_start multiple times with 
diffferent FDs  */
-static void pconnection_start(pconnection_t *pc) {
+static void pconnection_start(pconnection_t *pc, int fd) {
   int efd = pc->psocket.proactor->epollfd;
   /* Start timer, a no-op if the timer has already started. */
   start_polling(&pc->timer.epoll_io, efd);  // TODO: check for error
 
   /* Get the local socket name now, get the peer name in pconnection_connected 
*/
   socklen_t len = sizeof(pc->local.ss);
-  (void)getsockname(pc->psocket.sockfd, (struct sockaddr*)&pc->local.ss, &len);
+  (void)getsockname(fd, (struct sockaddr*)&pc->local.ss, &len);
 
   epoll_extended_t *ee = &pc->psocket.epoll_io;
   if (ee->polling) {     /* This is not the first attempt, stop polling and 
close the old FD */
@@ -1471,7 +1473,7 @@ static void pconnection_start(pconnection_t *pc) {
     stop_polling(ee, efd);
     pclosefd(pc->psocket.proactor, fd);
   }
-  ee->fd = pc->psocket.sockfd;
+  ee->fd = fd;
   pc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT;
   start_polling(ee, efd);  // TODO: check for error
 }
@@ -1487,8 +1489,7 @@ static void pconnection_maybe_connect_lh(pconnection_t 
*pc) {
       if (fd >= 0) {
         configure_socket(fd);
         if (!connect(fd, ai->ai_addr, ai->ai_addrlen) || errno == EINPROGRESS) 
{
-          pc->psocket.sockfd = fd;
-          pconnection_start(pc);
+          pconnection_start(pc, fd);
           return;               /* Async connection started */
         } else {
           close(fd);
@@ -1500,7 +1501,7 @@ static void pconnection_maybe_connect_lh(pconnection_t 
*pc) {
     pc->addrinfo = NULL;
     /* If there was a previous attempted connection, let the poller discover 
the
        errno from its socket, otherwise set the current error. */
-    if (pc->psocket.sockfd < 1) {
+    if (pc->psocket.epoll_io.fd < 0) {
       psocket_error(&pc->psocket, errno ? errno : ENOTCONN, "on connect");
     }
   }
@@ -1549,7 +1550,7 @@ void pn_proactor_connect2(pn_proactor_t *p, 
pn_connection_t *c, pn_transport_t *
   if (pc->disconnected) {
     notify = wake(&pc->context);    /* Error during initialization */
   } else {
-    int gai_error = pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, 
&pc->addrinfo);
+    int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
     if (!gai_error) {
       pn_connection_open(pc->driver.connection); /* Auto-open */
       pc->ai = pc->addrinfo;
@@ -1642,12 +1643,10 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t 
*l, const char *addr, in
   l->context.proactor = p;;
   l->backlog = backlog;
 
-  char addr_buf[PN_MAX_ADDR];
-  const char *host, *port;
-  pni_parse_addr(addr, addr_buf, PN_MAX_ADDR, &host, &port);
+  pni_parse_addr(addr, l->addr_buf, sizeof(l->addr_buf), &l->host, &l->port);
 
   struct addrinfo *addrinfo = NULL;
-  int gai_err = pgetaddrinfo(host, port, AI_PASSIVE | AI_ALL, &addrinfo);
+  int gai_err = pgetaddrinfo(l->host, l->port, AI_PASSIVE | AI_ALL, &addrinfo);
   if (!gai_err) {
     /* Count addresses, allocate enough space for sockets */
     size_t len = 0;
@@ -1683,9 +1682,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t 
*l, const char *addr, in
           }
 
           acceptor->accepted_fd = -1;
+          acceptor->listener = l;
           psocket_t *ps = &acceptor->psocket;
-          psocket_init(ps, p, l, addr);
-          ps->sockfd = fd;
+          psocket_init(ps, p, l);
           ps->epoll_io.fd = fd;
           ps->epoll_io.wanted = EPOLLIN;
           ps->epoll_io.polling = false;
@@ -1709,8 +1708,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t 
*l, const char *addr, in
     l->acceptors = (acceptor_t*)realloc(l->acceptors, sizeof(acceptor_t));
     l->acceptors_size = 1;
     memset(l->acceptors, 0, sizeof(acceptor_t));
-    psocket_init(&l->acceptors[0].psocket, p, l, addr);
+    psocket_init(&l->acceptors[0].psocket, p, l);
     l->acceptors[0].accepted_fd = -1;
+    l->acceptors[0].listener = l;
     if (gai_err) {
       psocket_gai_error(&l->acceptors[0].psocket, gai_err, "listen on");
     } else {
@@ -1765,14 +1765,14 @@ static void listener_begin_close(pn_listener_t* l) {
     for (size_t i = 0; i < l->acceptors_size; ++i) {
       acceptor_t *a = &l->acceptors[i];
       psocket_t *ps = &a->psocket;
-      if (ps->sockfd >= 0) {
+      if (ps->epoll_io.fd >= 0) {
         lock(&l->rearm_mutex);
         if (a->armed) {
-          shutdown(ps->sockfd, SHUT_RD);  // Force epoll event and callback
+          shutdown(ps->epoll_io.fd, SHUT_RD);  // Force epoll event and 
callback
         } else {
           stop_polling(&ps->epoll_io, ps->proactor->epollfd);
-          close(ps->sockfd);
-          ps->sockfd = -1;
+          close(ps->epoll_io.fd);
+          ps->epoll_io.fd = -1;
           l->active_count--;
         }
         unlock(&l->rearm_mutex);
@@ -1826,7 +1826,7 @@ static void listener_accept_lh(psocket_t *ps) {
   pn_listener_t *l = psocket_listener(ps);
   acceptor_t *acceptor = psocket_acceptor(ps);
   assert(acceptor->accepted_fd < 0); /* Shouldn't already have an accepted_fd 
*/
-  acceptor->accepted_fd = accept(ps->sockfd, NULL, 0);
+  acceptor->accepted_fd = accept(ps->epoll_io.fd, NULL, 0);
   if (acceptor->accepted_fd >= 0) {
     //    acceptor_t *acceptor = listener_list_next(pending_acceptors);
     listener_list_append(&l->pending_acceptors, acceptor);
@@ -1859,8 +1859,8 @@ static pn_event_batch_t *listener_process(pn_listener_t 
*l, int n_events, bool w
           lock(&l->rearm_mutex);
           stop_polling(&ps->epoll_io, ps->proactor->epollfd);
           unlock(&l->rearm_mutex);
-          close(ps->sockfd);
-          ps->sockfd = -1;
+          close(ps->epoll_io.fd);
+          ps->epoll_io.fd = -1;
           l->active_count--;
         }
         else {
@@ -2008,10 +2008,9 @@ void pn_listener_accept2(pn_listener_t *l, 
pn_connection_t *c, pn_transport_t *t
 
   proactor_add(&pc->context);
   lock(&pc->context.mutex);
-  pc->psocket.sockfd = fd;
   if (fd >= 0) {
     configure_socket(fd);
-    pconnection_start(pc);
+    pconnection_start(pc, fd);
     pconnection_connected_lh(pc);
   }
   else
@@ -2059,7 +2058,6 @@ static void grow_poller_bufs(pn_proactor_t* p) {
 
 /* Set up an epoll_extended_t to be used for wakeup or interrupts */
  static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd, 
bool always_set) {
-  ee->psocket = NULL;
   ee->fd = eventfd;
   ee->type = WAKE;
   if (always_set) {
@@ -2083,7 +2081,7 @@ pn_proactor_t *pn_proactor() {
   if (getenv("PNI_EPOLL_SPINS")) pni_spins = atoi(getenv("PNI_EPOLL_SPINS"));
   pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p));
   if (!p) return NULL;
-  p->epollfd = p->eventfd = p->timer.timerfd = -1;
+  p->epollfd = p->eventfd = -1;
   pcontext_init(&p->context, PROACTOR, p, p);
   pmutex_init(&p->eventfd_mutex);
   pmutex_init(&p->sched_mutex);
@@ -2093,7 +2091,7 @@ pn_proactor_t *pn_proactor() {
   if ((p->epollfd = epoll_create(1)) >= 0) {
     if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
       if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) {
-        if (p->timer.timerfd >= 0)
+        if (p->timer.epoll_io.fd >= 0)
           if ((p->collector = pn_collector()) != NULL) {
             p->batch.next_event = &proactor_batch_next;
             start_polling(&p->timer.epoll_io, p->epollfd);  // TODO: check for 
error
@@ -2404,7 +2402,8 @@ static pcontext_t *post_event(pn_proactor_t *p, struct 
epoll_event *evp) {
   epoll_extended_t *ee = (epoll_extended_t *) evp->data.ptr;
   pcontext_t *ctx = NULL;
 
-  if (ee->type == WAKE) {
+  switch (ee->type) {
+  case WAKE:
     if  (ee->fd == p->interruptfd) {        /* Interrupts have their own 
dedicated eventfd */
       p->sched_interrupt = true;
       ctx = &p->context;
@@ -2416,28 +2415,40 @@ static pcontext_t *post_event(pn_proactor_t *p, struct 
epoll_event *evp) {
       ctx = p->sched_wake_current;
       unlock(&p->eventfd_mutex);
     }
-  } else if (ee->type == PROACTOR_TIMER) {
+    break;
+
+  case PROACTOR_TIMER:
     p->sched_timeout = true;
     ctx = &p->context;
     ctx->sched_pending = true;
-  } else {
-    pconnection_t *pc = psocket_pconnection(ee->psocket);
-    if (pc) {
-      ctx = &pc->context;
-      if (ee->type == PCONNECTION_IO) {
-        ee->psocket->sched_io_events = evp->events;
-      } else {
-        pc->sched_timeout = true;;
-      }
-      ctx->sched_pending = true;
-    }
-    else {
-      pn_listener_t *l = psocket_listener(ee->psocket);
-      assert(l);
-      ctx = &l->context;
-      ee->psocket->sched_io_events = evp->events;
-      ctx->sched_pending = true;
-    }
+    break;
+
+  case PCONNECTION_IO: {
+    psocket_t *ps = containerof(ee, psocket_t, epoll_io);
+    pconnection_t *pc = psocket_pconnection(ps);
+    assert(pc);
+    ctx = &pc->context;
+    ps->sched_io_events = evp->events;
+    ctx->sched_pending = true;
+    break;
+  }
+  case PCONNECTION_TIMER: {
+    pconnection_t *pc = containerof(containerof(ee, ptimer_t, epoll_io), 
pconnection_t, timer);
+    assert(pc);
+    ctx = &pc->context;
+    pc->sched_timeout = true;;
+    ctx->sched_pending = true;
+    break;
+  }
+  case LISTENER_IO: {
+    psocket_t *ps = containerof(ee, psocket_t, epoll_io);
+    pn_listener_t *l = psocket_listener(ps);
+    assert(l);
+    ctx = &l->context;
+    ps->sched_io_events = evp->events;
+    ctx->sched_pending = true;
+    break;
+  }
   }
   if (ctx && !ctx->runnable && !ctx->runner)
     return ctx;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to