A spinlock critical section is typically short enough that once a CPU
acquires the lock, it should be able to release it in a reasonably
short time. Waiting for the lock, however, can take a while depending
on how many CPUs are contending for it.

The exception is when the lock holder needs to acquire another
spinlock. In that case, it may have to hold the first spinlock for a
relatively long time while waiting for the second. This can be
problematic for RT tasks, as it can cause a priority inversion problem
where a lower priority task holds back a higher priority task.

The maximum level of spinlock nesting in the Linux kernel is 2, and all
those nested spinlock calls are annotated by spin_lock_nested() or one
of its variants for lock dependency checking. As a result, we can
override those APIs with a special RT spinlock function,
_rt_raw_spin_lock_nested(), that addresses the priority inversion
problem. This is done by passing the outer lock into the nested
spinlock call, so that the RT spinlock function can query the priority
of the highest priority task waiting on the outer lock and boost its
own waiting priority on the inner lock accordingly. If the outer lock
isn't provided, the function defaults to an RT priority of 1.
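
Conceptually, the waiting priority used for a nested lock acquisition
is computed as sketched below. This is an illustrative sketch only;
nested_wait_prio() is a made-up helper, while rt_task_priority(), MAX()
and the pending byte come from the RT qspinlock code in this patch:

	/* Illustrative sketch, not part of the patch itself */
	static u8 nested_wait_prio(struct task_struct *task,
				   struct __qspinlock *outer)
	{
		u8 min_prio = 1;	/* floor for nested spinlock calls */
		/* priority of the highest priority waiter on the outer lock */
		u8 outer_prio = outer ? READ_ONCE(outer->pending) : 0;

		/* rt_task_priority() already applies the min_prio floor */
		return MAX(outer_prio, rt_task_priority(task, min_prio));
	}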

Two new APIs are provided to allow passing in the required outer lock
parameter:

 1) spin_lock_nested2(lock, subclass, nest_lock)
 2) spin_lock_irqsave_nested2(lock, flags, subclass, nest_lock)

There are about 70 call sites in the kernel that use spin_lock_nested()
or its variants. They can be modified later on to use the new APIs.
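
As an illustration, a hypothetical call site nesting two spinlocks
could be converted as follows (the parent/child structures and their
lock fields are made up for this example):

	/* Before: nesting annotation only */
	spin_lock(&parent->lock);
	spin_lock_nested(&child->lock, SINGLE_DEPTH_NESTING);

	/*
	 * After: also pass the outer lock so that the priority of its
	 * highest priority waiter can be used for boosting.
	 */
	spin_lock(&parent->lock);
	spin_lock_nested2(&child->lock, SINGLE_DEPTH_NESTING, &parent->lock);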

Signed-off-by: Waiman Long <long...@redhat.com>
---
 include/linux/spinlock.h         |  37 +++++++++-
 include/linux/spinlock_api_smp.h |   7 ++
 kernel/locking/qspinlock_rt.h    | 143 +++++++++++++++++++++++++++++++--------
 3 files changed, 156 insertions(+), 31 deletions(-)

diff --git a/include/linux/spinlock.h b/include/linux/spinlock.h
index 59248dc..7cd71ab 100644
--- a/include/linux/spinlock.h
+++ b/include/linux/spinlock.h
@@ -177,7 +177,15 @@ static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
 
 #define raw_spin_lock(lock)    _raw_spin_lock(lock)
 
-#ifdef CONFIG_DEBUG_LOCK_ALLOC
+#ifdef CONFIG_REALTIME_QUEUED_SPINLOCKS
+# define raw_spin_lock_nested(lock, subclass)          \
+       _rt_raw_spin_lock_nested(lock, subclass, NULL)
+# define raw_spin_lock_nested2(lock, subclass, nest_lock)\
+       _rt_raw_spin_lock_nested(lock, subclass, nest_lock)
+# define raw_spin_lock_nest_lock(lock, nest_lock)      \
+       _rt_raw_spin_lock_nested(lock, 0, &(nest_lock)->rlock)
+
+#elif defined(CONFIG_DEBUG_LOCK_ALLOC)
 # define raw_spin_lock_nested(lock, subclass) \
        _raw_spin_lock_nested(lock, subclass)
 
@@ -205,7 +213,16 @@ static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
                flags = _raw_spin_lock_irqsave(lock);   \
        } while (0)
 
-#ifdef CONFIG_DEBUG_LOCK_ALLOC
+#ifdef CONFIG_REALTIME_QUEUED_SPINLOCKS
+#define raw_spin_lock_irqsave_nested(lock, flags, subclass)             \
+       raw_spin_lock_irqsave_nested2(lock, flags, subclass, NULL)
+#define raw_spin_lock_irqsave_nested2(lock, flags, subclass, nest_lock) \
+       do {                                                             \
+               typecheck(unsigned long, flags);                         \
+               flags = _rt_raw_spin_lock_irqsave_nested(lock, subclass, \
+                                                        nest_lock);     \
+       } while (0)
+#elif defined(CONFIG_DEBUG_LOCK_ALLOC)
 #define raw_spin_lock_irqsave_nested(lock, flags, subclass)            \
        do {                                                            \
                typecheck(unsigned long, flags);                        \
@@ -232,6 +249,15 @@ static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
 
 #endif
 
+#ifndef raw_spin_lock_nested2
+#define raw_spin_lock_nested2(lock, subclass, nest_lock)               \
+       raw_spin_lock_nested(lock, subclass)
+#endif
+#ifndef raw_spin_lock_irqsave_nested2
+#define raw_spin_lock_irqsave_nested2(lock, flags, subclass, nest_lock) \
+       raw_spin_lock_irqsave_nested(lock, flags, subclass)
+#endif
+
 #define raw_spin_lock_irq(lock)                _raw_spin_lock_irq(lock)
 #define raw_spin_lock_bh(lock)         _raw_spin_lock_bh(lock)
 #define raw_spin_unlock(lock)          _raw_spin_unlock(lock)
@@ -314,6 +340,10 @@ static __always_inline int spin_trylock(spinlock_t *lock)
        raw_spin_lock_nested(spinlock_check(lock), subclass);   \
 } while (0)
 
+#define spin_lock_nested2(lock, subclass, nest_lock)           \
+       raw_spin_lock_nested2(spinlock_check(lock), subclass,   \
+                             spinlock_check(nest_lock))
+
 #define spin_lock_nest_lock(lock, nest_lock)                           \
 do {                                                                   \
        raw_spin_lock_nest_lock(spinlock_check(lock), nest_lock);       \
@@ -334,6 +364,9 @@ static __always_inline void spin_lock_irq(spinlock_t *lock)
        raw_spin_lock_irqsave_nested(spinlock_check(lock), flags, subclass); \
 } while (0)
 
+#define spin_lock_irqsave_nested2(lock, flags, subclass, nest_lock)    \
+       raw_spin_lock_irqsave_nested2(spinlock_check(lock), flags, subclass, spinlock_check(nest_lock))
+
 static __always_inline void spin_unlock(spinlock_t *lock)
 {
        raw_spin_unlock(&lock->rlock);
diff --git a/include/linux/spinlock_api_smp.h b/include/linux/spinlock_api_smp.h
index 42dfab8..57b9585 100644
--- a/include/linux/spinlock_api_smp.h
+++ b/include/linux/spinlock_api_smp.h
@@ -43,6 +43,13 @@ unsigned long __lockfunc _raw_spin_lock_irqsave(raw_spinlock_t *lock)
 _raw_spin_unlock_irqrestore(raw_spinlock_t *lock, unsigned long flags)
                                                                __releases(lock);
 
+#ifdef CONFIG_REALTIME_QUEUED_SPINLOCKS
+void __lockfunc _rt_raw_spin_lock_nested(raw_spinlock_t *lock, int subclass,
+               raw_spinlock_t *outerlock)               __acquires(lock);
+unsigned long __lockfunc _rt_raw_spin_lock_irqsave_nested(raw_spinlock_t *lock,
+               int subclass, raw_spinlock_t *outerlock) __acquires(lock);
+#endif
+
 #ifdef CONFIG_INLINE_SPIN_LOCK
 #define _raw_spin_lock(lock) __raw_spin_lock(lock)
 #endif
diff --git a/kernel/locking/qspinlock_rt.h b/kernel/locking/qspinlock_rt.h
index b6289fb..c2f2722 100644
--- a/kernel/locking/qspinlock_rt.h
+++ b/kernel/locking/qspinlock_rt.h
@@ -26,9 +26,24 @@
  *  1) Soft IRQ = 1
  *  2) Hard IRQ = MAX_RT_PRIO
  *  3) NMI     = MAX_RT_PRIO+1
+ *
+ * The only additional resource that a spinlock holder may need to wait for
+ * before completing a lock critical section is another spinlock. The maximum
+ * level of spinlock nesting that is currently supported is 2. All those
+ * nested spinlock operations are annotated by spin_lock_nested() or its
+ * variants. There are currently about 70 instances of those nested spinlock
+ * calls in the kernel. These call sites can be modified to pass in the
+ * outer lock, as is done in the spin_lock_nest_lock() variant. In
+ * doing so, we can query the highest priority task that is waiting on the
+ * outer lock and adjust our waiting priority accordingly. To speed up nested
+ * spinlock calls, they will have a minimum RT priority of 1 to begin with.
  */
 #include <linux/sched.h>
 
+#ifndef MAX
+#define MAX(a, b)      (((a) >= (b)) ? (a) : (b))
+#endif
+
 /*
  * =========================[ Helper Functions ]=========================
  */
@@ -36,46 +51,36 @@
 /*
  * Translate the priority of a task to an equivalent RT priority
  */
-static u8 rt_task_priority(struct task_struct *task)
+static u8 rt_task_priority(struct task_struct *task, u8 min_prio)
 {
-       int prio = READ_ONCE(task->prio);
+       int prio;
 
-       return (prio >= MAX_RT_PRIO) ? 0 : (u8)(MAX_RT_PRIO - prio);
-}
-
-/*
- * ====================[ Functions Used by qspinlock.c ]====================
- */
+       if (!task)
+               return min_prio;
 
-static inline bool rt_enabled(void)
-{
-       return true;
+       prio = READ_ONCE(task->prio);
+       return (u8)MAX((prio >= MAX_RT_PRIO) ? 0 : MAX_RT_PRIO - prio,
+                       min_prio);
 }
 
 /*
- * Return the pending byte portion of the integer value of the lock.
+ * Return: true if the lock is acquired via RT spinning.
+ *        false if the caller needs to go into the MCS wait queue.
  */
-static inline int rt_pending(int val)
-{
-       return val & _Q_PENDING_MASK;
-}
-
-/*
- * Return: true if locked acquired
- *        false if queuing in the MCS wait queue is needed.
- */
-static bool rt_spin_trylock(struct qspinlock *lock)
+static bool __rt_spin_trylock(struct qspinlock *lock,
+                             struct qspinlock *outerlock, u8 min_prio)
 {
        struct __qspinlock *l = (void *)lock;
+       struct __qspinlock *ol = (void *)outerlock;
        struct task_struct *task = in_interrupt() ? NULL : current;
-       u8 prio;
+       u8 prio, mypdprio = 0;
 
        BUILD_BUG_ON(_Q_PENDING_BITS != 8);
 
        if (!task)
-               prio = in_nmi() ? MAX_RT_PRIO + 1
-                    : in_irq() ? MAX_RT_PRIO : 1;
-       else if (!(prio = rt_task_priority(task)))
+               min_prio = in_nmi() ? MAX_RT_PRIO + 1
+                        : in_irq() ? MAX_RT_PRIO : 1;
+       if (!(prio = rt_task_priority(task, min_prio)))
                return false;
 
        /*
@@ -96,7 +101,7 @@ static bool rt_spin_trylock(struct qspinlock *lock)
 
                if (!(lockpend & _Q_LOCKED_MASK)) {
                        u16 old = lockpend;
-                       u16 new = (pdprio == prio)
+                       u16 new = (pdprio == mypdprio)
                                ? _Q_LOCKED_VAL : (lockpend | _Q_LOCKED_VAL);
 
                        /*
@@ -112,15 +117,56 @@ static bool rt_spin_trylock(struct qspinlock *lock)
                        pdprio = (u8)(lockpend >> _Q_PENDING_OFFSET);
                }
 
-               if (pdprio < prio)
-                       cmpxchg_relaxed(&l->pending, pdprio, prio);
+               if (pdprio < prio) {
+                       /*
+                        * As the RT priority can increase dynamically, we
+                        * need to keep track of what priority value has
+                        * been set in the pending byte of the lock.
+                        */
+                       if (cmpxchg_relaxed(&l->pending, pdprio, prio)
+                                       == pdprio)
+                               mypdprio = prio;
+               }
 next:
                cpu_relax();
+
+               /*
+                * Recompute pending priority
+                */
+               prio = MAX(ol ? ol->pending : 0,
+                          rt_task_priority(task, min_prio));
+
        }
        return true;
 }
 
 /*
+ * ====================[ Functions Used by qspinlock.c ]====================
+ */
+
+static inline bool rt_enabled(void)
+{
+       return true;
+}
+
+/*
+ * Return the pending byte portion of the integer value of the lock.
+ */
+static inline int rt_pending(int val)
+{
+       return val & _Q_PENDING_MASK;
+}
+
+/*
+ * Return: true if the lock is acquired
+ *        false if queuing in the MCS wait queue is needed.
+ */
+static inline bool rt_spin_trylock(struct qspinlock *lock)
+{
+       return __rt_spin_trylock(lock, NULL, 0);
+}
+
+/*
  * We need to make the non-RT tasks wait longer if RT tasks are spinning for
  * the lock. This is done to reduce the chance that a non-RT task may
  * accidently grab the lock away from the RT tasks in the short interval
@@ -154,3 +200,42 @@ static u32 rt_wait_head_or_retry(struct qspinlock *lock)
        }
        return atomic_read(&lock->val);
 }
+
+/*
+ * =================[ Exported Nested Spinlock Functions ]=================
+ */
+
+/*
+ * For nested spinlocks, the waiter is given a minimum RT priority of 1. If
+ * the outer lock is specified, the waiter's priority will be boosted to that
+ * of the highest priority task waiting on the outer lock, if it is higher.
+ */
+void __lockfunc _rt_raw_spin_lock_nested(raw_spinlock_t *lock, int subclass,
+               raw_spinlock_t *outerlock) __acquires(lock)
+{
+       preempt_disable();
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+       if (subclass) {
+               spin_acquire(&lock->dep_map, subclass, 0, _RET_IP_);
+       } else {
+               typecheck(struct lockdep_map *, &outerlock->dep_map);
+               spin_acquire_nest(&lock->dep_map, 0, &outerlock->dep_map,
+                                 _RET_IP_);
+       }
+#endif
+       __acquire(lock);
+       __rt_spin_trylock(&lock->raw_lock,
+                         outerlock ? &outerlock->raw_lock : NULL, 1);
+}
+EXPORT_SYMBOL(_rt_raw_spin_lock_nested);
+
+unsigned long __lockfunc _rt_raw_spin_lock_irqsave_nested(raw_spinlock_t *lock,
+               int subclass, raw_spinlock_t *outerlock) __acquires(lock)
+{
+       unsigned long flags;
+
+       local_irq_save(flags);
+       _rt_raw_spin_lock_nested(lock, subclass, outerlock);
+       return flags;
+}
+EXPORT_SYMBOL(_rt_raw_spin_lock_irqsave_nested);
-- 
1.8.3.1
