Allocating an entire svc_rqst (including all of its pages, etc.) for
each workqueue request is pretty expensive. Instead, keep a per-NUMA-node
cache of allocated svc_rqst structures in each node's svc_pool.

In order to keep the cache from growing without bound, we register a
shrinker. Since the cache is already NUMA-aware, we can use a NUMA-aware
shrinker as well.

Signed-off-by: Jeff Layton <[email protected]>
---
 fs/nfsd/nfssvc.c           |   6 +-
 include/linux/sunrpc/svc.h |  17 ++++++
 net/sunrpc/svc.c           |   1 +
 net/sunrpc/svc_wq.c        | 136 ++++++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 154 insertions(+), 6 deletions(-)

diff --git a/fs/nfsd/nfssvc.c b/fs/nfsd/nfssvc.c
index 2c7ebced0311..c359e8f77b30 100644
--- a/fs/nfsd/nfssvc.c
+++ b/fs/nfsd/nfssvc.c
@@ -672,7 +672,6 @@ nfsd(void *vrqstp)
 static void
 nfsd_work(struct work_struct *work)
 {
-       int node = numa_node_id();
        struct svc_xprt *xprt = container_of(work, struct svc_xprt, xpt_work);
        struct net *net = xprt->xpt_net;
        struct nfsd_net *nn = net_generic(net, nfsd_net_id);
@@ -681,7 +680,7 @@ nfsd_work(struct work_struct *work)
        struct fs_struct *saved_fs;
        int err;
 
-       rqstp = svc_rqst_alloc(serv, &serv->sv_pools[node], node);
+       rqstp = find_or_alloc_svc_rqst(serv);
        if (!rqstp) {
                /* Alloc failure. Give up for now, and requeue the work */
                queue_work(serv->sv_wq, &xprt->xpt_work);
@@ -703,8 +702,7 @@ nfsd_work(struct work_struct *work)
 
        saved_fs = swap_fs_struct(saved_fs);
        put_fs_struct(saved_fs);
-
-       svc_rqst_free(rqstp);
+       put_svc_rqst(rqstp);
 }
 
 static struct svc_serv_ops nfsd_wq_sv_ops = {
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index f47de87660b4..33321ddacfee 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -105,6 +105,7 @@ struct svc_serv {
        struct svc_pool *       sv_pools;       /* array of thread pools */
        struct svc_serv_ops *   sv_ops;         /* server operations */
        struct workqueue_struct *sv_wq;         /* workqueue for wq-based services */
+       struct shrinker         sv_shrinker;    /* for shrinking svc_rqst caches */
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
        struct list_head        sv_cb_list;     /* queue for callback requests
                                                 * that arrive over the same
@@ -274,6 +275,7 @@ struct svc_rqst {
 #define        RQ_VICTIM       (5)                     /* about to be shut down */
 #define        RQ_BUSY         (6)                     /* request is busy */
        unsigned long           rq_flags;       /* flags field */
+       unsigned long           rq_time;        /* when rqstp was last put */
 
        void *                  rq_argp;        /* decoded arguments */
        void *                  rq_resp;        /* xdr'd results */
@@ -493,6 +495,21 @@ char *                svc_print_addr(struct svc_rqst *, char *, size_t);
 #if IS_ENABLED(CONFIG_SUNRPC_SVC_WORKQUEUE)
 int               svc_wq_setup(struct svc_serv *, struct svc_pool *, int);
 void              svc_wq_enqueue_xprt(struct svc_xprt *);
+struct svc_rqst        *  find_or_alloc_svc_rqst(struct svc_serv *serv);
+void              exit_svc_rqst_cache(struct svc_serv *serv);
+
+/* Mark @rqstp idle again; earlier stores are ordered before the bit clear */
+static inline void put_svc_rqst(struct svc_rqst *rqstp)
+{
+       rqstp->rq_time = jiffies;
+       clear_bit_unlock(RQ_BUSY, &rqstp->rq_flags);
+}
+#else
+/* Without workqueue support there is no svc_rqst cache to tear down. */
+static inline void
+exit_svc_rqst_cache(struct svc_serv *serv)
+{
+}
 #endif
 
 #define        RPC_MAX_ADDRBUFLEN      (63U)
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 4300bc852f6e..4ebba00b8b27 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -547,6 +547,7 @@ svc_destroy(struct svc_serv *serv)
 
        if (serv->sv_wq) {
                destroy_workqueue(serv->sv_wq);
+               exit_svc_rqst_cache(serv);
                module_put(serv->sv_ops->svo_module);
        }
 
diff --git a/net/sunrpc/svc_wq.c b/net/sunrpc/svc_wq.c
index d4720ecd0b32..e96bbf49c1a0 100644
--- a/net/sunrpc/svc_wq.c
+++ b/net/sunrpc/svc_wq.c
@@ -12,6 +12,130 @@
 #include <trace/events/sunrpc.h>
 
 /*
+ * Find a svc_rqst to use. Try to find an already-allocated one on the list
+ * first, and allocate a new one if none is available.
+ */
+struct svc_rqst *find_or_alloc_svc_rqst(struct svc_serv *serv)
+{
+       int nid = numa_node_id();
+       struct svc_pool *pool = &serv->sv_pools[nid];
+       struct svc_rqst *req;
+
+       /* Fast path: claim the first idle entry cached for this node. */
+       rcu_read_lock();
+       list_for_each_entry_rcu(req, &pool->sp_all_threads, rq_all) {
+               if (test_and_set_bit(RQ_BUSY, &req->rq_flags))
+                       continue;
+               rcu_read_unlock();
+               return req;
+       }
+       rcu_read_unlock();
+       /* Slow path: nothing idle, so allocate one and add it to the cache. */
+       req = svc_rqst_alloc(serv, pool, nid);
+       if (!req)
+               return NULL;
+       spin_lock_bh(&pool->sp_lock);
+       list_add_tail_rcu(&req->rq_all, &pool->sp_all_threads);
+       pool->sp_nrthreads++;
+       spin_unlock_bh(&pool->sp_lock);
+       return req;
+}
+EXPORT_SYMBOL_GPL(find_or_alloc_svc_rqst);
+
+/*
+ * Shrinker ->count_objects callback for the svc_rqst cache of node sc->nid.
+ */
+static unsigned long
+count_svc_rqst_objects(struct shrinker *shrinker, struct shrink_control *sc)
+{
+       struct svc_serv *serv;
+       struct svc_pool *pool;
+       struct svc_rqst *req;
+       unsigned long nr_idle = 0;
+
+       serv = container_of(shrinker, struct svc_serv, sv_shrinker);
+       pool = &serv->sv_pools[sc->nid];
+
+       /* Tally entries that are neither busy nor put within the last second */
+       rcu_read_lock();
+       list_for_each_entry_rcu(req, &pool->sp_all_threads, rq_all) {
+               if (!test_bit(RQ_BUSY, &req->rq_flags) &&
+                   !time_before(jiffies, req->rq_time + HZ))
+                       ++nr_idle;
+       }
+       rcu_read_unlock();
+
+       return nr_idle;
+}
+
+/* Shrinker scan: free up to sc->nr_to_scan idle svc_rqsts; return nr freed */
+static unsigned long
+scan_svc_rqst_objects(struct shrinker *shrinker, struct shrink_control *sc)
+{
+       struct svc_serv *serv = container_of(shrinker, struct svc_serv,
+                                               sv_shrinker);
+       struct svc_pool *pool = &serv->sv_pools[sc->nid];
+       struct svc_rqst *rqstp, *next;
+       unsigned long count = 0;
+
+       spin_lock_bh(&pool->sp_lock);   /* _bh like the other sp_lock users */
+       /* _safe walk: the current entry is unlinked and freed in the body */
+       list_for_each_entry_safe(rqstp, next, &pool->sp_all_threads, rq_all) {
+               if (count >= sc->nr_to_scan)
+                       break;
+               /* Claim the entry so no worker can take it; skip if busy */
+               if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags))
+                       continue;
+               list_del_rcu(&rqstp->rq_all);
+               svc_rqst_free(rqstp);
+               --pool->sp_nrthreads;
+               ++count;
+       }
+       spin_unlock_bh(&pool->sp_lock);
+       return count;
+}
+
+/* Set up and register the NUMA-aware shrinker for serv's svc_rqst cache. */
+static int
+init_svc_rqst_cache(struct svc_serv *serv)
+{
+       /* Members not named here are zeroed by the compound literal. */
+       serv->sv_shrinker = (struct shrinker) {
+               .count_objects  = count_svc_rqst_objects,
+               .scan_objects   = scan_svc_rqst_objects,
+               .seeks          = DEFAULT_SEEKS,
+               .flags          = SHRINKER_NUMA_AWARE,
+       };
+
+       return register_shrinker(&serv->sv_shrinker);
+}
+
+/* Unregister the shrinker, then free every svc_rqst cached in serv's pools */
+void
+exit_svc_rqst_cache(struct svc_serv *serv)
+{
+       struct svc_rqst *req, *tmp;
+       struct svc_pool *pool;
+       int i;
+
+       unregister_shrinker(&serv->sv_shrinker);
+
+       for (i = 0; i < serv->sv_nrpools; i++) {
+               pool = &serv->sv_pools[i];
+               spin_lock_bh(&pool->sp_lock);
+               list_for_each_entry_safe(req, tmp, &pool->sp_all_threads,
+                                        rq_all) {
+                       /* Nothing should still be in use at teardown. */
+                       WARN_ON_ONCE(test_bit(RQ_BUSY, &req->rq_flags));
+                       list_del_rcu(&req->rq_all);
+                       svc_rqst_free(req);
+               }
+               pool->sp_nrthreads = 0;
+               spin_unlock_bh(&pool->sp_lock);
+       }
+}
+
+/*
  * This workqueue job should run on each node when the workqueue is created. It
  * walks the list of xprts for its node, and queues the workqueue job for each.
  */
@@ -58,12 +182,13 @@ process_queued_xprts(struct svc_serv *serv)
 
 /*
  * Start up or shut down a workqueue-based RPC service. Basically, we use this
- * to allocate the workqueue. The function assumes that the caller holds one
- * serv->sv_nrthreads reference.
+ * to allocate the workqueue and set up the shrinker for the svc_rqst cache.
+ * This function assumes that the caller holds one serv->sv_nrthreads reference.
  */
 int
 svc_wq_setup(struct svc_serv *serv, struct svc_pool *pool, int max_active)
 {
+       int err;
        int nrthreads = serv->sv_nrthreads - 1; /* -1 for caller's reference */
 
        WARN_ON_ONCE(nrthreads < 0);
@@ -79,14 +204,21 @@ svc_wq_setup(struct svc_serv *serv, struct svc_pool *pool, int max_active)
                /* svc is down and none requested? */
                if (!max_active)
                        return 0;
+
+               err = init_svc_rqst_cache(serv);
+               if (err)
+                       return err;
+
                __module_get(serv->sv_ops->svo_module);
                serv->sv_wq = alloc_workqueue("%s",
                                        WQ_UNBOUND|WQ_FREEZABLE|WQ_SYSFS,
                                        max_active, serv->sv_name);
                if (!serv->sv_wq) {
+                       exit_svc_rqst_cache(serv);
                        module_put(serv->sv_ops->svo_module);
                        return -ENOMEM;
                }
+
                process_queued_xprts(serv);
        } else {
                /*
-- 
2.1.0

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to