Sometimes block jobs must execute as a transaction group.  Jobs that
finish wait until all other jobs in the group are ready to complete
successfully.  Failure or cancellation of one job cancels the other jobs
in the group.

Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
---
 blockjob.c                | 160 ++++++++++++++++++++++++++++++++++++++++++++++
 include/block/block.h     |   1 +
 include/block/block_int.h |   3 +-
 include/block/blockjob.h  |  49 ++++++++++++++
 trace-events              |   4 ++
 5 files changed, 216 insertions(+), 1 deletion(-)

diff --git a/blockjob.c b/blockjob.c
index ec46fad..3c6f1d4 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -400,3 +400,190 @@ void block_job_defer_to_main_loop(BlockJob *job,
 
     qemu_bh_schedule(data->bh);
 }
+
+/* Transactional group of block jobs */
+struct BlockJobTxn {
+    /* Jobs may be in different AioContexts so protect all fields */
+    QemuMutex lock;
+
+    /* Reference count for txn object */
+    unsigned int ref;
+
+    /* Is this txn cancelling its jobs? */
+    bool aborting;
+
+    /* Number of jobs still running */
+    unsigned int jobs_pending;
+
+    /* List of jobs */
+    QLIST_HEAD(, BlockJob) jobs;
+};
+
+BlockJobTxn *block_job_txn_new(void)
+{
+    BlockJobTxn *txn = g_new(BlockJobTxn, 1);
+    qemu_mutex_init(&txn->lock);
+    txn->ref = 1; /* dropped by block_job_txn_begin() */
+    txn->aborting = false;
+    txn->jobs_pending = 0;
+    QLIST_INIT(&txn->jobs);
+    return txn;
+}
+
+static void block_job_txn_unref(BlockJobTxn *txn)
+{
+    qemu_mutex_lock(&txn->lock);
+
+    if (--txn->ref > 0) {
+        qemu_mutex_unlock(&txn->lock);
+        return;
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    qemu_mutex_destroy(&txn->lock);
+    g_free(txn);
+}
+
+/* The purpose of this is to keep txn alive until all jobs have been added */
+void block_job_txn_begin(BlockJobTxn *txn)
+{
+    block_job_txn_unref(txn);
+}
+
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
+{
+    if (!txn) {
+        return;
+    }
+
+    assert(!job->txn);
+    job->txn = txn;
+
+    qemu_mutex_lock(&txn->lock);
+    txn->ref++;
+    txn->jobs_pending++;
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    qemu_mutex_unlock(&txn->lock);
+}
+
+/* Cancel all other jobs in case of abort, wake all waiting jobs in case of
+ * successful completion.  Runs from main loop and drops the txn reference
+ * taken by block_job_txn_schedule_complete().
+ */
+static void block_job_txn_complete(BlockJob *job, void *opaque)
+{
+    BlockJobTxn *txn = opaque;
+    BlockJob *other_job;
+    bool aborting;
+
+    qemu_mutex_lock(&txn->lock);
+    aborting = txn->aborting; /* read under the lock like all txn fields */
+
+    /* NOTE(review): nothing in this patch removes jobs from txn->jobs, so
+     * this loop assumes every listed job is still alive when it runs.
+     */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        AioContext *ctx;
+
+        qemu_mutex_unlock(&txn->lock);
+        ctx = bdrv_get_aio_context(other_job->bs);
+        aio_context_acquire(ctx);
+
+        /* Cancel all other jobs if aborting.  Don't cancel our own failed job
+         * since cancellation throws away the error value.
+         */
+        if (aborting && other_job != job) {
+            block_job_cancel(other_job);
+        } else {
+            block_job_enter(other_job);
+        }
+
+        aio_context_release(ctx);
+        qemu_mutex_lock(&txn->lock);
+    }
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn); /* ref from block_job_txn_schedule_complete() */
+}
+
+/* Schedule block_job_txn_complete() to run in the main loop.  Must be called
+ * with txn->lock held.  The reference is taken now rather than when the
+ * callback runs so that txn stays alive even if every job returns from
+ * block_job_txn_prepare_to_complete() before the callback is dispatched.
+ */
+static void block_job_txn_schedule_complete(BlockJobTxn *txn, BlockJob *job)
+{
+    txn->ref++;
+    block_job_defer_to_main_loop(job, block_job_txn_complete, txn);
+}
+
+void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
+                                                    BlockJob *job,
+                                                    int ret)
+{
+    if (!txn) {
+        return;
+    }
+
+    qemu_mutex_lock(&txn->lock);
+
+    /* This function is entered in 3 cases:
+     *
+     * 1. Successful job completion - wait for other jobs
+     * 2. First failed/cancelled job in txn - cancel other jobs and wait
+     * 3. Any job after the txn started aborting - wait until it has been
+     *    cancelled, then finish without waiting for the other jobs
+     */
+    trace_block_job_txn_prepare_to_complete_entry(txn, job, ret,
+                                                  block_job_is_cancelled(job),
+                                                  txn->aborting,
+                                                  txn->jobs_pending);
+
+    if (txn->aborting) { /* Case 3 */
+        /* This job may get here before block_job_txn_complete() has cancelled
+         * it, e.g. if it finished in another AioContext just after the failed
+         * job set txn->aborting.  Wait until it is cancelled; note this relies
+         * on block_job_cancel() re-entering a job that has yielded.
+         */
+        while (!block_job_is_cancelled(job)) {
+            qemu_mutex_unlock(&txn->lock);
+            job->busy = false;
+            qemu_coroutine_yield();
+            job->busy = true;
+            qemu_mutex_lock(&txn->lock);
+        }
+        goto out; /* cancelled, don't wait for the other jobs */
+    }
+
+    if (ret != 0 || block_job_is_cancelled(job)) { /* Case 2 */
+abort:
+        txn->aborting = true;
+        block_job_txn_schedule_complete(txn, job);
+    } else { /* Case 1 */
+        if (--txn->jobs_pending == 0) {
+            block_job_txn_schedule_complete(txn, job);
+        }
+    }
+
+    /* Wait for block_job_txn_complete() */
+    do {
+        qemu_mutex_unlock(&txn->lock);
+        job->busy = false;
+        qemu_coroutine_yield();
+        job->busy = true;
+        qemu_mutex_lock(&txn->lock);
+
+        if (block_job_is_cancelled(job) && !txn->aborting) {
+            goto abort; /* this job just got cancelled by the user */
+        }
+    } while (!txn->aborting && txn->jobs_pending > 0);
+
+out:
+    trace_block_job_txn_prepare_to_complete_return(txn, job, ret,
+                                                   block_job_is_cancelled(job),
+                                                   txn->aborting,
+                                                   txn->jobs_pending);
+
+    qemu_mutex_unlock(&txn->lock);
+    block_job_txn_unref(txn);
+}
diff --git a/include/block/block.h b/include/block/block.h
index a4c505d..cb19c73 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -13,6 +13,7 @@
 typedef struct BlockDriver BlockDriver;
 typedef struct BlockJob BlockJob;
 typedef struct BdrvChildRole BdrvChildRole;
