Implement scheduler pktio poll command queues with a ring-based
data structure instead of odp_queue_t queues.

Scheduler initialization and termination are also simplified,
as there is no longer any dependency on the pool, buffer
and queue APIs.

Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
 platform/linux-generic/odp_schedule.c | 251 ++++++++++++++--------------------
 1 file changed, 103 insertions(+), 148 deletions(-)

diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 310b80f..36cf05f 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -8,10 +8,7 @@
 #include <odp/api/schedule.h>
 #include <odp_schedule_if.h>
 #include <odp/api/align.h>
-#include <odp/api/queue.h>
 #include <odp/api/shared_memory.h>
-#include <odp/api/buffer.h>
-#include <odp/api/pool.h>
 #include <odp_internal.h>
 #include <odp_debug_internal.h>
 #include <odp/api/thread.h>
@@ -27,10 +24,6 @@
 #include <odp_schedule_ordered_internal.h>
 #include <odp/api/sync.h>
 
-#ifdef _ODP_PKTIO_IPC
-#include <odp_pool_internal.h>
-#endif
-
 /* Number of priority levels  */
 #define NUM_PRIO 8
 
@@ -48,7 +41,7 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 #define QUEUES_PER_PRIO  4
 
 /* Packet input poll cmd queues */
-#define POLL_CMD_QUEUES  4
+#define PKTIO_CMD_QUEUES  4
 
 /* Maximum number of packet input queues per command */
 #define MAX_PKTIN 8
@@ -56,6 +49,24 @@ ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 /* Maximum number of packet IO interfaces */
 #define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
 
+/* Maximum number of pktio poll commands */
+#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO)
+
+/* Not a valid poll command */
+#define PKTIO_CMD_INVALID ((uint32_t)-1)
+
+/* Not a valid pktio index */
+#define PKTIO_FREE (-1)
+
+/* Packet IO poll queue ring size. In worst case, all pktios have all pktins
+ * enabled and one poll command is created per pktin queue. The ring size must
+ * be larger than or equal to NUM_PKTIO_CMD / PKTIO_CMD_QUEUES, so that it can
+ * hold all poll commands in the worst case. */
+#define PKTIO_RING_SIZE (NUM_PKTIO_CMD / PKTIO_CMD_QUEUES)
+
+/* Mask for wrapping around pktio poll command index */
+#define PKTIO_RING_MASK (PKTIO_RING_SIZE - 1)
+
 /* Priority queue ring size. In worst case, all event queues are scheduled
  * queues and have the same priority. The ring size must be larger than or
  * equal to ODP_CONFIG_QUEUES / QUEUES_PER_PRIO, so that it can hold all
@@ -79,6 +90,10 @@ ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(ODP_CONFIG_QUEUES),
 ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(PRIO_QUEUE_RING_SIZE),
                  "Ring_size_is_not_power_of_two");
 
+/* Ring size must be power of two, so that PKTIO_RING_MASK can be used. */
+ODP_STATIC_ASSERT(ODP_VAL_IS_POWER_2(PKTIO_RING_SIZE),
+                 "pktio_ring_size_is_not_power_of_two");
+
 /* Mask of queues per priority */
 typedef uint8_t pri_mask_t;
 
@@ -116,6 +131,23 @@ typedef struct {
 
 } prio_queue_t ODP_ALIGNED_CACHE;
 
