On 12/05/2020 14:55, Yanqin Wei wrote:
Hi Anton,

I am curious what the concurrency use case for the ByteQ lib is, because we
currently see some UT failures on CPUs with weak memory models, such as
Arm and PPC.

Next patch :)


Besides this question, I have a few comments inline.

Thanks, I will review them.


Best Regards,
Wei Yanqin

-----Original Message-----
From: dev <ovs-dev-boun...@openvswitch.org> On Behalf Of
anton.iva...@cambridgegreys.com
Sent: Tuesday, May 12, 2020 3:41 PM
To: d...@openvswitch.org
Cc: Anton Ivanov <anton.iva...@cambridgegreys.com>
Subject: [ovs-dev] [PATCH 1/2] Make ByteQ safe for simultaneous
producer/consumer

From: Anton Ivanov <anton.iva...@cambridgegreys.com>

A ByteQ with unlocked head and tail is unsafe for simultaneous
consume/produce.

If simultaneous use is desired, these either need to be locked or there needs to
be a third atomic or lock guarded variable "used".
[Yanqin] Is the requirement only single-consumer/single-producer? It would be
better to make the use case clear.
An atomic "used" allows the producer to enqueue safely because it "owns" the
head, and even if the consumer changes the tail, that will only increase the
space available versus the value in "used".

Once the data has been written and the enqueue should be made visible, it is
fenced and "used" is updated.

Similarly for the consumer: it can now safely consume because it "owns" the
tail and never reads beyond tail + used (wrapped around as needed).
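
The scheme described above can be sketched with standard C11 atomics. This is
only an illustration under stated assumptions, not the patch itself: the names
spsc_byteq, spsc_init, spsc_put and spsc_get are hypothetical, <stdatomic.h>
stands in for the "ovs-atomic.h" wrappers, and exactly one producer thread and
one consumer thread are assumed.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical single-producer/single-consumer byte queue (not the OVS
 * struct byteq). */
struct spsc_byteq {
    uint8_t *buffer;      /* Storage of 'size' bytes. */
    unsigned int size;    /* Must be a power of 2. */
    unsigned int head;    /* Touched only by the producer. */
    unsigned int tail;    /* Touched only by the consumer. */
    atomic_uint used;     /* Shared count of queued bytes. */
};

static void
spsc_init(struct spsc_byteq *q, uint8_t *buffer, unsigned int size)
{
    assert(size > 0 && (size & (size - 1)) == 0);
    q->buffer = buffer;
    q->size = size;
    q->head = q->tail = 0;
    atomic_init(&q->used, 0);
}

/* Producer side.  Returns 0 on success, -1 if the queue is full. */
static int
spsc_put(struct spsc_byteq *q, uint8_t c)
{
    /* Acquire pairs with the consumer's release in spsc_get(), so a slot
     * is only overwritten after the consumer has finished reading it. */
    if (atomic_load_explicit(&q->used, memory_order_acquire) == q->size) {
        return -1;
    }
    q->buffer[q->head++ & (q->size - 1)] = c;
    /* Release: the byte is written before the count that covers it. */
    atomic_fetch_add_explicit(&q->used, 1, memory_order_release);
    return 0;
}

/* Consumer side.  Returns 0 on success, -1 if the queue is empty. */
static int
spsc_get(struct spsc_byteq *q, uint8_t *c)
{
    /* Acquire pairs with the producer's release in spsc_put(). */
    if (atomic_load_explicit(&q->used, memory_order_acquire) == 0) {
        return -1;
    }
    *c = q->buffer[q->tail++ & (q->size - 1)];
    /* Release: the read completes before the slot is handed back. */
    atomic_fetch_sub_explicit(&q->used, 1, memory_order_release);
    return 0;
}
```

Neither side ever reads the other side's index; "used" is the only shared
variable, which is why it is the only one that needs to be atomic here.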

Signed-off-by: Anton Ivanov <anton.iva...@cambridgegreys.com>
---
  lib/byteq.c | 17 ++++++++++++++++-
  lib/byteq.h |  2 ++
  2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/lib/byteq.c b/lib/byteq.c
index 3f865cf9e..da40c2530 100644
--- a/lib/byteq.c
+++ b/lib/byteq.c
@@ -19,6 +19,7 @@
  #include <string.h>
  #include <unistd.h>
  #include "util.h"
+#include "ovs-atomic.h"

  /* Initializes 'q' as an empty byteq that uses the 'size' bytes of 'buffer' to
   * store data.  'size' must be a power of 2.
@@ -32,13 +33,16 @@ byteq_init(struct byteq *q, uint8_t *buffer, size_t size)
      q->buffer = buffer;
      q->size = size;
      q->head = q->tail = 0;
+    q->used = ATOMIC_VAR_INIT(0);
  }

  /* Returns the number of bytes current queued in 'q'. */
  int
  byteq_used(const struct byteq *q)
  {
-    return q->head - q->tail;
+    int retval;
+    atomic_read_relaxed(&q->used, &retval);
[Yanqin] Acquire ordering is required here so that the dequeue/enqueue cannot
be reordered before this read.
+    return retval;
  }
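
To illustrate the point about acquire ordering on the read, here is a minimal
message-passing sketch in plain C11. The mailbox type and the function names
are hypothetical and exist only for this example; they are not part of byteq
or "ovs-atomic.h".

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical one-slot mailbox, used only to illustrate the ordering. */
struct mailbox {
    int payload;        /* Plain data, written before 'used' is raised. */
    atomic_int used;    /* 0 = empty, 1 = 'payload' is valid. */
};

/* Writer: store the data, then publish it with a release store. */
static void
mailbox_post(struct mailbox *m, int value)
{
    m->payload = value;
    atomic_store_explicit(&m->used, 1, memory_order_release);
}

/* Reader: the acquire load keeps the read of 'payload' from moving ahead
 * of it.  With memory_order_relaxed, a weakly ordered CPU would be allowed
 * to read 'payload' first and could see a stale value. */
static bool
mailbox_poll(struct mailbox *m, int *value)
{
    assert(value != NULL);
    if (atomic_load_explicit(&m->used, memory_order_acquire) == 0) {
        return false;
    }
    *value = m->payload;
    return true;
}
```

The same pairing applies to the queue: the read of "used" is the reader's only
synchronization point, so it has to carry the acquire.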

  /* Returns the number of bytes that can be added to 'q' without overflow. */
@@ -68,9 +72,11 @@ byteq_is_full(const struct byteq *q)
  void
  byteq_put(struct byteq *q, uint8_t c)
  {
+    int discard;
      ovs_assert(!byteq_is_full(q));
      *byteq_head(q) = c;
      q->head++;
+    atomic_add(&q->used, 1, &discard);
[Yanqin] atomic_add uses seq_cst memory ordering. I think release ordering is
enough to keep the enqueue from being reordered after the "used" update.
  }

  /* Adds the 'n' bytes in 'p' at the head of 'q', which must have at least 'n'
@@ -79,6 +85,7 @@ void
  byteq_putn(struct byteq *q, const void *p_, size_t n)
  {
      const uint8_t *p = p_;
+    int discard;
      ovs_assert(byteq_avail(q) >= n);
      while (n > 0) {
          size_t chunk = MIN(n, byteq_headroom(q));
@@ -86,6 +93,7 @@ byteq_putn(struct byteq *q, const void *p_, size_t n)
          byteq_advance_head(q, chunk);
          p += chunk;
          n -= chunk;
+        atomic_add(&q->used, chunk, &discard);
[Yanqin] Ditto
      }
  }

@@ -103,9 +111,11 @@ uint8_t
  byteq_get(struct byteq *q)
  {
      uint8_t c;
+    int discard;
      ovs_assert(!byteq_is_empty(q));
      c = *byteq_tail(q);
      q->tail++;
+    atomic_sub(&q->used, 1, &discard);
[Yanqin] Ditto
      return c;
  }

@@ -168,8 +178,10 @@ byteq_tail(const struct byteq *q)
  void
  byteq_advance_tail(struct byteq *q, unsigned int n)
  {
+    int discard;
      ovs_assert(byteq_tailroom(q) >= n);
      q->tail += n;
+    atomic_sub_relaxed(&q->used, n, &discard);
[Yanqin] Why relaxed ordering here? The tail update may be reordered after it.
  }

  /* Returns the byte after the last in-use byte of 'q', the point at which new
@@ -195,6 +207,9 @@ byteq_headroom(const struct byteq *q)
  void
  byteq_advance_head(struct byteq *q, unsigned int n)
  {
+    int discard;
      ovs_assert(byteq_headroom(q) >= n);
      q->head += n;
+    atomic_thread_fence(memory_order_release);
+    atomic_add_relaxed(&q->used, n, &discard);
[Yanqin] I suggest using a one-way barrier here.
  }
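
One possible reading of the one-way barrier suggestion, sketched in plain C11:
attach the release ordering to the add itself instead of issuing a stand-alone
fence before a relaxed add. The function names below are hypothetical, and
<stdatomic.h> is used here in place of the "ovs-atomic.h" wrappers.

```c
#include <assert.h>
#include <stdatomic.h>

/* Variant A: stand-alone release fence followed by a relaxed add, which is
 * the shape used in the patch's byteq_advance_head(). */
static void
publish_with_fence(atomic_int *used, int n)
{
    assert(n >= 0);
    atomic_thread_fence(memory_order_release);
    atomic_fetch_add_explicit(used, n, memory_order_relaxed);
}

/* Variant B: release ordering attached to the add.  Earlier writes cannot
 * move after the add, but nothing else is constrained, which is what makes
 * it a one-way barrier. */
static void
publish_with_release_add(atomic_int *used, int n)
{
    assert(n >= 0);
    atomic_fetch_add_explicit(used, n, memory_order_release);
}
```

In both variants, a reader that acquire-loads "used" and observes the new value
also observes the writes made before the publish. The fence in variant A is
the stronger construct, and on Arm it typically costs a full barrier
instruction, whereas variant B can usually be done with a store-release.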
diff --git a/lib/byteq.h b/lib/byteq.h
index d73e3684e..e829efab0 100644
--- a/lib/byteq.h
+++ b/lib/byteq.h
@@ -19,6 +19,7 @@
  #include <stdbool.h>
  #include <stddef.h>
  #include <stdint.h>
+#include "ovs-atomic.h"

  /* General-purpose circular queue of bytes. */
  struct byteq {
@@ -26,6 +27,7 @@ struct byteq {
      unsigned int size;          /* Number of bytes allocated for 'buffer'. */
      unsigned int head;          /* Head of queue. */
      unsigned int tail;          /* Chases the head. */
+    atomic_int used;
  };

  void byteq_init(struct byteq *, uint8_t *buffer, size_t size);
--
2.20.1

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

--
Anton R. Ivanov
Cambridgegreys Limited. Registered in England. Company Number 10273661
https://www.cambridgegreys.com/
