On Thu, Jun 18, 2015 at 04:49:20PM +0800, Wen Congyang wrote:

CCing Alberto Garcia for the quorum block driver.

> If the child is not ready, read/write/getlength/flush will
> return -errno. This is not a critical error and can be ignored:
> 1. read/write:
>    Just do not report the error event.
> 2. getlength:
>    Just ignore it. If getlength on every child returns -errno
>    and all of those errors are ignored, return -EIO.
> 3. flush:
>    Just ignore it. If flush on every child returns -errno
>    and all of those errors are ignored, return 0.
> 
> Usage: children.x.ignore-errors=true
> 
> Signed-off-by: Wen Congyang <we...@cn.fujitsu.com>
> Signed-off-by: zhanghailiang <zhang.zhanghaili...@huawei.com>
> Signed-off-by: Gonglei <arei.gong...@huawei.com>
> ---
>  block/quorum.c | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++-----
>  1 file changed, 77 insertions(+), 7 deletions(-)
> 
> diff --git a/block/quorum.c b/block/quorum.c
> index 01cfac0..c5dbb69 100644
> --- a/block/quorum.c
> +++ b/block/quorum.c
> @@ -30,6 +30,7 @@
>  #define QUORUM_OPT_BLKVERIFY      "blkverify"
>  #define QUORUM_OPT_REWRITE        "rewrite-corrupted"
>  #define QUORUM_OPT_READ_PATTERN   "read-pattern"
> +#define QUORUM_CHILDREN_OPT_IGNORE_ERRORS   "ignore-errors"
>  
>  /* This union holds a vote hash value */
>  typedef union QuorumVoteValue {
> @@ -65,6 +66,7 @@ typedef struct QuorumVotes {
>  /* the following structure holds the state of one quorum instance */
>  typedef struct BDRVQuorumState {
>      BlockDriverState **bs; /* children BlockDriverStates */
> +    bool *ignore_errors;   /* ignore children's error? */
>      int num_children;      /* children count */
>      int threshold;         /* if less than threshold children reads gave the
>                              * same result a quorum error occurs.
> @@ -99,6 +101,7 @@ typedef struct QuorumChildRequest {
>      uint8_t *buf;
>      int ret;
>      QuorumAIOCB *parent;
> +    int index;
>  } QuorumChildRequest;
>  
>  /* Quorum will use the following structure to track progress of each 
> read/write
> @@ -211,6 +214,7 @@ static QuorumAIOCB *quorum_aio_get(BDRVQuorumState *s,
>          acb->qcrs[i].buf = NULL;
>          acb->qcrs[i].ret = 0;
>          acb->qcrs[i].parent = acb;
> +        acb->qcrs[i].index = i;
>      }
>  
>      return acb;
> @@ -304,7 +308,7 @@ static void quorum_aio_cb(void *opaque, int ret)
>      acb->count++;
>      if (ret == 0) {
>          acb->success_count++;
> -    } else {
> +    } else if (!s->ignore_errors[sacb->index]) {
>          quorum_report_bad(acb, sacb->aiocb->bs->node_name, ret);
>      }
>      assert(acb->count <= s->num_children);
> @@ -719,19 +723,31 @@ static BlockAIOCB *quorum_aio_writev(BlockDriverState *bs,
>  static int64_t quorum_getlength(BlockDriverState *bs)
>  {
>      BDRVQuorumState *s = bs->opaque;
> -    int64_t result;
> +    int64_t result = -EIO;
>      int i;
>  
>      /* check that all file have the same length */
> -    result = bdrv_getlength(s->bs[0]);
> -    if (result < 0) {
> -        return result;
> -    }
> -    for (i = 1; i < s->num_children; i++) {
> +    for (i = 0; i < s->num_children; i++) {
>          int64_t value = bdrv_getlength(s->bs[i]);
> +
>          if (value < 0) {
>              return value;
>          }
> +
> +        if (value == 0 && s->ignore_errors[i]) {
> +            /*
> +             * If the child is not ready, it cannot return -errno,
> +             * otherwise refresh_total_sectors() will fail when
> +             * we open the child.
> +             */
> +            continue;
> +        }
> +
> +        if (result == -EIO) {
> +            result = value;
> +            continue;
> +        }
> +
>          if (value != result) {
>              return -EIO;
>          }
> @@ -769,6 +785,9 @@ static coroutine_fn int quorum_co_flush(BlockDriverState *bs)
>  
>      for (i = 0; i < s->num_children; i++) {
>          result = bdrv_co_flush(s->bs[i]);
> +        if (result < 0 && s->ignore_errors[i]) {
> +            result = 0;
> +        }
>          result_value.l = result;
>          quorum_count_vote(&error_votes, &result_value, i);
>      }
> @@ -843,6 +862,19 @@ static QemuOptsList quorum_runtime_opts = {
>      },
>  };
>  
> +static QemuOptsList quorum_children_common_opts = {
> +    .name = "quorum children",
> +    .head = QTAILQ_HEAD_INITIALIZER(quorum_children_common_opts.head),
> +    .desc = {
> +        {
> +            .name = QUORUM_CHILDREN_OPT_IGNORE_ERRORS,
> +            .type = QEMU_OPT_BOOL,
> +            .help = "ignore child I/O error",
> +        },
> +        { /* end of list */ }
> +    },
> +};
> +
>  static int parse_read_pattern(const char *opt)
>  {
>      int i;
> @@ -861,6 +893,37 @@ static int parse_read_pattern(const char *opt)
>      return -EINVAL;
>  }
>  
> +static int parse_children_options(BDRVQuorumState *s, QDict *options,
> +                                  const char *indexstr, int index,
> +                                  Error **errp)
> +{
> +    QemuOpts *children_opts = NULL;
> +    Error *local_err = NULL;
> +    int ret = 0;
> +    bool value;
> +
> +    children_opts = qemu_opts_create(&quorum_children_common_opts, NULL, 0,
> +                                     &error_abort);
> +    qemu_opts_absorb_qdict_by_index(children_opts, options, indexstr,
> +                                    &local_err);
> +    if (local_err) {
> +        ret = -EINVAL;
> +        goto out;
> +    }
> +
> +    value = qemu_opt_get_bool(children_opts, 
> QUORUM_CHILDREN_OPT_IGNORE_ERRORS,
> +                              false);
> +    s->ignore_errors[index] = value;
> +
> +out:
> +    qemu_opts_del(children_opts);
> +    /* propagate error */
> +    if (local_err) {
> +        error_propagate(errp, local_err);
> +    }
> +    return ret;
> +}
> +
>  static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
>                         Error **errp)
>  {
> @@ -931,12 +994,18 @@ static int quorum_open(BlockDriverState *bs, QDict *options, int flags,
>      /* allocate the children BlockDriverState array */
>      s->bs = g_new0(BlockDriverState *, s->num_children);
>      opened = g_new0(bool, s->num_children);
> +    s->ignore_errors = g_new0(bool, s->num_children);
>  
>      for (i = 0; i < s->num_children; i++) {
>          char indexstr[32];
>          ret = snprintf(indexstr, 32, "children.%d", i);
>          assert(ret < 32);
>  
> +        ret = parse_children_options(s, options, indexstr, i, &local_err);
> +        if (ret < 0) {
> +            goto close_exit;
> +        }
> +
>          ret = bdrv_open_image(&s->bs[i], NULL, options, indexstr, bs,
>                                &child_format, false, &local_err);
>          if (ret < 0) {
> @@ -979,6 +1048,7 @@ static void quorum_close(BlockDriverState *bs)
>      }
>  
>      g_free(s->bs);
> +    g_free(s->ignore_errors);
>  }
>  
>  static void quorum_detach_aio_context(BlockDriverState *bs)
> -- 
> 2.4.3
> 
> 

Attachment: pgpmJDlPEixrE.pgp
Description: PGP signature

Reply via email to