On Wed, Jun 24, 2015 at 10:10:17AM -0700, Paul E. McKenney wrote:
> OK, I will give this a try.  Of course, the counter needs to be
> initialized to 1 rather than zero, and it needs to be atomically
> decremented after all stop_one_cpu_nowait() invocations, otherwise you
> can get an early wakeup due to the usual race conditions.
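To illustrate the counting scheme quoted above, here is a minimal userspace sketch (pthreads and C11 atomics standing in for the kernel's wait queue and stop_one_cpu_nowait(); all names are invented for illustration and this is not the kernel code): the count starts at 1, every dispatched worker takes a reference before it is launched, and the dispatcher drops its own reference only after the dispatch loop, so the waiter cannot observe zero too early.

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

/* Invented for illustration; not a kernel structure. */
struct waiter_count {
	pthread_mutex_t lock;
	pthread_cond_t wait;
	atomic_int count;	/* starts at 1: the dispatcher's own reference */
};

static struct waiter_count done = {
	.lock = PTHREAD_MUTEX_INITIALIZER,
	.wait = PTHREAD_COND_INITIALIZER,
	.count = 1,
};

/* Drop one reference; whoever drops the last one wakes the waiter. */
static void put_ref(struct waiter_count *c)
{
	if (atomic_fetch_sub(&c->count, 1) == 1) {
		pthread_mutex_lock(&c->lock);
		pthread_cond_broadcast(&c->wait);
		pthread_mutex_unlock(&c->lock);
	}
}

static void *worker(void *arg)
{
	(void)arg;
	/* ... the per-CPU work would go here ... */
	put_ref(&done);
	return NULL;
}

int main(void)
{
	pthread_t tid[4];
	int i;

	for (i = 0; i < 4; i++) {
		atomic_fetch_add(&done.count, 1);	/* one reference per dispatched worker */
		pthread_create(&tid[i], NULL, worker, NULL);
	}
	put_ref(&done);	/* drop the initial reference only after the dispatch loop */

	/* Wait for the count to reach zero, i.e. for all workers to finish. */
	pthread_mutex_lock(&done.lock);
	while (atomic_load(&done.count) != 0)
		pthread_cond_wait(&done.wait, &done.lock);
	pthread_mutex_unlock(&done.lock);

	for (i = 0; i < 4; i++)
		pthread_join(tid[i], NULL);
	printf("all workers completed\n");
	return 0;
}

The same put-style decrement is what the patch below does with atomic_dec_and_test() in the stopper callback and the final atomic_dec() after the dispatch loop.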
Clever, that counter trick. How about something like this? It replaces the
mutex and the start/done ticket scheme with an MCS-style lockless FIFO queue,
and it further uses the gpnum/completed pair to short-circuit the work if we
have already waited long enough while sitting on the queue. (A standalone
userspace sketch of the MCS-style handoff is appended after the patch.)

---
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3253,23 +3253,28 @@ void cond_synchronize_rcu(unsigned long
 }
 EXPORT_SYMBOL_GPL(cond_synchronize_rcu);
 
+struct exp_stop_state {
+	wait_queue_head_t *wq;
+	atomic_t count;
+};
+
 static int synchronize_sched_expedited_cpu_stop(void *data)
 {
+	struct exp_stop_state *ess = data;
+
 	/*
 	 * There must be a full memory barrier on each affected CPU
 	 * between the time that try_stop_cpus() is called and the
 	 * time that it returns.
-	 *
-	 * In the current initial implementation of cpu_stop, the
-	 * above condition is already met when the control reaches
-	 * this point and the following smp_mb() is not strictly
-	 * necessary.  Do smp_mb() anyway for documentation and
-	 * robustness against future implementation changes.
 	 */
-	smp_mb(); /* See above comment block. */
+	if (atomic_dec_and_test(&ess->count))
+		wake_up(ess->wq);
+
 	return 0;
 }
 
+static DEFINE_PER_CPU(struct cpu_stop_work, exp_stop_work);
+
 /**
  * synchronize_sched_expedited - Brute-force RCU-sched grace period
  *
@@ -3304,138 +3309,84 @@ static int synchronize_sched_expedited_c
  */
 void synchronize_sched_expedited(void)
 {
-	cpumask_var_t cm;
-	bool cma = false;
-	int cpu;
-	long firstsnap, s, snap;
-	int trycount = 0;
+	DECLARE_WAIT_QUEUE_HEAD_ONSTACK(stop_wait);
+	struct exp_stop_state ess = {
+		.wq = &stop_wait,
+		.count = ATOMIC_INIT(1),
+	};
 	struct rcu_state *rsp = &rcu_sched_state;
+	struct expedited_queue_task {
+		struct expedited_queue_task *next;
+		struct task_struct *task;
+		int done;
+	} *prev, *next, entry = {
+		.task = current,
+	};
+	long gpnum;
+	int cpu;
 
-	/*
-	 * If we are in danger of counter wrap, just do synchronize_sched().
-	 * By allowing sync_sched_expedited_started to advance no more than
-	 * ULONG_MAX/8 ahead of sync_sched_expedited_done, we are ensuring
-	 * that more than 3.5 billion CPUs would be required to force a
-	 * counter wrap on a 32-bit system.  Quite a few more CPUs would of
-	 * course be required on a 64-bit system.
-	 */
-	if (ULONG_CMP_GE((ulong)atomic_long_read(&rsp->expedited_start),
-			 (ulong)atomic_long_read(&rsp->expedited_done) +
-			 ULONG_MAX / 8)) {
-		wait_rcu_gp(call_rcu_sched);
-		atomic_long_inc(&rsp->expedited_wrap);
-		return;
-	}
-
-	/*
-	 * Take a ticket.  Note that atomic_inc_return() implies a
-	 * full memory barrier.
-	 */
-	snap = atomic_long_inc_return(&rsp->expedited_start);
-	firstsnap = snap;
 	if (!try_get_online_cpus()) {
 		/* CPU hotplug operation in flight, fall back to normal GP. */
 		wait_rcu_gp(call_rcu_sched);
-		atomic_long_inc(&rsp->expedited_normal);
 		return;
 	}
 	WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));
 
-	/* Offline CPUs, idle CPUs, and any CPU we run on are quiescent. */
-	cma = zalloc_cpumask_var(&cm, GFP_KERNEL);
-	if (cma) {
-		cpumask_copy(cm, cpu_online_mask);
-		cpumask_clear_cpu(raw_smp_processor_id(), cm);
-		for_each_cpu(cpu, cm) {
-			struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
+	smp_mb();
+	gpnum = smp_load_acquire(&rsp->gpnum);
 
-			if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
-				cpumask_clear_cpu(cpu, cm);
+	/* MCS style queue 'lock' */
+	prev = xchg(&rsp->expedited_queue, &entry);
+	if (prev) {
+		WRITE_ONCE(prev->next, &entry);
+		for (;;) {
+			set_current_state(TASK_UNINTERRUPTIBLE);
+			if (smp_load_acquire(&entry.done))
+				break;
+			schedule();
 		}
-		if (cpumask_weight(cm) == 0)
-			goto all_cpus_idle;
+		__set_current_state(TASK_RUNNING);
 	}
 
 	/*
-	 * Each pass through the following loop attempts to force a
-	 * context switch on each CPU.
+	 * Check to see if someone else did our work for us, while we were
+	 * waiting on the queue.
 	 */
-	while (try_stop_cpus(cma ? cm : cpu_online_mask,
-			     synchronize_sched_expedited_cpu_stop,
-			     NULL) == -EAGAIN) {
-		put_online_cpus();
-		atomic_long_inc(&rsp->expedited_tryfail);
-
-		/* Check to see if someone else did our work for us. */
-		s = atomic_long_read(&rsp->expedited_done);
-		if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
-			/* ensure test happens before caller kfree */
-			smp_mb__before_atomic(); /* ^^^ */
-			atomic_long_inc(&rsp->expedited_workdone1);
-			free_cpumask_var(cm);
-			return;
-		}
-
-		/* No joy, try again later.  Or just synchronize_sched(). */
-		if (trycount++ < 10) {
-			udelay(trycount * num_online_cpus());
-		} else {
-			wait_rcu_gp(call_rcu_sched);
-			atomic_long_inc(&rsp->expedited_normal);
-			free_cpumask_var(cm);
-			return;
-		}
-
-		/* Recheck to see if someone else did our work for us. */
-		s = atomic_long_read(&rsp->expedited_done);
-		if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
-			/* ensure test happens before caller kfree */
-			smp_mb__before_atomic(); /* ^^^ */
-			atomic_long_inc(&rsp->expedited_workdone2);
-			free_cpumask_var(cm);
-			return;
-		}
+	if (ULONG_CMP_LT(gpnum, smp_load_acquire(&rsp->completed)))
+		goto unlock;
 
-		/*
-		 * Refetching sync_sched_expedited_started allows later
-		 * callers to piggyback on our grace period.  We retry
-		 * after they started, so our grace period works for them,
-		 * and they started after our first try, so their grace
-		 * period works for us.
-		 */
-		if (!try_get_online_cpus()) {
-			/* CPU hotplug operation in flight, use normal GP. */
-			wait_rcu_gp(call_rcu_sched);
-			atomic_long_inc(&rsp->expedited_normal);
-			free_cpumask_var(cm);
-			return;
-		}
-		snap = atomic_long_read(&rsp->expedited_start);
-		smp_mb(); /* ensure read is before try_stop_cpus(). */
+	/* Stop each CPU that is online, non-idle, and not us. */
+	for_each_online_cpu(cpu) {
+		struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
+
+		/* Skip our CPU, */
+		if (raw_smp_processor_id() == cpu)
+			continue;
+
+		/* and any idle CPUs. */
+		if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
+			continue;
+
+		atomic_inc(&ess.count);
+		stop_one_cpu_nowait(cpu, synchronize_sched_expedited_cpu_stop,
+				    &ess, &per_cpu(exp_stop_work, cpu));
 	}
-	atomic_long_inc(&rsp->expedited_stoppedcpus);
+	atomic_dec(&ess.count);
 
-all_cpus_idle:
-	free_cpumask_var(cm);
+	wait_event(stop_wait, !atomic_read(&ess.count));
 
-	/*
-	 * Everyone up to our most recent fetch is covered by our grace
-	 * period.  Update the counter, but only if our work is still
-	 * relevant -- which it won't be if someone who started later
-	 * than we did already did their update.
-	 */
-	do {
-		atomic_long_inc(&rsp->expedited_done_tries);
-		s = atomic_long_read(&rsp->expedited_done);
-		if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
-			/* ensure test happens before caller kfree */
-			smp_mb__before_atomic(); /* ^^^ */
-			atomic_long_inc(&rsp->expedited_done_lost);
-			break;
-		}
-	} while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
-	atomic_long_inc(&rsp->expedited_done_exit);
+unlock:
+	/* MCS style queue 'unlock' */
+	next = READ_ONCE(entry.next);
+	if (!next) {
+		if (cmpxchg(&rsp->expedited_queue, &entry, NULL) == &entry)
+			goto done;
+		while (!(next = READ_ONCE(entry.next)))
+			cpu_relax();
+	}
+	smp_store_release(&next->done, 1);
+done:
 	put_online_cpus();
 }
 EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -483,17 +483,7 @@ struct rcu_state {
 						/*  _rcu_barrier(). */
 	/* End of fields guarded by barrier_mutex. */
 
-	atomic_long_t expedited_start;		/* Starting ticket. */
-	atomic_long_t expedited_done;		/* Done ticket. */
-	atomic_long_t expedited_wrap;		/* # near-wrap incidents. */
-	atomic_long_t expedited_tryfail;	/* # acquisition failures. */
-	atomic_long_t expedited_workdone1;	/* # done by others #1. */
-	atomic_long_t expedited_workdone2;	/* # done by others #2. */
-	atomic_long_t expedited_normal;		/* # fallbacks to normal. */
-	atomic_long_t expedited_stoppedcpus;	/* # successful stop_cpus. */
-	atomic_long_t expedited_done_tries;	/* # tries to update _done. */
-	atomic_long_t expedited_done_lost;	/* # times beaten to _done. */
-	atomic_long_t expedited_done_exit;	/* # times exited _done loop. */
+	void *expedited_queue;
 
 	unsigned long jiffies_force_qs;		/* Time at which to invoke */
 						/*  force_quiescent_state(). */
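And for anyone who has not run into MCS queues before, here is the promised standalone userspace sketch of the FIFO handoff the patch uses, with C11 atomics and pthreads standing in for xchg()/cmpxchg() and the scheduler; the names are invented for the illustration and this is only a sketch, not the patch's code:

#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdio.h>

/* Invented for illustration; one on-stack node per waiter, like 'entry'. */
struct mcs_node {
	struct mcs_node *_Atomic next;
	atomic_int done;
};

static struct mcs_node *_Atomic queue_tail;	/* analogue of rsp->expedited_queue */

/* MCS style queue 'lock': swap ourselves in as the tail and wait our turn. */
static void mcs_wait_turn(struct mcs_node *node)
{
	struct mcs_node *prev;

	atomic_init(&node->next, NULL);
	atomic_init(&node->done, 0);

	prev = atomic_exchange(&queue_tail, node);
	if (prev) {
		/* Link behind the old tail and wait for the handoff. */
		atomic_store(&prev->next, node);
		while (!atomic_load_explicit(&node->done, memory_order_acquire))
			sched_yield();	/* the patch sleeps in the scheduler here */
	}
}

/* MCS style queue 'unlock': hand off to the next waiter, if any. */
static void mcs_hand_off(struct mcs_node *node)
{
	struct mcs_node *next = atomic_load(&node->next);

	if (!next) {
		/* Nobody visibly queued behind us: try to reset the tail. */
		struct mcs_node *expected = node;

		if (atomic_compare_exchange_strong(&queue_tail, &expected,
						   (struct mcs_node *)NULL))
			return;
		/* A new arrival swapped the tail but has not linked in yet. */
		while (!(next = atomic_load(&node->next)))
			sched_yield();
	}
	atomic_store_explicit(&next->done, 1, memory_order_release);
}

static int shared_counter;	/* serialized by the FIFO handoff */

static void *thread_fn(void *arg)
{
	struct mcs_node node;	/* on-stack, like 'entry' in the patch */

	(void)arg;
	mcs_wait_turn(&node);
	shared_counter++;	/* strictly FIFO-ordered critical section */
	mcs_hand_off(&node);
	return NULL;
}

int main(void)
{
	pthread_t tid[8];
	int i;

	for (i = 0; i < 8; i++)
		pthread_create(&tid[i], NULL, thread_fn, NULL);
	for (i = 0; i < 8; i++)
		pthread_join(tid[i], NULL);
	printf("shared_counter = %d\n", shared_counter);	/* prints 8 */
	return 0;
}

The delicate bit is the release path: if our next pointer is still NULL we may be racing a new arrival that has already swapped itself in as the tail but has not linked to us yet, which is why the release first tries the compare-and-exchange on the tail and otherwise spins until the link shows up, just as in the 'unlock' section of the patch above.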