This patch modifies waiter/wait_record and multiple-producers single-consumer
queue to make them work correctly on weak memory model architectures like ARM.
These two constitute the building blocks of the very critical synchronization
mechanism in OSv - lock-free mutex.

As the issue #1123 describes, OSv quite often hangs while booting on 2 or
more CPUs (SMP mode). Inspecting state of threads and relevant variables
seems to reveal that threads waiting for some mutex end up waiting "forever"
even though corresponding "waker" threads did wake them (called thread::wake()).
More specifically the thread::do_wait_until() (called by waiter::wait()),
running on cpu 1, does not seem to SEE the expected value - nullptr -
of the shared variable "t" (waiting thread), even though the other thread 
(waker),
running on cpu 2, did call do_wake_with() method (called by 
wake_with_from_mutex())
as part of the responsibility hand-off protocol.

The problem is that the waiter member variable "t" (pointer to the waiting 
thread)
is a shared variable and for it to be properly synchronized and visible across
multiple CPUs, the reads and writes to it need to be guarded with correct
read and writes to some atomic variable to establish appropriate "inter-thread 
happens-before"
relationship. According to the C++ memory model, the "inter-thread 
happens-before"
is based on "happens-before" and "synchronizes-with" relationships. The former
one is quite intuitive, whereas the latter can be formulated as such:

"A suitably-tagged atomic write operation W, on a variable, x, synchronizes
with a suitably-tagged atomic read operation on x that reads the value STORED
by either that write, W, or a subsequent atomic write operation on x by the same
thread that performed the initial write, W, or a sequence of atomic 
read-modify-write
operations on x (such as fetch_add() or compare_exchange_weak()) by any thread,
where the value read by the 1st thread in the sequence is the value written by 
W." -
from the section 5.3.1, chapter "The C++ memory model and operations on atomic 
types"
from the book "C++ Concurrency in Action" by Anthony Williams.

In essence it means, that if the a write to a shared variable, t, 
"happens-before" the
atomic write operation W, on a variable, x, on thread T1, the corresponding 
read of, t,
that "happens-after" the "synchronize-with" read operation on, x, on thread T2, 
SHOULD see
the value of t as written on thread T1 before W.

This simple example illustrates the above:

std::vector<int> data; // shared variable
std::atomic<bool> data_ready(false);

void reader_thread()
{
    while(!data_ready.load())                // (1)
    {
        std::this_thread::sleep(std::chrono::milliseconds(1));
    }
    std::cout<<"The answer="<<data[0]<<"\n"; // (2)
}

void writer_thread()
{
    data.push_back(42);                      // (3)
    data_ready=true;                         // (4)
}

where (1) happens-before (2), (3) happens-before (4), (4) synchronizes-with (1) 
if
(1) reads true and finally (3) inter-thread happens-before (2) in such case.

So coming back to the waiter member variable, t, it needs to be guarded by a 
"synchronize-with"
operation on some atomic variable. It seems that in context of 
do_wake_with()/do_wait_until(),
such candidate could be the thread member variable _detached_state.st which is 
of type atomic<status>.
As a matter of fact, the thread::prepare_wait() (called by do_wait_until()) 
does call load() and store()
on _detached_state.st and then thread::wake_impl() (called by do_wake_with()) 
calls compare_exchange_weak()
on _detached_state.st in a loop, which might seem to provide the desired 
"synchronize-with" relationship.
The problem is, that the compare_exchange_weak() loop in wake_impl() sometimes 
exits without
writing to _detached_state.st if the status of the thread was neither waiting 
nor sending_lock. In such
cases wake_impl() fails to provide necessary "synchronize-with" relationship 
with a store/load in
prepare_wait() and therefore the write to the waiter shared variable t may not 
be visible on the cpu
executing prepare_wait().

It is not clear how to change the relevant code to provide necessary 
"synchronize-with" relationship
around the member variable t. In lieu of that, this patch changes the waiter 
member variable "t"
to the atomic and replaces all reads and writes with the 
load(std::memory_order_acquire)
and store(...,std::memory_order_release) calls respectively. The 
acquire-release memory ordering is
enough to enforce, that the writes to t should be correctly visible across 
other CPUs.

This patch also changes bsd/porting/synch.cc to change _awake variable to an 
atomic
for similar reasons as the t variable in the waiter. In this case, the 
synch_port::wakeup*()
methods use "void thread::wake_with(Action action)" and we need to make sure 
the _awake variable
is visible between CPUs.

Lastly this patch also changes the queue_mpsc to make the poplist variable an 
atomic as well.
The poplist is intended to be manipulated by single thread at any point in time 
(single consumer)
which means that do not need to protect it from regular races. However the 
poplist consumers (even when
one at the time) may run on different CPUs and therefore need to SEE the latest 
value of this variable 
after the write on a different CPUs. For that reason we make poplist an atomic 
and use acquire/release
semantics to archieve cross-CPU visibility.

