Moved the scheduler ring code into a new header file so that
it can also be used in other parts of the implementation.
Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
platform/linux-generic/Makefile.am | 1 +
platform/linux-generic/include/odp_ring_internal.h | 109 +++++++++++++++++++++
platform/linux-generic/odp_schedule.c | 102 ++-----------------
3 files changed, 118 insertions(+), 94 deletions(-)
create mode 100644 platform/linux-generic/include/odp_ring_internal.h
diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am
index 0fba393..63f9ce7 100644
--- a/platform/linux-generic/Makefile.am
+++ b/platform/linux-generic/Makefile.am
@@ -121,6 +121,7 @@ noinst_HEADERS = \
${srcdir}/include/odp_pool_internal.h \
${srcdir}/include/odp_posix_extensions.h \
${srcdir}/include/odp_queue_internal.h \
+ ${srcdir}/include/odp_ring_internal.h \
${srcdir}/include/odp_schedule_if.h \
${srcdir}/include/odp_schedule_internal.h \
${srcdir}/include/odp_schedule_ordered_internal.h \
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h
new file mode 100644
index 0000000..c89b298
--- /dev/null
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -0,0 +1,109 @@
+/* Copyright (c) 2016, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_RING_INTERNAL_H_
+#define ODP_RING_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp/api/atomic.h>
+#include <odp/api/hints.h>
+#include <odp_align_internal.h>
+
+/* Ring empty, not a valid data value. */
+#define RING_EMPTY ((uint32_t)-1)
+
+/* Ring of uint32_t data
+ *
+ * Ring stores head and tail counters. Ring indexes are formed from these
+ * counters with a mask (mask = ring_size - 1), which requires that ring size
+ * must be a power of two. */
+typedef struct {
+ /* Writer head and tail */
+ odp_atomic_u32_t w_head;
+ odp_atomic_u32_t w_tail;
+ uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))];
+
+ /* Reader head and tail */
+ odp_atomic_u32_t r_head;
+ odp_atomic_u32_t r_tail;
+
+ uint32_t data[0];
+} ring_t ODP_ALIGNED_CACHE;
+
+/* Initialize ring */
+static inline void ring_init(ring_t *ring)
+{
+ odp_atomic_init_u32(&ring->w_head, 0);
+ odp_atomic_init_u32(&ring->w_tail, 0);
+ odp_atomic_init_u32(&ring->r_head, 0);
+ odp_atomic_init_u32(&ring->r_tail, 0);
+}
+
+/* Dequeue data from the ring head */
+static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
+{
+ uint32_t head, tail, new_head;
+ uint32_t data;
+
+ head = odp_atomic_load_u32(&ring->r_head);
+
+ /* Move reader head. This thread owns data at the new head. */
+ do {
+ tail = odp_atomic_load_u32(&ring->w_tail);
+
+ if (head == tail)
+ return RING_EMPTY;
+
+ new_head = head + 1;
+
+ } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
+ new_head) == 0));
+
+ /* Read queue index */
+ data = ring->data[new_head & mask];
+
+ /* Wait until other readers have updated the tail */
+ while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
+ odp_cpu_pause();
+
+ /* Now update the reader tail */
+ odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
+ return data;
+}
+
+/* Enqueue data into the ring tail */
+static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
+{
+ uint32_t old_head, new_head;
+
+ /* Reserve a slot in the ring for writing */
+ old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
+ new_head = old_head + 1;
+
+ /* Ring is full. Wait for the last reader to finish. */
+ while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
+ odp_cpu_pause();
+
+ /* Write data */
+ ring->data[new_head & mask] = data;
+
+ /* Wait until other writers have updated the tail */
+ while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+ odp_cpu_pause();
+
+ /* Now update the writer tail */
+ odp_atomic_store_rel_u32(&ring->w_tail, new_head);
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 81e79c9..86c98fe 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -17,12 +17,12 @@
#include <odp/api/hints.h>
#include <odp/api/cpu.h>
#include <odp/api/thrmask.h>
-#include <odp/api/atomic.h>
#include <odp_config_internal.h>
#include <odp_align_internal.h>
#include <odp_schedule_internal.h>
#include <odp_schedule_ordered_internal.h>
#include <odp/api/sync.h>
+#include <odp_ring_internal.h>
/* Number of priority levels */
#define NUM_PRIO 8
@@ -82,9 +82,6 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
/* Priority queue empty, not a valid queue index. */
#define PRIO_QUEUE_EMPTY ((uint32_t)-1)
-/* Ring empty, not a valid index. */
-#define RING_EMPTY ((uint32_t)-1)
-
/* For best performance, the number of queues should be a power of two. */
ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(ODP_CONFIG_QUEUES),
"Number_of_queues_is_not_power_of_two");
@@ -111,28 +108,10 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >=
QUEUES_PER_PRIO,
/* Start of named groups in group mask arrays */
#define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)
-/* Scheduler ring
- *
- * Ring stores head and tail counters. Ring indexes are formed from these
- * counters with a mask (mask = ring_size - 1), which requires that ring size
- * must be a power of two. */
-typedef struct {
- /* Writer head and tail */
- odp_atomic_u32_t w_head;
- odp_atomic_u32_t w_tail;
- uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))];
-
- /* Reader head and tail */
- odp_atomic_u32_t r_head;
- odp_atomic_u32_t r_tail;
-
- uint32_t data[0];
-} sched_ring_t ODP_ALIGNED_CACHE;
-
/* Priority queue */
typedef struct {
/* Ring header */
- sched_ring_t ring;
+ ring_t ring;
/* Ring data: queue indexes */
uint32_t queue_index[PRIO_QUEUE_RING_SIZE];
@@ -142,7 +121,7 @@ typedef struct {
/* Packet IO queue */
typedef struct {
/* Ring header */
- sched_ring_t ring;
+ ring_t ring;
/* Ring data: pktio poll command indexes */
uint32_t cmd_index[PKTIO_RING_SIZE];
@@ -204,71 +183,6 @@ __thread sched_local_t sched_local;
/* Function prototypes */
static inline void schedule_release_context(void);
-static void ring_init(sched_ring_t *ring)
-{
- odp_atomic_init_u32(&ring->w_head, 0);
- odp_atomic_init_u32(&ring->w_tail, 0);
- odp_atomic_init_u32(&ring->r_head, 0);
- odp_atomic_init_u32(&ring->r_tail, 0);
-}
-
-/* Dequeue data from the ring head */
-static inline uint32_t ring_deq(sched_ring_t *ring, uint32_t mask)
-{
- uint32_t head, tail, new_head;
- uint32_t data;
-
- head = odp_atomic_load_u32(&ring->r_head);
-
- /* Move reader head. This thread owns data at the new head. */
- do {
- tail = odp_atomic_load_u32(&ring->w_tail);
-
- if (head == tail)
- return RING_EMPTY;
-
- new_head = head + 1;
-
- } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
- new_head) == 0));
-
- /* Read queue index */
- data = ring->data[new_head & mask];
-
- /* Wait until other readers have updated the tail */
- while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
- odp_cpu_pause();
-
- /* Now update the reader tail */
- odp_atomic_store_rel_u32(&ring->r_tail, new_head);
-
- return data;
-}
-
-/* Enqueue data into the ring tail */
-static inline void ring_enq(sched_ring_t *ring, uint32_t mask, uint32_t data)
-{
- uint32_t old_head, new_head;
-
- /* Reserve a slot in the ring for writing */
- old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
- new_head = old_head + 1;
-
- /* Ring is full. Wait for the last reader to finish. */
- while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
- odp_cpu_pause();
-
- /* Write data */
- ring->data[new_head & mask] = data;
-
- /* Wait until other writers have updated the tail */
- while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
- odp_cpu_pause();
-
- /* Now update the writer tail */
- odp_atomic_store_rel_u32(&ring->w_tail, new_head);
-}
-
static void sched_local_init(void)
{
memset(&sched_local, 0, sizeof(sched_local_t));
@@ -346,7 +260,7 @@ static int schedule_term_global(void)
for (i = 0; i < NUM_PRIO; i++) {
for (j = 0; j < QUEUES_PER_PRIO; j++) {
- sched_ring_t *ring = &sched->prio_q[i][j].ring;
+ ring_t *ring = &sched->prio_q[i][j].ring;
uint32_t qi;
while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
@@ -540,7 +454,7 @@ static void schedule_release_atomic(void)
if (qi != PRIO_QUEUE_EMPTY && sched_local.num == 0) {
int prio = sched->queue[qi].prio;
int queue_per_prio = sched->queue[qi].queue_per_prio;
- sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+ ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
/* Release current atomic queue */
ring_enq(ring, PRIO_QUEUE_MASK, qi);
@@ -635,7 +549,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
int grp;
int ordered;
odp_queue_t handle;
- sched_ring_t *ring;
+ ring_t *ring;
if (id >= QUEUES_PER_PRIO)
id = 0;
@@ -746,7 +660,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
for (i = 0; i < PKTIO_CMD_QUEUES; i++, id = ((id + 1) &
PKTIO_CMD_QUEUE_MASK)) {
- sched_ring_t *ring;
+ ring_t *ring;
uint32_t cmd_index;
pktio_cmd_t *cmd;
@@ -1041,7 +955,7 @@ static int schedule_sched_queue(uint32_t queue_index)
{
int prio = sched->queue[queue_index].prio;
int queue_per_prio = sched->queue[queue_index].queue_per_prio;
- sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+ ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
sched_local.ignore_ordered_context = 1;