1. Add sd_extent_header to manage the meta-data stored in data_vdi_id[] or in a middle-node. 2. Add a new type of object: the B-tree object, which is used as a middle-node.
Signed-off-by: Robin Dong <san...@taobao.com> --- dog/vdi.c | 2 +- include/sheepdog_proto.h | 43 ++++- lib/sd_inode.c | 519 +++++++++++++++++++++++++++++++++++++++++++++- sheep/vdi.c | 1 + sheepfs/volume.c | 2 +- 5 files changed, 558 insertions(+), 9 deletions(-) diff --git a/dog/vdi.c b/dog/vdi.c index faf06f0..960e2a0 100644 --- a/dog/vdi.c +++ b/dog/vdi.c @@ -558,7 +558,7 @@ static int vdi_create(int argc, char **argv) goto out; } - sd_inode_set_vdi(inode, idx, vid); + INODE_SET_VDI(inode, idx, vid); ret = sd_write_object(vid_to_vdi_oid(vid), 0, &vid, sizeof(vid), SD_INODE_HEADER_SIZE + sizeof(vid) * idx, 0, inode->nr_copies, inode->copy_policy, diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h index 30ff397..c338efa 100644 --- a/include/sheepdog_proto.h +++ b/include/sheepdog_proto.h @@ -74,6 +74,9 @@ #define SD_RES_JOIN_FAILED 0x18 /* Target node had failed to join sheepdog */ #define SD_RES_HALT 0x19 /* Sheepdog is stopped doing IO */ #define SD_RES_READONLY 0x1A /* Object is read-only */ +#define SD_RES_BTREE_NOT_FOUND 0x1B /* Cannot found node in btree */ +#define SD_RES_BTREE_FOUND 0x1C /* Found node in btree */ +#define SD_RES_BTREE_REPEAT 0x1D /* Should repeat op in btree */ /* errors above 0x80 are sheepdog-internal */ @@ -92,8 +95,9 @@ #define VDI_BIT (UINT64_C(1) << 63) #define VMSTATE_BIT (UINT64_C(1) << 62) #define VDI_ATTR_BIT (UINT64_C(1) << 61) +#define VDI_BTREE_BIT (UINT64_C(1) << 60) #define MAX_DATA_OBJS (1ULL << 20) -#define MAX_CHILDREN 1024U +#define MAX_CHILDREN (1024U - 1) /* we use the last uint32_t as btree_counter */ #define SD_MAX_VDI_LEN 256U #define SD_MAX_VDI_TAG_LEN 256U #define SD_MAX_VDI_ATTR_KEY_LEN 256U @@ -104,8 +108,8 @@ #define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS) #define SD_INODE_SIZE (sizeof(struct sd_inode)) -#define SD_INODE_HEADER_SIZE (sizeof(struct sd_inode) - \ - sizeof(uint32_t) * MAX_DATA_OBJS) +#define SD_INODE_INDEX_SIZE (sizeof(uint32_t) * MAX_DATA_OBJS) +#define 
SD_INODE_HEADER_SIZE (sizeof(struct sd_inode) - SD_INODE_INDEX_SIZE) #define SD_ATTR_OBJ_SIZE (sizeof(struct sheepdog_vdi_attr)) #define CURRENT_VDI_ID 0 @@ -215,16 +219,35 @@ struct sd_inode { uint64_t vdi_size; uint64_t vm_state_size; uint8_t copy_policy; - uint8_t reserved; + uint8_t store_policy; uint8_t nr_copies; uint8_t block_size_shift; uint32_t snap_id; uint32_t vdi_id; uint32_t parent_vdi_id; uint32_t child_vdi_id[MAX_CHILDREN]; + uint32_t btree_counter; uint32_t data_vdi_id[MAX_DATA_OBJS]; }; +struct sd_extent { + int idx; + uint32_t vdi_id; +}; + +struct sd_extent_idx { + int idx; + uint64_t oid; +}; + +#define INODE_BTREE_MAGIC 0x6274 + +struct sd_extent_header { + uint16_t magic; + uint16_t depth; /* 1 -- ext node; 2 -- idx node */ + uint32_t entries; +}; + typedef int (*write_node_fn)(uint64_t id, void *mem, unsigned int len, int copies, int copy_policy, int create); typedef int (*read_node_fn)(uint64_t id, void **mem, unsigned int len); @@ -347,10 +370,15 @@ static inline bool is_vdi_attr_obj(uint64_t oid) return !!(oid & VDI_ATTR_BIT); } +static inline bool is_vdi_btree_obj(uint64_t oid) +{ + return !!(oid & VDI_BTREE_BIT); +} + static inline bool is_data_obj(uint64_t oid) { return !is_vdi_obj(oid) && !is_vmstate_obj(oid) && - !is_vdi_attr_obj(oid); + !is_vdi_attr_obj(oid) && !is_vdi_btree_obj(oid); } static inline size_t count_data_objs(const struct sd_inode *inode) @@ -394,6 +422,11 @@ static inline uint64_t vid_to_attr_oid(uint32_t vid, uint32_t attrid) return ((uint64_t)vid << VDI_SPACE_SHIFT) | VDI_ATTR_BIT | attrid; } +static inline uint64_t vid_to_btree_oid(uint32_t vid, uint32_t btreeid) +{ + return ((uint64_t)vid << VDI_SPACE_SHIFT) | VDI_BTREE_BIT | btreeid; +} + static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx) { return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx; diff --git a/lib/sd_inode.c b/lib/sd_inode.c index c03ca03..426e00c 100644 --- a/lib/sd_inode.c +++ b/lib/sd_inode.c @@ -1,15 +1,530 @@ +/* + * 
B-tree is a tree data structure that keeps data sorted and allows searches, + * sequential access, insertions, and deletions in logarithmic time. + * The B-tree is a generalization of a binary search tree in that a node can + * have more than two children. (Comer 1979, p. 123) Unlike self-balancing + * binary search trees, the B-tree is optimized for systems that read and + * write large blocks of data. (ref: http://en.wikipedia.org/wiki/B-tree) + * + * In sheepdog, we use the space in inode->data_vdi_id[] to store a leaf-node at + * the beginning, and to store the root-node of the B-tree once it reaches a depth of two. + * + * At the beginning, inode->data_vdi_id[] stores a leaf-node which points + * to data-objs directly: + * + * +------------------+-----------+-----------+--------+ + * | sd_extent_header | sd_extent | sd_extent | ...... | + * +------------------+-----------+-----------+--------+ + * | | + * / \ + * / \ + * / \ + * +------------+ <------ ----> +------------+ + * | data-obj 1 | | data-obj 2 | + * +------------+ +------------+ + * + * After more oids are added, the leaf-node becomes full of struct sd_extent + * and must be split into two leaf-nodes. After that, inode->data_vdi_id[] + * becomes the root-node, which stores sd_extent_idx entries that point to the two + * leaf-nodes: + * + * +------------------+-----------------+-----------------+ + * | sd_extent_header | sd_extent_idx | sd_extent_idx | + * +------------------+-----------------+-----------------+ + * | | + * / \ + * / ------------- + * / \ + * / \ + * / \ + * +------------------+-----------+-----------+--------+ +------------------+-----------+-----------+--------+ + * | sd_extent_header | sd_extent | sd_extent | ...... | | sd_extent_header | sd_extent | sd_extent | ...... 
| + * +------------------+-----------+-----------+--------+ +------------------+-----------+-----------+--------+ + * / \ / \ + * +------------+ <------ ---> +------------+ +--------------+ <-- --> +--------------+ + * | data-obj 1 | | data-obj 2 | | data-obj 511 | | data-obj 512 | + * +------------+ +------------+ +--------------+ +--------------+ + * + * When a leaf-node is full, we can add a new leaf-node and add a + * new sd_extent_idx to the root-node to point to it: + * + * +------------------+-----------------+-----------------+---------------+ + * | sd_extent_header | sd_extent_idx | sd_extent_idx | sd_extent_idx | + * +------------------+-----------------+-----------------+---------------+ + * | | \ + * / \ \ (new leaf-node) + * / --------- ------ +------------------+-----------+--------+ + * / \ | sd_extent_header | sd_extent | ...... | + * / \ +------------------+-----------+--------+ + * / \ + * +------------------+-----------+--------+ +------------------+-----------+--------+ + * | sd_extent_header | sd_extent | ...... | | sd_extent_header | sd_extent | ...... | + * +------------------+-----------+--------+ +------------------+-----------+--------+ + * + * + * As above, the root-node points to leaf-nodes, which point to data-objs + * (the implementation of the B-tree in sd_inode only supports a depth of two), so it can + * store: + * + * (number of sd_extent_idx in root-node) * (number of sd_extent in leaf-node) + * + * which is 349524 * 524287 = 183250889388 data-objects (about 680 PB with 4MB data-objs).
+ * + */ #include <string.h> +#include "util.h" #include "sheepdog_proto.h" +#define EXT_MAX_SPACE (SD_INODE_INDEX_SIZE - sizeof(struct sd_extent_header)) +#define EXT_MAX_ENTRIES (EXT_MAX_SPACE / sizeof(struct sd_extent)) +#define EXT_IDX_MAX_ENTRIES (EXT_MAX_SPACE / sizeof(struct sd_extent_idx)) + +#define EXT_HEADER(data) ((struct sd_extent_header *)(data)) +#define FIRST_EXT(data) ((struct sd_extent *)((char *)(data) + \ + sizeof(struct sd_extent_header))) +#define LAST_EXT(data) (FIRST_EXT(data) + EXT_HEADER(data)->entries) +#define OFFSET_EXT(data, n) ((char *)(data) + sizeof(struct sd_extent_header) \ + + n * sizeof(struct sd_extent)) + +#define EXT_MAX_IDXS (EXT_MAX_SPACE / sizeof(struct sd_extent_idx)) +#define FIRST_IDX(data) ((struct sd_extent_idx *)((char *)(data) + \ + sizeof(struct sd_extent_header))) +#define LAST_IDX(data) (FIRST_IDX(data) + EXT_HEADER(data)->entries) +#define OFFSET_IDX(data, n) ((char *)(data) + sizeof(struct sd_extent_header) \ + + n * sizeof(struct sd_extent_idx)) + +struct find_path { + struct sd_extent_idx *p_idx; + struct sd_extent *p_ext; + struct sd_extent_header *p_ext_header; + int depth; +}; + +typedef int (*comp)(void *a, void *b); + +static int extent_comp(void *a, void *b) +{ + struct sd_extent *ea = (struct sd_extent *)a; + struct sd_extent *eb = (struct sd_extent *)b; + + return ea->idx - eb->idx; +} + +static int index_comp(void *a, void *b) +{ + struct sd_extent_idx *ia = (struct sd_extent_idx *)a; + struct sd_extent_idx *ib = (struct sd_extent_idx *)b; + + return ia->idx - ib->idx; +} + +static void dump_btree(read_node_fn reader, struct sd_inode *inode) +{ +#ifdef DEBUG + struct sd_extent_header *header = EXT_HEADER(inode->data_vdi_id); + struct sd_extent_header *leaf_node = NULL; + struct sd_extent *last, *itor; + struct sd_extent_idx *last_idx, *itor_idx; + void *tmp; + + sd_info("btree> header: %u %u %u", header->magic, + header->entries, header->depth); + + if (header->depth == 1) { + last = 
LAST_EXT(inode->data_vdi_id); + itor = FIRST_EXT(inode->data_vdi_id); + + while (itor != last) { + sd_info("btree> ext: %d, %u", itor->idx, itor->vdi_id); + itor++; + } + } else if (header->depth == 2) { + last_idx = LAST_IDX(inode->data_vdi_id); + itor_idx = FIRST_IDX(inode->data_vdi_id); + leaf_node = xmalloc(SD_INODE_INDEX_SIZE); + tmp = (void *)leaf_node; + + while (itor_idx != last_idx) { + reader(itor_idx->oid, &tmp, SD_INODE_INDEX_SIZE); + + sd_info("btree> %p idx: %d, %lu, %u", + itor_idx, itor_idx->idx, itor_idx->oid, + leaf_node->entries); + last = LAST_EXT(leaf_node); + itor = FIRST_EXT(leaf_node); + while (itor != last) { + sd_info("btree> ext in: %d, %u", + itor->idx, itor->vdi_id); + itor++; + } + itor_idx++; + } + + free(leaf_node); + } else + assert(0); +#endif +} + +static void *binary_search(void *first, void *last, void *key, + size_t obj_size, comp cmp) +{ + const char *l, *r, *m; + int ret; + + l = (const char *)first; + r = (const char *)last - obj_size; + while (l <= r) { + m = l + ((r - l) / obj_size / 2) * obj_size; + ret = cmp((void *)key, (void *)m); + if (ret < 0) + r = m - obj_size; + else if (ret > 0) + l = m + obj_size; + else + return (void *)m; + } + return (void *)l; +} + +static void sd_inode_init(void *data, int depth) +{ + struct sd_extent_header *header = EXT_HEADER(data); + header->magic = INODE_BTREE_MAGIC; + header->depth = depth; + header->entries = 0; +} + +static bool ext_in_range(struct sd_extent_header *header, struct sd_extent *ext) +{ + struct sd_extent *last = LAST_EXT(header); + if (last - ext > 0) + return true; + return false; +} + +static bool idx_in_range(struct sd_extent_header *header, + struct sd_extent_idx *idx) +{ + struct sd_extent_idx *last = LAST_IDX(header); + if (last - idx > 0) + return true; + return false; +} + +static struct sd_extent *search_ext_entry(struct sd_extent_header *header, + int idx) +{ + struct sd_extent tmp; + tmp.idx = idx; + return binary_search(FIRST_EXT(header), LAST_EXT(header), 
&tmp, + sizeof(struct sd_extent), extent_comp); +} + +static struct sd_extent_idx *search_idx_entry(struct sd_extent_header *header, + int idx) +{ + struct sd_extent_idx tmp; + tmp.idx = idx; + return binary_search(FIRST_IDX(header), LAST_IDX(header), &tmp, + sizeof(struct sd_extent_idx), index_comp); +} + +static void insert_ext_entry_nosearch(struct sd_extent_header *header, + struct sd_extent *ext, int idx, uint32_t vdi_id) +{ + struct sd_extent *last = LAST_EXT(header); + + memmove(ext + 1, ext, (last - ext) * sizeof(struct sd_extent)); + ext->idx = idx; + ext->vdi_id = vdi_id; + header->entries++; +} + +static void insert_idx_entry_nosearch(struct sd_extent_header *header, + struct sd_extent_idx *idx_ext, int idx, uint64_t oid) +{ + struct sd_extent_idx *last = LAST_IDX(header); + memmove(idx_ext + 1, idx_ext, + (last - idx_ext) * sizeof(struct sd_extent_idx)); + idx_ext->idx = idx; + idx_ext->oid = oid; + header->entries++; +} + +static void insert_idx_entry(struct sd_extent_header *header, + int idx, uint64_t oid) +{ + struct sd_extent_idx *found; + + if (header->entries >= EXT_MAX_IDXS) + goto out; + + if (!header->entries) { + FIRST_IDX(header)->idx = idx; + FIRST_IDX(header)->oid = oid; + header->entries++; + goto out; + } + + found = search_idx_entry(header, idx); + insert_idx_entry_nosearch(header, found, idx, oid); +out: + return; +} + +static void split_to_nodes(struct sd_extent_header *src, + struct sd_extent_header *left, + struct sd_extent_header *right, int num) +{ + memcpy(left, src, sizeof(struct sd_extent_header) + + num * sizeof(struct sd_extent)); + left->entries = num; + + mempcpy(right, src, sizeof(struct sd_extent_header)); + mempcpy(FIRST_EXT(right), OFFSET_EXT(src, num), + (src->entries - num) * sizeof(struct sd_extent)); + right->entries = src->entries - num; +} + +static void transfer_to_idx_root(write_node_fn writer, struct sd_inode *inode) +{ + struct sd_extent_header *left; + struct sd_extent_header *right; + struct sd_extent_header 
*root = EXT_HEADER(inode->data_vdi_id); + uint64_t left_oid, right_oid; + uint32_t num = root->entries / 2; + + /* create two leaf-node and copy the entries from root-node */ + left = xmalloc(SD_INODE_INDEX_SIZE); + right = xmalloc(SD_INODE_INDEX_SIZE); + + split_to_nodes(root, left, right, num); + + /* write two nodes back */ + left_oid = vid_to_btree_oid(inode->vdi_id, inode->btree_counter++); + right_oid = vid_to_btree_oid(inode->vdi_id, inode->btree_counter++); + + writer(left_oid, left, SD_INODE_INDEX_SIZE, inode->nr_copies, + inode->copy_policy, 1); + writer(right_oid, right, SD_INODE_INDEX_SIZE, inode->nr_copies, + inode->copy_policy, 1); + + /* change root from ext-node to idx-node */ + root->entries = 0; + root->depth = 2; + insert_idx_entry(root, (LAST_EXT(left) - 1)->idx, left_oid); + insert_idx_entry(root, (LAST_EXT(right) - 1)->idx, right_oid); + + free(left); + free(right); +} + +static int search_whole_btree(read_node_fn reader, const struct sd_inode *inode, + int idx, struct find_path *path) +{ + struct sd_extent_header *header, *leaf_node; + void *tmp; + uint64_t oid; + int ret = SD_RES_BTREE_NOT_FOUND; + + header = EXT_HEADER(inode->data_vdi_id); + + /* root is idx-node */ + if (header->depth == 2) { + path->depth = 2; + path->p_idx = search_idx_entry(header, idx); + leaf_node = xmalloc(SD_INODE_INDEX_SIZE); + tmp = (void *)leaf_node; + + if (idx_in_range(header, path->p_idx)) { + oid = path->p_idx->oid; + ret = reader(oid, &tmp, SD_INODE_INDEX_SIZE); + if (ret != SD_RES_SUCCESS) + goto out; + path->p_ext = search_ext_entry(leaf_node, idx); + path->p_ext_header = leaf_node; + if (ext_in_range(leaf_node, path->p_ext) && + path->p_ext->idx == idx) + ret = SD_RES_BTREE_FOUND; + } else { + /* check if last idx-node has space */ + oid = (path->p_idx - 1)->oid; + ret = reader(oid, &tmp, SD_INODE_INDEX_SIZE); + if (ret != SD_RES_SUCCESS) + goto out; + if (leaf_node->entries < EXT_MAX_ENTRIES) { + path->p_ext = search_ext_entry(leaf_node, idx); + 
path->p_ext_header = leaf_node; + } + } + } else if (header->depth == 1) { + path->depth = 1; + path->p_ext = search_ext_entry(header, idx); + if (ext_in_range(header, path->p_ext) && + path->p_ext->idx == idx) + ret = SD_RES_BTREE_FOUND; + else + ret = SD_RES_BTREE_NOT_FOUND; + } +out: + return ret; +} + uint32_t sd_inode_get_vdi(read_node_fn reader, const struct sd_inode *inode, int idx) { - return inode->data_vdi_id[idx]; + struct find_path path; + int ret; + + if (inode->store_policy == 0) + return inode->data_vdi_id[idx]; + else { + /* btree is not init, so vdi is 0 */ + if (inode->data_vdi_id[0] == 0) + return 0; + + memset(&path, 0, sizeof(path)); + ret = search_whole_btree(reader, inode, idx, &path); + if (ret == SD_RES_BTREE_FOUND) + return path.p_ext->vdi_id; + if (path.p_ext_header) + free(path.p_ext_header); + } + + return 0; +} + +static void split_ext_node(write_node_fn writer, struct sd_inode *inode, + struct find_path *path) +{ + struct sd_extent_header *old = path->p_ext_header, *new_ext; + uint32_t num = old->entries / 2; + uint64_t new_oid; + + new_ext = xmalloc(SD_INODE_INDEX_SIZE); + + split_to_nodes(old, new_ext, old, num); + + new_oid = vid_to_btree_oid(inode->vdi_id, inode->btree_counter++); + writer(new_oid, new_ext, SD_INODE_INDEX_SIZE, inode->nr_copies, + inode->copy_policy, 1); + writer(path->p_idx->oid, old, SD_INODE_INDEX_SIZE, inode->nr_copies, + inode->copy_policy, 0); + + /* write new index */ + insert_idx_entry(EXT_HEADER(inode->data_vdi_id), + LAST_EXT(new_ext)->idx, new_oid); + + free(new_ext); +} + +static int insert_new_node(write_node_fn writer, read_node_fn reader, + struct sd_inode *inode, struct find_path *path, + int idx, uint32_t vdi_id) +{ + struct sd_extent_header *header = EXT_HEADER(inode->data_vdi_id); + struct sd_extent_header *leaf_node = NULL; + uint64_t oid; + int ret = SD_RES_SUCCESS; + + if (path->depth == 1) { + if (header->entries >= EXT_MAX_ENTRIES) { + transfer_to_idx_root(writer, inode); + ret = 
SD_RES_BTREE_REPEAT; + goto out; + } + insert_ext_entry_nosearch(header, + path->p_ext, idx, vdi_id); + } else if (path->depth == 2) { + if (idx_in_range(header, path->p_idx)) { + if (!path->p_ext_header) { + ret = SD_RES_BTREE_NOT_FOUND; + goto out; + } + if (path->p_ext_header->entries >= EXT_MAX_ENTRIES) { + split_ext_node(writer, inode, path); + ret = SD_RES_BTREE_REPEAT; + goto out; + } + insert_ext_entry_nosearch(path->p_ext_header, + path->p_ext, idx, vdi_id); + writer(path->p_idx->oid, path->p_ext_header, + SD_INODE_INDEX_SIZE, inode->nr_copies, + inode->copy_policy, 1); + } else if (path->p_ext_header) { + /* the last idx-node */ + insert_ext_entry_nosearch(path->p_ext_header, + path->p_ext, idx, vdi_id); + path->p_idx--; + path->p_idx->idx = + (LAST_EXT(path->p_ext_header) - 1)->idx; + writer(path->p_idx->oid, path->p_ext_header, + SD_INODE_INDEX_SIZE, inode->nr_copies, + inode->copy_policy, 1); + } else { + /* create a new ext-node */ + leaf_node = xmalloc(SD_INODE_INDEX_SIZE); + sd_inode_init(leaf_node, 2); + oid = vid_to_btree_oid(inode->vdi_id, + inode->btree_counter++); + insert_ext_entry_nosearch(leaf_node, + FIRST_EXT(leaf_node), idx, vdi_id); + writer(oid, leaf_node, SD_INODE_INDEX_SIZE, + inode->nr_copies, + inode->copy_policy, 1); + insert_idx_entry_nosearch(header, path->p_idx, + idx, oid); + } + } +out: + if (leaf_node) + free(leaf_node); + return ret; } void sd_inode_set_vdi(write_node_fn writer, read_node_fn reader, struct sd_inode *inode, int idx, uint32_t vdi_id) { - inode->data_vdi_id[idx] = vdi_id; + struct sd_extent_header *header; + struct find_path path; + int ret; + + path.p_ext_header = NULL; + + if (inode->store_policy == 0) + inode->data_vdi_id[idx] = vdi_id; + else { + if (inode->data_vdi_id[0] == 0) + sd_inode_init(inode->data_vdi_id, 1); + header = EXT_HEADER(inode->data_vdi_id); + assert(header->magic == INODE_BTREE_MAGIC); + while (1) { + memset(&path, 0, sizeof(path)); + ret = search_whole_btree(reader, inode, idx, &path); + 
if (ret == SD_RES_BTREE_FOUND) { + path.p_ext->vdi_id = vdi_id; + goto out; + } else { + ret = insert_new_node(writer, reader, inode, + &path, idx, vdi_id); + if (SD_RES_BTREE_REPEAT == ret) { + if (path.p_ext_header) + free(path.p_ext_header); + continue; + } else + goto out; + } + } + } +out: + if (path.p_ext_header) + free(path.p_ext_header); + dump_btree(reader, inode); +} + +void sd_inode_copy_vdis(struct sd_inode *oldi, struct sd_inode *newi) +{ + memcpy(newi->data_vdi_id, oldi->data_vdi_id, sizeof(newi->data_vdi_id)); } diff --git a/sheep/vdi.c b/sheep/vdi.c index dc3f975..203472a 100644 --- a/sheep/vdi.c +++ b/sheep/vdi.c @@ -216,6 +216,7 @@ static struct sd_inode *alloc_inode(const struct vdi_iocb *iocb, new->create_time = iocb->time; new->vdi_size = iocb->size; new->copy_policy = iocb->copy_policy; + new->store_policy = 1; new->nr_copies = iocb->nr_copies; new->block_size_shift = find_next_bit(&block_size, BITS_PER_LONG, 0); new->snap_id = new_snapid; diff --git a/sheepfs/volume.c b/sheepfs/volume.c index c2df32a..3fbc4a9 100644 --- a/sheepfs/volume.c +++ b/sheepfs/volume.c @@ -168,7 +168,7 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t size, if (rw == VOLUME_READ) hdr.opcode = SD_OP_READ_OBJ; else { - hdr.opcode = create ? + hdr.opcode = (create || is_vdi_btree_obj(oid)) ? SD_OP_CREATE_AND_WRITE_OBJ : SD_OP_WRITE_OBJ; hdr.flags |= SD_FLAG_CMD_WRITE; } -- 1.7.1 -- sheepdog mailing list sheepdog@lists.wpkg.org http://lists.wpkg.org/mailman/listinfo/sheepdog