The scheduler runs by polling its priority queues for schedule
commands. There are two types of commands: queue dequeue and packet
input poll. Packet input is polled directly when a poll command is
received. From the scheduler's point of view, the default packet
input queue behaves like any other scheduled queue.
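
In short, each scheduled queue and each started packet interface owns
one sched_cmd_t event on a scheduler priority queue, and schedule()
dispatches on the command type. Roughly (a condensed sketch of the
schedule() changes below; error paths and the atomic queue hold are
omitted):

    sched_cmd_t *cmd = odp_buffer_addr(buf);

    if (cmd->cmd == SCHED_CMD_POLL_PKTIN) {
            /* Poll packet input. pktin_poll() enqueues received
             * packets to the pktio's default input queue. */
            if (pktin_poll(cmd->pe)) {
                    /* Stop scheduling this pktio */
                    pri_clr_pktio(cmd->pktio, cmd->prio);
                    odp_buffer_free(buf);
            } else {
                    /* Re-enqueue the command to keep polling */
                    odp_queue_enq(pri_q, ev);
            }
    } else {
            /* SCHED_CMD_DEQUEUE: dequeue events from the queue */
            num = queue_deq_multi(cmd->qe, sched_local.buf_hdr,
                                  max_deq);
            ...
    }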

Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
 .../linux-generic/include/odp_packet_io_internal.h |  17 +-
 .../linux-generic/include/odp_queue_internal.h     |  34 +--
 .../linux-generic/include/odp_schedule_internal.h  |  14 +-
 platform/linux-generic/odp_packet_io.c             |  78 ++++--
 platform/linux-generic/odp_queue.c                 | 193 ++++++--------
 platform/linux-generic/odp_schedule.c              | 277 ++++++++++++++-------
 6 files changed, 369 insertions(+), 244 deletions(-)

diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h
index 47b8992..161be16 100644
--- a/platform/linux-generic/include/odp_packet_io_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_internal.h
@@ -40,6 +40,8 @@ typedef enum {
 struct pktio_entry {
        odp_spinlock_t lock;            /**< entry spinlock */
        int taken;                      /**< is entry taken(1) or free(0) */
+       int cls_ena;                    /**< is classifier enabled */
+       odp_pktio_t handle;             /**< pktio handle */
        odp_queue_t inq_default;        /**< default input queue, if set */
        odp_queue_t outq_default;       /**< default out queue */
        odp_queue_t loopq;              /**< loopback queue for "loop" device */
@@ -64,15 +66,22 @@ typedef struct {
 
 extern void *pktio_entry_ptr[];
 
+static inline int pktio_to_id(odp_pktio_t pktio)
+{
+       return _odp_typeval(pktio) - 1;
+}
 
-static inline pktio_entry_t *get_pktio_entry(odp_pktio_t id)
+static inline pktio_entry_t *get_pktio_entry(odp_pktio_t pktio)
 {
-       if (odp_unlikely(id == ODP_PKTIO_INVALID ||
-                        _odp_typeval(id) > ODP_CONFIG_PKTIO_ENTRIES))
+       if (odp_unlikely(pktio == ODP_PKTIO_INVALID ||
+                        _odp_typeval(pktio) > ODP_CONFIG_PKTIO_ENTRIES))
                return NULL;
 
-       return pktio_entry_ptr[_odp_typeval(id) - 1];
+       return pktio_entry_ptr[pktio_to_id(pktio)];
 }
+
+int pktin_poll(pktio_entry_t *entry);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h
index 65aae14..61d0c43 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -36,10 +36,11 @@ extern "C" {
 #define QUEUE_MULTI_MAX 8
 
 #define QUEUE_STATUS_FREE         0
-#define QUEUE_STATUS_READY        1
-#define QUEUE_STATUS_NOTSCHED     2
-#define QUEUE_STATUS_SCHED        3
-#define QUEUE_STATUS_DESTROYED    4
+#define QUEUE_STATUS_DESTROYED    1
+#define QUEUE_STATUS_READY        2
+#define QUEUE_STATUS_NOTSCHED     3
+#define QUEUE_STATUS_SCHED        4
+
 
 /* forward declaration */
 union queue_entry_u;
@@ -69,7 +70,8 @@ struct queue_entry_s {
        deq_multi_func_t dequeue_multi;
 
        odp_queue_t       handle;
-       odp_buffer_t      sched_buf;
+       odp_queue_t       pri_queue;
+       odp_event_t       cmd_ev;
        odp_queue_type_t  type;
        odp_queue_param_t param;
        odp_pktio_t       pktin;
@@ -100,7 +102,6 @@ int queue_deq_multi_destroy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 void queue_lock(queue_entry_t *queue);
 void queue_unlock(queue_entry_t *queue);
 
-odp_buffer_t queue_sched_buf(odp_queue_t queue);
 int queue_sched_atomic(odp_queue_t handle);
 
 static inline uint32_t queue_to_id(odp_queue_t handle)
@@ -121,24 +122,23 @@ static inline queue_entry_t *queue_to_qentry(odp_queue_t handle)
        return get_qentry(queue_id);
 }
 
-static inline int queue_is_free(odp_queue_t handle)
+static inline int queue_is_atomic(queue_entry_t *qe)
 {
-       queue_entry_t *queue;
-
-       queue = queue_to_qentry(handle);
+       return qe->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
+}
 
-       return queue->s.status == QUEUE_STATUS_FREE;
+static inline odp_queue_t queue_handle(queue_entry_t *qe)
+{
+       return qe->s.handle;
 }
 
-static inline int queue_is_sched(odp_queue_t handle)
+static inline int queue_prio(queue_entry_t *qe)
 {
-       queue_entry_t *queue;
+       return qe->s.param.sched.prio;
+}
 
-       queue = queue_to_qentry(handle);
+void queue_destroy_finalize(queue_entry_t *qe);
 
-       return ((queue->s.status == QUEUE_STATUS_SCHED) &&
-               (queue->s.pktin != ODP_PKTIO_INVALID));
-}
 #ifdef __cplusplus
 }
 #endif
diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h
index acda2e4..904bfbd 100644
--- a/platform/linux-generic/include/odp_schedule_internal.h
+++ b/platform/linux-generic/include/odp_schedule_internal.h
@@ -16,12 +16,20 @@ extern "C" {
 
 #include <odp/buffer.h>
 #include <odp/queue.h>
+#include <odp/packet_io.h>
+#include <odp_queue_internal.h>
 
-void odp_schedule_mask_set(odp_queue_t queue, int prio);
 
-odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue);
+int schedule_queue_init(queue_entry_t *qe);
+void schedule_queue_destroy(queue_entry_t *qe);
 
-void odp_schedule_queue(odp_queue_t queue, int prio);
+static inline void schedule_queue(const queue_entry_t *qe)
+{
+       odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
+}
+
+
+int schedule_pktio_start(odp_pktio_t pktio, int prio);
 
 
 #ifdef __cplusplus
diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c
index 21f0c17..4ab45c0 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -142,6 +142,7 @@ static void unlock_entry_classifier(pktio_entry_t *entry)
 static void init_pktio_entry(pktio_entry_t *entry)
 {
        set_taken(entry);
+       entry->s.cls_ena = 1; /* TODO: disable cls by default */
        entry->s.inq_default = ODP_QUEUE_INVALID;
        memset(&entry->s.pkt_sock, 0, sizeof(entry->s.pkt_sock));
        memset(&entry->s.pkt_sock_mmap, 0, sizeof(entry->s.pkt_sock_mmap));
@@ -273,6 +274,8 @@ static odp_pktio_t setup_pktio_entry(const char *dev, odp_pool_t pool)
                unlock_entry_classifier(pktio_entry);
        }
 
+       pktio_entry->s.handle = id;
+
        return id;
 }
 
@@ -475,19 +478,27 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue)
 
        qentry = queue_to_qentry(queue);
 
-       if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN)
-               return -1;
-
        lock_entry(pktio_entry);
        pktio_entry->s.inq_default = queue;
        unlock_entry(pktio_entry);
 
-       queue_lock(qentry);
-       qentry->s.pktin = id;
-       qentry->s.status = QUEUE_STATUS_SCHED;
-       queue_unlock(qentry);
-
-       odp_schedule_queue(queue, qentry->s.param.sched.prio);
+       switch (qentry->s.type) {
+       case ODP_QUEUE_TYPE_PKTIN:
+               /* User polls the input queue */
+               queue_lock(qentry);
+               qentry->s.pktin = id;
+               queue_unlock(qentry);
+               /*break; TODO: Uncomment and change _TYPE_PKTIN to _POLL*/
+       case ODP_QUEUE_TYPE_SCHED:
+               /* Packet input through the scheduler */
+               if (schedule_pktio_start(id, ODP_SCHED_PRIO_LOWEST)) {
+                       ODP_ERR("Schedule pktio start failed\n");
+                       return -1;
+               }
+               break;
+       default:
+               ODP_ABORT("Bad queue type\n");
+       }
 
        return 0;
 }
@@ -506,15 +517,6 @@ int odp_pktio_inq_remdef(odp_pktio_t id)
        qentry = queue_to_qentry(queue);
 
        queue_lock(qentry);
-       if (qentry->s.status == QUEUE_STATUS_FREE) {
-               queue_unlock(qentry);
-               unlock_entry(pktio_entry);
-               return -1;
-       }
-
-       qentry->s.enqueue = queue_enq_dummy;
-       qentry->s.enqueue_multi = queue_enq_multi_dummy;
-       qentry->s.status = QUEUE_STATUS_NOTSCHED;
        qentry->s.pktin = ODP_PKTIO_INVALID;
        queue_unlock(qentry);
 
@@ -665,6 +667,46 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num)
        return nbr;
 }
 
+int pktin_poll(pktio_entry_t *entry)
+{
+       odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
+       odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
+       int num, num_enq, i;
+
+       if (odp_unlikely(is_free(entry)))
+               return -1;
+
+       num = odp_pktio_recv(entry->s.handle, pkt_tbl, QUEUE_MULTI_MAX);
+
+       if (num < 0) {
+               ODP_ERR("Packet recv error\n");
+               return -1;
+       }
+
+       for (i = 0, num_enq = 0; i < num; ++i) {
+               odp_buffer_t buf;
+               odp_buffer_hdr_t *hdr;
+
+               buf = _odp_packet_to_buffer(pkt_tbl[i]);
+               hdr = odp_buf_to_hdr(buf);
+
+               if (entry->s.cls_ena) {
+                       if (packet_classifier(entry->s.handle, pkt_tbl[i]) < 0)
+                               hdr_tbl[num_enq++] = hdr;
+               } else {
+                       hdr_tbl[num_enq++] = hdr;
+               }
+       }
+
+       if (num_enq) {
+               queue_entry_t *qentry;
+               qentry = queue_to_qentry(entry->s.inq_default);
+               queue_enq_multi(qentry, hdr_tbl, num_enq);
+       }
+
+       return 0;
+}
+
 /** function should be called with locked entry */
 static int sockfd_from_pktio_entry(pktio_entry_t *entry)
 {
diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c
index 4bb8b9b..4a0465b 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -88,7 +88,9 @@ static void queue_init(queue_entry_t *queue, const char *name,
 
        queue->s.head = NULL;
        queue->s.tail = NULL;
-       queue->s.sched_buf = ODP_BUFFER_INVALID;
+
+       queue->s.pri_queue = ODP_QUEUE_INVALID;
+       queue->s.cmd_ev    = ODP_EVENT_INVALID;
 }
 
 
@@ -222,22 +224,26 @@ odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type,
 
        if (handle != ODP_QUEUE_INVALID &&
            (type == ODP_QUEUE_TYPE_SCHED || type == ODP_QUEUE_TYPE_PKTIN)) {
-               odp_buffer_t buf;
-
-               buf = odp_schedule_buffer_alloc(handle);
-               if (buf == ODP_BUFFER_INVALID) {
-                       queue->s.status = QUEUE_STATUS_FREE;
-                       ODP_ERR("queue_init: sched buf alloc failed\n");
+               if (schedule_queue_init(queue)) {
+                       ODP_ERR("schedule queue init failed\n");
                        return ODP_QUEUE_INVALID;
                }
-
-               queue->s.sched_buf = buf;
-               odp_schedule_mask_set(handle, queue->s.param.sched.prio);
        }
 
        return handle;
 }
 
+void queue_destroy_finalize(queue_entry_t *queue)
+{
+       LOCK(&queue->s.lock);
+
+       if (queue->s.status == QUEUE_STATUS_DESTROYED) {
+               queue->s.status = QUEUE_STATUS_FREE;
+               schedule_queue_destroy(queue);
+       }
+       UNLOCK(&queue->s.lock);
+}
+
 int odp_queue_destroy(odp_queue_t handle)
 {
        queue_entry_t *queue;
@@ -246,41 +252,31 @@ int odp_queue_destroy(odp_queue_t handle)
        LOCK(&queue->s.lock);
        if (queue->s.status == QUEUE_STATUS_FREE) {
                UNLOCK(&queue->s.lock);
-               ODP_ERR("queue_destroy: queue \"%s\" already free\n",
-                       queue->s.name);
+               ODP_ERR("queue \"%s\" already free\n", queue->s.name);
+               return -1;
+       }
+       if (queue->s.status == QUEUE_STATUS_DESTROYED) {
+               UNLOCK(&queue->s.lock);
+               ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name);
                return -1;
        }
        if (queue->s.head != NULL) {
                UNLOCK(&queue->s.lock);
-               ODP_ERR("queue_destroy: queue \"%s\" not empty\n",
-                       queue->s.name);
+               ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
                return -1;
        }
 
-       queue->s.enqueue = queue_enq_dummy;
-       queue->s.enqueue_multi = queue_enq_multi_dummy;
-
        switch (queue->s.status) {
        case QUEUE_STATUS_READY:
                queue->s.status = QUEUE_STATUS_FREE;
-               queue->s.head = NULL;
-               queue->s.tail = NULL;
+               break;
+       case QUEUE_STATUS_NOTSCHED:
+               queue->s.status = QUEUE_STATUS_FREE;
+               schedule_queue_destroy(queue);
                break;
        case QUEUE_STATUS_SCHED:
-               /*
-                * Override dequeue_multi to destroy queue when it will
-                * be scheduled next time.
-                */
+               /* Queue is still in scheduling */
                queue->s.status = QUEUE_STATUS_DESTROYED;
-               queue->s.dequeue_multi = queue_deq_multi_destroy;
-               break;
-       case QUEUE_STATUS_NOTSCHED:
-               /* Queue won't be scheduled anymore */
-               odp_buffer_free(queue->s.sched_buf);
-               queue->s.sched_buf = ODP_BUFFER_INVALID;
-               queue->s.status = QUEUE_STATUS_FREE;
-               queue->s.head = NULL;
-               queue->s.tail = NULL;
                break;
        default:
                ODP_ABORT("Unexpected queue status\n");
@@ -290,23 +286,6 @@ int odp_queue_destroy(odp_queue_t handle)
        return 0;
 }
 
-odp_buffer_t queue_sched_buf(odp_queue_t handle)
-{
-       queue_entry_t *queue;
-       queue = queue_to_qentry(handle);
-
-       return queue->s.sched_buf;
-}
-
-
-int queue_sched_atomic(odp_queue_t handle)
-{
-       queue_entry_t *queue;
-       queue = queue_to_qentry(handle);
-
-       return queue->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
-}
-
 int odp_queue_set_context(odp_queue_t handle, void *context)
 {
        queue_entry_t *queue;
@@ -352,6 +331,12 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
        int sched = 0;
 
        LOCK(&queue->s.lock);
+       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+               UNLOCK(&queue->s.lock);
+               ODP_ERR("Bad queue status\n");
+               return -1;
+       }
+
        if (queue->s.head == NULL) {
                /* Empty queue */
                queue->s.head = buf_hdr;
@@ -370,8 +355,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
        UNLOCK(&queue->s.lock);
 
        /* Add queue to scheduling */
-       if (sched == 1)
-               odp_schedule_queue(queue->s.handle, queue->s.param.sched.prio);
+       if (sched)
+               schedule_queue(queue);
 
        return 0;
 }
@@ -389,6 +374,12 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
        buf_hdr[num-1]->next = NULL;
 
        LOCK(&queue->s.lock);
+       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+               UNLOCK(&queue->s.lock);
+               ODP_ERR("Bad queue status\n");
+               return -1;
+       }
+
        /* Empty queue */
        if (queue->s.head == NULL)
                queue->s.head = buf_hdr[0];
@@ -404,25 +395,12 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
        UNLOCK(&queue->s.lock);
 
        /* Add queue to scheduling */
-       if (sched == 1)
-               odp_schedule_queue(queue->s.handle, queue->s.param.sched.prio);
+       if (sched)
+               schedule_queue(queue);
 
        return num; /* All events enqueued */
 }
 
-int queue_enq_dummy(queue_entry_t *queue ODP_UNUSED,
-                   odp_buffer_hdr_t *buf_hdr ODP_UNUSED)
-{
-       return -1;
-}
-
-int queue_enq_multi_dummy(queue_entry_t *queue ODP_UNUSED,
-                         odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED,
-                         int num ODP_UNUSED)
-{
-       return -1;
-}
-
 int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num)
 {
        odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
@@ -455,24 +433,26 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
 
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 {
-       odp_buffer_hdr_t *buf_hdr = NULL;
+       odp_buffer_hdr_t *buf_hdr;
 
        LOCK(&queue->s.lock);
 
        if (queue->s.head == NULL) {
                /* Already empty queue */
-               if (queue->s.status == QUEUE_STATUS_SCHED &&
-                   queue->s.type != ODP_QUEUE_TYPE_PKTIN)
+               if (queue->s.status == QUEUE_STATUS_SCHED)
                        queue->s.status = QUEUE_STATUS_NOTSCHED;
-       } else {
-               buf_hdr       = queue->s.head;
-               queue->s.head = buf_hdr->next;
-               buf_hdr->next = NULL;
 
-               if (queue->s.head == NULL) {
-                       /* Queue is now empty */
-                       queue->s.tail = NULL;
-               }
+               UNLOCK(&queue->s.lock);
+               return NULL;
+       }
+
+       buf_hdr       = queue->s.head;
+       queue->s.head = buf_hdr->next;
+       buf_hdr->next = NULL;
+
+       if (queue->s.head == NULL) {
+               /* Queue is now empty */
+               queue->s.tail = NULL;
        }
 
        UNLOCK(&queue->s.lock);
@@ -483,31 +463,39 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
 
 int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
-       int i = 0;
+       odp_buffer_hdr_t *hdr;
+       int i;
 
        LOCK(&queue->s.lock);
+       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
+               /* Bad queue, or queue has been destroyed.
+                * Scheduler finalizes queue destroy after this. */
+               UNLOCK(&queue->s.lock);
+               return -1;
+       }
 
-       if (queue->s.head == NULL) {
+       hdr = queue->s.head;
+
+       if (hdr == NULL) {
                /* Already empty queue */
-               if (queue->s.status == QUEUE_STATUS_SCHED &&
-                   queue->s.type != ODP_QUEUE_TYPE_PKTIN)
+               if (queue->s.status == QUEUE_STATUS_SCHED)
                        queue->s.status = QUEUE_STATUS_NOTSCHED;
-       } else {
-               odp_buffer_hdr_t *hdr = queue->s.head;
 
-               for (; i < num && hdr; i++) {
-                       buf_hdr[i]       = hdr;
-                       /* odp_prefetch(hdr->addr); */
-                       hdr              = hdr->next;
-                       buf_hdr[i]->next = NULL;
-               }
+               UNLOCK(&queue->s.lock);
+               return 0;
+       }
 
-               queue->s.head = hdr;
+       for (i = 0; i < num && hdr; i++) {
+               buf_hdr[i]       = hdr;
+               hdr              = hdr->next;
+               buf_hdr[i]->next = NULL;
+       }
 
-               if (hdr == NULL) {
-                       /* Queue is now empty */
-                       queue->s.tail = NULL;
-               }
+       queue->s.head = hdr;
+
+       if (hdr == NULL) {
+               /* Queue is now empty */
+               queue->s.tail = NULL;
        }
 
        UNLOCK(&queue->s.lock);
@@ -515,23 +503,6 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
        return i;
 }
 
-int queue_deq_multi_destroy(queue_entry_t *queue,
-                           odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED,
-                           int num ODP_UNUSED)
-{
-       LOCK(&queue->s.lock);
-
-       odp_buffer_free(queue->s.sched_buf);
-       queue->s.sched_buf = ODP_BUFFER_INVALID;
-       queue->s.status = QUEUE_STATUS_FREE;
-       queue->s.head = NULL;
-       queue->s.tail = NULL;
-
-       UNLOCK(&queue->s.lock);
-
-       return 0;
-}
-
 int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num)
 {
        queue_entry_t *queue;
diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c
index dd65168..59e40c7 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -21,17 +21,15 @@
 #include <odp/hints.h>
 
 #include <odp_queue_internal.h>
+#include <odp_packet_io_internal.h>
 
-
-/* Limits to number of scheduled queues */
-#define SCHED_POOL_SIZE (256*1024)
+/* Number of schedule commands.
+ * One per scheduled queue and packet interface */
+#define NUM_SCHED_CMD (ODP_CONFIG_QUEUES + ODP_CONFIG_PKTIO_ENTRIES)
 
 /* Scheduler sub queues */
 #define QUEUES_PER_PRIO  4
 
-/* TODO: random or queue based selection */
-#define SEL_PRI_QUEUE(x) ((QUEUES_PER_PRIO-1) & (queue_to_id(x)))
-
 /* Maximum number of dequeues */
 #define MAX_DEQ 4
 
@@ -48,21 +46,36 @@ typedef struct {
        pri_mask_t     pri_mask[ODP_CONFIG_SCHED_PRIOS];
        odp_spinlock_t mask_lock;
        odp_pool_t     pool;
+       uint32_t       pri_count[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO];
 } sched_t;
 
+/* Schedule command */
 typedef struct {
-       odp_queue_t queue;
+       int           cmd;
+
+       union {
+               queue_entry_t *qe;
+
+               struct {
+                       odp_pktio_t   pktio;
+                       pktio_entry_t *pe;
+                       int           prio;
+               };
+       };
+} sched_cmd_t;
+
+#define SCHED_CMD_DEQUEUE    0
+#define SCHED_CMD_POLL_PKTIN 1
 
-} queue_desc_t;
 
 typedef struct {
        odp_queue_t pri_queue;
-       odp_event_t desc_ev;
+       odp_event_t cmd_ev;
 
-       odp_event_t ev[MAX_DEQ];
+       odp_buffer_hdr_t *buf_hdr[MAX_DEQ];
+       queue_entry_t *qe;
        int num;
        int index;
-       odp_queue_t queue;
        int pause;
 
 } sched_local_t;
@@ -73,14 +86,6 @@ static sched_t *sched;
 /* Thread local scheduler context */
 static __thread sched_local_t sched_local;
 
-
-static inline odp_queue_t select_pri_queue(odp_queue_t queue, int prio)
-{
-       int id = SEL_PRI_QUEUE(queue);
-       return sched->pri_queue[prio][id];
-}
-
-
 int odp_schedule_init_global(void)
 {
        odp_shm_t shm;
@@ -101,9 +106,11 @@ int odp_schedule_init_global(void)
                return -1;
        }
 
-       params.buf.size  = sizeof(queue_desc_t);
+       memset(sched, 0, sizeof(sched_t));
+
+       params.buf.size  = sizeof(sched_cmd_t);
        params.buf.align = 0;
-       params.buf.num   = SCHED_POOL_SIZE/sizeof(queue_desc_t);
+       params.buf.num   = NUM_SCHED_CMD;
        params.type      = ODP_POOL_BUFFER;
 
        pool = odp_pool_create("odp_sched_pool", ODP_SHM_NULL, &params);
@@ -178,15 +185,17 @@ int odp_schedule_init_local(void)
 {
        int i;
 
+       memset(&sched_local, 0, sizeof(sched_local_t));
+
        sched_local.pri_queue = ODP_QUEUE_INVALID;
-       sched_local.desc_ev   = ODP_EVENT_INVALID;
+       sched_local.cmd_ev    = ODP_EVENT_INVALID;
 
        for (i = 0; i < MAX_DEQ; i++)
-               sched_local.ev[i] = ODP_EVENT_INVALID;
+               sched_local.buf_hdr[i] = NULL;
 
+       sched_local.qe    = NULL;
        sched_local.num   = 0;
        sched_local.index = 0;
-       sched_local.queue = ODP_QUEUE_INVALID;
        sched_local.pause = 0;
 
        return 0;
@@ -198,50 +207,128 @@ int odp_schedule_term_local(void)
        return 0;
 }
 
-void odp_schedule_mask_set(odp_queue_t queue, int prio)
+static int pri_id_queue(odp_queue_t queue)
 {
-       int id = SEL_PRI_QUEUE(queue);
+       return (QUEUES_PER_PRIO-1) & (queue_to_id(queue));
+}
 
+static int pri_id_pktio(odp_pktio_t pktio)
+{
+       return (QUEUES_PER_PRIO-1) & (pktio_to_id(pktio));
+}
+
+static odp_queue_t pri_set(int id, int prio)
+{
        odp_spinlock_lock(&sched->mask_lock);
        sched->pri_mask[prio] |= 1 << id;
+       sched->pri_count[prio][id]++;
        odp_spinlock_unlock(&sched->mask_lock);
+
+       return sched->pri_queue[prio][id];
+}
+
+static void pri_clr(int id, int prio)
+{
+       odp_spinlock_lock(&sched->mask_lock);
+
+       /* Clear mask bit when last queue is removed */
+       sched->pri_count[prio][id]--;
+
+       if (sched->pri_count[prio][id] == 0)
+               sched->pri_mask[prio] &= (uint8_t)(~(1 << id));
+
+       odp_spinlock_unlock(&sched->mask_lock);
+}
+
+static odp_queue_t pri_set_queue(odp_queue_t queue, int prio)
+{
+       int id = pri_id_queue(queue);
+
+       return pri_set(id, prio);
+}
+
+static odp_queue_t pri_set_pktio(odp_pktio_t pktio, int prio)
+{
+       int id = pri_id_pktio(pktio);
+
+       return pri_set(id, prio);
+}
+
+static void pri_clr_queue(odp_queue_t queue, int prio)
+{
+       int id = pri_id_queue(queue);
+       pri_clr(id, prio);
 }
 
+static void pri_clr_pktio(odp_pktio_t pktio, int prio)
+{
+       int id = pri_id_pktio(pktio);
+       pri_clr(id, prio);
+}
 
-odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue)
+int schedule_queue_init(queue_entry_t *qe)
 {
        odp_buffer_t buf;
+       sched_cmd_t *sched_cmd;
 
        buf = odp_buffer_alloc(sched->pool);
 
-       if (buf != ODP_BUFFER_INVALID) {
-               queue_desc_t *desc;
-               desc        = odp_buffer_addr(buf);
-               desc->queue = queue;
-       }
+       if (buf == ODP_BUFFER_INVALID)
+               return -1;
 
-       return buf;
-}
+       sched_cmd      = odp_buffer_addr(buf);
+       sched_cmd->cmd = SCHED_CMD_DEQUEUE;
+       sched_cmd->qe  = qe;
 
+       qe->s.cmd_ev    = odp_buffer_to_event(buf);
+       qe->s.pri_queue = pri_set_queue(queue_handle(qe), queue_prio(qe));
 
-void odp_schedule_queue(odp_queue_t queue, int prio)
+       return 0;
+}
+
+void schedule_queue_destroy(queue_entry_t *qe)
 {
-       odp_buffer_t desc_buf;
-       odp_queue_t  pri_queue;
+       odp_buffer_t buf;
 
-       pri_queue = select_pri_queue(queue, prio);
-       desc_buf  = queue_sched_buf(queue);
+       buf = odp_buffer_from_event(qe->s.cmd_ev);
+       odp_buffer_free(buf);
 
-       odp_queue_enq(pri_queue, odp_buffer_to_event(desc_buf));
+       pri_clr_queue(queue_handle(qe), queue_prio(qe));
+
+       qe->s.cmd_ev    = ODP_EVENT_INVALID;
+       qe->s.pri_queue = ODP_QUEUE_INVALID;
 }
 
+int schedule_pktio_start(odp_pktio_t pktio, int prio)
+{
+       odp_buffer_t buf;
+       sched_cmd_t *sched_cmd;
+       odp_queue_t pri_queue;
+
+       buf = odp_buffer_alloc(sched->pool);
+
+       if (buf == ODP_BUFFER_INVALID)
+               return -1;
+
+       sched_cmd        = odp_buffer_addr(buf);
+       sched_cmd->cmd   = SCHED_CMD_POLL_PKTIN;
+       sched_cmd->pktio = pktio;
+       sched_cmd->pe    = get_pktio_entry(pktio);
+       sched_cmd->prio  = prio;
+
+       pri_queue  = pri_set_pktio(pktio, prio);
+
+       odp_queue_enq(pri_queue, odp_buffer_to_event(buf));
+
+       return 0;
+}
 
 void odp_schedule_release_atomic(void)
 {
        if (sched_local.pri_queue != ODP_QUEUE_INVALID &&
            sched_local.num       == 0) {
                /* Release current atomic queue */
-               odp_queue_enq(sched_local.pri_queue, sched_local.desc_ev);
+               odp_queue_enq(sched_local.pri_queue, sched_local.cmd_ev);
                sched_local.pri_queue = ODP_QUEUE_INVALID;
        }
 }
@@ -252,7 +339,8 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max)
        int i = 0;
 
        while (sched_local.num && max) {
-               out_ev[i] = sched_local.ev[sched_local.index];
+               odp_buffer_hdr_t *hdr = sched_local.buf_hdr[sched_local.index];
+               out_ev[i] = odp_buffer_to_event(hdr->handle.handle);
                sched_local.index++;
                sched_local.num--;
                max--;
@@ -279,7 +367,7 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                ret = copy_events(out_ev, max_num);
 
                if (out_queue)
-                       *out_queue = sched_local.queue;
+                       *out_queue = queue_handle(sched_local.qe);
 
                return ret;
        }
@@ -302,7 +390,10 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                for (j = 0; j < QUEUES_PER_PRIO; j++, id++) {
                        odp_queue_t  pri_q;
                        odp_event_t  ev;
-                       odp_buffer_t desc_buf;
+                       odp_buffer_t buf;
+                       sched_cmd_t *sched_cmd;
+                       queue_entry_t *qe;
+                       int num;
 
                        if (id >= QUEUES_PER_PRIO)
                                id = 0;
@@ -310,59 +401,63 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
                        if (odp_unlikely((sched->pri_mask[i] & (1 << id)) == 0))
                                continue;
 
-                       pri_q    = sched->pri_queue[i][id];
-                       ev       = odp_queue_deq(pri_q);
-                       desc_buf = odp_buffer_from_event(ev);
-
-                       if (desc_buf != ODP_BUFFER_INVALID) {
-                               queue_desc_t *desc;
-                               odp_queue_t queue;
-                               int num;
-
-                               desc  = odp_buffer_addr(desc_buf);
-                               queue = desc->queue;
-
-                               if (odp_queue_type(queue) ==
-                                       ODP_QUEUE_TYPE_PKTIN &&
-                                       !queue_is_sched(queue))
-                                       continue;
-
-                               num = odp_queue_deq_multi(queue, sched_local.ev,
-                                                         max_deq);
-
-                               if (num == 0) {
-                                       /* Remove empty queue from scheduling,
-                                        * except packet input queues
-                                        */
-                                       if (odp_queue_type(queue) ==
-                                           ODP_QUEUE_TYPE_PKTIN &&
-                                           !queue_is_free(queue))
-                                               odp_queue_enq(pri_q, ev);
-
-                                       continue;
-                               }
-
-                               sched_local.num   = num;
-                               sched_local.index = 0;
-                               ret = copy_events(out_ev, max_num);
-
-                               sched_local.queue = queue;
-
-                               if (queue_sched_atomic(queue)) {
-                                       /* Hold queue during atomic access */
-                                       sched_local.pri_queue = pri_q;
-                                       sched_local.desc_ev   = ev;
+                       pri_q = sched->pri_queue[i][id];
+                       ev    = odp_queue_deq(pri_q);
+                       buf   = odp_buffer_from_event(ev);
+
+                       if (buf == ODP_BUFFER_INVALID)
+                               continue;
+
+                       sched_cmd = odp_buffer_addr(buf);
+
+                       if (sched_cmd->cmd == SCHED_CMD_POLL_PKTIN) {
+                               /* Poll packet input */
+                               if (pktin_poll(sched_cmd->pe)) {
+                                       /* Stop scheduling the pktio */
+                                       pri_clr_pktio(sched_cmd->pktio,
+                                                     sched_cmd->prio);
+                                       odp_buffer_free(buf);
                                } else {
-                                       /* Continue scheduling the queue */
+                                       /* Continue scheduling the pktio */
                                        odp_queue_enq(pri_q, ev);
                                }
 
-                               /* Output the source queue handle */
-                               if (out_queue)
-                                       *out_queue = queue;
+                               continue;
+                       }
+
+                       qe  = sched_cmd->qe;
+                       num = queue_deq_multi(qe, sched_local.buf_hdr, max_deq);
 
-                               return ret;
+                       if (num < 0) {
+                               /* Destroyed queue */
+                               queue_destroy_finalize(qe);
+                               continue;
                        }
+
+                       if (num == 0) {
+                               /* Remove empty queue from scheduling */
+                               continue;
+                       }
+
+                       sched_local.num   = num;
+                       sched_local.index = 0;
+                       sched_local.qe    = qe;
+                       ret = copy_events(out_ev, max_num);
+
+                       if (queue_is_atomic(qe)) {
+                               /* Hold queue during atomic access */
+                               sched_local.pri_queue = pri_q;
+                               sched_local.cmd_ev    = ev;
+                       } else {
+                               /* Continue scheduling the queue */
+                               odp_queue_enq(pri_q, ev);
+                       }
+
+                       /* Output the source queue handle */
+                       if (out_queue)
+                               *out_queue = queue_handle(qe);
+
+                       return ret;
                }
        }
 
-- 
2.3.3

