When using a futex as a condition variable, for example to manage a thread pool, there may be many threads sleeping in futex_wait on this futex. The futex_hash_bucket chain therefore contains many struct futex_q entries for the same futex.
On bad luck another futex, used as mutex, hashed into the same bucket. Every futex_wake on this mutex, has to scan the whole chain of above waiter to find the futex_q for this mutex. For non-unusual threadpool sizes of more than 20, this should be a considerable effort. I therefore suggest to include in the hash-bucketchain only one futex_q per futex and to queue additional waiter in an extrachain at the 'top' futex_q entry. Thus different futex are isolated from each other, the cost of a hash collision is reduced. To show the idea, I added a sample patch. Here, the plist is exchanged for a futex-specific implementation. kernel/fplist.h is certainly not not the right place. Signed-off-by: Gerd Gerats <gerd.ger...@gmx.de> --- kernel/fplist.h | 153 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ kernel/futex.c | 105 ++++++++++++++++++++------------------ 2 files changed, 208 insertions(+), 50 deletions(-) create mode 100644 kernel/fplist.h diff --git a/kernel/fplist.h b/kernel/fplist.h new file mode 100644 index 000000000000..262abe70b31c --- /dev/null +++ b/kernel/fplist.h @@ -0,0 +1,153 @@ +#ifndef _LINUX_FPLIST_H_ +#define _LINUX_FPLIST_H_ + +/* + * futex specific priority-sorted list + * + * based on include/linux/plist.h + * + * Simple ASCII art explanation: + * + * fl:futx_list + * pl:prio_list + * nl:node_list + * + * +------+ + * | v + * | |fl| HEAD + * | ^ + * | | + * | v + * | +--------+ + * | +->|pl|<-+ + * | |10| (prio) + * | | | + * | +->|nl|<-+ + * | +--------+ + * | ^ + * | | + * | v + * | +------------------------------------+ + * | +->|pl|<->|pl|<--------------->|pl|<-+ + * | |10| |21| |21| |21| |40| (prio) + * | | | | | | | | | | | + * | +->|nl|<->|nl|<->|nl|<->|nl|<->|nl|<-+ + * | +------------------------------------+ + * | ^ + * +------+ + */ + +#include <linux/kernel.h> +#include <linux/list.h> + +struct fplist_head { + struct list_head futx_list; +}; + +struct fplist_node { + int prio; + struct list_head futx_list; // top level futex 
list + // per futex list + struct list_head prio_list; + struct list_head node_list; +}; + +#define FPLIST_HEAD_INIT(head) \ +{ \ + .futx_list = LIST_HEAD_INIT((head).futx_list) \ +} + +#define FPLIST_HEAD(head) \ + struct fplist_head head = FPLIST_HEAD_INIT(head) + +static inline bool fplist_node_empty(struct fplist_node *node) +{ + return list_empty(&node->node_list) && list_empty(&node->futx_list); +} + +static inline void fplist_head_init(struct fplist_head *head) +{ + INIT_LIST_HEAD(&head->futx_list); +} + +static void fplist_del(struct fplist_node *node, struct fplist_head *head) +{ + struct fplist_node *next; + + if (list_empty(&node->node_list)) { + /* last/only in ring, remove from futex ring */ + list_del_init(&node->futx_list); + return; + } + next = list_entry(node->node_list.next, + struct fplist_node, node_list); + if (!list_empty(&node->prio_list)) { + /* add the next node into prio_list */ + if (list_empty(&next->prio_list)) + list_add(&next->prio_list, &node->prio_list); + list_del_init(&node->prio_list); + } + if (!list_empty(&node->futx_list)) + list_replace_init(&node->futx_list, &next->futx_list); + list_del_init(&node->node_list); +} + +static void fplist_add_first(struct fplist_node *node, struct fplist_head *head) +{ + list_add_tail(&node->futx_list, &head->futx_list); + INIT_LIST_HEAD(&node->prio_list); + INIT_LIST_HEAD(&node->node_list); +} + +static void +fplist_add_insert(struct fplist_node *node, struct fplist_node *top, + struct fplist_head *head) +{ + struct fplist_node *iter, *prev = NULL; + struct list_head *node_next = &top->node_list; + + INIT_LIST_HEAD(&node->futx_list); + INIT_LIST_HEAD(&node->prio_list); + + iter = top; + + do { + if (node->prio < iter->prio) { + node_next = &iter->node_list; + break; + } + + prev = iter; + iter = list_entry(iter->prio_list.next, + struct fplist_node, prio_list); + } while (iter != top); + + if (!prev || prev->prio != node->prio) + list_add_tail(&node->prio_list, &iter->prio_list); + if (!prev) 
+ list_replace_init(&top->futx_list, &node->futx_list); + list_add_tail(&node->node_list, node_next); +} + +#define fplist_for_each_entry(pos, head, member) \ + list_for_each_entry(pos, &(head)->futx_list, member.futx_list) + +static inline struct fplist_node* +___fplist_for_each_next(struct fplist_node *node) { + struct fplist_node *next; + + next = list_entry(node->node_list.next, struct fplist_node, node_list); + return list_empty(&next->futx_list) ? next : 0; +} + +#define __fplist_for_each_next(pos, member) ({ \ + struct fplist_node *next__ = ___fplist_for_each_next(&(pos)->member); \ + next__ ? list_entry(next__, typeof(*(pos)), member) : 0; \ + }) + +#define fplist_for_each_entry_safe(pos, n, top, member) \ + for (pos = (top); \ + (pos && ((n = __fplist_for_each_next(pos, member)), 1)); \ + pos = n) + +#endif diff --git a/kernel/futex.c b/kernel/futex.c index 357348a6cf6b..a594f41c2625 100644 --- a/kernel/futex.c +++ b/kernel/futex.c @@ -72,6 +72,8 @@ #include "locking/rtmutex_common.h" +#include "fplist.h" + /* * READ this before attempting to hack on futexes! * @@ -229,7 +231,7 @@ struct futex_pi_state { * we can wake only the relevant ones (hashed queues may be shared). * * A futex_q has a woken state, just like tasks have TASK_RUNNING. - * It is considered woken when plist_node_empty(&q->list) || q->lock_ptr == 0. + * It is considered woken when fplist_node_empty(&q->list) || q->lock_ptr == 0. * The order of wakeup is always to make the first condition true, then * the second. * @@ -237,7 +239,7 @@ struct futex_pi_state { * the rt_mutex code. See unqueue_me_pi(). 
*/ struct futex_q { - struct plist_node list; + struct fplist_node list; struct task_struct *task; spinlock_t *lock_ptr; @@ -262,7 +264,7 @@ static const struct futex_q futex_q_init = { struct futex_hash_bucket { atomic_t waiters; spinlock_t lock; - struct plist_head chain; + struct fplist_head chain; } ____cacheline_aligned_in_smp; /* @@ -745,13 +747,26 @@ static struct futex_q *futex_top_waiter(struct futex_hash_bucket *hb, { struct futex_q *this; - plist_for_each_entry(this, &hb->chain, list) { + fplist_for_each_entry(this, &hb->chain, list) { if (match_futex(&this->key, key)) return this; } return NULL; } +static void futex_add(struct futex_q *q, struct futex_hash_bucket *hb) +{ + struct futex_q *top = futex_top_waiter(hb, &q->key); + + if (top) + fplist_add_insert(&q->list, &top->list, &hb->chain); + else + fplist_add_first(&q->list, &hb->chain); +} + +#define futex_for_each_match_entry_safe(pos, n, hb, key) \ + fplist_for_each_entry_safe(pos, n, futex_top_waiter(hb, key), list) + static int cmpxchg_futex_value_locked(u32 *curval, u32 __user *uaddr, u32 uval, u32 newval) { @@ -1352,11 +1367,11 @@ static void __unqueue_futex(struct futex_q *q) struct futex_hash_bucket *hb; if (WARN_ON_SMP(!q->lock_ptr || !spin_is_locked(q->lock_ptr)) - || WARN_ON(plist_node_empty(&q->list))) + || WARN_ON(fplist_node_empty(&q->list))) return; hb = container_of(q->lock_ptr, struct futex_hash_bucket, lock); - plist_del(&q->list, &hb->chain); + fplist_del(&q->list, &hb->chain); hb_waiters_dec(hb); } @@ -1384,7 +1399,7 @@ static void mark_wake_futex(struct wake_q_head *wake_q, struct futex_q *q) * is written, without taking any locks. This is possible in the event * of a spurious wakeup, for example. A memory barrier is required here * to prevent the following store to lock_ptr from getting ahead of the - * plist_del in __unqueue_futex(). + * fplist_del in __unqueue_futex(). 
*/ smp_store_release(&q->lock_ptr, NULL); } @@ -1521,21 +1536,19 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset) spin_lock(&hb->lock); - plist_for_each_entry_safe(this, next, &hb->chain, list) { - if (match_futex (&this->key, &key)) { - if (this->pi_state || this->rt_waiter) { - ret = -EINVAL; - break; - } + futex_for_each_match_entry_safe(this, next, hb, &key) { + if (this->pi_state || this->rt_waiter) { + ret = -EINVAL; + break; + } - /* Check if one of the bits is set in both bitsets */ - if (!(this->bitset & bitset)) - continue; + /* Check if one of the bits is set in both bitsets */ + if (!(this->bitset & bitset)) + continue; - mark_wake_futex(&wake_q, this); - if (++ret >= nr_wake) - break; - } + mark_wake_futex(&wake_q, this); + if (++ret >= nr_wake) + break; } spin_unlock(&hb->lock); @@ -1604,30 +1617,26 @@ futex_wake_op(u32 __user *uaddr1, unsigned int flags, u32 __user *uaddr2, goto retry; } - plist_for_each_entry_safe(this, next, &hb1->chain, list) { - if (match_futex (&this->key, &key1)) { - if (this->pi_state || this->rt_waiter) { - ret = -EINVAL; - goto out_unlock; - } - mark_wake_futex(&wake_q, this); - if (++ret >= nr_wake) - break; + futex_for_each_match_entry_safe(this, next, hb1, &key1) { + if (this->pi_state || this->rt_waiter) { + ret = -EINVAL; + goto out_unlock; } + mark_wake_futex(&wake_q, this); + if (++ret >= nr_wake) + break; } if (op_ret > 0) { op_ret = 0; - plist_for_each_entry_safe(this, next, &hb2->chain, list) { - if (match_futex (&this->key, &key2)) { - if (this->pi_state || this->rt_waiter) { - ret = -EINVAL; - goto out_unlock; - } - mark_wake_futex(&wake_q, this); - if (++op_ret >= nr_wake2) - break; + futex_for_each_match_entry_safe(this, next, hb2, &key2) { + if (this->pi_state || this->rt_waiter) { + ret = -EINVAL; + goto out_unlock; } + mark_wake_futex(&wake_q, this); + if (++op_ret >= nr_wake2) + break; } ret += op_ret; } @@ -1656,18 +1665,18 @@ void requeue_futex(struct futex_q *q, struct 
futex_hash_bucket *hb1, { /* - * If key1 and key2 hash to the same bucket, no need to + * If key1 and key2 hash to the same bucket, we still need to * requeue. */ + fplist_del(&q->list, &hb1->chain); if (likely(&hb1->chain != &hb2->chain)) { - plist_del(&q->list, &hb1->chain); hb_waiters_dec(hb1); hb_waiters_inc(hb2); - plist_add(&q->list, &hb2->chain); q->lock_ptr = &hb2->lock; } get_futex_key_refs(key2); q->key = *key2; + futex_add(q, hb2); } /** @@ -1949,13 +1958,10 @@ static int futex_requeue(u32 __user *uaddr1, unsigned int flags, } } - plist_for_each_entry_safe(this, next, &hb1->chain, list) { + futex_for_each_match_entry_safe(this, next, hb1, &key1) { if (task_count - nr_wake >= nr_requeue) break; - if (!match_futex(&this->key, &key1)) - continue; - /* * FUTEX_WAIT_REQEUE_PI and FUTEX_CMP_REQUEUE_PI should always * be paired with each other and no other futex ops. @@ -2110,8 +2116,7 @@ static inline void __queue_me(struct futex_q *q, struct futex_hash_bucket *hb) */ prio = min(current->normal_prio, MAX_RT_PRIO); - plist_node_init(&q->list, prio); - plist_add(&q->list, &hb->chain); + futex_add(q, hb); q->task = current; } @@ -2396,7 +2401,7 @@ static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q, * If we have been removed from the hash list, then another task * has tried to wake us, and we can skip the call to schedule(). */ - if (likely(!plist_node_empty(&q->list))) { + if (likely(!fplist_node_empty(&q->list))) { /* * If the timer has already expired, current will already be * flagged for rescheduling. Only call schedule if there @@ -2918,7 +2923,7 @@ int handle_early_requeue_pi_wakeup(struct futex_hash_bucket *hb, * We were woken prior to requeue by a timeout or a signal. * Unqueue the futex_q and determine which it was. 
*/ - plist_del(&q->list, &hb->chain); + fplist_del(&q->list, &hb->chain); hb_waiters_dec(hb); /* Handle spurious wakeups gracefully */ @@ -3495,7 +3500,7 @@ static int __init futex_init(void) for (i = 0; i < futex_hashsize; i++) { atomic_set(&futex_queues[i].waiters, 0); - plist_head_init(&futex_queues[i].chain); + fplist_head_init(&futex_queues[i].chain); spin_lock_init(&futex_queues[i].lock); } -- 2.14.1