On Wed, Jun 24, 2015 at 10:10:04AM -0700, Paul E. McKenney wrote:
> On Wed, Jun 24, 2015 at 06:42:00PM +0200, Peter Zijlstra wrote:
> > On Wed, Jun 24, 2015 at 09:09:04AM -0700, Paul E. McKenney wrote:
[ . . . ]

> > > It looks like I do need to use smp_call_function_single() and your
> > > resched_cpu() because calling stop_one_cpu() sequentially is about
> > > twice as slow as try_stop_cpus() in rcutorture runs of up to 16 CPUs.
> > > But either way, your point about not stopping all the CPUs does hold.
> > 
> > Bah, I was afraid of that, the problem is that we wait for the
> > individual stop_work to complete before sending another.
> > 
> > The below is getting a little out of hand, but should avoid the problem
> > and might be easier than getting the IPI thing going, but who knows.
> 
> OK, I will give this a try.  Of course, the counter needs to be
> initialized to 1 rather than zero, and it needs to be atomically
> decremented after all stop_one_cpu_nowait() invocations, otherwise you
> can get an early wakeup due to the usual race conditions.

Except that I promised Ingo I would check for CPUs failing to schedule
quickly enough, which means that I must track them individually rather
than via a single counter...

You did have me going for a bit, though!  ;-)

							Thanx, Paul

> > ---
> > --- a/kernel/rcu/tree.c
> > +++ b/kernel/rcu/tree.c
> > @@ -103,6 +103,7 @@ struct rcu_state sname##_state = { \
> >  	.orphan_nxttail = &sname##_state.orphan_nxtlist, \
> >  	.orphan_donetail = &sname##_state.orphan_donelist, \
> >  	.barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
> > +	.expedited_mutex = __MUTEX_INITIALIZER(sname##_state.expedited_mutex), \
> >  	.name = RCU_STATE_NAME(sname), \
> >  	.abbr = sabbr, \
> >  }
> > @@ -3253,23 +3254,28 @@ void cond_synchronize_rcu(unsigned long
> >  }
> >  EXPORT_SYMBOL_GPL(cond_synchronize_rcu);
> > 
> > +struct exp_stop_state {
> > +	wait_queue_head_t *wq;
> > +	atomic_t count;
> > +};
> > +
> >  static int synchronize_sched_expedited_cpu_stop(void *data)
> >  {
> > +	struct exp_stop_state *ess = data;
> > +
> >  	/*
> >  	 * There must be a full memory barrier on each affected CPU
> >  	 * between the time that try_stop_cpus() is called and the
> >  	 * time that it returns.
> > -	 *
> > -	 * In the current initial implementation of cpu_stop, the
> > -	 * above condition is already met when the control reaches
> > -	 * this point and the following smp_mb() is not strictly
> > -	 * necessary.  Do smp_mb() anyway for documentation and
> > -	 * robustness against future implementation changes.
> >  	 */
> > -	smp_mb(); /* See above comment block. */
> > +	if (atomic_dec_and_test(&ess->count))
> > +		wake_up(ess->wq);
> > +
> >  	return 0;
> >  }
> > 
> > +static DEFINE_PER_CPU(struct cpu_stop_work, exp_stop_work);
> > +
> >  /**
> >   * synchronize_sched_expedited - Brute-force RCU-sched grace period
> >   *
> > @@ -3304,12 +3310,11 @@ static int synchronize_sched_expedited_c
> >   */
> >  void synchronize_sched_expedited(void)
> >  {
> > -	cpumask_var_t cm;
> > -	bool cma = false;
> > -	int cpu;
> > -	long firstsnap, s, snap;
> > -	int trycount = 0;
> > +	DECLARE_WAIT_QUEUE_HEAD_ONSTACK(stop_wait);
> > +	struct exp_stop_state ess = { .wq = &stop_wait, };
> >  	struct rcu_state *rsp = &rcu_sched_state;
> > +	long s, snap;
> > +	int cpu;
> > 
> >  	/*
> >  	 * If we are in danger of counter wrap, just do synchronize_sched().
> > @@ -3332,7 +3337,6 @@ void synchronize_sched_expedited(void)
> >  	 * full memory barrier.
> >  	 */
> >  	snap = atomic_long_inc_return(&rsp->expedited_start);
> > -	firstsnap = snap;
> >  	if (!try_get_online_cpus()) {
> >  		/* CPU hotplug operation in flight, fall back to normal GP. */
> >  		wait_rcu_gp(call_rcu_sched);
> > @@ -3341,82 +3345,44 @@ void synchronize_sched_expedited(void)
> >  	}
> >  	WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));
> > 
> > -	/* Offline CPUs, idle CPUs, and any CPU we run on are quiescent. */
> > -	cma = zalloc_cpumask_var(&cm, GFP_KERNEL);
> > -	if (cma) {
> > -		cpumask_copy(cm, cpu_online_mask);
> > -		cpumask_clear_cpu(raw_smp_processor_id(), cm);
> > -		for_each_cpu(cpu, cm) {
> > -			struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
> > -
> > -			if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
> > -				cpumask_clear_cpu(cpu, cm);
> > -		}
> > -		if (cpumask_weight(cm) == 0)
> > -			goto all_cpus_idle;
> > -	}
> > -
> >  	/*
> >  	 * Each pass through the following loop attempts to force a
> >  	 * context switch on each CPU.
> >  	 */
> > -	while (try_stop_cpus(cma ? cm : cpu_online_mask,
> > -			     synchronize_sched_expedited_cpu_stop,
> > -			     NULL) == -EAGAIN) {
> > -		put_online_cpus();
> > -		atomic_long_inc(&rsp->expedited_tryfail);
> > +	mutex_lock(&rsp->expedited_mutex);
> > 
> > -		/* Check to see if someone else did our work for us. */
> > -		s = atomic_long_read(&rsp->expedited_done);
> > -		if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
> > -			/* ensure test happens before caller kfree */
> > -			smp_mb__before_atomic(); /* ^^^ */
> > -			atomic_long_inc(&rsp->expedited_workdone1);
> > -			free_cpumask_var(cm);
> > -			return;
> > -		}
> > +	/*
> > +	 * Check to see if someone else did our work for us, while we were
> > +	 * waiting for the mutex.
> > +	 */
> > +	s = atomic_long_read(&rsp->expedited_done);
> > +	if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
> > +		/* ensure test happens before caller kfree */
> > +		smp_mb__before_atomic(); /* ^^^ */
> > +		atomic_long_inc(&rsp->expedited_workdone1);
> > +		goto unlock;
> > +	}
> > 
> > -		/* No joy, try again later.  Or just synchronize_sched(). */
> > -		if (trycount++ < 10) {
> > -			udelay(trycount * num_online_cpus());
> > -		} else {
> > -			wait_rcu_gp(call_rcu_sched);
> > -			atomic_long_inc(&rsp->expedited_normal);
> > -			free_cpumask_var(cm);
> > -			return;
> > -		}
> > +	/* Stop each CPU that is online, non-idle, and not us. */
> > +	for_each_online_cpu(cpu) {
> > +		struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
> > 
> > -		/* Recheck to see if someone else did our work for us. */
> > -		s = atomic_long_read(&rsp->expedited_done);
> > -		if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
> > -			/* ensure test happens before caller kfree */
> > -			smp_mb__before_atomic(); /* ^^^ */
> > -			atomic_long_inc(&rsp->expedited_workdone2);
> > -			free_cpumask_var(cm);
> > -			return;
> > -		}
> > +		/* Skip our CPU, */
> > +		if (raw_smp_processor_id() == cpu)
> > +			continue;
> > 
> > -		/*
> > -		 * Refetching sync_sched_expedited_started allows later
> > -		 * callers to piggyback on our grace period.  We retry
> > -		 * after they started, so our grace period works for them,
> > -		 * and they started after our first try, so their grace
> > -		 * period works for us.
> > -		 */
> > -		if (!try_get_online_cpus()) {
> > -			/* CPU hotplug operation in flight, use normal GP. */
> > -			wait_rcu_gp(call_rcu_sched);
> > -			atomic_long_inc(&rsp->expedited_normal);
> > -			free_cpumask_var(cm);
> > -			return;
> > -		}
> > -		snap = atomic_long_read(&rsp->expedited_start);
> > -		smp_mb(); /* ensure read is before try_stop_cpus(). */
> > +		/* and any idle CPUs. */
> > +		if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
> > +			continue;
> > +
> > +		atomic_inc(&ess.count);
> > +		stop_one_cpu_nowait(cpu, synchronize_sched_expedited_cpu_stop, &ess,
> > +				    &per_cpu(exp_stop_work, cpu));
> >  	}
> > -	atomic_long_inc(&rsp->expedited_stoppedcpus);
> > 
> > -all_cpus_idle:
> > -	free_cpumask_var(cm);
> > +	wait_event(ess.wq, !atomic_read(&ess.count));
> > +
> > +	atomic_long_inc(&rsp->expedited_stoppedcpus);
> > 
> >  	/*
> >  	 * Everyone up to our most recent fetch is covered by our grace
> > @@ -3435,6 +3401,8 @@ void synchronize_sched_expedited(void)
> >  		}
> >  	} while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
> >  	atomic_long_inc(&rsp->expedited_done_exit);
> > +unlock:
> > +	mutex_unlock(&rsp->expedited_mutex);
> > 
> >  	put_online_cpus();
> >  }
> > --- a/kernel/rcu/tree.h
> > +++ b/kernel/rcu/tree.h
> > @@ -483,6 +483,7 @@ struct rcu_state {
> >  						/*  _rcu_barrier(). */
> >  	/* End of fields guarded by barrier_mutex. */
> > 
> > +	struct mutex expedited_mutex;		/* Serializes expediting. */
> >  	atomic_long_t expedited_start;		/* Starting ticket. */
> >  	atomic_long_t expedited_done;		/* Done ticket. */
> >  	atomic_long_t expedited_wrap;		/* # near-wrap incidents. */
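As an illustration of the counter-bias idiom Paul refers to in the quoted
discussion above (initialize the count to 1, take one reference per
dispatched worker, and drop the initial reference only after the last
dispatch, so the count cannot reach zero while dispatch is still in
progress), here is a minimal userspace sketch.  It uses plain C11 atomics
and pthreads rather than the kernel's stop-machine and waitqueue APIs, and
all names in it (put_ref(), worker(), NR_WORKERS) are illustrative, not
taken from the patch:

	/*
	 * Userspace sketch of completion counting with an initial bias of 1.
	 * Starting the count at 0 instead would let a fast worker drive it
	 * to zero and issue the wakeup while later workers were still being
	 * queued -- the early-wakeup hazard mentioned above.
	 */
	#include <pthread.h>
	#include <stdatomic.h>
	#include <stdio.h>

	#define NR_WORKERS 4

	static atomic_int count = 1;	/* biased to 1, not 0 */
	static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
	static pthread_cond_t all_done = PTHREAD_COND_INITIALIZER;

	static void put_ref(void)
	{
		if (atomic_fetch_sub(&count, 1) == 1) {	/* dropped the last reference */
			pthread_mutex_lock(&lock);
			pthread_cond_broadcast(&all_done);
			pthread_mutex_unlock(&lock);
		}
	}

	static void *worker(void *arg)
	{
		(void)arg;
		/* ... per-worker work would go here ... */
		put_ref();
		return NULL;
	}

	int main(void)
	{
		pthread_t tid[NR_WORKERS];
		int i;

		for (i = 0; i < NR_WORKERS; i++) {
			atomic_fetch_add(&count, 1);	/* one reference per dispatched worker */
			pthread_create(&tid[i], NULL, worker, NULL);
		}
		put_ref();	/* drop the initial bias only after all dispatches */

		pthread_mutex_lock(&lock);
		while (atomic_load(&count) != 0)
			pthread_cond_wait(&all_done, &lock);
		pthread_mutex_unlock(&lock);

		for (i = 0; i < NR_WORKERS; i++)
			pthread_join(tid[i], NULL);
		printf("all %d workers completed\n", NR_WORKERS);
		return 0;
	}

Build with "cc -pthread"; the waiter returns only after every worker and
the dispatcher itself have dropped their references.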