On Mon, 15 Mar 2021 11:10:18 +0800 Yunsheng Lin wrote:
> @@ -606,6 +623,11 @@ static const u8 prio2band[TC_PRIO_MAX + 1] = {
> */
> struct pfifo_fast_priv {
> struct skb_array q[PFIFO_FAST_BANDS];
> +
> + /* protect against data race between enqueue/dequeue and
> + * qdisc->empty setting
> + */
> + spinlock_t lock;
> };
>
> static inline struct skb_array *band2list(struct pfifo_fast_priv *priv,
> @@ -623,7 +645,10 @@ static int pfifo_fast_enqueue(struct sk_buff *skb,
> struct Qdisc *qdisc,
> unsigned int pkt_len = qdisc_pkt_len(skb);
> int err;
>
> - err = skb_array_produce(q, skb);
> + spin_lock(&priv->lock);
> + err = __ptr_ring_produce(&q->ring, skb);
> + WRITE_ONCE(qdisc->empty, false);
> + spin_unlock(&priv->lock);
>
> if (unlikely(err)) {
> if (qdisc_is_percpu_stats(qdisc))
> @@ -642,6 +667,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc
> *qdisc)
> struct sk_buff *skb = NULL;
> int band;
>
> + spin_lock(&priv->lock);
> for (band = 0; band < PFIFO_FAST_BANDS && !skb; band++) {
> struct skb_array *q = band2list(priv, band);
>
> @@ -655,6 +681,7 @@ static struct sk_buff *pfifo_fast_dequeue(struct Qdisc
> *qdisc)
> } else {
> WRITE_ONCE(qdisc->empty, true);
> }
> + spin_unlock(&priv->lock);
>
> return skb;
> }
I thought pfifo was supposed to be "lockless" and this change
re-introduces a lock between producer and consumer, no?