On 04/16/2019 11:49 AM, Peter Zijlstra wrote:
> On Sat, Apr 13, 2019 at 01:22:50PM -0400, Waiman Long wrote:
>
>> +#define RWSEM_COUNT_HANDOFF(c)      ((c) & RWSEM_FLAG_HANDOFF)
>> +#define RWSEM_COUNT_LOCKED_OR_HANDOFF(c)    \
>> +    ((c) & (RWSEM_LOCK_MASK|RWSEM_FLAG_HANDOFF))
> Like said before, I also made these go away.

Yes, my refactored patches will remove all those trivial macros.
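That is, the checks just become plain bit tests against the count at the
call sites, e.g.:

        count & RWSEM_FLAG_HANDOFF                        /* RWSEM_COUNT_HANDOFF(count)           */
        count & (RWSEM_LOCK_MASK | RWSEM_FLAG_HANDOFF)    /* RWSEM_COUNT_LOCKED_OR_HANDOFF(count) */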

>
>> @@ -245,6 +274,8 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
>>      struct rwsem_waiter *waiter, *tmp;
>>      long oldcount, woken = 0, adjustment = 0;
>>  
>> +    lockdep_assert_held(&sem->wait_lock);
>> +
>>      /*
>>       * Take a peek at the queue head waiter such that we can determine
>>       * the wakeup(s) to perform.
>> @@ -276,6 +307,15 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
>>              adjustment = RWSEM_READER_BIAS;
>>              oldcount = atomic_long_fetch_add(adjustment, &sem->count);
>>              if (unlikely(oldcount & RWSEM_WRITER_MASK)) {
>> +                    /*
>> +                     * Initiate handoff to reader, if applicable.
>> +                     */
>> +                    if (!(oldcount & RWSEM_FLAG_HANDOFF) &&
>> +                        time_after(jiffies, waiter->timeout)) {
>> +                            adjustment -= RWSEM_FLAG_HANDOFF;
>> +                            lockevent_inc(rwsem_rlock_handoff);
>> +                    }
>                       /*
>                        * When we've been waiting 'too' long (for
>                        * writers to give up the lock) request a
>                        * HANDOFF to force the issue.
>                        */
>
> ?

Sure.

>
>> +
>>                      atomic_long_sub(adjustment, &sem->count);
> Can we change this to: atomic_long_add() please? The below loop that
> wakes all remaining readers does use add(), so it is a bit 'weird' to
> have the adjustment being negated on handover.
>
>>                      return;
>>              }
>> @@ -324,6 +364,12 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
>>              adjustment -= RWSEM_FLAG_WAITERS;
>>      }
>>  
>> +    /*
>> +     * Clear the handoff flag
>> +     */
> Right, but that is a trivial comment in the 'increment i' style, it
> clearly states what the code does, but completely fails to elucidate the
> code.
>
> Maybe:
>
>       /*
>        * When we've woken a reader, we no longer need to force writers
>        * to give up the lock and we can clear HANDOFF.
>        */
>
> And I suppose this is required if we were the pickup of the handoff set
> above, but is there a guarantee that the HANDOFF was not set by a
> writer?

I can change the comment. The handoff bit is always cleared in
rwsem_try_write_lock() when the lock is successfully acquired. Will add
a comment to document that.
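Roughly something like this (wording still to be finalized):

        /*
         * When we've woken a reader, we no longer need to force writers
         * to give up the lock and we can clear HANDOFF. (The handoff
         * bit is also cleared in rwsem_try_write_lock() when the lock
         * is successfully acquired.)
         */
        if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
                adjustment -= RWSEM_FLAG_HANDOFF;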

>
>> +    if (woken && RWSEM_COUNT_HANDOFF(atomic_long_read(&sem->count)))
>> +            adjustment -= RWSEM_FLAG_HANDOFF;
>> +
>>      if (adjustment)
>>              atomic_long_add(adjustment, &sem->count);
>>  }
>> @@ -332,22 +378,42 @@ static void __rwsem_mark_wake(struct rw_semaphore *sem,
>>   * This function must be called with the sem->wait_lock held to prevent
>>   * race conditions between checking the rwsem wait list and setting the
>>   * sem->count accordingly.
>> + *
>> + * If wstate is WRITER_HANDOFF, it will make sure that either the handoff
>> + * bit is set or the lock is acquired.
>>   */
>> +static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem,
>> +                                    enum writer_wait_state wstate)
>>  {
>>      long new;
>>  
>       lockdep_assert_held(&sem->wait_lock);

Sure.

>
>> +retry:
>> +    if (RWSEM_COUNT_LOCKED(count)) {
>> +            if (RWSEM_COUNT_HANDOFF(count) || (wstate != WRITER_HANDOFF))
>> +                    return false;
>> +            /*
>> +             * The lock may become free just before setting handoff bit.
>> +             * It will be simpler if atomic_long_or_return() is available.
>> +             */
>> +            atomic_long_or(RWSEM_FLAG_HANDOFF, &sem->count);
>> +            count = atomic_long_read(&sem->count);
>> +            goto retry;
>> +    }
>> +
>> +    if ((wstate == WRITER_NOT_FIRST) && RWSEM_COUNT_HANDOFF(count))
>>              return false;
>>  
>> +    new = (count & ~RWSEM_FLAG_HANDOFF) + RWSEM_WRITER_LOCKED -
>> +          (list_is_singular(&sem->wait_list) ? RWSEM_FLAG_WAITERS : 0);
>>  
>>      if (atomic_long_try_cmpxchg_acquire(&sem->count, &count, new)) {
>>              rwsem_set_owner(sem);
>>              return true;
>>      }
>>  
>> +    if (unlikely((wstate == WRITER_HANDOFF) && !RWSEM_COUNT_HANDOFF(count)))
>> +            goto retry;
>> +
>>      return false;
>>  }
> This function gives me heartburn. Don't you just feel something readable
> trying to struggle free from that?
>
> See, if you first write that function in the form:
>
>       long new;
>
>       do {
>               new = count | RWSEM_WRITER_LOCKED;
>
>               if (count & RWSEM_LOCK_MASK)
>                       return false;
>
>               if (list_is_singular(&sem->wait_list))
>                       new &= ~RWSEM_FLAG_WAITERS;
>
>       } while (atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
>
>       rwsem_set_owner(sem);
>       return true;
>
> And then add the HANDOFF bits like:
>
>       long new;
>
>       do {
> +             bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
>
> +             new = (count | RWSEM_WRITER_LOCKED) & ~RWSEM_FLAG_HANDOFF;
>
>               if (count & RWSEM_LOCK_MASK) {
> +                     if (has_handoff && wstate != WRITER_HANDOFF)
> +                             return false;
>                       new |= RWSEM_FLAG_HANDOFF;
>               }
>
> +             if (has_handoff && wstate == WRITER_NOT_FIRST)
> +                     return false;
>
>               if (list_is_singular(&sem->wait_list))
>                       new &= ~RWSEM_FLAG_WAITERS;
>
>       } while (atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
>
>       rwsem_set_owner(sem);
>       return true;
>
> it almost looks like sensible code.

Yes, it looks much better. I don't like that piece of code myself. I am
sorry that I didn't spend the time to make the code more sane.

Thanks for your suggestion. Will modify it accordingly.
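Just to confirm my understanding, the modified function will probably
end up looking something like the following (a rough sketch only,
untested; I am keeping the rule from the changelog that only a
WRITER_HANDOFF waiter may set the handoff bit):

static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem,
                                        enum writer_wait_state wstate)
{
        long new;

        lockdep_assert_held(&sem->wait_lock);

        do {
                bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);

                if (has_handoff && wstate == WRITER_NOT_FIRST)
                        return false;

                new = count;

                if (count & RWSEM_LOCK_MASK) {
                        /*
                         * Lock is still held: only a first waiter that
                         * has timed out (WRITER_HANDOFF) may set the
                         * handoff bit.
                         */
                        if (has_handoff || (wstate != WRITER_HANDOFF))
                                return false;

                        new |= RWSEM_FLAG_HANDOFF;
                } else {
                        new |= RWSEM_WRITER_LOCKED;
                        new &= ~RWSEM_FLAG_HANDOFF;

                        if (list_is_singular(&sem->wait_list))
                                new &= ~RWSEM_FLAG_WAITERS;
                }
        } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));

        /*
         * We have either acquired the lock with the handoff bit cleared,
         * or set the handoff bit and must go back to sleep.
         */
        if (new & RWSEM_FLAG_HANDOFF)
                return false;

        rwsem_set_owner(sem);
        return true;
}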

>>  
>> @@ -359,7 +425,7 @@ static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem)
>>  {
>>      long count = atomic_long_read(&sem->count);
>>  
>> -    while (!RWSEM_COUNT_LOCKED(count)) {
>> +    while (!RWSEM_COUNT_LOCKED_OR_HANDOFF(count)) {
>>              if (atomic_long_try_cmpxchg_acquire(&sem->count, &count,
>>                                      count + RWSEM_WRITER_LOCKED)) {
> RWSEM_WRITER_LOCKED really should be RWSEM_FLAG_WRITER or something like
> that, and since it is a flag, that really should've been | not +.

Sure.
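I.e. something like this (with the trivial macro open-coded as well; the
renaming to RWSEM_FLAG_WRITER or similar can be folded in too):

        while (!(count & (RWSEM_LOCK_MASK | RWSEM_FLAG_HANDOFF))) {
                if (atomic_long_try_cmpxchg_acquire(&sem->count, &count,
                                        count | RWSEM_WRITER_LOCKED)) {
                        rwsem_set_owner(sem);
                        return true;
                }
        }
        return false;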

>>                      rwsem_set_owner(sem);
>> @@ -498,6 +564,16 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
>>  }
>>  #endif
>>  
>> +/*
>> + * This is safe to be called without holding the wait_lock.
>> + */
>> +static inline bool
>> +rwsem_waiter_is_first(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
>> +{
>> +    return list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
>> +                    == waiter;
> Just bust the line limit on that, this is silly. If you feel strongly
> about the 80 char thing, we could do:
>
> #define rwsem_first_waiter(sem) \
>       list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
>
> and use that in both locations. (and one could even write the
> list_for_each_entry_safe() loop in the form:
>
>       while (!list_empty(&sem->wait_list)) {
>               entry = rwsem_first_waiter(sem);
>
>               ...
>
>               list_del();
>
>               ...
>       }
>
> Although I suppose that gets you confused later on where you want to
> wake more readers still... I'll get there,.. eventually.

Yes, it is a good idea.
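With a rwsem_first_waiter() helper like that, the check would just
become something like:

        return rwsem_first_waiter(sem) == waiter;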

>> +}
>> +
>>  /*
>>   * Wait for the read lock to be granted
>>   */
>> @@ -510,16 +586,18 @@ __rwsem_down_read_failed_common(struct rw_semaphore *sem, int state)
>>  
>>      waiter.task = current;
>>      waiter.type = RWSEM_WAITING_FOR_READ;
>> +    waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
>>  
>>      raw_spin_lock_irq(&sem->wait_lock);
>>      if (list_empty(&sem->wait_list)) {
>>              /*
>>               * In case the wait queue is empty and the lock isn't owned
>> +             * by a writer or has the handoff bit set, this reader can
>> +             * exit the slowpath and return immediately as its
>> +             * RWSEM_READER_BIAS has already been set in the count.
>>               */
>> +            if (!(atomic_long_read(&sem->count) &
>> +                 (RWSEM_WRITER_MASK | RWSEM_FLAG_HANDOFF))) {
>>                      raw_spin_unlock_irq(&sem->wait_lock);
>>                      rwsem_set_reader_owned(sem);
>>                      lockevent_inc(rwsem_rlock_fast);
>> @@ -567,7 +645,8 @@ __rwsem_down_read_failed_common(struct rw_semaphore *sem, int state)
>>  out_nolock:
>>      list_del(&waiter.list);
>>      if (list_empty(&sem->wait_list))
>> +            atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
>> +                               &sem->count);
> If you split the line, this wants { }.

OK.
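I.e. something like:

        if (list_empty(&sem->wait_list)) {
                atomic_long_andnot(RWSEM_FLAG_WAITERS | RWSEM_FLAG_HANDOFF,
                                   &sem->count);
        }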

>>      raw_spin_unlock_irq(&sem->wait_lock);
>>      __set_current_state(TASK_RUNNING);
>>      lockevent_inc(rwsem_rlock_fail);
>> @@ -593,7 +672,7 @@ static inline struct rw_semaphore *
>>  __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
>>  {
>>      long count;
>> +    enum writer_wait_state wstate;
>>      struct rwsem_waiter waiter;
>>      struct rw_semaphore *ret = sem;
>>      DEFINE_WAKE_Q(wake_q);
>> @@ -608,56 +687,63 @@ __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
>>       */
>>      waiter.task = current;
>>      waiter.type = RWSEM_WAITING_FOR_WRITE;
>> +    waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
>>  
>>      raw_spin_lock_irq(&sem->wait_lock);
>>  
>>      /* account for this before adding a new element to the list */
>> +    wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;
>>  
>>      list_add_tail(&waiter.list, &sem->wait_list);
>>  
>>      /* we're now waiting on the lock */
>> +    if (wstate == WRITER_NOT_FIRST) {
>>              count = atomic_long_read(&sem->count);
>>  
>>              /*
>> +             * If there were already threads queued before us and:
>> +             *  1) there are no active locks, wake the front
>> +             *     queued process(es) as the handoff bit might be set.
>> +             *  2) there are no active writers and some readers, the lock
>> +             *     must be read owned; so we try to wake any read lock
>> +             *     waiters that were queued ahead of us.
>>               */
>> +            if (!RWSEM_COUNT_LOCKED(count))
>> +                    __rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
>> +            else if (!(count & RWSEM_WRITER_MASK) &&
>> +                      (count & RWSEM_READER_MASK))
>>                      __rwsem_mark_wake(sem, RWSEM_WAKE_READERS, &wake_q);
> That RWSEM_WRITER_MASK is another layer of obfustaction we can do
> without.

The RWSEM_WRITER_MASK macro is added to prepare for a later patch that
merges the owner into the count, where RWSEM_WRITER_LOCKED will be
different.

> Does the above want to be something like:
>
>               if (!(count & RWSEM_WRITER_LOCKED)) {
>                       __rwsem_mark_wake(sem, (count & RWSEM_READER_MASK) ?
>                                              RWSEM_WAKE_READERS :
>                                              RWSEM_WAKE_ANY, &wake_q);
>               }

Yes.

>> +            else
>> +                    goto wait;
>>  
>> +            /*
>> +             * The wakeup is normally called _after_ the wait_lock
>> +             * is released, but given that we are proactively waking
>> +             * readers we can deal with the wake_q overhead as it is
>> +             * similar to releasing and taking the wait_lock again
>> +             * for attempting rwsem_try_write_lock().
>> +             */
>> +            wake_up_q(&wake_q);
> Hurmph.. the reason we do wake_up_q() outside of wait_lock is such that
> those tasks don't bounce on wait_lock. Also, it removes a great deal of
> hold-time from wait_lock.
>
> So I'm not sure I buy your argument here.
>

Actually, we don't want to release the wait_lock, do wake_up_q() and
then acquire the wait_lock again, because the state may have changed in
the meantime. I didn't change the comment in this patch, but I will
reword it to explain that.
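Perhaps something along these lines (wording still to be finalized):

        /*
         * The wakeup is normally called _after_ the wait_lock is
         * released, but we cannot drop the wait_lock here before
         * rwsem_try_write_lock(): the wait list and count that we have
         * just looked at could change underneath us. So accept the
         * extra wake_q overhead while still holding the wait_lock.
         */
        wake_up_q(&wake_q);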

>> +            /*
>> +             * Reinitialize wake_q after use.
>> +             */
> Or:
>               /* we need wake_q again below, reinitialize */
>

Sure.

>> +            wake_q_init(&wake_q);
>>      } else {
>>              count = atomic_long_add_return(RWSEM_FLAG_WAITERS, &sem->count);
>>      }
>>  
>> +wait:
>>      /* wait until we successfully acquire the lock */
>>      set_current_state(state);
>>      while (true) {
>> +            if (rwsem_try_write_lock(count, sem, wstate))
>>                      break;
>> +
>>              raw_spin_unlock_irq(&sem->wait_lock);
>>  
>>              /* Block until there are no active lockers. */
>> +            for (;;) {
>>                      if (signal_pending_state(state, current))
>>                              goto out_nolock;
>>  
>> @@ -665,9 +751,34 @@ __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
>>                      lockevent_inc(rwsem_sleep_writer);
>>                      set_current_state(state);
>>                      count = atomic_long_read(&sem->count);
>> +
>> +                    if ((wstate == WRITER_NOT_FIRST) &&
>> +                        rwsem_waiter_is_first(sem, &waiter))
>> +                            wstate = WRITER_FIRST;
>> +
>> +                    if (!RWSEM_COUNT_LOCKED(count))
>> +                            break;
>> +
>> +                    /*
>> +                     * An RT task sets the HANDOFF bit immediately.
>> +                     * Non-RT task will wait a while before doing so.
> Again, this describes what we already read the code to do; but doesn't
> add anything.

Will remove that.

>> +                     *
>> +                     * The setting of the handoff bit is deferred
>> +                     * until rwsem_try_write_lock() is called.
>> +                     */
>> +                    if ((wstate == WRITER_FIRST) && (rt_task(current) ||
>> +                        time_after(jiffies, waiter.timeout))) {
>> +                            wstate = WRITER_HANDOFF;
>> +                            lockevent_inc(rwsem_wlock_handoff);
>> +                            /*
>> +                             * Break out to call rwsem_try_write_lock().
>> +                             */
> Another exceedingly useful comment.
>
>> +                            break;
>> +                    }
>> +            }
>>  
>>              raw_spin_lock_irq(&sem->wait_lock);
>> +            count = atomic_long_read(&sem->count);
>>      }
>>      __set_current_state(TASK_RUNNING);
>>      list_del(&waiter.list);
>> @@ -680,6 +791,12 @@ __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
>>      __set_current_state(TASK_RUNNING);
>>      raw_spin_lock_irq(&sem->wait_lock);
>>      list_del(&waiter.list);
>> +    /*
>> +     * If handoff bit has been set by this waiter, make sure that the
>> +     * clearing of it is seen by others before proceeding.
>> +     */
>> +    if (unlikely(wstate == WRITER_HANDOFF))
>> +            atomic_long_add_return(-RWSEM_FLAG_HANDOFF,  &sem->count);
> _AGAIN_ no explanation what so ff'ing ever.
>
> And why add_return() if you ignore the return value.
>

OK, will remove those.
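I.e. just something like (a sketch; atomic_long_andnot() would work here
as well):

        if (unlikely(wstate == WRITER_HANDOFF))
                atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);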

>>      if (list_empty(&sem->wait_list))
>>              atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
> And you could've easily combined the two flags in a single andnot op.

That is true, but the out_nolock path is rarely executed. That is why
I opted for simplicity over more complicated but faster code.

Cheers,
Longman
