Use a userspace lock based on futex, with proxy lock/unlock APIs, for waiting and boosting. This gives a simpler implementation.
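The uwl_* (userspace wait-lock) helpers used below come from urcu-wait-lock-impl.h, which is introduced by a separate patch and is not part of this diff. The following is only a rough sketch of the semantics assumed here, reconstructed from the call sites (the helper names are as used in this patch; uwl_futex is a hypothetical wrapper, and the real implementation presumably avoids the FUTEX_WAKE syscall when no writer is waiting):

  #include <stdint.h>
  #include <sys/types.h>
  #include <unistd.h>
  #include <sys/syscall.h>
  #include <linux/futex.h>

  /* hypothetical wrapper, for illustration only */
  static inline long uwl_futex(int32_t *uaddr, int op, int32_t val)
  {
          return syscall(SYS_futex, uaddr, op, val, NULL, NULL, 0);
  }

  /* The lock word holds the owner's tid; 0 means unlocked. */
  static inline pid_t uwl_onwer(int32_t *uwl)
  {
          return __atomic_load_n(uwl, __ATOMIC_RELAXED);
  }

  /* Writer: take the reader's wait-lock on behalf of thread 'owner'. */
  static inline void uwl_proxy_lock(int32_t *uwl, pid_t owner)
  {
          __atomic_store_n(uwl, owner, __ATOMIC_SEQ_CST);
  }

  /* Writer: release a proxy-held lock the reader never contended on. */
  static inline void uwl_proxy_unlock(int32_t *uwl)
  {
          __atomic_store_n(uwl, 0, __ATOMIC_SEQ_CST);
  }

  /* Same store as above, used when no waiter can exist (no wakeup needed). */
  static inline void uwl_set_unlock(int32_t *uwl)
  {
          __atomic_store_n(uwl, 0, __ATOMIC_SEQ_CST);
  }

  /* Writer: sleep until the current owner releases the word, then own it. */
  static inline void uwl_lock(int32_t *uwl, pid_t self)
  {
          int32_t owner;

          while ((owner = __atomic_load_n(uwl, __ATOMIC_SEQ_CST)) != 0)
                  uwl_futex(uwl, FUTEX_WAIT, owner); /* returns at once if word changed */
          __atomic_store_n(uwl, self, __ATOMIC_SEQ_CST);
  }

  /* Reader: release the word if we still own it, waking a blocked writer. */
  static inline void uwl_unlock_if_not_proxy_unlocked(int32_t *uwl, pid_t self)
  {
          if (__atomic_load_n(uwl, __ATOMIC_SEQ_CST) == self) {
                  __atomic_store_n(uwl, 0, __ATOMIC_SEQ_CST);
                  uwl_futex(uwl, FUTEX_WAKE, 1);
          }
  }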
Signed-off-by: Lai Jiangshan <[email protected]>
---
 urcu.c             |  179 +++++++++++++++++-----------------------------------
 urcu/static/urcu.h |  120 ++++++++++++++---------------------
 2 files changed, 105 insertions(+), 194 deletions(-)

diff --git a/urcu.c b/urcu.c
index 039df55..328b2fb 100644
--- a/urcu.c
+++ b/urcu.c
@@ -40,6 +40,7 @@
 #include "urcu/static/urcu.h"
 /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
 #include "urcu.h"
+#include "urcu-wait-lock-impl.h"
 
 #ifdef RCU_MEMBARRIER
 static int init_done;
@@ -63,16 +64,6 @@ void __attribute__((destructor)) rcu_exit(void);
 
 static pthread_mutex_t rcu_gp_lock = PTHREAD_MUTEX_INITIALIZER;
 
-int32_t gp_futex;
-
-/*
- * Global grace period counter.
- * Contains the current RCU_GP_CTR_PHASE.
- * Also has a RCU_GP_COUNT of 1, to accelerate the reader fast path.
- * Written to only by writer with mutex taken. Read by both writer and readers.
- */
-unsigned long rcu_gp_ctr = RCU_GP_COUNT;
-
 /*
  * Written to only by each individual reader. Read by both the reader and the
  * writers.
@@ -190,147 +181,91 @@ static void smp_mb_master(int group)
 }
 #endif /* #ifdef RCU_SIGNAL */
 
-/*
- * synchronize_rcu() waiting. Single thread.
- */
-static void wait_gp(void)
+void __urcu_read_unlock_specail(void)
 {
-	/* Read reader_gp before read futex */
-	smp_mb_master(RCU_MB_GROUP);
-	if (uatomic_read(&gp_futex) == -1)
-		futex_async(&gp_futex, FUTEX_WAIT, -1,
-			    NULL, NULL, 0);
+	if (uwl_onwer(&rcu_reader.wait) == rcu_reader.tid) {
+		uwl_unlock_if_not_proxy_unlocked(&rcu_reader.wait,
+				rcu_reader.tid);
+	}
 }
 
-void update_counter_and_wait(void)
+
+void synchronize_rcu(void)
 {
 	CDS_LIST_HEAD(qsreaders);
-	int wait_loops = 0;
+	int wait_loops;
 	struct rcu_reader *index, *tmp;
+	pid_t self = syscall(SYS_gettid);
 
-	/* Switch parity: 0 -> 1, 1 -> 0 */
-	CMM_STORE_SHARED(rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR_PHASE);
+	mutex_lock(&rcu_gp_lock);
 
-	/*
-	 * Must commit rcu_gp_ctr update to memory before waiting for quiescent
-	 * state. Failure to do so could result in the writer waiting forever
-	 * while new readers are always accessing data (no progress). Enforce
-	 * compiler-order of store to rcu_gp_ctr before load rcu_reader ctr.
-	 */
-	cmm_barrier();
+	if (cds_list_empty(&registry))
		goto out;
 
-	/*
-	 *
-	 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
-	 * model easier to understand. It does not have a big performance impact
-	 * anyway, given this is the write-side.
-	 */
-	cmm_smp_mb();
+	cds_list_for_each_entry(index, &registry, node)
+		uwl_proxy_lock(&index->wait, index->tid);
 
 	/*
-	 * Wait for each thread rcu_reader.ctr count to become 0.
+	 * synchronize_rcu			rcu_read_unlock(outermost)
+	 *
+	 * sync = tid;				ctr = 0;
+	 * smp_mb_master()(1);			smp_mb_slave()(3);
+	 * test index->ctr and wait;		test index->wait and wakeup;
 	 */
-	for (;;) {
-		wait_loops++;
-		if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
-			uatomic_dec(&gp_futex);
-			/* Write futex before read reader_gp */
-			smp_mb_master(RCU_MB_GROUP);
-		}
+	smp_mb_master(RCU_MB_GROUP); /* (1) */
 
+	for (wait_loops = 0; wait_loops < RCU_QS_ACTIVE_ATTEMPTS; wait_loops++) {
 		cds_list_for_each_entry_safe(index, tmp, &registry, node) {
-			if (!rcu_gp_ongoing(&index->ctr))
+			if (_CMM_LOAD_SHARED(index->ctr) == 0) {
+				uwl_proxy_unlock(&index->wait);
 				cds_list_move(&index->node, &qsreaders);
+			}
 		}
 
+		if (cds_list_empty(&registry))
+			goto done;
+
 #ifndef HAS_INCOHERENT_CACHES
-		if (cds_list_empty(&registry)) {
-			if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
-				/* Read reader_gp before write futex */
-				smp_mb_master(RCU_MB_GROUP);
-				uatomic_set(&gp_futex, 0);
-			}
-			break;
-		} else {
-			if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS)
-				wait_gp();
-			else
-				caa_cpu_relax();
-		}
+		caa_cpu_relax();
 #else /* #ifndef HAS_INCOHERENT_CACHES */
-		/*
-		 * BUSY-LOOP. Force the reader thread to commit its
-		 * rcu_reader.ctr update to memory if we wait for too long.
-		 */
-		if (cds_list_empty(&registry)) {
-			if (wait_loops == RCU_QS_ACTIVE_ATTEMPTS) {
-				/* Read reader_gp before write futex */
-				smp_mb_master(RCU_MB_GROUP);
-				uatomic_set(&gp_futex, 0);
-			}
-			break;
-		} else {
-			switch (wait_loops) {
-			case RCU_QS_ACTIVE_ATTEMPTS:
-				wait_gp();
-				break; /* only escape switch */
-			case KICK_READER_LOOPS:
-				smp_mb_master(RCU_MB_GROUP);
-				wait_loops = 0;
-				break; /* only escape switch */
-			default:
-				caa_cpu_relax();
-			}
-		}
+		cmm_smp_mb();
 #endif /* #else #ifndef HAS_INCOHERENT_CACHES */
 	}
-	/* put back the reader list in the registry */
-	cds_list_splice(&qsreaders, &registry);
-}
 
-void synchronize_rcu(void)
-{
-	mutex_lock(&rcu_gp_lock);
-
-	if (cds_list_empty(&registry))
-		goto out;
+	/* avoid contending with short read-side C.S. and the syscall overhead */
+	usleep(2000);
 
-	/* All threads should read qparity before accessing data structure
-	 * where new ptr points to. Must be done within rcu_gp_lock because it
-	 * iterates on reader threads.*/
-	/* Write new ptr before changing the qparity */
-	smp_mb_master(RCU_MB_GROUP);
+	/* index->ctr > 0 */
+	cds_list_for_each_entry_safe(index, tmp, &registry, node) {
+		/* reader still running, we need to wait for it */
+		uwl_lock(&index->wait, self);
 
-	/*
-	 * Wait for previous parity to be empty of readers.
-	 */
-	update_counter_and_wait();	/* 0 -> 1, wait readers in parity 0 */
+		/*
+		 * Use uwl_set_unlock() instead of uwl_unlock() to avoid
+		 * the syscall overhead. There is no contention at this
+		 * point, so it is safe.
		 */
+		uwl_set_unlock(&index->wait);
+		(void)(uwl_unlock);
 
-	/*
-	 * Must finish waiting for quiescent state for parity 0 before
-	 * committing next rcu_gp_ctr update to memory. Failure to do so could
-	 * result in the writer waiting forever while new readers are always
-	 * accessing data (no progress). Enforce compiler-order of load
-	 * rcu_reader ctr before store to rcu_gp_ctr.
-	 */
-	cmm_barrier();
+		cds_list_move(&index->node, &qsreaders);
+	}
 
+done:
 	/*
-	 * Adding a cmm_smp_mb() which is _not_ formally required, but makes the
-	 * model easier to understand. It does not have a big performance impact
-	 * anyway, given this is the write-side.
+	 * Ensure the RCU C.S. is finished when we observed ctr == 0
+	 *
	 * synchronize_rcu			rcu_read_unlock(outermost)
+	 *
+	 * see ctr == 0				RCU read C.S.
+	 * smp_mb_master()(2);			smp_mb_slave()(2);
+	 * free(ptr)				ctr = 0
 	 */
-	cmm_smp_mb();
+	smp_mb_master(RCU_MB_GROUP); /* (2) */
 
-	/*
-	 * Wait for previous parity to be empty of readers.
-	 */
-	update_counter_and_wait();	/* 1 -> 0, wait readers in parity 1 */
+	/* put back the reader list in the registry */
+	cds_list_splice(&qsreaders, &registry);
 
-	/* Finish waiting for reader threads before letting the old ptr being
-	 * freed. Must be done within rcu_gp_lock because it iterates on reader
-	 * threads. */
-	smp_mb_master(RCU_MB_GROUP);
 out:
 	mutex_unlock(&rcu_gp_lock);
 }
@@ -352,8 +287,8 @@ void rcu_read_unlock(void)
 void rcu_register_thread(void)
 {
 	rcu_reader.pthread = pthread_self();
+	rcu_reader.tid = syscall(SYS_gettid);
 	assert(rcu_reader.need_mb == 0);
-	assert(!(rcu_reader.ctr & RCU_GP_CTR_NEST_MASK));
 
 	mutex_lock(&rcu_gp_lock);
 	rcu_init();	/* In case gcc does not support constructor attribute */
diff --git a/urcu/static/urcu.h b/urcu/static/urcu.h
index cfcb300..b5b1af5 100644
--- a/urcu/static/urcu.h
+++ b/urcu/static/urcu.h
@@ -96,13 +96,6 @@ extern "C" {
 #endif
 
 /*
- * If a reader is really non-cooperative and refuses to commit its
- * rcu_active_readers count to memory (there is no barrier in the reader
- * per-se), kick it after a few loops waiting for it.
- */
-#define KICK_READER_LOOPS 10000
-
-/*
  * Active attempts to check for reader Q.S. before calling futex().
  */
 #define RCU_QS_ACTIVE_ATTEMPTS 100
@@ -209,59 +202,19 @@ static inline void smp_mb_slave(int group)
 }
 #endif
 
-/*
- * The trick here is that RCU_GP_CTR_PHASE must be a multiple of 8 so we can use
- * a full 8-bits, 16-bits or 32-bits bitmask for the lower order bits.
- */
-#define RCU_GP_COUNT		(1UL << 0)
-/* Use the amount of bits equal to half of the architecture long size */
-#define RCU_GP_CTR_PHASE	(1UL << (sizeof(unsigned long) << 2))
-#define RCU_GP_CTR_NEST_MASK	(RCU_GP_CTR_PHASE - 1)
-
-/*
- * Global quiescent period counter with low-order bits unused.
- * Using a int rather than a char to eliminate false register dependencies
- * causing stalls on some architectures.
- */
-extern unsigned long rcu_gp_ctr;
-
 struct rcu_reader {
 	/* Data used by both reader and synchronize_rcu() */
 	unsigned long ctr;
+	int32_t wait;
 	char need_mb;
 	/* Data used for registry */
 	struct cds_list_head node __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
 	pthread_t pthread;
+	pid_t tid;
 };
 
 extern struct rcu_reader __thread rcu_reader;
 
-extern int32_t gp_futex;
-
-/*
- * Wake-up waiting synchronize_rcu(). Called from many concurrent threads.
- */
-static inline void wake_up_gp(void)
-{
-	if (unlikely(uatomic_read(&gp_futex) == -1)) {
-		uatomic_set(&gp_futex, 0);
-		futex_async(&gp_futex, FUTEX_WAKE, 1,
-			    NULL, NULL, 0);
-	}
-}
-
-static inline int rcu_gp_ongoing(unsigned long *ctr)
-{
-	unsigned long v;
-
-	/*
-	 * Make sure both tests below are done on the same version of *value
-	 * to insure consistency.
-	 */
-	v = CMM_LOAD_SHARED(*ctr);
-	return (v & RCU_GP_CTR_NEST_MASK) &&
-		 ((v ^ rcu_gp_ctr) & RCU_GP_CTR_PHASE);
-}
+void __urcu_read_unlock_specail(void);
 
 static inline void _rcu_read_lock(void)
 {
@@ -269,19 +222,28 @@ static inline void _rcu_read_lock(void)
 
 	cmm_barrier();	/* Ensure the compiler does not reorder us with mutex */
 	tmp = rcu_reader.ctr;
-	/*
-	 * rcu_gp_ctr is
-	 *   RCU_GP_COUNT | (~RCU_GP_CTR_PHASE or RCU_GP_CTR_PHASE)
-	 */
-	if (likely(!(tmp & RCU_GP_CTR_NEST_MASK))) {
-		_CMM_STORE_SHARED(rcu_reader.ctr, _CMM_LOAD_SHARED(rcu_gp_ctr));
+
+	if (!tmp) {
+#ifndef RCU_MB
+		_CMM_STORE_SHARED(rcu_reader.ctr, 1);
+		smp_mb_slave(RCU_MB_GROUP); /* (1) */
+#else /* #ifndef RCU_MB */
 		/*
-		 * Set active readers count for outermost nesting level before
-		 * accessing the pointer. See smp_mb_master().
+		 * The same behavior as !RCU_MB (above), but done atomically.
+		 *
+		 * When an interrupt (signal, etc.) happens between
+		 * "_CMM_STORE_SHARED(rcu_reader.ctr, 1);" and smp_mb_slave() (1),
+		 * the interrupt handler's RCU C.S. would miss a smp_mb_slave()
+		 * when it enters the RCU C.S.
+		 *
+		 * For !RCU_MB, cmm_barrier() (1) implies smp_mb_slave(), so it is OK.
+		 * For RCU_MB, we use xchg() to do it atomically.
 		 */
-		smp_mb_slave(RCU_MB_GROUP);
+		uatomic_xchg(&rcu_reader.ctr, 1);
+#endif /* #else #ifndef RCU_MB */
 	} else {
-		_CMM_STORE_SHARED(rcu_reader.ctr, tmp + RCU_GP_COUNT);
+		_CMM_STORE_SHARED(rcu_reader.ctr, tmp + 1);
+		cmm_barrier(); /* (1) */
 	}
 }
 
@@ -289,21 +251,35 @@ static inline void _rcu_read_unlock(void)
 {
 	unsigned long tmp;
 
+	cmm_barrier(); /* (2) */
 	tmp = rcu_reader.ctr;
-	/*
-	 * Finish using rcu before decrementing the pointer.
-	 * See smp_mb_master().
-	 */
-	if (likely((tmp & RCU_GP_CTR_NEST_MASK) == RCU_GP_COUNT)) {
-		smp_mb_slave(RCU_MB_GROUP);
-		_CMM_STORE_SHARED(rcu_reader.ctr, rcu_reader.ctr - RCU_GP_COUNT);
-		/* write rcu_reader.ctr before read futex */
-		smp_mb_slave(RCU_MB_GROUP);
-		wake_up_gp();
+
+	if (tmp == 1) {
+#ifndef RCU_MB
+		smp_mb_slave(RCU_MB_GROUP); /* (2) */
+		_CMM_STORE_SHARED(rcu_reader.ctr, 0);
+		smp_mb_slave(RCU_MB_GROUP); /* (3) */
+#else /* #ifndef RCU_MB */
+		/*
+		 * The same behavior as !RCU_MB (above), but done atomically.
+		 *
+		 * When an interrupt (signal, etc.) happens between
+		 * smp_mb_slave() (2) and "_CMM_STORE_SHARED(rcu_reader.ctr, 0);",
+		 * the interrupt handler's RCU C.S. would miss a smp_mb_slave()
+		 * when it exits the RCU C.S.
+		 *
+		 * For !RCU_MB, cmm_barrier() (2) implies smp_mb_slave(), so it is OK.
+		 * For RCU_MB, we use xchg() to do it atomically.
+		 */
+		uatomic_xchg(&rcu_reader.ctr, 0);
+#endif /* #else #ifndef RCU_MB */
+
+		if (unlikely(_CMM_LOAD_SHARED(rcu_reader.wait)))
+			__urcu_read_unlock_specail();
 	} else {
-		_CMM_STORE_SHARED(rcu_reader.ctr, rcu_reader.ctr - RCU_GP_COUNT);
+		_CMM_STORE_SHARED(rcu_reader.ctr, tmp - 1);
+		cmm_barrier();
 	}
-	cmm_barrier();	/* Ensure the compiler does not reorder us with mutex */
 }
 
 #ifdef __cplusplus
-- 
1.7.4.4
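For reference, the exported liburcu API is unchanged by this rework, so existing callers keep the usual pattern. A minimal usage sketch follows (the struct cfg / global_cfg names are purely illustrative, not part of liburcu; each reader thread is assumed to have called rcu_register_thread() beforehand):

  #include <stdlib.h>
  #include <urcu.h>

  struct cfg { int val; };
  static struct cfg *global_cfg;

  static int reader_peek(void)
  {
          struct cfg *c;
          int v;

          rcu_read_lock();                  /* fast path: rcu_reader.ctr 0 -> 1 */
          c = rcu_dereference(global_cfg);
          v = c ? c->val : -1;
          rcu_read_unlock();                /* outermost unlock may wake a waiting writer */
          return v;
  }

  static void updater_replace(struct cfg *newcfg)
  {
          struct cfg *old = global_cfg;

          rcu_assign_pointer(global_cfg, newcfg);
          synchronize_rcu();                /* waits for all pre-existing readers */
          free(old);
  }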
