RCU has only one multi-tail callback list, which is implemented via
the nxtlist, nxttail, nxtcompleted, qlen_lazy, and qlen fields in the
rcu_data structure, and whose operations are open-coded throughout the
Tree RCU implementation.  This has been more or less OK in the past,
but upcoming callback-list optimizations in SRCU could really use
a multi-tail callback list there as well.

This commit therefore abstracts the multi-tail callback list handling
into a new kernel/rcu/rcu_segcblist.h file, and uses this new API.
The simple head-and-tail pointer callback list is also abstracted and
applied everywhere except for the NOCB callback-offload lists.  (Yes,
the plan is to apply them there as well, but this commit is already
bigger than would be good.)

Signed-off-by: Paul E. McKenney <paul...@linux.vnet.ibm.com>
---
 kernel/rcu/rcu_segcblist.h | 626 +++++++++++++++++++++++++++++++++++++++++++++
 kernel/rcu/tree.c          | 348 ++++++++-----------------
 kernel/rcu/tree.h          |  41 +--
 kernel/rcu/tree_plugin.h   |  54 ++--
 kernel/rcu/tree_trace.c    |  21 +-
 5 files changed, 781 insertions(+), 309 deletions(-)
 create mode 100644 kernel/rcu/rcu_segcblist.h

diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
new file mode 100644
index 000000000000..55b20ab5f940
--- /dev/null
+++ b/kernel/rcu/rcu_segcblist.h
@@ -0,0 +1,626 @@
+/*
+ * RCU segmented callback lists
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, you can access it online at
+ * http://www.gnu.org/licenses/gpl-2.0.html.
+ *
+ * Copyright IBM Corporation, 2017
+ *
+ * Authors: Paul E. McKenney <paul...@linux.vnet.ibm.com>
+ */
+
+#ifndef __KERNEL_RCU_SEGCBLIST_H
+#define __KERNEL_RCU_SEGCBLIST_H
+
+/* Simple unsegmented callback lists. */
+struct rcu_cblist {
+       struct rcu_head *head;
+       struct rcu_head **tail;
+       long len;
+       long len_lazy;
+};
+
+#define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head }
+
+/* Initialize simple callback list. */
+static inline void rcu_cblist_init(struct rcu_cblist *rclp)
+{
+       rclp->head = NULL;
+       rclp->tail = &rclp->head;
+       rclp->len = 0;
+       rclp->len_lazy = 0;
+}
+
+/* Is simple callback list empty? */
+static inline bool rcu_cblist_empty(struct rcu_cblist *rclp)
+{
+       return !rclp->head;
+}
+
+/* Return number of callbacks in simple callback list. */
+static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp)
+{
+       return rclp->len;
+}
+
+/* Return number of lazy callbacks in simple callback list. */
+static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
+{
+       return rclp->len_lazy;
+}
+
+/*
+ * Debug function to actually count the number of callbacks by walking
+ * the list.  If the number exceeds the limit specified, return -1.
+ */
+static inline long rcu_cblist_count_cbs(struct rcu_cblist *rclp, long lim)
+{
+       long cnt = 0; /* Same width as "lim" and the return type. */
+       struct rcu_head **rhpp = &rclp->head;
+
+       for (;;) {
+               if (!*rhpp)
+                       return cnt;
+               if (++cnt > lim)
+                       return -1;
+               rhpp = &(*rhpp)->next;
+       }
+}
+
+/*
+ * Dequeue the oldest rcu_head structure from the specified callback
+ * list.  This function assumes that the callback is non-lazy, but
+ * the caller can later invoke rcu_cblist_dequeued_lazy() if it
+ * finds otherwise (and if it cares about laziness).  This allows
+ * different users to have different ways of determining laziness.
+ */
+static inline struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp)
+{
+       struct rcu_head *rhp;
+
+       rhp = rclp->head;
+       if (!rhp)
+               return NULL;
+       prefetch(rhp);
+       rclp->len--;
+       rclp->head = rhp->next;
+       if (!rclp->head)
+               rclp->tail = &rclp->head;
+       return rhp;
+}
+
+/*
+ * Account for the fact that a previously dequeued callback turned out
+ * to be marked as lazy.
+ */
+static inline void rcu_cblist_dequeued_lazy(struct rcu_cblist *rclp)
+{
+       rclp->len_lazy--;
+}
+
+/*
+ * Interim function to return rcu_cblist head pointer.  Longer term, the
+ * rcu_cblist will be used more pervasively, removing the need for this
+ * function.
+ */
+static inline struct rcu_head *rcu_cblist_head(struct rcu_cblist *rclp)
+{
+       return rclp->head;
+}
+
+/*
+ * Interim function to return rcu_cblist tail pointer.  Longer term, the
+ * rcu_cblist will be used more pervasively, removing the need for this
+ * function.
+ */
+static inline struct rcu_head **rcu_cblist_tail(struct rcu_cblist *rclp)
+{
+       WARN_ON_ONCE(rcu_cblist_empty(rclp));
+       return rclp->tail;
+}
+
+/* Complicated segmented callback lists.  ;-) */
+
+/*
+ * Index values for segments in rcu_segcblist structure.
+ *
+ * The segments are as follows:
+ *
+ * [head, *tails[RCU_DONE_TAIL]):
+ *     Callbacks whose grace period has elapsed, and thus can be invoked.
+ * [*tails[RCU_DONE_TAIL], *tails[RCU_WAIT_TAIL]):
+ *     Callbacks waiting for the current GP from the current CPU's viewpoint.
+ * [*tails[RCU_WAIT_TAIL], *tails[RCU_NEXT_READY_TAIL]):
+ *     Callbacks that arrived before the next GP started, again from
+ *     the current CPU's viewpoint.  These can be handled by the next GP.
+ * [*tails[RCU_NEXT_READY_TAIL], *tails[RCU_NEXT_TAIL]):
+ *     Callbacks that might have arrived after the next GP started.
+ *     There is some uncertainty as to when a given GP starts and
+ *     ends, but a CPU knows the exact times if it is the one starting
+ *     or ending the GP.  Other CPUs know that the previous GP ends
+ *     before the next one starts.
+ *
+ * Note that RCU_WAIT_TAIL cannot be empty unless RCU_NEXT_READY_TAIL is also
+ * empty.
+ *
+ * The ->gp_seq[] array contains the grace-period number at which the
+ * corresponding segment of callbacks will be ready to invoke.  A given
+ * element of this array is meaningful only when the corresponding segment
+ * is non-empty, and it is never valid for RCU_DONE_TAIL (whose callbacks
+ * are already ready to invoke) or for RCU_NEXT_TAIL (whose callbacks have
+ * not yet been assigned a grace-period number).
+ */
+#define RCU_DONE_TAIL          0       /* Also RCU_WAIT head. */
+#define RCU_WAIT_TAIL          1       /* Also RCU_NEXT_READY head. */
+#define RCU_NEXT_READY_TAIL    2       /* Also RCU_NEXT head. */
+#define RCU_NEXT_TAIL          3
+#define RCU_CBLIST_NSEGS       4
+
+struct rcu_segcblist {
+       struct rcu_head *head;
+       struct rcu_head **tails[RCU_CBLIST_NSEGS];
+       unsigned long gp_seq[RCU_CBLIST_NSEGS];
+       long len;
+       long len_lazy;
+};
+
+/*
+ * Initialize an rcu_segcblist structure.
+ */
+static inline void rcu_segcblist_init(struct rcu_segcblist *rsclp)
+{
+       int i;
+
+       BUILD_BUG_ON(RCU_NEXT_TAIL + 1 != ARRAY_SIZE(rsclp->gp_seq));
+       BUILD_BUG_ON(ARRAY_SIZE(rsclp->tails) != ARRAY_SIZE(rsclp->gp_seq));
+       rsclp->head = NULL;
+       for (i = 0; i < RCU_CBLIST_NSEGS; i++)
+               rsclp->tails[i] = &rsclp->head;
+       rsclp->len = 0;
+       rsclp->len_lazy = 0;
+}
+
+/*
+ * Is the specified rcu_segcblist structure empty?
+ *
+ * But careful!  The fact that the ->head field is NULL does not
+ * necessarily imply that there are no callbacks associated with
+ * this structure.  When callbacks are being invoked, they are
+ * removed as a group.  If callback invocation must be preempted,
+ * the remaining callbacks will be added back to the list.  Either
+ * way, the counts are updated later.
+ *
+ * So it is often the case that rcu_segcblist_n_cbs() should be used
+ * instead.
+ */
+static inline bool rcu_segcblist_empty(struct rcu_segcblist *rsclp)
+{
+       return !rsclp->head;
+}
+
+/* Return number of callbacks in segmented callback list. */
+static inline long rcu_segcblist_n_cbs(struct rcu_segcblist *rsclp)
+{
+       return READ_ONCE(rsclp->len);
+}
+
+/* Return number of lazy callbacks in segmented callback list. */
+static inline long rcu_segcblist_n_lazy_cbs(struct rcu_segcblist *rsclp)
+{
+       return rsclp->len_lazy;
+}
+
+/* Return number of non-lazy callbacks in segmented callback list. */
+static inline long rcu_segcblist_n_nonlazy_cbs(struct rcu_segcblist *rsclp)
+{
+       return rsclp->len - rsclp->len_lazy;
+}
+
+/*
+ * Is the specified rcu_segcblist enabled, for example, not corresponding
+ * to an offline or callback-offloaded CPU?
+ */
+static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp)
+{
+       return !!rsclp->tails[RCU_NEXT_TAIL];
+}
+
+/*
+ * Disable the specified rcu_segcblist structure, so that callbacks can
+ * no longer be posted to it.  This structure must be empty.
+ */
+static inline void rcu_segcblist_disable(struct rcu_segcblist *rsclp)
+{
+       WARN_ON_ONCE(!rcu_segcblist_empty(rsclp));
+       WARN_ON_ONCE(rcu_segcblist_n_cbs(rsclp));
+       WARN_ON_ONCE(rcu_segcblist_n_lazy_cbs(rsclp));
+       rsclp->tails[RCU_NEXT_TAIL] = NULL;
+}
+
+/*
+ * Is the specified segment of the specified rcu_segcblist structure
+ * empty of callbacks?
+ */
+static inline bool rcu_segcblist_segempty(struct rcu_segcblist *rsclp, int seg)
+{
+       if (seg == RCU_DONE_TAIL)
+               return &rsclp->head == rsclp->tails[RCU_DONE_TAIL];
+       return rsclp->tails[seg - 1] == rsclp->tails[seg];
+}
+
+/*
+ * Are all segments following the specified segment of the specified
+ * rcu_segcblist structure empty of callbacks?  (The specified
+ * segment might well contain callbacks.)
+ */
+static inline bool rcu_segcblist_restempty(struct rcu_segcblist *rsclp, int seg)
+{
+       return !*rsclp->tails[seg];
+}
+
+/*
+ * Does the specified rcu_segcblist structure contain callbacks that
+ * are ready to be invoked?
+ */
+static inline bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp)
+{
+       return rcu_segcblist_is_enabled(rsclp) &&
+              &rsclp->head != rsclp->tails[RCU_DONE_TAIL];
+}
+
+/*
+ * Does the specified rcu_segcblist structure contain callbacks that
+ * are still pending, that is, not yet ready to be invoked?
+ */
+static inline bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp)
+{
+       return rcu_segcblist_is_enabled(rsclp) &&
+              !rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL);
+}
+
+/*
+ * Return a pointer to the first callback in the specified rcu_segcblist
+ * structure, or NULL if the list is disabled.  Useful for diagnostics.
+ */
+static inline struct rcu_head *
+rcu_segcblist_first_cb(struct rcu_segcblist *rsclp)
+{
+       if (rcu_segcblist_is_enabled(rsclp))
+               return rsclp->head;
+       return NULL;
+}
+
+/*
+ * Return a pointer to the first pending callback in the specified
+ * rcu_segcblist structure.  This is useful just after posting a given
+ * callback -- if that callback is the first pending callback, then
+ * you cannot rely on someone else having already started up the required
+ * grace period.  Returns NULL if the list is disabled.
+ */
+static inline struct rcu_head *
+rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp)
+{
+       if (rcu_segcblist_is_enabled(rsclp))
+               return *rsclp->tails[RCU_DONE_TAIL];
+       return NULL;
+}
+
+/*
+ * Does the specified rcu_segcblist structure contain callbacks that
+ * have not yet been processed beyond having been posted, that is,
+ * does it contain callbacks in its last segment?
+ */
+static inline bool rcu_segcblist_new_cbs(struct rcu_segcblist *rsclp)
+{
+       return rcu_segcblist_is_enabled(rsclp) &&
+              !rcu_segcblist_restempty(rsclp, RCU_NEXT_READY_TAIL);
+}
+
+/*
+ * Enqueue the specified callback onto the specified rcu_segcblist
+ * structure, updating accounting as needed.  Note that the ->len
+ * field may be accessed locklessly, hence the WRITE_ONCE().
+ * The ->len field is used by rcu_barrier() and friends to determine
+ * if it must post a callback on this structure, and it is OK
+ * for rcu_barrier() to sometimes post callbacks needlessly, but
+ * absolutely not OK for it to ever miss posting a callback.
+ */
+static inline void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
+                                        struct rcu_head *rhp, bool lazy)
+{
+       WRITE_ONCE(rsclp->len, rsclp->len + 1); /* ->len sampled locklessly. */
+       if (lazy)
+               rsclp->len_lazy++;
+       smp_mb(); /* Ensure counts are updated before callback is enqueued. */
+       rhp->next = NULL;
+       *rsclp->tails[RCU_NEXT_TAIL] = rhp;
+       rsclp->tails[RCU_NEXT_TAIL] = &rhp->next;
+}
+
+/*
+ * Extract only the counts from the specified rcu_segcblist structure,
+ * and place them in the specified rcu_cblist structure.  This function
+ * supports both callback orphaning and invocation, hence the separation
+ * of counts and callbacks.  (Callbacks ready for invocation must be
+ * orphaned and adopted separately from pending callbacks, but counts
+ * apply to all callbacks.  Locking must be used to make sure that
+ * both orphaned-callbacks lists are consistent.)
+ */
+static inline void rcu_segcblist_extract_count(struct rcu_segcblist *rsclp,
+                                              struct rcu_cblist *rclp)
+{
+       rclp->len_lazy += rsclp->len_lazy;
+       rclp->len += rsclp->len;
+       rsclp->len_lazy = 0;
+       WRITE_ONCE(rsclp->len, 0); /* ->len sampled locklessly. */
+}
+
+/*
+ * Extract only those callbacks ready to be invoked from the specified
+ * rcu_segcblist structure and place them in the specified rcu_cblist
+ * structure.
+ */
+static inline void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp,
+                                                 struct rcu_cblist *rclp)
+{
+       int i;
+
+       if (!rcu_segcblist_ready_cbs(rsclp))
+               return; /* Nothing to do. */
+       *rclp->tail = rsclp->head;
+       rsclp->head = *rsclp->tails[RCU_DONE_TAIL];
+       *rsclp->tails[RCU_DONE_TAIL] = NULL;
+       rclp->tail = rsclp->tails[RCU_DONE_TAIL];
+       for (i = RCU_CBLIST_NSEGS - 1; i >= RCU_DONE_TAIL; i--)
+               if (rsclp->tails[i] == rsclp->tails[RCU_DONE_TAIL])
+                       rsclp->tails[i] = &rsclp->head;
+}
+
+/*
+ * Extract only those callbacks still pending (not yet ready to be
+ * invoked) from the specified rcu_segcblist structure and place them in
+ * the specified rcu_cblist structure.  Note that this loses information
+ * about any callbacks that might have been partway done waiting for
+ * their grace period.  Too bad!  They will have to start over.
+ */
+static inline void
+rcu_segcblist_extract_pend_cbs(struct rcu_segcblist *rsclp,
+                              struct rcu_cblist *rclp)
+{
+       int i;
+
+       if (!rcu_segcblist_pend_cbs(rsclp))
+               return; /* Nothing to do. */
+       *rclp->tail = *rsclp->tails[RCU_DONE_TAIL];
+       rclp->tail = rsclp->tails[RCU_NEXT_TAIL];
+       *rsclp->tails[RCU_DONE_TAIL] = NULL;
+       for (i = RCU_DONE_TAIL + 1; i < RCU_CBLIST_NSEGS; i++)
+               rsclp->tails[i] = rsclp->tails[RCU_DONE_TAIL];
+}
+
+/*
+ * Move the entire contents of the specified rcu_segcblist structure,
+ * counts, callbacks, and all, to the specified rcu_cblist structure.
+ * @@@ Why do we need this???  Moving early-boot CBs to NOCB lists?
+ * @@@ Memory barrier needed?  (Not if only used at boot time...)
+ */
+static inline void rcu_segcblist_extract_all(struct rcu_segcblist *rsclp,
+                                            struct rcu_cblist *rclp)
+{
+       rcu_segcblist_extract_done_cbs(rsclp, rclp);
+       rcu_segcblist_extract_pend_cbs(rsclp, rclp);
+       rcu_segcblist_extract_count(rsclp, rclp);
+}
+
+/*
+ * Insert counts from the specified rcu_cblist structure in the
+ * specified rcu_segcblist structure.
+ */
+static inline void rcu_segcblist_insert_count(struct rcu_segcblist *rsclp,
+                                             struct rcu_cblist *rclp)
+{
+       
+       rsclp->len_lazy += rclp->len_lazy;
+       /* ->len sampled locklessly. */
+       WRITE_ONCE(rsclp->len, rsclp->len + rclp->len);
+       rclp->len_lazy = 0;
+       rclp->len = 0;
+}
+
+/*
+ * Move callbacks from the specified rcu_cblist to the beginning of the
+ * done-callbacks segment of the specified rcu_segcblist.
+ */
+static inline void rcu_segcblist_insert_done_cbs(struct rcu_segcblist *rsclp,
+                                                struct rcu_cblist *rclp)
+{
+       int i;
+
+       if (!rclp->head)
+               return; /* No callbacks to move. */
+       *rclp->tail = rsclp->head;
+       rsclp->head = rclp->head;
+       for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++)
+               if (&rsclp->head == rsclp->tails[i])
+                       rsclp->tails[i] = rclp->tail;
+               else
+                       break;
+       rclp->head = NULL;
+       rclp->tail = &rclp->head;
+}
+
+/*
+ * Move callbacks from the specified rcu_cblist to the end of the
+ * new-callbacks segment of the specified rcu_segcblist.
+ */
+static inline void rcu_segcblist_insert_pend_cbs(struct rcu_segcblist *rsclp,
+                                                struct rcu_cblist *rclp)
+{
+       if (!rclp->head)
+               return; /* Nothing to do. */
+       *rsclp->tails[RCU_NEXT_TAIL] = rclp->head;
+       rsclp->tails[RCU_NEXT_TAIL] = rclp->tail;
+       rclp->head = NULL;
+       rclp->tail = &rclp->head;
+}
+
+/*
+ * Advance the callbacks in the specified rcu_segcblist structure based
+ * on the current value passed in for the grace-period counter.
+ */
+static inline void rcu_segcblist_advance(struct rcu_segcblist *rsclp,
+                                        unsigned long seq)
+{
+       int i, j;
+
+       WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp));
+       WARN_ON_ONCE(rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL));
+
+       /*
+        * Find all callbacks whose ->gp_seq numbers indicate that they
+        * are ready to invoke, and put them into the RCU_DONE_TAIL segment.
+        */
+       for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) {
+               if (ULONG_CMP_LT(seq, rsclp->gp_seq[i]))
+                       break;
+               rsclp->tails[RCU_DONE_TAIL] = rsclp->tails[i];
+       }
+
+       /* If no callbacks moved, nothing more need be done. */
+       if (i == RCU_WAIT_TAIL)
+               return;
+
+       /* Clean up tail pointers that might have been misordered above. */
+       for (j = RCU_WAIT_TAIL; j < i; j++)
+               rsclp->tails[j] = rsclp->tails[RCU_DONE_TAIL];
+
+       /*
+        * Callbacks moved, so clean up the misordered ->tails[] pointers
+        * that now point into the middle of the list of ready-to-invoke
+        * callbacks.  The overall effect is to copy down the later pointers
+        * into the gap that was created by the now-ready segments.
+        */
+       for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) {
+               if (rsclp->tails[j] == rsclp->tails[RCU_NEXT_TAIL])
+                       break;  /* No more callbacks. */
+               rsclp->tails[j] = rsclp->tails[i];
+               rsclp->gp_seq[j] = rsclp->gp_seq[i];
+       }
+}
+
+/*
+ * "Accelerate" callbacks based on more-accurate grace-period information.
+ * The reason for this is that RCU does not synchronize the beginnings and
+ * ends of grace periods, and that callbacks are posted locally.  This in
+ * turn means that the callbacks must be labelled conservatively early
+ * on, as getting exact information would degrade both performance and
+ * scalability.  When more accurate grace-period information becomes
+ * available, previously posted callbacks can be "accelerated", marking
+ * them to complete at the end of the earlier grace period.
+ *
+ * This function operates on an rcu_segcblist structure, and also the
+ * grace-period sequence number at which new callbacks would become
+ * ready to invoke.
+ */
+static inline bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp,
+                                           unsigned long seq)
+{
+       int i;
+
+       WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp));
+       WARN_ON_ONCE(rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL));
+
+       /*
+        * Find the segment preceding the oldest segment of callbacks
+        * whose ->gp_seq[] completion is at or after that passed in via
+        * "seq", skipping any empty segments.  This oldest segment, along
+        * with any later segments, can be merged in with any newly arrived
+        * callbacks in the RCU_NEXT_TAIL segment, and assigned "seq"
+        * as their ->gp_seq[] grace-period completion sequence number.
+        */
+       for (i = RCU_NEXT_READY_TAIL; i > RCU_DONE_TAIL; i--)
+               if (rsclp->tails[i] != rsclp->tails[i - 1] &&
+                   ULONG_CMP_LT(rsclp->gp_seq[i], seq))
+                       break;
+
+       /*
+        * If all the segments contain callbacks that correspond to
+        * earlier grace-period sequence numbers than "seq", leave.
+        * Assuming that the rcu_segcblist structure has enough
+        * segments in its arrays, this can only happen if some of
+        * the non-done segments contain callbacks that really are
+        * ready to invoke.  This situation will get straightened
+        * out by the next call to rcu_segcblist_advance().
+        *
+        * Also advance to the oldest segment of callbacks whose
+        * ->gp_seq[] completion is at or after that passed in via "seq",
+        * skipping any empty segments.
+        */
+       if (++i >= RCU_NEXT_TAIL)
+               return false;
+
+       /*
+        * Merge all later callbacks, including newly arrived callbacks,
+        * into the segment located by the for-loop above.  Assign "seq"
+        * as the ->gp_seq[] value in order to correctly handle the case
+        * where there were no pending callbacks in the rcu_segcblist
+        * structure other than in the RCU_NEXT_TAIL segment.
+        */
+       for (; i < RCU_NEXT_TAIL; i++) {
+               rsclp->tails[i] = rsclp->tails[RCU_NEXT_TAIL];
+               rsclp->gp_seq[i] = seq;
+       }
+       return true;
+}
+
+/*
+ * Scan the specified rcu_segcblist structure for callbacks that need
+ * a grace period later than the one specified by "seq".  We don't look
+ * at the RCU_DONE_TAIL or RCU_NEXT_TAIL segments because they don't
+ * have a grace-period sequence number.
+ */
+static inline bool rcu_segcblist_future_gp_needed(struct rcu_segcblist *rsclp,
+                                                 unsigned long seq)
+{
+       int i;
+
+       for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++)
+               if (rsclp->tails[i - 1] != rsclp->tails[i] &&
+                   ULONG_CMP_LT(seq, rsclp->gp_seq[i]))
+                       return true;
+       return false;
+}
+
+/*
+ * Interim function to return rcu_segcblist head pointer.  Longer term, the
+ * rcu_segcblist will be used more pervasively, removing the need for this
+ * function.
+ */
+static inline struct rcu_head *rcu_segcblist_head(struct rcu_segcblist *rsclp)
+{
+       return rsclp->head;
+}
+
+/*
+ * Interim function to return rcu_segcblist tail pointer.  Longer term, the
+ * rcu_segcblist will be used more pervasively, removing the need for this
+ * function.
+ */
+static inline struct rcu_head **rcu_segcblist_tail(struct rcu_segcblist *rsclp)
+{
+       WARN_ON_ONCE(rcu_segcblist_empty(rsclp));
+       return rsclp->tails[RCU_NEXT_TAIL];
+}
+
+#endif /* __KERNEL_RCU_SEGCBLIST_H */
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index ef4b8f63c06d..00390965c843 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -97,8 +97,8 @@ struct rcu_state sname##_state = { \
        .gpnum = 0UL - 300UL, \
        .completed = 0UL - 300UL, \
        .orphan_lock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.orphan_lock), \
