Signed-off-by: Bill Fischofer <bill.fischo...@linaro.org>
---
 platform/linux-generic/odp_queue.c | 169 +++++++++++++++++++++++++++++++++----
 1 file changed, 152 insertions(+), 17 deletions(-)

diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 4d0e1b4..a2460e7 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -14,6 +14,7 @@
 #include <odp_buffer_inlines.h>
 #include <odp_internal.h>
 #include <odp/shared_memory.h>
+#include <odp/schedule.h>
 #include <odp_schedule_internal.h>
 #include <odp/config.h>
 #include <odp_packet_io_internal.h>
@@ -401,6 +402,7 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
                        return 0;
                }
 
+               /* We're in order, so account for this and proceed with enq */
                origin_qe->s.order_out++;
        }
 
@@ -426,16 +428,56 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
         */
        if (origin_qe) {
                odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
-               odp_buffer_hdr_t *reorder_prev;
+               odp_buffer_hdr_t *reorder_prev = NULL;
+               odp_buffer_hdr_t *placeholder_buf = NULL;
+               odp_buffer_hdr_t *next_buf;
                uint32_t          release_count = 0;
+               uint32_t          placeholder_count = 0;
 
                while (reorder_buf &&
-                      reorder_buf->target_qe == queue &&
-                      reorder_buf->order <=
-                      origin_qe->s.order_out + release_count) {
-                       release_count++;
-                       reorder_prev = reorder_buf;
-                       reorder_buf  = reorder_buf->next;
+                      reorder_buf->order <= origin_qe->s.order_out +
+                      release_count + placeholder_count) {
+                       /*
+                        * Elements on the reorder list fall into one of
+                        * three categories:
+                        *
+                        * 1. Those destined for the same queue.  These
+                        *    can be enq'd now if they were waiting to
+                        *    be unblocked by this enq.
+                        *
+                        * 2. Those representing placeholders for events
+                        *    whose ordering was released by a prior
+                        *    odp_schedule_release_ordered() call.  These
+                        *    can now just be freed.
+                        *
+                        * 3. Those representing events destined for another
+                        *    queue. These cannot be consolidated with this
+                        *    enq since they have a different target.
+                        *
+                        * Detecting an element with an order sequence gap, an
+                        * element in category 3, or running out of elements
+                        * stops the scan.
+                        */
+                       next_buf = reorder_buf->next;
+
+                       if (odp_likely(reorder_buf->target_qe == queue)) {
+                               reorder_prev = reorder_buf;
+                               reorder_buf  = next_buf;
+                               release_count++;
+                       } else if (!reorder_buf->target_qe) {
+                               if (reorder_prev)
+                                       reorder_prev->next = next_buf;
+                               else
+                                       origin_qe->s.reorder_head = next_buf;
+
+                               reorder_buf->next = placeholder_buf;
+                               placeholder_buf = reorder_buf;
+
+                               reorder_buf = next_buf;
+                               placeholder_count++;
+                       } else {
+                               break;
+                       }
                }
 
                /* Add released buffers to the queue as well */
@@ -444,19 +486,28 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
                        queue->s.tail             = reorder_prev;
                        origin_qe->s.reorder_head = reorder_prev->next;
                        reorder_prev->next        = NULL;
-                       origin_qe->s.order_out   += release_count;
                }
 
