# HG changeset patch
# User Roman Arutyunyan <a...@nginx.com>
# Date 1707486707 -28800
#      Fri Feb 09 21:51:47 2024 +0800
# Node ID 4584ba4b1d65a90f69201cecf1f1e650c1cbd87b
# Parent  5d28510b62bffba3187d7fe69baccd2d2da41a12
QUIC: input packet batching with recvmmsg().

diff --git a/auto/os/linux b/auto/os/linux
--- a/auto/os/linux
+++ b/auto/os/linux
@@ -303,4 +303,15 @@ ngx_feature_test="struct mmsghdr msg[UIO
 . auto/feature
 
 
+ngx_feature="recvmmsg()"
+ngx_feature_name="NGX_HAVE_RECVMMSG"
+ngx_feature_run=no
+ngx_feature_incs="#include <sys/socket.h>"
+ngx_feature_path=
+ngx_feature_libs=
+ngx_feature_test="struct mmsghdr msg[64];
+                  recvmmsg(0, msg, 64, 0, NULL);"
+. auto/feature
+
+
 CC_AUX_FLAGS="$cc_aux_flags -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64"
diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c
--- a/src/event/ngx_event.c
+++ b/src/event/ngx_event.c
@@ -898,7 +898,11 @@ ngx_event_process_init(ngx_cycle_t *cycl
 
 #if (NGX_QUIC)
         } else if (ls[i].quic) {
+#if (NGX_HAVE_RECVMMSG)
+            rev->handler = ngx_quic_recvmmsg;
+#else
             rev->handler = ngx_quic_recvmsg;
+#endif
 
 #if (NGX_HAVE_SENDMMSG)
             wev->handler = ngx_event_sendmmsg;
diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h
--- a/src/event/quic/ngx_event_quic.h
+++ b/src/event/quic/ngx_event_quic.h
@@ -112,6 +112,9 @@ struct ngx_quic_stream_s {
 
 
 void ngx_quic_recvmsg(ngx_event_t *ev);
+#if (NGX_HAVE_RECVMMSG)
+void ngx_quic_recvmmsg(ngx_event_t *ev);
+#endif
 void ngx_quic_run(ngx_connection_t *c, ngx_quic_conf_t *conf);
 ngx_connection_t *ngx_quic_open_stream(ngx_connection_t *c, ngx_uint_t bidi);
 void ngx_quic_finalize_connection(ngx_connection_t *c, ngx_uint_t err,
diff --git a/src/event/quic/ngx_event_quic_udp.c b/src/event/quic/ngx_event_quic_udp.c
--- a/src/event/quic/ngx_event_quic_udp.c
+++ b/src/event/quic/ngx_event_quic_udp.c
@@ -11,6 +11,7 @@
 #include <ngx_event_quic_connection.h>
 
 
+static void ngx_quic_handle_msg(ngx_event_t *ev, struct msghdr *msg, size_t n);
 static void ngx_quic_close_accepted_connection(ngx_connection_t *c);
 static ngx_connection_t *ngx_quic_lookup_connection(ngx_listening_t *ls,
     ngx_str_t *key, struct sockaddr *local_sockaddr, socklen_t local_socklen);
@@ -20,20 +21,13 @@ void
 ngx_quic_recvmsg(ngx_event_t *ev)
 {
     ssize_t             n;
-    ngx_str_t           key;
-    ngx_buf_t           buf;
-    ngx_log_t          *log;
     ngx_err_t           err;
-    socklen_t           socklen, local_socklen;
-    ngx_event_t        *rev, *wev;
     struct iovec        iov[1];
     struct msghdr       msg;
-    ngx_sockaddr_t      sa, lsa;
-    struct sockaddr    *sockaddr, *local_sockaddr;
+    ngx_sockaddr_t      sa;
     ngx_listening_t    *ls;
     ngx_event_conf_t   *ecf;
-    ngx_connection_t   *c, *lc;
-    ngx_quic_socket_t  *qsock;
+    ngx_connection_t   *lc;
     static u_char       buffer[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
 
 #if (NGX_HAVE_ADDRINFO_CMSG)
@@ -106,239 +100,10 @@ ngx_quic_recvmsg(ngx_event_t *ev)
         }
 #endif
 
-        sockaddr = msg.msg_name;
-        socklen = msg.msg_namelen;
-
-        if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
-            socklen = sizeof(ngx_sockaddr_t);
-        }
-
-#if (NGX_HAVE_UNIX_DOMAIN)
-
-        if (sockaddr->sa_family == AF_UNIX) {
-            struct sockaddr_un *saun = (struct sockaddr_un *) sockaddr;
-
-            if (socklen <= (socklen_t) offsetof(struct sockaddr_un, sun_path)
-                || saun->sun_path[0] == '\0')
-            {
-                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0,
-                               "unbound unix socket");
-                goto next;
-            }
-        }
-
-#endif
-
-        local_sockaddr = ls->sockaddr;
-        local_socklen = ls->socklen;
-
-#if (NGX_HAVE_ADDRINFO_CMSG)
-
-        if (ls->wildcard) {
-            struct cmsghdr  *cmsg;
-
-            ngx_memcpy(&lsa, local_sockaddr, local_socklen);
-            local_sockaddr = &lsa.sockaddr;
-
-            for (cmsg = CMSG_FIRSTHDR(&msg);
-                 cmsg != NULL;
-                 cmsg = CMSG_NXTHDR(&msg, cmsg))
-            {
-                if (ngx_get_srcaddr_cmsg(cmsg, local_sockaddr) == NGX_OK) {
-                    break;
-                }
-            }
-        }
-
-#endif
-
-        if (ngx_quic_get_packet_dcid(ev->log, buffer, n, &key) != NGX_OK) {
-            goto next;
-        }
-
-        c = ngx_quic_lookup_connection(ls, &key, local_sockaddr, local_socklen);
-
-        if (c) {
-
-#if (NGX_DEBUG)
-            if (c->log->log_level & NGX_LOG_DEBUG_EVENT) {
-                ngx_log_handler_pt  handler;
-
-                handler = c->log->handler;
-                c->log->handler = NULL;
-
-                ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
-                               "quic recvmsg: fd:%d n:%z", c->fd, n);
-
-                c->log->handler = handler;
-            }
-#endif
-
-            ngx_memzero(&buf, sizeof(ngx_buf_t));
-
-            buf.pos = buffer;
-            buf.last = buffer + n;
-            buf.start = buf.pos;
-            buf.end = buffer + sizeof(buffer);
-
-            qsock = ngx_quic_get_socket(c);
-
-            ngx_memcpy(&qsock->sockaddr, sockaddr, socklen);
-            qsock->socklen = socklen;
-
-            c->udp->buffer = &buf;
-
-            rev = c->read;
-            rev->ready = 1;
-            rev->active = 0;
-
-            rev->handler(rev);
-
-            if (c->udp) {
-                c->udp->buffer = NULL;
-            }
-
-            rev->ready = 0;
-            rev->active = 1;
-
-            goto next;
-        }
-
-#if (NGX_STAT_STUB)
-        (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);
-#endif
-
-        ngx_accept_disabled = ngx_cycle->connection_n / 8
-                              - ngx_cycle->free_connection_n;
-
-        c = ngx_get_connection(lc->fd, ev->log);
-        if (c == NULL) {
-            return;
-        }
-
-        c->shared = 1;
-        c->type = SOCK_DGRAM;
-        c->socklen = socklen;
+        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+                       "quic recvmsg: fd:%d n:%z", lc->fd, n);
 
