On 23 March 2015 at 10:42, Petri Savolainen <petri.savolai...@nokia.com> wrote:
> Scheduler runs by polling scheduler priority queues for schedule > commands. There are two types of scheduler commands: queue dequeue > and packet input poll. Packet input is polled directly when a poll > command is received. From the scheduler's point of view, the default > packet input queue is like any other queue. > > Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com> > --- > .../linux-generic/include/odp_packet_io_internal.h | 17 +- > .../linux-generic/include/odp_queue_internal.h | 34 +-- > .../linux-generic/include/odp_schedule_internal.h | 14 +- > platform/linux-generic/odp_packet_io.c | 78 ++++-- > platform/linux-generic/odp_queue.c | 193 ++++++-------- > platform/linux-generic/odp_schedule.c | 277 > ++++++++++++++------- > 6 files changed, 369 insertions(+), 244 deletions(-) > > diff --git a/platform/linux-generic/include/odp_packet_io_internal.h > b/platform/linux-generic/include/odp_packet_io_internal.h > index 47b8992..161be16 100644 > --- a/platform/linux-generic/include/odp_packet_io_internal.h > +++ b/platform/linux-generic/include/odp_packet_io_internal.h > @@ -40,6 +40,8 @@ typedef enum { > struct pktio_entry { > odp_spinlock_t lock; /**< entry spinlock */ > int taken; /**< is entry taken(1) or free(0) > */ > + int cls_ena; /**< is classifier enabled */ > cls_ena is not very descriptive; clsfy_enable is better but still ugly. > + odp_pktio_t handle; /**< pktio handle */ > odp_queue_t inq_default; /**< default input queue, if set */ > odp_queue_t outq_default; /**< default out queue */ > odp_queue_t loopq; /**< loopback queue for "loop" > device */ > @@ -64,15 +66,22 @@ typedef struct { > > extern void *pktio_entry_ptr[]; > > +static inline int pktio_to_id(odp_pktio_t pktio) > +{ > + return _odp_typeval(pktio) - 1; > +} > > -static inline pktio_entry_t *get_pktio_entry(odp_pktio_t id) > +static inline pktio_entry_t *get_pktio_entry(odp_pktio_t pktio) > { > - if (odp_unlikely(id == ODP_PKTIO_INVALID || > - _odp_typeval(id) >
ODP_CONFIG_PKTIO_ENTRIES)) > + if (odp_unlikely(pktio == ODP_PKTIO_INVALID || > + _odp_typeval(pktio) > ODP_CONFIG_PKTIO_ENTRIES)) > return NULL; > > - return pktio_entry_ptr[_odp_typeval(id) - 1]; > + return pktio_entry_ptr[pktio_to_id(pktio)]; > } > + > +int pktin_poll(pktio_entry_t *entry); > + > #ifdef __cplusplus > } > #endif > diff --git a/platform/linux-generic/include/odp_queue_internal.h > b/platform/linux-generic/include/odp_queue_internal.h > index 65aae14..61d0c43 100644 > --- a/platform/linux-generic/include/odp_queue_internal.h > +++ b/platform/linux-generic/include/odp_queue_internal.h > @@ -36,10 +36,11 @@ extern "C" { > #define QUEUE_MULTI_MAX 8 > > #define QUEUE_STATUS_FREE 0 > -#define QUEUE_STATUS_READY 1 > -#define QUEUE_STATUS_NOTSCHED 2 > -#define QUEUE_STATUS_SCHED 3 > -#define QUEUE_STATUS_DESTROYED 4 > +#define QUEUE_STATUS_DESTROYED 1 > +#define QUEUE_STATUS_READY 2 > +#define QUEUE_STATUS_NOTSCHED 3 > +#define QUEUE_STATUS_SCHED 4 > + > > /* forward declaration */ > union queue_entry_u; > @@ -69,7 +70,8 @@ struct queue_entry_s { > deq_multi_func_t dequeue_multi; > > odp_queue_t handle; > - odp_buffer_t sched_buf; > + odp_queue_t pri_queue; > + odp_event_t cmd_ev; > odp_queue_type_t type; > odp_queue_param_t param; > odp_pktio_t pktin; > @@ -100,7 +102,6 @@ int queue_deq_multi_destroy(queue_entry_t *queue, > odp_buffer_hdr_t *buf_hdr[], > void queue_lock(queue_entry_t *queue); > void queue_unlock(queue_entry_t *queue); > > -odp_buffer_t queue_sched_buf(odp_queue_t queue); > int queue_sched_atomic(odp_queue_t handle); > > static inline uint32_t queue_to_id(odp_queue_t handle) > @@ -121,24 +122,23 @@ static inline queue_entry_t > *queue_to_qentry(odp_queue_t handle) > return get_qentry(queue_id); > } > > -static inline int queue_is_free(odp_queue_t handle) > +static inline int queue_is_atomic(queue_entry_t *qe) > { > - queue_entry_t *queue; > - > - queue = queue_to_qentry(handle); > + return qe->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC; 
> +} > - return queue->s.status == QUEUE_STATUS_FREE; > +static inline odp_queue_t queue_handle(queue_entry_t *qe) > +{ > + return qe->s.handle; > } > > -static inline int queue_is_sched(odp_queue_t handle) > +static inline int queue_prio(queue_entry_t *qe) > { > - queue_entry_t *queue; > + return qe->s.param.sched.prio; > +} > > - queue = queue_to_qentry(handle); > +void queue_destroy_finalize(queue_entry_t *qe); > > - return ((queue->s.status == QUEUE_STATUS_SCHED) && > - (queue->s.pktin != ODP_PKTIO_INVALID)); > -} > #ifdef __cplusplus > } > #endif > diff --git a/platform/linux-generic/include/odp_schedule_internal.h > b/platform/linux-generic/include/odp_schedule_internal.h > index acda2e4..904bfbd 100644 > --- a/platform/linux-generic/include/odp_schedule_internal.h > +++ b/platform/linux-generic/include/odp_schedule_internal.h > @@ -16,12 +16,20 @@ extern "C" { > > #include <odp/buffer.h> > #include <odp/queue.h> > +#include <odp/packet_io.h> > +#include <odp_queue_internal.h> > > -void odp_schedule_mask_set(odp_queue_t queue, int prio); > > -odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue); > +int schedule_queue_init(queue_entry_t *qe); > +void schedule_queue_destroy(queue_entry_t *qe); > > -void odp_schedule_queue(odp_queue_t queue, int prio); > +static inline void schedule_queue(const queue_entry_t *qe) > +{ > + odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev); > +} > + > + > +int schedule_pktio_start(odp_pktio_t pktio, int prio); > > > #ifdef __cplusplus > diff --git a/platform/linux-generic/odp_packet_io.c > b/platform/linux-generic/odp_packet_io.c > index 21f0c17..4ab45c0 100644 > --- a/platform/linux-generic/odp_packet_io.c > +++ b/platform/linux-generic/odp_packet_io.c > @@ -142,6 +142,7 @@ static void unlock_entry_classifier(pktio_entry_t > *entry) > static void init_pktio_entry(pktio_entry_t *entry) > { > set_taken(entry); > + entry->s.cls_ena = 1; /* TODO: disable cls by default */ > Needs to have a bug link, a todo that makes it into the repo 
is a known deficiency in the code for that published revision of the code. > entry->s.inq_default = ODP_QUEUE_INVALID; > memset(&entry->s.pkt_sock, 0, sizeof(entry->s.pkt_sock)); > memset(&entry->s.pkt_sock_mmap, 0, sizeof(entry->s.pkt_sock_mmap)); > @@ -273,6 +274,8 @@ static odp_pktio_t setup_pktio_entry(const char *dev, > odp_pool_t pool) > unlock_entry_classifier(pktio_entry); > } > > + pktio_entry->s.handle = id; > + > return id; > } > > @@ -475,19 +478,27 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t > queue) > > qentry = queue_to_qentry(queue); > > - if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN) > - return -1; > - > lock_entry(pktio_entry); > pktio_entry->s.inq_default = queue; > unlock_entry(pktio_entry); > > - queue_lock(qentry); > - qentry->s.pktin = id; > - qentry->s.status = QUEUE_STATUS_SCHED; > - queue_unlock(qentry); > - > - odp_schedule_queue(queue, qentry->s.param.sched.prio); > + switch (qentry->s.type) { > + case ODP_QUEUE_TYPE_PKTIN: > + /* User polls the input queue */ > + queue_lock(qentry); > + qentry->s.pktin = id; > + queue_unlock(qentry); > + /*break; TODO: Uncomment and change _TYPE_PKTIN to _POLL*/ > Needs to have a bug link; a TODO that makes it into the repo is a known deficiency in the code. > + case ODP_QUEUE_TYPE_SCHED: > + /* Packet input through the scheduler */ > + if (schedule_pktio_start(id, ODP_SCHED_PRIO_LOWEST)) { > + ODP_ERR("Schedule pktio start failed\n"); > + return -1; > + } > + break; > + default: > + ODP_ABORT("Bad queue type\n"); > If it is permissible for an API to abort, I would say that is important enough to be described in the API docs as part of the expected and permissible behavior. You would otherwise expect an error return code.
> + } > > return 0; > } > @@ -506,15 +517,6 @@ int odp_pktio_inq_remdef(odp_pktio_t id) > qentry = queue_to_qentry(queue); > > queue_lock(qentry); > - if (qentry->s.status == QUEUE_STATUS_FREE) { > - queue_unlock(qentry); > - unlock_entry(pktio_entry); > - return -1; > - } > - > - qentry->s.enqueue = queue_enq_dummy; > - qentry->s.enqueue_multi = queue_enq_multi_dummy; > - qentry->s.status = QUEUE_STATUS_NOTSCHED; > qentry->s.pktin = ODP_PKTIO_INVALID; > queue_unlock(qentry); > > @@ -665,6 +667,46 @@ int pktin_deq_multi(queue_entry_t *qentry, > odp_buffer_hdr_t *buf_hdr[], int num) > return nbr; > } > > +int pktin_poll(pktio_entry_t *entry) > +{ > + odp_packet_t pkt_tbl[QUEUE_MULTI_MAX]; > + odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX]; > + int num, num_enq, i; > + > + if (odp_unlikely(is_free(entry))) > + return -1; > + > + num = odp_pktio_recv(entry->s.handle, pkt_tbl, QUEUE_MULTI_MAX); > + > + if (num < 0) { > + ODP_ERR("Packet recv error\n"); > + return -1; > + } > + > + for (i = 0, num_enq = 0; i < num; ++i) { > + odp_buffer_t buf; > + odp_buffer_hdr_t *hdr; > + > + buf = _odp_packet_to_buffer(pkt_tbl[i]); > + hdr = odp_buf_to_hdr(buf); > + > + if (entry->s.cls_ena) { > + if (packet_classifier(entry->s.handle, pkt_tbl[i]) > < 0) > + hdr_tbl[num_enq++] = hdr; > + } else { > + hdr_tbl[num_enq++] = hdr; > + } > + } > + > + if (num_enq) { > + queue_entry_t *qentry; > + qentry = queue_to_qentry(entry->s.inq_default); > + queue_enq_multi(qentry, hdr_tbl, num_enq); > + } > + > + return 0; > +} > + > /** function should be called with locked entry */ > static int sockfd_from_pktio_entry(pktio_entry_t *entry) > { > diff --git a/platform/linux-generic/odp_queue.c > b/platform/linux-generic/odp_queue.c > index 4bb8b9b..4a0465b 100644 > --- a/platform/linux-generic/odp_queue.c > +++ b/platform/linux-generic/odp_queue.c > @@ -88,7 +88,9 @@ static void queue_init(queue_entry_t *queue, const char > *name, > > queue->s.head = NULL; > queue->s.tail = NULL; > - queue->s.sched_buf 
= ODP_BUFFER_INVALID; > + > + queue->s.pri_queue = ODP_QUEUE_INVALID; > + queue->s.cmd_ev = ODP_EVENT_INVALID; > } > > > @@ -222,22 +224,26 @@ odp_queue_t odp_queue_create(const char *name, > odp_queue_type_t type, > > if (handle != ODP_QUEUE_INVALID && > (type == ODP_QUEUE_TYPE_SCHED || type == > ODP_QUEUE_TYPE_PKTIN)) { > - odp_buffer_t buf; > - > - buf = odp_schedule_buffer_alloc(handle); > - if (buf == ODP_BUFFER_INVALID) { > - queue->s.status = QUEUE_STATUS_FREE; > - ODP_ERR("queue_init: sched buf alloc failed\n"); > + if (schedule_queue_init(queue)) { > + ODP_ERR("schedule queue init failed\n"); > return ODP_QUEUE_INVALID; > } > - > - queue->s.sched_buf = buf; > - odp_schedule_mask_set(handle, queue->s.param.sched.prio); > } > > return handle; > } > > +void queue_destroy_finalize(queue_entry_t *queue) > +{ > + LOCK(&queue->s.lock); > + > + if (queue->s.status == QUEUE_STATUS_DESTROYED) { > + queue->s.status = QUEUE_STATUS_FREE; > + schedule_queue_destroy(queue); > + } > + UNLOCK(&queue->s.lock); > +} > + > int odp_queue_destroy(odp_queue_t handle) > { > queue_entry_t *queue; > @@ -246,41 +252,31 @@ int odp_queue_destroy(odp_queue_t handle) > LOCK(&queue->s.lock); > if (queue->s.status == QUEUE_STATUS_FREE) { > UNLOCK(&queue->s.lock); > - ODP_ERR("queue_destroy: queue \"%s\" already free\n", > - queue->s.name); > + ODP_ERR("queue \"%s\" already free\n", queue->s.name); > + return -1; > + } > + if (queue->s.status == QUEUE_STATUS_DESTROYED) { > + UNLOCK(&queue->s.lock); > + ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name > ); > return -1; > } > if (queue->s.head != NULL) { > UNLOCK(&queue->s.lock); > - ODP_ERR("queue_destroy: queue \"%s\" not empty\n", > - queue->s.name); > + ODP_ERR("queue \"%s\" not empty\n", queue->s.name); > return -1; > } > > - queue->s.enqueue = queue_enq_dummy; > - queue->s.enqueue_multi = queue_enq_multi_dummy; > - > switch (queue->s.status) { > case QUEUE_STATUS_READY: > queue->s.status = QUEUE_STATUS_FREE; > - queue->s.head 
= NULL; > - queue->s.tail = NULL; > + break; > + case QUEUE_STATUS_NOTSCHED: > + queue->s.status = QUEUE_STATUS_FREE; > + schedule_queue_destroy(queue); > break; > case QUEUE_STATUS_SCHED: > - /* > - * Override dequeue_multi to destroy queue when it will > - * be scheduled next time. > - */ > + /* Queue is still in scheduling */ > queue->s.status = QUEUE_STATUS_DESTROYED; > - queue->s.dequeue_multi = queue_deq_multi_destroy; > - break; > - case QUEUE_STATUS_NOTSCHED: > - /* Queue won't be scheduled anymore */ > - odp_buffer_free(queue->s.sched_buf); > - queue->s.sched_buf = ODP_BUFFER_INVALID; > - queue->s.status = QUEUE_STATUS_FREE; > - queue->s.head = NULL; > - queue->s.tail = NULL; > break; > default: > ODP_ABORT("Unexpected queue status\n"); > If it is permissible for an API to abort, I would say that is important enough to be described in the API docs as part of the expected and permissible behavior. You would otherwise expect an error return code. > @@ -290,23 +286,6 @@ int odp_queue_destroy(odp_queue_t handle) > return 0; > } > > -odp_buffer_t queue_sched_buf(odp_queue_t handle) > -{ > - queue_entry_t *queue; > - queue = queue_to_qentry(handle); > - > - return queue->s.sched_buf; > -} > - > - > -int queue_sched_atomic(odp_queue_t handle) > -{ > - queue_entry_t *queue; > - queue = queue_to_qentry(handle); > - > - return queue->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC; > -} > - > int odp_queue_set_context(odp_queue_t handle, void *context) > { > queue_entry_t *queue; > @@ -352,6 +331,12 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t > *buf_hdr) > int sched = 0; > > LOCK(&queue->s.lock); > + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { > + UNLOCK(&queue->s.lock); > + ODP_ERR("Bad queue status\n"); > + return -1; > + } > + > if (queue->s.head == NULL) { > /* Empty queue */ > queue->s.head = buf_hdr; > @@ -370,8 +355,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t > *buf_hdr) > UNLOCK(&queue->s.lock); > > /* Add queue to scheduling
*/ > - if (sched == 1) > - odp_schedule_queue(queue->s.handle, > queue->s.param.sched.prio); > + if (sched) > + schedule_queue(queue); > > return 0; > } > @@ -389,6 +374,12 @@ int queue_enq_multi(queue_entry_t *queue, > odp_buffer_hdr_t *buf_hdr[], int num) > buf_hdr[num-1]->next = NULL; > > LOCK(&queue->s.lock); > + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { > + UNLOCK(&queue->s.lock); > + ODP_ERR("Bad queue status\n"); > + return -1; > + } > + > /* Empty queue */ > if (queue->s.head == NULL) > queue->s.head = buf_hdr[0]; > @@ -404,25 +395,12 @@ int queue_enq_multi(queue_entry_t *queue, > odp_buffer_hdr_t *buf_hdr[], int num) > UNLOCK(&queue->s.lock); > > /* Add queue to scheduling */ > - if (sched == 1) > - odp_schedule_queue(queue->s.handle, > queue->s.param.sched.prio); > + if (sched) > + schedule_queue(queue); > > return num; /* All events enqueued */ > } > > -int queue_enq_dummy(queue_entry_t *queue ODP_UNUSED, > - odp_buffer_hdr_t *buf_hdr ODP_UNUSED) > -{ > - return -1; > -} > - > -int queue_enq_multi_dummy(queue_entry_t *queue ODP_UNUSED, > - odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED, > - int num ODP_UNUSED) > -{ > - return -1; > -} > - > int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int > num) > { > odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; > @@ -455,24 +433,26 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev) > > odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) > { > - odp_buffer_hdr_t *buf_hdr = NULL; > + odp_buffer_hdr_t *buf_hdr; > > LOCK(&queue->s.lock); > > if (queue->s.head == NULL) { > /* Already empty queue */ > - if (queue->s.status == QUEUE_STATUS_SCHED && > - queue->s.type != ODP_QUEUE_TYPE_PKTIN) > + if (queue->s.status == QUEUE_STATUS_SCHED) > queue->s.status = QUEUE_STATUS_NOTSCHED; > - } else { > - buf_hdr = queue->s.head; > - queue->s.head = buf_hdr->next; > - buf_hdr->next = NULL; > > - if (queue->s.head == NULL) { > - /* Queue is now empty */ > - queue->s.tail = NULL; > - } > + 
UNLOCK(&queue->s.lock); > + return NULL; > + } > + > + buf_hdr = queue->s.head; > + queue->s.head = buf_hdr->next; > + buf_hdr->next = NULL; > + > + if (queue->s.head == NULL) { > + /* Queue is now empty */ > + queue->s.tail = NULL; > } > > UNLOCK(&queue->s.lock); > @@ -483,31 +463,39 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) > > int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], > int num) > { > - int i = 0; > + odp_buffer_hdr_t *hdr; > + int i; > > LOCK(&queue->s.lock); > + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { > + /* Bad queue, or queue has been destroyed. > + * Scheduler finalizes queue destroy after this. */ > + UNLOCK(&queue->s.lock); > + return -1; > + } > > - if (queue->s.head == NULL) { > + hdr = queue->s.head; > + > + if (hdr == NULL) { > /* Already empty queue */ > - if (queue->s.status == QUEUE_STATUS_SCHED && > - queue->s.type != ODP_QUEUE_TYPE_PKTIN) > + if (queue->s.status == QUEUE_STATUS_SCHED) > queue->s.status = QUEUE_STATUS_NOTSCHED; > - } else { > - odp_buffer_hdr_t *hdr = queue->s.head; > > - for (; i < num && hdr; i++) { > - buf_hdr[i] = hdr; > - /* odp_prefetch(hdr->addr); */ > - hdr = hdr->next; > - buf_hdr[i]->next = NULL; > - } > + UNLOCK(&queue->s.lock); > + return 0; > + } > > - queue->s.head = hdr; > + for (i = 0; i < num && hdr; i++) { > + buf_hdr[i] = hdr; > + hdr = hdr->next; > + buf_hdr[i]->next = NULL; > + } > > - if (hdr == NULL) { > - /* Queue is now empty */ > - queue->s.tail = NULL; > - } > + queue->s.head = hdr; > + > + if (hdr == NULL) { > + /* Queue is now empty */ > + queue->s.tail = NULL; > } > > UNLOCK(&queue->s.lock); > @@ -515,23 +503,6 @@ int queue_deq_multi(queue_entry_t *queue, > odp_buffer_hdr_t *buf_hdr[], int num) > return i; > } > > -int queue_deq_multi_destroy(queue_entry_t *queue, > - odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED, > - int num ODP_UNUSED) > -{ > - LOCK(&queue->s.lock); > - > - odp_buffer_free(queue->s.sched_buf); > - queue->s.sched_buf = 
ODP_BUFFER_INVALID; > - queue->s.status = QUEUE_STATUS_FREE; > - queue->s.head = NULL; > - queue->s.tail = NULL; > - > - UNLOCK(&queue->s.lock); > - > - return 0; > -} > - > int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num) > { > queue_entry_t *queue; > diff --git a/platform/linux-generic/odp_schedule.c > b/platform/linux-generic/odp_schedule.c > index dd65168..59e40c7 100644 > --- a/platform/linux-generic/odp_schedule.c > +++ b/platform/linux-generic/odp_schedule.c > @@ -21,17 +21,15 @@ > #include <odp/hints.h> > > #include <odp_queue_internal.h> > +#include <odp_packet_io_internal.h> > > - > -/* Limits to number of scheduled queues */ > -#define SCHED_POOL_SIZE (256*1024) > +/* Number of schedule commands. > + * One per scheduled queue and packet interface */ > +#define NUM_SCHED_CMD (ODP_CONFIG_QUEUES + ODP_CONFIG_PKTIO_ENTRIES) > > /* Scheduler sub queues */ > #define QUEUES_PER_PRIO 4 > > -/* TODO: random or queue based selection */ > Needs to have a bug link, a todo that makes it into the repo is a known deficiency in the code > -#define SEL_PRI_QUEUE(x) ((QUEUES_PER_PRIO-1) & (queue_to_id(x))) > - > /* Maximum number of dequeues */ > #define MAX_DEQ 4 > > @@ -48,21 +46,36 @@ typedef struct { > pri_mask_t pri_mask[ODP_CONFIG_SCHED_PRIOS]; > odp_spinlock_t mask_lock; > odp_pool_t pool; > + uint32_t pri_count[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO]; > } sched_t; > > +/* Schedule command */ > typedef struct { > - odp_queue_t queue; > + int cmd; > + > + union { > + queue_entry_t *qe; > + > + struct { > + odp_pktio_t pktio; > + pktio_entry_t *pe; > + int prio; > + }; > + }; > +} sched_cmd_t; > + > +#define SCHED_CMD_DEQUEUE 0 > +#define SCHED_CMD_POLL_PKTIN 1 > > -} queue_desc_t; > > typedef struct { > odp_queue_t pri_queue; > - odp_event_t desc_ev; > + odp_event_t cmd_ev; > > - odp_event_t ev[MAX_DEQ]; > + odp_buffer_hdr_t *buf_hdr[MAX_DEQ]; > + queue_entry_t *qe; > int num; > int index; > - odp_queue_t queue; > int pause; > > } 
sched_local_t; > @@ -73,14 +86,6 @@ static sched_t *sched; > /* Thread local scheduler context */ > static __thread sched_local_t sched_local; > > - > -static inline odp_queue_t select_pri_queue(odp_queue_t queue, int prio) > -{ > - int id = SEL_PRI_QUEUE(queue); > - return sched->pri_queue[prio][id]; > -} > - > - > int odp_schedule_init_global(void) > { > odp_shm_t shm; > @@ -101,9 +106,11 @@ int odp_schedule_init_global(void) > return -1; > } > > - params.buf.size = sizeof(queue_desc_t); > + memset(sched, 0, sizeof(sched_t)); > + > + params.buf.size = sizeof(sched_cmd_t); > params.buf.align = 0; > - params.buf.num = SCHED_POOL_SIZE/sizeof(queue_desc_t); > + params.buf.num = NUM_SCHED_CMD; > params.type = ODP_POOL_BUFFER; > > pool = odp_pool_create("odp_sched_pool", ODP_SHM_NULL, &params); > @@ -178,15 +185,17 @@ int odp_schedule_init_local(void) > { > int i; > > + memset(&sched_local, 0, sizeof(sched_local_t)); > + > sched_local.pri_queue = ODP_QUEUE_INVALID; > - sched_local.desc_ev = ODP_EVENT_INVALID; > + sched_local.cmd_ev = ODP_EVENT_INVALID; > > for (i = 0; i < MAX_DEQ; i++) > - sched_local.ev[i] = ODP_EVENT_INVALID; > + sched_local.buf_hdr[i] = NULL; > > + sched_local.qe = NULL; > sched_local.num = 0; > sched_local.index = 0; > - sched_local.queue = ODP_QUEUE_INVALID; > sched_local.pause = 0; > > return 0; > @@ -198,50 +207,128 @@ int odp_schedule_term_local(void) > return 0; > } > > -void odp_schedule_mask_set(odp_queue_t queue, int prio) > +static int pri_id_queue(odp_queue_t queue) > { > - int id = SEL_PRI_QUEUE(queue); > + return (QUEUES_PER_PRIO-1) & (queue_to_id(queue)); > +} > > +static int pri_id_pktio(odp_pktio_t pktio) > +{ > + return (QUEUES_PER_PRIO-1) & (pktio_to_id(pktio)); > +} > + > +static odp_queue_t pri_set(int id, int prio) > +{ > odp_spinlock_lock(&sched->mask_lock); > sched->pri_mask[prio] |= 1 << id; > + sched->pri_count[prio][id]++; > odp_spinlock_unlock(&sched->mask_lock); > + > + return sched->pri_queue[prio][id]; > +} > + > +static
void pri_clr(int id, int prio) > +{ > + odp_spinlock_lock(&sched->mask_lock); > + > + /* Clear mask bit when last queue is removed*/ > + sched->pri_count[prio][id]--; > + > + if (sched->pri_count[prio][id] == 0) > + sched->pri_mask[prio] &= (uint8_t)(~(1 << id)); > + > + odp_spinlock_unlock(&sched->mask_lock); > +} > + > +static odp_queue_t pri_set_queue(odp_queue_t queue, int prio) > +{ > + int id = pri_id_queue(queue); > + > + return pri_set(id, prio); > +} > + > +static odp_queue_t pri_set_pktio(odp_pktio_t pktio, int prio) > +{ > + int id = pri_id_pktio(pktio); > + > + return pri_set(id, prio); > +} > + > +static void pri_clr_queue(odp_queue_t queue, int prio) > +{ > + int id = pri_id_queue(queue); > + pri_clr(id, prio); > } > > +static void pri_clr_pktio(odp_pktio_t pktio, int prio) > +{ > + int id = pri_id_pktio(pktio); > + pri_clr(id, prio); > +} > > -odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue) > +int schedule_queue_init(queue_entry_t *qe) > { > odp_buffer_t buf; > + sched_cmd_t *sched_cmd; > > buf = odp_buffer_alloc(sched->pool); > > - if (buf != ODP_BUFFER_INVALID) { > - queue_desc_t *desc; > - desc = odp_buffer_addr(buf); > - desc->queue = queue; > - } > + if (buf == ODP_BUFFER_INVALID) > + return -1; > > - return buf; > -} > + sched_cmd = odp_buffer_addr(buf); > + sched_cmd->cmd = SCHED_CMD_DEQUEUE; > + sched_cmd->qe = qe; > > + qe->s.cmd_ev = odp_buffer_to_event(buf); > + qe->s.pri_queue = pri_set_queue(queue_handle(qe), queue_prio(qe)); > > -void odp_schedule_queue(odp_queue_t queue, int prio) > + return 0; > +} > + > +void schedule_queue_destroy(queue_entry_t *qe) > { > - odp_buffer_t desc_buf; > - odp_queue_t pri_queue; > + odp_buffer_t buf; > > - pri_queue = select_pri_queue(queue, prio); > - desc_buf = queue_sched_buf(queue); > + buf = odp_buffer_from_event(qe->s.cmd_ev); > + odp_buffer_free(buf); > > - odp_queue_enq(pri_queue, odp_buffer_to_event(desc_buf)); > + pri_clr_queue(queue_handle(qe), queue_prio(qe)); > + > + qe->s.cmd_ev = 
ODP_EVENT_INVALID; > + qe->s.pri_queue = ODP_QUEUE_INVALID; > } > > +int schedule_pktio_start(odp_pktio_t pktio, int prio) > +{ > + odp_buffer_t buf; > + sched_cmd_t *sched_cmd; > + odp_queue_t pri_queue; > + > + buf = odp_buffer_alloc(sched->pool); > + > + if (buf == ODP_BUFFER_INVALID) > + return -1; > + > + sched_cmd = odp_buffer_addr(buf); > + sched_cmd->cmd = SCHED_CMD_POLL_PKTIN; > + sched_cmd->pktio = pktio; > + sched_cmd->pe = get_pktio_entry(pktio); > + sched_cmd->prio = prio; > + > + pri_queue = pri_set_pktio(pktio, prio); > + > + odp_queue_enq(pri_queue, odp_buffer_to_event(buf)); > + > + return 0; > +} > > void odp_schedule_release_atomic(void) > { > if (sched_local.pri_queue != ODP_QUEUE_INVALID && > sched_local.num == 0) { > /* Release current atomic queue */ > - odp_queue_enq(sched_local.pri_queue, sched_local.desc_ev); > + odp_queue_enq(sched_local.pri_queue, sched_local.cmd_ev); > sched_local.pri_queue = ODP_QUEUE_INVALID; > } > } > @@ -252,7 +339,8 @@ static inline int copy_events(odp_event_t out_ev[], > unsigned int max) > int i = 0; > > while (sched_local.num && max) { > - out_ev[i] = sched_local.ev[sched_local.index]; > + odp_buffer_hdr_t *hdr = > sched_local.buf_hdr[sched_local.index]; > + out_ev[i] = odp_buffer_to_event(hdr->handle.handle); > sched_local.index++; > sched_local.num--; > max--; > @@ -279,7 +367,7 @@ static int schedule(odp_queue_t *out_queue, > odp_event_t out_ev[], > ret = copy_events(out_ev, max_num); > > if (out_queue) > - *out_queue = sched_local.queue; > + *out_queue = queue_handle(sched_local.qe); > > return ret; > } > @@ -302,7 +390,10 @@ static int schedule(odp_queue_t *out_queue, > odp_event_t out_ev[], > for (j = 0; j < QUEUES_PER_PRIO; j++, id++) { > odp_queue_t pri_q; > odp_event_t ev; > - odp_buffer_t desc_buf; > + odp_buffer_t buf; > + sched_cmd_t *sched_cmd; > + queue_entry_t *qe; > + int num; > > if (id >= QUEUES_PER_PRIO) > id = 0; > @@ -310,59 +401,63 @@ static int schedule(odp_queue_t *out_queue, > 
odp_event_t out_ev[], > if (odp_unlikely((sched->pri_mask[i] & (1 << id)) > == 0)) > continue; > > - pri_q = sched->pri_queue[i][id]; > - ev = odp_queue_deq(pri_q); > - desc_buf = odp_buffer_from_event(ev); > - > - if (desc_buf != ODP_BUFFER_INVALID) { > - queue_desc_t *desc; > - odp_queue_t queue; > - int num; > - > - desc = odp_buffer_addr(desc_buf); > - queue = desc->queue; > - > - if (odp_queue_type(queue) == > - ODP_QUEUE_TYPE_PKTIN && > - !queue_is_sched(queue)) > - continue; > - > - num = odp_queue_deq_multi(queue, > sched_local.ev, > - max_deq); > - > - if (num == 0) { > - /* Remove empty queue from > scheduling, > - * except packet input queues > - */ > - if (odp_queue_type(queue) == > - ODP_QUEUE_TYPE_PKTIN && > - !queue_is_free(queue)) > - odp_queue_enq(pri_q, ev); > - > - continue; > - } > - > - sched_local.num = num; > - sched_local.index = 0; > - ret = copy_events(out_ev, max_num); > - > - sched_local.queue = queue; > - > - if (queue_sched_atomic(queue)) { > - /* Hold queue during atomic access > */ > - sched_local.pri_queue = pri_q; > - sched_local.desc_ev = ev; > + pri_q = sched->pri_queue[i][id]; > + ev = odp_queue_deq(pri_q); > + buf = odp_buffer_from_event(ev); > + > + if (buf == ODP_BUFFER_INVALID) > + continue; > + > + sched_cmd = odp_buffer_addr(buf); > + > + if (sched_cmd->cmd == SCHED_CMD_POLL_PKTIN) { > + /* Poll packet input */ > + if (pktin_poll(sched_cmd->pe)) { > + /* Stop scheduling the pktio */ > + pri_clr_pktio(sched_cmd->pktio, > + sched_cmd->prio); > + odp_buffer_free(buf); > } else { > - /* Continue scheduling the queue */ > + /* Continue scheduling the pktio */ > odp_queue_enq(pri_q, ev); > } > > - /* Output the source queue handle */ > - if (out_queue) > - *out_queue = queue; > + continue; > + } > + > + qe = sched_cmd->qe; > + num = queue_deq_multi(qe, sched_local.buf_hdr, > max_deq); > > - return ret; > + if (num < 0) { > + /* Destroyed queue */ > + queue_destroy_finalize(qe); > + continue; > } > + > + if (num == 0) { > + /* 
Remove empty queue from scheduling */ > + continue; > + } > + > + sched_local.num = num; > + sched_local.index = 0; > + sched_local.qe = qe; > + ret = copy_events(out_ev, max_num); > + > + if (queue_is_atomic(qe)) { > + /* Hold queue during atomic access */ > + sched_local.pri_queue = pri_q; > + sched_local.cmd_ev = ev; > + } else { > + /* Continue scheduling the queue */ > + odp_queue_enq(pri_q, ev); > + } > + > + /* Output the source queue handle */ > + if (out_queue) > + *out_queue = queue_handle(qe); > + > + return ret; > } > } > > -- > 2.3.3 > > > _______________________________________________ > lng-odp mailing list > lng-odp@lists.linaro.org > http://lists.linaro.org/mailman/listinfo/lng-odp > -- Mike Holmes Technical Manager - Linaro Networking Group Linaro.org <http://www.linaro.org/> *│ *Open source software for ARM SoCs
_______________________________________________ lng-odp mailing list lng-odp@lists.linaro.org http://lists.linaro.org/mailman/listinfo/lng-odp