There's a surprising amount of subtlety involved in ordered queues, which
is one of the reasons they're such a powerful primitive to have.

On Mon, Nov 9, 2015 at 4:10 AM, Nicolas Morey-Chaisemartin <nmo...@kalray.eu
> wrote:

> You're right. In my mind, order was linked to the buffer, not the thread,
> meaning A would have enqueued b1 too.
>
>
> On 11/09/2015 11:05 AM, Bill Fischofer wrote:
>
> It is necessary, and illustrates one of the complexities of ordered
> queuing.  Consider the following sequence with QOrigin having current order
> 4:
>
> Thread A (Order 4)                Thread B (Order 5)
>                                                  enq (Q, b1)
>
>    enq(Q, a1)
>    release_order()
>                                                   enq(Q, b2)
>
> Thread B enqueues first but since the QOrigin is at order 4 its element
> goes onto QOrigin's reorder queue.  Thread A now enqueues and resolves
> order 4.  Thread B now tries to enqueue another element onto Q and this
> time it has the current order, however element b2 needs to follow element
> b1, which is on the reorder queue.  The reorder_enq() routine sorts
> elements into the reorder queue so that b2 correctly follows any other
> elements sharing the same order before they are all moved to the target
> queue.
>
> On Mon, Nov 9, 2015 at 3:54 AM, Nicolas Morey-Chaisemartin
> <nmo...@kalray.eu> wrote:
>
>>
>>
>> On 11/08/2015 09:42 PM, Bill Fischofer wrote:
>> > Add the new ordered_queue_enq() internal routine. This is done in two
>> > parts to make the diffs easier to follow. Part 1 adds the new routine
>> > while Part 2 replaces queue_enq() to use it.
>> >
>> > Signed-off-by: Bill Fischofer < <bill.fischo...@linaro.org>
>> bill.fischo...@linaro.org>
>> > ---
>> >  .../linux-generic/include/odp_queue_internal.h     |   2 +
>> >  platform/linux-generic/odp_queue.c                 | 118
>> +++++++++++++++++++++
>> >  2 files changed, 120 insertions(+)
>> >
>> > diff --git a/platform/linux-generic/include/odp_queue_internal.h
>> b/platform/linux-generic/include/odp_queue_internal.h
>> > index 32e3288..1bd365b 100644
>> > --- a/platform/linux-generic/include/odp_queue_internal.h
>> > +++ b/platform/linux-generic/include/odp_queue_internal.h
>> > @@ -96,6 +96,8 @@ union queue_entry_u {
>> >  queue_entry_t *get_qentry(uint32_t queue_id);
>> >
>> >  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int
>> sustain);
>> > +int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
>> > +                   int systain, queue_entry_t *origin_qe, uint64_t
>> order);
>> >  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
>> >
>> >  int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);
>> > diff --git a/platform/linux-generic/odp_queue.c
>> b/platform/linux-generic/odp_queue.c
>> > index bcc8190..a545927 100644
>> > --- a/platform/linux-generic/odp_queue.c
>> > +++ b/platform/linux-generic/odp_queue.c
>> > @@ -529,6 +529,124 @@ int queue_enq(queue_entry_t *queue,
>> odp_buffer_hdr_t *buf_hdr, int sustain)
>> >       return 0;
>> >  }
>> >
>> > +int ordered_queue_enq(queue_entry_t *queue,
>> > +                   odp_buffer_hdr_t *buf_hdr,
>> > +                   int sustain,
>> > +                   queue_entry_t *origin_qe,
>> > +                   uint64_t order)
>> > +{
>> > +     odp_buffer_hdr_t *reorder_buf;
>> > +     odp_buffer_hdr_t *next_buf;
>> > +     odp_buffer_hdr_t *reorder_prev;
>> > +     odp_buffer_hdr_t *placeholder_buf = NULL;
>> > +     int               release_count, placeholder_count;
>> > +     int               sched = 0;
>> > +
>> > +     /* Need two locks for enq operations from ordered queues */
>> > +     get_qe_locks(origin_qe, queue);
>> > +
>> > +     if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY ||
>> > +                      queue->s.status < QUEUE_STATUS_READY)) {
>> > +             free_qe_locks(queue, origin_qe);
>> > +             ODP_ERR("Bad queue status\n");
>> > +             ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
>> > +                     queue->s.name, origin_qe->s.name, buf_hdr);
>> > +             return -1;
>> > +     }
>> > +
>> > +     /* Remember that enq was called for this order */
>> > +     sched_enq_called();
>> > +
>> > +     /* We can only complete this enq if we're in order */
>> > +     if (order > origin_qe->s.order_out) {
>> > +             reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
>> > +
>> > +             /* This enq can't complete until order is restored, so
>> > +              * we're done here.
>> > +              */
>> > +             free_qe_locks(queue, origin_qe);
>> > +             return 0;
>> > +     }
>> > +
>> > +     /* Resolve order if requested */
>> > +     if (!sustain) {
>> > +             order_release(origin_qe, 1);
>> > +             sched_order_resolved(buf_hdr);
>> > +     }
>> > +
>> > +     /* Update queue status */
>> > +     if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
>> > +             queue->s.status = QUEUE_STATUS_SCHED;
>> > +             sched = 1;
>> > +     }
>> > +
>> > +     /* We're in order, however the reorder queue may have other
>> buffers
>> > +      * sharing this order on it and this buffer must not be enqueued
>> ahead
>> > +      * of them. If the reorder queue is empty we can short-cut and
>> > +      * simply add to the target queue directly.
>> > +      */
>> > +
>> > +     if (!origin_qe->s.reorder_head) {
>> > +             queue_add_chain(queue, buf_hdr);
>> > +             free_qe_locks(queue, origin_qe);
>> > +
>> > +             /* Add queue to scheduling */
>> > +             if (sched && schedule_queue(queue))
>> > +                     ODP_ABORT("schedule_queue failed\n");
>> > +             return 0;
>> > +     }
>> > +
>> > +     /* The reorder_queue is non-empty, so sort this buffer into it.
>> Note
>> > +      * that we force the sustain bit on here because we'll be removing
>> > +      * this immediately and we already accounted for this order
>> earlier.
>> > +      */
>> > +     reorder_enq(queue, order, origin_qe, buf_hdr, 1);
>> > +
>> Do we really need this call?
>> When we reach this point in the code, buf_hdr is next in line to be
>> queued, and what we want is to pull out of the reorder queue the buffer
>> that might be right after this one.
>> In this case we push buf_hdr to the reorder queue just to pull it back
>> out afterwards. It works, but I'm not sure this is completely necessary.
>> > +     /* Pick up this element, and all others resolved by this enq,
>> > +      * and add them to the target queue.
>> > +      */
>> > +     reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev,
>> > +                 &placeholder_buf, &release_count, &placeholder_count);
>> > +
>> > +     /* Move the list from the reorder queue to the target queue */
>> > +     if (queue->s.head)
>> > +             queue->s.tail->next = origin_qe->s.reorder_head;
>> > +     else
>> > +             queue->s.head       = origin_qe->s.reorder_head;
>> > +     queue->s.tail               = reorder_prev;
>> > +     origin_qe->s.reorder_head   = reorder_prev->next;
>> > +     reorder_prev->next          = NULL;
>> > +
>> > +     /* Reflect resolved orders in the output sequence */
>> > +     order_release(origin_qe, release_count + placeholder_count);
>> > +
>> > +     /* Now handle any resolved orders for events destined for other
>> > +      * queues, appending placeholder bufs as needed.
>> > +      */
>> > +     if (origin_qe != queue)
>> > +             UNLOCK(&queue->s.lock);
>> > +
>> > +     /* Add queue to scheduling */
>> > +     if (sched && schedule_queue(queue))
>> > +             ODP_ABORT("schedule_queue failed\n");
>> > +
>> > +     reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
>> > +                      1, 0);
>> > +     UNLOCK(&origin_qe->s.lock);
>> > +
>> > +     if (reorder_buf)
>> > +             queue_enq_internal(reorder_buf);
>> > +
>> > +     /* Free all placeholder bufs that are now released */
>> > +     while (placeholder_buf) {
>> > +             next_buf = placeholder_buf->next;
>> > +             odp_buffer_free(placeholder_buf->handle.handle);
>> > +             placeholder_buf = next_buf;
>> > +     }
>> > +
>> > +     return 0;
>> > +}
>> > +
>> >  int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
>> >                   int num, int sustain)
>> >  {
>>
>>
>
>
_______________________________________________
lng-odp mailing list
lng-odp@lists.linaro.org
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to