On Mon, May 16, 2016 at 09:17:01AM +0800, Jason Wang wrote:
> We used to queue tx packets in sk_receive_queue, this is less
> efficient since it requires spinlocks to synchronize between producer
> and consumer.
> 
> This patch tries to address this by using circular buffer which allows
> lockless synchronization. This is done by switching from
> sk_receive_queue to a tx skb ring with a new flag IFF_TX_RING and when
> this is set:

Why do we need a new flag? Is there a userspace-visible
behaviour change?

> 
> - store pointer to skb in circular buffer in tun_net_xmit(), and read
>   it from the circular buffer in tun_do_read().
> - introduce a new proto_ops peek which could be implemented by
>   specific socket which does not use sk_receive_queue.
> - store skb length in circular buffer too, and implement a lockless
>   peek for tuntap.
> - change vhost_net to use proto_ops->peek() instead
> - new spinlocks were introduced to synchronize among producers (and so
>   did for consumers).
> 
> Pktgen test shows about 9% improvement on guest receiving pps:
> 
> Before: ~1480000pps
> After : ~1610000pps
> 
> (I'm not sure noblocking read is still needed, so it was not included
> in this patch)

How do you mean? Of course we must support blocking and non-blocking
read - userspace uses it.

> Signed-off-by: Jason Wang <jasow...@redhat.com>
> ---
> ---
>  drivers/net/tun.c           | 157 +++++++++++++++++++++++++++++++++++++++++---
>  drivers/vhost/net.c         |  16 ++++-
>  include/linux/net.h         |   1 +
>  include/uapi/linux/if_tun.h |   1 +
>  4 files changed, 165 insertions(+), 10 deletions(-)
> 
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index 425e983..6001ece 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -71,6 +71,7 @@
>  #include <net/sock.h>
>  #include <linux/seq_file.h>
>  #include <linux/uio.h>
> +#include <linux/circ_buf.h>
>  
>  #include <asm/uaccess.h>
>  
> @@ -130,6 +131,8 @@ struct tap_filter {
>  #define MAX_TAP_FLOWS  4096
>  
>  #define TUN_FLOW_EXPIRE (3 * HZ)
> +#define TUN_RING_SIZE 256

Can we resize this according to tx_queue_len set by user?

> +#define TUN_RING_MASK (TUN_RING_SIZE - 1)
>  
>  struct tun_pcpu_stats {
>  	u64 rx_packets;
> @@ -142,6 +145,11 @@ struct tun_pcpu_stats {
>  	u32 rx_frame_errors;
>  };
>  
> +struct tun_desc {
> +	struct sk_buff *skb;
> +	int len; /* Cached skb len for peeking */
> +};
> +
>  /* A tun_file connects an open character device to a tuntap netdevice. It
>   * also contains all socket related structures (except sock_fprog and tap_filter)
>   * to serve as one transmit queue for tuntap device. The sock_fprog and
> @@ -167,6 +175,13 @@ struct tun_file {
>  	};
>  	struct list_head next;
>  	struct tun_struct *detached;
> +	/* reader lock */
> +	spinlock_t rlock;
> +	unsigned long tail;
> +	struct tun_desc tx_descs[TUN_RING_SIZE];
> +	/* writer lock */
> +	spinlock_t wlock;
> +	unsigned long head;
>  };
>  
>  struct tun_flow_entry {
> @@ -515,7 +530,27 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
>  
>  static void tun_queue_purge(struct tun_file *tfile)
>  {
> +	unsigned long head, tail;
> +	struct tun_desc *desc;
> +	struct sk_buff *skb;
>  	skb_queue_purge(&tfile->sk.sk_receive_queue);
> +	spin_lock(&tfile->rlock);
> +
> +	head = ACCESS_ONCE(tfile->head);
> +	tail = tfile->tail;
> +
> +	/* read tail before reading descriptor at tail */
> +	smp_rmb();

I think you mean read *head* here

> +
> +	while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
> +		desc = &tfile->tx_descs[tail];
> +		skb = desc->skb;
> +		kfree_skb(skb);
> +		tail = (tail + 1) & TUN_RING_MASK;
> +		/* read descriptor before incrementing tail. */
> +		smp_store_release(&tfile->tail, tail & TUN_RING_MASK);
> +	}
> +	spin_unlock(&tfile->rlock);
>  	skb_queue_purge(&tfile->sk.sk_error_queue);
>  }
>  

Barrier pairing seems messed up. Could you tag each barrier with its
pair pls? E.g. add /* Barrier A for pairing */ before both the barrier
and its pair.
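
To make this concrete, here is roughly what I have in mind for the
purge loop - untested, just to illustrate the tagging, reusing your
field names:

	spin_lock(&tfile->rlock);

	/* Barrier A: pairs with the smp_store_release() of tfile->head
	 * in tun_net_xmit() - only read a descriptor after the producer
	 * has published it.
	 */
	head = smp_load_acquire(&tfile->head);
	tail = tfile->tail;

	while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
		kfree_skb(tfile->tx_descs[tail].skb);
		tail = (tail + 1) & TUN_RING_MASK;
		/* Barrier B: pairs with the acquire of tfile->tail in
		 * tun_net_xmit() - finish reading the descriptor before
		 * handing the slot back to the producer.
		 */
		smp_store_release(&tfile->tail, tail);
	}

	spin_unlock(&tfile->rlock);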

> @@ -824,6 +859,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>  	int txq = skb->queue_mapping;
>  	struct tun_file *tfile;
>  	u32 numqueues = 0;
> +	unsigned long flags;
>  
>  	rcu_read_lock();
>  	tfile = rcu_dereference(tun->tfiles[txq]);
> @@ -888,8 +924,35 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>  
>  	nf_reset(skb);
>  
> -	/* Enqueue packet */
> -	skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
> +	if (tun->flags & IFF_TX_RING) {
> +		unsigned long head, tail;
> +
> +		spin_lock_irqsave(&tfile->wlock, flags);
> +
> +		head = tfile->head;
> +		tail = ACCESS_ONCE(tfile->tail);

This should be an acquire.

> +
> +		if (CIRC_SPACE(head, tail, TUN_RING_SIZE) >= 1) {
> +			struct tun_desc *desc = &tfile->tx_descs[head];
> +
> +			desc->skb = skb;
> +			desc->len = skb->len;
> +			if (skb_vlan_tag_present(skb))
> +				desc->len += VLAN_HLEN;
> +
> +			/* read descriptor before incrementing head. */

This is a descriptor *write*, not a read - so the comment is wrong,
and smp_wmb() is enough.

> +			smp_store_release(&tfile->head,
> +					  (head + 1) & TUN_RING_MASK);
> +		} else {
> +			spin_unlock_irqrestore(&tfile->wlock, flags);
> +			goto drop;
> +		}
> +
> +		spin_unlock_irqrestore(&tfile->wlock, flags);
> +	} else {
> +		/* Enqueue packet */
> +		skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
> +	}
>  
>  	/* Notify and wake up reader process */
>  	if (tfile->flags & TUN_FASYNC)
> @@ -1085,6 +1148,13 @@ static void tun_net_init(struct net_device *dev)
>  	}
>  }
>  
> +static bool tun_queue_not_empty(struct tun_file *tfile)
> +{
> +	struct sock *sk = tfile->socket.sk;
> +
> +	return (!skb_queue_empty(&sk->sk_receive_queue) ||
> +		ACCESS_ONCE(tfile->head) != ACCESS_ONCE(tfile->tail));
> +}
>  /* Character device part */
>  
>  /* Poll */
> @@ -1104,7 +1174,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
>  
>  	poll_wait(file, sk_sleep(sk), wait);
>  
> -	if (!skb_queue_empty(&sk->sk_receive_queue))
> +	if (tun_queue_not_empty(tfile))
>  		mask |= POLLIN | POLLRDNORM;
>  
>  	if (sock_writeable(sk) ||
> @@ -1494,11 +1564,36 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
>  	if (tun->dev->reg_state != NETREG_REGISTERED)
>  		return -EIO;
>  
> -	/* Read frames from queue */
> -	skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0,
> -				  &peeked, &off, &err);
> -	if (!skb)
> -		return err;
> +	if (tun->flags & IFF_TX_RING) {
> +		unsigned long head, tail;
> +		struct tun_desc *desc;
> +
> +		spin_lock(&tfile->rlock);
> +		head = ACCESS_ONCE(tfile->head);
> +		tail = tfile->tail;
> +
> +		if (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
> +			/* read tail before reading descriptor at tail */
> +			smp_rmb();
> +			desc = &tfile->tx_descs[tail];
> +			skb = desc->skb;
> +			/* read descriptor before incrementing tail. */
> +			smp_store_release(&tfile->tail,
> +					  (tail + 1) & TUN_RING_MASK);

Same comments as for purge - also, pls order the code consistently.
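
I.e. I'd expect the ring read here to mirror the purge loop above; an
untested sketch:

		head = smp_load_acquire(&tfile->head); /* pairs with release of head in tun_net_xmit() */
		tail = tfile->tail;

		if (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
			desc = &tfile->tx_descs[tail];
			skb = desc->skb;
			/* pairs with the acquire of tail in tun_net_xmit() */
			smp_store_release(&tfile->tail,
					  (tail + 1) & TUN_RING_MASK);
		}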

> +		} else {
> +			spin_unlock(&tfile->rlock);
> +			return -EAGAIN;
> +		}
> +
> +		spin_unlock(&tfile->rlock);
> +	} else {
> +		/* Read frames from queue */
> +		skb = __skb_recv_datagram(tfile->socket.sk,
> +					  noblock ? MSG_DONTWAIT : 0,
> +					  &peeked, &off, &err);
> +		if (!skb)
> +			return err;
> +	}
>  
>  	ret = tun_put_user(tun, tfile, skb, to);
>  	if (unlikely(ret < 0))
> @@ -1629,8 +1724,47 @@ out:
>  	return ret;
>  }
>  
> +static int tun_peek(struct socket *sock, bool exact)
> +{
> +	struct tun_file *tfile = container_of(sock, struct tun_file, socket);
> +	struct sock *sk = sock->sk;
> +	struct tun_struct *tun;
> +	int ret = 0;
> +
> +	if (!exact)
> +		return tun_queue_not_empty(tfile);
> +
> +	tun = __tun_get(tfile);
> +	if (!tun)
> +		return 0;
> +
> +	if (tun->flags & IFF_TX_RING) {
> +		unsigned long head = ACCESS_ONCE(tfile->head);
> +		unsigned long tail = ACCESS_ONCE(tfile->tail);
> +
> +		if (head != tail)
> +			ret = tfile->tx_descs[tail].len;

no ordering at all here?
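
At a minimum I'd expect an acquire on the head load, so the length
cannot be read before the producer has published the descriptor.
Untested sketch:

		unsigned long head = smp_load_acquire(&tfile->head);
		unsigned long tail = ACCESS_ONCE(tfile->tail);

		/* the acquire pairs with the smp_store_release() of
		 * tfile->head in tun_net_xmit(): desc->len must not be
		 * read before the descriptor has been published.
		 */
		if (head != tail)
			ret = tfile->tx_descs[tail].len;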

> +	} else {
> +		struct sk_buff *head;
> +		unsigned long flags;
> +
> +		spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
> +		head = skb_peek(&sk->sk_receive_queue);
> +		if (likely(head)) {
> +			ret = head->len;
> +			if (skb_vlan_tag_present(head))
> +				ret += VLAN_HLEN;
> +		}
> +		spin_unlock_irqrestore(&sk->sk_receive_queue.lock, flags);
> +	}
> +
> +	tun_put(tun);
> +	return ret;
> +}
> +
>  /* Ops structure to mimic raw sockets with tun */
>  static const struct proto_ops tun_socket_ops = {
> +	.peek = tun_peek,
>  	.sendmsg = tun_sendmsg,
>  	.recvmsg = tun_recvmsg,
>  };
> @@ -2306,13 +2440,13 @@ static int tun_chr_open(struct inode *inode, struct file * file)
>  {
>  	struct net *net = current->nsproxy->net_ns;
>  	struct tun_file *tfile;
> -
>  	DBG1(KERN_INFO, "tunX: tun_chr_open\n");
>  
>  	tfile = (struct tun_file *)sk_alloc(net, AF_UNSPEC, GFP_KERNEL,
>  					    &tun_proto, 0);
>  	if (!tfile)
>  		return -ENOMEM;
> +
>  	RCU_INIT_POINTER(tfile->tun, NULL);
>  	tfile->flags = 0;
>  	tfile->ifindex = 0;
> @@ -2332,6 +2466,11 @@ static int tun_chr_open(struct inode *inode, struct file * file)
>  	INIT_LIST_HEAD(&tfile->next);
>  
>  	sock_set_flag(&tfile->sk, SOCK_ZEROCOPY);
> +	tfile->head = 0;
> +	tfile->tail = 0;
> +
> +	spin_lock_init(&tfile->rlock);
> +	spin_lock_init(&tfile->wlock);
>  
>  	return 0;
>  }
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index f744eeb..10ff494 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -455,10 +455,14 @@ out:
>  
>  static int peek_head_len(struct sock *sk)
>  {
> +	struct socket *sock = sk->sk_socket;
>  	struct sk_buff *head;
>  	int len = 0;
>  	unsigned long flags;
>  
> +	if (sock->ops->peek)
> +		return sock->ops->peek(sock, true);
> +
>  	spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
>  	head = skb_peek(&sk->sk_receive_queue);
>  	if (likely(head)) {
> @@ -471,6 +475,16 @@ static int peek_head_len(struct sock *sk)
>  	return len;
>  }
>  
> +static int sk_has_rx_data(struct sock *sk)
> +{
> +	struct socket *sock = sk->sk_socket;
> +
> +	if (sock->ops->peek)
> +		return sock->ops->peek(sock, false);
> +
> +	return skb_queue_empty(&sk->sk_receive_queue);
> +}
> +
>  static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>  {
>  	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
> @@ -487,7 +501,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>  	endtime = busy_clock() + vq->busyloop_timeout;
>  
>  	while (vhost_can_busy_poll(&net->dev, endtime) &&
> -	       skb_queue_empty(&sk->sk_receive_queue) &&
> +	       !sk_has_rx_data(sk) &&
>  	       vhost_vq_avail_empty(&net->dev, vq))
>  		cpu_relax_lowlatency();
>  
> diff --git a/include/linux/net.h b/include/linux/net.h
> index 72c1e06..3c4ecd5 100644
> --- a/include/linux/net.h
> +++ b/include/linux/net.h
> @@ -132,6 +132,7 @@ struct module;
>  struct proto_ops {
>  	int		family;
>  	struct module	*owner;
> +	int		(*peek)      (struct socket *sock, bool exact);
>  	int		(*release)   (struct socket *sock);
>  	int		(*bind)	     (struct socket *sock,
>  				      struct sockaddr *myaddr,
> diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h
> index 3cb5e1d..d64ddc1 100644
> --- a/include/uapi/linux/if_tun.h
> +++ b/include/uapi/linux/if_tun.h
> @@ -61,6 +61,7 @@
>  #define IFF_TUN		0x0001
>  #define IFF_TAP		0x0002
>  #define IFF_NO_PI	0x1000
> +#define IFF_TX_RING	0x0010
>  /* This flag has no real effect */
>  #define IFF_ONE_QUEUE	0x2000
>  #define IFF_VNET_HDR	0x4000
> -- 
> 2.7.4