On Thu, Dec 1, 2016 at 5:37 AM, Matias Elo <matias....@nokia.com> wrote:
> Implement ordered locks using per lock atomic counters. The counter values
> are compared to the queue’s atomic context to guarantee ordered locking.
> Compared to the previous implementation this enables parallel processing of
> ordered events outside of the lock context.
>
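
For anyone skimming the patch, this is roughly how an application would
exercise these locks via the public API (a minimal sketch;
do_parallel_work() and update_shared_state() are placeholder helpers, and
the ordered queue is assumed to have been created with
param.sched.lock_count >= 1):

odp_event_t ev;
odp_queue_t from;

ev = odp_schedule(&from, ODP_SCHED_WAIT);

/* Events from the same ordered queue may now be processed in
 * parallel on multiple threads... */
do_parallel_work(ev);

/* ...until a thread needs the critical section guarded by ordered
 * lock 0, which executes in the source queue's original event order. */
odp_schedule_order_lock(0);
update_shared_state(ev);
odp_schedule_order_unlock(0);
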
> Signed-off-by: Matias Elo <matias....@nokia.com>
> ---
>  .../linux-generic/include/odp_queue_internal.h     |  2 +
>  platform/linux-generic/odp_queue.c                 |  6 +++
>  platform/linux-generic/odp_schedule.c              | 49 ++++++++++++++++++++--
>  3 files changed, 54 insertions(+), 3 deletions(-)
>
> diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
> index b905bd8..8b55de1 100644
> --- a/platform/linux-generic/include/odp_queue_internal.h
> +++ b/platform/linux-generic/include/odp_queue_internal.h
> @@ -59,6 +59,8 @@ struct queue_entry_s {
>         struct {
>                 odp_atomic_u64_t  ctx; /**< Current ordered context id */
>                 odp_atomic_u64_t  next_ctx; /**< Next unallocated context id */
> +               /** Array of ordered locks */
> +               odp_atomic_u64_t  lock[CONFIG_QUEUE_MAX_ORD_LOCKS];
>         } ordered ODP_ALIGNED_CACHE;
>
>         enq_func_t       enqueue ODP_ALIGNED_CACHE;
> diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
> index 4c7f497..d9cb9f3 100644
> --- a/platform/linux-generic/odp_queue.c
> +++ b/platform/linux-generic/odp_queue.c
> @@ -77,8 +77,14 @@ static int queue_init(queue_entry_t *queue, const char *name,
>                 queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
>
>                 if (param->sched.sync == ODP_SCHED_SYNC_ORDERED) {
> +                       unsigned i;
> +
>                         odp_atomic_init_u64(&queue->s.ordered.ctx, 0);
>                         odp_atomic_init_u64(&queue->s.ordered.next_ctx, 0);
> +
> +                       for (i = 0; i < queue->s.param.sched.lock_count; i++)
> +                               odp_atomic_init_u64(&queue->s.ordered.lock[i],
> +                                                   0);
>                 }
>         }
>         queue->s.type = queue->s.param.type;
> diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
> index 4b33513..c628142 100644
> --- a/platform/linux-generic/odp_schedule.c
> +++ b/platform/linux-generic/odp_schedule.c
> @@ -126,6 +126,15 @@ typedef struct {
>         int num;
>  } ordered_stash_t;
>
> +/* Ordered lock states */
> +typedef union {
> +       uint8_t u8[CONFIG_QUEUE_MAX_ORD_LOCKS];
> +       uint32_t all;
> +} lock_called_t;
> +
> +ODP_STATIC_ASSERT(sizeof(lock_called_t) == sizeof(uint32_t),
> +                 "Lock_called_values_do_not_fit_in_uint32");
> +
>  /* Scheduler local data */
>  typedef struct {
>         int thr;
> @@ -145,6 +154,7 @@ typedef struct {
>                 ordered_stash_t stash[MAX_ORDERED_STASH];
>                 int stash_num; /**< Number of stashed enqueue operations */
>                 uint8_t in_order; /**< Order status */
> +               lock_called_t lock_called; /**< States of ordered locks */
>         } ordered;
>
>  } sched_local_t;
> @@ -553,12 +563,21 @@ static inline void ordered_stash_release(void)
>
>  static inline void release_ordered(void)
>  {
> +       unsigned i;
>         queue_entry_t *queue;
>
>         queue = sched_local.ordered.src_queue;
>
>         wait_for_order(queue);
>
> +       /* Release all ordered locks */
> +       for (i = 0; i < queue->s.param.sched.lock_count; i++) {
> +               if (!sched_local.ordered.lock_called.u8[i])
> +                       odp_atomic_store_rel_u64(&queue->s.ordered.lock[i],
> +                                                sched_local.ordered.ctx + 1);
> +       }
> +
> +       sched_local.ordered.lock_called.all = 0;
>         sched_local.ordered.src_queue = NULL;
>         sched_local.ordered.in_order = 0;
>
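
A side note that may help other reviewers: the lock_called union above is
what allows the single lock_called.all = 0 store here to clear every
per-lock "already released" flag at once. If I read the patch right, the
sequence handshake for a single ordered lock then plays out like this
(hypothetical context numbers):

ctx 0: load_acq(lock[0]) == 0 == ctx  -> enters the locked section
ctx 1: load_acq(lock[0]) == 0 != 1    -> spins in schedule_order_lock()
ctx 0: explicit unlock, or release_ordered() with lock_called.u8[0] == 0
       -> store_rel(lock[0], 0 + 1)   -> ctx 1 may proceed
ctx 2: keeps spinning until lock[0] reaches 2, and so on.
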
> @@ -923,19 +942,43 @@ static void order_unlock(void)
>  {
>  }
>
> -static void schedule_order_lock(unsigned lock_index ODP_UNUSED)
> +static void schedule_order_lock(unsigned lock_index)
>  {
> +       odp_atomic_u64_t *ord_lock;
>         queue_entry_t *queue;
>
>         queue = sched_local.ordered.src_queue;
>
>         ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);

Sorry, I should have been more precise. The staleness test I was
referring to was to verify that the lock had not been previously used
in this ordered context. In the current code that's the following
assert (in odp_schedule_order.c):

sync = sched_local.sync[lock_index];
sync_out = odp_atomic_load_u64(&origin_qe->s.sync_out[lock_index]);
ODP_ASSERT(sync >= sync_out);

The test above should be open-coded since it's a validity check on the
call. The current code treats odp_schedule_order_lock()/unlock() as
no-ops if we aren't running in an ordered context, as that permits queue
types to be moved from, say, ordered to atomic without having to change
the application.
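
In this patch's terms, that no-op behavior plus the staleness check could
look something like the following at the top of schedule_order_lock()
(just a sketch; the early return and asserts are my suggestion, not part
of the patch as posted):

queue_entry_t *queue = sched_local.ordered.src_queue;

/* Not running in an ordered context: treat lock/unlock as no-ops so
 * the same application code keeps working if the queue is switched
 * to atomic or parallel sync. */
if (queue == NULL)
        return;

ODP_ASSERT(lock_index < queue->s.param.sched.lock_count);
ODP_ASSERT(sched_local.ordered.lock_called.u8[lock_index] == 0);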

>
> -       wait_for_order(queue);
> +       ord_lock = &queue->s.ordered.lock[lock_index];
> +
> +       /* Busy loop to synchronize ordered processing */
> +       while (1) {
> +               uint64_t lock_seq;
> +
> +               lock_seq = odp_atomic_load_acq_u64(ord_lock);

In this logic that test would take the form:

ODP_ASSERT(sched_local.ordered.ctx >= lock_seq);

Though given the existence of lock_called.u8, perhaps

ODP_ASSERT(sched_local.ordered.lock_called.u8[lock_index] == 0);

would be equally effective.

> +
> +               if (lock_seq == sched_local.ordered.ctx) {
> +                       sched_local.ordered.lock_called.u8[lock_index] = 1;
> +                       return;
> +               }
> +               odp_cpu_pause();
> +       }
>  }
>
> -static void schedule_order_unlock(unsigned lock_index ODP_UNUSED)
> +static void schedule_order_unlock(unsigned lock_index)
>  {
> +       odp_atomic_u64_t *ord_lock;
> +       queue_entry_t *queue;
> +
> +       queue = sched_local.ordered.src_queue;
> +
> +       ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);
> +
> +       ord_lock = &queue->s.ordered.lock[lock_index];

The corresponding staleness test here would be:

ODP_ASSERT(sched_local.ordered.ctx == odp_atomic_load_acq_u64(ord_lock));

The point being that you want to detect erroneous duplicate lock
acquires or frees, which can lead to very subtle bugs that are
difficult to find.
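
To make that concrete, this is the sort of (hypothetical) application
error the checks should flag; with the logic in this patch the second
lock call would otherwise spin forever, because lock[0] has already
advanced past this context's sequence number:

odp_schedule_order_lock(0);
/* ... in-order critical section ... */
odp_schedule_order_unlock(0);    /* lock[0] becomes ctx + 1 */

/* ... later, still in the same ordered context ... */
odp_schedule_order_lock(0);      /* erroneous re-acquire: waits for
                                    lock[0] == ctx, which never happens */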

> +
> +       odp_atomic_store_rel_u64(ord_lock, sched_local.ordered.ctx + 1);
>  }
>
>  static void schedule_pause(void)
> --
> 2.7.4
>
