From: Anton Ivanov <anton.iva...@cambridgegreys.com> A ByteQ with unlocked head and tail is unsafe for simultaneous consume/produce.
If simultaneous use is desired, these either need to be locked or there needs to be a third atomic or lock-guarded variable "used". An atomic "used" allows the producer to enqueue safely because it "owns" the head, and even if the consumer advances the tail concurrently, that can only increase the space available compared with the value read from "used". Once the data has been written and the enqueued bytes should be made visible, a release fence is issued and "used" is updated. Similarly for the consumer: it can safely consume because it "owns" the tail and never reads beyond tail + used (wrapped around as needed). Signed-off-by: Anton Ivanov <anton.iva...@cambridgegreys.com> --- lib/byteq.c | 17 ++++++++++++++++- lib/byteq.h | 2 ++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/lib/byteq.c b/lib/byteq.c index 3f865cf9e..da40c2530 100644 --- a/lib/byteq.c +++ b/lib/byteq.c @@ -19,6 +19,7 @@ #include <string.h> #include <unistd.h> #include "util.h" +#include "ovs-atomic.h" /* Initializes 'q' as an empty byteq that uses the 'size' bytes of 'buffer' to * store data. 'size' must be a power of 2. @@ -32,13 +33,16 @@ byteq_init(struct byteq *q, uint8_t *buffer, size_t size) q->buffer = buffer; q->size = size; q->head = q->tail = 0; + q->used = ATOMIC_VAR_INIT(0); } /* Returns the number of bytes current queued in 'q'. */ int byteq_used(const struct byteq *q) { - return q->head - q->tail; + int retval; + atomic_read_relaxed(&q->used, &retval); + return retval; } /* Returns the number of bytes that can be added to 'q' without overflow. 
*/ @@ -68,9 +72,11 @@ byteq_is_full(const struct byteq *q) void byteq_put(struct byteq *q, uint8_t c) { + int discard; ovs_assert(!byteq_is_full(q)); *byteq_head(q) = c; q->head++; + atomic_add(&q->used, 1, &discard); } /* Adds the 'n' bytes in 'p' at the head of 'q', which must have at least 'n' @@ -79,6 +85,7 @@ void byteq_putn(struct byteq *q, const void *p_, size_t n) { const uint8_t *p = p_; + int discard; ovs_assert(byteq_avail(q) >= n); while (n > 0) { size_t chunk = MIN(n, byteq_headroom(q)); @@ -86,6 +93,7 @@ byteq_putn(struct byteq *q, const void *p_, size_t n) byteq_advance_head(q, chunk); p += chunk; n -= chunk; + atomic_add(&q->used, chunk, &discard); } } @@ -103,9 +111,11 @@ uint8_t byteq_get(struct byteq *q) { uint8_t c; + int discard; ovs_assert(!byteq_is_empty(q)); c = *byteq_tail(q); q->tail++; + atomic_sub(&q->used, 1, &discard); return c; } @@ -168,8 +178,10 @@ byteq_tail(const struct byteq *q) void byteq_advance_tail(struct byteq *q, unsigned int n) { + int discard; ovs_assert(byteq_tailroom(q) >= n); q->tail += n; + atomic_sub_relaxed(&q->used, n, &discard); } /* Returns the byte after the last in-use byte of 'q', the point at which new @@ -195,6 +207,9 @@ byteq_headroom(const struct byteq *q) void byteq_advance_head(struct byteq *q, unsigned int n) { + int discard; ovs_assert(byteq_headroom(q) >= n); q->head += n; + atomic_thread_fence(memory_order_release); + atomic_add_relaxed(&q->used, n, &discard); } diff --git a/lib/byteq.h b/lib/byteq.h index d73e3684e..e829efab0 100644 --- a/lib/byteq.h +++ b/lib/byteq.h @@ -19,6 +19,7 @@ #include <stdbool.h> #include <stddef.h> #include <stdint.h> +#include "ovs-atomic.h" /* General-purpose circular queue of bytes. */ struct byteq { @@ -26,6 +27,7 @@ struct byteq { unsigned int size; /* Number of bytes allocated for 'buffer'. */ unsigned int head; /* Head of queue. */ unsigned int tail; /* Chases the head. 
*/ + atomic_int used; }; void byteq_init(struct byteq *, uint8_t *buffer, size_t size); -- 2.20.1 _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev