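Separate packet input polling from event scheduling. Packet input poll
commands move from the priority queues to dedicated poll command queues,
so schedule_pktio_start() no longer takes a priority. A thread polls
packet input only when there are no events. The packet input queue index
is included in the poll command queue selection.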

Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
 .../linux-generic/include/odp_schedule_internal.h  |   2 +-
 platform/linux-generic/odp_packet_io.c             |   3 +-
 platform/linux-generic/odp_schedule.c              | 216 ++++++++++++++-------
 3 files changed, 145 insertions(+), 76 deletions(-)

diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h
index 78e16c4..0868394 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -24,7 +24,7 @@ int schedule_queue_init(queue_entry_t *qe);
 void schedule_queue_destroy(queue_entry_t *qe);
 int schedule_queue(const queue_entry_t *qe);
 void schedule_pktio_start(odp_pktio_t pktio, int num_in_queue,
-                         int in_queue_idx[], int prio);
+                         int in_queue_idx[]);
 void odp_schedule_release_context(void);
 
 #ifdef __cplusplus
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index d625db2..a235798 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -334,8 +334,7 @@ int odp_pktio_start(odp_pktio_t id)
                                return -1;
                        }
 
-                       schedule_pktio_start(id, 1, &index,
-                                            ODP_SCHED_PRIO_LOWEST);
+                       schedule_pktio_start(id, 1, &index);
                }
        }
 
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index 22af755..e0fadfa 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -30,9 +30,12 @@ odp_thrmask_t sched_mask_all;
  * One per scheduled queue and packet interface */
 #define NUM_SCHED_CMD (ODP_CONFIG_QUEUES + ODP_CONFIG_PKTIO_ENTRIES)
 
-/* Scheduler sub queues */
+/* Priority queues per priority (power of two: ids are masked) */
 #define QUEUES_PER_PRIO  4
 
+/* Packet input poll command queues (power of two: ids are masked) */
+#define POLL_CMD_QUEUES  4
+
 /* Maximum number of dequeues */
 #define MAX_DEQ 4
 
@@ -52,6 +55,13 @@ typedef struct {
        odp_queue_t    pri_queue[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO];
        pri_mask_t     pri_mask[ODP_CONFIG_SCHED_PRIOS];
        odp_spinlock_t mask_lock;
+
+       odp_spinlock_t poll_cmd_lock;  /* Serializes poll_cmd[].num updates */
+       struct {
+               odp_queue_t queue;  /* Poll command queue */
+               uint16_t    num;    /* Poll commands mapped to this queue */
+       } poll_cmd[POLL_CMD_QUEUES];
+
        odp_pool_t     pool;
        odp_shm_t      shm;
        uint32_t       pri_count[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO];
@@ -74,7 +84,6 @@ typedef struct {
                        int           num;
                        int           index[MAX_PKTIN];
                        pktio_entry_t *pe;
-                       int           prio;
                };
        };
 } sched_cmd_t;
@@ -84,19 +93,20 @@ typedef struct {
 
 
 typedef struct {
+       int thr;                /* Cached odp_thread_id() */
+       int num;
+       int index;
+       int pause;
+       uint32_t pktin_polls;   /* Passes through the pktin poll stage */
        odp_queue_t pri_queue;
        odp_event_t cmd_ev;
-
-       odp_buffer_hdr_t *buf_hdr[MAX_DEQ];
        queue_entry_t *qe;
        queue_entry_t *origin_qe;
+       odp_buffer_hdr_t *buf_hdr[MAX_DEQ];
        uint64_t order;
        uint64_t sync[ODP_CONFIG_MAX_ORDERED_LOCKS_PER_QUEUE];
        odp_pool_t pool;
        int enq_called;
-       int num;
-       int index;
-       int pause;
        int ignore_ordered_context;
 } sched_local_t;
 
@@ -113,6 +123,7 @@ static void sched_local_init(void)
 {
        memset(&sched_local, 0, sizeof(sched_local_t));
 
+       sched_local.thr       = odp_thread_id();
        sched_local.pri_queue = ODP_QUEUE_INVALID;
        sched_local.cmd_ev    = ODP_EVENT_INVALID;
 }
@@ -180,6 +191,24 @@ int odp_schedule_init_global(void)
                }
        }
 
+       odp_spinlock_init(&sched->poll_cmd_lock);
+       for (i = 0; i < POLL_CMD_QUEUES; i++) {
+               odp_queue_t queue;
+               char name[] = "odp_poll_cmd_YY";
+
+               name[13] = '0' + i / 10;
+               name[14] = '0' + i - 10 * (i / 10);
+
+               queue = odp_queue_create(name, ODP_QUEUE_TYPE_POLL, NULL);
+
+               if (queue == ODP_QUEUE_INVALID) {
+                       ODP_ERR("Sched init: Queue create failed.\n");
+                       return -1;
+               }
+
+               sched->poll_cmd[i].queue = queue;
+       }
+
        odp_spinlock_init(&sched->grp_lock);
 
        for (i = 0; i < ODP_CONFIG_SCHED_GRPS; i++) {
@@ -199,11 +228,11 @@ int odp_schedule_term_global(void)
        int ret = 0;
        int rc = 0;
        int i, j;
+       odp_event_t  ev;
 
        for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) {
                for (j = 0; j < QUEUES_PER_PRIO; j++) {
                        odp_queue_t  pri_q;
-                       odp_event_t  ev;
 
                        pri_q = sched->pri_queue[i][j];
 
@@ -211,25 +240,20 @@ int odp_schedule_term_global(void)
                              ODP_EVENT_INVALID) {
                                odp_buffer_t buf;
                                sched_cmd_t *sched_cmd;
+                               queue_entry_t *qe;
+                               odp_buffer_hdr_t *buf_hdr[1];
+                               int num;
 
                                buf = odp_buffer_from_event(ev);
                                sched_cmd = odp_buffer_addr(buf);
+                               qe  = sched_cmd->qe;
+                               num = queue_deq_multi(qe, buf_hdr, 1);
 
-                               if (sched_cmd->cmd == SCHED_CMD_DEQUEUE) {
-                                       queue_entry_t *qe;
-                                       odp_buffer_hdr_t *buf_hdr[1];
-                                       int num;
+                               if (num < 0)
+                                       queue_destroy_finalize(qe);
 
-                                       qe  = sched_cmd->qe;
-                                       num = queue_deq_multi(qe, buf_hdr, 1);
-
-                                       if (num < 0)
-                                               queue_destroy_finalize(qe);
-
-                                       if (num > 0)
-                                               ODP_ERR("Queue not empty\n");
-                               } else
-                                       odp_buffer_free(buf);
+                               if (num > 0)
+                                       ODP_ERR("Queue not empty\n");
                        }
 
                        if (odp_queue_destroy(pri_q)) {
@@ -239,6 +263,18 @@ int odp_schedule_term_global(void)
                }
        }
 
+       for (i = 0; i < POLL_CMD_QUEUES; i++) {
+               odp_queue_t queue = sched->poll_cmd[i].queue;
+
+               while ((ev = odp_queue_deq(queue)) != ODP_EVENT_INVALID)
+                       odp_event_free(ev);
+
+               if (odp_queue_destroy(queue)) {
+                       ODP_ERR("Poll cmd queue destroy failed\n");
+                       rc = -1;
+               }
+       }
+
        if (odp_pool_destroy(sched->pool) != 0) {
                ODP_ERR("Pool destroy fail.\n");
                rc = -1;
@@ -277,11 +313,6 @@ static int pri_id_queue(odp_queue_t queue)
        return (QUEUES_PER_PRIO-1) & (queue_to_id(queue));
 }
 
-static int pri_id_pktio(odp_pktio_t pktio)
-{
-       return (QUEUES_PER_PRIO-1) & (pktio_to_id(pktio));
-}
-
 static odp_queue_t pri_set(int id, int prio)
 {
        odp_spinlock_lock(&sched->mask_lock);
@@ -312,25 +343,12 @@ static odp_queue_t pri_set_queue(odp_queue_t queue, int prio)
        return pri_set(id, prio);
 }
 
-static odp_queue_t pri_set_pktio(odp_pktio_t pktio, int prio)
-{
-       int id = pri_id_pktio(pktio);
-
-       return pri_set(id, prio);
-}
-
 static void pri_clr_queue(odp_queue_t queue, int prio)
 {
        int id = pri_id_queue(queue);
        pri_clr(id, prio);
 }
 
-static void pri_clr_pktio(odp_pktio_t pktio, int prio)
-{
-       int id = pri_id_pktio(pktio);
-       pri_clr(id, prio);
-}
-
 int schedule_queue_init(queue_entry_t *qe)
 {
        odp_buffer_t buf;
@@ -361,13 +379,18 @@ void schedule_queue_destroy(queue_entry_t *qe)
        qe->s.pri_queue = ODP_QUEUE_INVALID;
 }
 
+static int poll_cmd_queue_idx(odp_pktio_t pktio, int in_queue_idx)
+{
+       return (POLL_CMD_QUEUES - 1) & (pktio_to_id(pktio) ^ in_queue_idx);
+}
+
 void schedule_pktio_start(odp_pktio_t pktio, int num_in_queue,
-                         int in_queue_idx[], int prio)
+                         int in_queue_idx[])
 {
        odp_buffer_t buf;
        sched_cmd_t *sched_cmd;
-       odp_queue_t pri_queue;
-       int i;
+       odp_queue_t queue;
+       int i, idx;
 
        buf = odp_buffer_alloc(sched->pool);
 
@@ -379,7 +402,6 @@ void schedule_pktio_start(odp_pktio_t pktio, int num_in_queue,
        sched_cmd->pktio = pktio;
        sched_cmd->num   = num_in_queue;
        sched_cmd->pe    = get_pktio_entry(pktio);
-       sched_cmd->prio  = prio;
 
        if (num_in_queue > MAX_PKTIN)
                ODP_ABORT("Too many input queues for scheduler\n");
@@ -387,12 +409,27 @@ void schedule_pktio_start(odp_pktio_t pktio, int num_in_queue,
        for (i = 0; i < num_in_queue; i++)
                sched_cmd->index[i] = in_queue_idx[i];
 
-       pri_queue  = pri_set_pktio(pktio, prio);
+       idx = poll_cmd_queue_idx(pktio, in_queue_idx[0]);
 
-       if (odp_queue_enq(pri_queue, odp_buffer_to_event(buf)))
+       odp_spinlock_lock(&sched->poll_cmd_lock);
+       sched->poll_cmd[idx].num++;
+       odp_spinlock_unlock(&sched->poll_cmd_lock);
+
+       queue = sched->poll_cmd[idx].queue;
+
+       if (odp_queue_enq(queue, odp_buffer_to_event(buf)))
                ODP_ABORT("schedule_pktio_start failed\n");
 }
 
+static void schedule_pktio_stop(sched_cmd_t *sched_cmd)
+{
+       int idx = poll_cmd_queue_idx(sched_cmd->pktio, sched_cmd->index[0]);
+
+       odp_spinlock_lock(&sched->poll_cmd_lock);
+       sched->poll_cmd[idx].num--;
+       odp_spinlock_unlock(&sched->poll_cmd_lock);
+}
+
 void odp_schedule_release_atomic(void)
 {
        if (sched_local.pri_queue != ODP_QUEUE_INVALID &&
@@ -449,9 +486,12 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                    unsigned int max_num, unsigned int max_deq)
 {
        int i, j;
-       int thr;
        int ret;
        uint32_t k;
+       int id;
+       odp_event_t ev;
+       odp_buffer_t buf;
+       sched_cmd_t *sched_cmd;
 
        if (sched_local.num) {
                ret = copy_events(out_ev, max_num);
@@ -467,21 +507,16 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
        if (odp_unlikely(sched_local.pause))
                return 0;
 
-       thr = odp_thread_id();
-
+       /* Schedule events */
        for (i = 0; i < ODP_CONFIG_SCHED_PRIOS; i++) {
-               int id;
 
                if (sched->pri_mask[i] == 0)
                        continue;
 
-               id = thr & (QUEUES_PER_PRIO-1);
+               id = sched_local.thr & (QUEUES_PER_PRIO - 1);
 
                for (j = 0; j < QUEUES_PER_PRIO; j++, id++) {
                        odp_queue_t  pri_q;
-                       odp_event_t  ev;
-                       odp_buffer_t buf;
-                       sched_cmd_t *sched_cmd;
                        queue_entry_t *qe;
                        int num;
                        int qe_grp;
@@ -501,30 +536,12 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                        buf       = odp_buffer_from_event(ev);
                        sched_cmd = odp_buffer_addr(buf);
 
-                       if (sched_cmd->cmd == SCHED_CMD_POLL_PKTIN) {
-                               /* Poll packet input */
-                               if (pktin_poll(sched_cmd->pe,
-                                              sched_cmd->num,
-                                              sched_cmd->index)) {
-                                       /* Stop scheduling the pktio */
-                                       pri_clr_pktio(sched_cmd->pktio,
-                                                     sched_cmd->prio);
-                                       odp_buffer_free(buf);
-                               } else {
-                                       /* Continue scheduling the pktio */
-                                       if (odp_queue_enq(pri_q, ev))
-                                               ODP_ABORT("schedule failed\n");
-                               }
-
-                               continue;
-                       }
-
                        qe     = sched_cmd->qe;
                        qe_grp = qe->s.param.sched.group;
 
                        if (qe_grp > ODP_SCHED_GROUP_ALL &&
                            !odp_thrmask_isset(sched->sched_grp[qe_grp].mask,
-                                              thr)) {
+                                              sched_local.thr)) {
                                /* This thread is not eligible for work from
                                 * this queue, so continue scheduling it.
                                 */
@@ -592,6 +609,59 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                }
        }
 
+       /*
+        * Poll packet input when there are no events.
+        *   * Each thread starts the search for a poll command from its
+        *     preferred command queue. If that queue has no command, it moves
+        *     on to the other queues.
+        *   * Most of the time, the search stops at the first command found,
+        *     which optimizes multi-threaded performance. One pass in 16
+        *     iterates through all queues to avoid packet input starvation
+        *     when there are fewer threads than command queues.
+        */
+       id = sched_local.thr & (POLL_CMD_QUEUES - 1);
+
+       for (i = 0; i < POLL_CMD_QUEUES; i++, id++) {
+               odp_queue_t cmd_queue;
+
+               if (id == POLL_CMD_QUEUES)
+                       id = 0;
+               /* num is read without poll_cmd_lock: it is only a hint */
+               if (sched->poll_cmd[id].num == 0)
+                       continue;
+
+               cmd_queue = sched->poll_cmd[id].queue;
+               ev = odp_queue_deq(cmd_queue);
+
+               if (ev == ODP_EVENT_INVALID)
+                       continue;
+
+               buf       = odp_buffer_from_event(ev);
+               sched_cmd = odp_buffer_addr(buf);
+
+               if (sched_cmd->cmd != SCHED_CMD_POLL_PKTIN)
+                       ODP_ABORT("Bad poll command\n");
+
+               /* Poll packet input */
+               if (pktin_poll(sched_cmd->pe,
+                              sched_cmd->num,
+                              sched_cmd->index)) {
+                       /* Stop scheduling the pktio */
+                       schedule_pktio_stop(sched_cmd);
+                       odp_buffer_free(buf);
+               } else {
+                       /* Continue scheduling the pktio */
+                       if (odp_queue_enq(cmd_queue, ev))
+                               ODP_ABORT("Poll command enqueue failed\n");
+
+                       /* Do not iterate through all pktin poll command queues
+                        * every time: only when (pktin_polls & 0xf) == 0. */
+                       if (odp_likely(sched_local.pktin_polls & 0xf))
+                               break;
+               }
+       }
+
+       sched_local.pktin_polls++;
        return 0;
 }
 
-- 
2.6.3

_______________________________________________
lng-odp mailing list
lng-odp@lists.linaro.org
https://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to