On Wednesday, 2021-03-17 at 13:16:39 +01, Paolo Bonzini wrote:

> An invariant of the current rwlock is that if multiple coroutines hold a
> reader lock, all must be runnable. The unlock implementation relies on
> this, choosing to wake a single coroutine when the final read lock
> holder exits the critical section, assuming that it will wake a
> coroutine attempting to acquire a write lock.
>
> The downgrade implementation violates this assumption by creating a
> read lock owning coroutine that is exclusively runnable - any other
> coroutines that are waiting to acquire a read lock are *not* made
> runnable when the write lock holder converts its ownership to read
> only.
>
> More in general, the old implementation had lots of other fairness bugs.
> The root cause of the bugs was that CoQueue would wake up readers even
> if there were pending writers, and would wake up writers even if there
> were readers.  In that case, the coroutine would go back to sleep *at
> the end* of the CoQueue, losing its place at the head of the line.
>
> To fix this, keep the queue of waiters explicitly in the CoRwLock
> instead of using CoQueue, and store for each whether it is a
> potential reader or a writer.  This way, downgrade can look at the
> first queued coroutines and wake it only if it is a reader, causing
> all other readers in line to be released in turn.
>
> Reported-by: David Edmondson <david.edmond...@oracle.com>
> Signed-off-by: Paolo Bonzini <pbonz...@redhat.com>

A couple of nits below, but it looks good overall.

Reviewed-by: David Edmondson <david.edmond...@oracle.com>

