If a child is not ready, read/write/getlength/flush will
return -errno. This is not a critical error and can be ignored:
1. read/write:
   Just do not report the error event.
2. getlength:
   Just ignore it. If getlength returns -errno for all children
   and every error is ignored, return -EIO.
3. flush:
   Just ignore it. If flush returns -errno for all children
   and every error is ignored, return 0.

Usage: children.x.ignore-errors=true

Signed-off-by: Wen Congyang <we...@cn.fujitsu.com>
Signed-off-by: zhanghailiang <zhang.zhanghaili...@huawei.com>
Signed-off-by: Gonglei <arei.gong...@huawei.com>
---
 block/quorum.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 57 insertions(+), 7 deletions(-)

diff --git a/block/quorum.c b/block/quorum.c
index 3eb8fd3..2f86fa1 100644
--- a/block/quorum.c
+++ b/block/quorum.c
@@ -30,6 +30,7 @@
 #define QUORUM_OPT_BLKVERIFY      "blkverify"
 #define QUORUM_OPT_REWRITE        "rewrite-corrupted"
 #define QUORUM_OPT_READ_PATTERN   "read-pattern"
+#define QUORUM_CHILDREN_OPT_IGNORE_ERRORS   "ignore-errors"
 
 /* This union holds a vote hash value */
 typedef union QuorumVoteValue {
@@ -65,6 +66,7 @@ typedef struct QuorumVotes {
 /* the following structure holds the state of one quorum instance */
 typedef struct BDRVQuorumState {
     BlockDriverState **bs; /* children BlockDriverStates */
+    bool *ignore_errors;   /* ignore children's error? */
     int num_children;      /* children count */
     int threshold;         /* if less than threshold children reads gave the
                             * same result a quorum error occurs.
@@ -99,6 +101,7 @@ typedef struct QuorumChildRequest {
     uint8_t *buf;
     int ret;
     QuorumAIOCB *parent;
+    int index;
 } QuorumChildRequest;
 
 /* Quorum will use the following structure to track progress of each read/write
@@ -211,6 +214,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
         acb->qcrs[i].buf = NULL;
         acb->qcrs[i].ret = 0;
         acb->qcrs[i].parent = acb;
+        acb->qcrs[i].index = i;
     }
 
     return acb;
@@ -304,7 +308,7 @@ static void quorum_aio_cb(void *opaque, int ret)
     acb->count++;
     if (ret == 0) {
         acb->success_count++;
-    } else {
+    } else if (!s->ignore_errors[sacb->index]) {
         quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
     }
     assert(acb->count <= s->num_children);
@@ -719,19 +723,31 @@ static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs,
 static int64_t quorum_getlength(BlockDriverState *bs)
 {
     BDRVQuorumState *s = bs->opaque;
-    int64_t result;
+    int64_t result = -EIO;
     int i;
 
     /* check that all file have the same length */
-    result = bdrv_getlength(s->bs[0]);
-    if (result < 0) {
-        return result;
-    }
-    for (i = 1; i < s->num_children; i++) {
+    for (i = 0; i < s->num_children; i++) {
         int64_t value = bdrv_getlength(s->bs[i]);
+
         if (value < 0) {
             return value;
         }
+
+        if (value == 0 && s->ignore_errors[i]) {
+            /*
+             * If the child is not ready, it cannot return -errno,
+             * otherwise refresh_total_sectors() will fail when
+             * we open the child.
+             */
+            continue;
+        }
+
+        if (result == -EIO) {
+            result = value;
+            continue;
+        }
+
         if (value != result) {
             return -EIO;
         }
@@ -769,6 +785,9 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
 
     for (i = 0; i < s->num_children; i++) {
         result = bdrv_co_flush(s->bs[i]);
+        if (result < 0 && s->ignore_errors[i]) {
+            result = 0;
+        }
         result_value.l = result;
         quorum_count_vote(&error_votes, &result_value, i);
     }
@@ -843,6 +862,19 @@ static QemuOptsList quorum_runtime_opts = {
     },
 };
 
+static QemuOptsList quorum_children_common_opts = {
+    .name = "quorum children",
+    .head = QTAILQ_HEAD_INITIALIZER(quorum_children_common_opts.head),
+    .desc = {
+        {
+            .name = QUORUM_CHILDREN_OPT_IGNORE_ERRORS,
+            .type = QEMU_OPT_BOOL,
+            .help = "ignore child I/O error",
+        },
+        { /* end of list */ }
+    },
+};
+
 static int parse_read_pattern(const char *opt)
 {
     int i;
@@ -938,11 +970,14 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
     /* allocate the children BlockDriverState array */
     s->bs = g_new0(BlockDriverState *, s->num_children);
     opened = g_new0(bool, s->num_children);
+    s->ignore_errors = g_new0(bool, s->num_children);
 
     for (i = 0, lentry = qlist_first(list); lentry;
          lentry = qlist_next(lentry), i++) {
         QDict *d;
         QString *string;
+        QemuOpts *children_opts = NULL;
+        bool value;
 
         switch (qobject_type(lentry->value))
         {
@@ -950,6 +985,20 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
             case QTYPE_QDICT:
                 d = qobject_to_qdict(lentry->value);
                 QINCREF(d);
+
+                children_opts = qemu_opts_create(&quorum_children_common_opts,
+                                                 NULL, 0, &error_abort);
+                qemu_opts_absorb_qdict(children_opts, d, &local_err);
+                if (local_err) {
+                    ret = -EINVAL;
+                    qemu_opts_del(children_opts);
+                    goto close_exit;
+                }
+                value = qemu_opt_get_bool(children_opts,
+                                          QUORUM_CHILDREN_OPT_IGNORE_ERRORS,
+                                          false);
+                s->ignore_errors[i]  = value;
+                qemu_opts_del(children_opts);
                 ret = bdrv_open(&s->bs[i], NULL, NULL, d, flags, NULL,
                                 &local_err);
                 break;
@@ -1008,6 +1057,7 @@ static void quorum_close(BlockDriverState *bs)
     }
 
     g_free(s->bs);
+    g_free(s->ignore_errors);
 }
 
 static void quorum_detach_aio_context(BlockDriverState *bs)
-- 
2.1.0


Reply via email to