This commit adds an implementation of the lock-free stack push, pop, and length functions that use __atomic builtins, for systems that benefit from the finer-grained memory ordering control.
Signed-off-by: Gage Eads <gage.e...@intel.com> --- lib/librte_stack/Makefile | 3 +- lib/librte_stack/meson.build | 3 +- lib/librte_stack/rte_stack.h | 4 + lib/librte_stack/rte_stack_c11_mem.h | 175 +++++++++++++++++++++++++++++++++++ 4 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 lib/librte_stack/rte_stack_c11_mem.h diff --git a/lib/librte_stack/Makefile b/lib/librte_stack/Makefile index 3ecddf033..94a7c1476 100644 --- a/lib/librte_stack/Makefile +++ b/lib/librte_stack/Makefile @@ -19,6 +19,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_STACK) := rte_stack.c # install includes SYMLINK-$(CONFIG_RTE_LIBRTE_STACK)-include := rte_stack.h \ - rte_stack_generic.h + rte_stack_generic.h \ + rte_stack_c11_mem.h include $(RTE_SDK)/mk/rte.lib.mk diff --git a/lib/librte_stack/meson.build b/lib/librte_stack/meson.build index 99d7f9ec5..7e2d1dbb8 100644 --- a/lib/librte_stack/meson.build +++ b/lib/librte_stack/meson.build @@ -6,4 +6,5 @@ allow_experimental_apis = true version = 1 sources = files('rte_stack.c') headers = files('rte_stack.h', - 'rte_stack_generic.h') + 'rte_stack_generic.h', + 'rte_stack_c11_mem.h') diff --git a/lib/librte_stack/rte_stack.h b/lib/librte_stack/rte_stack.h index b484313bb..de16f8fff 100644 --- a/lib/librte_stack/rte_stack.h +++ b/lib/librte_stack/rte_stack.h @@ -91,7 +91,11 @@ struct rte_stack { */ #define RTE_STACK_F_LF 0x0001 +#ifdef RTE_USE_C11_MEM_MODEL +#include "rte_stack_c11_mem.h" +#else #include "rte_stack_generic.h" +#endif /** * @internal Push several objects on the lock-free stack (MT-safe). diff --git a/lib/librte_stack/rte_stack_c11_mem.h b/lib/librte_stack/rte_stack_c11_mem.h new file mode 100644 index 000000000..44f9ece6e --- /dev/null +++ b/lib/librte_stack/rte_stack_c11_mem.h @@ -0,0 +1,175 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2019 Intel Corporation + */ + +#ifndef _RTE_STACK_C11_MEM_H_ +#define _RTE_STACK_C11_MEM_H_ + +#include <rte_branch_prediction.h> +#include <rte_prefetch.h> + +static __rte_always_inline unsigned int +rte_stack_lf_len(struct rte_stack *s) +{ + /* stack_lf_push() and stack_lf_pop() do not update the list's contents + * and stack_lf->len atomically, which can cause the list to appear + * shorter than it actually is if this function is called while other + * threads are modifying the list. + * + * However, given the inherently approximate nature of the get_count + * callback -- even if the list and its size were updated atomically, + * the size could change between when get_count executes and when the + * value is returned to the caller -- this is acceptable. + * + * The stack_lf->len updates are placed such that the list may appear to + * have fewer elements than it does, but will never appear to have more + * elements. If the mempool is near-empty to the point that this is a + * concern, the user should consider increasing the mempool size. + */ + return (unsigned int)__atomic_load_n(&s->stack_lf.used.len.cnt, + __ATOMIC_RELAXED); +} + +static __rte_always_inline void +__rte_stack_lf_push(struct rte_stack_lf_list *list, + struct rte_stack_lf_elem *first, + struct rte_stack_lf_elem *last, + unsigned int num) +{ +#ifndef RTE_ARCH_X86_64 + RTE_SET_USED(first); + RTE_SET_USED(last); + RTE_SET_USED(list); + RTE_SET_USED(num); +#else + struct rte_stack_lf_head old_head; + int success; + + old_head = list->head; + + do { + struct rte_stack_lf_head new_head; + + /* Swing the top pointer to the first element in the list and + * make the last element point to the old top. + */ + new_head.top = first; + new_head.cnt = old_head.cnt + 1; + + last->next = old_head.top; + + /* Use the release memmodel to ensure the writes to the LF LIFO + * elements are visible before the head pointer write. + */ + success = rte_atomic128_cmp_exchange( + (rte_int128_t *)&list->head, + (rte_int128_t *)&old_head, + (rte_int128_t *)&new_head, + 1, __ATOMIC_RELEASE, + __ATOMIC_RELAXED); + } while (success == 0); + + /* Ensure the stack modifications are not reordered with respect + * to the LIFO len update. + */ + __atomic_add_fetch(&list->len.cnt, num, __ATOMIC_RELEASE); +#endif +} + +static __rte_always_inline struct rte_stack_lf_elem * +__rte_stack_lf_pop(struct rte_stack_lf_list *list, + unsigned int num, + void **obj_table, + struct rte_stack_lf_elem **last) +{ +#ifndef RTE_ARCH_X86_64 + RTE_SET_USED(obj_table); + RTE_SET_USED(last); + RTE_SET_USED(list); + RTE_SET_USED(num); + + return NULL; +#else + struct rte_stack_lf_head old_head; + int success; + + /* Reserve num elements, if available */ + while (1) { + uint64_t len = __atomic_load_n(&list->len.cnt, + __ATOMIC_ACQUIRE); + + /* Does the list contain enough elements? */ + if (unlikely(len < num)) + return NULL; + + if (__atomic_compare_exchange_n(&list->len.cnt, + &len, len - num, + 0, __ATOMIC_RELAXED, + __ATOMIC_RELAXED)) + break; + } + +#ifndef RTE_ARCH_X86_64 + /* Use the acquire memmodel to ensure the reads to the LF LIFO elements + * are properly ordered with respect to the head pointer read. + * + * Note that for aarch64, GCC's implementation of __atomic_load_16 in + * libatomic uses locks, and so this function should be replaced by + * a new function (e.g. "rte_atomic128_load()"). + */ + __atomic_load((volatile __int128 *)&list->head, + &old_head, + __ATOMIC_ACQUIRE); +#else + /* x86-64 does not require an atomic load here; if a torn read occurs, + * the CAS will fail and set old_head to the correct/latest value. + */ + old_head = list->head; +#endif + + /* Pop num elements */ + do { + struct rte_stack_lf_head new_head; + struct rte_stack_lf_elem *tmp; + unsigned int i; + + rte_prefetch0(old_head.top); + + tmp = old_head.top; + + /* Traverse the list to find the new head. A next pointer will + * either point to another element or NULL; if a thread + * encounters a pointer that has already been popped, the CAS + * will fail. + */ + for (i = 0; i < num && tmp != NULL; i++) { + rte_prefetch0(tmp->next); + if (obj_table) + obj_table[i] = tmp->data; + if (last) + *last = tmp; + tmp = tmp->next; + } + + /* If NULL was encountered, the list was modified while + * traversing it. Retry. + */ + if (i != num) + continue; + + new_head.top = tmp; + new_head.cnt = old_head.cnt + 1; + + success = rte_atomic128_cmp_exchange( + (rte_int128_t *)&list->head, + (rte_int128_t *)&old_head, + (rte_int128_t *)&new_head, + 1, __ATOMIC_ACQUIRE, + __ATOMIC_ACQUIRE); + } while (success == 0); + + return old_head.top; +#endif +} + +#endif /* _RTE_STACK_C11_MEM_H_ */ -- 2.13.6