The data object size was fixed at 4MB and was not selectable.
This patch adds a feature to select the data object size of a VDI.

If you want to use an 8MB data object size, specify the bit shift
number of the size (2^23 bytes = 8MB).
e.g.) dog vdi create -z 23 hogehoge 100M

Signed-off-by: Teruaki Ishizaki <ishizaki.teru...@lab.ntt.co.jp>
---
 dog/common.c                |    7 +-
 dog/dog.h                   |    6 +-
 dog/farm/farm.c             |   17 ++-
 dog/vdi.c                   |  254 ++++++++++++++++++++++++++++++-------------
 include/fec.h               |   12 +-
 include/sheepdog_proto.h    |    7 +-
 lib/fec.c                   |    9 +-
 sheep/gateway.c             |    2 +-
 sheep/group.c               |    3 +-
 sheep/journal.c             |    5 +-
 sheep/object_cache.c        |   27 +++--
 sheep/ops.c                 |   14 ++-
 sheep/plain_store.c         |   17 ++-
 sheep/recovery.c            |    3 +-
 sheep/sheep_priv.h          |    6 +-
 sheep/vdi.c                 |   82 +++++++++++---
 tests/unit/sheep/test_vdi.c |    6 +-
 17 files changed, 336 insertions(+), 141 deletions(-)

diff --git a/dog/common.c b/dog/common.c
index 2d8a173..11011a7 100644
--- a/dog/common.c
+++ b/dog/common.c
@@ -365,7 +365,8 @@ void show_progress(uint64_t done, uint64_t total, bool raw)
        free(buf);
 }
 
-size_t get_store_objsize(uint8_t copy_policy, uint64_t oid)
+size_t get_store_objsize(uint8_t copy_policy, uint32_t object_size,
+                        uint64_t oid)
 {
        if (is_vdi_obj(oid))
                return SD_INODE_SIZE;
@@ -375,9 +376,9 @@ size_t get_store_objsize(uint8_t copy_policy, uint64_t oid)
                int d;
 
                ec_policy_to_dp(copy_policy, &d, NULL);
-               return SD_DATA_OBJ_SIZE / d;
+               return object_size / d;
        }
-       return get_objsize(oid);
+       return get_objsize(oid, object_size);
 }
 
 bool is_erasure_oid(uint64_t oid, uint8_t policy)
diff --git a/dog/dog.h b/dog/dog.h
index 80becc6..d460a0b 100644
--- a/dog/dog.h
+++ b/dog/dog.h
@@ -87,10 +87,12 @@ void confirm(const char *message);
 void work_queue_wait(struct work_queue *q);
 int do_vdi_create(const char *vdiname, int64_t vdi_size,
                  uint32_t base_vid, uint32_t *vdi_id, bool snapshot,
-                 uint8_t nr_copies, uint8_t copy_policy, uint8_t store_policy);
+                 uint8_t nr_copies, uint8_t copy_policy,
+                 uint8_t store_policy, uint32_t object_size);
 int do_vdi_check(const struct sd_inode *inode);
 void show_progress(uint64_t done, uint64_t total, bool raw);
-size_t get_store_objsize(uint8_t copy_policy, uint64_t oid);
+size_t get_store_objsize(uint8_t copy_policy, uint32_t object_size,
+                        uint64_t oid);
 bool is_erasure_oid(uint64_t oid, uint8_t policy);
 uint8_t parse_copy(const char *str, uint8_t *copy_policy);
 
diff --git a/dog/farm/farm.c b/dog/farm/farm.c
index 9414d42..c5fa40e 100644
--- a/dog/farm/farm.c
+++ b/dog/farm/farm.c
@@ -38,6 +38,7 @@ struct active_vdi_entry {
        uint8_t  nr_copies;
        uint8_t copy_policy;
        uint8_t store_policy;
+       uint32_t object_size;
 };
 
 struct registered_obj_entry {
@@ -77,6 +78,7 @@ static void update_active_vdi_entry(struct active_vdi_entry *vdi,
        vdi->nr_copies = new->nr_copies;
        vdi->copy_policy = new->copy_policy;
        vdi->store_policy = new->store_policy;
+       vdi->object_size = (UINT32_C(1) << new->block_size_shift);
 }
 
 static void add_active_vdi(struct sd_inode *new)
@@ -131,7 +133,8 @@ static int create_active_vdis(void)
                                  vdi->vdi_id, &new_vid,
                                  false, vdi->nr_copies,
                                  vdi->copy_policy,
-                                 vdi->store_policy) < 0)
+                                 vdi->store_policy,
+                                 vdi->object_size) < 0)
                        return -1;
        }
        return 0;
