This patch makes unbound worker_pools reference-counted and dynamically created and destroyed as the workqueues that need them come and go. All unbound worker_pools are hashed on unbound_pool_hash, which is keyed by the content of worker_pool->attrs.
When an unbound workqueue is allocated, get_unbound_pool() is called with the attributes of the workqueue. If there already is a matching worker_pool, the reference count is bumped and the pool is returned. If not, a new worker_pool with matching attributes is created and returned. When an unbound workqueue is destroyed, put_unbound_pool() is called, which decrements the reference count of the associated worker_pool. If the refcnt reaches zero, the worker_pool is destroyed in a sched-RCU safe way. Note that the standard unbound worker_pools - normal and highpri ones with no specific cpumask affinity - are no longer created explicitly during init_workqueues(). init_workqueues() only initializes workqueue_attrs to be used for standard unbound pools - unbound_std_wq_attrs[]. The pools are spawned on demand as workqueues are created. Signed-off-by: Tejun Heo <t...@kernel.org> --- kernel/workqueue.c | 230 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 218 insertions(+), 12 deletions(-) diff --git a/kernel/workqueue.c b/kernel/workqueue.c index 7eba824..fb91b67 100644 --- a/kernel/workqueue.c +++ b/kernel/workqueue.c @@ -41,6 +41,7 @@ #include <linux/debug_locks.h> #include <linux/lockdep.h> #include <linux/idr.h> +#include <linux/jhash.h> #include <linux/hashtable.h> #include <linux/rculist.h> @@ -80,6 +81,7 @@ enum { NR_STD_WORKER_POOLS = 2, /* # standard pools per cpu */ + UNBOUND_POOL_HASH_ORDER = 6, /* hashed by pool->attrs */ BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */ MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */ @@ -149,6 +151,8 @@ struct worker_pool { struct ida worker_ida; /* L: for worker IDs */ struct workqueue_attrs *attrs; /* I: worker attributes */ + struct hlist_node hash_node; /* R: unbound_pool_hash node */ + atomic_t refcnt; /* refcnt for unbound pools */ /* * The current concurrency level. As it's likely to be accessed @@ -156,6 +160,12 @@ struct worker_pool { * cacheline. 
*/ atomic_t nr_running ____cacheline_aligned_in_smp; + + /* + * Destruction of pool is sched-RCU protected to allow dereferences + * from get_work_pool(). + */ + struct rcu_head rcu; } ____cacheline_aligned_in_smp; /* @@ -218,6 +228,11 @@ struct workqueue_struct { static struct kmem_cache *pwq_cache; +/* hash of all unbound pools keyed by pool->attrs */ +static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER); + +static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS]; + struct workqueue_struct *system_wq __read_mostly; EXPORT_SYMBOL_GPL(system_wq); struct workqueue_struct *system_highpri_wq __read_mostly; @@ -1740,7 +1755,7 @@ static struct worker *create_worker(struct worker_pool *pool) worker->pool = pool; worker->id = id; - if (pool->cpu != WORK_CPU_UNBOUND) + if (pool->cpu >= 0) worker->task = kthread_create_on_node(worker_thread, worker, cpu_to_node(pool->cpu), "kworker/%d:%d%s", pool->cpu, id, pri); @@ -3159,6 +3174,54 @@ fail: return NULL; } +static void copy_workqueue_attrs(struct workqueue_attrs *to, + const struct workqueue_attrs *from) +{ + to->nice = from->nice; + cpumask_copy(to->cpumask, from->cpumask); +} + +/* + * Hacky implementation of jhash of bitmaps which only considers the + * specified number of bits. We probably want a proper implementation in + * include/linux/jhash.h. 
+ */ +static u32 jhash_bitmap(const unsigned long *bitmap, int bits, u32 hash) +{ + int nr_longs = bits / BITS_PER_LONG; + int nr_leftover = bits % BITS_PER_LONG; + unsigned long leftover = 0; + + if (nr_longs) + hash = jhash(bitmap, nr_longs * sizeof(long), hash); + if (nr_leftover) { + bitmap_copy(&leftover, bitmap + nr_longs, nr_leftover); + hash = jhash(&leftover, sizeof(long), hash); + } + return hash; +} + +/* hash value of the content of @attr */ +static u32 wqattrs_hash(const struct workqueue_attrs *attrs) +{ + u32 hash = 0; + + hash = jhash_1word(attrs->nice, hash); + hash = jhash_bitmap(cpumask_bits(attrs->cpumask), nr_cpu_ids, hash); + return hash; +} + +/* content equality test */ +static bool wqattrs_equal(const struct workqueue_attrs *a, + const struct workqueue_attrs *b) +{ + if (a->nice != b->nice) + return false; + if (!cpumask_equal(a->cpumask, b->cpumask)) + return false; + return true; +} + /** * init_worker_pool - initialize a newly zalloc'd worker_pool * @pool: worker_pool to initialize @@ -3169,6 +3232,8 @@ fail: static int init_worker_pool(struct worker_pool *pool) { spin_lock_init(&pool->lock); + pool->id = -1; + pool->cpu = -1; pool->flags |= POOL_DISASSOCIATED; INIT_LIST_HEAD(&pool->worklist); INIT_LIST_HEAD(&pool->idle_list); @@ -3185,12 +3250,133 @@ static int init_worker_pool(struct worker_pool *pool) mutex_init(&pool->assoc_mutex); ida_init(&pool->worker_ida); + INIT_HLIST_NODE(&pool->hash_node); + atomic_set(&pool->refcnt, 1); pool->attrs = alloc_workqueue_attrs(GFP_KERNEL); if (!pool->attrs) return -ENOMEM; return 0; } +static void rcu_free_pool(struct rcu_head *rcu) +{ + struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu); + + ida_destroy(&pool->worker_ida); + free_workqueue_attrs(pool->attrs); + kfree(pool); +} + +/** + * put_unbound_pool - put a worker_pool + * @pool: worker_pool to put + * + * Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU + * safe manner. 
+ */ +static void put_unbound_pool(struct worker_pool *pool) +{ + struct worker *worker; + + if (!atomic_dec_and_test(&pool->refcnt)) + return; + + /* sanity checks */ + if (WARN_ON(!(pool->flags & POOL_DISASSOCIATED))) + return; + if (WARN_ON(pool->nr_workers != pool->nr_idle)) + return; + if (WARN_ON(!list_empty(&pool->worklist))) + return; + + /* release id and unhash */ + spin_lock_irq(&workqueue_lock); + if (pool->id >= 0) + idr_remove(&worker_pool_idr, pool->id); + hash_del(&pool->hash_node); + spin_unlock_irq(&workqueue_lock); + + /* lock out manager and destroy all workers */ + mutex_lock(&pool->manager_mutex); + spin_lock_irq(&pool->lock); + + while ((worker = first_worker(pool))) + destroy_worker(worker); + WARN_ON(pool->nr_workers || pool->nr_idle); + + spin_unlock_irq(&pool->lock); + mutex_unlock(&pool->manager_mutex); + + /* shut down the timers */ + del_timer_sync(&pool->idle_timer); + del_timer_sync(&pool->mayday_timer); + + /* sched-RCU protected to allow dereferences from get_work_pool() */ + call_rcu_sched(&pool->rcu, rcu_free_pool); +} + +/** + * get_unbound_pool - get a worker_pool with the specified attributes + * @attrs: the attributes of the worker_pool to get + * + * Obtain a worker_pool which has the same attributes as @attrs, bump the + * reference count and return it. If there already is a matching + * worker_pool, it will be used; otherwise, this function attempts to + * create a new one. On failure, returns NULL. + */ +static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs) +{ + static DEFINE_MUTEX(create_mutex); + u32 hash = wqattrs_hash(attrs); + struct worker_pool *pool; + struct hlist_node *tmp; + struct worker *worker; + + mutex_lock(&create_mutex); + + /* do we already have a matching pool? 
*/ + spin_lock_irq(&workqueue_lock); + hash_for_each_possible(unbound_pool_hash, pool, tmp, hash_node, hash) { + if (wqattrs_equal(pool->attrs, attrs)) { + atomic_inc(&pool->refcnt); + goto out_unlock; + } + } + spin_unlock_irq(&workqueue_lock); + + /* nope, create a new one */ + pool = kzalloc(sizeof(*pool), GFP_KERNEL); + if (!pool || init_worker_pool(pool) < 0) + goto fail; + + copy_workqueue_attrs(pool->attrs, attrs); + + if (worker_pool_assign_id(pool) < 0) + goto fail; + + /* create and start the initial worker */ + worker = create_worker(pool); + if (!worker) + goto fail; + + spin_lock_irq(&pool->lock); + start_worker(worker); + spin_unlock_irq(&pool->lock); + + /* install */ + spin_lock_irq(&workqueue_lock); + hash_add(unbound_pool_hash, &pool->hash_node, hash); +out_unlock: + spin_unlock_irq(&workqueue_lock); + mutex_unlock(&create_mutex); + return pool; +fail: + mutex_unlock(&create_mutex); + if (pool) + put_unbound_pool(pool); + return NULL; +} + static int alloc_and_link_pwqs(struct workqueue_struct *wq) { bool highpri = wq->flags & WQ_HIGHPRI; @@ -3215,7 +3401,12 @@ static int alloc_and_link_pwqs(struct workqueue_struct *wq) if (!pwq) return -ENOMEM; - pwq->pool = get_std_worker_pool(WORK_CPU_UNBOUND, highpri); + pwq->pool = get_unbound_pool(unbound_std_wq_attrs[highpri]); + if (!pwq->pool) { + kmem_cache_free(pwq_cache, pwq); + return -ENOMEM; + } + list_add_tail_rcu(&pwq->pwqs_node, &wq->pwqs); } @@ -3393,6 +3584,15 @@ void destroy_workqueue(struct workqueue_struct *wq) kfree(wq->rescuer); } + /* + * We're the sole accessor of @wq at this point. Directly access + * the first pwq and put its pool. 
+ */ + if (wq->flags & WQ_UNBOUND) { + pwq = list_first_entry(&wq->pwqs, struct pool_workqueue, + pwqs_node); + put_unbound_pool(pwq->pool); + } free_pwqs(wq); kfree(wq); } @@ -3856,19 +4056,14 @@ static int __init init_workqueues(void) hotcpu_notifier(workqueue_cpu_down_callback, CPU_PRI_WORKQUEUE_DOWN); /* initialize CPU pools */ - for_each_wq_cpu(cpu) { + for_each_possible_cpu(cpu) { struct worker_pool *pool; i = 0; for_each_std_worker_pool(pool, cpu) { BUG_ON(init_worker_pool(pool)); pool->cpu = cpu; - - if (cpu != WORK_CPU_UNBOUND) - cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu)); - else - cpumask_setall(pool->attrs->cpumask); - + cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu)); pool->attrs->nice = std_nice[i++]; /* alloc pool ID */ @@ -3877,14 +4072,13 @@ static int __init init_workqueues(void) } /* create the initial worker */ - for_each_online_wq_cpu(cpu) { + for_each_online_cpu(cpu) { struct worker_pool *pool; for_each_std_worker_pool(pool, cpu) { struct worker *worker; - if (cpu != WORK_CPU_UNBOUND) - pool->flags &= ~POOL_DISASSOCIATED; + pool->flags &= ~POOL_DISASSOCIATED; worker = create_worker(pool); BUG_ON(!worker); @@ -3894,6 +4088,18 @@ static int __init init_workqueues(void) } } + /* create default unbound wq attrs */ + for (i = 0; i < NR_STD_WORKER_POOLS; i++) { + struct workqueue_attrs *attrs; + + BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL))); + + attrs->nice = std_nice[i]; + cpumask_setall(attrs->cpumask); + + unbound_std_wq_attrs[i] = attrs; + } + system_wq = alloc_workqueue("events", 0, 0); system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0); system_long_wq = alloc_workqueue("events_long", 0, 0); -- 1.8.1.2 -- To unsubscribe from this list: send the line "unsubscribe linux-kernel" in the body of a message to majord...@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html Please read the FAQ at http://www.tux.org/lkml/