The CoQueue API offers thread-safety via the lock argument that
qemu_co_queue_wait() and qemu_co_enter_next() take. BlockBackend
currently does not make use of the lock argument. This means that
multiple threads submitting I/O requests can corrupt the CoQueue's
QSIMPLEQ.

Add a QemuMutex and pass it to CoQueue APIs so that the queue is
protected. While we're at it, also assert that the queue is empty when
the BlockBackend is deleted.

Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
---
 block/block-backend.c | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/block/block-backend.c b/block/block-backend.c
index ec7eafd7fb..c4dd16f4da 100644
--- a/block/block-backend.c
+++ b/block/block-backend.c
@@ -81,6 +81,7 @@ struct BlockBackend {
     QLIST_HEAD(, BlockBackendAioNotifier) aio_notifiers;
 
     int quiesce_counter; /* atomic: written under BQL, read by other threads */
+    QemuMutex queued_requests_lock; /* protects queued_requests */
     CoQueue queued_requests;
     bool disable_request_queuing; /* atomic */
 
@@ -368,6 +369,7 @@ BlockBackend *blk_new(AioContext *ctx, uint64_t perm, uint64_t shared_perm)
 
     block_acct_init(&blk->stats);
 
+    qemu_mutex_init(&blk->queued_requests_lock);
     qemu_co_queue_init(&blk->queued_requests);
     notifier_list_init(&blk->remove_bs_notifiers);
     notifier_list_init(&blk->insert_bs_notifiers);
@@ -485,6 +487,8 @@ static void blk_delete(BlockBackend *blk)
     assert(QLIST_EMPTY(&blk->remove_bs_notifiers.notifiers));
     assert(QLIST_EMPTY(&blk->insert_bs_notifiers.notifiers));
     assert(QLIST_EMPTY(&blk->aio_notifiers));
+    assert(qemu_co_queue_empty(&blk->queued_requests));
+    qemu_mutex_destroy(&blk->queued_requests_lock);
     QTAILQ_REMOVE(&block_backends, blk, link);
     drive_info_del(blk->legacy_dinfo);
     block_acct_cleanup(&blk->stats);
@@ -1273,9 +1277,16 @@ static void coroutine_fn blk_wait_while_drained(BlockBackend *blk)
 
     if (qatomic_read(&blk->quiesce_counter) &&
         !qatomic_read(&blk->disable_request_queuing)) {
+        /*
+         * Take lock before decrementing in flight counter so main loop thread
+         * waits for us to enqueue ourselves before it can leave the drained
+         * section.
+         */
+        qemu_mutex_lock(&blk->queued_requests_lock);
         blk_dec_in_flight(blk);
-        qemu_co_queue_wait(&blk->queued_requests, NULL);
+        qemu_co_queue_wait(&blk->queued_requests, &blk->queued_requests_lock);
         blk_inc_in_flight(blk);
+        qemu_mutex_unlock(&blk->queued_requests_lock);
     }
 }
 
@@ -2611,9 +2622,12 @@ static void blk_root_drained_end(BdrvChild *child)
         if (blk->dev_ops && blk->dev_ops->drained_end) {
             blk->dev_ops->drained_end(blk->dev_opaque);
         }
-        while (qemu_co_enter_next(&blk->queued_requests, NULL)) {
+        qemu_mutex_lock(&blk->queued_requests_lock);
+        while (qemu_co_enter_next(&blk->queued_requests,
+                                  &blk->queued_requests_lock)) {
             /* Resume all queued requests */
         }
+        qemu_mutex_unlock(&blk->queued_requests_lock);
     }
 }
 
-- 
2.39.2


Reply via email to