[snip]

> > +static __rte_always_inline void
> > +__rte_stack_lf_push(struct rte_stack_lf_list *list,
> > +               struct rte_stack_lf_elem *first,
> > +               struct rte_stack_lf_elem *last,
> > +               unsigned int num)
> > +{
> > +#ifndef RTE_ARCH_X86_64
> > +   RTE_SET_USED(first);
> > +   RTE_SET_USED(last);
> > +   RTE_SET_USED(list);
> > +   RTE_SET_USED(num);
> > +#else
> > +   struct rte_stack_lf_head old_head;
> > +   int success;
> > +
> > +   old_head = list->head;
> This can be a torn read (same as you have mentioned in
> __rte_stack_lf_pop). I suggest we use acquire thread fence here as well
> (please see the comments in __rte_stack_lf_pop).

Agreed. I'll add the acquire fence.

> > +
> > +   do {
> > +           struct rte_stack_lf_head new_head;
> > +
> We can add __atomic_thread_fence(__ATOMIC_ACQUIRE) here (please see
> the comments in __rte_stack_lf_pop).

Will add the fence here.
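
Concretely, I expect the revised push body to look roughly like this (a sketch
against the structs and rte_atomic128_cmp_exchange() helper already in this
patch, not the final code):

	struct rte_stack_lf_head old_head;
	int success;

	/* Relaxed read; a torn or stale value is corrected when the
	 * 128-bit CAS below fails and reloads old_head.
	 */
	old_head = list->head;

	do {
		struct rte_stack_lf_head new_head;

		/* Pair with the release CAS of other push/pop calls so the
		 * element accesses below are ordered against the head value
		 * observed above (and reloaded on CAS failure).
		 */
		__atomic_thread_fence(__ATOMIC_ACQUIRE);

		/* Swing the top pointer to the first element in the list and
		 * make the last element point to the old top.
		 */
		new_head.top = first;
		new_head.cnt = old_head.cnt + 1;

		last->next = old_head.top;

		/* RELEASE so the last->next store is visible no later than
		 * the head update.
		 */
		success = rte_atomic128_cmp_exchange(
				(rte_int128_t *)&list->head,
				(rte_int128_t *)&old_head,
				(rte_int128_t *)&new_head,
				1, __ATOMIC_RELEASE,
				__ATOMIC_RELAXED);
	} while (success == 0);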

> > +           /* Swing the top pointer to the first element in the list and
> > +            * make the last element point to the old top.
> > +            */
> > +           new_head.top = first;
> > +           new_head.cnt = old_head.cnt + 1;
> > +
> > +           last->next = old_head.top;
> > +
> > +           /* Use the release memmodel to ensure the writes to the LF LIFO
> > +            * elements are visible before the head pointer write.
> > +            */
> > +           success = rte_atomic128_cmp_exchange(
> > +                           (rte_int128_t *)&list->head,
> > +                           (rte_int128_t *)&old_head,
> > +                           (rte_int128_t *)&new_head,
> > +                           1, __ATOMIC_RELEASE,
> > +                           __ATOMIC_RELAXED);
> Success memory order can be RELAXED as the store to list->len.cnt is
> RELEASE.

The RELEASE success order on this CAS ensures that the store to 'last->next' is 
visible no later than the head update. The RELEASE on the list->len.cnt add only 
guarantees that the preceding stores are visible before the store to 
list->len.cnt; it provides no ordering between the 'last->next' store and the 
head update, so we can't rely on it for that.
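
To make the constraint concrete, here is the release/acquire pairing I'm relying
on, written as a standalone C11 stdatomic example rather than against the DPDK
types (illustrative sketch only):

#include <stdatomic.h>
#include <stddef.h>

struct node {
	struct node *next;
	void *data;
};

static _Atomic(struct node *) top;

/* Pusher: the plain n->next store must be ordered before the head is
 * published, which the release CAS provides.
 */
static void push_one(struct node *n)
{
	struct node *old = atomic_load_explicit(&top, memory_order_relaxed);

	do {
		n->next = old;	/* plain store, ordered by the release CAS */
	} while (!atomic_compare_exchange_weak_explicit(&top, &old, n,
			memory_order_release, memory_order_relaxed));
}

/* Reader: the acquire load of the head synchronizes with the pusher's
 * release, so dereferencing n->next is guaranteed to see the store above.
 */
static struct node *peek_next(void)
{
	struct node *n = atomic_load_explicit(&top, memory_order_acquire);

	return n != NULL ? n->next : NULL;
}

The acquire side pairs with the release on the head CAS itself; the later
list->len.cnt update never enters that pairing, which is why its RELEASE can't
be relied on to order 'last->next'.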

> > +   } while (success == 0);
> > +
> > +   /* Ensure the stack modifications are not reordered with respect
> > +    * to the LIFO len update.
> > +    */
> > +   __atomic_add_fetch(&list->len.cnt, num, __ATOMIC_RELEASE);
> > +#endif
> > +}
> > +
> > +static __rte_always_inline struct rte_stack_lf_elem *
> > +__rte_stack_lf_pop(struct rte_stack_lf_list *list,
> > +              unsigned int num,
> > +              void **obj_table,
> > +              struct rte_stack_lf_elem **last)
> > +{
> > +#ifndef RTE_ARCH_X86_64
> > +   RTE_SET_USED(obj_table);
> > +   RTE_SET_USED(last);
> > +   RTE_SET_USED(list);
> > +   RTE_SET_USED(num);
> > +
> > +   return NULL;
> > +#else
> > +   struct rte_stack_lf_head old_head;
> > +   int success;
> > +
> > +   /* Reserve num elements, if available */
> > +   while (1) {
> > +           uint64_t len = __atomic_load_n(&list->len.cnt,
> > +                                          __ATOMIC_ACQUIRE);
> This can be outside the loop.

Good idea. I'll move the load out of the loop and change 
__atomic_compare_exchange_n's failure memory order to __ATOMIC_ACQUIRE.
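
In other words, something along these lines (sketch, not final):

	uint64_t len;

	/* One acquire load up front; on CAS failure the current value is
	 * written back into 'len' with acquire semantics, so no reload is
	 * needed inside the loop.
	 */
	len = __atomic_load_n(&list->len.cnt, __ATOMIC_ACQUIRE);

	/* Reserve num elements, if available */
	while (1) {
		/* Does the list contain enough elements? */
		if (unlikely(len < num))
			return NULL;

		if (__atomic_compare_exchange_n(&list->len.cnt,
						&len, len - num,
						0, __ATOMIC_RELAXED,
						__ATOMIC_ACQUIRE))
			break;
	}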

> > +
> > +           /* Does the list contain enough elements? */
> > +           if (unlikely(len < num))
> > +                   return NULL;
> > +
> > +           if (__atomic_compare_exchange_n(&list->len.cnt,
> > +                                           &len, len - num,
> > +                                           0, __ATOMIC_RELAXED,
> > +                                           __ATOMIC_RELAXED))
> > +                   break;
> > +   }
> > +
> > +#ifndef RTE_ARCH_X86_64
> > +   /* Use the acquire memmodel to ensure the reads to the LF LIFO elements
> > +    * are properly ordered with respect to the head pointer read.
> > +    *
> > +    * Note that for aarch64, GCC's implementation of __atomic_load_16 in
> > +    * libatomic uses locks, and so this function should be replaced by
> > +    * a new function (e.g. "rte_atomic128_load()").
> aarch64 does not have 'pure' atomic 128b load instructions. They have to be
> implemented using load/store exclusives.
>
> > +    */
> > +   __atomic_load((volatile __int128 *)&list->head,
> > +                 &old_head,
> > +                 __ATOMIC_ACQUIRE);
> Since we know x86/aarch64 (power?) cannot implement pure atomic 128b loads,
> should we just use relaxed reads and assume the possibility of torn reads for
> all architectures? Then, we can use an acquire fence to prevent the
> reordering (see below)

That's a cleaner solution. I'll implement that, dropping the architecture 
distinction.

> > +#else
> > +   /* x86-64 does not require an atomic load here; if a torn read occurs,
> IMO, we should not make architecture-specific distinctions as this algorithm
> is based on the C11 memory model.
>
> > +    * the CAS will fail and set old_head to the correct/latest value.
> > +    */
> > +   old_head = list->head;
> > +#endif
> > +
> > +   /* Pop num elements */
> > +   do {
> > +           struct rte_stack_lf_head new_head;
> > +           struct rte_stack_lf_elem *tmp;
> > +           unsigned int i;
> > +
> We can add __atomic_thread_fence(__ATOMIC_ACQUIRE) here.

Will do.
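
So the head read and the top of the pop loop would become something like this
(sketch; the traversal and the 128-bit CAS stay as in the patch, with the
memory orders discussed below):

	/* Relaxed, possibly torn read on every architecture; a stale or torn
	 * value simply fails the 128-bit CAS and old_head is reloaded.
	 */
	old_head = list->head;

	/* Pop num elements */
	do {
		struct rte_stack_lf_head new_head;
		struct rte_stack_lf_elem *tmp;
		unsigned int i;

		/* Order the element reads in the traversal below against the
		 * head value observed above (and reloaded on CAS failure).
		 */
		__atomic_thread_fence(__ATOMIC_ACQUIRE);

		rte_prefetch0(old_head.top);

		/* ... list traversal and rte_atomic128_cmp_exchange() as in
		 * the patch ...
		 */
	} while (success == 0);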

> > +           rte_prefetch0(old_head.top);
> > +
> > +           tmp = old_head.top;
> > +
> > +           /* Traverse the list to find the new head. A next pointer will
> > +            * either point to another element or NULL; if a thread
> > +            * encounters a pointer that has already been popped, the CAS
> > +            * will fail.
> > +            */
> > +           for (i = 0; i < num && tmp != NULL; i++) {
> > +                   rte_prefetch0(tmp->next);
> > +                   if (obj_table)
> > +                           obj_table[i] = tmp->data;
> > +                   if (last)
> > +                           *last = tmp;
> > +                   tmp = tmp->next;
> > +           }
> > +
> > +           /* If NULL was encountered, the list was modified while
> > +            * traversing it. Retry.
> > +            */
> > +           if (i != num)
> > +                   continue;
> > +
> > +           new_head.top = tmp;
> > +           new_head.cnt = old_head.cnt + 1;
> > +
> > +           success = rte_atomic128_cmp_exchange(
> > +                           (rte_int128_t *)&list->head,
> > +                           (rte_int128_t *)&old_head,
> > +                           (rte_int128_t *)&new_head,
> > +                           1, __ATOMIC_ACQUIRE,
> > +                           __ATOMIC_ACQUIRE);
> The success order should be __ATOMIC_RELEASE as the write to list->len.cnt
> is relaxed.
> The failure order can be __ATOMIC_RELAXED if the thread fence is added.

Agreed on both counts. Will address.
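
i.e. the pop CAS becomes roughly (sketch):

		/* Success: RELEASE, since the list->len.cnt update is relaxed;
		 * failure: RELAXED, relying on the acquire fence at the top of
		 * the loop.
		 */
		success = rte_atomic128_cmp_exchange(
				(rte_int128_t *)&list->head,
				(rte_int128_t *)&old_head,
				(rte_int128_t *)&new_head,
				1, __ATOMIC_RELEASE,
				__ATOMIC_RELAXED);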

> > +   } while (success == 0);
> > +
> > +   return old_head.top;
> > +#endif
> > +}
> > +
> > +#endif /* _RTE_STACK_C11_MEM_H_ */
> > --
> > 2.13.6
