This is the last caller of rte_atomic32_cmpset() in lib/, and it blocks deprecation of the rte_atomicNN_*() family.
Replace cmpset with rte_atomic_compare_exchange_weak_explicit(), and convert head/tail loads/stores from implicit seq_cst to explicit acquire/release. Matches the HTS/RTS pattern. Acquire-load of d->head orders the subsequent load of s->tail (was rte_smp_rmb()). Acquire-load of s->tail pairs with the release-store of the counterpart tail in __rte_ring_update_tail(), which subsumes the previous wmb/rmb barriers. Weak CAS avoids arm64's hidden inner retry; the outer do-while already loops. CAS orderings relaxed: no data published by the reservation. The now-unused 'enqueue' parameter of __rte_ring_update_tail() is kept so that existing call sites need no change, and is marked __rte_unused. Signed-off-by: Stephen Hemminger <[email protected]> --- lib/ring/rte_ring_generic_pvt.h | 65 +++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 20 deletions(-) diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h index affd2d5ba7..84570fd5fc 100644 --- a/lib/ring/rte_ring_generic_pvt.h +++ b/lib/ring/rte_ring_generic_pvt.h @@ -23,21 +23,24 @@ */ static __rte_always_inline void __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val, - uint32_t new_val, uint32_t single, uint32_t enqueue) + uint32_t new_val, uint32_t single, + uint32_t enqueue __rte_unused) { - if (enqueue) - rte_smp_wmb(); - else - rte_smp_rmb(); /* * If there are other enqueues/dequeues in progress that preceded us, * we need to wait for them to complete */ if (!single) - rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val, - rte_memory_order_relaxed); - - ht->tail = new_val; + rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, + old_val, rte_memory_order_relaxed); + /* + * R0: Release store on the tail. Pairs with the acquire load of the + * counterpart's tail at A0 in __rte_ring_headtail_move_head() on the + * other side. 
Ensures slot operations performed by this thread (writes + * for enqueue, reads for dequeue) become visible before the new tail + * value is observed by the other side. + */ + rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release); } /** @@ -76,25 +79,35 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, { unsigned int max = n; int success; + uint32_t tail; do { /* Reset n to the initial burst count */ n = max; - *old_head = d->head; + /* + * Acquire on d->head and acquire on s->tail below together prevent + * the two loads from being reordered (was rte_smp_rmb()) and + * re-establish ordering after a failed CAS on retry. + */ + *old_head = rte_atomic_load_explicit(&d->head, + rte_memory_order_acquire); - /* add rmb barrier to avoid load/load reorder in weak - * memory model. It is noop on x86 + /* + * A0: Acquire load on the counterpart's tail. Pairs with the + * release store at R0 in __rte_ring_update_tail(), ensuring slot + * operations on the other side are visible before this thread + * accesses the reserved slots. */ - rte_smp_rmb(); + tail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire); /* * The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have - * *old_head > s->tail). So 'entries' is always between 0 + * *old_head > tail). So 'entries' is always between 0 * and capacity (which is < size). 
*/ - *entries = (capacity + s->tail - *old_head); + *entries = (capacity + tail - *old_head); /* check that we have enough room in ring */ if (unlikely(n > *entries)) @@ -106,12 +119,24 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d, *new_head = *old_head + n; if (is_st) { - d->head = *new_head; + rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed); success = 1; - } else - success = rte_atomic32_cmpset( - (uint32_t *)(uintptr_t)&d->head, - *old_head, *new_head); + } else { + /* + * Weak CAS: the outer do-while handles spurious + * failures, so we avoid the strong variant's + * internal retry (which on arm64 wraps the LL/SC + * pair in a hidden inner loop). + * + * Relaxed on both success and failure: this CAS + * does not publish data. Slot data visibility is + * provided by the acquire loads above and the + * release store of tail in __rte_ring_update_tail(). + */ + success = rte_atomic_compare_exchange_weak_explicit( + &d->head, old_head, *new_head, + rte_memory_order_relaxed, rte_memory_order_relaxed); + } } while (unlikely(success == 0)); return n; } -- 2.53.0

