Add the new ordered_queue_enq() internal routine. This is done in two parts to make the diffs easier to follow. Part 1 adds the new routine while Part 2 replaces queue_enq() to use it.
Signed-off-by: Bill Fischofer <bill.fischo...@linaro.org> --- .../linux-generic/include/odp_queue_internal.h | 2 + platform/linux-generic/odp_queue.c | 118 +++++++++++++++++++++ 2 files changed, 120 insertions(+) diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 32e3288..1bd365b 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -96,6 +96,8 @@ union queue_entry_u { queue_entry_t *get_qentry(uint32_t queue_id); int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain); +int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, + int systain, queue_entry_t *origin_qe, uint64_t order); odp_buffer_hdr_t *queue_deq(queue_entry_t *queue); int queue_enq_internal(odp_buffer_hdr_t *buf_hdr); diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index bcc8190..a545927 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -529,6 +529,124 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain) return 0; } +int ordered_queue_enq(queue_entry_t *queue, + odp_buffer_hdr_t *buf_hdr, + int sustain, + queue_entry_t *origin_qe, + uint64_t order) +{ + odp_buffer_hdr_t *reorder_buf; + odp_buffer_hdr_t *next_buf; + odp_buffer_hdr_t *reorder_prev; + odp_buffer_hdr_t *placeholder_buf = NULL; + int release_count, placeholder_count; + int sched = 0; + + /* Need two locks for enq operations from ordered queues */ + get_qe_locks(origin_qe, queue); + + if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY || + queue->s.status < QUEUE_STATUS_READY)) { + free_qe_locks(queue, origin_qe); + ODP_ERR("Bad queue status\n"); + ODP_ERR("queue = %s, origin q = %s, buf = %p\n", + queue->s.name, origin_qe->s.name, buf_hdr); + return -1; + } + + /* Remember that enq was called for this order */ + sched_enq_called(); + + /* We can only complete this enq if we're in order */ + if (order > origin_qe->s.order_out) { + reorder_enq(queue, order, origin_qe, buf_hdr, sustain); + + /* This enq can't complete until order is restored, so + * we're done here. + */ + free_qe_locks(queue, origin_qe); + return 0; + } + + /* Resolve order if requested */ + if (!sustain) { + order_release(origin_qe, 1); + sched_order_resolved(buf_hdr); + } + + /* Update queue status */ + if (queue->s.status == QUEUE_STATUS_NOTSCHED) { + queue->s.status = QUEUE_STATUS_SCHED; + sched = 1; + } + + /* We're in order, however the reorder queue may have other buffers + * sharing this order on it and this buffer must not be enqueued ahead + * of them. If the reorder queue is empty we can short-cut and + * simply add to the target queue directly. + */ + + if (!origin_qe->s.reorder_head) { + queue_add_chain(queue, buf_hdr); + free_qe_locks(queue, origin_qe); + + /* Add queue to scheduling */ + if (sched && schedule_queue(queue)) + ODP_ABORT("schedule_queue failed\n"); + return 0; + } + + /* The reorder_queue is non-empty, so sort this buffer into it. Note + * that we force the sustain bit on here because we'll be removing + * this immediately and we already accounted for this order earlier. + */ + reorder_enq(queue, order, origin_qe, buf_hdr, 1); + + /* Pick up this element, and all others resolved by this enq, + * and add them to the target queue. + */ + reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev, + &placeholder_buf, &release_count, &placeholder_count); + + /* Move the list from the reorder queue to the target queue */ + if (queue->s.head) + queue->s.tail->next = origin_qe->s.reorder_head; + else + queue->s.head = origin_qe->s.reorder_head; + queue->s.tail = reorder_prev; + origin_qe->s.reorder_head = reorder_prev->next; + reorder_prev->next = NULL; + + /* Reflect resolved orders in the output sequence */ + order_release(origin_qe, release_count + placeholder_count); + + /* Now handle any resolved orders for events destined for other + * queues, appending placeholder bufs as needed. + */ + if (origin_qe != queue) + UNLOCK(&queue->s.lock); + + /* Add queue to scheduling */ + if (sched && schedule_queue(queue)) + ODP_ABORT("schedule_queue failed\n"); + + reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, + 1, 0); + UNLOCK(&origin_qe->s.lock); + + if (reorder_buf) + queue_enq_internal(reorder_buf); + + /* Free all placeholder bufs that are now released */ + while (placeholder_buf) { + next_buf = placeholder_buf->next; + odp_buffer_free(placeholder_buf->handle.handle); + placeholder_buf = next_buf; + } + + return 0; +} + int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num, int sustain) { -- 2.1.4 _______________________________________________ lng-odp mailing list lng-odp@lists.linaro.org https://lists.linaro.org/mailman/listinfo/lng-odp