Background
==========
The previous implementation had four steps to begin reclamation.
1. call_rcu_thread() would wait for the first callback.
2. call_rcu_thread() would periodically poll until a decent number of
callbacks piled up or it timed out.
3. synchronize_rcu() would statr a grace period (GP).
4. wait_for_readers() would wait for the GP to end. It would also
trigger the force_rcu notifier to break busy loops in a read-side
critical section if drain_call_rcu() had been called.
Problem
=======
The separation of waiting logic across these steps led to suboptimal
behavior:
The GP was delayed until call_rcu_thread() stops polling.
force_rcu was not consistently triggered when call_rcu_thread() detected
a high number of pending callbacks or a timeout. This inconsistency
sometimes led to stalls, as reported in a virtio-gpu issue where memory
unmapping was blocked[1].
wait_for_readers() imposed unnecessary overhead in non-urgent cases by
unconditionally executing qatomic_set(&index->waiting, true) and
qemu_event_reset(&rcu_gp_event), which are necessary only for expedited
synchronization.
Solution
========
Move the polling in call_rcu_thread() to wait_for_readers() to prevent
the delay of the GP. Additionally, reorganize wait_for_readers() to
distinguish between two states:
Normal State: it relies exclusively on periodic polling to detect
the end of the GP and maintains the read-side fast path.
Force Quiescent State: Whenever expediting synchronization, it always
triggers force_rcu and executes both qatomic_set(&index->waiting, true)
and qemu_event_reset(&rcu_gp_event). This avoids stalls while confining
the read-side overhead to this state.
This unified approach, inspired by the Linux RCU, ensures consistent and
efficient RCU grace period handling and confirms resolution of the
virtio-gpu issue.
[1]
https://lore.kernel.org/qemu-devel/[email protected]/
Signed-off-by: Akihiko Odaki <[email protected]>
Link:
https://lore.kernel.org/r/[email protected]
Tested-by: Dmitry Osipenko <[email protected]>
Signed-off-by: Paolo Bonzini <[email protected]>
---
util/rcu.c | 81 +++++++++++++++++++++++++++++++++++-------------------
1 file changed, 52 insertions(+), 29 deletions(-)
diff --git a/util/rcu.c b/util/rcu.c
index b703c86f15a..acac9446ea9 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -43,10 +43,14 @@
#define RCU_GP_LOCKED (1UL << 0)
#define RCU_GP_CTR (1UL << 1)
+
+#define RCU_CALL_MIN_SIZE 30
+
unsigned long rcu_gp_ctr = RCU_GP_LOCKED;
QemuEvent rcu_gp_event;
static int in_drain_call_rcu;
+static int rcu_call_count;
static QemuMutex rcu_registry_lock;
static QemuMutex rcu_sync_lock;
@@ -76,15 +80,29 @@ static void wait_for_readers(void)
{
ThreadList qsreaders = QLIST_HEAD_INITIALIZER(qsreaders);
struct rcu_reader_data *index, *tmp;
+ int sleeps = 0;
+ bool forced = false;
for (;;) {
- /* We want to be notified of changes made to rcu_gp_ongoing
- * while we walk the list.
+ /*
+ * Force the grace period to end and wait for it if any of the
+ * following heuristical conditions are satisfied:
+ * - A decent number of callbacks piled up.
+ * - It timed out.
+ * - It is in a drain_call_rcu() call.
+ *
+ * Otherwise, periodically poll the grace period, hoping it ends
+ * promptly.
*/
- qemu_event_reset(&rcu_gp_event);
+ if (!forced &&
+ (qatomic_read(&rcu_call_count) >= RCU_CALL_MIN_SIZE ||
+ sleeps >= 5 || qatomic_read(&in_drain_call_rcu))) {
+ forced = true;
- QLIST_FOREACH(index, ®istry, node) {
- qatomic_set(&index->waiting, true);
+ QLIST_FOREACH(index, ®istry, node) {
+ notifier_list_notify(&index->force_rcu, NULL);
+ qatomic_set(&index->waiting, true);
+ }
}
/* Here, order the stores to index->waiting before the loads of
@@ -106,8 +124,6 @@ static void wait_for_readers(void)
* get some extra futex wakeups.
*/
qatomic_set(&index->waiting, false);
- } else if (qatomic_read(&in_drain_call_rcu)) {
- notifier_list_notify(&index->force_rcu, NULL);
}
}
@@ -115,7 +131,8 @@ static void wait_for_readers(void)
break;
}
- /* Wait for one thread to report a quiescent state and try again.
+ /*
+ * Sleep for a while and try again.
* Release rcu_registry_lock, so rcu_(un)register_thread() doesn't
* wait too much time.
*
@@ -133,7 +150,20 @@ static void wait_for_readers(void)
* rcu_registry_lock is released.
*/
qemu_mutex_unlock(&rcu_registry_lock);
- qemu_event_wait(&rcu_gp_event);
+
+ if (forced) {
+ qemu_event_wait(&rcu_gp_event);
+
+ /*
+ * We want to be notified of changes made to rcu_gp_ongoing
+ * while we walk the list.
+ */
+ qemu_event_reset(&rcu_gp_event);
+ } else {
+ g_usleep(10000);
+ sleeps++;
+ }
+
qemu_mutex_lock(&rcu_registry_lock);
}
@@ -173,15 +203,11 @@ void synchronize_rcu(void)
}
}
-
-#define RCU_CALL_MIN_SIZE 30
-
/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
* from liburcu. Note that head is only used by the consumer.
*/
static struct rcu_head dummy;
static struct rcu_head *head = &dummy, **tail = &dummy.next;
-static int rcu_call_count;
static QemuEvent rcu_call_ready_event;
static void enqueue(struct rcu_head *node)
@@ -259,30 +285,27 @@ static void *call_rcu_thread(void *opaque)
rcu_register_thread();
for (;;) {
- int tries = 0;
- int n = qatomic_read(&rcu_call_count);
+ int n;
- /* Heuristically wait for a decent number of callbacks to pile up.
+ /*
* Fetch rcu_call_count now, we only must process elements that were
* added before synchronize_rcu() starts.
*/
- while (n == 0 || (n < RCU_CALL_MIN_SIZE && ++tries <= 5)) {
- g_usleep(10000);
- if (n == 0) {
- qemu_event_reset(&rcu_call_ready_event);
- n = qatomic_read(&rcu_call_count);
- if (n == 0) {
-#if defined(CONFIG_MALLOC_TRIM)
- malloc_trim(4 * 1024 * 1024);
-#endif
- qemu_event_wait(&rcu_call_ready_event);
- }
- }
+ for (;;) {
+ qemu_event_reset(&rcu_call_ready_event);
n = qatomic_read(&rcu_call_count);
+ if (n) {
+ break;
+ }
+
+#if defined(CONFIG_MALLOC_TRIM)
+ malloc_trim(4 * 1024 * 1024);
+#endif
+ qemu_event_wait(&rcu_call_ready_event);
}
- qatomic_sub(&rcu_call_count, n);
synchronize_rcu();
+ qatomic_sub(&rcu_call_count, n);
bql_lock();
while (n > 0) {
node = try_dequeue();