Add the new ordered_queue_enq() internal routine. This is done in two
parts to make the diffs easier to follow. Part 1 adds the new routine
while Part 2 replaces queue_enq() to use it.

Signed-off-by: Bill Fischofer <bill.fischo...@linaro.org>
---
 .../linux-generic/include/odp_queue_internal.h     |   2 +
 platform/linux-generic/odp_queue.c                 | 118 +++++++++++++++++++++
 2 files changed, 120 insertions(+)

diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 32e3288..1bd365b 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -96,6 +96,8 @@ union queue_entry_u {
 queue_entry_t *get_qentry(uint32_t queue_id);
 
 int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain);
+/* Enqueue buf_hdr to queue on behalf of ordered queue origin_qe at 'order' */
+int ordered_queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr,
+                     int sustain, queue_entry_t *origin_qe, uint64_t order);
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
 
 int queue_enq_internal(odp_buffer_hdr_t *buf_hdr);
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index bcc8190..a545927 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -529,6 +529,124 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr, int sustain)
        return 0;
 }
 
+/*
+ * Enqueue buf_hdr onto 'queue' on behalf of the ordered queue 'origin_qe',
+ * using 'order' to decide whether the enqueue may complete now.
+ *
+ * - If either queue's status is below QUEUE_STATUS_READY, an error is
+ *   logged and -1 is returned.
+ * - If 'order' is greater than origin_qe->s.order_out, the buffer is handed
+ *   to reorder_enq() and 0 is returned; this function does not add it to
+ *   the target queue in that case.
+ * - Otherwise the buffer is added to 'queue', either directly (when
+ *   origin_qe->s.reorder_head is NULL) or by sorting it into the reorder
+ *   list and moving the leading run of that list onto 'queue'.
+ *
+ * When 'sustain' is zero and the enqueue is in order, order_release() and
+ * sched_order_resolved() are called for this event before it is queued.
+ *
+ * Returns 0 on success (including the deferred case) and -1 on bad queue
+ * status. ODP_ABORT() is invoked if schedule_queue() reports failure.
+ */
+int ordered_queue_enq(queue_entry_t *queue,
+                     odp_buffer_hdr_t *buf_hdr,
+                     int sustain,
+                     queue_entry_t *origin_qe,
+                     uint64_t order)
+{
+       odp_buffer_hdr_t *reorder_buf;
+       odp_buffer_hdr_t *next_buf;
+       odp_buffer_hdr_t *reorder_prev;
+       odp_buffer_hdr_t *placeholder_buf = NULL;
+       int               release_count, placeholder_count;
+       /* Set to 1 below when this call moves the target queue from
+        * QUEUE_STATUS_NOTSCHED to QUEUE_STATUS_SCHED.
+        */
+       int               sched = 0;
+
+       /* Need two locks for enq operations from ordered queues */
+       get_qe_locks(origin_qe, queue);
+
+       if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY ||
+                        queue->s.status < QUEUE_STATUS_READY)) {
+               /* NOTE(review): arguments are passed in the opposite order
+                * to get_qe_locks() above -- confirm free_qe_locks() is
+                * insensitive to argument order.
+                */
+               free_qe_locks(queue, origin_qe);
+               ODP_ERR("Bad queue status\n");
+               ODP_ERR("queue = %s, origin q = %s, buf = %p\n",
+                       queue->s.name, origin_qe->s.name, buf_hdr);
+               return -1;
+       }
+
+       /* Remember that enq was called for this order */
+       sched_enq_called();
+
+       /* We can only complete this enq if we're in order */
+       if (order > origin_qe->s.order_out) {
+               reorder_enq(queue, order, origin_qe, buf_hdr, sustain);
+
+               /* This enq can't complete until order is restored, so
+                * we're done here.
+                */
+               free_qe_locks(queue, origin_qe);
+               return 0;
+       }
+
+       /* Resolve order if requested */
+       if (!sustain) {
+               order_release(origin_qe, 1);
+               sched_order_resolved(buf_hdr);
+       }
+
+       /* Update queue status */
+       if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+               queue->s.status = QUEUE_STATUS_SCHED;
+               sched = 1;
+       }
+
+       /* We're in order, however the reorder queue may have other buffers
+        * sharing this order on it and this buffer must not be enqueued ahead
+        * of them. If the reorder queue is empty we can short-cut and
+        * simply add to the target queue directly.
+        */
+
+       if (!origin_qe->s.reorder_head) {
+               queue_add_chain(queue, buf_hdr);
+               free_qe_locks(queue, origin_qe);
+
+               /* Add queue to scheduling */
+               if (sched && schedule_queue(queue))
+                       ODP_ABORT("schedule_queue failed\n");
+               return 0;
+       }
+
+       /* The reorder_queue is non-empty, so sort this buffer into it.  Note
+        * that we force the sustain bit on here because we'll be removing
+        * this immediately and we already accounted for this order earlier.
+        */
+       reorder_enq(queue, order, origin_qe, buf_hdr, 1);
+
+       /* Pick up this element, and all others resolved by this enq,
+        * and add them to the target queue.
+        *
+        * reorder_deq() fills reorder_buf, reorder_prev, placeholder_buf and
+        * both counts; their exact meaning is defined by reorder_deq(), which
+        * is not shown here. From the splice below, reorder_prev is treated
+        * as the last element of the run to move -- TODO confirm.
+        */
+       reorder_deq(queue, origin_qe, &reorder_buf, &reorder_prev,
+                   &placeholder_buf, &release_count, &placeholder_count);
+
+       /* Move the list from the reorder queue to the target queue:
+        * the run [reorder_head .. reorder_prev] is appended to the target
+        * queue (or becomes its head if the queue is empty), the reorder list
+        * is advanced past reorder_prev, and the moved run is NULL-terminated.
+        */
+       if (queue->s.head)
+               queue->s.tail->next = origin_qe->s.reorder_head;
+       else
+               queue->s.head       = origin_qe->s.reorder_head;
+       queue->s.tail               = reorder_prev;
+       origin_qe->s.reorder_head   = reorder_prev->next;
+       reorder_prev->next          = NULL;
+
+       /* Reflect resolved orders in the output sequence */
+       order_release(origin_qe, release_count + placeholder_count);
+
+       /* Now handle any resolved orders for events destined for other
+        * queues, appending placeholder bufs as needed.
+        */
+       /* The target queue's lock is dropped here unless it is the same
+        * entry as origin_qe; the origin lock stays held until after
+        * reorder_complete() below.
+        */
+       if (origin_qe != queue)
+               UNLOCK(&queue->s.lock);
+
+       /* Add queue to scheduling */
+       if (sched && schedule_queue(queue))
+               ODP_ABORT("schedule_queue failed\n");
+
+       /* NOTE(review): the meaning of the trailing 1, 0 arguments is defined
+        * by reorder_complete(), which is not visible here.
+        */
+       reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
+                        1, 0);
+       UNLOCK(&origin_qe->s.lock);
+
+       /* If reorder_buf is still set after reorder_complete(), enqueue it
+        * now, after both UNLOCK calls above.
+        * NOTE(review): the return value of queue_enq_internal() is ignored.
+        */
+       if (reorder_buf)
+               queue_enq_internal(reorder_buf);
+
+       /* Free all placeholder bufs that are now released */
+       while (placeholder_buf) {
+               /* Read the link before freeing the current element */
+               next_buf = placeholder_buf->next;
+               odp_buffer_free(placeholder_buf->handle.handle);
+               placeholder_buf = next_buf;
+       }
+
+       return 0;
+}
+
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
                    int num, int sustain)
 {
-- 
2.1.4

_______________________________________________
lng-odp mailing list
lng-odp@lists.linaro.org
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to