Implemented scheduler priority queues with a ring-based data structure instead of odp_queue_t queues. This enables performance optimization, since a ring of queue indexes is more efficient to access than pointers in a linked list. Ring operations may be optimized further with another lock-free algorithm.
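
For illustration only (not part of the patch): below is a minimal, standalone sketch of the same head/tail counter ring scheme, using C11 atomics in place of the ODP atomic API. All names (demo_ring_t, demo_enq, demo_deq, etc.) are invented for the example, memory ordering is simplified to the sequentially consistent defaults (the patch uses acquire/release pairs), and the ring-full condition is written as an explicit counter-distance test. Compiles with e.g. gcc -std=c11.

#include <limits.h>
#include <stdatomic.h>
#include <stdio.h>

#define DEMO_RING_SIZE 8                    /* must be a power of two */
#define DEMO_RING_MASK (DEMO_RING_SIZE - 1)
#define DEMO_EMPTY     UINT_MAX             /* not a valid data value */

typedef struct {
        /* Free running counters; slot index = counter & mask */
        atomic_uint w_head;                 /* last slot reserved by a writer */
        atomic_uint w_tail;                 /* last slot fully written */
        atomic_uint r_head;                 /* last slot reserved by a reader */
        atomic_uint r_tail;                 /* last slot fully read */
        unsigned int data[DEMO_RING_SIZE];
} demo_ring_t;

/* Multi-producer enqueue into the ring tail */
static void demo_enq(demo_ring_t *r, unsigned int data)
{
        unsigned int old_head = atomic_fetch_add(&r->w_head, 1);
        unsigned int new_head = old_head + 1;

        /* Ring full: wait until the old entry in this slot is consumed */
        while (new_head - atomic_load(&r->r_tail) > DEMO_RING_SIZE)
                ;                           /* spin; real code pauses the CPU */

        r->data[new_head & DEMO_RING_MASK] = data;

        /* Wait for writers that reserved earlier slots, then publish */
        while (atomic_load(&r->w_tail) != old_head)
                ;
        atomic_store(&r->w_tail, new_head);
}

/* Multi-consumer dequeue from the ring head */
static unsigned int demo_deq(demo_ring_t *r)
{
        unsigned int head, tail, new_head, data;

        head = atomic_load(&r->r_head);

        /* Reserve the slot at the new head; this thread then owns it */
        do {
                tail = atomic_load(&r->w_tail);
                if (head == tail)
                        return DEMO_EMPTY;  /* nothing published */
                new_head = head + 1;
        } while (!atomic_compare_exchange_weak(&r->r_head, &head, new_head));

        data = r->data[new_head & DEMO_RING_MASK];

        /* Wait for readers that reserved earlier slots, then publish */
        while (atomic_load(&r->r_tail) != head)
                ;
        atomic_store(&r->r_tail, new_head);

        return data;
}

int main(void)
{
        static demo_ring_t ring;            /* static: counters start at zero */
        unsigned int a, b, c;

        demo_enq(&ring, 42);
        demo_enq(&ring, 7);
        a = demo_deq(&ring);
        b = demo_deq(&ring);
        c = demo_deq(&ring);                /* ring is empty now */
        printf("%u %u %s\n", a, b, (c == DEMO_EMPTY) ? "EMPTY" : "?");
        return 0;
}

Note that the patch can use a cheaper full-ring guard in ring_enq(), because PRIO_QUEUE_RING_SIZE is sized to hold every queue index of a priority queue in the worst case, so the ring cannot actually overflow; the sketch keeps the general distance test to stay correct on its own.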
Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
 .../linux-generic/include/odp_schedule_internal.h |   3 +-
 platform/linux-generic/odp_schedule.c             | 259 +++++++++++++--------
 2 files changed, 168 insertions(+), 94 deletions(-)

diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h
index 4a04b15..1b6ae93 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -22,8 +22,7 @@ typedef struct {
 	uint16_t round;
 	uint16_t prefer_offset;
 	uint16_t pktin_polls;
-	odp_queue_t pri_queue;
-	odp_event_t cmd_ev;
+	uint32_t queue_index;
 	odp_queue_t queue;
 	odp_event_t ev_stash[MAX_DEQ];
 	void *origin_qe;
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index e08de54..83c81e4 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -20,9 +20,13 @@
 #include <odp/api/hints.h>
 #include <odp/api/cpu.h>
 #include <odp/api/thrmask.h>
+#include <odp/api/atomic.h>
 #include <odp_config_internal.h>
+#include <odp_align_internal.h>
 #include <odp_schedule_internal.h>
 #include <odp_schedule_ordered_internal.h>
+#include <odp/api/sync.h>
+
 #ifdef _ODP_PKTIO_IPC
 #include <odp_pool_internal.h>
 #endif
@@ -52,6 +56,29 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 /* Maximum number of packet IO interfaces */
 #define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
 
+/* Priority queue ring size. In worst case, all event queues are scheduled
+ * queues and have the same priority. The ring size must be larger than or
+ * equal to ODP_CONFIG_QUEUES / QUEUES_PER_PRIO, so that it can hold all
+ * queues in the worst case. */
+#define PRIO_QUEUE_RING_SIZE (ODP_CONFIG_QUEUES / QUEUES_PER_PRIO)
+
+/* Mask for wrapping around priority queue index */
+#define PRIO_QUEUE_MASK (PRIO_QUEUE_RING_SIZE - 1)
+
+/* Priority queue empty, not a valid queue index. */
+#define PRIO_QUEUE_EMPTY ((uint32_t)-1)
+
+/* Ring empty, not a valid index. */
+#define RING_EMPTY ((uint32_t)-1)
+
+/* For best performance, the number of queues should be a power of two. */
+ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(ODP_CONFIG_QUEUES),
+		  "Number_of_queues_is_not_power_of_two");
+
+/* Ring size must be power of two, so that MAX_QUEUE_IDX_MASK can be used. */
+ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(PRIO_QUEUE_RING_SIZE),
+		  "Ring_size_is_not_power_of_two");
+
 /* Mask of queues per priority */
 typedef uint8_t pri_mask_t;
 
@@ -61,11 +88,40 @@ ODP_STATIC_ASSERT((8 * sizeof(pri_mask_t)) >= QUEUES_PER_PRIO,
 /* Start of named groups in group mask arrays */
 #define SCHED_GROUP_NAMED (ODP_SCHED_GROUP_CONTROL + 1)
 
+/* Scheduler ring
+ *
+ * Ring stores head and tail counters. Ring indexes are formed from these
+ * counters with a mask (mask = ring_size - 1), which requires that ring size
+ * must be a power of two.
+ */
+typedef struct {
+	/* Writer head and tail */
+	odp_atomic_u32_t w_head;
+	odp_atomic_u32_t w_tail;
+	uint8_t pad[ODP_CACHE_LINE_SIZE - (2 * sizeof(odp_atomic_u32_t))];
+
+	/* Reader head and tail */
+	odp_atomic_u32_t r_head;
+	odp_atomic_u32_t r_tail;
+
+	uint32_t data[0];
+} sched_ring_t ODP_ALIGNED_CACHE;
+
+/* Priority queue */
+typedef struct {
+	/* Ring header */
+	sched_ring_t ring;
+
+	/* Ring data: queue indexes */
+	uint32_t queue_index[PRIO_QUEUE_RING_SIZE];
+
+} prio_queue_t ODP_ALIGNED_CACHE;
+
 typedef struct {
-	odp_queue_t pri_queue[NUM_PRIO][QUEUES_PER_PRIO];
 	pri_mask_t pri_mask[NUM_PRIO];
 	odp_spinlock_t mask_lock;
 
+	prio_queue_t prio_q[NUM_PRIO][QUEUES_PER_PRIO];
+
 	odp_spinlock_t poll_cmd_lock;
 	struct {
 		odp_queue_t queue;
@@ -84,9 +140,8 @@ typedef struct {
 	} sched_grp[NUM_SCHED_GRPS];
 
 	struct {
-		odp_event_t cmd_ev;
-		odp_queue_t pri_queue;
 		int prio;
+		int queue_per_prio;
 	} queue[ODP_CONFIG_QUEUES];
 
 	struct {
@@ -121,14 +176,78 @@ static sched_global_t *sched;
 /* Thread local scheduler context */
 __thread sched_local_t sched_local;
 
+static void ring_init(sched_ring_t *ring)
+{
+	odp_atomic_init_u32(&ring->w_head, 0);
+	odp_atomic_init_u32(&ring->w_tail, 0);
+	odp_atomic_init_u32(&ring->r_head, 0);
+	odp_atomic_init_u32(&ring->r_tail, 0);
+}
+
+/* Dequeue data from the ring head */
+static inline uint32_t ring_deq(sched_ring_t *ring, uint32_t mask)
+{
+	uint32_t head, tail, new_head;
+	uint32_t data;
+
+	head = odp_atomic_load_u32(&ring->r_head);
+
+	/* Move reader head. This thread owns data at the new head. */
+	do {
+		tail = odp_atomic_load_u32(&ring->w_tail);
+
+		if (head == tail)
+			return RING_EMPTY;
+
+		new_head = head + 1;
+
+	} while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
						     new_head) == 0));
+
+	/* Read queue index */
+	data = ring->data[new_head & mask];
+
+	/* Wait until other readers have updated the tail */
+	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
+		odp_cpu_pause();
+
+	/* Now update the reader tail */
+	odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
+	return data;
+}
+
+/* Enqueue data into the ring tail */
+static inline void ring_enq(sched_ring_t *ring, uint32_t mask, uint32_t data)
+{
+	uint32_t old_head, new_head;
+
+	/* Reserve a slot in the ring for writing */
+	old_head = odp_atomic_fetch_inc_u32(&ring->w_head);
+	new_head = old_head + 1;
+
+	/* Ring is full. Wait for the last reader to finish. */
+	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
+		odp_cpu_pause();
+
+	/* Write data */
+	ring->data[new_head & mask] = data;
+
+	/* Wait until other writers have updated the tail */
+	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+		odp_cpu_pause();
+
+	/* Now update the writer tail */
+	odp_atomic_store_rel_u32(&ring->w_tail, new_head);
+}
+
 static void sched_local_init(void)
 {
 	memset(&sched_local, 0, sizeof(sched_local_t));
 
 	sched_local.thr = odp_thread_id();
-	sched_local.pri_queue = ODP_QUEUE_INVALID;
 	sched_local.queue = ODP_QUEUE_INVALID;
-	sched_local.cmd_ev = ODP_EVENT_INVALID;
+	sched_local.queue_index = PRIO_QUEUE_EMPTY;
 }
 
 static int schedule_init_global(void)
@@ -179,25 +298,14 @@ static int schedule_init_global(void)
 	odp_spinlock_init(&sched->mask_lock);
 
 	for (i = 0; i < NUM_PRIO; i++) {
-		odp_queue_t queue;
-		char name[] = "odp_priXX_YY";
-
-		name[7] = '0' + i / 10;
-		name[8] = '0' + i - 10*(i / 10);
-
 		for (j = 0; j < QUEUES_PER_PRIO; j++) {
-			name[10] = '0' + j / 10;
-			name[11] = '0' + j - 10*(j / 10);
-
-			queue = odp_queue_create(name, NULL);
+			int k;
 
-			if (queue == ODP_QUEUE_INVALID) {
-				ODP_ERR("Sched init: Queue create failed.\n");
-				return -1;
-			}
+			ring_init(&sched->prio_q[i][j].ring);
 
-			sched->pri_queue[i][j] = queue;
-			sched->pri_mask[i] = 0;
+			for (k = 0; k < PRIO_QUEUE_RING_SIZE; k++)
+				sched->prio_q[i][j].queue_index[k] =
+				PRIO_QUEUE_EMPTY;
 		}
 	}
 
@@ -228,11 +336,6 @@ static int schedule_init_global(void)
 
 	odp_thrmask_setall(&sched->mask_all);
 
-	for (i = 0; i < ODP_CONFIG_QUEUES; i++) {
-		sched->queue[i].cmd_ev = ODP_EVENT_INVALID;
-		sched->queue[i].pri_queue = ODP_QUEUE_INVALID;
-	}
-
 	ODP_DBG("done\n");
 
 	return 0;
@@ -247,21 +350,14 @@ static int schedule_term_global(void)
 
 	for (i = 0; i < NUM_PRIO; i++) {
 		for (j = 0; j < QUEUES_PER_PRIO; j++) {
-			odp_queue_t pri_q;
-
-			pri_q = sched->pri_queue[i][j];
+			sched_ring_t *ring = &sched->prio_q[i][j].ring;
+			uint32_t qi;
 
-			while ((ev = odp_queue_deq(pri_q)) !=
-			       ODP_EVENT_INVALID) {
-				odp_buffer_t buf;
-				sched_cmd_t *sched_cmd;
-				uint32_t qi;
+			while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
+			       RING_EMPTY) {
 				odp_event_t events[1];
 				int num;
 
-				buf = odp_buffer_from_event(ev);
-				sched_cmd = odp_buffer_addr(buf);
-				qi = sched_cmd->queue_index;
 				num = sched_cb_queue_deq_multi(qi, events, 1);
 
 				if (num < 0)
@@ -270,11 +366,6 @@ static int schedule_term_global(void)
 
 				if (num > 0)
 					ODP_ERR("Queue not empty\n");
 			}
-
-			if (odp_queue_destroy(pri_q)) {
-				ODP_ERR("Pri queue destroy fail.\n");
-				rc = -1;
-			}
 		}
 	}
 
@@ -331,19 +422,17 @@ static int schedule_term_local(void)
 	return 0;
 }
 
-static inline int pri_id_queue(uint32_t queue_index)
+static inline int queue_per_prio(uint32_t queue_index)
 {
 	return ((QUEUES_PER_PRIO - 1) & queue_index);
 }
 
-static odp_queue_t pri_set(int id, int prio)
+static void pri_set(int id, int prio)
 {
 	odp_spinlock_lock(&sched->mask_lock);
 	sched->pri_mask[prio] |= 1 << id;
 	sched->pri_count[prio][id]++;
 	odp_spinlock_unlock(&sched->mask_lock);
-
-	return sched->pri_queue[prio][id];
 }
 
 static void pri_clr(int id, int prio)
@@ -359,38 +448,27 @@ static void pri_clr(int id, int prio)
 	odp_spinlock_unlock(&sched->mask_lock);
 }
 
-static odp_queue_t pri_set_queue(uint32_t queue_index, int prio)
+static void pri_set_queue(uint32_t queue_index, int prio)
 {
-	int id = pri_id_queue(queue_index);
+	int id = queue_per_prio(queue_index);
 
 	return pri_set(id, prio);
 }
 
 static void pri_clr_queue(uint32_t queue_index, int prio)
 {
-	int id = pri_id_queue(queue_index);
+	int id = queue_per_prio(queue_index);
 
 	pri_clr(id, prio);
 }
 
 static int schedule_init_queue(uint32_t queue_index,
			       const odp_schedule_param_t *sched_param)
 {
-	odp_buffer_t buf;
-	sched_cmd_t *sched_cmd;
 	int prio = sched_param->prio;
 
-	buf = odp_buffer_alloc(sched->pool);
-
-	if (buf == ODP_BUFFER_INVALID)
-		return -1;
-
-	sched_cmd = odp_buffer_addr(buf);
-	sched_cmd->cmd = SCHED_CMD_DEQUEUE;
-	sched_cmd->queue_index = queue_index;
-
-	sched->queue[queue_index].cmd_ev = odp_buffer_to_event(buf);
-	sched->queue[queue_index].pri_queue = pri_set_queue(queue_index, prio);
+	pri_set_queue(queue_index, prio);
 	sched->queue[queue_index].prio = prio;
+	sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index);
 
 	return 0;
 }
@@ -399,13 +477,9 @@ static void schedule_destroy_queue(uint32_t queue_index)
 {
 	int prio = sched->queue[queue_index].prio;
 
-	odp_event_free(sched->queue[queue_index].cmd_ev);
-
 	pri_clr_queue(queue_index, prio);
-
-	sched->queue[queue_index].cmd_ev = ODP_EVENT_INVALID;
-	sched->queue[queue_index].pri_queue = ODP_QUEUE_INVALID;
-	sched->queue[queue_index].prio = 0;
+	sched->queue[queue_index].prio = 0;
+	sched->queue[queue_index].queue_per_prio = 0;
 }
 
 static int poll_cmd_queue_idx(int pktio_index, int in_queue_idx)
@@ -469,12 +543,16 @@ static int schedule_pktio_stop(sched_cmd_t *sched_cmd)
 
 static void schedule_release_atomic(void)
 {
-	if (sched_local.pri_queue != ODP_QUEUE_INVALID &&
-	    sched_local.num == 0) {
+	uint32_t qi = sched_local.queue_index;
+
+	if (qi != PRIO_QUEUE_EMPTY && sched_local.num == 0) {
+		int prio = sched->queue[qi].prio;
+		int queue_per_prio = sched->queue[qi].queue_per_prio;
+		sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
+
 		/* Release current atomic queue */
-		if (odp_queue_enq(sched_local.pri_queue, sched_local.cmd_ev))
-			ODP_ABORT("odp_schedule_release_atomic failed\n");
-		sched_local.pri_queue = ODP_QUEUE_INVALID;
+		ring_enq(ring, PRIO_QUEUE_MASK, qi);
+
+		sched_local.queue_index = PRIO_QUEUE_EMPTY;
 	}
 }
 
@@ -553,11 +631,11 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 		id = (sched_local.thr + offset) & (QUEUES_PER_PRIO - 1);
 
 		for (j = 0; j < QUEUES_PER_PRIO;) {
-			odp_queue_t pri_q;
 			int num;
 			int grp;
 			int ordered;
 			odp_queue_t handle;
+			sched_ring_t *ring;
 
 			if (id >= QUEUES_PER_PRIO)
 				id = 0;
@@ -570,20 +648,18 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 				continue;
 			}
 
-			pri_q = sched->pri_queue[i][id];
-			ev = odp_queue_deq(pri_q);
+			/* Get queue index from the priority queue */
+			ring = &sched->prio_q[i][id].ring;
+			qi = ring_deq(ring, PRIO_QUEUE_MASK);
 
 			/* Priority queue empty */
-			if (ev == ODP_EVENT_INVALID) {
+			if (qi == RING_EMPTY) {
 				j++;
 				id++;
 				continue;
 			}
 
-			buf = odp_buffer_from_event(ev);
-			sched_cmd = odp_buffer_addr(buf);
-			qi = sched_cmd->queue_index;
-			grp = sched_cb_queue_grp(qi);
+			grp = sched_cb_queue_grp(qi);
 
 			if (grp > ODP_SCHED_GROUP_ALL &&
 			    !odp_thrmask_isset(&sched->sched_grp[grp].mask,
@@ -591,8 +667,7 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 				/* This thread is not eligible for work from
 				 * this queue, so continue scheduling it.
 				 */
-				if (odp_queue_enq(pri_q, ev))
-					ODP_ABORT("schedule failed\n");
+				ring_enq(ring, PRIO_QUEUE_MASK, qi);
 
 				j++;
 				id++;
@@ -632,19 +707,16 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 
 			if (ordered) {
 				/* Continue scheduling ordered queues */
-				if (odp_queue_enq(pri_q, ev))
-					ODP_ABORT("schedule failed\n");
+				ring_enq(ring, PRIO_QUEUE_MASK, qi);
 
 				/* Cache order info about this event */
 				cache_order_info(qi);
 			} else if (sched_cb_queue_is_atomic(qi)) {
 				/* Hold queue during atomic access */
-				sched_local.pri_queue = pri_q;
-				sched_local.cmd_ev = ev;
+				sched_local.queue_index = qi;
 			} else {
 				/* Continue scheduling the queue */
-				if (odp_queue_enq(pri_q, ev))
-					ODP_ABORT("schedule failed\n");
+				ring_enq(ring, PRIO_QUEUE_MASK, qi);
 			}
 
 			/* Output the source queue handle */
@@ -969,11 +1041,14 @@ static void schedule_prefetch(int num ODP_UNUSED)
 
 static int schedule_sched_queue(uint32_t queue_index)
 {
-	odp_queue_t pri_queue = sched->queue[queue_index].pri_queue;
-	odp_event_t cmd_ev = sched->queue[queue_index].cmd_ev;
+	int prio = sched->queue[queue_index].prio;
+	int queue_per_prio = sched->queue[queue_index].queue_per_prio;
+	sched_ring_t *ring = &sched->prio_q[prio][queue_per_prio].ring;
 
 	sched_local.ignore_ordered_context = 1;
-	return odp_queue_enq(pri_queue, cmd_ev);
+
+	ring_enq(ring, PRIO_QUEUE_MASK, queue_index);
+	return 0;
 }
 
 static int schedule_num_grps(void)
-- 
2.8.1