PROTON-1574: Fix lock-order-inversion warnings

The lock introduced in
17d2a6f4 PROTON-1568: c++ enable race detection for self-tests created lock-order problems. Using the lock as a simple memory barrier solves the original race and the lock-order problem. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/eac4e310 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/eac4e310 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/eac4e310 Branch: refs/heads/master Commit: eac4e3101a8e078e812e22b33b88b9b6219d8a5d Parents: 5f4d852 Author: Alan Conway <acon...@redhat.com> Authored: Wed Sep 6 17:11:37 2017 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Wed Sep 6 17:54:44 2017 -0400 ---------------------------------------------------------------------- proton-c/src/proactor/epoll.c | 67 +++++++++++++++----------------------- 1 file changed, 27 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/eac4e310/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index 46effcc..6f0cc96 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -121,18 +121,25 @@ typedef enum { // Data to use with epoll. typedef struct epoll_extended_t { - /* epoll_ctl()/epoll_wake() do not form a memory barrier, so cached memory - writes to struct epoll_extended_t in the EPOLL_ADD thread might not be - visible to epoll_wait() thread. Lock use of epoll_extended_t to be safe. 
- */ - pmutex mutex; struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor int fd; epoll_type_t type; // io/timer/wakeup uint32_t wanted; // events to poll for bool polling; + pmutex barrier_mutex; } epoll_extended_t; +/* epoll_ctl()/epoll_wake() do not form a memory barrier, so cached memory + writes to struct epoll_extended_t in the EPOLL_ADD thread might not be + visible to epoll_wait() thread. This function creates a memory barrier, + called before epoll_ctl() and after epoll_wait() +*/ +static void memory_barrier(epoll_extended_t *ee) { + // Mutex lock/unlock has the side-effect of being a memory barrier. + lock(&ee->barrier_mutex); + unlock(&ee->barrier_mutex); +} + /* * This timerfd logic assumes EPOLLONESHOT and there never being two * active timeout callbacks. There can be multiple (or zero) @@ -282,33 +289,26 @@ PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener) static bool start_polling(epoll_extended_t *ee, int epollfd) { if (ee->polling) return false; - pmutex_init(&ee->mutex); - lock(&ee->mutex); ee->polling = true; struct epoll_event ev; ev.data.ptr = ee; ev.events = ee->wanted | EPOLLONESHOT; - int fd = ee->fd; - unlock(&ee->mutex); - return (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev) == 0); + memory_barrier(ee); + return (epoll_ctl(epollfd, EPOLL_CTL_ADD, ee->fd, &ev) == 0); } static void stop_polling(epoll_extended_t *ee, int epollfd) { // TODO: check for error, return bool or just log? 
- lock(&ee->mutex); if (ee->fd == -1 || !ee->polling || epollfd == -1) return; struct epoll_event ev; ev.data.ptr = ee; ev.events = 0; - unlock(&ee->mutex); + memory_barrier(ee); if (epoll_ctl(epollfd, EPOLL_CTL_DEL, ee->fd, &ev) == -1) EPOLL_FATAL("EPOLL_CTL_DEL", errno); - lock(&ee->mutex); ee->fd = -1; ee->polling = false; - unlock(&ee->mutex); - pmutex_finalize(&ee->mutex); } /* @@ -642,11 +642,9 @@ static void psocket_gai_error(psocket_t *ps, int gai_err, const char* what) { static void rearm(pn_proactor_t *p, epoll_extended_t *ee) { struct epoll_event ev; ev.data.ptr = ee; - lock(&ee->mutex); ev.events = ee->wanted | EPOLLONESHOT; - int fd = ee->fd; - unlock(&ee->mutex); - if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, fd, &ev) == -1) + memory_barrier(ee); + if (epoll_ctl(p->epollfd, EPOLL_CTL_MOD, ee->fd, &ev) == -1) EPOLL_FATAL("arming polled file descriptor", errno); } @@ -1185,11 +1183,9 @@ static void pconnection_start(pconnection_t *pc) { start_polling(&pc->timer.epoll_io, efd); // TODO: check for error epoll_extended_t *ee = &pc->psocket.epoll_io; - lock(&ee->mutex); ee->fd = pc->psocket.sockfd; ee->wanted = EPOLLIN | EPOLLOUT; ee->polling = false; - unlock(&ee->mutex); start_polling(ee, efd); // TODO: check for error } @@ -1256,6 +1252,8 @@ void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) lock(&pc->context.mutex); proactor_add(&pc->context); + pn_connection_open(pc->driver.connection); /* Auto-open */ + bool notify = false; bool notify_proactor = false; @@ -1621,14 +1619,11 @@ void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { /* Set up an epoll_extended_t to be used for wakeup or interrupts */ static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd) { - pmutex_init(&ee->mutex); - lock(&ee->mutex); ee->psocket = NULL; ee->fd = eventfd; ee->type = WAKE; ee->wanted = EPOLLIN; ee->polling = false; - unlock(&ee->mutex); start_polling(ee, epollfd); // TODO: check for error } @@ -1640,7 +1635,6 @@ 
pn_proactor_t *pn_proactor() { pmutex_init(&p->eventfd_mutex); pmutex_init(&p->bind_mutex); ptimer_init(&p->timer, 0); - pmutex_init(&p->overflow_mutex); if ((p->epollfd = epoll_create(1)) >= 0) { if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) { @@ -1822,11 +1816,7 @@ static bool proactor_remove(pcontext_t *ctx) { } static pn_event_batch_t *process_inbound_wake(pn_proactor_t *p, epoll_extended_t *ee) { - lock(&ee->mutex); - int fd = ee->fd; - unlock(&ee->mutex); - - if (fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */ + if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */ (void)read_uint64(p->interruptfd); rearm(p, &p->epoll_interrupt); return proactor_process(p, PN_PROACTOR_INTERRUPT); @@ -1871,28 +1861,25 @@ static pn_event_batch_t *proactor_do_epoll(struct pn_proactor_t* p, bool can_blo } assert(n == 1); epoll_extended_t *ee = (epoll_extended_t *) ev.data.ptr; + memory_barrier(ee); - lock(&ee->mutex); - epoll_type_t type = ee->type; - psocket_t* psocket = ee->psocket; - unlock(&ee->mutex); - if (type == WAKE) { + if (ee->type == WAKE) { batch = process_inbound_wake(p, ee); - } else if (type == PROACTOR_TIMER) { + } else if (ee->type == PROACTOR_TIMER) { batch = proactor_process(p, PN_PROACTOR_TIMEOUT); } else { - pconnection_t *pc = psocket_pconnection(psocket); + pconnection_t *pc = psocket_pconnection(ee->psocket); if (pc) { - if (type == PCONNECTION_IO) { + if (ee->type == PCONNECTION_IO) { batch = pconnection_process(pc, ev.events, false, false); } else { - assert(type == PCONNECTION_TIMER); + assert(ee->type == PCONNECTION_TIMER); batch = pconnection_process(pc, 0, true, false); } } else { // TODO: can any of the listener processing be parallelized like IOCP? 
- batch = listener_process(psocket, ev.events); + batch = listener_process(ee->psocket, ev.events); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org