On Fri, Jul 31, 2015 at 01:12:46PM +0200, Peter Zijlstra wrote:
> On Thu, Jul 30, 2015 at 11:55:27PM +0200, Peter Zijlstra wrote:
> > 
> > Exclusion between stop_{one,two}_cpu{,s}() and stop_cpus() makes this
> > trivially go away.
> > 
> > Paul's RCU branch already kills try_stop_cpus() dead, so that wart is
> > also gone. But we're still stuck with stop_machine_from_inactive_cpu()
> > which does a spin-wait for exclusive state. So I suppose we'll have to
> > keep stop_cpus_mutex :/
> 
> *groan* we really need to kill that from_inactive_cpu() shite too. Lemme
> go have a look at how this MTRR crap works.

*sigh* we can't get rid of it :/ The hardware is 'broken', see:
d0af9eed5aa9 ("x86, pat/mtrr: Rendezvous all the cpus for MTRR/PAT
init")

And that means we must not ever block and the global primitive must be a
spinner.

The below is the best we can do. At least it localizes the ugly.

--- a/kernel/stop_machine.c
+++ b/kernel/stop_machine.c
@@ -20,7 +20,6 @@
 #include <linux/kallsyms.h>
 #include <linux/smpboot.h>
 #include <linux/atomic.h>
-#include <linux/lglock.h>
 
 /*
  * Structure to determine completion condition and record errors.  May
@@ -47,14 +46,6 @@ struct cpu_stopper {
 static DEFINE_PER_CPU(struct cpu_stopper, cpu_stopper);
 static bool stop_machine_initialized = false;
 
-/*
- * Avoids a race between stop_two_cpus and global stop_cpus, where
- * the stoppers could get queued up in reverse order, leading to
- * system deadlock. Using an lglock means stop_two_cpus remains
- * relatively cheap.
- */
-DEFINE_STATIC_LGLOCK(stop_cpus_lock);
-
 static void cpu_stop_init_done(struct cpu_stop_done *done, unsigned int nr_todo)
 {
        memset(done, 0, sizeof(*done));
@@ -74,20 +65,25 @@ static void cpu_stop_signal_done(struct
 }
 
 /* queue @work to @stopper.  if offline, @work is completed immediately */
-static void cpu_stop_queue_work(unsigned int cpu, struct cpu_stop_work *work)
+static void __cpu_stop_queue_work(unsigned int cpu, struct cpu_stop_work *work)
 {
        struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
 
-       unsigned long flags;
-
-       spin_lock_irqsave(&stopper->lock, flags);
-
        if (stopper->enabled) {
                list_add_tail(&work->list, &stopper->works);
                wake_up_process(stopper->thread);
-       } else
+       } else {
                cpu_stop_signal_done(work->done, false);
+       }
+}
 
+static void cpu_stop_queue_work(unsigned int cpu, struct cpu_stop_work *work)
+{
+       struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
+       unsigned long flags;
+
+       spin_lock_irqsave(&stopper->lock, flags);
+       __cpu_stop_queue_work(cpu, work);
        spin_unlock_irqrestore(&stopper->lock, flags);
 }
 
@@ -226,9 +222,14 @@ static int multi_cpu_stop(void *data)
  */
 int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *arg)
 {
-       struct cpu_stop_done done;
+       struct cpu_stopper *stopper1, *stopper2;
        struct cpu_stop_work work1, work2;
        struct multi_stop_data msdata;
+       struct cpu_stop_done done;
+       unsigned long flags;
+
+       if (cpu2 < cpu1)
+               swap(cpu1, cpu2);
 
        preempt_disable();
        msdata = (struct multi_stop_data){
@@ -260,10 +261,17 @@ int stop_two_cpus(unsigned int cpu1, uns
                return -ENOENT;
        }
 
-       lg_double_lock(&stop_cpus_lock, cpu1, cpu2);
-       cpu_stop_queue_work(cpu1, &work1);
-       cpu_stop_queue_work(cpu2, &work2);
-       lg_double_unlock(&stop_cpus_lock, cpu1, cpu2);
+       stopper1 = per_cpu_ptr(&cpu_stopper, cpu1);
+       stopper2 = per_cpu_ptr(&cpu_stopper, cpu2);
+
+       spin_lock_irqsave(&stopper1->lock, flags);
+       spin_lock_nested(&stopper2->lock, SINGLE_DEPTH_NESTING);
+
+       __cpu_stop_queue_work(cpu1, &work1);
+       __cpu_stop_queue_work(cpu2, &work2);
+
+       spin_unlock(&stopper2->lock);
+       spin_unlock_irqrestore(&stopper1->lock, flags);
 
        preempt_enable();
 
@@ -304,19 +312,24 @@ static void queue_stop_cpus_work(const s
        unsigned int cpu;
 
        /*
-        * Disable preemption while queueing to avoid getting
-        * preempted by a stopper which might wait for other stoppers
-        * to enter @fn which can lead to deadlock.
+        * Disgusting, but take all relevant per-cpu spinlocks to serialize
+        * against stop_{one,two}_cpu{,s}().
         */
-       lg_global_lock(&stop_cpus_lock);
+       preempt_disable();
+       for_each_cpu(cpu, cpumask)
+               arch_spin_lock((arch_spinlock_t *)&per_cpu(cpu_stopper.lock, cpu));
+
        for_each_cpu(cpu, cpumask) {
                work = &per_cpu(cpu_stopper.stop_work, cpu);
                work->fn = fn;
                work->arg = arg;
                work->done = done;
-               cpu_stop_queue_work(cpu, work);
+               __cpu_stop_queue_work(cpu, work);
        }
-       lg_global_unlock(&stop_cpus_lock);
+
+       for_each_cpu(cpu, cpumask)
+               arch_spin_unlock((arch_spinlock_t *)&per_cpu(cpu_stopper.lock, cpu));
+       preempt_enable();
 }
 
 static int __stop_cpus(const struct cpumask *cpumask,
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to