It is is very hard to prove correctness of this patch, but based on the 
empirical results
I was able to run a hello example on 2 CPUs over 100K times where before it 
would hang before
executing it 100 times.

Finally please note that on x86_64, the operations 
load(std::memory_order_acquire) and
store(...,std::memory_order_release) should not make binary more expensive, as 
the strong memory
model comes with implicit acquire and release semantics and therefore the 
load() and store() should not
translate to any LOCK-ed instructions.

On a general note, we have never seen these kind of problems on x86_64. But I 
think it can be explained by
the fact that Intel/AMD architecture comes with a strong memory model which is 
way more forgiving.
More specifically, x86_64 operates according to so called TSO (Total Store 
Order) model where
regular reads and writes come implicitly with acquire and release semantics and 
the atomic sequential
writes (the ones that translate to LOCK-ed instructions) acquire global system 
memory lock and flush
the CPU store buffers (the reads wait until lock is released). This means that 
the LOCK-ed instructions
enforce sequential consistency.

The ARM architecture comes with weak memory model where each CPUs in essence 
has it own seperate
view of memory synced via common interconnect. For example, the memory system 
does not guarantee
that a write becomes visible to all other CPUs at the same time (this behavior 
is called write non-atomicity).
So the ARM is way less forgiving.

Fixes #1123

Signed-off-by: Waldemar Kozaczuk <jwkozac...@gmail.com>
---
 bsd/porting/synch.cc           | 13 +++++++------
 include/lockfree/queue-mpsc.hh | 19 ++++++++++---------
 include/osv/wait_record.hh     | 16 ++++++++--------
 3 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/bsd/porting/synch.cc b/bsd/porting/synch.cc
