Signed-off-by: Bill Fischofer <bill.fischo...@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |   5 +
 .../linux-generic/include/odp_queue_internal.h     |   4 +
 .../linux-generic/include/odp_schedule_internal.h  |   2 +-
 platform/linux-generic/odp_pool.c                  |   3 +
 platform/linux-generic/odp_queue.c                 | 144 ++++++++++++++++++++-
 platform/linux-generic/odp_schedule.c              |   2 -
 6 files changed, 151 insertions(+), 9 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h 
b/platform/linux-generic/include/odp_buffer_internal.h
index ae799dd..c459fce 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -103,6 +103,8 @@ typedef union odp_buffer_bits_t {
 
 /* forward declaration */
 struct odp_buffer_hdr_t;
+union queue_entry_u;
+typedef union queue_entry_u queue_entry_t;
 
 /* Common buffer header */
 typedef struct odp_buffer_hdr_t {
@@ -131,6 +133,9 @@ typedef struct odp_buffer_hdr_t {
        uint32_t                 segcount;   /* segment count */
        uint32_t                 segsize;    /* segment size */
        void                    *addr[ODP_BUFFER_MAX_SEG]; /* block addrs */
+       uint64_t                 order;      /* sequence for ordered queues */
+       queue_entry_t           *origin_qe;  /* ordered queue origin */
+       queue_entry_t           *target_qe;  /* ordered queue target */
 } odp_buffer_hdr_t;
 
 /** @internal Compile time assert that the
diff --git a/platform/linux-generic/include/odp_queue_internal.h 
b/platform/linux-generic/include/odp_queue_internal.h
index 61d0c43..9cca552 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -77,6 +77,10 @@ struct queue_entry_s {
        odp_pktio_t       pktin;
        odp_pktio_t       pktout;
        char              name[ODP_QUEUE_NAME_LEN];
+       uint64_t          order_in;
+       uint64_t          order_out;
+       odp_buffer_hdr_t *reorder_head;
+       odp_buffer_hdr_t *reorder_tail;
 };
 
 typedef union queue_entry_u {
diff --git a/platform/linux-generic/include/odp_schedule_internal.h 
b/platform/linux-generic/include/odp_schedule_internal.h
index 4c6577d..6ea90fb 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -15,6 +15,7 @@ extern "C" {
 
 
 #include <odp/buffer.h>
+#include <odp_buffer_internal.h>
 #include <odp/queue.h>
 #include <odp/packet_io.h>
 #include <odp_queue_internal.h>
@@ -28,7 +29,6 @@ static inline int schedule_queue(const queue_entry_t *qe)
        return odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
 }
 
-
 int schedule_pktio_start(odp_pktio_t pktio, int prio);
 
 
diff --git a/platform/linux-generic/odp_pool.c 
b/platform/linux-generic/odp_pool.c
index 14221fd..30d4b2b 100644
--- a/platform/linux-generic/odp_pool.c
+++ b/platform/linux-generic/odp_pool.c
@@ -514,6 +514,9 @@ odp_buffer_t buffer_alloc(odp_pool_t pool_hdl, size_t size)
        /* By default, buffers inherit their pool's zeroization setting */
        buf->buf.flags.zeroized = pool->s.flags.zeroized;
 
+       /* By default, buffers are not associated with an ordered queue */
+       buf->buf.origin_qe = NULL;
+
        if (buf->buf.type == ODP_EVENT_PACKET)
                packet_init(pool, &buf->pkt, size);
 
diff --git a/platform/linux-generic/odp_queue.c 
b/platform/linux-generic/odp_queue.c
index 4a0df11..4d0e1b4 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -27,11 +27,13 @@
 #define LOCK(a)      odp_ticketlock_lock(a)
 #define UNLOCK(a)    odp_ticketlock_unlock(a)
 #define LOCK_INIT(a) odp_ticketlock_init(a)
+#define LOCK_TRY(a)  odp_ticketlock_trylock(a)
 #else
 #include <odp/spinlock.h>
 #define LOCK(a)      odp_spinlock_lock(a)
 #define UNLOCK(a)    odp_spinlock_unlock(a)
 #define LOCK_INIT(a) odp_spinlock_init(a)
+#define LOCK_TRY(a)  odp_spinlock_trylock(a)
 #endif
 
 #include <string.h>
@@ -89,6 +91,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
        queue->s.head = NULL;
        queue->s.tail = NULL;
 
+       queue->s.reorder_head = NULL;
+       queue->s.reorder_tail = NULL;
+
        queue->s.pri_queue = ODP_QUEUE_INVALID;
        queue->s.cmd_ev    = ODP_EVENT_INVALID;
 }
@@ -329,14 +334,76 @@ odp_queue_t odp_queue_lookup(const char *name)
 int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 {
        int sched = 0;
+       queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+       /* Need two locks for enq operations from ordered queues */
+       if (origin_qe) {
+               LOCK(&origin_qe->s.lock);
+               while (!LOCK_TRY(&queue->s.lock)) {
+                       UNLOCK(&origin_qe->s.lock);
+                       LOCK(&origin_qe->s.lock);
+               }
+               if (odp_unlikely(origin_qe->s.status < QUEUE_STATUS_READY)) {
+                       UNLOCK(&queue->s.lock);
+                       UNLOCK(&origin_qe->s.lock);
+                       ODP_ERR("Bad origin queue status\n");
+                       return -1;
+               }
+       } else {
+               LOCK(&queue->s.lock);
+       }
 
-       LOCK(&queue->s.lock);
        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
                UNLOCK(&queue->s.lock);
+               if (origin_qe)
+                       UNLOCK(&origin_qe->s.lock);
                ODP_ERR("Bad queue status\n");
                return -1;
        }
 
+       /* We can only complete the enq if we're in order */
+       if (origin_qe) {
+               if (buf_hdr->order > origin_qe->s.order_out) {
+                       odp_buffer_hdr_t *reorder_buf =
+                               origin_qe->s.reorder_head;
+
+                       if (!reorder_buf) {
+                               buf_hdr->next = NULL;
+                               origin_qe->s.reorder_head = buf_hdr;
+                               origin_qe->s.reorder_tail = buf_hdr;
+                       } else {
+                               odp_buffer_hdr_t *reorder_prev = NULL;
+
+                               while (buf_hdr->order > reorder_buf->order) {
+                                       reorder_prev = reorder_buf;
+                                       reorder_buf  = reorder_buf->next;
+                                       if (!reorder_buf)
+                                               break;
+                               }
+
+                               buf_hdr->next = reorder_buf;
+                               if (reorder_prev)
+                                       reorder_prev->next = buf_hdr;
+                               else
+                                       origin_qe->s.reorder_head = buf_hdr;
+
+                               if (!reorder_buf)
+                                       origin_qe->s.reorder_tail = buf_hdr;
+                       }
+
+                       buf_hdr->target_qe = queue;
+
+                       /* This enq can't complete until order is restored, so
+                        * we're done here.
+                        */
+                       UNLOCK(&queue->s.lock);
+                       UNLOCK(&origin_qe->s.lock);
+                       return 0;
+               }
+
+               origin_qe->s.order_out++;
+       }
+
        if (queue->s.head == NULL) {
                /* Empty queue */
                queue->s.head = buf_hdr;
@@ -352,7 +419,48 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t 
*buf_hdr)
                queue->s.status = QUEUE_STATUS_SCHED;
                sched = 1; /* retval: schedule queue */
        }
