On Thu, Dec 1, 2016 at 5:37 AM, Matias Elo <matias....@nokia.com> wrote:
> Add a new implementation for ordered queues. Compared to the old
> implementation, this is much simpler and improves performance by
> roughly 1-4x depending on the test case.
>
> The implementation is based on an atomic ordered context, which only a
> single thread may possess at a time. Only the thread owning the atomic
> context may enqueue events from the ordered queue. All other threads
> put their enqueued events into a thread-local enqueue stash
> (ordered_stash_t). All stashed enqueue operations are performed in the
> original order when the thread acquires the ordered context. If the
> ordered stash becomes full, the enqueue call blocks. At the latest, a
> thread blocks when its ev_stash is empty and it tries to release the
> ordered context.
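
For readers following along, here is a minimal standalone sketch of the
mechanism described above. The names, the use of C11 atomics and the
plain spin loops are simplifications assumed for illustration only, not
the actual ODP code or API; the real implementation is in the patch
below.

#include <stdatomic.h>
#include <stdint.h>

#define MAX_ORDERED_STASH 512

/* Per-queue ordered state: 'ctx' is the context id currently allowed
 * to enqueue, 'next_ctx' is the next context id to hand out. */
typedef struct {
        _Atomic uint64_t ctx;
        _Atomic uint64_t next_ctx;
} ordered_queue_t;

/* One stashed enqueue operation (destination queue + event). */
typedef struct {
        ordered_queue_t *dst;
        void *ev;
} stash_entry_t;

/* Per-thread scheduler state. */
static _Thread_local struct {
        ordered_queue_t *src_queue; /* ordered queue we dequeued from */
        uint64_t ctx;               /* our ordered context id */
        stash_entry_t stash[MAX_ORDERED_STASH];
        int stash_num;
} sched_local;

/* Placeholder for the real (locked) queue enqueue. */
static void do_enqueue(ordered_queue_t *dst, void *ev)
{
        (void)dst; (void)ev;
}

/* Dequeue side: allocate the next ordered context id for this thread. */
static void ordered_acquire(ordered_queue_t *q)
{
        sched_local.src_queue = q;
        sched_local.ctx = atomic_fetch_add(&q->next_ctx, 1);
}

static int own_turn(ordered_queue_t *q)
{
        return atomic_load_explicit(&q->ctx, memory_order_acquire) ==
               sched_local.ctx;
}

static void flush_stash(void)
{
        for (int i = 0; i < sched_local.stash_num; i++)
                do_enqueue(sched_local.stash[i].dst, sched_local.stash[i].ev);
        sched_local.stash_num = 0;
}

/* Enqueue while holding an ordered context: enqueue directly if it is
 * our turn (or the stash is full, in which case we must wait for our
 * turn), otherwise stash the operation for later. */
static void ordered_enqueue(ordered_queue_t *dst, void *ev)
{
        ordered_queue_t *src = sched_local.src_queue;

        if (own_turn(src) || sched_local.stash_num == MAX_ORDERED_STASH) {
                while (!own_turn(src))
                        ; /* busy wait for our turn */
                flush_stash();
                do_enqueue(dst, ev);
                return;
        }
        sched_local.stash[sched_local.stash_num++] =
                (stash_entry_t){ .dst = dst, .ev = ev };
}

/* Release the ordered context: wait for our turn, flush the stash in
 * the original order, then let the next context proceed. */
static void ordered_release(void)
{
        ordered_queue_t *src = sched_local.src_queue;

        while (!own_turn(src))
                ; /* busy wait for our turn */
        flush_stash();
        sched_local.src_queue = NULL;
        atomic_fetch_add_explicit(&src->ctx, 1, memory_order_release);
}
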
>
> Signed-off-by: Matias Elo <matias....@nokia.com>
> ---
>  .../linux-generic/include/odp_queue_internal.h     |   5 +
>  platform/linux-generic/odp_queue.c                 |  14 +-
>  platform/linux-generic/odp_schedule.c              | 171 +++++++++++++++++++--
>  3 files changed, 172 insertions(+), 18 deletions(-)
>
> diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
> index df36b76..b905bd8 100644
> --- a/platform/linux-generic/include/odp_queue_internal.h
> +++ b/platform/linux-generic/include/odp_queue_internal.h
> @@ -56,6 +56,11 @@ struct queue_entry_s {
>         odp_buffer_hdr_t *tail;
>         int               status;
>
> +       struct {
> +               odp_atomic_u64_t  ctx; /**< Current ordered context id */
> +               odp_atomic_u64_t  next_ctx; /**< Next unallocated context id */
> +       } ordered ODP_ALIGNED_CACHE;
> +
>         enq_func_t       enqueue ODP_ALIGNED_CACHE;
>         deq_func_t       dequeue;
>         enq_multi_func_t enqueue_multi;
> diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
> index 99c91e7..4c7f497 100644
> --- a/platform/linux-generic/odp_queue.c
> +++ b/platform/linux-generic/odp_queue.c
> @@ -73,9 +73,14 @@ static int queue_init(queue_entry_t *queue, const char *name,
>         if (queue->s.param.sched.lock_count > sched_fn->max_ordered_locks())
>                 return -1;
>
> -       if (param->type == ODP_QUEUE_TYPE_SCHED)
> +       if (param->type == ODP_QUEUE_TYPE_SCHED) {
>                 queue->s.param.deq_mode = ODP_QUEUE_OP_DISABLED;
>
> +               if (param->sched.sync == ODP_SCHED_SYNC_ORDERED) {
> +                       odp_atomic_init_u64(&queue->s.ordered.ctx, 0);
> +                       odp_atomic_init_u64(&queue->s.ordered.next_ctx, 0);
> +               }
> +       }
>         queue->s.type = queue->s.param.type;
>
>         queue->s.enqueue = queue_enq;
> @@ -301,6 +306,13 @@ int odp_queue_destroy(odp_queue_t handle)
>                 ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
>                 return -1;
>         }
> +       if (queue_is_ordered(queue) &&
> +           odp_atomic_load_u64(&queue->s.ordered.ctx) !=
> +                           odp_atomic_load_u64(&queue->s.ordered.next_ctx)) {
> +               UNLOCK(&queue->s.lock);
> +               ODP_ERR("queue \"%s\" reorder incomplete\n", queue->s.name);
> +               return -1;
> +       }
>
>         switch (queue->s.status) {
>         case QUEUE_STATUS_READY:
> diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
> index 5bc274f..4b33513 100644
> --- a/platform/linux-generic/odp_schedule.c
> +++ b/platform/linux-generic/odp_schedule.c
> @@ -111,11 +111,21 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
>  #define MAX_DEQ CONFIG_BURST_SIZE
>
>  /* Maximum number of ordered locks per queue */
> -#define MAX_ORDERED_LOCKS_PER_QUEUE 1
> +#define MAX_ORDERED_LOCKS_PER_QUEUE 2
>
>  ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <= CONFIG_QUEUE_MAX_ORD_LOCKS,
>                   "Too_many_ordered_locks");
>
> +/* Ordered stash size */
> +#define MAX_ORDERED_STASH 512
> +
> +/* Storage for stashed enqueue operation arguments */
> +typedef struct {
> +       odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
> +       queue_entry_t *queue;
> +       int num;
> +} ordered_stash_t;
> +
>  /* Scheduler local data */
>  typedef struct {
>         int thr;
> @@ -128,7 +138,15 @@ typedef struct {
>         uint32_t queue_index;
>         odp_queue_t queue;
>         odp_event_t ev_stash[MAX_DEQ];
> -       void *queue_entry;
> +       struct {
> +               queue_entry_t *src_queue; /**< Source queue entry */
> +               uint64_t ctx; /**< Ordered context id */
> +               /** Storage for stashed enqueue operations */
> +               ordered_stash_t stash[MAX_ORDERED_STASH];
> +               int stash_num; /**< Number of stashed enqueue operations */
> +               uint8_t in_order; /**< Order status */
> +       } ordered;
> +
>  } sched_local_t;
>
>  /* Priority queue */
> @@ -491,17 +509,81 @@ static void schedule_release_atomic(void)
>         }
>  }
>
> +static inline int ordered_own_turn(queue_entry_t *queue)
> +{
> +       uint64_t ctx;
> +
> +       ctx = odp_atomic_load_acq_u64(&queue->s.ordered.ctx);
> +
> +       return ctx == sched_local.ordered.ctx;
> +}
> +
> +static inline void wait_for_order(queue_entry_t *queue)
> +{
> +       /* Busy loop to synchronize ordered processing */
> +       while (1) {
> +               if (ordered_own_turn(queue))
> +                       break;
> +               odp_cpu_pause();
> +       }
> +}
> +
> +/**
> + * Perform stashed enqueue operations
> + *
> + * Should be called only when already in order.
> + */
> +static inline void ordered_stash_release(void)
> +{
> +       int i;
> +
> +       for (i = 0; i < sched_local.ordered.stash_num; i++) {
> +               queue_entry_t *queue;
> +               odp_buffer_hdr_t **buf_hdr;
> +               int num;
> +
> +               queue = sched_local.ordered.stash[i].queue;
> +               buf_hdr = sched_local.ordered.stash[i].buf_hdr;
> +               num = sched_local.ordered.stash[i].num;
> +
> +               queue_enq_multi(queue, buf_hdr, num);
> +       }
> +       sched_local.ordered.stash_num = 0;
> +}
> +
> +static inline void release_ordered(void)
> +{
> +       queue_entry_t *queue;
> +
> +       queue = sched_local.ordered.src_queue;
> +
> +       wait_for_order(queue);
> +
> +       sched_local.ordered.src_queue = NULL;
> +       sched_local.ordered.in_order = 0;
> +
> +       ordered_stash_release();
> +
> +       /* Next thread can continue processing */
> +       odp_atomic_add_rel_u64(&queue->s.ordered.ctx, 1);
> +}
> +
>  static void schedule_release_ordered(void)
>  {
> -       /* Process ordered queue as atomic */
> -       schedule_release_atomic();
> -       sched_local.queue_entry = NULL;
> +       queue_entry_t *queue;
> +
> +       queue = sched_local.ordered.src_queue;
> +
> +       if (odp_unlikely(!queue || sched_local.num))
> +               return;
> +
> +       release_ordered();
>  }
>
>  static inline void schedule_release_context(void)
>  {
> -       if (sched_local.queue_entry != NULL)
> -               schedule_release_ordered();
> +       if (sched_local.ordered.src_queue != NULL)
> +               release_ordered();
>         else
>                 schedule_release_atomic();
>  }
> @@ -524,13 +606,41 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max)
>  static int schedule_ord_enq_multi(uint32_t queue_index, void *buf_hdr[],
>                                   int num, int *ret)
>  {
> -       (void)queue_index;
> -       (void)buf_hdr;
> -       (void)num;
> -       (void)ret;
> +       int i;
> +       uint32_t stash_num = sched_local.ordered.stash_num;
> +       queue_entry_t *dst_queue = get_qentry(queue_index);
> +       queue_entry_t *src_queue = sched_local.ordered.src_queue;
>
> -       /* didn't consume the events */
> -       return 0;
> +       if (!sched_local.ordered.src_queue || sched_local.ordered.in_order)
> +               return 0;
> +
> +       if (ordered_own_turn(src_queue)) {
> +               /* Own turn, so can do enqueue directly. */
> +               sched_local.ordered.in_order = 1;
> +               ordered_stash_release();
> +               return 0;
> +       }
> +
> +       if (odp_unlikely(stash_num >=  MAX_ORDERED_STASH)) {
> +               /* If the local stash is full, wait until it is our turn and
> +                * then release the stash and do enqueue directly. */
> +               wait_for_order(src_queue);
> +
> +               sched_local.ordered.in_order = 1;
> +
> +               ordered_stash_release();
> +               return 0;
> +       }
> +
> +       sched_local.ordered.stash[stash_num].queue = dst_queue;
> +       sched_local.ordered.stash[stash_num].num = num;
> +       for (i = 0; i < num; i++)
> +               sched_local.ordered.stash[stash_num].buf_hdr[i] = buf_hdr[i];
> +
> +       sched_local.ordered.stash_num++;
> +
> +       *ret = num;
> +       return 1;
>  }
>

This code still has the following in do_schedule():

ordered = sched_cb_queue_is_ordered(qi);

/* Do not cache ordered events locally to improve
 * parallelism. Ordered context can only be released
 * when the local cache is empty. */
if (ordered && max_num < MAX_DEQ)
    max_deq = max_num;

To do what the comment says, this should be changed to:

if (ordered)
   max_deq = 1;

With ordered queues you want to schedule consecutive events to separate
threads so that they can be processed in parallel. Allowing multiple
events to be processed by a single thread introduces head-of-line
blocking.
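
To make the suggested change concrete, here are the two alternatives
side by side, with the reasoning as comments (this just paraphrases the
snippets above; the rest of the do_schedule() body is not shown):

/* Current patch: an ordered queue may still hand a batch of up to
 * max_num events to one thread, which serializes their processing. */
if (ordered && max_num < MAX_DEQ)
    max_deq = max_num;

/* Suggested change: hand out one ordered event at a time so that
 * consecutive events from the same ordered queue can be processed by
 * different threads in parallel. */
if (ordered)
    max_deq = 1;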

Of course, if you make this change I suspect some of the performance
gains measured in the simple test cases we have will go away, since a
good portion of those gains likely comes from effectively turning
ordered queues back into atomic queues, which is what this sort of
batching of a limited number of events does.



>  /*
> @@ -658,9 +768,21 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
>                         ret = copy_events(out_ev, max_num);
>
>                         if (ordered) {
> -                               /* Operate as atomic */
> -                               sched_local.queue_index = qi;
> -                               sched_local.queue_entry = get_qentry(qi);
> +                               uint64_t ctx;
> +                               queue_entry_t *queue;
> +                               odp_atomic_u64_t *next_ctx;
> +
> +                               queue = get_qentry(qi);
> +                               next_ctx = &queue->s.ordered.next_ctx;
> +
> +                               ctx = odp_atomic_fetch_inc_u64(next_ctx);
> +
> +                               sched_local.ordered.ctx = ctx;
> +                               sched_local.ordered.src_queue = queue;
> +
> +                               /* Continue scheduling ordered queues */
> +                               ring_enq(ring, PRIO_QUEUE_MASK, qi);
> +
>                         } else if (sched_cb_queue_is_atomic(qi)) {
>                                 /* Hold queue during atomic access */
>                                 sched_local.queue_index = qi;
> @@ -785,8 +907,16 @@ static int schedule_multi(odp_queue_t *out_queue, uint64_t wait,
>         return schedule_loop(out_queue, wait, events, num);
>  }
>
> -static void order_lock(void)
> +static inline void order_lock(void)
>  {
> +       queue_entry_t *queue;
> +
> +       queue = sched_local.ordered.src_queue;
> +
> +       if (!queue)
> +               return;
> +
> +       wait_for_order(queue);
>  }
>
>  static void order_unlock(void)
> @@ -795,6 +925,13 @@ static void order_unlock(void)
>
>  static void schedule_order_lock(unsigned lock_index ODP_UNUSED)
>  {
> +       queue_entry_t *queue;
> +
> +       queue = sched_local.ordered.src_queue;
> +
> +       ODP_ASSERT(queue && lock_index <= queue->s.param.sched.lock_count);
> +
> +       wait_for_order(queue);
>  }
>
>  static void schedule_order_unlock(unsigned lock_index ODP_UNUSED)
> --
> 2.7.4
>
