On Wed, Jun 24, 2015 at 10:10:04AM -0700, Paul E. McKenney wrote:
> On Wed, Jun 24, 2015 at 06:42:00PM +0200, Peter Zijlstra wrote:
> > On Wed, Jun 24, 2015 at 09:09:04AM -0700, Paul E. McKenney wrote:

[ . . . ]

> > > It looks like I do need to use smp_call_function_single() and your
> > > resched_cpu() because calling stop_one_cpu() sequentially is about
> > > twice as slow as try_stop_cpus() in rcutorture runs of up to 16 CPUs.
> > > But either way, your point about not stopping all the CPUs does hold.
> > 
> > Bah, I was afraid of that, the problem is that we wait for the
> > individual stop_work to complete before sending another.
> > 
> > The below is getting a little out of hand, but should avoid the problem
> > > and might be easier than getting the IPI thing going, but who knows.
> 
> OK, I will give this a try.  Of course, the counter needs to be
> initialized to 1 rather than zero, and it needs to be atomically
> decremented after all stop_one_cpu_nowait() invocations, otherwise you
> can get an early wakeup due to the usual race conditions.

Except that I promised Ingo I would check for CPUs failing to schedule
quickly enough, which means that I must track them individually rather
than via a single counter...

You did have me going for a bit, though!  ;-)

                                                        Thanx, Paul

