On Thu, Jul 18, 2019 at 12:58:12PM +0200, Peter Zijlstra wrote: > On Thu, Jul 18, 2019 at 10:26:41AM +0100, Will Deacon wrote: > > > /* > > * We need to ensure ACQUIRE semantics when reading sem->count so that > > * we pair with the RELEASE store performed by an unlocking/downgrading > > * writer. > > * > > * P0 (writer) P1 (reader) > > * > > * down_write(sem); > > * <write shared data> > > * downgrade_write(sem); > > * -> fetch_add_release(&sem->count) > > * > > * down_read_slowpath(sem); > > * -> atomic_read(&sem->count) > > * <ctrl dep> > > * smp_acquire__after_ctrl_dep() > > * <read shared data> > > */ > > So I'm thinking all this is excessive; the simple rule is: lock acquire > should imply ACQUIRE, we all know why.
Fair enough, I just thought this was worth highlighting because you can't reply on the wait_lock to give you ACQUIRE ordering. > > In writing this, I also noticed that we don't have any explicit ordering > > at the end of the reader slowpath when we wait on the queue but get woken > > immediately: > > > > if (!waiter.task) > > break; > > > > Am I missing something? > > Ha!, I ran into the very same one. I keep confusing myself, but I think > you're right and that needs to be smp_load_acquire() to match the > smp_store_release() in rwsem_mark_wake(). > > (the actual race there is _tiny_ due to the smp_mb() right before it, > but I cannot convince myself that is indeed sufficient) > > The signal_pending_state() case is also fun, but I think wait_lock there > is sufficient (even under RCpc). > > I've ended up with this.. > > --- > diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c > index 37524a47f002..9eb630904a17 100644 > --- a/kernel/locking/rwsem.c > +++ b/kernel/locking/rwsem.c > @@ -1000,6 +1000,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int > state) > atomic_long_add(-RWSEM_READER_BIAS, &sem->count); > adjustment = 0; > if (rwsem_optimistic_spin(sem, false)) { > + /* rwsem_optimistic_spin() implies ACQUIRE through > rwsem_*trylock() */ I couldn't figure out if this was dependent on the return value or not, and looking at osq_lock() I also couldn't see the ACQUIRE barrier when we're spinning on node->locked. Hmm. > /* > * Wake up other readers in the wait list if the front > * waiter is a reader. > @@ -1014,6 +1015,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int > state) > } > return sem; > } else if (rwsem_reader_phase_trylock(sem, waiter.last_rowner)) { > + /* rwsem_reader_phase_trylock() implies ACQUIRE */ Can we add "on success" to the end of this, please? > return sem; > } > > @@ -1032,6 +1034,8 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int > state) > */ > if (adjustment && !(atomic_long_read(&sem->count) & > (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) { > + /* Provide lock ACQUIRE */ > + smp_acquire__after_ctrl_dep(); > raw_spin_unlock_irq(&sem->wait_lock); > rwsem_set_reader_owned(sem); > lockevent_inc(rwsem_rlock_fast); > @@ -1065,15 +1069,25 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, > int state) > wake_up_q(&wake_q); > > /* wait to be given the lock */ > - while (true) { > + for (;;) { > set_current_state(state); > - if (!waiter.task) > + if (!smp_load_acquire(&waiter.task)) { > + /* > + * Matches rwsem_mark_wake()'s smp_store_release() and > ensures > + * we're ordered against its sem->count operations. > + */ > break; > + } Ack. Also, grepping for 'waiter.task' reveals a similar usage in drivers/tty/tty_ldsem.c if you're feeling brave enough. Will