Hi Bruce, Konstantin, Stephen,
        I would appreciate it if you could provide feedback on this patch.

Thanks,
Honnappa

> -----Original Message-----
> From: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> Sent: Tuesday, October 8, 2019 9:47 PM
> To: olivier.m...@6wind.com; sthem...@microsoft.com; jer...@marvell.com;
> bruce.richard...@intel.com; david.march...@redhat.com;
> pbhagavat...@marvell.com; konstantin.anan...@intel.com; Honnappa
> Nagarahalli <honnappa.nagaraha...@arm.com>
> Cc: dev@dpdk.org; Dharmik Thakkar <dharmik.thak...@arm.com>; Ruifeng
> Wang (Arm Technology China) <ruifeng.w...@arm.com>; Gavin Hu (Arm
> Technology China) <gavin...@arm.com>
> Subject: [PATCH v4 1/2] lib/ring: apis to support configurable element size
> 
> Current APIs assume ring elements to be pointers. However, in many use cases,
> the size can be different. Add new APIs to support configurable ring element
> sizes.
> 
> Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> Reviewed-by: Dharmik Thakkar <dharmik.thak...@arm.com>
> Reviewed-by: Gavin Hu <gavin...@arm.com>
> Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com>
> ---
>  lib/librte_ring/Makefile             |   3 +-
>  lib/librte_ring/meson.build          |   3 +
>  lib/librte_ring/rte_ring.c           |  45 +-
>  lib/librte_ring/rte_ring.h           |   1 +
>  lib/librte_ring/rte_ring_elem.h      | 946 +++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring_version.map |   2 +
>  6 files changed, 991 insertions(+), 9 deletions(-)
>  create mode 100644 lib/librte_ring/rte_ring_elem.h
> 
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index
> 21a36770d..515a967bb 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk  # library name  LIB =
> librte_ring.a
> 
> -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -
> DALLOW_EXPERIMENTAL_API
>  LDLIBS += -lrte_eal
> 
>  EXPORT_MAP := rte_ring_version.map
> @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c
> 
>  # install includes
>  SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
> +                                     rte_ring_elem.h \
>                                       rte_ring_generic.h \
>                                       rte_ring_c11_mem.h
> 
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index
> ab8b0b469..74219840a 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -6,3 +6,6 @@ sources = files('rte_ring.c')  headers = files('rte_ring.h',
>               'rte_ring_c11_mem.h',
>               'rte_ring_generic.h')
> +
> +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental
> +allow_experimental_apis = true
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index
> d9b308036..6fed3648b 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -33,6 +33,7 @@
>  #include <rte_tailq.h>
> 
>  #include "rte_ring.h"
> +#include "rte_ring_elem.h"
> 
>  TAILQ_HEAD(rte_ring_list, rte_tailq_entry);
> 
> @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
> 
>  /* return the size of memory occupied by a ring */  ssize_t -
> rte_ring_get_memsize(unsigned count)
> +rte_ring_get_memsize_elem(unsigned count, unsigned esize)
>  {
>       ssize_t sz;
> 
> +     /* Supported esize values are 4/8/16.
> +      * Others can be added on need basis.
> +      */
> +     if ((esize != 4) && (esize != 8) && (esize != 16)) {
> +             RTE_LOG(ERR, RING,
> +                     "Unsupported esize value. Supported values are 4, 8
> and 16\n");
> +
> +             return -EINVAL;
> +     }
> +
>       /* count must be a power of 2 */
>       if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) {
>               RTE_LOG(ERR, RING,
> -                     "Requested size is invalid, must be power of 2, and "
> -                     "do not exceed the size limit %u\n",
> RTE_RING_SZ_MASK);
> +                     "Requested number of elements is invalid, must be "
> +                     "power of 2, and do not exceed the limit %u\n",
> +                     RTE_RING_SZ_MASK);
> +
>               return -EINVAL;
>       }
> 
> -     sz = sizeof(struct rte_ring) + count * sizeof(void *);
> +     sz = sizeof(struct rte_ring) + count * esize;
>       sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
>       return sz;
>  }
> 
> +/* return the size of memory occupied by a ring */ ssize_t
> +rte_ring_get_memsize(unsigned count) {
> +     return rte_ring_get_memsize_elem(count, sizeof(void *)); }
> +
>  void
>  rte_ring_reset(struct rte_ring *r)
>  {
> @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>       return 0;
>  }
> 
> -/* create the ring */
> +/* create the ring for a given element size */
>  struct rte_ring *
> -rte_ring_create(const char *name, unsigned count, int socket_id,
> -             unsigned flags)
> +rte_ring_create_elem(const char *name, unsigned count, unsigned esize,
> +             int socket_id, unsigned flags)
>  {
>       char mz_name[RTE_MEMZONE_NAMESIZE];
>       struct rte_ring *r;
> @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned count,
> int socket_id,
>       if (flags & RING_F_EXACT_SZ)
>               count = rte_align32pow2(count + 1);
> 
> -     ring_size = rte_ring_get_memsize(count);
> +     ring_size = rte_ring_get_memsize_elem(count, esize);
>       if (ring_size < 0) {
>               rte_errno = ring_size;
>               return NULL;
> @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned count,
> int socket_id,
>       return r;
>  }
> 
> +/* create the ring */
> +struct rte_ring *
> +rte_ring_create(const char *name, unsigned count, int socket_id,
> +             unsigned flags)
> +{
> +     return rte_ring_create_elem(name, count, sizeof(void *), socket_id,
> +             flags);
> +}
> +
>  /* free the ring */
>  void
>  rte_ring_free(struct rte_ring *r)
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index
> 2a9f768a1..18fc5d845 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char *name,
> unsigned count,
>   */
>  struct rte_ring *rte_ring_create(const char *name, unsigned count,
>                                int socket_id, unsigned flags);
> +
>  /**
>   * De-allocate all memory used by the ring.
>   *
> diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h
> new file mode 100644 index 000000000..860f059ad
> --- /dev/null
> +++ b/lib/librte_ring/rte_ring_elem.h
> @@ -0,0 +1,946 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + *
> + * Copyright (c) 2019 Arm Limited
> + * Copyright (c) 2010-2017 Intel Corporation
> + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org
> + * All rights reserved.
> + * Derived from FreeBSD's bufring.h
> + * Used as BSD-3 Licensed with permission from Kip Macy.
> + */
> +
> +#ifndef _RTE_RING_ELEM_H_
> +#define _RTE_RING_ELEM_H_
> +
> +/**
> + * @file
> + * RTE Ring with flexible element size
> + */
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +#include <stdio.h>
> +#include <stdint.h>
> +#include <sys/queue.h>
> +#include <errno.h>
> +#include <rte_common.h>
> +#include <rte_config.h>
> +#include <rte_memory.h>
> +#include <rte_lcore.h>
> +#include <rte_atomic.h>
> +#include <rte_branch_prediction.h>
> +#include <rte_memzone.h>
> +#include <rte_pause.h>
> +
> +#include "rte_ring.h"
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Calculate the memory size needed for a ring with given element size
> + *
> + * This function returns the number of bytes needed for a ring, given
> + * the number of elements in it and the size of the element. This value
> + * is the sum of the size of the structure rte_ring and the size of the
> + * memory needed for storing the elements. The value is aligned to a cache
> + * line size.
> + *
> + * @param count
> + *   The number of elements in the ring (must be a power of 2).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported.
> + * @return
> + *   - The memory size needed for the ring on success.
> + *   - -EINVAL if esize is not 4, 8 or 16, or count is not a power of 2.
> + */
> +__rte_experimental
> +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize);
> +
> +/**
> + * @warning
> + * @b EXPERIMENTAL: this API may change without prior notice
> + *
> + * Create a new ring named *name* that stores elements with given size.
> + *
> + * This function uses ``memzone_reserve()`` to allocate memory. Then it
> + * calls rte_ring_init() to initialize an empty ring.
> + *
> + * The new ring size is set to *count*, which must be a power of
> + * two. Water marking is disabled by default. The real usable ring size
> + * is *count-1* instead of *count* to differentiate a full ring from an
> + * empty ring.
> + *
> + * The ring is added in RTE_TAILQ_RING list.
> + *
> + * @param name
> + *   The name of the ring.
> + * @param count
> + *   The number of elements in the ring (must be a power of 2).
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported.
> + * @param socket_id
> + *   The *socket_id* argument is the socket identifier in case of
> + *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
> + *   constraint for the reserved zone.
> + * @param flags
> + *   An OR of the following:
> + *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
> + *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
> + *      is "single-producer". Otherwise, it is "multi-producers".
> + *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
> + *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
> + *      is "single-consumer". Otherwise, it is "multi-consumers".
> + * @return
> + *   On success, the pointer to the new allocated ring. NULL on error with
> + *    rte_errno set appropriately. Possible errno values include:
> + *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
> + *    - E_RTE_SECONDARY - function was called from a secondary process instance
> + *    - EINVAL - esize is not supported or count provided is not a power of 2
> + *    - ENOSPC - the maximum number of memzones has already been allocated
> + *    - EEXIST - a memzone with the same name already exists
> + *    - ENOMEM - no appropriate memory area found in which to create memzone
> + */
> +__rte_experimental
> +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count,
> +                             unsigned esize, int socket_id, unsigned flags);
> +
> +/* the actual enqueue of pointers on the ring.
> + * Placed here since identical code needed in both
> + * single and multi producer enqueue functions.
> + */
> +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table, esize, n)
> do { \
> +     if (esize == 4) \
> +             ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \
> +     else if (esize == 8) \
> +             ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \
> +     else if (esize == 16) \
> +             ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n); \ }
> while
> +(0)
> +
> +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \
> +     unsigned int i; \
> +     const uint32_t size = (r)->size; \
> +     uint32_t idx = prod_head & (r)->mask; \
> +     uint32_t *ring = (uint32_t *)ring_start; \
> +     uint32_t *obj = (uint32_t *)obj_table; \
> +     if (likely(idx + n < size)) { \
> +             for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \
> +                     ring[idx] = obj[i]; \
> +                     ring[idx + 1] = obj[i + 1]; \
> +                     ring[idx + 2] = obj[i + 2]; \
> +                     ring[idx + 3] = obj[i + 3]; \
> +                     ring[idx + 4] = obj[i + 4]; \
> +                     ring[idx + 5] = obj[i + 5]; \
> +                     ring[idx + 6] = obj[i + 6]; \
> +                     ring[idx + 7] = obj[i + 7]; \
> +             } \
> +             switch (n & 0x7) { \
> +             case 7: \
> +                     ring[idx++] = obj[i++]; /* fallthrough */ \
> +             case 6: \
> +                     ring[idx++] = obj[i++]; /* fallthrough */ \
> +             case 5: \
> +                     ring[idx++] = obj[i++]; /* fallthrough */ \
> +             case 4: \
> +                     ring[idx++] = obj[i++]; /* fallthrough */ \
> +             case 3: \
> +                     ring[idx++] = obj[i++]; /* fallthrough */ \
> +             case 2: \
> +                     ring[idx++] = obj[i++]; /* fallthrough */ \
> +             case 1: \
> +                     ring[idx++] = obj[i++]; /* fallthrough */ \
> +             } \
> +     } else { \
> +             for (i = 0; idx < size; i++, idx++)\
> +                     ring[idx] = obj[i]; \
> +             for (idx = 0; i < n; i++, idx++) \
> +                     ring[idx] = obj[i]; \
> +     } \
> +} while (0)
> +
> +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \
> +     unsigned int i; \
> +     const uint32_t size = (r)->size; \
> +     uint32_t idx = prod_head & (r)->mask; \
> +     uint64_t *ring = (uint64_t *)ring_start; \
> +     uint64_t *obj = (uint64_t *)obj_table; \
> +     if (likely(idx + n < size)) { \
> +             for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
> +                     ring[idx] = obj[i]; \
> +                     ring[idx + 1] = obj[i + 1]; \
> +                     ring[idx + 2] = obj[i + 2]; \
> +                     ring[idx + 3] = obj[i + 3]; \
> +             } \
> +             switch (n & 0x3) { \
> +             case 3: \
> +                     ring[idx++] = obj[i++]; /* fallthrough */ \
> +             case 2: \
> +                     ring[idx++] = obj[i++]; /* fallthrough */ \
> +             case 1: \
> +                     ring[idx++] = obj[i++]; \
> +             } \
> +     } else { \
> +             for (i = 0; idx < size; i++, idx++)\
> +                     ring[idx] = obj[i]; \
> +             for (idx = 0; i < n; i++, idx++) \
> +                     ring[idx] = obj[i]; \
> +     } \
> +} while (0)
> +
> +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do { \
> +     unsigned int i; \
> +     const uint32_t size = (r)->size; \
> +     uint32_t idx = prod_head & (r)->mask; \
> +     __uint128_t *ring = (__uint128_t *)ring_start; \
> +     __uint128_t *obj = (__uint128_t *)obj_table; \
> +     if (likely(idx + n < size)) { \
> +             for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> +                     ring[idx] = obj[i]; \
> +                     ring[idx + 1] = obj[i + 1]; \
> +             } \
> +             switch (n & 0x1) { \
> +             case 1: \
> +                     ring[idx++] = obj[i++]; \
> +             } \
> +     } else { \
> +             for (i = 0; idx < size; i++, idx++)\
> +                     ring[idx] = obj[i]; \
> +             for (idx = 0; i < n; i++, idx++) \
> +                     ring[idx] = obj[i]; \
> +     } \
> +} while (0)
> +
> +/* the actual copy of pointers on the ring to obj_table.
> + * Placed here since identical code needed in both
> + * single and multi consumer dequeue functions.
> + */
> +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table, esize, n)
> do { \
> +     if (esize == 4) \
> +             DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \
> +     else if (esize == 8) \
> +             DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \
> +     else if (esize == 16) \
> +             DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n); \ }
> while
> +(0)
> +
> +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \
> +     unsigned int i; \
> +     uint32_t idx = cons_head & (r)->mask; \
> +     const uint32_t size = (r)->size; \
> +     uint32_t *ring = (uint32_t *)ring_start; \
> +     uint32_t *obj = (uint32_t *)obj_table; \
> +     if (likely(idx + n < size)) { \
> +             for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) {\
> +                     obj[i] = ring[idx]; \
> +                     obj[i + 1] = ring[idx + 1]; \
> +                     obj[i + 2] = ring[idx + 2]; \
> +                     obj[i + 3] = ring[idx + 3]; \
> +                     obj[i + 4] = ring[idx + 4]; \
> +                     obj[i + 5] = ring[idx + 5]; \
> +                     obj[i + 6] = ring[idx + 6]; \
> +                     obj[i + 7] = ring[idx + 7]; \
> +             } \
> +             switch (n & 0x7) { \
> +             case 7: \
> +                     obj[i++] = ring[idx++]; /* fallthrough */ \
> +             case 6: \
> +                     obj[i++] = ring[idx++]; /* fallthrough */ \
> +             case 5: \
> +                     obj[i++] = ring[idx++]; /* fallthrough */ \
> +             case 4: \
> +                     obj[i++] = ring[idx++]; /* fallthrough */ \
> +             case 3: \
> +                     obj[i++] = ring[idx++]; /* fallthrough */ \
> +             case 2: \
> +                     obj[i++] = ring[idx++]; /* fallthrough */ \
> +             case 1: \
> +                     obj[i++] = ring[idx++]; /* fallthrough */ \
> +             } \
> +     } else { \
> +             for (i = 0; idx < size; i++, idx++) \
> +                     obj[i] = ring[idx]; \
> +             for (idx = 0; i < n; i++, idx++) \
> +                     obj[i] = ring[idx]; \
> +     } \
> +} while (0)
> +
> +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \
> +     unsigned int i; \
> +     uint32_t idx = cons_head & (r)->mask; \
> +     const uint32_t size = (r)->size; \
> +     uint64_t *ring = (uint64_t *)ring_start; \
> +     uint64_t *obj = (uint64_t *)obj_table; \
> +     if (likely(idx + n < size)) { \
> +             for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) {\
> +                     obj[i] = ring[idx]; \
> +                     obj[i + 1] = ring[idx + 1]; \
> +                     obj[i + 2] = ring[idx + 2]; \
> +                     obj[i + 3] = ring[idx + 3]; \
> +             } \
> +             switch (n & 0x3) { \
> +             case 3: \
> +                     obj[i++] = ring[idx++]; /* fallthrough */ \
> +             case 2: \
> +                     obj[i++] = ring[idx++]; /* fallthrough */ \
> +             case 1: \
> +                     obj[i++] = ring[idx++]; \
> +             } \
> +     } else { \
> +             for (i = 0; idx < size; i++, idx++) \
> +                     obj[i] = ring[idx]; \
> +             for (idx = 0; i < n; i++, idx++) \
> +                     obj[i] = ring[idx]; \
> +     } \
> +} while (0)
> +
> +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do { \
> +     unsigned int i; \
> +     uint32_t idx = cons_head & (r)->mask; \
> +     const uint32_t size = (r)->size; \
> +     __uint128_t *ring = (__uint128_t *)ring_start; \
> +     __uint128_t *obj = (__uint128_t *)obj_table; \
> +     if (likely(idx + n < size)) { \
> +             for (i = 0; i < (n >> 1); i += 2, idx += 2) { \
> +                     obj[i] = ring[idx]; \
> +                     obj[i + 1] = ring[idx + 1]; \
> +             } \
> +             switch (n & 0x1) { \
> +             case 1: \
> +                     obj[i++] = ring[idx++]; /* fallthrough */ \
> +             } \
> +     } else { \
> +             for (i = 0; idx < size; i++, idx++) \
> +                     obj[i] = ring[idx]; \
> +             for (idx = 0; i < n; i++, idx++) \
> +                     obj[i] = ring[idx]; \
> +     } \
> +} while (0)
> +
> +/* Between load and load. there might be cpu reorder in weak model
> + * (powerpc/arm).
> + * There are 2 choices for the users
> + * 1.use rmb() memory barrier
> + * 2.use one-direction load_acquire/store_release barrier,defined by
> + * CONFIG_RTE_USE_C11_MEM_MODEL=y
> + * It depends on performance test results.
> + * By default, move common functions to rte_ring_generic.h  */ #ifdef
> +RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_mem.h"
> +#else
> +#include "rte_ring_generic.h"
> +#endif
> +
> +/**
> + * @internal Enqueue several objects on the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects, each of size esize bytes.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items to the ring
> + *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
> + * @param is_sp
> + *   Indicates whether to use single producer or multi-producer head update
> + * @param free_space
> + *   returns the amount of space after the enqueue operation has finished
> + * @return
> + *   Actual number of objects enqueued.
> + *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table,
> +             unsigned int esize, unsigned int n,
> +             enum rte_ring_queue_behavior behavior, unsigned int is_sp,
> +             unsigned int *free_space)
> +{
> +     uint32_t prod_head, prod_next;
> +     uint32_t free_entries;
> +
> +     n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
> +                     &prod_head, &prod_next, &free_entries);
> +     if (n == 0)
> +             goto end;
> +
> +     ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n);
> +
> +     update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
> +end:
> +     if (free_space != NULL)
> +             *free_space = free_entries - n;
> +     return n;
> +}
> +
> +/**
> + * @internal Dequeue several objects from the ring
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects, each of size esize bytes, to be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to pull from the ring.
> + * @param behavior
> + *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
> + *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
> + * @param is_sc
> + *   Indicates whether to use single consumer or multi-consumer head update
> + * @param available
> + *   returns the number of remaining ring entries after the dequeue has finished
> + * @return
> + *   - Actual number of objects dequeued.
> + *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
> + */
> +static __rte_always_inline unsigned int
> +__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
> +             unsigned int esize, unsigned int n,
> +             enum rte_ring_queue_behavior behavior, unsigned int is_sc,
> +             unsigned int *available)
> +{
> +     uint32_t cons_head, cons_next;
> +     uint32_t entries;
> +
> +     n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
> +                     &cons_head, &cons_next, &entries);
> +     if (n == 0)
> +             goto end;
> +
> +     DEQUEUE_PTRS_ELEM(r, &r[1], cons_head, obj_table, esize, n);
> +
> +     update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
> +
> +end:
> +     if (available != NULL)
> +             *available = entries - n;
> +     return n;
> +}
> +
> +/**
> + * Enqueue several objects on the ring (multi-producers safe).
> + *
> + * This function uses a "compare and set" instruction to move the
> + * producer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects, each of size esize bytes.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *free_space) {
> +     return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +                     RTE_RING_QUEUE_FIXED, __IS_MP, free_space); }
> +
> +/**
> + * Enqueue several objects on a ring (NOT multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects, each of size esize bytes.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *free_space) {
> +     return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +                     RTE_RING_QUEUE_FIXED, __IS_SP, free_space); }
> +
> +/**
> + * Enqueue several objects on a ring.
> + *
> + * This function calls the multi-producer or the single-producer
> + * version depending on the default behavior that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects, each of size esize bytes.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   The number of objects enqueued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *free_space) {
> +     return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +                     RTE_RING_QUEUE_FIXED, r->prod.single, free_space); }
> +
> +/**
> + * Enqueue one object on a ring (multi-producers safe).
> + *
> + * This function uses a "compare and set" instruction to move the
> + * producer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj
> + *   A pointer to the object to be added.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; objects enqueued.
> + *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
> + */
> +static __rte_always_inline int
> +rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int
> +esize) {
> +     return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
> +                                                             -ENOBUFS;
> +}
> +
> +/**
> + * Enqueue one object on a ring (NOT multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj
> + *   A pointer to the object to be added.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; objects enqueued.
> + *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
> + */
> +static __rte_always_inline int
> +rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int
> +esize) {
> +     return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
> +                                                             -ENOBUFS;
> +}
> +
> +/**
> + * Enqueue one object on a ring.
> + *
> + * This function calls the multi-producer or the single-producer
> + * version, depending on the default behaviour that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj
> + *   A pointer to the object to be added.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; objects enqueued.
> + *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
> + */
> +static __rte_always_inline int
> +rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int
> +esize) {
> +     return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
> +                                                             -ENOBUFS;
> +}
> +
> +/**
> + * Dequeue several objects from a ring (multi-consumers safe).
> + *
> + * This function uses a "compare and set" instruction to move the
> + * consumer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects, each of size esize bytes, to be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *available) {
> +     return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +                             RTE_RING_QUEUE_FIXED, __IS_MC,
> available); }
> +
> +/**
> + * Dequeue several objects from a ring (NOT multi-consumers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects, each of size esize bytes, to be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table,
> + *   must be strictly positive.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *available) {
> +     return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +                     RTE_RING_QUEUE_FIXED, __IS_SC, available); }
> +
> +/**
> + * Dequeue several objects from a ring.
> + *
> + * This function calls the multi-consumers or the single-consumer
> + * version, depending on the default behaviour that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   The number of objects dequeued, either 0 or n
> + */
> +static __rte_always_inline unsigned int
> +rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *available) {
> +     return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +                     RTE_RING_QUEUE_FIXED, r->cons.single, available); }
> +
> +/**
> + * Dequeue one object from a ring (multi-consumers safe).
> + *
> + * This function uses a "compare and set" instruction to move the
> + * consumer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_p
> + *   A pointer to the object that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; object dequeued.
> + *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
> + *     dequeued.
> + */
> +static __rte_always_inline int
> +rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
> +                             unsigned int esize)
> +{
> +     return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL)  ? 0 :
> +                                                             -ENOENT;
> +}
> +
> +/**
> + * Dequeue one object from a ring (NOT multi-consumers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_p
> + *   A pointer to the object that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; object dequeued.
> + *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
> + *     dequeued.
> + */
> +static __rte_always_inline int
> +rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
> +                             unsigned int esize)
> +{
> +     return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
> +                                                             -ENOENT;
> +}
> +
> +/**
> + * Dequeue one object from a ring.
> + *
> + * This function calls the multi-consumers or the single-consumer
> + * version depending on the default behaviour that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_p
> + *   A pointer to the object that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @return
> + *   - 0: Success; object dequeued.
> + *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
> + *     dequeued.
> + */
> +static __rte_always_inline int
> +rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int
> +esize) {
> +     return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
> +                                                             -ENOENT;
> +}
> +
> +/**
> + * Enqueue several objects on the ring (multi-producers safe).
> + *
> + * This function uses a "compare and set" instruction to move the
> + * producer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - n: Actual number of objects enqueued.
> + */
> +static __rte_always_inline unsigned
> +rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, void * const obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *free_space) {
> +     return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +                     RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); }
> +
> +/**
> + * Enqueue several objects on a ring (NOT multi-producers safe).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - n: Actual number of objects enqueued.
> + */
> +static __rte_always_inline unsigned
> +rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, void * const obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *free_space) {
> +     return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +                     RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); }
> +
> +/**
> + * Enqueue several objects on a ring.
> + *
> + * This function calls the multi-producer or the single-producer
> + * version depending on the default behavior that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to add in the ring from the obj_table.
> + * @param free_space
> + *   if non-NULL, returns the amount of space in the ring after the
> + *   enqueue operation has finished.
> + * @return
> + *   - n: Actual number of objects enqueued.
> + */
> +static __rte_always_inline unsigned
> +rte_ring_enqueue_burst_elem(struct rte_ring *r, void * const obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *free_space) {
> +     return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
> +                     RTE_RING_QUEUE_VARIABLE, r->prod.single,
> free_space); }
> +
> +/**
> + * Dequeue several objects from a ring (multi-consumers safe). When the
> + * number of requested objects exceeds the number available, only the
> + * available objects are dequeued.
> + *
> + * This function uses a "compare and set" instruction to move the
> + * consumer index atomically.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - n: Actual number of objects dequeued, 0 if ring is empty
> + */
> +static __rte_always_inline unsigned
> +rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *available) {
> +     return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +                     RTE_RING_QUEUE_VARIABLE, __IS_MC, available); }
> +
> +/**
> + * Dequeue several objects from a ring (NOT multi-consumers safe). When
> + * the number of requested objects exceeds the number available, only
> + * the available objects are dequeued.
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - n: Actual number of objects dequeued, 0 if ring is empty
> + */
> +static __rte_always_inline unsigned
> +rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *available) {
> +     return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +                     RTE_RING_QUEUE_VARIABLE, __IS_SC, available); }
> +
> +/**
> + * Dequeue multiple objects from a ring up to a maximum number.
> + *
> + * This function calls the multi-consumers or the single-consumer
> + * version, depending on the default behaviour that was specified at
> + * ring creation time (see flags).
> + *
> + * @param r
> + *   A pointer to the ring structure.
> + * @param obj_table
> + *   A pointer to a table of objects that will be filled.
> + * @param esize
> + *   The size of ring element, in bytes. It must be a multiple of 4.
> + *   Currently, sizes 4, 8 and 16 are supported. This should be the same
> + *   as passed while creating the ring, otherwise the results are undefined.
> + * @param n
> + *   The number of objects to dequeue from the ring to the obj_table.
> + * @param available
> + *   If non-NULL, returns the number of remaining ring entries after the
> + *   dequeue has finished.
> + * @return
> + *   - Number of objects dequeued
> + */
> +static __rte_always_inline unsigned
> +rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
> +             unsigned int esize, unsigned int n, unsigned int *available) {
> +     return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
> +                             RTE_RING_QUEUE_VARIABLE,
> +                             r->cons.single, available);
> +}
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +
> +#endif /* _RTE_RING_ELEM_H_ */
> diff --git a/lib/librte_ring/rte_ring_version.map
> b/lib/librte_ring/rte_ring_version.map
> index 510c1386e..e410a7503 100644
> --- a/lib/librte_ring/rte_ring_version.map
> +++ b/lib/librte_ring/rte_ring_version.map
> @@ -21,6 +21,8 @@ DPDK_2.2 {
>  EXPERIMENTAL {
>       global:
> 
> +     rte_ring_create_elem;
> +     rte_ring_get_memsize_elem;
>       rte_ring_reset;
> 
>  };
> --
> 2.17.1

Reply via email to