For various purposes, BDS users call bdrv_drain or bdrv_drain_all to make sure there are no pending requests during a series of operations on the BDS. But in the middle of operations, the caller may 1) yield from a coroutine (mirror_run); 2) defer the next part of work to a BH (mirror_run); 3) call nested aio_poll (qmp_transaction); etc.
This lock/unlock API is introduced to help assure above complications won't spoil the purpose of the bdrv_drain(): bdrv_lock should help quiesce other readers and writers in the beginning of such operations, and bdrv_unlock should resume the blocked requests. A notifier list is added to allow devices to cooperate with the lock and pause themselves, for example, by not processing more requests on the NBD export. Signed-off-by: Fam Zheng <f...@redhat.com> --- block.c | 10 +++++++ block/io.c | 69 +++++++++++++++++++++++++++++++++++++++++++++++ include/block/block.h | 39 +++++++++++++++++++++++++++ include/block/block_int.h | 5 ++++ 4 files changed, 123 insertions(+) diff --git a/block.c b/block.c index e9f31b7..abda2f7 100644 --- a/block.c +++ b/block.c @@ -252,8 +252,10 @@ BlockDriverState *bdrv_new(void) bdrv_iostatus_disable(bs); notifier_list_init(&bs->close_notifiers); notifier_with_return_list_init(&bs->before_write_notifiers); + notifier_list_init(&bs->lock_notifiers); qemu_co_queue_init(&bs->throttled_reqs[0]); qemu_co_queue_init(&bs->throttled_reqs[1]); + qemu_co_queue_init(&bs->lock_queue); bs->refcnt = 1; bs->aio_context = qemu_get_aio_context(); @@ -1716,6 +1718,7 @@ void bdrv_close(BlockDriverState *bs) { BdrvAioNotifier *ban, *ban_next; + assert(!bdrv_is_locked(bs)); if (bs->job) { block_job_cancel_sync(bs->job); } @@ -1846,12 +1849,19 @@ static void bdrv_move_feature_fields(BlockDriverState *bs_dest, /* job */ bs_dest->job = bs_src->job; + /* lock */ + bs_dest->lock_owner = bs_src->lock_owner; + bs_dest->lock_level = bs_src->lock_level; + bs_dest->lock_queue = bs_src->lock_queue; + bs_dest->lock_notifiers = bs_src->lock_notifiers; + /* keep the same entry in bdrv_states */ bs_dest->device_list = bs_src->device_list; bs_dest->blk = bs_src->blk; memcpy(bs_dest->op_blockers, bs_src->op_blockers, sizeof(bs_dest->op_blockers)); + } /* diff --git a/block/io.c b/block/io.c index e394d92..9aa4b71 100644 --- a/block/io.c +++ b/block/io.c @@ -2601,3 +2601,72 @@ 
void bdrv_flush_io_queue(BlockDriverState *bs) bdrv_flush_io_queue(bs->file); } } + +static void bdrv_lock_notify(BlockDriverState *bs, bool locking) +{ + BdrvLockEvent event = (BdrvLockEvent) { + .bs = bs, + .locking = locking, + }; + notifier_list_notify(&bs->lock_notifiers, &event); +} + +void bdrv_lock(BlockDriverState *bs) +{ + Coroutine *self = qemu_coroutine_self(); + bool notify = true; + + /* + * XXX: eventually we only allow coroutine callers. For now, let's allow + * the exceptional non-coroutine callers to serialize by themselves, e.g. + * with BQL. + */ + assert(qemu_in_coroutine() + || self == bs->lock_owner || bs->lock_level == 0); + + if (bs->lock_level) { + if (self == bs->lock_owner) { + bs->lock_level++; + return; + } else { + qemu_co_queue_wait(&bs->lock_queue); + notify = false; + } + } + assert(bs->lock_level == 0); + + if (notify) { + bdrv_lock_notify(bs, true); + } + bs->lock_level++; + bs->lock_owner = self; + + bdrv_drain(bs); +} + +void bdrv_unlock(BlockDriverState *bs) +{ + assert(bs->lock_level > 0); + if (!--bs->lock_level) { + if (!qemu_co_queue_empty(&bs->lock_queue)) { + /* + * XXX: do we need a BH to run lock_queue? + * If so, be careful of bdrv_set_aio_context(). + **/ + qemu_co_queue_next(&bs->lock_queue); + } else { + bdrv_lock_notify(bs, false); + } + } +} + +bool bdrv_is_locked(BlockDriverState *bs) +{ + assert((bs->lock_level == 0) == qemu_co_queue_empty(&bs->lock_queue)); + return !!bs->lock_level; +} + +void bdrv_add_lock_unlock_notifier(BlockDriverState *bs, Notifier *notifier) +{ + notifier_list_add(&bs->lock_notifiers, notifier); +} diff --git a/include/block/block.h b/include/block/block.h index c1c963e..068b01e 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -591,6 +591,45 @@ void bdrv_io_plug(BlockDriverState *bs); void bdrv_io_unplug(BlockDriverState *bs); void bdrv_flush_io_queue(BlockDriverState *bs); +/** + * bdrv_lock: + * + * Begin a temporary exclusive accessing by locking the BDS. 
+ * + * This lock is recursive: if bs is unlocked, the caller context will acquire + * the lock; otherwise if the caller is in the same coroutine context that + * already holds the lock, it will only add a recurse level; otherwise, this + * function will block until the lock is released by the other owner. + */ +void bdrv_lock(BlockDriverState *bs); + +/** + * bdrv_lock: + * + * Reduce the recurse level, or if it's the outermost unlock, release the lock. + */ +void bdrv_unlock(BlockDriverState *bs); + +/** + * bdrv_is_locked: + * + * Return if the bs is locked. + */ +bool bdrv_is_locked(BlockDriverState *bs); + +typedef struct { + BlockDriverState *bs; + bool locking; +} BdrvLockEvent; + +/** + * bdrv_add_lock_unlock_notifier: + * + * Add a notifier that will get notified when bs is locked or unlocked, with a + * BdrvLockEvent data. + */ +void bdrv_add_lock_unlock_notifier(BlockDriverState *bs, Notifier *notifier); + BlockAcctStats *bdrv_get_stats(BlockDriverState *bs); #endif diff --git a/include/block/block_int.h b/include/block/block_int.h index f004378..a742fea 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -433,6 +433,11 @@ struct BlockDriverState { /* threshold limit for writes, in bytes. "High water mark". */ uint64_t write_threshold_offset; NotifierWithReturn write_threshold_notifier; + + Coroutine *lock_owner; + int lock_level; + CoQueue lock_queue; + NotifierList lock_notifiers; }; -- 2.4.2