When a futex is used as a condition variable, for example to manage a
thread pool, there may be a lot of threads sleeping in futex_wait on
this futex. The futex_hash_bucket chain therefore contains many struct futex_q
entries for the same futex.

With bad luck, another futex, used as a mutex, hashes into the same bucket.
Every futex_wake on this mutex then has to scan the whole chain of the above
waiters to find the futex_q for this mutex. For not-unusual thread pool sizes
of more than 20, this should be a considerable effort.

I therefore suggest including only one futex_q per futex in the hash bucket
chain and queueing additional waiters in an extra chain at the 'top' futex_q
entry. Thus different futexes are isolated from each other, and the cost of a
hash collision is reduced.

To show the idea, I added a sample patch. Here, the plist is exchanged for
a futex-specific implementation. kernel/fplist.h is certainly not the
right place for it.

Signed-off-by: Gerd Gerats <gerd.ger...@gmx.de>
---
 kernel/fplist.h | 153 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 kernel/futex.c  | 105 ++++++++++++++++++++------------------
 2 files changed, 208 insertions(+), 50 deletions(-)
 create mode 100644 kernel/fplist.h

diff --git a/kernel/fplist.h b/kernel/fplist.h
new file mode 100644
index 000000000000..262abe70b31c
--- /dev/null
+++ b/kernel/fplist.h
@@ -0,0 +1,153 @@
+#ifndef _LINUX_FPLIST_H_
+#define _LINUX_FPLIST_H_
+
+/*
+ * futex specific priority-sorted list
+ *
+ * based on include/linux/plist.h
+ *
+ * Simple ASCII art explanation:
+ *
+ * fl:futx_list
+ * pl:prio_list
+ * nl:node_list
+ *
+ * +------+
+ * |      v
+ * |     |fl|        HEAD
+ * |      ^
+ * |      |
+ * |      v
+ * |  +--------+
+ * |  +->|pl|<-+
+ * |     |10|   (prio)
+ * |     |  |
+ * |  +->|nl|<-+
+ * |  +--------+
+ * |      ^
+ * |      |
+ * |      v
+ * |  +------------------------------------+
+ * |  +->|pl|<->|pl|<--------------->|pl|<-+
+ * |     |10|   |21|   |21|   |21|   |40|   (prio)
+ * |     |  |   |  |   |  |   |  |   |  |
+ * |  +->|nl|<->|nl|<->|nl|<->|nl|<->|nl|<-+
+ * |  +------------------------------------+
+ * |      ^
+ * +------+
+ */
+
+#include <linux/kernel.h>
+#include <linux/list.h>
+
+struct fplist_head {
+       struct list_head        futx_list;
+};
+
+struct fplist_node {
+       int                     prio;
+       struct list_head        futx_list;      // top level futex list
+       // per futex list
+       struct list_head        prio_list;
+       struct list_head        node_list;
+};
+
+#define FPLIST_HEAD_INIT(head)                         \
+{                                                      \
+       .futx_list = LIST_HEAD_INIT((head).futx_list)   \
+}
+
+#define FPLIST_HEAD(head) \
+       struct fplist_head head = FPLIST_HEAD_INIT(head)
+
+static inline bool fplist_node_empty(struct fplist_node *node)
+{
+       return list_empty(&node->node_list) && list_empty(&node->futx_list);
+}
+
+static inline void fplist_head_init(struct fplist_head *head)
+{
+       INIT_LIST_HEAD(&head->futx_list);
+}
+
+static void fplist_del(struct fplist_node *node, struct fplist_head *head)
+{
+       struct fplist_node *next;
+
+       if (list_empty(&node->node_list)) {
+               /* last/only in ring, remove from futex ring */
+               list_del_init(&node->futx_list);
+               return;
+       }
+       next = list_entry(node->node_list.next,
+                       struct fplist_node, node_list);
+       if (!list_empty(&node->prio_list)) {
+               /* add the next node into prio_list */
+               if (list_empty(&next->prio_list))
+                       list_add(&next->prio_list, &node->prio_list);
+               list_del_init(&node->prio_list);
+       }
+       if (!list_empty(&node->futx_list))
+               list_replace_init(&node->futx_list, &next->futx_list);
+       list_del_init(&node->node_list);
+}
+
+static void fplist_add_first(struct fplist_node *node, struct fplist_head 
*head)
+{
+       list_add_tail(&node->futx_list, &head->futx_list);
+       INIT_LIST_HEAD(&node->prio_list);
+       INIT_LIST_HEAD(&node->node_list);
+}
+
+static void
+fplist_add_insert(struct fplist_node *node, struct fplist_node *top,
+               struct fplist_head *head)
+{
+       struct fplist_node *iter, *prev = NULL;
+       struct list_head *node_next = &top->node_list;
+
+       INIT_LIST_HEAD(&node->futx_list);
+       INIT_LIST_HEAD(&node->prio_list);
+
+       iter = top;
+
+       do {
+               if (node->prio < iter->prio) {
+                       node_next = &iter->node_list;
+                       break;
+               }
+
+               prev = iter;
+               iter = list_entry(iter->prio_list.next,
+                               struct fplist_node, prio_list);
+       } while (iter != top);
+
+       if (!prev || prev->prio != node->prio)
+               list_add_tail(&node->prio_list, &iter->prio_list);
+       if (!prev)
+               list_replace_init(&top->futx_list, &node->futx_list);
+       list_add_tail(&node->node_list, node_next);
+}
+
+#define fplist_for_each_entry(pos, head, member)       \
+        list_for_each_entry(pos, &(head)->futx_list, member.futx_list)
+
+static inline struct fplist_node*
+___fplist_for_each_next(struct fplist_node *node) {
+       struct fplist_node *next;
+
+       next = list_entry(node->node_list.next, struct fplist_node, node_list);
+       return list_empty(&next->futx_list) ? next : 0;
+}
+
+#define __fplist_for_each_next(pos, member) ({ \
+       struct fplist_node *next__ = ___fplist_for_each_next(&(pos)->member); \
+       next__ ? list_entry(next__, typeof(*(pos)), member) : 0; \
+       })
+
+#define fplist_for_each_entry_safe(pos, n, top, member) \
+       for (pos = (top); \
+               (pos && ((n = __fplist_for_each_next(pos, member)), 1)); \
+               pos = n)
+
+#endif
diff --git a/kernel/futex.c b/kernel/futex.c
index 357348a6cf6b..a594f41c2625 100644
--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -72,6 +72,8 @@
 
 #include "locking/rtmutex_common.h"
 
