You're right. In my mind order was linked to the buffer,not the thread meaning A would have enq b1 too.
On 11/09/2015 11:05 AM, Bill Fischofer wrote: > It is necessary, and illustrates one of the complexities of ordered queuing. > Consider the following sequence with QOrigin having current order 4: > > Thread A (Order 4) Thread B (Order 5) > enq (Q, b1) > > enq(Q, a1) > release_order() > enq(Q b2) > > Thread B enqueues first but since the QOrigin is at order 4 it's element goes > onto QOrigin's reorder queue. Thread A now enqueues and resolves order 4. > Thread B now tries to enqueue another element onto Q and this time it has the > current order, however element b2 needs to follow element b1, which is on the > reorder queue. The reorder_enq() routine sorts elements into the reorder > queue so that b2 correctly follows any other elements sharing the same order > before they are all moved to the target queue. > > On Mon, Nov 9, 2015 at 3:54 AM, Nicolas Morey-Chaisemartin <nmo...@kalray.eu > <mailto:nmo...@kalray.eu>> wrote: > > > > On 11/08/2015 09:42 PM, Bill Fischofer wrote: > > Add the new ordered_queue_enq() internal routine. This is done in two > > parts to make the diffs easier to follow. Part 1 adds the new routine > > while Part 2 replaces queue_enq() to use it. > > > > Signed-off-by: Bill Fischofer <bill.fischo...@linaro.org > <mailto:bill.fischo...@linaro.org>> > > --- > > .../linux-generic/include/odp_queue_internal.h | 2 + > > platform/linux-generic/odp_queue.c | 118 > +++++++++++++++++++++ > > 2 files changed, 120 insertions(+) > > > > diff --git a/platform/linux-generic/include/odp_queue_internal.h > b/platform/linux-generic/include/odp_queue_internal.h > > index 32e3288..1bd365b 100644 > > --- a/platform/linux-generic/include/odp_queue_internal.h > > +++ b/platform/linux-generic/include/odp_queue_internal.h > > @@ -96,6 +96,8 @@ union queue_entry_u { > > queue_entry_t *get_qentry(uint32_t queue_id); > > > > int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int > sustain); > > +int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, > > + int systain, queue_entry_t *origin_qe, uint64_t > order); > > odp_buffer_hdr_t *queue_deq(queue_entry_t *queue); > > > > int queue_enq_internal(odp_buffer_hdr_t *buf_hdr); > > diff --git a/platform/linux-generic/odp_queue.c > b/platform/linux-generic/odp_queue.c > > index bcc8190..a545927 100644 > > --- a/platform/linux-generic/odp_queue.c > > +++ b/platform/linux-generic/odp_queue.c > > @@ -529,6 +529,124 @@ int queue_enq(queue_entry_t *queue, > odp_buffer_hdr_t *buf_hdr, int sustain) > > return 0; > > } > > > > +int ordered_queue_enq(queue_entry_t *queue, > > + odp_buffer_hdr_t *buf_hdr, > > + int sustain, > > + queue_entry_t *origin_qe, > > + uint64_t order) > > +{ > > + odp_buffer_hdr_t *reorder_buf; > > + odp_buffer_hdr_t *next_buf; > > + odp_buffer_hdr_t *reorder_prev; > > + odp_buffer_hdr_t *placeholder_buf = NULL; > > + int release_count, placeholder_count; > > + int sched = 0; > > + > > + /* Need two locks for enq operations from ordered queues */ > > + get_qe_locks(origin_qe, queue); > > + > > + if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY || > > + queue->s.status < QUEUE_STATUS_READY)) { > > + free_qe_locks(queue, origin_qe); > > + ODP_ERR("Bad queue status\n"); > > + ODP_ERR("queue = %s, origin q = %s, buf = %p\n", > > + queue->s.name <http://s.name>, origin_qe->s.name > <http://s.name>, buf_hdr); > > + return -1; > > + } > > + > > + /* Remember that enq was called for this order */ > > + sched_enq_called(); > > + > > + /* We can only complete this enq if we're in order */ > > + if (order > origin_qe->s.order_out) { > > + reorder_enq(queue, order, origin_qe, buf_hdr, sustain); > > + > > + /* This enq can't complete until order is restored, so > > + * we're done here. > > + */ > > + free_qe_locks(queue, origin_qe); > > + return 0; > > + } > > + > > + /* Resolve order if requested */ > > + if (!sustain) { > > + order_release(origin_qe, 1); > > + sched_order_resolved(buf_hdr); > > + } > > + > > + /* Update queue status */ > > + if (queue->s.status == QUEUE_STATUS_NOTSCHED) { > > + queue->s.status = QUEUE_STATUS_SCHED; > > + sched = 1; > > + } > > + > > + /* We're in order, however the reorder queue may have other > buffers > > + * sharing this order on it and this buffer must not be enqueued > ahead > > + * of them. If the reorder queue is empty we can short-cut and > > + * simply add to the target queue directly. > > + */ > > + > > + if (!origin_qe->s.reorder_head) { > > + queue_add_chain(queue, buf_hdr); > > + free_qe_locks(queue, origin_qe); > > + > > + /* Add queue to scheduling */ > > + if (sched && schedule_queue(queue)) > > + ODP_ABORT("schedule_queue failed\n"); > > + return 0; > > + } > > + > > + /* The reorder_queue is non-empty, so sort this buffer into it. > Note > > + * that we force the sustain bit on here because we'll be removing > > + * this immediately and we already accounted for this order > earlier. > > + */ > > + reorder_enq(queue, order, origin_qe, buf_hdr, 1); > > + > Do we really need this call? > When we reach this point in the code, buf_hdr is next inline to be > queued, and what we want is pull out of the reorder queue the buffer that > might be right after this one. > In this case we push buf_hdr to the reorder queue just to pull it back > afterward. It works but I'm not sure this is completely necessary. > > + /* Pick up this element, and all others resolved by this enq, > > + * and add them to the target queue. > > + */ > > + reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev, > > + &placeholder_buf, &release_count, &placeholder_count); > > + > > + /* Move the list from the reorder queue to the target queue */ > > + if (queue->s.head) > > + queue->s.tail->next = origin_qe->s.reorder_head; > > + else > > + queue->s.head = origin_qe->s.reorder_head; > > + queue->s.tail = reorder_prev; > > + origin_qe->s.reorder_head = reorder_prev->next; > > + reorder_prev->next = NULL; > > + > > + /* Reflect resolved orders in the output sequence */ > > + order_release(origin_qe, release_count + placeholder_count); > > + > > + /* Now handle any resolved orders for events destined for other > > + * queues, appending placeholder bufs as needed. > > + */ > > + if (origin_qe != queue) > > + UNLOCK(&queue->s.lock); > > + > > + /* Add queue to scheduling */ > > + if (sched && schedule_queue(queue)) > > + ODP_ABORT("schedule_queue failed\n"); > > + > > + reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, > > + 1, 0); > > + UNLOCK(&origin_qe->s.lock); > > + > > + if (reorder_buf) > > + queue_enq_internal(reorder_buf); > > + > > + /* Free all placeholder bufs that are now released */ > > + while (placeholder_buf) { > > + next_buf = placeholder_buf->next; > > + odp_buffer_free(placeholder_buf->handle.handle); > > + placeholder_buf = next_buf; > > + } > > + > > + return 0; > > +} > > + > > int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], > > int num, int sustain) > > { > >
_______________________________________________ lng-odp mailing list lng-odp@lists.linaro.org https://lists.linaro.org/mailman/listinfo/lng-odp