PROTON-1771: [c] locking around epoll_extended_t

Add locking for the epoll_extended_t structure to fix race conditions revealed
by threaderciser.c.

We pass pointers to this struct between threads via epoll, and epoll does not
guarantee that memory writes made by one thread will be visible to the next
thread that receives the pointer. The fact that helgrind reports these races
suggests that epoll does not act as a memory barrier in practice either.

These mutexes will not be contended, so they are not expensive. We could use an
acquire/release memory barrier, but an uncontended mutex is essentially that,
and is portable.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/188ce280
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/188ce280
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/188ce280

Branch: refs/heads/go1
Commit: 188ce28066df8f5e965fb63593f419f49c950760
Parents: 3e2f9b5
Author: Alan Conway <acon...@redhat.com>
Authored: Thu Apr 5 19:10:11 2018 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Thu Apr 12 16:23:13 2018 -0400

----------------------------------------------------------------------
 c/src/proactor/epoll.c | 68 ++++++++++++++++++++++++++-------------------
 1 file changed, 40 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/188ce280/c/src/proactor/epoll.c
----------------------------------------------------------------------
diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c
index 752e6e0..d0db0a7 100644
--- a/c/src/proactor/epoll.c
+++ b/c/src/proactor/epoll.c
@@ -128,20 +128,9 @@ typedef struct epoll_extended_t {
   epoll_type_t type;   // io/timer/wakeup
   uint32_t wanted;     // events to poll for
   bool polling;
-  pmutex barrier_mutex;
+  pmutex mutex;
 } epoll_extended_t;
 
-/* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory
-   writes to struct epoll_extended_t in the EPOLL_ADD thread might not be
-   visible to epoll_wait() thread. This function creates a memory barrier,
-   called before epoll_ctl() and after epoll_wait()
-*/
-static void memory_barrier(epoll_extended_t *ee) {
-  // Mutex lock/unlock has the side-effect of being a memory barrier.
-  lock(&ee->barrier_mutex);
-  unlock(&ee->barrier_mutex);
-}
-
 /*
  * This timerfd logic assumes EPOLLONESHOT and there never being two
  * active timeout callbacks.  There can be multiple (or zero)
@@ -287,28 +276,38 @@ PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor)
 PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener)
 
 static bool start_polling(epoll_extended_t *ee, int epollfd) {
-  if (ee->polling)
+  lock(&ee->mutex);
+  if (ee->polling) {
+    unlock(&ee->mutex);
     return false;
+  }
   ee->polling = true;
   struct epoll_event ev;
   ev.data.ptr = ee;
   ev.events = ee->wanted | EPOLLONESHOT;
-  memory_barrier(ee);
-  return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0);
+  int fd = ee->fd;
+  unlock(&ee->mutex);
+  return (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == 0);
 }
 
 static void stop_polling(epoll_extended_t *ee, int epollfd) {
   // TODO: check for error, return bool or just log?
-  if (ee->fd == -1 || !ee->polling || epollfd == -1)
+  lock(&ee->mutex);
+  if (ee->fd == -1 || !ee->polling || epollfd == -1) {
+    unlock(&ee->mutex);
     return;
+  }
   struct epoll_event ev;
   ev.data.ptr = ee;
   ev.events = 0;
-  memory_barrier(ee);
-  if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev) == -1)
+  int fd = ee->fd;
+  unlock(&ee->mutex);
+  if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1)
     EPOLL_FATAL("EPOLL_CTL_DEL", errno);
+  lock(&ee->mutex);
   ee->fd = -1;
   ee->polling = false;
+  unlock(&ee->mutex);
 }
 
 /*
@@ -682,10 +681,12 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, 
const char* what) {
 
 static void rearm(pn_proactor_t *p, epoll_extended_t *ee) {
   struct epoll_event ev;
+  lock(&ee->mutex);
   ev.data.ptr = ee;
   ev.events = ee->wanted | EPOLLONESHOT;
-  memory_barrier(ee);
-  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1)
+  int fd = ee->fd;
+  unlock(&ee->mutex);
+  if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1)
     EPOLL_FATAL("arming polled file descriptor", errno);
 }
 
@@ -1229,6 +1230,7 @@ static void pconnection_start(pconnection_t *pc) {
   (void)getsockname(pc->psocket.sockfd, (struct sockaddr*)&pc->local.ss, &len);
 
   epoll_extended_t *ee = &pc->psocket.epoll_io;
+  lock(&ee->mutex);
   if (ee->polling) {     /* This is not the first attempt, stop polling and 
close the old FD */
     int fd = ee->fd;     /* Save fd, it will be set to -1 by stop_polling */
     stop_polling(ee, efd);
@@ -1236,6 +1238,7 @@ static void pconnection_start(pconnection_t *pc) {
   }
   ee->fd = pc->psocket.sockfd;
   pc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT;
+  unlock(&ee->mutex);
   start_polling(ee, efd);  // TODO: check for error
 }
 
@@ -1759,11 +1762,13 @@ void pn_listener_accept2(pn_listener_t *l, 
pn_connection_t *c, pn_transport_t *t
 
 /* Set up an epoll_extended_t to be used for wakeup or interrupts */
 static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) {
+  lock(&ee->mutex);
   ee->psocket = NULL;
   ee->fd = eventfd;
   ee->type = WAKE;
   ee->wanted = EPOLLIN;
   ee->polling = false;
+  unlock(&ee->mutex);
   start_polling(ee, epollfd);  // TODO: check for error
 }
 
@@ -1954,7 +1959,10 @@ static bool proactor_remove(pcontext_t *ctx) {
 }
 
 static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, 
epoll_extended_t *ee) {
-  if  (ee->fd == p->interruptfd) {        /* Interrupts have their own 
dedicated eventfd */
+  lock(&ee->mutex);
+  int fd = ee->fd;
+  unlock(&ee->mutex);
+  if  (fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd 
*/
     (void)read_uint64(p->interruptfd);
     rearm(p, &p->epoll_interrupt);
     return proactor_process(p, PN_PROACTOR_INTERRUPT);
@@ -1999,25 +2007,29 @@ static pn_event_batch_t *proactor_do_epoll(struct 
pn_proactor_t* p, bool can_blo
     }
     assert(n == 1);
     epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr;
-    memory_barrier(ee);
 
-    if (ee->type == WAKE) {
+    lock(&ee->mutex);
+    epoll_type_t type = ee->type;
+    struct psocket_t *psocket = ee->psocket;
+    unlock(&ee->mutex);
+
+    if (type == WAKE) {
       batch = process_inbound_wake(p, ee);
-    } else if (ee->type == PROACTOR_TIMER) {
+    } else if (type == PROACTOR_TIMER) {
       batch = proactor_process(p, PN_PROACTOR_TIMEOUT);
     } else {
-      pconnection_t *pc = psocket_pconnection(ee->psocket);
+      pconnection_t *pc = psocket_pconnection(psocket);
       if (pc) {
-        if (ee->type == PCONNECTION_IO) {
+        if (type == PCONNECTION_IO) {
           batch = pconnection_process(pc, ev.events, false, false);
         } else {
-          assert(ee->type == PCONNECTION_TIMER);
+          assert(type == PCONNECTION_TIMER);
           batch = pconnection_process(pc, 0, true, false);
         }
       }
       else {
         // TODO: can any of the listener processing be parallelized like IOCP?
-        batch = listener_process(ee->psocket, ev.events);
+        batch = listener_process(psocket, ev.events);
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to