Added multi-data versions of ring enqueue and dequeue operations. Signed-off-by: Petri Savolainen <petri.savolai...@nokia.com> --- platform/linux-generic/include/odp_ring_internal.h | 65 ++++++++++++++++++++++ 1 file changed, 65 insertions(+)
diff --git a/platform/linux-generic/include/odp_ring_internal.h b/platform/linux-generic/include/odp_ring_internal.h
index 6a6291a..55fedeb 100644
--- a/platform/linux-generic/include/odp_ring_internal.h
+++ b/platform/linux-generic/include/odp_ring_internal.h
@@ -80,6 +80,45 @@ static inline uint32_t ring_deq(ring_t *ring, uint32_t mask)
 	return data;
 }
 
+/* Dequeue up to num data from the ring head. Num is smaller than ring size. */
+static inline uint32_t ring_deq_multi(ring_t *ring, uint32_t mask,
+				      uint32_t data[], uint32_t num)
+{
+	uint32_t head, tail, new_head, i;
+
+	head = odp_atomic_load_u32(&ring->r_head);
+
+	/* Move reader head. This thread owns num data up to the new head. */
+	do {
+		tail = odp_atomic_load_u32(&ring->w_tail);
+
+		/* Ring is empty: nothing dequeued, return zero */
+		if (head == tail)
+			return 0;
+
+		/* Fewer than num available: take all of them */
+		if ((tail - head) < num)
+			num = tail - head;
+
+		new_head = head + num;
+
+	} while (odp_unlikely(odp_atomic_cas_acq_u32(&ring->r_head, &head,
+			      new_head) == 0));
+
+	/* Read data. First entry is at head + 1. */
+	for (i = 0; i < num; i++)
+		data[i] = ring->data[(head + 1 + i) & mask];
+
+	/* Wait until other readers have moved the reader tail to our head */
+	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) != head))
+		odp_cpu_pause();
+
+	/* Now update the reader tail to the new head */
+	odp_atomic_store_rel_u32(&ring->r_tail, new_head);
+
+	return num;
+}
+
 /* Enqueue data into the ring tail */
 static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
 {
@@ -104,6 +143,32 @@ static inline void ring_enq(ring_t *ring, uint32_t mask, uint32_t data)
 	odp_atomic_store_rel_u32(&ring->w_tail, new_head);
 }
 
+/* Enqueue multiple data into the ring tail. Num is smaller than ring size. */
+static inline void ring_enq_multi(ring_t *ring, uint32_t mask, uint32_t data[],
+				  uint32_t num)
+{
+	uint32_t old_head, new_head, i;
+
+	/* Reserve num slots in the ring for writing */
+	old_head = odp_atomic_fetch_add_u32(&ring->w_head, num);
+	new_head = old_head + 1;
+
+	/* Ring is full: wait for last reader. Only first slot is checked. */
+	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->r_tail) == new_head))
+		odp_cpu_pause();
+
+	/* Write data. First entry goes to old head + 1. */
+	for (i = 0; i < num; i++)
+		ring->data[(new_head + i) & mask] = data[i];
+
+	/* Wait until other writers have moved the writer tail to our old head */
+	while (odp_unlikely(odp_atomic_load_acq_u32(&ring->w_tail) != old_head))
+		odp_cpu_pause();
+
+	/* Now update the writer tail past the num entries just written */
+	odp_atomic_store_rel_u32(&ring->w_tail, old_head + num);
+}
+
 #ifdef __cplusplus
 }
 #endif
-- 
2.8.1