On Thu, Nov 10, 2016 at 08:44:32PM -0800, John Fastabend wrote:
> Signed-off-by: John Fastabend <john.r.fastab...@intel.com>

This will naturally reduce the cache line bounce
costs, but so will a _many API for ptr-ring
that does lock, add many, unlock.

The number of atomics also scales better with the lock:
one per push instead of one per queue.

Also, when can qdisc use a _many operation?


> ---
>  include/linux/ptr_ring_ll.h |   22 ++++++++++++++++------
>  include/linux/skb_array.h   |   11 +++++++++--
>  net/sched/sch_generic.c     |    2 +-
>  3 files changed, 26 insertions(+), 9 deletions(-)
> 
> diff --git a/include/linux/ptr_ring_ll.h b/include/linux/ptr_ring_ll.h
> index bcb11f3..5dc25f7 100644
> --- a/include/linux/ptr_ring_ll.h
> +++ b/include/linux/ptr_ring_ll.h
> @@ -45,9 +45,10 @@ struct ptr_ring_ll {
>  /* Note: callers invoking this in a loop must use a compiler barrier,
>   * for example cpu_relax(). Callers must hold producer_lock.
>   */
> -static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void *ptr)
> +static inline int __ptr_ring_ll_produce_many(struct ptr_ring_ll *r,
> +                                          void **ptr, int num)
>  {
> -     u32 ret, head, tail, next, slots, mask;
> +     u32 ret, head, tail, next, slots, mask, i;
>  
>       do {
>               head = READ_ONCE(r->prod_head);
> @@ -55,21 +56,30 @@ static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void *ptr)
>               tail = READ_ONCE(r->cons_tail);
>  
>               slots = mask + tail - head;
> -             if (slots < 1)
> +             if (slots < num)
> +                     num = slots;
> +
> +             if (unlikely(!num))
>                       return -ENOMEM;
>  
> -             next = head + 1;
> +             next = head + num;
>               ret = cmpxchg(&r->prod_head, head, next);
>       } while (ret != head);
>  
> -     r->queue[head & mask] = ptr;
> +     for (i = 0; i < num; i++)
> +             r->queue[(head + i) & mask] = ptr[i];
>       smp_wmb();
>  
>       while (r->prod_tail != head)
>               cpu_relax();
>  
>       r->prod_tail = next;
> -     return 0;
> +     return num;
> +}
> +
> +static inline int __ptr_ring_ll_produce(struct ptr_ring_ll *r, void **ptr)
> +{
> +     return __ptr_ring_ll_produce_many(r, ptr, 1);
>  }
>  
>  static inline void *__ptr_ring_ll_consume(struct ptr_ring_ll *r)
> diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
> index 9b43dfd..de3c700 100644
> --- a/include/linux/skb_array.h
> +++ b/include/linux/skb_array.h
> @@ -48,9 +48,16 @@ static inline bool skb_array_full(struct skb_array *a)
>       return ptr_ring_full(&a->ring);
>  }
>  
> -static inline int skb_array_ll_produce(struct skb_array_ll *a, struct sk_buff *skb)
> +static inline int skb_array_ll_produce_many(struct skb_array_ll *a,
> +                                         struct sk_buff **skb, int num)
>  {
> -     return __ptr_ring_ll_produce(&a->ring, skb);
> +     return __ptr_ring_ll_produce_many(&a->ring, (void **)skb, num);
> +}
> +
> +static inline int skb_array_ll_produce(struct skb_array_ll *a,
> +                                    struct sk_buff **skb)
> +{
> +     return __ptr_ring_ll_produce(&a->ring, (void **)skb);
>  }
>  
>  static inline int skb_array_produce(struct skb_array *a, struct sk_buff *skb)
> diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
> index 4648ec8..58f2011 100644
> --- a/net/sched/sch_generic.c
> +++ b/net/sched/sch_generic.c
> @@ -571,7 +571,7 @@ static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc *qdisc,
>       struct skb_array_ll *q = band2list(priv, band);
>       int err;
>  
> -     err = skb_array_ll_produce(q, skb);
> +     err = skb_array_ll_produce(q, &skb);
>  
>       if (unlikely(err)) {
>               net_warn_ratelimited("drop a packet from fast enqueue\n");

I don't see a pop many operation here.

-- 
MST

Reply via email to