Last caller of rte_atomic32_cmpset() in lib/, blocking deprecation
of the rte_atomicNN_*() family.

Replace cmpset with rte_atomic_compare_exchange_weak_explicit(),
and convert head/tail loads/stores from implicit seq_cst to explicit
acquire/release. Matches the HTS/RTS pattern.

Acquire-load of d->head orders the subsequent load of s->tail (was
rte_smp_rmb()). Acquire-load of s->tail pairs with the release-store
of the counterpart tail in __rte_ring_update_tail(), which subsumes
the previous wmb/rmb barriers.

Weak CAS avoids arm64's hidden inner retry; the outer do-while already
loops. CAS orderings relaxed: no data published by the reservation.

The 'enqueue' parameter of __rte_ring_update_tail() is now unused; it
is kept and marked __rte_unused, so the signature and its call sites
are unchanged.

Signed-off-by: Stephen Hemminger <[email protected]>
---
 lib/ring/rte_ring_generic_pvt.h | 64 +++++++++++++++++++++++----------
 1 file changed, 45 insertions(+), 19 deletions(-)

diff --git a/lib/ring/rte_ring_generic_pvt.h b/lib/ring/rte_ring_generic_pvt.h
index affd2d5ba7..9497f6737b 100644
--- a/lib/ring/rte_ring_generic_pvt.h
+++ b/lib/ring/rte_ring_generic_pvt.h
@@ -23,21 +23,25 @@
  */
 static __rte_always_inline void
 __rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
-               uint32_t new_val, uint32_t single, uint32_t enqueue)
+               uint32_t new_val, uint32_t single,
+               uint32_t enqueue __rte_unused)
 {
-       if (enqueue)
-               rte_smp_wmb();
-       else
-               rte_smp_rmb();
        /*
         * If there are other enqueues/dequeues in progress that preceded us,
         * we need to wait for them to complete
         */
        if (!single)
-               rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail, old_val,
-                       rte_memory_order_relaxed);
+               rte_wait_until_equal_32((volatile uint32_t *)(uintptr_t)&ht->tail,
+                       old_val, rte_memory_order_relaxed);
 
-       ht->tail = new_val;
+       /*
+        * Release ordering on the tail store ensures that the slot reads
+        * (dequeue) or writes (enqueue) performed by this thread are visible
+        * to the other side before the new tail value is observed.
+        * Pairs with the acquire load of the counterpart's tail in
+        * __rte_ring_headtail_move_head().
+        */
+       rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
 }
 
 /**
@@ -76,25 +80,35 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 {
        unsigned int max = n;
        int success;
+       uint32_t tail;
 
        do {
                /* Reset n to the initial burst count */
                n = max;
 
-               *old_head = d->head;
+               /*
+                * Acquire load: orders this load before the load of s->tail
+                * below (replaces rte_smp_rmb() in the previous version) and
+                * re-establishes ordering after a failed CAS on retry.
+                */
+               *old_head = rte_atomic_load_explicit(&d->head,
+                               rte_memory_order_acquire);
 
-               /* add rmb barrier to avoid load/load reorder in weak
-                * memory model. It is noop on x86
+               /*
+                * Acquire load on the counterpart's tail pairs with the
+                * release store in __rte_ring_update_tail() on the other
+                * side, ensuring slot operations performed there are visible
+                * before the caller accesses the reserved slots.
                 */
-               rte_smp_rmb();
+               tail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
 
                /*
                 *  The subtraction is done between two unsigned 32bits value
                 * (the result is always modulo 32 bits even if we have
-                * *old_head > s->tail). So 'entries' is always between 0
+                * *old_head > tail). So 'entries' is always between 0
                 * and capacity (which is < size).
                 */
-               *entries = (capacity + s->tail - *old_head);
+               *entries = (capacity + tail - *old_head);
 
                /* check that we have enough room in ring */
                if (unlikely(n > *entries))
@@ -106,12 +120,24 @@ __rte_ring_headtail_move_head(struct rte_ring_headtail *d,
 
                *new_head = *old_head + n;
                if (is_st) {
-                       d->head = *new_head;
+                       rte_atomic_store_explicit(&d->head, *new_head, rte_memory_order_relaxed);
                        success = 1;
-               } else
-                       success = rte_atomic32_cmpset(
-                                       (uint32_t *)(uintptr_t)&d->head,
-                                       *old_head, *new_head);
+               } else {
+                       /*
+                        * Weak CAS: the outer do-while handles spurious
+                        * failures, so we avoid the strong variant's
+                        * internal retry (which on arm64 wraps the LL/SC
+                        * pair in a hidden inner loop).
+                        *
+                        * Relaxed on both success and failure: this CAS
+                        * does not publish data. Slot data visibility is
+                        * provided by the acquire loads above and the
+                        * release store of tail in __rte_ring_update_tail().
+                        */
+                       success = rte_atomic_compare_exchange_weak_explicit(
+                               &d->head, old_head, *new_head,
+                               rte_memory_order_relaxed, rte_memory_order_relaxed);
+               }
        } while (unlikely(success == 0));
        return n;
 }
-- 
2.53.0

Reply via email to