bdrv_parent_drained_begin_single() is also called by
bdrv_replace_child_noperm(). The latter is often called
from coroutines, for example in bdrv_co_create_opts() callbacks.

This can create deadlocks: if drain_saldo in
bdrv_replace_child_noperm() is > 0, the coroutine will start
polling using BDRV_POLL_WHILE, and polling from coroutine context
can deadlock, since the events being waited for may only make
progress once the coroutine yields. Right now this does not seem
to happen, but if additional drains are added in the future, it
will become much more likely.

Fix the problem by doing something very similar to
bdrv_do_drained_begin(): if in a coroutine, schedule a BH to
execute the drain in the main loop, yield, and re-enter the
coroutine only once the drain is done.
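
For reference, this is the general shape of the coroutine-to-BH
handoff (a simplified sketch of the pattern only, not the patch
code; SketchData, sketch_bh_cb() and do_the_drain() are
illustrative names, with do_the_drain() standing in for the real
polling drain):

    typedef struct {
        Coroutine *co;
        bool done;
    } SketchData;

    static void sketch_bh_cb(void *opaque)
    {
        SketchData *data = opaque;

        /* Runs in the main loop, where polling is allowed */
        do_the_drain();

        data->done = true;
        aio_co_wake(data->co);  /* resume the waiting coroutine */
    }

    static void coroutine_fn sketch_yield_to_drain(AioContext *ctx)
    {
        SketchData data = {
            .co = qemu_coroutine_self(),
            .done = false,
        };

        aio_bh_schedule_oneshot(ctx, sketch_bh_cb, &data);
        qemu_coroutine_yield();
        /* Waking up from any other event is a bug in the caller */
        assert(data.done);
    }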

As with the other drains, check for the coroutine case only when
actually polling.

As a consequence, remove the coroutine assertion in
bdrv_do_drained_begin_quiesce(): we never poll in that case.

Signed-off-by: Emanuele Giuseppe Esposito <eespo...@redhat.com>
---
 block/io.c | 98 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 97 insertions(+), 1 deletion(-)

diff --git a/block/io.c b/block/io.c
index 4a3e8d037d..e446782ae0 100644
--- a/block/io.c
+++ b/block/io.c
@@ -67,10 +67,101 @@ static void bdrv_parent_drained_end_single_no_poll(BdrvChild *c,
     }
 }
 
+typedef struct {
+    Coroutine *co;
+    BdrvChild *child;
+    bool done;
+    bool begin;
+    bool poll;
+} BdrvCoDrainParentData;
+
+static void bdrv_co_drain_parent_bh_cb(void *opaque)
+{
+    BdrvCoDrainParentData *data = opaque;
+    Coroutine *co = data->co;
+    BdrvChild *child = data->child;
+    BlockDriverState *bs = child->bs;
+    AioContext *ctx = bdrv_get_aio_context(bs);
+
+    if (bs) {
+        aio_context_acquire(ctx);
+        bdrv_dec_in_flight(bs);
+    }
+
+    if (data->begin) {
+        bdrv_parent_drained_begin_single(child, data->poll);
+    } else {
+        assert(!data->poll);
+        bdrv_parent_drained_end_single(child);
+    }
+
+    if (bs) {
+        aio_context_release(ctx);
+    }
+
+    data->done = true;
+    aio_co_wake(co);
+}
+
+static void coroutine_fn bdrv_co_yield_to_drain_parent(BdrvChild *c,
+                                                       bool begin, bool poll)
+{
+    BdrvCoDrainParentData data;
+    Coroutine *self = qemu_coroutine_self();
+    BlockDriverState *bs = c->bs;
+    AioContext *ctx = bdrv_get_aio_context(bs);
+    AioContext *co_ctx = qemu_coroutine_get_aio_context(self);
+
+    /* Calling the drain from a BH ensures the current coroutine yields and
+     * other coroutines run if they were queued by aio_co_enter(). */
+
+    assert(qemu_in_coroutine());
+    data = (BdrvCoDrainParentData) {
+        .co = self,
+        .child = c,
+        .done = false,
+        .begin = begin,
+        .poll = poll,
+    };
+
+    if (bs) {
+        bdrv_inc_in_flight(bs);
+    }
+
+    /*
+     * Temporarily drop the lock across yield or we would get deadlocks.
+     * bdrv_co_drain_parent_bh_cb() reacquires the lock as needed.
+     *
+     * When we yield below, the lock for the current context will be
+     * released, so if this is actually the lock that protects bs, don't drop
+     * it a second time.
+     */
+    if (ctx != co_ctx) {
+        aio_context_release(ctx);
+    }
+    replay_bh_schedule_oneshot_event(ctx, bdrv_co_drain_parent_bh_cb, &data);
+
+    qemu_coroutine_yield();
+    /* If we are resumed from some other event (such as an aio completion or a
+     * timer callback), it is a bug in the caller that should be fixed. */
+    assert(data.done);
+
+    /* Reacquire the AioContext of bs if we dropped it */
+    if (ctx != co_ctx) {
+        aio_context_acquire(ctx);
+    }
+}
+
 void bdrv_parent_drained_end_single(BdrvChild *c)
 {
     int drained_end_counter = 0;
     IO_OR_GS_CODE();
+
+    if (qemu_in_coroutine()) {
+        bdrv_co_yield_to_drain_parent(c, false, false);
+        return;
+    }
+
     bdrv_parent_drained_end_single_no_poll(c, &drained_end_counter);
     BDRV_POLL_WHILE(c->bs, qatomic_read(&drained_end_counter) > 0);
 }
@@ -116,6 +207,12 @@ static bool bdrv_parent_drained_poll(BlockDriverState *bs, BdrvChild *ignore,
 void bdrv_parent_drained_begin_single(BdrvChild *c, bool poll)
 {
     IO_OR_GS_CODE();
+
+    if (poll && qemu_in_coroutine()) {
+        bdrv_co_yield_to_drain_parent(c, true, poll);
+        return;
+    }
+
     c->parent_quiesce_counter++;
     if (c->klass->drained_begin) {
         c->klass->drained_begin(c);
@@ -430,7 +527,6 @@ void bdrv_do_drained_begin_quiesce(BlockDriverState *bs,
                                    BdrvChild *parent, bool ignore_bds_parents)
 {
     IO_OR_GS_CODE();
-    assert(!qemu_in_coroutine());
 
     /* Stop things in parent-to-child order */
     if (qatomic_fetch_inc(&bs->quiesce_counter) == 0) {
-- 
2.31.1

