Resolve the corner case of releasing order for an order that still has
events on the reorder queue. This also allows the reorder_complete()
routine to be streamlined.

This patch resolves Bug https://bugs.linaro.org/show_bug.cgi?id=1879

Signed-off-by: Bill Fischofer <bill.fischo...@linaro.org>
---
 .../linux-generic/include/odp_queue_internal.h     |  5 +-
 platform/linux-generic/odp_queue.c                 | 57 +++++++++++++++++-----
 2 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/platform/linux-generic/include/odp_queue_internal.h 
b/platform/linux-generic/include/odp_queue_internal.h
index 6120740..a70044b 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -335,8 +335,7 @@ static inline int reorder_deq(queue_entry_t *queue,
 static inline void reorder_complete(queue_entry_t *origin_qe,
                                    odp_buffer_hdr_t **reorder_buf_return,
                                    odp_buffer_hdr_t **placeholder_buf,
-                                   int placeholder_append,
-                                   int order_released)
+                                   int placeholder_append)
 {
        odp_buffer_hdr_t *reorder_buf = origin_qe->s.reorder_head;
        odp_buffer_hdr_t *next_buf;
@@ -356,7 +355,7 @@ static inline void reorder_complete(queue_entry_t 
*origin_qe,
 
                        reorder_buf = next_buf;
                        order_release(origin_qe, 1);
-               } else if (!order_released && reorder_buf->flags.sustain) {
+               } else if (reorder_buf->flags.sustain) {
                        reorder_buf = next_buf;
                } else {
                        *reorder_buf_return = origin_qe->s.reorder_head;
diff --git a/platform/linux-generic/odp_queue.c 
b/platform/linux-generic/odp_queue.c
index 9cab9b2..a5e60d7 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -39,6 +39,11 @@
 
 #include <string.h>
 
+#define RESOLVE_ORDER 0
+#define SUSTAIN_ORDER 1
+
+#define NOAPPEND 0
+#define APPEND   1
 
 typedef struct queue_table_t {
        queue_entry_t  queue[ODP_CONFIG_QUEUES];
@@ -521,8 +526,7 @@ int ordered_queue_enq(queue_entry_t *queue,
        if (sched && schedule_queue(queue))
                ODP_ABORT("schedule_queue failed\n");
 
-       reorder_complete(origin_qe, &reorder_buf, &placeholder_buf,
-                        1, 0);
+       reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, APPEND);
        UNLOCK(&origin_qe->s.lock);
 
        if (reorder_buf)
@@ -606,7 +610,8 @@ int odp_queue_enq_multi(odp_queue_t handle, const 
odp_event_t ev[], int num)
        for (i = 0; i < num; i++)
                buf_hdr[i] = odp_buf_to_hdr(odp_buffer_from_event(ev[i]));
 
-       return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr, num, 1);
+       return num == 0 ? 0 : queue->s.enqueue_multi(queue, buf_hdr,
+                                                    num, SUSTAIN_ORDER);
 }
 
 int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
@@ -620,7 +625,7 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
        /* No chains via this entry */
        buf_hdr->link = NULL;
 
-       return queue->s.enqueue(queue, buf_hdr, 1);
+       return queue->s.enqueue(queue, buf_hdr, SUSTAIN_ORDER);
 }
 
 int queue_enq_internal(odp_buffer_hdr_t *buf_hdr)
@@ -660,7 +665,7 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
                        buf_hdr->sync[i] =
                                odp_atomic_fetch_inc_u64(&queue->s.sync_in[i]);
                }
-               buf_hdr->flags.sustain = 0;
+               buf_hdr->flags.sustain = SUSTAIN_ORDER;
        } else {
                buf_hdr->origin_qe = NULL;
        }
@@ -713,7 +718,7 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t 
*buf_hdr[], int num)
                                        odp_atomic_fetch_inc_u64
                                        (&queue->s.sync_in[j]);
                        }
-                       buf_hdr[i]->flags.sustain = 0;
+                       buf_hdr[i]->flags.sustain = SUSTAIN_ORDER;
                } else {
                        buf_hdr[i]->origin_qe = NULL;
                }
@@ -879,7 +884,7 @@ int queue_pktout_enq(queue_entry_t *queue, odp_buffer_hdr_t 
*buf_hdr,
        order_release(origin_qe, release_count + placeholder_count);
 
        /* Now handle sends to other queues that are ready to go */
-       reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, 1, 0);
+       reorder_complete(origin_qe, &reorder_buf, &placeholder_buf, APPEND);
 
        /* We're fully done with the origin_qe at last */
        UNLOCK(&origin_qe->s.lock);
@@ -947,13 +952,43 @@ int release_order(queue_entry_t *origin_qe, uint64_t 
order,
        odp_buffer_t placeholder_buf;
        odp_buffer_hdr_t *placeholder_buf_hdr, *reorder_buf, *next_buf;
 
-       /* Must tlock the origin queue to process the release */
+       /* Must lock the origin queue to process the release */
        LOCK(&origin_qe->s.lock);
 
-       /* If we are in the order we can release immediately since there can
-        * be no confusion about intermediate elements
+       /* If we are in order we can release immediately since there can be no
+        * confusion about intermediate elements
         */
        if (order <= origin_qe->s.order_out) {
+               reorder_buf = origin_qe->s.reorder_head;
+
+               /* We're in order, however there may be one or more events on
+                * the reorder queue that are part of this order. If that is
+                * the case, remove them and let ordered_queue_enq() handle
+                * them and resolve the order for us.
+                */
+               if (reorder_buf && reorder_buf->order == order) {
+                       odp_buffer_hdr_t *reorder_head = reorder_buf;
+
+                       next_buf = reorder_buf->next;
+
+                       while (next_buf && next_buf->order == order) {
+                               reorder_buf = next_buf;
+                               next_buf    = next_buf->next;
+                       }
+
+                       origin_qe->s.reorder_head = reorder_buf->next;
+                       reorder_buf->next = NULL;
+
+                       UNLOCK(&origin_qe->s.lock);
+                       reorder_head->link = reorder_buf->next;
+                       return ordered_queue_enq(reorder_head->target_qe,
+                                                reorder_head, RESOLVE_ORDER,
+                                                origin_qe, order);
+               }
+
+               /* Reorder queue has no elements for this order, so it's safe
+                * to resolve order here
+                */
                order_release(origin_qe, 1);
 
                /* Check if this release allows us to unblock waiters.  At the
@@ -965,7 +1000,7 @@ int release_order(queue_entry_t *origin_qe, uint64_t order,
                 * element(s) on the reorder queue
                 */
                reorder_complete(origin_qe, &reorder_buf,
-                                &placeholder_buf_hdr, 0, 1);
+                                &placeholder_buf_hdr, NOAPPEND);
 
                /* Now safe to unlock */
                UNLOCK(&origin_qe->s.lock);
-- 
2.1.4

_______________________________________________
lng-odp mailing list
lng-odp@lists.linaro.org
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to