> ---
> v3->v4: clean up the code and fix upgrade logic.  Fix upgrade comment too.
>
>  include/qemu/coroutine.h   |  17 +++--
>  util/qemu-coroutine-lock.c | 148 ++++++++++++++++++++++++-------------
>  2 files changed, 106 insertions(+), 59 deletions(-)
>
> diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
> index 84eab6e3bf..7919d3bb62 100644
> --- a/include/qemu/coroutine.h
> +++ b/include/qemu/coroutine.h
> @@ -237,11 +237,15 @@ bool qemu_co_enter_next_impl(CoQueue *queue, 
> QemuLockable *lock);
>  bool qemu_co_queue_empty(CoQueue *queue);
>  
>  
> +typedef struct CoRwTicket CoRwTicket;
>  typedef struct CoRwlock {
> -    int pending_writer;
> -    int reader;
>      CoMutex mutex;
> -    CoQueue queue;
> +
> +    /* Number of readers, of -1 if owned for writing.  */
> +    int owners;

s/, of/, or/

> +
> +    /* Waiting coroutines.  */
> +    QSIMPLEQ_HEAD(, CoRwTicket) tickets;
>  } CoRwlock;
>  
>  /**
> @@ -260,10 +264,9 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock);
>  /**
>   * Write Locks the CoRwlock from a reader.  This is a bit more efficient than
>   * @qemu_co_rwlock_unlock followed by a separate @qemu_co_rwlock_wrlock.
> - * However, if the lock cannot be upgraded immediately, control is 
> transferred
> - * to the caller of the current coroutine.  Also, @qemu_co_rwlock_upgrade
> - * only overrides CoRwlock fairness if there are no concurrent readers, so
> - * another writer might run while @qemu_co_rwlock_upgrade blocks.
> + * Note that if the lock cannot be upgraded immediately, control is 
> transferred
> + * to the caller of the current coroutine; another writer might run while
> + * @qemu_co_rwlock_upgrade blocks.

This is better, thanks.

>   */
>  void qemu_co_rwlock_upgrade(CoRwlock *lock);
>  
> diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
> index eb73cf11dc..2669403839 100644
> --- a/util/qemu-coroutine-lock.c
> +++ b/util/qemu-coroutine-lock.c
> @@ -327,11 +327,51 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
>      trace_qemu_co_mutex_unlock_return(mutex, self);
>  }
>  
> +struct CoRwTicket {
> +    bool read;
> +    Coroutine *co;
> +    QSIMPLEQ_ENTRY(CoRwTicket) next;
> +};
> +
>  void qemu_co_rwlock_init(CoRwlock *lock)
>  {
> -    memset(lock, 0, sizeof(*lock));
> -    qemu_co_queue_init(&lock->queue);
>      qemu_co_mutex_init(&lock->mutex);
> +    lock->owners = 0;
> +    QSIMPLEQ_INIT(&lock->tickets);
> +}
> +
> +/* Releases the internal CoMutex.  */
> +static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
> +{
> +    CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
> +    Coroutine *co = NULL;
> +
> +    /*
> +     * Setting lock->owners here prevents rdlock and wrlock from
> +     * sneaking in between unlock and wake.
> +     */
> +
> +    if (tkt) {
> +        if (tkt->read) {
> +            if (lock->owners >= 0) {
> +                lock->owners++;
> +                co = tkt->co;
> +            }
> +        } else {
> +            if (lock->owners == 0) {
> +                lock->owners = -1;
> +                co = tkt->co;
> +            }
> +        }
> +    }
> +
> +    if (co) {
> +        QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
> +        qemu_co_mutex_unlock(&lock->mutex);
> +        aio_co_wake(co);
> +    } else {
> +        qemu_co_mutex_unlock(&lock->mutex);
> +    }

This block could be pushed up into the earlier block, but I imagine that
the compiler will do it for you.

>  }
>  
>  void qemu_co_rwlock_rdlock(CoRwlock *lock)
> @@ -340,13 +380,22 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
>  
>      qemu_co_mutex_lock(&lock->mutex);
>      /* For fairness, wait if a writer is in line.  */
> -    while (lock->pending_writer) {
> -        qemu_co_queue_wait(&lock->queue, &lock->mutex);
> +    if (lock->owners == 0 || (lock->owners > 0 && 
> QSIMPLEQ_EMPTY(&lock->tickets))) {
> +        lock->owners++;
> +        qemu_co_mutex_unlock(&lock->mutex);
> +    } else {
> +        CoRwTicket my_ticket = { true, self };
> +
> +        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
> +        qemu_co_mutex_unlock(&lock->mutex);
> +        qemu_coroutine_yield();
> +        assert(lock->owners >= 1);
> +
> +        /* Possibly wake another reader, which will wake the next in line.  
> */
> +        qemu_co_mutex_lock(&lock->mutex);
> +        qemu_co_rwlock_maybe_wake_one(lock);
>      }
> -    lock->reader++;
> -    qemu_co_mutex_unlock(&lock->mutex);
>  
> -    /* The rest of the read-side critical section is run without the mutex.  
> */
>      self->locks_held++;
>  }
>  
> @@ -355,69 +404,64 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
>      Coroutine *self = qemu_coroutine_self();
>  
>      assert(qemu_in_coroutine());
> -    if (!lock->reader) {
> -        /* The critical section started in qemu_co_rwlock_wrlock.  */
> -        qemu_co_queue_restart_all(&lock->queue);
> -    } else {
> -        self->locks_held--;
> +    self->locks_held--;
>  
> -        qemu_co_mutex_lock(&lock->mutex);
> -        lock->reader--;
> -        assert(lock->reader >= 0);
> -        /* Wakeup only one waiting writer */
> -        if (!lock->reader) {
> -            qemu_co_queue_next(&lock->queue);
> -        }
> +    qemu_co_mutex_lock(&lock->mutex);
> +    if (lock->owners > 0) {
> +        lock->owners--;
> +    } else {
> +        assert(lock->owners == -1);
> +        lock->owners = 0;
>      }
> -    qemu_co_mutex_unlock(&lock->mutex);
> +
> +    qemu_co_rwlock_maybe_wake_one(lock);
>  }
>  
>  void qemu_co_rwlock_downgrade(CoRwlock *lock)
>  {
> -    Coroutine *self = qemu_coroutine_self();
> -
> -    /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
> -     * qemu_co_rwlock_upgrade.
> -     */
> -    assert(lock->reader == 0);
> -    lock->reader++;
> -    qemu_co_mutex_unlock(&lock->mutex);
> +    qemu_co_mutex_lock(&lock->mutex);
> +    assert(lock->owners == -1);
> +    lock->owners = 1;
>  
> -    /* The rest of the read-side critical section is run without the mutex.  
> */
> -    self->locks_held++;
> +    /* Possibly wake another reader, which will wake the next in line.  */
> +    qemu_co_rwlock_maybe_wake_one(lock);
>  }
>  
>  void qemu_co_rwlock_wrlock(CoRwlock *lock)
>  {
> +    Coroutine *self = qemu_coroutine_self();
> +
>      qemu_co_mutex_lock(&lock->mutex);
> -    lock->pending_writer++;
> -    while (lock->reader) {
> -        qemu_co_queue_wait(&lock->queue, &lock->mutex);
> +    if (lock->owners == 0) {
> +        lock->owners = -1;
> +        qemu_co_mutex_unlock(&lock->mutex);
> +    } else {
> +        CoRwTicket my_ticket = { false, qemu_coroutine_self() };
> +
> +        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
> +        qemu_co_mutex_unlock(&lock->mutex);
> +        qemu_coroutine_yield();
> +        assert(lock->owners == -1);
>      }
> -    lock->pending_writer--;
>  
> -    /* The rest of the write-side critical section is run with
> -     * the mutex taken, so that lock->reader remains zero.
> -     * There is no need to update self->locks_held.
> -     */
> +    self->locks_held++;
>  }
>  
>  void qemu_co_rwlock_upgrade(CoRwlock *lock)
>  {
> -    Coroutine *self = qemu_coroutine_self();
> -
>      qemu_co_mutex_lock(&lock->mutex);
> -    assert(lock->reader > 0);
> -    lock->reader--;
> -    lock->pending_writer++;
> -    while (lock->reader) {
> -        qemu_co_queue_wait(&lock->queue, &lock->mutex);
> -    }
> -    lock->pending_writer--;
> +    assert(lock->owners > 0);
> +    /* For fairness, wait if a writer is in line.  */
> +    if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
> +        lock->owners = -1;
> +        qemu_co_mutex_unlock(&lock->mutex);
> +    } else {
> +        CoRwTicket my_ticket = { false, qemu_coroutine_self() };
>  
> -    /* The rest of the write-side critical section is run with
> -     * the mutex taken, similar to qemu_co_rwlock_wrlock.  Do
> -     * not account for the lock twice in self->locks_held.
> -     */
> -    self->locks_held--;
> +        lock->owners--;
> +        QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
> +        qemu_co_rwlock_maybe_wake_one(lock);
> +        qemu_coroutine_yield();
> +        assert(lock->owners == -1);
> +    }
>  }
> -- 
> 2.29.2

dme.
-- 
Today is different. Today is not the same.

Reply via email to