You're right. In my mind, order was linked to the buffer, not the thread, meaning 
A would have enqueued b1 too.

On 11/09/2015 11:05 AM, Bill Fischofer wrote:
> It is necessary, and illustrates one of the complexities of ordered queuing.  
> Consider the following sequence with QOrigin having current order 4:
>
> Thread A (Order 4)                Thread B (Order 5)
>                                                  enq (Q, b1)
>
>    enq(Q, a1)
>    release_order()
>                                                   enq(Q, b2)
>
> Thread B enqueues first but since the QOrigin is at order 4 its element goes 
> onto QOrigin's reorder queue.  Thread A now enqueues and resolves order 4.  
> Thread B now tries to enqueue another element onto Q and this time it has the 
> current order, however element b2 needs to follow element b1, which is on the 
> reorder queue.  The reorder_enq() routine sorts elements into the reorder 
> queue so that b2 correctly follows any other elements sharing the same order 
> before they are all moved to the target queue.
>
> On Mon, Nov 9, 2015 at 3:54 AM, Nicolas Morey-Chaisemartin <nmo...@kalray.eu 
> <mailto:nmo...@kalray.eu>> wrote:
>
>
>
>     On 11/08/2015 09:42 PM, Bill Fischofer wrote:
>     > Add the new ordered_queue_enq() internal routine. This is done in two
>     > parts to make the diffs easier to follow. Part 1 adds the new routine
>     > while Part 2 replaces queue_enq() to use it.
>     >
>     > Signed-off-by: Bill Fischofer <bill.fischo...@linaro.org 
> <mailto:bill.fischo...@linaro.org>>
>     > ---
>     >  .../linux-generic/include/odp_queue_internal.h     |   2 +
>     >  platform/linux-generic/odp_queue.c                 | 118 
> +++++++++++++++++++++
>     >  2 files changed, 120 insertions(+)
>     >
>     > diff --git a/platform/linux-generic/include/odp_queue_internal.h 
> b/platform/linux-generic/include/odp_queue_internal.h
>     > index 32e3288..1bd365b 100644
>     > --- a/platform/linux-generic/include/odp_queue_internal.h
>     > +++ b/platform/linux-generic/include/odp_queue_internal.h
>     > @@ -96,6 +96,8 @@ union queue_entry_u {
>     >  queue_entry_t *get_qentry(uint32_t queue_id);
>     >
>     >  int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int 
> sustain);
>     > +int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
>     > +                   int systain, queue_entry_t *origin_qe, uint64_t 
> order);
>     >  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
>     >
>     >  int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);
>     > diff --git a/platform/linux-generic/odp_queue.c 
> b/platform/linux-generic/odp_queue.c
>     > index bcc8190..a545927 100644
>     > --- a/platform/linux-generic/odp_queue.c
>     > +++ b/platform/linux-generic/odp_queue.c
>     > @@ -529,6 +529,124 @@ int queue_enq(queue_entry_t *queue, 
> odp_buffer_hdr_t *buf_hdr, int sustain)
>     >       return 0;
>     >  }
>     >
>     > +int ordered_queue_enq(queue_entry_t *queue,
>     > +                   odp_buffer_hdr_t *buf_hdr,
>     > +                   int sustain,
>     > +                   queue_entry_t *origin_qe,
>     > +                   uint64_t order)
>     > +{
>     > +     odp_buffer_hdr_t *reorder_buf;
>     > +     odp_buffer_hdr_t *next_buf;
>     > +     odp_buffer_hdr_t *reorder_prev;
>     > +     odp_buffer_hdr_t *placeholder_buf = NULL;
>     > +     int               release_count, placeholder_count;
>     > +     int               sched = 0;
>     > +
>     > +     /* Need two locks for enq operations from ordered queues */
>     > +     get_qe_locks(origin_qe, queue);
>     > +
>     > +     if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY ||
>     > +                      queue->s.status < QUEUE_STATUS_READY)) {
>     > +             free_qe_locks(queue, origin_qe);
>     > +             ODP_ERR("Bad queue status\n");
>     > +             ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
>     > +                     queue->s.name, origin_qe->s.name, buf_hdr);
>     > +             return -1;
>     > +     }
>     > +
>     > +     /* Remember that enq was called for this order */
>     > +     sched_enq_called();
>     > +
>     > +     /* We can only complete this enq if we're in order */
>     > +     if (order > origin_qe->s.order_out) {
>     > +             reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
>     > +
>     > +             /* This enq can't complete until order is restored, so
>     > +              * we're done here.
>     > +              */
>     > +             free_qe_locks(queue, origin_qe);
>     > +             return 0;
>     > +     }
>     > +
>     > +     /* Resolve order if requested */
>     > +     if (!sustain) {
>     > +             order_release(origin_qe, 1);
>     > +             sched_order_resolved(buf_hdr);
>     > +     }
>     > +
>     > +     /* Update queue status */
>     > +     if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
>     > +             queue->s.status = QUEUE_STATUS_SCHED;
>     > +             sched = 1;
>     > +     }
>     > +
>     > +     /* We're in order, however the reorder queue may have other 
> buffers
>     > +      * sharing this order on it and this buffer must not be enqueued 
> ahead
>     > +      * of them. If the reorder queue is empty we can short-cut and
>     > +      * simply add to the target queue directly.
>     > +      */
>     > +
>     > +     if (!origin_qe->s.reorder_head) {
>     > +             queue_add_chain(queue, buf_hdr);
>     > +             free_qe_locks(queue, origin_qe);
>     > +
>     > +             /* Add queue to scheduling */
>     > +             if (sched && schedule_queue(queue))
>     > +                     ODP_ABORT("schedule_queue failed\n");
>     > +             return 0;
>     > +     }
>     > +
>     > +     /* The reorder_queue is non-empty, so sort this buffer into it.  
> Note
>     > +      * that we force the sustain bit on here because we'll be removing
>     > +      * this immediately and we already accounted for this order 
> earlier.
>     > +      */
>     > +     reorder_enq(queue, order, origin_qe, buf_hdr, 1);
>     > +
>     Do we really need this call?
>     When we reach this point in the code, buf_hdr is next in line to be 
> queued, and what we want is to pull out of the reorder queue the buffer that 
> might be right after this one.
>     In this case we push buf_hdr to the reorder queue just to pull it back 
> afterward. It works, but I'm not sure this is completely necessary.
>     > +     /* Pick up this element, and all others resolved by this enq,
>     > +      * and add them to the target queue.
>     > +      */
>     > +     reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev,
>     > +                 &placeholder_buf, &release_count, &placeholder_count);
>     > +
>     > +     /* Move the list from the reorder queue to the target queue */
>     > +     if (queue->s.head)
>     > +             queue->s.tail->next = origin_qe->s.reorder_head;
>     > +     else
>     > +             queue->s.head       = origin_qe->s.reorder_head;
>     > +     queue->s.tail               = reorder_prev;
>     > +     origin_qe->s.reorder_head   = reorder_prev->next;
>     > +     reorder_prev->next          = NULL;
>     > +
>     > +     /* Reflect resolved orders in the output sequence */
>     > +     order_release(origin_qe, release_count + placeholder_count);
>     > +
>     > +     /* Now handle any resolved orders for events destined for other
>     > +      * queues, appending placeholder bufs as needed.
>     > +      */
>     > +     if (origin_qe != queue)
>     > +             UNLOCK(&queue->s.lock);
>     > +
>     > +     /* Add queue to scheduling */
>     > +     if (sched && schedule_queue(queue))
>     > +             ODP_ABORT("schedule_queue failed\n");
>     > +
>     > +     reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
>     > +                      1, 0);
>     > +     UNLOCK(&origin_qe->s.lock);
>     > +
>     > +     if (reorder_buf)
>     > +             queue_enq_internal(reorder_buf);
>     > +
>     > +     /* Free all placeholder bufs that are now released */
>     > +     while (placeholder_buf) {
>     > +             next_buf = placeholder_buf->next;
>     > +             odp_buffer_free(placeholder_buf->handle.handle);
>     > +             placeholder_buf = next_buf;
>     > +     }
>     > +
>     > +     return 0;
>     > +}
>     > +
>     >  int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
>     >                   int num, int sustain)
>     >  {
>
>

_______________________________________________
lng-odp mailing list
lng-odp@lists.linaro.org
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to