Petri Savolainen(psavol) replied on github web page:

platform/linux-generic/include/odp_ring_st_internal.h
@@ -0,0 +1,118 @@
+/* Copyright (c) 2018, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+#ifndef ODP_RING_ST_INTERNAL_H_
+#define ODP_RING_ST_INTERNAL_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <odp/api/hints.h>
+#include <odp_align_internal.h>
+
+/* Basic ring for single thread usage. Operations must be synchronized by using
+ * locks (or other means), when multiple threads use the same ring. */
+typedef struct {
+       uint32_t head;
+       uint32_t tail;
+       uint32_t mask;
+       uint32_t *data;
+
+} ring_st_t;
+
+/* Initialize ring. Ring size must be a power of two. */
+static inline void ring_st_init(ring_st_t *ring, uint32_t *data, uint32_t size)
+{
+       ring->head = 0;
+       ring->tail = 0;
+       ring->mask = size - 1;
+       ring->data = data;
+}
+
+/* Dequeue data from the ring head. Max_num is smaller than ring size.*/
+static inline uint32_t ring_st_deq_multi(ring_st_t *ring, uint32_t data[],
+                                        uint32_t max_num)
+{
+       uint32_t head, tail, mask, idx;
+       uint32_t num, i;
+
+       head = ring->head;
+       tail = ring->tail;
+       mask = ring->mask;
+       num  = tail - head;
+
+       /* Empty */
+       if (num == 0)
+               return 0;
+
+       if (num > max_num)
+               num = max_num;
+
+       idx = head & mask;
+
+       for (i = 0; i < num; i++) {
+               data[i] = ring->data[idx];
+               idx     = (idx + 1) & mask;
+       }
+
+       ring->head = head + num;
+
+       return num;
+}
+
+/* Enqueue data into the ring tail. Num_data is smaller than ring size. */
+static inline uint32_t ring_st_enq_multi(ring_st_t *ring, const uint32_t data[],
+                                        uint32_t num_data)
+{
+       uint32_t head, tail, mask, size, idx;
+       uint32_t num, i;
+
+       head = ring->head;
+       tail = ring->tail;
+       mask = ring->mask;
+       size = mask + 1;
+       num  = size - (tail - head);
+
+       /* Full */
+       if (num == 0)
+               return 0;
+
+       if (num > num_data)
+               num = num_data;
+
+       idx = tail & mask;
+
+       for (i = 0; i < num; i++) {
+               ring->data[idx] = data[i];
+               idx     = (idx + 1) & mask;
+       }
+
+       ring->tail = tail + num;
+
+       return num;
+}
+
+/* Check if ring is empty */
+static inline int ring_st_is_empty(ring_st_t *ring)
+{
+       uint32_t head, tail, num;
+
+       head = ring->head;
+       tail = ring->tail;
+       num  = tail - head;
+
+       if (num == 0)
+               return 1;
+
+       return 0;


Comment:
OK. Compiler probably did that already, but changed in v2.

> Petri Savolainen(psavol) wrote:
> Tail and head are full uint32_t counters that are masked only when used to 
> index the data array; they are not wrapped to the ring size. I think you 
> assume that the stored index is 0...size-1, while it's a full uint32_t which 
> is then masked to get the actual index.
> 
> For example:
> size = 100;
> 
> Empty:
> head = 100
> tail = 100
> num = 100 - 100 = 0
> 
> Full:
> head = 100
> tail = 200
> num = 200 - 100 = 100
> 
> Wrap uint32_t + full:
> head = 0xFFFFFF9C
> tail = 0
> num = 0 - 0xFFFFFF9C = 0x64 = 100
> 
> So, no abs() needed. Ring size can be 4096, instead of 4095.
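
The arithmetic in the example above can be sketched in C as follows. This is
only an illustration under stated assumptions: the helper names `ring_used()`,
`ring_free()` and `ring_slot()` are hypothetical and not part of the patch, and
`size` is assumed to be a power of two as `ring_st_init()` requires.

```c
#include <stdint.h>

/* Hypothetical helpers for illustration only, not part of the patch. */

/* Number of stored entries. Unsigned subtraction is modulo 2^32, so the
 * result stays correct after the counters wrap. */
static inline uint32_t ring_used(uint32_t head, uint32_t tail)
{
	return tail - head;
}

/* Number of free slots in a ring of 'size' entries. */
static inline uint32_t ring_free(uint32_t head, uint32_t tail, uint32_t size)
{
	return size - (tail - head);
}

/* Position in the data array for a free-running counter.
 * Assumes 'size' is a power of two. */
static inline uint32_t ring_slot(uint32_t counter, uint32_t size)
{
	return counter & (size - 1);
}
```

With size 4096, head = 0xFFFFFF00 and tail = 0x00000F00 give 4096 used entries
and 0 free slots, although both counters map to slot 0xF00. The counters differ
even when the slots coincide, which is why full and empty stay distinguishable
without giving up one entry.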


>> Petri Savolainen(psavol) wrote:
>> It's already documented 5 lines above:
>> 
>> /* Initialize ring. Ring size must be a power of two. */
>> static inline void ring_st_init(ring_st_t *ring, uint32_t *data, uint32_t size)
>> {


>>> Petri Savolainen(psavol) wrote:
>>> This function converts 32 bit buffer indexes to buffer header pointers. The 
>>> inverse operation is buffer_index_from_buf(). The prefetch is a side effect 
>>> of the function, which may be changed or moved at any time if a better 
>>> place for prefetching is found. I actually plan to test whether the number 
>>> of prefetches should be limited, as e.g. 32 consecutive prefetches may be 
>>> too much for some CPU architectures.


>>>> Petri Savolainen(psavol) wrote:
>>>> I prefer the style where '== 0' is used instead of '!', especially when 
>>>> the if clause is as complex as this and there's a danger that the reader 
>>>> misses the '!' sign.


>>>>> Petri Savolainen(psavol) wrote:
>>>>> It's there to ensure that all bits are zero, also if someone later 
>>>>> modifies the bitfield from two to three fields. This is similar to how 
>>>>> memset() to zero is used for struct inits.
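
A minimal sketch of that pattern, assuming a hypothetical union layout (the
type name `buf_index_t`, the helper `make_index()` and the 8/24 bit split are
illustrative guesses, not taken from the patch). It relies on C11 anonymous
structs and on `uint32_t` bitfields, which GCC and clang accept.

```c
#include <stdint.h>

/* Hypothetical layout for illustration; the real field widths may differ. */
typedef union {
	uint32_t u32;
	struct {
		uint32_t pool   : 8;
		uint32_t buffer : 24;
	};
} buf_index_t;

/* Build a 32 bit index from its parts. */
static inline uint32_t make_index(uint32_t pool, uint32_t buffer)
{
	buf_index_t index;

	/* Clear every bit first, so that bits not covered by the fields
	 * below (e.g. after a later layout change) are still zero. */
	index.u32    = 0;
	index.pool   = pool;
	index.buffer = buffer;

	return index.u32;
}
```

If the two fields always cover all 32 bits, the first store is redundant, and
a compiler will typically remove it.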


>>>>>> Petri Savolainen(psavol) wrote:
>>>>>> There's no need for abs(). Since it's all uint32_t variables, 
>>>>>> wrap-around is handled already.
>>>>>> An example in 8 bits:
>>>>>> 0xff - 0xfd = 0x02
>>>>>> 0x00 - 0xfe = 0x02
>>>>>> 0x01 - 0xff = 0x02
>>>>>> 0x02 - 0x00 = 0x02
>>>>>> 
>>>>>> This passes both gcc and clang, and is used already in the other ring 
>>>>>> implementation; see ring_deq_multi().
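
The 8 bit example above can be reproduced in C with a small sketch. The helper
name `dist_u8()` is hypothetical. One caveat specific to the 8 bit case:
`uint8_t` operands are promoted to `int` before the subtraction, so the result
has to be cast back to get modulo 256 behaviour.

```c
#include <stdint.h>

/* Hypothetical helper: distance between two 8 bit counters, modulo 256. */
static inline uint8_t dist_u8(uint8_t tail, uint8_t head)
{
	/* Both operands are promoted to int, so 0x00 - 0xfe is -254 here.
	 * The cast converts the result back to modulo 256 arithmetic. */
	return (uint8_t)(tail - head);
}
```

On the usual platforms where `int` is 32 bits wide, `uint32_t` operands are not
promoted, so `tail - head` in the ring code needs no such cast.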


>>>>>>> Petri Savolainen(psavol) wrote:
>>>>>>> I prefer the style with a blank line at the end of a typedef, since 
>>>>>>> it's easier to spot the type name (as it's not mixed into struct field 
>>>>>>> names). Checkpatch passes so this style should be OK.


>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>> Does this mean that sizes larger than 32 have no added performance 
>>>>>>>> benefit?


>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>> Must use `CONFIG_QUEUE_SIZE - 1` here, as noted earlier, if we're not 
>>>>>>>>> going to use the user-supplied queue size.


>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>> Given its name, this looks like an extraneous statement that should 
>>>>>>>>>> be deleted. Renaming this to something like 
>>>>>>>>>> `prefetch_dequeued_bufs()` would make the intent clearer here.


>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>> `if (!ring_st_is_empty(&queue->s.ring_st))` seems more natural here.


>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>> Change to `if (param->size >= CONFIG_QUEUE_SIZE)` to handle the 
>>>>>>>>>>>> effective queue capacity. The user-supplied `size` should then be 
>>>>>>>>>>>> set to `ROUNDUP_POWER2_U32(size) - 1` for the masking to work 
>>>>>>>>>>>> properly.


>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>> Same comment here as for plain queues.


>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>> As noted earlier, due to "losing" one entry to distinguish queue 
>>>>>>>>>>>>>> empty/full, this should be returned as `CONFIG_QUEUE_SIZE - 1`, 
>>>>>>>>>>>>>> and we also need to ensure that `CONFIG_QUEUE_SIZE` is itself a 
>>>>>>>>>>>>>> power of 2.


>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>> Since you're initializing `index.pool` and `index.buffer` 
>>>>>>>>>>>>>>> there's no need to set `index.u32` here.


>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>> We originally had this index partitioning based on 
>>>>>>>>>>>>>>>> `ODP_CONFIG_POOLS`. Do we want to return to that here?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> If not, we at least need an `ODP_STATIC_ASSERT()` to ensure 
>>>>>>>>>>>>>>>> that `ODP_CONFIG_POOLS < 256` or else bad things will happen 
>>>>>>>>>>>>>>>> here.


>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>>> This routine can be optimized to:
>>>>>>>>>>>>>>>>> ```
>>>>>>>>>>>>>>>>> return ring->head == ring->tail;
>>>>>>>>>>>>>>>>> ```


>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>>>> Your invariant is that the queue is empty when `head == tail`, 
>>>>>>>>>>>>>>>>>> therefore the queue is full when `abs(tail - head) == mask`, 
>>>>>>>>>>>>>>>>>> so the correct calculation here is:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> `num = mask - abs(tail - head);`
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> The effect is that a queue can only hold `size - 1` 
>>>>>>>>>>>>>>>>>> elements, otherwise you cannot distinguish between a full 
>>>>>>>>>>>>>>>>>> and an empty queue without another bit of metadata, which is 
>>>>>>>>>>>>>>>>>> a cost you're trying to avoid.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> This is somewhat problematic if the caller is trying to be 
>>>>>>>>>>>>>>>>>> "optimal" by specifying a power of two in the `size` 
>>>>>>>>>>>>>>>>>> parameter of the `odp_queue_param_t` passed to 
>>>>>>>>>>>>>>>>>> `odp_queue_create()`. For this reason we may wish to return 
>>>>>>>>>>>>>>>>>> a `max_size` of a power of 2 - 1 in `odp_queue_capability()` 
>>>>>>>>>>>>>>>>>> as part of this patch series.


>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>>>>> This only works if `size` is a power of 2. Should be 
>>>>>>>>>>>>>>>>>>> documented as such, since this is an internal routine. In 
>>>>>>>>>>>>>>>>>>> this case an `ODP_ASSERT(size == ROUNDUP_POWER2_U32(size))` 
>>>>>>>>>>>>>>>>>>> for this requirement would be a useful debugging aid.
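
A common bit test for the same power of two requirement is sketched below. The
helper name `is_power_of_two_u32()` is hypothetical, and whether it matches
`size == ROUNDUP_POWER2_U32(size)` for a size of zero is not something this
sketch assumes.

```c
#include <stdint.h>

/* Hypothetical helper: returns 1 when 'size' is a power of two, else 0.
 * A power of two has exactly one bit set, so clearing the lowest set bit
 * with size & (size - 1) must leave zero. Zero itself is rejected. */
static inline int is_power_of_two_u32(uint32_t size)
{
	return size != 0 && (size & (size - 1)) == 0;
}
```

A debug-only assertion on this value in the init routine would catch a wrong
size before `size - 1` is used as the index mask.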


>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>>>>>> Should be `num = abs(tail - head);` to deal with 
>>>>>>>>>>>>>>>>>>>> wraparound; otherwise it may be misinterpreted as overly 
>>>>>>>>>>>>>>>>>>>> large since it's `uint32_t`. Note that GCC and clang 
>>>>>>>>>>>>>>>>>>>> recognize `abs()` and treat it as a builtin, so there's no 
>>>>>>>>>>>>>>>>>>>> actual `stdlib.h` call here.


>>>>>>>>>>>>>>>>>>>>> Bill Fischofer(Bill-Fischofer-Linaro) wrote:
>>>>>>>>>>>>>>>>>>>>> Extra blank line should be removed (nit).


https://github.com/Linaro/odp/pull/492#discussion_r169886934
updated_at 2018-02-22 09:32:40
