On Thu, Jul 18, 2019 at 10:26:41AM +0100, Will Deacon wrote:

> /*
>  * We need to ensure ACQUIRE semantics when reading sem->count so that
>  * we pair with the RELEASE store performed by an unlocking/downgrading
>  * writer.
>  *
>  * P0 (writer)                        P1 (reader)
>  *
>  * down_write(sem);
>  * <write shared data>
>  * downgrade_write(sem);
>  * -> fetch_add_release(&sem->count)
>  *
>  *                            down_read_slowpath(sem);
>  *                            -> atomic_read(&sem->count)
>  *                               <ctrl dep>
>  *                               smp_acquire__after_ctrl_dep()
>  *                            <read shared data>
>  */

So I'm thinking all this is excessive; the simple rule is: lock acquire
should imply ACQUIRE, we all know why.

> In writing this, I also noticed that we don't have any explicit ordering
> at the end of the reader slowpath when we wait on the queue but get woken
> immediately:
> 
>       if (!waiter.task)
>               break;
> 
> Am I missing something?

Ha!, I ran into the very same one. I keep confusing myself, but I think
you're right and that needs to be smp_load_acquire() to match the
smp_store_release() in rwsem_mark_wake().

(the actual race there is _tiny_ due to the smp_mb() right before it,
but I cannot convince myself that is indeed sufficient)

The signal_pending_state() case is also fun, but I think wait_lock there
is sufficient (even under RCpc).

I've ended up with this..

---
diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 37524a47f002..9eb630904a17 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -1000,6 +1000,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int 
state)
        atomic_long_add(-RWSEM_READER_BIAS, &sem->count);
        adjustment = 0;
        if (rwsem_optimistic_spin(sem, false)) {
+               /* rwsem_optimistic_spin() implies ACQUIRE through 
rwsem_*trylock() */
                /*
                 * Wake up other readers in the wait list if the front
                 * waiter is a reader.
@@ -1014,6 +1015,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int 
state)
                }
                return sem;
        } else if (rwsem_reader_phase_trylock(sem, waiter.last_rowner)) {
+               /* rwsem_reader_phase_trylock() implies ACQUIRE */
                return sem;
        }
 
@@ -1032,6 +1034,8 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int 
state)
                 */
                if (adjustment && !(atomic_long_read(&sem->count) &
                     (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
+                       /* Provide lock ACQUIRE */
+                       smp_acquire__after_ctrl_dep();
                        raw_spin_unlock_irq(&sem->wait_lock);
                        rwsem_set_reader_owned(sem);
                        lockevent_inc(rwsem_rlock_fast);
@@ -1065,15 +1069,25 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int 
state)
        wake_up_q(&wake_q);
 
        /* wait to be given the lock */
-       while (true) {
+       for (;;) {
                set_current_state(state);
-               if (!waiter.task)
+               if (!smp_load_acquire(&waiter.task)) {
+                       /*
+                        * Matches rwsem_mark_wake()'s smp_store_release() and 
ensures
+                        * we're ordered against its sem->count operations.
+                        */
                        break;
+               }
                if (signal_pending_state(state, current)) {
                        raw_spin_lock_irq(&sem->wait_lock);
                        if (waiter.task)
                                goto out_nolock;
                        raw_spin_unlock_irq(&sem->wait_lock);
+                       /*
+                        * Ordered by sem->wait_lock against rwsem_mark_wake(), 
if we
+                        * see its waiter.task store, we must also see its 
sem->count
+                        * changes.
+                        */
                        break;
                }
                schedule();
@@ -1083,6 +1097,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, int 
state)
        __set_current_state(TASK_RUNNING);
        lockevent_inc(rwsem_rlock);
        return sem;
+
 out_nolock:
        list_del(&waiter.list);
        if (list_empty(&sem->wait_list)) {

Reply via email to