Added multi-data versions of ring enqueue and dequeue operations.

Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com>
---
 platform/linux-generic/include/odp_ring_internal.h | 65 ++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h
index 6a6291a..55fedeb 100644
--- a/platform/linux-generic/include/odp_ring_internal.h
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -80,6 +80,45 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
        return data;
 }
 
+/* Dequeue multiple data from the ring head. Num is smaller than ring size.
+ *
+ * Copies up to num values from the ring into data[] and returns how many
+ * were actually copied. Returns 0, without writing to data[], when r_head
+ * equals w_tail (ring is empty). When fewer than num values are available,
+ * only the available ones are taken.
+ *
+ * mask is ANDed with every index before ring->data is accessed (presumably
+ * ring size - 1 for a power-of-two sized ring -- confirm against the ring_t
+ * definition).
+ */
+static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
+                                     uint32_t data[], uint32_t num)
+{
+       uint32_t head, tail, new_head, i;
+
+       head = odp_atomic_load_u32(&ring->r_head);
+
+       /* Move reader head. This thread owns data at the new head.
+        *
+        * The CAS tries to advance r_head from head to head + num, and the
+        * loop retries while it returns 0. The retry relies on the CAS
+        * refreshing head with the current r_head on failure (head is
+        * passed by address) -- confirm against odp_atomic_cas_acq_u32().
+        */
+       do {
+               tail = odp_atomic_load_u32(&ring->w_tail);
+
+               /* Ring is empty */
+               if (head == tail)
+                       return 0;
+
+               /* Try to take all available.
+                * NOTE(review): num is clamped in place, so a clamp made in
+                * an iteration whose CAS then fails still applies on the
+                * retry, even if more data has become available.
+                */
+               if ((tail - head) < num)
+                       num = tail - head;
+
+               new_head = head + num;
+
+       } while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
+                             new_head) == 0));
+
+       /* Read the claimed entries. The first one is at head + 1, which
+        * matches the old w_head + 1 start index that ring_enq_multi() uses
+        * when writing.
+        */
+       for (i = 0; i < num; i++)
+               data[i] = ring->data[(head + 1 + i) & mask];
+
+       /* Wait until other readers have updated the tail. r_tail is advanced
+        * in the same order as r_head was claimed, so spin until it reaches
+        * the head value this thread started from.
+        */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
+               odp_cpu_pause();
+
+       /* Now update the reader tail. ring_enq_multi() polls r_tail before
+        * it writes into the ring.
+        */
+       odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
+       return num;
+}
+
 /* Enqueue data into the ring tail */
 static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
 {
@@ -104,6 +143,32 @@ static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
        odp_atomic_store_rel_u32(&ring->w_tail, new_head);
 }
 
+/* Enqueue multiple data into the ring tail. Num is smaller than ring size.
+ *
+ * Writes data[0] .. data[num - 1] into the ring and then publishes them by
+ * advancing w_tail by num. There is no return value and no failure path:
+ * the function spins (odp_cpu_pause) instead of reporting an error.
+ *
+ * mask is ANDed with every index before ring->data is accessed.
+ */
+static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[],
+                                 uint32_t num)
+{
+       uint32_t old_head, new_head, i;
+
+       /* Reserve num slots in the ring for writing. old_head is taken to be
+        * the value of w_head before the add (assumes fetch-add returns the
+        * previous value); the first reserved slot is then old_head + 1.
+        */
+       old_head = odp_atomic_fetch_add_u32(&ring->w_head, num);
+       new_head = old_head + 1;
+
+       /* Ring is full. Wait for the last reader to finish.
+        * NOTE(review): only the first reserved index is compared against
+        * r_tail; the remaining num - 1 slots are not checked here, so the
+        * caller presumably guarantees the ring cannot overflow -- confirm.
+        */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
+               odp_cpu_pause();
+
+       /* Write data into the reserved slots, starting at old_head + 1 */
+       for (i = 0; i < num; i++)
+               ring->data[(new_head + i) & mask] = data[i];
+
+       /* Wait until other writers have updated the tail. w_tail is advanced
+        * in the same order as w_head was reserved, so spin until it reaches
+        * old_head.
+        */
+       while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+               odp_cpu_pause();
+
+       /* Now update the writer tail. ring_deq_multi() reads w_tail to find
+        * how much data is available.
+        */
+       odp_atomic_store_rel_u32(&ring->w_tail, old_head + num);
+}
+
 #ifdef __cplusplus
 }
 #endif
-- 
2.8.1

Reply via email to