This is an automated email from the ASF dual-hosted git repository. cliffjansen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 9e1990b1345f9917648659399f198e74fba7d5d7 Author: Andrew Stitcher <astitc...@apache.org> AuthorDate: Thu Mar 19 23:35:36 2020 -0400 PROTON-2130: epoll reworking: - Only keep fd in epoll_extended_t remove from ptimer_t, psocket_t - Remove backpointers and consistently use structure embedding to go from: psocket->pconnection; psocket->pn_listener; psocket->acceptor; pcontext->pconnection; pcontext->pn_listener; pn_event_batch->pn_proactor; pn_batch_event->pn_listener; pn_batch_event->pconnection; - Move address string from being stored in psocket to being stored in pconnection and pn_listener - saves strings for multiple listening sockets - Rationalise post_event by switching on event types instead of the previous ad hoc event type detection. --- c/src/proactor/epoll-internal.h | 37 +++++---- c/src/proactor/epoll.c | 167 +++++++++++++++++++++------------------- 2 files changed, 107 insertions(+), 97 deletions(-) diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 6a13e7f..e639e48 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -57,7 +57,6 @@ typedef enum { // Data to use with epoll. 
typedef struct epoll_extended_t { - struct psocket_t *psocket; // pconnection, listener, or NULL -> proactor int fd; epoll_type_t type; // io/timer/wakeup uint32_t wanted; // events to poll for @@ -67,7 +66,6 @@ typedef struct epoll_extended_t { typedef struct ptimer_t { pmutex mutex; - int timerfd; epoll_extended_t epoll_io; bool timer_active; bool in_doubt; // 0 or 1 callbacks are possible @@ -131,19 +129,6 @@ struct tslot_t { unsigned int earmark_override_gen; }; -/* common to connection and listener */ -typedef struct psocket_t { - pn_proactor_t *proactor; - // Remaining protected by the pconnection/listener mutex - int sockfd; - epoll_extended_t epoll_io; - pn_listener_t *listener; /* NULL for a connection socket */ - char addr_buf[PN_MAX_ADDR]; - const char *host, *port; - uint32_t sched_io_events; - uint32_t working_io_events; -} psocket_t; - struct pn_proactor_t { pcontext_t context; ptimer_t timer; @@ -213,9 +198,21 @@ struct pn_proactor_t { bool shutting_down; }; +/* common to connection and listener */ +typedef struct psocket_t { + pn_proactor_t *proactor; + // Remaining protected by the pconnection/listener mutex + epoll_extended_t epoll_io; + uint32_t sched_io_events; + uint32_t working_io_events; +} psocket_t; + typedef struct pconnection_t { psocket_t psocket; pcontext_t context; + ptimer_t timer; // TODO: review one timerfd per connection + char addr_buf[PN_MAX_ADDR]; + const char *host, *port; uint32_t new_events; int wake_count; bool server; /* accept, not connect */ @@ -223,7 +220,6 @@ typedef struct pconnection_t { bool timer_armed; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ pn_condition_t *disconnect_condition; - ptimer_t timer; // TODO: review one timerfd per connection // Following values only changed by (sole) working context: uint32_t current_arm; // active epoll io events bool connected; @@ -256,18 +252,21 @@ typedef struct pconnection_t { struct acceptor_t{ psocket_t psocket; + struct pn_netaddr_t addr; /* 
listening address */ + pn_listener_t *listener; + acceptor_t *next; /* next listener list member */ int accepted_fd; bool armed; bool overflowed; - acceptor_t *next; /* next listener list member */ - struct pn_netaddr_t addr; /* listening address */ }; struct pn_listener_t { + pcontext_t context; acceptor_t *acceptors; /* Array of listening sockets */ size_t acceptors_size; + char addr_buf[PN_MAX_ADDR]; + const char *host, *port; int active_count; /* Number of listener sockets registered with epoll */ - pcontext_t context; pn_condition_t *condition; pn_collector_t *collector; pn_event_batch_t batch; diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index f8ebbf4..b891133 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -177,18 +177,16 @@ static void memory_barrier(epoll_extended_t *ee) { */ static bool ptimer_init(ptimer_t *pt, struct psocket_t *ps) { - pt->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); pmutex_init(&pt->mutex); pt->timer_active = false; pt->in_doubt = false; pt->shutting_down = false; epoll_type_t type = ps ? 
PCONNECTION_TIMER : PROACTOR_TIMER; - pt->epoll_io.psocket = ps; - pt->epoll_io.fd = pt->timerfd; + pt->epoll_io.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); pt->epoll_io.type = type; pt->epoll_io.wanted = EPOLLIN; pt->epoll_io.polling = false; - return (pt->timerfd >= 0); + return (pt->epoll_io.fd >= 0); } // Call with ptimer lock held @@ -198,7 +196,7 @@ static void ptimer_set_lh(ptimer_t *pt, uint64_t t_millis) { newt.it_value.tv_sec = t_millis / 1000; newt.it_value.tv_nsec = (t_millis % 1000) * 1000000; - timerfd_settime(pt->timerfd, 0, &newt, &oldt); + timerfd_settime(pt->epoll_io.fd, 0, &newt, &oldt); if (pt->timer_active && oldt.it_value.tv_nsec == 0 && oldt.it_value.tv_sec == 0) { // EPOLLIN is possible but not assured pt->in_doubt = true; @@ -231,11 +229,11 @@ static uint64_t read_uint64(int fd) { static bool ptimer_callback(ptimer_t *pt) { lock(&pt->mutex); struct itimerspec current; - if (timerfd_gettime(pt->timerfd, &current) == 0) { + if (timerfd_gettime(pt->epoll_io.fd, &current) == 0) { if (current.it_value.tv_nsec == 0 && current.it_value.tv_sec == 0) pt->timer_active = false; } - uint64_t u_exp_count = read_uint64(pt->timerfd); + uint64_t u_exp_count = read_uint64(pt->epoll_io.fd); if (!pt->timer_active) { // Expiry counter just cleared, timer not set, timerfd not armed pt->in_doubt = false; @@ -262,7 +260,7 @@ static bool ptimer_shutdown(ptimer_t *pt, bool currently_armed) { } static void ptimer_finalize(ptimer_t *pt) { - if (pt->timerfd >= 0) close(pt->timerfd); + if (pt->epoll_io.fd >= 0) close(pt->epoll_io.fd); pmutex_finalize(&pt->mutex); } @@ -661,17 +659,13 @@ static void make_runnable(pcontext_t *ctx) { -static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listener, const char *addr) +static void psocket_init(psocket_t* ps, pn_proactor_t* p, pn_listener_t *listener) { - ps->epoll_io.psocket = ps; ps->epoll_io.fd = -1; ps->epoll_io.type = listener ? 
LISTENER_IO : PCONNECTION_IO; ps->epoll_io.wanted = 0; ps->epoll_io.polling = false; ps->proactor = p; - ps->listener = listener; - ps->sockfd = -1; - pni_parse_addr(addr, ps->addr_buf, sizeof(ps->addr_buf), &ps->host, &ps->port); } @@ -707,17 +701,19 @@ static void proactor_add(pcontext_t *ctx); static bool proactor_remove(pcontext_t *ctx); static void poller_done(struct pn_proactor_t* p, tslot_t *ts); +// Type safe version of containerof +#define containerof(ptr, type, member) ((type *)((char *)(1 ? (ptr) : &((type *)0)->member) - offsetof(type, member))) static inline pconnection_t *psocket_pconnection(psocket_t* ps) { - return ps->listener ? NULL : (pconnection_t*)ps; + return ps->epoll_io.type == PCONNECTION_IO ? containerof(ps, pconnection_t, psocket) : NULL; } static inline pn_listener_t *psocket_listener(psocket_t* ps) { - return ps->listener; + return ps->epoll_io.type == LISTENER_IO ? containerof(ps, acceptor_t, psocket)->listener : NULL; } static inline acceptor_t *psocket_acceptor(psocket_t* ps) { - return !ps->listener ? NULL : (acceptor_t *)ps; + return ps->epoll_io.type == LISTENER_IO ? 
containerof(ps, acceptor_t, psocket) : NULL; } static inline pconnection_t *pcontext_pconnection(pcontext_t *c) { @@ -760,15 +756,19 @@ static inline bool proactor_has_event(pn_proactor_t *p) { } static void psocket_error_str(psocket_t *ps, const char *msg, const char* what) { - if (!ps->listener) { - pn_connection_driver_t *driver = &psocket_pconnection(ps)->driver; + pconnection_t *pc = psocket_pconnection(ps); + if (pc) { + pn_connection_driver_t *driver = &pc->driver; pn_connection_driver_bind(driver); /* Bind so errors will be reported */ - pni_proactor_set_cond(pn_transport_condition(driver->transport), what, ps->host, ps->port, msg); + pni_proactor_set_cond(pn_transport_condition(driver->transport), what, pc->host, pc->port, msg); pn_connection_driver_close(driver); - } else { - pn_listener_t *l = psocket_listener(ps); - pni_proactor_set_cond(l->condition, what, ps->host, ps->port, msg); + return; + } + pn_listener_t *l = psocket_listener(ps); + if (l) { + pni_proactor_set_cond(l->condition, what, l->host, l->port, msg); listener_begin_close(l); + return; } } @@ -830,7 +830,7 @@ static void proactor_rearm_overflow(pn_proactor_t *p) { unlock(&p->overflow_mutex); acceptor_t *a = listener_list_next(&ovflw); while (a) { - pn_listener_t *l = a->psocket.listener; + pn_listener_t *l = a->listener; lock(&l->context.mutex); bool rearming = !l->context.closing; bool notify = false; @@ -876,7 +876,8 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con } pcontext_init(&pc->context, PCONNECTION, p, pc); - psocket_init(&pc->psocket, p, NULL, addr); + psocket_init(&pc->psocket, p, NULL); + pni_parse_addr(addr, pc->addr_buf, sizeof(pc->addr_buf), &pc->host, &pc->port); pc->new_events = 0; pc->wake_count = 0; pc->tick_pending = false; @@ -942,9 +943,10 @@ static void pconnection_final_free(pconnection_t *pc) { // call without lock static void pconnection_cleanup(pconnection_t *pc) { assert(pconnection_is_final(pc)); + int fd = 
pc->psocket.epoll_io.fd; stop_polling(&pc->psocket.epoll_io, pc->psocket.proactor->epollfd); - if (pc->psocket.sockfd != -1) - pclosefd(pc->psocket.proactor, pc->psocket.sockfd); + if (fd != -1) + pclosefd(pc->psocket.proactor, fd); stop_polling(&pc->timer.epoll_io, pc->psocket.proactor->epollfd); ptimer_finalize(&pc->timer); @@ -977,7 +979,7 @@ static void pconnection_begin_close(pconnection_t *pc) { pc->context.closing = true; if (pc->current_arm) { // Force EPOLLHUP callback(s) - shutdown(pc->psocket.sockfd, SHUT_RDWR); + shutdown(pc->psocket.epoll_io.fd, SHUT_RDWR); } pn_connection_driver_close(&pc->driver); @@ -1162,7 +1164,7 @@ static void pconnection_done(pconnection_t *pc) { // Return true unless error static bool pconnection_write(pconnection_t *pc) { size_t wbuf_size = pc->wbuf_remaining; - ssize_t n = send(pc->psocket.sockfd, pc->wbuf_current, wbuf_size, MSG_NOSIGNAL); + ssize_t n = send(pc->psocket.epoll_io.fd, pc->wbuf_current, wbuf_size, MSG_NOSIGNAL); if (n > 0) { pc->wbuf_completed += n; pc->wbuf_remaining -= n; @@ -1218,7 +1220,7 @@ static void write_flush(pconnection_t *pc) { } else { if (pn_connection_driver_write_closed(&pc->driver)) { - shutdown(pc->psocket.sockfd, SHUT_WR); + shutdown(pc->psocket.epoll_io.fd, SHUT_WR); pc->write_blocked = true; } } @@ -1343,7 +1345,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, if (!pconnection_rclosed(pc)) { pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); if (rbuf.size > 0 && !pc->read_blocked) { - ssize_t n = read(pc->psocket.sockfd, rbuf.start, rbuf.size); + ssize_t n = read(pc->psocket.epoll_io.fd, rbuf.start, rbuf.size); if (n > 0) { pn_connection_driver_read_done(&pc->driver, n); pc->output_drained = false; @@ -1421,7 +1423,7 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, goto retry; } - if (!pc->timer_armed && !pc->timer.shutting_down && pc->timer.timerfd >= 0) { + if (!pc->timer_armed && 
!pc->timer.shutting_down && pc->timer.epoll_io.fd >= 0) { pc->timer_armed = true; rearm(pc->psocket.proactor, &pc->timer.epoll_io); } @@ -1451,19 +1453,19 @@ void pconnection_connected_lh(pconnection_t *pc) { } pc->ai = NULL; socklen_t len = sizeof(pc->remote.ss); - (void)getpeername(pc->psocket.sockfd, (struct sockaddr*)&pc->remote.ss, &len); + (void)getpeername(pc->psocket.epoll_io.fd, (struct sockaddr*)&pc->remote.ss, &len); } } /* multi-address connections may call pconnection_start multiple times with diffferent FDs */ -static void pconnection_start(pconnection_t *pc) { +static void pconnection_start(pconnection_t *pc, int fd) { int efd = pc->psocket.proactor->epollfd; /* Start timer, a no-op if the timer has already started. */ start_polling(&pc->timer.epoll_io, efd); // TODO: check for error /* Get the local socket name now, get the peer name in pconnection_connected */ socklen_t len = sizeof(pc->local.ss); - (void)getsockname(pc->psocket.sockfd, (struct sockaddr*)&pc->local.ss, &len); + (void)getsockname(fd, (struct sockaddr*)&pc->local.ss, &len); epoll_extended_t *ee = &pc->psocket.epoll_io; if (ee->polling) { /* This is not the first attempt, stop polling and close the old FD */ @@ -1471,7 +1473,7 @@ static void pconnection_start(pconnection_t *pc) { stop_polling(ee, efd); pclosefd(pc->psocket.proactor, fd); } - ee->fd = pc->psocket.sockfd; + ee->fd = fd; pc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT; start_polling(ee, efd); // TODO: check for error } @@ -1487,8 +1489,7 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) { if (fd >= 0) { configure_socket(fd); if (!connect(fd, ai->ai_addr, ai->ai_addrlen) || errno == EINPROGRESS) { - pc->psocket.sockfd = fd; - pconnection_start(pc); + pconnection_start(pc, fd); return; /* Async connection started */ } else { close(fd); @@ -1500,7 +1501,7 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) { pc->addrinfo = NULL; /* If there was a previous attempted connection, let the poller 
discover the errno from its socket, otherwise set the current error. */ - if (pc->psocket.sockfd < 1) { + if (pc->psocket.epoll_io.fd < 0) { psocket_error(&pc->psocket, errno ? errno : ENOTCONN, "on connect"); } } @@ -1549,7 +1550,7 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t * if (pc->disconnected) { notify = wake(&pc->context); /* Error during initialization */ } else { - int gai_error = pgetaddrinfo(pc->psocket.host, pc->psocket.port, 0, &pc->addrinfo); + int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo); if (!gai_error) { pn_connection_open(pc->driver.connection); /* Auto-open */ pc->ai = pc->addrinfo; @@ -1642,12 +1643,10 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in l->context.proactor = p;; l->backlog = backlog; - char addr_buf[PN_MAX_ADDR]; - const char *host, *port; - pni_parse_addr(addr, addr_buf, PN_MAX_ADDR, &host, &port); + pni_parse_addr(addr, l->addr_buf, sizeof(l->addr_buf), &l->host, &l->port); struct addrinfo *addrinfo = NULL; - int gai_err = pgetaddrinfo(host, port, AI_PASSIVE | AI_ALL, &addrinfo); + int gai_err = pgetaddrinfo(l->host, l->port, AI_PASSIVE | AI_ALL, &addrinfo); if (!gai_err) { /* Count addresses, allocate enough space for sockets */ size_t len = 0; @@ -1683,9 +1682,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in } acceptor->accepted_fd = -1; + acceptor->listener = l; psocket_t *ps = &acceptor->psocket; - psocket_init(ps, p, l, addr); - ps->sockfd = fd; + psocket_init(ps, p, l); ps->epoll_io.fd = fd; ps->epoll_io.wanted = EPOLLIN; ps->epoll_io.polling = false; @@ -1709,8 +1708,9 @@ void pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *addr, in l->acceptors = (acceptor_t*)realloc(l->acceptors, sizeof(acceptor_t)); l->acceptors_size = 1; memset(l->acceptors, 0, sizeof(acceptor_t)); - psocket_init(&l->acceptors[0].psocket, p, l, addr); + psocket_init(&l->acceptors[0].psocket, p, l); 
l->acceptors[0].accepted_fd = -1; + l->acceptors[0].listener = l; if (gai_err) { psocket_gai_error(&l->acceptors[0].psocket, gai_err, "listen on"); } else { @@ -1765,14 +1765,14 @@ static void listener_begin_close(pn_listener_t* l) { for (size_t i = 0; i < l->acceptors_size; ++i) { acceptor_t *a = &l->acceptors[i]; psocket_t *ps = &a->psocket; - if (ps->sockfd >= 0) { + if (ps->epoll_io.fd >= 0) { lock(&l->rearm_mutex); if (a->armed) { - shutdown(ps->sockfd, SHUT_RD); // Force epoll event and callback + shutdown(ps->epoll_io.fd, SHUT_RD); // Force epoll event and callback } else { stop_polling(&ps->epoll_io, ps->proactor->epollfd); - close(ps->sockfd); - ps->sockfd = -1; + close(ps->epoll_io.fd); + ps->epoll_io.fd = -1; l->active_count--; } unlock(&l->rearm_mutex); @@ -1826,7 +1826,7 @@ static void listener_accept_lh(psocket_t *ps) { pn_listener_t *l = psocket_listener(ps); acceptor_t *acceptor = psocket_acceptor(ps); assert(acceptor->accepted_fd < 0); /* Shouldn't already have an accepted_fd */ - acceptor->accepted_fd = accept(ps->sockfd, NULL, 0); + acceptor->accepted_fd = accept(ps->epoll_io.fd, NULL, 0); if (acceptor->accepted_fd >= 0) { // acceptor_t *acceptor = listener_list_next(pending_acceptors); listener_list_append(&l->pending_acceptors, acceptor); @@ -1859,8 +1859,8 @@ static pn_event_batch_t *listener_process(pn_listener_t *l, int n_events, bool w lock(&l->rearm_mutex); stop_polling(&ps->epoll_io, ps->proactor->epollfd); unlock(&l->rearm_mutex); - close(ps->sockfd); - ps->sockfd = -1; + close(ps->epoll_io.fd); + ps->epoll_io.fd = -1; l->active_count--; } else { @@ -2008,10 +2008,9 @@ void pn_listener_accept2(pn_listener_t *l, pn_connection_t *c, pn_transport_t *t proactor_add(&pc->context); lock(&pc->context.mutex); - pc->psocket.sockfd = fd; if (fd >= 0) { configure_socket(fd); - pconnection_start(pc); + pconnection_start(pc, fd); pconnection_connected_lh(pc); } else @@ -2059,7 +2058,6 @@ static void grow_poller_bufs(pn_proactor_t* p) { /* Set up an 
epoll_extended_t to be used for wakeup or interrupts */ static void epoll_wake_init(epoll_extended_t *ee, int eventfd, int epollfd, bool always_set) { - ee->psocket = NULL; ee->fd = eventfd; ee->type = WAKE; if (always_set) { @@ -2083,7 +2081,7 @@ pn_proactor_t *pn_proactor() { if (getenv("PNI_EPOLL_SPINS")) pni_spins = atoi(getenv("PNI_EPOLL_SPINS")); pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p)); if (!p) return NULL; - p->epollfd = p->eventfd = p->timer.timerfd = -1; + p->epollfd = p->eventfd = -1; pcontext_init(&p->context, PROACTOR, p, p); pmutex_init(&p->eventfd_mutex); pmutex_init(&p->sched_mutex); @@ -2093,7 +2091,7 @@ pn_proactor_t *pn_proactor() { if ((p->epollfd = epoll_create(1)) >= 0) { if ((p->eventfd = eventfd(0, EFD_NONBLOCK)) >= 0) { if ((p->interruptfd = eventfd(0, EFD_NONBLOCK)) >= 0) { - if (p->timer.timerfd >= 0) + if (p->timer.epoll_io.fd >= 0) if ((p->collector = pn_collector()) != NULL) { p->batch.next_event = &proactor_batch_next; start_polling(&p->timer.epoll_io, p->epollfd); // TODO: check for error @@ -2404,7 +2402,8 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { epoll_extended_t *ee = (epoll_extended_t *) evp->data.ptr; pcontext_t *ctx = NULL; - if (ee->type == WAKE) { + switch (ee->type) { + case WAKE: if (ee->fd == p->interruptfd) { /* Interrupts have their own dedicated eventfd */ p->sched_interrupt = true; ctx = &p->context; @@ -2416,28 +2415,40 @@ static pcontext_t *post_event(pn_proactor_t *p, struct epoll_event *evp) { ctx = p->sched_wake_current; unlock(&p->eventfd_mutex); } - } else if (ee->type == PROACTOR_TIMER) { + break; + + case PROACTOR_TIMER: p->sched_timeout = true; ctx = &p->context; ctx->sched_pending = true; - } else { - pconnection_t *pc = psocket_pconnection(ee->psocket); - if (pc) { - ctx = &pc->context; - if (ee->type == PCONNECTION_IO) { - ee->psocket->sched_io_events = evp->events; - } else { - pc->sched_timeout = true;; - } - ctx->sched_pending = true; - } - else { - 
pn_listener_t *l = psocket_listener(ee->psocket); - assert(l); - ctx = &l->context; - ee->psocket->sched_io_events = evp->events; - ctx->sched_pending = true; - } + break; + + case PCONNECTION_IO: { + psocket_t *ps = containerof(ee, psocket_t, epoll_io); + pconnection_t *pc = psocket_pconnection(ps); + assert(pc); + ctx = &pc->context; + ps->sched_io_events = evp->events; + ctx->sched_pending = true; + break; + } + case PCONNECTION_TIMER: { + pconnection_t *pc = containerof(containerof(ee, ptimer_t, epoll_io), pconnection_t, timer); + assert(pc); + ctx = &pc->context; + pc->sched_timeout = true;; + ctx->sched_pending = true; + break; + } + case LISTENER_IO: { + psocket_t *ps = containerof(ee, psocket_t, epoll_io); + pn_listener_t *l = psocket_listener(ps); + assert(l); + ctx = &l->context; + ps->sched_io_events = evp->events; + ctx->sched_pending = true; + break; + } } if (ctx && !ctx->runnable && !ctx->runner) return ctx; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org