+typedef struct BlockJobTxn BlockJobTxn;
 
 typedef struct BlockDriverInfo {
     /* in bytes, 0 if irrelevant */
diff --git a/include/block/block_int.h b/include/block/block_int.h
index ea3e7f0..812a18a 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -639,6 +639,7 @@ void mirror_start(BlockDriverState *bs, BlockDriverState *target,
  * @on_source_error: The action to take upon error reading from the source.
  * @on_target_error: The action to take upon error writing to the target.
  * @cb: Completion function for the job.
+ * @txn: Transaction that this job is part of (may be NULL).
  * @opaque: Opaque pointer value passed to @cb.
  *
  * Start a backup operation on @bs.  Clusters in @bs are written to @target
@@ -650,7 +651,7 @@ void backup_start(BlockDriverState *bs, BlockDriverState *target,
                   BlockdevOnError on_source_error,
                   BlockdevOnError on_target_error,
                   BlockCompletionFunc *cb, void *opaque,
-                  Error **errp);
+                  BlockJobTxn *txn, Error **errp);
 
 void blk_dev_change_media_cb(BlockBackend *blk, bool load);
 bool blk_dev_has_removable_media(BlockBackend *blk);
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 57d8ef1..ce57e98 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -122,6 +122,10 @@ struct BlockJob {
 
     /** The opaque value that is passed to the completion function.  */
     void *opaque;
+
+    /** Non-NULL if this job is part of a transaction */
+    BlockJobTxn *txn;
+    QLIST_ENTRY(BlockJob) txn_list;
 };
 
 /**
@@ -348,4 +352,49 @@ void block_job_defer_to_main_loop(BlockJob *job,
                                   BlockJobDeferToMainLoopFn *fn,
                                   void *opaque);
 
+/**
+ * block_job_txn_new:
+ *
+ * Allocate and return a new block job transaction.  Jobs can be added to the
+ * transaction using block_job_txn_add_job().  block_job_txn_begin() must be
+ * called when all jobs (if any) have been added.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group.  Jobs wait for each other before completing.  Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+BlockJobTxn *block_job_txn_new(void);
+
+/**
+ * block_job_txn_add_job:
+ * @txn: The transaction
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction.  The @job must not already be in a transaction.
+ * The block job driver must call block_job_txn_prepare_to_complete() before
+ * final cleanup and completion.
+ */
+void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job);
+
+/**
+ * block_job_txn_begin:
+ * @txn: The transaction
+ *
+ * Call this to mark the end of adding jobs to the transaction.  This must be
+ * called even if no jobs were added.
+ */
+void block_job_txn_begin(BlockJobTxn *txn);
+
+/**
+ * block_job_txn_prepare_to_complete:
+ * @txn: The transaction, or NULL if @job is not in a transaction (no-op)
+ * @job: The block job
+ * @ret: Block job return value (0 for success, otherwise job failure)
+ *
+ * Wait until the other jobs in the transaction are ready to complete.  If
+ * @ret is non-zero or @job is cancelled, the other jobs are cancelled instead.
+ */
+void coroutine_fn block_job_txn_prepare_to_complete(BlockJobTxn *txn,
+                                                    BlockJob *job, int ret);
+
 #endif
diff --git a/trace-events b/trace-events
index 52b7efa..b6a43a0 100644
--- a/trace-events
+++ b/trace-events
@@ -123,6 +123,10 @@ virtio_blk_data_plane_start(void *s) "dataplane %p"
 virtio_blk_data_plane_stop(void *s) "dataplane %p"
 virtio_blk_data_plane_process_request(void *s, unsigned int out_num, unsigned int in_num, unsigned int head) "dataplane %p out_num %u in_num %u head %u"
 
+# blockjob.c
+block_job_txn_prepare_to_complete_entry(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
+block_job_txn_prepare_to_complete_return(void *txn, void *job, int ret, bool cancelled, bool aborting, unsigned int jobs_pending) "txn %p job %p ret %d cancelled %d aborting %d jobs_pending %u"
+
 # hw/virtio/dataplane/vring.c
 vring_setup(uint64_t physical, void *desc, void *avail, void *used) "vring physical %#"PRIx64" desc %p avail %p used %p"
 
-- 
2.4.3


Reply via email to