This adds ODP_QUEUE_TYPE_NOTIF alongside the existing plain and sched
queues. Notification queues are similar to plain queues in that they
are dequeued manually. However, it is also possible to wait on dequeue
(via the new odp_queue_deq_wait() function), which will block on an empty
queue in such a way that (if supported by the platform) the execution
will yield, allowing the CPU to either run another thread or idle.

Signed-off-by: Sergei Trofimov <sergei.trofi...@arm.com>
---
 include/odp/api/spec/queue.h                       | 23 +++++++++-
 .../linux-generic/include/odp_packet_io_queue.h    |  4 +-
 .../linux-generic/include/odp_queue_internal.h     |  8 +++-
 platform/linux-generic/odp_packet_io.c             |  6 +--
 platform/linux-generic/odp_queue.c                 | 49 ++++++++++++++++++++--
 5 files changed, 79 insertions(+), 11 deletions(-)

diff --git a/include/odp/api/spec/queue.h b/include/odp/api/spec/queue.h
index ed9284f..5af676b 100644
--- a/include/odp/api/spec/queue.h
+++ b/include/odp/api/spec/queue.h
@@ -62,7 +62,15 @@ typedef enum odp_queue_type_t {
          * Scheduled queues are connected to the scheduler. Application must
          * not dequeue events directly from these queues but use the scheduler
          * instead. */
-       ODP_QUEUE_TYPE_SCHED
+       ODP_QUEUE_TYPE_SCHED,
+
+       /** Notification queue
+        *
+        * Behaves similarly to plain queues; however, threads can execute a
+        * waiting dequeue, causing them to block if the queue is empty.
+        *
+        */
+       ODP_QUEUE_TYPE_NOTIF
 } odp_queue_type_t;
 
 /**
@@ -309,6 +317,19 @@ odp_event_t odp_queue_deq(odp_queue_t queue);
 int odp_queue_deq_multi(odp_queue_t queue, odp_event_t events[], int num);
 
 /**
+ * Queue dequeue with wait
+ *
+ * Dequeues next event from head of the queue, blocking the calling thread
+ * while the queue is empty. Only valid for ODP_QUEUE_TYPE_NOTIF queues.
+ *
+ * @param queue   Queue handle
+ *
+ * @return Event handle
+ * @retval ODP_EVENT_INVALID on failure (e.g. not a notification queue)
+ */
+odp_event_t odp_queue_deq_wait(odp_queue_t queue);
+
+/**
  * Queue type
  *
  * @param queue   Queue handle
diff --git a/platform/linux-generic/include/odp_packet_io_queue.h 
b/platform/linux-generic/include/odp_packet_io_queue.h
index d1d4b22..a24adfc 100644
--- a/platform/linux-generic/include/odp_packet_io_queue.h
+++ b/platform/linux-generic/include/odp_packet_io_queue.h
@@ -29,14 +29,14 @@ ODP_STATIC_ASSERT(ODP_PKTIN_QUEUE_MAX_BURST >= 
QUEUE_MULTI_MAX,
                  "ODP_PKTIN_DEQ_MULTI_MAX_ERROR");
 
 int pktin_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
-odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue);
+odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *queue, int wait);
 
 int pktin_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
num);
 int pktin_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
num);
 
 
 int pktout_enqueue(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
-odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *queue);
+odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *queue, int wait);
 
 int pktout_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
                     int num);
diff --git a/platform/linux-generic/include/odp_queue_internal.h 
b/platform/linux-generic/include/odp_queue_internal.h
index a10628e..fb78124 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -26,9 +26,12 @@ extern "C" {
 #include <odp/api/packet_io.h>
 #include <odp/api/align.h>
 #include <odp/api/hints.h>
+
 #include <odp/api/ticketlock.h>
 #include <odp_config_internal.h>
 
+#include <semaphore.h>
+
 #define QUEUE_MULTI_MAX CONFIG_BURST_SIZE
 
 #define QUEUE_STATUS_FREE         0
@@ -42,7 +45,7 @@ extern "C" {
 union queue_entry_u;
 
 typedef int (*enq_func_t)(union queue_entry_u *, odp_buffer_hdr_t *);
-typedef        odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *);
+typedef        odp_buffer_hdr_t *(*deq_func_t)(union queue_entry_u *, int);
 
 typedef int (*enq_multi_func_t)(union queue_entry_u *,
                                odp_buffer_hdr_t **, int);
@@ -77,6 +80,7 @@ struct queue_entry_s {
        char              name[ODP_QUEUE_NAME_LEN];
 
        int               depth;
+       sem_t             notif_sem;
 };
 
 union queue_entry_u {
@@ -88,7 +92,7 @@ union queue_entry_u {
 queue_entry_t *get_qentry(uint32_t queue_id);
 
 int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
-odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
+odp_buffer_hdr_t *queue_deq(queue_entry_t *queue, int wait);
 
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
num);
 int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
num);
diff --git a/platform/linux-generic/odp_packet_io.c 
b/platform/linux-generic/odp_packet_io.c
index 98460a5..8063806 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -590,7 +590,7 @@ int pktout_enqueue(queue_entry_t *qentry, odp_buffer_hdr_t 
*buf_hdr)
        return (nbr == len ? 0 : -1);
 }
 
-odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *qentry ODP_UNUSED)
+odp_buffer_hdr_t *pktout_dequeue(queue_entry_t *qentry ODP_UNUSED, int wait 
ODP_UNUSED)
 {
        ODP_ABORT("attempted dequeue from a pktout queue");
        return NULL;
@@ -625,13 +625,13 @@ int pktin_enqueue(queue_entry_t *qentry ODP_UNUSED,
        return -1;
 }
 
-odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry)
+odp_buffer_hdr_t *pktin_dequeue(queue_entry_t *qentry, int wait ODP_UNUSED)
 {
        odp_buffer_hdr_t *buf_hdr;
        odp_buffer_hdr_t *hdr_tbl[QUEUE_MULTI_MAX];
        int pkts;
 
-       buf_hdr = queue_deq(qentry);
+       buf_hdr = queue_deq(qentry, 0);
        if (buf_hdr != NULL)
                return buf_hdr;
 
diff --git a/platform/linux-generic/odp_queue.c 
b/platform/linux-generic/odp_queue.c
index d364801..82fe794 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -34,6 +34,7 @@
 #include <string.h>
 #include <inttypes.h>
 #include <unistd.h>
+#include <semaphore.h>
 
 typedef struct queue_table_t {
        queue_entry_t  queue[ODP_CONFIG_QUEUES];
@@ -88,6 +89,10 @@ static int queue_init(queue_entry_t *queue, const char *name,
                                                    0);
                }
        }
+
+       if (param->type == ODP_QUEUE_TYPE_NOTIF)
+               sem_init(&queue->s.notif_sem, 0, 0);
+
        queue->s.type = queue->s.param.type;
 
        queue->s.enqueue = queue_enq;
@@ -452,8 +457,14 @@ static inline int enq_multi(queue_entry_t *queue, 
odp_buffer_hdr_t *buf_hdr[],
                queue->s.status = QUEUE_STATUS_SCHED;
                sched = 1; /* retval: schedule queue */
        }