-#if (NGX_STAT_STUB)
-        (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
-#endif
-
-        c->pool = ngx_create_pool(ls->pool_size, ev->log);
-        if (c->pool == NULL) {
-            ngx_quic_close_accepted_connection(c);
-            return;
-        }
-
-        c->sockaddr = ngx_palloc(c->pool, NGX_SOCKADDRLEN);
-        if (c->sockaddr == NULL) {
-            ngx_quic_close_accepted_connection(c);
-            return;
-        }
-
-        ngx_memcpy(c->sockaddr, sockaddr, socklen);
-
-        log = ngx_palloc(c->pool, sizeof(ngx_log_t));
-        if (log == NULL) {
-            ngx_quic_close_accepted_connection(c);
-            return;
-        }
-
-        *log = ls->log;
-
-        c->log = log;
-        c->pool->log = log;
-        c->listening = ls;
-
-        if (local_sockaddr == &lsa.sockaddr) {
-            local_sockaddr = ngx_palloc(c->pool, local_socklen);
-            if (local_sockaddr == NULL) {
-                ngx_quic_close_accepted_connection(c);
-                return;
-            }
-
-            ngx_memcpy(local_sockaddr, &lsa, local_socklen);
-        }
-
-        c->local_sockaddr = local_sockaddr;
-        c->local_socklen = local_socklen;
-
-        c->buffer = ngx_create_temp_buf(c->pool, n);
-        if (c->buffer == NULL) {
-            ngx_quic_close_accepted_connection(c);
-            return;
-        }
-
-        c->buffer->last = ngx_cpymem(c->buffer->last, buffer, n);
-
-        rev = c->read;
-        wev = c->write;
-
-        rev->active = 1;
-        wev->ready = 1;
-
-        rev->log = log;
-        wev->log = log;
-
-        /*
-         * TODO: MT: - ngx_atomic_fetch_add()
-         *             or protection by critical section or light mutex
-         *
-         * TODO: MP: - allocated in a shared memory
-         *           - ngx_atomic_fetch_add()
-         *             or protection by critical section or light mutex
-         */
-
-        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
-
-        c->start_time = ngx_current_msec;
-
-#if (NGX_STAT_STUB)
-        (void) ngx_atomic_fetch_add(ngx_stat_handled, 1);
-#endif
-
-        if (ls->addr_ntop) {
-            c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
-            if (c->addr_text.data == NULL) {
-                ngx_quic_close_accepted_connection(c);
-                return;
-            }
-
-            c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
-                                             c->addr_text.data,
-                                             ls->addr_text_max_len, 0);
-            if (c->addr_text.len == 0) {
-                ngx_quic_close_accepted_connection(c);
-                return;
-            }
-        }
-
-#if (NGX_DEBUG)
-        {
-        ngx_str_t  addr;
-        u_char     text[NGX_SOCKADDR_STRLEN];
-
-        ngx_debug_accepted_connection(ecf, c);
-
-        if (log->log_level & NGX_LOG_DEBUG_EVENT) {
-            addr.data = text;
-            addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
-                                     NGX_SOCKADDR_STRLEN, 1);
-
-            ngx_log_debug4(NGX_LOG_DEBUG_EVENT, log, 0,
-                           "*%uA quic recvmsg: %V fd:%d n:%z",
-                           c->number, &addr, c->fd, n);
-        }
-
-        }
-#endif
-
-        log->data = NULL;
-        log->handler = NULL;
-
-        ls->handler(c);
-
-    next:
+        ngx_quic_handle_msg(ev, &msg, n);
 
         if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
             ev->available -= n;
