Repository: qpid-proton Updated Branches: refs/heads/master 50a4e8353 -> 3206378b0
PROTON-1771: [c] fix race conditions in threaderciser.c Was incorrectly calling pn_connection_close() from non-handler threads. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3206378b Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3206378b Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3206378b Branch: refs/heads/master Commit: 3206378b0149c903f0507fc6429ae899e38f2368 Parents: 50a4e83 Author: Alan Conway <acon...@redhat.com> Authored: Wed May 16 15:15:49 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Wed May 16 15:17:40 2018 -0400 ---------------------------------------------------------------------- c/tests/threaderciser.c | 82 +++++++++++++++++++------------------------- 1 file changed, 35 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3206378b/c/tests/threaderciser.c ---------------------------------------------------------------------- diff --git a/c/tests/threaderciser.c b/c/tests/threaderciser.c index e74db2c..552ccb0 100644 --- a/c/tests/threaderciser.c +++ b/c/tests/threaderciser.c @@ -243,28 +243,6 @@ void cpool_wake(cpool *cp) { } } -void cpool_close(cpool *cp) { - if (!action_enabled[A_CLOSE_CONNECT]) return; - connection_ctx *ctx = cpool_pick(cp); - if (ctx) { - pthread_mutex_lock(&ctx->lock); - if (ctx->pn_connection) { - pn_connection_close(ctx->pn_connection); - debuga(A_CLOSE_CONNECT, ctx->pn_connection); - } - pthread_mutex_unlock(&ctx->lock); - cpool_unref(ctx); - } -} - -static void connection_ctx_on_close(connection_ctx *ctx) { - /* Required locking: mark connection (possibly) closed no more wake calls */ - pthread_mutex_lock(&ctx->lock); - ctx->pn_connection = NULL; - pthread_mutex_unlock(&ctx->lock); - cpool_unref(ctx); -} - /* Listener pool */ typedef struct listener_ctx { @@ -306,23 +284,6 @@ static void lpool_listen(lpool *lp, pn_proactor_t *proactor) { } } -/* Advertise address once open */ -static void listener_ctx_on_open(listener_ctx *ctx) { - pthread_mutex_lock(&ctx->lock); - if (ctx->pn_listener) { - pn_netaddr_str(pn_listener_addr(ctx->pn_listener), ctx->addr, sizeof(ctx->addr)); - } - debug("[%p] listening on %s", ctx->pn_listener, ctx->addr); - pthread_mutex_unlock(&ctx->lock); -} - -static void listener_ctx_on_close(listener_ctx *ctx) { - pthread_mutex_lock(&ctx->lock); - ctx->pn_listener = NULL; - pthread_mutex_unlock(&ctx->lock); - lpool_unref(ctx); -} - /* Pick a random listening address from the listener pool. Returns "invalid:address" for no address. */ @@ -414,7 +375,6 @@ static void global_do_stuff(global *g) { if (maybe(0.1)) lpool_close(&g->listeners); if (maybe(0.5)) cpool_wake(&g->connections_active); if (maybe(0.5)) cpool_wake(&g->connections_idle); - if (maybe(0.1)) cpool_close(&g->connections_active); if (action_enabled[A_TIMEOUT] && maybe(0.5)) { debuga(A_TIMEOUT, g->proactor); pn_proactor_set_timeout(g->proactor, rand() % TIMEOUT_MAX); @@ -437,6 +397,11 @@ static void* user_thread(void* void_g) { } static bool handle(global *g, pn_event_t *e) { + pn_connection_t *c = pn_event_connection(e); + connection_ctx *cctx = c ? (connection_ctx*)pn_connection_get_context(c) : NULL; + pn_listener_t *l = pn_event_listener(e); + listener_ctx *lctx = l ? (listener_ctx*)pn_listener_get_context(l) : NULL; + switch (pn_event_type(e)) { case PN_PROACTOR_TIMEOUT: { @@ -444,24 +409,47 @@ static bool handle(global *g, pn_event_t *e) { global_do_stuff(g); break; } + case PN_LISTENER_OPEN: { - listener_ctx *ctx = (listener_ctx*)pn_listener_get_context(pn_event_listener(e)); - listener_ctx_on_open(ctx); - cpool_connect(&g->connections_active, g->proactor, ctx->addr); /* Initial connection */ + pthread_mutex_lock(&lctx->lock); + if (lctx->pn_listener) { + pn_netaddr_str(pn_listener_addr(lctx->pn_listener), lctx->addr, sizeof(lctx->addr)); + } + debug("[%p] listening on %s", lctx->pn_listener, lctx->addr); + pthread_mutex_unlock(&lctx->lock); + cpool_connect(&g->connections_active, g->proactor, lctx->addr); /* Initial connection */ break; } case PN_LISTENER_CLOSE: { - listener_ctx_on_close((listener_ctx*)pn_listener_get_context(pn_event_listener(e))); + pthread_mutex_lock(&lctx->lock); + lctx->pn_listener = NULL; + pthread_mutex_unlock(&lctx->lock); + lpool_unref(lctx); + break; + } + + case PN_CONNECTION_WAKE: { + if (!action_enabled[A_CLOSE_CONNECT] && maybe(0.5)) pn_connection_close(c); + /* FIXME aconway 2018-05-16: connection release/re-use */ break; } + case PN_TRANSPORT_CLOSED: { - connection_ctx_on_close((connection_ctx*)pn_connection_get_context(pn_event_connection(e))); + if (cctx) { + /* Required locking: mark connection as closed, no more wake calls */ + pthread_mutex_lock(&cctx->lock); + cctx->pn_connection = NULL; + pthread_mutex_unlock(&cctx->lock); + cpool_unref(cctx); + } break; } - case PN_PROACTOR_INACTIVE: /* Shutting down */ + // Following events are used to shut down the threaderciser + + case PN_PROACTOR_INACTIVE: { /* Shutting down */ pn_proactor_interrupt(g->proactor); /* Interrupt remaining threads */ return false; - + } case PN_PROACTOR_INTERRUPT: pn_proactor_interrupt(g->proactor); /* Pass the interrupt along */ return false; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org