-       .orphan_nxttail = &sname##_state.orphan_nxtlist, \
-       .orphan_donetail = &sname##_state.orphan_donelist, \
+       .orphan_pend = RCU_CBLIST_INITIALIZER(sname##_state.orphan_pend), \
+       .orphan_done = RCU_CBLIST_INITIALIZER(sname##_state.orphan_done), \
        .barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
        .name = RCU_STATE_NAME(sname), \
        .abbr = sabbr, \
@@ -733,16 +733,6 @@ void rcutorture_record_progress(unsigned long vernum)
 EXPORT_SYMBOL_GPL(rcutorture_record_progress);
 
 /*
- * Does the CPU have callbacks ready to be invoked?
- */
-static int
-cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
-{
-       return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] &&
-              rdp->nxttail[RCU_NEXT_TAIL] != NULL;
-}
-
-/*
  * Return the root node of the specified rcu_state structure.
  */
 static struct rcu_node *rcu_get_root(struct rcu_state *rsp)
@@ -772,21 +762,17 @@ static int rcu_future_needs_gp(struct rcu_state *rsp)
 static bool
 cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
 {
-       int i;
-
        if (rcu_gp_in_progress(rsp))
                return false;  /* No, a grace period is already in progress. */
        if (rcu_future_needs_gp(rsp))
                return true;  /* Yes, a no-CBs CPU needs one. */
-       if (!rdp->nxttail[RCU_NEXT_TAIL])
+       if (!rcu_segcblist_is_enabled(&rdp->cblist))
                return false;  /* No, this is a no-CBs (or offline) CPU. */
-       if (*rdp->nxttail[RCU_NEXT_READY_TAIL])
+       if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
                return true;  /* Yes, CPU has newly registered callbacks. */
-       for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++)
-               if (rdp->nxttail[i - 1] != rdp->nxttail[i] &&
-                   ULONG_CMP_LT(READ_ONCE(rsp->completed),
-                                rdp->nxtcompleted[i]))
-                       return true;  /* Yes, CBs for future grace period. */
+       if (rcu_segcblist_future_gp_needed(&rdp->cblist,
+                                          READ_ONCE(rsp->completed)))
+               return true;  /* Yes, CBs for future grace period. */
        return false; /* No grace period needed. */
 }
 
