Signed-off-by: Bill Fischofer <bill.fischo...@linaro.org>
---
 .../linux-generic/include/odp_buffer_internal.h    |   4 +
 platform/linux-generic/odp_queue.c                 | 151 +++++++++++++++------
 2 files changed, 115 insertions(+), 40 deletions(-)

diff --git a/platform/linux-generic/include/odp_buffer_internal.h b/platform/linux-generic/include/odp_buffer_internal.h
index c459fce..30f061e 100644
--- a/platform/linux-generic/include/odp_buffer_internal.h
+++ b/platform/linux-generic/include/odp_buffer_internal.h
@@ -109,6 +109,10 @@ typedef union queue_entry_u queue_entry_t;
 /* Common buffer header */
 typedef struct odp_buffer_hdr_t {
        struct odp_buffer_hdr_t *next;       /* next buf in a list */
+       union {                              /* Multi-use secondary link */
+               struct odp_buffer_hdr_t *prev;
+               struct odp_buffer_hdr_t *link;
+       };
        odp_buffer_bits_t        handle;     /* handle */
        union {
                uint32_t all;
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index a2460e7..e56da71 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -336,6 +336,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 {
        int sched = 0;
        queue_entry_t *origin_qe = buf_hdr->origin_qe;
+       odp_buffer_hdr_t *buf_tail;
 
        /* Need two locks for enq operations from ordered queues */
        if (origin_qe) {
@@ -404,17 +405,33 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 
                /* We're in order, so account for this and proceed with enq */
                origin_qe->s.order_out++;
+
+               /* if this element is linked, restore the linked chain */
+               buf_tail = buf_hdr->link;
+
+               if (buf_tail) {
+                       buf_hdr->next = buf_tail;
+                       buf_hdr->link = NULL;
+
+                       /* find end of the chain */
+                       while (buf_tail->next)
+                               buf_tail = buf_tail->next;
+               } else {
+                       buf_tail = buf_hdr;
+               }
+       } else {
+               buf_tail = buf_hdr;
        }
 
-       if (queue->s.head == NULL) {
+       if (!queue->s.head) {
                /* Empty queue */
                queue->s.head = buf_hdr;
-               queue->s.tail = buf_hdr;
-               buf_hdr->next = NULL;
+               queue->s.tail = buf_tail;
+               buf_tail->next = NULL;
        } else {
                queue->s.tail->next = buf_hdr;
-               queue->s.tail = buf_hdr;
-               buf_hdr->next = NULL;
+               queue->s.tail = buf_tail;
+               buf_tail->next = NULL;
        }
 
        if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
@@ -461,8 +478,23 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
                        next_buf = reorder_buf->next;
 
                        if (odp_likely(reorder_buf->target_qe == queue)) {
-                               reorder_prev = reorder_buf;
-                               reorder_buf  = next_buf;
+                               /* promote any chain */
+                               odp_buffer_hdr_t *reorder_link =
+                                       reorder_buf->link;
+
+                               if (reorder_link) {
+                                       reorder_buf->next = reorder_link;
+                                       reorder_buf->link = NULL;
+                                       while (reorder_link->next)
+                                               reorder_link =
+                                                       reorder_link->next;
+                                       reorder_link->next = next_buf;
+                                       reorder_prev = reorder_link;
+                               } else {
+                                       reorder_prev = reorder_buf;
+                               }
+
+                               reorder_buf = next_buf;
                                release_count++;
                        } else if (!reorder_buf->target_qe) {
                                if (reorder_prev)
@@ -523,53 +555,89 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
        int sched = 0;
-       int i, j;
+       int i, rc, ret_count = 0;
+       int ordered_head[num];
+       int ordered_count = 0;
        odp_buffer_hdr_t *tail;
 
+       /* Identify ordered chains in the input buffer list */
        for (i = 0; i < num; i++) {
-               /* If any buffer is coming from an ordered queue, enqueue them
-                * individually since in the general case each might originate
-                * from a different ordered queue.  If any of these fail, the
-                * return code tells the caller how many succeeded.
-                */
-               if (buf_hdr[i]->origin_qe) {
-                       for (j = 0; j < num; j++) {
-                               if (queue_enq(queue, buf_hdr[j]))
-                                       return j;
-                       }
-                       return num;
+               if (buf_hdr[i]->origin_qe)
+                       ordered_head[ordered_count++] = i;
+               if (i < num - 1)
+                       buf_hdr[i]->next = buf_hdr[i + 1];
+       }
+
+       buf_hdr[num - 1]->next = NULL;
+
+       if (ordered_count) {
+               if (ordered_head[0] > 0) {
+                       tail = buf_hdr[ordered_head[0] - 1];
+                       tail->next = NULL;
+                       ret_count = ordered_head[0];
+               } else {
+                       tail = NULL;
+                       ret_count = 0;
                }
-               buf_hdr[i]->next = i == num - 1 ? NULL : buf_hdr[i + 1];
+       } else {
+               tail = buf_hdr[num - 1];
+               ret_count = num;
        }
 
-       tail = buf_hdr[num-1];
+       /* Handle regular enq's at start of list */
+       if (tail) {
+               tail->next = NULL;
+               LOCK(&queue->s.lock);
+               if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+                       UNLOCK(&queue->s.lock);
+                       ODP_ERR("Bad queue status\n");
+                       return -1;
+               }
 
-       LOCK(&queue->s.lock);
-       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+               /* Empty queue */
+               if (queue->s.head)
+                       queue->s.tail->next = buf_hdr[0];
+               else
+                       queue->s.head = buf_hdr[0];
+
+               queue->s.tail = tail;
+
+               if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
+                       queue->s.status = QUEUE_STATUS_SCHED;
+                       sched = 1; /* retval: schedule queue */
+               }
                UNLOCK(&queue->s.lock);
-               ODP_ERR("Bad queue status\n");
-               return -1;
+
+               /* Add queue to scheduling */
+               if (sched && schedule_queue(queue))
+                       ODP_ABORT("schedule_queue failed\n");
        }
 
-       /* Empty queue */
-       if (queue->s.head == NULL)
-               queue->s.head = buf_hdr[0];
-       else
-               queue->s.tail->next = buf_hdr[0];
+       /* Handle ordered chains in the list */
+       for (i = 0; i < ordered_count; i++) {
+               int eol = i < ordered_count - 1 ? ordered_head[i + 1] : num;
+               int list_count = eol - i;
 
-       queue->s.tail = tail;
+               if (i < ordered_count - 1)
+                       buf_hdr[ordered_head[i + 1] - 1]->next = NULL;
 
-       if (queue->s.status == QUEUE_STATUS_NOTSCHED) {
-               queue->s.status = QUEUE_STATUS_SCHED;
-               sched = 1; /* retval: schedule queue */
-       }
-       UNLOCK(&queue->s.lock);
+               if (ordered_head[i] < eol - 1)
+                       buf_hdr[ordered_head[i]]->link =
+                               buf_hdr[ordered_head[i] + 1];
+               else
+                       buf_hdr[ordered_head[i]]->link = NULL;
 
-       /* Add queue to scheduling */
-       if (sched && schedule_queue(queue))
-               ODP_ABORT("schedule_queue failed\n");
+               rc = queue_enq(queue, buf_hdr[ordered_head[i]]);
+               if (rc < 0)
+                       return ret_count;
+
+               if (rc < list_count)
+                       return ret_count + rc;
 
-       return num; /* All events enqueued */
+               ret_count += rc;
+       }
+
+       return ret_count;
 }
 
 int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
@@ -598,6 +666,9 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
        queue   = queue_to_qentry(handle);
        buf_hdr = odp_buf_to_hdr(odp_buffer_from_event(ev));
 
+       /* No chains via this entry */
+       buf_hdr->link = NULL;
+
        return queue->s.enqueue(queue, buf_hdr);
 }
 
-- 
2.1.4

_______________________________________________
lng-odp mailing list
lng-odp@lists.linaro.org
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to