+#include "fplist.h"
+
 /*
  * READ this before attempting to hack on futexes!
  *
@@ -229,7 +231,7 @@ struct futex_pi_state {
  * we can wake only the relevant ones (hashed queues may be shared).
  *
  * A futex_q has a woken state, just like tasks have TASK_RUNNING.
- * It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0.
+ * It is considered woken when fplist_node_empty(&q->list) || q->lock_ptr == 0.
  * The order of wakeup is always to make the first condition true, then
  * the second.
  *
@@ -237,7 +239,7 @@ struct futex_pi_state {
  * the rt_mutex code. See unqueue_me_pi().
  */
 struct futex_q {
-       struct plist_node list;
+       struct fplist_node list;
 
        struct task_struct *task;
        spinlock_t *lock_ptr;
@@ -262,7 +264,7 @@ static const struct futex_q futex_q_init = {
 struct futex_hash_bucket {
        atomic_t waiters;
        spinlock_t lock;
-       struct plist_head chain;
+       struct fplist_head chain;
 } ____cacheline_aligned_in_smp;
 
 /*
@@ -745,13 +747,26 @@ static struct futex_q *futex_top_waiter(struct 
futex_hash_bucket *hb,
 {
        struct futex_q *this;
 
-       plist_for_each_entry(this, &hb->chain, list) {
+       fplist_for_each_entry(this, &hb->chain, list) {
                if (match_futex(&this->key, key))
                        return this;
        }
        return NULL;
 }
 
+static void futex_add(struct futex_q *q, struct futex_hash_bucket *hb)
+{
+       struct futex_q *top = futex_top_waiter(hb, &q->key);
+
+       if (top)
+               fplist_add_insert(&q->list, &top->list, &hb->chain);
+       else
+               fplist_add_first(&q->list, &hb->chain);
+}
+
+#define futex_for_each_match_entry_safe(pos, n, hb, key) \
+       fplist_for_each_entry_safe(pos, n, futex_top_waiter(hb, key), list)
+
 static int cmpxchg_futex_value_locked(u32 *curval, u32 __user *uaddr,
                                      u32 uval, u32 newval)
 {
@@ -1352,11 +1367,11 @@ static void __unqueue_futex(struct futex_q *q)
        struct futex_hash_bucket *hb;
 
        if (WARN_ON_SMP(!q->lock_ptr || !spin_is_locked(q->lock_ptr))
-           || WARN_ON(plist_node_empty(&q->list)))
+           || WARN_ON(fplist_node_empty(&q->list)))
                return;
 
        hb = container_of(q->lock_ptr, struct futex_hash_bucket, lock);
-       plist_del(&q->list, &hb->chain);
+       fplist_del(&q->list, &hb->chain);
        hb_waiters_dec(hb);
 }
 
@@ -1384,7 +1399,7 @@ static void mark_wake_futex(struct wake_q_head *wake_q, 
struct futex_q *q)
         * is written, without taking any locks. This is possible in the event
         * of a spurious wakeup, for example. A memory barrier is required here
         * to prevent the following store to lock_ptr from getting ahead of the
-        * plist_del in __unqueue_futex().
+        * fplist_del in __unqueue_futex().
         */
        smp_store_release(&q->lock_ptr, NULL);
 }
@@ -1521,21 +1536,19 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int 
nr_wake, u32 bitset)
 
        spin_lock(&hb->lock);
 
