This commit adds an implementation of the lock-free stack push, pop, and
length functions that use __atomic builtins, for systems that benefit from
the finer-grained memory ordering control.

Signed-off-by: Gage Eads <gage.e...@intel.com>
---
 lib/librte_stack/Makefile            |   3 +-
 lib/librte_stack/meson.build         |   3 +-
 lib/librte_stack/rte_stack.h         |   4 +
 lib/librte_stack/rte_stack_c11_mem.h | 175 +++++++++++++++++++++++++++++++++++
 4 files changed, 183 insertions(+), 2 deletions(-)
 create mode 100644 lib/librte_stack/rte_stack_c11_mem.h

diff --git a/lib/librte_stack/Makefile b/lib/librte_stack/Makefile
index 3ecddf033..94a7c1476 100644
--- a/lib/librte_stack/Makefile
+++ b/lib/librte_stack/Makefile
@@ -19,6 +19,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_STACK) := rte_stack.c
 
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_STACK)-include := rte_stack.h \
-                                             rte_stack_generic.h
+                                             rte_stack_generic.h \
+                                             rte_stack_c11_mem.h
 
 include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/lib/librte_stack/meson.build b/lib/librte_stack/meson.build
index 99d7f9ec5..7e2d1dbb8 100644
--- a/lib/librte_stack/meson.build
+++ b/lib/librte_stack/meson.build
@@ -6,4 +6,5 @@ allow_experimental_apis = true
 version = 1
 sources = files('rte_stack.c')
 headers = files('rte_stack.h',
-               'rte_stack_generic.h')
+               'rte_stack_generic.h',
+               'rte_stack_c11_mem.h')
diff --git a/lib/librte_stack/rte_stack.h b/lib/librte_stack/rte_stack.h
index b484313bb..de16f8fff 100644
--- a/lib/librte_stack/rte_stack.h
+++ b/lib/librte_stack/rte_stack.h
@@ -91,7 +91,11 @@ struct rte_stack {
  */
 #define RTE_STACK_F_LF 0x0001
 
+#ifdef RTE_USE_C11_MEM_MODEL
+#include "rte_stack_c11_mem.h"
+#else
 #include "rte_stack_generic.h"
+#endif
 
 /**
  * @internal Push several objects on the lock-free stack (MT-safe).
diff --git a/lib/librte_stack/rte_stack_c11_mem.h 
b/lib/librte_stack/rte_stack_c11_mem.h
new file mode 100644
index 000000000..44f9ece6e
--- /dev/null
+++ b/lib/librte_stack/rte_stack_c11_mem.h
@@ -0,0 +1,175 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#ifndef _RTE_STACK_C11_MEM_H_
+#define _RTE_STACK_C11_MEM_H_
+
+#include <rte_branch_prediction.h>
+#include <rte_prefetch.h>
+
+static __rte_always_inline unsigned int
+rte_stack_lf_len(struct rte_stack *s)
+{
+       /* stack_lf_push() and stack_lf_pop() do not update the list's contents
+        * and stack_lf->len atomically, which can cause the list to appear
+        * shorter than it actually is if this function is called while other
+        * threads are modifying the list.
+        *
+        * However, given the inherently approximate nature of the get_count
+        * callback -- even if the list and its size were updated atomically,
+        * the size could change between when get_count executes and when the
+        * value is returned to the caller -- this is acceptable.
+        *
+        * The stack_lf->len updates are placed such that the list may appear to
+        * have fewer elements than it does, but will never appear to have more
+        * elements. If the mempool is near-empty to the point that this is a
+        * concern, the user should consider increasing the mempool size.
+        */
+       return (unsigned int)__atomic_load_n(&s->stack_lf.used.len.cnt,
+                                            __ATOMIC_RELAXED);
+}
+
+static __rte_always_inline void
+__rte_stack_lf_push(struct rte_stack_lf_list *list,
+                   struct rte_stack_lf_elem *first,
+                   struct rte_stack_lf_elem *last,
+                   unsigned int num)
+{
+#ifndef RTE_ARCH_X86_64
+       RTE_SET_USED(first);
+       RTE_SET_USED(last);
+       RTE_SET_USED(list);
+       RTE_SET_USED(num);
+#else
+       struct rte_stack_lf_head old_head;
+       int success;
+
+       old_head = list->head;
+
+       do {
+               struct rte_stack_lf_head new_head;
+
+               /* Swing the top pointer to the first element in the list and
+                * make the last element point to the old top.
+                */
+               new_head.top = first;
+               new_head.cnt = old_head.cnt + 1;
+
+               last->next = old_head.top;
+
+               /* Use the release memmodel to ensure the writes to the LF LIFO
+                * elements are visible before the head pointer write.
+                */
+               success = rte_atomic128_cmp_exchange(
+                               (rte_int128_t *)&list->head,
+                               (rte_int128_t *)&old_head,
+                               (rte_int128_t *)&new_head,
+                               1, __ATOMIC_RELEASE,
+                               __ATOMIC_RELAXED);
+       } while (success == 0);
+
+       /* Ensure the stack modifications are not reordered with respect
+        * to the LIFO len update.
+        */
+       __atomic_add_fetch(&list->len.cnt, num, __ATOMIC_RELEASE);
+#endif
+}
+
+static __rte_always_inline struct rte_stack_lf_elem *
+__rte_stack_lf_pop(struct rte_stack_lf_list *list,
+                  unsigned int num,
+                  void **obj_table,
+                  struct rte_stack_lf_elem **last)
+{
+#ifndef RTE_ARCH_X86_64
+       RTE_SET_USED(obj_table);
+       RTE_SET_USED(last);
+       RTE_SET_USED(list);
+       RTE_SET_USED(num);
+
+       return NULL;
+#else
+       struct rte_stack_lf_head old_head;
+       int success;
+
+       /* Reserve num elements, if available */
+       while (1) {
+               uint64_t len = __atomic_load_n(&list->len.cnt,
+                                              __ATOMIC_ACQUIRE);
+
+               /* Does the list contain enough elements? */
+               if (unlikely(len < num))
+                       return NULL;
+
+               if (__atomic_compare_exchange_n(&list->len.cnt,
+                                               &len, len - num,
+                                               0, __ATOMIC_RELAXED,
+                                               __ATOMIC_RELAXED))
+                       break;
+       }
+
+#ifndef RTE_ARCH_X86_64
+       /* Use the acquire memmodel to ensure the reads to the LF LIFO elements
+        * are properly ordered with respect to the head pointer read.
+        *
+        * Note that for aarch64, GCC's implementation of __atomic_load_16 in
+        * libatomic uses locks, and so this function should be replaced by
+        * a new function (e.g. "rte_atomic128_load()").
+        */
+       __atomic_load((volatile __int128 *)&list->head,
+                     &old_head,
+                     __ATOMIC_ACQUIRE);
+#else
+       /* x86-64 does not require an atomic load here; if a torn read occurs,
+        * the CAS will fail and set old_head to the correct/latest value.
+        */
+       old_head = list->head;
+#endif
+
+       /* Pop num elements */
+       do {
+               struct rte_stack_lf_head new_head;
+               struct rte_stack_lf_elem *tmp;
+               unsigned int i;
+
+               rte_prefetch0(old_head.top);
+
+               tmp = old_head.top;
+
+               /* Traverse the list to find the new head. A next pointer will
+                * either point to another element or NULL; if a thread
+                * encounters a pointer that has already been popped, the CAS
+                * will fail.
+                */
+               for (i = 0; i < num && tmp != NULL; i++) {
+                       rte_prefetch0(tmp->next);
+                       if (obj_table)
+                               obj_table[i] = tmp->data;
+                       if (last)
+                               *last = tmp;
+                       tmp = tmp->next;
+               }
+
+               /* If NULL was encountered, the list was modified while
+                * traversing it. Retry.
+                */
+               if (i != num)
+                       continue;
+
+               new_head.top = tmp;
+               new_head.cnt = old_head.cnt + 1;
+
+               success = rte_atomic128_cmp_exchange(
+                               (rte_int128_t *)&list->head,
+                               (rte_int128_t *)&old_head,
+                               (rte_int128_t *)&new_head,
+                               1, __ATOMIC_ACQUIRE,
+                               __ATOMIC_ACQUIRE);
+       } while (success == 0);
+
+       return old_head.top;
+#endif
+}
+
+#endif /* _RTE_STACK_C11_MEM_H_ */
-- 
2.13.6

Reply via email to