Hi, here is my kernel diff to extend socket splicing to UDP. The advantage for relaying applications is that they can forward the data without copying it to user space. Currently, relayd uses socket splicing for TCP connections only.
The idea of my implementation is to merge the code relevant for UDP from sosend() and soreceive() into somove(). That allows the kernel to directly transfer the UDP data from one socket to another. I have written a bunch of tests in /usr/src/regress/sys/kern/sosplice/udp/. Performance measurement and a real live relaying daemon are on my todo list. bluhm Index: kern/uipc_socket.c =================================================================== RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket.c,v retrieving revision 1.110 diff -u -p -r1.110 uipc_socket.c --- kern/uipc_socket.c 31 Dec 2012 13:46:49 -0000 1.110 +++ kern/uipc_socket.c 4 Jan 2013 17:19:51 -0000 @@ -1042,7 +1042,8 @@ sosplice(struct socket *so, int fd, off_ return (EPROTONOSUPPORT); if (so->so_options & SO_ACCEPTCONN) return (EOPNOTSUPP); - if ((so->so_state & (SS_ISCONNECTED|SS_ISCONNECTING)) == 0) + if ((so->so_state & (SS_ISCONNECTED|SS_ISCONNECTING)) == 0 && + (so->so_proto->pr_flags & PR_CONNREQUIRED)) return (ENOTCONN); /* If no fd is given, unsplice by removing existing link. */ @@ -1070,6 +1071,10 @@ sosplice(struct socket *so, int fd, off_ return (error); sosp = fp->f_data; + if (so->so_state & SS_ISCONFIRMING) + (*so->so_proto->pr_usrreq)(so, PRU_RCVD, NULL, NULL, NULL, + curproc); + /* Lock both receive and send buffer. */ if ((error = sblock(&so->so_rcv, (so->so_state & SS_NBIO) ? M_NOWAIT : M_WAITOK)) != 0) { @@ -1159,6 +1164,7 @@ somove(struct socket *so, int wait) splsoftassert(IPL_SOFTNET); + nextpkt: if (so->so_error) { error = so->so_error; goto release; @@ -1175,9 +1181,7 @@ somove(struct socket *so, int wait) goto release; /* Calculate how many bytes can be copied now. 
*/ - len = so->so_rcv.sb_cc; - if (len == 0) - goto release; + len = so->so_rcv.sb_datacc; if (so->so_splicemax) { KASSERT(so->so_splicelen < so->so_splicemax); if (so->so_splicemax <= so->so_splicelen + len) { @@ -1201,13 +1205,78 @@ somove(struct socket *so, int wait) } sosp->so_state |= SS_ISSENDING; - /* Take at most len mbufs out of receive buffer. */ + SBLASTRECORDCHK(&so->so_rcv, "somove 1"); + SBLASTMBUFCHK(&so->so_rcv, "somove 1"); m = so->so_rcv.sb_mb; + if (m == NULL) + goto release; nextrecord = m->m_nextpkt; - for (off = 0, mp = &m; off < len; + + /* Drop address and control information not used with splicing. */ + if (so->so_proto->pr_flags & PR_ADDR) { +#ifdef DIAGNOSTIC + if (m->m_type != MT_SONAME) + panic("somove soname"); +#endif + m = m->m_next; + } + while (m && m->m_type == MT_CONTROL) { + m = m->m_next; + } + if (m == NULL) { + sbdroprecord(&so->so_rcv); + if (so->so_proto->pr_flags & PR_WANTRCVD && so->so_pcb) + (so->so_proto->pr_usrreq)(so, PRU_RCVD, NULL, + (struct mbuf *)0L, NULL, NULL); + goto nextpkt; + } + + if (so->so_proto->pr_flags & PR_ATOMIC) { + if ((m->m_flags & M_PKTHDR) == 0) + panic("somove pkthdr"); + if (sosp->so_snd.sb_hiwat < m->m_pkthdr.len) { + error = EMSGSIZE; + goto release; + } + if (len < m->m_pkthdr.len) + goto release; + if (m->m_pkthdr.len < len) { + maxreached = 0; + len = m->m_pkthdr.len; + } + /* + * Throw away the name mbuf after it has been assured + * that the whole first record can be processed. + */ + m = so->so_rcv.sb_mb; + sbfree(&so->so_rcv, m); + MFREE(m, so->so_rcv.sb_mb); + sbsync(&so->so_rcv, nextrecord); + } + /* + * Throw away the control mbufs after it has been assured + * that the whole first record can be processed. 
+ */ + m = so->so_rcv.sb_mb; + while (m && m->m_type == MT_CONTROL) { + sbfree(&so->so_rcv, m); + MFREE(m, so->so_rcv.sb_mb); + m = so->so_rcv.sb_mb; + sbsync(&so->so_rcv, nextrecord); + } + + SBLASTRECORDCHK(&so->so_rcv, "somove 2"); + SBLASTMBUFCHK(&so->so_rcv, "somove 2"); + + /* Take at most len mbufs out of receive buffer. */ + for (off = 0, mp = &m; off <= len && *mp; off += (*mp)->m_len, mp = &(*mp)->m_next) { u_long size = len - off; +#ifdef DIAGNOSTIC + if ((*mp)->m_type != MT_DATA && (*mp)->m_type != MT_HEADER) + panic("somove type"); +#endif if ((*mp)->m_len > size) { if (!maxreached || (*mp = m_copym( so->so_rcv.sb_mb, 0, size, wait)) == NULL) { @@ -1227,14 +1296,12 @@ somove(struct socket *so, int wait) } *mp = NULL; - SBLASTRECORDCHK(&so->so_rcv, "somove"); - SBLASTMBUFCHK(&so->so_rcv, "somove"); - KASSERT(so->so_rcv.sb_mb == so->so_rcv.sb_lastrecord); + SBLASTRECORDCHK(&so->so_rcv, "somove 3"); + SBLASTMBUFCHK(&so->so_rcv, "somove 3"); SBCHECK(&so->so_rcv); - - /* m might be NULL if the loop did break during the first iteration. */ if (m == NULL) goto release; + m->m_nextpkt = NULL; /* Send window update to source peer as receive buffer has changed. */ if (so->so_proto->pr_flags & PR_WANTRCVD && so->so_pcb) @@ -1315,6 +1382,10 @@ somove(struct socket *so, int wait) goto release; } so->so_splicelen += len; + + /* Move several packets if possible. 
*/ + if (!maxreached && so->so_rcv.sb_mb) + goto nextpkt; release: sosp->so_state &= ~SS_ISSENDING; Index: netinet/in_proto.c =================================================================== RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v retrieving revision 1.57 diff -u -p -r1.57 in_proto.c --- netinet/in_proto.c 15 Oct 2012 11:11:32 -0000 1.57 +++ netinet/in_proto.c 4 Jan 2013 12:30:14 -0000 @@ -184,7 +184,7 @@ struct protosw inetsw[] = { 0, ip_init, 0, ip_slowtimo, ip_drain, ip_sysctl }, -{ SOCK_DGRAM, &inetdomain, IPPROTO_UDP, PR_ATOMIC|PR_ADDR, +{ SOCK_DGRAM, &inetdomain, IPPROTO_UDP, PR_ATOMIC|PR_ADDR|PR_SPLICE, udp_input, 0, udp_ctlinput, ip_ctloutput, udp_usrreq, udp_init, 0, 0, 0, udp_sysctl Index: netinet6/in6_proto.c =================================================================== RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v retrieving revision 1.63 diff -u -p -r1.63 in6_proto.c --- netinet6/in6_proto.c 15 Mar 2012 14:11:57 -0000 1.63 +++ netinet6/in6_proto.c 4 Jan 2013 12:30:14 -0000 @@ -132,7 +132,7 @@ struct ip6protosw inet6sw[] = { ip6_init, 0, frag6_slowtimo, frag6_drain, ip6_sysctl, }, -{ SOCK_DGRAM, &inet6domain, IPPROTO_UDP, PR_ATOMIC|PR_ADDR, +{ SOCK_DGRAM, &inet6domain, IPPROTO_UDP, PR_ATOMIC|PR_ADDR|PR_SPLICE, udp6_input, 0, udp6_ctlinput, ip6_ctloutput, udp_usrreq, 0, 0, 0, 0,