On 06/25/2015 08:12 AM, Stefan Hajnoczi wrote: > Sometimes block jobs must execute as a transaction group. Finishing > jobs wait until all other jobs are ready to complete successfully. > Failure or cancellation of one job cancels the other jobs in the group. > > Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com> > --- > blockjob.c | 160 > ++++++++++++++++++++++++++++++++++++++++++++++ > include/block/block.h | 1 + > include/block/block_int.h | 3 +- > include/block/blockjob.h | 49 ++++++++++++++ > trace-events | 4 ++ > 5 files changed, 216 insertions(+), 1 deletion(-) > > diff --git a/blockjob.c b/blockjob.c > index ec46fad..3c6f1d4 100644 > --- a/blockjob.c > +++ b/blockjob.c > @@ -400,3 +400,163 @@ void block_job_defer_to_main_loop(BlockJob *job, > > qemu_bh_schedule(data->bh); > } > + > +/* Transactional group of block jobs */ > +struct BlockJobTxn { > + /* Jobs may be in different AioContexts so protect all fields */ > + QemuMutex lock; > + > + /* Reference count for txn object */ > + unsigned int ref; > + > + /* Is this txn cancelling its jobs? */ > + bool aborting; > + > + /* Number of jobs still running */ > + unsigned int jobs_pending; > + > + /* List of jobs */ > + QLIST_HEAD(, BlockJob) jobs; > +}; > + > +BlockJobTxn *block_job_txn_new(void) > +{ > + BlockJobTxn *txn = g_new(BlockJobTxn, 1); > + qemu_mutex_init(&txn->lock); > + txn->ref = 1; /* dropped by block_job_txn_begin() */ > + txn->aborting = false; > + txn->jobs_pending = 0; > + QLIST_INIT(&txn->jobs); > + return txn; > +} > + > +static void block_job_txn_unref(BlockJobTxn *txn) > +{ > + qemu_mutex_lock(&txn->lock); > + > + if (--txn->ref > 0) { > + qemu_mutex_unlock(&txn->lock); > + return; > + } > + > + qemu_mutex_unlock(&txn->lock); > + qemu_mutex_destroy(&txn->lock); > + g_free(txn); > +} > + > +/* The purpose of this is to keep txn alive until all jobs have been added */ > +void block_job_txn_begin(BlockJobTxn *txn) > +{ > + block_job_txn_unref(txn); > +} > +
Maybe it's not entirely clear to the caller that "begin" may in fact actually delete the BlockJobTxn. Shall we update the caller's pointer to NULL in this case as a hint? Passing a **txn will imply that we are giving up our ownership of the object. > +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job) > +{ > + if (!txn) { > + return; > + } > + > + assert(!job->txn); > + job->txn = txn; > + > + qemu_mutex_lock(&txn->lock); > + txn->ref++; > + txn->jobs_pending++; > + QLIST_INSERT_HEAD(&txn->jobs, job, txn_list); > + qemu_mutex_unlock(&txn->lock); > +} > + > +/* Cancel all other jobs in case of abort, wake all waiting jobs in case of > + * successful completion. Runs from main loop. > + */ > +static void block_job_txn_complete(BlockJob *job, void *opaque) > +{ > + BlockJobTxn *txn = opaque; > + BlockJob *other_job; > + bool aborting = txn->aborting; > + > + qemu_mutex_lock(&txn->lock); > + txn->ref++; /* keep txn alive until the end of this loop */ > + > + QLIST_FOREACH(other_job, &txn->jobs, txn_list) { > + AioContext *ctx; > + > + qemu_mutex_unlock(&txn->lock); > + ctx = bdrv_get_aio_context(other_job->bs); > + aio_context_acquire(ctx); > + > + /* Cancel all other jobs if aborting. Don't cancel our own failed > job > + * since cancellation throws away the error value. > + */ > + if (aborting && other_job != job) { > + block_job_cancel(other_job); > + } else { > + block_job_enter(other_job); > + } > + > + aio_context_release(ctx); > + qemu_mutex_lock(&txn->lock); > + } > + > + qemu_mutex_unlock(&txn->lock); > + block_job_txn_unref(txn); > +} > + Maybe we can name this one along the lines of block_job_txn_complete_impl or something clearly internal, so that we can name the public interface simply "block_job_txn_complete." Maybe I'm just bike shedding, but calling only a "prepare to X" function without a matching "X" call in the exposed API seems odd. 
I suppose it doesn't matter, because I can't think of anything nicer :) > +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn, > + BlockJob *job, > + int ret) > +{ > + if (!txn) { > + return; > + } > + > + qemu_mutex_lock(&txn->lock); > + > + /* This function is entered in 3 cases: > + * > + * 1. Successful job completion - wait for other jobs > + * 2. First failed/cancelled job in txn - cancel other jobs and wait > + * 3. Subsequent cancelled jobs - finish immediately, don't wait > + */ > + trace_block_job_txn_prepare_to_complete_entry(txn, job, ret, > + > block_job_is_cancelled(job), > + txn->aborting, > + txn->jobs_pending); > + > + if (txn->aborting) { /* Case 3 */ > + assert(block_job_is_cancelled(job)); > + goto out; /* already cancelled, don't yield */ > + } > + So the first failure forces all jobs not-yet-complete to cancel. Is there any chance for a race condition of two jobs completing almost simultaneously, where the first fails and the second completes, and the 2nd job makes it here before it gets cancelled? Benefit of the doubt: I really assume the answer is "no," but it's not immediately evident to me.
> + if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */ > +abort: > + txn->aborting = true; > + block_job_defer_to_main_loop(job, block_job_txn_complete, txn); > + } else { /* Case 1 */ > + if (--txn->jobs_pending == 0) { > + block_job_defer_to_main_loop(job, block_job_txn_complete, txn); > + } > + } > + > + /* Wait for block_job_txn_complete() */ > + do { > + qemu_mutex_unlock(&txn->lock); > + job->busy = false; > + qemu_coroutine_yield(); > + job->busy = true; > + qemu_mutex_lock(&txn->lock); > + > + if (block_job_is_cancelled(job) && !txn->aborting) { > + goto abort; /* this job just got cancelled by the user */ > + } > + } while (!txn->aborting && txn->jobs_pending > 0); > + > +out: > + trace_block_job_txn_prepare_to_complete_return(txn, job, ret, > + > block_job_is_cancelled(job), > + txn->aborting, > + txn->jobs_pending); > + > + qemu_mutex_unlock(&txn->lock); > + block_job_txn_unref(txn); > +} > diff --git a/include/block/block.h b/include/block/block.h > index a4c505d..cb19c73 100644 > --- a/include/block/block.h > +++ b/include/block/block.h > @@ -13,6 +13,7 @@ > typedef struct BlockDriver BlockDriver; > typedef struct BlockJob BlockJob; > typedef struct BdrvChildRole BdrvChildRole; > +typedef struct BlockJobTxn BlockJobTxn; > > typedef struct BlockDriverInfo { > /* in bytes, 0 if irrelevant */ > diff --git a/include/block/block_int.h b/include/block/block_int.h > index ea3e7f0..812a18a 100644 > --- a/include/block/block_int.h > +++ b/include/block/block_int.h > @@ -639,6 +639,7 @@ void mirror_start(BlockDriverState *bs, BlockDriverState > *target, > * @on_source_error: The action to take upon error reading from the source. > * @on_target_error: The action to take upon error writing to the target. > * @cb: Completion function for the job. > + * @txn: Transaction that this job is part of (may be NULL). > * @opaque: Opaque pointer value passed to @cb. > * > * Start a backup operation on @bs. 
Clusters in @bs are written to @target > @@ -650,7 +651,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState > *target, > BlockdevOnError on_source_error, > BlockdevOnError on_target_error, > BlockCompletionFunc *cb, void *opaque, > - Error **errp); > + BlockJobTxn *txn, Error **errp); > > void blk_dev_change_media_cb(BlockBackend *blk, bool load); > bool blk_dev_has_removable_media(BlockBackend *blk); > diff --git a/include/block/blockjob.h b/include/block/blockjob.h > index 57d8ef1..ce57e98 100644 > --- a/include/block/blockjob.h > +++ b/include/block/blockjob.h > @@ -122,6 +122,10 @@ struct BlockJob { > > /** The opaque value that is passed to the completion function. */ > void *opaque; > + > + /** Non-NULL if this job is part of a transaction */ > + BlockJobTxn *txn; > + QLIST_ENTRY(BlockJob) txn_list; > }; > > /** > @@ -348,4 +352,49 @@ void block_job_defer_to_main_loop(BlockJob *job, > BlockJobDeferToMainLoopFn *fn, > void *opaque); > > +/** > + * block_job_txn_new: > + * > + * Allocate and return a new block job transaction. Jobs can be added to the > + * transaction using block_job_txn_add_job(). block_job_txn_begin() must be > + * called when all jobs (if any) have been added. > + * > + * All jobs in the transaction either complete successfully or fail/cancel > as a > + * group. Jobs wait for each other before completing. Cancelling one job > + * cancels all jobs in the transaction. > + */ > +BlockJobTxn *block_job_txn_new(void); > + > +/** > + * block_job_txn_add_job: > + * @txn: The transaction > + * @job: Job to add to the transaction > + * > + * Add @job to the transaction. The @job must not already be in a > transaction. > + * The block job driver must call block_job_txn_prepare_to_complete() before > + * final cleanup and completion. > + */ > +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job); > + > +/** > + * block_job_txn_begin: > + * @txn: The transaction > + * > + * Call this to mark the end of adding jobs to the transaction. 
This must be > + * called even if no jobs were added. > + */ > +void block_job_txn_begin(BlockJobTxn *txn); > + > +/** > + * block_job_txn_prepare_to_complete: > + * @txn: The transaction > + * @job: The block job > + * @ret: Block job return value (0 for success, otherwise job failure) > + * > + * Wait for other jobs in the transaction to complete. If @ret is non-zero > or > + * @job is cancelled, all other jobs in the transaction will be cancelled. > + */ > +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn, > + BlockJob *job, int ret); > + > #endif > diff --git a/trace-events b/trace-events > index 52b7efa..b6a43a0 100644 > --- a/trace-events > +++ b/trace-events > @@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p" > virtio_blk_data_plane_stop(void *s) "dataplane %p" > virtio_blk_data_plane_process_request(void *s, unsigned int out_num, > unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u > head %u" > > +# blockjob.c > +block_job_txn_prepare_to_complete_entry(void *txn, void *job, int ret, bool > cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d > cancelled %d aborting %d jobs_pending %u" > +block_job_txn_prepare_to_complete_return(void *txn, void *job, int ret, bool > cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d > cancelled %d aborting %d jobs_pending %u" > + > # hw/virtio/dataplane/vring.c > vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring > physical %#"PRIx64" desc %p avail %p used %p" > > Bike-shedding comments and benefit-of-the-doubt questions aside: Reviewed-by: John Snow <js...@redhat.com>