+
        UNLOCK(&queue->s.lock);
 
+       if (queue->s.type == ODP_QUEUE_TYPE_NOTIF) {
+               for (i = 0; i < num; i++)
+                       sem_post(&queue->s.notif_sem);
+       }
+
        /* Add queue to scheduling */
        if (sched && sched_fn->sched_queue(queue->s.index))
                ODP_ABORT("schedule_queue failed\n");
@@ -571,6 +582,12 @@ static inline int deq_multi(queue_entry_t *queue, 
odp_buffer_hdr_t *buf_hdr[],
                queue->s.depth -= i;
        }
 
+       if (queue->s.type == ODP_QUEUE_TYPE_NOTIF) {
+               for (j = 0; j < i; j++) {
+                       sem_trywait(&queue->s.notif_sem);
+               }
+       }
+
        /* Queue is empty */
        if (hdr == NULL)
                queue->s.tail = NULL;
@@ -588,11 +605,20 @@ int queue_deq_multi(queue_entry_t *queue, 
odp_buffer_hdr_t *buf_hdr[], int num)
        return deq_multi(queue, buf_hdr, num);
 }
 
-odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
+odp_buffer_hdr_t *queue_deq(queue_entry_t *queue, int wait)
 {
        odp_buffer_hdr_t *buf_hdr = NULL;
        int ret;
 
+       if (queue->s.type == ODP_QUEUE_TYPE_NOTIF) {
+               if (wait) {
+                       sem_wait(&queue->s.notif_sem);
+               } else {
+                       if (sem_trywait(&queue->s.notif_sem))
+                               return NULL;
+               }
+       }
+
        ret = deq_multi(queue, &buf_hdr, 1);
 
        if (ret == 1)
@@ -620,14 +646,31 @@ int odp_queue_deq_multi(odp_queue_t handle, odp_event_t 
events[], int num)
        return ret;
 }
 
-
 odp_event_t odp_queue_deq(odp_queue_t handle)
 {
        queue_entry_t *queue;
        odp_buffer_hdr_t *buf_hdr;
 
        queue   = queue_to_qentry(handle);
-       buf_hdr = queue->s.dequeue(queue);
+       buf_hdr = queue->s.dequeue(queue, 0);
+
+       if (buf_hdr)
+               return odp_buffer_to_event(buf_hdr->handle.handle);
+
+       return ODP_EVENT_INVALID;
+}
+
+odp_event_t odp_queue_deq_wait(odp_queue_t handle)
+{
+       queue_entry_t *queue;
+       odp_buffer_hdr_t *buf_hdr;
+
+       queue   = queue_to_qentry(handle);
+
+       if (queue->s.type != ODP_QUEUE_TYPE_NOTIF)
+               return ODP_EVENT_INVALID;
+
+       buf_hdr = queue->s.dequeue(queue, 1);
 
        if (buf_hdr)
                return odp_buffer_to_event(buf_hdr->handle.handle);
-- 
1.9.1

Reply via email to