-       UNLOCK(&queue->s.lock);
+
+       /*
+        * If we came from an ordered queue, check to see if our successful
+        * enq has unblocked other buffers in the origin's reorder queue.
+        */
+       if (origin_qe) {
+               odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
+               odp_buffer_hdr_t *reorder_prev;
+               uint32_t          release_count = 0;
+
+               while (reorder_buf &&
+                      reorder_buf->target_qe == queue &&
+                      reorder_buf->order <=
+                      origin_qe->s.order_out + release_count) {
+                       release_count++;
+                       reorder_prev = reorder_buf;
+                       reorder_buf  = reorder_buf->next;
+               }
+
+               /* Add released buffers to the queue as well */
+               if (release_count > 0) {
+                       queue->s.tail->next       = origin_qe->s.reorder_head;
+                       queue->s.tail             = reorder_prev;
+                       origin_qe->s.reorder_head = reorder_prev->next;
+                       reorder_prev->next        = NULL;
+                       origin_qe->s.order_out   += release_count;
+               }
+
+               /* Now handle unblocked buffers destined for other queues */
+               if (reorder_buf &&
+                   reorder_buf->order <= origin_qe->s.order_out) {
+                       UNLOCK(&origin_qe->s.lock);
+                       UNLOCK(&queue->s.lock);
+                       if (schedule_enq(reorder_buf->target_qe, origin_qe))
+                               ODP_ABORT("schedule_enq failed\n");
+               } else {
+                       UNLOCK(&origin_qe->s.lock);
+                       UNLOCK(&queue->s.lock);
+               }
+       } else {
+               UNLOCK(&queue->s.lock);
+       }
 
        /* Add queue to scheduling */
        if (sched && schedule_queue(queue))
