Repository: qpid-proton
Updated Branches:
  refs/heads/master a80d54e62 -> fc1df0551
Revert "PROTON-1771: [c] locking around epoll_extended_t" This reverts commit 188ce28066df8f5e965fb63593f419f49c950760. The fix caused hangs due to mutex deadlocks. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/fc1df055 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/fc1df055 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/fc1df055 Branch: refs/heads/master Commit: fc1df0551947c357e9fa0bf9da4b836c5a97c11f Parents: a80d54e Author: Alan Conway <acon...@redhat.com> Authored: Mon Apr 16 13:21:08 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Mon Apr 16 13:21:08 2018 -0400 ---------------------------------------------------------------------- c/src/proactor/epoll.c | 68 +++++++++++++++++++-------------------------- 1 file changed, 28 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/fc1df055/c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index d0db0a7..752e6e0 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -128,9 +128,20 @@ typedef struct epoll_extended_t { epoll_type_t type; // io/timer/wakeup uint32_t wanted; // events to poll for bool polling; - pmutex mutex; + pmutex barrier_mutex; } epoll_extended_t; +/* epoll_ctl()/epoll_wait() do not form a memory barrier, so cached memory + writes to struct epoll_extended_t in the EPOLL_ADD thread might not be + visible to epoll_wait() thread. This function creates a memory barrier, + called before epoll_ctl() and after epoll_wait() +*/ +static void memory_barrier(epoll_extended_t *ee) { + // Mutex lock/unlock has the side-effect of being a memory barrier. 
+ lock(&ee->barrier_mutex); + unlock(&ee->barrier_mutex); +} + /* * This timerfd logic assumes EPOLLONESHOT and there never being two * active timeout callbacks. There can be multiple (or zero) @@ -276,38 +287,28 @@ PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor) PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener) static bool start_polling(epoll_extended_t *ee, int epollfd) { - lock(&ee->mutex); - if (ee->polling) { - unlock(&ee->mutex); + if (ee->polling) return false; - } ee->polling = true; struct epoll_event ev; ev.data.ptr = ee; ev.events = ee->wanted | EPOLLONESHOT; - int fd = ee->fd; - unlock(&ee->mutex); - return (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == 0); + memory_barrier(ee); + return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0); } static void stop_polling(epoll_extended_t *ee, int epollfd) { // TODO: check for error, return bool or just log? - lock(&ee->mutex); - if (ee->fd == -1 || !ee->polling || epollfd == -1) { - unlock(&ee->mutex); + if (ee->fd == -1 || !ee->polling || epollfd == -1) return; - } struct epoll_event ev; ev.data.ptr = ee; ev.events = 0; - int fd = ee->fd; - unlock(&ee->mutex); - if (epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, &ev) == -1) + memory_barrier(ee); + if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev) == -1) EPOLL_FATAL("EPOLL_CTL_DEL", errno); - lock(&ee->mutex); ee->fd = -1; ee->polling = false; - unlock(&ee->mutex); } /* @@ -681,12 +682,10 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) { static void rearm(pn_proactor_t *p, epoll_extended_t *ee) { struct epoll_event ev; - lock(&ee->mutex); ev.data.ptr = ee; ev.events = ee->wanted | EPOLLONESHOT; - int fd = ee->fd; - unlock(&ee->mutex); - if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) + memory_barrier(ee); + if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1) EPOLL_FATAL("arming polled file descriptor", errno); } @@ -1230,7 +1229,6 @@ static void pconnection_start(pconnection_t *pc) { 
(void)getsockname(pc->psocket.sockfd, (struct sockaddr*)&pc->local.ss, &len); epoll_extended_t *ee = &pc->psocket.epoll_io; - lock(&ee->mutex); if (ee->polling) { /* This is not the first attempt, stop polling and close the old FD */ int fd = ee->fd; /* Save fd, it will be set to -1 by stop_polling */ stop_polling(ee, efd); @@ -1238,7 +1236,6 @@ static void pconnection_start(pconnection_t *pc) { } ee->fd = pc->psocket.sockfd; pc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT; - unlock(&ee->mutex); start_polling(ee, efd); // TODO: check for error } @@ -1762,13 +1759,11 @@ void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t /* Set up an epoll_extended_t to be used for wakeup or interrupts */ static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) { - lock(&ee->mutex); ee->psocket = NULL; ee->fd = eventfd; ee->type = WAKE; ee->wanted = EPOLLIN; ee->polling = false; - unlock(&ee->mutex); start_polling(ee, epollfd); // TODO: check for error } @@ -1959,10 +1954,7 @@ static bool proactor_remove(pcontext_t *ctx) { } static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) { - lock(&ee->mutex); - int fd = ee->fd; - unlock(&ee->mutex); - if (fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */ + if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */ (void)read_uint64(p->interruptfd); rearm(p, &p->epoll_interrupt); return proactor_process(p, PN_PROACTOR_INTERRUPT); @@ -2007,29 +1999,25 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo } assert(n == 1); epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr; + memory_barrier(ee); - lock(&ee->mutex); - epoll_type_t type = ee->type; - struct psocket_t *psocket = ee->psocket; - unlock(&ee->mutex); - - if (type == WAKE) { + if (ee->type == WAKE) { batch = process_inbound_wake(p, ee); - } else if (type == PROACTOR_TIMER) { + } else if (ee->type == PROACTOR_TIMER) { batch = 
proactor_process(p, PN_PROACTOR_TIMEOUT); } else { - pconnection_t *pc = psocket_pconnection(psocket); + pconnection_t *pc = psocket_pconnection(ee->psocket); if (pc) { - if (type == PCONNECTION_IO) { + if (ee->type == PCONNECTION_IO) { batch = pconnection_process(pc, ev.events, false, false); } else { - assert(type == PCONNECTION_TIMER); + assert(ee->type == PCONNECTION_TIMER); batch = pconnection_process(pc, 0, true, false); } } else { // TODO: can any of the listener processing be parallelized like IOCP? - batch = listener_process(psocket, ev.events); + batch = listener_process(ee->psocket, ev.events); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org