From: Wen Congyang <we...@cn.fujitsu.com>

This patch is not finished; it only shows how it will be implemented.

Signed-off-by: Wen Congyang <we...@cn.fujitsu.com>
Signed-off-by: Yang Hongyang <yan...@cn.fujitsu.com>
---
 block.c                   |  48 +++++++
 block/blkcolo.c           | 338 ++++++++++++++++++++++++++++++++++++++++++++++
 include/block/block.h     |   6 +
 include/block/block_int.h |  21 +++
 4 files changed, 413 insertions(+)
 create mode 100644 block/blkcolo.c

diff --git a/block.c b/block.c
index 4165d42..82a5283 100644
--- a/block.c
+++ b/block.c
@@ -6086,3 +6086,51 @@ BlockAcctStats *bdrv_get_stats(BlockDriverState *bs)
 {
     return &bs->stats;
 }
+
+/* Call the driver's prepare-checkpoint hook, or else recurse into bs->file. */
+int bdrv_prepare_checkpoint(BlockDriverState *bs)
+{
+    if (bs->drv && bs->drv->bdrv_prepare_checkpoint) {
+        return bs->drv->bdrv_prepare_checkpoint(bs);
+    }
+    if (bs->file) {
+        return bdrv_prepare_checkpoint(bs->file);
+    }
+    return -1; /* neither a driver hook nor an underlying file */
+}
+
+/* Call the driver's do-checkpoint hook, or else recurse into bs->file. */
+int bdrv_do_checkpoint(BlockDriverState *bs)
+{
+    if (bs->drv && bs->drv->bdrv_do_checkpoint) {
+        return bs->drv->bdrv_do_checkpoint(bs);
+    }
+    if (bs->file) {
+        return bdrv_do_checkpoint(bs->file);
+    }
+    return -1; /* neither a driver hook nor an underlying file */
+}
+
+/* Query the driver's sent-data-size hook, or else recurse into bs->file. */
+int64_t bdrv_get_sent_data_size(BlockDriverState *bs)
+{
+    if (bs->drv && bs->drv->bdrv_get_sent_data_size) {
+        return bs->drv->bdrv_get_sent_data_size(bs);
+    }
+    if (bs->file) {
+        return bdrv_get_sent_data_size(bs->file);
+    }
+    return -1; /* neither a driver hook nor an underlying file */
+}
+
+/* Call the driver's stop-replication hook, or else recurse into bs->file. */
+int bdrv_stop_replication(BlockDriverState *bs)
+{
+    if (bs->drv && bs->drv->bdrv_stop_replication) {
+        return bs->drv->bdrv_stop_replication(bs);
+    }
+    if (bs->file) {
+        return bdrv_stop_replication(bs->file);
+    }
+    return -1; /* neither a driver hook nor an underlying file */
+}
diff --git a/block/blkcolo.c b/block/blkcolo.c
new file mode 100644
index 0000000..57ed4df
--- /dev/null
+++ b/block/blkcolo.c
@@ -0,0 +1,338 @@
+/*
+ * Primary mode functions
+ */
+
+static void coroutine_fn colo_pvm_forward_co(void *opaque)
+{
+    /*
+     * TODO (design sketch, nothing is implemented yet).
+     * If the list is empty:
+     *   - status COLO_PVM_CHECKPOINT_NONE: set the state to idle and yield;
+     *   - status COLO_PVM_CHECKPOINT_START: send COLO_BLOCK_CHECKPOINT_SEC
+     *     to the secondary QEMU.
+     * Otherwise (the list is not empty), send the write requests to the
+     * secondary QEMU.
+     */
+}
+
+static colo_forward_state *colo_pvm_forward_request(BDRVBlkcoloState *s,
+                                                    int64_t sector_num,
+                                                    int nb_sectors,
+                                                    QEMUIOVector *qiov)
+{
+    /*
+     * TODO: add the write request to the tail of the list and wake up
+     * the coroutine colo_pvm_forward_co() if it is in the idle state.
+     */
+    return NULL; /* placeholder: nothing is queued until this is implemented */
+}
+
+static int coroutine_fn colo_pvm_handle_write_request(BlockDriverState *bs,
+                                                      int64_t sector_num,
+                                                      int nb_sectors,
+                                                      QEMUIOVector *qiov)
+{
+    int ret;
+
+    /*
+     * TODO: call colo_pvm_forward_request() here to forward the primary
+     * write request to the secondary QEMU (not done yet).
+     */
+
+    ret = bdrv_co_writev(bs->file, sector_num, nb_sectors, qiov);
+
+    /* TODO: wait until the write request is forwarded to the secondary QEMU */
+
+    return ret; /* for now: just the result of the write to bs->file */
+}
+
+/* Primary-mode read: served directly from bs->file. */
+static int coroutine_fn
+colo_pvm_handle_read_request(BlockDriverState *bs, int64_t sector_num,
+                             int nb_sectors, QEMUIOVector *qiov)
+{
+    return bdrv_co_readv(bs->file, sector_num, nb_sectors, qiov);
+}
+
+/* It should be called in the migration/checkpoint thread */
+static int colo_pvm_hanlde_checkpoint(BDRVBlkcoloState *s)
+{
+    /*
+     * TODO: wait until COLO_BLOCK_CHECKPOINT_SEC is sent to the secondary QEMU
+     */
+    return -1; /* placeholder: report failure until this is implemented */
+}
+
+/* Must be called from the migration/checkpoint thread. */
+static void cancel_pvm_forward(BDRVBlkcoloState *s)
+{
+    /*
+     * TODO: set the state to cancelled, and wait for all coroutines
+     * to exit.
+     */
+
+    /* TODO: then switch to unprotected mode */
+}
+
+/*
+ * Secondary mode functions
+ *
+ * All write requests are forwarded to secondary QEMU from primary QEMU.
+ * The secondary QEMU should do the following things:
+ * 1. Receive and handle the forwarded write requests
+ * 2. Buffer the secondary write requests
+ */
+
+static void coroutine_fn colo_svm_handle_pvm_write_req_co(void *opaque)
+{
+    /*
+     * TODO: handle one forwarded primary write request:
+     * 1. read the original sector content
+     * 2. write the original sector content into the disk buffer
+     *    if the sector content is not buffered yet
+     * 3. write the request to disk buffer
+     */
+}
+
+static void coroutine_fn colo_svm_handle_pvm_write_reqs_co(void *opaque)
+{
+    /*
+     * TODO: if the list is empty, set the state to idle and yield.
+     * Otherwise, pick the first forwarded primary write request
+     * and create a coroutine colo_svm_handle_pvm_write_req_co()
+     * to handle it.
+     */
+}
+
+static void coroutine_fn colo_svm_recv_pvm_write_requests_co(void *opaque)
+{
+    /*
+     * TODO: receive the forwarded primary write requests and
+     * append them to the tail of the list. Wake up the
+     * coroutine colo_svm_handle_pvm_write_reqs_co() to
+     * handle the write requests if that coroutine is
+     * idle.
+     */
+}
+
+/* It should be called in the migration/checkpoint thread */
+static int svm_wait_recv_completed(BDRVBlkcoloState *s)
+{
+    return -1; /* TODO: wait until all forwarded write requests are received */
+}
+
+/*
+ * Call in the migration/checkpoint thread with the io thread lock held.
+ */
+static int svm_handle_checkpoint(BlockDriverState *bs)
+{
+    /*
+     * TODO: wait until all forwarded write requests are written to the
+     * secondary disk, and then clear the disk buffer.
+     */
+    return -1; /* placeholder: fail until the checkpoint is implemented */
+}
+
+/* Must be called from the migration/checkpoint thread. */
+static void cancel_svm_receive(BDRVBlkcoloState *s)
+{
+    /*
+     * TODO: set the state to cancelled, and wait for all coroutines
+     * to exit.
+     */
+
+    /* TODO: then switch to unprotected mode */
+}
+
+static int coroutine_fn colo_svm_handle_write_request(BlockDriverState *bs,
+                                                      int64_t sector_num,
+                                                      int nb_sectors,
+                                                      QEMUIOVector *qiov)
+{
+    /* TODO: write the request to the disk buffer. */
+    /* Open question: how do we limit the write speed? */
+
+    return -1; /* placeholder: fail until buffering is implemented */
+}
+
+static int coroutine_fn colo_svm_handle_read_request(BlockDriverState *bs,
+                                                     int64_t sector_num,
+                                                     int nb_sectors,
+                                                     QEMUIOVector *qiov)
+{
+    /* TODO: read the sector content from the secondary disk first. */
+    /* If the sector content is buffered, use the buffered content. */
+
+    return -1; /* placeholder: fail until reads are implemented */
+}
+
+/* Unprotected mode functions: plain pass-through to bs->file */
+static int coroutine_fn
+colo_unprotected_handle_write_request(BlockDriverState *bs, int64_t sector_num,
+                                      int nb_sectors, QEMUIOVector *qiov)
+{
+    return bdrv_co_writev(bs->file, sector_num, nb_sectors, qiov);
+}
+
+static int coroutine_fn
+colo_unprotected_handle_read_request(BlockDriverState *bs, int64_t sector_num,
+                                     int nb_sectors, QEMUIOVector *qiov)
+{
+    return bdrv_co_readv(bs->file, sector_num, nb_sectors, qiov); /* pass-through */
+}
+
+/* TODO (empty stub): valid names look like blkcolo:host:port:/path/to/image */
+static void blkcolo_parse_filename(const char *filename, QDict *options,
+                                   Error **errp)
+{
+}
+
+static int blkcolo_open(BlockDriverState *bs, QDict *options, int flags,
+                        Error **errp)
+{
+    /*
+     * TODO: open the file; don't use BDRV_O_PROTOCOL, to ensure that we
+     * are above the real format.
+     */
+
+    /*
+     * TODO: try to listen on host:port. The host is the secondary host, so
+     * inet_listen() should return -1 and the errno should be
+     * EADDRNOTAVAIL if the mode is PRIMARY_MODE. But we cannot
+     * get errno after inet_listen() returns.
+     *
+     * TODO: Add a new API like inet_listen() but return -errno?
+     */
+    return 0; /* stub: reports success without opening anything yet */
+}
+
+static void blkcolo_close(BlockDriverState *bs)
+{
+    /* TODO: empty stub, nothing is released yet */
+}
+
+static int64_t blkcolo_getlength(BlockDriverState *bs)
+{
+    return bdrv_getlength(bs->file); /* same length as bs->file */
+}
+
+static void blkcolo_refresh_filename(BlockDriverState *bs) /* TODO: stub */
+{
+}
+
+/* Route a read to the handler for the current replication mode. */
+static int blkcolo_co_readv(BlockDriverState *bs, int64_t sector_num,
+                            int nb_sectors, QEMUIOVector *qiov)
+{
+    BDRVBlkcoloState *state = bs->opaque;
+
+    if (state->mode == UNPROTECTED_MODE) {
+        return colo_unprotected_handle_read_request(bs, sector_num,
+                                                    nb_sectors, qiov);
+    } else if (state->mode == PRIMARY_MODE) {
+        return colo_pvm_handle_read_request(bs, sector_num, nb_sectors, qiov);
+    } else if (state->mode == SECONDARY_MODE) {
+        return colo_svm_handle_read_request(bs, sector_num, nb_sectors, qiov);
+    }
+
+    assert(0); /* no other mode is expected */
+    return -1;
+}
+
+/* Route a write to the handler for the current replication mode. */
+static int blkcolo_co_writev(BlockDriverState *bs, int64_t sector_num,
+                             int nb_sectors, QEMUIOVector *qiov)
+{
+    BDRVBlkcoloState *state = bs->opaque;
+
+    if (state->mode == UNPROTECTED_MODE) {
+        return colo_unprotected_handle_write_request(bs, sector_num,
+                                                     nb_sectors, qiov);
+    } else if (state->mode == PRIMARY_MODE) {
+        return colo_pvm_handle_write_request(bs, sector_num, nb_sectors, qiov);
+    } else if (state->mode == SECONDARY_MODE) {
+        return colo_svm_handle_write_request(bs, sector_num, nb_sectors, qiov);
+    }
+
+    assert(0); /* no other mode is expected */
+    return -1;
+}
+
+/* Checkpoint preparation is only handled in SECONDARY_MODE. */
+static int blkcolo_prepare_checkpoint(BlockDriverState *bs)
+{
+    BDRVBlkcoloState *state = bs->opaque;
+
+    if (state->mode == SECONDARY_MODE) {
+        /* Delegate to the secondary-side wait helper. */
+        return svm_wait_recv_completed(state);
+    }
+
+    /* UNPROTECTED_MODE, PRIMARY_MODE or any other value is not expected. */
+    assert(0);
+    return -1;
+}
+
+/* Run the checkpoint step for PRIMARY_MODE or SECONDARY_MODE. */
+static int blkcolo_do_checkpoint(BlockDriverState *bs)
+{
+    BDRVBlkcoloState *s = bs->opaque;
+
+    switch (s->mode) {
+    case PRIMARY_MODE:
+        return colo_pvm_hanlde_checkpoint(s);  /* spelt as defined above */
+    case SECONDARY_MODE:
+        return svm_handle_checkpoint(bs);      /* takes the BlockDriverState */
+    default:                                   /* incl. UNPROTECTED_MODE */
+        assert(0);
+        return -1;
+    }
+}
+
+static int64_t blkcolo_sent_data_size(BlockDriverState *bs)
+{
+    return 0; /* stub: always 0 for now */
+}
+
+/* Cancel forwarding (primary) or receiving (secondary); returns 0 when done. */
+static int blkcolo_stop_replication(BlockDriverState *bs)
+{
+    BDRVBlkcoloState *s = bs->opaque;
+
+    if (s->mode == PRIMARY_MODE) {
+        cancel_pvm_forward(s);   /* void helper: success is reported below */
+    } else if (s->mode == SECONDARY_MODE) {
+        cancel_svm_receive(s);
+    } else {
+        assert(0);               /* UNPROTECTED_MODE or an unknown mode */
+        return -1;
+    }
+    return 0;
+}
+
+static BlockDriver bdrv_blkcolo = {
+    .format_name             = "blkcolo",
+    .protocol_name           = "blkcolo",
+    .instance_size           = sizeof(BDRVBlkcoloState),
+    /* open/close and image metadata */
+    .bdrv_parse_filename     = blkcolo_parse_filename,
+    .bdrv_file_open          = blkcolo_open,
+    .bdrv_close              = blkcolo_close,
+    .bdrv_getlength          = blkcolo_getlength,
+    .bdrv_refresh_filename   = blkcolo_refresh_filename,
+    /* guest I/O, dispatched on the replication mode */
+    .bdrv_co_readv           = blkcolo_co_readv,
+    .bdrv_co_writev          = blkcolo_co_writev,
+    /* checkpoint / replication control hooks */
+    .bdrv_prepare_checkpoint = blkcolo_prepare_checkpoint,
+    .bdrv_do_checkpoint      = blkcolo_do_checkpoint,
+    .bdrv_get_sent_data_size = blkcolo_sent_data_size,
+    .bdrv_stop_replication   = blkcolo_stop_replication,
+};
+
+static void bdrv_blkcolo_init(void)
+{
+    bdrv_register(&bdrv_blkcolo); /* register the blkcolo BlockDriver */
+}
+
+block_init(bdrv_blkcolo_init); /* QEMU's block-module init registration */
diff --git a/include/block/block.h b/include/block/block.h
index 6e7275d..9086abc 100644
--- a/include/block/block.h
+++ b/include/block/block.h
@@ -546,4 +546,10 @@ void bdrv_flush_io_queue(BlockDriverState *bs);
 
 BlockAcctStats *bdrv_get_stats(BlockDriverState *bs);
 
