On Wed, Jun 24, 2015 at 07:28:18PM +0200, Peter Zijlstra wrote:
> How about something like this, it replaced mutex and start/done ticket
> thing with an MCS style lockless FIFO queue.
> 
> I further uses the gpnum/completed thing to short circuit things if
> we've waited long enough.

Prettier version

--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -3253,23 +3253,41 @@ void cond_synchronize_rcu(unsigned long
 }
 EXPORT_SYMBOL_GPL(cond_synchronize_rcu);
 
+struct expedited_task_state {
+       struct expedited_task_state *next;
+       struct task_struct *task;
+       atomic_t count;
+       int done;
+};
+
 static int synchronize_sched_expedited_cpu_stop(void *data)
 {
+       struct expedited_task_state *ets = data;
+
        /*
         * There must be a full memory barrier on each affected CPU
         * between the time that try_stop_cpus() is called and the
         * time that it returns.
-        *
-        * In the current initial implementation of cpu_stop, the
-        * above condition is already met when the control reaches
-        * this point and the following smp_mb() is not strictly
-        * necessary.  Do smp_mb() anyway for documentation and
-        * robustness against future implementation changes.
         */
-       smp_mb(); /* See above comment block. */
+       if (atomic_dec_and_test(&ets->count))
+               wake_up_process(ets->task);
+
        return 0;
 }
 
