ok, thanks.

On 22 March 2017 at 19:27, Brian Brooks <brian.bro...@linaro.org> wrote:

> On Wed, Mar 22, 2017 at 10:37 AM, Maxim Uvarov <maxim.uva...@linaro.org>
> wrote:
> > On 03/22/17 07:54, Brian Brooks wrote:
> >> Acquire ordering is needed to maintain proper release consistency with
> >> the ring enqueue operation. This issue presented itself as deadlock when
> >> running on an ARM-based chip.
> >>
> >> Signed-off-by: Brian Brooks <brian.bro...@linaro.org>
> >> ---
> >>  platform/linux-generic/include/odp_ring_internal.h | 4 ++--
> >>  1 file changed, 2 insertions(+), 2 deletions(-)
> >>
> >> diff --git a/platform/linux-generic/include/odp_ring_internal.h
> b/platform/linux-generic/include/odp_ring_internal.h
> >> index 55fedeb3..44b83c60 100644
> >> --- a/platform/linux-generic/include/odp_ring_internal.h
> >> +++ b/platform/linux-generic/include/odp_ring_internal.h
> >> @@ -57,7 +57,7 @@ static inline uint32_t ring_deq(ring_t *ring,
> uint32_t mask)
> >>
> >>       /* Move reader head. This thread owns data at the new head. */
> >>       do {
> >> -             tail = odp_atomic_load_u32(&ring->w_tail);
> >> +             tail = odp_atomic_load_acq_u32(&ring->w_tail);
> >>
> >>               if (head == tail)
> >>                       return RING_EMPTY;
> >> @@ -90,7 +90,7 @@ static inline uint32_t ring_deq_multi(ring_t *ring,
> uint32_t mask,
> >>
> >>       /* Move reader head. This thread owns data at the new head. */
> >>       do {
> >> -             tail = odp_atomic_load_u32(&ring->w_tail);
> >> +             tail = odp_atomic_load_acq_u32(&ring->w_tail);
> >>
> >>               /* Ring is empty */
> >>               if (head == tail)
> >>
> >
> >
> > Brian, can you please write few words why acq (i.e. barriers) is not
> > used in first load and in final store?
>
> Memory ordering on these compiler built-ins can lower to either a
> sequence of instructions with an explicit memory barrier instruction
> (e.g. a dmb variant on ARMv8) or a single instruction with implied
> memory ordering (e.g. ldar (a load instruction with acquire memory
> ordering) on ARMv8).
>
> Acquire ordering could be used on the initial load of r_head instead.
> An explicit acquire fence could be placed before the do-while loop
> too.
> It is just a bit more symmetrical to have acquire ordering on the
> load of w_tail.
>
> What is needed is an ordering to prevent the load of w_tail (in the
> do-while loop) to be reordered before the store of w_tail in the enq
> operation... on the same core.
>
> Think of the situation where the thread running on one core does
> an enq() followed by a deq().  The final store release in enq() prevents
> memory accesses from being reordered to after the store.  But, memory
> accesses can still be reordered to before that store.  We need acquire
> ordering in deq() to prevent subsequent memory accesses from being
> reordered into the enq() operation.
>
> > static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
> >                                       uint32_t data[], uint32_t num)
> > {
> >         uint32_t head, tail, new_head, i;
> >
> >         head = odp_atomic_load_u32(&ring->r_head);
> >
> >         /* Move reader head. This thread owns data at the new head. */
> >         do {
> >                 tail = odp_atomic_load_acq_u32(&ring->w_tail);
> >
> >                 /* Ring is empty */
> >                 if (head == tail)
> >                         return 0;
> >
> >                 /* Try to take all available */
> >                 if ((tail - head) < num)
> >                         num = tail - head;
> >
> >                 new_head = head + num;
> >
> >         } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head,
> &head,
> >                               new_head) == 0));
> >
> >         /* Read queue index */
> >         for (i = 0; i < num; i++)
> >                 data[i] = ring->data[(head + 1 + i) & mask];
> >
> >         /* Wait until other readers have updated the tail */
> >         while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) !=
> head))
> >                 odp_cpu_pause();
> >
> >         /* Now update the reader tail */
> >         odp_atomic_store_rel_u32(&ring->r_tail, new_head);
> >
> >         return num;
> > }
>

Reply via email to