If a child is not ready, read/write/getlength/flush will return -errno. This is not a critical error and can be ignored: 1. read/write: just do not report the error event. 2. getlength: just ignore it. If getlength returns -errno for all children and every error is ignored, return -EIO. 3. flush: just ignore it. If flush returns -errno for all children and every error is ignored, return 0.
Usage: children.x.ignore-errors=true Signed-off-by: Wen Congyang <we...@cn.fujitsu.com> Signed-off-by: zhanghailiang <zhang.zhanghaili...@huawei.com> Signed-off-by: Gonglei <arei.gong...@huawei.com> --- block/quorum.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 57 insertions(+), 7 deletions(-) diff --git a/block/quorum.c b/block/quorum.c index 3eb8fd3..2f86fa1 100644 --- a/block/quorum.c +++ b/block/quorum.c @@ -30,6 +30,7 @@ #define QUORUM_OPT_BLKVERIFY "blkverify" #define QUORUM_OPT_REWRITE "rewrite-corrupted" #define QUORUM_OPT_READ_PATTERN "read-pattern" +#define QUORUM_CHILDREN_OPT_IGNORE_ERRORS "ignore-errors" /* This union holds a vote hash value */ typedef union QuorumVoteValue { @@ -65,6 +66,7 @@ typedef struct QuorumVotes { /* the following structure holds the state of one quorum instance */ typedef struct BDRVQuorumState { BlockDriverState **bs; /* children BlockDriverStates */ + bool *ignore_errors; /* ignore children's error? */ int num_children; /* children count */ int threshold; /* if less than threshold children reads gave the * same result a quorum error occurs. 
@@ -99,6 +101,7 @@ typedef struct QuorumChildRequest { uint8_t *buf; int ret; QuorumAIOCB *parent; + int index; } QuorumChildRequest; /* Quorum will use the following structure to track progress of each read/write @@ -211,6 +214,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s, acb->qcrs[i].buf = NULL; acb->qcrs[i].ret = 0; acb->qcrs[i].parent = acb; + acb->qcrs[i].index = i; } return acb; @@ -304,7 +308,7 @@ static void quorum_aio_cb(void *opaque, int ret) acb->count++; if (ret == 0) { acb->success_count++; - } else { + } else if (!s->ignore_errors[sacb->index]) { quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret); } assert(acb->count <= s->num_children); @@ -719,19 +723,31 @@ static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs, static int64_t quorum_getlength(BlockDriverState *bs) { BDRVQuorumState *s = bs->opaque; - int64_t result; + int64_t result = -EIO; int i; /* check that all file have the same length */ - result = bdrv_getlength(s->bs[0]); - if (result < 0) { - return result; - } - for (i = 1; i < s->num_children; i++) { + for (i = 0; i < s->num_children; i++) { int64_t value = bdrv_getlength(s->bs[i]); + if (value < 0) { return value; } + + if (value == 0 && s->ignore_errors[i]) { + /* + * If the child is not ready, it cannot return -errno, + * otherwise refresh_total_sectors() will fail when + * we open the child. 
+ */ + continue; + } + + if (result == -EIO) { + result = value; + continue; + } + if (value != result) { return -EIO; } @@ -769,6 +785,9 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs) for (i = 0; i < s->num_children; i++) { result = bdrv_co_flush(s->bs[i]); + if (result < 0 && s->ignore_errors[i]) { + result = 0; + } result_value.l = result; quorum_count_vote(&error_votes, &result_value, i); } @@ -843,6 +862,19 @@ static QemuOptsList quorum_runtime_opts = { }, }; +static QemuOptsList quorum_children_common_opts = { + .name = "quorum children", + .head = QTAILQ_HEAD_INITIALIZER(quorum_children_common_opts.head), + .desc = { + { + .name = QUORUM_CHILDREN_OPT_IGNORE_ERRORS, + .type = QEMU_OPT_BOOL, + .help = "ignore child I/O error", + }, + { /* end of list */ } + }, +}; + static int parse_read_pattern(const char *opt) { int i; @@ -938,11 +970,14 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags, /* allocate the children BlockDriverState array */ s->bs = g_new0(BlockDriverState *, s->num_children); opened = g_new0(bool, s->num_children); + s->ignore_errors = g_new0(bool, s->num_children); for (i = 0, lentry = qlist_first(list); lentry; lentry = qlist_next(lentry), i++) { QDict *d; QString *string; + QemuOpts *children_opts = NULL; + bool value; switch (qobject_type(lentry->value)) { @@ -950,6 +985,20 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags, case QTYPE_QDICT: d = qobject_to_qdict(lentry->value); QINCREF(d); + + children_opts = qemu_opts_create(&quorum_children_common_opts, + NULL, 0, &error_abort); + qemu_opts_absorb_qdict(children_opts, d, &local_err); + if (local_err) { + ret = -EINVAL; + qemu_opts_del(children_opts); + goto close_exit; + } + value = qemu_opt_get_bool(children_opts, + QUORUM_CHILDREN_OPT_IGNORE_ERRORS, + false); + s->ignore_errors[i] = value; + qemu_opts_del(children_opts); ret = bdrv_open(&s->bs[i], NULL, NULL, d, flags, NULL, &local_err); break; @@ -1008,6 +1057,7 @@ 
static void quorum_close(BlockDriverState *bs) } g_free(s->bs); + g_free(s->ignore_errors); } static void quorum_detach_aio_context(BlockDriverState *bs) -- 2.1.0