@@ -1497,7 +1483,8 @@ static void print_other_cpu_stall(struct rcu_state *rsp, unsigned long gpnum)
 
        print_cpu_stall_info_end();
        for_each_possible_cpu(cpu)
-               totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+               totqlen += rcu_segcblist_n_cbs(&per_cpu_ptr(rsp->rda,
+                                                           cpu)->cblist);
        pr_cont("(detected by %d, t=%ld jiffies, g=%ld, c=%ld, q=%lu)\n",
               smp_processor_id(), (long)(jiffies - rsp->gp_start),
               (long)rsp->gpnum, (long)rsp->completed, totqlen);
@@ -1551,7 +1538,8 @@ static void print_cpu_stall(struct rcu_state *rsp)
        print_cpu_stall_info(rsp, smp_processor_id());
        print_cpu_stall_info_end();
        for_each_possible_cpu(cpu)
-               totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+               totqlen += rcu_segcblist_n_cbs(&per_cpu_ptr(rsp->rda,
+                                                           cpu)->cblist);
        pr_cont(" (t=%lu jiffies g=%ld c=%ld q=%lu)\n",
                jiffies - rsp->gp_start,
                (long)rsp->gpnum, (long)rsp->completed, totqlen);
@@ -1654,30 +1642,6 @@ void rcu_cpu_stall_reset(void)
 }
 
 /*
- * Initialize the specified rcu_data structure's default callback list
- * to empty.  The default callback list is the one that is not used by
- * no-callbacks CPUs.
- */
-static void init_default_callback_list(struct rcu_data *rdp)
-{
-       int i;
-
-       rdp->nxtlist = NULL;
-       for (i = 0; i < RCU_NEXT_SIZE; i++)
-               rdp->nxttail[i] = &rdp->nxtlist;
-}
-
-/*
- * Initialize the specified rcu_data structure's callback list to empty.
- */
-static void init_callback_list(struct rcu_data *rdp)
-{
-       if (init_nocb_callback_list(rdp))
-               return;
-       init_default_callback_list(rdp);
-}
-
-/*
  * Determine the value that ->completed will have at the end of the
  * next subsequent grace period.  This is used to tag callbacks so that
  * a CPU can invoke callbacks in a timely fashion even if that CPU has
@@ -1731,7 +1695,6 @@ rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
                    unsigned long *c_out)
 {
        unsigned long c;
-       int i;
        bool ret = false;
        struct rcu_node *rnp_root = rcu_get_root(rdp->rsp);
 
@@ -1777,13 +1740,11 @@ rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
        /*
         * Get a new grace-period number.  If there really is no grace
         * period in progress, it will be smaller than the one we obtained
-        * earlier.  Adjust callbacks as needed.  Note that even no-CBs
-        * CPUs have a ->nxtcompleted[] array, so no no-CBs checks needed.
+        * earlier.  Adjust callbacks as needed.
         */
        c = rcu_cbs_completed(rdp->rsp, rnp_root);
-       for (i = RCU_DONE_TAIL; i < RCU_NEXT_TAIL; i++)
-               if (ULONG_CMP_LT(c, rdp->nxtcompleted[i]))
-                       rdp->nxtcompleted[i] = c;
+       if (!rcu_is_nocb_cpu(rdp->cpu))
+               (void)rcu_segcblist_accelerate(&rdp->cblist, c);
 
        /*
         * If the needed for the required grace period is already
@@ -1863,57 +1824,27 @@ static void rcu_gp_kthread_wake(struct rcu_state *rsp)
 static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
                               struct rcu_data *rdp)
 {
-       unsigned long c;
-       int i;
-       bool ret;
-
-       /* If the CPU has no callbacks, nothing to do. */
-       if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
-               return false;
-
-       /*
-        * Starting from the sublist containing the callbacks most
-        * recently assigned a ->completed number and working down, find the
-        * first sublist that is not assignable to an upcoming grace period.
-        * Such a sublist has something in it (first two tests) and has
-        * a ->completed number assigned that will complete sooner than
-        * the ->completed number for newly arrived callbacks (last test).
-        *
-        * The key point is that any later sublist can be assigned the
-        * same ->completed number as the newly arrived callbacks, which
-        * means that the callbacks in any of these later sublist can be
-        * grouped into a single sublist, whether or not they have already
-        * been assigned a ->completed number.
-        */
-       c = rcu_cbs_completed(rsp, rnp);
-       for (i = RCU_NEXT_TAIL - 1; i > RCU_DONE_TAIL; i--)
-               if (rdp->nxttail[i] != rdp->nxttail[i - 1] &&
-                   !ULONG_CMP_GE(rdp->nxtcompleted[i], c))
-                       break;
+       bool ret = false;
 
