On Wed, Jun 24, 2015 at 07:58:30PM +0200, Peter Zijlstra wrote:
> On Wed, Jun 24, 2015 at 10:10:17AM -0700, Paul E. McKenney wrote:
> > > The thing is, once you start bailing on this condition your 'queue'
> > > drains very fast and this is around the same time sync_rcu() would've
> > > released the waiters too.
> > 
> > In my experience, this sort of thing simply melts down on large systems.
> > I am reworking this with multiple locks so as to keep the large-system
> > contention down to a dull roar.
> 
> So with the MCS queue we've got less global trashing than you had with
> the start/done tickets.  Only the queue head on enqueue.
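
To make the funnel idea concrete, here is a rough userspace sketch of the
approach (pthread mutexes, invented names, wraparound ignored; this is not
the kernel code).  A requester starts at its leaf node and, at each level,
first checks whether an earlier requester's grace period already covers it,
then takes the next mutex up and drops the one below, so at most one mutex
is held at a time and only requesters sharing a subtree ever contend on the
same lock.

/*
 * Rough userspace sketch of funnel locking, for illustration only.
 * The structure layout and function names are invented; this is not
 * the kernel implementation.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct funnel_node {
	pthread_mutex_t lock;
	struct funnel_node *parent;	/* NULL at the root. */
};

/* Global sequence counter standing in for ->expedited_sequence. */
static atomic_ulong done_seq;

/* Has a grace period covering snapshot "s" already completed? */
static bool work_already_done(unsigned long s)
{
	/* Wraparound ignored here; the kernel uses ULONG_CMP_GE(). */
	return atomic_load(&done_seq) >= s;
}

/*
 * Walk from a leaf toward the root, holding at most one mutex at a
 * time.  Returns true, with no locks held, if someone else finished
 * the work first; otherwise returns false with the root mutex held,
 * and the caller does the work and then unlocks *rootp.
 */
static bool funnel_lock(struct funnel_node *leaf, unsigned long s,
			struct funnel_node **rootp)
{
	struct funnel_node *np;
	struct funnel_node *held = NULL;

	for (np = leaf; np != NULL; np = np->parent) {
		if (work_already_done(s)) {
			if (held)
				pthread_mutex_unlock(&held->lock);
			return true;
		}
		pthread_mutex_lock(&np->lock);
		if (held)
			pthread_mutex_unlock(&held->lock);
		held = np;
	}
	*rootp = held;
	return false;
}

int main(void)
{
	struct funnel_node root = { PTHREAD_MUTEX_INITIALIZER, NULL };
	struct funnel_node leaf = { PTHREAD_MUTEX_INITIALIZER, &root };
	struct funnel_node *held;
	unsigned long snap = atomic_load(&done_seq) + 1;

	if (!funnel_lock(&leaf, snap, &held)) {
		/* We "won": do the work, publish it, then unlock. */
		atomic_store(&done_seq, snap);
		pthread_mutex_unlock(&held->lock);
	}
	return 0;
}

The point of the hand-over-hand walk is that a requester only ever contends
with the requesters sharing its subtree, and late arrivals bail out as soon
as they observe that an earlier requester's grace period already covers them.
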
Here is what I had in mind, where you don't have any global trashing
except when the ->expedited_sequence gets updated.  Passes mild rcutorture
testing.  Still needs asynchronous CPU stoppage, stall warnings, and trace
documentation updates, plus fixes for whatever bugs show up.

							Thanx, Paul

------------------------------------------------------------------------

diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 78d0a87ff354..887370b7e52a 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -70,6 +70,7 @@ MODULE_ALIAS("rcutree");
 
 static struct lock_class_key rcu_node_class[RCU_NUM_LVLS];
 static struct lock_class_key rcu_fqs_class[RCU_NUM_LVLS];
+static struct lock_class_key rcu_exp_class[RCU_NUM_LVLS];
 
 /*
  * In order to export the rcu_state name to the tracing tools, it
@@ -3323,6 +3324,22 @@ static int synchronize_sched_expedited_cpu_stop(void *data)
 	return 0;
 }
 
+/* Common code for synchronize_sched_expedited() work-done checking. */
+static bool sync_sched_exp_wd(struct rcu_state *rsp, struct rcu_node *rnp,
+			      atomic_long_t *stat, unsigned long s)
+{
+	if (ULONG_CMP_GE(READ_ONCE(rsp->expedited_sequence), s)) {
+		if (rnp)
+			mutex_unlock(&rnp->exp_funnel_mutex);
+		/* Ensure test happens before caller kfree(). */
+		smp_mb__before_atomic(); /* ^^^ */
+		atomic_long_inc(stat);
+		put_online_cpus();
+		return true;
+	}
+	return false;
+}
+
 /**
  * synchronize_sched_expedited - Brute-force RCU-sched grace period
  *
@@ -3334,58 +3351,24 @@ static int synchronize_sched_expedited_cpu_stop(void *data)
  * restructure your code to batch your updates, and then use a single
  * synchronize_sched() instead.
  *
- * This implementation can be thought of as an application of ticket
- * locking to RCU, with sync_sched_expedited_started and
- * sync_sched_expedited_done taking on the roles of the halves
- * of the ticket-lock word.  Each task atomically increments
- * sync_sched_expedited_started upon entry, snapshotting the old value,
- * then attempts to stop all the CPUs.  If this succeeds, then each
- * CPU will have executed a context switch, resulting in an RCU-sched
- * grace period.  We are then done, so we use atomic_cmpxchg() to
- * update sync_sched_expedited_done to match our snapshot -- but
- * only if someone else has not already advanced past our snapshot.
- *
- * On the other hand, if try_stop_cpus() fails, we check the value
- * of sync_sched_expedited_done.  If it has advanced past our
- * initial snapshot, then someone else must have forced a grace period
- * some time after we took our snapshot.  In this case, our work is
- * done for us, and we can simply return.  Otherwise, we try again,
- * but keep our initial snapshot for purposes of checking for someone
- * doing our work for us.
- *
- * If we fail too many times in a row, we fall back to synchronize_sched().
+ * This implementation can be thought of as an application of sequence
+ * locking to expedited grace periods, but using the sequence counter to
+ * determine when someone else has already done the work instead of for
+ * retrying readers.
  */
 void synchronize_sched_expedited(void)
 {
-	cpumask_var_t cm;
-	bool cma = false;
 	int cpu;
-	long firstsnap, s, snap;
-	int trycount = 0;
+	long s;
 	struct rcu_state *rsp = &rcu_sched_state;
+	struct rcu_node *rnp0;
+	struct rcu_node *rnp1 = NULL;
 
-	/*
-	 * If we are in danger of counter wrap, just do synchronize_sched().
-	 * By allowing sync_sched_expedited_started to advance no more than
-	 * ULONG_MAX/8 ahead of sync_sched_expedited_done, we are ensuring
-	 * that more than 3.5 billion CPUs would be required to force a
-	 * counter wrap on a 32-bit system.  Quite a few more CPUs would of
-	 * course be required on a 64-bit system.
-	 */
-	if (ULONG_CMP_GE((ulong)atomic_long_read(&rsp->expedited_start),
-			 (ulong)atomic_long_read(&rsp->expedited_done) +
-			 ULONG_MAX / 8)) {
-		wait_rcu_gp(call_rcu_sched);
-		atomic_long_inc(&rsp->expedited_wrap);
-		return;
-	}
+	/* Take a snapshot of the sequence number. */
+	smp_mb(); /* Caller's modifications seen first by other CPUs. */
+	s = (READ_ONCE(rsp->expedited_sequence) + 3) & ~0x1;
+	smp_mb(); /* Above access must not bleed into critical section. */
 
-	/*
-	 * Take a ticket.  Note that atomic_inc_return() implies a
-	 * full memory barrier.
-	 */
-	snap = atomic_long_inc_return(&rsp->expedited_start);
-	firstsnap = snap;
 	if (!try_get_online_cpus()) {
 		/* CPU hotplug operation in flight, fall back to normal GP. */
 		wait_rcu_gp(call_rcu_sched);
@@ -3394,100 +3377,47 @@ void synchronize_sched_expedited(void)
 	}
 	WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));
 
-	/* Offline CPUs, idle CPUs, and any CPU we run on are quiescent. */
-	cma = zalloc_cpumask_var(&cm, GFP_KERNEL);
-	if (cma) {
-		cpumask_copy(cm, cpu_online_mask);
-		cpumask_clear_cpu(raw_smp_processor_id(), cm);
-		for_each_cpu(cpu, cm) {
-			struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
-
-			if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
-				cpumask_clear_cpu(cpu, cm);
-		}
-		if (cpumask_weight(cm) == 0)
-			goto all_cpus_idle;
-	}
-
 	/*
-	 * Each pass through the following loop attempts to force a
-	 * context switch on each CPU.
+	 * Each pass through the following loop works its way
+	 * up the rcu_node tree, returning if others have done the
+	 * work or otherwise falls through holding the root rnp's
+	 * ->exp_funnel_mutex.  The mapping from CPU to rcu_node structure
+	 * can be inexact, as it is just promoting locality and is not
+	 * strictly needed for correctness.
 	 */
-	while (try_stop_cpus(cma ? cm : cpu_online_mask,
-			     synchronize_sched_expedited_cpu_stop,
-			     NULL) == -EAGAIN) {
-		put_online_cpus();
-		atomic_long_inc(&rsp->expedited_tryfail);
-
-		/* Check to see if someone else did our work for us. */
-		s = atomic_long_read(&rsp->expedited_done);
-		if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
-			/* ensure test happens before caller kfree */
-			smp_mb__before_atomic(); /* ^^^ */
-			atomic_long_inc(&rsp->expedited_workdone1);
-			free_cpumask_var(cm);
+	rnp0 = per_cpu_ptr(rsp->rda, raw_smp_processor_id())->mynode;
+	for (; rnp0 != NULL; rnp0 = rnp0->parent) {
+		if (sync_sched_exp_wd(rsp, rnp1, &rsp->expedited_workdone1, s))
 			return;
-		}
+		mutex_lock(&rnp0->exp_funnel_mutex);
+		if (rnp1)
+			mutex_unlock(&rnp1->exp_funnel_mutex);
+		rnp1 = rnp0;
+	}
+	rnp0 = rnp1;  /* rcu_get_root(rsp), AKA root rcu_node structure. */
+	if (sync_sched_exp_wd(rsp, rnp0, &rsp->expedited_workdone2, s))
+		return;
 
-		/* No joy, try again later.  Or just synchronize_sched(). */
-		if (trycount++ < 10) {
-			udelay(trycount * num_online_cpus());
-		} else {
-			wait_rcu_gp(call_rcu_sched);
-			atomic_long_inc(&rsp->expedited_normal);
-			free_cpumask_var(cm);
-			return;
-		}
+	WRITE_ONCE(rsp->expedited_sequence, rsp->expedited_sequence + 1);
+	smp_mb(); /* Ensure expedited GP seen after counter increment. */
+	WARN_ON_ONCE(!(rsp->expedited_sequence & 0x1));
 
-		/* Recheck to see if someone else did our work for us. */
-		s = atomic_long_read(&rsp->expedited_done);
-		if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
-			/* ensure test happens before caller kfree */
-			smp_mb__before_atomic(); /* ^^^ */
-			atomic_long_inc(&rsp->expedited_workdone2);
-			free_cpumask_var(cm);
-			return;
-		}
+	/* Stop each CPU that is online, non-idle, and not us. */
+	for_each_online_cpu(cpu) {
+		struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
 
-		/*
-		 * Refetching sync_sched_expedited_started allows later
-		 * callers to piggyback on our grace period.  We retry
-		 * after they started, so our grace period works for them,
-		 * and they started after our first try, so their grace
-		 * period works for us.
-		 */
-		if (!try_get_online_cpus()) {
-			/* CPU hotplug operation in flight, use normal GP. */
-			wait_rcu_gp(call_rcu_sched);
-			atomic_long_inc(&rsp->expedited_normal);
-			free_cpumask_var(cm);
-			return;
-		}
-		snap = atomic_long_read(&rsp->expedited_start);
-		smp_mb(); /* ensure read is before try_stop_cpus(). */
+		/* Skip our CPU and any idle CPUs. */
+		if (raw_smp_processor_id() == cpu ||
+		    !(atomic_add_return(0, &rdtp->dynticks) & 0x1))
+			continue;
+		stop_one_cpu(cpu, synchronize_sched_expedited_cpu_stop, NULL);
 	}
-	atomic_long_inc(&rsp->expedited_stoppedcpus);
-
-all_cpus_idle:
-	free_cpumask_var(cm);
 
-	/*
-	 * Everyone up to our most recent fetch is covered by our grace
-	 * period.  Update the counter, but only if our work is still
-	 * relevant -- which it won't be if someone who started later
-	 * than we did already did their update.
-	 */
-	do {
-		atomic_long_inc(&rsp->expedited_done_tries);
-		s = atomic_long_read(&rsp->expedited_done);
-		if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
-			/* ensure test happens before caller kfree */
-			smp_mb__before_atomic(); /* ^^^ */
-			atomic_long_inc(&rsp->expedited_done_lost);
-			break;
-		}
-	} while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
-	atomic_long_inc(&rsp->expedited_done_exit);
+	smp_mb(); /* Ensure expedited GP seen before counter increment. */
+	WRITE_ONCE(rsp->expedited_sequence, rsp->expedited_sequence + 1);
+	WARN_ON_ONCE(rsp->expedited_sequence & 0x1);
+	mutex_unlock(&rnp0->exp_funnel_mutex);
 
+	smp_mb(); /* ensure subsequent action seen after grace period. */
 	put_online_cpus();
 }
@@ -4043,6 +3973,7 @@ static void __init rcu_init_one(struct rcu_state *rsp,
 {
 	static const char * const buf[] = RCU_NODE_NAME_INIT;
 	static const char * const fqs[] = RCU_FQS_NAME_INIT;
+	static const char * const exp[] = RCU_EXP_NAME_INIT;
 	static u8 fl_mask = 0x1;
 
 	int levelcnt[RCU_NUM_LVLS];		/* # nodes in each level. */
@@ -4101,6 +4032,9 @@ static void __init rcu_init_one(struct rcu_state *rsp,
 			rnp->level = i;
 			INIT_LIST_HEAD(&rnp->blkd_tasks);
 			rcu_init_one_nocb(rnp);
+			mutex_init(&rnp->exp_funnel_mutex);
+			lockdep_set_class_and_name(&rnp->exp_funnel_mutex,
+						   &rcu_exp_class[i], exp[i]);
 		}
 	}
 
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index de22d6d06bf9..f0f4dd96dd73 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -68,6 +68,7 @@
 # define NUM_RCU_LVL_INIT    { NUM_RCU_LVL_0 }
 # define RCU_NODE_NAME_INIT  { "rcu_node_0" }
 # define RCU_FQS_NAME_INIT   { "rcu_node_fqs_0" }
+# define RCU_EXP_NAME_INIT   { "rcu_node_exp_0" }
 #elif NR_CPUS <= RCU_FANOUT_2
 # define RCU_NUM_LVLS	      2
 # define NUM_RCU_LVL_0	      1
@@ -76,6 +77,7 @@
 # define NUM_RCU_LVL_INIT    { NUM_RCU_LVL_0, NUM_RCU_LVL_1 }
 # define RCU_NODE_NAME_INIT  { "rcu_node_0", "rcu_node_1" }
 # define RCU_FQS_NAME_INIT   { "rcu_node_fqs_0", "rcu_node_fqs_1" }
+# define RCU_EXP_NAME_INIT   { "rcu_node_exp_0", "rcu_node_exp_1" }
 #elif NR_CPUS <= RCU_FANOUT_3
 # define RCU_NUM_LVLS	      3
 # define NUM_RCU_LVL_0	      1
@@ -85,6 +87,7 @@
 # define NUM_RCU_LVL_INIT    { NUM_RCU_LVL_0, NUM_RCU_LVL_1, NUM_RCU_LVL_2 }
 # define RCU_NODE_NAME_INIT  { "rcu_node_0", "rcu_node_1", "rcu_node_2" }
 # define RCU_FQS_NAME_INIT   { "rcu_node_fqs_0", "rcu_node_fqs_1", "rcu_node_fqs_2" }
+# define RCU_EXP_NAME_INIT   { "rcu_node_exp_0", "rcu_node_exp_1", "rcu_node_exp_2" }
 #elif NR_CPUS <= RCU_FANOUT_4
 # define RCU_NUM_LVLS	      4
 # define NUM_RCU_LVL_0	      1
@@ -95,6 +98,7 @@
 # define NUM_RCU_LVL_INIT    { NUM_RCU_LVL_0, NUM_RCU_LVL_1, NUM_RCU_LVL_2, NUM_RCU_LVL_3 }
 # define RCU_NODE_NAME_INIT  { "rcu_node_0", "rcu_node_1", "rcu_node_2", "rcu_node_3" }
 # define RCU_FQS_NAME_INIT   { "rcu_node_fqs_0", "rcu_node_fqs_1", "rcu_node_fqs_2", "rcu_node_fqs_3" }
+# define RCU_EXP_NAME_INIT   { "rcu_node_exp_0", "rcu_node_exp_1", "rcu_node_exp_2", "rcu_node_exp_3" }
 #else
 # error "CONFIG_RCU_FANOUT insufficient for NR_CPUS"
 #endif /* #if (NR_CPUS) <= RCU_FANOUT_1 */
@@ -237,6 +241,8 @@ struct rcu_node {
 	int need_future_gp[2];
 				/* Counts of upcoming no-CB GP requests. */
 	raw_spinlock_t fqslock ____cacheline_internodealigned_in_smp;
+
+	struct mutex exp_funnel_mutex ____cacheline_internodealigned_in_smp;
 } ____cacheline_internodealigned_in_smp;
 
 /*
@@ -478,17 +484,11 @@ struct rcu_state {
 						/* _rcu_barrier(). */
 	/* End of fields guarded by barrier_mutex. */
 
-	atomic_long_t expedited_start;		/* Starting ticket. */
-	atomic_long_t expedited_done;		/* Done ticket. */
-	atomic_long_t expedited_wrap;		/* # near-wrap incidents. */
+	unsigned long expedited_sequence;	/* Take a ticket. */
 	atomic_long_t expedited_tryfail;	/* # acquisition failures. */
 	atomic_long_t expedited_workdone1;	/* # done by others #1. */
 	atomic_long_t expedited_workdone2;	/* # done by others #2. */
 	atomic_long_t expedited_normal;		/* # fallbacks to normal. */
-	atomic_long_t expedited_stoppedcpus;	/* # successful stop_cpus. */
-	atomic_long_t expedited_done_tries;	/* # tries to update _done. */
-	atomic_long_t expedited_done_lost;	/* # times beaten to _done. */
-	atomic_long_t expedited_done_exit;	/* # times exited _done loop. */
 
 	unsigned long jiffies_force_qs;		/* Time at which to invoke */
 						/*  force_quiescent_state(). */
diff --git a/kernel/rcu/tree_trace.c b/kernel/rcu/tree_trace.c
index 3ea7ffc7d5c4..d2aab8dcd58e 100644
--- a/kernel/rcu/tree_trace.c
+++ b/kernel/rcu/tree_trace.c
@@ -185,18 +185,13 @@ static int show_rcuexp(struct seq_file *m, void *v)
 {
 	struct rcu_state *rsp = (struct rcu_state *)m->private;
 
-	seq_printf(m, "s=%lu d=%lu w=%lu tf=%lu wd1=%lu wd2=%lu n=%lu sc=%lu dt=%lu dl=%lu dx=%lu\n",
-		   atomic_long_read(&rsp->expedited_start),
-		   atomic_long_read(&rsp->expedited_done),
-		   atomic_long_read(&rsp->expedited_wrap),
+	seq_printf(m, "t=%lu tf=%lu wd1=%lu wd2=%lu n=%lu sc=%lu\n",
+		   rsp->expedited_sequence,
 		   atomic_long_read(&rsp->expedited_tryfail),
 		   atomic_long_read(&rsp->expedited_workdone1),
 		   atomic_long_read(&rsp->expedited_workdone2),
 		   atomic_long_read(&rsp->expedited_normal),
-		   atomic_long_read(&rsp->expedited_stoppedcpus),
-		   atomic_long_read(&rsp->expedited_done_tries),
-		   atomic_long_read(&rsp->expedited_done_lost),
-		   atomic_long_read(&rsp->expedited_done_exit));
+		   rsp->expedited_sequence / 2);
 	return 0;
 }
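
In case it helps review, here is a tiny standalone illustration of the
snapshot arithmetic used above (ordinary C, not kernel code, wraparound
ignored).  The counter is even when no expedited grace period is in flight
and odd while one is running, so "(seq + 3) & ~0x1" is the value the counter
will reach once a full grace period that began after the snapshot has
completed, and a waiter is done as soon as the counter gets there.

#include <stdbool.h>
#include <stdio.h>

/* Counter value our caller must wait for, given a snapshot of seq. */
static unsigned long exp_snap(unsigned long seq)
{
	return (seq + 3) & ~0x1UL;
}

/* Done once the counter reaches the snapshot (kernel: ULONG_CMP_GE()). */
static bool exp_done(unsigned long seq, unsigned long snap)
{
	return seq >= snap;
}

int main(void)
{
	unsigned long seq;

	for (seq = 0; seq <= 5; seq++)
		printf("seq=%lu (GP %s) -> done once seq >= %lu\n",
		       seq, (seq & 0x1) ? "in flight" : "idle",
		       exp_snap(seq));
	/*
	 * seq=4 (idle):      snapshot 6, i.e. the next full GP (4->5->6).
	 * seq=5 (in flight): snapshot 8, because the GP now ending at 6
	 * started before our snapshot, so only the following one (7->8)
	 * is guaranteed to cover us.
	 */
	return exp_done(6, exp_snap(4)) && !exp_done(6, exp_snap(5)) ? 0 : 1;
}

That comparison, done against ->expedited_sequence with ULONG_CMP_GE(), is
what sync_sched_exp_wd() uses to let later arrivals return without touching
anything global.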