@@ -202,7 +205,7 @@ out:
 }
 
 static int notify_vdi_add(uint32_t vdi_id, uint8_t nr_copies,
-                         uint8_t copy_policy)
+                         uint8_t copy_policy, uint32_t object_size)
 {
        int ret;
        struct sd_req hdr;
@@ -213,13 +216,14 @@ static int notify_vdi_add(uint32_t vdi_id, uint8_t nr_copies,
        hdr.vdi_state.new_vid = vdi_id;
        hdr.vdi_state.copies = nr_copies;
        hdr.vdi_state.copy_policy = copy_policy;
+       hdr.vdi_state.object_size = object_size;
        hdr.vdi_state.set_bitmap = true;
 
        ret = dog_exec_req(&sd_nid, &hdr, buf);
 
        if (ret < 0)
-               sd_err("Fail to notify vdi add event(%"PRIx32", %d)", vdi_id,
-                      nr_copies);
+               sd_err("Fail to notify vdi add event(%"PRIx32", %d"
+                      ", %"PRIu32")", vdi_id, nr_copies, object_size);
        if (rsp->result != SD_RES_SUCCESS) {
                sd_err("%s", sd_strerror(rsp->result));
                ret = -1;
@@ -261,7 +265,7 @@ static void do_save_object(struct work *work)
 
        sw = container_of(work, struct snapshot_work, work);
 
-       size = get_objsize(sw->entry.oid);
+       size = get_objsize(sw->entry.oid, sw->entry.object_size);
        buf = xmalloc(size);
 
        if (dog_read_object(sw->entry.oid, buf, size, 0, true) < 0)
@@ -413,7 +417,8 @@ static void do_load_object(struct work *work)
        vid = oid_to_vid(sw->entry.oid);
        if (register_vdi(vid)) {
                if (notify_vdi_add(vid, sw->entry.nr_copies,
-                                  sw->entry.copy_policy) < 0)
+                                  sw->entry.copy_policy,
+                                  sw->entry.object_size) < 0)
                        goto error;
        }
 
diff --git a/dog/vdi.c b/dog/vdi.c
index 5353062..3b0c408 100644
--- a/dog/vdi.c
+++ b/dog/vdi.c
@@ -38,6 +38,8 @@ static struct sd_option vdi_options[] = {
        {'o', "oid", true, "specify the object id of the tracking object"},
        {'e', "exist", false, "only check objects exist or not,\n"
         "                          neither comparing nor repairing"},
+       {'z', "objsize", true, "specify the bit shift num for"
+                              " data object size"},
        { 0, NULL, false, NULL },
 };
 
@@ -49,6 +51,7 @@ static struct vdi_cmd_data {
        bool delete;
        bool prealloc;
        int nr_copies;
+       uint32_t object_size;
        bool writeback;
        int from_snapshot_id;
        char from_snapshot_tag[SD_MAX_VDI_TAG_LEN];
@@ -67,6 +70,7 @@ struct get_vdi_info {
        uint32_t snapid;
        uint8_t nr_copies;
        uint8_t copy_policy;
+       uint32_t object_size;
 };
 
 int dog_bnode_writer(uint64_t oid, void *mem, unsigned int len, uint64_t offset,
@@ -118,6 +122,7 @@ static void print_vdi_list(uint32_t vid, const char *name, const char *tag,
        struct tm tm;
        char dbuf[128];
        struct get_vdi_info *info = data;
+       uint32_t object_size = (UINT32_C(1) << i->block_size_shift);
 
        if (info && strcmp(name, info->name) != 0)
                return;
@@ -143,23 +148,24 @@ static void print_vdi_list(uint32_t vid, const char *name, const char *tag,
                                putchar('\\');
                        putchar(*name++);
                }
-               printf(" %d %s %s %s %s %" PRIx32 " %s %s\n", snapid,
-                      strnumber(i->vdi_size),
-                      strnumber(my_objs * SD_DATA_OBJ_SIZE),
-                      strnumber(cow_objs * SD_DATA_OBJ_SIZE),
+               printf(" %d %s %s %s %s %" PRIx32 " %s %s %" PRIu32 "\n",
+                      snapid, strnumber(i->vdi_size),
+                      strnumber(my_objs * object_size),
+                      strnumber(cow_objs * object_size),
                       dbuf, vid,
                       redundancy_scheme(i->nr_copies, i->copy_policy),
-                      i->tag);
+                      i->tag, object_size);
        } else {
-               printf("%c %-8s %5d %7s %7s %7s %s  %7" PRIx32 " %6s %13s\n",
+               printf("%c %-8s %5d %7s %7s %7s %s  %7" PRIx32
+                      " %6s %13s %7" PRIu32 "\n",
                       vdi_is_snapshot(i) ? 's' : (is_clone ? 'c' : ' '),
                       name, snapid,
                       strnumber(i->vdi_size),
-                      strnumber(my_objs * SD_DATA_OBJ_SIZE),
-                      strnumber(cow_objs * SD_DATA_OBJ_SIZE),
+                      strnumber(my_objs * object_size),
+                      strnumber(cow_objs * object_size),
                       dbuf, vid,
                       redundancy_scheme(i->nr_copies, i->copy_policy),
-                      i->tag);
+                      i->tag, object_size);
        }
 }
 
@@ -282,7 +288,8 @@ static int vdi_list(int argc, char **argv)
        const char *vdiname = argv[optind];
 
        if (!raw_output)
-               printf("  Name        Id    Size    Used  Shared    Creation time   VDI id  Copies  Tag\n");
+               printf("  Name        Id    Size    Used  Shared"
+                      "    Creation time   VDI id  Copies  Tag    Obj Size\n");
 
        if (vdiname) {
                struct get_vdi_info info;
@@ -396,7 +403,8 @@ int read_vdi_obj(const char *vdiname, int snapid, const char *tag,
 
 int do_vdi_create(const char *vdiname, int64_t vdi_size,
                  uint32_t base_vid, uint32_t *vdi_id, bool snapshot,
-                 uint8_t nr_copies, uint8_t copy_policy, uint8_t store_policy)
+                 uint8_t nr_copies, uint8_t copy_policy,
+                 uint8_t store_policy, uint32_t object_size)
 {
        struct sd_req hdr;
        struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
@@ -416,6 +424,7 @@ int do_vdi_create(const char *vdiname, int64_t vdi_size,
        hdr.vdi.copies = nr_copies;
        hdr.vdi.copy_policy = copy_policy;
        hdr.vdi.store_policy = store_policy;
+       hdr.vdi.object_size = object_size;
 
        ret = dog_exec_req(&sd_nid, &hdr, buf);
        if (ret < 0)
@@ -440,6 +449,8 @@ static int vdi_create(int argc, char **argv)
        uint32_t vid;
        uint64_t oid;
        uint32_t idx, max_idx;
+       uint32_t object_size;
+       uint64_t old_max_total_size = 0;
        struct sd_inode *inode = NULL;
        int ret;
 
@@ -451,10 +462,34 @@ static int vdi_create(int argc, char **argv)
        if (ret < 0)
                return EXIT_USAGE;
 
-       if (size > SD_OLD_MAX_VDI_SIZE && 0 == vdi_cmd_data.store_policy) {
+       if (vdi_cmd_data.object_size)
+               old_max_total_size =
+                       vdi_cmd_data.object_size * OLD_MAX_DATA_OBJS;
+       else{
+               struct sd_req hdr;
+               struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+               struct cluster_info cinfo;
+               sd_init_req(&hdr, SD_OP_CLUSTER_INFO);
+               hdr.data_length = sizeof(cinfo);
+               ret = dog_exec_req(&sd_nid, &hdr, &cinfo);
+               if (ret < 0) {
+                       sd_err("Fail to execute request: SD_OP_CLUSTER_INFO");
+                       ret = EXIT_FAILURE;
+                       goto out;
+               }
+               if (rsp->result != SD_RES_SUCCESS) {
+                       sd_err("%s", sd_strerror(rsp->result));
+                       ret = EXIT_FAILURE;
+                       goto out;
+               }
+               old_max_total_size = cinfo.object_size * OLD_MAX_DATA_OBJS;
+       }
+
+       if (size > old_max_total_size && 0 == vdi_cmd_data.store_policy) {
                sd_err("VDI size is larger than %s bytes, please use '-y' to "
-                      "create a hyper volume with size up to %s bytes",
-                      strnumber(SD_OLD_MAX_VDI_SIZE),
+                      "create a hyper volume with size up to %s bytes"
+                      " or use '-z' to create larger object size volume",
+                      strnumber(old_max_total_size),
                       strnumber(SD_MAX_VDI_SIZE));
                return EXIT_USAGE;
        }
@@ -466,7 +501,8 @@ static int vdi_create(int argc, char **argv)
 
        ret = do_vdi_create(vdiname, size, 0, &vid, false,
                            vdi_cmd_data.nr_copies, vdi_cmd_data.copy_policy,
-                           vdi_cmd_data.store_policy);
+                           vdi_cmd_data.store_policy,
+                           vdi_cmd_data.object_size);
        if (ret != EXIT_SUCCESS || !vdi_cmd_data.prealloc)
                goto out;
 
@@ -479,10 +515,11 @@ static int vdi_create(int argc, char **argv)
                ret = EXIT_FAILURE;
                goto out;
        }
-       max_idx = DIV_ROUND_UP(size, SD_DATA_OBJ_SIZE);
+       object_size = (UINT32_C(1) << inode->block_size_shift);
+       max_idx = DIV_ROUND_UP(size, object_size);
 
        for (idx = 0; idx < max_idx; idx++) {
-               vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+               vdi_show_progress(idx * object_size, inode->vdi_size);
                oid = vid_to_data_oid(vid, idx);
 
                ret = dog_write_object(oid, 0, NULL, 0, 0, 0, inode->nr_copies,
@@ -499,7 +536,7 @@ static int vdi_create(int argc, char **argv)
                        goto out;
                }
        }
-       vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+       vdi_show_progress(idx * object_size, inode->vdi_size);
        ret = EXIT_SUCCESS;
 
 out:
@@ -559,6 +596,7 @@ static int vdi_snapshot(int argc, char **argv)
 {
        const char *vdiname = argv[optind++];
        uint32_t vid, new_vid;
+       uint32_t object_size;
        int ret;
        char buf[SD_INODE_HEADER_SIZE];
        struct sd_inode *inode = (struct sd_inode *)buf;
@@ -662,9 +700,10 @@ static int vdi_snapshot(int argc, char **argv)
        if (ret != SD_RES_SUCCESS)
                goto out;
 
+       object_size = (UINT32_C(1) << inode->block_size_shift);
        ret = do_vdi_create(vdiname, inode->vdi_size, vid, &new_vid, true,
                            inode->nr_copies, inode->copy_policy,
-                           inode->store_policy);
+                           inode->store_policy, object_size);
 
        if (ret == EXIT_SUCCESS && verbose) {
                if (raw_output)
@@ -691,6 +730,7 @@ static int vdi_clone(int argc, char **argv)
        uint32_t base_vid, new_vid, vdi_id;
        uint64_t oid;
        uint32_t idx, max_idx, ret;
+       uint32_t object_size;
        struct sd_inode *inode = NULL, *new_inode = NULL;
        char *buf = NULL;
 
@@ -719,9 +759,10 @@ static int vdi_clone(int argc, char **argv)
        if (vdi_cmd_data.no_share == true)
                base_vid = 0;
 
+       object_size = (UINT32_C(1) << inode->block_size_shift);
        ret = do_vdi_create(dst_vdi, inode->vdi_size, base_vid, &new_vid, false,
                            inode->nr_copies, inode->copy_policy,
-                           inode->store_policy);
+                           inode->store_policy, object_size);
        if (ret != EXIT_SUCCESS ||
                        (!vdi_cmd_data.prealloc && !vdi_cmd_data.no_share))
                goto out;
@@ -732,23 +773,23 @@ static int vdi_clone(int argc, char **argv)
        if (ret != EXIT_SUCCESS)
                goto out;
 
-       buf = xzalloc(SD_DATA_OBJ_SIZE);
+       buf = xzalloc(object_size);
        max_idx = count_data_objs(inode);
 
        for (idx = 0; idx < max_idx; idx++) {
                size_t size;
 
-               vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+               vdi_show_progress(idx * object_size, inode->vdi_size);
                vdi_id = sd_inode_get_vid(inode, idx);
                if (vdi_id) {
                        oid = vid_to_data_oid(vdi_id, idx);
-                       ret = dog_read_object(oid, buf, SD_DATA_OBJ_SIZE, 0,
+                       ret = dog_read_object(oid, buf, object_size, 0,
                                              true);
                        if (ret) {
                                ret = EXIT_FAILURE;
                                goto out;
                        }
-                       size = SD_DATA_OBJ_SIZE;
+                       size = object_size;
                } else {
                        if (vdi_cmd_data.no_share && !vdi_cmd_data.prealloc)
                                continue;
@@ -772,7 +813,7 @@ static int vdi_clone(int argc, char **argv)
                        goto out;
                }
        }
-       vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
+       vdi_show_progress(idx * object_size, inode->vdi_size);
        ret = EXIT_SUCCESS;
 
 out:
@@ -952,6 +993,7 @@ static int vdi_rollback(int argc, char **argv)
 {
        const char *vdiname = argv[optind++];
        uint32_t base_vid, new_vid;
+       uint32_t object_size;
        int ret;
        char buf[SD_INODE_HEADER_SIZE];
        struct sd_inode *inode = (struct sd_inode *)buf;
@@ -977,9 +1019,10 @@ static int vdi_rollback(int argc, char **argv)
                return EXIT_FAILURE;
        }
 
+       object_size = (UINT32_C(1) << inode->block_size_shift);
        ret = do_vdi_create(vdiname, inode->vdi_size, base_vid, &new_vid,
                             false, vdi_cmd_data.nr_copies, inode->copy_policy,
-                            inode->store_policy);
+                            inode->store_policy, object_size);
 
        if (ret == EXIT_SUCCESS && verbose) {
                if (raw_output)
@@ -1494,6 +1537,7 @@ static int vdi_read(int argc, char **argv)
        struct sd_inode *inode = NULL;
        uint64_t offset = 0, oid, done = 0, total = (uint64_t) -1;
        uint32_t vdi_id, idx;
+       uint32_t object_size;
        unsigned int len;
        char *buf = NULL;
 
@@ -1509,25 +1553,27 @@ static int vdi_read(int argc, char **argv)
        }
 
        inode = malloc(sizeof(*inode));
-       buf = xmalloc(SD_DATA_OBJ_SIZE);
 
        ret = read_vdi_obj(vdiname, vdi_cmd_data.snapshot_id,
                           vdi_cmd_data.snapshot_tag, NULL, inode,
                           SD_INODE_SIZE);
        if (ret != EXIT_SUCCESS)
-               goto out;
+               goto load_inode_err;
 
        if (inode->vdi_size < offset) {
                sd_err("Read offset is beyond the end of the VDI");
                ret = EXIT_FAILURE;
-               goto out;
+               goto load_inode_err;
        }
 
+       object_size = (UINT32_C(1) << inode->block_size_shift);
+       buf = xmalloc(object_size);
+
        total = min(total, inode->vdi_size - offset);
-       idx = offset / SD_DATA_OBJ_SIZE;
-       offset %= SD_DATA_OBJ_SIZE;
+       idx = offset / object_size;
+       offset %= object_size;
        while (done < total) {
-               len = min(total - done, SD_DATA_OBJ_SIZE - offset);
+               len = min(total - done, object_size - offset);
                vdi_id = sd_inode_get_vid(inode, idx);
                if (vdi_id) {
                        oid = vid_to_data_oid(vdi_id, idx);
@@ -1554,8 +1600,9 @@ static int vdi_read(int argc, char **argv)
        fsync(STDOUT_FILENO);
        ret = EXIT_SUCCESS;
 out:
-       free(inode);
        free(buf);
+load_inode_err:
+       free(inode);
 
        return ret;
 }
@@ -1564,6 +1611,7 @@ static int vdi_write(int argc, char **argv)
 {
        const char *vdiname = argv[optind++];
        uint32_t vid, flags, vdi_id, idx;
+       uint32_t object_size;
        int ret;
        struct sd_inode *inode = NULL;
        uint64_t offset = 0, oid, old_oid, done = 0, total = (uint64_t) -1;
@@ -1583,26 +1631,28 @@ static int vdi_write(int argc, char **argv)
        }
 
        inode = xmalloc(sizeof(*inode));
-       buf = xmalloc(SD_DATA_OBJ_SIZE);
 
        ret = read_vdi_obj(vdiname, 0, "", &vid, inode, SD_INODE_SIZE);
        if (ret != EXIT_SUCCESS)
-               goto out;
+               goto load_inode_err;
 
        if (inode->vdi_size < offset) {
                sd_err("Write offset is beyond the end of the VDI");
                ret = EXIT_FAILURE;
-               goto out;
+               goto load_inode_err;
        }
 
+       object_size = (UINT32_C(1) << inode->block_size_shift);
+       buf = xmalloc(object_size);
+
        total = min(total, inode->vdi_size - offset);
-       idx = offset / SD_DATA_OBJ_SIZE;
-       offset %= SD_DATA_OBJ_SIZE;
+       idx = offset / object_size;
+       offset %= object_size;
        while (done < total) {
                create = false;
                old_oid = 0;
                flags = 0;
-               len = min(total - done, SD_DATA_OBJ_SIZE - offset);
+               len = min(total - done, object_size - offset);
 
                vdi_id = sd_inode_get_vid(inode, idx);
                if (!vdi_id)
@@ -1647,7 +1697,7 @@ static int vdi_write(int argc, char **argv)
                }
 
                offset += len;
-               if (offset == SD_DATA_OBJ_SIZE) {
+               if (offset == object_size) {
                        offset = 0;
                        idx++;
                }
@@ -1655,8 +1705,9 @@ static int vdi_write(int argc, char **argv)
        }
        ret = EXIT_SUCCESS;
 out:
-       free(inode);
        free(buf);
+load_inode_err:
+       free(inode);
 
        return ret;
 }
@@ -1709,6 +1760,7 @@ struct vdi_check_info {
        uint64_t oid;
        uint8_t nr_copies;
        uint8_t copy_policy;
+       uint32_t object_size;
        uint64_t total;
        uint64_t *done;
        int refcnt;
@@ -1721,7 +1773,7 @@ struct vdi_check_info {
 static void free_vdi_check_info(struct vdi_check_info *info)
 {
        if (info->done) {
-               *info->done += SD_DATA_OBJ_SIZE;
+               *info->done += info->object_size;
                vdi_show_progress(*info->done, info->total);
        }
        free(info);
@@ -1783,6 +1835,7 @@ static void vdi_check_object_work(struct work *work)
        if (is_erasure_oid(info->oid, info->copy_policy)) {
                sd_init_req(&hdr, SD_OP_READ_PEER);
                hdr.data_length = get_store_objsize(info->copy_policy,
+                                                   info->object_size,
                                                    info->oid);
                hdr.obj.ec_index = vcw->ec_index;
                hdr.epoch = sd_epoch;
@@ -1856,7 +1909,8 @@ static void check_erasure_object(struct vdi_check_info *info)
        struct fec *ctx = ec_init(d, dp);
        int miss_idx[dp], input_idx[dp];
        uint64_t oid = info->oid;
-       size_t len = get_store_objsize(info->copy_policy, oid);
+       size_t len = get_store_objsize(info->copy_policy,
+                                      info->object_size, oid);
        char *obj = xmalloc(len);
        uint8_t *input[dp];
 
@@ -1882,7 +1936,8 @@ static void check_erasure_object(struct vdi_check_info *info)
                        uint8_t *ds[d];
                        for (j = 0; j < d; j++)
                                ds[j] = info->vcw[j].buf;
-                       ec_decode_buffer(ctx, ds, idx, obj, d + k);
+                       ec_decode_buffer(ctx, ds, idx, obj, d + k,
+                                        info->object_size);
                        if (memcmp(obj, info->vcw[d + k].buf, len) != 0) {
                                /* TODO repair the inconsistency */
                                sd_err("object %"PRIx64" is inconsistent", oid);
@@ -1900,7 +1955,8 @@ static void check_erasure_object(struct vdi_check_info *info)
 
                        for (i = 0; i < d; i++)
                                ds[i] = input[i];
-                       ec_decode_buffer(ctx, ds, input_idx, obj, m);
+                       ec_decode_buffer(ctx, ds, input_idx, obj, m,
+                                        info->object_size);
                        write_object_to(info->vcw[m].vnode, oid, obj,
                                        len, true, info->vcw[m].ec_index);
                        fprintf(stdout, "fixed missing %"PRIx64", "
@@ -2023,6 +2079,7 @@ struct check_arg {
        uint64_t *done;
        struct work_queue *wq;
        int nr_copies;
+       uint32_t object_size;
 };
 
 static void check_cb(struct sd_index *idx, void *arg, int ignore)
@@ -2032,7 +2089,7 @@ static void check_cb(struct sd_index *idx, void *arg, int ignore)
 
        if (idx->vdi_id) {
                oid = vid_to_data_oid(idx->vdi_id, idx->idx);
-               *(carg->done) = (uint64_t)idx->idx * SD_DATA_OBJ_SIZE;
+               *(carg->done) = (uint64_t)idx->idx * carg->object_size;
                vdi_show_progress(*(carg->done), carg->inode->vdi_size);
                queue_vdi_check_work(carg->inode, oid, NULL, carg->wq,
                                     carg->nr_copies);
@@ -2046,6 +2103,7 @@ int do_vdi_check(const struct sd_inode *inode)
        uint32_t vid;
        struct work_queue *wq;
        int nr_copies = min((int)inode->nr_copies, sd_zones_nr);
+       uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
 
        if (0 < inode->copy_policy && sd_zones_nr < (int)inode->nr_copies) {
                sd_err("ABORT: Not enough active zones for consistency-checking"
@@ -2070,12 +2128,13 @@ int do_vdi_check(const struct sd_inode *inode)
                                queue_vdi_check_work(inode, oid, &done, wq,
                                                     nr_copies);
                        } else {
-                               done += SD_DATA_OBJ_SIZE;
+                               done += object_size;
                                vdi_show_progress(done, inode->vdi_size);
                        }
                }
        } else {
-               struct check_arg arg = {inode, &done, wq, nr_copies};
+               struct check_arg arg = {inode, &done, wq, nr_copies,
+                                       object_size};
                sd_inode_index_walk(inode, check_cb, &arg);
                vdi_show_progress(inode->vdi_size, inode->vdi_size);
        }
@@ -2125,11 +2184,12 @@ struct obj_backup {
        uint32_t offset;
        uint32_t length;
        uint32_t reserved;
-       uint8_t data[SD_DATA_OBJ_SIZE];
+       uint8_t *data;
 };
 
 /* discards redundant area from backup data */
-static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data)
+static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data,
+                              uint32_t object_size)
 {
        uint8_t *p1, *p2;
 
@@ -2142,8 +2202,8 @@ static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data)
                backup->length -= SECTOR_SIZE;
        }
 
-       p1 = backup->data + SD_DATA_OBJ_SIZE - SECTOR_SIZE;
-       p2 = from_data + SD_DATA_OBJ_SIZE - SECTOR_SIZE;
+       p1 = backup->data + object_size - SECTOR_SIZE;
+       p2 = from_data + object_size - SECTOR_SIZE;
        while (backup->length > 0 && memcmp(p1, p2, SECTOR_SIZE) == 0) {
                p1 -= SECTOR_SIZE;
                p2 -= SECTOR_SIZE;
@@ -2152,29 +2212,29 @@ static void compact_obj_backup(struct obj_backup *backup, uint8_t *from_data)
 }
 
 static int get_obj_backup(uint32_t idx, uint32_t from_vid, uint32_t to_vid,
-                         struct obj_backup *backup)
+                         struct obj_backup *backup, uint32_t object_size)
 {
        int ret;
-       uint8_t *from_data = xzalloc(SD_DATA_OBJ_SIZE);
+       uint8_t *from_data = xzalloc(object_size);
 
        backup->idx = idx;
        backup->offset = 0;
-       backup->length = SD_DATA_OBJ_SIZE;
+       backup->length = object_size;
 
        if (to_vid) {
                ret = dog_read_object(vid_to_data_oid(to_vid, idx),
-                                     backup->data, SD_DATA_OBJ_SIZE, 0, true);
+                                     backup->data, object_size, 0, true);
                if (ret != SD_RES_SUCCESS) {
                        sd_err("Failed to read object %" PRIx32 ", %d", to_vid,
                               idx);
                        return EXIT_FAILURE;
                }
        } else
-               memset(backup->data, 0, SD_DATA_OBJ_SIZE);
+               memset(backup->data, 0, object_size);
 
        if (from_vid) {
                ret = dog_read_object(vid_to_data_oid(from_vid, idx), from_data,
-                                     SD_DATA_OBJ_SIZE, 0, true);
+                                     object_size, 0, true);
                if (ret != SD_RES_SUCCESS) {
                        sd_err("Failed to read object %" PRIx32 ", %d",
                               from_vid, idx);
@@ -2182,7 +2242,7 @@ static int get_obj_backup(uint32_t idx, uint32_t from_vid, uint32_t to_vid,
                }
        }
 
-       compact_obj_backup(backup, from_data);
+       compact_obj_backup(backup, from_data, object_size);
 
        free(from_data);
 
@@ -2194,13 +2254,13 @@ static int vdi_backup(int argc, char **argv)
        const char *vdiname = argv[optind++];
        int ret = EXIT_SUCCESS;
        uint32_t idx, nr_objs;
+       uint32_t object_size;
        struct sd_inode *from_inode = xzalloc(sizeof(*from_inode));
        struct sd_inode *to_inode = xzalloc(sizeof(*to_inode));
        struct backup_hdr hdr = {
                .version = VDI_BACKUP_FORMAT_VERSION,
                .magic = VDI_BACKUP_MAGIC,
        };
-       struct obj_backup *backup = xzalloc(sizeof(*backup));
 
        if ((!vdi_cmd_data.snapshot_id && !vdi_cmd_data.snapshot_tag[0]) ||
            (!vdi_cmd_data.from_snapshot_id &&
@@ -2214,21 +2274,25 @@ static int vdi_backup(int argc, char **argv)
                           vdi_cmd_data.from_snapshot_tag, NULL,
                           from_inode, SD_INODE_SIZE);
        if (ret != EXIT_SUCCESS)
-               goto out;
+               goto load_inode_err;
 
        ret = read_vdi_obj(vdiname, vdi_cmd_data.snapshot_id,
                           vdi_cmd_data.snapshot_tag, NULL, to_inode,
                           SD_INODE_SIZE);
        if (ret != EXIT_SUCCESS)
-               goto out;
+               goto load_inode_err;
 
        nr_objs = count_data_objs(to_inode);
 
+       struct obj_backup *backup = xzalloc(sizeof(*backup));
+       object_size = (UINT32_C(1) << from_inode->block_size_shift);
+       backup->data = xzalloc(sizeof(uint8_t) * object_size);
+
        ret = xwrite(STDOUT_FILENO, &hdr, sizeof(hdr));
        if (ret < 0) {
                sd_err("failed to write backup header, %m");
                ret = EXIT_SYSFAIL;
-               goto out;
+               goto error;
        }
 
        for (idx = 0; idx < nr_objs; idx++) {
@@ -2238,9 +2302,10 @@ static int vdi_backup(int argc, char **argv)
                if (to_vid == 0 && from_vid == 0)
                        continue;
 
-               ret = get_obj_backup(idx, from_vid, to_vid, backup);
+               ret = get_obj_backup(idx, from_vid, to_vid,
+                                    backup, object_size);
                if (ret != EXIT_SUCCESS)
-                       goto out;
+                       goto error;
 
                if (backup->length == 0)
                        continue;
@@ -2250,14 +2315,14 @@ static int vdi_backup(int argc, char **argv)
                if (ret < 0) {
                        sd_err("failed to write backup data, %m");
                        ret = EXIT_SYSFAIL;
-                       goto out;
+                       goto error;
                }
                ret = xwrite(STDOUT_FILENO, backup->data + backup->offset,
                             backup->length);
                if (ret < 0) {
                        sd_err("failed to write backup data, %m");
                        ret = EXIT_SYSFAIL;
-                       goto out;
+                       goto error;
                }
        }
 
@@ -2269,15 +2334,18 @@ static int vdi_backup(int argc, char **argv)
        if (ret < 0) {
                sd_err("failed to write end marker, %m");
                ret = EXIT_SYSFAIL;
-               goto out;
+               goto error;
        }
 
        fsync(STDOUT_FILENO);
        ret = EXIT_SUCCESS;
-out:
+error:
+       free(backup->data);
+       free(backup);
+load_inode_err:
        free(from_inode);
        free(to_inode);
-       free(backup);
+out:
        return ret;
 }
 
@@ -2310,6 +2378,7 @@ static uint32_t do_restore(const char *vdiname, int snapid, const char *tag)
 {
        int ret;
        uint32_t vid;
+       uint32_t object_size;
        struct backup_hdr hdr;
        struct obj_backup *backup = xzalloc(sizeof(*backup));
        struct sd_inode *inode = xzalloc(sizeof(*inode));
@@ -2329,9 +2398,10 @@ static uint32_t do_restore(const char *vdiname, int snapid, const char *tag)
        if (ret != EXIT_SUCCESS)
                goto out;
 
+       object_size = (UINT32_C(1) << inode->block_size_shift);
        ret = do_vdi_create(vdiname, inode->vdi_size, inode->vdi_id, &vid,
                            false, inode->nr_copies, inode->copy_policy,
-                           inode->store_policy);
+                           inode->store_policy, object_size);
        if (ret != EXIT_SUCCESS) {
                sd_err("Failed to read VDI");
                goto out;
@@ -2435,12 +2505,15 @@ static int vdi_restore(int argc, char **argv)
 out:
        if (need_current_recovery) {
                int recovery_ret;
+               uint32_t object_size =
+                       (UINT32_C(1) << current_inode->block_size_shift);
                /* recreate the current vdi object */
                recovery_ret = do_vdi_create(vdiname, current_inode->vdi_size,
                                             current_inode->parent_vdi_id, NULL,
                                             true, current_inode->nr_copies,
                                             current_inode->copy_policy,
-                                            current_inode->store_policy);
+                                            current_inode->store_policy,
+                                            object_size);
                if (recovery_ret != EXIT_SUCCESS) {
                        sd_err("failed to resume the current vdi");
                        ret = recovery_ret;
@@ -2563,9 +2636,25 @@ static int vdi_cache_info(int argc, char **argv)
 
        fprintf(stdout, "Name\tTag\tTotal\tDirty\tClean\n");
        for (i = 0; i < info.count; i++) {
-               uint64_t total = info.caches[i].total * SD_DATA_OBJ_SIZE,
-                        dirty = info.caches[i].dirty * SD_DATA_OBJ_SIZE,
+               uint64_t object_size; /* 64-bit: byte totals must not wrap */
+               uint32_t vid = info.caches[i].vid;
+               /* dog_read_object() reads into this buffer, so it must exist */
+               struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE);
+               int r;
+
+               r = dog_read_object(vid_to_vdi_oid(vid), inode,
+                                   SD_INODE_HEADER_SIZE, 0, true);
+               if (r != EXIT_SUCCESS || !inode->block_size_shift) {
+                       free(inode);
+                       return r != EXIT_SUCCESS ? r : EXIT_FAILURE;
+               }
+               object_size = UINT64_C(1) << inode->block_size_shift;
+               free(inode);
+
+               uint64_t total = info.caches[i].total * object_size,
+                        dirty = info.caches[i].dirty * object_size,
                         clean = total - dirty;
+
                char name[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
 
                ret = vid_to_name_tag(info.caches[i].vid, name, tag);
@@ -2955,7 +3044,7 @@ static struct subcommand vdi_cmd[] = {
        {"check", "<vdiname>", "seaphT", "check and repair image's consistency",
         NULL, CMD_NEED_NODELIST|CMD_NEED_ARG,
         vdi_check, vdi_options},
-       {"create", "<vdiname> <size>", "PycaphrvT", "create an image",
+       {"create", "<vdiname> <size>", "PycaphrvzT", "create an image",
         NULL, CMD_NEED_NODELIST|CMD_NEED_ARG,
         vdi_create, vdi_options},
        {"snapshot", "<vdiname>", "saphrvT", "create a snapshot",
@@ -3023,6 +3112,7 @@ static struct subcommand vdi_cmd[] = {
 static int vdi_parser(int ch, const char *opt)
 {
        char *p;
+       uint32_t object_size_shift_bit;
 
        switch (ch) {
        case 'P':
@@ -3101,6 +3191,20 @@ static int vdi_parser(int ch, const char *opt)
        case 'e':
                vdi_cmd_data.exist = true;
                break;
+       case 'z': {
+               /* atoi() silently maps garbage to 0; parse strictly instead */
+               long shift = strtol(opt, &p, 10);
+
+               if (opt == p || *p != '\0' || shift < 0 || shift > 31) {
+                       sd_err("Invalid object size shift '%s':"
+                              " please set a shift bit from 0 to 31", opt);
+                       exit(EXIT_FAILURE);
+               }
+               object_size_shift_bit = (uint32_t)shift;
+               vdi_cmd_data.object_size =
+                               (UINT32_C(1) << object_size_shift_bit);
+               break;
+       }
        }
 
        return 0;
diff --git a/include/fec.h b/include/fec.h
index 1ae32e4..b3ef8d8 100644
--- a/include/fec.h
+++ b/include/fec.h
@@ -96,12 +96,12 @@ void fec_encode(const struct fec *code,
                size_t num_block_nums, size_t sz);
 
 void fec_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-                      char *buf, int idx);
+                      char *buf, int idx, uint32_t object_size);
 
 /* for isa-l */
 
 void isa_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-                      char *buf, int idx);
+                      char *buf, int idx, uint32_t object_size);
 
 /*
  * @param inpkts an array of packets (size k); If a primary block, i, is present
@@ -119,7 +119,6 @@ void fec_decode(const struct fec *code,
 
 /* Set data stripe as sector size to make VM happy */
 #define SD_EC_DATA_STRIPE_SIZE (512) /* 512 Byte */
-#define SD_EC_NR_STRIPE_PER_OBJECT (SD_DATA_OBJ_SIZE / SD_EC_DATA_STRIPE_SIZE)
 #define SD_EC_MAX_STRIP (16)
 
 static inline int ec_policy_to_dp(uint8_t policy, int *d, int *p)
@@ -205,11 +204,12 @@ static inline void ec_destroy(struct fec *ctx)
 }
 
 static inline void ec_decode_buffer(struct fec *ctx, uint8_t *input[],
-                                   const int in_idx[], char *buf, int idx)
+                                   const int in_idx[], char *buf,
+                                   int idx, uint32_t object_size)
 {
        if (cpu_has_ssse3)
-               isa_decode_buffer(ctx, input, in_idx, buf, idx);
+               isa_decode_buffer(ctx, input, in_idx, buf, idx, object_size);
        else
-               fec_decode_buffer(ctx, input, in_idx, buf, idx);
+               fec_decode_buffer(ctx, input, in_idx, buf, idx, object_size);
 }
 #endif
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index cbb65b6..5cdedf5 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -477,10 +477,11 @@ static inline bool is_data_obj(uint64_t oid)
 
 static inline size_t count_data_objs(const struct sd_inode *inode)
 {
-       return DIV_ROUND_UP(inode->vdi_size, SD_DATA_OBJ_SIZE);
+       return DIV_ROUND_UP(inode->vdi_size,
+                           (UINT32_C(1) << inode->block_size_shift));
 }
 
-static inline size_t get_objsize(uint64_t oid)
+static inline size_t get_objsize(uint64_t oid, uint32_t object_size)
 {
        if (is_vdi_obj(oid))
                return SD_INODE_SIZE;
@@ -494,7 +495,7 @@ static inline size_t get_objsize(uint64_t oid)
        if (is_ledger_object(oid))
                return SD_LEDGER_OBJ_SIZE;
 
-       return SD_DATA_OBJ_SIZE;
+       return object_size;
 }
 
 static inline uint64_t data_oid_to_idx(uint64_t oid)
diff --git a/lib/fec.c b/lib/fec.c
index c4e7a6f..fb40773 100644
--- a/lib/fec.c
+++ b/lib/fec.c
@@ -696,12 +696,13 @@ out:
 }
 
 void fec_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-                     char *buf, int idx)
+                     char *buf, int idx, uint32_t object_size)
 {
        int i, j, d = ctx->d;
        size_t strip_size = SD_EC_DATA_STRIPE_SIZE / d;
+       uint32_t nr_stripe_per_object = object_size / SD_EC_DATA_STRIPE_SIZE;
 
-       for (i = 0; i < SD_EC_NR_STRIPE_PER_OBJECT; i++) {
+       for (i = 0; i < nr_stripe_per_object; i++) {
                const uint8_t *in[d];
                uint8_t out[strip_size];
 
@@ -713,9 +714,9 @@ void fec_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
 }
 
 void isa_decode_buffer(struct fec *ctx, uint8_t *input[], const int in_idx[],
-                      char *buf, int idx)
+                      char *buf, int idx, uint32_t object_size)
 {
-       int ed = ctx->d, edp = ctx->dp, len = SD_DATA_OBJ_SIZE / ed, i;
+       int ed = ctx->d, edp = ctx->dp, len = object_size / ed, i;
        unsigned char ec_tbl[ed * edp * 32];
        unsigned char bm[ed * ed];
        unsigned char cm[ed];
diff --git a/sheep/gateway.c b/sheep/gateway.c
index 7f7d1d1..408660a 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -713,7 +713,7 @@ out:
 static int gateway_handle_cow(struct request *req)
 {
        uint64_t oid = req->rq.obj.oid;
-       size_t len = get_objsize(oid);
+       size_t len = get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
        struct sd_req hdr, *req_hdr = &req->rq;
        char *buf = xvalloc(len);
        int ret;
diff --git a/sheep/group.c b/sheep/group.c
index 2b98a9b..e379241 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -510,7 +510,7 @@ retry:
                if (vs[i].deleted)
                        atomic_set_bit(vs[i].vid, sys->vdi_deleted);
                add_vdi_state(vs[i].vid, vs[i].nr_copies, vs[i].snapshot,
-                             vs[i].copy_policy);
+                             vs[i].copy_policy, vs[i].object_size);
        }
 out:
        free(vs);
@@ -766,6 +766,7 @@ static void cinfo_collection_done(struct work *work)
                sd_debug("nr_copies: %d", vs->nr_copies);
                sd_debug("snapshot: %d", vs->snapshot);
                sd_debug("copy_policy: %d", vs->copy_policy);
+               sd_debug("object_size: %"PRIu32, vs->object_size);
                sd_debug("lock_state: %x", vs->lock_state);
                sd_debug("owner: %s",
                         addr_to_str(vs->lock_owner.addr, vs->lock_owner.port));
diff --git a/sheep/journal.c b/sheep/journal.c
index 5beabdf..4df9a74 100644
--- a/sheep/journal.c
+++ b/sheep/journal.c
@@ -137,6 +137,7 @@ static int replay_journal_entry(struct journal_descriptor *jd)
 {
        char path[PATH_MAX];
        ssize_t size;
+       uint32_t object_size = 0;
        int fd, flags = O_WRONLY, ret = 0;
        void *buf = NULL;
        char *p = (char *)jd;
@@ -168,9 +169,9 @@ static int replay_journal_entry(struct journal_descriptor *jd)
                sd_err("open %m");
                return -1;
        }
-
        if (jd->create) {
-               ret = prealloc(fd, get_objsize(jd->oid));
+               object_size = get_vdi_object_size(oid_to_vid(jd->oid));
+               ret = prealloc(fd, get_objsize(jd->oid, object_size));
                if (ret < 0)
                        goto out;
        }
diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index a0da92d..31eb003 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -126,7 +126,8 @@ static inline bool idx_has_vdi_bit(uint64_t idx)
 
 static inline size_t get_cache_block_size(uint64_t oid)
 {
-       size_t bsize = DIV_ROUND_UP(get_objsize(oid),
+       uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
+       size_t bsize = DIV_ROUND_UP(get_objsize(oid, object_size),
                                    sizeof(uint64_t) * BITS_PER_BYTE);
 
        return round_up(bsize, BLOCK_SIZE); /* To be FS friendly */
@@ -457,6 +458,7 @@ static int push_cache_object(uint32_t vid, uint64_t idx, uint64_t bmap,
        void *buf;
        off_t offset;
        uint64_t oid = idx_to_oid(vid, idx);
+       uint32_t object_size = get_objsize(oid, get_vdi_object_size(vid));
        size_t data_length, bsize = get_cache_block_size(oid);
        int ret = SD_RES_NO_MEM;
        int first_bit, last_bit;
@@ -473,7 +475,7 @@ static int push_cache_object(uint32_t vid, uint64_t idx, uint64_t bmap,
                 oid, bsize, bmap, first_bit, last_bit);
        offset = first_bit * bsize;
        data_length = min((last_bit - first_bit + 1) * bsize,
-                         get_objsize(oid) - (size_t)offset);
+                         object_size - (size_t)offset);
 
        buf = xvalloc(data_length);
        ret = read_cache_object_noupdate(vid, idx, buf, data_length, offset);
@@ -517,6 +519,7 @@ static void do_reclaim_object(struct object_cache *oc)
        struct object_cache_entry *entry;
        uint64_t oid;
        uint32_t cap;
+       uint32_t cache_object_size = get_vdi_object_size(oc->vid) / 1048576;
 
        write_lock_cache(oc);
        list_for_each_entry(entry, &oc->lru_head, lru_list) {
@@ -539,7 +542,7 @@ static void do_reclaim_object(struct object_cache *oc)
                if (remove_cache_object(oc, entry_idx(entry)) != SD_RES_SUCCESS)
                        continue;
                free_cache_entry(entry);
-               cap = uatomic_sub_return(&gcache.capacity, CACHE_OBJECT_SIZE);
+               cap = uatomic_sub_return(&gcache.capacity, cache_object_size);
                sd_debug("%"PRIx64" reclaimed. capacity:%"PRId32, oid, cap);
                if (cap <= HIGH_WATERMARK)
                        break;
@@ -685,13 +688,14 @@ alloc_cache_entry(struct object_cache *oc, uint64_t idx)
 static void add_to_lru_cache(struct object_cache *oc, uint64_t idx, bool create)
 {
        struct object_cache_entry *entry = alloc_cache_entry(oc, idx);
+       uint32_t cache_object_size = get_vdi_object_size(oc->vid) / 1048576;
 
        sd_debug("oid %"PRIx64" added", idx_to_oid(oc->vid, idx));
 
        write_lock_cache(oc);
        if (unlikely(lru_tree_insert(&oc->lru_tree, entry)))
                panic("the object already exist");
-       uatomic_add(&gcache.capacity, CACHE_OBJECT_SIZE);
+       uatomic_add(&gcache.capacity, cache_object_size);
        list_add_tail(&entry->lru_list, &oc->lru_head);
        oc->total_count++;
        if (create) {
@@ -736,7 +740,8 @@ static int object_cache_lookup(struct object_cache *oc, uint64_t idx,
                ret = SD_RES_EIO;
                goto out;
        }
-       ret = prealloc(fd, get_objsize(idx_to_oid(oc->vid, idx)));
+       ret = prealloc(fd, get_objsize(idx_to_oid(oc->vid, idx),
+                                      get_vdi_object_size(oc->vid)));
        if (unlikely(ret < 0)) {
                ret = SD_RES_EIO;
                goto out_close;
@@ -804,7 +809,7 @@ static int object_cache_pull(struct object_cache *oc, uint64_t idx)
        struct sd_req hdr;
        int ret;
        uint64_t oid = idx_to_oid(oc->vid, idx);
-       uint32_t data_length = get_objsize(oid);
+       uint32_t data_length = get_objsize(oid, get_vdi_object_size(oc->vid));
        void *buf;
 
        buf = xvalloc(data_length);
@@ -939,11 +944,14 @@ void object_cache_delete(uint32_t vid)
        int h = hash(vid);
        struct object_cache_entry *entry;
        char path[PATH_MAX];
+       uint32_t cache_object_size;
 
        cache = find_object_cache(vid, false);
        if (!cache)
                return;
 
+       cache_object_size = get_vdi_object_size(cache->vid) / 1048576;
+
        /* Firstly we free memory */
        sd_write_lock(&hashtable_lock[h]);
        hlist_del(&cache->hash);
@@ -952,7 +960,7 @@ void object_cache_delete(uint32_t vid)
        write_lock_cache(cache);
        list_for_each_entry(entry, &cache->lru_head, lru_list) {
                free_cache_entry(entry);
-               uatomic_sub(&gcache.capacity, CACHE_OBJECT_SIZE);
+               uatomic_sub(&gcache.capacity, cache_object_size);
        }
        unlock_cache(cache);
        sd_destroy_rw_lock(&cache->lock);
@@ -1294,6 +1302,7 @@ int object_cache_remove(uint64_t oid)
        /* Inc the entry refcount to exclude the reclaimer */
        struct object_cache_entry *entry = oid_to_entry(oid);
        struct object_cache *oc;
+       uint32_t cache_object_size_mb;
        int ret;
 
        if (!entry)
@@ -1305,6 +1314,8 @@ int object_cache_remove(uint64_t oid)
        while (refcount_read(&entry->refcnt) > 1)
                usleep(100000); /* Object might be in push */
 
+       cache_object_size_mb = get_vdi_object_size(oc->vid) / 1048576;
+
        write_lock_cache(oc);
        /*
         * We assume no other thread will inc the refcount of this entry
@@ -1321,7 +1332,7 @@ int object_cache_remove(uint64_t oid)
        free_cache_entry(entry);
        unlock_cache(oc);
 
-       uatomic_sub(&gcache.capacity, CACHE_OBJECT_SIZE);
+       uatomic_sub(&gcache.capacity, cache_object_size_mb);
 
        return SD_RES_SUCCESS;
 }
diff --git a/sheep/ops.c b/sheep/ops.c
index 0c2389a..e5f4c4c 100644
--- a/sheep/ops.c
+++ b/sheep/ops.c
@@ -93,6 +93,7 @@ static int cluster_new_vdi(struct request *req)
                .copy_policy = hdr->vdi.copy_policy,
                .store_policy = hdr->vdi.store_policy,
                .nr_copies = hdr->vdi.copies,
+               .object_size = hdr->vdi.object_size,
                .time = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000,
        };
 
@@ -105,6 +106,9 @@ static int cluster_new_vdi(struct request *req)
        if (iocb.copy_policy)
                iocb.nr_copies = ec_policy_to_dp(iocb.copy_policy, NULL, NULL);
 
+       if (!hdr->vdi.object_size)
+               iocb.object_size = sys->cinfo.object_size;
+
        if (hdr->data_length != SD_MAX_VDI_LEN)
                return SD_RES_INVALID_PARMS;
 
@@ -115,6 +119,7 @@ static int cluster_new_vdi(struct request *req)
 
        rsp->vdi.vdi_id = vid;
        rsp->vdi.copies = iocb.nr_copies;
+       rsp->vdi.object_size = iocb.object_size;
 
        return ret;
 }
@@ -236,6 +241,7 @@ static int cluster_get_vdi_info(struct request *req)
 
        rsp->vdi.vdi_id = info.vid;
        rsp->vdi.copies = get_vdi_copy_number(info.vid);
+       rsp->vdi.object_size = get_vdi_object_size(info.vid);
 
        return ret;
 }
@@ -655,13 +661,14 @@ static int cluster_notify_vdi_add(const struct sd_req *req, struct sd_rsp *rsp,
                /* make the previous working vdi a snapshot */
                add_vdi_state(req->vdi_state.old_vid,
                              get_vdi_copy_number(req->vdi_state.old_vid),
-                             true, req->vdi_state.copy_policy);
+                             true, req->vdi_state.copy_policy,
+                             get_vdi_object_size(req->vdi_state.old_vid));
 
        if (req->vdi_state.set_bitmap)
                atomic_set_bit(req->vdi_state.new_vid, sys->vdi_inuse);
 
        add_vdi_state(req->vdi_state.new_vid, req->vdi_state.copies, false,
-                     req->vdi_state.copy_policy);
+                     req->vdi_state.copy_policy, req->vdi_state.object_size);
 
        return SD_RES_SUCCESS;
 }
@@ -759,9 +766,10 @@ static int cluster_alter_vdi_copy(const struct sd_req *req, struct sd_rsp *rsp,
 
        uint32_t vid = req->vdi_state.new_vid;
        int nr_copies = req->vdi_state.copies;
+       uint32_t object_size = req->vdi_state.object_size;
        struct vnode_info *vinfo;
 
-       add_vdi_state(vid, nr_copies, false, 0);
+       add_vdi_state(vid, nr_copies, false, 0, object_size);
 
        vinfo = get_vnode_info();
        start_recovery(vinfo, vinfo, false);
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
index 1b7b66c..e344189 100644
--- a/sheep/plain_store.c
+++ b/sheep/plain_store.c
@@ -152,7 +152,8 @@ static int default_trim(int fd, uint64_t oid, const struct siocb *iocb,
 
        if (*poffset + *plen < iocb->offset + iocb->length) {
                uint64_t end = iocb->offset + iocb->length;
-               if (end == get_objsize(oid))
+               uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
+               if (end == get_objsize(oid, object_size))
                        /* This is necessary to punch the last block */
                        end = round_up(end, BLOCK_SIZE);
                sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen,
@@ -267,6 +268,7 @@ int default_cleanup(void)
 static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch)
 {
        int ret;
+       uint32_t object_size;
        struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE);
        struct siocb iocb = {
                .epoch = epoch,
@@ -280,9 +282,9 @@ static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch)
                       "wat %s", oid, epoch, wd);
                goto out;
        }
-
+       object_size = (UINT32_C(1) << inode->block_size_shift);
        add_vdi_state(oid_to_vid(oid), inode->nr_copies,
-                     vdi_is_snapshot(inode), inode->copy_policy);
+                     vdi_is_snapshot(inode), inode->copy_policy, object_size);
 
        if (inode->name[0] == '\0')
                atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted);
@@ -402,9 +404,9 @@ size_t get_store_objsize(uint64_t oid)
                uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
                int d;
                ec_policy_to_dp(policy, &d, NULL);
-               return SD_DATA_OBJ_SIZE / d;
+               return get_vdi_object_size(oid_to_vid(oid)) / d;
        }
-       return get_objsize(oid);
+       return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
 }
 
 int default_create_and_write(uint64_t oid, const struct siocb *iocb)
@@ -413,6 +415,7 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
        int flags = prepare_iocb(oid, iocb, true);
        int ret, fd;
        uint32_t len = iocb->length;
+       uint32_t object_size = 0;
        size_t obj_size;
        uint64_t offset = iocb->offset;
 
@@ -452,7 +455,9 @@ int default_create_and_write(uint64_t oid, const struct siocb *iocb)
 
        trim_zero_blocks(iocb->buf, &offset, &len);
 
-       if (offset != 0 || len != get_objsize(oid)) {
+       object_size = get_vdi_object_size(oid_to_vid(oid));
+
+       if (offset != 0 || len != get_objsize(oid, object_size)) {
                if (is_sparse_object(oid))
                        ret = xftruncate(fd, obj_size);
                else
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 7874fc9..9bf2d9c 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -429,6 +429,7 @@ static void *rebuild_erasure_object(uint64_t oid, uint8_t idx,
        char *lost = xvalloc(len);
        int i, j;
        uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
+       uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
        int ed = 0, edp;
        edp = ec_policy_to_dp(policy, &ed, NULL);
        struct fec *ctx = ec_init(ed, edp);
@@ -458,7 +459,7 @@ static void *rebuild_erasure_object(uint64_t oid, uint8_t idx,
        }
 
        /* Rebuild the lost replica */
-       ec_decode_buffer(ctx, bufs, idxs, lost, idx);
+       ec_decode_buffer(ctx, bufs, idxs, lost, idx, object_size);
 out:
        ec_destroy(ctx);
        for (i = 0; i < ed; i++)
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 5fc6b90..37946d1 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -219,6 +219,7 @@ struct vdi_iocb {
        uint8_t copy_policy;
        uint8_t store_policy;
        uint8_t nr_copies;
+       uint32_t object_size;
        uint64_t time;
 };
 
@@ -326,9 +327,12 @@ int fill_vdi_state_list(const struct sd_req *hdr,
 bool oid_is_readonly(uint64_t oid);
 int get_vdi_copy_number(uint32_t vid);
 int get_vdi_copy_policy(uint32_t vid);
+uint32_t get_vdi_object_size(uint32_t vid);
 int get_obj_copy_number(uint64_t oid, int nr_zones);
 int get_req_copy_number(struct request *req);
-int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t);
+uint32_t get_req_object_size(struct request *req);
+int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot,
+                 uint8_t, uint32_t object_size);
 int vdi_exist(uint32_t vid);
 int vdi_create(const struct vdi_iocb *iocb, uint32_t *new_vid);
 int vdi_snapshot(const struct vdi_iocb *iocb, uint32_t *new_vid);
diff --git a/sheep/vdi.c b/sheep/vdi.c
index 1c8fb36..95b3230 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -14,6 +14,7 @@
 struct vdi_state_entry {
        uint32_t vid;
        unsigned int nr_copies;
+       uint32_t object_size;
        bool snapshot;
        bool deleted;
        uint8_t copy_policy;
@@ -132,6 +133,23 @@ int get_vdi_copy_policy(uint32_t vid)
        return entry->copy_policy;
 }
 
+uint32_t get_vdi_object_size(uint32_t vid)
+{
+       struct vdi_state_entry *entry;
+
+       sd_read_lock(&vdi_state_lock);
+       entry = vdi_state_search(&vdi_state_root, vid);
+       sd_rw_unlock(&vdi_state_lock);
+
+       if (!entry) {
+               /* no state recorded for vid: use the cluster-wide default */
+               sd_alert("object size for %" PRIx32 " not found, set %" PRIu32,
+                        vid, sys->cinfo.object_size);
+               return sys->cinfo.object_size;
+       }
+       return entry->object_size;
+}
+
 int get_obj_copy_number(uint64_t oid, int nr_zones)
 {
        return min(get_vdi_copy_number(oid_to_vid(oid)), nr_zones);
@@ -149,7 +167,19 @@ int get_req_copy_number(struct request *req)
        return nr_copies;
 }
 
-int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
+uint32_t get_req_object_size(struct request *req)
+{
+       /* a non-zero data_length in the request header takes precedence */
+       uint32_t size = req->rq.data_length;
+
+       if (size)
+               return size;
+       /* otherwise use the size recorded for the VDI owning the object */
+       return get_vdi_object_size(oid_to_vid(req->rq.obj.oid));
+}
+
+int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot,
+                 uint8_t cp, uint32_t object_size)
 {
        struct vdi_state_entry *entry, *old;
 
@@ -158,6 +188,7 @@ int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
        entry->nr_copies = nr_copies;
        entry->snapshot = snapshot;
        entry->copy_policy = cp;
+       entry->object_size = object_size;
 
        entry->lock_state = LOCK_STATE_UNLOCKED;
        memset(&entry->owner, 0, sizeof(struct node_id));
@@ -173,7 +204,8 @@ int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
                sd_mutex_unlock(&m);
        }
 
-       sd_debug("%" PRIx32 ", %d, %d", vid, nr_copies, cp);
+       sd_debug("%" PRIx32 ", %d, %d, %"PRIu32,
+                vid, nr_copies, cp, object_size);
 
        sd_write_lock(&vdi_state_lock);
        old = vdi_state_insert(&vdi_state_root, entry);
@@ -183,6 +215,7 @@ int add_vdi_state(uint32_t vid, int nr_copies, bool snapshot, uint8_t cp)
                entry->nr_copies = nr_copies;
                entry->snapshot = snapshot;
                entry->copy_policy = cp;
+               entry->object_size = object_size;
        }
 
        sd_rw_unlock(&vdi_state_lock);
@@ -209,6 +242,7 @@ int fill_vdi_state_list(const struct sd_req *hdr,
                vs[last].nr_copies = entry->nr_copies;
                vs[last].snapshot = entry->snapshot;
                vs[last].copy_policy = entry->copy_policy;
+               vs[last].object_size = entry->object_size;
                vs[last].lock_state = entry->lock_state;
                vs[last].lock_owner = entry->owner;
                vs[last].nr_participants = entry->nr_participants;
@@ -251,6 +285,7 @@ static struct vdi_state *fill_vdi_state_list_with_alloc(int *result_nr)
                vs[i].snapshot = entry->snapshot;
                vs[i].deleted = entry->deleted;
                vs[i].copy_policy = entry->copy_policy;
+               vs[i].object_size = entry->object_size;
                vs[i].lock_state = entry->lock_state;
                vs[i].lock_owner = entry->owner;
                vs[i].nr_participants = entry->nr_participants;
@@ -861,7 +896,7 @@ static struct sd_inode *alloc_inode(const struct vdi_iocb *iocb,
                                    struct generation_reference *gref)
 {
        struct sd_inode *new = xzalloc(sizeof(*new));
-       unsigned long block_size = SD_DATA_OBJ_SIZE;
+       unsigned long block_size = iocb->object_size;
 
        pstrcpy(new->name, sizeof(new->name), iocb->name);
        new->vdi_id = new_vid;
@@ -903,9 +938,10 @@ static int create_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
        int ret;
 
        sd_debug("%s: size %" PRIu64 ", new_vid %" PRIx32 ", copies %d, "
-                "snapid %" PRIu32 " copy policy %"PRIu8 "store policy %"PRIu8,
-                iocb->name, iocb->size, new_vid, iocb->nr_copies, new_snapid,
-                new->copy_policy, new->store_policy);
+                "snapid %" PRIu32 " copy policy %"PRIu8 "store policy %"PRIu8
+                "object_size %"PRIu32, iocb->name, iocb->size, new_vid,
+                 iocb->nr_copies, new_snapid, new->copy_policy,
+                 new->store_policy, iocb->object_size);
 
        ret = sd_write_object(vid_to_vdi_oid(new_vid), (char *)new,
                              sizeof(*new), 0, true);
@@ -940,8 +976,9 @@ static int clone_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
        int ret;
 
        sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
-                "copies %d, snapid %" PRIu32, iocb->name, iocb->size, new_vid,
-                base_vid, iocb->nr_copies, new_snapid);
+                "copies %d, object_size %" PRIu32 ", snapid %" PRIu32,
+                iocb->name, iocb->size, new_vid, base_vid,
+                iocb->nr_copies, iocb->object_size, new_snapid);
 
        ret = sd_read_object(vid_to_vdi_oid(base_vid), (char *)base,
                             sizeof(*base), 0);
@@ -1002,8 +1039,9 @@ static int snapshot_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
        int ret;
 
        sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
-                "copies %d, snapid %" PRIu32, iocb->name, iocb->size, new_vid,
-                base_vid, iocb->nr_copies, new_snapid);
+                "copies %d, object_size %"PRIu32 ", snapid %" PRIu32,
+                iocb->name, iocb->size, new_vid, base_vid,
+                iocb->nr_copies, iocb->object_size, new_snapid);
 
        ret = sd_read_object(vid_to_vdi_oid(base_vid), (char *)base,
                             sizeof(*base), 0);
@@ -1071,8 +1109,9 @@ static int rebase_vdi(const struct vdi_iocb *iocb, uint32_t new_snapid,
        int ret;
 
        sd_debug("%s: size %" PRIu64 ", vid %" PRIx32 ", base %" PRIx32 ", "
-                "cur %" PRIx32 ", copies %d, snapid %" PRIu32, iocb->name,
-                iocb->size, new_vid, base_vid, cur_vid, iocb->nr_copies,
+                "cur %" PRIx32 ", copies %d, object_size %"PRIu32
+                ", snapid %" PRIu32, iocb->name, iocb->size, new_vid,
+                base_vid, cur_vid, iocb->nr_copies, iocb->object_size,
                 new_snapid);
 
        ret = sd_read_object(vid_to_vdi_oid(base_vid), (char *)base,
@@ -1260,7 +1299,7 @@ int vdi_lookup(const struct vdi_iocb *iocb, struct vdi_info *info)
 }
 
 static int notify_vdi_add(uint32_t vdi_id, uint32_t nr_copies, uint32_t old_vid,
-                         uint8_t copy_policy)
+                         uint8_t copy_policy, uint32_t object_size)
 {
        int ret;
        struct sd_req hdr;
@@ -1271,11 +1310,13 @@ static int notify_vdi_add(uint32_t vdi_id, uint32_t nr_copies, uint32_t old_vid,
        hdr.vdi_state.copies = nr_copies;
        hdr.vdi_state.set_bitmap = false;
        hdr.vdi_state.copy_policy = copy_policy;
+       hdr.vdi_state.object_size = object_size;
 
        ret = exec_local_req(&hdr, NULL);
        if (ret != SD_RES_SUCCESS)
                sd_err("fail to notify vdi add event(%" PRIx32 ", %d, %" PRIx32
-                      ")", vdi_id, nr_copies, old_vid);
+                      ", %"PRIu32 ")", vdi_id, nr_copies,
+                      old_vid, object_size);
 
        return ret;
 }
@@ -1326,7 +1367,7 @@ int vdi_create(const struct vdi_iocb *iocb, uint32_t *new_vid)
                info.snapid = 1;
        *new_vid = info.free_bit;
        ret = notify_vdi_add(*new_vid, iocb->nr_copies, info.vid,
-                            iocb->copy_policy);
+                            iocb->copy_policy, iocb->object_size);
        if (ret != SD_RES_SUCCESS)
                return ret;
 
@@ -1366,7 +1407,7 @@ int vdi_snapshot(const struct vdi_iocb *iocb, uint32_t *new_vid)
        assert(info.snapid > 0);
        *new_vid = info.free_bit;
        ret = notify_vdi_add(*new_vid, iocb->nr_copies, info.vid,
-                            iocb->copy_policy);
+                            iocb->copy_policy, iocb->object_size);
        if (ret != SD_RES_SUCCESS)
                return ret;
 
