This patch adds support for bypassing the coroutine
in bdrv_co_aio_rw_vector(), which is on the fast path of the
block device layer, especially for virtio-blk dataplane.

Signed-off-by: Ming Lei <ming....@canonical.com>
---
 block.c |  185 +++++++++++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 157 insertions(+), 28 deletions(-)

diff --git a/block.c b/block.c
index 2326dab..e1812a7 100644
--- a/block.c
+++ b/block.c
@@ -35,6 +35,7 @@
 #include "qmp-commands.h"
 #include "qemu/timer.h"
 #include "qapi-event.h"
+#include "qemu/gc.h"
 
 #ifdef CONFIG_BSD
 #include <sys/types.h>
@@ -55,6 +56,21 @@ struct BdrvDirtyBitmap {
     QLIST_ENTRY(BdrvDirtyBitmap) list;
 };
 
+typedef struct CoroutineIOCompletion {
+    Coroutine *coroutine;   /* waiter re-entered on completion (non-bypass) */
+    int ret;                /* I/O result returned to the yielding coroutine */
+    bool bypass;            /* true: complete via BH, no coroutine to wake */
+    SimpleGC gc;            /* deferred frees for buffers that must outlive the request */
+} CoroutineIOCompletion;
+
+typedef struct BlockDriverAIOCBCoroutine {
+    BlockDriverAIOCB common;    /* embedded generic AIOCB (bs, cb, opaque) */
+    BlockRequest req;           /* sector, nb_sectors, qiov, flags, error */
+    bool is_write;              /* selects the writev vs. readv path */
+    bool *done;                 /* NOTE(review): only set to NULL here; confirm use elsewhere */
+    QEMUBH *bh;                 /* bottom half that delivers the completion callback */
+} BlockDriverAIOCBCoroutine;
+
 #define NOT_DONE 0x7fffffff /* used while emulated sync operation in progress 
*/
 
 static void bdrv_dev_change_media_cb(BlockDriverState *bs, bool load);
@@ -120,6 +136,48 @@ int is_windows_drive(const char *filename)
 }
 #endif
 
+/* Return the CoroutineIOCompletion stored directly behind the AIOCB
+ * (see bdrv_em_co_bypass_aiocb_info.aiocb_size).  Offset through
+ * char *: arithmetic on void * is a GCC extension, not ISO C. */
+static CoroutineIOCompletion *bdrv_get_co_io_comp(void *acb)
+{
+    return (CoroutineIOCompletion *)((char *)acb +
+               sizeof(BlockDriverAIOCBCoroutine));
+}
+
+/* Inverse of bdrv_get_co_io_comp(); only valid for bypass requests,
+ * whose AIOCB and completion record are co-allocated.  Use char *
+ * arithmetic: offsetting a void * is a GCC extension, not ISO C. */
+static BlockDriverAIOCBCoroutine *bdrv_get_aio_co(void *co)
+{
+    assert(((CoroutineIOCompletion *)co)->bypass);
+
+    return (BlockDriverAIOCBCoroutine *)((char *)co -
+               sizeof(BlockDriverAIOCBCoroutine));
+}
+
+/* Reset a completion record; the attached GC starts out empty. */
+static void bdrv_init_io_comp(CoroutineIOCompletion *co)
+{
+    co->ret = 0;
+    co->bypass = false;
+    co->coroutine = NULL;
+    simple_gc_init(&co->gc);
+}
+
+/* GC releaser for iovec copies made by bdrv_gc_add_qiov(). */
+static void bdrv_free_qiov(void *addr)
+{
+    QEMUIOVector *qiov = addr;
+
+    qemu_iovec_destroy(qiov);
+    g_free(qiov);
+}
+
+/* Defer destruction of *qiov until the request completes: copy it to
+ * the heap and register the copy with the request's GC. */
+static void bdrv_gc_add_qiov(CoroutineIOCompletion *co,
+                             QEMUIOVector *qiov)
+{
+    QEMUIOVector *copy = g_malloc(sizeof(*copy));
+
+    *copy = *qiov;
+    simple_gc_add(&co->gc, copy, bdrv_free_qiov);
+}
+
+/* Track a plain buffer for release after I/O completion.
+ * NULL releaser: presumably the GC falls back to a default free —
+ * TODO(review): confirm against qemu/gc.h. */
+static void bdrv_gc_add_buf(CoroutineIOCompletion *co, void *addr)
+{
+    simple_gc_add(&co->gc, addr, NULL);
+}
+
 /* throttling disk I/O limits */
 void bdrv_set_io_limits(BlockDriverState *bs,
                         ThrottleConfig *cfg)
