[snip]

> > +static __rte_always_inline void
> > +__rte_stack_lf_push(struct rte_stack_lf_list *list,
> > +		    struct rte_stack_lf_elem *first,
> > +		    struct rte_stack_lf_elem *last,
> > +		    unsigned int num)
> > +{
> > +#ifndef RTE_ARCH_X86_64
> > +	RTE_SET_USED(first);
> > +	RTE_SET_USED(last);
> > +	RTE_SET_USED(list);
> > +	RTE_SET_USED(num);
> > +#else
> > +	struct rte_stack_lf_head old_head;
> > +	int success;
> > +
> > +	old_head = list->head;
> This can be a torn read (same as you have mentioned in
> __rte_stack_lf_pop). I suggest we use an acquire thread fence here as well
> (please see the comments in __rte_stack_lf_pop).
Agreed. I'll add the acquire fence.

> > +
> > +	do {
> > +		struct rte_stack_lf_head new_head;
> > +
> We can add __atomic_thread_fence(__ATOMIC_ACQUIRE) here (please see
> the comments in __rte_stack_lf_pop).

Will add the fence here.

> > +		/* Swing the top pointer to the first element in the list and
> > +		 * make the last element point to the old top.
> > +		 */
> > +		new_head.top = first;
> > +		new_head.cnt = old_head.cnt + 1;
> > +
> > +		last->next = old_head.top;
> > +
> > +		/* Use the release memmodel to ensure the writes to the LF LIFO
> > +		 * elements are visible before the head pointer write.
> > +		 */
> > +		success = rte_atomic128_cmp_exchange(
> > +				(rte_int128_t *)&list->head,
> > +				(rte_int128_t *)&old_head,
> > +				(rte_int128_t *)&new_head,
> > +				1, __ATOMIC_RELEASE,
> > +				__ATOMIC_RELAXED);
> Success memory order can be RELAXED as the store to list->len.cnt is
> RELEASE.

The RELEASE success order here ensures that the store to 'last->next' is
visible before the head update. The RELEASE in the list->len.cnt store only
guarantees that the preceding stores are visible before list->len.cnt's
store, but doesn't guarantee any ordering between the 'last->next' store and
the head update, so we can't rely on that.

> > +	} while (success == 0);
> > +
> > +	/* Ensure the stack modifications are not reordered with respect
> > +	 * to the LIFO len update.
> > +	 */
> > +	__atomic_add_fetch(&list->len.cnt, num, __ATOMIC_RELEASE);
> > +#endif
> > +}
> > +
> > +static __rte_always_inline struct rte_stack_lf_elem *
> > +__rte_stack_lf_pop(struct rte_stack_lf_list *list,
> > +		   unsigned int num,
> > +		   void **obj_table,
> > +		   struct rte_stack_lf_elem **last)
> > +{
> > +#ifndef RTE_ARCH_X86_64
> > +	RTE_SET_USED(obj_table);
> > +	RTE_SET_USED(last);
> > +	RTE_SET_USED(list);
> > +	RTE_SET_USED(num);
> > +
> > +	return NULL;
> > +#else
> > +	struct rte_stack_lf_head old_head;
> > +	int success;
> > +
> > +	/* Reserve num elements, if available */
> > +	while (1) {
> > +		uint64_t len = __atomic_load_n(&list->len.cnt,
> > +					       __ATOMIC_ACQUIRE);
> This can be outside the loop.

Good idea. I'll move this out of the loop and change
__atomic_compare_exchange_n's failure memory order to ACQUIRE.

> > +
> > +		/* Does the list contain enough elements? */
> > +		if (unlikely(len < num))
> > +			return NULL;
> > +
> > +		if (__atomic_compare_exchange_n(&list->len.cnt,
> > +						&len, len - num,
> > +						0, __ATOMIC_RELAXED,
> > +						__ATOMIC_RELAXED))
> > +			break;
> > +	}
> > +
> > +#ifndef RTE_ARCH_X86_64
> > +	/* Use the acquire memmodel to ensure the reads to the LF LIFO
> > +	 * elements are properly ordered with respect to the head pointer
> > +	 * read.
> > +	 *
> > +	 * Note that for aarch64, GCC's implementation of __atomic_load_16 in
> > +	 * libatomic uses locks, and so this function should be replaced by
> > +	 * a new function (e.g. "rte_atomic128_load()").
> aarch64 does not have 'pure' atomic 128b load instructions. They have to be
> implemented using load/store exclusives.
>
> > +	 */
> > +	__atomic_load((volatile __int128 *)&list->head,
> > +		      &old_head,
> > +		      __ATOMIC_ACQUIRE);
> Since we know of x86/aarch64 (Power?) that cannot implement pure atomic
> 128b loads, should we just use relaxed reads and assume the possibility of
> torn reads for all architectures? Then we can use an acquire fence to
> prevent the reordering (see below).

That's a cleaner solution. I'll implement that, dropping the architecture
distinction.
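To make sure we're describing the same thing, here's an untested sketch of
that pattern on a stand-in 128-bit head. The 'lf_elem'/'lf_head' types and
'lf_head_bump' are placeholders so the fragment compiles on its own, not the
real rte_stack_lf types, and the real code would keep using
rte_atomic128_cmp_exchange() rather than the raw builtin (which may go
through libatomic; link with -latomic if so):

#include <stdbool.h>
#include <stdint.h>

/* Placeholder types; 16B alignment is required for the 128-bit builtins. */
struct lf_elem {
	void *data;
	struct lf_elem *next;
};

struct lf_head {
	struct lf_elem *top;
	uint64_t cnt; /* ABA-protection counter */
} __attribute__((aligned(16)));

static void
lf_head_bump(struct lf_head *head)
{
	struct lf_head old_head, new_head;
	bool ok;

	old_head = *head; /* plain read; may tear on any architecture */

	do {
		/* Order the element reads in the real push/pop body after
		 * the (re-)read of old_head, replacing the acquire load.
		 */
		__atomic_thread_fence(__ATOMIC_ACQUIRE);

		new_head.top = old_head.top;
		new_head.cnt = old_head.cnt + 1;

		/* On failure the builtin writes the current head back into
		 * old_head, so a torn initial read self-corrects after one
		 * failed attempt.
		 */
		ok = __atomic_compare_exchange((__int128 *)head,
					       (__int128 *)&old_head,
					       (__int128 *)&new_head,
					       0, __ATOMIC_RELEASE,
					       __ATOMIC_RELAXED);
	} while (!ok);
}

The fence sits inside the loop so the ordering also holds after a failed CAS
refreshes old_head.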
> > +#else
> > +	/* x86-64 does not require an atomic load here; if a torn read
> > +	 * occurs,
> IMO, we should not make architecture-specific distinctions as this
> algorithm is based on the C11 memory model.
>
> > +	 * the CAS will fail and set old_head to the correct/latest value.
> > +	 */
> > +	old_head = list->head;
> > +#endif
> > +
> > +	/* Pop num elements */
> > +	do {
> > +		struct rte_stack_lf_head new_head;
> > +		struct rte_stack_lf_elem *tmp;
> > +		unsigned int i;
> > +
> We can add __atomic_thread_fence(__ATOMIC_ACQUIRE) here.

Will do.

> > +		rte_prefetch0(old_head.top);
> > +
> > +		tmp = old_head.top;
> > +
> > +		/* Traverse the list to find the new head. A next pointer will
> > +		 * either point to another element or NULL; if a thread
> > +		 * encounters a pointer that has already been popped, the CAS
> > +		 * will fail.
> > +		 */
> > +		for (i = 0; i < num && tmp != NULL; i++) {
> > +			rte_prefetch0(tmp->next);
> > +			if (obj_table)
> > +				obj_table[i] = tmp->data;
> > +			if (last)
> > +				*last = tmp;
> > +			tmp = tmp->next;
> > +		}
> > +
> > +		/* If NULL was encountered, the list was modified while
> > +		 * traversing it. Retry.
> > +		 */
> > +		if (i != num)
> > +			continue;
> > +
> > +		new_head.top = tmp;
> > +		new_head.cnt = old_head.cnt + 1;
> > +
> > +		success = rte_atomic128_cmp_exchange(
> > +				(rte_int128_t *)&list->head,
> > +				(rte_int128_t *)&old_head,
> > +				(rte_int128_t *)&new_head,
> > +				1, __ATOMIC_ACQUIRE,
> > +				__ATOMIC_ACQUIRE);
> The success order should be __ATOMIC_RELEASE as the write to list->len.cnt
> is relaxed.
> The failure order can be __ATOMIC_RELAXED if the thread fence is added.

Agreed on both counts. Will address.

> > +	} while (success == 0);
> > +
> > +	return old_head.top;
> > +#endif
> > +}
> > +
> > +#endif /* _RTE_STACK_C11_MEM_H_ */
> > --
> > 2.13.6
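Summarizing all of the above, here's roughly the pop path I have in mind for
the next revision. Untested sketch, reusing the placeholder
'lf_elem'/'lf_head' types (and includes) from the earlier fragment; 'lf_list'
and 'lf_pop' are likewise stand-ins, not the real rte_stack_lf names. One
wrinkle: GCC requires the CAS failure order to be no stronger than the
success order, so hoisting the acquire load out of the reservation loop makes
that CAS ACQUIRE/ACQUIRE rather than RELAXED/ACQUIRE:

struct lf_list {
	struct lf_head head;
	uint64_t len; /* stand-in for list->len.cnt */
};

static struct lf_elem *
lf_pop(struct lf_list *list, unsigned int num, void **obj_table,
       struct lf_elem **last)
{
	struct lf_head old_head, new_head;
	bool ok;

	/* Reserve num elements. The acquire load is hoisted out of the
	 * loop; a failed CAS reloads len with ACQUIRE ordering instead.
	 */
	uint64_t len = __atomic_load_n(&list->len, __ATOMIC_ACQUIRE);

	while (1) {
		/* Does the list contain enough elements? */
		if (len < num)
			return NULL;

		if (__atomic_compare_exchange_n(&list->len, &len, len - num,
						0, __ATOMIC_ACQUIRE,
						__ATOMIC_ACQUIRE))
			break;
	}

	old_head = list->head; /* plain read; may tear on any architecture */

	do {
		struct lf_elem *tmp;
		unsigned int i;

		/* Order the element reads below after the head read. */
		__atomic_thread_fence(__ATOMIC_ACQUIRE);

		tmp = old_head.top;

		/* Walk num elements to find the new head. */
		for (i = 0; i < num && tmp != NULL; i++) {
			if (obj_table != NULL)
				obj_table[i] = tmp->data;
			if (last != NULL)
				*last = tmp;
			tmp = tmp->next;
		}

		/* NULL mid-walk means the list changed; reload and retry. */
		if (i != num) {
			old_head = list->head;
			continue;
		}

		new_head.top = tmp;
		new_head.cnt = old_head.cnt + 1;

		/* Success order is RELEASE so the element reads above
		 * complete before the head update makes those elements
		 * reusable (the len CAS provides no release ordering for
		 * them). Failure order is RELAXED; the fence at the top of
		 * the loop supplies the acquire ordering on retry.
		 */
		ok = __atomic_compare_exchange((__int128 *)&list->head,
					       (__int128 *)&old_head,
					       (__int128 *)&new_head,
					       0, __ATOMIC_RELEASE,
					       __ATOMIC_RELAXED);
	} while (!ok);

	return old_head.top;
}

If this matches what you had in mind, I'll fold it (with the real types,
rte_atomic128_cmp_exchange(), and the prefetches) into v9.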