Moved the scheduler ring code into a new header file, so that it can also be used in other parts of the implementation.
Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com> --- platform/linux-generic/Makefile.am | 1 + platform/linux-generic/include/odp_ring_internal.h | 111 +++++++++++++++++++++ platform/linux-generic/odp_schedule.c | 102 ++----------------- 3 files changed, 120 insertions(+), 94 deletions(-) create mode 100644 platform/linux-generic/include/odp_ring_internal.h diff --git a/platform/linux-generic/Makefile.am b/platform/linux-generic/Makefile.am index 19dc0ba..b60eacb 100644 --- a/platform/linux-generic/Makefile.am +++ b/platform/linux-generic/Makefile.am @@ -151,6 +151,7 @@ noinst_HEADERS = \ ${srcdir}/include/odp_pool_internal.h \ ${srcdir}/include/odp_posix_extensions.h \ ${srcdir}/include/odp_queue_internal.h \ + ${srcdir}/include/odp_ring_internal.h \ ${srcdir}/include/odp_schedule_if.h \ ${srcdir}/include/odp_schedule_internal.h \ ${srcdir}/include/odp_schedule_ordered_internal.h \ diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h new file mode 100644 index 0000000..6a6291a --- /dev/null +++ b/platform/linux-generic/include/odp_ring_internal.h @@ -0,0 +1,111 @@ +/* Copyright (c) 2016, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#ifndef ODP_RING_INTERNAL_H_ +#define ODP_RING_INTERNAL_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <odp/api/atomic.h> +#include <odp/api/hints.h> +#include <odp_align_internal.h> + +/* Ring empty, not a valid data value. */ +#define RING_EMPTY ((uint32_t)-1) + +/* Ring of uint32_t data + * + * Ring stores head and tail counters. Ring indexes are formed from these + * counters with a mask (mask = ring_size - 1), which requires that ring size + * must be a power of two. Also ring size must be larger than the maximum + * number of data items that will be stored on it (there's no check against + * overwriting). 
*/ +typedef struct { + /* Writer head and tail */ + odp_atomic_u32_t w_head; + odp_atomic_u32_t w_tail; + uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))]; + + /* Reader head and tail */ + odp_atomic_u32_t r_head; + odp_atomic_u32_t r_tail; + + uint32_t data[0]; +} ring_t ODP_ALIGNED_CACHE; + +/* Initialize ring */ +static inline void ring_init(ring_t *ring) +{ + odp_atomic_init_u32(&ring->w_head, 0); + odp_atomic_init_u32(&ring->w_tail, 0); + odp_atomic_init_u32(&ring->r_head, 0); + odp_atomic_init_u32(&ring->r_tail, 0); +} + +/* Dequeue data from the ring head */ +static inline uint32_t ring_deq(ring_t *ring, uint32_t mask) +{ + uint32_t head, tail, new_head; + uint32_t data; + + head = odp_atomic_load_u32(&ring->r_head); + + /* Move reader head. This thread owns data at the new head. */ + do { + tail = odp_atomic_load_u32(&ring->w_tail); + + if (head == tail) + return RING_EMPTY; + + new_head = head + 1; + + } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head, + new_head) == 0)); + + /* Read queue index */ + data = ring->data[new_head & mask]; + + /* Wait until other readers have updated the tail */ + while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head)) + odp_cpu_pause(); + + /* Now update the reader tail */ + odp_atomic_store_rel_u32(&ring->r_tail, new_head); + + return data; +} + +/* Enqueue data into the ring tail */ +static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data) +{ + uint32_t old_head, new_head; + + /* Reserve a slot in the ring for writing */ + old_head = odp_atomic_fetch_inc_u32(&ring->w_head); + new_head = old_head + 1; + + /* Ring is full. Wait for the last reader to finish. 
*/ + while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head)) + odp_cpu_pause(); + + /* Write data */ + ring->data[new_head & mask] = data; + + /* Wait until other writers have updated the tail */ + while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head)) + odp_cpu_pause(); + + /* Now update the writer tail */ + odp_atomic_store_rel_u32(&ring->w_tail, new_head); +} + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index 86b1cec..dfc9555 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -17,12 +17,12 @@ #include <odp/api/hints.h> #include <odp/api/cpu.h> #include <odp/api/thrmask.h> -#include <odp/api/atomic.h> #include <odp_config_internal.h> #include <odp_align_internal.h> #include <odp_schedule_internal.h> #include <odp_schedule_ordered_internal.h> #include <odp/api/sync.h> +#include <odp_ring_internal.h> /* Number of priority levels */ #define NUM_PRIO 8 @@ -82,9 +82,6 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) && /* Priority queue empty, not a valid queue index. */ #define PRIO_QUEUE_EMPTY ((uint32_t)-1) -/* Ring empty, not a valid index. */ -#define RING_EMPTY ((uint32_t)-1) - /* For best performance, the number of queues should be a power of two. */ ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(ODP_CONFIG_QUEUES), "Number_of_queues_is_not_power_of_two"); @@ -111,28 +108,10 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO, /* Start of named groups in group mask arrays */ #define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1) -/* Scheduler ring - * - * Ring stores head and tail counters. Ring indexes are formed from these - * counters with a mask (mask = ring_size - 1), which requires that ring size - * must be a power of two. 
*/ -typedef struct { - /* Writer head and tail */ - odp_atomic_u32_t w_head; - odp_atomic_u32_t w_tail; - uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))]; - - /* Reader head and tail */ - odp_atomic_u32_t r_head; - odp_atomic_u32_t r_tail; - - uint32_t data[0]; -} sched_ring_t ODP_ALIGNED_CACHE; - /* Priority queue */ typedef struct { /* Ring header */ - sched_ring_t ring; + ring_t ring; /* Ring data: queue indexes */ uint32_t queue_index[PRIO_QUEUE_RING_SIZE]; @@ -142,7 +121,7 @@ typedef struct { /* Packet IO queue */ typedef struct { /* Ring header */ - sched_ring_t ring; + ring_t ring; /* Ring data: pktio poll command indexes */ uint32_t cmd_index[PKTIO_RING_SIZE]; @@ -205,71 +184,6 @@ __thread sched_local_t sched_local; /* Function prototypes */ static inline void schedule_release_context(void); -static void ring_init(sched_ring_t *ring) -{ - odp_atomic_init_u32(&ring->w_head, 0); - odp_atomic_init_u32(&ring->w_tail, 0); - odp_atomic_init_u32(&ring->r_head, 0); - odp_atomic_init_u32(&ring->r_tail, 0); -} - -/* Dequeue data from the ring head */ -static inline uint32_t ring_deq(sched_ring_t *ring, uint32_t mask) -{ - uint32_t head, tail, new_head; - uint32_t data; - - head = odp_atomic_load_u32(&ring->r_head); - - /* Move reader head. This thread owns data at the new head. 
*/ - do { - tail = odp_atomic_load_u32(&ring->w_tail); - - if (head == tail) - return RING_EMPTY; - - new_head = head + 1; - - } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head, - new_head) == 0)); - - /* Read queue index */ - data = ring->data[new_head & mask]; - - /* Wait until other readers have updated the tail */ - while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head)) - odp_cpu_pause(); - - /* Now update the reader tail */ - odp_atomic_store_rel_u32(&ring->r_tail, new_head); - - return data; -} - -/* Enqueue data into the ring tail */ -static inline void ring_enq(sched_ring_t *ring, uint32_t mask, uint32_t data) -{ - uint32_t old_head, new_head; - - /* Reserve a slot in the ring for writing */ - old_head = odp_atomic_fetch_inc_u32(&ring->w_head); - new_head = old_head + 1; - - /* Ring is full. Wait for the last reader to finish. */ - while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head)) - odp_cpu_pause(); - - /* Write data */ - ring->data[new_head & mask] = data; - - /* Wait until other writers have updated the tail */ - while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head)) - odp_cpu_pause(); - - /* Now update the writer tail */ - odp_atomic_store_rel_u32(&ring->w_tail, new_head); -} - static void sched_local_init(void) { memset(&sched_local, 0, sizeof(sched_local_t)); @@ -347,7 +261,7 @@ static int schedule_term_global(void) for (i = 0; i < NUM_PRIO; i++) { for (j = 0; j < QUEUES_PER_PRIO; j++) { - sched_ring_t *ring = &sched->prio_q[i][j].ring; + ring_t *ring = &sched->prio_q[i][j].ring; uint32_t qi; while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) != @@ -541,7 +455,7 @@ static void schedule_release_atomic(void) if (qi != PRIO_QUEUE_EMPTY && sched_local.num == 0) { int prio = sched->queue[qi].prio; int queue_per_prio = sched->queue[qi].queue_per_prio; - sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring; + ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring; /* Release current 
atomic queue */ ring_enq(ring, PRIO_QUEUE_MASK, qi); @@ -636,7 +550,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], int grp; int ordered; odp_queue_t handle; - sched_ring_t *ring; + ring_t *ring; if (id >= QUEUES_PER_PRIO) id = 0; @@ -747,7 +661,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[], for (i = 0; i < PKTIO_CMD_QUEUES; i++, id = ((id + 1) & PKTIO_CMD_QUEUE_MASK)) { - sched_ring_t *ring; + ring_t *ring; uint32_t cmd_index; pktio_cmd_t *cmd; @@ -1051,7 +965,7 @@ static int schedule_sched_queue(uint32_t queue_index) { int prio = sched->queue[queue_index].prio; int queue_per_prio = sched->queue[queue_index].queue_per_prio; - sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring; + ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring; sched_local.ignore_ordered_context = 1; -- 2.8.1