@@ -364,14 +472,26 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t 
*buf_hdr)
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
        int sched = 0;
-       int i;
+       int i, j;
        odp_buffer_hdr_t *tail;
 
-       for (i = 0; i < num - 1; i++)
-               buf_hdr[i]->next = buf_hdr[i+1];
+       for (i = 0; i < num; i++) {
+               /* If any buffer is coming from an ordered queue, enqueue them
+                * individually since in the general case each might originate
+                * from a different ordered queue.  If any of these fail, the
+                * return code tells the caller how many succeeded.
+                */
+               if (buf_hdr[i]->origin_qe) {
+                       for (j = 0; j < num; j++) {
+                               if (queue_enq(queue, buf_hdr[j]))
+                                       return j;
+                       }
+                       return num;
+               }
+               buf_hdr[i]->next = i == num - 1 ? NULL : buf_hdr[i + 1];
+       }
 
        tail = buf_hdr[num-1];
-       buf_hdr[num-1]->next = NULL;
 
        LOCK(&queue->s.lock);
        if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
@@ -449,6 +569,12 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
        buf_hdr       = queue->s.head;
        queue->s.head = buf_hdr->next;
        buf_hdr->next = NULL;
+       if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+               buf_hdr->origin_qe = queue;
+               buf_hdr->order     = queue->s.order_in++;
+       } else {
+               buf_hdr->origin_qe = NULL;
+       }
 
        if (queue->s.head == NULL) {
                /* Queue is now empty */
@@ -489,6 +615,12 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t 
*buf_hdr[], int num)
                buf_hdr[i]       = hdr;
                hdr              = hdr->next;
                buf_hdr[i]->next = NULL;
+               if (queue->s.param.sched.sync == ODP_SCHED_SYNC_ORDERED) {
+                       buf_hdr[i]->origin_qe = queue;
+                       buf_hdr[i]->order     = queue->s.order_in++;
+               } else {
+                       buf_hdr[i]->origin_qe = NULL;
+               }
        }
 
        queue->s.head = hdr;
diff --git a/platform/linux-generic/odp_schedule.c 
b/platform/linux-generic/odp_schedule.c
index 2a2cc1d..d595375 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -411,8 +411,6 @@ static inline int copy_events(odp_event_t out_ev[], 
unsigned int max)
 
 /*
  * Schedule queues
- *
- * TODO: SYNC_ORDERED not implemented yet
  */
 static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                    unsigned int max_num, unsigned int max_deq)
-- 
2.1.4

_______________________________________________
lng-odp mailing list
lng-odp@lists.linaro.org
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to