This patch adds support for bypassing the coroutine in bdrv_co_aio_rw_vector(), which is in the fast path of the block layer, especially for virtio-blk dataplane.
Signed-off-by: Ming Lei <ming....@canonical.com> --- block.c | 185 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 157 insertions(+), 28 deletions(-) diff --git a/block.c b/block.c index 2326dab..e1812a7 100644 --- a/block.c +++ b/block.c @@ -35,6 +35,7 @@ #include "qmp-commands.h" #include "qemu/timer.h" #include "qapi-event.h" +#include "qemu/gc.h" #ifdef CONFIG_BSD #include <sys/types.h> @@ -55,6 +56,21 @@ struct BdrvDirtyBitmap { QLIST_ENTRY(BdrvDirtyBitmap) list; }; +typedef struct CoroutineIOCompletion { + Coroutine *coroutine; + int ret; + bool bypass; + SimpleGC gc; +} CoroutineIOCompletion; + +typedef struct BlockDriverAIOCBCoroutine { + BlockDriverAIOCB common; + BlockRequest req; + bool is_write; + bool *done; + QEMUBH *bh; +} BlockDriverAIOCBCoroutine; + #define NOT_DONE 0x7fffffff /* used while emulated sync operation in progress */ static void bdrv_dev_change_media_cb(BlockDriverState *bs, bool load); @@ -120,6 +136,48 @@ int is_windows_drive(const char *filename) } #endif +static CoroutineIOCompletion *bdrv_get_co_io_comp(void *acb) +{ + return (CoroutineIOCompletion *)(acb + + sizeof(BlockDriverAIOCBCoroutine)); +} + +static BlockDriverAIOCBCoroutine *bdrv_get_aio_co(void *co) +{ + assert(((CoroutineIOCompletion *)co)->bypass); + + return (BlockDriverAIOCBCoroutine *)(co - + sizeof(BlockDriverAIOCBCoroutine)); +} + +static void bdrv_init_io_comp(CoroutineIOCompletion *co) +{ + co->coroutine = NULL; + co->bypass = false; + co->ret = 0; + simple_gc_init(&co->gc); +} + +static void bdrv_free_qiov(void *addr) +{ + qemu_iovec_destroy((QEMUIOVector *)addr); + g_free(addr); +} + +static void bdrv_gc_add_qiov(CoroutineIOCompletion *co, + QEMUIOVector *qiov) +{ + QEMUIOVector *iov = g_malloc(sizeof(QEMUIOVector)); + + *iov = *qiov; + simple_gc_add(&co->gc, iov, bdrv_free_qiov); +} + +static void bdrv_gc_add_buf(CoroutineIOCompletion *co, void *addr) +{ + simple_gc_add(&co->gc, addr, NULL); +} + /* throttling disk I/O limits */ 
void bdrv_set_io_limits(BlockDriverState *bs, ThrottleConfig *cfg) @@ -3081,7 +3139,16 @@ static int coroutine_fn bdrv_aligned_preadv(BlockDriverState *bs, ret = drv->bdrv_co_readv(bs, sector_num, local_sectors, &local_qiov); - qemu_iovec_destroy(&local_qiov); + + if (qemu_coroutine_self_bypassed()) { + CoroutineIOCompletion *pco = bdrv_get_co_io_comp( + qemu_coroutine_get_var()); + + /* GC will destroy the local iov after IO is completed */ + bdrv_gc_add_qiov(pco, &local_qiov); + } else { + qemu_iovec_destroy(&local_qiov); + } } else { ret = 0; } @@ -3165,9 +3232,19 @@ static int coroutine_fn bdrv_co_do_preadv(BlockDriverState *bs, tracked_request_end(&req); if (use_local_qiov) { - qemu_iovec_destroy(&local_qiov); - qemu_vfree(head_buf); - qemu_vfree(tail_buf); + if (!qemu_coroutine_self_bypassed()) { + qemu_iovec_destroy(&local_qiov); + qemu_vfree(head_buf); + qemu_vfree(tail_buf); + } else { + CoroutineIOCompletion *pco = bdrv_get_co_io_comp( + qemu_coroutine_get_var()); + + /* GC will release resources after IO is completed */ + bdrv_gc_add_qiov(pco, &local_qiov); + if (head_buf) { bdrv_gc_add_buf(pco, head_buf); } + if (tail_buf) { 
bdrv_gc_add_buf(pco, tail_buf); } + } } return ret; @@ -4659,15 +4736,6 @@ static BlockDriverAIOCB *bdrv_aio_writev_em(BlockDriverState *bs, return bdrv_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1); } - -typedef struct BlockDriverAIOCBCoroutine { - BlockDriverAIOCB common; - BlockRequest req; - bool is_write; - bool *done; - QEMUBH* bh; -} BlockDriverAIOCBCoroutine; - static void bdrv_aio_co_cancel_em(BlockDriverAIOCB *blockacb) { AioContext *aio_context = bdrv_get_aio_context(blockacb->bs); @@ -4686,6 +4754,12 @@ static const AIOCBInfo bdrv_em_co_aiocb_info = { .cancel = bdrv_aio_co_cancel_em, }; +static const AIOCBInfo bdrv_em_co_bypass_aiocb_info = { + .aiocb_size = sizeof(BlockDriverAIOCBCoroutine) + + sizeof(CoroutineIOCompletion), + .cancel = bdrv_aio_co_cancel_em, +}; + static void bdrv_co_em_bh(void *opaque) { BlockDriverAIOCBCoroutine *acb = opaque; @@ -4705,6 +4779,13 @@ static void coroutine_fn bdrv_co_do_rw(void *opaque) { BlockDriverAIOCBCoroutine *acb = opaque; BlockDriverState *bs = acb->common.bs; + bool bypass = qemu_coroutine_self_bypassed(); + CoroutineIOCompletion *co = bdrv_get_co_io_comp(acb); + + if (bypass) { + bdrv_init_io_comp(bdrv_get_co_io_comp(acb)); + qemu_coroutine_set_var(acb); + } if (!acb->is_write) { acb->req.error = bdrv_co_do_readv(bs, acb->req.sector, @@ -4714,8 +4795,11 @@ static void coroutine_fn bdrv_co_do_rw(void *opaque) acb->req.nb_sectors, acb->req.qiov, acb->req.flags); } - acb->bh = aio_bh_new(bdrv_get_aio_context(bs), bdrv_co_em_bh, acb); - qemu_bh_schedule(acb->bh); + /* co->bypass is used for detecting early completion */ + if (!bypass || !co->bypass) { + acb->bh = aio_bh_new(bdrv_get_aio_context(bs), bdrv_co_em_bh, acb); + qemu_bh_schedule(acb->bh); + } } static bool bdrv_rw_aligned(BlockDriverState *bs, @@ -4767,8 +4851,27 @@ static BlockDriverAIOCB *bdrv_co_aio_rw_vector(BlockDriverState *bs, { Coroutine *co; BlockDriverAIOCBCoroutine *acb; + const AIOCBInfo *aiocb_info; + bool bypass; - 
acb = qemu_aio_get(&bdrv_em_co_aiocb_info, bs, cb, opaque); + /* + * In longterm, creating of coroutine should be pushed far further + * to make a fast path in cases of unnecessary coroutine usage. + * + * Also when the bypass mechanism is mature, the 'bypass_co' hint + * which is set in device can be moved to block layer so that bypass + * can be enabled automatically. + */ + if (bs->bypass_co && + bdrv_co_can_bypass_co(bs, sector_num, nb_sectors, flags, is_write)) { + aiocb_info = &bdrv_em_co_bypass_aiocb_info; + bypass = true; + } else { + aiocb_info = &bdrv_em_co_aiocb_info; + bypass = false; + } + + acb = qemu_aio_get(aiocb_info, bs, cb, opaque); acb->req.sector = sector_num; acb->req.nb_sectors = nb_sectors; acb->req.qiov = qiov; @@ -4776,8 +4879,14 @@ static BlockDriverAIOCB *bdrv_co_aio_rw_vector(BlockDriverState *bs, acb->is_write = is_write; acb->done = NULL; - co = qemu_coroutine_create(bdrv_co_do_rw); - qemu_coroutine_enter(co, acb); + if (!bypass) { + co = qemu_coroutine_create(bdrv_co_do_rw); + qemu_coroutine_enter(co, acb); + } else { + qemu_coroutine_set_bypass(true); + bdrv_co_do_rw(acb); + qemu_coroutine_set_bypass(false); + } return &acb->common; } @@ -4871,17 +4980,23 @@ void qemu_aio_release(void *p) /**************************************************************/ /* Coroutine block device emulation */ -typedef struct CoroutineIOCompletion { - Coroutine *coroutine; - int ret; -} CoroutineIOCompletion; - static void bdrv_co_io_em_complete(void *opaque, int ret) { CoroutineIOCompletion *co = opaque; - co->ret = ret; - qemu_coroutine_enter(co->coroutine, NULL); + if (!co->bypass) { + co->ret = ret; + qemu_coroutine_enter(co->coroutine, NULL); + } else { + BlockDriverAIOCBCoroutine *acb = bdrv_get_aio_co(co); + + simple_gc_free_all(&co->gc); + + acb->req.error = ret; + acb->bh = aio_bh_new(bdrv_get_aio_context(acb->common.bs), + bdrv_co_em_bh, acb); + qemu_bh_schedule(acb->bh); + } } static int coroutine_fn bdrv_co_io_em(BlockDriverState *bs, 
int64_t sector_num, @@ -4891,21 +5006,35 @@ static int coroutine_fn bdrv_co_io_em(BlockDriverState *bs, int64_t sector_num, CoroutineIOCompletion co = { .coroutine = qemu_coroutine_self(), }; + CoroutineIOCompletion *pco = &co; BlockDriverAIOCB *acb; + if (qemu_coroutine_bypassed(pco->coroutine)) { + pco = bdrv_get_co_io_comp(qemu_coroutine_get_var()); + pco->bypass = true; + } + if (is_write) { acb = bs->drv->bdrv_aio_writev(bs, sector_num, iov, nb_sectors, - bdrv_co_io_em_complete, &co); + bdrv_co_io_em_complete, pco); } else { acb = bs->drv->bdrv_aio_readv(bs, sector_num, iov, nb_sectors, - bdrv_co_io_em_complete, &co); + bdrv_co_io_em_complete, pco); } trace_bdrv_co_io_em(bs, sector_num, nb_sectors, is_write, acb); if (!acb) { + /* + * no completion callback for failure case, let bdrv_co_do_rw + * handle completion. + */ + pco->bypass = false; return -EIO; } - qemu_coroutine_yield(); + + if (!pco->bypass) { + qemu_coroutine_yield(); + } return co.ret; } -- 1.7.9.5