While the traditional irq_work relies on the architecture's ability to self-IPI, queueing work on a remote CPU only needs the generic SMP IPI machinery. It therefore makes sense to provide an irq_work_queue_remote() interface that is available under CONFIG_SMP, independent of CONFIG_IRQ_WORK.
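
For a caller that has preemption disabled and targets a remote CPU, usage would look roughly like the sketch below. This is a minimal, hypothetical example: kick_cpu(), my_func() and my_work are illustrative names, not part of this patch, and it assumes the IRQ_WORK_INIT() initializer from the earlier irq_work cleanup:

  #include <linux/irq_work.h>
  #include <linux/smp.h>

  static void my_func(struct irq_work *work)
  {
  	/* Runs in hardirq context on the remote CPU. */
  }

  static struct irq_work my_work = IRQ_WORK_INIT(my_func);

  /* @cpu must be a remote CPU; queueing to the local CPU will WARN. */
  static int kick_cpu(int cpu)
  {
  	int ret;

  	preempt_disable();
  	ret = irq_work_queue_remote(cpu, &my_work);
  	preempt_enable();

  	/* 0 on success; -EBUSY if @my_work was already pending. */
  	return ret;
  }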
Signed-off-by: Peter Zijlstra (Intel) <pet...@infradead.org>
---
 include/linux/irq_work.h |   17 ++++--
 kernel/irq_work.c        |  129 ++++++++++++++++++++++++++++-------------------
 kernel/rcu/tree.c        |    5 +
 3 files changed, 94 insertions(+), 57 deletions(-)

--- a/include/linux/irq_work.h
+++ b/include/linux/irq_work.h
@@ -46,22 +46,29 @@ static inline bool irq_work_is_busy(stru
 	return atomic_read(&work->node.a_flags) & IRQ_WORK_BUSY;
 }
 
+#ifdef CONFIG_IRQ_WORK
+
 bool irq_work_queue(struct irq_work *work);
 bool irq_work_queue_on(struct irq_work *work, int cpu);
 
 void irq_work_tick(void);
 void irq_work_sync(struct irq_work *work);
 
-#ifdef CONFIG_IRQ_WORK
 #include <asm/irq_work.h>
 
 void irq_work_run(void);
 bool irq_work_needs_cpu(void);
-void irq_work_single(void *arg);
-#else
-static inline bool irq_work_needs_cpu(void) { return false; }
+
+#else /* !CONFIG_IRQ_WORK */
+
 static inline void irq_work_run(void) { }
-static inline void irq_work_single(void *arg) { }
+static inline bool irq_work_needs_cpu(void) { return false; }
+
+#endif /* CONFIG_IRQ_WORK */
+
+#ifdef CONFIG_SMP
+extern int irq_work_queue_remote(int cpu, struct irq_work *work);
+extern void irq_work_single(void *arg);
 #endif
 
 #endif /* _LINUX_IRQ_WORK_H */
--- a/kernel/irq_work.c
+++ b/kernel/irq_work.c
@@ -20,10 +20,7 @@
 #include <linux/smp.h>
 #include <asm/processor.h>
 
-#ifdef CONFIG_IRQ_WORK
-
-static DEFINE_PER_CPU(struct llist_head, raised_list);
-static DEFINE_PER_CPU(struct llist_head, lazy_list);
+#if defined(CONFIG_IRQ_WORK) || defined(CONFIG_SMP)
 
 /*
  * Claim the entry so that no one else will poke at it.
@@ -43,6 +40,82 @@ static bool irq_work_claim(struct irq_wo
 	return true;
 }
 
+void irq_work_single(void *arg)
+{
+	struct irq_work *work = arg;
+	int flags;
+
+	/*
+	 * Clear the PENDING bit, after this point the @work can be re-used.
+	 * The PENDING bit acts as a lock, and we own it, so we can clear it
+	 * without atomic ops.
+	 */
+	flags = atomic_read(&work->node.a_flags);
+	flags &= ~IRQ_WORK_PENDING;
+	atomic_set(&work->node.a_flags, flags);
+
+	/*
+	 * See irq_work_claim().
+	 */
+	smp_mb();
+
+	lockdep_irq_work_enter(flags);
+	work->func(work);
+	lockdep_irq_work_exit(flags);
+
+	/*
+	 * Clear the BUSY bit, if set, and return to the free state if no-one
+	 * else claimed it meanwhile.
+	 */
+	(void)atomic_cmpxchg(&work->node.a_flags, flags, flags & ~IRQ_WORK_BUSY);
+}
+
+/*
+ * Synchronize against the irq_work @entry, ensures the entry is not
+ * currently in use.
+ */
+void irq_work_sync(struct irq_work *work)
+{
+	lockdep_assert_irqs_enabled();
+
+	while (irq_work_is_busy(work))
+		cpu_relax();
+}
+EXPORT_SYMBOL_GPL(irq_work_sync);
+
+#endif /* CONFIG_IRQ_WORK || CONFIG_SMP */
+
+#ifdef CONFIG_SMP
+
+static void __irq_work_queue_remote(int cpu, struct irq_work *work)
+{
+	/* The arch remote IPI send/receive backends aren't NMI safe */
+	WARN_ON_ONCE(in_nmi());
+	__smp_call_single_queue(cpu, &work->node.llist);
+}
+
+int irq_work_queue_remote(int cpu, struct irq_work *work)
+{
+	/*
+	 * The caller must have preemption disabled and target a remote CPU.
+	 */
+	WARN_ON_ONCE(cpu == smp_processor_id());
+
+	if (!irq_work_claim(work))
+		return -EBUSY;
+
+	__irq_work_queue_remote(cpu, work);
+
+	return 0;
+}
+
+#endif /* CONFIG_SMP */
+
+#ifdef CONFIG_IRQ_WORK
+
+static DEFINE_PER_CPU(struct llist_head, raised_list);
+static DEFINE_PER_CPU(struct llist_head, lazy_list);
+
 void __weak arch_irq_work_raise(void)
 {
 	/*
@@ -101,9 +174,7 @@ bool irq_work_queue_on(struct irq_work *
 
 	preempt_disable();
 	if (cpu != smp_processor_id()) {
-		/* Arch remote IPI send/receive backend aren't NMI safe */
-		WARN_ON_ONCE(in_nmi());
-		__smp_call_single_queue(cpu, &work->node.llist);
+		__irq_work_queue_remote(cpu, work);
 	} else {
 		__irq_work_queue_local(work);
 	}
@@ -131,36 +202,6 @@ bool irq_work_needs_cpu(void)
 	return true;
 }
 
-void irq_work_single(void *arg)
-{
-	struct irq_work *work = arg;
-	int flags;
-
-	/*
-	 * Clear the PENDING bit, after this point the @work can be re-used.
-	 * The PENDING bit acts as a lock, and we own it, so we can clear it
-	 * without atomic ops.
-	 */
-	flags = atomic_read(&work->node.a_flags);
-	flags &= ~IRQ_WORK_PENDING;
-	atomic_set(&work->node.a_flags, flags);
-
-	/*
-	 * See irq_work_claim().
-	 */
-	smp_mb();
-
-	lockdep_irq_work_enter(flags);
-	work->func(work);
-	lockdep_irq_work_exit(flags);
-
-	/*
-	 * Clear the BUSY bit, if set, and return to the free state if no-one
-	 * else claimed it meanwhile.
-	 */
-	(void)atomic_cmpxchg(&work->node.a_flags, flags, flags & ~IRQ_WORK_BUSY);
-}
-
 static void irq_work_run_list(struct llist_head *list)
 {
 	struct irq_work *work, *tmp;
@@ -196,17 +237,5 @@ void irq_work_tick(void)
 	irq_work_run_list(this_cpu_ptr(&lazy_list));
 }
 
-/*
- * Synchronize against the irq_work @entry, ensures the entry is not
- * currently in use.
- */
-void irq_work_sync(struct irq_work *work)
-{
-	lockdep_assert_irqs_enabled();
-
-	while (irq_work_is_busy(work))
-		cpu_relax();
-}
-EXPORT_SYMBOL_GPL(irq_work_sync);
-
 #endif /* CONFIG_IRQ_WORK */
+
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -1308,13 +1308,14 @@ static int rcu_implicit_dynticks_qs(stru
 			resched_cpu(rdp->cpu);
 			WRITE_ONCE(rdp->last_fqs_resched, jiffies);
 		}
-		if (IS_ENABLED(CONFIG_IRQ_WORK) &&
-		    !rdp->rcu_iw_pending && rdp->rcu_iw_gp_seq != rnp->gp_seq &&
+#ifdef CONFIG_IRQ_WORK
+		if (!rdp->rcu_iw_pending && rdp->rcu_iw_gp_seq != rnp->gp_seq &&
 		    (rnp->ffmask & rdp->grpmask)) {
 			rdp->rcu_iw_pending = true;
 			rdp->rcu_iw_gp_seq = rnp->gp_seq;
 			irq_work_queue_on(&rdp->rcu_iw, rdp->cpu);
 		}
+#endif
 	}
 
 	return 0;