This patch changes the queue_mpsc to make the poplist variable an atomic.
The poplist is intended to be manipulated by a single thread at any point
in time (single consumer), which means that we do not need to protect it
from regular races.
However, the poplist consumers (even when only one at a time) may run on
different CPUs and therefore need to SEE the latest value of this variable
after a write on a different CPU. For that reason we make poplist
an atomic and use acquire/release semantics to achieve cross-CPU visibility.

Signed-off-by: Waldemar Kozaczuk <jwkozac...@gmail.com>
---
 include/lockfree/queue-mpsc.hh | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/include/lockfree/queue-mpsc.hh b/include/lockfree/queue-mpsc.hh
index f0646571..968e0bea 100644
--- a/include/lockfree/queue-mpsc.hh
+++ b/include/lockfree/queue-mpsc.hh
@@ -46,7 +46,7 @@ template <class LT>
 class queue_mpsc {
 private:
     std::atomic<LT*> pushlist;
-    LT* poplist;
+    std::atomic<LT*> poplist;
 public:
     constexpr queue_mpsc<LT>() : pushlist(nullptr), poplist(nullptr) { }
 
@@ -70,14 +70,14 @@ public:
 
     inline LT* pop()
     {
-        if (poplist) {
+        LT* _poplist = poplist.load(std::memory_order_acquire);
+        if (_poplist) {
             // The poplist (prepared by an earlier pop) is not empty, so pop
             // from it. We don't need any locking to access the poplist, as
             // it is only touched by pop operations, and our assumption (of a
             // single-consumer queue) is that pops cannot be concurrent.
-            LT* result = poplist;
-            poplist = poplist->next;
-            return result;
+            poplist.store(_poplist->next, std::memory_order_release);
+            return _poplist;
         } else {
             // The poplist is empty. Atomically take the entire pushlist
             // (pushers may continue to to push concurrently, so the atomicity
@@ -96,17 +96,18 @@ public:
             // of the pop.
             while (r->next) {
                 LT *next = r->next;
-                r->next = poplist;
-                poplist = r;
+                r->next = _poplist;
+                _poplist = r;
                 r = next;
             }
+            poplist.store(_poplist, std::memory_order_release);
             return r;
         }
     }
 
     inline bool empty(void) const
     {
-           return (!poplist && !pushlist.load(std::memory_order_relaxed));
+           return (!poplist.load(std::memory_order_acquire) && 
!pushlist.load(std::memory_order_relaxed));
     }
 
     class iterator {
@@ -132,7 +133,7 @@ public:
 
     // iteration; only valid from consumer side; invalidated by pop()s
     // The iterator is NOT ordered.
-    iterator begin() { return { pushlist.load(std::memory_order_acquire), 
poplist }; }
+    iterator begin() { return { pushlist.load(std::memory_order_acquire), 
poplist.load(std::memory_order_acquire) }; }
     iterator end() { return { nullptr, nullptr }; }
 };
 
-- 
2.30.2

-- 
You received this message because you are subscribed to the Google Groups "OSv 
Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to osv-dev+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/osv-dev/20210516041948.989655-3-jwkozaczuk%40gmail.com.

Reply via email to