Currently a thread sleeping on a mutex wait queue can be delayed indefinitely by other threads managing to steal the lock, that is, acquiring the lock out-of-order before the sleepers. I noticed this via a testcase (see the Reference: below) where one CPU was unlocking/relocking a mutex in a tight loop while another CPU was delayed indefinitely trying to wake up and get the lock, but kept losing out to the first CPU and going back to sleep:

  CPU0:                      CPU1:
  mutex_lock->acquire
                             mutex_lock->sleep
  mutex_unlock->wake CPU1
                             wakeup
  mutex_lock->acquire
                             trylock fail->sleep
  mutex_unlock->wake CPU1
                             wakeup
  mutex_lock->acquire
                             trylock fail->sleep
  ...                        ...
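As a rough userspace analogue of this pattern (illustration only, this is not the testcase from the bug report, and the thread roles and names are invented), the same kind of barging can be sketched with a pthread mutex; depending on the libc and scheduler the effect may be much milder than with the kernel mutex fastpath and optimistic spinning:

  /*
   * Illustration only: a "hog" thread unlocks and immediately relocks in
   * a tight loop (the CPU0 role), while a "waiter" thread counts how
   * often it actually manages to take the lock (the CPU1 role).
   */
  #include <pthread.h>
  #include <stdatomic.h>
  #include <stdio.h>
  #include <unistd.h>

  static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
  static atomic_int stop;
  static atomic_long waiter_acquisitions;

  static void *hog(void *arg)
  {
          (void)arg;
          while (!atomic_load(&stop)) {
                  pthread_mutex_lock(&m);
                  pthread_mutex_unlock(&m);       /* ...and relock right away */
          }
          return NULL;
  }

  static void *waiter(void *arg)
  {
          (void)arg;
          while (!atomic_load(&stop)) {
                  pthread_mutex_lock(&m);
                  atomic_fetch_add(&waiter_acquisitions, 1);
                  pthread_mutex_unlock(&m);
          }
          return NULL;
  }

  int main(void)
  {
          pthread_t t0, t1;

          pthread_create(&t0, NULL, hog, NULL);
          pthread_create(&t1, NULL, waiter, NULL);
          sleep(5);
          atomic_store(&stop, 1);
          pthread_join(t0, NULL);
          pthread_join(t1, NULL);
          /* under heavy contention this count can stay surprisingly low */
          printf("waiter acquisitions in 5s: %ld\n",
                 (long)atomic_load(&waiter_acquisitions));
          return 0;
  }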
To fix this we can make sure that CPU1 makes progress by avoiding the fastpath locking, optimistic spinning and trylocking if there is any waiter on the list. The corresponding check can be done without holding wait_lock, since the goal is only to make sure sleepers make progress and not to guarantee that the locking will happen in FIFO order.
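The rule applied to each of those paths can be condensed into a small standalone model (illustration only, not kernel code; the toy_* names are invented): an opportunistic locker may take the lock only when nobody is queued, and the waiter check is deliberately racy:

  /*
   * Toy model of the rule, illustration only: refuse to "steal" the lock
   * whenever somebody is queued, so a sleeper cannot be overtaken forever.
   * The nr_waiters check is racy by design; it only needs to become
   * visible eventually for the sleepers to make progress.
   */
  #include <stdatomic.h>
  #include <stdbool.h>
  #include <stdio.h>

  struct toy_mutex {
          atomic_int locked;      /* 0 = unlocked, 1 = locked */
          atomic_int nr_waiters;  /* stand-in for !list_empty(&wait_list) */
  };

  static bool toy_try_steal(struct toy_mutex *m)
  {
          int expected = 0;

          if (atomic_load(&m->nr_waiters) > 0)
                  return false;   /* back off to the slowpath queue */

          return atomic_compare_exchange_strong(&m->locked, &expected, 1);
  }

  int main(void)
  {
          struct toy_mutex m = { .locked = 0, .nr_waiters = 0 };

          printf("steal with no waiters: %d\n", toy_try_steal(&m));  /* 1 */
          atomic_store(&m.locked, 0);
          atomic_store(&m.nr_waiters, 1);
          printf("steal with a waiter:   %d\n", toy_try_steal(&m));  /* 0 */
          return 0;
  }

In the patch below the check is simply !list_empty(&lock->wait_list), wrapped in the new mutex_has_waiters() helper.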

CC: Peter Zijlstra <pet...@infradead.org>
CC: Ingo Molnar <mi...@redhat.com>
CC: Chris Wilson <ch...@chris-wilson.co.uk>
CC: Daniel Vetter <daniel.vet...@intel.com>
CC: Ville Syrjälä <ville.syrj...@linux.intel.com>
Reference: https://bugs.freedesktop.org/show_bug.cgi?id=96701
Signed-off-by: Imre Deak <imre.d...@intel.com>
---
 include/linux/mutex.h  |  5 +++++
 kernel/locking/mutex.c | 33 +++++++++++++++++++++------------
 2 files changed, 26 insertions(+), 12 deletions(-)

diff --git a/include/linux/mutex.h b/include/linux/mutex.h
index 2cb7531..562dfa8 100644
--- a/include/linux/mutex.h
+++ b/include/linux/mutex.h
@@ -130,6 +130,11 @@ static inline int mutex_is_locked(struct mutex *lock)
         return atomic_read(&lock->count) != 1;
 }
 
+static inline int mutex_has_waiters(struct mutex *lock)
+{
+        return !list_empty(&lock->wait_list);
+}
+
 /*
  * See kernel/locking/mutex.c for detailed documentation of these APIs.
  * Also see Documentation/locking/mutex-design.txt.
diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
index a70b90d..d18b531 100644
--- a/kernel/locking/mutex.c
+++ b/kernel/locking/mutex.c
@@ -276,7 +276,7 @@ static inline int mutex_can_spin_on_owner(struct mutex *lock)
  */
 static inline bool mutex_try_to_acquire(struct mutex *lock)
 {
-        return !mutex_is_locked(lock) &&
+        return !mutex_is_locked(lock) && !mutex_has_waiters(lock) &&
                 (atomic_cmpxchg_acquire(&lock->count, 1, 0) == 1);
 }
 
@@ -520,7 +520,8 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
         preempt_disable();
         mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
 
-        if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
+        if (!mutex_has_waiters(lock) &&
+            mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
                 /* got the lock, yay! */
                 preempt_enable();
                 return 0;
@@ -532,7 +533,7 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
          * Once more, try to acquire the lock. Only try-lock the mutex if
          * it is unlocked to reduce unnecessary xchg() operations.
          */
-        if (!mutex_is_locked(lock) &&
+        if (!mutex_is_locked(lock) && !mutex_has_waiters(lock) &&
             (atomic_xchg_acquire(&lock->count, 0) == 1))
                 goto skip_wait;
 
@@ -556,7 +557,8 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
                  * other waiters. We only attempt the xchg if the count is
                  * non-negative in order to avoid unnecessary xchg operations:
                  */
-                if (atomic_read(&lock->count) >= 0 &&
+                if (lock->wait_list.next == &waiter.list &&
+                    atomic_read(&lock->count) >= 0 &&
                     (atomic_xchg_acquire(&lock->count, -1) == 1))
                         break;
 
@@ -789,10 +791,11 @@ __mutex_lock_interruptible_slowpath(struct mutex *lock);
  */
 int __sched mutex_lock_interruptible(struct mutex *lock)
 {
-        int ret;
+        int ret = -1;
 
         might_sleep();
-        ret = __mutex_fastpath_lock_retval(&lock->count);
+        if (!mutex_has_waiters(lock))
+                ret = __mutex_fastpath_lock_retval(&lock->count);
         if (likely(!ret)) {
                 mutex_set_owner(lock);
                 return 0;
@@ -804,10 +807,11 @@ EXPORT_SYMBOL(mutex_lock_interruptible);
 
 int __sched mutex_lock_killable(struct mutex *lock)
 {
-        int ret;
+        int ret = -1;
 
         might_sleep();
-        ret = __mutex_fastpath_lock_retval(&lock->count);
+        if (!mutex_has_waiters(lock))
+                ret = __mutex_fastpath_lock_retval(&lock->count);
         if (likely(!ret)) {
                 mutex_set_owner(lock);
                 return 0;
@@ -905,6 +909,9 @@ int __sched mutex_trylock(struct mutex *lock)
 {
         int ret;
 
+        if (mutex_has_waiters(lock))
+                return 0;
+
         ret = __mutex_fastpath_trylock(&lock->count, __mutex_trylock_slowpath);
         if (ret)
                 mutex_set_owner(lock);
@@ -917,11 +924,12 @@ EXPORT_SYMBOL(mutex_trylock);
 int __sched
 __ww_mutex_lock(struct ww_mutex *lock, struct ww_acquire_ctx *ctx)
 {
-        int ret;
+        int ret = -1;
 
         might_sleep();
 
-        ret = __mutex_fastpath_lock_retval(&lock->base.count);
+        if (!mutex_has_waiters(&lock->base))
+                ret = __mutex_fastpath_lock_retval(&lock->base.count);
 
         if (likely(!ret)) {
                 ww_mutex_set_context_fastpath(lock, ctx);
@@ -935,11 +943,12 @@ EXPORT_SYMBOL(__ww_mutex_lock);
 int __sched
 __ww_mutex_lock_interruptible(struct ww_mutex *lock, struct ww_acquire_ctx *ctx)
 {
-        int ret;
+        int ret = -1;
 
         might_sleep();
 
-        ret = __mutex_fastpath_lock_retval(&lock->base.count);
+        if (!mutex_has_waiters(&lock->base))
+                ret = __mutex_fastpath_lock_retval(&lock->base.count);
 
         if (likely(!ret)) {
                 ww_mutex_set_context_fastpath(lock, ctx);
-- 
2.5.0