There's a surprising amount of subtlety involved in ordered queues, which is one of the reasons they're such a powerful primitive to have.
On Mon, Nov 9, 2015 at 4:10 AM, Nicolas Morey-Chaisemartin <nmo...@kalray.eu > wrote: > You're right. In my mind order was linked to the buffer, not the thread > meaning A would have enq b1 too. > > > On 11/09/2015 11:05 AM, Bill Fischofer wrote: > > It is necessary, and illustrates one of the complexities of ordered > queuing. Consider the following sequence with QOrigin having current order > 4: > > Thread A (Order 4) Thread B (Order 5) > enq(Q, b1) > > enq(Q, a1) > release_order() > enq(Q, b2) > > Thread B enqueues first, but since the QOrigin is at order 4 its element > goes onto QOrigin's reorder queue. Thread A now enqueues and resolves > order 4. Thread B now tries to enqueue another element onto Q and this > time it has the current order; however, element b2 needs to follow element > b1, which is on the reorder queue. The reorder_enq() routine sorts > elements into the reorder queue so that b2 correctly follows any other > elements sharing the same order before they are all moved to the target > queue. > > On Mon, Nov 9, 2015 at 3:54 AM, Nicolas Morey-Chaisemartin < > <nmo...@kalray.eu>nmo...@kalray.eu> wrote: > >> >> >> On 11/08/2015 09:42 PM, Bill Fischofer wrote: >> > Add the new ordered_queue_enq() internal routine. This is done in two >> > parts to make the diffs easier to follow. Part 1 adds the new routine >> > while Part 2 replaces queue_enq() to use it. 
>> > >> > Signed-off-by: Bill Fischofer < <bill.fischo...@linaro.org> >> bill.fischo...@linaro.org> >> > --- >> > .../linux-generic/include/odp_queue_internal.h | 2 + >> > platform/linux-generic/odp_queue.c | 118 >> +++++++++++++++++++++ >> > 2 files changed, 120 insertions(+) >> > >> > diff --git a/platform/linux-generic/include/odp_queue_internal.h >> b/platform/linux-generic/include/odp_queue_internal.h >> > index 32e3288..1bd365b 100644 >> > --- a/platform/linux-generic/include/odp_queue_internal.h >> > +++ b/platform/linux-generic/include/odp_queue_internal.h >> > @@ -96,6 +96,8 @@ union queue_entry_u { >> > queue_entry_t *get_qentry(uint32_t queue_id); >> > >> > int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int >> sustain); >> > +int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, >> > + int systain, queue_entry_t *origin_qe, uint64_t >> order); >> > odp_buffer_hdr_t *queue_deq(queue_entry_t *queue); >> > >> > int queue_enq_internal(odp_buffer_hdr_t *buf_hdr); >> > diff --git a/platform/linux-generic/odp_queue.c >> b/platform/linux-generic/odp_queue.c >> > index bcc8190..a545927 100644 >> > --- a/platform/linux-generic/odp_queue.c >> > +++ b/platform/linux-generic/odp_queue.c >> > @@ -529,6 +529,124 @@ int queue_enq(queue_entry_t *queue, >> odp_buffer_hdr_t *buf_hdr, int sustain) >> > return 0; >> > } >> > >> > +int ordered_queue_enq(queue_entry_t *queue, >> > + odp_buffer_hdr_t *buf_hdr, >> > + int sustain, >> > + queue_entry_t *origin_qe, >> > + uint64_t order) >> > +{ >> > + odp_buffer_hdr_t *reorder_buf; >> > + odp_buffer_hdr_t *next_buf; >> > + odp_buffer_hdr_t *reorder_prev; >> > + odp_buffer_hdr_t *placeholder_buf = NULL; >> > + int release_count, placeholder_count; >> > + int sched = 0; >> > + >> > + /* Need two locks for enq operations from ordered queues */ >> > + get_qe_locks(origin_qe, queue); >> > + >> > + if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY || >> > + queue->s.status < QUEUE_STATUS_READY)) 
{ >> > + free_qe_locks(queue, origin_qe); >> > + ODP_ERR("Bad queue status\n"); >> > + ODP_ERR("queue = %s, origin q = %s, buf = %p\n", >> > + queue->s.name, origin_qe->s.name, buf_hdr); >> > + return -1; >> > + } >> > + >> > + /* Remember that enq was called for this order */ >> > + sched_enq_called(); >> > + >> > + /* We can only complete this enq if we're in order */ >> > + if (order > origin_qe->s.order_out) { >> > + reorder_enq(queue, order, origin_qe, buf_hdr, sustain); >> > + >> > + /* This enq can't complete until order is restored, so >> > + * we're done here. >> > + */ >> > + free_qe_locks(queue, origin_qe); >> > + return 0; >> > + } >> > + >> > + /* Resolve order if requested */ >> > + if (!sustain) { >> > + order_release(origin_qe, 1); >> > + sched_order_resolved(buf_hdr); >> > + } >> > + >> > + /* Update queue status */ >> > + if (queue->s.status == QUEUE_STATUS_NOTSCHED) { >> > + queue->s.status = QUEUE_STATUS_SCHED; >> > + sched = 1; >> > + } >> > + >> > + /* We're in order, however the reorder queue may have other >> buffers >> > + * sharing this order on it and this buffer must not be enqueued >> ahead >> > + * of them. If the reorder queue is empty we can short-cut and >> > + * simply add to the target queue directly. >> > + */ >> > + >> > + if (!origin_qe->s.reorder_head) { >> > + queue_add_chain(queue, buf_hdr); >> > + free_qe_locks(queue, origin_qe); >> > + >> > + /* Add queue to scheduling */ >> > + if (sched && schedule_queue(queue)) >> > + ODP_ABORT("schedule_queue failed\n"); >> > + return 0; >> > + } >> > + >> > + /* The reorder_queue is non-empty, so sort this buffer into it. >> Note >> > + * that we force the sustain bit on here because we'll be removing >> > + * this immediately and we already accounted for this order >> earlier. >> > + */ >> > + reorder_enq(queue, order, origin_qe, buf_hdr, 1); >> > + >> Do we really need this call? 
>> When we reach this point in the code, buf_hdr is next in line to be >> queued, and what we want is to pull out of the reorder queue the buffer that >> might be right after this one. >> In this case we push buf_hdr to the reorder queue just to pull it back >> afterward. It works, but I'm not sure this is completely necessary. >> > + /* Pick up this element, and all others resolved by this enq, >> > + * and add them to the target queue. >> > + */ >> > + reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev, >> > + &placeholder_buf, &release_count, &placeholder_count); >> > + >> > + /* Move the list from the reorder queue to the target queue */ >> > + if (queue->s.head) >> > + queue->s.tail->next = origin_qe->s.reorder_head; >> > + else >> > + queue->s.head = origin_qe->s.reorder_head; >> > + queue->s.tail = reorder_prev; >> > + origin_qe->s.reorder_head = reorder_prev->next; >> > + reorder_prev->next = NULL; >> > + >> > + /* Reflect resolved orders in the output sequence */ >> > + order_release(origin_qe, release_count + placeholder_count); >> > + >> > + /* Now handle any resolved orders for events destined for other >> > + * queues, appending placeholder bufs as needed. >> > + */ >> > + if (origin_qe != queue) >> > + UNLOCK(&queue->s.lock); >> > + >> > + /* Add queue to scheduling */ >> > + if (sched && schedule_queue(queue)) >> > + ODP_ABORT("schedule_queue failed\n"); >> > + >> > + reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, >> > + 1, 0); >> > + UNLOCK(&origin_qe->s.lock); >> > + >> > + if (reorder_buf) >> > + queue_enq_internal(reorder_buf); >> > + >> > + /* Free all placeholder bufs that are now released */ >> > + while (placeholder_buf) { >> > + next_buf = placeholder_buf->next; >> > + odp_buffer_free(placeholder_buf->handle.handle); >> > + placeholder_buf = next_buf; >> > + } >> > + >> > + return 0; >> > +} >> > + >> > int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], >> > int num, int sustain) >> > { >> >> > >
_______________________________________________ lng-odp mailing list lng-odp@lists.linaro.org https://lists.linaro.org/mailman/listinfo/lng-odp