rcu_thread_online() and rcu_thread_offline() are provided only by
urcu-qsbr, but they are useful in general: since both
rcu_unregister_thread() and synchronize_rcu() take the same GP mutex,
synchronize_rcu() can deadlock if another thread calls
rcu_unregister_thread() and thereby causes the grace period never to
elapse.

At the same time, wrap synchronize_rcu() so that it marks the calling
thread offline on entry and back online on exit.  This allows threads
registered as readers to call synchronize_rcu().

Signed-off-by: Paolo Bonzini <[email protected]>
---
 README           |   10 ++++++----
 urcu-bp-static.h |   17 +++++++++++++++++
 urcu-bp.c        |   26 ++++++++++++++++++++++++++
 urcu-bp.h        |    4 ++++
 urcu-static.h    |   15 +++++++++++++++
 urcu.c           |   27 +++++++++++++++++++++++++++
 urcu.h           |    4 ++++
 7 files changed, 99 insertions(+), 4 deletions(-)

diff --git a/README b/README
index a2ca1eb..eec54b2 100644
--- a/README
+++ b/README
@@ -49,16 +49,18 @@ Usage of liburcu
          Dynamically detects kernel support for sys_membarrier(). Falls back
          on urcu-mb scheme if support is not present, which has slower
          read-side.
+       * rcu_thread_online() and rcu_thread_offline() can be used to mark long
+         periods for which the threads are not active.
 
 Usage of liburcu-qsbr
 
        * #include <urcu-qsbr.h>
        * Link with "-lurcu-qsbr".
        * The QSBR flavor of RCU needs to have each reader thread executing
-         rcu_quiescent_state() periodically to progress. rcu_thread_online()
-         and rcu_thread_offline() can be used to mark long periods for which
-         the threads are not active. It provides the fastest read-side at the
-         expense of more intrusiveness in the application code.
+         rcu_quiescent_state() periodically to progress, except in areas
+         already marked with rcu_thread_offline(). It provides the fastest
+         read-side at the expense of more intrusiveness in the application
+         code.
 
 Usage of liburcu-mb
 
diff --git a/urcu-bp-static.h b/urcu-bp-static.h
index 933f234..8e66a79 100644
--- a/urcu-bp-static.h
+++ b/urcu-bp-static.h
@@ -201,6 +201,23 @@ static inline void _rcu_read_unlock(void)
        _STORE_SHARED(rcu_reader->ctr, rcu_reader->ctr - RCU_GP_COUNT);
 }
 
+static inline void _rcu_thread_offline(void)
+{
+        if (likely(rcu_reader)) {
+               smp_mb();
+               STORE_SHARED(rcu_reader->ctr, 0);
+               smp_mb();       /* write rcu_reader.ctr before read futex */
+       }
+}
+
+static inline void _rcu_thread_online(void)
+{
+        if (likely(rcu_reader)) {
+               _STORE_SHARED(rcu_reader->ctr, LOAD_SHARED(rcu_gp_ctr));
+               smp_mb();
+       }
+}
+
 #ifdef __cplusplus 
 }
 #endif
diff --git a/urcu-bp.c b/urcu-bp.c
index 1b0bd69..b984ed6 100644
--- a/urcu-bp.c
+++ b/urcu-bp.c
@@ -166,6 +166,18 @@ void synchronize_rcu(void)
 {
        sigset_t newmask, oldmask;
        int ret;
+       unsigned long was_online;
+
+       was_online = rcu_reader && rcu_reader->ctr;
+
+        /*
+         * Mark the writer thread offline to make sure we don't wait for
+         * our own quiescent state. This allows using synchronize_rcu() in
+         * threads registered as readers.
+         */
+        smp_mb();
+        if (was_online)
+                STORE_SHARED(rcu_reader->ctr, 0);
 
        ret = sigemptyset(&newmask);
        assert(!ret);
@@ -211,6 +223,10 @@ out:
        mutex_unlock(&rcu_gp_lock);
        ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
        assert(!ret);
+
+        if (was_online)
+                _STORE_SHARED(rcu_reader->ctr, LOAD_SHARED(rcu_gp_ctr));
+        smp_mb();
 }
 
 /*
@@ -227,6 +243,16 @@ void rcu_read_unlock(void)
        _rcu_read_unlock();
 }
 
+void rcu_thread_offline(void)
+{
+        _rcu_thread_offline();
+}
+
+void rcu_thread_online(void)
+{
+        _rcu_thread_online();
+}
+
 /*
  * only grow for now.
  */
