Hi,

Here is my kernel diff to extend socket splicing to UDP.  The
advantage for relaying applications is that they can forward the
data without copying it to user space.  Currently, relayd uses socket
splicing for TCP connections only.

The idea of my implementation is to merge the code relevant for UDP
from sosend() and soreceive() into somove().  That allows the kernel
to directly transfer the UDP data from one socket to another.

I have written a bunch of tests in /usr/src/regress/sys/kern/sosplice/udp/.
Performance measurements and a real-life relaying daemon are on my
to-do list.

bluhm

Index: kern/uipc_socket.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket.c,v
retrieving revision 1.110
diff -u -p -r1.110 uipc_socket.c
--- kern/uipc_socket.c  31 Dec 2012 13:46:49 -0000      1.110
+++ kern/uipc_socket.c  4 Jan 2013 17:19:51 -0000
@@ -1042,7 +1042,8 @@ sosplice(struct socket *so, int fd, off_
                return (EPROTONOSUPPORT);
        if (so->so_options & SO_ACCEPTCONN)
                return (EOPNOTSUPP);
-       if ((so->so_state & (SS_ISCONNECTED|SS_ISCONNECTING)) == 0)
+       if ((so->so_state & (SS_ISCONNECTED|SS_ISCONNECTING)) == 0 &&
+           (so->so_proto->pr_flags & PR_CONNREQUIRED))
                return (ENOTCONN);
 
        /* If no fd is given, unsplice by removing existing link. */
@@ -1070,6 +1071,10 @@ sosplice(struct socket *so, int fd, off_
                return (error);
        sosp = fp->f_data;
 
+       if (so->so_state & SS_ISCONFIRMING)
+               (*so->so_proto->pr_usrreq)(so, PRU_RCVD, NULL, NULL, NULL,
+                   curproc);
+
        /* Lock both receive and send buffer. */
        if ((error = sblock(&so->so_rcv,
            (so->so_state & SS_NBIO) ? M_NOWAIT : M_WAITOK)) != 0) {
@@ -1159,6 +1164,7 @@ somove(struct socket *so, int wait)
 
        splsoftassert(IPL_SOFTNET);
 
+ nextpkt:
        if (so->so_error) {
                error = so->so_error;
                goto release;
@@ -1175,9 +1181,7 @@ somove(struct socket *so, int wait)
                goto release;
 
        /* Calculate how many bytes can be copied now. */
-       len = so->so_rcv.sb_cc;
-       if (len == 0)
-               goto release;
+       len = so->so_rcv.sb_datacc;
        if (so->so_splicemax) {
                KASSERT(so->so_splicelen < so->so_splicemax);
                if (so->so_splicemax <= so->so_splicelen + len) {
@@ -1201,13 +1205,78 @@ somove(struct socket *so, int wait)
        }
        sosp->so_state |= SS_ISSENDING;
 
-       /* Take at most len mbufs out of receive buffer. */
+       SBLASTRECORDCHK(&so->so_rcv, "somove 1");
+       SBLASTMBUFCHK(&so->so_rcv, "somove 1");
        m = so->so_rcv.sb_mb;
+       if (m == NULL)
+               goto release;
        nextrecord = m->m_nextpkt;
-       for (off = 0, mp = &m; off < len;
+
+       /* Drop address and control information not used with splicing. */
+       if (so->so_proto->pr_flags & PR_ADDR) {
+#ifdef DIAGNOSTIC
+               if (m->m_type != MT_SONAME)
+                       panic("somove soname");
+#endif
+               m = m->m_next;
+       }
+       while (m && m->m_type == MT_CONTROL) {
+               m = m->m_next;
+       }
+       if (m == NULL) {
+               sbdroprecord(&so->so_rcv);
+               if (so->so_proto->pr_flags & PR_WANTRCVD && so->so_pcb)
+                       (so->so_proto->pr_usrreq)(so, PRU_RCVD, NULL,
+                           (struct mbuf *)0L, NULL, NULL);
+               goto nextpkt;
+       }
+
+       if (so->so_proto->pr_flags & PR_ATOMIC) {
+               if ((m->m_flags & M_PKTHDR) == 0)
+                       panic("somove pkthdr");
+               if (sosp->so_snd.sb_hiwat < m->m_pkthdr.len) {
+                       error = EMSGSIZE;
+                       goto release;
+               }
+               if (len < m->m_pkthdr.len)
+                       goto release;
+               if (m->m_pkthdr.len < len) {
+                       maxreached = 0;
+                       len = m->m_pkthdr.len;
+               }
+               /*
+                * Throw away the name mbuf after it has been assured
+                * that the whole first record can be processed.
+                */
+               m = so->so_rcv.sb_mb;
+               sbfree(&so->so_rcv, m);
+               MFREE(m, so->so_rcv.sb_mb);
+               sbsync(&so->so_rcv, nextrecord);
+       }
+       /*
+        * Throw away the control mbufs after it has been assured
+        * that the whole first record can be processed.
+        */
+       m = so->so_rcv.sb_mb;
+       while (m && m->m_type == MT_CONTROL) {
+               sbfree(&so->so_rcv, m);
+               MFREE(m, so->so_rcv.sb_mb);
+               m = so->so_rcv.sb_mb;
+               sbsync(&so->so_rcv, nextrecord);
+       }
+
+       SBLASTRECORDCHK(&so->so_rcv, "somove 2");
+       SBLASTMBUFCHK(&so->so_rcv, "somove 2");
+
+       /* Take at most len mbufs out of receive buffer. */
+       for (off = 0, mp = &m; off <= len && *mp;
            off += (*mp)->m_len, mp = &(*mp)->m_next) {
                u_long size = len - off;
 
+#ifdef DIAGNOSTIC
+               if ((*mp)->m_type != MT_DATA && (*mp)->m_type != MT_HEADER)
+                       panic("somove type");
+#endif
                if ((*mp)->m_len > size) {
                        if (!maxreached || (*mp = m_copym(
                            so->so_rcv.sb_mb, 0, size, wait)) == NULL) {
@@ -1227,14 +1296,12 @@ somove(struct socket *so, int wait)
        }
        *mp = NULL;
 
-       SBLASTRECORDCHK(&so->so_rcv, "somove");
-       SBLASTMBUFCHK(&so->so_rcv, "somove");
-       KASSERT(so->so_rcv.sb_mb == so->so_rcv.sb_lastrecord);
+       SBLASTRECORDCHK(&so->so_rcv, "somove 3");
+       SBLASTMBUFCHK(&so->so_rcv, "somove 3");
        SBCHECK(&so->so_rcv);
-
-       /* m might be NULL if the loop did break during the first iteration. */
        if (m == NULL)
                goto release;
+       m->m_nextpkt = NULL;
 
        /* Send window update to source peer as receive buffer has changed. */
        if (so->so_proto->pr_flags & PR_WANTRCVD && so->so_pcb)
@@ -1315,6 +1382,10 @@ somove(struct socket *so, int wait)
                goto release;
        }
        so->so_splicelen += len;
+
+       /* Move several packets if possible. */
+       if (!maxreached && so->so_rcv.sb_mb)
+               goto nextpkt;
 
  release:
        sosp->so_state &= ~SS_ISSENDING;
Index: netinet/in_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
retrieving revision 1.57
diff -u -p -r1.57 in_proto.c
--- netinet/in_proto.c  15 Oct 2012 11:11:32 -0000      1.57
+++ netinet/in_proto.c  4 Jan 2013 12:30:14 -0000
@@ -184,7 +184,7 @@ struct protosw inetsw[] = {
   0,
   ip_init,     0,              ip_slowtimo,    ip_drain,       ip_sysctl
 },
-{ SOCK_DGRAM,  &inetdomain,    IPPROTO_UDP,    PR_ATOMIC|PR_ADDR,
+{ SOCK_DGRAM,  &inetdomain,    IPPROTO_UDP,    PR_ATOMIC|PR_ADDR|PR_SPLICE,
   udp_input,   0,              udp_ctlinput,   ip_ctloutput,
   udp_usrreq,
   udp_init,    0,              0,              0,              udp_sysctl
Index: netinet6/in6_proto.c
===================================================================
RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
retrieving revision 1.63
diff -u -p -r1.63 in6_proto.c
--- netinet6/in6_proto.c        15 Mar 2012 14:11:57 -0000      1.63
+++ netinet6/in6_proto.c        4 Jan 2013 12:30:14 -0000
@@ -132,7 +132,7 @@ struct ip6protosw inet6sw[] = {
   ip6_init,    0,              frag6_slowtimo, frag6_drain,
   ip6_sysctl,
 },
-{ SOCK_DGRAM,  &inet6domain,   IPPROTO_UDP,    PR_ATOMIC|PR_ADDR,
+{ SOCK_DGRAM,  &inet6domain,   IPPROTO_UDP,    PR_ATOMIC|PR_ADDR|PR_SPLICE,
   udp6_input,  0,              udp6_ctlinput,  ip6_ctloutput,
   udp_usrreq,  0,
   0,           0,              0,

Reply via email to