As part of the effort to remove the dependency on the rtnl lock, the cls
API is being converted to use fine-grained locking mechanisms instead of
the global rtnl lock. However, the chain_head_change callback for the
ingress Qdisc is a sleeping function and cannot be executed while holding
a spinlock.

Extend the cls API with a new workqueue intended to be used for tcf_proto
lifetime management. Modify tcf_proto_destroy() to deallocate the proto
asynchronously on the workqueue in order to ensure that all
chain_head_change callbacks involving the proto complete before it is
freed. Convert mini_qdisc_pair_swap(), which is used as the
chain_head_change callback for ingress and clsact Qdiscs, to use the
workqueue.

Signed-off-by: Vlad Buslov <vla...@mellanox.com>
Suggested-by: Jiri Pirko <j...@mellanox.com>
Acked-by: Jiri Pirko <j...@mellanox.com>
---
 include/net/sch_generic.h |  6 ++++++
 net/sched/cls_api.c       | 48 +++++++++++++++++++++++++++++++++++++++++------
 net/sched/sch_generic.c   | 37 +++++++++++++++++++++++++++++++++---
 net/sched/sch_ingress.c   |  4 ++++
 4 files changed, 86 insertions(+), 9 deletions(-)

diff --git a/include/net/sch_generic.h b/include/net/sch_generic.h
index 4d736427a4cb..29118bbd528f 100644
--- a/include/net/sch_generic.h
+++ b/include/net/sch_generic.h
@@ -319,6 +319,7 @@ struct tcf_proto {
        const struct tcf_proto_ops      *ops;
        struct tcf_chain        *chain;
        struct rcu_head         rcu;
+       struct work_struct      work;
 };
 
 struct qdisc_skb_cb {
@@ -397,6 +398,8 @@ tc_cls_offload_cnt_update(struct tcf_block *block, u32 *cnt,
        }
 }
 
