On Mon, Oct 22, 2012 at 07:39:16PM -0400, Mikulas Patocka wrote:
> Use rcu_read_lock_sched / rcu_read_unlock_sched / synchronize_sched
> instead of rcu_read_lock / rcu_read_unlock / synchronize_rcu.
>
> This is an optimization. The RCU-protected region is very small, so
> there will be no latency problems if we disable preempt in this region.
>
> So we use rcu_read_lock_sched / rcu_read_unlock_sched that translates
> to preempt_disable / preempt_enable. It is smaller (and supposedly
> faster) than preemptible rcu_read_lock / rcu_read_unlock.
>
> Signed-off-by: Mikulas Patocka <mpato...@redhat.com>
OK, as promised/threatened, I finally got a chance to take a closer look.

The light_mb() and heavy_mb() definitions aren't doing much for me; the
code would be clearer with them expanded inline.  And while the approach
of pairing barrier() with synchronize_sched() is interesting, it would be
simpler to rely on RCU's properties.  The key point is that if RCU cannot
prove that a given RCU-sched read-side critical section is seen by all
CPUs to have started after a given synchronize_sched(), then that
synchronize_sched() must wait for that RCU-sched read-side critical
section to complete.  This means, as discussed earlier, that there will
be a memory barrier somewhere following the end of that RCU-sched
read-side critical section, and that this memory barrier executes before
the completion of the synchronize_sched().

So I suggest something like the following (untested!) implementation:

------------------------------------------------------------------------

struct percpu_rw_semaphore {
	unsigned __percpu *counters;
	bool locked;
	struct mutex mtx;
	wait_queue_head_t wq;
};

static inline void percpu_down_read(struct percpu_rw_semaphore *p)
{
	rcu_read_lock_sched();
	if (unlikely(p->locked)) {
		rcu_read_unlock_sched();

		/*
		 * There might (or might not) be a writer.  Acquire &p->mtx,
		 * it is always safe (if a bit slow) to do so.
		 */
		mutex_lock(&p->mtx);
		this_cpu_inc(*p->counters);
		mutex_unlock(&p->mtx);
		return;
	}

	/* No writer, proceed locklessly. */
	this_cpu_inc(*p->counters);
	rcu_read_unlock_sched();
}

static inline void percpu_up_read(struct percpu_rw_semaphore *p)
{
	/*
	 * Decrement our count, but protected by RCU-sched so that
	 * the writer can force proper serialization.
	 */
	rcu_read_lock_sched();
	this_cpu_dec(*p->counters);
	rcu_read_unlock_sched();
}

static inline unsigned __percpu_count(unsigned __percpu *counters)
{
	unsigned total = 0;
	int cpu;

	for_each_possible_cpu(cpu)
		total += ACCESS_ONCE(*per_cpu_ptr(counters, cpu));

	return total;
}

static inline void percpu_down_write(struct percpu_rw_semaphore *p)
{
	mutex_lock(&p->mtx);

	/* Wait for a previous writer, if necessary. */
	wait_event(p->wq, !ACCESS_ONCE(p->locked));

	/* Force the readers to acquire the lock when manipulating counts. */
	ACCESS_ONCE(p->locked) = true;

	/* Wait for all pre-existing readers' checks of ->locked to finish. */
	synchronize_sched();

	/*
	 * At this point, all percpu_down_read() invocations will
	 * acquire p->mtx.
	 */

	/*
	 * Wait for all pre-existing readers to complete their
	 * percpu_up_read() calls.  Because ->locked is set and
	 * because we hold ->mtx, there cannot be any new readers.
	 * ->counters will therefore monotonically decrement to zero.
	 */
	while (__percpu_count(p->counters))
		msleep(1);

	/*
	 * Invoke synchronize_sched() in order to force the last
	 * caller of percpu_up_read() to exit its RCU-sched read-side
	 * critical section.  On SMP systems, this also forces the CPU
	 * that invoked that percpu_up_read() to execute a full memory
	 * barrier between the time it exited the RCU-sched read-side
	 * critical section and the time that synchronize_sched() returns,
	 * so that the critical section begun by this invocation of
	 * percpu_down_write() will happen after the critical section
	 * ended by percpu_up_read().
	 */
	synchronize_sched();
}

static inline void percpu_up_write(struct percpu_rw_semaphore *p)
{
	/* Allow others to proceed, but not yet locklessly. */
	mutex_unlock(&p->mtx);

	/*
	 * Ensure that all calls to percpu_down_read() that did not
	 * start unambiguously after the above mutex_unlock() still
	 * acquire the lock, forcing their critical sections to be
	 * serialized with the one terminated by this call to
	 * percpu_up_write().
	 */
	synchronize_sched();

	/* Now it is safe to allow readers to proceed locklessly. */
	ACCESS_ONCE(p->locked) = false;

	/*
	 * If there is another writer waiting, wake it up.  Note that
	 * p->mtx properly serializes its critical section with the
	 * critical section terminated by this call to percpu_up_write().
	 */
	wake_up(&p->wq);
}

static inline int percpu_init_rwsem(struct percpu_rw_semaphore *p)
{
	p->counters = alloc_percpu(unsigned);
	if (unlikely(!p->counters))
		return -ENOMEM;
	p->locked = false;
	mutex_init(&p->mtx);
	init_waitqueue_head(&p->wq);
	return 0;
}

static inline void percpu_free_rwsem(struct percpu_rw_semaphore *p)
{
	free_percpu(p->counters);
	p->counters = NULL; /* catch use after free bugs */
}

------------------------------------------------------------------------
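To make the intended usage concrete, here is a hypothetical (and equally
untested) caller sketch; the my_sem, shared_a, and shared_b names are
mine, not part of the patch.  Both the read and write paths can sleep,
so they must run in process context:

------------------------------------------------------------------------

/* Hypothetical shared data, read often and updated rarely. */
static struct percpu_rw_semaphore my_sem;	/* percpu_init_rwsem() at setup */
static int shared_a, shared_b;

static int reader(void)
{
	int sum;

	percpu_down_read(&my_sem);	/* fast path: preempt_disable() plus
					 * a per-CPU increment */
	sum = shared_a + shared_b;	/* sees a consistent pair */
	percpu_up_read(&my_sem);
	return sum;
}

static void writer(int a, int b)
{
	percpu_down_write(&my_sem);	/* slow: waits out pre-existing readers */
	shared_a = a;
	shared_b = b;
	percpu_up_write(&my_sem);
}

------------------------------------------------------------------------

The read side stays cheap because the common case is just the per-CPU
increment inside an RCU-sched read-side critical section; the write side
pays for the two synchronize_sched() calls and the counter scan.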
Of course, it would be nice to get rid of the extra synchronize_sched().
One way to do this is to use SRCU, which allows blocking operations in
its read-side critical sections (though also increasing read-side
overhead a bit, and also untested):

------------------------------------------------------------------------

struct percpu_rw_semaphore {
	bool locked;
	struct mutex mtx; /* Could also be rw_semaphore. */
	struct srcu_struct s;
	wait_queue_head_t wq;
};

static inline int percpu_down_read(struct percpu_rw_semaphore *p)
{
	int idx;

	idx = srcu_read_lock(&p->s);
	if (unlikely(p->locked)) {
		srcu_read_unlock(&p->s, idx);

		/*
		 * There might (or might not) be a writer.  Acquire &p->mtx,
		 * it is always safe (if a bit slow) to do so.
		 */
		mutex_lock(&p->mtx);
		return -1;  /* srcu_read_lock() cannot return -1. */
	}
	return idx;
}

static inline void percpu_up_read(struct percpu_rw_semaphore *p, int idx)
{
	if (idx == -1)
		mutex_unlock(&p->mtx);
	else
		srcu_read_unlock(&p->s, idx);
}

static inline void percpu_down_write(struct percpu_rw_semaphore *p)
{
	mutex_lock(&p->mtx);

	/* Wait for a previous writer, if necessary. */
	wait_event(p->wq, !ACCESS_ONCE(p->locked));

	/* Force new readers to acquire the lock when manipulating counts. */
	ACCESS_ONCE(p->locked) = true;

	/* Wait for all pre-existing readers' checks of ->locked to finish. */
	synchronize_srcu(&p->s);

	/* At this point, all lockless readers have completed. */
}

static inline void percpu_up_write(struct percpu_rw_semaphore *p)
{
	/* Allow others to proceed, but not yet locklessly. */
	mutex_unlock(&p->mtx);

	/*
	 * Ensure that all calls to percpu_down_read() that did not
	 * start unambiguously after the above mutex_unlock() still
	 * acquire the lock, forcing their critical sections to be
	 * serialized with the one terminated by this call to
	 * percpu_up_write().
	 */
	synchronize_sched();

	/* Now it is safe to allow readers to proceed locklessly. */
	ACCESS_ONCE(p->locked) = false;

	/*
	 * If there is another writer waiting, wake it up.  Note that
	 * p->mtx properly serializes its critical section with the
	 * critical section terminated by this call to percpu_up_write().
	 */
	wake_up(&p->wq);
}

static inline int percpu_init_rwsem(struct percpu_rw_semaphore *p)
{
	p->locked = false;
	mutex_init(&p->mtx);
	if (unlikely(init_srcu_struct(&p->s)))
		return -ENOMEM;
	init_waitqueue_head(&p->wq);
	return 0;
}

static inline void percpu_free_rwsem(struct percpu_rw_semaphore *p)
{
	cleanup_srcu_struct(&p->s);
}

------------------------------------------------------------------------
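The caller-visible difference is that percpu_down_read() now returns an
index that must be handed back to the matching percpu_up_read().  Again
a hypothetical, untested sketch of the reader side, reusing the shared
data from the earlier example:

------------------------------------------------------------------------

static int reader(struct percpu_rw_semaphore *p)
{
	int idx, sum;

	idx = percpu_down_read(p);	/* may sleep in the slow path */
	sum = shared_a + shared_b;	/* same hypothetical shared data */
	percpu_up_read(p, idx);		/* picks srcu_read_unlock() or
					 * mutex_unlock() based on idx */
	return sum;
}

------------------------------------------------------------------------

The -1 sentinel works because srcu_read_lock() never returns a negative
index, as the comment in percpu_down_read() notes.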
Of course, there was a question raised as to whether something already
exists that does this job...  And you guys did ask!

							Thanx, Paul