-               /* Now handle unblocked buffers destined for other queues */
+               /* Reflect the above two in the output sequence */
+               origin_qe->s.order_out += release_count + placeholder_count;
+
+               /* Now handle any unblocked buffers destined for other queues */
+               UNLOCK(&queue->s.lock);
                if (reorder_buf &&
-                   reorder_buf->order <= origin_qe->s.order_out) {
-                       UNLOCK(&origin_qe->s.lock);
-                       UNLOCK(&queue->s.lock);
-                       if (schedule_enq(reorder_buf->target_qe, origin_qe))
-                               ODP_ABORT("schedule_enq failed\n");
-               } else {
-                       UNLOCK(&origin_qe->s.lock);
-                       UNLOCK(&queue->s.lock);
+                   reorder_buf->order <= origin_qe->s.order_out)
+                       origin_qe->s.reorder_head = reorder_buf->next;
+               else
+                       reorder_buf = NULL;
+               UNLOCK(&origin_qe->s.lock);
+               if (reorder_buf)
+                       odp_queue_enq(reorder_buf->target_qe->s.handle,
+                                     (odp_event_t)reorder_buf->handle.handle);
+
+               /* Free each released placeholder, never the enq'd buf_hdr */
+               while (placeholder_buf) {
+                       next_buf = placeholder_buf->next;
+                       odp_buffer_free(placeholder_buf->handle.handle);
+                       placeholder_buf = next_buf;
                 }
        } else {
                UNLOCK(&queue->s.lock);
@@ -685,3 +736,122 @@ void odp_queue_param_init(odp_queue_param_t *params)
 {
        memset(params, 0, sizeof(odp_queue_param_t));
 }
+
+/**
+ * Release the ordering obligation of an event from an ordered queue.
+ *
+ * This routine exists here rather than in odp_schedule because it operates
+ * on queue internal structures.
+ *
+ * When the event is at most one past the origin queue's order_out, order_out
+ * is advanced at once and any reorder-list entries this unblocks are
+ * handled. Otherwise a placeholder buffer carrying the event's order is
+ * linked into the origin queue's reorder list in ascending order.
+ *
+ * @param ev  Event whose ordering is being released
+ *
+ * @retval 0   Ordering released
+ * @retval -1  Event has no origin queue, or no placeholder buffer could be
+ *             allocated from the event's pool
+ */
+int odp_schedule_release_ordered(odp_event_t ev)
+{
+       odp_buffer_t placeholder_buf;
+       odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *reorder_prev;
+       odp_buffer_hdr_t *retired_buf = NULL;
+       odp_buffer_hdr_t *next_buf;
+       odp_buffer_hdr_t *buf_hdr =
+               odp_buf_to_hdr(odp_buffer_from_event(ev));
+       queue_entry_t *origin_qe = buf_hdr->origin_qe;
+
+       /* Can't release if we didn't originate from an ordered queue */
+       if (!origin_qe)
+               return -1;
+
+       LOCK(&origin_qe->s.lock);
+
+       /* If we are the first or second element beyond the next in order,
+        * we can release immediately since there can be no confusion about
+        * intermediate elements
+        */
+       if (buf_hdr->order <= origin_qe->s.order_out + 1) {
+               buf_hdr->origin_qe = NULL;
+               origin_qe->s.order_out++;
+
+               /* Placeholders (NULL target_qe) that are now in order have
+                * nothing to enqueue: unlink them, count them in the output
+                * sequence, and free them once the lock is dropped.
+                */
+               reorder_buf = origin_qe->s.reorder_head;
+               while (reorder_buf && !reorder_buf->target_qe &&
+                      reorder_buf->order <= origin_qe->s.order_out) {
+                       origin_qe->s.reorder_head = reorder_buf->next;
+                       reorder_buf->next = retired_buf;
+                       retired_buf = reorder_buf;
+                       origin_qe->s.order_out++;
+                       reorder_buf = origin_qe->s.reorder_head;
+               }
+
+               /* check if this release allows us to unblock waiters */
+               if (reorder_buf &&
+                   reorder_buf->order <= origin_qe->s.order_out)
+                       origin_qe->s.reorder_head = reorder_buf->next;
+               else
+                       reorder_buf = NULL;
+               UNLOCK(&origin_qe->s.lock);
+               if (reorder_buf)
+                       odp_queue_enq(reorder_buf->target_qe->s.handle,
+                                     (odp_event_t)reorder_buf->handle.handle);
+
+               /* Free retired placeholders now that the lock is released */
+               while (retired_buf) {
+                       next_buf = retired_buf->next;
+                       odp_buffer_free(retired_buf->handle.handle);
+                       retired_buf = next_buf;
+               }
+               return 0;
+       }
+
+       /* If we are beyond the second element in the expected order, we need
+        * a placeholder to represent our "place in line". Just use an element
+        * from the same pool the buffer being released is from.
+        */
+       placeholder_buf = odp_buffer_alloc(buf_hdr->pool_hdl);
+
+       /* Can't release if no placeholder is available */
+       if (placeholder_buf == ODP_BUFFER_INVALID) {
+               UNLOCK(&origin_qe->s.lock);
+               return -1;
+       }
+
+       /* The placeholder stands in for this event on the reorder list, so
+        * it must carry the event's order; a NULL target_qe is what marks it
+        * as a placeholder when the list is scanned.
+        */
+       placeholder_buf_hdr = odp_buf_to_hdr(placeholder_buf);
+       placeholder_buf_hdr->order = buf_hdr->order;
+       placeholder_buf_hdr->target_qe = NULL;
+
+       /* Find the insertion point that keeps the list in ascending order */
+       reorder_prev = NULL;
+       reorder_buf = origin_qe->s.reorder_head;
+       while (reorder_buf && buf_hdr->order > reorder_buf->order) {
+               reorder_prev = reorder_buf;
+               reorder_buf  = reorder_buf->next;
+       }
+
+       placeholder_buf_hdr->next = reorder_buf;
+       if (reorder_prev)
+               reorder_prev->next = placeholder_buf_hdr;
+       else
+               origin_qe->s.reorder_head = placeholder_buf_hdr;
+
+       if (!reorder_buf)
+               origin_qe->s.reorder_tail = placeholder_buf_hdr;
+
+       /* The event itself no longer participates in ordering */
+       buf_hdr->origin_qe = NULL;
+
+       UNLOCK(&origin_qe->s.lock);
+       return 0;
+}
-- 
2.1.4

_______________________________________________
lng-odp mailing list
lng-odp@lists.linaro.org
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to