On 06/25/2015 08:12 AM, Stefan Hajnoczi wrote:
> Sometimes block jobs must execute as a transaction group.  Finishing
> jobs wait until all other jobs are ready to complete successfully.
> Failure or cancellation of one job cancels the other jobs in the group.
> 
> Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> ---
>  blockjob.c                | 160 
> ++++++++++++++++++++++++++++++++++++++++++++++
>  include/block/block.h     |   1 +
>  include/block/block_int.h |   3 +-
>  include/block/blockjob.h  |  49 ++++++++++++++
>  trace-events              |   4 ++
>  5 files changed, 216 insertions(+), 1 deletion(-)
> 
> diff --git a/blockjob.c b/blockjob.c
> index ec46fad..3c6f1d4 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -400,3 +400,163 @@ void block_job_defer_to_main_loop(BlockJob *job,
>  
>      qemu_bh_schedule(data->bh);
>  }
> +
> +/* Transactional group of block jobs */
> +struct BlockJobTxn {
> +    /* Jobs may be in different AioContexts so protect all fields */
> +    QemuMutex lock;
> +
> +    /* Reference count for txn object */
> +    unsigned int ref;
> +
> +    /* Is this txn cancelling its jobs? */
> +    bool aborting;
> +
> +    /* Number of jobs still running */
> +    unsigned int jobs_pending;
> +
> +    /* List of jobs */
> +    QLIST_HEAD(, BlockJob) jobs;
> +};
> +
> +BlockJobTxn *block_job_txn_new(void)
> +{
> +    BlockJobTxn *txn = g_new(BlockJobTxn, 1);
> +    qemu_mutex_init(&txn->lock);
> +    txn->ref = 1; /* dropped by block_job_txn_begin() */
> +    txn->aborting = false;
> +    txn->jobs_pending = 0;
> +    QLIST_INIT(&txn->jobs);
> +    return txn;
> +}
> +
> +static void block_job_txn_unref(BlockJobTxn *txn)
> +{
> +    qemu_mutex_lock(&txn->lock);
> +
> +    if (--txn->ref > 0) {
> +        qemu_mutex_unlock(&txn->lock);
> +        return;
> +    }
> +
> +    qemu_mutex_unlock(&txn->lock);
> +    qemu_mutex_destroy(&txn->lock);
> +    g_free(txn);
> +}
> +
> +/* The purpose of this is to keep txn alive until all jobs have been added */
> +void block_job_txn_begin(BlockJobTxn *txn)
> +{
> +    block_job_txn_unref(txn);
> +}
> +

Maybe it's not entirely clear to the caller that "begin" may actually
delete the BlockJobTxn. Shall we update the caller's pointer to
NULL in this case as a hint? Passing a **txn would imply that we are
giving up our ownership of the object.

> +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
> +{
> +    if (!txn) {
> +        return;
> +    }
> +
> +    assert(!job->txn);
> +    job->txn = txn;
> +
> +    qemu_mutex_lock(&txn->lock);
> +    txn->ref++;
> +    txn->jobs_pending++;
> +    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
> +    qemu_mutex_unlock(&txn->lock);
> +}
> +
> +/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
> + * successful completion.  Runs from main loop.
> + */
> +static void block_job_txn_complete(BlockJob *job, void *opaque)
> +{
> +    BlockJobTxn *txn = opaque;
> +    BlockJob *other_job;
> +    bool aborting = txn->aborting;
> +
> +    qemu_mutex_lock(&txn->lock);
> +    txn->ref++; /* keep txn alive until the end of this loop */
> +
> +    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
> +        AioContext *ctx;
> +
> +        qemu_mutex_unlock(&txn->lock);
> +        ctx = bdrv_get_aio_context(other_job->bs);
> +        aio_context_acquire(ctx);
> +
> +        /* Cancel all other jobs if aborting.  Don't cancel our own failed 
> job
> +         * since cancellation throws away the error value.
> +         */
> +        if (aborting && other_job != job) {
> +            block_job_cancel(other_job);
> +        } else {
> +            block_job_enter(other_job);
> +        }
> +
> +        aio_context_release(ctx);
> +        qemu_mutex_lock(&txn->lock);
> +    }
> +
> +    qemu_mutex_unlock(&txn->lock);
> +    block_job_txn_unref(txn);
> +}
> +