-       /*
-        * If there are no sublist for unassigned callbacks, leave.
-        * At the same time, advance "i" one sublist, so that "i" will
-        * index into the sublist where all the remaining callbacks should
-        * be grouped into.
-        */
-       if (++i >= RCU_NEXT_TAIL)
+       /* If no pending (not yet ready to invoke) callbacks, nothing to do. */
+       if (!rcu_segcblist_pend_cbs(&rdp->cblist))
                return false;
 
        /*
-        * Assign all subsequent callbacks' ->completed number to the next
-        * full grace period and group them all in the sublist initially
-        * indexed by "i".
+        * Callbacks are often registered with incomplete grace-period
+        * information.  Something about the fact that getting exact
+        * information requires acquiring a global lock...  RCU therefore
+        * makes a conservative estimate of the grace period number at which
+        * a given callback will become ready to invoke.  The following
+        * code checks this estimate and improves it when possible, thus
+        * accelerating callback invocation to an earlier grace-period
+        * number.
         */
-       for (; i <= RCU_NEXT_TAIL; i++) {
-               rdp->nxttail[i] = rdp->nxttail[RCU_NEXT_TAIL];
-               rdp->nxtcompleted[i] = c;
-       }
-       /* Record any needed additional grace periods. */
-       ret = rcu_start_future_gp(rnp, rdp, NULL);
+       if (rcu_segcblist_accelerate(&rdp->cblist, rcu_cbs_completed(rsp, rnp)))
+               ret = rcu_start_future_gp(rnp, rdp, NULL);
 
        /* Trace depending on how much we were able to accelerate. */
-       if (!*rdp->nxttail[RCU_WAIT_TAIL])
+       if (rcu_segcblist_restempty(&rdp->cblist, RCU_WAIT_TAIL))
                trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccWaitCB"));
        else
                trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccReadyCB"));
@@ -1933,32 +1864,15 @@ static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
 static bool rcu_advance_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
                            struct rcu_data *rdp)
 {
-       int i, j;
-
-       /* If the CPU has no callbacks, nothing to do. */
-       if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
+       /* If no pending (not yet ready to invoke) callbacks, nothing to do. */
+       if (!rcu_segcblist_pend_cbs(&rdp->cblist))
                return false;
 
        /*
         * Find all callbacks whose ->completed numbers indicate that they
         * are ready to invoke, and put them into the RCU_DONE_TAIL sublist.
         */
-       for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) {
-               if (ULONG_CMP_LT(rnp->completed, rdp->nxtcompleted[i]))
-                       break;
-               rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[i];
-       }
-       /* Clean up any sublist tail pointers that were misordered above. */
-       for (j = RCU_WAIT_TAIL; j < i; j++)
-               rdp->nxttail[j] = rdp->nxttail[RCU_DONE_TAIL];
-
-       /* Copy down callbacks to fill in empty sublists. */
-       for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) {
-               if (rdp->nxttail[j] == rdp->nxttail[RCU_NEXT_TAIL])
-                       break;
-               rdp->nxttail[j] = rdp->nxttail[i];
-               rdp->nxtcompleted[j] = rdp->nxtcompleted[i];
-       }
+       rcu_segcblist_advance(&rdp->cblist, rnp->completed);
 
        /* Classify any remaining callbacks. */
        return rcu_accelerate_cbs(rsp, rnp, rdp);
@@ -2675,13 +2589,8 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
         * because _rcu_barrier() excludes CPU-hotplug operations, so it
         * cannot be running now.  Thus no memory barrier is required.
         */
-       if (rdp->nxtlist != NULL) {
-               rsp->qlen_lazy += rdp->qlen_lazy;
-               rsp->qlen += rdp->qlen;
-               rdp->n_cbs_orphaned += rdp->qlen;
-               rdp->qlen_lazy = 0;
-               WRITE_ONCE(rdp->qlen, 0);
-       }
+       rdp->n_cbs_orphaned += rcu_segcblist_n_cbs(&rdp->cblist);
+       rcu_segcblist_extract_count(&rdp->cblist, &rsp->orphan_done);
 
        /*
         * Next, move those callbacks still needing a grace period to
@@ -2689,31 +2598,18 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
         * Some of the callbacks might have gone partway through a grace
         * period, but that is too bad.  They get to start over because we
         * cannot assume that grace periods are synchronized across CPUs.
-        * We don't bother updating the ->nxttail[] array yet, instead
-        * we just reset the whole thing later on.
         */
-       if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) {
-               *rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL];
-               rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL];
-               *rdp->nxttail[RCU_DONE_TAIL] = NULL;
-       }
+       rcu_segcblist_extract_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
 
        /*
         * Then move the ready-to-invoke callbacks to the orphanage,
         * where some other CPU will pick them up.  These will not be
         * required to pass though another grace period: They are done.
         */
-       if (rdp->nxtlist != NULL) {
-               *rsp->orphan_donetail = rdp->nxtlist;
-               rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL];
-       }
+       rcu_segcblist_extract_done_cbs(&rdp->cblist, &rsp->orphan_done);
 
-       /*
-        * Finally, initialize the rcu_data structure's list to empty and
-        * disallow further callbacks on this CPU.
-        */
-       init_callback_list(rdp);
-       rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+       /* Finally, disallow further callbacks on this CPU.  */
+       rcu_segcblist_disable(&rdp->cblist);
 }
 
 /*
@@ -2722,7 +2618,6 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
  */
 static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
 {
-       int i;
        struct rcu_data *rdp = raw_cpu_ptr(rsp->rda);
 
        /* No-CBs CPUs are handled specially. */
@@ -2731,13 +2626,11 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
                return;
 
        /* Do the accounting first. */
-       rdp->qlen_lazy += rsp->qlen_lazy;
-       rdp->qlen += rsp->qlen;
-       rdp->n_cbs_adopted += rsp->qlen;
-       if (rsp->qlen_lazy != rsp->qlen)
+       rdp->n_cbs_adopted += rcu_cblist_n_cbs(&rsp->orphan_done);
+       if (rcu_cblist_n_lazy_cbs(&rsp->orphan_done) !=
+           rcu_cblist_n_cbs(&rsp->orphan_done))
                rcu_idle_count_callbacks_posted();
-       rsp->qlen_lazy = 0;
-       rsp->qlen = 0;
+       rcu_segcblist_insert_count(&rdp->cblist, &rsp->orphan_done);
 
        /*
         * We do not need a memory barrier here because the only way we
@@ -2745,24 +2638,13 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
         * we are the task doing the rcu_barrier().
         */
 
-       /* First adopt the ready-to-invoke callbacks. */
-       if (rsp->orphan_donelist != NULL) {
-               *rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL];
-               *rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist;
-               for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--)
-                       if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
-                               rdp->nxttail[i] = rsp->orphan_donetail;
-               rsp->orphan_donelist = NULL;
-               rsp->orphan_donetail = &rsp->orphan_donelist;
-       }
-
-       /* And then adopt the callbacks that still need a grace period. */
-       if (rsp->orphan_nxtlist != NULL) {
-               *rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist;
-               rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail;
-               rsp->orphan_nxtlist = NULL;
-               rsp->orphan_nxttail = &rsp->orphan_nxtlist;
-       }
+       /* First adopt the ready-to-invoke callbacks, then the pending ones. */
+       rcu_segcblist_insert_done_cbs(&rdp->cblist, &rsp->orphan_done);
+       WARN_ON_ONCE(!rcu_cblist_empty(&rsp->orphan_done));
+       rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
+       WARN_ON_ONCE(!rcu_cblist_empty(&rsp->orphan_pend));
+       WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) !=
+                    !rcu_segcblist_n_cbs(&rdp->cblist));
 }
 
 /*
@@ -2850,9 +2732,11 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
        rcu_adopt_orphan_cbs(rsp, flags);
        raw_spin_unlock_irqrestore(&rsp->orphan_lock, flags);
 
-       WARN_ONCE(rdp->qlen != 0 || rdp->nxtlist != NULL,
-                 "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, nxtlist=%p\n",
-                 cpu, rdp->qlen, rdp->nxtlist);
+       WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 ||
+                 !rcu_segcblist_empty(&rdp->cblist),
+                 "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n",
+                 cpu, rcu_segcblist_n_cbs(&rdp->cblist),
+                 rcu_segcblist_first_cb(&rdp->cblist));
 }
 
 /*
@@ -2862,14 +2746,17 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 {
        unsigned long flags;
-       struct rcu_head *next, *list, **tail;
-       long bl, count, count_lazy;
-       int i;
+       struct rcu_head *rhp;
+       struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
+       long bl, count;
 
        /* If no callbacks are ready, just return. */
