On 23 March 2015 at 10:42, Petri Savolainen <petri.savolai...@nokia.com>
wrote:

> Scheduler runs by polling scheduler priority queues for schedule
> commands. There are two types of scheduler commands: queue dequeue
> and packet input poll. Packet input is polled directly when a poll
> command is received. From schduler point of view, the default
> packet input queue is like any other queue.
>
> Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
> ---
>  .../linux-generic/include/odp_packet_io_internal.h |  17 +-
>  .../linux-generic/include/odp_queue_internal.h     |  34 +--
>  .../linux-generic/include/odp_schedule_internal.h  |  14 +-
>  platform/linux-generic/odp_packet_io.c             |  78 ++++--
>  platform/linux-generic/odp_queue.c                 | 193 ++++++--------
>  platform/linux-generic/odp_schedule.c              | 277
> ++++++++++++++-------
>  6 files changed, 369 insertions(+), 244 deletions(-)
>
> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h
> b/platform/linux-generic/include/odp_packet_io_internal.h
> index 47b8992..161be16 100644
> --- a/platform/linux-generic/include/odp_packet_io_internal.h
> +++ b/platform/linux-generic/include/odp_packet_io_internal.h
> @@ -40,6 +40,8 @@ typedef enum {
>  struct pktio_entry {
>         odp_spinlock_t lock;            /**< entry spinlock */
>         int taken;                      /**< is entry taken(1) or free(0)
> */
> +       int cls_ena;                    /**< is classifier enabled */
>

cls_ena is not very descriptive clsfy_enable is better but still ugly


> +       odp_pktio_t handle;             /**< pktio handle */
>         odp_queue_t inq_default;        /**< default input queue, if set */
>         odp_queue_t outq_default;       /**< default out queue */
>         odp_queue_t loopq;              /**< loopback queue for "loop"
> device */
> @@ -64,15 +66,22 @@ typedef struct {
>
>  extern void *pktio_entry_ptr[];
>
> +static inline int pktio_to_id(odp_pktio_t pktio)
> +{
> +       return _odp_typeval(pktio) - 1;
> +}
>
> -static inline pktio_entry_t *get_pktio_entry(odp_pktio_t id)
> +static inline pktio_entry_t *get_pktio_entry(odp_pktio_t pktio)
>  {
> -       if (odp_unlikely(id == ODP_PKTIO_INVALID ||
> -                        _odp_typeval(id) > ODP_CONFIG_PKTIO_ENTRIES))
> +       if (odp_unlikely(pktio == ODP_PKTIO_INVALID ||
> +                        _odp_typeval(pktio) > ODP_CONFIG_PKTIO_ENTRIES))
>                 return NULL;
>
> -       return pktio_entry_ptr[_odp_typeval(id) - 1];
> +       return pktio_entry_ptr[pktio_to_id(pktio)];
>  }
> +
> +int pktin_poll(pktio_entry_t *entry);
> +
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/platform/linux-generic/include/odp_queue_internal.h
> b/platform/linux-generic/include/odp_queue_internal.h
> index 65aae14..61d0c43 100644
> --- a/platform/linux-generic/include/odp_queue_internal.h
> +++ b/platform/linux-generic/include/odp_queue_internal.h
> @@ -36,10 +36,11 @@ extern "C" {
>  #define QUEUE_MULTI_MAX 8
>
>  #define QUEUE_STATUS_FREE         0
> -#define QUEUE_STATUS_READY        1
> -#define QUEUE_STATUS_NOTSCHED     2
> -#define QUEUE_STATUS_SCHED        3
> -#define QUEUE_STATUS_DESTROYED    4
> +#define QUEUE_STATUS_DESTROYED    1
> +#define QUEUE_STATUS_READY        2
> +#define QUEUE_STATUS_NOTSCHED     3
> +#define QUEUE_STATUS_SCHED        4
> +
>
>  /* forward declaration */
>  union queue_entry_u;
> @@ -69,7 +70,8 @@ struct queue_entry_s {
>         deq_multi_func_t dequeue_multi;
>
>         odp_queue_t       handle;
> -       odp_buffer_t      sched_buf;
> +       odp_queue_t       pri_queue;
> +       odp_event_t       cmd_ev;
>         odp_queue_type_t  type;
>         odp_queue_param_t param;
>         odp_pktio_t       pktin;
> @@ -100,7 +102,6 @@ int queue_deq_multi_destroy(queue_entry_t *queue,
> odp_buffer_hdr_t *buf_hdr[],
>  void queue_lock(queue_entry_t *queue);
>  void queue_unlock(queue_entry_t *queue);
>
> -odp_buffer_t queue_sched_buf(odp_queue_t queue);
>  int queue_sched_atomic(odp_queue_t handle);
>
>  static inline uint32_t queue_to_id(odp_queue_t handle)
> @@ -121,24 +122,23 @@ static inline queue_entry_t
> *queue_to_qentry(odp_queue_t handle)
>         return get_qentry(queue_id);
>  }
>
> -static inline int queue_is_free(odp_queue_t handle)
> +static inline int queue_is_atomic(queue_entry_t *qe)
>  {
> -       queue_entry_t *queue;
> -
> -       queue = queue_to_qentry(handle);
> +       return qe->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
> +}


> -       return queue->s.status == QUEUE_STATUS_FREE;
> +static inline odp_queue_t queue_handle(queue_entry_t *qe)
> +{
> +       return qe->s.handle;
>  }
>
> -static inline int queue_is_sched(odp_queue_t handle)
> +static inline int queue_prio(queue_entry_t *qe)
>  {
> -       queue_entry_t *queue;
> +       return qe->s.param.sched.prio;
> +}
>
> -       queue = queue_to_qentry(handle);
> +void queue_destroy_finalize(queue_entry_t *qe);
>
> -       return ((queue->s.status == QUEUE_STATUS_SCHED) &&
> -               (queue->s.pktin != ODP_PKTIO_INVALID));
> -}
>  #ifdef __cplusplus
>  }
>  #endif
> diff --git a/platform/linux-generic/include/odp_schedule_internal.h
> b/platform/linux-generic/include/odp_schedule_internal.h
> index acda2e4..904bfbd 100644
> --- a/platform/linux-generic/include/odp_schedule_internal.h
> +++ b/platform/linux-generic/include/odp_schedule_internal.h
> @@ -16,12 +16,20 @@ extern "C" {
>
>  #include <odp/buffer.h>
>  #include <odp/queue.h>
> +#include <odp/packet_io.h>
> +#include <odp_queue_internal.h>
>
> -void odp_schedule_mask_set(odp_queue_t queue, int prio);
>
> -odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue);
> +int schedule_queue_init(queue_entry_t *qe);
> +void schedule_queue_destroy(queue_entry_t *qe);
>
> -void odp_schedule_queue(odp_queue_t queue, int prio);
> +static inline void schedule_queue(const queue_entry_t *qe)
> +{
> +       odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev);
> +}
> +
> +
> +int schedule_pktio_start(odp_pktio_t pktio, int prio);
>
>
>  #ifdef __cplusplus
> diff --git a/platform/linux-generic/odp_packet_io.c
> b/platform/linux-generic/odp_packet_io.c
> index 21f0c17..4ab45c0 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -142,6 +142,7 @@ static void unlock_entry_classifier(pktio_entry_t
> *entry)
>  static void init_pktio_entry(pktio_entry_t *entry)
>  {
>         set_taken(entry);
> +       entry->s.cls_ena = 1; /* TODO: disable cls by default */
>

Needs to have a bug link, a todo that makes it into the repo is a known
deficiency in the code for that published revision of the code.


>         entry->s.inq_default = ODP_QUEUE_INVALID;
>         memset(&entry->s.pkt_sock, 0, sizeof(entry->s.pkt_sock));
>         memset(&entry->s.pkt_sock_mmap, 0, sizeof(entry->s.pkt_sock_mmap));
> @@ -273,6 +274,8 @@ static odp_pktio_t setup_pktio_entry(const char *dev,
> odp_pool_t pool)
>                 unlock_entry_classifier(pktio_entry);
>         }
>
> +       pktio_entry->s.handle = id;
> +
>         return id;
>  }
>
> @@ -475,19 +478,27 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t
> queue)
>
>         qentry = queue_to_qentry(queue);
>
> -       if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN)
> -               return -1;
> -
>         lock_entry(pktio_entry);
>         pktio_entry->s.inq_default = queue;
>         unlock_entry(pktio_entry);
>
> -       queue_lock(qentry);
> -       qentry->s.pktin = id;
> -       qentry->s.status = QUEUE_STATUS_SCHED;
> -       queue_unlock(qentry);
> -
> -       odp_schedule_queue(queue, qentry->s.param.sched.prio);
> +       switch (qentry->s.type) {
> +       case ODP_QUEUE_TYPE_PKTIN:
> +               /* User polls the input queue */
> +               queue_lock(qentry);
> +               qentry->s.pktin = id;
> +               queue_unlock(qentry);
> +               /*break; TODO: Uncomment and change _TYPE_PKTIN to _POLL*/
>

Needs to have a bug link, a todo that makes it into the repo is a known
deficiency in the code


> +       case ODP_QUEUE_TYPE_SCHED:
> +               /* Packet input through the scheduler */
> +               if (schedule_pktio_start(id, ODP_SCHED_PRIO_LOWEST)) {
> +                       ODP_ERR("Schedule pktio start failed\n");
> +                       return -1;
> +               }
> +               break;
> +       default:
> +               ODP_ABORT("Bad queue type\n");
>

If it permissible for an API to abort, I would say that is important enough
to be described in the API docs as part of the expected and permissible
behavior.
You would otherwise expect an error return code.


> +       }
>
>         return 0;
>  }
> @@ -506,15 +517,6 @@ int odp_pktio_inq_remdef(odp_pktio_t id)
>         qentry = queue_to_qentry(queue);
>
>         queue_lock(qentry);
> -       if (qentry->s.status == QUEUE_STATUS_FREE) {
> -               queue_unlock(qentry);
> -               unlock_entry(pktio_entry);
> -               return -1;
> -       }
> -
> -       qentry->s.enqueue = queue_enq_dummy;
> -       qentry->s.enqueue_multi = queue_enq_multi_dummy;
> -       qentry->s.status = QUEUE_STATUS_NOTSCHED;
>         qentry->s.pktin = ODP_PKTIO_INVALID;
>         queue_unlock(qentry);
>
> @@ -665,6 +667,46 @@ int pktin_deq_multi(queue_entry_t *qentry,
> odp_buffer_hdr_t *buf_hdr[], int num)
>         return nbr;
>  }
>
> +int pktin_poll(pktio_entry_t *entry)
> +{
> +       odp_packet_t pkt_tbl[QUEUE_MULTI_MAX];
> +       odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
> +       int num, num_enq, i;
> +
> +       if (odp_unlikely(is_free(entry)))
> +               return -1;
> +
> +       num = odp_pktio_recv(entry->s.handle, pkt_tbl, QUEUE_MULTI_MAX);
> +
> +       if (num < 0) {
> +               ODP_ERR("Packet recv error\n");
> +               return -1;
> +       }
> +
> +       for (i = 0, num_enq = 0; i < num; ++i) {
> +               odp_buffer_t buf;
> +               odp_buffer_hdr_t *hdr;
> +
> +               buf = _odp_packet_to_buffer(pkt_tbl[i]);
> +               hdr = odp_buf_to_hdr(buf);
> +
> +               if (entry->s.cls_ena) {
> +                       if (packet_classifier(entry->s.handle, pkt_tbl[i])
> < 0)
> +                               hdr_tbl[num_enq++] = hdr;
> +               } else {
> +                       hdr_tbl[num_enq++] = hdr;
> +               }
> +       }
> +
> +       if (num_enq) {
> +               queue_entry_t *qentry;
> +               qentry = queue_to_qentry(entry->s.inq_default);
> +               queue_enq_multi(qentry, hdr_tbl, num_enq);
> +       }
> +
> +       return 0;
> +}
> +
>  /** function should be called with locked entry */
>  static int sockfd_from_pktio_entry(pktio_entry_t *entry)
>  {
> diff --git a/platform/linux-generic/odp_queue.c
> b/platform/linux-generic/odp_queue.c
> index 4bb8b9b..4a0465b 100644
> --- a/platform/linux-generic/odp_queue.c
> +++ b/platform/linux-generic/odp_queue.c
> @@ -88,7 +88,9 @@ static void queue_init(queue_entry_t *queue, const char
> *name,
>
>         queue->s.head = NULL;
>         queue->s.tail = NULL;
> -       queue->s.sched_buf = ODP_BUFFER_INVALID;
> +
> +       queue->s.pri_queue = ODP_QUEUE_INVALID;
> +       queue->s.cmd_ev    = ODP_EVENT_INVALID;
>  }
>
>
> @@ -222,22 +224,26 @@ odp_queue_t odp_queue_create(const char *name,
> odp_queue_type_t type,
>
>         if (handle != ODP_QUEUE_INVALID &&
>             (type == ODP_QUEUE_TYPE_SCHED || type ==
> ODP_QUEUE_TYPE_PKTIN)) {
> -               odp_buffer_t buf;
> -
> -               buf = odp_schedule_buffer_alloc(handle);
> -               if (buf == ODP_BUFFER_INVALID) {
> -                       queue->s.status = QUEUE_STATUS_FREE;
> -                       ODP_ERR("queue_init: sched buf alloc failed\n");
> +               if (schedule_queue_init(queue)) {
> +                       ODP_ERR("schedule queue init failed\n");
>                         return ODP_QUEUE_INVALID;
>                 }
> -
> -               queue->s.sched_buf = buf;
> -               odp_schedule_mask_set(handle, queue->s.param.sched.prio);
>         }
>
>         return handle;
>  }
>
> +void queue_destroy_finalize(queue_entry_t *queue)
> +{
> +       LOCK(&queue->s.lock);
> +
> +       if (queue->s.status == QUEUE_STATUS_DESTROYED) {
> +               queue->s.status = QUEUE_STATUS_FREE;
> +               schedule_queue_destroy(queue);
> +       }
> +       UNLOCK(&queue->s.lock);
> +}
> +
>  int odp_queue_destroy(odp_queue_t handle)
>  {
>         queue_entry_t *queue;
> @@ -246,41 +252,31 @@ int odp_queue_destroy(odp_queue_t handle)
>         LOCK(&queue->s.lock);
>         if (queue->s.status == QUEUE_STATUS_FREE) {
>                 UNLOCK(&queue->s.lock);
> -               ODP_ERR("queue_destroy: queue \"%s\" already free\n",
> -                       queue->s.name);
> +               ODP_ERR("queue \"%s\" already free\n", queue->s.name);
> +               return -1;
> +       }
> +       if (queue->s.status == QUEUE_STATUS_DESTROYED) {
> +               UNLOCK(&queue->s.lock);
> +               ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name
> );
>                 return -1;
>         }
>         if (queue->s.head != NULL) {
>                 UNLOCK(&queue->s.lock);
> -               ODP_ERR("queue_destroy: queue \"%s\" not empty\n",
> -                       queue->s.name);
> +               ODP_ERR("queue \"%s\" not empty\n", queue->s.name);
>                 return -1;
>         }
>
> -       queue->s.enqueue = queue_enq_dummy;
> -       queue->s.enqueue_multi = queue_enq_multi_dummy;
> -
>         switch (queue->s.status) {
>         case QUEUE_STATUS_READY:
>                 queue->s.status = QUEUE_STATUS_FREE;
> -               queue->s.head = NULL;
> -               queue->s.tail = NULL;
> +               break;
> +       case QUEUE_STATUS_NOTSCHED:
> +               queue->s.status = QUEUE_STATUS_FREE;
> +               schedule_queue_destroy(queue);
>                 break;
>         case QUEUE_STATUS_SCHED:
> -               /*
> -                * Override dequeue_multi to destroy queue when it will
> -                * be scheduled next time.
> -                */
> +               /* Queue is still in scheduling */
>                 queue->s.status = QUEUE_STATUS_DESTROYED;
> -               queue->s.dequeue_multi = queue_deq_multi_destroy;
> -               break;
> -       case QUEUE_STATUS_NOTSCHED:
> -               /* Queue won't be scheduled anymore */
> -               odp_buffer_free(queue->s.sched_buf);
> -               queue->s.sched_buf = ODP_BUFFER_INVALID;
> -               queue->s.status = QUEUE_STATUS_FREE;
> -               queue->s.head = NULL;
> -               queue->s.tail = NULL;
>                 break;
>         default:
>                 ODP_ABORT("Unexpected queue status\n");
>

If it permissible for an API to abort, I would say that is important enough
to be described in the API docs as part of the expected and permissible
behavior.
You would otherwise expect an error return code


> @@ -290,23 +286,6 @@ int odp_queue_destroy(odp_queue_t handle)
>         return 0;
>  }
>
> -odp_buffer_t queue_sched_buf(odp_queue_t handle)
> -{
> -       queue_entry_t *queue;
> -       queue = queue_to_qentry(handle);
> -
> -       return queue->s.sched_buf;
> -}
> -
> -
> -int queue_sched_atomic(odp_queue_t handle)
> -{
> -       queue_entry_t *queue;
> -       queue = queue_to_qentry(handle);
> -
> -       return queue->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC;
> -}
> -
>  int odp_queue_set_context(odp_queue_t handle, void *context)
>  {
>         queue_entry_t *queue;
> @@ -352,6 +331,12 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t
> *buf_hdr)
>         int sched = 0;
>
>         LOCK(&queue->s.lock);
> +       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
> +               UNLOCK(&queue->s.lock);
> +               ODP_ERR("Bad queue status\n");
> +               return -1;
> +       }
> +
>         if (queue->s.head == NULL) {
>                 /* Empty queue */
>                 queue->s.head = buf_hdr;
> @@ -370,8 +355,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t
> *buf_hdr)
>         UNLOCK(&queue->s.lock);
>
>         /* Add queue to scheduling */
> -       if (sched == 1)
> -               odp_schedule_queue(queue->s.handle,
> queue->s.param.sched.prio);
> +       if (sched)
> +               schedule_queue(queue);
>
>         return 0;
>  }
> @@ -389,6 +374,12 @@ int queue_enq_multi(queue_entry_t *queue,
> odp_buffer_hdr_t *buf_hdr[], int num)
>         buf_hdr[num-1]->next = NULL;
>
>         LOCK(&queue->s.lock);
> +       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
> +               UNLOCK(&queue->s.lock);
> +               ODP_ERR("Bad queue status\n");
> +               return -1;
> +       }
> +
>         /* Empty queue */
>         if (queue->s.head == NULL)
>                 queue->s.head = buf_hdr[0];
> @@ -404,25 +395,12 @@ int queue_enq_multi(queue_entry_t *queue,
> odp_buffer_hdr_t *buf_hdr[], int num)
>         UNLOCK(&queue->s.lock);
>
>         /* Add queue to scheduling */
> -       if (sched == 1)
> -               odp_schedule_queue(queue->s.handle,
> queue->s.param.sched.prio);
> +       if (sched)
> +               schedule_queue(queue);
>
>         return num; /* All events enqueued */
>  }
>
> -int queue_enq_dummy(queue_entry_t *queue ODP_UNUSED,
> -                   odp_buffer_hdr_t *buf_hdr ODP_UNUSED)
> -{
> -       return -1;
> -}
> -
> -int queue_enq_multi_dummy(queue_entry_t *queue ODP_UNUSED,
> -                         odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED,
> -                         int num ODP_UNUSED)
> -{
> -       return -1;
> -}
> -
>  int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int
> num)
>  {
>         odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX];
> @@ -455,24 +433,26 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev)
>
>  odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
>  {
> -       odp_buffer_hdr_t *buf_hdr = NULL;
> +       odp_buffer_hdr_t *buf_hdr;
>
>         LOCK(&queue->s.lock);
>
>         if (queue->s.head == NULL) {
>                 /* Already empty queue */
> -               if (queue->s.status == QUEUE_STATUS_SCHED &&
> -                   queue->s.type != ODP_QUEUE_TYPE_PKTIN)
> +               if (queue->s.status == QUEUE_STATUS_SCHED)
>                         queue->s.status = QUEUE_STATUS_NOTSCHED;
> -       } else {
> -               buf_hdr       = queue->s.head;
> -               queue->s.head = buf_hdr->next;
> -               buf_hdr->next = NULL;
>
> -               if (queue->s.head == NULL) {
> -                       /* Queue is now empty */
> -                       queue->s.tail = NULL;
> -               }
> +               UNLOCK(&queue->s.lock);
> +               return NULL;
> +       }
> +
> +       buf_hdr       = queue->s.head;
> +       queue->s.head = buf_hdr->next;
> +       buf_hdr->next = NULL;
> +
> +       if (queue->s.head == NULL) {
> +               /* Queue is now empty */
> +               queue->s.tail = NULL;
>         }
>
>         UNLOCK(&queue->s.lock);
> @@ -483,31 +463,39 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
>
>  int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
> int num)
>  {
> -       int i = 0;
> +       odp_buffer_hdr_t *hdr;
> +       int i;
>
>         LOCK(&queue->s.lock);
> +       if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) {
> +               /* Bad queue, or queue has been destroyed.
> +                * Scheduler finalizes queue destroy after this. */
> +               UNLOCK(&queue->s.lock);
> +               return -1;
> +       }
>
> -       if (queue->s.head == NULL) {
> +       hdr = queue->s.head;
> +
> +       if (hdr == NULL) {
>                 /* Already empty queue */
> -               if (queue->s.status == QUEUE_STATUS_SCHED &&
> -                   queue->s.type != ODP_QUEUE_TYPE_PKTIN)
> +               if (queue->s.status == QUEUE_STATUS_SCHED)
>                         queue->s.status = QUEUE_STATUS_NOTSCHED;
> -       } else {
> -               odp_buffer_hdr_t *hdr = queue->s.head;
>
> -               for (; i < num && hdr; i++) {
> -                       buf_hdr[i]       = hdr;
> -                       /* odp_prefetch(hdr->addr); */
> -                       hdr              = hdr->next;
> -                       buf_hdr[i]->next = NULL;
> -               }
> +               UNLOCK(&queue->s.lock);
> +               return 0;
> +       }
>
> -               queue->s.head = hdr;
> +       for (i = 0; i < num && hdr; i++) {
> +               buf_hdr[i]       = hdr;
> +               hdr              = hdr->next;
> +               buf_hdr[i]->next = NULL;
> +       }
>
> -               if (hdr == NULL) {
> -                       /* Queue is now empty */
> -                       queue->s.tail = NULL;
> -               }
> +       queue->s.head = hdr;
> +
> +       if (hdr == NULL) {
> +               /* Queue is now empty */
> +               queue->s.tail = NULL;
>         }
>
>         UNLOCK(&queue->s.lock);
> @@ -515,23 +503,6 @@ int queue_deq_multi(queue_entry_t *queue,
> odp_buffer_hdr_t *buf_hdr[], int num)
>         return i;
>  }
>
> -int queue_deq_multi_destroy(queue_entry_t *queue,
> -                           odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED,
> -                           int num ODP_UNUSED)
> -{
> -       LOCK(&queue->s.lock);
> -
> -       odp_buffer_free(queue->s.sched_buf);
> -       queue->s.sched_buf = ODP_BUFFER_INVALID;
> -       queue->s.status = QUEUE_STATUS_FREE;
> -       queue->s.head = NULL;
> -       queue->s.tail = NULL;
> -
> -       UNLOCK(&queue->s.lock);
> -
> -       return 0;
> -}
> -
>  int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num)
>  {
>         queue_entry_t *queue;
> diff --git a/platform/linux-generic/odp_schedule.c
> b/platform/linux-generic/odp_schedule.c
> index dd65168..59e40c7 100644
> --- a/platform/linux-generic/odp_schedule.c
> +++ b/platform/linux-generic/odp_schedule.c
> @@ -21,17 +21,15 @@
>  #include <odp/hints.h>
>
>  #include <odp_queue_internal.h>
> +#include <odp_packet_io_internal.h>
>
> -
> -/* Limits to number of scheduled queues */
> -#define SCHED_POOL_SIZE (256*1024)
> +/* Number of schedule commands.
> + * One per scheduled queue and packet interface */
> +#define NUM_SCHED_CMD (ODP_CONFIG_QUEUES + ODP_CONFIG_PKTIO_ENTRIES)
>
>  /* Scheduler sub queues */
>  #define QUEUES_PER_PRIO  4
>
> -/* TODO: random or queue based selection */
>

Needs to have a bug link, a todo that makes it into the repo is a known
deficiency in the code


> -#define SEL_PRI_QUEUE(x) ((QUEUES_PER_PRIO-1) & (queue_to_id(x)))
> -
>  /* Maximum number of dequeues */
>  #define MAX_DEQ 4
>
> @@ -48,21 +46,36 @@ typedef struct {
>         pri_mask_t     pri_mask[ODP_CONFIG_SCHED_PRIOS];
>         odp_spinlock_t mask_lock;
>         odp_pool_t     pool;
> +       uint32_t       pri_count[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO];
>  } sched_t;
>
> +/* Schedule command */
>  typedef struct {
> -       odp_queue_t queue;
> +       int           cmd;
> +
> +       union {
> +               queue_entry_t *qe;
> +
> +               struct {
> +                       odp_pktio_t   pktio;
> +                       pktio_entry_t *pe;
> +                       int           prio;
> +               };
> +       };
> +} sched_cmd_t;
> +
> +#define SCHED_CMD_DEQUEUE    0
> +#define SCHED_CMD_POLL_PKTIN 1
>
> -} queue_desc_t;
>
>  typedef struct {
>         odp_queue_t pri_queue;
> -       odp_event_t desc_ev;
> +       odp_event_t cmd_ev;
>
> -       odp_event_t ev[MAX_DEQ];
> +       odp_buffer_hdr_t *buf_hdr[MAX_DEQ];
> +       queue_entry_t *qe;
>         int num;
>         int index;
> -       odp_queue_t queue;
>         int pause;
>
>  } sched_local_t;
> @@ -73,14 +86,6 @@ static sched_t *sched;
>  /* Thread local scheduler context */
>  static __thread sched_local_t sched_local;
>
> -
> -static inline odp_queue_t select_pri_queue(odp_queue_t queue, int prio)
> -{
> -       int id = SEL_PRI_QUEUE(queue);
> -       return sched->pri_queue[prio][id];
> -}
> -
> -
>  int odp_schedule_init_global(void)
>  {
>         odp_shm_t shm;
> @@ -101,9 +106,11 @@ int odp_schedule_init_global(void)
>                 return -1;
>         }
>
> -       params.buf.size  = sizeof(queue_desc_t);
> +       memset(sched, 0, sizeof(sched_t));
> +
> +       params.buf.size  = sizeof(sched_cmd_t);
>         params.buf.align = 0;
> -       params.buf.num   = SCHED_POOL_SIZE/sizeof(queue_desc_t);
> +       params.buf.num   = NUM_SCHED_CMD;
>         params.type      = ODP_POOL_BUFFER;
>
>         pool = odp_pool_create("odp_sched_pool", ODP_SHM_NULL, &params);
> @@ -178,15 +185,17 @@ int odp_schedule_init_local(void)
>  {
>         int i;
>
> +       memset(&sched_local, 0, sizeof(sched_local_t));
> +
>         sched_local.pri_queue = ODP_QUEUE_INVALID;
> -       sched_local.desc_ev   = ODP_EVENT_INVALID;
> +       sched_local.cmd_ev    = ODP_EVENT_INVALID;
>
>         for (i = 0; i < MAX_DEQ; i++)
> -               sched_local.ev[i] = ODP_EVENT_INVALID;
> +               sched_local.buf_hdr[i] = NULL;
>
> +       sched_local.qe    = NULL;
>         sched_local.num   = 0;
>         sched_local.index = 0;
> -       sched_local.queue = ODP_QUEUE_INVALID;
>         sched_local.pause = 0;
>
>         return 0;
> @@ -198,50 +207,128 @@ int odp_schedule_term_local(void)
>         return 0;
>  }
>
> -void odp_schedule_mask_set(odp_queue_t queue, int prio)
> +static int pri_id_queue(odp_queue_t queue)
>  {
> -       int id = SEL_PRI_QUEUE(queue);
> +       return (QUEUES_PER_PRIO-1) & (queue_to_id(queue));
> +}
>
> +static int pri_id_pktio(odp_pktio_t pktio)
> +{
> +       return (QUEUES_PER_PRIO-1) & (pktio_to_id(pktio));
> +}
> +
> +static odp_queue_t pri_set(int id, int prio)
> +{
>         odp_spinlock_lock(&sched->mask_lock);
>         sched->pri_mask[prio] |= 1 << id;
> +       sched->pri_count[prio][id]++;
>         odp_spinlock_unlock(&sched->mask_lock);
> +
> +       return sched->pri_queue[prio][id];
> +}
> +
> +static void pri_clr(int id, int prio)
> +{
> +       odp_spinlock_lock(&sched->mask_lock);
> +
> +       /* Clear mask bit when last queue is removed*/
> +       sched->pri_count[prio][id]--;
> +
> +       if (sched->pri_count[prio][id] == 0)
> +               sched->pri_mask[prio] &= (uint8_t)(~(1 << id));
> +
> +       odp_spinlock_unlock(&sched->mask_lock);
> +}
> +
> +static odp_queue_t pri_set_queue(odp_queue_t queue, int prio)
> +{
> +       int id = pri_id_queue(queue);
> +
> +       return pri_set(id, prio);
> +}
> +
> +static odp_queue_t pri_set_pktio(odp_pktio_t pktio, int prio)
> +{
> +       int id = pri_id_pktio(pktio);
> +
> +       return pri_set(id, prio);
> +}
> +
> +static void pri_clr_queue(odp_queue_t queue, int prio)
> +{
> +       int id = pri_id_queue(queue);
> +       pri_clr(id, prio);
>  }
>
> +static void pri_clr_pktio(odp_pktio_t pktio, int prio)
> +{
> +       int id = pri_id_pktio(pktio);
> +       pri_clr(id, prio);
> +}
>
> -odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue)
> +int schedule_queue_init(queue_entry_t *qe)
>  {
>         odp_buffer_t buf;
> +       sched_cmd_t *sched_cmd;
>
>         buf = odp_buffer_alloc(sched->pool);
>
> -       if (buf != ODP_BUFFER_INVALID) {
> -               queue_desc_t *desc;
> -               desc        = odp_buffer_addr(buf);
> -               desc->queue = queue;
> -       }
> +       if (buf == ODP_BUFFER_INVALID)
> +               return -1;
>
> -       return buf;
> -}
> +       sched_cmd      = odp_buffer_addr(buf);
> +       sched_cmd->cmd = SCHED_CMD_DEQUEUE;
> +       sched_cmd->qe  = qe;
>
> +       qe->s.cmd_ev    = odp_buffer_to_event(buf);
> +       qe->s.pri_queue = pri_set_queue(queue_handle(qe), queue_prio(qe));
>
> -void odp_schedule_queue(odp_queue_t queue, int prio)
> +       return 0;
> +}
> +
> +void schedule_queue_destroy(queue_entry_t *qe)
>  {
> -       odp_buffer_t desc_buf;
> -       odp_queue_t  pri_queue;
> +       odp_buffer_t buf;
>
> -       pri_queue = select_pri_queue(queue, prio);
> -       desc_buf  = queue_sched_buf(queue);
> +       buf = odp_buffer_from_event(qe->s.cmd_ev);
> +       odp_buffer_free(buf);
>
> -       odp_queue_enq(pri_queue, odp_buffer_to_event(desc_buf));
> +       pri_clr_queue(queue_handle(qe), queue_prio(qe));
> +
> +       qe->s.cmd_ev    = ODP_EVENT_INVALID;
> +       qe->s.pri_queue = ODP_QUEUE_INVALID;
>  }
>
> +int schedule_pktio_start(odp_pktio_t pktio, int prio)
> +{
> +       odp_buffer_t buf;
> +       sched_cmd_t *sched_cmd;
> +       odp_queue_t pri_queue;
> +
> +       buf = odp_buffer_alloc(sched->pool);
> +
> +       if (buf == ODP_BUFFER_INVALID)
> +               return -1;
> +
> +       sched_cmd        = odp_buffer_addr(buf);
> +       sched_cmd->cmd   = SCHED_CMD_POLL_PKTIN;
> +       sched_cmd->pktio = pktio;
> +       sched_cmd->pe    = get_pktio_entry(pktio);
> +       sched_cmd->prio  = prio;
> +
> +       pri_queue  = pri_set_pktio(pktio, prio);
> +
> +       odp_queue_enq(pri_queue, odp_buffer_to_event(buf));
> +
> +       return 0;
> +}
>
>  void odp_schedule_release_atomic(void)
>  {
>         if (sched_local.pri_queue != ODP_QUEUE_INVALID &&
>             sched_local.num       == 0) {
>                 /* Release current atomic queue */
> -               odp_queue_enq(sched_local.pri_queue, sched_local.desc_ev);
> +               odp_queue_enq(sched_local.pri_queue, sched_local.cmd_ev);
>                 sched_local.pri_queue = ODP_QUEUE_INVALID;
>         }
>  }
> @@ -252,7 +339,8 @@ static inline int copy_events(odp_event_t out_ev[],
> unsigned int max)
>         int i = 0;
>
>         while (sched_local.num && max) {
> -               out_ev[i] = sched_local.ev[sched_local.index];
> +               odp_buffer_hdr_t *hdr =
> sched_local.buf_hdr[sched_local.index];
> +               out_ev[i] = odp_buffer_to_event(hdr->handle.handle);
>                 sched_local.index++;
>                 sched_local.num--;
>                 max--;
> @@ -279,7 +367,7 @@ static int schedule(odp_queue_t *out_queue,
> odp_event_t out_ev[],
>                 ret = copy_events(out_ev, max_num);
>
>                 if (out_queue)
> -                       *out_queue = sched_local.queue;
> +                       *out_queue = queue_handle(sched_local.qe);
>
>                 return ret;
>         }
> @@ -302,7 +390,10 @@ static int schedule(odp_queue_t *out_queue,
> odp_event_t out_ev[],
>                 for (j = 0; j < QUEUES_PER_PRIO; j++, id++) {
>                         odp_queue_t  pri_q;
>                         odp_event_t  ev;
> -                       odp_buffer_t desc_buf;
> +                       odp_buffer_t buf;
> +                       sched_cmd_t *sched_cmd;
> +                       queue_entry_t *qe;
> +                       int num;
>
>                         if (id >= QUEUES_PER_PRIO)
>                                 id = 0;
> @@ -310,59 +401,63 @@ static int schedule(odp_queue_t *out_queue,
> odp_event_t out_ev[],
>                         if (odp_unlikely((sched->pri_mask[i] & (1 << id))
> == 0))
>                                 continue;
>
> -                       pri_q    = sched->pri_queue[i][id];
> -                       ev       = odp_queue_deq(pri_q);
> -                       desc_buf = odp_buffer_from_event(ev);
> -
> -                       if (desc_buf != ODP_BUFFER_INVALID) {
> -                               queue_desc_t *desc;
> -                               odp_queue_t queue;
> -                               int num;
> -
> -                               desc  = odp_buffer_addr(desc_buf);
> -                               queue = desc->queue;
> -
> -                               if (odp_queue_type(queue) ==
> -                                       ODP_QUEUE_TYPE_PKTIN &&
> -                                       !queue_is_sched(queue))
> -                                       continue;
> -
> -                               num = odp_queue_deq_multi(queue,
> sched_local.ev,
> -                                                         max_deq);
> -
> -                               if (num == 0) {
> -                                       /* Remove empty queue from
> scheduling,
> -                                        * except packet input queues
> -                                        */
> -                                       if (odp_queue_type(queue) ==
> -                                           ODP_QUEUE_TYPE_PKTIN &&
> -                                           !queue_is_free(queue))
> -                                               odp_queue_enq(pri_q, ev);
> -
> -                                       continue;
> -                               }
> -
> -                               sched_local.num   = num;
> -                               sched_local.index = 0;
> -                               ret = copy_events(out_ev, max_num);
> -
> -                               sched_local.queue = queue;
> -
> -                               if (queue_sched_atomic(queue)) {
> -                                       /* Hold queue during atomic access
> */
> -                                       sched_local.pri_queue = pri_q;
> -                                       sched_local.desc_ev   = ev;
> +                       pri_q = sched->pri_queue[i][id];
> +                       ev    = odp_queue_deq(pri_q);
> +                       buf   = odp_buffer_from_event(ev);
> +
> +                       if (buf == ODP_BUFFER_INVALID)
> +                               continue;
> +
> +                       sched_cmd = odp_buffer_addr(buf);
> +
> +                       if (sched_cmd->cmd == SCHED_CMD_POLL_PKTIN) {
> +                               /* Poll packet input */
> +                               if (pktin_poll(sched_cmd->pe)) {
> +                                       /* Stop scheduling the pktio */
> +                                       pri_clr_pktio(sched_cmd->pktio,
> +                                                     sched_cmd->prio);
> +                                       odp_buffer_free(buf);
>                                 } else {
> -                                       /* Continue scheduling the queue */
> +                                       /* Continue scheduling the pktio */
>                                         odp_queue_enq(pri_q, ev);
>                                 }
>
> -                               /* Output the source queue handle */
> -                               if (out_queue)
> -                                       *out_queue = queue;
> +                               continue;
> +                       }
> +
> +                       qe  = sched_cmd->qe;
> +                       num = queue_deq_multi(qe, sched_local.buf_hdr,
> max_deq);
>
> -                               return ret;
> +                       if (num < 0) {
> +                               /* Destroyed queue */
> +                               queue_destroy_finalize(qe);
> +                               continue;
>                         }
> +
> +                       if (num == 0) {
> +                               /* Remove empty queue from scheduling */
> +                               continue;
> +                       }
> +
> +                       sched_local.num   = num;
> +                       sched_local.index = 0;
> +                       sched_local.qe    = qe;
> +                       ret = copy_events(out_ev, max_num);
> +
> +                       if (queue_is_atomic(qe)) {
> +                               /* Hold queue during atomic access */
> +                               sched_local.pri_queue = pri_q;
> +                               sched_local.cmd_ev    = ev;
> +                       } else {
> +                               /* Continue scheduling the queue */
> +                               odp_queue_enq(pri_q, ev);
> +                       }
> +
> +                       /* Output the source queue handle */
> +                       if (out_queue)
> +                               *out_queue = queue_handle(qe);
> +
> +                       return ret;
>                 }
>         }
>
> --
> 2.3.3
>
>
> _______________________________________________
> lng-odp mailing list
> lng-odp@lists.linaro.org
> http://lists.linaro.org/mailman/listinfo/lng-odp
>



-- 
Mike Holmes
Technical Manager - Linaro Networking Group
Linaro.org <http://www.linaro.org/> *│ *Open source software for ARM SoCs
_______________________________________________
lng-odp mailing list
lng-odp@lists.linaro.org
http://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to