As part of the effort to remove the dependency on the rtnl lock, the cls API is being converted to use fine-grained locking mechanisms instead of the global rtnl lock. However, the chain_head_change callback for the ingress Qdisc is a sleeping function and cannot be executed while holding a spinlock.
Extend cls API with new workqueue intended to be used for tcf_proto lifetime management. Modify tcf_proto_destroy() to deallocate proto asynchronously on workqueue in order to ensure that all chain_head_change callbacks involving the proto complete before it is freed. Convert mini_qdisc_pair_swap(), that is used as a chain_head_change callback for ingress and clsact Qdiscs, to use a workqueue. Signed-off-by: Vlad Buslov <vla...@mellanox.com> Suggested-by: Jiri Pirko <j...@mellanox.com> Acked-by: Jiri Pirko <j...@mellanox.com> --- include/net/sch_generic.h | 6 ++++++ net/sched/cls_api.c | 48 +++++++++++++++++++++++++++++++++++++++++------ net/sched/sch_generic.c | 37 +++++++++++++++++++++++++++++++++--- net/sched/sch_ingress.c | 4 ++++ 4 files changed, 86 insertions(+), 9 deletions(-) diff --git a/include/net/sch_generic.h b/include/net/sch_generic.h index 4d736427a4cb..29118bbd528f 100644 --- a/include/net/sch_generic.h +++ b/include/net/sch_generic.h @@ -319,6 +319,7 @@ struct tcf_proto { const struct tcf_proto_ops *ops; struct tcf_chain *chain; struct rcu_head rcu; + struct work_struct work; }; struct qdisc_skb_cb { @@ -397,6 +398,8 @@ tc_cls_offload_cnt_update(struct tcf_block *block, u32 *cnt, } } +bool tc_queue_proto_work(struct work_struct *work); + static inline void qdisc_cb_private_validate(const struct sk_buff *skb, int sz) { struct qdisc_skb_cb *qcb; @@ -1148,12 +1151,15 @@ struct mini_Qdisc_pair { struct mini_Qdisc miniq1; struct mini_Qdisc miniq2; struct mini_Qdisc __rcu **p_miniq; + struct tcf_proto *tp_head; + struct work_struct work; }; void mini_qdisc_pair_swap(struct mini_Qdisc_pair *miniqp, struct tcf_proto *tp_head); void mini_qdisc_pair_init(struct mini_Qdisc_pair *miniqp, struct Qdisc *qdisc, struct mini_Qdisc __rcu **p_miniq); +void mini_qdisc_pair_cleanup(struct mini_Qdisc_pair *miniqp); static inline void skb_tc_reinsert(struct sk_buff *skb, struct tcf_result *res) { diff --git a/net/sched/cls_api.c b/net/sched/cls_api.c index 
f427a1e00e7e..17c1691bf0c0 100644 --- a/net/sched/cls_api.c +++ b/net/sched/cls_api.c @@ -107,12 +107,14 @@ int register_tcf_proto_ops(struct tcf_proto_ops *ops) EXPORT_SYMBOL(register_tcf_proto_ops); static struct workqueue_struct *tc_filter_wq; +static struct workqueue_struct *tc_proto_wq; int unregister_tcf_proto_ops(struct tcf_proto_ops *ops) { struct tcf_proto_ops *t; int rc = -ENOENT; + flush_workqueue(tc_proto_wq); /* Wait for outstanding call_rcu()s, if any, from a * tcf_proto_ops's destroy() handler. */ @@ -139,6 +141,12 @@ bool tcf_queue_work(struct rcu_work *rwork, work_func_t func) } EXPORT_SYMBOL(tcf_queue_work); +bool tc_queue_proto_work(struct work_struct *work) +{ + return queue_work(tc_proto_wq, work); +} +EXPORT_SYMBOL(tc_queue_proto_work); + /* Select new prio value from the range, managed by kernel. */ static inline u32 tcf_auto_prio(struct tcf_proto *tp) @@ -151,6 +159,23 @@ static inline u32 tcf_auto_prio(struct tcf_proto *tp) return TC_H_MAJ(first); } +static void tcf_chain_put(struct tcf_chain *chain); + +static void tcf_proto_destroy_work(struct work_struct *work) +{ + struct tcf_proto *tp = container_of(work, struct tcf_proto, work); + struct tcf_chain *chain = tp->chain; + + rtnl_lock(); + + tp->ops->destroy(tp, NULL); + module_put(tp->ops->owner); + kfree_rcu(tp, rcu); + tcf_chain_put(chain); + + rtnl_unlock(); +} + static struct tcf_proto *tcf_proto_create(const char *kind, u32 protocol, u32 prio, struct tcf_chain *chain, struct netlink_ext_ack *extack) @@ -171,6 +196,7 @@ static struct tcf_proto *tcf_proto_create(const char *kind, u32 protocol, tp->protocol = protocol; tp->prio = prio; tp->chain = chain; + INIT_WORK(&tp->work, tcf_proto_destroy_work); err = tp->ops->init(tp); if (err) { @@ -187,9 +213,7 @@ static struct tcf_proto *tcf_proto_create(const char *kind, u32 protocol, static void tcf_proto_destroy(struct tcf_proto *tp, struct netlink_ext_ack *extack) { - tp->ops->destroy(tp, extack); - module_put(tp->ops->owner); - 
kfree_rcu(tp, rcu); + tc_queue_proto_work(&tp->work); } struct tcf_filter_chain_list_item { @@ -361,7 +385,6 @@ static void tcf_chain_flush(struct tcf_chain *chain) RCU_INIT_POINTER(chain->filter_chain, tp->next); tcf_proto_destroy(tp, NULL); tp = rtnl_dereference(chain->filter_chain); - tcf_chain_put(chain); } } @@ -1131,7 +1154,6 @@ static void tcf_chain_tp_remove(struct tcf_chain *chain, if (tp == chain->filter_chain) tcf_chain0_head_change(chain, next); RCU_INIT_POINTER(*chain_info->pprev, next); - tcf_chain_put(chain); } static struct tcf_proto *tcf_chain_tp_find(struct tcf_chain *chain, @@ -1420,8 +1442,14 @@ static int tc_new_tfilter(struct sk_buff *skb, struct nlmsghdr *n, tfilter_notify(net, skb, n, tp, block, q, parent, fh, RTM_NEWTFILTER, false); } else { - if (tp_created) + if (tp_created) { + /* tp wasn't inserted to chain tp list. Take reference + * to chain manually for tcf_proto_destroy() to + * release. + */ + tcf_chain_hold(chain); tcf_proto_destroy(tp, NULL); + } } errout: @@ -2351,6 +2379,12 @@ static int __init tc_filter_init(void) if (!tc_filter_wq) return -ENOMEM; + tc_proto_wq = alloc_ordered_workqueue("tc_proto_workqueue", 0); + if (!tc_proto_wq) { + err = -ENOMEM; + goto err_proto_wq; + } + err = register_pernet_subsys(&tcf_net_ops); if (err) goto err_register_pernet_subsys; @@ -2367,6 +2401,8 @@ static int __init tc_filter_init(void) return 0; err_register_pernet_subsys: + destroy_workqueue(tc_proto_wq); +err_proto_wq: destroy_workqueue(tc_filter_wq); return err; } diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c index de1663f7d3ad..5aa8e485fcb0 100644 --- a/net/sched/sch_generic.c +++ b/net/sched/sch_generic.c @@ -1363,10 +1363,14 @@ static void mini_qdisc_rcu_func(struct rcu_head *head) { } -void mini_qdisc_pair_swap(struct mini_Qdisc_pair *miniqp, - struct tcf_proto *tp_head) +void __mini_qdisc_pair_swap(struct mini_Qdisc_pair *miniqp, + struct tcf_proto *tp_head) { - struct mini_Qdisc *miniq_old = 
rtnl_dereference(*miniqp->p_miniq); + /* p_miniq is either changed on ordered workqueue or during miniqp + * cleanup. In both cases no concurrent modification is possible. + */ + struct mini_Qdisc *miniq_old = + rcu_dereference_protected(*miniqp->p_miniq, 1); struct mini_Qdisc *miniq; if (!tp_head) { @@ -1394,6 +1398,25 @@ void mini_qdisc_pair_swap(struct mini_Qdisc_pair *miniqp, */ call_rcu_bh(&miniq_old->rcu, mini_qdisc_rcu_func); } + +void mini_qdisc_pair_swap_work(struct work_struct *work) +{ + struct mini_Qdisc_pair *miniqp = container_of(work, + struct mini_Qdisc_pair, + work); + struct tcf_proto *tp_head; + + tp_head = xchg(&miniqp->tp_head, ERR_PTR(-ENOENT)); + if (!IS_ERR(tp_head)) + __mini_qdisc_pair_swap(miniqp, tp_head); +} + +void mini_qdisc_pair_swap(struct mini_Qdisc_pair *miniqp, + struct tcf_proto *tp_head) +{ + xchg(&miniqp->tp_head, tp_head); + tc_queue_proto_work(&miniqp->work); +} EXPORT_SYMBOL(mini_qdisc_pair_swap); void mini_qdisc_pair_init(struct mini_Qdisc_pair *miniqp, struct Qdisc *qdisc, @@ -1404,5 +1427,13 @@ void mini_qdisc_pair_init(struct mini_Qdisc_pair *miniqp, struct Qdisc *qdisc, miniqp->miniq2.cpu_bstats = qdisc->cpu_bstats; miniqp->miniq2.cpu_qstats = qdisc->cpu_qstats; miniqp->p_miniq = p_miniq; + INIT_WORK(&miniqp->work, mini_qdisc_pair_swap_work); } EXPORT_SYMBOL(mini_qdisc_pair_init); + +void mini_qdisc_pair_cleanup(struct mini_Qdisc_pair *miniqp) +{ + cancel_work_sync(&miniqp->work); + __mini_qdisc_pair_swap(miniqp, NULL); +} +EXPORT_SYMBOL(mini_qdisc_pair_cleanup); diff --git a/net/sched/sch_ingress.c b/net/sched/sch_ingress.c index ce3f55259d0d..8edc3c46bd9c 100644 --- a/net/sched/sch_ingress.c +++ b/net/sched/sch_ingress.c @@ -99,6 +99,7 @@ static void ingress_destroy(struct Qdisc *sch) struct ingress_sched_data *q = qdisc_priv(sch); tcf_block_put_ext(q->block, sch, &q->block_info); + mini_qdisc_pair_cleanup(&q->miniqp); net_dec_ingress_queue(); } @@ -245,6 +246,9 @@ static void clsact_destroy(struct Qdisc *sch) 
tcf_block_put_ext(q->egress_block, sch, &q->egress_block_info); tcf_block_put_ext(q->ingress_block, sch, &q->ingress_block_info); + mini_qdisc_pair_cleanup(&q->miniqp_ingress); + mini_qdisc_pair_cleanup(&q->miniqp_egress); + net_dec_ingress_queue(); net_dec_egress_queue(); } -- 2.7.5