On 03/22/17 07:54, Brian Brooks wrote:
> Acquire ordering is needed to maintain proper release consistency with
> the ring enqueue operation. This issue presented itself as deadlock when
> running on an ARM-based chip.
> 
> Signed-off-by: Brian Brooks <brian.bro...@linaro.org>
> ---
>  platform/linux-generic/include/odp_ring_internal.h | 4 ++--
>  1 file changed, 2 insertions(+), 2 deletions(-)
> 
> diff --git a/platform/linux-generic/include/odp_ring_internal.h 
> b/platform/linux-generic/include/odp_ring_internal.h
> index 55fedeb3..44b83c60 100644
> --- a/platform/linux-generic/include/odp_ring_internal.h
> +++ b/platform/linux-generic/include/odp_ring_internal.h
> @@ -57,7 +57,7 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
>  
>       /* Move reader head. This thread owns data at the new head. */
>       do {
> -             tail = odp_atomic_load_u32(&ring->w_tail);
> +             tail = odp_atomic_load_acq_u32(&ring->w_tail);
>  
>               if (head == tail)
>                       return RING_EMPTY;
> @@ -90,7 +90,7 @@ static inline uint32_t ring_deq_multi(ring_t *ring, 
> uint32_t mask,
>  
>       /* Move reader head. This thread owns data at the new head. */
>       do {
> -             tail = odp_atomic_load_u32(&ring->w_tail);
> +             tail = odp_atomic_load_acq_u32(&ring->w_tail);
>  
>               /* Ring is empty */
>               if (head == tail)
> 


Brian, can you please write few words why acq (i.e. barriers) is not
used in first load and in final store?

static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
                                      uint32_t data[], uint32_t num)
{
        uint32_t head, tail, new_head, i;

        head = odp_atomic_load_u32(&ring->r_head);

        /* Move reader head. This thread owns data at the new head. */
        do {
                tail = odp_atomic_load_acq_u32(&ring->w_tail);

                /* Ring is empty */
                if (head == tail)
                        return 0;

                /* Try to take all available */
                if ((tail - head) < num)
                        num = tail - head;

                new_head = head + num;

        } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
                              new_head) == 0));

        /* Read queue index */
        for (i = 0; i < num; i++)
                data[i] = ring->data[(head + 1 + i) & mask];

        /* Wait until other readers have updated the tail */
        while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
                odp_cpu_pause();

        /* Now update the reader tail */
        odp_atomic_store_rel_u32(&ring->r_tail, new_head);

        return num;
}

Reply via email to