+/* Checkpoint control; to be called from the migration/checkpoint thread. */
+int bdrv_prepare_checkpoint(BlockDriverState *bs);
+int bdrv_do_checkpoint(BlockDriverState *bs);
+int64_t bdrv_get_sent_data_size(BlockDriverState *bs);
+int bdrv_stop_replication(BlockDriverState *bs);
+
 #endif
diff --git a/include/block/block_int.h b/include/block/block_int.h
index 06a21dd..ee6320e 100644
--- a/include/block/block_int.h
+++ b/include/block/block_int.h
@@ -273,6 +273,27 @@ struct BlockDriver {
     void (*bdrv_io_unplug)(BlockDriverState *bs);
     void (*bdrv_flush_io_queue)(BlockDriverState *bs);
 
+    /* Checkpoint control, called in the migration/checkpoint thread */
+    /*
+     * Before doing a new checkpoint, we should wait for all write requests
+     * to be transferred from Primary QEMU to Secondary QEMU.
+     */
+    int (*bdrv_prepare_checkpoint)(BlockDriverState *bs);
+    /*
+     * Drop the disk buffer when doing a checkpoint.
+     */
+    int (*bdrv_do_checkpoint)(BlockDriverState *bs);
+    /* This is used on the Primary node.
+     * The migration/checkpoint thread should call this interface in order
+     * to decide whether to start a new checkpoint or not. If the amount of
+     * data being sent is too large, we should start a new checkpoint.
+     */
+    int64_t (*bdrv_get_sent_data_size)(BlockDriverState *bs);
+    /* After failover, we should flush the disk buffer into the secondary
+     * disk and stop block replication.
+     */
+    int (*bdrv_stop_replication)(BlockDriverState *bs);
+
     QLIST_ENTRY(BlockDriver) list;
 };
 
-- 
1.9.1


Reply via email to