Hi Paul,

Thank you for your feedback.

> -----Original Message-----
> From: Paul E. McKenney <paul...@linux.ibm.com>
> Sent: Wednesday, April 10, 2019 1:15 PM
> To: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com>
> Cc: konstantin.anan...@intel.com; step...@networkplumber.org;
> marko.kovace...@intel.com; dev@dpdk.org; Gavin Hu (Arm Technology
> China) <gavin...@arm.com>; Dharmik Thakkar
> <dharmik.thak...@arm.com>; Malvika Gupta <malvika.gu...@arm.com>
> Subject: Re: [PATCH v4 1/3] rcu: add RCU library supporting QSBR
> mechanism
>
> On Wed, Apr 10, 2019 at 06:20:04AM -0500, Honnappa Nagarahalli wrote:
> > Add RCU library supporting quiescent state based memory reclamation
> > method. This library helps identify the quiescent state of the reader
> > threads so that the writers can free the memory associated with the
> > lock-less data structures.
>
> I don't see any sign of read-side markers (rcu_read_lock() and
> rcu_read_unlock() in the Linux kernel, userspace RCU, etc.).
>
> Yes, strictly speaking, these are not needed for QSBR to operate, but they

These APIs would be empty for QSBR.
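Something on these lines (just a sketch to check my understanding; the
'lock_cnt' field does not exist in this patch and would be a debug-only
addition under CONFIG_RTE_LIBRTE_RCU_DEBUG):

	static __rte_always_inline void
	rte_rcu_qsbr_lock(struct rte_rcu_qsbr *v, unsigned int thread_id)
	{
		RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	#if defined(RTE_LIBRTE_RCU_DEBUG)
		/* Increment the outstanding critical section count
		 * (debug builds only).
		 */
		__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
					1, __ATOMIC_ACQUIRE);
	#endif
		/* Empty for QSBR otherwise */
	}

	static __rte_always_inline void
	rte_rcu_qsbr_unlock(struct rte_rcu_qsbr *v, unsigned int thread_id)
	{
		RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	#if defined(RTE_LIBRTE_RCU_DEBUG)
		/* Decrement the count; going negative would flag an
		 * unmatched unlock, and a non-zero count inside
		 * rte_rcu_qsbr_quiescent would flag a quiescent state
		 * report in the middle of a critical section.
		 */
		__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
					1, __ATOMIC_RELEASE);
	#endif
	}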
> make it way easier to maintain and debug code using RCU. For example, > given the read-side markers, you can check for errors like having a call to > rte_rcu_qsbr_quiescent() in the middle of a reader quite easily. > Without those read-side markers, life can be quite hard and you will really > hate yourself for failing to have provided them. Want to make sure I understood this, do you mean the application would mark before and after accessing the shared data structure on the reader side? rte_rcu_qsbr_lock() <begin access shared data structure> ... ... <end access shared data structure> rte_rcu_qsbr_unlock() If someone is debugging this code, they have to make sure that there is an unlock for every lock and there is no call to rte_rcu_qsbr_quiescent in between. It sounds good to me. Obviously, they will not add any additional cycles as well. Please let me know if my understanding is correct. > > Some additional questions and comments interspersed. > > Thanx, Paul > > > Signed-off-by: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> > > Reviewed-by: Steve Capper <steve.cap...@arm.com> > > Reviewed-by: Gavin Hu <gavin...@arm.com> > > Reviewed-by: Ola Liljedahl <ola.liljed...@arm.com> > > Acked-by: Konstantin Ananyev <konstantin.anan...@intel.com> > > --- > > MAINTAINERS | 5 + > > config/common_base | 6 + > > lib/Makefile | 2 + > > lib/librte_rcu/Makefile | 23 ++ > > lib/librte_rcu/meson.build | 5 + > > lib/librte_rcu/rte_rcu_qsbr.c | 237 ++++++++++++ > > lib/librte_rcu/rte_rcu_qsbr.h | 554 > +++++++++++++++++++++++++++++ > > lib/librte_rcu/rte_rcu_version.map | 11 + > > lib/meson.build | 2 +- > > mk/rte.app.mk | 1 + > > 10 files changed, 845 insertions(+), 1 deletion(-) create mode > > 100644 lib/librte_rcu/Makefile create mode 100644 > > lib/librte_rcu/meson.build create mode 100644 > > lib/librte_rcu/rte_rcu_qsbr.c create mode 100644 > > lib/librte_rcu/rte_rcu_qsbr.h create mode 100644 > > lib/librte_rcu/rte_rcu_version.map > > > > diff --git a/MAINTAINERS b/MAINTAINERS index 9774344dd..6e9766eed > > 100644 > > --- a/MAINTAINERS > > +++ b/MAINTAINERS > > @@ -1267,6 +1267,11 @@ F: examples/bpf/ > > F: app/test/test_bpf.c > > F: doc/guides/prog_guide/bpf_lib.rst > > > > +RCU - EXPERIMENTAL > > +M: Honnappa Nagarahalli <honnappa.nagaraha...@arm.com> > > +F: lib/librte_rcu/ > > +F: doc/guides/prog_guide/rcu_lib.rst > > + > > > > Test Applications > > ----------------- > > diff --git a/config/common_base b/config/common_base index > > 8da08105b..ad70c79e1 100644 > > --- a/config/common_base > > +++ b/config/common_base > > @@ -829,6 +829,12 @@ CONFIG_RTE_LIBRTE_LATENCY_STATS=y # > > CONFIG_RTE_LIBRTE_TELEMETRY=n > > > > +# > > +# Compile librte_rcu > > +# > > +CONFIG_RTE_LIBRTE_RCU=y > > +CONFIG_RTE_LIBRTE_RCU_DEBUG=n > > + > > # > > # Compile librte_lpm > > # > > diff --git a/lib/Makefile b/lib/Makefile index 26021d0c0..791e0d991 > > 100644 > > --- a/lib/Makefile > > +++ b/lib/Makefile > > @@ -111,6 +111,8 @@ DIRS-$(CONFIG_RTE_LIBRTE_IPSEC) += > librte_ipsec > > DEPDIRS-librte_ipsec := librte_eal librte_mbuf librte_cryptodev > > librte_security > > DIRS-$(CONFIG_RTE_LIBRTE_TELEMETRY) += librte_telemetry > > DEPDIRS-librte_telemetry := librte_eal librte_metrics librte_ethdev > > +DIRS-$(CONFIG_RTE_LIBRTE_RCU) += librte_rcu DEPDIRS-librte_rcu := > > +librte_eal > > > > ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y) > > DIRS-$(CONFIG_RTE_LIBRTE_KNI) += librte_kni diff --git > > a/lib/librte_rcu/Makefile b/lib/librte_rcu/Makefile new file mode > > 100644 index 000000000..6aa677bd1 > > --- /dev/null > > 
> > +++ b/lib/librte_rcu/Makefile
> > @@ -0,0 +1,23 @@
> > +# SPDX-License-Identifier: BSD-3-Clause
> > +# Copyright(c) 2018 Arm Limited
> > +
> > +include $(RTE_SDK)/mk/rte.vars.mk
> > +
> > +# library name
> > +LIB = librte_rcu.a
> > +
> > +CFLAGS += -DALLOW_EXPERIMENTAL_API
> > +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3
> > +LDLIBS += -lrte_eal
> > +
> > +EXPORT_MAP := rte_rcu_version.map
> > +
> > +LIBABIVER := 1
> > +
> > +# all source are stored in SRCS-y
> > +SRCS-$(CONFIG_RTE_LIBRTE_RCU) := rte_rcu_qsbr.c
> > +
> > +# install includes
> > +SYMLINK-$(CONFIG_RTE_LIBRTE_RCU)-include := rte_rcu_qsbr.h
> > +
> > +include $(RTE_SDK)/mk/rte.lib.mk
> > diff --git a/lib/librte_rcu/meson.build b/lib/librte_rcu/meson.build
> > new file mode 100644
> > index 000000000..c009ae4b7
> > --- /dev/null
> > +++ b/lib/librte_rcu/meson.build
> > @@ -0,0 +1,5 @@
> > +# SPDX-License-Identifier: BSD-3-Clause
> > +# Copyright(c) 2018 Arm Limited
> > +
> > +sources = files('rte_rcu_qsbr.c')
> > +headers = files('rte_rcu_qsbr.h')
> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c
> > new file mode 100644
> > index 000000000..53d08446a
> > --- /dev/null
> > +++ b/lib/librte_rcu/rte_rcu_qsbr.c
> > @@ -0,0 +1,237 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + *
> > + * Copyright (c) 2018 Arm Limited
> > + */
> > +
> > +#include <stdio.h>
> > +#include <string.h>
> > +#include <stdint.h>
> > +#include <errno.h>
> > +
> > +#include <rte_common.h>
> > +#include <rte_log.h>
> > +#include <rte_memory.h>
> > +#include <rte_malloc.h>
> > +#include <rte_eal.h>
> > +#include <rte_eal_memconfig.h>
> > +#include <rte_atomic.h>
> > +#include <rte_per_lcore.h>
> > +#include <rte_lcore.h>
> > +#include <rte_errno.h>
> > +
> > +#include "rte_rcu_qsbr.h"
> > +
> > +/* Get the memory size of QSBR variable */
> > +size_t __rte_experimental
> > +rte_rcu_qsbr_get_memsize(uint32_t max_threads)
> > +{
> > +	size_t sz;
> > +
> > +	if (max_threads == 0) {
> > +		rte_log(RTE_LOG_ERR, rcu_log_type,
> > +			"%s(): Invalid max_threads %u\n",
> > +			__func__, max_threads);
> > +		rte_errno = EINVAL;
> > +
> > +		return 1;
> > +	}
> > +
> > +	sz = sizeof(struct rte_rcu_qsbr);
> > +
> > +	/* Add the size of quiescent state counter array */
> > +	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
> > +
> > +	/* Add the size of the registered thread ID bitmap array */
> > +	sz += RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
> > +
> > +	return RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
>
> Given that you align here, should you also align in the earlier steps in the
> computation of sz?

Agree. I will remove the align here and keep the earlier one, as the intent
is to align the thread ID array.

> > +}
> > +
> > +/* Initialize a quiescent state variable */
> > +int __rte_experimental
> > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
> > +{
> > +	size_t sz;
> > +
> > +	if (v == NULL) {
> > +		rte_log(RTE_LOG_ERR, rcu_log_type,
> > +			"%s(): Invalid input parameter\n", __func__);
> > +		rte_errno = EINVAL;
> > +
> > +		return 1;
> > +	}
> > +
> > +	sz = rte_rcu_qsbr_get_memsize(max_threads);
> > +	if (sz == 1)
> > +		return 1;
> > +
> > +	/* Set all the threads to offline */
> > +	memset(v, 0, sz);
>
> We calculate sz here, but it looks like the caller must also calculate it in
> order to correctly allocate the memory referenced by the "v" argument to
> this function, with bad things happening if the two calculations get
> different results. Should "v" instead be allocated within this function to
> avoid this sort of problem?

An earlier version allocated the memory within this library. However, it
was decided to go with the current implementation, as it provides
flexibility for the application to manage the memory as it sees fit. For
example, it could allocate this as part of another structure in a single
allocation. This also falls in line with the approach taken in other
libraries.
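i.e. the application does the allocation itself, something on these lines
(a sketch; using RTE_MAX_LCORE for 'max_threads' is just for illustration):

	struct rte_rcu_qsbr *v;
	size_t sz;

	/* The same 'max_threads' value must be used here and in init */
	sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);

	/* The application owns the memory; it could equally embed the
	 * QS variable in a larger structure in a single allocation.
	 */
	v = (struct rte_rcu_qsbr *)rte_zmalloc(NULL, sz,
						RTE_CACHE_LINE_SIZE);
	if (v == NULL)
		rte_panic("Failed to allocate memory for QS variable\n");

	if (rte_rcu_qsbr_init(v, RTE_MAX_LCORE) != 0)
		rte_panic("Failed to initialize QS variable\n");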
> > +	v->max_threads = max_threads;
> > +	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
> > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
> > +			RTE_QSBR_THRID_ARRAY_ELM_SIZE;
> > +	v->token = RTE_QSBR_CNT_INIT;
> > +
> > +	return 0;
> > +}
> > +
> > +/* Register a reader thread to report its quiescent state
> > + * on a QS variable.
> > + */
> > +int __rte_experimental
> > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
> > +{
> > +	unsigned int i, id, success;
> > +	uint64_t old_bmap, new_bmap;
> > +
> > +	if (v == NULL || thread_id >= v->max_threads) {
> > +		rte_log(RTE_LOG_ERR, rcu_log_type,
> > +			"%s(): Invalid input parameter\n", __func__);
> > +		rte_errno = EINVAL;
> > +
> > +		return 1;
> > +	}
> > +
> > +	id = thread_id & RTE_QSBR_THRID_MASK;
> > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> > +
> > +	/* Make sure that the counter for registered threads does not
> > +	 * go out of sync. Hence, additional checks are required.
> > +	 */
> > +	/* Check if the thread is already registered */
> > +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
> > +					__ATOMIC_RELAXED);
> > +	if (old_bmap & 1UL << id)
> > +		return 0;
> > +
> > +	do {
> > +		new_bmap = old_bmap | (1UL << id);
> > +		success = __atomic_compare_exchange(
> > +				RTE_QSBR_THRID_ARRAY_ELM(v, i),
> > +				&old_bmap, &new_bmap, 0,
> > +				__ATOMIC_RELEASE, __ATOMIC_RELAXED);
> > +
> > +		if (success)
> > +			__atomic_fetch_add(&v->num_threads,
> > +						1, __ATOMIC_RELAXED);
> > +		else if (old_bmap & (1UL << id))
> > +			/* Someone else registered this thread.
> > +			 * Counter should not be incremented.
> > +			 */
> > +			return 0;
> > +	} while (success == 0);
>
> This would be simpler if threads were required to register themselves.
> Maybe you have use cases requiring registration of other threads, but this
> capability is adding significant complexity, so it might be worth some
> thought.

It was simple earlier (__atomic_fetch_or). The complexity was added to keep
'num_threads' from going out of sync.
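Without 'num_threads', the whole loop above would reduce to a single atomic
OR (a sketch of the earlier version):

	/* Set the bit for this thread; no CAS loop is needed when a
	 * registered-thread counter does not have to stay in sync
	 * with the bitmap.
	 */
	__atomic_fetch_or(RTE_QSBR_THRID_ARRAY_ELM(v, i),
				1UL << id, __ATOMIC_RELEASE);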
> > +	return 0;
> > +}
> > +
> > +/* Remove a reader thread from the list of threads reporting their
> > + * quiescent state on a QS variable.
> > + */
> > +int __rte_experimental
> > +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
> > +{
> > +	unsigned int i, id, success;
> > +	uint64_t old_bmap, new_bmap;
> > +
> > +	if (v == NULL || thread_id >= v->max_threads) {
> > +		rte_log(RTE_LOG_ERR, rcu_log_type,
> > +			"%s(): Invalid input parameter\n", __func__);
> > +		rte_errno = EINVAL;
> > +
> > +		return 1;
> > +	}
> > +
> > +	id = thread_id & RTE_QSBR_THRID_MASK;
> > +	i = thread_id >> RTE_QSBR_THRID_INDEX_SHIFT;
> > +
> > +	/* Make sure that the counter for registered threads does not
> > +	 * go out of sync. Hence, additional checks are required.
> > +	 */
> > +	/* Check if the thread is already unregistered */
> > +	old_bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
> > +					__ATOMIC_RELAXED);
> > +	if (!(old_bmap & (1UL << id)))
> > +		return 0;
> > +
> > +	do {
> > +		new_bmap = old_bmap & ~(1UL << id);
> > +		/* Make sure any loads of the shared data structure are
> > +		 * completed before removal of the thread from the list of
> > +		 * reporting threads.
> > +		 */
> > +		success = __atomic_compare_exchange(
> > +				RTE_QSBR_THRID_ARRAY_ELM(v, i),
> > +				&old_bmap, &new_bmap, 0,
> > +				__ATOMIC_RELEASE, __ATOMIC_RELAXED);
> > +
> > +		if (success)
> > +			__atomic_fetch_sub(&v->num_threads,
> > +						1, __ATOMIC_RELAXED);
> > +		else if (!(old_bmap & (1UL << id)))
> > +			/* Someone else unregistered this thread.
> > +			 * Counter should not be decremented.
> > +			 */
> > +			return 0;
> > +	} while (success == 0);
>
> Ditto!
>
> > +	return 0;
> > +}
> > +
> > +/* Dump the details of a single quiescent state variable to a file.
> > + */
> > +int __rte_experimental
> > +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
> > +{
> > +	uint64_t bmap;
> > +	uint32_t i, t;
> > +
> > +	if (v == NULL || f == NULL) {
> > +		rte_log(RTE_LOG_ERR, rcu_log_type,
> > +			"%s(): Invalid input parameter\n", __func__);
> > +		rte_errno = EINVAL;
> > +
> > +		return 1;
> > +	}
> > +
> > +	fprintf(f, "\nQuiescent State Variable @%p\n", v);
> > +
> > +	fprintf(f, "  QS variable memory size = %lu\n",
> > +				rte_rcu_qsbr_get_memsize(v->max_threads));
> > +	fprintf(f, "  Given # max threads = %u\n", v->max_threads);
> > +	fprintf(f, "  Current # threads = %u\n", v->num_threads);
> > +
> > +	fprintf(f, "  Registered thread ID mask = 0x");
> > +	for (i = 0; i < v->num_elems; i++)
> > +		fprintf(f, "%lx", __atomic_load_n(
> > +					RTE_QSBR_THRID_ARRAY_ELM(v, i),
> > +					__ATOMIC_ACQUIRE));
> > +	fprintf(f, "\n");
> > +
> > +	fprintf(f, "  Token = %lu\n",
> > +			__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
> > +
> > +	fprintf(f, "Quiescent State Counts for readers:\n");
> > +	for (i = 0; i < v->num_elems; i++) {
> > +		bmap = __atomic_load_n(RTE_QSBR_THRID_ARRAY_ELM(v, i),
> > +					__ATOMIC_ACQUIRE);
> > +		while (bmap) {
> > +			t = __builtin_ctzl(bmap);
> > +			fprintf(f, "thread ID = %d, count = %lu\n", t,
> > +				__atomic_load_n(
> > +					&v->qsbr_cnt[i].cnt,
> > +					__ATOMIC_RELAXED));
> > +			bmap &= ~(1UL << t);
> > +		}
> > +	}
> > +
> > +	return 0;
> > +}
> > +
> > +int rcu_log_type;
> > +
> > +RTE_INIT(rte_rcu_register)
> > +{
> > +	rcu_log_type = rte_log_register("lib.rcu");
> > +	if (rcu_log_type >= 0)
> > +		rte_log_set_level(rcu_log_type, RTE_LOG_ERR);
> > +}
> > diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h
> > new file mode 100644
> > index 000000000..ff696aeab
> > --- /dev/null
> > +++ b/lib/librte_rcu/rte_rcu_qsbr.h
> > @@ -0,0 +1,554 @@
> > +/* SPDX-License-Identifier: BSD-3-Clause
> > + * Copyright (c) 2018 Arm Limited
> > + */
> > +
> > +#ifndef _RTE_RCU_QSBR_H_
> > +#define _RTE_RCU_QSBR_H_
> > +
> > +/**
> > + * @file
> > + * RTE Quiescent State Based Reclamation (QSBR)
> > + *
> > + * Quiescent State (QS) is any point in the thread execution
> > + * where the thread does not hold a reference to a data structure
> > + * in shared memory. While using lock-less data structures, the writer
> > + * can safely free memory once all the reader threads have entered
> > + * quiescent state.
> > + *
> > + * This library provides the ability for the readers to report quiescent
> > + * state and for the writers to identify when all the readers have
> > + * entered quiescent state.
> > + */
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +#include <stdio.h>
> > +#include <stdint.h>
> > +#include <errno.h>
> > +#include <rte_common.h>
> > +#include <rte_memory.h>
> > +#include <rte_lcore.h>
> > +#include <rte_debug.h>
> > +#include <rte_atomic.h>
> > +
> > +extern int rcu_log_type;
> > +
> > +#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
> > +#define RCU_DP_LOG(level, fmt, args...) \
> > +	rte_log(RTE_LOG_ ## level, rcu_log_type, \
> > +		"%s(): " fmt "\n", __func__, ## args)
> > +#else
> > +#define RCU_DP_LOG(level, fmt, args...)
> > +#endif
> > +
> > +/* Registered thread IDs are stored as a bitmap of 64b element array.
> > + * Given thread id needs to be converted to index into the array and
> > + * the id within the array element.
> > + */
> > +#define RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
> > +#define RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
> > +	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
> > +		RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
> > +#define RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
> > +	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
> > +#define RTE_QSBR_THRID_INDEX_SHIFT 6
> > +#define RTE_QSBR_THRID_MASK 0x3f
> > +#define RTE_QSBR_THRID_INVALID 0xffffffff
> > +
> > +/* Worker thread counter */
> > +struct rte_rcu_qsbr_cnt {
> > +	uint64_t cnt;
> > +	/**< Quiescent state counter. Value 0 indicates the thread is offline */
> > +} __rte_cache_aligned;
> > +
> > +#define RTE_QSBR_CNT_THR_OFFLINE 0
> > +#define RTE_QSBR_CNT_INIT 1
> > +
> > +/* RTE Quiescent State variable structure.
> > + * This structure has two elements that vary in size based on the
> > + * 'max_threads' parameter.
> > + * 1) Quiescent state counter array
> > + * 2) Register thread ID array
> > + */
> > +struct rte_rcu_qsbr {
> > +	uint64_t token __rte_cache_aligned;
> > +	/**< Counter to allow for multiple concurrent quiescent state queries */
> > +
> > +	uint32_t num_elems __rte_cache_aligned;
> > +	/**< Number of elements in the thread ID array */
> > +	uint32_t num_threads;
> > +	/**< Number of threads currently using this QS variable */
> > +	uint32_t max_threads;
> > +	/**< Maximum number of threads using this QS variable */
> > +
> > +	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
> > +	/**< Quiescent state counter array of 'max_threads' elements */
> > +
> > +	/**< Registered thread IDs are stored in a bitmap array,
> > +	 * after the quiescent state counter array.
> > +	 */
> > +} __rte_cache_aligned;
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Return the size of the memory occupied by a Quiescent State variable.
> > + *
> > + * @param max_threads
> > + *   Maximum number of threads reporting quiescent state on this variable.
> > + * @return
> > + *   On success - size of memory in bytes required for this QS variable.
> > + *   On error - 1 with error code set in rte_errno.
> > + *   Possible rte_errno codes are:
> > + *   - EINVAL - max_threads is 0
> > + */
> > +size_t __rte_experimental
> > +rte_rcu_qsbr_get_memsize(uint32_t max_threads);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Initialize a Quiescent State (QS) variable.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param max_threads
> > + *   Maximum number of threads reporting quiescent state on this variable.
> > + *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
> > + * @return
> > + *   On success - 0
> > + *   On error - 1 with error code set in rte_errno.
> > + *   Possible rte_errno codes are:
> > + *   - EINVAL - max_threads is 0 or 'v' is NULL.
> > + *
> > + */
> > +int __rte_experimental
> > +rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Register a reader thread to report its quiescent state
> > + * on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + * Any reader thread that wants to report its quiescent state must
> > + * call this API. This can be called during initialization or as part
> > + * of the packet processing loop.
> > + *
> > + * Note that rte_rcu_qsbr_thread_online must be called before the
> > + * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Reader thread with this thread ID will report its quiescent state on
> > + *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
> > + *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
> > + */
> > +int __rte_experimental
> > +rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Remove a reader thread from the list of threads reporting their
> > + * quiescent state on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + * This API can be called from the reader threads during shutdown.
> > + * Ongoing quiescent state queries will stop waiting for the status from this
> > + * unregistered reader thread.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Reader thread with this thread ID will stop reporting its quiescent
> > + *   state on the QS variable.
> > + */
> > +int __rte_experimental
> > +rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Add a registered reader thread to the list of threads reporting their
> > + * quiescent state on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + *
> > + * Any registered reader thread that wants to report its quiescent state must
> > + * call this API before calling rte_rcu_qsbr_quiescent. This can be called
> > + * during initialization or as part of the packet processing loop.
> > + *
> > + * The reader thread must call rte_rcu_qsbr_thread_offline API, before
> > + * calling any functions that block, to ensure that rte_rcu_qsbr_check
> > + * API does not wait indefinitely for the reader thread to update its QS.
> > + *
> > + * The reader thread must call rte_rcu_qsbr_thread_online API, after the
> > + * blocking function call returns, to ensure that rte_rcu_qsbr_check API
> > + * waits for the reader thread to update its quiescent state.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Reader thread with this thread ID will report its quiescent state on
> > + *   the QS variable.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
>
> I am not clear on why this function should be inline. Or do you have use
> cases where threads go offline and come back online extremely frequently?

Yes, there are use cases where the function call to receive packets can
block. The reader would go offline/online around every such call in the
packet processing loop, hence these functions need to be cheap.
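For ex, a reader's loop would look something like this (a sketch;
'blocking_rx' and 'process_packets' are placeholders for the application's
receive and lookup code, and BURST_SIZE/nb_rx/pkts are illustrative):

	/* Thread is already registered on QS variable 'v' with ID
	 * 'thread_id' via rte_rcu_qsbr_thread_register().
	 */
	while (!done) {
		/* Going to block; stop reporting quiescent state */
		rte_rcu_qsbr_thread_offline(v, thread_id);

		/* Placeholder for a receive call that can block */
		nb_rx = blocking_rx(pkts, BURST_SIZE);

		/* Back from the blocking call; resume reporting */
		rte_rcu_qsbr_thread_online(v, thread_id);

		/* Access the shared, lock-less data structures */
		process_packets(pkts, nb_rx);

		/* End of this iteration; report quiescent state */
		rte_rcu_qsbr_quiescent(v, thread_id);
	}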
> > +{
> > +	uint64_t t;
> > +
> > +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
> > +
> > +	/* Copy the current value of token.
> > +	 * The fence at the end of the function will ensure that
> > +	 * the following will not move down after the load of any shared
> > +	 * data structure.
> > +	 */
> > +	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
> > +
> > +	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
> > +	 * 'cnt' (64b) is accessed atomically.
> > +	 */
> > +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
> > +		t, __ATOMIC_RELAXED);
> > +
> > +	/* The subsequent load of the data structure should not
> > +	 * move above the store. Hence a store-load barrier
> > +	 * is required.
> > +	 * If the load of the data structure moves above the store,
> > +	 * writer might not see that the reader is online, even though
> > +	 * the reader is referencing the shared data structure.
> > +	 */
> > +#ifdef RTE_ARCH_X86_64
> > +	/* rte_smp_mb() for x86 is lighter */
> > +	rte_smp_mb();
> > +#else
> > +	__atomic_thread_fence(__ATOMIC_SEQ_CST);
> > +#endif
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Remove a registered reader thread from the list of threads reporting their
> > + * quiescent state on a QS variable.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + *
> > + * This can be called during initialization or as part of the packet
> > + * processing loop.
> > + *
> > + * The reader thread must call rte_rcu_qsbr_thread_offline API, before
> > + * calling any functions that block, to ensure that rte_rcu_qsbr_check
> > + * API does not wait indefinitely for the reader thread to update its QS.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   rte_rcu_qsbr_check API will not wait for the reader thread with
> > + *   this thread ID to report its quiescent state on the QS variable.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
>
> Same here on inlining.
>
> > +{
> > +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
> > +
> > +	/* The reader can go offline only after the load of the
> > +	 * data structure is completed. i.e. any load of the
> > +	 * data structure can not move after this store.
> > +	 */
> > +
> > +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
> > +		RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Ask the reader threads to report the quiescent state
> > + * status.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread
> > + * safe and can be called from worker threads.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @return
> > + *   - This is the token for this call of the API. This should be
> > + *     passed to rte_rcu_qsbr_check API.
> > + */
> > +static __rte_always_inline uint64_t __rte_experimental
> > +rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
> > +{
> > +	uint64_t t;
> > +
> > +	RTE_ASSERT(v != NULL);
> > +
> > +	/* Release the changes to the shared data structure.
> > +	 * This store release will ensure that changes to any data
> > +	 * structure are visible to the workers before the token
> > +	 * update is visible.
> > +	 */
> > +	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
> > +
> > +	return t;
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Update quiescent state for a reader thread.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + * All the reader threads registered to report their quiescent state
> > + * on the QS variable must call this API.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Update the quiescent state for the reader with this thread ID.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
> > +{
> > +	uint64_t t;
> > +
> > +	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
> > +
> > +	/* Acquire the changes to the shared data structure released
> > +	 * by rte_rcu_qsbr_start.
> > +	 * Later loads of the shared data structure should not move
> > +	 * above this load. Hence, use load-acquire.
> > +	 */
> > +	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
> > +
> > +	/* Inform the writer that updates are visible to this reader.
> > +	 * Prior loads of the shared data structure should not move
> > +	 * beyond this store. Hence use store-release.
> > +	 */
> > +	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
> > +			 t, __ATOMIC_RELEASE);
> > +
> > +	RCU_DP_LOG(DEBUG, "%s: update: token = %lu, Thread ID = %d",
> > +		__func__, t, thread_id);
> > +}
> > +
> > +/* Check the quiescent state counter for registered threads only, assuming
> > + * that not all threads have registered.
> > + */
> > +static __rte_always_inline int
> > +__rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
> > +{
> > +	uint32_t i, j, id;
> > +	uint64_t bmap;
> > +	uint64_t c;
> > +	uint64_t *reg_thread_id;
> > +
> > +	for (i = 0, reg_thread_id = RTE_QSBR_THRID_ARRAY_ELM(v, 0);
> > +		i < v->num_elems;
> > +		i++, reg_thread_id++) {
> > +		/* Load the current registered thread bit map before
> > +		 * loading the reader thread quiescent state counters.
> > +		 */
> > +		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
> > +		id = i << RTE_QSBR_THRID_INDEX_SHIFT;
> > +
> > +		while (bmap) {
> > +			j = __builtin_ctzl(bmap);
> > +			RCU_DP_LOG(DEBUG,
> > +				"%s: check: token = %lu, wait = %d, Bit Map = 0x%lx, Thread ID = %d",
> > +				__func__, t, wait, bmap, id + j);
> > +			c = __atomic_load_n(
> > +				&v->qsbr_cnt[id + j].cnt,
> > +				__ATOMIC_ACQUIRE);
> > +			RCU_DP_LOG(DEBUG,
> > +				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",
> > +				__func__, t, wait, c, id + j);
> > +			/* Counter is not checked for wrap-around condition
> > +			 * as it is a 64b counter.
> > +			 */
> > +			if (unlikely(c != RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
>
> This assumes that a 64-bit counter won't overflow, which is close enough
> to true given current CPU clock frequencies.
;-)

> > +				/* This thread is not in quiescent state */
> > +				if (!wait)
> > +					return 0;
> > +
> > +				rte_pause();
> > +				/* This thread might have unregistered.
> > +				 * Re-read the bitmap.
> > +				 */
> > +				bmap = __atomic_load_n(reg_thread_id,
> > +						__ATOMIC_ACQUIRE);
> > +
> > +				continue;
> > +			}
> > +
> > +			bmap &= ~(1UL << j);
> > +		}
> > +	}
> > +
> > +	return 1;
> > +}
> > +
> > +/* Check the quiescent state counter for all threads, assuming that
> > + * all the threads have registered.
> > + */
> > +static __rte_always_inline int
> > +__rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
>
> Does checking the bitmap really take long enough to make this worthwhile
> as a separate function? I would think that the bitmap-checking time
> would be lost in the noise of cache misses from the ->cnt loads.

It avoids accessing one cache line. I think this is where the savings are
(maybe only in theory). Also, all threads being registered is the most
probable use case. On the other hand, __rcu_qsbr_check_selective() will
result in savings (depending on how many threads are currently registered)
by avoiding accessing unwanted counters.

> Sure, if you invoke __rcu_qsbr_check_selective() in a tight loop in the
> absence of readers, you might see __rcu_qsbr_check_all() being a bit
> faster. But is that really what DPDK does?

I see improvements in the synthetic test case (similar to the one you have
described, around 27%). However, in the more practical test cases I do not
see any difference.

> > +{
> > +	uint32_t i;
> > +	struct rte_rcu_qsbr_cnt *cnt;
> > +	uint64_t c;
> > +
> > +	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
> > +		RCU_DP_LOG(DEBUG,
> > +			"%s: check: token = %lu, wait = %d, Thread ID = %d",
> > +			__func__, t, wait, i);
> > +		while (1) {
> > +			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
> > +			RCU_DP_LOG(DEBUG,
> > +				"%s: status: token = %lu, wait = %d, Thread QS cnt = %lu, Thread ID = %d",
> > +				__func__, t, wait, c, i);
> > +			/* Counter is not checked for wrap-around condition
> > +			 * as it is a 64b counter.
> > +			 */
> > +			if (likely(c == RTE_QSBR_CNT_THR_OFFLINE || c >= t))
> > +				break;
> > +
> > +			/* This thread is not in quiescent state */
> > +			if (!wait)
> > +				return 0;
> > +
> > +			rte_pause();
> > +		}
> > +	}
> > +
> > +	return 1;
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Checks if all the reader threads have entered the quiescent state
> > + * referenced by token.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread
> > + * safe and can be called from the worker threads as well.
> > + *
> > + * If this API is called with 'wait' set to true, the following
> > + * factors must be considered:
> > + *
> > + * 1) If the calling thread is also reporting the status on the
> > + * same QS variable, it must update the quiescent state status before
> > + * calling this API.
> > + *
> > + * 2) In addition, while calling from multiple threads, only
> > + * one of those threads can be reporting the quiescent state status
> > + * on a given QS variable.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param t
> > + *   Token returned by rte_rcu_qsbr_start API
> > + * @param wait
> > + *   If true, block till all the reader threads have completed entering
> > + *   the quiescent state referenced by token 't'.
> > + * @return
> > + *   - 0 if all reader threads have NOT passed through specified number
> > + *     of quiescent states.
> > + *   - 1 if all reader threads have passed through specified number
> > + *     of quiescent states.
> > + */
> > +static __rte_always_inline int __rte_experimental
> > +rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
> > +{
> > +	RTE_ASSERT(v != NULL);
> > +
> > +	if (likely(v->num_threads == v->max_threads))
> > +		return __rcu_qsbr_check_all(v, t, wait);
> > +	else
> > +		return __rcu_qsbr_check_selective(v, t, wait);
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Wait till the reader threads have entered quiescent state.
> > + *
> > + * This is implemented as a lock-free function. It is multi-thread safe.
> > + * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
> > + * rte_rcu_qsbr_check APIs.
> > + *
> > + * If this API is called from multiple threads, only one of
> > + * those threads can be reporting the quiescent state status on a
> > + * given QS variable.
> > + *
> > + * @param v
> > + *   QS variable
> > + * @param thread_id
> > + *   Thread ID of the caller if it is registered to report quiescent state
> > + *   on this QS variable (i.e. the calling thread is also part of the
> > + *   read-side critical section). If not, pass RTE_QSBR_THRID_INVALID.
> > + */
> > +static __rte_always_inline void __rte_experimental
> > +rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
> > +{
> > +	uint64_t t;
> > +
> > +	RTE_ASSERT(v != NULL);
> > +
> > +	t = rte_rcu_qsbr_start(v);
> > +
> > +	/* If the current thread has a read-side critical section,
> > +	 * update its quiescent state status.
> > +	 */
> > +	if (thread_id != RTE_QSBR_THRID_INVALID)
> > +		rte_rcu_qsbr_quiescent(v, thread_id);
> > +
> > +	/* Wait for other readers to enter quiescent state */
> > +	rte_rcu_qsbr_check(v, t, true);
>
> And you are presumably relying on 64-bit counters to avoid the need to
> execute the above code twice in succession. Which again works given
> current CPU clock rates combined with system and human lifespans.
> Otherwise, there are interesting race conditions that can happen, so don't
> try this with a 32-bit counter!!!

Yes. I am relying on 64-bit counters to avoid having to spend cycles (and
time).

> (But think of the great^N grandchildren!!!)

(It is an interesting thought. I wonder what would happen to all the code
we are writing today 😊)

> More seriously, a comment warning people not to make the counter be 32
> bits is in order.

Agree, I will add it in the structure definition.
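To make sure we are on the same page on the writer side, the intended flow
around rte_rcu_qsbr_start/check is something like this (a sketch;
'remove_element' and 'free_element' are placeholders for the application's
lock-less data structure operations):

	uint64_t token;

	/* Unlink the element so that no new reader can find it */
	remove_element(ds, elem);

	/* Request a new round of quiescent state reporting */
	token = rte_rcu_qsbr_start(v);

	/* The writer can do other work here while the readers report */

	/* Block till all readers have gone through a quiescent state */
	rte_rcu_qsbr_check(v, token, true);

	/* No reader holds a reference to 'elem' any more; reclaim it */
	free_element(elem);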
> > +}
> > +
> > +/**
> > + * @warning
> > + * @b EXPERIMENTAL: this API may change without prior notice
> > + *
> > + * Dump the details of a single QS variable to a file.
> > + *
> > + * It is NOT multi-thread safe.
> > + *
> > + * @param f
> > + *   A pointer to a file for output
> > + * @param v
> > + *   QS variable
> > + * @return
> > + *   On success - 0
> > + *   On error - 1 with error code set in rte_errno.
> > + *   Possible rte_errno codes are:
> > + *   - EINVAL - NULL parameters are passed
> > + */
> > +int __rte_experimental
> > +rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +
> > +#endif /* _RTE_RCU_QSBR_H_ */
> > diff --git a/lib/librte_rcu/rte_rcu_version.map b/lib/librte_rcu/rte_rcu_version.map
> > new file mode 100644
> > index 000000000..ad8cb517c
> > --- /dev/null
> > +++ b/lib/librte_rcu/rte_rcu_version.map
> > @@ -0,0 +1,11 @@
> > +EXPERIMENTAL {
> > +	global:
> > +
> > +	rte_rcu_qsbr_get_memsize;
> > +	rte_rcu_qsbr_init;
> > +	rte_rcu_qsbr_thread_register;
> > +	rte_rcu_qsbr_thread_unregister;
> > +	rte_rcu_qsbr_dump;
> > +
> > +	local: *;
> > +};
> > diff --git a/lib/meson.build b/lib/meson.build
> > index 595314d7d..67be10659 100644
> > --- a/lib/meson.build
> > +++ b/lib/meson.build
> > @@ -22,7 +22,7 @@ libraries = [
> >  	'gro', 'gso', 'ip_frag', 'jobstats',
> >  	'kni', 'latencystats', 'lpm', 'member',
> >  	'power', 'pdump', 'rawdev',
> > -	'reorder', 'sched', 'security', 'stack', 'vhost',
> > +	'reorder', 'sched', 'security', 'stack', 'vhost', 'rcu',
> >  	#ipsec lib depends on crypto and security
> >  	'ipsec',
> >  	# add pkt framework libs which use other libs from above
> > diff --git a/mk/rte.app.mk b/mk/rte.app.mk
> > index 7d994bece..e93cc366d 100644
> > --- a/mk/rte.app.mk
> > +++ b/mk/rte.app.mk
> > @@ -97,6 +97,7 @@ _LDLIBS-$(CONFIG_RTE_LIBRTE_EAL) += -lrte_eal
> >  _LDLIBS-$(CONFIG_RTE_LIBRTE_CMDLINE) += -lrte_cmdline
> >  _LDLIBS-$(CONFIG_RTE_LIBRTE_REORDER) += -lrte_reorder
> >  _LDLIBS-$(CONFIG_RTE_LIBRTE_SCHED) += -lrte_sched
> > +_LDLIBS-$(CONFIG_RTE_LIBRTE_RCU) += -lrte_rcu
> >
> >  ifeq ($(CONFIG_RTE_EXEC_ENV_LINUX),y)
> >  _LDLIBS-$(CONFIG_RTE_LIBRTE_KNI) += -lrte_kni
> > --
> > 2.17.1
> >