Maybe we can name this one along the lines of
block_job_txn_complete_impl or something clearly internal, so that we
can name the public interface simply "block_job_txn_complete."

Maybe I'm just bike shedding, but calling only a "prepare to X" function
without a matching "X" call in the exposed API seems odd.

I suppose it doesn't matter, because I can't think of anything nicer :)

> +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
> +                                                    BlockJob *job,
> +                                                    int ret)
> +{
> +    if (!txn) {
> +        return;
> +    }
> +
> +    qemu_mutex_lock(&txn->lock);
> +
> +    /* This function is entered in 3 cases:
> +     *
> +     * 1. Successful job completion - wait for other jobs
> +     * 2. First failed/cancelled job in txn - cancel other jobs and wait
> +     * 3. Subsequent cancelled jobs - finish immediately, don't wait
> +     */
> +    trace_block_job_txn_prepare_to_complete_entry(txn, job, ret,
> +                                                  
> block_job_is_cancelled(job),
> +                                                  txn->aborting,
> +                                                  txn->jobs_pending);
> +
> +    if (txn->aborting) { /* Case 3 */
> +        assert(block_job_is_cancelled(job));
> +        goto out; /* already cancelled, don't yield */
> +    }
> +

So the first failure forces all not-yet-complete jobs to cancel. Is
there any chance of a race condition between two jobs completing almost
simultaneously, where the first fails and the second completes, and the
second job makes it here before it gets canceled?

BOD: I really assume the answer is "no," but it's not immediately
evident to me.

> +    if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
> +abort:
> +        txn->aborting = true;
> +        block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
> +    } else { /* Case 1 */
> +        if (--txn->jobs_pending == 0) {
> +            block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
> +        }
> +    }
> +
> +    /* Wait for block_job_txn_complete() */
> +    do {
> +        qemu_mutex_unlock(&txn->lock);
> +        job->busy = false;
> +        qemu_coroutine_yield();
> +        job->busy = true;
> +        qemu_mutex_lock(&txn->lock);
> +
> +        if (block_job_is_cancelled(job) && !txn->aborting) {
> +            goto abort; /* this job just got cancelled by the user */
> +        }
> +    } while (!txn->aborting && txn->jobs_pending > 0);
> +
> +out:
> +    trace_block_job_txn_prepare_to_complete_return(txn, job, ret,
> +                                                   
> block_job_is_cancelled(job),
> +                                                   txn->aborting,
> +                                                   txn->jobs_pending);
> +
> +    qemu_mutex_unlock(&txn->lock);
> +    block_job_txn_unref(txn);
> +}
> diff --git a/include/block/block.h b/include/block/block.h
> index a4c505d..cb19c73 100644
> --- a/include/block/block.h
> +++ b/include/block/block.h
> @@ -13,6 +13,7 @@
>  typedef struct BlockDriver BlockDriver;
>  typedef struct BlockJob BlockJob;
>  typedef struct BdrvChildRole BdrvChildRole;
> +typedef struct BlockJobTxn BlockJobTxn;
>  
>  typedef struct BlockDriverInfo {
>      /* in bytes, 0 if irrelevant */
> diff --git a/include/block/block_int.h b/include/block/block_int.h
> index ea3e7f0..812a18a 100644
> --- a/include/block/block_int.h
> +++ b/include/block/block_int.h
> @@ -639,6 +639,7 @@ void mirror_start(BlockDriverState *bs, BlockDriverState 
> *target,
>   * @on_source_error: The action to take upon error reading from the source.
>   * @on_target_error: The action to take upon error writing to the target.
>   * @cb: Completion function for the job.
> + * @txn: Transaction that this job is part of (may be NULL).
>   * @opaque: Opaque pointer value passed to @cb.
>   *
>   * Start a backup operation on @bs.  Clusters in @bs are written to @target
> @@ -650,7 +651,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState 
> *target,
>                    BlockdevOnError on_source_error,
>                    BlockdevOnError on_target_error,
>                    BlockCompletionFunc *cb, void *opaque,
> -                  Error **errp);
> +                  BlockJobTxn *txn, Error **errp);
>  
>  void blk_dev_change_media_cb(BlockBackend *blk, bool load);
>  bool blk_dev_has_removable_media(BlockBackend *blk);
> diff --git a/include/block/blockjob.h b/include/block/blockjob.h
> index 57d8ef1..ce57e98 100644
> --- a/include/block/blockjob.h
> +++ b/include/block/blockjob.h
> @@ -122,6 +122,10 @@ struct BlockJob {
>  
>      /** The opaque value that is passed to the completion function.  */
>      void *opaque;
> +
> +    /** Non-NULL if this job is part of a transaction */
> +    BlockJobTxn *txn;
> +    QLIST_ENTRY(BlockJob) txn_list;
>  };
>  
>  /**
> @@ -348,4 +352,49 @@ void block_job_defer_to_main_loop(BlockJob *job,
>                                    BlockJobDeferToMainLoopFn *fn,
>                                    void *opaque);
>  
> +/**
> + * block_job_txn_new:
> + *
> + * Allocate and return a new block job transaction.  Jobs can be added to the
> + * transaction using block_job_txn_add_job().  block_job_txn_begin() must be
> + * called when all jobs (if any) have been added.
> + *
> + * All jobs in the transaction either complete successfully or fail/cancel 
> as a
> + * group.  Jobs wait for each other before completing.  Cancelling one job
> + * cancels all jobs in the transaction.
> + */
> +BlockJobTxn *block_job_txn_new(void);
> +
> +/**
> + * block_job_txn_add_job:
> + * @txn: The transaction
> + * @job: Job to add to the transaction
> + *
> + * Add @job to the transaction.  The @job must not already be in a 
> transaction.
> + * The block job driver must call block_job_txn_prepare_to_complete() before
> + * final cleanup and completion.
> + */
> +void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
> +
> +/**
> + * block_job_txn_begin:
> + * @txn: The transaction
> + *
> + * Call this to mark the end of adding jobs to the transaction.  This must be
> + * called even if no jobs were added.
> + */
> +void block_job_txn_begin(BlockJobTxn *txn);
> +
> +/**
> + * block_job_txn_prepare_to_complete:
> + * @txn: The transaction
> + * @job: The block job
> + * @ret: Block job return value (0 for success, otherwise job failure)
> + *
> + * Wait for other jobs in the transaction to complete.  If @ret is non-zero 
> or
> + * @job is cancelled, all other jobs in the transaction will be cancelled.
> + */
> +void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
> +                                                    BlockJob *job, int ret);
> +
>  #endif
> diff --git a/trace-events b/trace-events
> index 52b7efa..b6a43a0 100644
> --- a/trace-events
> +++ b/trace-events
> @@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p"
>  virtio_blk_data_plane_stop(void *s) "dataplane %p"
>  virtio_blk_data_plane_process_request(void *s, unsigned int out_num, 
> unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u 
> head %u"
>  
> +# blockjob.c
> +block_job_txn_prepare_to_complete_entry(void *txn, void *job, int ret, bool 
> cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d 
> cancelled %d aborting %d jobs_pending %u"
> +block_job_txn_prepare_to_complete_return(void *txn, void *job, int ret, bool 
> cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d 
> cancelled %d aborting %d jobs_pending %u"
> +
>  # hw/virtio/dataplane/vring.c
>  vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring 
> physical %#"PRIx64" desc %p avail %p used %p"
>  
> 

Bike-shedding comments and benefit-of-the-doubt questions aside:

Reviewed-by: John Snow <js...@redhat.com>

Reply via email to