Sometimes block jobs must execute as a transaction group. A job that finishes waits until all other jobs in the group are ready to complete successfully. Failure or cancellation of one job cancels the other jobs in the group.
Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com> --- blockjob.c | 160 ++++++++++++++++++++++++++++++++++++++++++++++ include/block/block.h | 1 + include/block/block_int.h | 3 +- include/block/blockjob.h | 49 ++++++++++++++ trace-events | 4 ++ 5 files changed, 216 insertions(+), 1 deletion(-) diff --git a/blockjob.c b/blockjob.c index 2755465..ff622f5 100644 --- a/blockjob.c +++ b/blockjob.c @@ -399,3 +399,163 @@ void block_job_defer_to_main_loop(BlockJob *job, qemu_bh_schedule(data->bh); } + +/* Transactional group of block jobs */ +struct BlockJobTxn { + /* Jobs may be in different AioContexts so protect all fields */ + QemuMutex lock; + + /* Reference count for txn object */ + unsigned int ref; + + /* Is this txn cancelling its jobs? */ + bool aborting; + + /* Number of jobs still running */ + unsigned int jobs_pending; + + /* List of jobs */ + QLIST_HEAD(, BlockJob) jobs; +}; + +BlockJobTxn *block_job_txn_new(void) +{ + BlockJobTxn *txn = g_new(BlockJobTxn, 1); + qemu_mutex_init(&txn->lock); + txn->ref = 1; /* dropped by block_job_txn_begin() */ + txn->aborting = false; + txn->jobs_pending = 0; + QLIST_INIT(&txn->jobs); + return txn; +} + +static void block_job_txn_unref(BlockJobTxn *txn) +{ + qemu_mutex_lock(&txn->lock); + + if (--txn->ref > 0) { + qemu_mutex_unlock(&txn->lock); + return; + } + + qemu_mutex_unlock(&txn->lock); + qemu_mutex_destroy(&txn->lock); + g_free(txn); +} + +/* The purpose of this is to keep txn alive until all jobs have been added */ +void block_job_txn_begin(BlockJobTxn *txn) +{ + block_job_txn_unref(txn); +} + +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job) +{ + if (!txn) { + return; + } + + assert(!job->txn); + job->txn = txn; + + qemu_mutex_lock(&txn->lock); + txn->ref++; + txn->jobs_pending++; + QLIST_INSERT_HEAD(&txn->jobs, job, txn_list); + qemu_mutex_unlock(&txn->lock); +} + +/* Cancel all other jobs in case of abort, wake all waiting jobs in case of + * successful completion. Runs from main loop. 
+ */ +static void block_job_txn_complete(BlockJob *job, void *opaque) +{ + BlockJobTxn *txn = opaque; + BlockJob *other_job; + bool aborting = txn->aborting; + + qemu_mutex_lock(&txn->lock); + txn->ref++; /* keep txn alive until the end of this loop */ + + QLIST_FOREACH(other_job, &txn->jobs, txn_list) { + AioContext *ctx; + + qemu_mutex_unlock(&txn->lock); + ctx = bdrv_get_aio_context(other_job->bs); + aio_context_acquire(ctx); + + /* Cancel all other jobs if aborting. Don't cancel our own failed job + * since cancellation throws away the error value. + */ + if (aborting && other_job != job) { + block_job_cancel(other_job); + } else { + block_job_enter(other_job); + } + + aio_context_release(ctx); + qemu_mutex_lock(&txn->lock); + } + + qemu_mutex_unlock(&txn->lock); + block_job_txn_unref(txn); +} + +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn, + BlockJob *job, + int ret) +{ + if (!txn) { + return; + } + + qemu_mutex_lock(&txn->lock); + + /* This function is entered in 3 cases: + * + * 1. Successful job completion - wait for other jobs + * 2. First failed/cancelled job in txn - cancel other jobs and wait + * 3. 
Subsequent cancelled jobs - finish immediately, don't wait + */ + trace_block_job_txn_prepare_to_complete_entry(txn, job, ret, + block_job_is_cancelled(job), + txn->aborting, + txn->jobs_pending); + + if (txn->aborting) { /* Case 3 */ + assert(block_job_is_cancelled(job)); + goto out; /* already cancelled, don't yield */ + } + + if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */ +abort: + txn->aborting = true; + block_job_defer_to_main_loop(job, block_job_txn_complete, txn); + } else { /* Case 1 */ + if (--txn->jobs_pending == 0) { + block_job_defer_to_main_loop(job, block_job_txn_complete, txn); + } + } + + /* Wait for block_job_txn_complete() */ + do { + qemu_mutex_unlock(&txn->lock); + job->busy = false; + qemu_coroutine_yield(); + job->busy = true; + qemu_mutex_lock(&txn->lock); + + if (block_job_is_cancelled(job) && !txn->aborting) { + goto abort; /* this job just got cancelled by the user */ + } + } while (!txn->aborting && txn->jobs_pending > 0); + +out: + trace_block_job_txn_prepare_to_complete_return(txn, job, ret, + block_job_is_cancelled(job), + txn->aborting, + txn->jobs_pending); + + qemu_mutex_unlock(&txn->lock); + block_job_txn_unref(txn); +} diff --git a/include/block/block.h b/include/block/block.h index 3d62d3e..439925c 100644 --- a/include/block/block.h +++ b/include/block/block.h @@ -12,6 +12,7 @@ /* block.c */ typedef struct BlockDriver BlockDriver; typedef struct BlockJob BlockJob; +typedef struct BlockJobTxn BlockJobTxn; typedef struct BlockDriverInfo { /* in bytes, 0 if irrelevant */ diff --git a/include/block/block_int.h b/include/block/block_int.h index 5af712f..9464142 100644 --- a/include/block/block_int.h +++ b/include/block/block_int.h @@ -617,6 +617,7 @@ void mirror_start(BlockDriverState *bs, BlockDriverState *target, * @on_source_error: The action to take upon error reading from the source. * @on_target_error: The action to take upon error writing to the target. * @cb: Completion function for the job. 
+ * @txn: Transaction that this job is part of (may be NULL). * @opaque: Opaque pointer value passed to @cb. * * Start a backup operation on @bs. Clusters in @bs are written to @target @@ -628,7 +629,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState *target, BlockdevOnError on_source_error, BlockdevOnError on_target_error, BlockCompletionFunc *cb, void *opaque, - Error **errp); + BlockJobTxn *txn, Error **errp); void blk_dev_change_media_cb(BlockBackend *blk, bool load); bool blk_dev_has_removable_media(BlockBackend *blk); diff --git a/include/block/blockjob.h b/include/block/blockjob.h index 57d8ef1..ce57e98 100644 --- a/include/block/blockjob.h +++ b/include/block/blockjob.h @@ -122,6 +122,10 @@ struct BlockJob { /** The opaque value that is passed to the completion function. */ void *opaque; + + /** Non-NULL if this job is part of a transaction */ + BlockJobTxn *txn; + QLIST_ENTRY(BlockJob) txn_list; }; /** @@ -348,4 +352,49 @@ void block_job_defer_to_main_loop(BlockJob *job, BlockJobDeferToMainLoopFn *fn, void *opaque); +/** + * block_job_txn_new: + * + * Allocate and return a new block job transaction. Jobs can be added to the + * transaction using block_job_txn_add_job(). block_job_txn_begin() must be + * called when all jobs (if any) have been added. + * + * All jobs in the transaction either complete successfully or fail/cancel as a + * group. Jobs wait for each other before completing. Cancelling one job + * cancels all jobs in the transaction. + */ +BlockJobTxn *block_job_txn_new(void); + +/** + * block_job_txn_add_job: + * @txn: The transaction + * @job: Job to add to the transaction + * + * Add @job to the transaction. The @job must not already be in a transaction. + * The block job driver must call block_job_txn_prepare_to_complete() before + * final cleanup and completion. 
+ */ +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job); + +/** + * block_job_txn_begin: + * @txn: The transaction + * + * Call this to mark the end of adding jobs to the transaction. This must be + * called even if no jobs were added. + */ +void block_job_txn_begin(BlockJobTxn *txn); + +/** + * block_job_txn_prepare_to_complete: + * @txn: The transaction + * @job: The block job + * @ret: Block job return value (0 for success, otherwise job failure) + * + * Wait for other jobs in the transaction to complete. If @ret is non-zero or + * @job is cancelled, all other jobs in the transaction will be cancelled. + */ +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn, + BlockJob *job, int ret); + #endif diff --git a/trace-events b/trace-events index a589650..526e6ac 100644 --- a/trace-events +++ b/trace-events @@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p" virtio_blk_data_plane_stop(void *s) "dataplane %p" virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u" +# blockjob.c +block_job_txn_prepare_to_complete_entry(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u" +block_job_txn_prepare_to_complete_return(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u" + # hw/virtio/dataplane/vring.c vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p" -- 2.4.2