+bool tc_queue_proto_work(struct work_struct *work);
+
 static inline void qdisc_cb_private_validate(const struct sk_buff *skb, int sz)
 {
        struct qdisc_skb_cb *qcb;
@@ -1148,12 +1151,15 @@ struct mini_Qdisc_pair {
        struct mini_Qdisc miniq1;
        struct mini_Qdisc miniq2;
        struct mini_Qdisc __rcu **p_miniq;
+       struct tcf_proto *tp_head;
+       struct work_struct work;
 };
 
 void mini_qdisc_pair_swap(struct mini_Qdisc_pair *miniqp,
                          struct tcf_proto *tp_head);
 void mini_qdisc_pair_init(struct mini_Qdisc_pair *miniqp, struct Qdisc *qdisc,
                          struct mini_Qdisc __rcu **p_miniq);
+void mini_qdisc_pair_cleanup(struct mini_Qdisc_pair *miniqp);
 
 static inline void skb_tc_reinsert(struct sk_buff *skb, struct tcf_result *res)
 {
diff --git a/net/sched/cls_api.c b/net/sched/cls_api.c
index f427a1e00e7e..17c1691bf0c0 100644
--- a/net/sched/cls_api.c
+++ b/net/sched/cls_api.c
@@ -107,12 +107,14 @@ int register_tcf_proto_ops(struct tcf_proto_ops *ops)
 EXPORT_SYMBOL(register_tcf_proto_ops);
 
 static struct workqueue_struct *tc_filter_wq;
+static struct workqueue_struct *tc_proto_wq;
 
 int unregister_tcf_proto_ops(struct tcf_proto_ops *ops)
 {
        struct tcf_proto_ops *t;
        int rc = -ENOENT;
 
+       flush_workqueue(tc_proto_wq);
        /* Wait for outstanding call_rcu()s, if any, from a
         * tcf_proto_ops's destroy() handler.
         */
@@ -139,6 +141,12 @@ bool tcf_queue_work(struct rcu_work *rwork, work_func_t func)
 }
 EXPORT_SYMBOL(tcf_queue_work);
 
+bool tc_queue_proto_work(struct work_struct *work)
+{
+       return queue_work(tc_proto_wq, work);
+}
+EXPORT_SYMBOL(tc_queue_proto_work);
+
 /* Select new prio value from the range, managed by kernel. */
 
 static inline u32 tcf_auto_prio(struct tcf_proto *tp)
@@ -151,6 +159,23 @@ static inline u32 tcf_auto_prio(struct tcf_proto *tp)
        return TC_H_MAJ(first);
 }
 
+static void tcf_chain_put(struct tcf_chain *chain);
+
+static void tcf_proto_destroy_work(struct work_struct *work)
+{
+       struct tcf_proto *tp = container_of(work, struct tcf_proto, work);
+       struct tcf_chain *chain = tp->chain;
+
+       rtnl_lock();
+
+       tp->ops->destroy(tp, NULL);
+       module_put(tp->ops->owner);
+       kfree_rcu(tp, rcu);
+       tcf_chain_put(chain);
+
+       rtnl_unlock();
+}
+
 static struct tcf_proto *tcf_proto_create(const char *kind, u32 protocol,
                                          u32 prio, struct tcf_chain *chain,
                                          struct netlink_ext_ack *extack)
@@ -171,6 +196,7 @@ static struct tcf_proto *tcf_proto_create(const char *kind, u32 protocol,
        tp->protocol = protocol;
        tp->prio = prio;
        tp->chain = chain;
+       INIT_WORK(&tp->work, tcf_proto_destroy_work);
 
        err = tp->ops->init(tp);
        if (err) {
@@ -187,9 +213,7 @@ static struct tcf_proto *tcf_proto_create(const char *kind, u32 protocol,
 static void tcf_proto_destroy(struct tcf_proto *tp,
                              struct netlink_ext_ack *extack)
 {
-       tp->ops->destroy(tp, extack);
-       module_put(tp->ops->owner);
-       kfree_rcu(tp, rcu);
+       tc_queue_proto_work(&tp->work);
 }
 
 struct tcf_filter_chain_list_item {
@@ -361,7 +385,6 @@ static void tcf_chain_flush(struct tcf_chain *chain)
                RCU_INIT_POINTER(chain->filter_chain, tp->next);
                tcf_proto_destroy(tp, NULL);
                tp = rtnl_dereference(chain->filter_chain);
-               tcf_chain_put(chain);
        }
 }
 
@@ -1131,7 +1154,6 @@ static void tcf_chain_tp_remove(struct tcf_chain *chain,
        if (tp == chain->filter_chain)
                tcf_chain0_head_change(chain, next);
        RCU_INIT_POINTER(*chain_info->pprev, next);
-       tcf_chain_put(chain);
 }
 
 static struct tcf_proto *tcf_chain_tp_find(struct tcf_chain *chain,
@@ -1420,8 +1442,14 @@ static int tc_new_tfilter(struct sk_buff *skb, struct nlmsghdr *n,
                tfilter_notify(net, skb, n, tp, block, q, parent, fh,
                               RTM_NEWTFILTER, false);
        } else {
-               if (tp_created)
+               if (tp_created) {
+                       /* tp wasn't inserted to chain tp list. Take reference
+                        * to chain manually for tcf_proto_destroy() to
+                        * release.
+                        */
+                       tcf_chain_hold(chain);
                        tcf_proto_destroy(tp, NULL);
+               }
        }
 
 errout:
@@ -2351,6 +2379,12 @@ static int __init tc_filter_init(void)
        if (!tc_filter_wq)
                return -ENOMEM;
 
+       tc_proto_wq = alloc_ordered_workqueue("tc_proto_workqueue", 0);
+       if (!tc_proto_wq) {
+               err = -ENOMEM;
+               goto err_proto_wq;
+       }
+
        err = register_pernet_subsys(&tcf_net_ops);
        if (err)
                goto err_register_pernet_subsys;
@@ -2367,6 +2401,8 @@ static int __init tc_filter_init(void)
        return 0;
 
 err_register_pernet_subsys:
+       destroy_workqueue(tc_proto_wq);
+err_proto_wq:
        destroy_workqueue(tc_filter_wq);
        return err;
 }
diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c
index de1663f7d3ad..5aa8e485fcb0 100644
--- a/net/sched/sch_generic.c
+++ b/net/sched/sch_generic.c
@@ -1363,10 +1363,14 @@ static void mini_qdisc_rcu_func(struct rcu_head *head)
 {
 }
 
-void mini_qdisc_pair_swap(struct mini_Qdisc_pair *miniqp,
-                         struct tcf_proto *tp_head)
+void __mini_qdisc_pair_swap(struct mini_Qdisc_pair *miniqp,
+                           struct tcf_proto *tp_head)
 {
-       struct mini_Qdisc *miniq_old = rtnl_dereference(*miniqp->p_miniq);
+       /* p_miniq is either changed on ordered workqueue or during miniqp
+        * cleanup. In both cases no concurrent modification is possible.
+        */
+       struct mini_Qdisc *miniq_old =
+               rcu_dereference_protected(*miniqp->p_miniq, 1);
        struct mini_Qdisc *miniq;
 
        if (!tp_head) {
@@ -1394,6 +1398,25 @@ void mini_qdisc_pair_swap(struct mini_Qdisc_pair *miniqp,
                 */
                call_rcu_bh(&miniq_old->rcu, mini_qdisc_rcu_func);
 }
+
+void mini_qdisc_pair_swap_work(struct work_struct *work)
+{
+       struct mini_Qdisc_pair *miniqp = container_of(work,
+                                                     struct mini_Qdisc_pair,
+                                                     work);
+       struct tcf_proto *tp_head;
+
+       tp_head = xchg(&miniqp->tp_head, ERR_PTR(-ENOENT));
+       if (!IS_ERR(tp_head))
+               __mini_qdisc_pair_swap(miniqp, tp_head);
+}
+
+void mini_qdisc_pair_swap(struct mini_Qdisc_pair *miniqp,
+                         struct tcf_proto *tp_head)
+{
+       xchg(&miniqp->tp_head, tp_head);
+       tc_queue_proto_work(&miniqp->work);
+}
 EXPORT_SYMBOL(mini_qdisc_pair_swap);
 
 void mini_qdisc_pair_init(struct mini_Qdisc_pair *miniqp, struct Qdisc *qdisc,
@@ -1404,5 +1427,13 @@ void mini_qdisc_pair_init(struct mini_Qdisc_pair *miniqp, struct Qdisc *qdisc,
        miniqp->miniq2.cpu_bstats = qdisc->cpu_bstats;
        miniqp->miniq2.cpu_qstats = qdisc->cpu_qstats;
        miniqp->p_miniq = p_miniq;
+       INIT_WORK(&miniqp->work, mini_qdisc_pair_swap_work);
 }
 EXPORT_SYMBOL(mini_qdisc_pair_init);
+
+void mini_qdisc_pair_cleanup(struct mini_Qdisc_pair *miniqp)
+{
+       cancel_work_sync(&miniqp->work);
+       __mini_qdisc_pair_swap(miniqp, NULL);
+}
+EXPORT_SYMBOL(mini_qdisc_pair_cleanup);
diff --git a/net/sched/sch_ingress.c b/net/sched/sch_ingress.c
index ce3f55259d0d..8edc3c46bd9c 100644
--- a/net/sched/sch_ingress.c
+++ b/net/sched/sch_ingress.c
@@ -99,6 +99,7 @@ static void ingress_destroy(struct Qdisc *sch)
        struct ingress_sched_data *q = qdisc_priv(sch);
 
        tcf_block_put_ext(q->block, sch, &q->block_info);
+       mini_qdisc_pair_cleanup(&q->miniqp);
        net_dec_ingress_queue();
 }
 
@@ -245,6 +246,9 @@ static void clsact_destroy(struct Qdisc *sch)
        tcf_block_put_ext(q->egress_block, sch, &q->egress_block_info);
        tcf_block_put_ext(q->ingress_block, sch, &q->ingress_block_info);
 
+       mini_qdisc_pair_cleanup(&q->miniqp_ingress);
+       mini_qdisc_pair_cleanup(&q->miniqp_egress);
+
        net_dec_ingress_queue();
        net_dec_egress_queue();
 }
-- 
2.7.5

Reply via email to