-       if (!cpu_has_callbacks_ready_to_invoke(rdp)) {
-               trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, 0);
-               trace_rcu_batch_end(rsp->name, 0, !!READ_ONCE(rdp->nxtlist),
+       if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
+               trace_rcu_batch_start(rsp->name,
+                                     rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+                                     rcu_segcblist_n_cbs(&rdp->cblist), 0);
+               trace_rcu_batch_end(rsp->name, 0,
+                                   !rcu_segcblist_empty(&rdp->cblist),
                                    need_resched(), is_idle_task(current),
                                    rcu_is_callbacks_kthread());
                return;
@@ -2877,73 +2764,62 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 
        /*
         * Extract the list of ready callbacks, disabling to prevent
-        * races with call_rcu() from interrupt handlers.
+        * races with call_rcu() from interrupt handlers.  Leave the
+        * callback counts, as rcu_barrier() needs to be conservative.
         */
        local_irq_save(flags);
        WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
        bl = rdp->blimit;
-       trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, bl);
-       list = rdp->nxtlist;
-       rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
-       *rdp->nxttail[RCU_DONE_TAIL] = NULL;
-       tail = rdp->nxttail[RCU_DONE_TAIL];
-       for (i = RCU_NEXT_SIZE - 1; i >= 0; i--)
-               if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
-                       rdp->nxttail[i] = &rdp->nxtlist;
+       trace_rcu_batch_start(rsp->name, rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+                             rcu_segcblist_n_cbs(&rdp->cblist), bl);
+       rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl);
        local_irq_restore(flags);
 
        /* Invoke callbacks. */
-       count = count_lazy = 0;
-       while (list) {
-               next = list->next;
-               prefetch(next);
-               debug_rcu_head_unqueue(list);
-               if (__rcu_reclaim(rsp->name, list))
-                       count_lazy++;
-               list = next;
-               /* Stop only if limit reached and CPU has something to do. */
-               if (++count >= bl &&
+       rhp = rcu_cblist_dequeue(&rcl);
+       for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
+               debug_rcu_head_unqueue(rhp);
+               if (__rcu_reclaim(rsp->name, rhp))
+                       rcu_cblist_dequeued_lazy(&rcl);
+               /*
+                * Stop only if limit reached and CPU has something to do.
+                * Note: The rcl structure counts down from zero.
+                */
+               if (-rcu_cblist_n_cbs(&rcl) >= bl &&
                    (need_resched() ||
                     (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
                        break;
        }
 
        local_irq_save(flags);
-       trace_rcu_batch_end(rsp->name, count, !!list, need_resched(),
-                           is_idle_task(current),
+       count = -rcu_cblist_n_cbs(&rcl);
+       trace_rcu_batch_end(rsp->name, count, !rcu_cblist_empty(&rcl),
+                           need_resched(), is_idle_task(current),
                            rcu_is_callbacks_kthread());
 
-       /* Update count, and requeue any remaining callbacks. */
-       if (list != NULL) {
-               *tail = rdp->nxtlist;
-               rdp->nxtlist = list;
-               for (i = 0; i < RCU_NEXT_SIZE; i++)
-                       if (&rdp->nxtlist == rdp->nxttail[i])
-                               rdp->nxttail[i] = tail;
-                       else
-                               break;
-       }
+       /* Update counts and requeue any remaining callbacks. */
+       rcu_segcblist_insert_done_cbs(&rdp->cblist, &rcl);
        smp_mb(); /* List handling before counting for rcu_barrier(). */
-       rdp->qlen_lazy -= count_lazy;
-       WRITE_ONCE(rdp->qlen, rdp->qlen - count);
        rdp->n_cbs_invoked += count;
+       rcu_segcblist_insert_count(&rdp->cblist, &rcl);
 
        /* Reinstate batch limit if we have worked down the excess. */
-       if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)
+       count = rcu_segcblist_n_cbs(&rdp->cblist);
+       if (rdp->blimit == LONG_MAX && count <= qlowmark)
                rdp->blimit = blimit;
 
        /* Reset ->qlen_last_fqs_check trigger if enough CBs have drained. */
-       if (rdp->qlen == 0 && rdp->qlen_last_fqs_check != 0) {
+       if (count == 0 && rdp->qlen_last_fqs_check != 0) {
                rdp->qlen_last_fqs_check = 0;
                rdp->n_force_qs_snap = rsp->n_force_qs;
-       } else if (rdp->qlen < rdp->qlen_last_fqs_check - qhimark)
-               rdp->qlen_last_fqs_check = rdp->qlen;
-       WARN_ON_ONCE((rdp->nxtlist == NULL) != (rdp->qlen == 0));
+       } else if (count < rdp->qlen_last_fqs_check - qhimark)
+               rdp->qlen_last_fqs_check = count;
+       WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) != (count == 0));
 
        local_irq_restore(flags);
 
        /* Re-invoke RCU core processing if there are callbacks remaining. */
-       if (cpu_has_callbacks_ready_to_invoke(rdp))
+       if (rcu_segcblist_ready_cbs(&rdp->cblist))
                invoke_rcu_core();
 }
 
@@ -3127,7 +3003,7 @@ __rcu_process_callbacks(struct rcu_state *rsp)
        }
 
        /* If there are callbacks ready, invoke them. */
-       if (cpu_has_callbacks_ready_to_invoke(rdp))
+       if (rcu_segcblist_ready_cbs(&rdp->cblist))
                invoke_rcu_callbacks(rsp, rdp);
 
        /* Do any needed deferred wakeups of rcuo kthreads. */
@@ -3199,7 +3075,8 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
         * invoking force_quiescent_state() if the newly enqueued callback
         * is the only one waiting for a grace period to complete.
         */
-       if (unlikely(rdp->qlen > rdp->qlen_last_fqs_check + qhimark)) {
+       if (unlikely(rcu_segcblist_n_cbs(&rdp->cblist) >
+                    rdp->qlen_last_fqs_check + qhimark)) {
 
                /* Are we ignoring a completed grace period? */
                note_gp_changes(rsp, rdp);
@@ -3217,10 +3094,10 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
                        /* Give the grace period a kick. */
                        rdp->blimit = LONG_MAX;
                        if (rsp->n_force_qs == rdp->n_force_qs_snap &&
-                           *rdp->nxttail[RCU_DONE_TAIL] != head)
+                           rcu_segcblist_first_pend_cb(&rdp->cblist) != head)
                                force_quiescent_state(rsp);
                        rdp->n_force_qs_snap = rsp->n_force_qs;
-                       rdp->qlen_last_fqs_check = rdp->qlen;
+                       rdp->qlen_last_fqs_check = rcu_segcblist_n_cbs(&rdp->cblist);
                }
        }
 }
