The current apply_workqueue_attrs() performs both pwq allocation and pwq
installation, so when we batch multiple apply_workqueue_attrs() calls as a
transaction, we can't ensure that the transaction succeeds or fails as a
complete unit.

To solve this, we split apply_workqueue_attrs() into three stages.
The first stage does the preparation: allocating memory and pwqs.
The second stage does the attrs installation and pwq installation.
The third stage frees the allocated memory and the (old or unused) pwqs.

As a result, a batch of multiple apply_workqueue_attrs() calls can
succeed or fail as a complete unit:
        1) run the first stage for all the workqueues as a batch;
        2) commit them all only when all of the above succeed.

This patch is a preparation for the next patch ("Allow modifying low level
unbound workqueue cpumask"), which will perform multiple
apply_workqueue_attrs() calls.

This patch makes no functional changes except for two minor adjustments:
        1) free_unbound_pwq() for the error path is removed; we use the
           heavier put_pwq_unlocked() instead, since the error path
           is rare. This adjustment simplifies the code.
        2) the memory allocation is also moved under wq_pool_mutex.
           This is needed to avoid further splitting.

Suggested-by: Tejun Heo <t...@kernel.org>
Cc: Christoph Lameter <c...@linux.com>
Cc: Kevin Hilman <khil...@linaro.org>
Cc: Lai Jiangshan <la...@cn.fujitsu.com>
Cc: Mike Galbraith <bitbuc...@online.de>
Cc: Paul E. McKenney <paul...@linux.vnet.ibm.com>
Cc: Tejun Heo <t...@kernel.org>
Cc: Viresh Kumar <viresh.ku...@linaro.org>
Cc: Frederic Weisbecker <fweis...@gmail.com>
Signed-off-by: Lai Jiangshan <la...@cn.fujitsu.com>
---
 kernel/workqueue.c | 203 +++++++++++++++++++++++++++++++----------------------
 1 file changed, 119 insertions(+), 84 deletions(-)

diff --git a/kernel/workqueue.c b/kernel/workqueue.c
index c6845fa..b150828 100644
--- a/kernel/workqueue.c
+++ b/kernel/workqueue.c
@@ -3424,17 +3424,6 @@ static struct pool_workqueue *alloc_unbound_pwq(struct 
workqueue_struct *wq,
        return pwq;
 }
 
-/* undo alloc_unbound_pwq(), used only in the error path */
-static void free_unbound_pwq(struct pool_workqueue *pwq)
-{
-       lockdep_assert_held(&wq_pool_mutex);
-
-       if (pwq) {
-               put_unbound_pool(pwq->pool);
-               kmem_cache_free(pwq_cache, pwq);
-       }
-}
-
 /**
  * wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node
  * @attrs: the wq_attrs of interest
@@ -3497,42 +3486,49 @@ static struct pool_workqueue 
*numa_pwq_tbl_install(struct workqueue_struct *wq,
        return old_pwq;
 }
 
-/**
- * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
- * @wq: the target workqueue
- * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
- *
- * Apply @attrs to an unbound workqueue @wq.  Unless disabled, on NUMA
- * machines, this function maps a separate pwq to each NUMA node with
- * possibles CPUs in @attrs->cpumask so that work items are affine to the
- * NUMA node it was issued on.  Older pwqs are released as in-flight work
- * items finish.  Note that a work item which repeatedly requeues itself
- * back-to-back will stay on its current pwq.
- *
- * Performs GFP_KERNEL allocations.
- *
- * Return: 0 on success and -errno on failure.
- */
-int apply_workqueue_attrs(struct workqueue_struct *wq,
-                         const struct workqueue_attrs *attrs)
+/* Context to store the prepared attrs & pwqs before they are installed */
+struct wq_unbound_install_ctx {
+       struct workqueue_struct *wq;    /* target to be installed */
+       struct workqueue_attrs  *attrs; /* attrs for installing */
+       struct pool_workqueue   *dfl_pwq;
+       struct pool_workqueue   *pwq_tbl[];
+};
+
+/* Free the resources after success or abort */
+static void wq_unbound_install_ctx_free(struct wq_unbound_install_ctx *ctx)
 {
+       int node;
+
+       if (ctx) {
+               /* put the pwqs */
+               for_each_node(node)
+                       put_pwq_unlocked(ctx->pwq_tbl[node]);
+               put_pwq_unlocked(ctx->dfl_pwq);
+
+               free_workqueue_attrs(ctx->attrs);
+       }
+
+       kfree(ctx);
+}
+
+/* Allocates the attrs and pwqs for later installation */
+static struct wq_unbound_install_ctx *
+wq_unbound_install_ctx_prepare(struct workqueue_struct *wq,
+                              const struct workqueue_attrs *attrs)
+{
+       struct wq_unbound_install_ctx *ctx;
        struct workqueue_attrs *new_attrs, *tmp_attrs;
-       struct pool_workqueue **pwq_tbl, *dfl_pwq;
-       int node, ret;
+       int node;
 
-       /* only unbound workqueues can change attributes */
-       if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
-               return -EINVAL;
+       lockdep_assert_held(&wq_pool_mutex);
 
-       /* creating multiple pwqs breaks ordering guarantee */
-       if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
-               return -EINVAL;
+       ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
+                     GFP_KERNEL);
 
-       pwq_tbl = kzalloc(nr_node_ids * sizeof(pwq_tbl[0]), GFP_KERNEL);
        new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
        tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
