Hi Frederic,

Could you copy r...@vger.kernel.org if you send another version? It
will help RCU hobbyists like me to catch up on news in RCU, thanks! ;-)

Please see below for some comments. I'm still reading the whole
patchset, so I'm probably missing something.

On Fri, Oct 23, 2020 at 04:46:38PM +0200, Frederic Weisbecker wrote:
> In order to de-offload the callbacks processing of an rdp, we must clear
> SEGCBLIST_OFFLOAD and notify the CB kthread so that it clears its own
> bit flag and goes to sleep to stop handling callbacks. The GP kthread
> will also be notified the same way. Whoever acknowledges and clears its
> own bit last must notify the de-offloading worker so that it can resume
> the de-offloading while being sure that callbacks won't be handled
> remotely anymore.
> 
> Inspired-by: Paul E. McKenney <paul...@kernel.org>
> Signed-off-by: Frederic Weisbecker <frede...@kernel.org>
> Cc: Paul E. McKenney <paul...@kernel.org>
> Cc: Josh Triplett <j...@joshtriplett.org>
> Cc: Steven Rostedt <rost...@goodmis.org>
> Cc: Mathieu Desnoyers <mathieu.desnoy...@efficios.com>
> Cc: Lai Jiangshan <jiangshan...@gmail.com>
> Cc: Joel Fernandes <j...@joelfernandes.org>
> Cc: Neeraj Upadhyay <neer...@codeaurora.org>
> ---
>  include/linux/rcupdate.h   |   2 +
>  kernel/rcu/rcu_segcblist.c |  10 ++-
>  kernel/rcu/rcu_segcblist.h |   2 +-
>  kernel/rcu/tree.h          |   1 +
>  kernel/rcu/tree_plugin.h   | 134 +++++++++++++++++++++++++++++++------
>  5 files changed, 126 insertions(+), 23 deletions(-)
> 
> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
> index 7c1ceff02852..bf8eb02411c2 100644
> --- a/include/linux/rcupdate.h
> +++ b/include/linux/rcupdate.h
> @@ -104,8 +104,10 @@ static inline void rcu_user_exit(void) { }
>  
>  #ifdef CONFIG_RCU_NOCB_CPU
>  void rcu_init_nohz(void);
> +int rcu_nocb_cpu_deoffload(int cpu);
>  #else /* #ifdef CONFIG_RCU_NOCB_CPU */
>  static inline void rcu_init_nohz(void) { }
> +static inline int rcu_nocb_cpu_deoffload(int cpu) { return 0; }
>  #endif /* #else #ifdef CONFIG_RCU_NOCB_CPU */
>  
>  /**
> diff --git a/kernel/rcu/rcu_segcblist.c b/kernel/rcu/rcu_segcblist.c
> index a96511b7cc98..3f6b5b724b39 100644
> --- a/kernel/rcu/rcu_segcblist.c
> +++ b/kernel/rcu/rcu_segcblist.c
> @@ -170,10 +170,14 @@ void rcu_segcblist_disable(struct rcu_segcblist *rsclp)
>   * Mark the specified rcu_segcblist structure as offloaded.  This
>   * structure must be empty.
>   */
> -void rcu_segcblist_offload(struct rcu_segcblist *rsclp)
> +void rcu_segcblist_offload(struct rcu_segcblist *rsclp, bool offload)
>  {
> -     rcu_segcblist_clear_flags(rsclp, SEGCBLIST_SOFTIRQ_ONLY);
> -     rcu_segcblist_set_flags(rsclp, SEGCBLIST_OFFLOADED);
> +     if (offload) {
> +             rcu_segcblist_clear_flags(rsclp, SEGCBLIST_SOFTIRQ_ONLY);
> +             rcu_segcblist_set_flags(rsclp, SEGCBLIST_OFFLOADED);
> +     } else {
> +             rcu_segcblist_clear_flags(rsclp, SEGCBLIST_OFFLOADED);
> +     }
>  }
>  
>  /*
> diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
> index 575896a2518b..00ebeb8d39b7 100644
> --- a/kernel/rcu/rcu_segcblist.h
> +++ b/kernel/rcu/rcu_segcblist.h
> @@ -105,7 +105,7 @@ static inline bool rcu_segcblist_restempty(struct 
> rcu_segcblist *rsclp, int seg)
>  void rcu_segcblist_inc_len(struct rcu_segcblist *rsclp);
>  void rcu_segcblist_init(struct rcu_segcblist *rsclp);
>  void rcu_segcblist_disable(struct rcu_segcblist *rsclp);
> -void rcu_segcblist_offload(struct rcu_segcblist *rsclp);
> +void rcu_segcblist_offload(struct rcu_segcblist *rsclp, bool offload);
>  bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp);
>  bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp);
>  struct rcu_head *rcu_segcblist_first_cb(struct rcu_segcblist *rsclp);
> diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
> index e4f66b8f7c47..8047102be878 100644
> --- a/kernel/rcu/tree.h
> +++ b/kernel/rcu/tree.h
> @@ -200,6 +200,7 @@ struct rcu_data {
>       /* 5) Callback offloading. */
>  #ifdef CONFIG_RCU_NOCB_CPU
>       struct swait_queue_head nocb_cb_wq; /* For nocb kthreads to sleep on. */
> +     struct swait_queue_head nocb_state_wq; /* For offloading state changes 
> */
>       struct task_struct *nocb_gp_kthread;
>       raw_spinlock_t nocb_lock;       /* Guard following pair of fields. */
>       atomic_t nocb_lock_contended;   /* Contention experienced. */
> diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
> index fd8a52e9a887..09caf319a4a9 100644
> --- a/kernel/rcu/tree_plugin.h
> +++ b/kernel/rcu/tree_plugin.h
> @@ -2081,16 +2081,29 @@ static int rcu_nocb_gp_kthread(void *arg)
>       return 0;
>  }
>  
> +static inline bool nocb_cb_can_run(struct rcu_data *rdp)
> +{
> +     u8 flags = SEGCBLIST_OFFLOADED | SEGCBLIST_KTHREAD_CB;
> +     return rcu_segcblist_test_flags(&rdp->cblist, flags);
> +}
> +
> +static inline bool nocb_cb_wait_cond(struct rcu_data *rdp)
> +{
> +     return nocb_cb_can_run(rdp) && !READ_ONCE(rdp->nocb_cb_sleep);
> +}
> +
>  /*
>   * Invoke any ready callbacks from the corresponding no-CBs CPU,
>   * then, if there are no more, wait for more to appear.
>   */
>  static void nocb_cb_wait(struct rcu_data *rdp)
>  {
> -     unsigned long cur_gp_seq;
> -     unsigned long flags;
> +     struct rcu_segcblist *cblist = &rdp->cblist;
> +     struct rcu_node *rnp = rdp->mynode;
> +     bool needwake_state = false;
>       bool needwake_gp = false;
> -     struct rcu_node *rnp = rdp->mynode;
> +     unsigned long cur_gp_seq;
> +     unsigned long flags;
>  
>       local_irq_save(flags);
>       rcu_momentary_dyntick_idle();
> @@ -2100,32 +2113,50 @@ static void nocb_cb_wait(struct rcu_data *rdp)
>       local_bh_enable();
>       lockdep_assert_irqs_enabled();
>       rcu_nocb_lock_irqsave(rdp, flags);
> -     if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
> +     if (rcu_segcblist_nextgp(cblist, &cur_gp_seq) &&
>           rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
>           raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
>               needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
>               raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
>       }
> -     if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
> -             rcu_nocb_unlock_irqrestore(rdp, flags);
> -             if (needwake_gp)
> -                     rcu_gp_kthread_wake();
> -             return;
> -     }
>  
> -     trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
>       WRITE_ONCE(rdp->nocb_cb_sleep, true);
> +
> +     if (rcu_segcblist_test_flags(cblist, SEGCBLIST_OFFLOADED)) {
> +             if (rcu_segcblist_ready_cbs(cblist))
> +                     WRITE_ONCE(rdp->nocb_cb_sleep, false);
> +     } else {
> +             /*
> +              * De-offloading. Clear our flag and notify the de-offload 
> worker.
> +              * We won't touch the callbacks and keep sleeping until we ever
> +              * get re-offloaded.
> +              */
> +             WARN_ON_ONCE(!rcu_segcblist_test_flags(cblist, 
> SEGCBLIST_KTHREAD_CB));
> +             rcu_segcblist_clear_flags(cblist, SEGCBLIST_KTHREAD_CB);
> +             if (!rcu_segcblist_test_flags(cblist, SEGCBLIST_KTHREAD_GP))
> +                     needwake_state = true;
> +     }
> +
> +     if (rdp->nocb_cb_sleep)
> +             trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
> +
>       rcu_nocb_unlock_irqrestore(rdp, flags);
>       if (needwake_gp)
>               rcu_gp_kthread_wake();
> -     swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
> -                              !READ_ONCE(rdp->nocb_cb_sleep));
> -     if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */
> +
> +     if (needwake_state)
> +             swake_up_one(&rdp->nocb_state_wq);
> +
> +     do {
> +             swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
> +                                                 nocb_cb_wait_cond(rdp));
> +
>               /* ^^^ Ensure CB invocation follows _sleep test. */
> -             return;
> -     }
> -     WARN_ON(signal_pending(current));
> -     trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
> +             if (smp_load_acquire(&rdp->nocb_cb_sleep)) {
> +                     WARN_ON(signal_pending(current));
> +                     trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, 
> TPS("WokeEmpty"));
> +             }
> +     } while (!nocb_cb_can_run(rdp));
>  }
>  
>  /*
> @@ -2187,6 +2218,69 @@ static void do_nocb_deferred_wakeup(struct rcu_data 
> *rdp)
>               do_nocb_deferred_wakeup_common(rdp);
>  }
>  
> +static int __rcu_nocb_rdp_deoffload(struct rcu_data *rdp)
> +{
> +     struct rcu_segcblist *cblist = &rdp->cblist;
> +     struct rcu_node *rnp = rdp->mynode;
> +     bool wake_cb = false;
> +     unsigned long flags;
> +
> +     printk("De-offloading %d\n", rdp->cpu);
> +
> +     rcu_nocb_lock_irqsave(rdp, flags);
> +     raw_spin_lock_rcu_node(rnp);

Could you explain why both nocb_lock and node lock are needed here?
Besides, rcu_nocb_{un}lock_* can skip the lock part based on the offload
state, which gets modified right here (in a rcu_nocb_{un}lock_* critical
section), and I think this is error-prone, because you may get an
unpaired lock-unlock (not in this patch though). Maybe just use node lock?

> +     rcu_segcblist_offload(cblist, false);
> +     raw_spin_unlock_rcu_node(rnp);
> +
> +     if (rdp->nocb_cb_sleep) {
> +             rdp->nocb_cb_sleep = false;
> +             wake_cb = true;
> +     }
> +     rcu_nocb_unlock_irqrestore(rdp, flags);
> +
> +     if (wake_cb)
> +             swake_up_one(&rdp->nocb_cb_wq);
> +
> +     swait_event_exclusive(rdp->nocb_state_wq,
> +                           !rcu_segcblist_test_flags(cblist, 
> SEGCBLIST_KTHREAD_CB));
> +
> +     return 0;
> +}
> +
> +static long rcu_nocb_rdp_deoffload(void *arg)
> +{
> +     struct rcu_data *rdp = arg;
> +
> +     WARN_ON_ONCE(rdp->cpu != raw_smp_processor_id());

I think this warning can actually happen, if I understand how workqueue
works correctly. Consider that the corresponding cpu gets offlined right
after the rcu_nocb_cpu_deoffload(), and the workqueue of that cpu
becomes unbound, and IIUC, workqueues don't do migration during
cpu-offlining, which means the worker can be scheduled to other CPUs,
and the work gets executed on another cpu. Am I missing something here?

Regards,
Boqun

> +     return __rcu_nocb_rdp_deoffload(rdp);
> +}
> +
> +int rcu_nocb_cpu_deoffload(int cpu)
> +{
> +     struct rcu_data *rdp = per_cpu_ptr(&rcu_data, cpu);
> +     int ret = 0;
> +
> +     if (rdp == rdp->nocb_gp_rdp) {
> +             pr_info("Can't deoffload an rdp GP leader (yet)\n");
> +             return -EINVAL;
> +     }
> +     mutex_lock(&rcu_state.barrier_mutex);
> +     cpus_read_lock();
> +     if (rcu_segcblist_is_offloaded(&rdp->cblist)) {
> +             if (cpu_online(cpu)) {
> +                     ret = work_on_cpu(cpu, rcu_nocb_rdp_deoffload, rdp);
> +             } else {
> +                     ret = __rcu_nocb_rdp_deoffload(rdp);
> +             }
> +             if (!ret)
> +                     cpumask_clear_cpu(cpu, rcu_nocb_mask);
> +     }
> +     cpus_read_unlock();
> +     mutex_unlock(&rcu_state.barrier_mutex);
> +
> +     return ret;
> +}
> +
>  void __init rcu_init_nohz(void)
>  {
>       int cpu;
> @@ -2229,7 +2323,8 @@ void __init rcu_init_nohz(void)
>               rdp = per_cpu_ptr(&rcu_data, cpu);
>               if (rcu_segcblist_empty(&rdp->cblist))
>                       rcu_segcblist_init(&rdp->cblist);
> -             rcu_segcblist_offload(&rdp->cblist);
> +             rcu_segcblist_offload(&rdp->cblist, true);
> +             rcu_segcblist_set_flags(&rdp->cblist, SEGCBLIST_KTHREAD_CB);
>       }
>       rcu_organize_nocb_kthreads();
>  }
> @@ -2239,6 +2334,7 @@ static void __init 
> rcu_boot_init_nocb_percpu_data(struct rcu_data *rdp)
>  {
>       init_swait_queue_head(&rdp->nocb_cb_wq);
>       init_swait_queue_head(&rdp->nocb_gp_wq);
> +     init_swait_queue_head(&rdp->nocb_state_wq);
>       raw_spin_lock_init(&rdp->nocb_lock);
>       raw_spin_lock_init(&rdp->nocb_bypass_lock);
>       raw_spin_lock_init(&rdp->nocb_gp_lock);
> -- 
> 2.25.1
> 

Reply via email to