Introduce tun_ring_consume() that wraps ptr_ring_consume() and calls __tun_wake_queue(). The latter wakes the stopped netdev subqueue once half of the ring capacity has been consumed, tracked via the new cons_cnt field in tun_file. As a safety net, the queue is also woken on the last consumed entry if it leaves the ring empty. The point is to allow the queue to be stopped when it gets full, which is required for traffic shaping - implemented by the following patch, "avoid ptr_ring tail-drop when a qdisc is present".
Some implementation details: - tun_ring_recv() replaces ptr_ring_consume() with tun_ring_consume() to properly wake the queue on purge. - tun_queue_purge() also replaces ptr_ring_consume() with tun_ring_consume(). - __tun_detach() locks the tx_ring.consumer_lock to avoid races with the consumer on the queue_index. - Reset cons_cnt in tun_attach() so the half-ring wake threshold is valid for the new ring size after ptr_ring_resize(). - The upcoming patch explains the pairing of the smp_mb() of __tun_wake_queue(). - tun_queue_resize() wakes all queues after resizing with the proper tx_ring.consumer_lock and resets the cons_cnt to avoid a possible stale queue. Without the corresponding queue stopping, this patch alone causes no regression for a tap setup sending to a qemu VM: 1.132 Mpps to 1.134 Mpps. Details: AMD Ryzen 5 5600X at 4.3 GHz, 3200 MHz RAM, isolated QEMU threads, pktgen sender; Avg over 50 runs @ 100,000,000 packets; SRSO and spectre v2 mitigations disabled. Co-developed-by: Tim Gebauer <[email protected]> Signed-off-by: Tim Gebauer <[email protected]> Signed-off-by: Simon Schippers <[email protected]> --- drivers/net/tun.c | 73 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 64 insertions(+), 9 deletions(-) diff --git a/drivers/net/tun.c b/drivers/net/tun.c index b183189f1853..b24cc899a890 100644 --- a/drivers/net/tun.c +++ b/drivers/net/tun.c @@ -145,6 +145,8 @@ struct tun_file { struct list_head next; struct tun_struct *detached; struct ptr_ring tx_ring; + /* Protected by tx_ring.consumer_lock */ + int cons_cnt; struct xdp_rxq_info xdp_rxq; }; @@ -557,11 +559,43 @@ void tun_ptr_free(void *ptr) } EXPORT_SYMBOL_GPL(tun_ptr_free); -static void tun_queue_purge(struct tun_file *tfile) +/* Callers must hold ring.consumer_lock */ +static void __tun_wake_queue(struct tun_struct *tun, + struct tun_file *tfile, int consumed) +{ + struct netdev_queue *txq = netdev_get_tx_queue(tun->dev, + tfile->queue_index); + + /* Paired with 
smp_mb__after_atomic() in tun_net_xmit() */ + smp_mb(); + if (netif_tx_queue_stopped(txq)) { + tfile->cons_cnt += consumed; + if (tfile->cons_cnt >= tfile->tx_ring.size / 2 || + __ptr_ring_empty(&tfile->tx_ring)) { + netif_tx_wake_queue(txq); + tfile->cons_cnt = 0; + } + } +} + +static void *tun_ring_consume(struct tun_struct *tun, struct tun_file *tfile) +{ + void *ptr; + + spin_lock(&tfile->tx_ring.consumer_lock); + ptr = __ptr_ring_consume(&tfile->tx_ring); + if (ptr) + __tun_wake_queue(tun, tfile, 1); + + spin_unlock(&tfile->tx_ring.consumer_lock); + return ptr; +} + +static void tun_queue_purge(struct tun_struct *tun, struct tun_file *tfile) { void *ptr; - while ((ptr = ptr_ring_consume(&tfile->tx_ring)) != NULL) + while ((ptr = tun_ring_consume(tun, tfile)) != NULL) tun_ptr_free(ptr); skb_queue_purge(&tfile->sk.sk_write_queue); @@ -588,8 +622,10 @@ static void __tun_detach(struct tun_file *tfile, bool clean) rcu_assign_pointer(tun->tfiles[index], tun->tfiles[tun->numqueues - 1]); ntfile = rtnl_dereference(tun->tfiles[index]); + spin_lock(&ntfile->tx_ring.consumer_lock); ntfile->queue_index = index; ntfile->xdp_rxq.queue_index = index; + spin_unlock(&ntfile->tx_ring.consumer_lock); rcu_assign_pointer(tun->tfiles[tun->numqueues - 1], NULL); @@ -605,7 +641,7 @@ static void __tun_detach(struct tun_file *tfile, bool clean) synchronize_net(); tun_flow_delete_by_queue(tun, tun->numqueues + 1); /* Drop read queue */ - tun_queue_purge(tfile); + tun_queue_purge(tun, tfile); tun_set_real_num_queues(tun); } else if (tfile->detached && clean) { tun = tun_enable_queue(tfile); @@ -670,14 +706,14 @@ static void tun_detach_all(struct net_device *dev) tfile = rtnl_dereference(tun->tfiles[i]); tun_napi_del(tfile); /* Drop read queue */ - tun_queue_purge(tfile); + tun_queue_purge(tun, tfile); xdp_rxq_info_unreg(&tfile->xdp_rxq); sock_put(&tfile->sk); } list_for_each_entry_safe(tfile, tmp, &tun->disabled, next) { tun_napi_del(tfile); tun_enable_queue(tfile); - 
tun_queue_purge(tfile); + tun_queue_purge(tun, tfile); xdp_rxq_info_unreg(&tfile->xdp_rxq); sock_put(&tfile->sk); } @@ -687,6 +723,13 @@ static void tun_detach_all(struct net_device *dev) module_put(THIS_MODULE); } +static void tun_reset_cons_cnt(struct tun_file *tfile) +{ + spin_lock(&tfile->tx_ring.consumer_lock); + tfile->cons_cnt = 0; + spin_unlock(&tfile->tx_ring.consumer_lock); +} + static int tun_attach(struct tun_struct *tun, struct file *file, bool skip_filter, bool napi, bool napi_frags, bool publish_tun) @@ -730,6 +773,7 @@ static int tun_attach(struct tun_struct *tun, struct file *file, goto out; } + tun_reset_cons_cnt(tfile); tfile->queue_index = tun->numqueues; tfile->socket.sk->sk_shutdown &= ~RCV_SHUTDOWN; @@ -2115,13 +2159,14 @@ static ssize_t tun_put_user(struct tun_struct *tun, return total; } -static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err) +static void *tun_ring_recv(struct tun_struct *tun, struct tun_file *tfile, + int noblock, int *err) { DECLARE_WAITQUEUE(wait, current); void *ptr = NULL; int error = 0; - ptr = ptr_ring_consume(&tfile->tx_ring); + ptr = tun_ring_consume(tun, tfile); if (ptr) goto out; if (noblock) { @@ -2133,7 +2178,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err) while (1) { set_current_state(TASK_INTERRUPTIBLE); - ptr = ptr_ring_consume(&tfile->tx_ring); + ptr = tun_ring_consume(tun, tfile); if (ptr) break; if (signal_pending(current)) { @@ -2170,7 +2215,7 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile, if (!ptr) { /* Read frames from ring */ - ptr = tun_ring_recv(tfile, noblock, &err); + ptr = tun_ring_recv(tun, tfile, noblock, &err); if (!ptr) return err; } @@ -3622,6 +3667,16 @@ static int tun_queue_resize(struct tun_struct *tun) dev->tx_queue_len, GFP_KERNEL, tun_ptr_free); + if (!ret) { + for (i = 0; i < tun->numqueues; i++) { + tfile = rtnl_dereference(tun->tfiles[i]); + spin_lock(&tfile->tx_ring.consumer_lock); + 
netif_wake_subqueue(tun->dev, tfile->queue_index); + tfile->cons_cnt = 0; + spin_unlock(&tfile->tx_ring.consumer_lock); + } + } + kfree(rings); return ret; } -- 2.43.0