diff --git a/urcu-bp.h b/urcu-bp.h
index 0ea53e1..7cb4e3d 100644
--- a/urcu-bp.h
+++ b/urcu-bp.h
@@ -71,6 +71,8 @@ extern "C" {
  */
 #define rcu_read_lock()                _rcu_read_lock()
 #define rcu_read_unlock()      _rcu_read_unlock()
+#define rcu_thread_offline()   _rcu_thread_offline()
+#define rcu_thread_online()    _rcu_thread_online()
 
 #else /* !_LGPL_SOURCE */
 
@@ -81,6 +83,8 @@ extern "C" {
 
 extern void rcu_read_lock(void);
 extern void rcu_read_unlock(void);
+extern void rcu_thread_offline(void);
+extern void rcu_thread_online(void);
 
 #endif /* !_LGPL_SOURCE */
 
diff --git a/urcu-static.h b/urcu-static.h
index f68dd7c..479c06f 100644
--- a/urcu-static.h
+++ b/urcu-static.h
@@ -296,6 +296,21 @@ static inline void _rcu_read_unlock(void)
        }
 }
 
+static inline void _rcu_thread_offline(void)
+{
+       smp_mb();
+       STORE_SHARED(rcu_reader.ctr, 0);
+       STORE_SHARED(rcu_reader.need_mb, 0);
+       smp_mb();       /* write rcu_reader.ctr before read futex */
+       wake_up_gp();
+}
+
+static inline void _rcu_thread_online(void)
+{
+       _STORE_SHARED(rcu_reader.ctr, LOAD_SHARED(rcu_gp_ctr));
+       smp_mb();
+}
+
 #ifdef __cplusplus 
 }
 #endif
diff --git a/urcu.c b/urcu.c
index 5d09a62..dee8b59 100644
--- a/urcu.c
+++ b/urcu.c
@@ -284,6 +284,19 @@ void update_counter_and_wait(void)
 
 void synchronize_rcu(void)
 {
+        unsigned long was_online;
+
+        was_online = rcu_reader.ctr;
+
+        /*
+         * Mark the writer thread offline to make sure we don't wait for
+         * our own quiescent state. This allows using synchronize_rcu() in
+         * threads registered as readers.
+         */
+        smp_mb();
+        if (was_online)
+                STORE_SHARED(rcu_reader.ctr, 0);
+
        mutex_lock(&rcu_gp_lock);
 
        if (list_empty(&registry))
@@ -326,6 +339,10 @@ void synchronize_rcu(void)
        smp_mb_master(RCU_MB_GROUP);
 out:
        mutex_unlock(&rcu_gp_lock);
+
+        if (was_online)
+                _STORE_SHARED(rcu_reader.ctr, LOAD_SHARED(rcu_gp_ctr));
+        smp_mb();
 }
 
 /*
@@ -342,6 +359,16 @@ void rcu_read_unlock(void)
        _rcu_read_unlock();
 }
 
+void rcu_thread_offline(void)
+{
+       _rcu_thread_offline();
+}
+
+void rcu_thread_online(void)
+{
+       _rcu_thread_online();
+}
+
 void rcu_register_thread(void)
 {
        rcu_reader.tid = pthread_self();
diff --git a/urcu.h b/urcu.h
index 9241139..39dbf2b 100644
--- a/urcu.h
+++ b/urcu.h
@@ -70,6 +70,8 @@ extern "C" {
  */
 #define rcu_read_lock()                _rcu_read_lock()
 #define rcu_read_unlock()      _rcu_read_unlock()
+#define rcu_thread_offline()   _rcu_thread_offline()
+#define rcu_thread_online()    _rcu_thread_online()
 
 #else /* !_LGPL_SOURCE */
 
@@ -80,6 +82,8 @@ extern "C" {
 
 extern void rcu_read_lock(void);
 extern void rcu_read_unlock(void);
+extern void rcu_thread_offline(void);
+extern void rcu_thread_online(void);
 
 #endif /* !_LGPL_SOURCE */
 
-- 
1.6.6



_______________________________________________
ltt-dev mailing list
[email protected]
http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev

Reply via email to