@@ -3260,7 +3137,7 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func,
        rdp = this_cpu_ptr(rsp->rda);
 
        /* Add the callback to our list. */
-       if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
+       if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist)) || cpu != -1) {
                int offline;
 
                if (cpu != -1)
@@ -3279,23 +3156,21 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func,
                 */
                BUG_ON(cpu != -1);
                WARN_ON_ONCE(!rcu_is_watching());
-               if (!likely(rdp->nxtlist))
-                       init_default_callback_list(rdp);
+               if (rcu_segcblist_empty(&rdp->cblist))
+                       rcu_segcblist_init(&rdp->cblist);
        }
-       WRITE_ONCE(rdp->qlen, rdp->qlen + 1);
-       if (lazy)
-               rdp->qlen_lazy++;
-       else
+       rcu_segcblist_enqueue(&rdp->cblist, head, lazy);
+       if (!lazy)
                rcu_idle_count_callbacks_posted();
-       smp_mb();  /* Count before adding callback for rcu_barrier(). */
-       *rdp->nxttail[RCU_NEXT_TAIL] = head;
-       rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
 
        if (__is_kfree_rcu_offset((unsigned long)func))
                trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
-                                        rdp->qlen_lazy, rdp->qlen);
+                                        rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+                                        rcu_segcblist_n_cbs(&rdp->cblist));
        else
-               trace_rcu_callback(rsp->name, head, rdp->qlen_lazy, rdp->qlen);
+               trace_rcu_callback(rsp->name, head,
+                                  rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+                                  rcu_segcblist_n_cbs(&rdp->cblist));
 
        /* Go handle any RCU core processing required. */
        __call_rcu_core(rsp, rdp, head, flags);
@@ -3607,7 +3482,7 @@ static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)
        }
 
        /* Does this CPU have callbacks ready to invoke? */
-       if (cpu_has_callbacks_ready_to_invoke(rdp)) {
+       if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
                rdp->n_rp_cb_ready++;
                return 1;
        }
@@ -3671,10 +3546,10 @@ static bool __maybe_unused rcu_cpu_has_callbacks(bool *all_lazy)
 
        for_each_rcu_flavor(rsp) {
                rdp = this_cpu_ptr(rsp->rda);
-               if (!rdp->nxtlist)
+               if (rcu_segcblist_empty(&rdp->cblist))
                        continue;
                hc = true;
-               if (rdp->qlen != rdp->qlen_lazy || !all_lazy) {
+               if (rcu_segcblist_n_nonlazy_cbs(&rdp->cblist) || !all_lazy) {
                        al = false;
                        break;
                }
@@ -3783,7 +3658,7 @@ static void _rcu_barrier(struct rcu_state *rsp)
                                __call_rcu(&rdp->barrier_head,
                                           rcu_barrier_callback, rsp, cpu, 0);
                        }
-               } else if (READ_ONCE(rdp->qlen)) {
+               } else if (rcu_segcblist_n_cbs(&rdp->cblist)) {
                        _rcu_barrier_trace(rsp, "OnlineQ", cpu,
                                           rsp->barrier_sequence);
                        smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
@@ -3892,8 +3767,9 @@ rcu_init_percpu_data(int cpu, struct rcu_state *rsp)
        rdp->qlen_last_fqs_check = 0;
        rdp->n_force_qs_snap = rsp->n_force_qs;
        rdp->blimit = blimit;
-       if (!rdp->nxtlist)
-               init_callback_list(rdp);  /* Re-enable callbacks on this CPU. */
+       if (rcu_segcblist_empty(&rdp->cblist) && /* No early-boot CBs? */
+           !init_nocb_callback_list(rdp))
+               rcu_segcblist_init(&rdp->cblist);  /* Re-enable callbacks. */
        rdp->dynticks->dynticks_nesting = DYNTICK_TASK_EXIT_IDLE;
        rcu_sysidle_init_percpu_data(rdp->dynticks);
        rcu_dynticks_eqs_online();
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 376c01e539c7..93889ff21dbb 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -30,6 +30,7 @@
 #include <linux/seqlock.h>
 #include <linux/swait.h>
 #include <linux/stop_machine.h>
+#include "rcu_segcblist.h"
 
 /*
  * Define shape of hierarchy based on NR_CPUS, CONFIG_RCU_FANOUT, and
@@ -335,34 +336,9 @@ struct rcu_data {
                                        /* period it is aware of. */
 
        /* 2) batch handling */
-       /*
-        * If nxtlist is not NULL, it is partitioned as follows.
-        * Any of the partitions might be empty, in which case the
-        * pointer to that partition will be equal to the pointer for
-        * the following partition.  When the list is empty, all of
-        * the nxttail elements point to the ->nxtlist pointer itself,
-        * which in that case is NULL.
-        *
-        * [nxtlist, *nxttail[RCU_DONE_TAIL]):
-        *      Entries that batch # <= ->completed
-        *      The grace period for these entries has completed, and
-        *      the other grace-period-completed entries may be moved
-        *      here temporarily in rcu_process_callbacks().
-        * [*nxttail[RCU_DONE_TAIL], *nxttail[RCU_WAIT_TAIL]):
-        *      Entries that batch # <= ->completed - 1: waiting for current GP
-        * [*nxttail[RCU_WAIT_TAIL], *nxttail[RCU_NEXT_READY_TAIL]):
-        *      Entries known to have arrived before current GP ended
-        * [*nxttail[RCU_NEXT_READY_TAIL], *nxttail[RCU_NEXT_TAIL]):
-        *      Entries that might have arrived after current GP ended
-        *      Note that the value of *nxttail[RCU_NEXT_TAIL] will
-        *      always be NULL, as this is the end of the list.
-        */
-       struct rcu_head *nxtlist;
-       struct rcu_head **nxttail[RCU_NEXT_SIZE];
-       unsigned long   nxtcompleted[RCU_NEXT_SIZE];
-                                       /* grace periods for sublists. */
-       long            qlen_lazy;      /* # of lazy queued callbacks */
-       long            qlen;           /* # of queued callbacks, incl lazy */
+       struct rcu_segcblist cblist;    /* Segmented callback list, with */
+                                       /* different callbacks waiting for */
+                                       /* different grace periods. */
        long            qlen_last_fqs_check;
                                        /* qlen at last check for QS forcing */
        unsigned long   n_cbs_invoked;  /* count of RCU cbs invoked. */
@@ -500,14 +476,11 @@ struct rcu_state {
 
        raw_spinlock_t orphan_lock ____cacheline_internodealigned_in_smp;
                                                /* Protect following fields. */
-       struct rcu_head *orphan_nxtlist;        /* Orphaned callbacks that */
+       struct rcu_cblist orphan_pend;          /* Orphaned callbacks that */
                                                /*  need a grace period. */
-       struct rcu_head **orphan_nxttail;       /* Tail of above. */
-       struct rcu_head *orphan_donelist;       /* Orphaned callbacks that */
+       struct rcu_cblist orphan_done;          /* Orphaned callbacks that */
                                                /*  are ready to invoke. */
-       struct rcu_head **orphan_donetail;      /* Tail of above. */
-       long qlen_lazy;                         /* Number of lazy callbacks. */
-       long qlen;                              /* Total number of callbacks. */
+                                               /* (Contains counts.) */
        /* End of fields guarded by orphan_lock. */
 
        struct mutex barrier_mutex;             /* Guards barrier fields. */
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 4a9df7ea89a4..db0a03474ae7 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -1350,10 +1350,10 @@ static bool __maybe_unused rcu_try_advance_all_cbs(void)
                 */
                if ((rdp->completed != rnp->completed ||
                     unlikely(READ_ONCE(rdp->gpwrap))) &&
-                   rdp->nxttail[RCU_DONE_TAIL] != rdp->nxttail[RCU_NEXT_TAIL])
+                   rcu_segcblist_pend_cbs(&rdp->cblist))
                        note_gp_changes(rsp, rdp);
 
-               if (cpu_has_callbacks_ready_to_invoke(rdp))
+               if (rcu_segcblist_ready_cbs(&rdp->cblist))
                        cbs_ready = true;
        }
        return cbs_ready;
