Hi Bruce, Konstantin, Stephen, I would appreciate it if you could provide feedback on this.
Thanks, Honnappa > -----Original Message----- > From: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> > Sent: Tuesday, October 8, 2019 9:47 PM > To: olivier.m...@6wind.com; sthem...@microsoft.com; jer...@marvell.com; > bruce.richard...@intel.com; david.march...@redhat.com; > pbhagavat...@marvell.com; konstantin.anan...@intel.com; Honnappa > Nagarahalli <honnappa.nagaraha...@arm.com> > Cc: dev@dpdk.org; Dharmik Thakkar <dharmik.thak...@arm.com>; Ruifeng > Wang (Arm Technology China) <ruifeng.w...@arm.com>; Gavin Hu (Arm > Technology China) <gavin...@arm.com> > Subject: [PATCH v4 1/2] lib/ring: apis to support configurable element size > > Current APIs assume ring elements to be pointers. However, in many use cases, > the size can be different. Add new APIs to support configurable ring element > sizes. > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> > Reviewed-by: Dharmik Thakkar <dharmik.thak...@arm.com> > Reviewed-by: Gavin Hu <gavin...@arm.com> > Reviewed-by: Ruifeng Wang <ruifeng.w...@arm.com> > --- > lib/librte_ring/Makefile | 3 +- > lib/librte_ring/meson.build | 3 + > lib/librte_ring/rte_ring.c | 45 +- > lib/librte_ring/rte_ring.h | 1 + > lib/librte_ring/rte_ring_elem.h | 946 +++++++++++++++++++++++++++ > lib/librte_ring/rte_ring_version.map | 2 + > 6 files changed, 991 insertions(+), 9 deletions(-) create mode 100644 > lib/librte_ring/rte_ring_elem.h > > diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index > 21a36770d..515a967bb 100644 > --- a/lib/librte_ring/Makefile > +++ b/lib/librte_ring/Makefile > @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk # library name LIB = > librte_ring.a > > -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 - > DALLOW_EXPERIMENTAL_API > LDLIBS += -lrte_eal > > EXPORT_MAP := rte_ring_version.map > @@ -18,6 +18,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c > > # install includes > SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ > + 
rte_ring_elem.h \ > rte_ring_generic.h \ > rte_ring_c11_mem.h > > diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index > ab8b0b469..74219840a 100644 > --- a/lib/librte_ring/meson.build > +++ b/lib/librte_ring/meson.build > @@ -6,3 +6,6 @@ sources = files('rte_ring.c') headers = files('rte_ring.h', > 'rte_ring_c11_mem.h', > 'rte_ring_generic.h') > + > +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental > +allow_experimental_apis = true > diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index > d9b308036..6fed3648b 100644 > --- a/lib/librte_ring/rte_ring.c > +++ b/lib/librte_ring/rte_ring.c > @@ -33,6 +33,7 @@ > #include <rte_tailq.h> > > #include "rte_ring.h" > +#include "rte_ring_elem.h" > > TAILQ_HEAD(rte_ring_list, rte_tailq_entry); > > @@ -46,23 +47,42 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) > > /* return the size of memory occupied by a ring */ ssize_t - > rte_ring_get_memsize(unsigned count) > +rte_ring_get_memsize_elem(unsigned count, unsigned esize) > { > ssize_t sz; > > + /* Supported esize values are 4/8/16. > + * Others can be added on need basis. > + */ > + if ((esize != 4) && (esize != 8) && (esize != 16)) { > + RTE_LOG(ERR, RING, > + "Unsupported esize value. 
Supported values are 4, 8 > and 16\n"); > + > + return -EINVAL; > + } > + > /* count must be a power of 2 */ > if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) { > RTE_LOG(ERR, RING, > - "Requested size is invalid, must be power of 2, and " > - "do not exceed the size limit %u\n", > RTE_RING_SZ_MASK); > + "Requested number of elements is invalid, must be " > + "power of 2, and do not exceed the limit %u\n", > + RTE_RING_SZ_MASK); > + > return -EINVAL; > } > > - sz = sizeof(struct rte_ring) + count * sizeof(void *); > + sz = sizeof(struct rte_ring) + (ssize_t)count * esize; > sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); > return sz; > } > > +/* return the size of memory occupied by a ring */ ssize_t > +rte_ring_get_memsize(unsigned count) { > + return rte_ring_get_memsize_elem(count, sizeof(void *)); } > + > void > rte_ring_reset(struct rte_ring *r) > { > @@ -114,10 +134,10 @@ rte_ring_init(struct rte_ring *r, const char *name, > unsigned count, > return 0; > } > > -/* create the ring */ > +/* create the ring for a given element size */ > struct rte_ring * > -rte_ring_create(const char *name, unsigned count, int socket_id, > - unsigned flags) > +rte_ring_create_elem(const char *name, unsigned count, unsigned esize, > + int socket_id, unsigned flags) > { > char mz_name[RTE_MEMZONE_NAMESIZE]; > struct rte_ring *r; > @@ -135,7 +155,7 @@ rte_ring_create(const char *name, unsigned count, > int socket_id, > if (flags & RING_F_EXACT_SZ) > count = rte_align32pow2(count + 1); > > - ring_size = rte_ring_get_memsize(count); > + ring_size = rte_ring_get_memsize_elem(count, esize); > if (ring_size < 0) { > rte_errno = ring_size; > return NULL; > @@ -182,6 +202,15 @@ rte_ring_create(const char *name, unsigned count, > int socket_id, > return r; > } > > +/* create the ring */ > +struct rte_ring * > +rte_ring_create(const char *name, unsigned count, int socket_id, > + unsigned flags) > +{ > + return rte_ring_create_elem(name, count, sizeof(void *), socket_id, > + flags); > +} > + > /* 
free the ring */ > void > rte_ring_free(struct rte_ring *r) > diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index > 2a9f768a1..18fc5d845 100644 > --- a/lib/librte_ring/rte_ring.h > +++ b/lib/librte_ring/rte_ring.h > @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char *name, > unsigned count, > */ > struct rte_ring *rte_ring_create(const char *name, unsigned count, > int socket_id, unsigned flags); > + > /** > * De-allocate all memory used by the ring. > * > diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h > new file mode 100644 index 000000000..860f059ad > --- /dev/null > +++ b/lib/librte_ring/rte_ring_elem.h > @@ -0,0 +1,946 @@ > +/* SPDX-License-Identifier: BSD-3-Clause > + * > + * Copyright (c) 2019 Arm Limited > + * Copyright (c) 2010-2017 Intel Corporation > + * Copyright (c) 2007-2009 Kip Macy km...@freebsd.org > + * All rights reserved. > + * Derived from FreeBSD's bufring.h > + * Used as BSD-3 Licensed with permission from Kip Macy. > + */ > + > +#ifndef _RTE_RING_ELEM_H_ > +#define _RTE_RING_ELEM_H_ > + > +/** > + * @file > + * RTE Ring with flexible element size > + */ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include <stdio.h> > +#include <stdint.h> > +#include <sys/queue.h> > +#include <errno.h> > +#include <rte_common.h> > +#include <rte_config.h> > +#include <rte_memory.h> > +#include <rte_lcore.h> > +#include <rte_atomic.h> > +#include <rte_branch_prediction.h> > +#include <rte_memzone.h> > +#include <rte_pause.h> > + > +#include "rte_ring.h" > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Calculate the memory size needed for a ring with given element size > + * > + * This function returns the number of bytes needed for a ring, given > + * the number of elements in it and the size of the element. 
This value > + * is the sum of the size of the structure rte_ring and the size of the > + * memory needed for storing the elements. The value is aligned to a > +cache > + * line size. > + * > + * @param count > + * The number of elements in the ring (must be a power of 2). > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. > + * @return > + * - The memory size needed for the ring on success. > + * - -EINVAL if count is not a power of 2. > + */ > +__rte_experimental > +ssize_t rte_ring_get_memsize_elem(unsigned count, unsigned esize); > + > +/** > + * @warning > + * @b EXPERIMENTAL: this API may change without prior notice > + * > + * Create a new ring named *name* that stores elements with given size. > + * > + * This function uses ``memzone_reserve()`` to allocate memory. Then it > + * calls rte_ring_init() to initialize an empty ring. > + * > + * The new ring size is set to *count*, which must be a power of > + * two. Water marking is disabled by default. The real usable ring size > + * is *count-1* instead of *count* to differentiate a free ring from an > + * empty ring. > + * > + * The ring is added in RTE_TAILQ_RING list. > + * > + * @param name > + * The name of the ring. > + * @param count > + * The number of elements in the ring (must be a power of 2). > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. > + * @param socket_id > + * The *socket_id* argument is the socket identifier in case of > + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA > + * constraint for the reserved zone. > + * @param flags > + * An OR of the following: > + * - RING_F_SP_ENQ: If this flag is set, the default behavior when > + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` > + * is "single-producer". Otherwise, it is "multi-producers". 
> + * - RING_F_SC_DEQ: If this flag is set, the default behavior when > + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` > + * is "single-consumer". Otherwise, it is "multi-consumers". > + * @return > + * On success, the pointer to the new allocated ring. NULL on error with > + * rte_errno set appropriately. Possible errno values include: > + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config > structure > + * - E_RTE_SECONDARY - function was called from a secondary process > instance > + * - EINVAL - count provided is not a power of 2 > + * - ENOSPC - the maximum number of memzones has already been > allocated > + * - EEXIST - a memzone with the same name already exists > + * - ENOMEM - no appropriate memory area found in which to create > memzone > + */ > +__rte_experimental > +struct rte_ring *rte_ring_create_elem(const char *name, unsigned count, > + unsigned esize, int socket_id, unsigned flags); > + > +/* the actual enqueue of pointers on the ring. > + * Placed here since identical code needed in both > + * single and multi producer enqueue functions. 
> + */ > +#define ENQUEUE_PTRS_ELEM(r, ring_start, prod_head, obj_table, esize, n) > do { \ > + if (esize == 4) \ > + ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n); \ > + else if (esize == 8) \ > + ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n); \ > + else if (esize == 16) \ > + ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n); \ } > while > +(0) > + > +#define ENQUEUE_PTRS_32(r, ring_start, prod_head, obj_table, n) do { \ > + unsigned int i; \ > + const uint32_t size = (r)->size; \ > + uint32_t idx = prod_head & (r)->mask; \ > + uint32_t *ring = (uint32_t *)ring_start; \ > + uint32_t *obj = (uint32_t *)obj_table; \ > + if (likely(idx + n < size)) { \ > + for (i = 0; i < (n & ((~(unsigned)0x7))); i += 8, idx += 8) { \ > + ring[idx] = obj[i]; \ > + ring[idx + 1] = obj[i + 1]; \ > + ring[idx + 2] = obj[i + 2]; \ > + ring[idx + 3] = obj[i + 3]; \ > + ring[idx + 4] = obj[i + 4]; \ > + ring[idx + 5] = obj[i + 5]; \ > + ring[idx + 6] = obj[i + 6]; \ > + ring[idx + 7] = obj[i + 7]; \ > + } \ > + switch (n & 0x7) { \ > + case 7: \ > + ring[idx++] = obj[i++]; /* fallthrough */ \ > + case 6: \ > + ring[idx++] = obj[i++]; /* fallthrough */ \ > + case 5: \ > + ring[idx++] = obj[i++]; /* fallthrough */ \ > + case 4: \ > + ring[idx++] = obj[i++]; /* fallthrough */ \ > + case 3: \ > + ring[idx++] = obj[i++]; /* fallthrough */ \ > + case 2: \ > + ring[idx++] = obj[i++]; /* fallthrough */ \ > + case 1: \ > + ring[idx++] = obj[i++]; /* fallthrough */ \ > + } \ > + } else { \ > + for (i = 0; idx < size; i++, idx++)\ > + ring[idx] = obj[i]; \ > + for (idx = 0; i < n; i++, idx++) \ > + ring[idx] = obj[i]; \ > + } \ > +} while (0) > + > +#define ENQUEUE_PTRS_64(r, ring_start, prod_head, obj_table, n) do { \ > + unsigned int i; \ > + const uint32_t size = (r)->size; \ > + uint32_t idx = prod_head & (r)->mask; \ > + uint64_t *ring = (uint64_t *)ring_start; \ > + uint64_t *obj = (uint64_t *)obj_table; \ > + if (likely(idx + n < size)) { \ > + for (i = 0; i < (n & 
((~(unsigned)0x3))); i += 4, idx += 4) { \ > + ring[idx] = obj[i]; \ > + ring[idx + 1] = obj[i + 1]; \ > + ring[idx + 2] = obj[i + 2]; \ > + ring[idx + 3] = obj[i + 3]; \ > + } \ > + switch (n & 0x3) { \ > + case 3: \ > + ring[idx++] = obj[i++]; /* fallthrough */ \ > + case 2: \ > + ring[idx++] = obj[i++]; /* fallthrough */ \ > + case 1: \ > + ring[idx++] = obj[i++]; \ > + } \ > + } else { \ > + for (i = 0; idx < size; i++, idx++)\ > + ring[idx] = obj[i]; \ > + for (idx = 0; i < n; i++, idx++) \ > + ring[idx] = obj[i]; \ > + } \ > +} while (0) > + > +#define ENQUEUE_PTRS_128(r, ring_start, prod_head, obj_table, n) do { \ > + unsigned int i; \ > + const uint32_t size = (r)->size; \ > + uint32_t idx = prod_head & (r)->mask; \ > + __uint128_t *ring = (__uint128_t *)ring_start; \ > + __uint128_t *obj = (__uint128_t *)obj_table; \ > + if (likely(idx + n < size)) { \ > + for (i = 0; i < (n & (~(unsigned)0x1)); i += 2, idx += 2) { \ > + ring[idx] = obj[i]; \ > + ring[idx + 1] = obj[i + 1]; \ > + } \ > + switch (n & 0x1) { \ > + case 1: \ > + ring[idx++] = obj[i++]; \ > + } \ > + } else { \ > + for (i = 0; idx < size; i++, idx++)\ > + ring[idx] = obj[i]; \ > + for (idx = 0; i < n; i++, idx++) \ > + ring[idx] = obj[i]; \ > + } \ > +} while (0) > + > +/* the actual copy of pointers on the ring to obj_table. > + * Placed here since identical code needed in both > + * single and multi consumer dequeue functions. 
> + */ > +#define DEQUEUE_PTRS_ELEM(r, ring_start, cons_head, obj_table, esize, n) > do { \ > + if (esize == 4) \ > + DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n); \ > + else if (esize == 8) \ > + DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n); \ > + else if (esize == 16) \ > + DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n); \ } > while > +(0) > + > +#define DEQUEUE_PTRS_32(r, ring_start, cons_head, obj_table, n) do { \ > + unsigned int i; \ > + uint32_t idx = cons_head & (r)->mask; \ > + const uint32_t size = (r)->size; \ > + uint32_t *ring = (uint32_t *)ring_start; \ > + uint32_t *obj = (uint32_t *)obj_table; \ > + if (likely(idx + n < size)) { \ > + for (i = 0; i < (n & (~(unsigned)0x7)); i += 8, idx += 8) {\ > + obj[i] = ring[idx]; \ > + obj[i + 1] = ring[idx + 1]; \ > + obj[i + 2] = ring[idx + 2]; \ > + obj[i + 3] = ring[idx + 3]; \ > + obj[i + 4] = ring[idx + 4]; \ > + obj[i + 5] = ring[idx + 5]; \ > + obj[i + 6] = ring[idx + 6]; \ > + obj[i + 7] = ring[idx + 7]; \ > + } \ > + switch (n & 0x7) { \ > + case 7: \ > + obj[i++] = ring[idx++]; /* fallthrough */ \ > + case 6: \ > + obj[i++] = ring[idx++]; /* fallthrough */ \ > + case 5: \ > + obj[i++] = ring[idx++]; /* fallthrough */ \ > + case 4: \ > + obj[i++] = ring[idx++]; /* fallthrough */ \ > + case 3: \ > + obj[i++] = ring[idx++]; /* fallthrough */ \ > + case 2: \ > + obj[i++] = ring[idx++]; /* fallthrough */ \ > + case 1: \ > + obj[i++] = ring[idx++]; /* fallthrough */ \ > + } \ > + } else { \ > + for (i = 0; idx < size; i++, idx++) \ > + obj[i] = ring[idx]; \ > + for (idx = 0; i < n; i++, idx++) \ > + obj[i] = ring[idx]; \ > + } \ > +} while (0) > + > +#define DEQUEUE_PTRS_64(r, ring_start, cons_head, obj_table, n) do { \ > + unsigned int i; \ > + uint32_t idx = cons_head & (r)->mask; \ > + const uint32_t size = (r)->size; \ > + uint64_t *ring = (uint64_t *)ring_start; \ > + uint64_t *obj = (uint64_t *)obj_table; \ > + if (likely(idx + n < size)) { \ > + for (i = 0; i < (n & 
(~(unsigned)0x3)); i += 4, idx += 4) {\ > + obj[i] = ring[idx]; \ > + obj[i + 1] = ring[idx + 1]; \ > + obj[i + 2] = ring[idx + 2]; \ > + obj[i + 3] = ring[idx + 3]; \ > + } \ > + switch (n & 0x3) { \ > + case 3: \ > + obj[i++] = ring[idx++]; /* fallthrough */ \ > + case 2: \ > + obj[i++] = ring[idx++]; /* fallthrough */ \ > + case 1: \ > + obj[i++] = ring[idx++]; \ > + } \ > + } else { \ > + for (i = 0; idx < size; i++, idx++) \ > + obj[i] = ring[idx]; \ > + for (idx = 0; i < n; i++, idx++) \ > + obj[i] = ring[idx]; \ > + } \ > +} while (0) > + > +#define DEQUEUE_PTRS_128(r, ring_start, cons_head, obj_table, n) do { \ > + unsigned int i; \ > + uint32_t idx = cons_head & (r)->mask; \ > + const uint32_t size = (r)->size; \ > + __uint128_t *ring = (__uint128_t *)ring_start; \ > + __uint128_t *obj = (__uint128_t *)obj_table; \ > + if (likely(idx + n < size)) { \ > + for (i = 0; i < (n & (~(unsigned)0x1)); i += 2, idx += 2) { \ > + obj[i] = ring[idx]; \ > + obj[i + 1] = ring[idx + 1]; \ > + } \ > + switch (n & 0x1) { \ > + case 1: \ > + obj[i++] = ring[idx++]; /* fallthrough */ \ > + } \ > + } else { \ > + for (i = 0; idx < size; i++, idx++) \ > + obj[i] = ring[idx]; \ > + for (idx = 0; i < n; i++, idx++) \ > + obj[i] = ring[idx]; \ > + } \ > +} while (0) > + > +/* Between load and load. there might be cpu reorder in weak model > + * (powerpc/arm). > + * There are 2 choices for the users > + * 1.use rmb() memory barrier > + * 2.use one-direction load_acquire/store_release barrier,defined by > + * CONFIG_RTE_USE_C11_MEM_MODEL=y > + * It depends on performance test results. > + * By default, move common functions to rte_ring_generic.h */ #ifdef > +RTE_USE_C11_MEM_MODEL #include "rte_ring_c11_mem.h" > +#else > +#include "rte_ring_generic.h" > +#endif > + > +/** > + * @internal Enqueue several objects on the ring > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). 
> + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring > + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from > ring > + * @param is_sp > + * Indicates whether to use single producer or multi-producer head update > + * @param free_space > + * returns the amount of space after the enqueue operation has finished > + * @return > + * Actual number of objects enqueued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_enqueue_elem(struct rte_ring *r, void * const obj_table, > + unsigned int esize, unsigned int n, > + enum rte_ring_queue_behavior behavior, unsigned int is_sp, > + unsigned int *free_space) > +{ > + uint32_t prod_head, prod_next; > + uint32_t free_entries; > + > + n = __rte_ring_move_prod_head(r, is_sp, n, behavior, > + &prod_head, &prod_next, &free_entries); > + if (n == 0) > + goto end; > + > + ENQUEUE_PTRS_ELEM(r, &r[1], prod_head, obj_table, esize, n); > + > + update_tail(&r->prod, prod_head, prod_next, is_sp, 1); > +end: > + if (free_space != NULL) > + *free_space = free_entries - n; > + return n; > +} > + > +/** > + * @internal Dequeue several objects from the ring > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. 
> + * @param n > + * The number of objects to pull from the ring. > + * @param behavior > + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring > + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from > ring > + * @param is_sc > + * Indicates whether to use single consumer or multi-consumer head update > + * @param available > + * returns the number of remaining ring entries after the dequeue has > finished > + * @return > + * - Actual number of objects dequeued. > + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. > + */ > +static __rte_always_inline unsigned int > +__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table, > + unsigned int esize, unsigned int n, > + enum rte_ring_queue_behavior behavior, unsigned int is_sc, > + unsigned int *available) > +{ > + uint32_t cons_head, cons_next; > + uint32_t entries; > + > + n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, > + &cons_head, &cons_next, &entries); > + if (n == 0) > + goto end; > + > + DEQUEUE_PTRS_ELEM(r, &r[1], cons_head, obj_table, esize, n); > + > + update_tail(&r->cons, cons_head, cons_next, is_sc, 0); > + > +end: > + if (available != NULL) > + *available = entries - n; > + return n; > +} > + > +/** > + * Enqueue several objects on the ring (multi-producers safe). > + * > + * This function uses a "compare and set" instruction to move the > + * producer index atomically. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to add in the ring from the obj_table. 
> + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * enqueue operation has finished. > + * @return > + * The number of objects enqueued, either 0 or n > + */ > +static __rte_always_inline unsigned int > +rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table, > + unsigned int esize, unsigned int n, unsigned int *free_space) { > + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_FIXED, __IS_MP, free_space); } > + > +/** > + * Enqueue several objects on a ring (NOT multi-producers safe). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * enqueue operation has finished. > + * @return > + * The number of objects enqueued, either 0 or n > + */ > +static __rte_always_inline unsigned int > +rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table, > + unsigned int esize, unsigned int n, unsigned int *free_space) { > + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_FIXED, __IS_SP, free_space); } > + > +/** > + * Enqueue several objects on a ring. > + * > + * This function calls the multi-producer or the single-producer > + * version depending on the default behavior that was specified at > + * ring creation time (see flags). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param esize > + * The size of ring element, in bytes. 
It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * enqueue operation has finished. > + * @return > + * The number of objects enqueued, either 0 or n > + */ > +static __rte_always_inline unsigned int > +rte_ring_enqueue_bulk_elem(struct rte_ring *r, void * const obj_table, > + unsigned int esize, unsigned int n, unsigned int *free_space) { > + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_FIXED, r->prod.single, free_space); } > + > +/** > + * Enqueue one object on a ring (multi-producers safe). > + * > + * This function uses a "compare and set" instruction to move the > + * producer index atomically. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj > + * A pointer to the object to be added. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @return > + * - 0: Success; objects enqueued. > + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is > enqueued. > + */ > +static __rte_always_inline int > +rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int > +esize) { > + return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : > + -ENOBUFS; > +} > + > +/** > + * Enqueue one object on a ring (NOT multi-producers safe). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj > + * A pointer to the object to be added. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. 
This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @return > + * - 0: Success; objects enqueued. > + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is > enqueued. > + */ > +static __rte_always_inline int > +rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int > +esize) { > + return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : > + -ENOBUFS; > +} > + > +/** > + * Enqueue one object on a ring. > + * > + * This function calls the multi-producer or the single-producer > + * version, depending on the default behaviour that was specified at > + * ring creation time (see flags). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj > + * A pointer to the object to be added. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @return > + * - 0: Success; objects enqueued. > + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is > enqueued. > + */ > +static __rte_always_inline int > +rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int > +esize) { > + return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : > + -ENOBUFS; > +} > + > +/** > + * Dequeue several objects from a ring (multi-consumers safe). > + * > + * This function uses a "compare and set" instruction to move the > + * consumer index atomically. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects) that will be filled. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. 
> + * @param n > + * The number of objects to dequeue from the ring to the obj_table. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * The number of objects dequeued, either 0 or n > + */ > +static __rte_always_inline unsigned int > +rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, > + unsigned int esize, unsigned int n, unsigned int *available) { > + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_FIXED, __IS_MC, > available); } > + > +/** > + * Dequeue several objects from a ring (NOT multi-consumers safe). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects) that will be filled. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to dequeue from the ring to the obj_table, > + * must be strictly positive. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * The number of objects dequeued, either 0 or n > + */ > +static __rte_always_inline unsigned int > +rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, > + unsigned int esize, unsigned int n, unsigned int *available) { > + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_FIXED, __IS_SC, available); } > + > +/** > + * Dequeue several objects from a ring. > + * > + * This function calls the multi-consumers or the single-consumer > + * version, depending on the default behaviour that was specified at > + * ring creation time (see flags). > + * > + * @param r > + * A pointer to the ring structure. 
> + * @param obj_table > + * A pointer to a table of void * pointers (objects) that will be filled. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to dequeue from the ring to the obj_table. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * The number of objects dequeued, either 0 or n > + */ > +static __rte_always_inline unsigned int > +rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, > + unsigned int esize, unsigned int n, unsigned int *available) { > + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_FIXED, r->cons.single, available); } > + > +/** > + * Dequeue one object from a ring (multi-consumers safe). > + * > + * This function uses a "compare and set" instruction to move the > + * consumer index atomically. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_p > + * A pointer to a void * pointer (object) that will be filled. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @return > + * - 0: Success; objects dequeued. > + * - -ENOENT: Not enough entries in the ring to dequeue; no object is > + * dequeued. > + */ > +static __rte_always_inline int > +rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p, > + unsigned int esize) > +{ > + return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : > + -ENOENT; > +} > + > +/** > + * Dequeue one object from a ring (NOT multi-consumers safe). > + * > + * @param r > + * A pointer to the ring structure. 
> + * @param obj_p > + * A pointer to a void * pointer (object) that will be filled. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @return > + * - 0: Success; objects dequeued. > + * - -ENOENT: Not enough entries in the ring to dequeue, no object is > + * dequeued. > + */ > +static __rte_always_inline int > +rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p, > + unsigned int esize) > +{ > + return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : > + -ENOENT; > +} > + > +/** > + * Dequeue one object from a ring. > + * > + * This function calls the multi-consumers or the single-consumer > + * version depending on the default behaviour that was specified at > + * ring creation time (see flags). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_p > + * A pointer to a void * pointer (object) that will be filled. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @return > + * - 0: Success, objects dequeued. > + * - -ENOENT: Not enough entries in the ring to dequeue, no object is > + * dequeued. > + */ > +static __rte_always_inline int > +rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int > +esize) { > + return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : > + -ENOENT; > +} > + > +/** > + * Enqueue several objects on the ring (multi-producers safe). > + * > + * This function uses a "compare and set" instruction to move the > + * producer index atomically. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). 
> + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * enqueue operation has finished. > + * @return > + * - n: Actual number of objects enqueued. > + */ > +static __rte_always_inline unsigned > +rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, void * const obj_table, > + unsigned int esize, unsigned int n, unsigned int *free_space) { > + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); } > + > +/** > + * Enqueue several objects on a ring (NOT multi-producers safe). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * enqueue operation has finished. > + * @return > + * - n: Actual number of objects enqueued. > + */ > +static __rte_always_inline unsigned > +rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, void * const obj_table, > + unsigned int esize, unsigned int n, unsigned int *free_space) { > + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); } > + > +/** > + * Enqueue several objects on a ring. 
> + * > + * This function calls the multi-producer or the single-producer > + * version depending on the default behavior that was specified at > + * ring creation time (see flags). > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects). > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to add in the ring from the obj_table. > + * @param free_space > + * if non-NULL, returns the amount of space in the ring after the > + * enqueue operation has finished. > + * @return > + * - n: Actual number of objects enqueued. > + */ > +static __rte_always_inline unsigned > +rte_ring_enqueue_burst_elem(struct rte_ring *r, void * const obj_table, > + unsigned int esize, unsigned int n, unsigned int *free_space) { > + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space); } > + > +/** > + * Dequeue several objects from a ring (multi-consumers safe). When the request > + * objects are more than the available objects, only dequeue the actual number > + * of objects > + * > + * This function uses a "compare and set" instruction to move the > + * consumer index atomically. > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects) that will be filled. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to dequeue from the ring to the obj_table. 
> + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * - n: Actual number of objects dequeued, 0 if ring is empty > + */ > +static __rte_always_inline unsigned > +rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, > + unsigned int esize, unsigned int n, unsigned int *available) { > + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_VARIABLE, __IS_MC, available); } > + > +/** > + * Dequeue several objects from a ring (NOT multi-consumers safe). When the > + * request objects are more than the available objects, only dequeue the > + * actual number of objects > + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects) that will be filled. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to dequeue from the ring to the obj_table. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * - n: Actual number of objects dequeued, 0 if ring is empty > + */ > +static __rte_always_inline unsigned > +rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, > + unsigned int esize, unsigned int n, unsigned int *available) { > + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_VARIABLE, __IS_SC, available); } > + > +/** > + * Dequeue multiple objects from a ring up to a maximum number. > + * > + * This function calls the multi-consumers or the single-consumer > + * version, depending on the default behaviour that was specified at > + * ring creation time (see flags). 
> + * > + * @param r > + * A pointer to the ring structure. > + * @param obj_table > + * A pointer to a table of void * pointers (objects) that will be filled. > + * @param esize > + * The size of ring element, in bytes. It must be a multiple of 4. > + * Currently, sizes 4, 8 and 16 are supported. This should be the same > + * as passed while creating the ring, otherwise the results are undefined. > + * @param n > + * The number of objects to dequeue from the ring to the obj_table. > + * @param available > + * If non-NULL, returns the number of remaining ring entries after the > + * dequeue has finished. > + * @return > + * - Number of objects dequeued > + */ > +static __rte_always_inline unsigned > +rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, > + unsigned int esize, unsigned int n, unsigned int *available) { > + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, > + RTE_RING_QUEUE_VARIABLE, > + r->cons.single, available); > +} > + > +#ifdef __cplusplus > +} > +#endif > + > +#endif /* _RTE_RING_ELEM_H_ */ > diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map > index 510c1386e..e410a7503 100644 > --- a/lib/librte_ring/rte_ring_version.map > +++ b/lib/librte_ring/rte_ring_version.map > @@ -21,6 +21,8 @@ DPDK_2.2 { > EXPERIMENTAL { > global: > > + rte_ring_create_elem; > + rte_ring_get_memsize_elem; > rte_ring_reset; > > }; > -- > 2.17.1