rcu_thread_online() and rcu_thread_offline() are currently provided only by urcu-qsbr, but they are useful in general: since both rcu_unregister_thread() and synchronize_rcu() take the same GP mutex, synchronize_rcu() can deadlock if another thread calls rcu_unregister_thread() and thereby keeps the grace period from ever elapsing.
At the same time, wrap synchronize_rcu with offlining/onlining of the thread. This allows reader threads to call synchronize_rcu. Signed-off-by: Paolo Bonzini <[email protected]> --- README | 10 ++++++---- urcu-bp-static.h | 17 +++++++++++++++++ urcu-bp.c | 26 ++++++++++++++++++++++++++ urcu-bp.h | 4 ++++ urcu-static.h | 15 +++++++++++++++ urcu.c | 27 +++++++++++++++++++++++++++ urcu.h | 4 ++++ 7 files changed, 99 insertions(+), 4 deletions(-) diff --git a/README b/README index a2ca1eb..eec54b2 100644 --- a/README +++ b/README @@ -49,16 +49,18 @@ Usage of liburcu Dynamically detects kernel support for sys_membarrier(). Falls back on urcu-mb scheme if support is not present, which has slower read-side. + * rcu_thread_online() and rcu_thread_offline() can be used to mark long + periods for which the threads are not active. Usage of liburcu-qsbr * #include <urcu-qsbr.h> * Link with "-lurcu-qsbr". * The QSBR flavor of RCU needs to have each reader thread executing - rcu_quiescent_state() periodically to progress. rcu_thread_online() - and rcu_thread_offline() can be used to mark long periods for which - the threads are not active. It provides the fastest read-side at the - expense of more intrusiveness in the application code. + rcu_quiescent_state() periodically to progress, except in areas + already marked with rcu_thread_offline(). It provides the fastest + read-side at the expense of more intrusiveness in the application + code. 
Usage of liburcu-mb diff --git a/urcu-bp-static.h b/urcu-bp-static.h index 933f234..8e66a79 100644 --- a/urcu-bp-static.h +++ b/urcu-bp-static.h @@ -201,6 +201,23 @@ static inline void _rcu_read_unlock(void) _STORE_SHARED(rcu_reader->ctr, rcu_reader->ctr - RCU_GP_COUNT); } +static inline void _rcu_thread_offline(void) +{ + if (likely(rcu_reader)) { + smp_mb(); + STORE_SHARED(rcu_reader->ctr, 0); + smp_mb(); /* write rcu_reader.ctr before read futex */ + } +} + +static inline void _rcu_thread_online(void) +{ + if (likely(rcu_reader)) { + _STORE_SHARED(rcu_reader->ctr, LOAD_SHARED(rcu_gp_ctr)); + smp_mb(); + } +} + #ifdef __cplusplus } #endif diff --git a/urcu-bp.c b/urcu-bp.c index 1b0bd69..b984ed6 100644 --- a/urcu-bp.c +++ b/urcu-bp.c @@ -166,6 +166,18 @@ void synchronize_rcu(void) { sigset_t newmask, oldmask; int ret; + unsigned long was_online; + + was_online = rcu_reader && rcu_reader->ctr; + + /* + * Mark the writer thread offline to make sure we don't wait for + * our own quiescent state. This allows using synchronize_rcu() in + * threads registered as readers. + */ + smp_mb(); + if (was_online) + STORE_SHARED(rcu_reader->ctr, 0); ret = sigemptyset(&newmask); assert(!ret); @@ -211,6 +223,10 @@ out: mutex_unlock(&rcu_gp_lock); ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL); assert(!ret); + + if (was_online) + _STORE_SHARED(rcu_reader->ctr, LOAD_SHARED(rcu_gp_ctr)); + smp_mb(); } /* @@ -227,6 +243,16 @@ void rcu_read_unlock(void) _rcu_read_unlock(); } +void rcu_thread_offline(void) +{ + _rcu_thread_offline(); +} + +void rcu_thread_online(void) +{ + _rcu_thread_online(); +} + /* * only grow for now. 
*/ diff --git a/urcu-bp.h b/urcu-bp.h index 0ea53e1..7cb4e3d 100644 --- a/urcu-bp.h +++ b/urcu-bp.h @@ -71,6 +71,8 @@ extern "C" { */ #define rcu_read_lock() _rcu_read_lock() #define rcu_read_unlock() _rcu_read_unlock() +#define rcu_thread_offline() _rcu_thread_offline() +#define rcu_thread_online() _rcu_thread_online() #else /* !_LGPL_SOURCE */ @@ -81,6 +83,8 @@ extern "C" { extern void rcu_read_lock(void); extern void rcu_read_unlock(void); +extern void rcu_thread_offline(void); +extern void rcu_thread_online(void); #endif /* !_LGPL_SOURCE */ diff --git a/urcu-static.h b/urcu-static.h index f68dd7c..479c06f 100644 --- a/urcu-static.h +++ b/urcu-static.h @@ -296,6 +296,21 @@ static inline void _rcu_read_unlock(void) } } +static inline void _rcu_thread_offline(void) +{ + smp_mb(); + STORE_SHARED(rcu_reader.ctr, 0); + STORE_SHARED(rcu_reader.need_mb, 0); + smp_mb(); /* write rcu_reader.ctr before read futex */ + wake_up_gp(); +} + +static inline void _rcu_thread_online(void) +{ + _STORE_SHARED(rcu_reader.ctr, LOAD_SHARED(rcu_gp_ctr)); + smp_mb(); +} + #ifdef __cplusplus } #endif diff --git a/urcu.c b/urcu.c index 5d09a62..dee8b59 100644 --- a/urcu.c +++ b/urcu.c @@ -284,6 +284,19 @@ void update_counter_and_wait(void) void synchronize_rcu(void) { + unsigned long was_online; + + was_online = rcu_reader.ctr; + + /* + * Mark the writer thread offline to make sure we don't wait for + * our own quiescent state. This allows using synchronize_rcu() in + * threads registered as readers. 
+ */ + smp_mb(); + if (was_online) + STORE_SHARED(rcu_reader.ctr, 0); + mutex_lock(&rcu_gp_lock); if (list_empty(&registry)) @@ -326,6 +339,10 @@ void synchronize_rcu(void) smp_mb_master(RCU_MB_GROUP); out: mutex_unlock(&rcu_gp_lock); + + if (was_online) + _STORE_SHARED(rcu_reader.ctr, LOAD_SHARED(rcu_gp_ctr)); + smp_mb(); } /* @@ -342,6 +359,16 @@ void rcu_read_unlock(void) _rcu_read_unlock(); } +void rcu_thread_offline(void) +{ + _rcu_thread_offline(); +} + +void rcu_thread_online(void) +{ + _rcu_thread_online(); +} + void rcu_register_thread(void) { rcu_reader.tid = pthread_self(); diff --git a/urcu.h b/urcu.h index 9241139..39dbf2b 100644 --- a/urcu.h +++ b/urcu.h @@ -70,6 +70,8 @@ extern "C" { */ #define rcu_read_lock() _rcu_read_lock() #define rcu_read_unlock() _rcu_read_unlock() +#define rcu_thread_offline() _rcu_thread_offline() +#define rcu_thread_online() _rcu_thread_online() #else /* !_LGPL_SOURCE */ @@ -80,6 +82,8 @@ extern "C" { extern void rcu_read_lock(void); extern void rcu_read_unlock(void); +extern void rcu_thread_offline(void); +extern void rcu_thread_online(void); #endif /* !_LGPL_SOURCE */ -- 1.6.6 _______________________________________________ ltt-dev mailing list [email protected] http://lists.casi.polymtl.ca/cgi-bin/mailman/listinfo/ltt-dev
