On Mon, Nov 03, 2025 at 06:45:30PM +0900, Akihiko Odaki wrote:
> On 2025/10/30 3:22, Peter Xu wrote:
> > On Wed, Oct 29, 2025 at 03:12:48PM +0900, Akihiko Odaki wrote:
> > > drain_call_rcu() triggers the force quiescent state, but it can be
> > > delayed if the RCU thread is sleeping. Ensure the force quiescent state
> > > is immediately triggered by waking the RCU thread up.
> > >
> > > The logic to trigger the force quiescent state is decoupled as
> > > force_rcu() so that it can be used independently.
> > >
> > > Signed-off-by: Akihiko Odaki <[email protected]>
> > > ---
> > > include/qemu/rcu.h | 1 +
> > > util/rcu.c | 106 ++++++++++++++++++++++++++++++++---------------------
> > > 2 files changed, 65 insertions(+), 42 deletions(-)
> > >
> > > diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
> > > index 020dbe4d8b77..d6aa4e5854d3 100644
> > > --- a/include/qemu/rcu.h
> > > +++ b/include/qemu/rcu.h
> > > @@ -118,6 +118,7 @@ static inline void rcu_read_unlock(void)
> > > }
> > > }
> > > +void force_rcu(void);
> > > void synchronize_rcu(void);
> > > /*
> > > diff --git a/util/rcu.c b/util/rcu.c
> > > index 3c4af9d213c8..85f9333f5dff 100644
> > > --- a/util/rcu.c
> > > +++ b/util/rcu.c
> > > @@ -49,10 +49,13 @@
> > > unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
> > > QemuEvent rcu_gp_event;
> > > -static int in_drain_call_rcu;
> > > +static bool forced;
> > > static int rcu_call_count;
> > > static QemuMutex rcu_registry_lock;
> > > +/* Set when the forced variable is set or rcu_call_count becomes non-zero. */
> > > +static QemuEvent sync_event;
> > > +
> > > /*
> > > * Check whether a quiescent state was crossed between the beginning of
> > > * update_counter_and_wait and now.
> > > @@ -74,36 +77,21 @@ QEMU_DEFINE_CO_TLS(struct rcu_reader_data, rcu_reader)
> > > typedef QLIST_HEAD(, rcu_reader_data) ThreadList;
> > > static ThreadList registry = QLIST_HEAD_INITIALIZER(registry);
> > > +void force_rcu(void)
> > > +{
> > > + qatomic_set(&forced, true);
> > > + qemu_event_set(&sync_event);
> > > +}
> > > +
> > > /* Wait for previous parity/grace period to be empty of readers. */
> > > -static void wait_for_readers(void)
> > > +static void wait_for_readers(bool sleep)
> > > {
> > > ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
> > > struct rcu_reader_data *index, *tmp;
> > > - int sleeps = 0;
> > > - bool forced = false;
> > > + int sleeps = sleep ? 5 : 0;
> > > + bool waiting = false;
> > > for (;;) {
> > > - /*
> > > - * Force the grace period to end and wait for it if any of the
> > > - * following heuristical conditions are satisfied:
> > > - * - A decent number of callbacks piled up.
> > > - * - It timed out.
> > > - * - It is in a drain_call_rcu() call.
> > > - *
> > > - * Otherwise, periodically poll the grace period, hoping it ends
> > > - * promptly.
> > > - */
> > > - if (!forced &&
> > > - (qatomic_read(&rcu_call_count) >= RCU_CALL_MIN_SIZE ||
> > > - sleeps >= 5 || qatomic_read(&in_drain_call_rcu))) {
> > > - forced = true;
> > > -
> > > - QLIST_FOREACH(index, &registry, node) {
> > > - notifier_list_notify(&index->force_rcu, NULL);
> > > - qatomic_set(&index->waiting, true);
> > > - }
> > > - }
> >
> > IIUC this is the part to set index->waiting first whenever necessary, then
> > that'll guarantee the wait(rcu_gp_event) will be notified in rcu unlock.
> >
> > Now we removed this chunk, then could it happen if waiting=true and the
> > wait(rcu_gp_event) may wait for more than it should (as nobody will wake it
> > up if all threads have waiting=false)?
>
> It is not removed, but it is moved along with the assignment of the waiting
> variable. So index->waiting is still set whenever the waiting variable is
> set and no hang up will happen.
Ah, I noticed that "waiting" is actually the local variable in
wait_for_readers(); I think when I read it last time I somehow confused it
with "sleep" (which is passed down from the caller).
if (waiting) { <--------------------
qemu_event_wait(&rcu_gp_event);
qemu_event_reset(&rcu_gp_event);
} else if (qatomic_read(&rcu_call_count) >= RCU_CALL_MIN_SIZE ||
Yeah, I think it's a non-issue.  Please ignore my question.
>
> >
> > The other thing is, right below here there's the code and comment:
> >
> > /* Here, order the stores to index->waiting before the loads of
> > * index->ctr.  Pairs with smp_mb_placeholder() in rcu_read_unlock(),
> > * ensuring that the loads of index->ctr are sequentially consistent.
> > *
> > * If this is the last iteration, this barrier also prevents
> > * frees from seeping upwards, and orders the two wait phases
> > * on architectures with 32-bit longs; see enter_qs().
> > */
> > smp_mb_global();
> >
> > IIUC it explains the mb_global() to order the updates of waiting and the
> > reads of index->ctr. It doesn't look like applicable anymore. Said that,
> > I think we should indeed still need some barrier to make sure we read
> > index->ctr at least to be after we update global gp_ctr (done before
> > calling wait_for_readers()). I'm not sure if it means the mb is needed,
> > however maybe at least the comment is outdated if so.
>
> It is still applicable. The stores to index->waiting is still present and
> needs to be ordered before the loads of index->ctr.
>
> >
> > > -
> > > /* Here, order the stores to index->waiting before the loads of
> > > * index->ctr.  Pairs with smp_mb_placeholder() in rcu_read_unlock(),
> > > * ensuring that the loads of index->ctr are sequentially consistent.
> > > @@ -150,7 +138,8 @@ static void wait_for_readers(void)
> > > */
> > > qemu_mutex_unlock(&rcu_registry_lock);
> > > - if (forced) {
> > > + if (waiting) {
> > > + /* Wait for the forced quiescent state. */
> > > qemu_event_wait(&rcu_gp_event);
> > > /*
> > > @@ -158,9 +147,25 @@ static void wait_for_readers(void)
> > > * while we walk the list.
> > > */
> > > qemu_event_reset(&rcu_gp_event);
> > > + } else if (qatomic_read(&rcu_call_count) >= RCU_CALL_MIN_SIZE ||
> > > + !sleeps || qemu_event_timedwait(&sync_event, 10)) {
> > > + /*
> > > + * Now one of the following heuristical conditions is satisfied:
> > > + * - A decent number of callbacks piled up.
> > > + * - It timed out.
> > > + * - force_rcu() was called.
> > > + *
> > > + * Force a quiescent state.
> > > + */
> > > + waiting = true;
> > > +
> > > + QLIST_FOREACH(index, &registry, node) {
> > > + notifier_list_notify(&index->force_rcu, NULL);
> > > + qatomic_set(&index->waiting, true);
> > > + }
> > > } else {
> > > - g_usleep(10000);
> > > - sleeps++;
> > > + /* Try again. */
> > > + sleeps--;
> > > }
> > > qemu_mutex_lock(&rcu_registry_lock);
> > > @@ -170,7 +175,7 @@ static void wait_for_readers(void)
> > > QLIST_SWAP(&registry, &qsreaders, node);
> > > }
> > > -static void enter_qs(void)
> > > +static void enter_qs(bool sleep)
> > > {
> > > /* Write RCU-protected pointers before reading p_rcu_reader->ctr.
> > > * Pairs with smp_mb_placeholder() in rcu_read_lock().
> > > @@ -189,14 +194,14 @@ static void enter_qs(void)
> > > * Switch parity: 0 -> 1, 1 -> 0.
> > > */
> > > qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> > > - wait_for_readers();
> > > + wait_for_readers(sleep);
> > > qatomic_set(&rcu_gp_ctr, rcu_gp_ctr ^ RCU_GP_CTR);
> > > } else {
> > > /* Increment current grace period. */
> > > qatomic_set(&rcu_gp_ctr, rcu_gp_ctr + RCU_GP_CTR);
> > > }
> > > - wait_for_readers();
> > > + wait_for_readers(sleep);
> > > }
> > > }
> > > @@ -205,7 +210,6 @@ static void enter_qs(void)
> > > */
> > > static struct rcu_head dummy;
> > > static struct rcu_head *head = &dummy, **tail = &dummy.next;
> > > -static QemuEvent rcu_call_ready_event;
> > > static void enqueue(struct rcu_head *node)
> > > {
> > > @@ -282,6 +286,7 @@ static void *call_rcu_thread(void *opaque)
> > > rcu_register_thread();
> > > for (;;) {
> > > + bool sleep = true;
> > > int n;
> > > /*
> > > @@ -289,7 +294,7 @@ static void *call_rcu_thread(void *opaque)
> > > * added before enter_qs() starts.
> > > */
> > > for (;;) {
> > > - qemu_event_reset(&rcu_call_ready_event);
> > > + qemu_event_reset(&sync_event);
> > > n = qatomic_read(&rcu_call_count);
> > > if (n) {
> > > break;
> > > @@ -298,20 +303,36 @@ static void *call_rcu_thread(void *opaque)
> > > #if defined(CONFIG_MALLOC_TRIM)
> > > malloc_trim(4 * 1024 * 1024);
> > > #endif
> > > - qemu_event_wait(&rcu_call_ready_event);
> > > + qemu_event_wait(&sync_event);
> > > + }
> > > +
> > > + /*
> > > + * Ensure that an event for a rcu_call_count change will not interrupt
> > > + * wait_for_readers().
> > > + */
> > > + qemu_event_reset(&sync_event);
> > > +
> > > + /*
> > > + * Ensure that the forced variable has not been set after fetching
> > > + * rcu_call_count; otherwise we may get confused by a force quiescent
> > > + * state request for an element later than n.
> > > + */
> > > + while (qatomic_xchg(&forced, false)) {
> > > + sleep = false;
> > > + n = qatomic_read(&rcu_call_count);
> > > }
> >
> > This is pretty tricky, and I wonder if it will make the code easier to read
> > if we convert the sync_event to be a semaphore instead. When as a sem, it
> > will take account of whatever kick to it, either a call_rcu1() or an
> > enforced rcu flush, so that we don't need to reset it. Meanwhile, we don't
> > worry on an slightly outdated "n" read because the 2nd round of sem_wait()
> > will catch that new "n".
> >
> > Instead, worst case is rcu thread runs one more round without seeing
> > callbacks on the queue.
> >
> > I'm not sure if that could help simplying code, maybe also make it less
> > error-prone.
>
> Semaphore is not applicable here because it will not de-duplicate concurrent
> kicks of RCU threads.
Why would concurrent kicks of the RCU thread be a problem?  QemuSemaphore is
thread-safe by itself, and IIUC extra kicks only cause call_rcu_thread() to
loop a few more rounds re-reading "n", which looks safe to me.  No?
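
Something like the below is roughly what I had in mind.  It is completely
untested, and "rcu_call_sem" is only a placeholder name for whatever would
replace sync_event; both call_rcu1() and force_rcu() would post it:

    static QemuSemaphore rcu_call_sem;

    static void *call_rcu_thread(void *opaque)
    {
        rcu_register_thread();

        for (;;) {
            int n;

            /*
             * Wait until at least one callback is queued.  No reset step
             * is needed: a leftover post from a concurrent kick only makes
             * this loop spin one extra time and re-read rcu_call_count,
             * which is harmless.
             */
            while (!(n = qatomic_read(&rcu_call_count))) {
                qemu_sem_wait(&rcu_call_sem);
            }

            enter_qs(true);   /* forced/sleep handling omitted in this sketch */
            qatomic_sub(&rcu_call_count, n);
            /* ... dequeue and invoke the n callbacks as before ... */
        }

        return NULL;
    }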
>
> >
> > > - enter_qs();
> > > + enter_qs(sleep);
> > > qatomic_sub(&rcu_call_count, n);
> > > bql_lock();
> > > while (n > 0) {
> > > node = try_dequeue();
> > > while (!node) {
> >
> > I have a pure question here not relevant to your changes.. do you know when
> > this "if" will trigger? It seems to me the enqueue() should always happen
> > before the increment of rcu_call_count:
> >
> > void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
> > {
> > node->func = func;
> > enqueue(node);
> >
> > if (!qatomic_fetch_inc(&rcu_call_count)) {
> > qemu_event_set(&sync_event);
> > }
> > }
> >
> > I believe qatomic_fetch_inc() is RMW so it's strong mb and order
> > guaranteed. Then here why the node can be null even if we're sure >=n have
> > been enqueued?
>
> Indeed, enqueue() always happens before the increment of rcu_call_count
> performed by the same thread.
>
> The node can still be NULL when there are two concurrent call_rcu1()
> executions. In the following example, rcu_call_count will be greater than
> the number of visible nodes after (A) and before (B):
>
> Thread T Thread U
> call_rcu1(O)
> enqueue(O)
> Load N from tail
> tail = O->next
> call_rcu1(P)
> enqueue(P)
> Load O->next from tail
> tail = P
> O->next = P
> rcu_call_count++ (A)
> N->next = O (B)
> rcu_call_count++
Thanks, yeah, that makes sense.  If you think it's worthwhile, maybe we could
add a comment after the first try_dequeue().
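
Purely as a sketch of wording and placement (the comment text is mine,
adjust freely):

    node = try_dequeue();
    while (!node) {
        /*
         * rcu_call_count can run ahead of the list: a concurrent
         * call_rcu1() may increment the counter before an earlier
         * enqueue() has finished linking its node, so try_dequeue() can
         * transiently return NULL even though n entries are pending.
         * Retry until the in-flight enqueue() completes.
         */
        ...
    }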
--
Peter Xu