Improve scalability by replacing the lock-protected linked list
with a ring. Schedule group support was also updated, since
the ring does not support peeking at the head item.

Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
 platform/linux-generic/odp_schedule_sp.c | 271 +++++++++++++++++++++++--------
 1 file changed, 199 insertions(+), 72 deletions(-)

diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
index 76d1357..5150d28 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -13,9 +13,12 @@
 #include <odp_debug_internal.h>
 #include <odp_align_internal.h>
 #include <odp_config_internal.h>
+#include <odp_ring_internal.h>
 
+#define NUM_THREAD        ODP_THREAD_COUNT_MAX
 #define NUM_QUEUE         ODP_CONFIG_QUEUES
 #define NUM_PKTIO         ODP_CONFIG_PKTIO_ENTRIES
+#define NUM_ORDERED_LOCKS 1
 #define NUM_PRIO          3
 #define NUM_STATIC_GROUP  3
 #define NUM_GROUP         (NUM_STATIC_GROUP + 9)
@@ -28,9 +31,17 @@
 #define GROUP_ALL         ODP_SCHED_GROUP_ALL
 #define GROUP_WORKER      ODP_SCHED_GROUP_WORKER
 #define GROUP_CONTROL     ODP_SCHED_GROUP_CONTROL
-#define MAX_ORDERED_LOCKS_PER_QUEUE 1
+#define GROUP_PKTIN       GROUP_ALL
 
-ODP_STATIC_ASSERT(MAX_ORDERED_LOCKS_PER_QUEUE <= CONFIG_QUEUE_MAX_ORD_LOCKS,
+/* Maximum number of commands: one priority/group for all queues and pktios */
+#define RING_SIZE         (ODP_ROUNDUP_POWER_2(NUM_QUEUE + NUM_PKTIO))
+#define RING_MASK         (RING_SIZE - 1)
+
+/* Ring size must be power of two */
+ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(RING_SIZE),
+                 "Ring_size_is_not_power_of_two");
+
+ODP_STATIC_ASSERT(NUM_ORDERED_LOCKS <= CONFIG_QUEUE_MAX_ORD_LOCKS,
                  "Too_many_ordered_locks");
 
 struct sched_cmd_t;
@@ -38,6 +49,7 @@ struct sched_cmd_t;
 struct sched_cmd_s {
        struct sched_cmd_t *next;
        uint32_t           index;
+       uint32_t           ring_idx;
        int                type;
        int                prio;
        int                group;
@@ -52,38 +64,49 @@ typedef struct sched_cmd_t {
                               sizeof(struct sched_cmd_s)];
 } sched_cmd_t ODP_ALIGNED_CACHE;
 