-       if (!pwq_tbl || !new_attrs || !tmp_attrs)
-               goto enomem;
+       if (!ctx || !new_attrs || !tmp_attrs)
+               goto out_free;
 
        /* make a copy of @attrs and sanitize it */
        copy_workqueue_attrs(new_attrs, attrs);
@@ -3546,75 +3542,114 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
        copy_workqueue_attrs(tmp_attrs, new_attrs);
 
        /*
-        * CPUs should stay stable across pwq creations and installations.
-        * Pin CPUs, determine the target cpumask for each node and create
-        * pwqs accordingly.
-        */
-       get_online_cpus();
-
-       mutex_lock(&wq_pool_mutex);
-
-       /*
         * If something goes wrong during CPU up/down, we'll fall back to
         * the default pwq covering whole @attrs->cpumask.  Always create
         * it even if we don't use it immediately.
         */
-       dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
-       if (!dfl_pwq)
-               goto enomem_pwq;
+       ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+       if (!ctx->dfl_pwq)
+               goto out_free;
 
        for_each_node(node) {
                if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
-                       pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
-                       if (!pwq_tbl[node])
-                               goto enomem_pwq;
+                       ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
+                       if (!ctx->pwq_tbl[node])
+                               goto out_free;
                } else {
-                       dfl_pwq->refcnt++;
-                       pwq_tbl[node] = dfl_pwq;
+                       ctx->dfl_pwq->refcnt++;
+                       ctx->pwq_tbl[node] = ctx->dfl_pwq;
                }
        }
 
-       mutex_unlock(&wq_pool_mutex);
+       ctx->wq = wq;
+       ctx->attrs = new_attrs;
+
+out_free:
+       free_workqueue_attrs(tmp_attrs);
+
+       if (!ctx || !ctx->wq) {
+               kfree(new_attrs);
+               wq_unbound_install_ctx_free(ctx);
+               return NULL;
+       } else {
+               return ctx;
+       }
+}
+
+/* Set the unbound_attr and install the prepared pwqs. Should not fail */
+static void wq_unbound_install_ctx_commit(struct wq_unbound_install_ctx *ctx)
+{
+       int node;
 
        /* all pwqs have been created successfully, let's install'em */
-       mutex_lock(&wq->mutex);
+       mutex_lock(&ctx->wq->mutex);
 
-       copy_workqueue_attrs(wq->unbound_attrs, new_attrs);
+       copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
 
        /* save the previous pwq and install the new one */
        for_each_node(node)
-               pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
+               ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
+                                                         ctx->pwq_tbl[node]);
 
        /* @dfl_pwq might not have been used, ensure it's linked */
-       link_pwq(dfl_pwq);
-       swap(wq->dfl_pwq, dfl_pwq);
+       link_pwq(ctx->dfl_pwq);
+       swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
 
-       mutex_unlock(&wq->mutex);
+       mutex_unlock(&ctx->wq->mutex);
+}
 
-       /* put the old pwqs */
-       for_each_node(node)
-               put_pwq_unlocked(pwq_tbl[node]);
-       put_pwq_unlocked(dfl_pwq);
+/**
+ * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
+ * @wq: the target workqueue
+ * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
+ *
+ * Apply @attrs to an unbound workqueue @wq.  Unless disabled, on NUMA
+ * machines, this function maps a separate pwq to each NUMA node with
+ * possibles CPUs in @attrs->cpumask so that work items are affine to the
+ * NUMA node it was issued on.  Older pwqs are released as in-flight work
+ * items finish.  Note that a work item which repeatedly requeues itself
+ * back-to-back will stay on its current pwq.
+ *
+ * Performs GFP_KERNEL allocations.
+ *
+ * Return: 0 on success and -errno on failure.
+ */
+int apply_workqueue_attrs(struct workqueue_struct *wq,
+                         const struct workqueue_attrs *attrs)
+{
+       struct wq_unbound_install_ctx *ctx;
+       int ret = -ENOMEM;
 
-       put_online_cpus();
-       ret = 0;
-       /* fall through */
-out_free:
-       free_workqueue_attrs(tmp_attrs);
-       free_workqueue_attrs(new_attrs);
-       kfree(pwq_tbl);
-       return ret;
+       /* only unbound workqueues can change attributes */
+       if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
+               return -EINVAL;
 
-enomem_pwq:
-       free_unbound_pwq(dfl_pwq);
-       for_each_node(node)
-               if (pwq_tbl && pwq_tbl[node] != dfl_pwq)
-                       free_unbound_pwq(pwq_tbl[node]);
+       /* creating multiple pwqs breaks ordering guarantee */
+       if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
+               return -EINVAL;
+
+       /*
+        * CPUs should stay stable across pwq creations and installations.
+        * Pin CPUs, determine the target cpumask for each node and create
+        * pwqs accordingly.
+        */
+       get_online_cpus();
+
+       mutex_lock(&wq_pool_mutex);
+       ctx = wq_unbound_install_ctx_prepare(wq, attrs);
        mutex_unlock(&wq_pool_mutex);
+
        put_online_cpus();
-enomem:
-       ret = -ENOMEM;
-       goto out_free;
+
+       /* the ctx has been prepared successfully, let's commit it */
+       if (ctx) {
+               wq_unbound_install_ctx_commit(ctx);
+               ret = 0;
+       }
+
+       wq_unbound_install_ctx_free(ctx);
+
+       return ret;
 }
 
 /**
-- 
2.1.0

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to