OK, let me send this patch as 6/5 in reply to 0/5. Added a couple
of helpers and the comment.

Please let me know what do you think.

On 06/30, Oleg Nesterov wrote:
>
> On 06/30, Oleg Nesterov wrote:
> >
> > On 06/30, Oleg Nesterov wrote:
> > >
> > > But let me send some cleanups first. Plus I believe I found another
> > > stop_machine bug, see the last patch. So I hope these changes make
> > > sense in any case.
> >
> > The last patch fixes the bug, I think. Say, stop_one_cpu(X) can race
> > with _cpu_down(X)->stop_machine() so that the kernel will crash if this
> > CPU X becomes online again.
> >
> > The window after cpu_stopper_thread() returns and before smpboot_thread()
> > calls ->park() is tiny, but still this is possible afaics.
> 
> Now lets try to remove lglocks from stop_machine.c. See the patch at
> the end. It lacks the comments and the changelog, just for review.
> 
> On top of this series, so let me show the code with this patch applied,
> 
>       static int cpu_stop_queue_two_works(int cpu1, struct cpu_stop_work 
> *work1,
>                                           int cpu2, struct cpu_stop_work 
> *work2)
>       {
>               struct cpu_stopper *stopper1 = per_cpu_ptr(&cpu_stopper, cpu1);
>               struct cpu_stopper *stopper2 = per_cpu_ptr(&cpu_stopper, cpu2);
>               int err;
>       retry:
>               spin_lock_irq(&stopper1->lock);
>               spin_lock_nested(&stopper2->lock, SINGLE_DEPTH_NESTING);
>               err = -ENOENT;
>               if (!cpu_active(cpu1) || !cpu_active(cpu2))
>                       goto unlock;
> 
>               BUG_ON(!stopper1->enabled || !stopper2->enabled);
> 
>               err = -EDEADLK;
>               if (list_empty(&stopper1->stop_work.list) !=
>                   list_empty(&stopper2->stop_work.list))
>                       goto unlock;
> 
>               err = 0;
>               list_add_tail(&work1->list, &stopper1->works);
>               list_add_tail(&work2->list, &stopper2->works);
>               wake_up_process(stopper1->thread);
>               wake_up_process(stopper2->thread);
>       unlock:
>               spin_unlock(&stopper2->lock);
>               spin_unlock_irq(&stopper1->lock);
> 
>               if (unlikely(err == -EDEADLK)) {
>                       cond_resched();
>                       goto retry;
>               }
>               return err;
>       }
> 
>       int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t 
> fn, void *arg)
>       {
>               struct cpu_stop_done done;
>               struct cpu_stop_work work1, work2;
>               struct multi_stop_data msdata = {
>                       .fn = fn,
>                       .data = arg,
>                       .num_threads = 2,
>                       .active_cpus = cpumask_of(cpu1),
>               };
> 
>               set_state(&msdata, MULTI_STOP_PREPARE);
>               cpu_stop_init_done(&done, 2);
> 
>               work1.fn   = work2.fn   = multi_cpu_stop;
>               work1.arg  = work2.arg  = &msdata;
>               work1.done = work2.done = &done;
> 
>               if (cpu1 > cpu2)
>                       swap(cpu1, cpu2);
>               if (cpu_stop_queue_two_works(cpu1, &work1, cpu2, &work2))
>                       return -ENOENT;
> 
>               wait_for_completion(&done.completion);
>               BUG_ON(!done.executed);
>               return done.ret;
>       }
> 
> Note the -EDEADLK check in cpu_stop_queue_two_works(). This avoids the
> race with stop_cpus(). We need to ensure that if we race with stop_cpus()
> then either stop_cpus() wins and queues both works on these CPU's, or
> we win this race and queue both works.
> 
> The "false positive" -EDEADLK can happen if we race with, say,
> stop_cpus(cpumask_of(cpu1)). But this is very unlikely (in fact it is
> always called with cpumask == cpu_online_mask), and in this case we
> need to wait anyway, so I think the "busy wait" loop can work.
> 
> As for cpu_active() checks... This was copied from the current code,
> but I think they should die later. This needs another cleanup, imo
> the stopper->enabled logic should be improved.
> 
> What do you think?
> 
> Oleg.
> 
> ------------------------------------------------------------------------------
> Subject: [PATCH] stop_machine: kill stop_cpus_lock and lg_double_lock/unlock()
> 
> ---
>  include/linux/lglock.h  |    5 ---
>  kernel/locking/lglock.c |   22 -----------
>  kernel/stop_machine.c   |   92 +++++++++++++++++++++++++---------------------
>  3 files changed, 50 insertions(+), 69 deletions(-)
> 
> diff --git a/include/linux/lglock.h b/include/linux/lglock.h
> index c92ebd1..0081f00 100644
> --- a/include/linux/lglock.h
> +++ b/include/linux/lglock.h
> @@ -52,15 +52,10 @@ struct lglock {
>       static struct lglock name = { .lock = &name ## _lock }
>  
>  void lg_lock_init(struct lglock *lg, char *name);
> -
>  void lg_local_lock(struct lglock *lg);
>  void lg_local_unlock(struct lglock *lg);
>  void lg_local_lock_cpu(struct lglock *lg, int cpu);
>  void lg_local_unlock_cpu(struct lglock *lg, int cpu);
> -
> -void lg_double_lock(struct lglock *lg, int cpu1, int cpu2);
> -void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2);
> -
>  void lg_global_lock(struct lglock *lg);
>  void lg_global_unlock(struct lglock *lg);
>  
> diff --git a/kernel/locking/lglock.c b/kernel/locking/lglock.c
> index 951cfcd..86ae2ae 100644
> --- a/kernel/locking/lglock.c
> +++ b/kernel/locking/lglock.c
> @@ -60,28 +60,6 @@ void lg_local_unlock_cpu(struct lglock *lg, int cpu)
>  }
>  EXPORT_SYMBOL(lg_local_unlock_cpu);
>  
> -void lg_double_lock(struct lglock *lg, int cpu1, int cpu2)
> -{
> -     BUG_ON(cpu1 == cpu2);
> -
> -     /* lock in cpu order, just like lg_global_lock */
> -     if (cpu2 < cpu1)
> -             swap(cpu1, cpu2);
> -
> -     preempt_disable();
> -     lock_acquire_shared(&lg->lock_dep_map, 0, 0, NULL, _RET_IP_);
> -     arch_spin_lock(per_cpu_ptr(lg->lock, cpu1));
> -     arch_spin_lock(per_cpu_ptr(lg->lock, cpu2));
> -}
> -
> -void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2)
> -{
> -     lock_release(&lg->lock_dep_map, 1, _RET_IP_);
> -     arch_spin_unlock(per_cpu_ptr(lg->lock, cpu1));
> -     arch_spin_unlock(per_cpu_ptr(lg->lock, cpu2));
> -     preempt_enable();
> -}
> -
>  void lg_global_lock(struct lglock *lg)
>  {
>       int i;
> diff --git a/kernel/stop_machine.c b/kernel/stop_machine.c
> index 12484e5..d53c86c 100644
> --- a/kernel/stop_machine.c
> +++ b/kernel/stop_machine.c
> @@ -20,7 +20,6 @@
>  #include <linux/kallsyms.h>
>  #include <linux/smpboot.h>
>  #include <linux/atomic.h>
> -#include <linux/lglock.h>
>  
>  /*
>   * Structure to determine completion condition and record errors.  May
> @@ -47,14 +46,6 @@ struct cpu_stopper {
>  static DEFINE_PER_CPU(struct cpu_stopper, cpu_stopper);
>  static bool stop_machine_initialized = false;
>  
> -/*
> - * Avoids a race between stop_two_cpus and global stop_cpus, where
> - * the stoppers could get queued up in reverse order, leading to
> - * system deadlock. Using an lglock means stop_two_cpus remains
> - * relatively cheap.
> - */
> -DEFINE_STATIC_LGLOCK(stop_cpus_lock);
> -
>  static void cpu_stop_init_done(struct cpu_stop_done *done, unsigned int 
> nr_todo)
>  {
>       memset(done, 0, sizeof(*done));
> @@ -213,6 +204,42 @@ static int multi_cpu_stop(void *data)
>       return err;
>  }
>  
> +static int cpu_stop_queue_two_works(int cpu1, struct cpu_stop_work *work1,
> +                                 int cpu2, struct cpu_stop_work *work2)
> +{
> +     struct cpu_stopper *stopper1 = per_cpu_ptr(&cpu_stopper, cpu1);
> +     struct cpu_stopper *stopper2 = per_cpu_ptr(&cpu_stopper, cpu2);
> +     int err;
> +retry:
> +     spin_lock_irq(&stopper1->lock);
> +     spin_lock_nested(&stopper2->lock, SINGLE_DEPTH_NESTING);
> +     err = -ENOENT;
> +     if (!cpu_active(cpu1) || !cpu_active(cpu2))
> +             goto unlock;
> +
> +     BUG_ON(!stopper1->enabled || !stopper2->enabled);
> +
> +     err = -EDEADLK;
> +     if (list_empty(&stopper1->stop_work.list) !=
> +         list_empty(&stopper2->stop_work.list))
> +             goto unlock;
> +
> +     err = 0;
> +     list_add_tail(&work1->list, &stopper1->works);
> +     list_add_tail(&work2->list, &stopper2->works);
> +     wake_up_process(stopper1->thread);
> +     wake_up_process(stopper2->thread);
> +unlock:
> +     spin_unlock(&stopper2->lock);
> +     spin_unlock_irq(&stopper1->lock);
> +
> +     if (unlikely(err == -EDEADLK)) {
> +             cond_resched();
> +             goto retry;
> +     }
> +     return err;
> +}
> +
>  /**
>   * stop_two_cpus - stops two cpus
>   * @cpu1: the cpu to stop
> @@ -228,48 +255,28 @@ int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, 
> cpu_stop_fn_t fn, void *
>  {
>       struct cpu_stop_done done;
>       struct cpu_stop_work work1, work2;
> -     struct multi_stop_data msdata;
> -
> -     preempt_disable();
> -     msdata = (struct multi_stop_data){
> +     struct multi_stop_data msdata = {
>               .fn = fn,
>               .data = arg,
>               .num_threads = 2,
>               .active_cpus = cpumask_of(cpu1),
>       };
>  
> -     work1 = work2 = (struct cpu_stop_work){
> -             .fn = multi_cpu_stop,
> -             .arg = &msdata,
> -             .done = &done
> -     };
> -
> -     cpu_stop_init_done(&done, 2);
>       set_state(&msdata, MULTI_STOP_PREPARE);
> +     cpu_stop_init_done(&done, 2);
>  
> -     /*
> -      * If we observe both CPUs active we know _cpu_down() cannot yet have
> -      * queued its stop_machine works and therefore ours will get executed
> -      * first. Or its not either one of our CPUs that's getting unplugged,
> -      * in which case we don't care.
> -      *
> -      * This relies on the stopper workqueues to be FIFO.
> -      */
> -     if (!cpu_active(cpu1) || !cpu_active(cpu2)) {
> -             preempt_enable();
> -             return -ENOENT;
> -     }
> -
> -     lg_double_lock(&stop_cpus_lock, cpu1, cpu2);
> -     cpu_stop_queue_work(cpu1, &work1);
> -     cpu_stop_queue_work(cpu2, &work2);
> -     lg_double_unlock(&stop_cpus_lock, cpu1, cpu2);
> +     work1.fn   = work2.fn   = multi_cpu_stop;
> +     work1.arg  = work2.arg  = &msdata;
> +     work1.done = work2.done = &done;
>  
> -     preempt_enable();
> +     if (cpu1 > cpu2)
> +             swap(cpu1, cpu2);
> +     if (cpu_stop_queue_two_works(cpu1, &work1, cpu2, &work2))
> +             return -ENOENT;
>  
>       wait_for_completion(&done.completion);
> -
> -     return done.executed ? done.ret : -ENOENT;
> +     BUG_ON(!done.executed);
> +     return done.ret;
>  }
>  
>  /**
> @@ -308,7 +315,7 @@ static void queue_stop_cpus_work(const struct cpumask 
> *cpumask,
>        * preempted by a stopper which might wait for other stoppers
>        * to enter @fn which can lead to deadlock.
>        */
> -     lg_global_lock(&stop_cpus_lock);
> +     preempt_disable();
>       for_each_cpu(cpu, cpumask) {
>               work = &per_cpu(cpu_stopper.stop_work, cpu);
>               work->fn = fn;
> @@ -316,7 +323,7 @@ static void queue_stop_cpus_work(const struct cpumask 
> *cpumask,
>               work->done = done;
>               cpu_stop_queue_work(cpu, work);
>       }
> -     lg_global_unlock(&stop_cpus_lock);
> +     preempt_enable();
>  }
>  
>  static int __stop_cpus(const struct cpumask *cpumask,
> @@ -505,6 +512,7 @@ static int __init cpu_stop_init(void)
>  
>               spin_lock_init(&stopper->lock);
>               INIT_LIST_HEAD(&stopper->works);
> +             INIT_LIST_HEAD(&stopper->stop_work.list);
>       }
>  
>       BUG_ON(smpboot_register_percpu_thread(&cpu_stop_threads));
> -- 
> 1.5.5.1
> 

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to