-struct prio_queue_s {
-       odp_ticketlock_t  lock;
-       sched_cmd_t       *head;
-       sched_cmd_t       *tail;
-};
+typedef struct {
+       /* Ring header */
+       ring_t ring;
+
+       /* Ring data: queue indexes */
+       uint32_t ring_idx[RING_SIZE];
 
-typedef        struct prio_queue_t {
-       struct prio_queue_s s;
-       uint8_t             pad[ROUNDUP_CACHE(sizeof(struct prio_queue_s)) -
-                               sizeof(struct prio_queue_s)];
 } prio_queue_t ODP_ALIGNED_CACHE;
 
-struct sched_group_s {
-       odp_ticketlock_t  lock;
+typedef struct thr_group_t {
+       /* A generation counter for fast comparison if groups have changed */
+       odp_atomic_u32_t gen_cnt;
 
-       struct {
-               char          name[ODP_SCHED_GROUP_NAME_LEN + 1];
-               odp_thrmask_t mask;
-               int           allocated;
-       } group[NUM_GROUP];
-};
+       /* Number of groups the thread belongs to */
+       int num_group;
+
+       /* The groups the thread belongs to */
+       int group[NUM_GROUP];
+
+} thr_group_t;
 
 typedef struct sched_group_t {
-       struct sched_group_s s;
-       uint8_t              pad[ROUNDUP_CACHE(sizeof(struct sched_group_s)) -
-                                sizeof(struct sched_group_s)];
+       struct {
+               odp_ticketlock_t  lock;
+
+               /* All groups */
+               struct {
+                       char          name[ODP_SCHED_GROUP_NAME_LEN + 1];
+                       odp_thrmask_t mask;
+                       int           allocated;
+               } group[NUM_GROUP];
+
+               /* Per thread group information */
+               thr_group_t thr[NUM_THREAD];
+
+       } s;
+
 } sched_group_t ODP_ALIGNED_CACHE;
 
 typedef struct {
        sched_cmd_t   queue_cmd[NUM_QUEUE];
        sched_cmd_t   pktio_cmd[NUM_PKTIO];
-       prio_queue_t  prio_queue[NUM_PRIO];
+       prio_queue_t  prio_queue[NUM_GROUP][NUM_PRIO];
        sched_group_t sched_group;
 } sched_global_t;
 
@@ -91,14 +114,37 @@ typedef struct {
        sched_cmd_t *cmd;
        int          pause;
        int          thr_id;
+       uint32_t     gen_cnt;
+       int          num_group;
+       int          group[NUM_GROUP];
 } sched_local_t;
 
 static sched_global_t sched_global;
 static __thread sched_local_t sched_local;
 
+/* Encode a command index for the ring; bit 31 marks a pktio command */
+static inline uint32_t index_to_ring_idx(int pktio, uint32_t index)
+{
+       uint32_t flag = pktio ? 0x80000000 : 0;
+
+       return flag | index;
+}
+
+/* Split ring_idx into a pktio flag (bit 31) and the command index.
+ * Returns the flag bit: non-zero when bit 31 is set, zero otherwise. */
+static inline uint32_t index_from_ring_idx(uint32_t *index, uint32_t ring_idx)
+{
+       const uint32_t pktio_flag = 0x80000000;
+
+       /* Masking is a no-op when the flag bit is not set */
+       *index = ring_idx & ~pktio_flag;
+
+       return ring_idx & pktio_flag;
+}
+
 static int init_global(void)
 {
-       int i;
+       int i, j;
        sched_group_t *sched_group = &sched_global.sched_group;
 
        ODP_DBG("Using SP scheduler\n");
@@ -106,21 +152,28 @@ static int init_global(void)
        memset(&sched_global, 0, sizeof(sched_global_t));
 
        for (i = 0; i < NUM_QUEUE; i++) {
-               sched_global.queue_cmd[i].s.type  = CMD_QUEUE;
-               sched_global.queue_cmd[i].s.index = i;
+               sched_global.queue_cmd[i].s.type     = CMD_QUEUE;
+               sched_global.queue_cmd[i].s.index    = i;
+               sched_global.queue_cmd[i].s.ring_idx = index_to_ring_idx(0, i);
        }
 
        for (i = 0; i < NUM_PKTIO; i++) {
-               sched_global.pktio_cmd[i].s.type  = CMD_PKTIO;
-               sched_global.pktio_cmd[i].s.index = i;
-               sched_global.pktio_cmd[i].s.prio  = PKTIN_PRIO;
+               sched_global.pktio_cmd[i].s.type     = CMD_PKTIO;
+               sched_global.pktio_cmd[i].s.index    = i;
+               sched_global.pktio_cmd[i].s.ring_idx = index_to_ring_idx(1, i);
+               sched_global.pktio_cmd[i].s.prio     = PKTIN_PRIO;
+               sched_global.pktio_cmd[i].s.group    = GROUP_PKTIN;
        }
 
-       for (i = 0; i < NUM_PRIO; i++)
-               odp_ticketlock_init(&sched_global.prio_queue[i].s.lock);
+       for (i = 0; i < NUM_GROUP; i++)
+               for (j = 0; j < NUM_PRIO; j++)
+                       ring_init(&sched_global.prio_queue[i][j].ring);
 
        odp_ticketlock_init(&sched_group->s.lock);
 
+       for (i = 0; i < NUM_THREAD; i++)
+               odp_atomic_init_u32(&sched_group->s.thr[i].gen_cnt, 0);
+
        strncpy(sched_group->s.group[GROUP_ALL].name, "__group_all",
                ODP_SCHED_GROUP_NAME_LEN);
        odp_thrmask_zero(&sched_group->s.group[GROUP_ALL].mask);
@@ -168,7 +221,48 @@ static int term_local(void)
 
 static unsigned max_ordered_locks(void)
 {
-       return MAX_ORDERED_LOCKS_PER_QUEUE;
+       return NUM_ORDERED_LOCKS;
+}
+
+/* Append group to thr's list and bump gen_cnt. No-op if the list is full. */
+static void add_group(sched_group_t *sched_group, int thr, int group)
+{
+       uint32_t gen_cnt;
+       thr_group_t *thr_group = &sched_group->s.thr[thr];
+
+       if (thr_group->num_group >= NUM_GROUP) /* would overflow group[] */
+               return;
+       thr_group->group[thr_group->num_group++] = group;
+       gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
+       odp_atomic_store_u32(&thr_group->gen_cnt, gen_cnt + 1);
+}
+
+/* Drop group from the thread's group list, if it is present.
+ * Callers invoke this while holding the schedule group lock. */
+static void remove_group(sched_group_t *sched_group, int thr, int group)
+{
+       thr_group_t *thr_group = &sched_group->s.thr[thr];
+       int count = thr_group->num_group;
+       int pos = 0;
+       uint32_t gen;
+
+       /* Locate the first entry that matches the group */
+       while (pos < count && thr_group->group[pos] != group)
+               pos++;
+
+       /* Not in the list: leave the list and gen_cnt untouched */
+       if (pos == count)
+               return;
+
+       /* Close the gap by shifting the remaining entries down by one */
+       for (; pos < count - 1; pos++)
+               thr_group->group[pos] = thr_group->group[pos + 1];
+
+       thr_group->num_group = count - 1;
+
+       /* Bump the generation so the thread re-reads its group list */
+       gen = odp_atomic_load_u32(&thr_group->gen_cnt);
+       odp_atomic_store_u32(&thr_group->gen_cnt, gen + 1);
 }
 
 static int thr_add(odp_schedule_group_t group, int thr)
@@ -178,6 +272,9 @@ static int thr_add(odp_schedule_group_t group, int thr)
        if (group < 0 || group >= NUM_GROUP)
                return -1;
 
+       if (thr < 0 || thr >= NUM_THREAD)
+               return -1;
+
        odp_ticketlock_lock(&sched_group->s.lock);
 
        if (!sched_group->s.group[group].allocated) {
@@ -186,6 +283,7 @@ static int thr_add(odp_schedule_group_t group, int thr)
        }
 
        odp_thrmask_set(&sched_group->s.group[group].mask, thr);
+       add_group(sched_group, thr, group);
 
        odp_ticketlock_unlock(&sched_group->s.lock);
 
@@ -208,6 +306,8 @@ static int thr_rem(odp_schedule_group_t group, int thr)
 
        odp_thrmask_clr(&sched_group->s.group[group].mask, thr);
 
+       remove_group(sched_group, thr, group);
+
        odp_ticketlock_unlock(&sched_group->s.lock);
 
        return 0;
@@ -250,51 +350,34 @@ static void destroy_queue(uint32_t qi)
 static inline void add_tail(sched_cmd_t *cmd)
 {
        prio_queue_t *prio_queue;
+       int group    = cmd->s.group;
+       int prio     = cmd->s.prio;
+       uint32_t idx = cmd->s.ring_idx;
 
-       prio_queue   = &sched_global.prio_queue[cmd->s.prio];
-       cmd->s.next  = NULL;
-
-       odp_ticketlock_lock(&prio_queue->s.lock);
-
-       if (prio_queue->s.head == NULL)
-               prio_queue->s.head = cmd;
-       else
-               prio_queue->s.tail->s.next = cmd;
-
-       prio_queue->s.tail = cmd;
+       prio_queue = &sched_global.prio_queue[group][prio];
 
-       odp_ticketlock_unlock(&prio_queue->s.lock);
+       ring_enq(&prio_queue->ring, RING_MASK, idx);
 }
 
-static inline sched_cmd_t *rem_head(int prio)
+static inline sched_cmd_t *rem_head(int group, int prio)
 {
        prio_queue_t *prio_queue;
-       sched_cmd_t *cmd;
-
-       prio_queue = &sched_global.prio_queue[prio];
+       uint32_t ring_idx, index;
+       int pktio;
 
-       odp_ticketlock_lock(&prio_queue->s.lock);
+       prio_queue = &sched_global.prio_queue[group][prio];
 
-       if (prio_queue->s.head == NULL) {
-               cmd = NULL;
-       } else {
-               sched_group_t *sched_group = &sched_global.sched_group;
+       ring_idx = ring_deq(&prio_queue->ring, RING_MASK);
 
-               cmd = prio_queue->s.head;
+       if (ring_idx == RING_EMPTY)
+               return NULL;
 
-               /* Remove head cmd only if thread belongs to the
-                * scheduler group. Otherwise continue to the next priority
-                * queue. */
-               if (odp_thrmask_isset(&sched_group->s.group[cmd->s.group].mask,
-                                     sched_local.thr_id))
-                       prio_queue->s.head = cmd->s.next;
-               else
-                       cmd = NULL;
-       }
+       pktio = index_from_ring_idx(&index, ring_idx);
 
-       odp_ticketlock_unlock(&prio_queue->s.lock);
+       if (pktio)
+               return &sched_global.pktio_cmd[index];
 
-       return cmd;
+       return &sched_global.queue_cmd[index];
 }
 
 static int sched_queue(uint32_t qi)
@@ -341,15 +424,43 @@ static void pktio_start(int pktio_index, int num, int pktin_idx[])
        add_tail(cmd);
 }
 
-static inline sched_cmd_t *sched_cmd(int num_prio)
+static inline sched_cmd_t *sched_cmd(void)
 {
-       int prio;
+       int prio, i;
+       int thr = sched_local.thr_id;
+       sched_group_t *sched_group = &sched_global.sched_group;
+       thr_group_t *thr_group = &sched_group->s.thr[thr];
+       uint32_t gen_cnt;
+
+       /* There's no matching store_rel since the value is updated while
+        * holding a lock */
+       gen_cnt = odp_atomic_load_acq_u32(&thr_group->gen_cnt);
+
+       /* Check if groups have changed and need to be read again */
+       if (odp_unlikely(gen_cnt != sched_local.gen_cnt)) {
+               int num_grp;
+
+               odp_ticketlock_lock(&sched_group->s.lock);
+
+               num_grp = thr_group->num_group;
+               gen_cnt = odp_atomic_load_u32(&thr_group->gen_cnt);
 
-       for (prio = 0; prio < num_prio; prio++) {
-               sched_cmd_t *cmd = rem_head(prio);
+               for (i = 0; i < num_grp; i++)
+                       sched_local.group[i] = thr_group->group[i];
 
-               if (cmd)
-                       return cmd;
+               odp_ticketlock_unlock(&sched_group->s.lock);
+
+               sched_local.num_group = num_grp;
+               sched_local.gen_cnt   = gen_cnt;
+       }
+
+       for (i = 0; i < sched_local.num_group; i++) {
+               for (prio = 0; prio < NUM_PRIO; prio++) {
+                       sched_cmd_t *cmd = rem_head(sched_local.group[i], prio);
+
+                       if (cmd)
+                               return cmd;
+               }
        }
 
        return NULL;
@@ -382,7 +493,7 @@ static int schedule_multi(odp_queue_t *from, uint64_t wait,
                uint32_t qi;
                int num;
 
-               cmd = sched_cmd(NUM_PRIO);
+               cmd = sched_cmd();
 
                if (cmd && cmd->s.type == CMD_PKTIO) {
                        if (sched_cb_pktin_poll(cmd->s.index, cmd->s.num_pktin,
@@ -565,11 +676,14 @@ static odp_schedule_group_t schedule_group_lookup(const char *name)
 static int schedule_group_join(odp_schedule_group_t group,
                               const odp_thrmask_t *thrmask)
 {
+       int thr;
        sched_group_t *sched_group = &sched_global.sched_group;
 
        if (group < 0 || group >= NUM_GROUP)
                return -1;
 
+       thr = odp_thrmask_first(thrmask);
+
        odp_ticketlock_lock(&sched_group->s.lock);
 
        if (!sched_group->s.group[group].allocated) {
@@ -581,6 +695,11 @@ static int schedule_group_join(odp_schedule_group_t group,
                       &sched_group->s.group[group].mask,
                       thrmask);
 
+       while (thr >= 0) {
+               add_group(sched_group, thr, group);
+               thr = odp_thrmask_next(thrmask, thr);
+       }
+
        odp_ticketlock_unlock(&sched_group->s.lock);
 
        return 0;
@@ -589,6 +708,7 @@ static int schedule_group_join(odp_schedule_group_t group,
 static int schedule_group_leave(odp_schedule_group_t group,
                                const odp_thrmask_t *thrmask)
 {
+       int thr;
        sched_group_t *sched_group = &sched_global.sched_group;
        odp_thrmask_t *all = &sched_group->s.group[GROUP_ALL].mask;
        odp_thrmask_t not;
@@ -596,6 +716,8 @@ static int schedule_group_leave(odp_schedule_group_t group,
        if (group < 0 || group >= NUM_GROUP)
                return -1;
 
+       thr = odp_thrmask_first(thrmask);
+
        odp_ticketlock_lock(&sched_group->s.lock);
 
        if (!sched_group->s.group[group].allocated) {
@@ -608,6 +730,11 @@ static int schedule_group_leave(odp_schedule_group_t group,
                        &sched_group->s.group[group].mask,
                        &not);
 
+       while (thr >= 0) {
+               remove_group(sched_group, thr, group);
+               thr = odp_thrmask_next(thrmask, thr);
+       }
+
        odp_ticketlock_unlock(&sched_group->s.lock);
 
        return 0;
-- 
2.8.1

Reply via email to