@@ -1461,7 +1461,7 @@ static void rcu_prepare_for_idle(void)
        rdtp->last_accelerate = jiffies;
        for_each_rcu_flavor(rsp) {
                rdp = this_cpu_ptr(rsp->rda);
-               if (!*rdp->nxttail[RCU_DONE_TAIL])
+               if (!rcu_segcblist_pend_cbs(&rdp->cblist))
                        continue;
                rnp = rdp->mynode;
                raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
@@ -1529,7 +1529,7 @@ static void rcu_oom_notify_cpu(void *unused)
 
        for_each_rcu_flavor(rsp) {
                rdp = raw_cpu_ptr(rsp->rda);
-               if (rdp->qlen_lazy != 0) {
+               if (rcu_segcblist_n_lazy_cbs(&rdp->cblist)) {
                        atomic_inc(&oom_callback_count);
                        rsp->call(&rdp->oom_head, rcu_oom_callback);
                }
@@ -1937,30 +1937,27 @@ static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
                                                     struct rcu_data *rdp,
                                                     unsigned long flags)
 {
-       long ql = rsp->qlen;
-       long qll = rsp->qlen_lazy;
+       long ql = rcu_cblist_n_cbs(&rsp->orphan_done);
+       long qll = rcu_cblist_n_lazy_cbs(&rsp->orphan_done);
 
        /* If this is not a no-CBs CPU, tell the caller to do it the old way. */
        if (!rcu_is_nocb_cpu(smp_processor_id()))
                return false;
-       rsp->qlen = 0;
-       rsp->qlen_lazy = 0;
 
        /* First, enqueue the donelist, if any.  This preserves CB ordering. */
-       if (rsp->orphan_donelist != NULL) {
-               __call_rcu_nocb_enqueue(rdp, rsp->orphan_donelist,
-                                       rsp->orphan_donetail, ql, qll, flags);
+       if (!rcu_cblist_empty(&rsp->orphan_done)) {
+               __call_rcu_nocb_enqueue(rdp, rcu_cblist_head(&rsp->orphan_done),
+                                       rcu_cblist_tail(&rsp->orphan_done),
+                                       ql, qll, flags);
                ql = qll = 0;
-               rsp->orphan_donelist = NULL;
-               rsp->orphan_donetail = &rsp->orphan_donelist;
        }
-       if (rsp->orphan_nxtlist != NULL) {
-               __call_rcu_nocb_enqueue(rdp, rsp->orphan_nxtlist,
-                                       rsp->orphan_nxttail, ql, qll, flags);
-               ql = qll = 0;
-               rsp->orphan_nxtlist = NULL;
-               rsp->orphan_nxttail = &rsp->orphan_nxtlist;
+       if (!rcu_cblist_empty(&rsp->orphan_pend)) {
+               __call_rcu_nocb_enqueue(rdp, rcu_cblist_head(&rsp->orphan_pend),
+                                       rcu_cblist_tail(&rsp->orphan_pend),
+                                       ql, qll, flags);
        }
+       rcu_cblist_init(&rsp->orphan_done);
+       rcu_cblist_init(&rsp->orphan_pend);
        return true;
 }
 
@@ -2403,16 +2399,16 @@ static bool init_nocb_callback_list(struct rcu_data *rdp)
                return false;
 
        /* If there are early-boot callbacks, move them to nocb lists. */
-       if (rdp->nxtlist) {
-               rdp->nocb_head = rdp->nxtlist;
-               rdp->nocb_tail = rdp->nxttail[RCU_NEXT_TAIL];
-               atomic_long_set(&rdp->nocb_q_count, rdp->qlen);
-               atomic_long_set(&rdp->nocb_q_count_lazy, rdp->qlen_lazy);
-               rdp->nxtlist = NULL;
-               rdp->qlen = 0;
-               rdp->qlen_lazy = 0;
+       if (!rcu_segcblist_empty(&rdp->cblist)) {
+               rdp->nocb_head = rcu_segcblist_head(&rdp->cblist);
+               rdp->nocb_tail = rcu_segcblist_tail(&rdp->cblist);
+               atomic_long_set(&rdp->nocb_q_count,
+                               rcu_segcblist_n_cbs(&rdp->cblist));
+               atomic_long_set(&rdp->nocb_q_count_lazy,
+                               rcu_segcblist_n_lazy_cbs(&rdp->cblist));
+               rcu_segcblist_init(&rdp->cblist);
        }
-       rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+       rcu_segcblist_disable(&rdp->cblist);
        return true;
 }
 
diff --git a/kernel/rcu/tree_trace.c b/kernel/rcu/tree_trace.c
index 65b43be38e68..066c64071a7b 100644
--- a/kernel/rcu/tree_trace.c
+++ b/kernel/rcu/tree_trace.c
@@ -41,6 +41,7 @@
 #include <linux/mutex.h>
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
+#include <linux/prefetch.h>
 
 #define RCU_TREE_NONCORE
 #include "tree.h"
@@ -128,17 +129,15 @@ static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
                   rdp->dynticks_fqs);
        seq_printf(m, " of=%lu", rdp->offline_fqs);
        rcu_nocb_q_lengths(rdp, &ql, &qll);
-       qll += rdp->qlen_lazy;
-       ql += rdp->qlen;
+       qll += rcu_segcblist_n_lazy_cbs(&rdp->cblist);
+       ql += rcu_segcblist_n_cbs(&rdp->cblist);
        seq_printf(m, " ql=%ld/%ld qs=%c%c%c%c",
                   qll, ql,
-                  ".N"[rdp->nxttail[RCU_NEXT_READY_TAIL] !=
-                       rdp->nxttail[RCU_NEXT_TAIL]],
-                  ".R"[rdp->nxttail[RCU_WAIT_TAIL] !=
-                       rdp->nxttail[RCU_NEXT_READY_TAIL]],
-                  ".W"[rdp->nxttail[RCU_DONE_TAIL] !=
-                       rdp->nxttail[RCU_WAIT_TAIL]],
-                  ".D"[&rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL]]);
+                  ".N"[!rcu_segcblist_segempty(&rdp->cblist, RCU_NEXT_TAIL)],
+                  ".R"[!rcu_segcblist_segempty(&rdp->cblist,
+                                               RCU_NEXT_READY_TAIL)],
+                  ".W"[!rcu_segcblist_segempty(&rdp->cblist, RCU_WAIT_TAIL)],
+                  ".D"[!rcu_segcblist_segempty(&rdp->cblist, RCU_DONE_TAIL)]);
 #ifdef CONFIG_RCU_BOOST
        seq_printf(m, " kt=%d/%c ktl=%x",
                   per_cpu(rcu_cpu_has_work, rdp->cpu),
@@ -276,7 +275,9 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
        seq_printf(m, "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n",
                   rsp->n_force_qs, rsp->n_force_qs_ngp,
                   rsp->n_force_qs - rsp->n_force_qs_ngp,
-                  READ_ONCE(rsp->n_force_qs_lh), rsp->qlen_lazy, rsp->qlen);
+                  READ_ONCE(rsp->n_force_qs_lh),
+                  rcu_cblist_n_lazy_cbs(&rsp->orphan_done),
+                  rcu_cblist_n_cbs(&rsp->orphan_done));
        for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < rcu_num_nodes; rnp++) {
                if (rnp->level != level) {
                        seq_puts(m, "\n");
-- 
2.5.2

Reply via email to