Petri Savolainen(psavol) replied on github web page:

platform/linux-generic/include/odp_ring_st_internal.h
@@ -0,0 +1,118 @@
+/* Copyright (c) 2018, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#ifndef ODP_RING_ST_INTERNAL_H_
+#define ODP_RING_ST_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp/api/hints.h>
+#include <odp_align_internal.h>
+
+/* Basic ring for single thread usage. Operations must be synchronized by using
+ * locks (or other means), when multiple threads use the same ring. */
+typedef struct {
+	uint32_t head;
+	uint32_t tail;
+	uint32_t mask;
+	uint32_t *data;
+
+} ring_st_t;
+
+/* Initialize ring. Ring size must be a power of two. */
+static inline void ring_st_init(ring_st_t *ring, uint32_t *data, uint32_t size)
+{
+	ring->head = 0;
+	ring->tail = 0;
+	ring->mask = size - 1;
+	ring->data = data;
+}
+
+/* Dequeue data from the ring head. Max_num is smaller than ring size.*/
+static inline uint32_t ring_st_deq_multi(ring_st_t *ring, uint32_t data[],
+					 uint32_t max_num)
+{
+	uint32_t head, tail, mask, idx;
+	uint32_t num, i;
+
+	head = ring->head;
+	tail = ring->tail;
+	mask = ring->mask;
+	num  = tail - head;
+
+	/* Empty */
+	if (num == 0)
+		return 0;
+
+	if (num > max_num)
+		num = max_num;
+
+	idx = head & mask;
+
+	for (i = 0; i < num; i++) {
+		data[i] = ring->data[idx];
+		idx = (idx + 1) & mask;
+	}
+
+	ring->head = head + num;
+
+	return num;
+}
+
+/* Enqueue data into the ring tail. Num_data is smaller than ring size. */
+static inline uint32_t ring_st_enq_multi(ring_st_t *ring, const uint32_t data[],
+					 uint32_t num_data)
+{
+	uint32_t head, tail, mask, size, idx;
+	uint32_t num, i;
+
+	head = ring->head;
+	tail = ring->tail;
+	mask = ring->mask;
+	size = mask + 1;
+	num  = size - (tail - head);
+
+	/* Full */
+	if (num == 0)
+		return 0;
+
+	if (num > num_data)
+		num = num_data;
+
+	idx = tail & mask;
+
+	for (i = 0; i < num; i++) {
+		ring->data[idx] = data[i];
+		idx = (idx + 1) & mask;
+	}
+
+	ring->tail = tail + num;
+
+	return num;
+}
+
+/* Check if ring is empty */
+static inline int ring_st_is_empty(ring_st_t *ring)
+{
+	uint32_t head, tail, num;
+
+	head = ring->head;
+	tail = ring->tail;
+	num  = tail - head;
+
+	if (num == 0)
+		return 1;
+
+	return 0;

Comment:
OK. Compiler probably did that already, but changed in v2.

> Petri Savolainen(psavol) wrote:
> Tail and head indexes are (masked from) uint32_t and do not wrap around when
> the ring is full. I think you assume that the store index is 0...size-1,
> while it's full uint32_t which is then masked to get the actual index.
>
> For example:
> size = 100;
>
> Empty:
> head = 100
> tail = 100
> num = 100 - 100 = 0
>
> Full:
> head = 100
> tail = 200
> num = 200 - 100 = 100
>
> Wrap uint32_t + full:
> head = 0xFFFFFF9C
> tail = 0
> num = 0 - 0xFFFFFF9C = 0x64 = 100
>
> So, no abs() needed. Ring size can be 4096, instead of 4095.


>> Petri Savolainen(psavol) wrote:
>> It's already documented 5 lines above:
>>
>> /* Initialize ring. Ring size must be a power of two. */
>> static inline void ring_st_init(ring_st_t *ring, uint32_t *data, uint32_t
>> size)
>> {


>>> Petri Savolainen(psavol) wrote:
>>> This function converts 32-bit buffer indexes to buffer header pointers. The
>>> counter operation is buffer_index_from_buf(). The prefetch is a side effect
>>> of the function, which may be changed/moved any time if it's found out that
>>> there's a place for prefetching. I actually plan to test if number of
>>> prefetches should be limited as e.g. 32 consecutive prefetches may be too
>>> much for some CPU architectures.


>>>> Petri Savolainen(psavol) wrote:
>>>> I prefer style where '== 0' is used instead of '!'. Especially, when the
>>>> if clause is as complex as this and there's a danger for the reader to
>>>> miss the '!' sign.


>>>>> Petri Savolainen(psavol) wrote:
>>>>> It's there to ensure that all bits are zero also when someone would
>>>>> modify the bitfield from two to three fields later on. Similarly to
>>>>> memset() zero is used for struct inits.


>>>>>> Petri Savolainen(psavol) wrote:
>>>>>> There's no need for abs(). Since it's all uint32_t variables,
>>>>>> wraparound is handled already.
>>>>>> An example in 8 bits:
>>>>>> 0xff - 0xfd = 0x02
>>>>>> 0x00 - 0xfe = 0x02
>>>>>> 0x01 - 0xff = 0x02
>>>>>> 0x02 - 0x00 = 0x02
>>>>>>
>>>>>> This passes both gcc and clang, and is used already in the other ring
>>>>>> implementation, see ring_deq_multi().


>>>>>>> Petri Savolainen(psavol) wrote:
>>>>>>> I prefer style with blank line in the end of a typedef, since it's
>>>>>>> easier to spot the type name (as it's not mixed into struct field
>>>>>>> names). Checkpatch passes so this style should be OK.


>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>> Does this mean that sizes larger than 32 have no added performance
>>>>>>>> benefit?


>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>> Must use `CONFIG_QUEUE_SIZE - 1` here, as noted earlier, if we're not
>>>>>>>>> going to use the user-supplied queue size.


>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>> Given its name, this looks like an extraneous statement that should
>>>>>>>>>> be deleted. Renaming this to something like
>>>>>>>>>> `prefetch_dequeued_bufs()` would make the intent clearer here.


>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>> `if (!ring_st_is_empty(&queue->s.ring_st))` seems more natural here.


>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>> Change to `if (param->size >= CONFIG_QUEUE_SIZE)` to handle the
>>>>>>>>>>>> effective queue capacity. The user-supplied `size` should then be
>>>>>>>>>>>> set to `ROUNDUP_POWER2_U32(size) - 1` for the masking to work
>>>>>>>>>>>> properly.


>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>> Same comment here as for plain queues.


>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>> As noted earlier, due to "losing" one entry to distinguish queue
>>>>>>>>>>>>>> empty/full, this should be returned as `CONFIG_QUEUE_SIZE - 1`,
>>>>>>>>>>>>>> and we also need to ensure that `CONFIG_QUEUE_SIZE` is itself a
>>>>>>>>>>>>>> power of 2.


>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>> Since you're initializing `index.pool` and `index.buffer`
>>>>>>>>>>>>>>> there's no need to set `index.u32` here.


>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>> We originally had this index partitioning based on
>>>>>>>>>>>>>>>> `ODP_CONFIG_POOLS`. Do we want to return to that here?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If not, we at least need an `ODP_STATIC_ASSERT()` to ensure
>>>>>>>>>>>>>>>> that `ODP_CONFIG_POOLS < 256` or else bad things will happen
>>>>>>>>>>>>>>>> here.


>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>>> This routine can be optimized to:
>>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>>> return ring->head == ring->tail;
>>>>>>>>>>>>>>>>> ```


>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>>>> Your invariant is the queue is empty when `head == tail`
>>>>>>>>>>>>>>>>>> therefore the queue is full when `abs(tail - head) == mask`,
>>>>>>>>>>>>>>>>>> so the correct calculation here is:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> `num = mask - abs(tail - head);`
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The effect is that a queue can only hold `size - 1`
>>>>>>>>>>>>>>>>>> elements, otherwise you cannot distinguish between a full
>>>>>>>>>>>>>>>>>> and an empty queue without another bit of metadata, which is
>>>>>>>>>>>>>>>>>> a cost you're trying to avoid.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is somewhat problematic if the caller is trying to be
>>>>>>>>>>>>>>>>>> "optimal" by specifying a power of two in the `size`
>>>>>>>>>>>>>>>>>> parameter of the `odp_queue_param_t` passed to
>>>>>>>>>>>>>>>>>> `odp_queue_create()`. For this reason we may wish to return
>>>>>>>>>>>>>>>>>> a `max_size` of a power of 2 - 1 in `odp_queue_capability()`
>>>>>>>>>>>>>>>>>> as part of this patch series.


>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>>>>> This only works if `size` is a power of 2. Should be
>>>>>>>>>>>>>>>>>>> documented as such, since this is an internal routine. In
>>>>>>>>>>>>>>>>>>> this case an `ODP_ASSERT(size == ROUNDUP_POWER2_U32(size))`
>>>>>>>>>>>>>>>>>>> for this requirement would be a useful debugging aid.


>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>>>>>> should be `num = abs(tail - head);` to deal with wrap
>>>>>>>>>>>>>>>>>>>> arounds, otherwise may be misinterpreted as overly large
>>>>>>>>>>>>>>>>>>>> since it's `uint32_t`. Note that GCC and clang recognize
>>>>>>>>>>>>>>>>>>>> `abs()` and treat it as a builtin, so there's no actual
>>>>>>>>>>>>>>>>>>>> `stdlib.h` call here.


>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>>>>>>> Extra blank line should be removed (nit).


https://github.com/Linaro/odp/pull/492#discussion_r169886934
updated_at 2018-02-22 09:32:40
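
The wraparound argument made in the thread (free-running uint32_t indexes, masked only when addressing a slot, so no abs() is needed and all `size` slots are usable) can be checked with a small standalone sketch. It is not part of the patch: `demo_ring_t` and the `demo_*` helpers are hypothetical names used only for illustration, and plain `assert()` stands in for the `ODP_ASSERT()` power-of-two check suggested above.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for ring_st_t: occupancy only depends on the two
 * free-running indexes and the mask, so the data array is left out. */
typedef struct {
	uint32_t head;
	uint32_t tail;
	uint32_t mask;
} demo_ring_t;

/* Size must be a power of two so that (index & mask) selects a slot. */
static inline void demo_ring_init(demo_ring_t *ring, uint32_t size)
{
	assert(size != 0 && (size & (size - 1)) == 0);
	ring->head = 0;
	ring->tail = 0;
	ring->mask = size - 1;
}

/* Number of stored entries. Unsigned subtraction is modulo 2^32, so the
 * result stays correct after tail has wrapped past zero. */
static inline uint32_t demo_ring_used(const demo_ring_t *ring)
{
	return ring->tail - ring->head;
}

/* Number of free slots. Empty is used == 0 and full is used == size, so
 * no slot has to be sacrificed to tell the two states apart. */
static inline uint32_t demo_ring_free(const demo_ring_t *ring)
{
	return (ring->mask + 1) - demo_ring_used(ring);
}

/* The same subtraction at 8 bits, matching the 8-bit table in the thread.
 * The cast is needed because C promotes uint8_t operands to int. */
static inline uint8_t demo_diff_u8(uint8_t tail, uint8_t head)
{
	return (uint8_t)(tail - head);
}
```

With a ring of 128 entries, setting head = 0xFFFFFF80 and tail = 0 models a full ring whose tail index has just wrapped: the used count is still 128 and the free count is 0.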