There is no need to grab a local copy of the vnode information in the VDI ops; we can simply pass it in through ->process_work_cluster. For deletions we have to grab an additional reference, as deletions may still be in progress by the time finish_request is called.
Signed-off-by: Christoph Hellwig <[email protected]> --- sheep/group.c | 3 sheep/ops.c | 28 ++++--- sheep/sheep_priv.h | 31 ++++---- sheep/vdi.c | 193 +++++++++++++++++++++++++---------------------------- 4 files changed, 129 insertions(+), 126 deletions(-) Index: sheepdog/sheep/ops.c =================================================================== --- sheepdog.orig/sheep/ops.c 2012-05-08 15:46:29.808689145 +0200 +++ sheepdog/sheep/ops.c 2012-05-08 15:49:44.120687292 +0200 @@ -56,7 +56,8 @@ struct sd_op_template { */ int (*process_work)(struct request *req); int (*process_work_cluster)(const struct sd_req *req, - struct sd_rsp *rsp, void *data); + struct sd_rsp *rsp, struct vnode_info *vnode_info, + void *data); int (*process_main)(const struct sd_req *req, struct sd_rsp *rsp, void *data); }; @@ -121,15 +122,15 @@ out: } static int cluster_new_vdi(const struct sd_req *req, struct sd_rsp *rsp, - void *data) + struct vnode_info *vnode_info, void *data) { const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req; struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp; uint32_t vid = 0, nr_copies = sys->nr_copies; int ret; - ret = add_vdi(hdr->epoch, data, hdr->data_length, hdr->vdi_size, &vid, - hdr->base_vdi_id, hdr->copies, + ret = add_vdi(vnode_info, hdr->epoch, data, hdr->data_length, + hdr->vdi_size, &vid, hdr->base_vdi_id, hdr->copies, hdr->snapid, &nr_copies); vdi_rsp->vdi_id = vid; @@ -152,14 +153,14 @@ static int post_cluster_new_vdi(const st } static int cluster_del_vdi(const struct sd_req *req, struct sd_rsp *rsp, - void *data) + struct vnode_info *vnode_info, void *data) { const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req; struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp; uint32_t vid = 0, nr_copies = sys->nr_copies; int ret; - ret = del_vdi(hdr->epoch, data, hdr->data_length, &vid, + ret = del_vdi(vnode_info, hdr->epoch, data, hdr->data_length, &vid, hdr->snapid, &nr_copies); if (ret == SD_RES_SUCCESS) @@ -171,7 +172,7 @@ static int 
cluster_del_vdi(const struct } static int cluster_get_vdi_info(const struct sd_req *req, struct sd_rsp *rsp, - void *data) + struct vnode_info *vnode_info, void *data) { const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req; struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp; @@ -189,7 +190,7 @@ static int cluster_get_vdi_info(const st else return SD_RES_INVALID_PARMS; - ret = lookup_vdi(hdr->epoch, data, tag, &vid, hdr->snapid, + ret = lookup_vdi(vnode_info, hdr->epoch, data, tag, &vid, hdr->snapid, &nr_copies, NULL); if (ret != SD_RES_SUCCESS) return ret; @@ -287,7 +288,7 @@ static int cluster_shutdown(const struct } static int cluster_get_vdi_attr(const struct sd_req *req, struct sd_rsp *rsp, - void *data) + struct vnode_info *vnode_info, void *data) { const struct sd_vdi_req *hdr = (const struct sd_vdi_req *)req; struct sd_vdi_rsp *vdi_rsp = (struct sd_vdi_rsp *)rsp; @@ -297,7 +298,7 @@ static int cluster_get_vdi_attr(const st struct sheepdog_vdi_attr *vattr; vattr = data; - ret = lookup_vdi(hdr->epoch, vattr->name, vattr->tag, + ret = lookup_vdi(vnode_info, hdr->epoch, vattr->name, vattr->tag, &vid, hdr->snapid, &nr_copies, &created_time); if (ret != SD_RES_SUCCESS) return ret; @@ -306,7 +307,7 @@ static int cluster_get_vdi_attr(const st so we use the hash value of the VDI name as the VDI id */ vid = fnv_64a_buf(vattr->name, strlen(vattr->name), FNV1A_64_INIT); vid &= SD_NR_VDIS - 1; - ret = get_vdi_attr(hdr->epoch, data, hdr->data_length, vid, + ret = get_vdi_attr(vnode_info, hdr->epoch, data, hdr->data_length, vid, &attrid, nr_copies, created_time, hdr->flags & SD_FLAG_CMD_CREAT, hdr->flags & SD_FLAG_CMD_EXCL, @@ -1082,9 +1083,10 @@ int do_process_work(struct request *req) } int do_process_work_cluster(struct sd_op_template *op, const struct sd_req *req, - struct sd_rsp *rsp, void *data) + struct sd_rsp *rsp, struct vnode_info *vnode_info, + void *data) { - return op->process_work_cluster(req, rsp, data); + return op->process_work_cluster(req, 
rsp, vnode_info, data); } int do_process_main(struct sd_op_template *op, const struct sd_req *req, Index: sheepdog/sheep/sheep_priv.h =================================================================== --- sheepdog.orig/sheep/sheep_priv.h 2012-05-08 15:47:05.840688801 +0200 +++ sheepdog/sheep/sheep_priv.h 2012-05-08 15:49:44.120687292 +0200 @@ -224,21 +224,25 @@ int create_listen_port(int port, void *d int init_store(const char *dir); int init_base_path(const char *dir); -int add_vdi(uint32_t epoch, char *data, int data_len, uint64_t size, - uint32_t *new_vid, uint32_t base_vid, uint32_t copies, - int is_snapshot, unsigned int *nr_copies); - -int del_vdi(uint32_t epoch, char *data, int data_len, uint32_t *vid, - uint32_t snapid, unsigned int *nr_copies); - -int lookup_vdi(uint32_t epoch, char *name, char *tag, uint32_t *vid, - uint32_t snapid, unsigned int *nr_copies, uint64_t *ctime); +int add_vdi(struct vnode_info *vnode_info, uint32_t epoch, char *data, + int data_len, uint64_t size, uint32_t *new_vid, + uint32_t base_vid, uint32_t copies, int is_snapshot, + unsigned int *nr_copies); + +int del_vdi(struct vnode_info *vnode_info, uint32_t epoch, char *data, + int data_len, uint32_t *vid, uint32_t snapid, + unsigned int *nr_copies); + +int lookup_vdi(struct vnode_info *vnode_info, uint32_t epoch, char *name, + char *tag, uint32_t *vid, uint32_t snapid, + unsigned int *nr_copies, uint64_t *ctime); int read_vdis(char *data, int len, unsigned int *rsp_len); -int get_vdi_attr(uint32_t epoch, struct sheepdog_vdi_attr *vattr, int data_len, - uint32_t vid, uint32_t *attrid, int copies, uint64_t ctime, - int write, int excl, int delete); +int get_vdi_attr(struct vnode_info *vnode_info, uint32_t epoch, + struct sheepdog_vdi_attr *vattr, int data_len, uint32_t vid, + uint32_t *attrid, int copies, uint64_t ctime, int write, + int excl, int delete); int get_zones_nr_from(struct sd_node *nodes, int nr_nodes); struct vnode_info *grab_vnode_info(struct vnode_info *vnode_info); 
@@ -329,7 +333,8 @@ int do_process_main(struct sd_op_templat int has_process_work_cluster(struct sd_op_template *op); int do_process_work_cluster(struct sd_op_template *op, const struct sd_req *req, - struct sd_rsp *rsp, void *data); + struct sd_rsp *rsp, struct vnode_info *vnode_info, + void *data); /* Journal */ struct jrnl_descriptor *jrnl_begin(const void *buf, size_t count, off_t offset, Index: sheepdog/sheep/vdi.c =================================================================== --- sheepdog.orig/sheep/vdi.c 2012-05-08 15:46:26.788689174 +0200 +++ sheepdog/sheep/vdi.c 2012-05-08 15:49:44.120687292 +0200 @@ -17,11 +17,11 @@ /* TODO: should be performed atomically */ -static int create_vdi_obj(uint32_t epoch, char *name, uint32_t new_vid, uint64_t size, - uint32_t base_vid, uint32_t cur_vid, uint32_t copies, - uint32_t snapid, int is_snapshot) +static int create_vdi_obj(struct vnode_info *vnode_info, uint32_t epoch, + char *name, uint32_t new_vid, uint64_t size, uint32_t base_vid, + uint32_t cur_vid, uint32_t copies, uint32_t snapid, + int is_snapshot) { - struct vnode_info *vnode_info; /* we are not called concurrently */ struct sheepdog_inode *new = NULL, *base = NULL, *cur = NULL; struct timeval tv; @@ -51,8 +51,6 @@ static int create_vdi_obj(uint32_t epoch } } - vnode_info = get_vnode_info(); - nr_copies = get_nr_copies(vnode_info); if (nr_copies > copies) nr_copies = copies; @@ -63,7 +61,7 @@ static int create_vdi_obj(uint32_t epoch sizeof(*base), 0, nr_copies); if (ret != SD_RES_SUCCESS) { ret = SD_RES_BASE_VDI_READ; - goto out_put_vnode_info; + goto out; } } @@ -80,7 +78,7 @@ static int create_vdi_obj(uint32_t epoch if (ret != SD_RES_SUCCESS) { vprintf(SDOG_ERR, "failed\n"); ret = SD_RES_BASE_VDI_READ; - goto out_put_vnode_info; + goto out; } cur->snap_ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000; @@ -112,7 +110,7 @@ static int create_vdi_obj(uint32_t epoch if (i == ARRAY_SIZE(base->child_vdi_id)) { ret = SD_RES_NO_BASE_VDI; - goto 
out_put_vnode_info; + goto out; } } @@ -123,7 +121,7 @@ static int create_vdi_obj(uint32_t epoch if (ret != 0) { vprintf(SDOG_ERR, "failed\n"); ret = SD_RES_BASE_VDI_READ; - goto out_put_vnode_info; + goto out; } } @@ -134,7 +132,7 @@ static int create_vdi_obj(uint32_t epoch if (ret != 0) { vprintf(SDOG_ERR, "failed\n"); ret = SD_RES_BASE_VDI_WRITE; - goto out_put_vnode_info; + goto out; } } @@ -144,8 +142,6 @@ static int create_vdi_obj(uint32_t epoch if (ret != 0) ret = SD_RES_VDI_WRITE; -out_put_vnode_info: - put_vnode_info(vnode_info); out: free(new); free(cur); @@ -153,12 +149,12 @@ out: return ret; } -static int find_first_vdi(uint32_t epoch, unsigned long start, unsigned long end, - char *name, char *tag, uint32_t snapid, uint32_t *vid, - unsigned long *deleted_nr, uint32_t *next_snap, - unsigned int *inode_nr_copies, uint64_t *ctime) +static int find_first_vdi(struct vnode_info *vnode_info, uint32_t epoch, + unsigned long start, unsigned long end, char *name, char *tag, + uint32_t snapid, uint32_t *vid, unsigned long *deleted_nr, + uint32_t *next_snap, unsigned int *inode_nr_copies, + uint64_t *ctime) { - struct vnode_info *vnode_info; struct sheepdog_inode *inode = NULL; unsigned long i; int nr_copies; @@ -171,7 +167,6 @@ static int find_first_vdi(uint32_t epoch goto out; } - vnode_info = get_vnode_info(); nr_copies = get_nr_copies(vnode_info); for (i = start; i >= end; i--) { @@ -180,7 +175,7 @@ static int find_first_vdi(uint32_t epoch SD_INODE_HEADER_SIZE, 0, nr_copies); if (ret != SD_RES_SUCCESS) { ret = SD_RES_EIO; - goto out_put_vnode_info; + goto out_free_inode; } if (inode->name[0] == '\0') { @@ -202,7 +197,7 @@ static int find_first_vdi(uint32_t epoch if (ctime) *ctime = inode->ctime; ret = SD_RES_SUCCESS; - goto out_put_vnode_info; + goto out_free_inode; } } @@ -211,17 +206,17 @@ static int find_first_vdi(uint32_t epoch else ret = SD_RES_NO_VDI; -out_put_vnode_info: - put_vnode_info(vnode_info); +out_free_inode: free(inode); out: return ret; } 
-static int do_lookup_vdi(uint32_t epoch, char *name, int namelen, uint32_t *vid, - char *tag, uint32_t snapid, uint32_t *next_snapid, - unsigned long *right_nr, unsigned long *deleted_nr, - unsigned int *nr_copies, uint64_t *ctime) +static int do_lookup_vdi(struct vnode_info *vnode_info, uint32_t epoch, + char *name, int namelen, uint32_t *vid, char *tag, + uint32_t snapid, uint32_t *next_snapid, unsigned long *right_nr, + unsigned long *deleted_nr, unsigned int *nr_copies, + uint64_t *ctime) { int ret; unsigned long nr, start_nr; @@ -238,8 +233,9 @@ static int do_lookup_vdi(uint32_t epoch, } else if (nr < SD_NR_VDIS) { right_side: /* look up on the right side of the hash point */ - ret = find_first_vdi(epoch, nr - 1, start_nr, name, tag, snapid, vid, - deleted_nr, next_snapid, nr_copies, ctime); + ret = find_first_vdi(vnode_info, epoch, nr - 1, start_nr, name, + tag, snapid, vid, deleted_nr, next_snapid, + nr_copies, ctime); return ret; } else { /* round up... bitmap search from the head of the bitmap */ @@ -249,8 +245,9 @@ static int do_lookup_vdi(uint32_t epoch, return SD_RES_FULL_VDI; else if (nr) { /* look up on the left side of the hash point */ - ret = find_first_vdi(epoch, nr - 1, 0, name, tag, snapid, vid, - deleted_nr, next_snapid, nr_copies, ctime); + ret = find_first_vdi(vnode_info, epoch, nr - 1, 0, name, + tag, snapid, vid, deleted_nr, + next_snapid, nr_copies, ctime); if (ret == SD_RES_NO_VDI) ; /* we need to go to the right side */ else @@ -262,19 +259,21 @@ static int do_lookup_vdi(uint32_t epoch, } } -int lookup_vdi(uint32_t epoch, char *name, char *tag, uint32_t *vid, - uint32_t snapid, unsigned int *nr_copies, uint64_t *ctime) +int lookup_vdi(struct vnode_info *vnode_info, uint32_t epoch, char *name, + char *tag, uint32_t *vid, uint32_t snapid, + unsigned int *nr_copies, uint64_t *ctime) { uint32_t dummy0; unsigned long dummy1, dummy2; - return do_lookup_vdi(epoch, name, strlen(name), vid, tag, snapid, - &dummy0, &dummy1, &dummy2, nr_copies, 
ctime); + return do_lookup_vdi(vnode_info, epoch, name, strlen(name), vid, tag, + snapid, &dummy0, &dummy1, &dummy2, nr_copies, + ctime); } -int add_vdi(uint32_t epoch, char *data, int data_len, uint64_t size, - uint32_t *new_vid, uint32_t base_vid, uint32_t copies, - int is_snapshot, unsigned int *nr_copies) +int add_vdi(struct vnode_info *vnode_info, uint32_t epoch, char *data, + int data_len, uint64_t size, uint32_t *new_vid, uint32_t base_vid, + uint32_t copies, int is_snapshot, unsigned int *nr_copies) { uint32_t cur_vid = 0; uint32_t next_snapid; @@ -287,8 +286,9 @@ int add_vdi(uint32_t epoch, char *data, name = data; - ret = do_lookup_vdi(epoch, name, strlen(name), &cur_vid, NULL, 0, &next_snapid, - &right_nr, &deleted_nr, nr_copies, NULL); + ret = do_lookup_vdi(vnode_info, epoch, name, strlen(name), &cur_vid, + NULL, 0, &next_snapid, &right_nr, &deleted_nr, + nr_copies, NULL); if (is_snapshot) { if (ret != SD_RES_SUCCESS) { @@ -327,39 +327,8 @@ int add_vdi(uint32_t epoch, char *data, copies = sys->nr_copies; } - ret = create_vdi_obj(epoch, name, *new_vid, size, base_vid, cur_vid, copies, - next_snapid, is_snapshot); - - return ret; -} - -int start_deletion(uint32_t vid, uint32_t epoch); - -int del_vdi(uint32_t epoch, char *data, int data_len, uint32_t *vid, - uint32_t snapid, unsigned int *nr_copies) -{ - char *name = data, *tag; - uint32_t dummy0; - unsigned long dummy1, dummy2; - int ret; - - if (data_len == SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN) - tag = data + SD_MAX_VDI_LEN; - else if (data_len == SD_MAX_VDI_LEN) - tag = NULL; - else { - ret = SD_RES_INVALID_PARMS; - goto out; - } - - ret = do_lookup_vdi(epoch, name, strlen(name), vid, tag, snapid, - &dummy0, &dummy1, &dummy2, nr_copies, NULL); - if (ret != SD_RES_SUCCESS) - goto out; - - ret = start_deletion(*vid, epoch); -out: - return ret; + return create_vdi_obj(vnode_info, epoch, name, *new_vid, size, base_vid, + cur_vid, copies, next_snapid, is_snapshot); } int read_vdis(char *data, int len, 
unsigned int *rsp_len) @@ -604,34 +573,57 @@ out: return vid; } -int start_deletion(uint32_t vid, uint32_t epoch) +int del_vdi(struct vnode_info *vnode_info, uint32_t epoch, char *data, + int data_len, uint32_t *vid, uint32_t snapid, + unsigned int *nr_copies) { + char *name = data, *tag; + uint32_t dummy0; + unsigned long dummy1, dummy2; struct deletion_work *dw = NULL; - int ret = SD_RES_NO_MEM; + int ret; uint32_t root_vid; + switch (data_len) { + case SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN: + tag = data + SD_MAX_VDI_LEN; + break; + case SD_MAX_VDI_LEN: + tag = NULL; + break; + default: + return SD_RES_INVALID_PARMS; + } + + ret = do_lookup_vdi(vnode_info, epoch, name, strlen(name), vid, tag, + snapid, &dummy0, &dummy1, &dummy2, nr_copies, + NULL); + if (ret != SD_RES_SUCCESS) + return ret; + dw = zalloc(sizeof(struct deletion_work)); if (!dw) - goto err; + return SD_RES_NO_MEM; /* buf is to store vdi id of every object */ dw->buf = zalloc(SD_INODE_SIZE - SD_INODE_HEADER_SIZE); - if (!dw->buf) - goto err; + if (!dw->buf) { + ret = SD_RES_NO_MEM; + goto out_free_dw; + } dw->count = 0; - dw->vid = vid; + dw->vid = *vid; dw->epoch = epoch; + dw->vnodes = vnode_info; dw->work.fn = delete_one; dw->work.done = delete_one_done; - dw->vnodes = get_vnode_info(); - - root_vid = get_vdi_root(dw->vnodes, dw->epoch, dw->vid); + root_vid = get_vdi_root(vnode_info, dw->epoch, dw->vid); if (!root_vid) { ret = SD_RES_EIO; - goto err; + goto out_free_buf; } ret = fill_vdi_list(dw, root_vid); @@ -647,28 +639,35 @@ int start_deletion(uint32_t vid, uint32_ if (dw->count == 0) goto out; + /* + * Only grab the vnode_info reference once we know that we can't fail in + * this helper thread. 
+ */ + grab_vnode_info(dw->vnodes); + if (!list_empty(&deletion_work_list)) { list_add_tail(&dw->dw_siblings, &deletion_work_list); - goto out; + } else { + list_add_tail(&dw->dw_siblings, &deletion_work_list); + queue_work(sys->deletion_wqueue, &dw->work); } - list_add_tail(&dw->dw_siblings, &deletion_work_list); - queue_work(sys->deletion_wqueue, &dw->work); out: return SD_RES_SUCCESS; -err: - if (dw) - free(dw->buf); + +out_free_buf: + free(dw->buf); +out_free_dw: free(dw); return ret; } -int get_vdi_attr(uint32_t epoch, struct sheepdog_vdi_attr *vattr, int data_len, +int get_vdi_attr(struct vnode_info *vnode_info, uint32_t epoch, + struct sheepdog_vdi_attr *vattr, int data_len, uint32_t vid, uint32_t *attrid, int copies, uint64_t ctime, int wr, int excl, int delete) { - struct vnode_info *vnode_info = NULL; struct sheepdog_vdi_attr tmp_attr; uint64_t oid, hval; uint32_t end; @@ -677,8 +676,6 @@ int get_vdi_attr(uint32_t epoch, struct vattr->ctime = ctime; - vnode_info = get_vnode_info(); - nr_copies = get_nr_copies(vnode_info); if (nr_copies > copies) nr_copies = copies; @@ -704,11 +701,11 @@ int get_vdi_attr(uint32_t epoch, struct ret = SD_RES_EIO; else ret = SD_RES_SUCCESS; - goto out_put_vnode_info; + goto out; } if (ret != SD_RES_SUCCESS) - goto out_put_vnode_info; + goto out; /* compare attribute header */ if (strcmp(tmp_attr.name, vattr->name) == 0 && @@ -738,7 +735,7 @@ int get_vdi_attr(uint32_t epoch, struct ret = SD_RES_SUCCESS; } else ret = SD_RES_SUCCESS; - goto out_put_vnode_info; + goto out; } (*attrid)++; @@ -746,8 +743,6 @@ int get_vdi_attr(uint32_t epoch, struct dprintf("there is no space for new VDIs\n"); ret = SD_RES_FULL_VDI; - -out_put_vnode_info: - put_vnode_info(vnode_info); +out: return ret; } Index: sheepdog/sheep/group.c =================================================================== --- sheepdog.orig/sheep/group.c 2012-05-08 15:49:30.652687422 +0200 +++ sheepdog/sheep/group.c 2012-05-08 15:49:44.124687292 +0200 @@ -249,7 +249,8 @@ 
static void do_cluster_op(void *arg) else data = req->data; ret = do_process_work_cluster(req->op, (const struct sd_req *)&msg->req, - (struct sd_rsp *)&msg->rsp, data); + (struct sd_rsp *)&msg->rsp, + req->vnodes, data); msg->rsp.result = ret; } -- sheepdog mailing list [email protected] http://lists.wpkg.org/mailman/listinfo/sheepdog
