Reviewed-and-tested-by: Yi He <yi...@linaro.org>

On 8 December 2016 at 19:04, Petri Savolainen <petri.savolai...@nokia.com> wrote:
> Improve scalability by replacing the lock protected linked list
> with a ring. Schedule group support was also updated, since the
> ring does not support peeking at the head item.
>
> Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
> ---
>  platform/linux-generic/odp_schedule_sp.c | 271 +++++++++++++++++++++++--------
>  1 file changed, 199 insertions(+), 72 deletions(-)
>
> diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
> index 76d1357..5150d28 100644
> --- a/platform/linux-generic/odp_schedule_sp.c
> +++ b/platform/linux-generic/odp_schedule_sp.c
> @@ -13,9 +13,12 @@
>  #include <odp_debug_internal.h>
>  #include <odp_align_internal.h>
>  #include <odp_config_internal.h>
> +#include <odp_ring_internal.h>
>
> +#define NUM_THREAD        ODP_THREAD_COUNT_MAX
>  #define NUM_QUEUE         ODP_CONFIG_QUEUES
>  #define NUM_PKTIO         ODP_CONFIG_PKTIO_ENTRIES
> +#define NUM_ORDERED_LOCKS 1
>  #define NUM_PRIO          3
>  #define NUM_STATIC_GROUP  3
>  #define NUM_GROUP         (NUM_STATIC_GROUP + 9)
> @@ -28,9 +31,17 @@
>  #define GROUP_ALL     ODP_SCHED_GROUP_ALL
>  #define GROUP_WORKER  ODP_SCHED_GROUP_WORKER
>  #define GROUP_CONTROL ODP_SCHED_GROUP_CONTROL
> -#define MAX_ORDERED_LOCKS_PER_QUEUE 1
> +#define GROUP_PKTIN   GROUP_ALL
>
> -ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <= CONFIG_QUEUE_MAX_ORD_LOCKS,
> +/* Maximum number of commands: one priority/group for all queues and pktios */
> +#define RING_SIZE (ODP_ROUNDUP_POWER_2(NUM_QUEUE + NUM_PKTIO))
> +#define RING_MASK (RING_SIZE - 1)
> +
> +/* Ring size must be power of two */
> +ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(RING_SIZE),
> +		  "Ring_size_is_not_power_of_two");
> +
> +ODP_STATIC_ASSERT(NUM_ORDERED_LOCKS <= CONFIG_QUEUE_MAX_ORD_LOCKS,
>  		  "Too_many_ordered_locks");
>
>  struct sched_cmd_t;
> @@ -38,6 +49,7 @@ struct sched_cmd_t;
>  struct sched_cmd_s {
>  	struct sched_cmd_t *next;
>  	uint32_t           index;
> +	uint32_t           ring_idx;
>  	int                type;
>  	int                prio;
>  	int                group;
> @@ -52,38 +64,49 @@ typedef struct sched_cmd_t {
>  			sizeof(struct sched_cmd_s)];
>  } sched_cmd_t ODP_ALIGNED_CACHE;
>
> -struct prio_queue_s {
> -	odp_ticketlock_t lock;
> -	sched_cmd_t      *head;
> -	sched_cmd_t      *tail;
> -};
> +typedef struct {
> +	/* Ring header */
> +	ring_t ring;
> +
> +	/* Ring data: queue indexes */
> +	uint32_t ring_idx[RING_SIZE];
>
> -typedef struct prio_queue_t {
> -	struct prio_queue_s s;
> -	uint8_t             pad[ROUNDUP_CACHE(sizeof(struct prio_queue_s)) -
> -				sizeof(struct prio_queue_s)];
>  } prio_queue_t ODP_ALIGNED_CACHE;
>
> -struct sched_group_s {
> -	odp_ticketlock_t lock;
> +typedef struct thr_group_t {
> +	/* A generation counter for fast comparison if groups have changed */
> +	odp_atomic_u32_t gen_cnt;
>
> -	struct {
> -		char          name[ODP_SCHED_GROUP_NAME_LEN + 1];
> -		odp_thrmask_t mask;
> -		int           allocated;
> -	} group[NUM_GROUP];
> -};
> +	/* Number of groups the thread belongs to */
> +	int num_group;
> +
> +	/* The groups the thread belongs to */
> +	int group[NUM_GROUP];
> +
> +} thr_group_t;
>
>  typedef struct sched_group_t {
> -	struct sched_group_s s;
> -	uint8_t              pad[ROUNDUP_CACHE(sizeof(struct sched_group_s)) -
> -				 sizeof(struct sched_group_s)];
> +	struct {
> +		odp_ticketlock_t lock;
> +
> +		/* All groups */
> +		struct {
> +			char          name[ODP_SCHED_GROUP_NAME_LEN + 1];
> +			odp_thrmask_t mask;
> +			int           allocated;
> +		} group[NUM_GROUP];
> +
> +		/* Per thread group information */
> +		thr_group_t thr[NUM_THREAD];
> +
> +	} s;
> +
>  } sched_group_t ODP_ALIGNED_CACHE;
>
>  typedef struct {
>  	sched_cmd_t   queue_cmd[NUM_QUEUE];
>  	sched_cmd_t   pktio_cmd[NUM_PKTIO];
> -	prio_queue_t  prio_queue[NUM_PRIO];
> +	prio_queue_t  prio_queue[NUM_GROUP][NUM_PRIO];
>  	sched_group_t sched_group;
>  } sched_global_t;
>
> @@ -91,14 +114,37 @@ typedef struct {
>  	sched_cmd_t *cmd;
>  	int         pause;
>  	int         thr_id;
> +	uint32_t    gen_cnt;
> +	int         num_group;
> +	int         group[NUM_GROUP];
>  } sched_local_t;
>
>  static sched_global_t sched_global;
>  static __thread sched_local_t sched_local;
>
> +static inline uint32_t index_to_ring_idx(int pktio, uint32_t index)
> +{
> +	if (pktio)
> +		return (0x80000000 | index);
> +
> +	return index;
> +}
> +
> +static inline uint32_t index_from_ring_idx(uint32_t *index, uint32_t ring_idx)
> +{
> +	uint32_t pktio = ring_idx & 0x80000000;
> +
> +	if (pktio)
> +		*index = ring_idx & (~0x80000000);
> +	else
> +		*index = ring_idx;
> +
> +	return pktio;
> +}
> +
>  static int init_global(void)
>  {
> -	int i;
> +	int i, j;
>  	sched_group_t *sched_group = &sched_global.sched_group;
>
>  	ODP_DBG("Using SP scheduler\n");
> @@ -106,21 +152,28 @@ static int init_global(void)
>  	memset(&sched_global, 0, sizeof(sched_global_t));
>
>  	for (i = 0; i < NUM_QUEUE; i++) {
> -		sched_global.queue_cmd[i].s.type  = CMD_QUEUE;
> -		sched_global.queue_cmd[i].s.index = i;
> +		sched_global.queue_cmd[i].s.type     = CMD_QUEUE;
> +		sched_global.queue_cmd[i].s.index    = i;
> +		sched_global.queue_cmd[i].s.ring_idx = index_to_ring_idx(0, i);
>  	}
>
>  	for (i = 0; i < NUM_PKTIO; i++) {
> -		sched_global.pktio_cmd[i].s.type  = CMD_PKTIO;
> -		sched_global.pktio_cmd[i].s.index = i;
> -		sched_global.pktio_cmd[i].s.prio  = PKTIN_PRIO;
> +		sched_global.pktio_cmd[i].s.type     = CMD_PKTIO;
> +		sched_global.pktio_cmd[i].s.index    = i;
> +		sched_global.pktio_cmd[i].s.ring_idx = index_to_ring_idx(1, i);
> +		sched_global.pktio_cmd[i].s.prio     = PKTIN_PRIO;
> +		sched_global.pktio_cmd[i].s.group    = GROUP_PKTIN;
>  	}
>
> -	for (i = 0; i < NUM_PRIO; i++)
> -		odp_ticketlock_init(&sched_global.prio_queue[i].s.lock);
> +	for (i = 0; i < NUM_GROUP; i++)
> +		for (j = 0; j < NUM_PRIO; j++)
> +			ring_init(&sched_global.prio_queue[i][j].ring);
>
>  	odp_ticketlock_init(&sched_group->s.lock);
>
> +	for (i = 0; i < NUM_THREAD; i++)
> +		odp_atomic_init_u32(&sched_group->s.thr[i].gen_cnt, 0);
> +
>  	strncpy(sched_group->s.group[GROUP_ALL].name, "__group_all",
>  		ODP_SCHED_GROUP_NAME_LEN);
>  	odp_thrmask_zero(&sched_group->s.group[GROUP_ALL].mask);
> @@ -168,7 +221,48 @@ static int term_local(void)
>
>  static unsigned max_ordered_locks(void)
>  {
> -	return MAX_ORDERED_LOCKS_PER_QUEUE;
> +	return NUM_ORDERED_LOCKS;
> +}
> +
> +static void add_group(sched_group_t *sched_group, int thr, int group)
> +{
> +	int num;
> +	uint32_t gen_cnt;
> +	thr_group_t *thr_group = &sched_group->s.thr[thr];
> +
> +	num = thr_group->num_group;
> +	thr_group->group[num] = group;
> +	thr_group->num_group  = num + 1;
> +	gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
> +	odp_atomic_store_u32(&thr_group->gen_cnt, gen_cnt + 1);
> +}
> +
> +static void remove_group(sched_group_t *sched_group, int thr, int group)
> +{
> +	int i, num;
> +	int found = 0;
> +	thr_group_t *thr_group = &sched_group->s.thr[thr];
> +
> +	num = thr_group->num_group;
> +
> +	for (i = 0; i < num; i++) {
> +		if (thr_group->group[i] == group) {
> +			found = 1;
> +
> +			for (; i < num - 1; i++)
> +				thr_group->group[i] = thr_group->group[i + 1];
> +
> +			break;
> +		}
> +	}
> +
> +	if (found) {
> +		uint32_t gen_cnt;
> +
> +		thr_group->num_group = num - 1;
> +		gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
> +		odp_atomic_store_u32(&thr_group->gen_cnt, gen_cnt + 1);
> +	}
>  }
>
>  static int thr_add(odp_schedule_group_t group, int thr)
> @@ -178,6 +272,9 @@ static int thr_add(odp_schedule_group_t group, int thr)
>  	if (group < 0 || group >= NUM_GROUP)
>  		return -1;
>
> +	if (thr < 0 || thr >= NUM_THREAD)
> +		return -1;
> +
>  	odp_ticketlock_lock(&sched_group->s.lock);
>
>  	if (!sched_group->s.group[group].allocated) {
> @@ -186,6 +283,7 @@ static int thr_add(odp_schedule_group_t group, int thr)
>  	}
>
>  	odp_thrmask_set(&sched_group->s.group[group].mask, thr);
> +	add_group(sched_group, thr, group);
>
>  	odp_ticketlock_unlock(&sched_group->s.lock);
>
> @@ -208,6 +306,8 @@ static int thr_rem(odp_schedule_group_t group, int thr)
>
>  	odp_thrmask_clr(&sched_group->s.group[group].mask, thr);
>
> +	remove_group(sched_group, thr, group);
> +
>  	odp_ticketlock_unlock(&sched_group->s.lock);
>
>  	return 0;
> @@ -250,51 +350,34 @@ static void destroy_queue(uint32_t qi)
>  static inline void add_tail(sched_cmd_t *cmd)
>  {
>  	prio_queue_t *prio_queue;
> +	int group    = cmd->s.group;
> +	int prio     = cmd->s.prio;
> +	uint32_t idx = cmd->s.ring_idx;
>
> -	prio_queue = &sched_global.prio_queue[cmd->s.prio];
> -	cmd->s.next = NULL;
> -
> -	odp_ticketlock_lock(&prio_queue->s.lock);
> -
> -	if (prio_queue->s.head == NULL)
> -		prio_queue->s.head = cmd;
> -	else
> -		prio_queue->s.tail->s.next = cmd;
> -
> -	prio_queue->s.tail = cmd;
> +	prio_queue = &sched_global.prio_queue[group][prio];
>
> -	odp_ticketlock_unlock(&prio_queue->s.lock);
> +	ring_enq(&prio_queue->ring, RING_MASK, idx);
>  }
>
> -static inline sched_cmd_t *rem_head(int prio)
> +static inline sched_cmd_t *rem_head(int group, int prio)
>  {
>  	prio_queue_t *prio_queue;
> -	sched_cmd_t *cmd;
> -
> -	prio_queue = &sched_global.prio_queue[prio];
> +	uint32_t ring_idx, index;
> +	int pktio;
>
> -	odp_ticketlock_lock(&prio_queue->s.lock);
> +	prio_queue = &sched_global.prio_queue[group][prio];
>
> -	if (prio_queue->s.head == NULL) {
> -		cmd = NULL;
> -	} else {
> -		sched_group_t *sched_group = &sched_global.sched_group;
> +	ring_idx = ring_deq(&prio_queue->ring, RING_MASK);
>
> -		cmd = prio_queue->s.head;
> +	if (ring_idx == RING_EMPTY)
> +		return NULL;
>
> -		/* Remove head cmd only if thread belongs to the
> -		 * scheduler group. Otherwise continue to the next priority
> -		 * queue. */
> -		if (odp_thrmask_isset(&sched_group->s.group[cmd->s.group].mask,
> -				      sched_local.thr_id))
> -			prio_queue->s.head = cmd->s.next;
> -		else
> -			cmd = NULL;
> -	}
> +	pktio = index_from_ring_idx(&index, ring_idx);
>
> -	odp_ticketlock_unlock(&prio_queue->s.lock);
> +	if (pktio)
> +		return &sched_global.pktio_cmd[index];
>
> -	return cmd;
> +	return &sched_global.queue_cmd[index];
>  }
>
>  static int sched_queue(uint32_t qi)
> @@ -341,15 +424,43 @@ static void pktio_start(int pktio_index, int num, int pktin_idx[])
>  	add_tail(cmd);
>  }
>
> -static inline sched_cmd_t *sched_cmd(int num_prio)
> +static inline sched_cmd_t *sched_cmd(void)
>  {
> -	int prio;
> +	int prio, i;
> +	int thr = sched_local.thr_id;
> +	sched_group_t *sched_group = &sched_global.sched_group;
> +	thr_group_t *thr_group = &sched_group->s.thr[thr];
> +	uint32_t gen_cnt;
> +
> +	/* There's no matching store_rel since the value is updated while
> +	 * holding a lock */
> +	gen_cnt = odp_atomic_load_acq_u32(&thr_group->gen_cnt);
> +
> +	/* Check if groups have changed and need to be read again */
> +	if (odp_unlikely(gen_cnt != sched_local.gen_cnt)) {
> +		int num_grp;
> +
> +		odp_ticketlock_lock(&sched_group->s.lock);
> +
> +		num_grp = thr_group->num_group;
> +		gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
>
> -	for (prio = 0; prio < num_prio; prio++) {
> -		sched_cmd_t *cmd = rem_head(prio);
> +		for (i = 0; i < num_grp; i++)
> +			sched_local.group[i] = thr_group->group[i];
>
> -		if (cmd)
> -			return cmd;
> +		odp_ticketlock_unlock(&sched_group->s.lock);
> +
> +		sched_local.num_group = num_grp;
> +		sched_local.gen_cnt = gen_cnt;
> +	}
> +
> +	for (i = 0; i < sched_local.num_group; i++) {
> +		for (prio = 0; prio < NUM_PRIO; prio++) {
> +			sched_cmd_t *cmd = rem_head(sched_local.group[i], prio);
> +
> +			if (cmd)
> +				return cmd;
> +		}
>  	}
>
>  	return NULL;
> @@ -382,7 +493,7 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait,
>  	uint32_t qi;
>  	int num;
>
> -	cmd = sched_cmd(NUM_PRIO);
> +	cmd = sched_cmd();
>
>  	if (cmd && cmd->s.type == CMD_PKTIO) {
>  		if (sched_cb_pktin_poll(cmd->s.index, cmd->s.num_pktin,
> @@ -565,11 +676,14 @@ static odp_schedule_group_t schedule_group_lookup(const char *name)
>  static int schedule_group_join(odp_schedule_group_t group,
>  			       const odp_thrmask_t *thrmask)
>  {
> +	int thr;
>  	sched_group_t *sched_group = &sched_global.sched_group;
>
>  	if (group < 0 || group >= NUM_GROUP)
>  		return -1;
>
> +	thr = odp_thrmask_first(thrmask);
> +
>  	odp_ticketlock_lock(&sched_group->s.lock);
>
>  	if (!sched_group->s.group[group].allocated) {
> @@ -581,6 +695,11 @@ static int schedule_group_join(odp_schedule_group_t group,
>  		       &sched_group->s.group[group].mask,
>  		       thrmask);
>
> +	while (thr >= 0) {
> +		add_group(sched_group, thr, group);
> +		thr = odp_thrmask_next(thrmask, thr);
> +	}
> +
>  	odp_ticketlock_unlock(&sched_group->s.lock);
>
>  	return 0;
> @@ -589,6 +708,7 @@ static int schedule_group_join(odp_schedule_group_t group,
>  static int schedule_group_leave(odp_schedule_group_t group,
>  				const odp_thrmask_t *thrmask)
>  {
> +	int thr;
>  	sched_group_t *sched_group = &sched_global.sched_group;
>  	odp_thrmask_t *all = &sched_group->s.group[GROUP_ALL].mask;
>  	odp_thrmask_t not;
> @@ -596,6 +716,8 @@ static int schedule_group_leave(odp_schedule_group_t group,
>  	if (group < 0 || group >= NUM_GROUP)
>  		return -1;
>
> +	thr = odp_thrmask_first(thrmask);
> +
>  	odp_ticketlock_lock(&sched_group->s.lock);
>
>  	if (!sched_group->s.group[group].allocated) {
> @@ -608,6 +730,11 @@ static int schedule_group_leave(odp_schedule_group_t group,
>  			&sched_group->s.group[group].mask,
>  			&not);
>
> +	while (thr >= 0) {
> +		remove_group(sched_group, thr, group);
> +		thr = odp_thrmask_next(thrmask, thr);
> +	}
> +
>  	odp_ticketlock_unlock(&sched_group->s.lock);
>
>  	return 0;
> --
> 2.8.1
>
>
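
A note for readers following the ring slot encoding in the patch: queue and pktio commands share the same per-group/per-priority ring by reserving the top bit of the 32-bit slot value, so a dequeued value maps straight back to either queue_cmd[] or pktio_cmd[]. The standalone sketch below shows only that encoding idea; the names encode_idx(), decode_idx() and PKTIO_BIT are made up for illustration and are not part of the patch, which uses index_to_ring_idx()/index_from_ring_idx() together with ring_enq()/ring_deq().

#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for the patch's 0x80000000 pktio marker bit */
#define PKTIO_BIT 0x80000000u

/* Encode a queue or pktio index into a single 32-bit ring slot value */
static uint32_t encode_idx(int pktio, uint32_t index)
{
	return pktio ? (PKTIO_BIT | index) : index;
}

/* Decode a slot value; returns nonzero when it refers to a pktio command */
static int decode_idx(uint32_t ring_idx, uint32_t *index)
{
	*index = ring_idx & ~PKTIO_BIT;
	return (ring_idx & PKTIO_BIT) != 0;
}

int main(void)
{
	uint32_t index;
	uint32_t slot = encode_idx(1, 7); /* pktio command, index 7 */
	int pktio = decode_idx(slot, &index);

	printf("pktio=%d index=%u\n", pktio, index); /* prints: pktio=1 index=7 */
	return 0;
}

Because each command is reduced to a single 32-bit value, the scheduler can push and pop work with the ring operations alone instead of taking a ticketlock around a linked list, which is where the scalability improvement described in the commit message comes from.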