@@ -1745,6 +1786,15 @@ int sd_create_hyper_volume(const char *name, uint32_t *vdi_id)
        hdr.vdi.copies = sys->cinfo.nr_copies;
        hdr.vdi.copy_policy = sys->cinfo.copy_policy;
        hdr.vdi.store_policy = 1;
+       /* XXX Cannot use both features, Hypervolume and Change object size */
+       if (sys->cinfo.object_size != SD_DATA_OBJ_SIZE) {
+               hdr.vdi.object_size = SD_DATA_OBJ_SIZE;
+               sd_warn("Cluster default object size is not"
+                       " SD_DATA_OBJ_SIZE(%lu)."
+                       "Set VDI object size %lu and create HyperVolume",
+                       SD_DATA_OBJ_SIZE, SD_DATA_OBJ_SIZE);
+       }
+
 
        ret = exec_local_req(&hdr, buf);
        if (ret != SD_RES_SUCCESS) {
diff --git a/tests/unit/sheep/test_vdi.c b/tests/unit/sheep/test_vdi.c
index 2f8946b..132caf5 100644
--- a/tests/unit/sheep/test_vdi.c
+++ b/tests/unit/sheep/test_vdi.c
@@ -17,9 +17,9 @@
 
 START_TEST(test_vdi)
 {
-       add_vdi_state(1, 1, true, 0);
-       add_vdi_state(2, 1, true, 0);
-       add_vdi_state(3, 2, false, 0);
+       add_vdi_state(1, 1, true, 0, 4194304);
+       add_vdi_state(2, 1, true, 0, 4194304);
+       add_vdi_state(3, 2, false, 0, 4194304);
 
        ck_assert_int_eq(get_vdi_copy_number(1), 1);
        ck_assert_int_eq(get_vdi_copy_number(2), 1);
-- 
1.7.1

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to