This is the first step of pushing down the AioContext lock. Bottom halves are already protected by their own lock, use it also for walking_bh and for the handlers list (including walking_handlers). The (lock, walking_foo) pair is wrapped into the QemuLockCnt primitive.
Paolo Paolo Bonzini (10): aio: rename bh_lock to list_lock qemu-thread: introduce QemuLockCnt aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh qemu-thread: optimize QemuLockCnt with futexes on Linux aio: tweak walking in dispatch phase aio-posix: remove walking_handlers, protecting AioHandler list with list_lock aio-win32: remove walking_handlers, protecting AioHandler list with list_lock aio: document locking aio: push aio_context_acquire/release down to dispatching async: optimize aio_bh_poll aio-posix.c | 81 ++++----- aio-win32.c | 107 ++++++------ async.c | 47 +++--- docs/lockcnt.txt | 342 ++++++++++++++++++++++++++++++++++++++ docs/multiple-iothreads.txt | 5 +- include/block/aio.h | 38 ++--- include/qemu/futex.h | 36 ++++ include/qemu/thread.h | 19 +++ util/Makefile.objs | 1 + util/lockcnt.c | 395 ++++++++++++++++++++++++++++++++++++++++++++ util/qemu-thread-posix.c | 25 +-- util/trace-events | 10 ++ 12 files changed, 955 insertions(+), 151 deletions(-) create mode 100644 docs/lockcnt.txt create mode 100644 include/qemu/futex.h create mode 100644 util/lockcnt.c -- 2.9.3 >From 26f2b638715ac83a2411dceeb131c46e36a1316d Mon Sep 17 00:00:00 2001 From: Paolo Bonzini <pbonz...@redhat.com> Date: Tue, 3 Mar 2015 14:42:19 +0100 Subject: [PATCH 01/10] aio: rename bh_lock to list_lock This will be used for AioHandlers too. There is going to be little or no contention, so it is better to reuse the same lock. Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- async.c | 20 ++++++++++---------- include/block/aio.h | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/async.c b/async.c index b2de360..ef7043f 100644 --- a/async.c +++ b/async.c @@ -53,14 +53,14 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque) .cb = cb, .opaque = opaque, }; - qemu_mutex_lock(&ctx->bh_lock); + qemu_mutex_lock(&ctx->list_lock); bh->next = ctx->first_bh; bh->scheduled = 1; bh->deleted = 1; /* Make sure that the members are ready before putting bh into list */ smp_wmb(); ctx->first_bh = bh; - qemu_mutex_unlock(&ctx->bh_lock); + qemu_mutex_unlock(&ctx->list_lock); aio_notify(ctx); } @@ -73,12 +73,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) .cb = cb, .opaque = opaque, }; - qemu_mutex_lock(&ctx->bh_lock); + qemu_mutex_lock(&ctx->list_lock); bh->next = ctx->first_bh; /* Make sure that the members are ready before putting bh into list */ smp_wmb(); ctx->first_bh = bh; - qemu_mutex_unlock(&ctx->bh_lock); + qemu_mutex_unlock(&ctx->list_lock); return bh; } @@ -120,7 +120,7 @@ int aio_bh_poll(AioContext *ctx) /* remove deleted bhs */ if (!ctx->walking_bh) { - qemu_mutex_lock(&ctx->bh_lock); + qemu_mutex_lock(&ctx->list_lock); bhp = &ctx->first_bh; while (*bhp) { bh = *bhp; @@ -131,7 +131,7 @@ int aio_bh_poll(AioContext *ctx) bhp = &bh->next; } } - qemu_mutex_unlock(&ctx->bh_lock); + qemu_mutex_unlock(&ctx->list_lock); } return ret; @@ -270,7 +270,7 @@ aio_ctx_finalize(GSource *source) } #endif - qemu_mutex_lock(&ctx->bh_lock); + qemu_mutex_lock(&ctx->list_lock); while (ctx->first_bh) { QEMUBH *next = ctx->first_bh->next; @@ -280,12 +280,12 @@ aio_ctx_finalize(GSource *source) g_free(ctx->first_bh); ctx->first_bh = next; } - qemu_mutex_unlock(&ctx->bh_lock); + qemu_mutex_unlock(&ctx->list_lock); aio_set_event_notifier(ctx, &ctx->notifier, false, NULL); event_notifier_cleanup(&ctx->notifier); qemu_rec_mutex_destroy(&ctx->lock); - qemu_mutex_destroy(&ctx->bh_lock); + qemu_mutex_destroy(&ctx->list_lock); timerlistgroup_deinit(&ctx->tlg); } @@ -371,7 +371,7 @@ AioContext *aio_context_new(Error **errp) ctx->linux_aio = NULL; #endif ctx->thread_pool = NULL; - qemu_mutex_init(&ctx->bh_lock); + qemu_mutex_init(&ctx->list_lock); qemu_rec_mutex_init(&ctx->lock); timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx); diff --git a/include/block/aio.h b/include/block/aio.h index c7ae27c..eee3139 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -90,7 +90,7 @@ struct AioContext { uint32_t notify_me; /* lock to protect between bh's adders and deleter */ - QemuMutex bh_lock; + QemuMutex list_lock; /* Anchor of the list of Bottom Halves belonging to the context */ struct QEMUBH *first_bh; -- 2.9.3 >From d8c909334ccc6d5f9b84428c3cb10dae2bdec6af Mon Sep 17 00:00:00 2001 From: Paolo Bonzini <pbonz...@redhat.com> Date: Sun, 19 Jul 2015 10:33:02 +0200 Subject: [PATCH 02/10] qemu-thread: introduce QemuLockCnt A QemuLockCnt comprises a counter and a mutex, with primitives to increment and decrement the counter, and to take and release the mutex. It can be used to do lock-free visits to a data structure whenever mutexes would be too heavy-weight and the critical section is too long for RCU. This could be implemented simply by protecting the counter with the mutex, but QemuLockCnt is harder to misuse and more efficient. Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- docs/lockcnt.txt | 343 ++++++++++++++++++++++++++++++++++++++++++++++++++ include/qemu/thread.h | 17 +++ util/Makefile.objs | 1 + util/lockcnt.c | 113 +++++++++++++++++ 4 files changed, 474 insertions(+) create mode 100644 docs/lockcnt.txt create mode 100644 util/lockcnt.c diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt new file mode 100644 index 0000000..fc5d240 --- /dev/null +++ b/docs/lockcnt.txt @@ -0,0 +1,343 @@ +DOCUMENTATION FOR LOCKED COUNTERS (aka QemuLockCnt) +=================================================== + +QEMU often uses reference counts to track data structures that are being +accessed and should not be freed. For example, a loop that invoke +callbacks like this is not safe: + + QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { + if (ioh->revents & G_IO_OUT) { + ioh->fd_write(ioh->opaque); + } + } + +QLIST_FOREACH_SAFE protects against deletion of the current node (ioh) +by stashing away its "next" pointer. However, ioh->fd_write could +actually delete the next node from the list. The simplest way to +avoid this is to mark the node as deleted, and remove it from the +list in the above loop: + + QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { + if (ioh->deleted) { + QLIST_REMOVE(ioh, next); + g_free(ioh); + } else { + if (ioh->revents & G_IO_OUT) { + ioh->fd_write(ioh->opaque); + } + } + } + +If however this loop must also be reentrant, i.e. it is possible that +ioh->fd_write invokes the loop again, some kind of counting is needed: + + walking_handlers++; + QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { + if (ioh->deleted) { + if (walking_handlers == 1) { + QLIST_REMOVE(ioh, next); + g_free(ioh); + } + } else { + if (ioh->revents & G_IO_OUT) { + ioh->fd_write(ioh->opaque); + } + } + } + walking_handlers--; + +One may think of using the RCU primitives, rcu_read_lock() and +rcu_read_unlock(); effectively, the RCU nesting count would take +the place of the walking_handlers global variable. Indeed, +reference counting and RCU have similar purposes, but their usage in +general is complementary: + +- reference counting is fine-grained and limited to a single data + structure; RCU delays reclamation of *all* RCU-protected data + structures; + +- reference counting works even in the presence of code that keeps + a reference for a long time; RCU critical sections in principle + should be kept short; + +- reference counting is often applied to code that is not thread-safe + but is reentrant; in fact, usage of reference counting in QEMU predates + the introduction of threads by many years. RCU is generally used to + protect readers from other threads freeing memory after concurrent + modifications to a data structure. + +- reclaiming data can be done by a separate thread in the case of RCU; + this can improve performance, but also delay reclamation undesirably. + With reference counting, reclamation is deterministic. + +This file documents QemuLockCnt, an abstraction for using reference +counting in code that has to be both thread-safe and reentrant. + + +QemuLockCnt concepts +-------------------- + +A QemuLockCnt comprises both a counter and a mutex; it has primitives +to increment and decrement the counter, and to take and release the +mutex. The counter notes how many visits to the data structures are +taking place (the visits could be from different threads, or there could +be multiple reentrant visits from the same thread). The basic rules +governing the counter/mutex pair then are the following: + +- Data protected by the QemuLockCnt must not be freed unless the + counter is zero and the mutex is taken. + +- A new visit cannot be started while the counter is zero and the + mutex is taken. + +Most of the time, the mutex protects all writes to the data structure, +not just frees, though there could be cases where this is not necessary. + +Reads, instead, can be done without taking the mutex, as long as the +readers and writers use the same macros that are used for RCU, for +example atomic_rcu_read, atomic_rcu_set, QLIST_FOREACH_RCU, etc. This is +because the reads are done outside a lock and a set or QLIST_INSERT_HEAD +can happen concurrently with the read. The RCU API ensures that the +processor and the compiler see all required memory barriers. + +This could be implemented simply by protecting the counter with the +mutex, for example: + + // (1) + qemu_mutex_lock(&walking_handlers_mutex); + walking_handlers++; + qemu_mutex_unlock(&walking_handlers_mutex); + + ... + + // (2) + qemu_mutex_lock(&walking_handlers_mutex); + if (--walking_handlers == 0) { + QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { + if (ioh->deleted) { + QLIST_REMOVE(ioh, next); + g_free(ioh); + } + } + } + qemu_mutex_unlock(&walking_handlers_mutex); + +Here, no frees can happen in the code represented by the ellipsis. +If another thread is executing critical section (2), that part of +the code cannot be entered, because the thread will not be able +to increment the walking_handlers variable. And of course +during the visit any other thread will see a nonzero value for +walking_handlers, as in the single-threaded code. + +Note that it is possible for multiple concurrent accesses to delay +the cleanup arbitrarily; in other words, for the walking_handlers +counter to never become zero. For this reason, this technique is +more easily applicable if concurrent access to the structure is rare. + +However, critical sections are easy to forget since you have to do +them for each modification of the counter. QemuLockCnt ensures that +all modifications of the counter take the lock appropriately, and it +can also be more efficient in two ways: + +- it avoids taking the lock for many operations (for example + incrementing the counter while it is non-zero); + +- on some platforms, one could implement QemuLockCnt to hold the + lock and the mutex in a single word, making it no more expensive + than simply managing a counter using atomic operations (see + docs/atomics.txt). This is not implemented yet, but can be + very helpful if concurrent access to the data structure is + expected to be rare. + + +Using the same mutex for frees and writes can still incur some small +inefficiencies; for example, a visit can never start if the counter is +zero and the mutex is taken---even if the mutex is taken by a write, +which in principle need not block a visit of the data structure. +However, these are usually not a problem if any of the following +assumptions are valid: + +- concurrent access is possible but rare + +- writes are rare + +- writes are frequent, but this kind of write (e.g. appending to a + list) has a very small critical section. + +For example, QEMU uses QemuLockCnt to manage an AioContext's list of +bottom halves and file descriptor handlers. Modifications to the list +of file descriptor handlers are rare. Creation of a new bottom half is +frequent and can happen on a fast path; however: 1) it is almost never +concurrent with a visit to the list of bottom halves; 2) it only has +three instructions in the critical path, two assignments and a smp_wmb(). + + +QemuLockCnt API +--------------- + + void qemu_lockcnt_init(QemuLockCnt *lockcnt); + + Initialize lockcnt's counter to zero and prepare its mutex + for usage. + + void qemu_lockcnt_destroy(QemuLockCnt *lockcnt); + + Destroy lockcnt's mutex. + + void qemu_lockcnt_inc(QemuLockCnt *lockcnt); + + If the lockcnt's count is zero, wait for critical sections + to finish and increment lockcnt's count to 1. If the count + is not zero, just increment it. + + Because this function can wait on the mutex, it must not be + called while the lockcnt's mutex is held by the current thread. + For the same reason, qemu_lockcnt_inc can also contribute to + AB-BA deadlocks. This is a sample deadlock scenario: + + thread 1 thread 2 + ------------------------------------------------------- + qemu_lockcnt_lock(&lc1); + qemu_lockcnt_lock(&lc2); + qemu_lockcnt_inc(&lc2); + qemu_lockcnt_inc(&lc1); + + void qemu_lockcnt_dec(QemuLockCnt *lockcnt); + + Decrement lockcnt's count. + + bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt); + + Decrement the count. If the new count is zero, lock + the mutex and return true. Otherwise, return false. + + bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt); + + If the count is 1, decrement the count to zero, lock + the mutex and return true. Otherwise, return false. + + void qemu_lockcnt_lock(QemuLockCnt *lockcnt); + + Lock the lockcnt's mutex. Remember that concurrent visits + are not blocked unless the count is also zero. You can + use qemu_lockcnt_count to check for this inside a critical + section. + + void qemu_lockcnt_unlock(QemuLockCnt *lockcnt); + + Release the lockcnt's mutex. + + void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt); + + This is the same as + + qemu_lockcnt_unlock(lockcnt); + qemu_lockcnt_inc(lockcnt); + + but more efficient. + + int qemu_lockcnt_count(QemuLockCnt *lockcnt); + + Return the lockcnt's count. The count can change at any time + any time; still, while the lockcnt is locked, one can usefully + check whether the count is non-zero. + + +QemuLockCnt usage +----------------- + +This section explains the typical usage patterns for QemuLockCnt functions. + +Setting a variable to a non-NULL value can be done between +qemu_lockcnt_lock and qemu_lockcnt_unlock: + + qemu_lockcnt_lock(&xyz_lockcnt); + if (!xyz) { + new_xyz = g_new(XYZ, 1); + ... + atomic_rcu_set(&xyz, new_xyz); + } + qemu_lockcnt_unlock(&xyz_lockcnt); + +Accessing the value can be done between qemu_lockcnt_inc and +qemu_lockcnt_dec: + + qemu_lockcnt_inc(&xyz_lockcnt); + if (xyz) { + XYZ *p = atomic_rcu_read(&xyz); + ... + /* Accesses can now be done through "p". */ + } + qemu_lockcnt_dec(&xyz_lockcnt); + +Freeing the object can similarly use qemu_lockcnt_lock and +qemu_lockcnt_unlock, but you also need to ensure that the count +is zero (i.e. there is no concurrent visit). Because qemu_lockcnt_inc +takes the QemuLockCnt's lock, the count cannot become non-zero while +the object is being freed. Freeing an object looks like this: + + qemu_lockcnt_lock(&xyz_lockcnt); + if (!qemu_lockcnt_count(&xyz_lockcnt)) { + g_free(xyz); + xyz = NULL; + } + qemu_lockcnt_unlock(&xyz_lockcnt); + +If an object has to be freed right after a visit, you can combine +the decrement, the locking and the check on count as follows: + + qemu_lockcnt_inc(&xyz_lockcnt); + if (xyz) { + XYZ *p = atomic_rcu_read(&xyz); + ... + /* Accesses can now be done through "p". */ + } + if (qemu_lockcnt_dec_and_lock(&xyz_lockcnt)) { + g_free(xyz); + xyz = NULL; + qemu_lockcnt_unlock(&xyz_lockcnt); + } + +QemuLockCnt can also be used to access a list as follows: + + qemu_lockcnt_inc(&io_handlers_lockcnt); + QLIST_FOREACH_RCU(ioh, &io_handlers, pioh) { + if (ioh->revents & G_IO_OUT) { + ioh->fd_write(ioh->opaque); + } + } + + if (qemu_lockcnt_dec_and_lock(&io_handlers_lockcnt)) { + QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { + if (ioh->deleted) { + QLIST_REMOVE(ioh, next); + g_free(ioh); + } + } + qemu_lockcnt_unlock(&io_handlers_lockcnt); + } + +Again, the RCU primitives are used because new items can be added to the +list during the walk. QLIST_FOREACH_RCU ensures that the processor and +the compiler see the appropriate memory barriers. + +An alternative pattern uses qemu_lockcnt_dec_if_lock: + + qemu_lockcnt_inc(&io_handlers_lockcnt); + QLIST_FOREACH_SAFE_RCU(ioh, &io_handlers, next, pioh) { + if (ioh->deleted) { + if (qemu_lockcnt_dec_if_lock(&io_handlers_lockcnt)) { + QLIST_REMOVE(ioh, next); + g_free(ioh); + qemu_lockcnt_inc_and_unlock(&io_handlers_lockcnt); + } + } else { + if (ioh->revents & G_IO_OUT) { + ioh->fd_write(ioh->opaque); + } + } + } + qemu_lockcnt_dec(&io_handlers_lockcnt); + +Here you can use qemu_lockcnt_dec instead of qemu_lockcnt_dec_and_lock, +because there is no special task to do if the count goes from 1 to 0. diff --git a/include/qemu/thread.h b/include/qemu/thread.h index e8e665f..ce18b17 100644 --- a/include/qemu/thread.h +++ b/include/qemu/thread.h @@ -8,6 +8,7 @@ typedef struct QemuMutex QemuMutex; typedef struct QemuCond QemuCond; typedef struct QemuSemaphore QemuSemaphore; typedef struct QemuEvent QemuEvent; +typedef struct QemuLockCnt QemuLockCnt; typedef struct QemuThread QemuThread; #ifdef _WIN32 @@ -98,4 +99,20 @@ static inline void qemu_spin_unlock(QemuSpin *spin) __sync_lock_release(&spin->value); } +struct QemuLockCnt { + QemuMutex mutex; + unsigned count; +}; + +void qemu_lockcnt_init(QemuLockCnt *lockcnt); +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt); +void qemu_lockcnt_inc(QemuLockCnt *lockcnt); +void qemu_lockcnt_dec(QemuLockCnt *lockcnt); +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt); +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt); +void qemu_lockcnt_lock(QemuLockCnt *lockcnt); +void qemu_lockcnt_unlock(QemuLockCnt *lockcnt); +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt); +unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt); + #endif diff --git a/util/Makefile.objs b/util/Makefile.objs index ad0f9c7..c1f247d 100644 --- a/util/Makefile.objs +++ b/util/Makefile.objs @@ -1,5 +1,6 @@ util-obj-y = osdep.o cutils.o unicode.o qemu-timer-common.o util-obj-y += bufferiszero.o +util-obj-y += lockcnt.o util-obj-$(CONFIG_POSIX) += compatfd.o util-obj-$(CONFIG_POSIX) += event_notifier-posix.o util-obj-$(CONFIG_POSIX) += mmap-alloc.o diff --git a/util/lockcnt.c b/util/lockcnt.c new file mode 100644 index 0000000..78ed1e4 --- /dev/null +++ b/util/lockcnt.c @@ -0,0 +1,113 @@ +/* + * QemuLockCnt implementation + * + * Copyright Red Hat, Inc. 2015 + * + * Author: + * Paolo Bonzini <pbonz...@redhat.com> + */ +#include "qemu/osdep.h" +#include "qemu/thread.h" +#include "qemu/atomic.h" + +void qemu_lockcnt_init(QemuLockCnt *lockcnt) +{ + qemu_mutex_init(&lockcnt->mutex); + lockcnt->count = 0; +} + +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt) +{ + qemu_mutex_destroy(&lockcnt->mutex); +} + +void qemu_lockcnt_inc(QemuLockCnt *lockcnt) +{ + int old; + for (;;) { + old = atomic_read(&lockcnt->count); + if (old == 0) { + qemu_lockcnt_lock(lockcnt); + qemu_lockcnt_inc_and_unlock(lockcnt); + return; + } else { + if (atomic_cmpxchg(&lockcnt->count, old, old + 1) == old) { + return; + } + } + } +} + +void qemu_lockcnt_dec(QemuLockCnt *lockcnt) +{ + atomic_dec(&lockcnt->count); +} + +/* Decrement a counter, and return locked if it is decremented to zero. + * It is impossible for the counter to become nonzero while the mutex + * is taken. + */ +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + while (val > 1) { + int old = atomic_cmpxchg(&lockcnt->count, val, val - 1); + if (old != val) { + val = old; + continue; + } + + return false; + } + + qemu_lockcnt_lock(lockcnt); + if (atomic_fetch_dec(&lockcnt->count) == 1) { + return true; + } + + qemu_lockcnt_unlock(lockcnt); + return false; +} + +/* Decrement a counter and return locked if it is decremented to zero. + * Otherwise do nothing. + * + * It is impossible for the counter to become nonzero while the mutex + * is taken. + */ +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_mb_read(&lockcnt->count); + if (val > 1) { + return false; + } + + qemu_lockcnt_lock(lockcnt); + if (atomic_fetch_dec(&lockcnt->count) == 1) { + return true; + } + + qemu_lockcnt_inc_and_unlock(lockcnt); + return false; +} + +void qemu_lockcnt_lock(QemuLockCnt *lockcnt) +{ + qemu_mutex_lock(&lockcnt->mutex); +} + +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt) +{ + atomic_inc(&lockcnt->count); + qemu_mutex_unlock(&lockcnt->mutex); +} + +void qemu_lockcnt_unlock(QemuLockCnt *lockcnt) +{ + qemu_mutex_unlock(&lockcnt->mutex); +} + +unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt) +{ + return lockcnt->count; +} -- 2.9.3 >From fb80b3f54c4442d2ca6c6e4cf140a028272f0485 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini <pbonz...@redhat.com> Date: Tue, 14 Apr 2015 15:33:38 +0200 Subject: [PATCH 03/10] aio: make ctx->list_lock a QemuLockCnt, subsuming ctx->walking_bh This will make it possible to walk the list of bottom halves without holding the AioContext lock---and in turn to call bottom half handlers without holding the lock. Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- async.c | 35 ++++++++++++++++------------------- include/block/aio.h | 12 +++++------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/async.c b/async.c index ef7043f..f606785 100644 --- a/async.c +++ b/async.c @@ -53,14 +53,14 @@ void aio_bh_schedule_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque) .cb = cb, .opaque = opaque, }; - qemu_mutex_lock(&ctx->list_lock); + qemu_lockcnt_lock(&ctx->list_lock); bh->next = ctx->first_bh; bh->scheduled = 1; bh->deleted = 1; /* Make sure that the members are ready before putting bh into list */ smp_wmb(); ctx->first_bh = bh; - qemu_mutex_unlock(&ctx->list_lock); + qemu_lockcnt_unlock(&ctx->list_lock); aio_notify(ctx); } @@ -73,12 +73,12 @@ QEMUBH *aio_bh_new(AioContext *ctx, QEMUBHFunc *cb, void *opaque) .cb = cb, .opaque = opaque, }; - qemu_mutex_lock(&ctx->list_lock); + qemu_lockcnt_lock(&ctx->list_lock); bh->next = ctx->first_bh; /* Make sure that the members are ready before putting bh into list */ smp_wmb(); ctx->first_bh = bh; - qemu_mutex_unlock(&ctx->list_lock); + qemu_lockcnt_unlock(&ctx->list_lock); return bh; } @@ -93,13 +93,11 @@ int aio_bh_poll(AioContext *ctx) QEMUBH *bh, **bhp, *next; int ret; - ctx->walking_bh++; + qemu_lockcnt_inc(&ctx->list_lock); ret = 0; - for (bh = ctx->first_bh; bh; bh = next) { - /* Make sure that fetching bh happens before accessing its members */ - smp_read_barrier_depends(); - next = bh->next; + for (bh = atomic_rcu_read(&ctx->first_bh); bh; bh = next) { + next = atomic_rcu_read(&bh->next); /* The atomic_xchg is paired with the one in qemu_bh_schedule. The * implicit memory barrier ensures that the callback sees all writes * done by the scheduling thread. It also ensures that the scheduling @@ -116,11 +114,8 @@ int aio_bh_poll(AioContext *ctx) } } - ctx->walking_bh--; - /* remove deleted bhs */ - if (!ctx->walking_bh) { - qemu_mutex_lock(&ctx->list_lock); + if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) { bhp = &ctx->first_bh; while (*bhp) { bh = *bhp; @@ -131,7 +126,7 @@ int aio_bh_poll(AioContext *ctx) bhp = &bh->next; } } - qemu_mutex_unlock(&ctx->list_lock); + qemu_lockcnt_unlock(&ctx->list_lock); } return ret; @@ -187,7 +182,8 @@ aio_compute_timeout(AioContext *ctx) int timeout = -1; QEMUBH *bh; - for (bh = ctx->first_bh; bh; bh = bh->next) { + for (bh = atomic_rcu_read(&ctx->first_bh); bh; + bh = atomic_rcu_read(&bh->next)) { if (bh->scheduled) { if (bh->idle) { /* idle bottom halves will be polled at least @@ -270,7 +266,8 @@ aio_ctx_finalize(GSource *source) } #endif - qemu_mutex_lock(&ctx->list_lock); + qemu_lockcnt_lock(&ctx->list_lock); + assert(!qemu_lockcnt_count(&ctx->list_lock)); while (ctx->first_bh) { QEMUBH *next = ctx->first_bh->next; @@ -280,12 +277,12 @@ aio_ctx_finalize(GSource *source) g_free(ctx->first_bh); ctx->first_bh = next; } - qemu_mutex_unlock(&ctx->list_lock); + qemu_lockcnt_unlock(&ctx->list_lock); aio_set_event_notifier(ctx, &ctx->notifier, false, NULL); event_notifier_cleanup(&ctx->notifier); qemu_rec_mutex_destroy(&ctx->lock); - qemu_mutex_destroy(&ctx->list_lock); + qemu_lockcnt_destroy(&ctx->list_lock); timerlistgroup_deinit(&ctx->tlg); } @@ -363,6 +360,7 @@ AioContext *aio_context_new(Error **errp) goto fail; } g_source_set_can_recurse(&ctx->source, true); + qemu_lockcnt_init(&ctx->list_lock); aio_set_event_notifier(ctx, &ctx->notifier, false, (EventNotifierHandler *) @@ -371,7 +369,6 @@ AioContext *aio_context_new(Error **errp) ctx->linux_aio = NULL; #endif ctx->thread_pool = NULL; - qemu_mutex_init(&ctx->list_lock); qemu_rec_mutex_init(&ctx->lock); timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx); diff --git a/include/block/aio.h b/include/block/aio.h index eee3139..b833fe8 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -89,17 +89,15 @@ struct AioContext { */ uint32_t notify_me; - /* lock to protect between bh's adders and deleter */ - QemuMutex list_lock; + /* A lock to protect between bh's adders and deleter, and to ensure + * that no callbacks are removed while we're walking and dispatching + * them. + */ + QemuLockCnt list_lock; /* Anchor of the list of Bottom Halves belonging to the context */ struct QEMUBH *first_bh; - /* A simple lock used to protect the first_bh list, and ensure that - * no callbacks are removed while we're walking and dispatching callbacks. - */ - int walking_bh; - /* Used by aio_notify. * * "notified" is used to avoid expensive event_notifier_test_and_clear -- 2.9.3 >From 1d5f234d99c1408a1bc3ff3f44c21f3104579693 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini <pbonz...@redhat.com> Date: Sun, 19 Jul 2015 12:13:48 +0200 Subject: [PATCH 04/10] qemu-thread: optimize QemuLockCnt with futexes on Linux This is complex, but I think it is reasonably documented in the source. Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- docs/lockcnt.txt | 9 +- include/qemu/futex.h | 36 ++++++ include/qemu/thread.h | 2 + util/lockcnt.c | 282 +++++++++++++++++++++++++++++++++++++++++++++++ util/qemu-thread-posix.c | 25 +---- util/trace-events | 10 ++ 6 files changed, 335 insertions(+), 29 deletions(-) create mode 100644 include/qemu/futex.h diff --git a/docs/lockcnt.txt b/docs/lockcnt.txt index fc5d240..594764b 100644 --- a/docs/lockcnt.txt +++ b/docs/lockcnt.txt @@ -142,12 +142,11 @@ can also be more efficient in two ways: - it avoids taking the lock for many operations (for example incrementing the counter while it is non-zero); -- on some platforms, one could implement QemuLockCnt to hold the - lock and the mutex in a single word, making it no more expensive +- on some platforms, one can implement QemuLockCnt to hold the lock + and the mutex in a single word, making the fast path no more expensive than simply managing a counter using atomic operations (see - docs/atomics.txt). This is not implemented yet, but can be - very helpful if concurrent access to the data structure is - expected to be rare. + docs/atomics.txt). This can be very helpful if concurrent access to + the data structure is expected to be rare. Using the same mutex for frees and writes can still incur some small diff --git a/include/qemu/futex.h b/include/qemu/futex.h new file mode 100644 index 0000000..c3d1089 --- /dev/null +++ b/include/qemu/futex.h @@ -0,0 +1,36 @@ +/* + * Wrappers around Linux futex syscall + * + * Copyright Red Hat, Inc. 2015 + * + * Author: + * Paolo Bonzini <pbonz...@redhat.com> + * + * This work is licensed under the terms of the GNU GPL, version 2 or later. + * See the COPYING file in the top-level directory. + * + */ + +#include <sys/syscall.h> +#include <linux/futex.h> + +#define futex(...) syscall(__NR_futex, __VA_ARGS__) + +static inline void futex_wake(void *f, int n) +{ + futex(f, FUTEX_WAKE, n, NULL, NULL, 0); +} + +static inline void futex_wait(void *f, unsigned val) +{ + while (futex(f, FUTEX_WAIT, (int) val, NULL, NULL, 0)) { + switch (errno) { + case EWOULDBLOCK: + return; + case EINTR: + break; /* get out of switch and retry */ + default: + abort(); + } + } +} diff --git a/include/qemu/thread.h b/include/qemu/thread.h index ce18b17..6b9cbd0 100644 --- a/include/qemu/thread.h +++ b/include/qemu/thread.h @@ -100,7 +100,9 @@ static inline void qemu_spin_unlock(QemuSpin *spin) } struct QemuLockCnt { +#ifndef CONFIG_LINUX QemuMutex mutex; +#endif unsigned count; }; diff --git a/util/lockcnt.c b/util/lockcnt.c index 78ed1e4..71e8f8f 100644 --- a/util/lockcnt.c +++ b/util/lockcnt.c @@ -9,7 +9,288 @@ #include "qemu/osdep.h" #include "qemu/thread.h" #include "qemu/atomic.h" +#include "trace.h" +#ifdef CONFIG_LINUX +#include "qemu/futex.h" + +/* On Linux, bits 0-1 are a futex-based lock, bits 2-31 are the counter. + * For the mutex algorithm see Ulrich Drepper's "Futexes Are Tricky" (ok, + * this is not the most relaxing citation I could make...). It is similar + * to mutex2 in the paper. + */ + +#define QEMU_LOCKCNT_STATE_MASK 3 +#define QEMU_LOCKCNT_STATE_FREE 0 +#define QEMU_LOCKCNT_STATE_LOCKED 1 +#define QEMU_LOCKCNT_STATE_WAITING 2 + +#define QEMU_LOCKCNT_COUNT_STEP 4 +#define QEMU_LOCKCNT_COUNT_SHIFT 2 + +void qemu_lockcnt_init(QemuLockCnt *lockcnt) +{ + lockcnt->count = 0; +} + +void qemu_lockcnt_destroy(QemuLockCnt *lockcnt) +{ +} + +/* *val is the current value of lockcnt->count. + * + * If the lock is free, try a cmpxchg from *val to new_if_free; if + * successful return true and set *val to the new value of + * lockcnt->count, i.e. new_if_free. + * + * If the lock is taken, wait for it to be released and return false + * *without trying again to take the lock*. Again, set *val to the + * new value of lockcnt->count. *waited is set to true before sleeping. + * + * new_if_free's bottom two bits must not be QEMU_LOCKCNT_STATE_LOCKED + * if calling this function a second time after it has returned + * false. + */ +static bool qemu_lockcnt_cmpxchg_or_wait(QemuLockCnt *lockcnt, int *val, + int new_if_free, bool *waited) +{ + /* Fast path for when the lock is free. */ + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_FREE) { + int expected = *val; + + trace_lockcnt_fast_path_attempt(lockcnt, expected, new_if_free); + *val = atomic_cmpxchg(&lockcnt->count, expected, new_if_free); + if (*val == expected) { + trace_lockcnt_fast_path_success(lockcnt, expected, new_if_free); + *val = new_if_free; + return true; + } + } + + /* The slow path moves from locked to waiting if necessary, then + * does a futex wait. Both steps can be repeated ad nauseam, + * only getting out of the loop if we can have another shot at the + * fast path. Once we can, get out to compute the new destination + * value for the fast path. + */ + while ((*val & QEMU_LOCKCNT_STATE_MASK) != QEMU_LOCKCNT_STATE_FREE) { + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_LOCKED) { + int expected = *val; + int new = expected - QEMU_LOCKCNT_STATE_LOCKED + QEMU_LOCKCNT_STATE_WAITING; + + trace_lockcnt_futex_wait_prepare(lockcnt, expected, new); + *val = atomic_cmpxchg(&lockcnt->count, expected, new); + if (*val == expected) { + *val = new; + } + continue; + } + + if ((*val & QEMU_LOCKCNT_STATE_MASK) == QEMU_LOCKCNT_STATE_WAITING) { + *waited = true; + trace_lockcnt_futex_wait(lockcnt, *val); + futex_wait(&lockcnt->count, *val); + *val = atomic_read(&lockcnt->count); + trace_lockcnt_futex_wait_resume(lockcnt, *val); + continue; + } + + abort(); + } + return false; +} + +static void lockcnt_wake(QemuLockCnt *lockcnt) +{ + trace_lockcnt_futex_wake(lockcnt); + futex_wake(&lockcnt->count, 1); +} + +void qemu_lockcnt_inc(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + bool waited = false; + + for (;;) { + if (val >= QEMU_LOCKCNT_COUNT_STEP) { + int expected = val; + val = atomic_cmpxchg(&lockcnt->count, val, val + QEMU_LOCKCNT_COUNT_STEP); + if (val == expected) { + break; + } + } else { + /* The fast path is (0, unlocked)->(1, unlocked). */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, QEMU_LOCKCNT_COUNT_STEP, + &waited)) { + break; + } + } + } + + /* If we were woken by another thread, we should also wake one because + * we are effectively releasing the lock that was given to us. This is + * the case where qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING + * in the low bits, and qemu_lockcnt_inc_and_unlock would find it and + * wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } +} + +void qemu_lockcnt_dec(QemuLockCnt *lockcnt) +{ + atomic_sub(&lockcnt->count, QEMU_LOCKCNT_COUNT_STEP); +} + +/* Decrement a counter, and return locked if it is decremented to zero. + * If the function returns true, it is impossible for the counter to + * become nonzero until the next qemu_lockcnt_unlock. + */ +bool qemu_lockcnt_dec_and_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int locked_state = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + for (;;) { + if (val >= 2 * QEMU_LOCKCNT_COUNT_STEP) { + int expected = val; + int new = val - QEMU_LOCKCNT_COUNT_STEP; + val = atomic_cmpxchg(&lockcnt->count, val, new); + if (val == expected) { + break; + } + } + + /* If count is going 1->0, take the lock. The fast path is + * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting). + */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) { + return true; + } + + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + locked_state = QEMU_LOCKCNT_STATE_WAITING; + } + } + + /* If we were woken by another thread, but we're returning in unlocked + * state, we should also wake a thread because we are effectively + * releasing the lock that was given to us. This is the case where + * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low + * bits, and qemu_lockcnt_unlock would find it and wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } + return false; +} + +/* If the counter is one, decrement it and return locked. Otherwise do + * nothing. + * + * If the function returns true, it is impossible for the counter to + * become nonzero until the next qemu_lockcnt_unlock. + */ +bool qemu_lockcnt_dec_if_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int locked_state = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + while (val < 2 * QEMU_LOCKCNT_COUNT_STEP) { + /* If count is going 1->0, take the lock. The fast path is + * (1, unlocked)->(0, locked) or (1, unlocked)->(0, waiting). + */ + if (qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, locked_state, &waited)) { + return true; + } + + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + locked_state = QEMU_LOCKCNT_STATE_WAITING; + } + } + + /* If we were woken by another thread, but we're returning in unlocked + * state, we should also wake a thread because we are effectively + * releasing the lock that was given to us. This is the case where + * qemu_lockcnt_lock would leave QEMU_LOCKCNT_STATE_WAITING in the low + * bits, and qemu_lockcnt_inc_and_unlock would find it and wake someone. + */ + if (waited) { + lockcnt_wake(lockcnt); + } + return false; +} + +void qemu_lockcnt_lock(QemuLockCnt *lockcnt) +{ + int val = atomic_read(&lockcnt->count); + int step = QEMU_LOCKCNT_STATE_LOCKED; + bool waited = false; + + /* The third argument is only used if the low bits of val are 0 + * (QEMU_LOCKCNT_STATE_FREE), so just blindly mix in the desired + * state. + */ + while (!qemu_lockcnt_cmpxchg_or_wait(lockcnt, &val, val + step, &waited)) { + if (waited) { + /* At this point we do not know if there are more waiters. Assume + * there are. + */ + step = QEMU_LOCKCNT_STATE_WAITING; + } + } +} + +void qemu_lockcnt_inc_and_unlock(QemuLockCnt *lockcnt) +{ + int expected, new, val; + + val = atomic_read(&lockcnt->count); + do { + expected = val; + new = (val + QEMU_LOCKCNT_COUNT_STEP) & ~QEMU_LOCKCNT_STATE_MASK; + trace_lockcnt_unlock_attempt(lockcnt, val, new); + val = atomic_cmpxchg(&lockcnt->count, val, new); + } while (val != expected); + + trace_lockcnt_unlock_success(lockcnt, val, new); + if (val & QEMU_LOCKCNT_STATE_WAITING) { + lockcnt_wake(lockcnt); + } +} + +void qemu_lockcnt_unlock(QemuLockCnt *lockcnt) +{ + int expected, new, val; + + val = atomic_read(&lockcnt->count); + do { + expected = val; + new = val & ~QEMU_LOCKCNT_STATE_MASK; + trace_lockcnt_unlock_attempt(lockcnt, val, new); + val = atomic_cmpxchg(&lockcnt->count, val, new); + } while (val != expected); + + trace_lockcnt_unlock_success(lockcnt, val, new); + if (val & QEMU_LOCKCNT_STATE_WAITING) { + lockcnt_wake(lockcnt); + } +} + +unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt) +{ + return lockcnt->count >> QEMU_LOCKCNT_COUNT_SHIFT; +} +#else void qemu_lockcnt_init(QemuLockCnt *lockcnt) { qemu_mutex_init(&lockcnt->mutex); @@ -111,3 +392,4 @@ unsigned qemu_lockcnt_count(QemuLockCnt *lockcnt) { return lockcnt->count; } +#endif diff --git a/util/qemu-thread-posix.c b/util/qemu-thread-posix.c index d20cdde..a3df214 100644 --- a/util/qemu-thread-posix.c +++ b/util/qemu-thread-posix.c @@ -11,10 +11,6 @@ * */ #include "qemu/osdep.h" -#ifdef __linux__ -#include <sys/syscall.h> -#include <linux/futex.h> -#endif #include "qemu/thread.h" #include "qemu/atomic.h" #include "qemu/notify.h" @@ -294,26 +290,7 @@ void qemu_sem_wait(QemuSemaphore *sem) } #ifdef __linux__ -#define futex(...) syscall(__NR_futex, __VA_ARGS__) - -static inline void futex_wake(QemuEvent *ev, int n) -{ - futex(ev, FUTEX_WAKE, n, NULL, NULL, 0); -} - -static inline void futex_wait(QemuEvent *ev, unsigned val) -{ - while (futex(ev, FUTEX_WAIT, (int) val, NULL, NULL, 0)) { - switch (errno) { - case EWOULDBLOCK: - return; - case EINTR: - break; /* get out of switch and retry */ - default: - abort(); - } - } -} +#include "qemu/futex.h" #else static inline void futex_wake(QemuEvent *ev, int n) { diff --git a/util/trace-events b/util/trace-events index ed06aee..2b8aa30 100644 --- a/util/trace-events +++ b/util/trace-events @@ -30,3 +30,13 @@ qemu_anon_ram_free(void *ptr, size_t size) "ptr %p size %zu" hbitmap_iter_skip_words(const void *hb, void *hbi, uint64_t pos, unsigned long cur) "hb %p hbi %p pos %"PRId64" cur 0x%lx" hbitmap_reset(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64 hbitmap_set(void *hb, uint64_t start, uint64_t count, uint64_t sbit, uint64_t ebit) "hb %p items %"PRIu64",%"PRIu64" bits %"PRIu64"..%"PRIu64 + +# util/lockcnt.c +lockcnt_fast_path_attempt(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d" +lockcnt_fast_path_success(const void *lockcnt, int expected, int new) "lockcnt %p fast path %d->%d succeeded" +lockcnt_unlock_attempt(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d" +lockcnt_unlock_success(const void *lockcnt, int expected, int new) "lockcnt %p unlock %d->%d succeeded" +lockcnt_futex_wait_prepare(const void *lockcnt, int expected, int new) "lockcnt %p preparing slow path %d->%d" +lockcnt_futex_wait(const void *lockcnt, int val) "lockcnt %p waiting on %d" +lockcnt_futex_wait_resume(const void *lockcnt, int new) "lockcnt %p after wait: %d" +lockcnt_futex_wake(const void *lockcnt) "lockcnt %p waking up one waiter" -- 2.9.3 >From 5b336f5f328c1495dfa22fd3776d7e5274c794c1 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini <pbonz...@redhat.com> Date: Tue, 14 Apr 2015 15:06:34 +0200 Subject: [PATCH 05/10] aio: tweak walking in dispatch phase Preparing for the following patch, use QLIST_FOREACH_SAFE and modify the placement of walking_handlers increment/decrement. Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- aio-posix.c | 27 +++++++++++++-------------- aio-win32.c | 26 ++++++++++++-------------- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/aio-posix.c b/aio-posix.c index e13b9ab..93a50ad 100644 --- a/aio-posix.c +++ b/aio-posix.c @@ -292,7 +292,7 @@ bool aio_pending(AioContext *ctx) bool aio_dispatch(AioContext *ctx) { - AioHandler *node; + AioHandler *node, *tmp; bool progress = false; /* @@ -308,12 +308,10 @@ bool aio_dispatch(AioContext *ctx) * We have to walk very carefully in case aio_set_fd_handler is * called while we're walking. */ - node = QLIST_FIRST(&ctx->aio_handlers); - while (node) { - AioHandler *tmp; - int revents; + ctx->walking_handlers++; - ctx->walking_handlers++; + QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) { + int revents; revents = node->pfd.revents & node->pfd.events; node->pfd.revents = 0; @@ -337,17 +335,18 @@ bool aio_dispatch(AioContext *ctx) progress = true; } - tmp = node; - node = QLIST_NEXT(node, node); - - ctx->walking_handlers--; - - if (!ctx->walking_handlers && tmp->deleted) { - QLIST_REMOVE(tmp, node); - g_free(tmp); + if (node->deleted) { + ctx->walking_handlers--; + if (!ctx->walking_handlers) { + QLIST_REMOVE(node, node); + g_free(node); + } + ctx->walking_handlers++; } } + ctx->walking_handlers--; + /* Run our timers */ progress |= timerlistgroup_run_timers(&ctx->tlg); diff --git a/aio-win32.c b/aio-win32.c index c8c249e..f27b56b 100644 --- a/aio-win32.c +++ b/aio-win32.c @@ -209,20 +209,18 @@ bool aio_pending(AioContext *ctx) static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) { - AioHandler *node; + AioHandler *node, *tmp; bool progress = false; + ctx->walking_handlers++; + /* * We have to walk very carefully in case aio_set_fd_handler is * called while we're walking. */ - node = QLIST_FIRST(&ctx->aio_handlers); - while (node) { - AioHandler *tmp; + QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) { int revents = node->pfd.revents; - ctx->walking_handlers++; - if (!node->deleted && (revents || event_notifier_get_handle(node->e) == event) && node->io_notify) { @@ -257,17 +255,17 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) } } - tmp = node; - node = QLIST_NEXT(node, node); - - ctx->walking_handlers--; - - if (!ctx->walking_handlers && tmp->deleted) { - QLIST_REMOVE(tmp, node); - g_free(tmp); + if (node->deleted) { + ctx->walking_handlers--; + if (!ctx->walking_handlers) { + QLIST_REMOVE(node, node); + g_free(node); + } + ctx->walking_handlers++; } } + ctx->walking_handlers--; return progress; } -- 2.9.3 >From 0e03f21233d33cd08c9dd32bd3de00949e8b5810 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini <pbonz...@redhat.com> Date: Tue, 3 Mar 2015 14:42:46 +0100 Subject: [PATCH 06/10] aio-posix: remove walking_handlers, protecting AioHandler list with list_lock Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- aio-posix.c | 51 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/aio-posix.c b/aio-posix.c index 93a50ad..c64d36d 100644 --- a/aio-posix.c +++ b/aio-posix.c @@ -16,7 +16,7 @@ #include "qemu/osdep.h" #include "qemu-common.h" #include "block/block.h" -#include "qemu/queue.h" +#include "qemu/rcu_queue.h" #include "qemu/sockets.h" #ifdef CONFIG_EPOLL_CREATE1 #include <sys/epoll.h> @@ -206,6 +206,8 @@ void aio_set_fd_handler(AioContext *ctx, bool is_new = false; bool deleted = false; + qemu_lockcnt_lock(&ctx->list_lock); + node = find_aio_handler(ctx, fd); /* Are we deleting the fd handler? */ @@ -217,7 +219,7 @@ void aio_set_fd_handler(AioContext *ctx, g_source_remove_poll(&ctx->source, &node->pfd); /* If the lock is held, just mark the node as deleted */ - if (ctx->walking_handlers) { + if (qemu_lockcnt_count(&ctx->list_lock)) { node->deleted = 1; node->pfd.revents = 0; } else { @@ -233,7 +235,7 @@ void aio_set_fd_handler(AioContext *ctx, /* Alloc and insert if it's not already there */ node = g_new0(AioHandler, 1); node->pfd.fd = fd; - QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); + QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); g_source_add_poll(&ctx->source, &node->pfd); is_new = true; @@ -249,6 +251,7 @@ void aio_set_fd_handler(AioContext *ctx, } aio_epoll_update(ctx, node, is_new); + qemu_lockcnt_unlock(&ctx->list_lock); aio_notify(ctx); if (deleted) { g_free(node); @@ -272,22 +275,32 @@ bool aio_prepare(AioContext *ctx) bool aio_pending(AioContext *ctx) { AioHandler *node; + bool result = false; - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + /* + * We have to walk very carefully in case aio_set_fd_handler is + * called while we're walking. + */ + qemu_lockcnt_inc(&ctx->list_lock); + + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { int revents; revents = node->pfd.revents & node->pfd.events; if (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR) && node->io_read && aio_node_check(ctx, node->is_external)) { - return true; + result = true; + break; } if (revents & (G_IO_OUT | G_IO_ERR) && node->io_write && aio_node_check(ctx, node->is_external)) { - return true; + result = true; + break; } } + qemu_lockcnt_dec(&ctx->list_lock); - return false; + return result; } bool aio_dispatch(AioContext *ctx) @@ -308,13 +321,12 @@ bool aio_dispatch(AioContext *ctx) * We have to walk very carefully in case aio_set_fd_handler is * called while we're walking. */ - ctx->walking_handlers++; + qemu_lockcnt_inc(&ctx->list_lock); - QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) { + QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { int revents; - revents = node->pfd.revents & node->pfd.events; - node->pfd.revents = 0; + revents = atomic_xchg(&node->pfd.revents, 0) & node->pfd.events; if (!node->deleted && (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) && @@ -336,16 +348,15 @@ bool aio_dispatch(AioContext *ctx) } if (node->deleted) { - ctx->walking_handlers--; - if (!ctx->walking_handlers) { + if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { QLIST_REMOVE(node, node); g_free(node); + qemu_lockcnt_inc_and_unlock(&ctx->list_lock); } - ctx->walking_handlers++; } } - ctx->walking_handlers--; + qemu_lockcnt_dec(&ctx->list_lock); /* Run our timers */ progress |= timerlistgroup_run_timers(&ctx->tlg); @@ -420,14 +431,12 @@ bool aio_poll(AioContext *ctx, bool blocking) atomic_add(&ctx->notify_me, 2); } - ctx->walking_handlers++; - + qemu_lockcnt_inc(&ctx->list_lock); assert(npfd == 0); /* fill pollfds */ - if (!aio_epoll_enabled(ctx)) { - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { if (!node->deleted && node->pfd.events && aio_node_check(ctx, node->is_external)) { add_pollfd(node); @@ -464,12 +473,12 @@ bool aio_poll(AioContext *ctx, bool blocking) /* if we have any readable fds, dispatch event */ if (ret > 0) { for (i = 0; i < npfd; i++) { - nodes[i]->pfd.revents = pollfds[i].revents; + atomic_or(&nodes[i]->pfd.revents, pollfds[i].revents); } } npfd = 0; - ctx->walking_handlers--; + qemu_lockcnt_dec(&ctx->list_lock); /* Run dispatch even if there were no readable fds to run timers */ if (aio_dispatch(ctx)) { -- 2.9.3 >From 7f71f6a0a3e54d7307b570411d973d1bd49c40c8 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini <pbonz...@redhat.com> Date: Tue, 3 Mar 2015 14:42:46 +0100 Subject: [PATCH 07/10] aio-win32: remove walking_handlers, protecting AioHandler list with list_lock Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- aio-win32.c | 82 +++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 50 insertions(+), 32 deletions(-) diff --git a/aio-win32.c b/aio-win32.c index f27b56b..7ae2c14 100644 --- a/aio-win32.c +++ b/aio-win32.c @@ -20,6 +20,7 @@ #include "block/block.h" #include "qemu/queue.h" #include "qemu/sockets.h" +#include "qemu/rcu_queue.h" struct AioHandler { EventNotifier *e; @@ -43,6 +44,7 @@ void aio_set_fd_handler(AioContext *ctx, /* fd is a SOCKET in our case */ AioHandler *node; + qemu_lockcnt_lock(&ctx->list_lock); QLIST_FOREACH(node, &ctx->aio_handlers, node) { if (node->pfd.fd == fd && !node->deleted) { break; @@ -52,14 +54,14 @@ void aio_set_fd_handler(AioContext *ctx, /* Are we deleting the fd handler? */ if (!io_read && !io_write) { if (node) { - /* If the lock is held, just mark the node as deleted */ - if (ctx->walking_handlers) { + /* If aio_poll is in progress, just mark the node as deleted */ + if (qemu_lockcnt_count(&ctx->list_lock)) { node->deleted = 1; node->pfd.revents = 0; } else { /* Otherwise, delete it for real. We can't just mark it as * deleted because deleted nodes are only cleaned up after - * releasing the walking_handlers lock. + * releasing the list_lock. */ QLIST_REMOVE(node, node); g_free(node); @@ -72,7 +74,7 @@ void aio_set_fd_handler(AioContext *ctx, /* Alloc and insert if it's not already there */ node = g_new0(AioHandler, 1); node->pfd.fd = fd; - QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); + QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); } node->pfd.events = 0; @@ -97,6 +99,7 @@ void aio_set_fd_handler(AioContext *ctx, FD_CONNECT | FD_WRITE | FD_OOB); } + qemu_lockcnt_unlock(&ctx->list_lock); aio_notify(ctx); } @@ -107,6 +110,7 @@ void aio_set_event_notifier(AioContext *ctx, { AioHandler *node; + qemu_lockcnt_lock(&ctx->list_lock); QLIST_FOREACH(node, &ctx->aio_handlers, node) { if (node->e == e && !node->deleted) { break; @@ -118,14 +122,14 @@ void aio_set_event_notifier(AioContext *ctx, if (node) { g_source_remove_poll(&ctx->source, &node->pfd); - /* If the lock is held, just mark the node as deleted */ - if (ctx->walking_handlers) { + /* aio_poll is in progress, just mark the node as deleted */ + if (qemu_lockcnt_count(&ctx->list_lock)) { node->deleted = 1; node->pfd.revents = 0; } else { /* Otherwise, delete it for real. We can't just mark it as * deleted because deleted nodes are only cleaned up after - * releasing the walking_handlers lock. + * releasing the list_lock. */ QLIST_REMOVE(node, node); g_free(node); @@ -139,7 +143,7 @@ void aio_set_event_notifier(AioContext *ctx, node->pfd.fd = (uintptr_t)event_notifier_get_handle(e); node->pfd.events = G_IO_IN; node->is_external = is_external; - QLIST_INSERT_HEAD(&ctx->aio_handlers, node, node); + QLIST_INSERT_HEAD_RCU(&ctx->aio_handlers, node, node); g_source_add_poll(&ctx->source, &node->pfd); } @@ -147,6 +151,7 @@ void aio_set_event_notifier(AioContext *ctx, node->io_notify = io_notify; } + qemu_lockcnt_unlock(&ctx->list_lock); aio_notify(ctx); } @@ -157,10 +162,16 @@ bool aio_prepare(AioContext *ctx) bool have_select_revents = false; fd_set rfds, wfds; + /* + * We have to walk very carefully in case aio_set_fd_handler is + * called while we're walking. + */ + qemu_lockcnt_inc(&ctx->list_lock); + /* fill fd sets */ FD_ZERO(&rfds); FD_ZERO(&wfds); - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { if (node->io_read) { FD_SET ((SOCKET)node->pfd.fd, &rfds); } @@ -170,61 +181,71 @@ bool aio_prepare(AioContext *ctx) } if (select(0, &rfds, &wfds, NULL, &tv0) > 0) { - QLIST_FOREACH(node, &ctx->aio_handlers, node) { - node->pfd.revents = 0; + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { if (FD_ISSET(node->pfd.fd, &rfds)) { - node->pfd.revents |= G_IO_IN; + atomic_or(&node->pfd.revents, G_IO_IN); have_select_revents = true; } if (FD_ISSET(node->pfd.fd, &wfds)) { - node->pfd.revents |= G_IO_OUT; + atomic_or(&node->pfd.revents, G_IO_OUT); have_select_revents = true; } } } + qemu_lockcnt_dec(&ctx->list_lock); return have_select_revents; } bool aio_pending(AioContext *ctx) { AioHandler *node; + bool result = false; - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + /* + * We have to walk very carefully in case aio_set_fd_handler is + * called while we're walking. + */ + qemu_lockcnt_inc(&ctx->list_lock); + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { if (node->pfd.revents && node->io_notify) { - return true; + result = true; + break; } if ((node->pfd.revents & G_IO_IN) && node->io_read) { - return true; + result = true; + break; } if ((node->pfd.revents & G_IO_OUT) && node->io_write) { - return true; + result = true; + break; } } - return false; + qemu_lockcnt_dec(&ctx->list_lock); + return result; } static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) { - AioHandler *node, *tmp; + AioHandler *node; bool progress = false; + AioHandler *tmp; - ctx->walking_handlers++; + qemu_lockcnt_inc(&ctx->list_lock); /* * We have to walk very carefully in case aio_set_fd_handler is * called while we're walking. */ - QLIST_FOREACH_SAFE(node, &ctx->aio_handlers, node, tmp) { - int revents = node->pfd.revents; + QLIST_FOREACH_SAFE_RCU(node, &ctx->aio_handlers, node, tmp) { + int revents = atomic_xchg(&node->pfd.revents, 0); if (!node->deleted && (revents || event_notifier_get_handle(node->e) == event) && node->io_notify) { - node->pfd.revents = 0; node->io_notify(node->e); /* aio_notify() does not count as progress */ @@ -235,7 +256,6 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) if (!node->deleted && (node->io_read || node->io_write)) { - node->pfd.revents = 0; if ((revents & G_IO_IN) && node->io_read) { node->io_read(node->opaque); progress = true; @@ -256,16 +276,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) } if (node->deleted) { - ctx->walking_handlers--; - if (!ctx->walking_handlers) { + if (qemu_lockcnt_dec_if_lock(&ctx->list_lock)) { QLIST_REMOVE(node, node); g_free(node); + qemu_lockcnt_inc_and_unlock(&ctx->list_lock); } - ctx->walking_handlers++; } } - ctx->walking_handlers--; + qemu_lockcnt_dec(&ctx->list_lock); return progress; } @@ -301,20 +320,19 @@ bool aio_poll(AioContext *ctx, bool blocking) atomic_add(&ctx->notify_me, 2); } + qemu_lockcnt_inc(&ctx->list_lock); have_select_revents = aio_prepare(ctx); - ctx->walking_handlers++; - /* fill fd sets */ count = 0; - QLIST_FOREACH(node, &ctx->aio_handlers, node) { + QLIST_FOREACH_RCU(node, &ctx->aio_handlers, node) { if (!node->deleted && node->io_notify && aio_node_check(ctx, node->is_external)) { events[count++] = event_notifier_get_handle(node->e); } } - ctx->walking_handlers--; + qemu_lockcnt_dec(&ctx->list_lock); first = true; /* ctx->notifier is always registered. */ -- 2.9.3 >From 8aa213a1e9e61d59cfccea22c248d78ef175ecd8 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini <pbonz...@redhat.com> Date: Tue, 3 Mar 2015 15:06:48 +0100 Subject: [PATCH 08/10] aio: document locking Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- docs/multiple-iothreads.txt | 5 ++--- include/block/aio.h | 32 ++++++++++++++++---------------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/docs/multiple-iothreads.txt b/docs/multiple-iothreads.txt index 0e7cdb2..a03f887 100644 --- a/docs/multiple-iothreads.txt +++ b/docs/multiple-iothreads.txt @@ -84,9 +84,8 @@ How to synchronize with an IOThread AioContext is not thread-safe so some rules must be followed when using file descriptors, event notifiers, timers, or BHs across threads: -1. AioContext functions can be called safely from file descriptor, event -notifier, timer, or BH callbacks invoked by the AioContext. No locking is -necessary. +1. AioContext functions can always be called safely. They handle their +own locking internally. 2. Other threads wishing to access the AioContext must use aio_context_acquire()/aio_context_release() for mutual exclusion. Once the diff --git a/include/block/aio.h b/include/block/aio.h index b833fe8..5c2f53b 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -52,18 +52,12 @@ struct LinuxAioState; struct AioContext { GSource source; - /* Protects all fields from multi-threaded access */ + /* Used by AioContext users to protect from multi-threaded access. */ QemuRecMutex lock; - /* The list of registered AIO handlers */ + /* The list of registered AIO handlers. Protected by ctx->list_lock. */ QLIST_HEAD(, AioHandler) aio_handlers; - /* This is a simple lock used to protect the aio_handlers list. - * Specifically, it's used to ensure that no callbacks are removed while - * we're walking and dispatching callbacks. - */ - int walking_handlers; - /* Used to avoid unnecessary event_notifier_set calls in aio_notify; * accessed with atomic primitives. If this field is 0, everything * (file descriptors, bottom halves, timers) will be re-evaluated @@ -89,9 +83,9 @@ struct AioContext { */ uint32_t notify_me; - /* A lock to protect between bh's adders and deleter, and to ensure - * that no callbacks are removed while we're walking and dispatching - * them. + /* A lock to protect between QEMUBH and AioHandler adders and deleter, + * and to ensure that no callbacks are removed while we're walking and + * dispatching them. */ QemuLockCnt list_lock; @@ -113,7 +107,9 @@ struct AioContext { bool notified; EventNotifier notifier; - /* Thread pool for performing work and receiving completion callbacks */ + /* Thread pool for performing work and receiving completion callbacks. + * Has its own locking. + */ struct ThreadPool *thread_pool; #ifdef CONFIG_LINUX_AIO @@ -123,7 +119,9 @@ struct AioContext { struct LinuxAioState *linux_aio; #endif - /* TimerLists for calling timers - one per clock type */ + /* TimerLists for calling timers - one per clock type. Has its own + * locking. + */ QEMUTimerListGroup tlg; int external_disable_cnt; @@ -165,9 +163,11 @@ void aio_context_unref(AioContext *ctx); * automatically takes care of calling aio_context_acquire and * aio_context_release. * - * Access to timers and BHs from a thread that has not acquired AioContext - * is possible. Access to callbacks for now must be done while the AioContext - * is owned by the thread (FIXME). + * Note that this is separate from bdrv_drained_begin/bdrv_drained_end. A + * thread still has to call those to avoid being interrupted by the guest. + * + * Bottom halves, timers and callbacks can be created or removed without + * acquiring the AioContext. */ void aio_context_acquire(AioContext *ctx); -- 2.9.3 >From 8e29d8e56a1e886d2447a7789ae0c0eeea4f9943 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini <pbonz...@redhat.com> Date: Tue, 14 Apr 2015 15:33:02 +0200 Subject: [PATCH 09/10] aio: push aio_context_acquire/release down to dispatching The AioContext data structures are now protected by list_lock and/or they are walked with FOREACH_RCU primitives. There is no need anymore to acquire the AioContext for the entire duration of aio_dispatch. Instead, just acquire it before and after invoking the callbacks. The next step is then to push it further down. Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- aio-posix.c | 15 ++++++--------- aio-win32.c | 15 +++++++-------- async.c | 2 ++ 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/aio-posix.c b/aio-posix.c index c64d36d..32e493f 100644 --- a/aio-posix.c +++ b/aio-posix.c @@ -332,7 +332,9 @@ bool aio_dispatch(AioContext *ctx) (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR)) && aio_node_check(ctx, node->is_external) && node->io_read) { + aio_context_acquire(ctx); node->io_read(node->opaque); + aio_context_release(ctx); /* aio_notify() does not count as progress */ if (node->opaque != &ctx->notifier) { @@ -343,7 +345,9 @@ bool aio_dispatch(AioContext *ctx) (revents & (G_IO_OUT | G_IO_ERR)) && aio_node_check(ctx, node->is_external) && node->io_write) { + aio_context_acquire(ctx); node->io_write(node->opaque); + aio_context_release(ctx); progress = true; } @@ -359,7 +363,9 @@ bool aio_dispatch(AioContext *ctx) qemu_lockcnt_dec(&ctx->list_lock); /* Run our timers */ + aio_context_acquire(ctx); progress |= timerlistgroup_run_timers(&ctx->tlg); + aio_context_release(ctx); return progress; } @@ -417,7 +423,6 @@ bool aio_poll(AioContext *ctx, bool blocking) bool progress; int64_t timeout; - aio_context_acquire(ctx); progress = false; /* aio_notify can avoid the expensive event_notifier_set if @@ -447,9 +452,6 @@ bool aio_poll(AioContext *ctx, bool blocking) timeout = blocking ? aio_compute_timeout(ctx) : 0; /* wait until next event */ - if (timeout) { - aio_context_release(ctx); - } if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) { AioHandler epoll_handler; @@ -464,9 +466,6 @@ bool aio_poll(AioContext *ctx, bool blocking) if (blocking) { atomic_sub(&ctx->notify_me, 2); } - if (timeout) { - aio_context_acquire(ctx); - } aio_notify_accept(ctx); @@ -485,8 +484,6 @@ bool aio_poll(AioContext *ctx, bool blocking) progress = true; } - aio_context_release(ctx); - return progress; } diff --git a/aio-win32.c b/aio-win32.c index 7ae2c14..374d28e 100644 --- a/aio-win32.c +++ b/aio-win32.c @@ -246,7 +246,9 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) if (!node->deleted && (revents || event_notifier_get_handle(node->e) == event) && node->io_notify) { + aio_context_acquire(ctx); node->io_notify(node->e); + aio_context_release(ctx); /* aio_notify() does not count as progress */ if (node->e != &ctx->notifier) { @@ -257,11 +259,15 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event) if (!node->deleted && (node->io_read || node->io_write)) { if ((revents & G_IO_IN) && node->io_read) { + aio_context_acquire(ctx); node->io_read(node->opaque); + aio_context_release(ctx); progress = true; } if ((revents & G_IO_OUT) && node->io_write) { + aio_context_acquire(ctx); node->io_write(node->opaque); + aio_context_release(ctx); progress = true; } @@ -306,7 +312,6 @@ bool aio_poll(AioContext *ctx, bool blocking) int count; int timeout; - aio_context_acquire(ctx); progress = false; /* aio_notify can avoid the expensive event_notifier_set if @@ -348,17 +353,11 @@ bool aio_poll(AioContext *ctx, bool blocking) timeout = blocking && !have_select_revents ? qemu_timeout_ns_to_ms(aio_compute_timeout(ctx)) : 0; - if (timeout) { - aio_context_release(ctx); - } ret = WaitForMultipleObjects(count, events, FALSE, timeout); if (blocking) { assert(first); atomic_sub(&ctx->notify_me, 2); } - if (timeout) { - aio_context_acquire(ctx); - } if (first) { aio_notify_accept(ctx); @@ -381,8 +380,8 @@ bool aio_poll(AioContext *ctx, bool blocking) progress |= aio_dispatch_handlers(ctx, event); } while (count > 0); + aio_context_acquire(ctx); progress |= timerlistgroup_run_timers(&ctx->tlg); - aio_context_release(ctx); return progress; } diff --git a/async.c b/async.c index f606785..95927fc 100644 --- a/async.c +++ b/async.c @@ -110,7 +110,9 @@ int aio_bh_poll(AioContext *ctx) ret = 1; } bh->idle = 0; + aio_context_acquire(ctx); aio_bh_call(bh); + aio_context_release(ctx); } } -- 2.9.3 >From f98b9a351346e8648772fa21150fcdca706b7860 Mon Sep 17 00:00:00 2001 From: Paolo Bonzini <pbonz...@redhat.com> Date: Mon, 11 May 2015 17:51:05 +0200 Subject: [PATCH 10/10] async: optimize aio_bh_poll Avoid entering the slow path of qemu_lockcnt_dec_and_lock if no bottom half has to be deleted. Signed-off-by: Paolo Bonzini <pbonz...@redhat.com> --- async.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/async.c b/async.c index 95927fc..6f8184b 100644 --- a/async.c +++ b/async.c @@ -92,6 +92,7 @@ int aio_bh_poll(AioContext *ctx) { QEMUBH *bh, **bhp, *next; int ret; + bool deleted = false; qemu_lockcnt_inc(&ctx->list_lock); @@ -114,9 +115,17 @@ int aio_bh_poll(AioContext *ctx) aio_bh_call(bh); aio_context_release(ctx); } + if (bh->deleted) { + deleted = true; + } } /* remove deleted bhs */ + if (!deleted) { + qemu_lockcnt_dec(&ctx->list_lock); + return ret; + } + if (qemu_lockcnt_dec_and_lock(&ctx->list_lock)) { bhp = &ctx->first_bh; while (*bhp) { @@ -130,7 +139,6 @@ int aio_bh_poll(AioContext *ctx) } qemu_lockcnt_unlock(&ctx->list_lock); } - return ret; } -- 2.9.3