+static DEFINE_PER_CPU(struct cpu_stop_work, exp_stop_work);
+
+#define current_wait(cond)                                     \
+do {                                                           \
+       for (;;) {                                              \
+               set_current_state(TASK_UNINTERRUPTIBLE);        \
+               if (cond)                                       \
+                       break;                                  \
+               schedule();                                     \
+       }                                                       \
+       __set_current_state(TASK_RUNNING);                      \
+} while (0)
+
 /**
  * synchronize_sched_expedited - Brute-force RCU-sched grace period
  *
@@ -3304,138 +3322,71 @@ static int synchronize_sched_expedited_c
  */
 void synchronize_sched_expedited(void)
 {
-       cpumask_var_t cm;
-       bool cma = false;
-       int cpu;
-       long firstsnap, s, snap;
-       int trycount = 0;
        struct rcu_state *rsp = &rcu_sched_state;
+       struct expedited_task_state *prev, *next, entry = {
+               .task = current,
+               .count = ATOMIC_INIT(1), /* avoid spurious wakeups */
+       };
+       long gpnum;
+       int cpu;
 
-       /*
-        * If we are in danger of counter wrap, just do synchronize_sched().
-        * By allowing sync_sched_expedited_started to advance no more than
-        * ULONG_MAX/8 ahead of sync_sched_expedited_done, we are ensuring
-        * that more than 3.5 billion CPUs would be required to force a
-        * counter wrap on a 32-bit system.  Quite a few more CPUs would of
-        * course be required on a 64-bit system.
-        */
-       if (ULONG_CMP_GE((ulong)atomic_long_read(&rsp->expedited_start),
-                        (ulong)atomic_long_read(&rsp->expedited_done) +
-                        ULONG_MAX / 8)) {
-               wait_rcu_gp(call_rcu_sched);
-               atomic_long_inc(&rsp->expedited_wrap);
-               return;
-       }
-
-       /*
-        * Take a ticket.  Note that atomic_inc_return() implies a
-        * full memory barrier.
-        */
-       snap = atomic_long_inc_return(&rsp->expedited_start);
-       firstsnap = snap;
        if (!try_get_online_cpus()) {
                /* CPU hotplug operation in flight, fall back to normal GP. */
                wait_rcu_gp(call_rcu_sched);
-               atomic_long_inc(&rsp->expedited_normal);
                return;
        }
        WARN_ON_ONCE(cpu_is_offline(raw_smp_processor_id()));
 
-       /* Offline CPUs, idle CPUs, and any CPU we run on are quiescent. */
-       cma = zalloc_cpumask_var(&cm, GFP_KERNEL);
-       if (cma) {
-               cpumask_copy(cm, cpu_online_mask);
-               cpumask_clear_cpu(raw_smp_processor_id(), cm);
-               for_each_cpu(cpu, cm) {
-                       struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
-
-                       if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
-                               cpumask_clear_cpu(cpu, cm);
-               }
-               if (cpumask_weight(cm) == 0)
-                       goto all_cpus_idle;
+       smp_mb();
+       gpnum = smp_load_acquire(&rsp->gpnum);
+
+       /* MCS style queue 'lock' */
+       prev = xchg(&rsp->expedited_queue, &entry);
+       if (prev) {
+               WRITE_ONCE(prev->next, &entry);
+               current_wait(smp_load_acquire(&entry.done));
        }
 
        /*
-        * Each pass through the following loop attempts to force a
-        * context switch on each CPU.
+        * Check to see if someone else did our work for us, while we were
+        * waiting on the queue.
         */
-       while (try_stop_cpus(cma ? cm : cpu_online_mask,
-                            synchronize_sched_expedited_cpu_stop,
-                            NULL) == -EAGAIN) {
-               put_online_cpus();
-               atomic_long_inc(&rsp->expedited_tryfail);
-
-               /* Check to see if someone else did our work for us. */
-               s = atomic_long_read(&rsp->expedited_done);
-               if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
-                       /* ensure test happens before caller kfree */
-                       smp_mb__before_atomic(); /* ^^^ */
-                       atomic_long_inc(&rsp->expedited_workdone1);
-                       free_cpumask_var(cm);
-                       return;
-               }
-
-               /* No joy, try again later.  Or just synchronize_sched(). */
-               if (trycount++ < 10) {
-                       udelay(trycount * num_online_cpus());
-               } else {
-                       wait_rcu_gp(call_rcu_sched);
-                       atomic_long_inc(&rsp->expedited_normal);
-                       free_cpumask_var(cm);
-                       return;
-               }
-
-               /* Recheck to see if someone else did our work for us. */
-               s = atomic_long_read(&rsp->expedited_done);
-               if (ULONG_CMP_GE((ulong)s, (ulong)firstsnap)) {
-                       /* ensure test happens before caller kfree */
-                       smp_mb__before_atomic(); /* ^^^ */
-                       atomic_long_inc(&rsp->expedited_workdone2);
-                       free_cpumask_var(cm);
-                       return;
-               }
-
-               /*
-                * Refetching sync_sched_expedited_started allows later
-                * callers to piggyback on our grace period.  We retry
-                * after they started, so our grace period works for them,
-                * and they started after our first try, so their grace
-                * period works for us.
-                */
-               if (!try_get_online_cpus()) {
-                       /* CPU hotplug operation in flight, use normal GP. */
-                       wait_rcu_gp(call_rcu_sched);
-                       atomic_long_inc(&rsp->expedited_normal);
-                       free_cpumask_var(cm);
-                       return;
-               }
-               snap = atomic_long_read(&rsp->expedited_start);
-               smp_mb(); /* ensure read is before try_stop_cpus(). */
+       if (ULONG_CMP_LT(gpnum, smp_load_acquire(&rsp->completed)))
+               goto unlock;
+
+       /* Stop each CPU that is online, non-idle, and not us. */
+       for_each_online_cpu(cpu) {
+               struct rcu_dynticks *rdtp = &per_cpu(rcu_dynticks, cpu);
+
+               /* Skip our CPU, */
+               if (raw_smp_processor_id() == cpu)
+                       continue;
+
+               /* and any idle CPUs. */
+               if (!(atomic_add_return(0, &rdtp->dynticks) & 0x1))
+                       continue;
+
+               atomic_inc(&entry.count);
+               stop_one_cpu_nowait(cpu, synchronize_sched_expedited_cpu_stop,
+                               &entry, &per_cpu(exp_stop_work, cpu));
        }
-       atomic_long_inc(&rsp->expedited_stoppedcpus);
 
-all_cpus_idle:
-       free_cpumask_var(cm);
+       atomic_dec(&entry.count); /* let the wakeups in */
+       current_wait(!atomic_read(&entry.count));
 
-       /*
-        * Everyone up to our most recent fetch is covered by our grace
-        * period.  Update the counter, but only if our work is still
-        * relevant -- which it won't be if someone who started later
-        * than we did already did their update.
-        */
-       do {
-               atomic_long_inc(&rsp->expedited_done_tries);
-               s = atomic_long_read(&rsp->expedited_done);
-               if (ULONG_CMP_GE((ulong)s, (ulong)snap)) {
-                       /* ensure test happens before caller kfree */
-                       smp_mb__before_atomic(); /* ^^^ */
-                       atomic_long_inc(&rsp->expedited_done_lost);
-                       break;
-               }
-       } while (atomic_long_cmpxchg(&rsp->expedited_done, s, snap) != s);
-       atomic_long_inc(&rsp->expedited_done_exit);
+unlock:
+       /* MCS style queue 'unlock' */
+       next = READ_ONCE(entry.next);
+       if (!next) {
+               if (cmpxchg(&rsp->expedited_queue, &entry, NULL) == &entry)
+                       goto done;
+               while (!(next = READ_ONCE(entry.next)))
+                       cpu_relax();
+       }
+       smp_store_release(&next->done, 1);
+       wake_up_process(next->task);
 
+done:
        put_online_cpus();
 }
 EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -483,17 +483,7 @@ struct rcu_state {
                                                /*  _rcu_barrier(). */
        /* End of fields guarded by barrier_mutex. */
 
-       atomic_long_t expedited_start;          /* Starting ticket. */
-       atomic_long_t expedited_done;           /* Done ticket. */
-       atomic_long_t expedited_wrap;           /* # near-wrap incidents. */
-       atomic_long_t expedited_tryfail;        /* # acquisition failures. */
-       atomic_long_t expedited_workdone1;      /* # done by others #1. */
-       atomic_long_t expedited_workdone2;      /* # done by others #2. */
-       atomic_long_t expedited_normal;         /* # fallbacks to normal. */
-       atomic_long_t expedited_stoppedcpus;    /* # successful stop_cpus. */
-       atomic_long_t expedited_done_tries;     /* # tries to update _done. */
-       atomic_long_t expedited_done_lost;      /* # times beaten to _done. */
-       atomic_long_t expedited_done_exit;      /* # times exited _done loop. */
+       void *expedited_queue;
 
        unsigned long jiffies_force_qs;         /* Time at which to invoke */
                                                /*  force_quiescent_state(). */
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Reply via email to