@@ -3081,7 +3139,16 @@ static int coroutine_fn 
bdrv_aligned_preadv(BlockDriverState *bs,
             ret = drv->bdrv_co_readv(bs, sector_num, local_sectors,
                                      &local_qiov);
 
-            qemu_iovec_destroy(&local_qiov);
+
+            if (qemu_coroutine_self_bypassed()) {
+                CoroutineIOCompletion *pco = bdrv_get_co_io_comp(
+                                             qemu_coroutine_get_var());
+
+                /* GC will destroy the local iov after IO is completed */
+                bdrv_gc_add_qiov(pco, &local_qiov);
+            } else {
+                qemu_iovec_destroy(&local_qiov);
+            }
         } else {
             ret = 0;
         }
@@ -3165,9 +3232,19 @@ static int coroutine_fn 
bdrv_co_do_preadv(BlockDriverState *bs,
     tracked_request_end(&req);
 
     if (use_local_qiov) {
-        qemu_iovec_destroy(&local_qiov);
-        qemu_vfree(head_buf);
-        qemu_vfree(tail_buf);
+        if (!qemu_coroutine_self_bypassed()) {
+            qemu_iovec_destroy(&local_qiov);
+            qemu_vfree(head_buf);
+            qemu_vfree(tail_buf);
+        } else {
+            CoroutineIOCompletion *pco = bdrv_get_co_io_comp(
+                                         qemu_coroutine_get_var());
+
+            /* GC will release resources after IO is completed */
+            bdrv_gc_add_qiov(pco, &local_qiov);
+            head_buf == NULL ? true : bdrv_gc_add_buf(pco, head_buf);
+            tail_buf == NULL ? true : bdrv_gc_add_buf(pco, tail_buf);
+        }
     }
 
     return ret;
@@ -4659,15 +4736,6 @@ static BlockDriverAIOCB 
*bdrv_aio_writev_em(BlockDriverState *bs,
     return bdrv_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
 }
 
-
-typedef struct BlockDriverAIOCBCoroutine {
-    BlockDriverAIOCB common;
-    BlockRequest req;
-    bool is_write;
-    bool *done;
-    QEMUBH* bh;
-} BlockDriverAIOCBCoroutine;
-
 static void bdrv_aio_co_cancel_em(BlockDriverAIOCB *blockacb)
 {
     AioContext *aio_context = bdrv_get_aio_context(blockacb->bs);
@@ -4686,6 +4754,12 @@ static const AIOCBInfo bdrv_em_co_aiocb_info = {
     .cancel             = bdrv_aio_co_cancel_em,
 };
 
+static const AIOCBInfo bdrv_em_co_bypass_aiocb_info = {
+    /* co-allocate a CoroutineIOCompletion right behind the AIOCB so the
+     * bypass path can reach it without a coroutine (bdrv_get_co_io_comp) */
+    .aiocb_size         = sizeof(BlockDriverAIOCBCoroutine) +
+                          sizeof(CoroutineIOCompletion),
+    .cancel             = bdrv_aio_co_cancel_em,
+};
+
 static void bdrv_co_em_bh(void *opaque)
 {
     BlockDriverAIOCBCoroutine *acb = opaque;
@@ -4705,6 +4779,13 @@ static void coroutine_fn bdrv_co_do_rw(void *opaque)
 {
     BlockDriverAIOCBCoroutine *acb = opaque;
     BlockDriverState *bs = acb->common.bs;
+    bool bypass = qemu_coroutine_self_bypassed();
+    CoroutineIOCompletion *co = bdrv_get_co_io_comp(acb);
+
+    if (bypass) {
+        bdrv_init_io_comp(bdrv_get_co_io_comp(acb));
+        qemu_coroutine_set_var(acb);
+    }
 
     if (!acb->is_write) {
         acb->req.error = bdrv_co_do_readv(bs, acb->req.sector,
@@ -4714,8 +4795,11 @@ static void coroutine_fn bdrv_co_do_rw(void *opaque)
             acb->req.nb_sectors, acb->req.qiov, acb->req.flags);
     }
 
-    acb->bh = aio_bh_new(bdrv_get_aio_context(bs), bdrv_co_em_bh, acb);
-    qemu_bh_schedule(acb->bh);
+    /* co->bypass is used for detecting early completion */
+    if (!bypass || !co->bypass) {
+        acb->bh = aio_bh_new(bdrv_get_aio_context(bs), bdrv_co_em_bh, acb);
+        qemu_bh_schedule(acb->bh);
+    }
 }
 
 static bool bdrv_rw_aligned(BlockDriverState *bs,
@@ -4767,8 +4851,27 @@ static BlockDriverAIOCB 
*bdrv_co_aio_rw_vector(BlockDriverState *bs,
 {
     Coroutine *co;
     BlockDriverAIOCBCoroutine *acb;
+    const AIOCBInfo *aiocb_info;
+    bool bypass;
 
-    acb = qemu_aio_get(&bdrv_em_co_aiocb_info, bs, cb, opaque);
+    /*
+     * In longterm, creating of coroutine should be pushed far further
+     * to make a fast path in cases of unnecessary coroutine usage.
+     *
+     * Also when the bypass mechanism is mature, the 'bypass_co' hint
+     * which is set in device can be moved to block layer so that bypass
+     * can be enabled automatically.
+     */
+    if (bs->bypass_co &&
+        bdrv_co_can_bypass_co(bs, sector_num, nb_sectors, flags, is_write)) {
+        aiocb_info = &bdrv_em_co_bypass_aiocb_info;
+        bypass = true;
+    } else {
+        aiocb_info = &bdrv_em_co_aiocb_info;
+        bypass = false;
+    }
+
+    acb = qemu_aio_get(aiocb_info, bs, cb, opaque);
     acb->req.sector = sector_num;
     acb->req.nb_sectors = nb_sectors;
     acb->req.qiov = qiov;
@@ -4776,8 +4879,14 @@ static BlockDriverAIOCB 
*bdrv_co_aio_rw_vector(BlockDriverState *bs,
     acb->is_write = is_write;
     acb->done = NULL;
 
-    co = qemu_coroutine_create(bdrv_co_do_rw);
-    qemu_coroutine_enter(co, acb);
+    if (!bypass) {
+        co = qemu_coroutine_create(bdrv_co_do_rw);
+        qemu_coroutine_enter(co, acb);
+    } else {
+        qemu_coroutine_set_bypass(true);
+        bdrv_co_do_rw(acb);
+        qemu_coroutine_set_bypass(false);
+    }
 
     return &acb->common;
 }
@@ -4871,17 +4980,23 @@ void qemu_aio_release(void *p)
 /**************************************************************/
 /* Coroutine block device emulation */
 
-typedef struct CoroutineIOCompletion {
-    Coroutine *coroutine;
-    int ret;
-} CoroutineIOCompletion;
-
 static void bdrv_co_io_em_complete(void *opaque, int ret)
 {
     CoroutineIOCompletion *co = opaque;
 
-    co->ret = ret;
-    qemu_coroutine_enter(co->coroutine, NULL);
+    if (!co->bypass) {
+        /* normal path: wake the coroutine yielding in bdrv_co_io_em() */
+        co->ret = ret;
+        qemu_coroutine_enter(co->coroutine, NULL);
+    } else {
+        /* bypass path: no coroutine to resume; release GC-tracked buffers
+         * and deliver the completion directly through a bottom half */
+        BlockDriverAIOCBCoroutine *acb = bdrv_get_aio_co(co);
+
+        simple_gc_free_all(&co->gc);
+
+        acb->req.error = ret;
+        acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs),
+                             bdrv_co_em_bh, acb);
+        qemu_bh_schedule(acb->bh);
+    }
 }
 
 static int coroutine_fn bdrv_co_io_em(BlockDriverState *bs, int64_t sector_num,
@@ -4891,21 +5006,35 @@ static int coroutine_fn bdrv_co_io_em(BlockDriverState 
*bs, int64_t sector_num,
     CoroutineIOCompletion co = {
         .coroutine = qemu_coroutine_self(),
     };
+    CoroutineIOCompletion *pco = &co;
     BlockDriverAIOCB *acb;
 
+    if (qemu_coroutine_bypassed(pco->coroutine)) {
+        pco = bdrv_get_co_io_comp(qemu_coroutine_get_var());
+        pco->bypass = true;
+    }
+
     if (is_write) {
         acb = bs->drv->bdrv_aio_writev(bs, sector_num, iov, nb_sectors,
-                                       bdrv_co_io_em_complete, &co);
+                                       bdrv_co_io_em_complete, pco);
     } else {
         acb = bs->drv->bdrv_aio_readv(bs, sector_num, iov, nb_sectors,
-                                      bdrv_co_io_em_complete, &co);
+                                      bdrv_co_io_em_complete, pco);
     }
 
     trace_bdrv_co_io_em(bs, sector_num, nb_sectors, is_write, acb);
     if (!acb) {
+        /*
+         * no completion callback for failure case, let bdrv_co_do_rw
+         * handle completion.
+         */
+        pco->bypass = false;
         return -EIO;
     }
-    qemu_coroutine_yield();
+
+    if (!pco->bypass) {
+        qemu_coroutine_yield();
+    }
 
     return co.ret;
 }
-- 
1.7.9.5


Reply via email to