index 59e82b7b..1641ac29 100644
--- a/bsd/porting/synch.cc
+++ b/bsd/porting/synch.cc
@@ -26,7 +26,7 @@ TRACEPOINT(trace_synch_wakeup_one_waking, "chan=%p 
thread=%p", void *, void *);
 
 struct synch_thread {
     sched::thread* _thread;
-    bool _awake;
+    std::atomic<bool> _awake;
 };
 
 class synch_port {
@@ -69,7 +69,7 @@ int synch_port::_msleep(void *chan, struct mtx *mtx,
     // Init the wait
     synch_thread wait;
     wait._thread = sched::thread::current();
-    wait._awake = false;
+    wait._awake.store(false, std::memory_order_release);
 
     if (mtx) {
         wait_lock = &mtx->_mutex;
@@ -100,7 +100,8 @@ int synch_port::_msleep(void *chan, struct mtx *mtx,
     {
         sched::thread::wait_until_interruptible([&] {
             return ( (timo_hz && t.expired()) ||
-                     (wait._awake) );
+                     (wait._awake.load(std::memory_order_acquire)) );
+
         });
     }
     catch (int e)
@@ -113,7 +114,7 @@ int synch_port::_msleep(void *chan, struct mtx *mtx,
         mutex_lock(wait_lock);
     }
     // msleep timeout
-    if (!wait._awake) {
+    if (!wait._awake.load(std::memory_order_acquire)) {
         trace_synch_msleep_expired(chan);
         if (chan) {
             // A pointer to the local "wait" may still be on the list -
@@ -146,7 +147,7 @@ void synch_port::wakeup(void* chan)
     for (auto it=ppp.first; it!=ppp.second; ++it) {
         synch_thread* wait = (*it).second;
         trace_synch_wakeup_waking(chan, wait->_thread);
-        wait->_thread->wake_with([&] { wait->_awake = true; });
+        wait->_thread->wake_with([&] { wait->_awake.store(true, 
std::memory_order_release); });
     }
     _evlist.erase(ppp.first, ppp.second);
     mutex_unlock(&_lock);
@@ -163,7 +164,7 @@ void synch_port::wakeup_one(void* chan)
         synch_thread* wait = (*it).second;
         _evlist.erase(it);
         trace_synch_wakeup_one_waking(chan, wait->_thread);
-        wait->_thread->wake_with([&] { wait->_awake = true; });
+        wait->_thread->wake_with([&] { wait->_awake.store(true, 
std::memory_order_release); });
     }
     mutex_unlock(&_lock);
 }
diff --git a/include/lockfree/queue-mpsc.hh b/include/lockfree/queue-mpsc.hh
index f0646571..968e0bea 100644
--- a/include/lockfree/queue-mpsc.hh
+++ b/include/lockfree/queue-mpsc.hh
@@ -46,7 +46,7 @@ template <class LT>
 class queue_mpsc {
 private:
     std::atomic<LT*> pushlist;
-    LT* poplist;
+    std::atomic<LT*> poplist;
 public:
     constexpr queue_mpsc<LT>() : pushlist(nullptr), poplist(nullptr) { }
 
@@ -70,14 +70,14 @@ public:
 
     inline LT* pop()
     {
-        if (poplist) {
+        LT* _poplist = poplist.load(std::memory_order_acquire);
+        if (_poplist) {
             // The poplist (prepared by an earlier pop) is not empty, so pop
             // from it. We don't need any locking to access the poplist, as
             // it is only touched by pop operations, and our assumption (of a
             // single-consumer queue) is that pops cannot be concurrent.
-            LT* result = poplist;
-            poplist = poplist->next;
-            return result;
+            poplist.store(_poplist->next, std::memory_order_release);
+            return _poplist;
         } else {
             // The poplist is empty. Atomically take the entire pushlist
             // (pushers may continue to to push concurrently, so the atomicity
@@ -96,17 +96,18 @@ public:
             // of the pop.
             while (r->next) {
                 LT *next = r->next;
-                r->next = poplist;
-                poplist = r;
+                r->next = _poplist;
+                _poplist = r;
                 r = next;
             }
+            poplist.store(_poplist, std::memory_order_release);
             return r;
         }
     }
 
     inline bool empty(void) const
     {
-           return (!poplist && !pushlist.load(std::memory_order_relaxed));
+           return (!poplist.load(std::memory_order_acquire) && 
!pushlist.load(std::memory_order_relaxed));
     }
 
     class iterator {
@@ -132,7 +133,7 @@ public:
 
     // iteration; only valid from consumer side; invalidated by pop()s
     // The iterator is NOT ordered.
-    iterator begin() { return { pushlist.load(std::memory_order_acquire), 
poplist }; }
+    iterator begin() { return { pushlist.load(std::memory_order_acquire), 
poplist.load(std::memory_order_acquire) }; }
     iterator end() { return { nullptr, nullptr }; }
 };
 
diff --git a/include/osv/wait_record.hh b/include/osv/wait_record.hh
index d3201b78..c72e5c18 100644
--- a/include/osv/wait_record.hh
+++ b/include/osv/wait_record.hh
@@ -30,21 +30,21 @@ namespace lockfree { struct mutex; }
 
 class waiter {
 protected:
-    sched::thread *t;
+    std::atomic<sched::thread*> t;
 public:
     explicit waiter(sched::thread *t) : t(t) { };
 
     inline void wake() {
-        t->wake_with_from_mutex([&] { t = nullptr; });
+        t.load(std::memory_order_acquire)->wake_with_from_mutex([&] { 
t.store(nullptr, std::memory_order_release); });
     }
 
     inline void wait() const {
-        sched::thread::wait_until([&] { return !t; });
+        sched::thread::wait_until([&] { return 
!t.load(std::memory_order_acquire); });
     }
 
     inline void wait(sched::timer* tmr) const {
         sched::thread::wait_until([&] {
-            return (tmr && tmr->expired()) || !t; });
+            return (tmr && tmr->expired()) || 
!t.load(std::memory_order_acquire); });
     }
 
     // The thread() method returns the thread waiting on this waiter, or 0 if
@@ -52,19 +52,19 @@ public:
     // sanity assert()s. To help enforce this intended use case, we return a
     // const sched::thread.
     inline const sched::thread *thread(void) const {
-        return t;
+        return t.load(std::memory_order_acquire);
     }
 
     // woken() returns true if the wait_record was already woken by a wake()
     // (a timeout doesn't set the wait_record to woken()).
     inline bool woken() const {
-        return !t;
+        return !t.load(std::memory_order_acquire);
     }
 
     // Signal the wait record as woken without actually waking it up.  Use
     // only with external synchronization.
     void clear() noexcept {
-        t = nullptr;
+        t.store(nullptr, std::memory_order_release);
     }
 
     // A waiter object cannot be copied or moved, as wake() on the copy will
@@ -89,7 +89,7 @@ struct wait_record : public waiter {
     struct wait_record *next;
     explicit wait_record(sched::thread *t) : waiter(t), next(nullptr) { };
     using mutex = lockfree::mutex;
-    void wake_lock(mutex* mtx) { t->wake_lock(mtx, this); }
+    void wake_lock(mutex* mtx) { 
t.load(std::memory_order_acquire)->wake_lock(mtx, this); }
 };
 
 #endif /* INCLUDED_OSV_WAIT_RECORD */
-- 
2.27.0

-- 
You received this message because you are subscribed to the Google Groups "OSv 
Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to osv-dev+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/osv-dev/20210502040823.835695-1-jwkozaczuk%40gmail.com.

Reply via email to