+/* Packet IO queue */
+typedef struct {
+       /* Ring header */
+       sched_ring_t ring;
+
+       /* Ring data: pktio poll command indexes */
+       uint32_t cmd_index[PKTIO_RING_SIZE];
+
+} pktio_queue_t ODP_ALIGNED_CACHE;
+
+/* Packet IO poll command */
+typedef struct {
+       int pktio_index;
+       int num_pktin;
+       int pktin[MAX_PKTIN];
+} pktio_cmd_t;
+
 typedef struct {
        pri_mask_t     pri_mask[NUM_PRIO];
        odp_spinlock_t mask_lock;
@@ -123,12 +155,15 @@ typedef struct {
        prio_queue_t   prio_q[NUM_PRIO][QUEUES_PER_PRIO];
 
        odp_spinlock_t poll_cmd_lock;
-       struct {
-               odp_queue_t queue;
-               uint16_t    num;
-       } poll_cmd[POLL_CMD_QUEUES];
+       /* Number of commands in a command queue */
+       uint16_t       num_pktio_cmd[PKTIO_CMD_QUEUES];
+
+       /* Packet IO command queues */
+       pktio_queue_t  pktio_q[PKTIO_CMD_QUEUES];
+
+       /* Packet IO poll commands */
+       pktio_cmd_t    pktio_cmd[NUM_PKTIO_CMD];
 
-       odp_pool_t     pool;
        odp_shm_t      shm;
        uint32_t       pri_count[NUM_PRIO][QUEUES_PER_PRIO];
 
@@ -145,31 +180,12 @@ typedef struct {
        } queue[ODP_CONFIG_QUEUES];
 
        struct {
-               int         num;
+               /* Number of active commands for a pktio interface */
+               int num_cmd;
        } pktio[NUM_PKTIO];
 
 } sched_global_t;
 
-/* Schedule command */
-typedef struct {
-       int           cmd;
-
-       union {
-               struct {
-                       uint32_t      queue_index;
-               };
-
-               struct {
-                       int           pktio_index;
-                       int           num;
-                       int           index[MAX_PKTIN];
-               };
-       };
-} sched_cmd_t;
-
-#define SCHED_CMD_DEQUEUE    0
-#define SCHED_CMD_POLL_PKTIN 1
-
 /* Global scheduler context */
 static sched_global_t *sched;
 
@@ -253,10 +269,7 @@ static void sched_local_init(void)
 static int schedule_init_global(void)
 {
        odp_shm_t shm;
-       odp_pool_t pool;
        int i, j;
-       odp_pool_param_t params;
-       int num_cmd;
 
        ODP_DBG("Schedule init ... ");
 
@@ -273,27 +286,6 @@ static int schedule_init_global(void)
 
        memset(sched, 0, sizeof(sched_global_t));
 
-       /* Number of schedule commands.
-        * One per scheduled queue and packet interface */
-       num_cmd = sched_cb_num_queues() + sched_cb_num_pktio();
-
-       odp_pool_param_init(&params);
-       params.buf.size  = sizeof(sched_cmd_t);
-       params.buf.align = 0;
-       params.buf.num   = num_cmd;
-       params.type      = ODP_POOL_BUFFER;
-
-#ifdef _ODP_PKTIO_IPC
-       pool = _pool_create("odp_sched_pool", &params, 0);
-#else
-       pool = odp_pool_create("odp_sched_pool", &params);
-#endif
-       if (pool == ODP_POOL_INVALID) {
-               ODP_ERR("Schedule init: Pool create failed.\n");
-               return -1;
-       }
-
-       sched->pool = pool;
        sched->shm  = shm;
        odp_spinlock_init(&sched->mask_lock);
 
@@ -310,23 +302,16 @@ static int schedule_init_global(void)
        }
 
        odp_spinlock_init(&sched->poll_cmd_lock);
-       for (i = 0; i < POLL_CMD_QUEUES; i++) {
-               odp_queue_t queue;
-               char name[] = "odp_poll_cmd_YY";
-
-               name[13] = '0' + i / 10;
-               name[14] = '0' + i - 10 * (i / 10);
-
-               queue = odp_queue_create(name, NULL);
-
-               if (queue == ODP_QUEUE_INVALID) {
-                       ODP_ERR("Sched init: Queue create failed.\n");
-                       return -1;
-               }
+       for (i = 0; i < PKTIO_CMD_QUEUES; i++) {
+               ring_init(&sched->pktio_q[i].ring);
 
-               sched->poll_cmd[i].queue = queue;
+               for (j = 0; j < PKTIO_RING_SIZE; j++)
+                       sched->pktio_q[i].cmd_index[j] = PKTIO_CMD_INVALID;
        }
 
+       for (i = 0; i < NUM_PKTIO_CMD; i++)
+               sched->pktio_cmd[i].pktio_index = PKTIO_FREE;
+
        odp_spinlock_init(&sched->grp_lock);
 
        for (i = 0; i < NUM_SCHED_GRPS; i++) {
@@ -346,7 +331,6 @@ static int schedule_term_global(void)
        int ret = 0;
        int rc = 0;
        int i, j;
-       odp_event_t  ev;
 
        for (i = 0; i < NUM_PRIO; i++) {
                for (j = 0; j < QUEUES_PER_PRIO; j++) {
@@ -369,23 +353,6 @@ static int schedule_term_global(void)
                }
        }
 
-       for (i = 0; i < POLL_CMD_QUEUES; i++) {
-               odp_queue_t queue = sched->poll_cmd[i].queue;
-
-               while ((ev = odp_queue_deq(queue)) != ODP_EVENT_INVALID)
-                       odp_event_free(ev);
-
-               if (odp_queue_destroy(queue)) {
-                       ODP_ERR("Poll cmd queue destroy failed\n");
-                       rc = -1;
-               }
-       }
-
-       if (odp_pool_destroy(sched->pool) != 0) {
-               ODP_ERR("Pool destroy fail.\n");
-               rc = -1;
-       }
-
        ret = odp_shm_free(sched->shm);
        if (ret < 0) {
                ODP_ERR("Shm free failed for odp_scheduler");
@@ -482,60 +449,58 @@ static void schedule_destroy_queue(uint32_t queue_index)
        sched->queue[queue_index].queue_per_prio = 0;
 }
 
-static int poll_cmd_queue_idx(int pktio_index, int in_queue_idx)
+static int poll_cmd_queue_idx(int pktio_index, int pktin_idx)
 {
-       return (POLL_CMD_QUEUES - 1) & (pktio_index ^ in_queue_idx);
+       return (PKTIO_CMD_QUEUES - 1) & (pktio_index ^ pktin_idx);
 }
 
-static void schedule_pktio_start(int pktio_index, int num_in_queue,
-                                int in_queue_idx[])
+static void schedule_pktio_start(int pktio_index, int num_pktin,
+                                int pktin_idx[])
 {
-       odp_buffer_t buf;
-       sched_cmd_t *sched_cmd;
-       odp_queue_t queue;
-       int i, idx;
+       int i, j, idx;
 
-       if (num_in_queue > MAX_PKTIN)
+       if (num_pktin > MAX_PKTIN)
                ODP_ABORT("Too many input queues for scheduler\n");
 
-       sched->pktio[pktio_index].num = num_in_queue;
+       sched->pktio[pktio_index].num_cmd = num_pktin;
 
        /* Create a pktio poll command per queue */
-       for (i = 0; i < num_in_queue; i++) {
-               buf = odp_buffer_alloc(sched->pool);
+       for (i = 0; i < num_pktin; i++) {
+               int cmd = -1;
 
-               if (buf == ODP_BUFFER_INVALID)
-                       ODP_ABORT("Sched pool empty\n");
+               /* Find next free command */
+               for (j = 0; j < NUM_PKTIO_CMD; j++) {
+                       if (sched->pktio_cmd[j].pktio_index == PKTIO_FREE) {
+                               cmd = j;
+                               break;
+                       }
+               }
 
-               sched_cmd        = odp_buffer_addr(buf);
-               sched_cmd->cmd   = SCHED_CMD_POLL_PKTIN;
-               sched_cmd->pktio_index = pktio_index;
-               sched_cmd->num   = 1;
-               sched_cmd->index[0] = in_queue_idx[i];
+               if (cmd < 0)
+                       ODP_ABORT("Scheduler out of pktio commands\n");
 
-               idx = poll_cmd_queue_idx(pktio_index, in_queue_idx[i]);
+               idx = poll_cmd_queue_idx(pktio_index, pktin_idx[i]);
 
                odp_spinlock_lock(&sched->poll_cmd_lock);
-               sched->poll_cmd[idx].num++;
+               sched->num_pktio_cmd[idx]++;
                odp_spinlock_unlock(&sched->poll_cmd_lock);
 
-               queue = sched->poll_cmd[idx].queue;
-
-               if (odp_queue_enq(queue, odp_buffer_to_event(buf)))
-                       ODP_ABORT("schedule_pktio_start failed\n");
+               sched->pktio_cmd[cmd].pktio_index = pktio_index;
+               sched->pktio_cmd[cmd].num_pktin   = 1;
+               sched->pktio_cmd[cmd].pktin[0]    = pktin_idx[i];
+               ring_enq(&sched->pktio_q[idx].ring, PKTIO_RING_MASK, cmd);
        }
 }
 
-static int schedule_pktio_stop(sched_cmd_t *sched_cmd)
+static int schedule_pktio_stop(int pktio_index, int first_pktin)
 {
        int num;
-       int pktio_index = sched_cmd->pktio_index;
-       int idx = poll_cmd_queue_idx(pktio_index, sched_cmd->index[0]);
+       int idx = poll_cmd_queue_idx(pktio_index, first_pktin);
 
        odp_spinlock_lock(&sched->poll_cmd_lock);
-       sched->poll_cmd[idx].num--;
-       sched->pktio[pktio_index].num--;
-       num = sched->pktio[pktio_index].num;
+       sched->num_pktio_cmd[idx]--;
+       sched->pktio[pktio_index].num_cmd--;
+       num = sched->pktio[pktio_index].num_cmd;
        odp_spinlock_unlock(&sched->poll_cmd_lock);
 
        return num;
@@ -592,9 +557,6 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
        int i, j;
        int ret;
        int id;
-       odp_event_t ev;
-       odp_buffer_t buf;
-       sched_cmd_t *sched_cmd;
        int offset = 0;
        uint32_t qi;
 
@@ -737,47 +699,40 @@ static int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
         *     have to do full iteration to avoid packet input starvation when
         *     there are less threads than command queues.
         */
-       id = sched_local.thr & (POLL_CMD_QUEUES - 1);
+       id = sched_local.thr & (PKTIO_CMD_QUEUES - 1);
 
-       for (i = 0; i < POLL_CMD_QUEUES; i++, id++) {
-               odp_queue_t cmd_queue;
-               int pktio_index;
+       for (i = 0; i < PKTIO_CMD_QUEUES; i++, id++) {
+               uint32_t cmd_index;
+               pktio_cmd_t *cmd;
 
-               if (id == POLL_CMD_QUEUES)
+               if (id == PKTIO_CMD_QUEUES)
                        id = 0;
 
-               if (sched->poll_cmd[id].num == 0)
+               if (sched->num_pktio_cmd[id] == 0)
                        continue;
 
-               cmd_queue = sched->poll_cmd[id].queue;
-               ev = odp_queue_deq(cmd_queue);
+               cmd_index = ring_deq(&sched->pktio_q[id].ring, PKTIO_RING_MASK);
 
-               if (ev == ODP_EVENT_INVALID)
+               if (cmd_index == RING_EMPTY)
                        continue;
 
-               buf       = odp_buffer_from_event(ev);
-               sched_cmd = odp_buffer_addr(buf);
-
-               if (sched_cmd->cmd != SCHED_CMD_POLL_PKTIN)
-                       ODP_ABORT("Bad poll command\n");
-
-               pktio_index = sched_cmd->pktio_index;
+               cmd = &sched->pktio_cmd[cmd_index];
 
                /* Poll packet input */
-               if (sched_cb_pktin_poll(pktio_index,
-                                       sched_cmd->num,
-                                       sched_cmd->index)) {
+               if (sched_cb_pktin_poll(cmd->pktio_index,
+                                       cmd->num_pktin, cmd->pktin)){
                        /* Pktio stopped or closed. Remove poll command and call
                         * stop_finalize when all commands of the pktio has
                         * been removed. */
-                       if (schedule_pktio_stop(sched_cmd) == 0)
-                               sched_cb_pktio_stop_finalize(pktio_index);
+                       if (schedule_pktio_stop(cmd->pktio_index,
+                                               cmd->pktin[0]) == 0)
+                               sched_cb_pktio_stop_finalize(cmd->pktio_index);
 
-                       odp_buffer_free(buf);
+                       cmd->pktio_index = PKTIO_FREE;
                } else {
                        /* Continue scheduling the pktio */
-                       if (odp_queue_enq(cmd_queue, ev))
-                               ODP_ABORT("Poll command enqueue failed\n");
+                       ring_enq(&sched->pktio_q[id].ring, PKTIO_RING_MASK,
+                                cmd_index);
 
                        /* Do not iterate through all pktin poll command queues
                         * every time. */
-- 
2.8.1

Reply via email to