Most threads will use mutexes and other sleeping synchronization primitives (condition variables, semaphores, events) periodically. For these threads, the synchronization primitives are natural places to report a quiescent state (possibly an extended one).
Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- docs/rcu.txt | 33 ++++++++++++++++++++++++++++++++- util/qemu-thread-posix.c | 30 ++++++++++++++++++++++++++---- util/qemu-thread-win32.c | 16 +++++++++++++++- util/rcu.c | 3 --- 4 files changed, 73 insertions(+), 9 deletions(-) diff --git a/docs/rcu.txt b/docs/rcu.txt index a3510b9..6c4a852 100644 --- a/docs/rcu.txt +++ b/docs/rcu.txt @@ -168,6 +168,35 @@ of "quiescent states", i.e. points where no RCU read-side critical section can be active. All threads created with qemu_thread_create participate in the RCU mechanism and need to annotate such points. +Luckily, in most cases no manual annotation is needed, because waiting +on condition variables (qemu_cond_wait), semaphores (qemu_sem_wait, +qemu_sem_timedwait) or events (qemu_event_wait) implicitly marks the thread +as quiescent for the whole duration of the wait. (There is an exception +for semaphore waits with a zero timeout). + +Manual annotation is still needed in the following cases: + +- threads that spend their sleeping time in the kernel, for example + in a call to select(), poll(), sigwait() or WaitForMultipleObjects(). + The QEMU I/O thread is an example of this case. When running under + KVM, VCPUs are also in a quiescent state while running the guest. + +- threads that perform a lot of I/O. In QEMU, the workers used for + aio=thread are an example of this case (see aio_worker in block/raw-*). + +- threads that run continuously until they exit. The migration thread + is an example of this case. + +Regarding the second case, note that the workers run in the QEMU thread +pool. The thread pool uses semaphores for synchronization, hence it does +report quiescent states periodically. However, in some cases (e.g. NFS +mounted with the "hard" option) the workers can take an arbitrarily long +amount of time. When this happens, synchronize_rcu() will not exit and +call_rcu() callbacks will be delayed arbitrarily. It is therefore a +good idea to mark I/O system calls as quiescence points in the worker +functions. + + Marking quiescent states is done with the following three APIs: void rcu_quiescent_state(void); @@ -229,7 +258,9 @@ DIFFERENCES WITH LINUX type of the callback's argument to be the type of the first argument. call_rcu1 is the same as Linux's call_rcu. -- Quiescent points must be marked explicitly in the thread. +- Quiescent points must be marked explicitly unless the thread uses + condvars/semaphores/events for synchronization. Note that mutexes + do not report quiescent points (see the first item above). RCU PATTERNS diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c index 2df3382..21190be 100644 --- a/util/qemu-thread-posix.c +++ b/util/qemu-thread-posix.c @@ -119,7 +119,9 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex) { int err; + rcu_thread_offline(); err = pthread_cond_wait(&cond->cond, &mutex->lock); + rcu_thread_online(); if (err) error_exit(err, __func__); } @@ -212,6 +214,10 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms) int rc; struct timespec ts; + if (ms) { + rcu_thread_offline(); + } + #if defined(__APPLE__) || defined(__NetBSD__) compute_abs_deadline(&ts, ms); pthread_mutex_lock(&sem->lock); @@ -227,7 +233,10 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms) } } pthread_mutex_unlock(&sem->lock); - return (rc == ETIMEDOUT ? -1 : 0); + if (rc == ETIMEDOUT) { + rc == -1; + } + #else if (ms <= 0) { /* This is cheaper than sem_timedwait. */ @@ -235,7 +244,7 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms) rc = sem_trywait(&sem->sem); } while (rc == -1 && errno == EINTR); if (rc == -1 && errno == EAGAIN) { - return -1; + goto out; } } else { compute_abs_deadline(&ts, ms); @@ -243,18 +252,25 @@ int qemu_sem_timedwait(QemuSemaphore *sem, int ms) rc = sem_timedwait(&sem->sem, &ts); } while (rc == -1 && errno == EINTR); if (rc == -1 && errno == ETIMEDOUT) { - return -1; + goto out; } } if (rc < 0) { error_exit(errno, __func__); } - return 0; #endif + +out: + if (ms) { + rcu_thread_online(); + } + return rc; } void qemu_sem_wait(QemuSemaphore *sem) { + rcu_thread_offline(); + #if defined(__APPLE__) || defined(__NetBSD__) pthread_mutex_lock(&sem->lock); --sem->count; @@ -272,6 +288,8 @@ void qemu_sem_wait(QemuSemaphore *sem) error_exit(errno, __func__); } #endif + + rcu_thread_online(); } #ifdef __linux__ @@ -380,7 +398,11 @@ void qemu_event_wait(QemuEvent *ev) return; } } + rcu_thread_offline(); futex_wait(ev, EV_BUSY); + rcu_thread_online(); + } else { + rcu_quiescent_state(); } } diff --git a/util/qemu-thread-win32.c b/util/qemu-thread-win32.c index 18978be..9c14cf1 100644 --- a/util/qemu-thread-win32.c +++ b/util/qemu-thread-win32.c @@ -12,6 +12,7 @@ */ #include "qemu-common.h" #include "qemu/thread.h" +#include "qemu/rcu.h" #include <process.h> #include <assert.h> #include <limits.h> @@ -187,7 +188,9 @@ void qemu_cond_wait(QemuCond *cond, QemuMutex *mutex) * leaving mutex unlocked before we wait on semaphore. */ qemu_mutex_unlock(mutex); + rcu_thread_offline(); WaitForSingleObject(cond->sema, INFINITE); + rcu_thread_online(); /* Now waiters must rendez-vous with the signaling thread and * let it continue. For cond_broadcast this has heavy contention @@ -227,7 +230,16 @@ void qemu_sem_post(QemuSemaphore *sem) int qemu_sem_timedwait(QemuSemaphore *sem, int ms) { - int rc = WaitForSingleObject(sem->sema, ms); + int rc; + + if (ms) { + rcu_thread_offline(); + } + rc = WaitForSingleObject(sem->sema, ms); + if (ms) { + rcu_thread_online(); + } + if (rc == WAIT_OBJECT_0) { return 0; } @@ -267,7 +279,9 @@ void qemu_event_reset(QemuEvent *ev) void qemu_event_wait(QemuEvent *ev) { + rcu_thread_offline(); WaitForSingleObject(ev->event, INFINITE); + rcu_thread_online(); } struct QemuThreadData { diff --git a/util/rcu.c b/util/rcu.c index f1c5736..654f3bb 100644 --- a/util/rcu.c +++ b/util/rcu.c @@ -232,9 +232,6 @@ static void *call_rcu_thread(void *opaque) { struct rcu_head *node; - /* This thread is just a writer. */ - rcu_thread_offline(); - for (;;) { int tries = 0; int n = atomic_read(&rcu_call_count); -- 1.8.1.4