Moved scheduler ring code into a new header file so that
it can also be used in other parts of the implementation.

Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
 platform/linux-generic/Makefile.am                 |   1 +
 platform/linux-generic/include/odp_ring_internal.h | 111 +++++++++++++++++++++
 platform/linux-generic/odp_schedule.c              | 102 ++-----------------
 3 files changed, 120 insertions(+), 94 deletions(-)
 create mode 100644 platform/linux-generic/include/odp_ring_internal.h

diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 19dc0ba..b60eacb 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -151,6 +151,7 @@ noinst_HEADERS = \
                  ${srcdir}/include/odp_pool_internal.h \
                  ${srcdir}/include/odp_posix_extensions.h \
                  ${srcdir}/include/odp_queue_internal.h \
+                 ${srcdir}/include/odp_ring_internal.h \
                  ${srcdir}/include/odp_schedule_if.h \
                  ${srcdir}/include/odp_schedule_internal.h \
                  ${srcdir}/include/odp_schedule_ordered_internal.h \
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h
new file mode 100644
index 0000000..6a6291a
--- /dev/null
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -0,0 +1,111 @@
+/* Copyright (c) 2016, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#ifndef ODP_RING_INTERNAL_H_
+#define ODP_RING_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp/api/atomic.h>
+#include <odp/api/hints.h>
+#include <odp_align_internal.h>
+
+/* Ring empty, not a valid data value. */
+#define RING_EMPTY ((uint32_t)-1)
+
+/* Ring of uint32_t data
+ *
+ * Ring stores head and tail counters. Ring indexes are formed from these
+ * counters with a mask (mask = ring_size - 1), which requires that ring size
+ * must be a power of two. Also ring size must be larger than the maximum
+ * number of data items that will be stored on it (there's no check against
+ * overwriting). */
+typedef struct {
+       /* Writer head and tail counters */
+       odp_atomic_u32_t w_head;
+       odp_atomic_u32_t w_tail;
+       uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))];
+
+       /* Reader head and tail counters, placed after the writer padding */
+       odp_atomic_u32_t r_head;
+       odp_atomic_u32_t r_tail;
+
+       uint32_t data[0]; /* Zero-length: user places storage after header */
+} ring_t ODP_ALIGNED_CACHE;
+
+/* Initialize ring: set all four head and tail counters to zero */
+static inline void ring_init(ring_t *ring)
+{
+       odp_atomic_init_u32(&ring->w_head, 0);
+       odp_atomic_init_u32(&ring->w_tail, 0);
+       odp_atomic_init_u32(&ring->r_head, 0);
+       odp_atomic_init_u32(&ring->r_tail, 0);
+}
+
+/* Dequeue one data value from the ring head, or return RING_EMPTY */
+static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
+{
+       uint32_t head, tail, new_head;
+       uint32_t data;
+
+       head = odp_atomic_load_u32(&ring->r_head);
+
+       /* Move reader head with CAS. This thread owns data at the new head. */
+       do {
+               tail = odp_atomic_load_u32(&ring->w_tail);
+
+               if (head == tail)
+                       return RING_EMPTY;
+
+               new_head = head + 1;
+
+       } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
+                             new_head) == 0));
+
+       /* Read data from the slot this thread now owns */
+       data = ring->data[new_head & mask];
+
+       /* Wait until reader tail reaches the head this thread started from */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
+               odp_cpu_pause();
+
+       /* Now publish the new reader tail with a release store */
+       odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
+       return data;
+}
+
+/* Enqueue data into the ring tail. Busy-waits instead of failing. */
+static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
+{
+       uint32_t old_head, new_head;
+
+       /* Reserve a slot for writing by advancing the writer head */
+       old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
+       new_head = old_head + 1;
+
+       /* Treated as ring full: spin while reader tail equals the new head */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
+               odp_cpu_pause();
+
+       /* Write data into the reserved slot */
+       ring->data[new_head & mask] = data;
+
+       /* Wait until writer tail reaches the head this thread started from */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+               odp_cpu_pause();
+
+       /* Now publish the new writer tail with a release store */
+       odp_atomic_store_rel_u32(&ring->w_tail, new_head);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 86b1cec..dfc9555 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -17,12 +17,12 @@
 #include <odp/api/hints.h>
 #include <odp/api/cpu.h>
 #include <odp/api/thrmask.h>
-#include <odp/api/atomic.h>
 #include <odp_config_internal.h>
 #include <odp_align_internal.h>
 #include <odp_schedule_internal.h>
 #include <odp_schedule_ordered_internal.h>
 #include <odp/api/sync.h>
+#include <odp_ring_internal.h>
 
 /* Number of priority levels  */
 #define NUM_PRIO 8
@@ -82,9 +82,6 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 /* Priority queue empty, not a valid queue index. */
 #define PRIO_QUEUE_EMPTY ((uint32_t)-1)
 
-/* Ring empty, not a valid index. */
-#define RING_EMPTY ((uint32_t)-1)
-
 /* For best performance, the number of queues should be a power of two. */
 ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(ODP_CONFIG_QUEUES),
                  "Number_of_queues_is_not_power_of_two");
@@ -111,28 +108,10 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
 /* Start of named groups in group mask arrays */
 #define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)
 