> > ---
> > --- a/kernel/rcu/tree.c
> > +++ b/kernel/rcu/tree.c
> > @@ -103,6 +103,7 @@ struct rcu_state sname##_state = { \
> >     .orphan_nxttail = &sname##_state.orphan_nxtlist, \
> >     .orphan_donetail = &sname##_state.orphan_donelist, \
> >     .barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
> > +   .expedited_mutex = __MUTEX_INITIALIZER(sname##_state.expedited_mutex), \
> >     .name = RCU_STATE_NAME(sname), \
> >     .abbr = sabbr, \
> >  }
> > @@ -3253,23 +3254,28 @@ void cond_synchronize_rcu(unsigned long
> >  }
> >  EXPORT_SYMBOL_GPL(cond_synchronize_rcu);
> > 
> > +struct exp_stop_state {
> > +   wait_queue_head_t       *wq;
> > +   atomic_t                count;
> > +};
> > +
> >  static int synchronize_sched_expedited_cpu_stop(void *data)
> >  {
> > +   struct exp_stop_state *ess = data;
> > +
> >     /*
> >      * There must be a full memory barrier on each affected CPU
> >      * between the time that try_stop_cpus() is called and the
> >      * time that it returns.
> > -    *
> > -    * In the current initial implementation of cpu_stop, the
> > -    * above condition is already met when the control reaches
> > -    * this point and the following smp_mb() is not strictly
> > -    * necessary.  Do smp_mb() anyway for documentation and
> > -    * robustness against future implementation changes.
> >      */
> > -   smp_mb(); /* See above comment block. */
> > +   if (atomic_dec_and_test(&ess->count))
> > +           wake_up(ess->wq);
> > +
> >     return 0;
> >  }
> > 
> > +static DEFINE_PER_CPU(struct cpu_stop_work, exp_stop_work);
> > +
> >  /**
> >   * synchronize_sched_expedited - Brute-force RCU-sched grace period
> >   *
> > @@ -3304,12 +3310,11 @@ static int synchronize_sched_expedited_c
> >   */
> >  void synchronize_sched_expedited(void)
> >  {
> > -   cpumask_var_t cm;
> > -   bool cma = false;
> > -   int cpu;
> > -   long firstsnap, s, snap;
> > -   int trycount = 0;
> > +   DECLARE_WAIT_QUEUE_HEAD_ONSTACK(stop_wait);
> > +   struct exp_stop_state ess = { .wq = &stop_wait, };
> >     struct rcu_state *rsp = &rcu_sched_state;
> > +   long s, snap;
> > +   int cpu;
> > 
> >     /*
> >      * If we are in danger of counter wrap, just do synchronize_sched().
> > @@ -3332,7 +3337,6 @@ void synchronize_sched_expedited(void)
> >      * full memory barrier.
> >      */
> >     snap = atomic_long_inc_return(&rsp->expedited_start);
> > -   firstsnap = snap;
> >     if (!try_get_online_cpus()) {
> >             /* CPU hotplug operation in flight, fall back to normal GP. */
> >             wait_rcu_gp(call_rcu_sched);
> > @@ -3341,82 +3345,44 @@ void synchronize_sched_expedited(void)
> >     }
> >     WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));
> > 
> > -   /* Offline CPUs, idle CPUs, and any CPU we run on are quiescent. */
> > -   cma = zalloc_cpumask_var(&cm, GFP_KERNEL);
> > -   if (cma) {
> > -           cpumask_copy(cm, cpu_online_mask);
> > -           cpumask_clear_cpu(raw_smp_processor_id(), cm);
> > -           for_each_cpu(cpu, cm) {
> > -                   struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
> > -
> > -                   if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
> > -                           cpumask_clear_cpu(cpu, cm);
> > -           }
> > -           if (cpumask_weight(cm) == 0)
> > -                   goto all_cpus_idle;
> > -   }
> > -
> >     /*
> >      * Each pass through the following loop attempts to force a
> >      * context switch on each CPU.
> >      */
> > -   while (try_stop_cpus(cma ? cm : cpu_online_mask,
> > -                        synchronize_sched_expedited_cpu_stop,
> > -                        NULL) == -EAGAIN) {
> > -           put_online_cpus();
> > -           atomic_long_inc(&rsp->expedited_tryfail);
> > +   mutex_lock(&rsp->expedited_mutex);
> > 
> > -           /* Check to see if someone else did our work for us. */
> > -           s = atomic_long_read(&rsp->expedited_done);
> > -           if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
> > -                   /* ensure test happens before caller kfree */
> > -                   smp_mb__before_atomic(); /* ^^^ */
> > -                   atomic_long_inc(&rsp->expedited_workdone1);
> > -                   free_cpumask_var(cm);
> > -                   return;
> > -           }
> > +   /*
> > +    * Check to see if someone else did our work for us, while we were
> > +    * waiting for the mutex.
> > +    */
> > +   s = atomic_long_read(&rsp->expedited_done);
> > +   if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
> > +           /* ensure test happens before caller kfree */
> > +           smp_mb__before_atomic(); /* ^^^ */
> > +           atomic_long_inc(&rsp->expedited_workdone1);
> > +           goto unlock;
> > +   }
> > 
> > -           /* No joy, try again later.  Or just synchronize_sched(). */
> > -           if (trycount++ < 10) {
> > -                   udelay(trycount * num_online_cpus());
> > -           } else {
> > -                   wait_rcu_gp(call_rcu_sched);
> > -                   atomic_long_inc(&rsp->expedited_normal);
> > -                   free_cpumask_var(cm);
> > -                   return;
> > -           }
> > +   /* Stop each CPU that is online, non-idle, and not us. */
> > +   for_each_online_cpu(cpu) {
> > +           struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
> > 
> > -           /* Recheck to see if someone else did our work for us. */
> > -           s = atomic_long_read(&rsp->expedited_done);
> > -           if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
> > -                   /* ensure test happens before caller kfree */
> > -                   smp_mb__before_atomic(); /* ^^^ */
> > -                   atomic_long_inc(&rsp->expedited_workdone2);
> > -                   free_cpumask_var(cm);
> > -                   return;
> > -           }
> > +           /* Skip our CPU, */
> > +           if (raw_smp_processor_id() == cpu)
> > +                   continue;
> > 
> > -           /*
> > -            * Refetching sync_sched_expedited_started allows later
> > -            * callers to piggyback on our grace period.  We retry
> > -            * after they started, so our grace period works for them,
> > -            * and they started after our first try, so their grace
> > -            * period works for us.
> > -            */
> > -           if (!try_get_online_cpus()) {
> > -                   /* CPU hotplug operation in flight, use normal GP. */
> > -                   wait_rcu_gp(call_rcu_sched);
> > -                   atomic_long_inc(&rsp->expedited_normal);
> > -                   free_cpumask_var(cm);
> > -                   return;
> > -           }
> > -           snap = atomic_long_read(&rsp->expedited_start);
> > -           smp_mb(); /* ensure read is before try_stop_cpus(). */
> > +           /* and any idle CPUs. */
> > +           if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
> > +                   continue;
> > +
> > +           atomic_inc(&ess.count);
> > +           stop_one_cpu_nowait(cpu, synchronize_sched_expedited_cpu_stop, &ess,
> > +                           &per_cpu(exp_stop_work, cpu));
> >     }
> > -   atomic_long_inc(&rsp->expedited_stoppedcpus);
> > 
> > -all_cpus_idle:
> > -   free_cpumask_var(cm);
> > +   wait_event(ess.wq, !atomic_read(&ess.count));
> > +
> > +   atomic_long_inc(&rsp->expedited_stoppedcpus);
> > 
> >     /*
> >      * Everyone up to our most recent fetch is covered by our grace
> > @@ -3435,6 +3401,8 @@ void synchronize_sched_expedited(void)
> >             }
> >     } while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
> >     atomic_long_inc(&rsp->expedited_done_exit);
> > +unlock:
> > +   mutex_unlock(&rsp->expedited_mutex);
> > 
> >     put_online_cpus();
> >  }
> > --- a/kernel/rcu/tree.h
> > +++ b/kernel/rcu/tree.h
> > @@ -483,6 +483,7 @@ struct rcu_state {
> >                                             /*  _rcu_barrier(). */
> >     /* End of fields guarded by barrier_mutex. */
> > 
> > +   struct mutex  expedited_mutex;          /* Serializes expediting. */
> >     atomic_long_t expedited_start;          /* Starting ticket. */
> >     atomic_long_t expedited_done;           /* Done ticket. */
> >     atomic_long_t expedited_wrap;           /* # near-wrap incidents. */
> > 

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to