On Sun, Apr 21, 2019 at 05:07:56PM -0400, Waiman Long wrote: > How about the following chunks to disable preemption temporarily for the > increment-check-decrement sequence? > > diff --git a/include/linux/preempt.h b/include/linux/preempt.h > index dd92b1a93919..4cc03ac66e13 100644 > --- a/include/linux/preempt.h > +++ b/include/linux/preempt.h > @@ -250,6 +250,8 @@ do { \ > #define preempt_enable_notrace() barrier() > #define preemptible() 0 > > +#define __preempt_disable_nop /* preempt_disable() is nop */ > + > #endif /* CONFIG_PREEMPT_COUNT */ > > #ifdef MODULE > diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c > index 043fd29b7534..54029e6af17b 100644 > --- a/kernel/locking/rwsem.c > +++ b/kernel/locking/rwsem.c > @@ -256,11 +256,64 @@ static inline struct task_struct > *rwsem_get_owner(struct r > return (struct task_struct *) (cowner > ? cowner | (sowner & RWSEM_NONSPINNABLE) : sowner); > } > + > +/* > + * If __preempt_disable_nop is defined, calling preempt_disable() and > + * preempt_enable() directly is the most efficient way. Otherwise, it may > + * be more efficient to disable and enable interrupts instead for disabling > + * preemption temporarily. > + */ > +#ifdef __preempt_disable_nop > +#define disable_preemption() preempt_disable() > +#define enable_preemption() preempt_enable() > +#else > +#define disable_preemption() local_irq_disable() > +#define enable_preemption() local_irq_enable() > +#endif
I'm not aware of an architecture where disabling interrupts is faster than disabling preemption. > +/* > + * When the owner task structure pointer is merged into count, fewer bits > + * will be available for readers. Therefore, there is a very slight chance > + * that the reader count may overflow. We try to prevent that from > happening > + * by checking for the MS bit of the count and failing the trylock attempt > + * if this bit is set. > + * > + * With preemption enabled, there is a remote possibility that preemption > + * can happen in the narrow timing window between incrementing and > + * decrementing the reader count and the task is put to sleep for a > + * considerable amount of time. If a sufficient number of such unfortunate > + * sequences of events happen, we may still overflow the reader count. > + * To avoid such possibility, we have to disable preemption for the > + * whole increment-check-decrement sequence. > + * > + * The function returns true if there are too many readers and the count > + * has already been properly decremented so the reader must go directly > + * into the wait list. > + */ > +static inline bool rwsem_read_trylock(struct rw_semaphore *sem, long *cnt) > +{ > + bool wait = false; /* Wait now flag */ > + > + disable_preemption(); > + *cnt = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, > &sem->count); > + if (unlikely(*cnt < 0)) { > + atomic_long_add(-RWSEM_READER_BIAS, &sem->count); > + wait = true; > + } > + enable_preemption(); > + return wait; > +} > #else /* !CONFIG_RWSEM_OWNER_COUNT */ This also means you have to ensure CONFIG_NR_CPUS < 32K for RWSEM_OWNER_COUNT. 
> static inline struct task_struct *rwsem_get_owner(struct rw_semaphore *sem) > { > return READ_ONCE(sem->owner); > } > + > +static inline bool rwsem_read_trylock(struct rw_semaphore *sem, long *cnt) > +{ > + *cnt = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, > &sem->count); > + return false; > +} > #endif /* CONFIG_RWSEM_OWNER_COUNT */ > > /* > @@ -981,32 +1034,18 @@ static inline void clear_wr_nonspinnable(struct > rw_semaph > * Wait for the read lock to be granted > */ > static struct rw_semaphore __sched * > -rwsem_down_read_slowpath(struct rw_semaphore *sem, int state, long count) > +rwsem_down_read_slowpath(struct rw_semaphore *sem, int state, const > bool wait) > { > - long adjustment = -RWSEM_READER_BIAS; > + long count, adjustment = -RWSEM_READER_BIAS; > bool wake = false; > struct rwsem_waiter waiter; > DEFINE_WAKE_Q(wake_q); > > - if (unlikely(count < 0)) { > + if (unlikely(wait)) { > /* > - * The sign bit has been set meaning that too many active > - * readers are present. We need to decrement reader count & > - * enter wait queue immediately to avoid overflowing the > - * reader count. > - * > - * As preemption is not disabled, there is a remote > - * possibility that preemption can happen in the narrow > - * timing window between incrementing and decrementing > - * the reader count and the task is put to sleep for a > - * considerable amount of time. If sufficient number > - * of such unfortunate sequence of events happen, we > - * may still overflow the reader count. It is extremely > - * unlikey, though. If this is a concern, we should consider > - * disable preemption during this timing window to make > - * sure that such unfortunate event will not happen. > + * The reader count has already been decremented and the > + * reader should go directly into the wait list now. 
> */ > - atomic_long_add(-RWSEM_READER_BIAS, &sem->count); > adjustment = 0; > goto queue; > } > @@ -1358,11 +1397,12 @@ static struct rw_semaphore > *rwsem_downgrade_wake(struct > */ > inline void __down_read(struct rw_semaphore *sem) > { > - long tmp = atomic_long_fetch_add_acquire(RWSEM_READER_BIAS, > - &sem->count); > + long tmp; > + bool wait; > > + wait = rwsem_read_trylock(sem, &tmp); > if (unlikely(tmp & RWSEM_READ_FAILED_MASK)) { > - rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE, tmp); > + rwsem_down_read_slowpath(sem, TASK_UNINTERRUPTIBLE, wait); > DEBUG_RWSEMS_WARN_ON(!is_rwsem_reader_owned(sem), sem); > } else { > rwsem_set_reader_owned(sem); I think I would prefer that function to return/take the bias/adjustment value instead of a bool, if it is all the same to you.