<snip>

> diff --git a/lib/librte_stack/rte_stack.c b/lib/librte_stack/rte_stack.c
> index 96dffdf44..8f0361ea1 100644
> --- a/lib/librte_stack/rte_stack.c
> +++ b/lib/librte_stack/rte_stack.c

<snip>

> @@ -63,9 +81,16 @@ rte_stack_create(const char *name, unsigned int count, int socket_id,
>       unsigned int sz;
>       int ret;
> 
> -     RTE_SET_USED(flags);
> +#ifdef RTE_ARCH_X86_64
> +     RTE_BUILD_BUG_ON(sizeof(struct rte_stack_lf_head) != 16);
This check should be independent of the platform.
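Something like this, perhaps (untested sketch; it assumes struct
rte_stack_lf_head has the same 16-byte layout on every platform this code is
compiled for):

	/* Validate the layout the 128-bit CAS relies on, on all platforms */
	RTE_BUILD_BUG_ON(sizeof(struct rte_stack_lf_head) != 16);

#ifndef RTE_ARCH_X86_64
	/* Only x86_64 implements the lock-free variant so far */
	if (flags & RTE_STACK_F_LF) {
		STACK_LOG_ERR("Lock-free stack is not supported on your platform\n");
		return NULL;
	}
#endif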

> +#else
> +     if (flags & RTE_STACK_F_LF) {
> +             STACK_LOG_ERR("Lock-free stack is not supported on your platform\n");
> +             return NULL;
> +     }
> +#endif
> 
> -     sz = rte_stack_get_memsize(count);
> +     sz = rte_stack_get_memsize(count, flags);
> 
>       ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
>                      RTE_STACK_MZ_PREFIX, name);
> @@ -94,7 +119,7 @@ rte_stack_create(const char *name, unsigned int count, int socket_id,
> 
>       s = mz->addr;
> 
> -     rte_stack_init(s);
> +     rte_stack_init(s, count, flags);
> 
>       /* Store the name for later lookups */
>       ret = snprintf(s->name, sizeof(s->name), "%s", name);
> diff --git a/lib/librte_stack/rte_stack.h b/lib/librte_stack/rte_stack.h
> index 7a633deb5..b484313bb 100644
> --- a/lib/librte_stack/rte_stack.h
> +++ b/lib/librte_stack/rte_stack.h
> @@ -30,6 +30,35 @@ extern "C" {

<snip>

> +/**
> + * @internal Push several objects on the lock-free stack (MT-safe).
> + *
> + * @param s
> + *   A pointer to the stack structure.
> + * @param obj_table
> + *   A pointer to a table of void * pointers (objects).
> + * @param n
> + *   The number of objects to push on the stack from the obj_table.
> + * @return
> + *   Actual number of objects enqueued.
> + */
> +static __rte_always_inline unsigned int __rte_experimental
This is an internal function. Is the '__rte_experimental' tag required? (The
same applies to the other instances in this patch.)

> +rte_stack_lf_push(struct rte_stack *s, void * const *obj_table,
> +               unsigned int n)
> +{
> +     struct rte_stack_lf_elem *tmp, *first, *last = NULL;
> +     unsigned int i;
> +
> +     if (unlikely(n == 0))
> +             return 0;
> +
> +     /* Pop n free elements */
> +     first = __rte_stack_lf_pop(&s->stack_lf.free, n, NULL, &last);
> +     if (unlikely(first == NULL))
> +             return 0;
> +
> +     /* Construct the list elements */
> +     for (tmp = first, i = 0; i < n; i++, tmp = tmp->next)
> +             tmp->data = obj_table[n - i - 1];
> +
> +     /* Push them to the used list */
> +     __rte_stack_lf_push(&s->stack_lf.used, first, last, n);
> +
> +     return n;
> +}
> +
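For what it's worth, exercising this through the public wrappers would look
roughly like the following (sketch; it assumes rte_stack_push()/rte_stack_pop()
dispatch to the LF variants for a stack s created with RTE_STACK_F_LF):

	void *objs[8], *out[8];
	unsigned int i, n;

	for (i = 0; i < 8; i++)
		objs[i] = (void *)(uintptr_t)(i + 1); /* dummy objects */

	n = rte_stack_push(s, objs, 8); /* 8, or 0 if the free list is short */
	n = rte_stack_pop(s, out, 8);   /* 8; out[] holds objs[] in LIFO order */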
 
<snip>

> 
>  /**
> @@ -225,7 +339,10 @@ rte_stack_free_count(struct rte_stack *s)
>   *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
>   *   constraint for the reserved zone.
>   * @param flags
> - *   Reserved for future use.
> + *   An OR of the following:
> + *    - RTE_STACK_F_LF: If this flag is set, the stack uses lock-free
> + *      variants of the push and pop functions. Otherwise, it achieves
> + *      thread-safety using a lock.
>   * @return
>   *   On success, the pointer to the new allocated stack. NULL on error with
>   *    rte_errno set appropriately. Possible errno values include:
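With that, creating a lock-free stack would look something like this (minimal
sketch; the name and size are arbitrary):

	struct rte_stack *s;

	s = rte_stack_create("lf_stack", 1024, rte_socket_id(),
			     RTE_STACK_F_LF);
	if (s == NULL) {
		/* rte_errno is set on failure, per the contract above */
		printf("create failed: %s\n", rte_strerror(rte_errno));
		return -1;
	}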
> diff --git a/lib/librte_stack/rte_stack_generic.h b/lib/librte_stack/rte_stack_generic.h
> new file mode 100644
> index 000000000..5e4cbc38e
> --- /dev/null
> +++ b/lib/librte_stack/rte_stack_generic.h
The name "...stack_generic.h" is confusing. It is implementing LF algorithm.
IMO, the code should be re-organized differently.
rte_stack.h, rte_stack.c - Contain the APIs (calling std or LF based on the 
flag) and top level structure definition
rte_stack_std.c, rte_stack_std.h - Contain the standard implementation
rte_stack_lf.c, rte_stack_lf.h - Contain the LF implementation
rte_stack_lf_c11.h - Contain the LF C11 implementation
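Something like this, for instance (sketch; it assumes the creation flags are
stored in struct rte_stack, and __rte_stack_std_push()/__rte_stack_lf_push()
are the renamed internal helpers):

	static __rte_always_inline unsigned int
	rte_stack_push(struct rte_stack *s, void * const *obj_table,
		       unsigned int n)
	{
		if (s->flags & RTE_STACK_F_LF)
			return __rte_stack_lf_push(s, obj_table, n);

		return __rte_stack_std_push(s, obj_table, n);
	}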

> @@ -0,0 +1,151 @@
> +/* SPDX-License-Identifier: BSD-3-Clause
> + * Copyright(c) 2019 Intel Corporation
> + */
> +
> +#ifndef _RTE_STACK_GENERIC_H_
> +#define _RTE_STACK_GENERIC_H_
> +
> +#include <rte_branch_prediction.h>
> +#include <rte_prefetch.h>
> +

<snip>

> +
> +static __rte_always_inline struct rte_stack_lf_elem *
> +__rte_stack_lf_pop(struct rte_stack_lf_list *list,
> +                unsigned int num,
> +                void **obj_table,
> +                struct rte_stack_lf_elem **last)
> +{
> +#ifndef RTE_ARCH_X86_64
> +     RTE_SET_USED(obj_table);
> +     RTE_SET_USED(last);
> +     RTE_SET_USED(list);
> +     RTE_SET_USED(num);
> +
> +     return NULL;
> +#else
> +     struct rte_stack_lf_head old_head;
> +     int success;
> +
> +     /* Reserve num elements, if available */
> +     while (1) {
> +             uint64_t len = rte_atomic64_read(&list->len);
> +
> +             /* Does the list contain enough elements? */
> +             if (unlikely(len < num))
> +                     return NULL;
> +
> +             if (rte_atomic64_cmpset((volatile uint64_t *)&list->len,
> +                                     len, len - num))
> +                     break;
> +     }
> +
> +     old_head = list->head;
> +
> +     /* Pop num elements */
> +     do {
> +             struct rte_stack_lf_head new_head;
> +             struct rte_stack_lf_elem *tmp;
> +             unsigned int i;
> +
> +             rte_prefetch0(old_head.top);
> +
> +             tmp = old_head.top;
> +
> +             /* Traverse the list to find the new head. A next pointer will
> +              * either point to another element or NULL; if a thread
> +              * encounters a pointer that has already been popped, the CAS
> +              * will fail.
> +              */
> +             for (i = 0; i < num && tmp != NULL; i++) {
> +                     rte_prefetch0(tmp->next);
> +                     if (obj_table)
> +                             obj_table[i] = tmp->data;
> +                     if (last)
> +                             *last = tmp;
> +                     tmp = tmp->next;
> +             }
> +
> +             /* If NULL was encountered, the list was modified while
> +              * traversing it. Retry.
> +              */
> +             if (i != num)
> +                     continue;
> +
> +             new_head.top = tmp;
> +             new_head.cnt = old_head.cnt + 1;
> +
> +             /* old_head is updated on failure */
> +             success = rte_atomic128_cmp_exchange(
> +                             (rte_int128_t *)&list->head,
> +                             (rte_int128_t *)&old_head,
> +                             (rte_int128_t *)&new_head,
> +                             1, __ATOMIC_ACQUIRE,
> +                             __ATOMIC_ACQUIRE);
Just wondering if 'rte_atomic128_cmp_exchange' for x86 should include compiler
barriers based on the memory order passed in. The C++11 memory model is
getting mixed with the barrier-based model here; I think this is something
that needs to be discussed at a wider level.
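To illustrate the concern, a barrier-aware variant might look like this
(untested sketch: rte_compiler_barrier() is the existing DPDK macro, the
function name is made up, and the "memory" clobber is deliberately omitted so
that ordering comes only from the explicit barriers):

	static inline int
	x86_atomic128_cmp_exchange(rte_int128_t *dst, rte_int128_t *exp,
				   const rte_int128_t *src, int mo)
	{
		uint8_t res;

		/* cmpxchg16b is a full barrier in hardware, but the compiler
		 * may still move unrelated accesses across the asm unless a
		 * compiler barrier is emitted for non-relaxed orders.
		 */
		if (mo != __ATOMIC_RELAXED)
			rte_compiler_barrier();

		asm volatile ("lock; cmpxchg16b %[dst]; sete %[res]"
			      : [dst] "+m" (*dst), [res] "=r" (res),
				"+a" (exp->val[0]), "+d" (exp->val[1])
			      : "b" (src->val[0]), "c" (src->val[1]));

		if (mo != __ATOMIC_RELAXED)
			rte_compiler_barrier();

		return res;
	}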

> +     } while (success == 0);
> +
> +     return old_head.top;
> +#endif
> +}
> +
> +#endif /* _RTE_STACK_GENERIC_H_ */
> --
> 2.13.6
