Reviewed-and-tested-by: Yi He <yi...@linaro.org>

On 8 December 2016 at 19:04, Petri Savolainen <petri.savolai...@nokia.com>
wrote:

> Improve scalability by replacing lock protected linked list
> with a ring. Schedule group supported was updated also, since
> ring does not support peek of the head item.
>
> Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
> ---
>  platform/linux-generic/odp_schedule_sp.c | 271
> +++++++++++++++++++++++--------
>  1 file changed, 199 insertions(+), 72 deletions(-)
>
> diff --git a/platform/linux-generic/odp_schedule_sp.c
> b/platform/linux-generic/odp_schedule_sp.c
> index 76d1357..5150d28 100644
> --- a/platform/linux-generic/odp_schedule_sp.c
> +++ b/platform/linux-generic/odp_schedule_sp.c
> @@ -13,9 +13,12 @@
>  #include <odp_debug_internal.h>
>  #include <odp_align_internal.h>
>  #include <odp_config_internal.h>
> +#include <odp_ring_internal.h>
>
> +#define NUM_THREAD        ODP_THREAD_COUNT_MAX
>  #define NUM_QUEUE         ODP_CONFIG_QUEUES
>  #define NUM_PKTIO         ODP_CONFIG_PKTIO_ENTRIES
> +#define NUM_ORDERED_LOCKS 1
>  #define NUM_PRIO          3
>  #define NUM_STATIC_GROUP  3
>  #define NUM_GROUP         (NUM_STATIC_GROUP + 9)
> @@ -28,9 +31,17 @@
>  #define GROUP_ALL         ODP_SCHED_GROUP_ALL
>  #define GROUP_WORKER      ODP_SCHED_GROUP_WORKER
>  #define GROUP_CONTROL     ODP_SCHED_GROUP_CONTROL
> -#define MAX_ORDERED_LOCKS_PER_QUEUE 1
> +#define GROUP_PKTIN       GROUP_ALL
>
> -ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <=
> CONFIG_QUEUE_MAX_ORD_LOCKS,
> +/* Maximum number of commands: one priority/group for all queues and
> pktios */
> +#define RING_SIZE         (ODP_ROUNDUP_POWER_2(NUM_QUEUE + NUM_PKTIO))
> +#define RING_MASK         (RING_SIZE - 1)
> +
> +/* Ring size must be power of two */
> +ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(RING_SIZE),
> +                 "Ring_size_is_not_power_of_two");
> +
> +ODP_STATIC_ASSERT(NUM_ORDERED_LOCKS <= CONFIG_QUEUE_MAX_ORD_LOCKS,
>                   "Too_many_ordered_locks");
>
>  struct sched_cmd_t;
> @@ -38,6 +49,7 @@ struct sched_cmd_t;
>  struct sched_cmd_s {
>         struct sched_cmd_t *next;
>         uint32_t           index;
> +       uint32_t           ring_idx;
>         int                type;
>         int                prio;
>         int                group;
> @@ -52,38 +64,49 @@ typedef struct sched_cmd_t {
>                                sizeof(struct sched_cmd_s)];
>  } sched_cmd_t ODP_ALIGNED_CACHE;
>
> -struct prio_queue_s {
> -       odp_ticketlock_t  lock;
> -       sched_cmd_t       *head;
> -       sched_cmd_t       *tail;
> -};
> +typedef struct {
> +       /* Ring header */
> +       ring_t ring;
> +
> +       /* Ring data: queue indexes */
> +       uint32_t ring_idx[RING_SIZE];
>
> -typedef        struct prio_queue_t {
> -       struct prio_queue_s s;
> -       uint8_t             pad[ROUNDUP_CACHE(sizeof(struct
> prio_queue_s)) -
> -                               sizeof(struct prio_queue_s)];
>  } prio_queue_t ODP_ALIGNED_CACHE;
>
> -struct sched_group_s {
> -       odp_ticketlock_t  lock;
> +typedef struct thr_group_t {
> +       /* A generation counter for fast comparison if groups have changed
> */
> +       odp_atomic_u32_t gen_cnt;
>
> -       struct {
> -               char          name[ODP_SCHED_GROUP_NAME_LEN + 1];
> -               odp_thrmask_t mask;
> -               int           allocated;
> -       } group[NUM_GROUP];
> -};
> +       /* Number of groups the thread belongs to */
> +       int num_group;
> +
> +       /* The groups the thread belongs to */
> +       int group[NUM_GROUP];
> +
> +} thr_group_t;
>
>  typedef struct sched_group_t {
> -       struct sched_group_s s;
> -       uint8_t              pad[ROUNDUP_CACHE(sizeof(struct
> sched_group_s)) -
> -                                sizeof(struct sched_group_s)];
> +       struct {
> +               odp_ticketlock_t  lock;
> +
> +               /* All groups */
> +               struct {
> +                       char          name[ODP_SCHED_GROUP_NAME_LEN + 1];
> +                       odp_thrmask_t mask;
> +                       int           allocated;
> +               } group[NUM_GROUP];
> +
> +               /* Per thread group information */
> +               thr_group_t thr[NUM_THREAD];
> +
> +       } s;
> +
>  } sched_group_t ODP_ALIGNED_CACHE;
>
>  typedef struct {
>         sched_cmd_t   queue_cmd[NUM_QUEUE];
>         sched_cmd_t   pktio_cmd[NUM_PKTIO];
> -       prio_queue_t  prio_queue[NUM_PRIO];
> +       prio_queue_t  prio_queue[NUM_GROUP][NUM_PRIO];
>         sched_group_t sched_group;
>  } sched_global_t;
>
> @@ -91,14 +114,37 @@ typedef struct {
>         sched_cmd_t *cmd;
>         int          pause;
>         int          thr_id;
> +       uint32_t     gen_cnt;
> +       int          num_group;
> +       int          group[NUM_GROUP];
>  } sched_local_t;
>
>  static sched_global_t sched_global;
>  static __thread sched_local_t sched_local;
>
> +static inline uint32_t index_to_ring_idx(int pktio, uint32_t index)
> +{
> +       if (pktio)
> +               return (0x80000000 | index);
> +
> +       return index;
> +}
> +
> +static inline uint32_t index_from_ring_idx(uint32_t *index, uint32_t
> ring_idx)
> +{
> +       uint32_t pktio = ring_idx & 0x80000000;
> +
> +       if (pktio)
> +               *index = ring_idx & (~0x80000000);
> +       else
> +               *index = ring_idx;
> +
> +       return pktio;
> +}
> +
>  static int init_global(void)
>  {
> -       int i;
> +       int i, j;
>         sched_group_t *sched_group = &sched_global.sched_group;
>
>         ODP_DBG("Using SP scheduler\n");
> @@ -106,21 +152,28 @@ static int init_global(void)
>         memset(&sched_global, 0, sizeof(sched_global_t));
>
>         for (i = 0; i < NUM_QUEUE; i++) {
> -               sched_global.queue_cmd[i].s.type  = CMD_QUEUE;
> -               sched_global.queue_cmd[i].s.index = i;
> +               sched_global.queue_cmd[i].s.type     = CMD_QUEUE;
> +               sched_global.queue_cmd[i].s.index    = i;
> +               sched_global.queue_cmd[i].s.ring_idx =
> index_to_ring_idx(0, i);
>         }
>
>         for (i = 0; i < NUM_PKTIO; i++) {
> -               sched_global.pktio_cmd[i].s.type  = CMD_PKTIO;
> -               sched_global.pktio_cmd[i].s.index = i;
> -               sched_global.pktio_cmd[i].s.prio  = PKTIN_PRIO;
> +               sched_global.pktio_cmd[i].s.type     = CMD_PKTIO;
> +               sched_global.pktio_cmd[i].s.index    = i;
> +               sched_global.pktio_cmd[i].s.ring_idx =
> index_to_ring_idx(1, i);
> +               sched_global.pktio_cmd[i].s.prio     = PKTIN_PRIO;
> +               sched_global.pktio_cmd[i].s.group    = GROUP_PKTIN;
>         }
>
> -       for (i = 0; i < NUM_PRIO; i++)
> -               odp_ticketlock_init(&sched_global.prio_queue[i].s.lock);
> +       for (i = 0; i < NUM_GROUP; i++)
> +               for (j = 0; j < NUM_PRIO; j++)
> +                       ring_init(&sched_global.prio_queue[i][j].ring);
>
>         odp_ticketlock_init(&sched_group->s.lock);
>
> +       for (i = 0; i < NUM_THREAD; i++)
> +               odp_atomic_init_u32(&sched_group->s.thr[i].gen_cnt, 0);
> +
>         strncpy(sched_group->s.group[GROUP_ALL].name, "__group_all",
>                 ODP_SCHED_GROUP_NAME_LEN);
>         odp_thrmask_zero(&sched_group->s.group[GROUP_ALL].mask);
> @@ -168,7 +221,48 @@ static int term_local(void)
>
>  static unsigned max_ordered_locks(void)
>  {
> -       return MAX_ORDERED_LOCKS_PER_QUEUE;
> +       return NUM_ORDERED_LOCKS;
> +}
> +
> +static void add_group(sched_group_t *sched_group, int thr, int group)
> +{
> +       int num;
> +       uint32_t gen_cnt;
> +       thr_group_t *thr_group = &sched_group->s.thr[thr];
> +
> +       num = thr_group->num_group;
> +       thr_group->group[num] = group;
> +       thr_group->num_group  = num + 1;
> +       gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
> +       odp_atomic_store_u32(&thr_group->gen_cnt, gen_cnt + 1);
> +}
> +
> +static void remove_group(sched_group_t *sched_group, int thr, int group)
> +{
> +       int i, num;
> +       int found = 0;
> +       thr_group_t *thr_group = &sched_group->s.thr[thr];
> +
> +       num = thr_group->num_group;
> +
> +       for (i = 0; i < num; i++) {
> +               if (thr_group->group[i] == group) {
> +                       found = 1;
> +
> +                       for (; i < num - 1; i++)
> +                               thr_group->group[i] = thr_group->group[i +
> 1];
> +
> +                       break;
> +               }
> +       }
> +
> +       if (found) {
> +               uint32_t gen_cnt;
> +
> +               thr_group->num_group = num - 1;
> +               gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
> +               odp_atomic_store_u32(&thr_group->gen_cnt, gen_cnt + 1);
> +       }
>  }
>
>  static int thr_add(odp_schedule_group_t group, int thr)
> @@ -178,6 +272,9 @@ static int thr_add(odp_schedule_group_t group, int thr)
>         if (group < 0 || group >= NUM_GROUP)
>                 return -1;
>
> +       if (thr < 0 || thr >= NUM_THREAD)
> +               return -1;
> +
>         odp_ticketlock_lock(&sched_group->s.lock);
>
>         if (!sched_group->s.group[group].allocated) {
> @@ -186,6 +283,7 @@ static int thr_add(odp_schedule_group_t group, int thr)
>         }
>
>         odp_thrmask_set(&sched_group->s.group[group].mask, thr);
> +       add_group(sched_group, thr, group);
>
>         odp_ticketlock_unlock(&sched_group->s.lock);
>
> @@ -208,6 +306,8 @@ static int thr_rem(odp_schedule_group_t group, int thr)
>
>         odp_thrmask_clr(&sched_group->s.group[group].mask, thr);
>
> +       remove_group(sched_group, thr, group);
> +
>         odp_ticketlock_unlock(&sched_group->s.lock);
>
>         return 0;
> @@ -250,51 +350,34 @@ static void destroy_queue(uint32_t qi)
>  static inline void add_tail(sched_cmd_t *cmd)
>  {
>         prio_queue_t *prio_queue;
> +       int group    = cmd->s.group;
> +       int prio     = cmd->s.prio;
> +       uint32_t idx = cmd->s.ring_idx;
>
> -       prio_queue   = &sched_global.prio_queue[cmd->s.prio];
> -       cmd->s.next  = NULL;
> -
> -       odp_ticketlock_lock(&prio_queue->s.lock);
> -
> -       if (prio_queue->s.head == NULL)
> -               prio_queue->s.head = cmd;
> -       else
> -               prio_queue->s.tail->s.next = cmd;
> -
> -       prio_queue->s.tail = cmd;
> +       prio_queue = &sched_global.prio_queue[group][prio];
>
> -       odp_ticketlock_unlock(&prio_queue->s.lock);
> +       ring_enq(&prio_queue->ring, RING_MASK, idx);
>  }
>
> -static inline sched_cmd_t *rem_head(int prio)
> +static inline sched_cmd_t *rem_head(int group, int prio)
>  {
>         prio_queue_t *prio_queue;
> -       sched_cmd_t *cmd;
> -
> -       prio_queue = &sched_global.prio_queue[prio];
> +       uint32_t ring_idx, index;
> +       int pktio;
>
> -       odp_ticketlock_lock(&prio_queue->s.lock);
> +       prio_queue = &sched_global.prio_queue[group][prio];
>
> -       if (prio_queue->s.head == NULL) {
> -               cmd = NULL;
> -       } else {
> -               sched_group_t *sched_group = &sched_global.sched_group;
> +       ring_idx = ring_deq(&prio_queue->ring, RING_MASK);
>
> -               cmd = prio_queue->s.head;
> +       if (ring_idx == RING_EMPTY)
> +               return NULL;
>
> -               /* Remove head cmd only if thread belongs to the
> -                * scheduler group. Otherwise continue to the next priority
> -                * queue. */
> -               if (odp_thrmask_isset(&sched_group->s.group[cmd->s.group].
> mask,
> -                                     sched_local.thr_id))
> -                       prio_queue->s.head = cmd->s.next;
> -               else
> -                       cmd = NULL;
> -       }
> +       pktio = index_from_ring_idx(&index, ring_idx);
>
> -       odp_ticketlock_unlock(&prio_queue->s.lock);
> +       if (pktio)
> +               return &sched_global.pktio_cmd[index];
>
> -       return cmd;
> +       return &sched_global.queue_cmd[index];
>  }
>
>  static int sched_queue(uint32_t qi)
> @@ -341,15 +424,43 @@ static void pktio_start(int pktio_index, int num,
> int pktin_idx[])
>         add_tail(cmd);
>  }
>
> -static inline sched_cmd_t *sched_cmd(int num_prio)
> +static inline sched_cmd_t *sched_cmd(void)
>  {
> -       int prio;
> +       int prio, i;
> +       int thr = sched_local.thr_id;
> +       sched_group_t *sched_group = &sched_global.sched_group;
> +       thr_group_t *thr_group = &sched_group->s.thr[thr];
> +       uint32_t gen_cnt;
> +
> +       /* There's no matching store_rel since the value is updated while
> +        * holding a lock */
> +       gen_cnt = odp_atomic_load_acq_u32(&thr_group->gen_cnt);
> +
> +       /* Check if groups have changed and need to be read again */
> +       if (odp_unlikely(gen_cnt != sched_local.gen_cnt)) {
> +               int num_grp;
> +
> +               odp_ticketlock_lock(&sched_group->s.lock);
> +
> +               num_grp = thr_group->num_group;
> +               gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
>
> -       for (prio = 0; prio < num_prio; prio++) {
> -               sched_cmd_t *cmd = rem_head(prio);
> +               for (i = 0; i < num_grp; i++)
> +                       sched_local.group[i] = thr_group->group[i];
>
> -               if (cmd)
> -                       return cmd;
> +               odp_ticketlock_unlock(&sched_group->s.lock);
> +
> +               sched_local.num_group = num_grp;
> +               sched_local.gen_cnt   = gen_cnt;
> +       }
> +
> +       for (i = 0; i < sched_local.num_group; i++) {
> +               for (prio = 0; prio < NUM_PRIO; prio++) {
> +                       sched_cmd_t *cmd = rem_head(sched_local.group[i],
> prio);
> +
> +                       if (cmd)
> +                               return cmd;
> +               }
>         }
>
>         return NULL;
> @@ -382,7 +493,7 @@ static int schedule_multi(odp_queue_t *from, uint64_t
> wait,
>                 uint32_t qi;
>                 int num;
>
> -               cmd = sched_cmd(NUM_PRIO);
> +               cmd = sched_cmd();
>
>                 if (cmd && cmd->s.type == CMD_PKTIO) {
>                         if (sched_cb_pktin_poll(cmd->s.index,
> cmd->s.num_pktin,
> @@ -565,11 +676,14 @@ static odp_schedule_group_t
> schedule_group_lookup(const char *name)
>  static int schedule_group_join(odp_schedule_group_t group,
>                                const odp_thrmask_t *thrmask)
>  {
> +       int thr;
>         sched_group_t *sched_group = &sched_global.sched_group;
>
>         if (group < 0 || group >= NUM_GROUP)
>                 return -1;
>
> +       thr = odp_thrmask_first(thrmask);
> +
>         odp_ticketlock_lock(&sched_group->s.lock);
>
>         if (!sched_group->s.group[group].allocated) {
> @@ -581,6 +695,11 @@ static int schedule_group_join(odp_schedule_group_t
> group,
>                        &sched_group->s.group[group].mask,
>                        thrmask);
>
> +       while (thr >= 0) {
> +               add_group(sched_group, thr, group);
> +               thr = odp_thrmask_next(thrmask, thr);
> +       }
> +
>         odp_ticketlock_unlock(&sched_group->s.lock);
>
>         return 0;
> @@ -589,6 +708,7 @@ static int schedule_group_join(odp_schedule_group_t
> group,
>  static int schedule_group_leave(odp_schedule_group_t group,
>                                 const odp_thrmask_t *thrmask)
>  {
> +       int thr;
>         sched_group_t *sched_group = &sched_global.sched_group;
>         odp_thrmask_t *all = &sched_group->s.group[GROUP_ALL].mask;
>         odp_thrmask_t not;
> @@ -596,6 +716,8 @@ static int schedule_group_leave(odp_schedule_group_t
> group,
>         if (group < 0 || group >= NUM_GROUP)
>                 return -1;
>
> +       thr = odp_thrmask_first(thrmask);
> +
>         odp_ticketlock_lock(&sched_group->s.lock);
>
>         if (!sched_group->s.group[group].allocated) {
> @@ -608,6 +730,11 @@ static int schedule_group_leave(odp_schedule_group_t
> group,
>                         &sched_group->s.group[group].mask,
>                         &not);
>
> +       while (thr >= 0) {
> +               remove_group(sched_group, thr, group);
> +               thr = odp_thrmask_next(thrmask, thr);
> +       }
> +
>         odp_ticketlock_unlock(&sched_group->s.lock);
>
>         return 0;
> --
> 2.8.1
>
>

Reply via email to