PROTON-1564: fix ref counting and pconnection binding thread safety for pconnection setup and teardown
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b9525a68 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b9525a68 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b9525a68 Branch: refs/heads/go1 Commit: b9525a68b3929f004fea362dda35271aa5a5b464 Parents: 540622e Author: Clifford Jansen <cliffjan...@apache.org> Authored: Tue Aug 29 22:25:41 2017 -0700 Committer: Clifford Jansen <cliffjan...@apache.org> Committed: Tue Aug 29 22:27:49 2017 -0700 ---------------------------------------------------------------------- proton-c/src/proactor/epoll.c | 59 +++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b9525a68/proton-c/src/proactor/epoll.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/epoll.c b/proton-c/src/proactor/epoll.c index fdb660c..3faeb1a 100644 --- a/proton-c/src/proactor/epoll.c +++ b/proton-c/src/proactor/epoll.c @@ -386,6 +386,7 @@ struct pn_proactor_t { // wake subsystem int eventfd; pmutex eventfd_mutex; + pmutex bind_mutex; bool wakes_in_progress; pcontext_t *wake_list_first; pcontext_t *wake_list_last; @@ -510,6 +511,7 @@ typedef struct pconnection_t { bool read_blocked; bool write_blocked; bool disconnected; + bool bound; int hog_count; // thread hogging limiter pn_event_batch_t batch; pn_connection_driver_t driver; @@ -692,13 +694,23 @@ static const pn_class_t pconnection_class = PN_CLASS(pconnection); static void pconnection_tick(pconnection_t *pc); -static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr) +static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_connection_t *c, bool server, const char *addr) { - pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t)); - if (!pc) return NULL; + lock(&p->bind_mutex); + pn_record_t *r = pn_connection_attachments(c); + if (pn_record_get(r, PN_PROACTOR)) { + unlock(&p->bind_mutex); + free(pc); + return "pn_connection_t already in use"; + } + pn_record_def(r, PN_PROACTOR, &pconnection_class); + pn_record_set(r, PN_PROACTOR, pc); + pc->bound = true; + unlock(&p->bind_mutex); + if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) { free(pc); - return NULL; + return "pn_connection_driver_init failure"; } pcontext_init(&pc->context, PCONNECTION, p, pc); psocket_init(&pc->psocket, p, NULL, addr); @@ -720,9 +732,6 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo if (server) { pn_transport_set_server(pc->driver.transport); } - pn_record_t *r = pn_connection_attachments(pc->driver.connection); - pn_record_def(r, PN_PROACTOR, &pconnection_class); - pn_record_set(r, PN_PROACTOR, pc); if (!ptimer_init(&pc->timer, &pc->psocket)) { psocket_error(&pc->psocket, errno, "timer setup"); @@ -731,7 +740,7 @@ static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bo pmutex_init(&pc->rearm_mutex); pn_decref(pc); /* Will be deleted when the connection is */ - return pc; + return NULL; } // Call with lock held and closing == true (i.e. pn_connection_driver_finished() == true), timer cancelled. @@ -746,10 +755,11 @@ static void pconnection_final_free(pconnection_t *pc) { } pmutex_finalize(&pc->rearm_mutex); pn_condition_free(pc->disconnect_condition); - pn_incref(pc); /* Make sure we don't do a circular free */ + if (pc->bound) + pn_incref(pc); /* Make sure we don't do a circular free */ pn_connection_driver_destroy(&pc->driver); pn_decref(pc); - /* Now pc is freed iff the connection is, otherwise remains till the pn_connection_t is freed. */ + /* Freed if not bound, otherwise pc is freed iff the pn_connection_t is freed. */ } // call without lock, but only if pconnection_is_final() is true @@ -1214,9 +1224,15 @@ static bool wake_if_inactive(pn_proactor_t *p) { } void pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *addr) { - pconnection_t *pc = new_pconnection_t(p, c, false, addr); + pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t)); assert(pc); // TODO: memory safety + const char *err = pconnection_setup(pc, p, c, false, addr); + if (err) { + pn_logf("pn_proactor_connect failure: %s", err); + return; + } // TODO: check case of proactor shutting down + lock(&pc->context.mutex); proactor_add(&pc->context); pn_connection_open(pc->driver.connection); /* Auto-open */ @@ -1276,7 +1292,15 @@ void pn_proactor_release_connection(pn_connection_t *c) { pconnection_t *pc = get_pconnection(c); if (pc) { lock(&pc->context.mutex); + // reverse lifecycle entanglement of pc and c from new_pconnection_t() + pn_incref(pc); + pn_proactor_t *p = pc->psocket.proactor; + lock(&p->bind_mutex); + pn_record_t *r = pn_connection_attachments(pc->driver.connection); + pn_record_set(r, PN_PROACTOR, NULL); pn_connection_driver_release_connection(&pc->driver); + pc->bound = false; // Transport unbound + unlock(&p->bind_mutex); pconnection_begin_close(pc); notify = wake(&pc->context); unlock(&pc->context.mutex); @@ -1548,9 +1572,14 @@ pn_record_t *pn_listener_attachments(pn_listener_t *l) { } void pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { + pconnection_t *pc = (pconnection_t*) pn_class_new(&pconnection_class, sizeof(pconnection_t)); + assert(pc); // TODO: memory safety + const char *err = pconnection_setup(pc, l->psockets[0].proactor, c, true, ""); + if (err) { + pn_logf("pn_listener_accept failure: %s", err); + return; + } // TODO: fuller sanity check on input args - pconnection_t *pc = new_pconnection_t(l->psockets[0].proactor, c, true, ""); - assert(pc); // TODO: memory safety lock(&l->context.mutex); int fd = l->accepted_fd; @@ -1587,6 +1616,7 @@ pn_proactor_t *pn_proactor() { p->epollfd = p->eventfd = p->timer.timerfd = -1; pcontext_init(&p->context, PROACTOR, p, p); pmutex_init(&p->eventfd_mutex); + pmutex_init(&p->bind_mutex); ptimer_init(&p->timer, 0); if ((p->epollfd = epoll_create(1)) >= 0) { @@ -1640,6 +1670,7 @@ void pn_proactor_free(pn_proactor_t *p) { pn_collector_free(p->collector); pmutex_finalize(&p->eventfd_mutex); + pmutex_finalize(&p->bind_mutex); pcontext_finalize(&p->context); free(p); } @@ -1649,7 +1680,7 @@ pn_proactor_t *pn_event_proactor(pn_event_t *e) { pn_listener_t *l = pn_event_listener(e); if (l) return l->psockets[0].proactor; pn_connection_t *c = pn_event_connection(e); - if (c) return pn_connection_proactor(pn_event_connection(e)); + if (c) return pn_connection_proactor(c); return NULL; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org