Running a very small critical section on pthread_mutex_t and CoMutex shows that pthread_mutex_t is much faster because it doesn't actually go to sleep. What happens is that the critical section is shorter than the latency of entering the kernel, and thus FUTEX_WAIT always fails. With CoMutex there is no such latency, but you still want to avoid the wait and wakeup. So introduce a similar delay artificially, by spinning for a bounded number of iterations before the coroutine goes to sleep.
This only works with two waiters; because CoMutex is fair, it will
always have more waits and wakeups than a pthread_mutex_t.

Signed-off-by: Paolo Bonzini <pbonz...@redhat.com>
---
 include/qemu/coroutine.h   |  5 +++++
 util/qemu-coroutine-lock.c | 34 ++++++++++++++++++++++++++++++++--
 2 files changed, 37 insertions(+), 2 deletions(-)

diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 018a60d..d15a09a 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -163,6 +163,11 @@ typedef struct CoMutex {
      */
     unsigned locked;
 
+    /* Context that is holding the lock. Useful to avoid spinning
+     * when two coroutines on the same AioContext try to get the lock. :)
+     */
+    AioContext *ctx;
+
     /* A queue of waiters. Elements are added atomically in front of
      * from_push. to_pop is only populated, and popped from, by whoever
      * is in charge of the next wakeup. This can be an unlocker or,
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index 7ed0f37..aa59e82 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -177,18 +177,44 @@ void qemu_co_mutex_init(CoMutex *mutex)
 
 void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
 {
+    AioContext *ctx = qemu_get_current_aio_context();
     Coroutine *self = qemu_coroutine_self();
     CoWaitRecord w;
     unsigned old_handoff;
+    int waiters, i;
+
+    /* Running a very small critical section on pthread_mutex_t and CoMutex
+     * shows that pthread_mutex_t is much faster because it doesn't actually
+     * go to sleep. What happens is that the critical section is shorter
+     * than the latency of entering the kernel and thus FUTEX_WAIT always
+     * fails. With CoMutex there is no such latency but you still want to
+     * avoid wait and wakeup. So introduce it artificially.
+     */
+    i = 0;
+retry_fast_path:
+    waiters = atomic_cmpxchg(&mutex->locked, 0, 1);
+    if (waiters != 0) {
+        while (waiters == 1 && ++i < 1000) {
+            if (atomic_read(&mutex->ctx) == ctx) {
+                break;
+            }
+            if (atomic_read(&mutex->locked) == 0) {
+                goto retry_fast_path;
+            }
+            /* cpu_relax(); */
+        }
+        waiters = atomic_fetch_inc(&mutex->locked);
+    }
 
-    if (atomic_fetch_inc(&mutex->locked) == 0) {
+    if (waiters == 0) {
         /* Uncontended. */
         trace_qemu_co_mutex_lock_uncontended(mutex, self);
+        mutex->ctx = ctx;
         return;
     }
 
     trace_qemu_co_mutex_lock_entry(mutex, self);
-    self->ctx = qemu_get_current_aio_context();
+    self->ctx = ctx;
     w.co = self;
     push_waiter(mutex, &w);
 
@@ -207,9 +233,11 @@ void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
     if (co == self) {
         /* We got the lock ourselves! */
         assert(to_wake == &w);
+        mutex->ctx = ctx;
         return;
     }
 
+    mutex->ctx = co->ctx;
     qemu_coroutine_wake(co->ctx, co);
 }
 
@@ -223,6 +251,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
 
     trace_qemu_co_mutex_unlock_entry(mutex, self);
 
+    mutex->ctx = NULL;
     assert(mutex->locked);
     assert(qemu_in_coroutine());
 
@@ -237,6 +266,7 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
         if (to_wake) {
             Coroutine *co = to_wake->co;
 
+            mutex->ctx = co->ctx;
             qemu_coroutine_wake(co->ctx, co);
             goto out;
         }
-- 
2.5.5
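
For reference, below is a minimal sketch of the kind of pthread_mutex_t
microbenchmark the commit message alludes to (not part of this patch; the
thread count, iteration count and shared counter are illustrative
assumptions). The CoMutex side of the comparison needs the coroutine and
AioContext machinery and is not shown here.

/* Two threads hammering a very small critical section under
 * pthread_mutex_t. The commit message's observation is that the critical
 * section is shorter than the cost of entering the kernel, so FUTEX_WAIT
 * keeps failing and the lock is handed over without actually sleeping.
 */
#include <pthread.h>
#include <stdio.h>
#include <time.h>

#define ITERATIONS 1000000

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static long counter;

static void *worker(void *opaque)
{
    for (long i = 0; i < ITERATIONS; i++) {
        pthread_mutex_lock(&lock);
        counter++;                      /* the tiny critical section */
        pthread_mutex_unlock(&lock);
    }
    return NULL;
}

int main(void)
{
    pthread_t threads[2];
    struct timespec start, end;
    int i;

    clock_gettime(CLOCK_MONOTONIC, &start);
    for (i = 0; i < 2; i++) {
        pthread_create(&threads[i], NULL, worker, NULL);
    }
    for (i = 0; i < 2; i++) {
        pthread_join(threads[i], NULL);
    }
    clock_gettime(CLOCK_MONOTONIC, &end);

    printf("%ld increments in %.3f s\n", counter,
           (end.tv_sec - start.tv_sec) +
           (end.tv_nsec - start.tv_nsec) / 1e9);
    return 0;
}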