@@ -348,6 +113,399 @@ ngx_quic_recvmsg(ngx_event_t *ev)
 }
 
 
+#if (NGX_HAVE_RECVMMSG)
+
+#define NGX_QUIC_RECVMMSG_NUM  64
+
+/*
+ * Read handler for a QUIC listening socket: receives up to
+ * NGX_QUIC_RECVMMSG_NUM datagrams with one recvmmsg() call and passes
+ * each of them to ngx_quic_handle_msg().
+ */
+void
+ngx_quic_recvmmsg(ngx_event_t *ev)
+{
+    int                 nmsg;
+    size_t              n;
+    ngx_err_t           err;
+    ngx_int_t           i;
+    struct iovec        iov[NGX_QUIC_RECVMMSG_NUM];
+    struct msghdr      *msg;
+    ngx_sockaddr_t      sa[NGX_QUIC_RECVMMSG_NUM];
+    ngx_listening_t    *ls;
+    ngx_event_conf_t   *ecf;
+    ngx_connection_t   *lc;
+    static u_char       buffer[NGX_QUIC_RECVMMSG_NUM]
+                              [NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
+    struct mmsghdr      msgvec[NGX_QUIC_RECVMMSG_NUM];
+
+#if (NGX_HAVE_ADDRINFO_CMSG)
+    u_char              msg_control[NGX_QUIC_RECVMMSG_NUM]
+                                   [CMSG_SPACE(sizeof(ngx_addrinfo_t))];
+#endif
+
+    if (ev->timedout) {
+        if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
+            return;
+        }
+
+        ev->timedout = 0;
+    }
+
+    ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
+
+    if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
+        ev->available = ecf->multi_accept;
+    }
+
+    lc = ev->data;
+    ls = lc->listening;
+    ev->ready = 0;
+
+    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+                   "quic recvmmsg on %V, ready: %d",
+                   &ls->addr_text, ev->available);
+
+    do {
+
+        ngx_memzero(msgvec, sizeof(msgvec));
+
+#if (NGX_HAVE_ADDRINFO_CMSG)
+        if (ls->wildcard) {
+            ngx_memzero(msg_control, sizeof(msg_control));
+        }
+#endif
+
+        for (i = 0; i < NGX_QUIC_RECVMMSG_NUM; i++) {
+            msg = &msgvec[i].msg_hdr;
+
+            iov[i].iov_base = (void *) buffer[i];
+            iov[i].iov_len = NGX_QUIC_MAX_UDP_PAYLOAD_SIZE;
+
+            msg->msg_name = &sa[i];
+            msg->msg_namelen = sizeof(ngx_sockaddr_t);
+            msg->msg_iov = &iov[i];
+            msg->msg_iovlen = 1;
+
+#if (NGX_HAVE_ADDRINFO_CMSG)
+            if (ls->wildcard) {
+                /* one row per message, not the whole array */
+                msg->msg_control = &msg_control[i];
+                msg->msg_controllen = sizeof(msg_control[i]);
+            }
+#endif
+        }
+
+        nmsg = recvmmsg(lc->fd, msgvec, NGX_QUIC_RECVMMSG_NUM, 0, NULL);
+
+        if (nmsg == -1) {
+            err = ngx_socket_errno;
+
+            if (err == NGX_EAGAIN) {
+                ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ev->log, err,
+                               "quic recvmmsg() not ready");
+                return;
+            }
+
+            ngx_log_error(NGX_LOG_ALERT, ev->log, err,
+                          "quic recvmmsg() failed");
+
+            return;
+        }
+
+        ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+                       "quic recvmmsg: fd:%d n:%i", lc->fd, nmsg);
+
+        for (i = 0; i < nmsg; i++) {
+            msg = &msgvec[i].msg_hdr;
+            n = msgvec[i].msg_len;
+
+#if (NGX_HAVE_ADDRINFO_CMSG)
+            if (msg->msg_flags & (MSG_TRUNC|MSG_CTRUNC)) {
+                ngx_log_error(NGX_LOG_ALERT, ev->log, 0,
+                              "quic recvmmsg() truncated data");
+                continue;
+            }
+#endif
+
+            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+                           "quic recvmmsg: n:%z", n);
+
+            ngx_quic_handle_msg(ev, msg, n);
+
+            if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
+                ev->available -= n;
+            }
+        }
+
+    } while (ev->available);
+}
+
+#endif
+
+
+/*
+ * Processes one received datagram: msg describes the peer address,
+ * control data and payload buffer (msg_iov[0]), n is the payload length.
+ * The datagram is passed to the QUIC connection matching its DCID, or a
+ * new connection is created and handed to the listening handler.
+ */
+static void
+ngx_quic_handle_msg(ngx_event_t *ev, struct msghdr *msg, size_t n)
+{
+    u_char             *buffer;
+    ngx_str_t           key;
+    ngx_buf_t           buf;
+    ngx_log_t          *log;
+    socklen_t           socklen, local_socklen;
+    ngx_event_t        *rev, *wev;
+    ngx_sockaddr_t      lsa;
+    struct sockaddr    *sockaddr, *local_sockaddr;
+    ngx_listening_t    *ls;
+    ngx_connection_t   *c, *lc;
+    ngx_quic_socket_t  *qsock;
+
+    lc = ev->data;
+    ls = lc->listening;
+
+    buffer = msg->msg_iov[0].iov_base;
+
+    sockaddr = msg->msg_name;
+    socklen = msg->msg_namelen;
+
+    if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
+        socklen = sizeof(ngx_sockaddr_t);
+    }
+
+#if (NGX_HAVE_UNIX_DOMAIN)
+
+    if (sockaddr->sa_family == AF_UNIX) {
+        struct sockaddr_un *saun = (struct sockaddr_un *) sockaddr;
+
+        if (socklen <= (socklen_t) offsetof(struct sockaddr_un, sun_path)
+            || saun->sun_path[0] == '\0')
+        {
+            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0,
+                           "unbound unix socket");
+            return;
+        }
+    }
+
+#endif
+
+    local_sockaddr = ls->sockaddr;
+    local_socklen = ls->socklen;
+
+#if (NGX_HAVE_ADDRINFO_CMSG)
+
+    if (ls->wildcard) {
+        struct cmsghdr  *cmsg;
+
+        ngx_memcpy(&lsa, local_sockaddr, local_socklen);
+        local_sockaddr = &lsa.sockaddr;
+
+        for (cmsg = CMSG_FIRSTHDR(msg);
+             cmsg != NULL;
+             cmsg = CMSG_NXTHDR(msg, cmsg))
+        {
+            if (ngx_get_srcaddr_cmsg(cmsg, local_sockaddr) == NGX_OK) {
+                break;
+            }
+        }
+    }
+
+#endif
+
+    if (ngx_quic_get_packet_dcid(ev->log, buffer, n, &key) != NGX_OK) {
+        return;
+    }
+
+    c = ngx_quic_lookup_connection(ls, &key, local_sockaddr, local_socklen);
+
+    if (c) {
+
+#if (NGX_DEBUG)
+        if (c->log->log_level & NGX_LOG_DEBUG_EVENT) {
+            ngx_log_handler_pt  handler;
+
+            handler = c->log->handler;
+            c->log->handler = NULL;
+
+            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic msg: n:%z", n);
+
+            c->log->handler = handler;
+        }
+#endif
+
+        ngx_memzero(&buf, sizeof(ngx_buf_t));
+
+        buf.pos = buffer;
+        buf.last = buffer + n;
+        buf.start = buf.pos;
+        /* buffer is a pointer here, so sizeof(buffer) is not its capacity */
+        buf.end = buffer + msg->msg_iov[0].iov_len;
+
+        qsock = ngx_quic_get_socket(c);
+
+        ngx_memcpy(&qsock->sockaddr, sockaddr, socklen);
+        qsock->socklen = socklen;
+
+        c->udp->buffer = &buf;
+
+        rev = c->read;
+        rev->ready = 1;
+        rev->active = 0;
+
+        rev->handler(rev);
+
+        if (c->udp) {
+            c->udp->buffer = NULL;
+        }
+
+        rev->ready = 0;
+        rev->active = 1;
+
+        return;
+    }
+
+#if (NGX_STAT_STUB)
+    (void) ngx_atomic_fetch_add(ngx_stat_accepted, 1);
+#endif
+
+    ngx_accept_disabled = ngx_cycle->connection_n / 8
+                          - ngx_cycle->free_connection_n;
+
+    c = ngx_get_connection(lc->fd, ev->log);
+    if (c == NULL) {
+        return;
+    }
+
+    c->shared = 1;
+    c->type = SOCK_DGRAM;
+    c->socklen = socklen;
+
+#if (NGX_STAT_STUB)
+    (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
+#endif
+
+    c->pool = ngx_create_pool(ls->pool_size, ev->log);
+    if (c->pool == NULL) {
+        ngx_quic_close_accepted_connection(c);
+        return;
+    }
+
+    c->sockaddr = ngx_palloc(c->pool, NGX_SOCKADDRLEN);
+    if (c->sockaddr == NULL) {
+        ngx_quic_close_accepted_connection(c);
+        return;
+    }
+
+    ngx_memcpy(c->sockaddr, sockaddr, socklen);
+
+    log = ngx_palloc(c->pool, sizeof(ngx_log_t));
+    if (log == NULL) {
+        ngx_quic_close_accepted_connection(c);
+        return;
+    }
+
+    *log = ls->log;
+
+    c->log = log;
+    c->pool->log = log;
+    c->listening = ls;
+
+    if (local_sockaddr == &lsa.sockaddr) {
+        local_sockaddr = ngx_palloc(c->pool, local_socklen);
+        if (local_sockaddr == NULL) {
+            ngx_quic_close_accepted_connection(c);
+            return;
+        }
+
+        ngx_memcpy(local_sockaddr, &lsa, local_socklen);
+    }
+
+    c->local_sockaddr = local_sockaddr;
+    c->local_socklen = local_socklen;
+
+    c->buffer = ngx_create_temp_buf(c->pool, n);
+    if (c->buffer == NULL) {
+        ngx_quic_close_accepted_connection(c);
+        return;
+    }
+
+    c->buffer->last = ngx_cpymem(c->buffer->last, buffer, n);
+
+    rev = c->read;
+    wev = c->write;
+
+    rev->active = 1;
+    wev->ready = 1;
+
+    rev->log = log;
+    wev->log = log;
+
+    /*
+     * TODO: MT: - ngx_atomic_fetch_add()
+     *             or protection by critical section or light mutex
+     *
+     * TODO: MP: - allocated in a shared memory
+     *           - ngx_atomic_fetch_add()
+     *             or protection by critical section or light mutex
+     */
+
+    c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
+
+    c->start_time = ngx_current_msec;
+
+#if (NGX_STAT_STUB)
+    (void) ngx_atomic_fetch_add(ngx_stat_handled, 1);
+#endif
+
+    if (ls->addr_ntop) {
+        c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
+        if (c->addr_text.data == NULL) {
+            ngx_quic_close_accepted_connection(c);
+            return;
+        }
+
+        c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
+                                         c->addr_text.data,
+                                         ls->addr_text_max_len, 0);
+        if (c->addr_text.len == 0) {
+            ngx_quic_close_accepted_connection(c);
+            return;
+        }
+    }
+
+#if (NGX_DEBUG)
+    {
+    ngx_str_t          addr;
+    ngx_event_conf_t  *ecf;
+    u_char             text[NGX_SOCKADDR_STRLEN];
+
+    ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);
+
+    ngx_debug_accepted_connection(ecf, c);
+
+    if (log->log_level & NGX_LOG_DEBUG_EVENT) {
+        addr.data = text;
+        addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
+                                 NGX_SOCKADDR_STRLEN, 1);
+
+        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0,
+                       "*%uA quic msg: %V n:%z", c->number, &addr, n);
+    }
+
+    }
+#endif
+
+    log->data = NULL;
+    log->handler = NULL;
+
+    ls->handler(c);
+}
+
+
 static void
 ngx_quic_close_accepted_connection(ngx_connection_t *c)
 {
_______________________________________________
nginx-devel mailing list
nginx-devel@nginx.org
https://mailman.nginx.org/mailman/listinfo/nginx-devel