-       plist_for_each_entry_safe(this, next, &hb->chain, list) {
-               if (match_futex (&this->key, &key)) {
-                       if (this->pi_state || this->rt_waiter) {
-                               ret = -EINVAL;
-                               break;
-                       }
+       futex_for_each_match_entry_safe(this, next, hb, &key) {
+               if (this->pi_state || this->rt_waiter) {
+                       ret = -EINVAL;
+                       break;
+               }
 
-                       /* Check if one of the bits is set in both bitsets */
-                       if (!(this->bitset & bitset))
-                               continue;
+               /* Check if one of the bits is set in both bitsets */
+               if (!(this->bitset & bitset))
+                       continue;
 
-                       mark_wake_futex(&wake_q, this);
-                       if (++ret >= nr_wake)
-                               break;
-               }
+               mark_wake_futex(&wake_q, this);
+               if (++ret >= nr_wake)
+                       break;
        }
 
        spin_unlock(&hb->lock);
@@ -1604,30 +1617,26 @@ futex_wake_op(u32 __user *uaddr1, unsigned int flags, 
u32 __user *uaddr2,
                goto retry;
        }
 
-       plist_for_each_entry_safe(this, next, &hb1->chain, list) {
-               if (match_futex (&this->key, &key1)) {
-                       if (this->pi_state || this->rt_waiter) {
-                               ret = -EINVAL;
-                               goto out_unlock;
-                       }
-                       mark_wake_futex(&wake_q, this);
-                       if (++ret >= nr_wake)
-                               break;
+       futex_for_each_match_entry_safe(this, next, hb1, &key1) {
+               if (this->pi_state || this->rt_waiter) {
+                       ret = -EINVAL;
+                       goto out_unlock;
                }
+               mark_wake_futex(&wake_q, this);
+               if (++ret >= nr_wake)
+                       break;
        }
 
        if (op_ret > 0) {
                op_ret = 0;
-               plist_for_each_entry_safe(this, next, &hb2->chain, list) {
-                       if (match_futex (&this->key, &key2)) {
-                               if (this->pi_state || this->rt_waiter) {
-                                       ret = -EINVAL;
-                                       goto out_unlock;
-                               }
-                               mark_wake_futex(&wake_q, this);
-                               if (++op_ret >= nr_wake2)
-                                       break;
+               futex_for_each_match_entry_safe(this, next, hb2, &key2) {
+                       if (this->pi_state || this->rt_waiter) {
+                               ret = -EINVAL;
+                               goto out_unlock;
                        }
+                       mark_wake_futex(&wake_q, this);
+                       if (++op_ret >= nr_wake2)
+                               break;
                }
                ret += op_ret;
        }
@@ -1656,18 +1665,18 @@ void requeue_futex(struct futex_q *q, struct 
futex_hash_bucket *hb1,
 {
 
        /*
-        * If key1 and key2 hash to the same bucket, no need to
+        * If key1 and key2 hash to the same bucket, we still need to
         * requeue.
         */
+       fplist_del(&q->list, &hb1->chain);
        if (likely(&hb1->chain != &hb2->chain)) {
-               plist_del(&q->list, &hb1->chain);
                hb_waiters_dec(hb1);
                hb_waiters_inc(hb2);
-               plist_add(&q->list, &hb2->chain);
                q->lock_ptr = &hb2->lock;
        }
        get_futex_key_refs(key2);
        q->key = *key2;
+       futex_add(q, hb2);
 }
 
 /**
@@ -1949,13 +1958,10 @@ static int futex_requeue(u32 __user *uaddr1, unsigned 
int flags,
                }
        }
 
-       plist_for_each_entry_safe(this, next, &hb1->chain, list) {
+       futex_for_each_match_entry_safe(this, next, hb1, &key1) {
                if (task_count - nr_wake >= nr_requeue)
                        break;
 
-               if (!match_futex(&this->key, &key1))
-                       continue;
-
                /*
                 * FUTEX_WAIT_REQEUE_PI and FUTEX_CMP_REQUEUE_PI should always
                 * be paired with each other and no other futex ops.
@@ -2110,8 +2116,7 @@ static inline void __queue_me(struct futex_q *q, struct 
futex_hash_bucket *hb)
         */
        prio = min(current->normal_prio, MAX_RT_PRIO);
 
-       plist_node_init(&q->list, prio);
-       plist_add(&q->list, &hb->chain);
+       futex_add(q, hb);
        q->task = current;
 }
 
@@ -2396,7 +2401,7 @@ static void futex_wait_queue_me(struct futex_hash_bucket 
*hb, struct futex_q *q,
         * If we have been removed from the hash list, then another task
         * has tried to wake us, and we can skip the call to schedule().
         */
-       if (likely(!plist_node_empty(&q->list))) {
+       if (likely(!fplist_node_empty(&q->list))) {
                /*
                 * If the timer has already expired, current will already be
                 * flagged for rescheduling. Only call schedule if there
@@ -2918,7 +2923,7 @@ int handle_early_requeue_pi_wakeup(struct 
futex_hash_bucket *hb,
                 * We were woken prior to requeue by a timeout or a signal.
                 * Unqueue the futex_q and determine which it was.
                 */
-               plist_del(&q->list, &hb->chain);
+               fplist_del(&q->list, &hb->chain);
                hb_waiters_dec(hb);
 
                /* Handle spurious wakeups gracefully */
@@ -3495,7 +3500,7 @@ static int __init futex_init(void)
 
        for (i = 0; i < futex_hashsize; i++) {
                atomic_set(&futex_queues[i].waiters, 0);
-               plist_head_init(&futex_queues[i].chain);
+               fplist_head_init(&futex_queues[i].chain);
                spin_lock_init(&futex_queues[i].lock);
        }
 
-- 
2.14.1

Reply via email to