-/* Scheduler ring
- *
- * Ring stores head and tail counters. Ring indexes are formed from these
- * counters with a mask (mask = ring_size - 1), which requires that ring size
- * must be a power of two. */
-typedef struct {
-       /* Writer head and tail */
-       odp_atomic_u32_t w_head;
-       odp_atomic_u32_t w_tail;
-       uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))];
-
-       /* Reader head and tail */
-       odp_atomic_u32_t r_head;
-       odp_atomic_u32_t r_tail;
-
-       uint32_t data[0];
-} sched_ring_t ODP_ALIGNED_CACHE;
-
 /* Priority queue */
 typedef struct {
        /* Ring header */
-       sched_ring_t ring;
+       ring_t ring;
 
        /* Ring data: queue indexes */
        uint32_t queue_index[PRIO_QUEUE_RING_SIZE];
@@ -142,7 +121,7 @@ typedef struct {
 /* Packet IO queue */
 typedef struct {
        /* Ring header */
-       sched_ring_t ring;
+       ring_t ring;
 
        /* Ring data: pktio poll command indexes */
        uint32_t cmd_index[PKTIO_RING_SIZE];
@@ -205,71 +184,6 @@ __thread sched_local_t sched_local;
 /* Function prototypes */
 static inline void schedule_release_context(void);
 
-static void ring_init(sched_ring_t *ring)
-{
-       odp_atomic_init_u32(&ring->w_head, 0);
-       odp_atomic_init_u32(&ring->w_tail, 0);
-       odp_atomic_init_u32(&ring->r_head, 0);
-       odp_atomic_init_u32(&ring->r_tail, 0);
-}
-
-/* Dequeue data from the ring head */
-static inline uint32_t ring_deq(sched_ring_t *ring, uint32_t mask)
-{
-       uint32_t head, tail, new_head;
-       uint32_t data;
-
-       head = odp_atomic_load_u32(&ring->r_head);
-
-       /* Move reader head. This thread owns data at the new head. */
-       do {
-               tail = odp_atomic_load_u32(&ring->w_tail);
-
-               if (head == tail)
-                       return RING_EMPTY;
-
-               new_head = head + 1;
-
-       } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
-                             new_head) == 0));
-
-       /* Read queue index */
-       data = ring->data[new_head & mask];
-
-       /* Wait until other readers have updated the tail */
-       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
-               odp_cpu_pause();
-
-       /* Now update the reader tail */
-       odp_atomic_store_rel_u32(&ring->r_tail, new_head);
-
-       return data;
-}
-
-/* Enqueue data into the ring tail */
-static inline void ring_enq(sched_ring_t *ring, uint32_t mask, uint32_t data)
-{
-       uint32_t old_head, new_head;
-
-       /* Reserve a slot in the ring for writing */
-       old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
-       new_head = old_head + 1;
-
-       /* Ring is full. Wait for the last reader to finish. */
-       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
-               odp_cpu_pause();
-
-       /* Write data */
-       ring->data[new_head & mask] = data;
-
-       /* Wait until other writers have updated the tail */
-       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
-               odp_cpu_pause();
-
-       /* Now update the writer tail */
-       odp_atomic_store_rel_u32(&ring->w_tail, new_head);
-}
-
 static void sched_local_init(void)
 {
        memset(&sched_local, 0, sizeof(sched_local_t));
@@ -347,7 +261,7 @@ static int schedule_term_global(void)
 
        for (i = 0; i < NUM_PRIO; i++) {
                for (j = 0; j < QUEUES_PER_PRIO; j++) {
-                       sched_ring_t *ring = &sched->prio_q[i][j].ring;
+                       ring_t *ring = &sched->prio_q[i][j].ring;
                        uint32_t qi;
 
                        while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
@@ -541,7 +455,7 @@ static void schedule_release_atomic(void)
        if (qi != PRIO_QUEUE_EMPTY && sched_local.num  == 0) {
                int prio           = sched->queue[qi].prio;
                int queue_per_prio = sched->queue[qi].queue_per_prio;
-               sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+               ring_t *ring       = &sched->prio_q[prio][queue_per_prio].ring;
 
                /* Release current atomic queue */
                ring_enq(ring, PRIO_QUEUE_MASK, qi);
@@ -636,7 +550,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                        int grp;
                        int ordered;
                        odp_queue_t handle;
-                       sched_ring_t *ring;
+                       ring_t *ring;
 
                        if (id >= QUEUES_PER_PRIO)
                                id = 0;
@@ -747,7 +661,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 
        for (i = 0; i < PKTIO_CMD_QUEUES; i++, id = ((id + 1) &
             PKTIO_CMD_QUEUE_MASK)) {
-               sched_ring_t *ring;
+               ring_t *ring;
                uint32_t cmd_index;
                pktio_cmd_t *cmd;
 
@@ -1051,7 +965,7 @@ static int schedule_sched_queue(uint32_t queue_index)
 {
        int prio           = sched->queue[queue_index].prio;
        int queue_per_prio = sched->queue[queue_index].queue_per_prio;
-       sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+       ring_t *ring       = &sched->prio_q[prio][queue_per_prio].ring;
 
        sched_local.ignore_ordered_context = 1;
 
-- 
2.8.1

Reply via email to