> Add callback event handler for enqueue/dequeue operations on a ring.
> The pre-enqueue and post-dequeue operations on the ring are selected
> to invoke the user callback handler.

Can you provide a use case for this to better understand the need?
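To make the question concrete, here is the kind of handler I imagine the
proposed rte_ring_cb_fn signature permits, e.g. per-ring accounting or
tracing. All names below are mine for illustration, not from the patch:

	#include <stdint.h>

	#include <rte_common.h>
	#include <rte_ring.h>

	/* Hypothetical pre-enqueue handler: accounts for objects offered
	 * to the ring. Matches the patch's rte_ring_cb_fn signature. */
	static unsigned int
	count_objs_cb(struct rte_ring *r __rte_unused,
			void *obj_table __rte_unused, unsigned int n,
			void *cb_arg)
	{
		uint64_t *total = cb_arg;

		*total += n;
		return n; /* pass the burst size through unchanged */
	}

Is something along these lines the intended usage, or is the callback
meant to modify the burst as well (it can shrink/replace n)?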
> Signed-off-by: Vipin Varghese <vipin.vargh...@intel.com>
> ---
>  config/common_base                   |   1 +
>  lib/librte_ring/Makefile             |   1 +
>  lib/librte_ring/meson.build          |   2 +
>  lib/librte_ring/rte_ring.c           | 187 +++++++++++++++++++++++++++
>  lib/librte_ring/rte_ring.h           | 117 ++++++++++++++++-
>  lib/librte_ring/rte_ring_version.map |   9 ++
>  6 files changed, 316 insertions(+), 1 deletion(-)
>
> diff --git a/config/common_base b/config/common_base
> index ec29455d2..022734f19 100644
> --- a/config/common_base
> +++ b/config/common_base
> @@ -500,6 +500,7 @@ CONFIG_RTE_LIBRTE_PMD_PCAP=n
>  CONFIG_RTE_LIBRTE_PMD_RING=y
>  CONFIG_RTE_PMD_RING_MAX_RX_RINGS=16
>  CONFIG_RTE_PMD_RING_MAX_TX_RINGS=16
> +CONFIG_RTE_RING_ENQDEQ_CALLBACKS=n
>
>  #
>  # Compile SOFTNIC PMD
> diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile
> index 21a36770d..4f086e687 100644
> --- a/lib/librte_ring/Makefile
> +++ b/lib/librte_ring/Makefile
> @@ -6,6 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk
>  # library name
>  LIB = librte_ring.a
>
> +CFLAGS += -DALLOW_EXPERIMENTAL_API
>  CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
>  LDLIBS += -lrte_eal
>
> diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build
> index ab8b0b469..b92dcf027 100644
> --- a/lib/librte_ring/meson.build
> +++ b/lib/librte_ring/meson.build
> @@ -2,6 +2,8 @@
>  # Copyright(c) 2017 Intel Corporation
>
>  version = 2
> +allow_experimental_apis = true
> +
>  sources = files('rte_ring.c')
>  headers = files('rte_ring.h',
>  		'rte_ring_c11_mem.h',
> diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c
> index b89ecf999..ee740c401 100644
> --- a/lib/librte_ring/rte_ring.c
> +++ b/lib/librte_ring/rte_ring.c
> @@ -43,6 +43,11 @@ EAL_REGISTER_TAILQ(rte_ring_tailq)
>  /* true if x is a power of 2 */
>  #define POWEROF2(x) ((((x)-1) & (x)) == 0)
>
> +/* spinlock for pre-enqueue callback */
> +rte_spinlock_t rte_ring_preenq_cb_lock = RTE_SPINLOCK_INITIALIZER;
> +/* spinlock for post-dequeue callback */
> +rte_spinlock_t rte_ring_pstdeq_cb_lock = RTE_SPINLOCK_INITIALIZER;
> +
>  /* return the size of memory occupied by a ring */
>  ssize_t
>  rte_ring_get_memsize(unsigned count)
> @@ -103,6 +108,9 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
>  	r->prod.head = r->cons.head = 0;
>  	r->prod.tail = r->cons.tail = 0;
>
> +	TAILQ_INIT(&(r->enq_cbs));
> +	TAILQ_INIT(&(r->deq_cbs));
> +
>  	return 0;
>  }
>
> @@ -220,6 +228,185 @@ rte_ring_free(struct rte_ring *r)
>  	rte_free(te);
>  }
>
> +int __rte_experimental
> +rte_ring_preenq_callback_register(struct rte_ring *r,
> +		rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> +	rte_errno = ENOTSUP;
> +	return -ENOTSUP;
> +#endif
> +
> +	struct rte_ring_callback *user_cb;
> +	rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
> +
> +	if (!cb_fn)
> +		return -EINVAL;
> +
> +	if (!rte_ring_get_capacity(r)) {
> +		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> +		return -EINVAL;
> +	}
> +
> +	rte_spinlock_lock(lock);
> +
> +	TAILQ_FOREACH(user_cb, &(r->enq_cbs), next) {
> +		if ((void *) user_cb->cb_fn == (void *) cb_fn &&
> +				user_cb->cb_arg == cb_arg)
> +			break;
> +	}
> +
> +	/* create a new callback. */
> +	if (user_cb == NULL) {
> +		user_cb = rte_zmalloc("RING_USER_CALLBACK",
> +				sizeof(struct rte_ring_callback), 0);
> +		if (user_cb != NULL) {
> +			user_cb->cb_fn = cb_fn;
> +			user_cb->cb_arg = cb_arg;
> +			TAILQ_INSERT_TAIL(&(r->enq_cbs), user_cb, next);
> +		}
> +	}
> +
> +	rte_spinlock_unlock(lock);
> +
> +	return (user_cb == NULL) ? -ENOMEM : 0;
> +}
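For the record, registration from an application would then be something
like the sketch below, reusing the hypothetical count_objs_cb handler
from earlier in this mail and assuming the return-value semantics of the
function above:

	/* Hypothetical usage of the register API above. */
	static uint64_t enq_total;

	static int
	attach_enq_counter(struct rte_ring *r)
	{
		int ret;

		ret = rte_ring_preenq_callback_register(r, count_objs_cb,
				&enq_total);
		/* -ENOTSUP when RTE_RING_ENQDEQ_CALLBACKS is disabled,
		 * -ENOMEM when the callback node cannot be allocated,
		 * 0 on success. */
		return ret;
	}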
>
> +int __rte_experimental
> +rte_ring_pstdeq_callback_register(struct rte_ring *r,
> +		rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> +	rte_errno = ENOTSUP;
> +	return -ENOTSUP;
> +#endif
> +
> +	struct rte_ring_callback *user_cb;
> +	rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
> +
> +	if (!cb_fn)
> +		return -EINVAL;
> +
> +	if (!rte_ring_get_capacity(r)) {
> +		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> +		return -EINVAL;
> +	}
> +
> +	rte_spinlock_lock(lock);
> +
> +	TAILQ_FOREACH(user_cb, &(r->deq_cbs), next) {
> +		if ((void *) user_cb->cb_fn == (void *) cb_fn &&
> +				user_cb->cb_arg == cb_arg)
> +			break;
> +	}
> +
> +	/* create a new callback. */
> +	if (user_cb == NULL) {
> +		user_cb = rte_zmalloc("RING_USER_CALLBACK",
> +				sizeof(struct rte_ring_callback), 0);
> +		if (user_cb != NULL) {
> +			user_cb->cb_fn = cb_fn;
> +			user_cb->cb_arg = cb_arg;
> +			TAILQ_INSERT_TAIL(&(r->deq_cbs), user_cb, next);
> +		}
> +	}
> +
> +	rte_spinlock_unlock(lock);
> +
> +	return (user_cb == NULL) ? -ENOMEM : 0;
> +}
> +
> +int __rte_experimental
> +rte_ring_preenq_callback_unregister(struct rte_ring *r,
> +		rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> +	rte_errno = ENOTSUP;
> +	return -ENOTSUP;
> +#endif
> +
> +	int ret = 0;
> +	struct rte_ring_callback *cb, *next;
> +	rte_spinlock_t *lock = &rte_ring_preenq_cb_lock;
> +
> +	if (!cb_fn)
> +		return -EINVAL;
> +
> +	if (!rte_ring_get_capacity(r)) {
> +		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> +		return -EINVAL;
> +	}
> +
> +	rte_spinlock_lock(lock);
> +
> +	ret = -EINVAL;
> +	for (cb = TAILQ_FIRST(&r->enq_cbs); cb != NULL; cb = next) {
> +		next = TAILQ_NEXT(cb, next);
> +
> +		if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
> +			continue;
> +
> +		if (cb->active == 0) {
> +			TAILQ_REMOVE(&(r->enq_cbs), cb, next);
> +			rte_free(cb);
> +			ret = 0;
> +		} else {
> +			ret = -EAGAIN;
> +		}
> +	}
> +
> +	rte_spinlock_unlock(lock);
> +
> +	return ret;
> +}
> +
> +int __rte_experimental
> +rte_ring_pstdeq_callback_unregister(struct rte_ring *r,
> +		rte_ring_cb_fn cb_fn, void *cb_arg)
> +{
> +#ifndef RTE_RING_ENQDEQ_CALLBACKS
> +	rte_errno = ENOTSUP;
> +	return -ENOTSUP;
> +#endif
> +
> +	int ret = 0;
> +	struct rte_ring_callback *cb, *next;
> +	rte_spinlock_t *lock = &rte_ring_pstdeq_cb_lock;
> +
> +	if (!cb_fn)
> +		return -EINVAL;
> +
> +	if (!rte_ring_get_capacity(r)) {
> +		RTE_LOG(ERR, RING, "Invalid ring=%p", r);
> +		return -EINVAL;
> +	}
> +
> +	rte_spinlock_lock(lock);
> +
> +	ret = -EINVAL;
> +	for (cb = TAILQ_FIRST(&r->deq_cbs); cb != NULL; cb = next) {
> +		next = TAILQ_NEXT(cb, next);
> +
> +		if (cb->cb_fn != cb_fn || cb->cb_arg != cb_arg)
> +			continue;
> +
> +		if (cb->active == 0) {
> +			TAILQ_REMOVE(&(r->deq_cbs), cb, next);
> +			rte_free(cb);
> +			ret = 0;
> +		} else {
> +			ret = -EAGAIN;
> +		}
> +	}
> +
> +	rte_spinlock_unlock(lock);
> +
> +	return ret;
> +
> +	return 0;
> +}
> +
> +
>  /* dump the status of the ring on the console */
>  void
>  rte_ring_dump(FILE *f, const struct rte_ring *r)
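Also worth spelling out for callers: unregister returns -EAGAIN while
cb->active is set, so teardown needs a retry loop, roughly like this
(again reusing my hypothetical handler names and assuming the semantics
of the functions above):

	/* Hypothetical teardown under the -EAGAIN semantics above. */
	static int
	detach_enq_counter(struct rte_ring *r)
	{
		int ret;

		do {
			ret = rte_ring_preenq_callback_unregister(r,
					count_objs_cb, &enq_total);
		} while (ret == -EAGAIN); /* callback still executing */

		return ret; /* 0 on success, -EINVAL if never registered */
	}

If that is the intended contract, it should be documented in the API
comments.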
> diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h
> index e265e9479..fb0f3efb5 100644
> --- a/lib/librte_ring/rte_ring.h
> +++ b/lib/librte_ring/rte_ring.h
> @@ -63,6 +63,11 @@ enum rte_ring_queue_behavior {
>
>  struct rte_memzone; /* forward declaration, so as not to require memzone.h */
>
> +struct rte_ring_callback;
> +
> +TAILQ_HEAD(rte_ring_enq_cb_list, rte_ring_callback);
> +TAILQ_HEAD(rte_ring_deq_cb_list, rte_ring_callback);
> +
>  /* structure to hold a pair of head/tail values and other metadata */
>  struct rte_ring_headtail {
>  	volatile uint32_t head;	/**< Prod/consumer head. */
> @@ -103,6 +108,20 @@ struct rte_ring {
>  	/** Ring consumer status. */
>  	struct rte_ring_headtail cons __rte_cache_aligned;
>  	char pad2 __rte_cache_aligned; /**< empty cache line */
> +
> +	struct rte_ring_enq_cb_list enq_cbs;
> +	struct rte_ring_deq_cb_list deq_cbs;
> +};

This breaks ABI compatibility: appending members to struct rte_ring
changes its size and layout for applications built against the current
library.

> +
> +typedef unsigned int (*rte_ring_cb_fn)(struct rte_ring *r,
> +		void *obj_table, unsigned int n,
> +		void *cb_arg);
> +
> +struct rte_ring_callback {
> +	TAILQ_ENTRY(rte_ring_callback) next; /* Callbacks list */
> +	rte_ring_cb_fn cb_fn; /* Callback address */
> +	void *cb_arg; /* Parameter for callback */
> +	uint32_t active; /* Callback is executing */
>  };
>
>  #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
> @@ -850,9 +869,23 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
>   *   - n: Actual number of objects enqueued.
>   */
>  static __rte_always_inline unsigned
> -rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
> +rte_ring_enqueue_burst(struct rte_ring *r, void **obj_table,
>  		      unsigned int n, unsigned int *free_space)
>  {
> +#ifdef RTE_RING_ENQDEQ_CALLBACKS
> +	struct rte_ring_callback *cb = NULL;
> +
> +	TAILQ_FOREACH(cb, &(r->enq_cbs), next) {

Need to take the TAILQ lock before this. For example: what happens if an
unregister is called concurrently?
Also, traversing a linked list on every enqueue call would be too costly.
Maybe understanding the use case will help.

> +		if (cb->cb_fn == NULL)
> +			continue;
> +
> +		cb->active = 1;
> +		n = cb->cb_fn(r, obj_table, n, cb->cb_arg);
> +		cb->active = 0;
> +	}
> +
> +#endif
> +
>  	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
>  			r->prod.single, free_space);
>  }

<snip>
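To make the locking concern concrete: without the lock, unregister can
rte_free() the node this loop is dereferencing, and setting cb->active
inside the loop does not close that window because the node is read
before the flag is set. A traversal that is actually safe against a
concurrent unregister would look roughly like the sketch below (assuming
the patch's rte_ring_preenq_cb_lock were made visible to this header;
the function name is mine). Note the per-burst lock/unlock on the fast
path, which is exactly the cost I am questioning:

	/* Sketch only, not a suggested implementation: a traversal
	 * serialized against register/unregister. With the lock held,
	 * the cb->active flag becomes unnecessary. */
	static inline unsigned int
	ring_run_enq_cbs_locked(struct rte_ring *r, void **obj_table,
			unsigned int n)
	{
		struct rte_ring_callback *cb;

		rte_spinlock_lock(&rte_ring_preenq_cb_lock);
		TAILQ_FOREACH(cb, &r->enq_cbs, next) {
			if (cb->cb_fn == NULL)
				continue;
			n = cb->cb_fn(r, obj_table, n, cb->cb_arg);
		}
		rte_spinlock_unlock(&rte_ring_preenq_cb_lock);

		return n;
	}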