On 03/22/17 18:47, Bill Fischofer wrote:
> On Wed, Mar 22, 2017 at 10:37 AM, Maxim Uvarov <maxim.uva...@linaro.org>
> wrote:
>> On 03/22/17 07:54, Brian Brooks wrote:
>>> Acquire ordering is needed to maintain proper release consistency with
>>> the ring enqueue operation. This issue presented itself as deadlock when
>>> running on an ARM-based chip.
>>>
>>> Signed-off-by: Brian Brooks <brian.bro...@linaro.org>
>>> ---
>>>  platform/linux-generic/include/odp_ring_internal.h | 4 ++--
>>>  1 file changed, 2 insertions(+), 2 deletions(-)
>>>
>>> diff --git a/platform/linux-generic/include/odp_ring_internal.h
>>> b/platform/linux-generic/include/odp_ring_internal.h
>>> index 55fedeb3..44b83c60 100644
>>> --- a/platform/linux-generic/include/odp_ring_internal.h
>>> +++ b/platform/linux-generic/include/odp_ring_internal.h
>>> @@ -57,7 +57,7 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t
>>> mask)
>>>
>>>          /* Move reader head. This thread owns data at the new head. */
>>>          do {
>>> -                tail = odp_atomic_load_u32(&ring->w_tail);
>>> +                tail = odp_atomic_load_acq_u32(&ring->w_tail);
>>>
>>>                  if (head == tail)
>>>                          return RING_EMPTY;
>>> @@ -90,7 +90,7 @@ static inline uint32_t ring_deq_multi(ring_t *ring,
>>> uint32_t mask,
>>>
>>>          /* Move reader head. This thread owns data at the new head. */
>>>          do {
>>> -                tail = odp_atomic_load_u32(&ring->w_tail);
>>> +                tail = odp_atomic_load_acq_u32(&ring->w_tail);
>>>
>>>                  /* Ring is empty */
>>>                  if (head == tail)
>>>
>>
>>
>> Brian, can you please write few words why acq (i.e. barriers) is not
>> used in first load and in final store?
>
> acq is not needed for the initial fetch of head here because head is
> updated by cas_acq below, which handles the sync in a self-contained
> manner. The update of tail uses store_rel as a counterpart to the
> previous load_acq. load_acq and store_rel operate as pairs.
>
Thank you, Bill. Merged with short message:
"linux-gen: ring: fix memory ordering in ring dequeue"

Maxim.

>>
>> static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
>>                                       uint32_t data[], uint32_t num)
>> {
>>         uint32_t head, tail, new_head, i;
>>
>>         head = odp_atomic_load_u32(&ring->r_head);
>>
>>         /* Move reader head. This thread owns data at the new head. */
>>         do {
>>                 tail = odp_atomic_load_acq_u32(&ring->w_tail);
>>
>>                 /* Ring is empty */
>>                 if (head == tail)
>>                         return 0;
>>
>>                 /* Try to take all available */
>>                 if ((tail - head) < num)
>>                         num = tail - head;
>>
>>                 new_head = head + num;
>>
>>         } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
>>                               new_head) == 0));
>>
>>         /* Read queue index */
>>         for (i = 0; i < num; i++)
>>                 data[i] = ring->data[(head + 1 + i) & mask];
>>
>>         /* Wait until other readers have updated the tail */
>>         while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
>>                 odp_cpu_pause();
>>
>>         /* Now update the reader tail */
>>         odp_atomic_store_rel_u32(&ring->r_tail, new_head);
>>
>>         return num;
>> }
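
For anyone reading this thread in the archive, here is a minimal sketch of the
acquire/release pairing discussed above, written with portable C11 atomics
instead of the ODP wrappers. It is not the ODP implementation. The names
sketch_ring_t, sketch_ring_init, sketch_ring_enq, sketch_ring_deq, RING_SIZE
and RING_MASK are hypothetical, and the enqueue side assumes a single writer
that never overfills the ring, which is a simplification. The point it
illustrates: the acquire load of w_tail in the dequeue pairs with the release
store of w_tail in the enqueue, so a reader that sees the new tail also sees
the data written before it.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define RING_SIZE  8u                 /* hypothetical size, power of two */
#define RING_MASK  (RING_SIZE - 1u)
#define RING_EMPTY UINT32_MAX         /* marker for "nothing dequeued" */

static_assert((RING_SIZE & RING_MASK) == 0, "RING_SIZE must be a power of two");

typedef struct {
	_Atomic uint32_t r_head;      /* last slot claimed by a reader */
	_Atomic uint32_t r_tail;      /* last slot a reader has finished with */
	_Atomic uint32_t w_tail;      /* last slot published by the writer */
	uint32_t data[RING_SIZE];
} sketch_ring_t;

static void sketch_ring_init(sketch_ring_t *ring)
{
	atomic_init(&ring->r_head, 0);
	atomic_init(&ring->r_tail, 0);
	atomic_init(&ring->w_tail, 0);
}

/* Assumption: one writer only, and the caller guarantees free space. */
static void sketch_ring_enq(sketch_ring_t *ring, uint32_t value)
{
	uint32_t tail = atomic_load_explicit(&ring->w_tail,
					     memory_order_relaxed);

	ring->data[(tail + 1) & RING_MASK] = value;

	/* Release: the data write above is visible before the new w_tail. */
	atomic_store_explicit(&ring->w_tail, tail + 1, memory_order_release);
}

static uint32_t sketch_ring_deq(sketch_ring_t *ring)
{
	uint32_t head, tail, new_head, value;

	/* Relaxed is enough here: the CAS below revalidates head. */
	head = atomic_load_explicit(&ring->r_head, memory_order_relaxed);

	do {
		/* Acquire: pairs with the release store in sketch_ring_enq. */
		tail = atomic_load_explicit(&ring->w_tail,
					    memory_order_acquire);
		if (head == tail)
			return RING_EMPTY;

		new_head = head + 1;
	} while (!atomic_compare_exchange_weak_explicit(&ring->r_head, &head,
							new_head,
							memory_order_acquire,
							memory_order_relaxed));

	value = ring->data[new_head & RING_MASK];

	/* Wait until earlier readers have updated the reader tail. */
	while (atomic_load_explicit(&ring->r_tail,
				    memory_order_acquire) != head)
		; /* spin; a real implementation would pause the CPU here */

	/* Release: counterpart of the acquire load of r_tail above. */
	atomic_store_explicit(&ring->r_tail, new_head, memory_order_release);

	return value;
}
```

With a relaxed load of w_tail in place of the acquire load, a weakly ordered
CPU is allowed to read ring->data before it observes the new tail, so a reader
could return stale slot contents. Called from a single thread, enqueuing 7 and
then 9 and dequeuing three times yields 7, 9 and RING_EMPTY in that order.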