Scheduler runs by polling scheduler priority queues for schedule commands. There are two types of scheduler commands: queue dequeue and packet input poll. Packet input is polled directly when a poll command is received. From schduler point of view, the default packet input queue is like any other queue.
Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com> --- .../linux-generic/include/odp_packet_io_internal.h | 17 +- .../linux-generic/include/odp_queue_internal.h | 34 +-- .../linux-generic/include/odp_schedule_internal.h | 14 +- platform/linux-generic/odp_packet_io.c | 78 ++++-- platform/linux-generic/odp_queue.c | 193 ++++++-------- platform/linux-generic/odp_schedule.c | 277 ++++++++++++++------- 6 files changed, 369 insertions(+), 244 deletions(-) diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h index 47b8992..161be16 100644 --- a/platform/linux-generic/include/odp_packet_io_internal.h +++ b/platform/linux-generic/include/odp_packet_io_internal.h @@ -40,6 +40,8 @@ typedef enum { struct pktio_entry { odp_spinlock_t lock; /**< entry spinlock */ int taken; /**< is entry taken(1) or free(0) */ + int cls_ena; /**< is classifier enabled */ + odp_pktio_t handle; /**< pktio handle */ odp_queue_t inq_default; /**< default input queue, if set */ odp_queue_t outq_default; /**< default out queue */ odp_queue_t loopq; /**< loopback queue for "loop" device */ @@ -64,15 +66,22 @@ typedef struct { extern void *pktio_entry_ptr[]; +static inline int pktio_to_id(odp_pktio_t pktio) +{ + return _odp_typeval(pktio) - 1; +} -static inline pktio_entry_t *get_pktio_entry(odp_pktio_t id) +static inline pktio_entry_t *get_pktio_entry(odp_pktio_t pktio) { - if (odp_unlikely(id == ODP_PKTIO_INVALID || - _odp_typeval(id) > ODP_CONFIG_PKTIO_ENTRIES)) + if (odp_unlikely(pktio == ODP_PKTIO_INVALID || + _odp_typeval(pktio) > ODP_CONFIG_PKTIO_ENTRIES)) return NULL; - return pktio_entry_ptr[_odp_typeval(id) - 1]; + return pktio_entry_ptr[pktio_to_id(pktio)]; } + +int pktin_poll(pktio_entry_t *entry); + #ifdef __cplusplus } #endif diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 65aae14..61d0c43 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -36,10 +36,11 @@ extern "C" { #define QUEUE_MULTI_MAX 8 #define QUEUE_STATUS_FREE 0 -#define QUEUE_STATUS_READY 1 -#define QUEUE_STATUS_NOTSCHED 2 -#define QUEUE_STATUS_SCHED 3 -#define QUEUE_STATUS_DESTROYED 4 +#define QUEUE_STATUS_DESTROYED 1 +#define QUEUE_STATUS_READY 2 +#define QUEUE_STATUS_NOTSCHED 3 +#define QUEUE_STATUS_SCHED 4 + /* forward declaration */ union queue_entry_u; @@ -69,7 +70,8 @@ struct queue_entry_s { deq_multi_func_t dequeue_multi; odp_queue_t handle; - odp_buffer_t sched_buf; + odp_queue_t pri_queue; + odp_event_t cmd_ev; odp_queue_type_t type; odp_queue_param_t param; odp_pktio_t pktin; @@ -100,7 +102,6 @@ int queue_deq_multi_destroy(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], void queue_lock(queue_entry_t *queue); void queue_unlock(queue_entry_t *queue); -odp_buffer_t queue_sched_buf(odp_queue_t queue); int queue_sched_atomic(odp_queue_t handle); static inline uint32_t queue_to_id(odp_queue_t handle) @@ -121,24 +122,23 @@ static inline queue_entry_t *queue_to_qentry(odp_queue_t handle) return get_qentry(queue_id); } -static inline int queue_is_free(odp_queue_t handle) +static inline int queue_is_atomic(queue_entry_t *qe) { - queue_entry_t *queue; - - queue = queue_to_qentry(handle); + return qe->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC; +} - return queue->s.status == QUEUE_STATUS_FREE; +static inline odp_queue_t queue_handle(queue_entry_t *qe) +{ + return qe->s.handle; } -static inline int queue_is_sched(odp_queue_t handle) +static inline int queue_prio(queue_entry_t *qe) { - queue_entry_t *queue; + return qe->s.param.sched.prio; +} - queue = queue_to_qentry(handle); +void queue_destroy_finalize(queue_entry_t *qe); - return ((queue->s.status == QUEUE_STATUS_SCHED) && - (queue->s.pktin != ODP_PKTIO_INVALID)); -} #ifdef __cplusplus } #endif diff --git a/platform/linux-generic/include/odp_schedule_internal.h b/platform/linux-generic/include/odp_schedule_internal.h index acda2e4..904bfbd 100644 --- a/platform/linux-generic/include/odp_schedule_internal.h +++ b/platform/linux-generic/include/odp_schedule_internal.h @@ -16,12 +16,20 @@ extern "C" { #include <odp/buffer.h> #include <odp/queue.h> +#include <odp/packet_io.h> +#include <odp_queue_internal.h> -void odp_schedule_mask_set(odp_queue_t queue, int prio); -odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue); +int schedule_queue_init(queue_entry_t *qe); +void schedule_queue_destroy(queue_entry_t *qe); -void odp_schedule_queue(odp_queue_t queue, int prio); +static inline void schedule_queue(const queue_entry_t *qe) +{ + odp_queue_enq(qe->s.pri_queue, qe->s.cmd_ev); +} + + +int schedule_pktio_start(odp_pktio_t pktio, int prio); #ifdef __cplusplus diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c index 21f0c17..4ab45c0 100644 --- a/platform/linux-generic/odp_packet_io.c +++ b/platform/linux-generic/odp_packet_io.c @@ -142,6 +142,7 @@ static void unlock_entry_classifier(pktio_entry_t *entry) static void init_pktio_entry(pktio_entry_t *entry) { set_taken(entry); + entry->s.cls_ena = 1; /* TODO: disable cls by default */ entry->s.inq_default = ODP_QUEUE_INVALID; memset(&entry->s.pkt_sock, 0, sizeof(entry->s.pkt_sock)); memset(&entry->s.pkt_sock_mmap, 0, sizeof(entry->s.pkt_sock_mmap)); @@ -273,6 +274,8 @@ static odp_pktio_t setup_pktio_entry(const char *dev, odp_pool_t pool) unlock_entry_classifier(pktio_entry); } + pktio_entry->s.handle = id; + return id; } @@ -475,19 +478,27 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue) qentry = queue_to_qentry(queue); - if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN) - return -1; - lock_entry(pktio_entry); pktio_entry->s.inq_default = queue; unlock_entry(pktio_entry); - queue_lock(qentry); - qentry->s.pktin = id; - qentry->s.status = QUEUE_STATUS_SCHED; - queue_unlock(qentry); - - odp_schedule_queue(queue, qentry->s.param.sched.prio); + switch (qentry->s.type) { + case ODP_QUEUE_TYPE_PKTIN: + /* User polls the input queue */ + queue_lock(qentry); + qentry->s.pktin = id; + queue_unlock(qentry); + /*break; TODO: Uncomment and change _TYPE_PKTIN to _POLL*/ + case ODP_QUEUE_TYPE_SCHED: + /* Packet input through the scheduler */ + if (schedule_pktio_start(id, ODP_SCHED_PRIO_LOWEST)) { + ODP_ERR("Schedule pktio start failed\n"); + return -1; + } + break; + default: + ODP_ABORT("Bad queue type\n"); + } return 0; } @@ -506,15 +517,6 @@ int odp_pktio_inq_remdef(odp_pktio_t id) qentry = queue_to_qentry(queue); queue_lock(qentry); - if (qentry->s.status == QUEUE_STATUS_FREE) { - queue_unlock(qentry); - unlock_entry(pktio_entry); - return -1; - } - - qentry->s.enqueue = queue_enq_dummy; - qentry->s.enqueue_multi = queue_enq_multi_dummy; - qentry->s.status = QUEUE_STATUS_NOTSCHED; qentry->s.pktin = ODP_PKTIO_INVALID; queue_unlock(qentry); @@ -665,6 +667,46 @@ int pktin_deq_multi(queue_entry_t *qentry, odp_buffer_hdr_t *buf_hdr[], int num) return nbr; } +int pktin_poll(pktio_entry_t *entry) +{ + odp_packet_t pkt_tbl[QUEUE_MULTI_MAX]; + odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX]; + int num, num_enq, i; + + if (odp_unlikely(is_free(entry))) + return -1; + + num = odp_pktio_recv(entry->s.handle, pkt_tbl, QUEUE_MULTI_MAX); + + if (num < 0) { + ODP_ERR("Packet recv error\n"); + return -1; + } + + for (i = 0, num_enq = 0; i < num; ++i) { + odp_buffer_t buf; + odp_buffer_hdr_t *hdr; + + buf = _odp_packet_to_buffer(pkt_tbl[i]); + hdr = odp_buf_to_hdr(buf); + + if (entry->s.cls_ena) { + if (packet_classifier(entry->s.handle, pkt_tbl[i]) < 0) + hdr_tbl[num_enq++] = hdr; + } else { + hdr_tbl[num_enq++] = hdr; + } + } + + if (num_enq) { + queue_entry_t *qentry; + qentry = queue_to_qentry(entry->s.inq_default); + queue_enq_multi(qentry, hdr_tbl, num_enq); + } + + return 0; +} + /** function should be called with locked entry */ static int sockfd_from_pktio_entry(pktio_entry_t *entry) { diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index 4bb8b9b..4a0465b 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -88,7 +88,9 @@ static void queue_init(queue_entry_t *queue, const char *name, queue->s.head = NULL; queue->s.tail = NULL; - queue->s.sched_buf = ODP_BUFFER_INVALID; + + queue->s.pri_queue = ODP_QUEUE_INVALID; + queue->s.cmd_ev = ODP_EVENT_INVALID; } @@ -222,22 +224,26 @@ odp_queue_t odp_queue_create(const char *name, odp_queue_type_t type, if (handle != ODP_QUEUE_INVALID && (type == ODP_QUEUE_TYPE_SCHED || type == ODP_QUEUE_TYPE_PKTIN)) { - odp_buffer_t buf; - - buf = odp_schedule_buffer_alloc(handle); - if (buf == ODP_BUFFER_INVALID) { - queue->s.status = QUEUE_STATUS_FREE; - ODP_ERR("queue_init: sched buf alloc failed\n"); + if (schedule_queue_init(queue)) { + ODP_ERR("schedule queue init failed\n"); return ODP_QUEUE_INVALID; } - - queue->s.sched_buf = buf; - odp_schedule_mask_set(handle, queue->s.param.sched.prio); } return handle; } +void queue_destroy_finalize(queue_entry_t *queue) +{ + LOCK(&queue->s.lock); + + if (queue->s.status == QUEUE_STATUS_DESTROYED) { + queue->s.status = QUEUE_STATUS_FREE; + schedule_queue_destroy(queue); + } + UNLOCK(&queue->s.lock); +} + int odp_queue_destroy(odp_queue_t handle) { queue_entry_t *queue; @@ -246,41 +252,31 @@ int odp_queue_destroy(odp_queue_t handle) LOCK(&queue->s.lock); if (queue->s.status == QUEUE_STATUS_FREE) { UNLOCK(&queue->s.lock); - ODP_ERR("queue_destroy: queue \"%s\" already free\n", - queue->s.name); + ODP_ERR("queue \"%s\" already free\n", queue->s.name); + return -1; + } + if (queue->s.status == QUEUE_STATUS_DESTROYED) { + UNLOCK(&queue->s.lock); + ODP_ERR("queue \"%s\" already destroyed\n", queue->s.name); return -1; } if (queue->s.head != NULL) { UNLOCK(&queue->s.lock); - ODP_ERR("queue_destroy: queue \"%s\" not empty\n", - queue->s.name); + ODP_ERR("queue \"%s\" not empty\n", queue->s.name); return -1; } - queue->s.enqueue = queue_enq_dummy; - queue->s.enqueue_multi = queue_enq_multi_dummy; - switch (queue->s.status) { case QUEUE_STATUS_READY: queue->s.status = QUEUE_STATUS_FREE; - queue->s.head = NULL; - queue->s.tail = NULL; + break; + case QUEUE_STATUS_NOTSCHED: + queue->s.status = QUEUE_STATUS_FREE; + schedule_queue_destroy(queue); break; case QUEUE_STATUS_SCHED: - /* - * Override dequeue_multi to destroy queue when it will - * be scheduled next time. - */ + /* Queue is still in scheduling */ queue->s.status = QUEUE_STATUS_DESTROYED; - queue->s.dequeue_multi = queue_deq_multi_destroy; - break; - case QUEUE_STATUS_NOTSCHED: - /* Queue won't be scheduled anymore */ - odp_buffer_free(queue->s.sched_buf); - queue->s.sched_buf = ODP_BUFFER_INVALID; - queue->s.status = QUEUE_STATUS_FREE; - queue->s.head = NULL; - queue->s.tail = NULL; break; default: ODP_ABORT("Unexpected queue status\n"); @@ -290,23 +286,6 @@ int odp_queue_destroy(odp_queue_t handle) return 0; } -odp_buffer_t queue_sched_buf(odp_queue_t handle) -{ - queue_entry_t *queue; - queue = queue_to_qentry(handle); - - return queue->s.sched_buf; -} - - -int queue_sched_atomic(odp_queue_t handle) -{ - queue_entry_t *queue; - queue = queue_to_qentry(handle); - - return queue->s.param.sched.sync == ODP_SCHED_SYNC_ATOMIC; -} - int odp_queue_set_context(odp_queue_t handle, void *context) { queue_entry_t *queue; @@ -352,6 +331,12 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) int sched = 0; LOCK(&queue->s.lock); + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + UNLOCK(&queue->s.lock); + ODP_ERR("Bad queue status\n"); + return -1; + } + if (queue->s.head == NULL) { /* Empty queue */ queue->s.head = buf_hdr; @@ -370,8 +355,8 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) UNLOCK(&queue->s.lock); /* Add queue to scheduling */ - if (sched == 1) - odp_schedule_queue(queue->s.handle, queue->s.param.sched.prio); + if (sched) + schedule_queue(queue); return 0; } @@ -389,6 +374,12 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) buf_hdr[num-1]->next = NULL; LOCK(&queue->s.lock); + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + UNLOCK(&queue->s.lock); + ODP_ERR("Bad queue status\n"); + return -1; + } + /* Empty queue */ if (queue->s.head == NULL) queue->s.head = buf_hdr[0]; @@ -404,25 +395,12 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) UNLOCK(&queue->s.lock); /* Add queue to scheduling */ - if (sched == 1) - odp_schedule_queue(queue->s.handle, queue->s.param.sched.prio); + if (sched) + schedule_queue(queue); return num; /* All events enqueued */ } -int queue_enq_dummy(queue_entry_t *queue ODP_UNUSED, - odp_buffer_hdr_t *buf_hdr ODP_UNUSED) -{ - return -1; -} - -int queue_enq_multi_dummy(queue_entry_t *queue ODP_UNUSED, - odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED, - int num ODP_UNUSED) -{ - return -1; -} - int odp_queue_enq_multi(odp_queue_t handle, const odp_event_t ev[], int num) { odp_buffer_hdr_t *buf_hdr[QUEUE_MULTI_MAX]; @@ -455,24 +433,26 @@ int odp_queue_enq(odp_queue_t handle, odp_event_t ev) odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) { - odp_buffer_hdr_t *buf_hdr = NULL; + odp_buffer_hdr_t *buf_hdr; LOCK(&queue->s.lock); if (queue->s.head == NULL) { /* Already empty queue */ - if (queue->s.status == QUEUE_STATUS_SCHED && - queue->s.type != ODP_QUEUE_TYPE_PKTIN) + if (queue->s.status == QUEUE_STATUS_SCHED) queue->s.status = QUEUE_STATUS_NOTSCHED; - } else { - buf_hdr = queue->s.head; - queue->s.head = buf_hdr->next; - buf_hdr->next = NULL; - if (queue->s.head == NULL) { - /* Queue is now empty */ - queue->s.tail = NULL; - } + UNLOCK(&queue->s.lock); + return NULL; + } + + buf_hdr = queue->s.head; + queue->s.head = buf_hdr->next; + buf_hdr->next = NULL; + + if (queue->s.head == NULL) { + /* Queue is now empty */ + queue->s.tail = NULL; } UNLOCK(&queue->s.lock); @@ -483,31 +463,39 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { - int i = 0; + odp_buffer_hdr_t *hdr; + int i; LOCK(&queue->s.lock); + if (odp_unlikely(queue->s.status < QUEUE_STATUS_READY)) { + /* Bad queue, or queue has been destroyed. + * Scheduler finalizes queue destroy after this. */ + UNLOCK(&queue->s.lock); + return -1; + } - if (queue->s.head == NULL) { + hdr = queue->s.head; + + if (hdr == NULL) { /* Already empty queue */ - if (queue->s.status == QUEUE_STATUS_SCHED && - queue->s.type != ODP_QUEUE_TYPE_PKTIN) + if (queue->s.status == QUEUE_STATUS_SCHED) queue->s.status = QUEUE_STATUS_NOTSCHED; - } else { - odp_buffer_hdr_t *hdr = queue->s.head; - for (; i < num && hdr; i++) { - buf_hdr[i] = hdr; - /* odp_prefetch(hdr->addr); */ - hdr = hdr->next; - buf_hdr[i]->next = NULL; - } + UNLOCK(&queue->s.lock); + return 0; + } - queue->s.head = hdr; + for (i = 0; i < num && hdr; i++) { + buf_hdr[i] = hdr; + hdr = hdr->next; + buf_hdr[i]->next = NULL; + } - if (hdr == NULL) { - /* Queue is now empty */ - queue->s.tail = NULL; - } + queue->s.head = hdr; + + if (hdr == NULL) { + /* Queue is now empty */ + queue->s.tail = NULL; } UNLOCK(&queue->s.lock); @@ -515,23 +503,6 @@ int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) return i; } -int queue_deq_multi_destroy(queue_entry_t *queue, - odp_buffer_hdr_t *buf_hdr[] ODP_UNUSED, - int num ODP_UNUSED) -{ - LOCK(&queue->s.lock); - - odp_buffer_free(queue->s.sched_buf); - queue->s.sched_buf = ODP_BUFFER_INVALID; - queue->s.status = QUEUE_STATUS_FREE; - queue->s.head = NULL; - queue->s.tail = NULL; - - UNLOCK(&queue->s.lock); - - return 0; -} - int odp_queue_deq_multi(odp_queue_t handle, odp_event_t events[], int num) { queue_entry_t *queue; diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index dd65168..59e40c7 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -21,17 +21,15 @@ #include <odp/hints.h> #include <odp_queue_internal.h> +#include <odp_packet_io_internal.h> - -/* Limits to number of scheduled queues */ -#define SCHED_POOL_SIZE (256*1024) +/* Number of schedule commands. + * One per scheduled queue and packet interface */ +#define NUM_SCHED_CMD (ODP_CONFIG_QUEUES + ODP_CONFIG_PKTIO_ENTRIES) /* Scheduler sub queues */ #define QUEUES_PER_PRIO 4 -/* TODO: random or queue based selection */ -#define SEL_PRI_QUEUE(x) ((QUEUES_PER_PRIO-1) & (queue_to_id(x))) - /* Maximum number of dequeues */ #define MAX_DEQ 4 @@ -48,21 +46,36 @@ typedef struct { pri_mask_t pri_mask[ODP_CONFIG_SCHED_PRIOS]; odp_spinlock_t mask_lock; odp_pool_t pool; + uint32_t pri_count[ODP_CONFIG_SCHED_PRIOS][QUEUES_PER_PRIO]; } sched_t; +/* Schedule command */ typedef struct { - odp_queue_t queue; + int cmd; + + union { + queue_entry_t *qe; + + struct { + odp_pktio_t pktio; + pktio_entry_t *pe; + int prio; + }; + }; +} sched_cmd_t; + +#define SCHED_CMD_DEQUEUE 0 +#define SCHED_CMD_POLL_PKTIN 1 -} queue_desc_t; typedef struct { odp_queue_t pri_queue; - odp_event_t desc_ev; + odp_event_t cmd_ev; - odp_event_t ev[MAX_DEQ]; + odp_buffer_hdr_t *buf_hdr[MAX_DEQ]; + queue_entry_t *qe; int num; int index; - odp_queue_t queue; int pause; } sched_local_t; @@ -73,14 +86,6 @@ static sched_t *sched; /* Thread local scheduler context */ static __thread sched_local_t sched_local; - -static inline odp_queue_t select_pri_queue(odp_queue_t queue, int prio) -{ - int id = SEL_PRI_QUEUE(queue); - return sched->pri_queue[prio][id]; -} - - int odp_schedule_init_global(void) { odp_shm_t shm; @@ -101,9 +106,11 @@ int odp_schedule_init_global(void) return -1; } - params.buf.size = sizeof(queue_desc_t); + memset(sched, 0, sizeof(sched_t)); + + params.buf.size = sizeof(sched_cmd_t); params.buf.align = 0; - params.buf.num = SCHED_POOL_SIZE/sizeof(queue_desc_t); + params.buf.num = NUM_SCHED_CMD; params.type = ODP_POOL_BUFFER; pool = odp_pool_create("odp_sched_pool", ODP_SHM_NULL, ¶ms); @@ -178,15 +185,17 @@ int odp_schedule_init_local(void) { int i; + memset(&sched_local, 0, sizeof(sched_local_t)); + sched_local.pri_queue = ODP_QUEUE_INVALID; - sched_local.desc_ev = ODP_EVENT_INVALID; + sched_local.cmd_ev = ODP_EVENT_INVALID; for (i = 0; i < MAX_DEQ; i++) - sched_local.ev[i] = ODP_EVENT_INVALID; + sched_local.buf_hdr[i] = NULL; + sched_local.qe = NULL; sched_local.num = 0; sched_local.index = 0; - sched_local.queue = ODP_QUEUE_INVALID; sched_local.pause = 0; return 0; @@ -198,50 +207,128 @@ int odp_schedule_term_local(void) return 0; } -void odp_schedule_mask_set(odp_queue_t queue, int prio) +static int pri_id_queue(odp_queue_t queue) { - int id = SEL_PRI_QUEUE(queue); + return (QUEUES_PER_PRIO-1) & (queue_to_id(queue)); +} +static int pri_id_pktio(odp_pktio_t pktio) +{ + return (QUEUES_PER_PRIO-1) & (pktio_to_id(pktio)); +} + +static odp_queue_t pri_set(int id, int prio) +{ odp_spinlock_lock(&sched->mask_lock); sched->pri_mask[prio] |= 1 << id; + sched->pri_count[prio][id]++; odp_spinlock_unlock(&sched->mask_lock); + + return sched->pri_queue[prio][id]; +} + +static void pri_clr(int id, int prio) +{ + odp_spinlock_lock(&sched->mask_lock); + + /* Clear mask bit when last queue is removed*/ + sched->pri_count[prio][id]--; + + if (sched->pri_count[prio][id] == 0) + sched->pri_mask[prio] &= (uint8_t)(~(1 << id)); + + odp_spinlock_unlock(&sched->mask_lock); +} + +static odp_queue_t pri_set_queue(odp_queue_t queue, int prio) +{ + int id = pri_id_queue(queue); + + return pri_set(id, prio); +} + +static odp_queue_t pri_set_pktio(odp_pktio_t pktio, int prio) +{ + int id = pri_id_pktio(pktio); + + return pri_set(id, prio); +} + +static void pri_clr_queue(odp_queue_t queue, int prio) +{ + int id = pri_id_queue(queue); + pri_clr(id, prio); } +static void pri_clr_pktio(odp_pktio_t pktio, int prio) +{ + int id = pri_id_pktio(pktio); + pri_clr(id, prio); +} -odp_buffer_t odp_schedule_buffer_alloc(odp_queue_t queue) +int schedule_queue_init(queue_entry_t *qe) { odp_buffer_t buf; + sched_cmd_t *sched_cmd; buf = odp_buffer_alloc(sched->pool); - if (buf != ODP_BUFFER_INVALID) { - queue_desc_t *desc; - desc = odp_buffer_addr(buf); - desc->queue = queue; - } + if (buf == ODP_BUFFER_INVALID) + return -1; - return buf; -} + sched_cmd = odp_buffer_addr(buf); + sched_cmd->cmd = SCHED_CMD_DEQUEUE; + sched_cmd->qe = qe; + qe->s.cmd_ev = odp_buffer_to_event(buf); + qe->s.pri_queue = pri_set_queue(queue_handle(qe), queue_prio(qe)); -void odp_schedule_queue(odp_queue_t queue, int prio) + return 0; +} + +void schedule_queue_destroy(queue_entry_t *qe) { - odp_buffer_t desc_buf; - odp_queue_t pri_queue; + odp_buffer_t buf; - pri_queue = select_pri_queue(queue, prio); - desc_buf = queue_sched_buf(queue); + buf = odp_buffer_from_event(qe->s.cmd_ev); + odp_buffer_free(buf); - odp_queue_enq(pri_queue, odp_buffer_to_event(desc_buf)); + pri_clr_queue(queue_handle(qe), queue_prio(qe)); + + qe->s.cmd_ev = ODP_EVENT_INVALID; + qe->s.pri_queue = ODP_QUEUE_INVALID; } +int schedule_pktio_start(odp_pktio_t pktio, int prio) +{ + odp_buffer_t buf; + sched_cmd_t *sched_cmd; + odp_queue_t pri_queue; + + buf = odp_buffer_alloc(sched->pool); + + if (buf == ODP_BUFFER_INVALID) + return -1; + + sched_cmd = odp_buffer_addr(buf); + sched_cmd->cmd = SCHED_CMD_POLL_PKTIN; + sched_cmd->pktio = pktio; + sched_cmd->pe = get_pktio_entry(pktio); + sched_cmd->prio = prio; + + pri_queue = pri_set_pktio(pktio, prio); + + odp_queue_enq(pri_queue, odp_buffer_to_event(buf)); + + return 0; +} void odp_schedule_release_atomic(void) { if (sched_local.pri_queue != ODP_QUEUE_INVALID && sched_local.num == 0) { /* Release current atomic queue */ - odp_queue_enq(sched_local.pri_queue, sched_local.desc_ev); + odp_queue_enq(sched_local.pri_queue, sched_local.cmd_ev); sched_local.pri_queue = ODP_QUEUE_INVALID; } } @@ -252,7 +339,8 @@ static inline int copy_events(odp_event_t out_ev[], unsigned int max) int i = 0; while (sched_local.num && max) { - out_ev[i] = sched_local.ev[sched_local.index]; + odp_buffer_hdr_t *hdr = sched_local.buf_hdr[sched_local.index]; + out_ev[i] = odp_buffer_to_event(hdr->handle.handle); sched_local.index++; sched_local.num--; max--; @@ -279,7 +367,7 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[], ret = copy_events(out_ev, max_num); if (out_queue) - *out_queue = sched_local.queue; + *out_queue = queue_handle(sched_local.qe); return ret; } @@ -302,7 +390,10 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[], for (j = 0; j < QUEUES_PER_PRIO; j++, id++) { odp_queue_t pri_q; odp_event_t ev; - odp_buffer_t desc_buf; + odp_buffer_t buf; + sched_cmd_t *sched_cmd; + queue_entry_t *qe; + int num; if (id >= QUEUES_PER_PRIO) id = 0; @@ -310,59 +401,63 @@ static int schedule(odp_queue_t *out_queue, odp_event_t out_ev[], if (odp_unlikely((sched->pri_mask[i] & (1 << id)) == 0)) continue; - pri_q = sched->pri_queue[i][id]; - ev = odp_queue_deq(pri_q); - desc_buf = odp_buffer_from_event(ev); - - if (desc_buf != ODP_BUFFER_INVALID) { - queue_desc_t *desc; - odp_queue_t queue; - int num; - - desc = odp_buffer_addr(desc_buf); - queue = desc->queue; - - if (odp_queue_type(queue) == - ODP_QUEUE_TYPE_PKTIN && - !queue_is_sched(queue)) - continue; - - num = odp_queue_deq_multi(queue, sched_local.ev, - max_deq); - - if (num == 0) { - /* Remove empty queue from scheduling, - * except packet input queues - */ - if (odp_queue_type(queue) == - ODP_QUEUE_TYPE_PKTIN && - !queue_is_free(queue)) - odp_queue_enq(pri_q, ev); - - continue; - } - - sched_local.num = num; - sched_local.index = 0; - ret = copy_events(out_ev, max_num); - - sched_local.queue = queue; - - if (queue_sched_atomic(queue)) { - /* Hold queue during atomic access */ - sched_local.pri_queue = pri_q; - sched_local.desc_ev = ev; + pri_q = sched->pri_queue[i][id]; + ev = odp_queue_deq(pri_q); + buf = odp_buffer_from_event(ev); + + if (buf == ODP_BUFFER_INVALID) + continue; + + sched_cmd = odp_buffer_addr(buf); + + if (sched_cmd->cmd == SCHED_CMD_POLL_PKTIN) { + /* Poll packet input */ + if (pktin_poll(sched_cmd->pe)) { + /* Stop scheduling the pktio */ + pri_clr_pktio(sched_cmd->pktio, + sched_cmd->prio); + odp_buffer_free(buf); } else { - /* Continue scheduling the queue */ + /* Continue scheduling the pktio */ odp_queue_enq(pri_q, ev); } - /* Output the source queue handle */ - if (out_queue) - *out_queue = queue; + continue; + } + + qe = sched_cmd->qe; + num = queue_deq_multi(qe, sched_local.buf_hdr, max_deq); - return ret; + if (num < 0) { + /* Destroyed queue */ + queue_destroy_finalize(qe); + continue; } + + if (num == 0) { + /* Remove empty queue from scheduling */ + continue; + } + + sched_local.num = num; + sched_local.index = 0; + sched_local.qe = qe; + ret = copy_events(out_ev, max_num); + + if (queue_is_atomic(qe)) { + /* Hold queue during atomic access */ + sched_local.pri_queue = pri_q; + sched_local.cmd_ev = ev; + } else { + /* Continue scheduling the queue */ + odp_queue_enq(pri_q, ev); + } + + /* Output the source queue handle */ + if (out_queue) + *out_queue = queue_handle(qe); + + return ret; } } -- 2.3.3 _______________________________________________ lng-odp mailing list lng-odp@lists.linaro.org http://lists.linaro.org/mailman/listinfo/lng-odp