Implemented scheduler priority queues with a ring-based
data structure instead of odp_queue_t queues. This enables
performance optimization, since a ring of queue indexes is more
efficient to access than pointers in a linked list. Ring
operations may be further optimized later with another lock-free
algorithm.

Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
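Reviewer note (not part of the commit message): below is a minimal,
standalone sketch of the head/tail counter scheme that the new ring
uses, rewritten with plain C11 atomics so it compiles on its own. All
names (demo_ring_t, demo_enq, demo_deq, DEMO_RING_SIZE) are
illustrative and not part of this patch. The sketch uses sequentially
consistent atomics for simplicity, where the patch uses acquire/release
ordering, and it omits the full-ring wait because it assumes callers
never enqueue more than DEMO_RING_SIZE items (the patch makes a similar
guarantee by sizing the ring for the worst case).

/* Standalone sketch of a multi-producer/multi-consumer index ring.
 * Illustration only; not the code added by this patch. */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define DEMO_RING_SIZE  8                    /* must be a power of two */
#define DEMO_RING_MASK  (DEMO_RING_SIZE - 1)
#define DEMO_RING_EMPTY ((uint32_t)-1)

typedef struct {
	_Atomic uint32_t w_head;  /* writer head: slots reserved for writing */
	_Atomic uint32_t w_tail;  /* writer tail: slots published to readers */
	_Atomic uint32_t r_head;  /* reader head: slots reserved for reading */
	_Atomic uint32_t r_tail;  /* reader tail: slots released to writers */
	uint32_t data[DEMO_RING_SIZE];
} demo_ring_t;

static void demo_ring_init(demo_ring_t *r)
{
	atomic_init(&r->w_head, 0);
	atomic_init(&r->w_tail, 0);
	atomic_init(&r->r_head, 0);
	atomic_init(&r->r_tail, 0);
}

static void demo_enq(demo_ring_t *r, uint32_t data)
{
	/* Reserve a slot by moving the writer head */
	uint32_t old_head = atomic_fetch_add(&r->w_head, 1);
	uint32_t new_head = old_head + 1;

	/* Write the data into the reserved slot */
	r->data[new_head & DEMO_RING_MASK] = data;

	/* Publish in order: wait for earlier writers, then move the tail */
	while (atomic_load(&r->w_tail) != old_head)
		;
	atomic_store(&r->w_tail, new_head);
}

static uint32_t demo_deq(demo_ring_t *r)
{
	uint32_t head, tail, new_head, data;

	head = atomic_load(&r->r_head);

	/* Reserve a slot by moving the reader head, unless the ring is empty */
	do {
		tail = atomic_load(&r->w_tail);
		if (head == tail)
			return DEMO_RING_EMPTY;
		new_head = head + 1;
	} while (!atomic_compare_exchange_weak(&r->r_head, &head, new_head));

	/* Read the data from the reserved slot */
	data = r->data[new_head & DEMO_RING_MASK];

	/* Release in order: wait for earlier readers, then move the tail */
	while (atomic_load(&r->r_tail) != head)
		;
	atomic_store(&r->r_tail, new_head);

	return data;
}

int main(void)
{
	demo_ring_t ring;

	demo_ring_init(&ring);
	demo_enq(&ring, 7);
	printf("dequeued %u\n", demo_deq(&ring));

	return 0;
}

Since the counters only ever increase and are reduced to a slot index
with the power-of-two mask, concurrent writers and readers keep FIFO
order without locks or linked-list pointer chasing, which is the
property the patch relies on when replacing the internal odp_queue_t
priority queues.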
 .../linux-generic/include/odp_schedule_internal.h  |   3 +-
 platform/linux-generic/odp_schedule.c              | 259 +++++++++++++--------
 2 files changed, 168 insertions(+), 94 deletions(-)

diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h
index 4a04b15..1b6ae93 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -22,8 +22,7 @@ typedef struct {
        uint16_t round;
        uint16_t prefer_offset;
        uint16_t pktin_polls;
-       odp_queue_t pri_queue;
-       odp_event_t cmd_ev;
+       uint32_t queue_index;
        odp_queue_t queue;
        odp_event_t ev_stash[MAX_DEQ];
        void *origin_qe;
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index e08de54..83c81e4 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -20,9 +20,13 @@
 #include <odp/api/hints.h>
 #include <odp/api/cpu.h>
 #include <odp/api/thrmask.h>
+#include <odp/api/atomic.h>
 #include <odp_config_internal.h>
+#include <odp_align_internal.h>
 #include <odp_schedule_internal.h>
 #include <odp_schedule_ordered_internal.h>
+#include <odp/api/sync.h>
+
 #ifdef _ODP_PKTIO_IPC
 #include <odp_pool_internal.h>
 #endif
@@ -52,6 +56,29 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 /* Maximum number of packet IO interfaces */
 #define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
 
+/* Priority queue ring size. In worst case, all event queues are scheduled
+ * queues and have the same priority. The ring size must be larger than or
+ * equal to ODP_CONFIG_QUEUES / QUEUES_PER_PRIO, so that it can hold all
+ * queues in the worst case. */
+#define PRIO_QUEUE_RING_SIZE  (ODP_CONFIG_QUEUES / QUEUES_PER_PRIO)
+
+/* Mask for wrapping around priority queue index */
+#define PRIO_QUEUE_MASK  (PRIO_QUEUE_RING_SIZE - 1)
+
+/* Priority queue empty, not a valid queue index. */
+#define PRIO_QUEUE_EMPTY ((uint32_t)-1)
+
+/* Ring empty, not a valid index. */
+#define RING_EMPTY ((uint32_t)-1)
+
+/* For best performance, the number of queues should be a power of two. */
+ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(ODP_CONFIG_QUEUES),
+                 "Number_of_queues_is_not_power_of_two");
+
+/* Ring size must be a power of two, so that PRIO_QUEUE_MASK can be used. */
+ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(PRIO_QUEUE_RING_SIZE),
+                 "Ring_size_is_not_power_of_two");
+
 /* Mask of queues per priority */
 typedef uint8_t pri_mask_t;
 
@@ -61,11 +88,40 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
 /* Start of named groups in group mask arrays */
 #define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)
 
+/* Scheduler ring
+ *
+ * Ring stores head and tail counters. Ring indexes are formed from these
+ * counters with a mask (mask = ring_size - 1), which requires that ring size
+ * must be a power of two. */
+typedef struct {
+       /* Writer head and tail */
+       odp_atomic_u32_t w_head;
+       odp_atomic_u32_t w_tail;
+       uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))];
+
+       /* Reader head and tail */
+       odp_atomic_u32_t r_head;
+       odp_atomic_u32_t r_tail;
+
+       uint32_t data[0];
+} sched_ring_t ODP_ALIGNED_CACHE;
+
+/* Priority queue */
+typedef struct {
+       /* Ring header */
+       sched_ring_t ring;
+
+       /* Ring data: queue indexes */
+       uint32_t queue_index[PRIO_QUEUE_RING_SIZE];
+
+} prio_queue_t ODP_ALIGNED_CACHE;
+
 typedef struct {
-       odp_queue_t    pri_queue[NUM_PRIO][QUEUES_PER_PRIO];
        pri_mask_t     pri_mask[NUM_PRIO];
        odp_spinlock_t mask_lock;
 
+       prio_queue_t   prio_q[NUM_PRIO][QUEUES_PER_PRIO];
+
        odp_spinlock_t poll_cmd_lock;
        struct {
                odp_queue_t queue;
@@ -84,9 +140,8 @@ typedef struct {
        } sched_grp[NUM_SCHED_GRPS];
 
        struct {
-               odp_event_t cmd_ev;
-               odp_queue_t pri_queue;
                int         prio;
+               int         queue_per_prio;
        } queue[ODP_CONFIG_QUEUES];
 
        struct {
@@ -121,14 +176,78 @@ static sched_global_t *sched;
 /* Thread local scheduler context */
 __thread sched_local_t sched_local;
 
+static void ring_init(sched_ring_t *ring)
+{
+       odp_atomic_init_u32(&ring->w_head, 0);
+       odp_atomic_init_u32(&ring->w_tail, 0);
+       odp_atomic_init_u32(&ring->r_head, 0);
+       odp_atomic_init_u32(&ring->r_tail, 0);
+}
+
+/* Dequeue data from the ring head */
+static inline uint32_t ring_deq(sched_ring_t *ring, uint32_t mask)
+{
+       uint32_t head, tail, new_head;
+       uint32_t data;
+
+       head = odp_atomic_load_u32(&ring->r_head);
+
+       /* Move reader head. This thread owns data at the new head. */
+       do {
+               tail = odp_atomic_load_u32(&ring->w_tail);
+
+               if (head == tail)
+                       return RING_EMPTY;
+
+               new_head = head + 1;
+
+       } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
+                             new_head) == 0));
+
+       /* Read queue index */
+       data = ring->data[new_head & mask];
+
+       /* Wait until other readers have updated the tail */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
+               odp_cpu_pause();
+
+       /* Now update the reader tail */
+       odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
+       return data;
+}
+
+/* Enqueue data into the ring tail */
+static inline void ring_enq(sched_ring_t *ring, uint32_t mask, uint32_t data)
+{
+       uint32_t old_head, new_head;
+
+       /* Reserve a slot in the ring for writing */
+       old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
+       new_head = old_head + 1;
+
+       /* Ring is full. Wait for the last reader to finish. */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
+               odp_cpu_pause();
+
+       /* Write data */
+       ring->data[new_head & mask] = data;
+
+       /* Wait until other writers have updated the tail */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+               odp_cpu_pause();
+
+       /* Now update the writer tail */
+       odp_atomic_store_rel_u32(&ring->w_tail, new_head);
+}
+
 static void sched_local_init(void)
 {
        memset(&sched_local, 0, sizeof(sched_local_t));
 
        sched_local.thr       = odp_thread_id();
-       sched_local.pri_queue = ODP_QUEUE_INVALID;
        sched_local.queue     = ODP_QUEUE_INVALID;
-       sched_local.cmd_ev    = ODP_EVENT_INVALID;
+       sched_local.queue_index = PRIO_QUEUE_EMPTY;
 }
 
 static int schedule_init_global(void)
@@ -179,25 +298,14 @@ static int schedule_init_global(void)
        odp_spinlock_init(&sched->mask_lock);
 
        for (i = 0; i < NUM_PRIO; i++) {
-               odp_queue_t queue;
-               char name[] = "odp_priXX_YY";
-
-               name[7] = '0' + i / 10;
-               name[8] = '0' + i - 10*(i / 10);
-
                for (j = 0; j < QUEUES_PER_PRIO; j++) {
-                       name[10] = '0' + j / 10;
-                       name[11] = '0' + j - 10*(j / 10);
-
-                       queue = odp_queue_create(name, NULL);
+                       int k;
 
-                       if (queue == ODP_QUEUE_INVALID) {
-                               ODP_ERR("Sched init: Queue create failed.\n");
-                               return -1;
-                       }
+                       ring_init(&sched->prio_q[i][j].ring);
 
-                       sched->pri_queue[i][j] = queue;
-                       sched->pri_mask[i]     = 0;
+                       for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++)
+                               sched->prio_q[i][j].queue_index[k] =
+                               PRIO_QUEUE_EMPTY;
                }
        }
 
@@ -228,11 +336,6 @@ static int schedule_init_global(void)
 
        odp_thrmask_setall(&sched->mask_all);
 
-       for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
-               sched->queue[i].cmd_ev    = ODP_EVENT_INVALID;
-               sched->queue[i].pri_queue = ODP_QUEUE_INVALID;
-       }
-
        ODP_DBG("done\n");
 
        return 0;
@@ -247,21 +350,14 @@ static int schedule_term_global(void)
 
        for (i = 0; i < NUM_PRIO; i++) {
                for (j = 0; j < QUEUES_PER_PRIO; j++) {
-                       odp_queue_t  pri_q;
-
-                       pri_q = sched->pri_queue[i][j];
+                       sched_ring_t *ring = &sched->prio_q[i][j].ring;
+                       uint32_t qi;
 
-                       while ((ev = odp_queue_deq(pri_q)) !=
-                             ODP_EVENT_INVALID) {
-                               odp_buffer_t buf;
-                               sched_cmd_t *sched_cmd;
-                               uint32_t qi;
+                       while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
+                              RING_EMPTY) {
                                odp_event_t events[1];
                                int num;
 
-                               buf = odp_buffer_from_event(ev);
-                               sched_cmd = odp_buffer_addr(buf);
-                               qi  = sched_cmd->queue_index;
                                num = sched_cb_queue_deq_multi(qi, events, 1);
 
                                if (num < 0)
@@ -270,11 +366,6 @@ static int schedule_term_global(void)
                                if (num > 0)
                                        ODP_ERR("Queue not empty\n");
                        }
-
-                       if (odp_queue_destroy(pri_q)) {
-                               ODP_ERR("Pri queue destroy fail.\n");
-                               rc = -1;
-                       }
                }
        }
 
@@ -331,19 +422,17 @@ static int schedule_term_local(void)
        return 0;
 }
 
-static inline int pri_id_queue(uint32_t queue_index)
+static inline int queue_per_prio(uint32_t queue_index)
 {
        return ((QUEUES_PER_PRIO - 1) & queue_index);
 }
 
-static odp_queue_t pri_set(int id, int prio)
+static void pri_set(int id, int prio)
 {
        odp_spinlock_lock(&sched->mask_lock);
        sched->pri_mask[prio] |= 1 << id;
        sched->pri_count[prio][id]++;
        odp_spinlock_unlock(&sched->mask_lock);
-
-       return sched->pri_queue[prio][id];
 }
 
 static void pri_clr(int id, int prio)
@@ -359,38 +448,27 @@ static void pri_clr(int id, int prio)
        odp_spinlock_unlock(&sched->mask_lock);
 }
 
-static odp_queue_t pri_set_queue(uint32_t queue_index, int prio)
+static void pri_set_queue(uint32_t queue_index, int prio)
 {
-       int id = pri_id_queue(queue_index);
+       int id = queue_per_prio(queue_index);
 
        return pri_set(id, prio);
 }
 
 static void pri_clr_queue(uint32_t queue_index, int prio)
 {
-       int id = pri_id_queue(queue_index);
+       int id = queue_per_prio(queue_index);
        pri_clr(id, prio);
 }
 
 static int schedule_init_queue(uint32_t queue_index,
                               const odp_schedule_param_t *sched_param)
 {
-       odp_buffer_t buf;
-       sched_cmd_t *sched_cmd;
        int prio = sched_param->prio;
 
-       buf = odp_buffer_alloc(sched->pool);
-
-       if (buf == ODP_BUFFER_INVALID)
-               return -1;
-
-       sched_cmd        = odp_buffer_addr(buf);
-       sched_cmd->cmd   = SCHED_CMD_DEQUEUE;
-       sched_cmd->queue_index = queue_index;
-
-       sched->queue[queue_index].cmd_ev = odp_buffer_to_event(buf);
-       sched->queue[queue_index].pri_queue = pri_set_queue(queue_index, prio);
+       pri_set_queue(queue_index, prio);
        sched->queue[queue_index].prio = prio;
+       sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index);
 
        return 0;
 }
@@ -399,13 +477,9 @@ static void schedule_destroy_queue(uint32_t queue_index)
 {
        int prio = sched->queue[queue_index].prio;
 
-       odp_event_free(sched->queue[queue_index].cmd_ev);
-
        pri_clr_queue(queue_index, prio);
-
-       sched->queue[queue_index].cmd_ev    = ODP_EVENT_INVALID;
-       sched->queue[queue_index].pri_queue = ODP_QUEUE_INVALID;
-       sched->queue[queue_index].prio      = 0;
+       sched->queue[queue_index].prio = 0;
+       sched->queue[queue_index].queue_per_prio = 0;
 }
 
 static int poll_cmd_queue_idx(int pktio_index, int in_queue_idx)
@@ -469,12 +543,16 @@ static int schedule_pktio_stop(sched_cmd_t *sched_cmd)
 
 static void schedule_release_atomic(void)
 {
-       if (sched_local.pri_queue != ODP_QUEUE_INVALID &&
-           sched_local.num       == 0) {
+       uint32_t qi = sched_local.queue_index;
+
+       if (qi != PRIO_QUEUE_EMPTY && sched_local.num  == 0) {
+               int prio           = sched->queue[qi].prio;
+               int queue_per_prio = sched->queue[qi].queue_per_prio;
+               sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+
                /* Release current atomic queue */
-               if (odp_queue_enq(sched_local.pri_queue, sched_local.cmd_ev))
-                       ODP_ABORT("odp_schedule_release_atomic failed\n");
-               sched_local.pri_queue = ODP_QUEUE_INVALID;
+               ring_enq(ring, PRIO_QUEUE_MASK, qi);
+               sched_local.queue_index = PRIO_QUEUE_EMPTY;
        }
 }
 
@@ -553,11 +631,11 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                id = (sched_local.thr + offset) & (QUEUES_PER_PRIO - 1);
 
                for (j = 0; j < QUEUES_PER_PRIO;) {
-                       odp_queue_t  pri_q;
                        int num;
                        int grp;
                        int ordered;
                        odp_queue_t handle;
+                       sched_ring_t *ring;
 
                        if (id >= QUEUES_PER_PRIO)
                                id = 0;
@@ -570,20 +648,18 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                                continue;
                        }
 
-                       pri_q = sched->pri_queue[i][id];
-                       ev    = odp_queue_deq(pri_q);
+                       /* Get queue index from the priority queue */
+                       ring = &sched->prio_q[i][id].ring;
+                       qi   = ring_deq(ring, PRIO_QUEUE_MASK);
 
                        /* Priority queue empty */
-                       if (ev == ODP_EVENT_INVALID) {
+                       if (qi == RING_EMPTY) {
                                j++;
                                id++;
                                continue;
                        }
 
-                       buf       = odp_buffer_from_event(ev);
-                       sched_cmd = odp_buffer_addr(buf);
-                       qi        = sched_cmd->queue_index;
-                       grp       = sched_cb_queue_grp(qi);
+                       grp = sched_cb_queue_grp(qi);
 
                        if (grp > ODP_SCHED_GROUP_ALL &&
                            !odp_thrmask_isset(&sched->sched_grp[grp].mask,
@@ -591,8 +667,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                                /* This thread is not eligible for work from
                                 * this queue, so continue scheduling it.
                                 */
-                               if (odp_queue_enq(pri_q, ev))
-                                       ODP_ABORT("schedule failed\n");
+                               ring_enq(ring, PRIO_QUEUE_MASK, qi);
 
                                j++;
                                id++;
@@ -632,19 +707,16 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 
                        if (ordered) {
                                /* Continue scheduling ordered queues */
-                               if (odp_queue_enq(pri_q, ev))
-                                       ODP_ABORT("schedule failed\n");
+                               ring_enq(ring, PRIO_QUEUE_MASK, qi);
 
                                /* Cache order info about this event */
                                cache_order_info(qi);
                        } else if (sched_cb_queue_is_atomic(qi)) {
                                /* Hold queue during atomic access */
-                               sched_local.pri_queue = pri_q;
-                               sched_local.cmd_ev    = ev;
+                               sched_local.queue_index = qi;
                        } else {
                                /* Continue scheduling the queue */
-                               if (odp_queue_enq(pri_q, ev))
-                                       ODP_ABORT("schedule failed\n");
+                               ring_enq(ring, PRIO_QUEUE_MASK, qi);
                        }
 
                        /* Output the source queue handle */
@@ -969,11 +1041,14 @@ static void schedule_prefetch(int num ODP_UNUSED)
 
 static int schedule_sched_queue(uint32_t queue_index)
 {
-       odp_queue_t pri_queue = sched->queue[queue_index].pri_queue;
-       odp_event_t cmd_ev    = sched->queue[queue_index].cmd_ev;
+       int prio           = sched->queue[queue_index].prio;
+       int queue_per_prio = sched->queue[queue_index].queue_per_prio;
+       sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
 
        sched_local.ignore_ordered_context = 1;
-       return odp_queue_enq(pri_queue, cmd_ev);
+
+       ring_enq(ring, PRIO_QUEUE_MASK, queue_index);
+       return 0;
 }
 
 static int schedule_num_grps(void)
-- 
2.8.1
