From: levin li <xingke....@taobao.com> This module lets users expose Sheepdog VDIs as block devices in Linux. A user can register a VDI with the kernel, after which it appears just like a newly attached hard disk: the user can partition it, format it, or mount it. This gives users an efficient way to use Sheepdog as a distributed storage system.
Usage is simple: after the sheepdev.ko module is loaded, it creates a proc entry, '/proc/sheep', and you control the driver by writing commands to that file. Add a new block device from an existing sheepdog VDI: # echo "add 127.0.0.1:7070 linux" > /proc/sheep This creates a block device, /dev/sheepa, which you can format and mount: # mkfs.ext4 /dev/sheepa # mount -t ext4 /dev/sheepa test Remove a block device from the kernel: # echo "del linux" > /proc/sheep Signed-off-by: levin li <xingke....@taobao.com> --- sheepdev/connect.c | 178 ++++++++++ sheepdev/device.c | 985 ++++++++++++++++++++++++++++++++++++++++++++++++++++ sheepdev/proc.c | 176 ++++++++++ sheepdev/sheep.c | 186 ++++++++++ sheepdev/sheepdev.h | 138 ++++++++ 5 files changed, 1663 insertions(+) create mode 100644 sheepdev/connect.c create mode 100644 sheepdev/device.c create mode 100644 sheepdev/proc.c create mode 100644 sheepdev/sheep.c create mode 100644 sheepdev/sheepdev.h diff --git a/sheepdev/connect.c b/sheepdev/connect.c new file mode 100644 index 0000000..62138b4 --- /dev/null +++ b/sheepdev/connect.c @@ -0,0 +1,178 @@ +/* + * Copyright (C) 2013 Taobao Inc. + * + * Levin Li <xingke....@taobao.com> + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */
+
+#include "sheepdev.h"
+#include "sheepdog_proto.h"
+
+/*
+ * Create a TCP socket and connect it to ip_addr:port.
+ *
+ * Returns 0 on success, otherwise the error reported by sock_create()
+ * or by the socket's connect operation.
+ * NOTE(review): when connect fails, the socket created here stays in
+ * *sock; confirm that the caller releases it.
+ */
+int connect_to(struct socket **sock, const char *ip_addr, int port)
+{
+	int ret;
+	struct sockaddr_in addr;
+
+	ret = sock_create(AF_INET, SOCK_STREAM, IPPROTO_TCP, sock);
+	if (ret) {
+		DBPRT("fail to create socket\n");
+		return ret;
+	}
+
+	memset(&addr, 0, sizeof(addr));
+	addr.sin_family = AF_INET;
+	addr.sin_port = htons(port);
+	addr.sin_addr.s_addr = in_aton(ip_addr);
+
+	ret = (*sock)->ops->connect(*sock, (struct sockaddr *)&addr,
+				    sizeof(addr), 0);
+
+	if (!ret)
+		DBPRT("connected to %s:%d\n", ip_addr, port);
+
+	return ret;
+}
+
+/*
+ * Read exactly `length` bytes from sock into the kernel buffer buf.
+ *
+ * Returns the total number of bytes read (== length) on success, or a
+ * negative errno.  A connection closed before `length` bytes arrived is
+ * reported as -EIO instead of 0, so that callers testing `ret < 0` do
+ * not mistake a truncated message for a complete one.
+ */
+int do_read(struct socket *sock, char *buf, const size_t length)
+{
+	struct msghdr msg;
+	struct iovec iov;
+	int ret, received = 0, left = length;
+	mm_segment_t oldmm;
+
+	memset(&msg, 0, sizeof(msg));
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+
+	while (left > 0) {
+		msg.msg_iov->iov_base = buf + received;
+		msg.msg_iov->iov_len = left;
+		/* buf is a kernel address, so lift the user-space limit */
+		oldmm = get_fs();
+		set_fs(KERNEL_DS);
+		ret = sock_recvmsg(sock, &msg, left, MSG_WAITALL);
+		set_fs(oldmm);
+		if (ret < 0)
+			return ret;
+		if (ret == 0)
+			return -EIO;	/* peer closed the connection early */
+		left -= ret;
+		received += ret;
+	}
+
+	/* total, not the size of the last chunk */
+	return received;
+}
+
+/*
+ * Advance the iovec array of msg past the first `len` bytes, which have
+ * already been sent.  Stops at the end of the array rather than walking
+ * beyond it when `len` covers every remaining byte.
+ */
+static void forward_iov(struct msghdr *msg, int len)
+{
+	while (msg->msg_iovlen > 0 && msg->msg_iov->iov_len <= len) {
+		len -= msg->msg_iov->iov_len;
+		msg->msg_iov++;
+		msg->msg_iovlen--;
+	}
+
+	if (!msg->msg_iovlen)
+		return;
+
+	msg->msg_iov->iov_base = (char *) msg->msg_iov->iov_base + len;
+	msg->msg_iov->iov_len -= len;
+}
+
+
+/*
+ * Send `len` bytes described by msg, retrying on -EINTR/-EBUSY and
+ * resuming after partial sends.  Returns 0 on success, -EFAULT on any
+ * other send error.
+ */
+static int do_write(struct socket *sock, struct msghdr *msg, int len)
+{
+	int ret;
+	mm_segment_t oldmm;
+
+rewrite:
+	oldmm = get_fs();
+	set_fs(KERNEL_DS);
+	ret = sock_sendmsg(sock, msg, len);
+	set_fs(oldmm);
+
+	if (ret < 0) {
+		if (ret == -EINTR)
+			goto rewrite;
+		if (ret == -EBUSY) {
+			DBPRT("busy\n");
+			goto rewrite;
+		}
+		DBPRT("failed to write to socket: %d\n", ret);
+		return -EFAULT;
+	}
+
+	len -= ret;
+	if (len) {
+		/* partial send: skip what went out and send the rest */
+		forward_iov(msg, ret);
+		goto rewrite;
+	}
+
+	return 0;
+}
+
+/*
+ * Send a request header followed by `wlen` bytes of payload (if any).
+ * Returns 0 on success, -EFAULT on failure.
+ */
+int send_req(struct socket *sock, struct sd_req *hdr, void *data,
+	     unsigned int wlen)
+{
+	int ret;
+	struct msghdr msg;
+	struct iovec iov[2];
+
+	memset(&msg, 0, sizeof(msg));
+
+	msg.msg_iov = iov;
+
+	msg.msg_iovlen = 1;
+	iov[0].iov_base = hdr;
+	iov[0].iov_len = sizeof(*hdr);
+
+	if (wlen) {
+		msg.msg_iovlen++;
+		iov[1].iov_base = data;
+		iov[1].iov_len = wlen;
+	}
+
+	ret = do_write(sock, &msg, sizeof(*hdr) + wlen);
+	if (ret) {
+		DBPRT("failed to send request %x, %d\n", hdr->opcode, wlen);
+		ret = -EFAULT;
+	}
+
+	return ret;
+}
+
+/*
+ * Send a request and synchronously read its response.
+ *
+ * The response header overwrites *hdr.  For write requests
+ * (SD_FLAG_CMD_WRITE) `data` is sent as payload; otherwise up to
+ * hdr->data_length bytes of response payload are read into `data`.
+ * Returns 0 on success, -EFAULT on any send or receive failure.
+ */
+int exec_req(struct socket *sock, struct sd_req *hdr, void *data)
+{
+	int ret;
+	struct sd_rsp *rsp = (struct sd_rsp *)hdr;
+	unsigned int wlen, rlen;
+
+	if (hdr->flags & SD_FLAG_CMD_WRITE) {
+		wlen = hdr->data_length;
+		rlen = 0;
+	} else {
+		wlen = 0;
+		rlen = hdr->data_length;
+	}
+
+	if (send_req(sock, hdr, data, wlen))
+		return -EFAULT;
+
+	ret = do_read(sock, (char *)rsp, sizeof(*rsp));
+	if (ret < 0) {
+		DBPRT("failed to read a response\n");
+		return -EFAULT;
+	}
+
+	/* never read more than the server says it is sending */
+	if (rlen > rsp->data_length)
+		rlen = rsp->data_length;
+
+	if (rlen) {
+		ret = do_read(sock, data, rlen);
+		if (ret < 0) {
+			DBPRT("failed to read the response data\n");
+			return -EFAULT;
+		}
+	}
+
+	return 0;
+}
diff --git a/sheepdev/device.c b/sheepdev/device.c
new file mode 100644
index 0000000..996d103
--- /dev/null
+++ b/sheepdev/device.c
@@ -0,0 +1,985 @@
+/*
+ * Copyright (C) 2013 Taobao Inc.
+ *
+ * Levin Li <xingke....@taobao.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <linux/init.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/bitops.h>
+#include <linux/string.h>
+#include <linux/wait.h>
+#include <linux/sched.h>
+#include <linux/mm.h>
+#include <linux/slab.h>
+#include <linux/fs.h>
+#include <linux/genhd.h>
+#include <linux/blkdev.h>
+#include <linux/hdreg.h>
+#include <linux/proc_fs.h>
+#include <linux/kthread.h>
+#include "sheepdev.h"
+
+static int sheepdev_major;
+spinlock_t devices_lock;
+struct list_head dev_list;
+static unsigned long *device_bitmap;
+
+/* Take a reference on the device structure. */
+static void sheepdev_get(struct sheepdev *dev)
+{
+	atomic_inc(&dev->struct_refcnt);
+}
+
+/* Drop a reference; the structure is freed when the count reaches zero. */
+static void sheepdev_put(struct sheepdev *dev)
+{
+	if (atomic_dec_and_test(&dev->struct_refcnt))
+		kfree(dev);
+}
+
+/*
+ * Record an outstanding object request (tagged with the current
+ * dev->req_id) on dev->finish_list so the response can be matched to
+ * it later.  Returns 0, or -EIO when the allocation fails.
+ */
+static int add_request(struct sheepdev *dev, struct request *req,
+		       uint64_t oid, int idx)
+{
+	struct obj_request *s_req = kmalloc(sizeof(*s_req), GFP_KERNEL);
+	if (!s_req)
+		return -EIO;
+
+	s_req->req_id = dev->req_id;
+	s_req->req = req;
+	s_req->oid = oid;
+	s_req->idx = idx;
+	INIT_LIST_HEAD(&s_req->list);
+
+	spin_lock(&dev->que_lock);
+	list_add_tail(&s_req->list, &dev->finish_list);
+	spin_unlock(&dev->que_lock);
+
+	return 0;
+}
+
+/*
+ * Find the half-completed two-object record for req on dev->sd_req_list.
+ * Callers in this file hold dev->sd_req_lock around the call.
+ */
+static struct sheep_request *sd_req_search(struct sheepdev *dev,
+					   struct request *req)
+{
+	struct sheep_request *sdreq;
+
+	list_for_each_entry(sdreq, &dev->sd_req_list, list) {
+		if (sdreq->req == req)
+			return sdreq;
+	}
+
+	return NULL;
+}
+
+/* Unlink req from whatever list it is on and complete it with ret. */
+static void sheep_end_request_directly(struct request *req, int ret)
+{
+	struct request_queue *q = req->q;
+	unsigned long flags;
+
+	spin_lock_irqsave(q->queue_lock, flags);
+	if (!list_empty(&req->queuelist))
+		list_del_init(&req->queuelist);
+	__blk_end_request_all(req, ret);
+	spin_unlock_irqrestore(q->queue_lock, flags);
+}
+
+/*
+ * Copy data_length bytes of response data from buf into the pages of
+ * req, skipping the first higher_part_len bytes of the request.
+ * Nothing is copied when result is SD_RES_NO_OBJ.
+ */
+static void copy_read_data(struct request *req, char *buf,
+			   int data_length, int higher_part_len, int result)
+{
+	struct req_iterator iter;
+	struct bio_vec *bvec;
+	int len = 0, rlen, offset, buf_len = 0;
+	int boundary = 0;
+
+	if (result == SD_RES_NO_OBJ)
+		return;
+
+	rq_for_each_segment(bvec, req, iter) {
+		void *addr;
+
+		/* segment lies entirely inside the part to skip */
+		if (len + bvec->bv_len <= higher_part_len) {
+			len += bvec->bv_len;
+			continue;
+		}
+
+		if (higher_part_len > len) {
+			offset = higher_part_len - len;
+			rlen = bvec->bv_len - offset;
+		} else {
+			offset = 0;
+			rlen = bvec->bv_len;
+		}
+
+		/* do not copy more than the response carried */
+		if (buf_len + rlen > data_length) {
+			rlen = data_length - buf_len;
+			boundary = 1;
+		}
+
+		addr = kmap(bvec->bv_page);
+		memcpy(addr + bvec->bv_offset + offset, buf + buf_len, rlen);
+		buf_len += rlen;
+		kunmap(bvec->bv_page);
+
+		if (boundary)
+			break;
+	}
+}
+
+/*
+ * Handle one object response for req.  A request that spans two
+ * objects is completed only when the second response arrives; the
+ * first response is parked on dev->sd_req_list.
+ */
+static void sheep_end_request(struct sheepdev *dev, struct request *req,
+			      int ret, int idx, char *buf, uint32_t data_length)
+{
+	unsigned long sector = blk_rq_pos(req);
+	unsigned long offset = sector * KERNEL_SECTOR_SIZE;
+	unsigned long nbytes = blk_rq_bytes(req);
+
+	offset = offset % SHEEP_OBJECT_SIZE;
+	/* Check whether the request visits two objects. */
+	if (offset + nbytes > SHEEP_OBJECT_SIZE) {
+		struct sheep_request *sre;
+
+		spin_lock(&dev->sd_req_lock);
+		sre = sd_req_search(dev, req);
+		spin_unlock(&dev->sd_req_lock);
+
+		if (sre) {
+			/* second response: merge results and finish */
+			if (!rq_data_dir(req)) {
+				copy_read_data(req, buf, data_length,
+					       sre->read_length, ret);
+				if (ret == SD_RES_NO_OBJ) {
+					if (sre->result &&
+					    sre->result != SD_RES_NO_OBJ)
+						ret = sre->result;
+					else
+						ret = 0;
+				}
+			} else
+				ret = ret ? ret : sre->result;
+			spin_lock(&dev->sd_req_lock);
+			list_del_init(&sre->list);
+			spin_unlock(&dev->sd_req_lock);
+			kfree(sre);
+
+			sheep_end_request_directly(req, ret);
+		} else {
+			/* first response: remember it until the other half lands */
+			if (!rq_data_dir(req))
+				copy_read_data(req, buf, data_length, 0, ret);
+
+			/*
+			 * The request cannot be completed or dropped here
+			 * because the second response still refers to it,
+			 * so this small allocation must not fail.
+			 */
+			sre = kmalloc(sizeof(*sre), GFP_KERNEL | __GFP_NOFAIL);
+			sre->result = ret;
+			sre->read_length = data_length;
+			sre->req = req;
+
+			spin_lock(&dev->sd_req_lock);
+			list_add_tail(&sre->list, &dev->sd_req_list);
+			spin_unlock(&dev->sd_req_lock);
+		}
+	} else {
+		if (!rq_data_dir(req)) {
+			copy_read_data(req, buf, data_length, 0, ret);
+			if (ret && ret == SD_RES_NO_OBJ)
+				ret = 0;
+		}
+
+		sheep_end_request_directly(req, ret);
+	}
+}
+
+/*
+ * Insert new into the tree keyed by idx.  Returns NULL on success, or
+ * the existing entry with the same idx (new is then not inserted).
+ */
+static struct obj_state_entry *obj_state_rb_insert(struct rb_root *root,
+						   struct obj_state_entry *new)
+{
+	struct rb_node **p = &root->rb_node;
+	struct rb_node *parent = NULL;
+	struct obj_state_entry *entry;
+
+	while (*p) {
+		parent = *p;
+		entry = rb_entry(parent, struct obj_state_entry, node);
+
+		if (new->idx < entry->idx)
+			p = &(*p)->rb_left;
+		else if (new->idx > entry->idx)
+			p = &(*p)->rb_right;
+		else
+			return entry;
+	}
+	rb_link_node(&new->node, parent, p);
+	rb_insert_color(&new->node, root);
+
+	return NULL;
+}
+
+/* Look up the entry for idx; returns NULL when there is none. */
+static struct obj_state_entry *obj_state_rb_search(struct rb_root *root,
+						   uint32_t idx)
+{
+	struct rb_node *n = root->rb_node;
+	struct obj_state_entry *t;
+
+	while (n) {
+		t = rb_entry(n, struct obj_state_entry, node);
+
+		if (idx < t->idx)
+			n = n->rb_left;
+		else if (idx > t->idx)
+			n = n->rb_right;
+		else
+			return t;
+	}
+
+	return NULL;
+}
+
+/*
+ * Set the state of object idx, creating the tree entry if needed.
+ *
+ * Should not hold spin-lock, because we allocate memory with kmalloc
+ * which may sleep
+ */
+static void set_obj_state(struct sheepdev *dev, int idx, int state)
+{
+	struct obj_state_entry *old, *new;
+
+	new = kmalloc(sizeof(*new), GFP_KERNEL);
+	if (!new) {
+		DBPRT("[%s] No-Mem\n", __func__);
+		/* nothing to insert; do not dereference the NULL entry */
+		return;
+	}
+
+	new->idx = idx;
+	new->state = state;
+
+	write_lock(&dev->creating_lock);
+	old = obj_state_rb_insert(&dev->obj_state_tree, new);
+	write_unlock(&dev->creating_lock);
+	if (old) {
+		/* entry already present: just update its state */
+		kfree(new);
+		old->state = state;
+	}
+}
+
+/* Return the recorded state of object idx, or -ENOENT if none. */
+static int get_obj_state(struct sheepdev *dev, int idx)
+{
+	struct obj_state_entry *entry;
+
+	read_lock(&dev->creating_lock);
+	entry = obj_state_rb_search(&dev->obj_state_tree, idx);
+	read_unlock(&dev->creating_lock);
+	if (entry)
+		return entry->state;
+
+	return -ENOENT;
+}
+
+/* Remove and free the state entry of object idx; -ENOENT if absent. */
+static int remove_obj_state(struct sheepdev *dev, int idx)
+{
+	struct rb_root *root = &dev->obj_state_tree;
+	struct rb_node **p = &root->rb_node;
+	struct rb_node *parent = NULL;
+	struct obj_state_entry *entry;
+
+	write_lock(&dev->creating_lock);
+	while (*p) {
+		parent = *p;
+		entry = rb_entry(parent, struct obj_state_entry, node);
+
+		if (idx < entry->idx)
+			p = &(*p)->rb_left;
+		else if (idx > entry->idx)
+			p = &(*p)->rb_right;
+		else {
+			rb_erase(parent, root);
+			write_unlock(&dev->creating_lock);
+			kfree(entry);
+			return 0;
+		}
+	}
+	write_unlock(&dev->creating_lock);
+
+	return -ENOENT;
+}
+
+/*
+ * Issue the read request(s) for req, one per object it touches.
+ * Returns 0 when every request was queued and sent, -EIO otherwise.
+ */
+static int handle_read_request(struct request *req)
+{
+	struct gendisk *disk = req->rq_disk;
+	struct sheepdev *dev = disk->private_data;
+	unsigned long sector = blk_rq_pos(req);
+	unsigned long offset = sector * KERNEL_SECTOR_SIZE;
+	unsigned long nbytes = blk_rq_bytes(req);
+	uint64_t oid, obj_offset;
+	int ret = 0, idx, read_len = 0, visit_two_objs;
+
+next_obj:
+	idx = offset / SHEEP_OBJECT_SIZE;
+	oid = vid_to_data_oid(dev->vid, idx);
+	obj_offset = offset % SHEEP_OBJECT_SIZE;
+	visit_two_objs = 0;
+
+	read_lock(&dev->creating_lock);
+	if (dev->inode->data_vdi_id[idx])
+		oid = vid_to_data_oid(dev->inode->data_vdi_id[idx], idx);
+	else
+		oid = vid_to_data_oid(dev->vid, idx);
+	read_unlock(&dev->creating_lock);
+
+	if (obj_offset + nbytes > SHEEP_OBJECT_SIZE) {
+		/* request crosses into the next object */
+		read_len = SHEEP_OBJECT_SIZE - obj_offset;
+		visit_two_objs = 1;
+	} else
+		read_len = nbytes;
+
+	ret = add_request(dev, req, oid, idx);
+	if (ret)
+		return -EIO;
+
+	ret = send_read_req(dev, oid, read_len, obj_offset);
+	if (ret)
+		return -EIO;
+
+	if (visit_two_objs) {
+		nbytes -= read_len;
+		offset += read_len;
+		goto next_obj;
+	}
+
+	return 0;
+}
+
+/*
+ * Decide how a write to object idx must proceed.  On return *create is
+ * 1 when the caller has to create (or copy-on-write) the object, and
+ * *cow_oid is the object to copy from, or 0.  When another writer is
+ * already creating the object, sleeps until object_ready().
+ */
+static void sheep_wait_object(struct sheepdev *dev, int idx, int *create, uint64_t *cow_oid)
+{
+	*create = *cow_oid = 0;
+
+	read_lock(&dev->creating_lock);
+	if (!dev->inode->data_vdi_id[idx]) {
+		read_unlock(&dev->creating_lock);
+		if (get_obj_state(dev, idx) > 0) {
+			/* Wait for pending inode-update to complete */
+			wait_event_interruptible(dev->creating_wait,
+						 object_ready(dev, idx));
+		} else {
+			set_obj_state(dev, idx, OBJ_STATE_CREATING);
+			*create = 1;
+		}
+	} else if (!object_ready(dev, idx)) {
+		read_unlock(&dev->creating_lock);
+		/*
+		 * Now we check the rbtree to determine whether to wait for
+		 * copy-on-write done or to invoke copy-on-write for this object
+		 */
+		if (get_obj_state(dev, idx) > 0) {
+			/* Wait inode to be updated */
+			wait_event_interruptible(dev->creating_wait,
+						 object_ready(dev, idx));
+		} else {
+			set_obj_state(dev, idx, OBJ_STATE_COWING);
+			*cow_oid = vid_to_data_oid(dev->inode->data_vdi_id[idx], idx);
+			*create = 1;
+		}
+	} else
+		read_unlock(&dev->creating_lock);
+}
+
+
+/*
+ * Copy the data of req into a linear buffer and issue the write
+ * request(s), plus an inode update for every object that has to be
+ * created.  Returns 0 on success, -EIO on failure.
+ */
+static int handle_write_request(struct request *req)
+{
+	struct req_iterator iter;
+	struct bio_vec *bvec;
+	struct gendisk *disk = req->rq_disk;
+	struct sheepdev *dev = disk->private_data;
+	unsigned long sector = blk_rq_pos(req);
+	unsigned long offset = sector * KERNEL_SECTOR_SIZE;
+	unsigned long nbytes = blk_rq_bytes(req);
+	uint64_t oid, obj_offset, cow_oid;
+	int ret = 0, len = 0, send_len = 0, sent_len = 0;
+	int create, idx, visit_two_objs;
+	void *sheep_buf = NULL;
+
+	sheep_buf = kmalloc(nbytes, GFP_KERNEL);
+	if (!sheep_buf)
+		return -EIO;
+
+	rq_for_each_segment(bvec, req, iter) {
+		void *addr = kmap(bvec->bv_page);
+		memcpy(sheep_buf + len, addr + bvec->bv_offset, bvec->bv_len);
+		len += bvec->bv_len;
+		kunmap(bvec->bv_page);
+	}
+
+next_obj:
+	idx = offset / SHEEP_OBJECT_SIZE;
+	oid = vid_to_data_oid(dev->vid, idx);
+	obj_offset = offset % SHEEP_OBJECT_SIZE;
+	send_len = nbytes;
+	visit_two_objs = 0;
+
+	sheep_wait_object(dev, idx, &create, &cow_oid);
+
+	if (obj_offset + send_len > SHEEP_OBJECT_SIZE) {
+		/* request crosses into the next object */
+		send_len = SHEEP_OBJECT_SIZE - obj_offset;
+		visit_two_objs = 1;
+	}
+
+	ret = add_request(dev, req, oid, idx);
+	if (ret) {
+		ret = -EIO;
+		goto out;
+	}
+
+	ret = send_write_req(dev, oid, cow_oid, sheep_buf + sent_len,
+			     send_len, obj_offset, create);
+	if (ret != SD_RES_SUCCESS) {
+		ret = -EIO;
+		goto out;
+	}
+
+	if (create) {
+		/* For create/cow operations we need to update inode */
+		oid = vid_to_vdi_oid(dev->vid);
+		obj_offset = offsetof(struct sheepdog_inode, data_vdi_id);
+		obj_offset += sizeof(uint32_t) * idx;
+
+		ret = add_request(dev, req, oid, idx);
+		if (ret) {
+			ret = -EIO;
+			goto out;
+		}
+
+		ret = send_write_req(dev, oid, 0, (char *)&dev->vid,
+				     sizeof(dev->vid), obj_offset, 0);
+		if (ret != SD_RES_SUCCESS) {
+			ret = -EIO;
+			goto out;
+		}
+	}
+
+	if (visit_two_objs) {
+		sent_len += send_len;
+		offset += send_len;
+		nbytes -= send_len;
+		goto next_obj;
+	}
+
+out:
+	kfree(sheep_buf);
+	return ret;
+}
+
+/*
+ * Request function of the block queue; entered with rq->queue_lock
+ * held.  Moves every fetched filesystem request to dev->pending_list
+ * and wakes the request thread.  Other request types are failed with
+ * -EIO right away.
+ */
+static void sheep_io_request(struct request_queue *rq)
+{
+	struct request *req;
+	struct gendisk *disk;
+	struct sheepdev *dev;
+
+	while ((req = blk_fetch_request(rq)) != NULL) {
+		if (req->cmd_type != REQ_TYPE_FS) {
+			DBPRT("Skip non-fs request\n");
+			/*
+			 * Complete it while the queue lock is still held,
+			 * and do not queue a request that is already done.
+			 */
+			__blk_end_request_all(req, -EIO);
+			continue;
+		}
+
+		disk = req->rq_disk;
+		dev = disk->private_data;
+
+		list_add_tail(&req->queuelist, &dev->pending_list);
+		spin_unlock_irq(rq->queue_lock);
+
+		wake_up_interruptible(&dev->req_wait);
+
+		spin_lock_irq(rq->queue_lock);
+	}
+}
+
+/*
+ * After a network error: drop every outstanding object request and
+ * fail every block request on dev->deletion_list with -EIO.
+ */
+static void cleanup_finish_list(struct sheepdev *dev)
+{
+	struct obj_request *objreq, *t;
+	struct request *req, *n;
+	LIST_HEAD(deletion_list);
+	LIST_HEAD(finish_list);
+
+	DBPRT("Network Error, cleanup request queue\n");
+
+	spin_lock(&dev->que_lock);
+	list_splice_init(&dev->finish_list, &finish_list);
+	list_splice_init(&dev->deletion_list, &deletion_list);
+	spin_unlock(&dev->que_lock);
+
+	list_for_each_entry_safe(objreq, t, &finish_list, list) {
+		list_del_init(&objreq->list);
+		kfree(objreq);
+	}
+
+	list_for_each_entry_safe(req, n, &deletion_list, queuelist) {
+		list_del_init(&req->queuelist);
+		sheep_end_request_directly(req, -EIO);
+	}
+}
+
+/*
+ * Request thread: takes block requests off dev->pending_list and sends
+ * them to the sheep daemon.  Runs until kthread_stop() is called and
+ * the pending list is empty.
+ */
+static int process_request(void *data)
+{
+	struct sheepdev *dev = (struct sheepdev *)data;
+	struct request *req;
+	int ret;
+
+	sheepdev_get(dev);
+
+	while (!kthread_should_stop() || !list_empty(&dev->pending_list)) {
+		wait_event_interruptible(dev->req_wait,
+					 !list_empty(&dev->pending_list) ||
+					 kthread_should_stop());
+
+		spin_lock(&dev->que_lock);
+		if (list_empty(&dev->pending_list)) {
+			spin_unlock(&dev->que_lock);
+			continue;
+		}
+
+		req = list_entry(dev->pending_list.next, struct request,
+				 queuelist);
+		list_del_init(&req->queuelist);
+		list_add_tail(&req->queuelist, &dev->deletion_list);
+		spin_unlock(&dev->que_lock);
+
+		/* Check whether the connection died */
+		read_lock(&dev->sock_lock);
+		if (!dev->sock) {
+			read_unlock(&dev->sock_lock);
+
+			sheep_end_request_directly(req, -EIO);
+			continue;
+		}
+		read_unlock(&dev->sock_lock);
+
+		if (rq_data_dir(req))
+			ret = handle_write_request(req);
+		else
+			ret = handle_read_request(req);
+
+		if (ret) {
+			/*
+			 * The response thread may already have torn the
+			 * connection down and cleared fin_thread, so test
+			 * both before use, as remove_device() does.
+			 */
+			write_lock(&dev->sock_lock);
+			if (dev->sock) {
+				inet_release(dev->sock);
+				dev->sock = NULL;
+			}
+			write_unlock(&dev->sock_lock);
+
+			if (dev->fin_thread)
+				kthread_stop(dev->fin_thread);
+			cleanup_finish_list(dev);
+		}
+
+		wake_up_interruptible(&dev->fin_wait);
+	}
+
+	sheepdev_put(dev);
+
+	return 0;
+}
+
+/* Block device open: count the opener. */
+static int sheepdev_open(struct block_device *blkdev, fmode_t mode)
+{
+	struct gendisk *disk = blkdev->bd_disk;
+	struct sheepdev *dev = disk->private_data;
+
+	spin_lock(&dev->dev_lock);
+	dev->device_refcnt++;
+	spin_unlock(&dev->dev_lock);
+
+	return 0;
+}
+
+/* Block device release: drop the opener count. */
+static int sheepdev_release(struct gendisk *disk, fmode_t mode)
+{
+	struct sheepdev *dev = disk->private_data;
+
+	spin_lock(&dev->dev_lock);
+	dev->device_refcnt--;
+	spin_unlock(&dev->dev_lock);
+
+	return 0;
+}
+
+static struct block_device_operations sheepdev_ops = {
+	.owner = THIS_MODULE,
+	.open = sheepdev_open,
+	.release = sheepdev_release,
+};
+
+/*
+ * Allocate the gendisk and request queue for dev and register the disk.
+ * Returns 0 on success, -EBUSY when the gendisk cannot be allocated and
+ * -ENOMEM when the request queue cannot be created.
+ */
+static int sheep_add_disk(struct sheepdev *dev)
+{
+	int ret;
+	struct request_queue *queue;
+
+	dev->disk = alloc_disk(SHEEP_BLKDEV_MINORS);
+	if (!dev->disk) {
+		DBPRT("allocate gendisk failure\n");
+		ret = -EBUSY;
+		return ret;
+	}
+	queue = blk_init_queue(sheep_io_request, &dev->que_lock);
+	if (!queue) {
+		DBPRT("allocate request queue failure\n");
+		put_disk(dev->disk);
+		dev->disk = NULL;
+		return -ENOMEM;
+	}
+	queue_flag_set_unlocked(QUEUE_FLAG_NONROT, queue);
+	dev->disk->major = sheepdev_major;
+	dev->disk->first_minor = dev->minor * SHEEP_BLKDEV_MINORS;
+	dev->disk->queue = queue;
+	dev->disk->fops = &sheepdev_ops;
+	dev->disk->private_data = dev;
+	snprintf(dev->disk->disk_name, sizeof(dev->disk->disk_name),
+		 SHEEP_BLKDEV_NAME"%c", dev->minor + 'a');
+
+	set_capacity(dev->disk, 0);
+	add_disk(dev->disk);
+	set_capacity(dev->disk, dev->sectors);
+
+	return 0;
+}
+
+/*
+ * Remove and return the outstanding object request with the given id
+ * from dev->finish_list, or NULL when there is none.
+ */
+static struct obj_request *find_request(struct sheepdev *dev, int id)
+{
+	struct obj_request *req, *t;
+
+	spin_lock(&dev->que_lock);
+	list_for_each_entry_safe(req, t, &dev->finish_list, list) {
+		if (req->req_id != id)
+			continue;
+		list_del_init(&req->list);
+		spin_unlock(&dev->que_lock);
+		return req;
+	}
+	spin_unlock(&dev->que_lock);
+
+	return NULL;
+}
+
+/*
+ * Read one response from the daemon.  On success returns 0 and fills
+ * in the request id, result code and, when the response carries data,
+ * a kmalloc'ed payload in *data that the caller must kfree().
+ */
+static int read_reply(struct sheepdev *dev, int *req_id, int *result,
+		      void **data, uint32_t *data_length)
+{
+	int ret;
+	struct sd_rsp rsp;
+	void *buf = NULL;
+
+	*result = *req_id = *data_length = 0;
+
+	ret = do_read(dev->sock, (char *)&rsp, sizeof(rsp));
+	if (ret < 0) {
+		DBPRT("failed to read response\n");
+		return -EIO;
+	}
+
+	if (rsp.data_length > 0) {
+		buf = kmalloc(rsp.data_length, GFP_KERNEL);
+		if (!buf) {
+			DBPRT("No-mem\n");
+			return -ENOMEM;
+		}
+
+		ret = do_read(dev->sock, buf, rsp.data_length);
+		if (ret != rsp.data_length) {
+			kfree(buf);
+			return -EIO;
+		}
+	}
+
+	*req_id = rsp.id;
+	*result = rsp.result;
+	*data = buf;
+	*data_length = rsp.data_length;
+
+	return 0;
+}
+
+/*
+ * Response thread: reads responses from the daemon, matches them to
+ * outstanding object requests and completes the block requests.  Exits
+ * when the socket is gone or a read fails, clearing dev->fin_thread.
+ */
+static int process_response(void *data)
+{
+	struct sheepdev *dev = data;
+	struct obj_request *obj_req;
+	struct request *req;
+	uint32_t data_length;
+	int ret, req_id, res;
+	int obj_state;
+
+	sheepdev_get(dev);
+
+	while (!kthread_should_stop() || !list_empty(&dev->finish_list)) {
+		void *buf = NULL;
+
+		wait_event_interruptible(dev->fin_wait,
+					 !list_empty(&dev->finish_list) ||
+					 kthread_should_stop());
+
+		read_lock(&dev->sock_lock);
+		if (!dev->sock) {
+			read_unlock(&dev->sock_lock);
+			dev->fin_thread = NULL;
+			break;
+		}
+		read_unlock(&dev->sock_lock);
+
+		spin_lock(&dev->que_lock);
+		if (list_empty(&dev->finish_list)) {
+			spin_unlock(&dev->que_lock);
+			continue;
+		}
+		spin_unlock(&dev->que_lock);
+
+		ret = read_reply(dev, &req_id, &res, &buf, &data_length);
+		if (ret) {
+			cleanup_finish_list(dev);
+
+			write_lock(&dev->sock_lock);
+			if (dev->sock) {
+				inet_release(dev->sock);
+				dev->sock = NULL;
+			}
+			write_unlock(&dev->sock_lock);
+			dev->fin_thread = NULL;
+			break;
+		}
+
+		obj_req = find_request(dev, req_id);
+		if (!obj_req) {
+			DBPRT("No-request rfor id %d\n", req_id);
+			goto next;
+		}
+		req = obj_req->req;
+
+		if (rq_data_dir(req)) {
+			int idx;
+
+			res = (res != SD_RES_SUCCESS) ? -EIO : 0;
+			if (obj_req->oid == vid_to_vdi_oid(dev->vid)) {
+				/* inode-update response */
+				idx = obj_req->idx;
+			} else {
+				/* ordinary write response */
+				idx = data_oid_to_idx(obj_req->oid);
+
+				/* obj already exist */
+				read_lock(&dev->creating_lock);
+				if (dev->inode->data_vdi_id[idx] == dev->vid) {
+					read_unlock(&dev->creating_lock);
+					sheep_end_request(dev, obj_req->req, res, idx, NULL, 0);
+					goto next;
+				}
+				read_unlock(&dev->creating_lock);
+			}
+
+			/* inode-update response */
+			obj_state = get_obj_state(dev, idx);
+			if (obj_state == OBJ_STATE_OK) {
+				/*
+				 * Both obj-write and inode-update are complete
+				 * we can end the write request and wake other
+				 * requests waiting for this object.
+				 */
+				remove_obj_state(dev, idx);
+
+				write_lock(&dev->creating_lock);
+				dev->inode->data_vdi_id[idx] = dev->vid;
+				write_unlock(&dev->creating_lock);
+
+				sheep_end_request(dev, req, res, idx, NULL, 0);
+				wake_up_interruptible(&dev->creating_wait);
+			} else {
+				/*
+				 * wait for obj-write or inode-update to complete
+				 */
+				set_obj_state(dev, idx, OBJ_STATE_OK);
+			}
+		} else
+			sheep_end_request(dev, req, res, obj_req->idx, buf, data_length);
+next:
+		kfree(buf);
+		kfree(obj_req);
+	}
+
+	sheepdev_put(dev);
+	return 0;
+}
+
+/*
+ * Connect to the VDI, initialise the locks, lists and wait queues of
+ * dev, start the request and response threads and register the disk.
+ * NOTE(review): the kthread_run() results are not checked, and the
+ * threads keep running when sheep_add_disk() fails -- confirm cleanup.
+ */
+static int sheep_dev_setup(struct sheepdev *dev)
+{
+	int ret;
+
+	ret = sheep_vdi_setup(dev);
+	if (ret)
+		return ret;
+
+	spin_lock_init(&dev->que_lock);
+	spin_lock_init(&dev->dev_lock);
+	spin_lock_init(&dev->sd_req_lock);
+	rwlock_init(&dev->creating_lock);
+	rwlock_init(&dev->sock_lock);
+	init_waitqueue_head(&dev->req_wait);
+	init_waitqueue_head(&dev->fin_wait);
+	init_waitqueue_head(&dev->creating_wait);
+	INIT_LIST_HEAD(&dev->pending_list);
+	INIT_LIST_HEAD(&dev->finish_list);
+	INIT_LIST_HEAD(&dev->dev_list);
+	INIT_LIST_HEAD(&dev->deletion_list);
+	INIT_LIST_HEAD(&dev->sd_req_list);
+
+	dev->obj_state_tree = RB_ROOT;
+	dev->req_id = 1;
+	dev->req_thread = kthread_run(process_request, dev,
+				      "sheep_req");
+	dev->fin_thread = kthread_run(process_response, dev,
+				      "sheep_fin");
+
+	ret = sheep_add_disk(dev);
+	if (ret) {
+		return ret;
+	}
+
+	return 0;
+}
+
+/*
+ * Create a block device for the given VDI.
+ *
+ * Returns 0 on success, -ENOMEM when the device cannot be allocated,
+ * -ENOSPC when every minor slot is in use, or the error returned by
+ * sheep_dev_setup().
+ */
+int sheep_add_device(const char *addr, int port, const char *vdiname,
+		     int snapshot_id, const char *snapshot_tag)
+{
+	struct sheepdev *dev;
+	int ret = 0;
+
+	DBPRT("[%s:%d] vdiname: %s, snapshot id: %d, snapshot tag: %s\n",
+	      addr, port, vdiname, snapshot_id, snapshot_tag);
+
+	dev = kzalloc(sizeof(*dev), GFP_KERNEL);
+	if (!dev)
+		return -ENOMEM;
+
+	dev->port = port;
+	dev->snapshot_id = snapshot_id;
+	/*
+	 * Bounded, always-terminated copies: the strings come from user
+	 * input and must not overrun the fixed-size fields of *dev, nor
+	 * may we read past the end of the caller's address buffer.
+	 */
+	strlcpy(dev->ip_addr, addr, sizeof(dev->ip_addr));
+	strlcpy(dev->vdiname, vdiname, sizeof(dev->vdiname));
+	strlcpy(dev->snapshot_tag, snapshot_tag, sizeof(dev->snapshot_tag));
+
+	spin_lock(&devices_lock);
+	dev->minor = find_next_zero_bit(device_bitmap, SHEEP_BLKDEV_MINORS, 0);
+	if (dev->minor >= SHEEP_BLKDEV_MINORS) {
+		/* bitmap is full; setting the bit would write out of bounds */
+		spin_unlock(&devices_lock);
+		ret = -ENOSPC;
+		goto out;
+	}
+	set_bit(dev->minor, device_bitmap);
+	spin_unlock(&devices_lock);
+
+	ret = sheep_dev_setup(dev);
+	if (ret) {
+		clear_bit(dev->minor, device_bitmap);
+		goto out;
+	} else {
+		sheepdev_get(dev);
+		spin_lock(&devices_lock);
+		list_add_tail(&dev->dev_list, &dev_list);
+		spin_unlock(&devices_lock);
+	}
+
+	return ret;
+out:
+	kfree(dev);
+	return ret;
+}
+
+/*
+ * Stop the worker threads, tear down the disk and the connection of
+ * dev and drop the reference taken in sheep_add_device().  The caller
+ * must already have unlinked dev from dev_list.
+ */
+static void remove_device(struct sheepdev *dev)
+{
+	DBPRT("remove device /dev/%s\n", dev->disk->disk_name);
+
+	kthread_stop(dev->req_thread);
+	wake_up_interruptible(&dev->req_wait);
+	if (dev->fin_thread) {
+		kthread_stop(dev->fin_thread);
+		wake_up_interruptible(&dev->fin_wait);
+	}
+
+	blk_cleanup_queue(dev->disk->queue);
+	del_gendisk(dev->disk);
+	put_disk(dev->disk);
+
+	clear_bit(dev->minor, device_bitmap);
+	write_lock(&dev->sock_lock);
+	if (dev->sock) {
+		inet_release(dev->sock);
+		dev->sock = NULL;
+	}
+	write_unlock(&dev->sock_lock);
+
+	sheepdev_put(dev);
+}
+
+/*
+ * Remove the first device matching vdiname (and snapshot id/tag).
+ *
+ * Returns 0 on success, -EBUSY when the device is still open and
+ * -ENXIO when no device matches.
+ */
+int sheep_remove_device(const char *vdiname, int snapshot_id,
+			const char *snapshot_tag)
+{
+	struct sheepdev *dev, *t;
+	int busy;
+
+	spin_lock(&devices_lock);
+	list_for_each_entry_safe(dev, t, &dev_list, dev_list) {
+		if (strcmp(vdiname, dev->vdiname) != 0)
+			continue;
+		if (*(dev->snapshot_tag) != '\0' &&
+		    strcmp(snapshot_tag, dev->snapshot_tag) != 0)
+			continue;
+		if (snapshot_id != 0 &&
+		    snapshot_id != dev->snapshot_id)
+			continue;
+
+		spin_lock(&dev->dev_lock);
+		busy = dev->device_refcnt != 0;
+		spin_unlock(&dev->dev_lock);
+
+		if (busy) {
+			spin_unlock(&devices_lock);
+			return -EBUSY;
+		}
+
+		/*
+		 * Unlink while devices_lock is still held so concurrent
+		 * add/remove cannot corrupt dev_list; remove_device()
+		 * sleeps, so it runs only after the lock is dropped.
+		 */
+		list_del_init(&dev->dev_list);
+		spin_unlock(&devices_lock);
+
+		remove_device(dev);
+
+		return 0;
+	}
+	spin_unlock(&devices_lock);
+
+	return -ENXIO;
+}
+
+/*
+ * Module init: allocate the minor bitmap, create the proc control file
+ * and register the block major.  Everything set up so far is undone
+ * when a later step fails.
+ */
+static int __init sheep_module_init(void)
+{
+	int ret;
+
+	DBPRT("Block device driver for Sheepdog\n");
+
+	spin_lock_init(&devices_lock);
+	INIT_LIST_HEAD(&dev_list);
+	/* bit operations work on whole unsigned longs, so size in longs */
+	device_bitmap = kcalloc(BITS_TO_LONGS(SHEEP_BLKDEV_MINORS),
+				sizeof(*device_bitmap), GFP_KERNEL);
+	if (!device_bitmap)
+		return -ENOMEM;
+
+	ret = sheep_proc_init();
+	if (ret)
+		goto free_bitmap;
+
+	sheepdev_major = register_blkdev(0, SHEEP_BLKDEV_NAME);
+	if (sheepdev_major < 0) {
+		ret = sheepdev_major;
+		goto destroy_proc;
+	}
+
+	return 0;
+
+destroy_proc:
+	sheep_proc_destroy();
+free_bitmap:
+	kfree(device_bitmap);
+	device_bitmap = NULL;
+	return ret;
+}
+
+/* Module exit: remove every device, then undo sheep_module_init(). */
+static void __exit sheep_module_exit(void)
+{
+	struct sheepdev *dev, *t;
+
+	list_for_each_entry_safe(dev, t, &dev_list, dev_list) {
+		list_del_init(&dev->dev_list);
+		remove_device(dev);
+	}
+
+	sheep_proc_destroy();
+	unregister_blkdev(sheepdev_major, SHEEP_BLKDEV_NAME);
+	kfree(device_bitmap);
+
+	DBPRT("Sheepdog Block Device Removed.\n");
+}
+
+module_init(sheep_module_init);
+module_exit(sheep_module_exit);
+
+MODULE_LICENSE("GPL");
diff --git a/sheepdev/proc.c b/sheepdev/proc.c
new file mode 100644
index 0000000..a217dd3
--- /dev/null
+++ b/sheepdev/proc.c
@@ -0,0 +1,176 @@
+/*
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Levin Li <xingke....@taobao.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include "sheepdev.h"
+
+static struct proc_dir_entry *sheep_proc_entry;
+
+#define MAX_CMD_LEN 64
+/* upper bound on one write to the control file */
+#define MAX_INPUT_LEN 4096
+
+/*
+ * Parse "<ip>[:<port>] <vdiname>[:<snapshot id or tag>]" and add the
+ * device.  buf must be NUL-terminated (sheep_proc_write() guarantees
+ * it); len is the number of bytes before the terminator.
+ * Returns 0 on success, -EINVAL on malformed input, or the error from
+ * sheep_add_device().
+ */
+static int process_add_command(char *buf, int len)
+{
+	int i, ret;
+	char addr[16];
+	char snapshot_tag[SD_MAX_VDI_TAG_LEN];
+	char vdiname[SD_MAX_VDI_LEN];
+	int snapshot_id = 0, port;
+
+	/* parse ip address */
+	for (i = 0; i < len && buf[i] != '\0' && buf[i] != '\n' &&
+	     buf[i] != ' ' && buf[i] != ':'; i++);
+
+	if (i >= len || (buf[i] != ' ' && buf[i] != ':'))
+		return -EINVAL;
+
+	if (i + 1 > sizeof(addr))
+		return -EINVAL;
+
+	memcpy(addr, buf, i);
+	addr[i] = '\0';
+
+	/* parse port */
+	if (buf[i] == ' ') {
+		port = SD_LISTEN_PORT;
+		buf = &buf[i + 1];
+	} else {
+		/* start from ':' to ' ' */
+		char *tmp = &buf[i + 1];
+		len -= (i + 1);
+		for (i = 0; i < len && tmp[i] != ' ' && tmp[i] != '\0' &&
+		     tmp[i] != '\n'; i++);
+		if (i >= len || tmp[i] != ' ') {
+			return -EINVAL;
+		}
+		tmp[i] = '\0';
+		buf = &tmp[i + 1];
+		port = simple_strtol(tmp, NULL, 10);
+		if (port <= 0 || port > 65535)
+			return -EINVAL;
+	}
+
+	/* parse vdiname and snapshot id */
+	for (i = 0; buf[i] != '\0' && buf[i] != ':' && buf[i] != '\n'; i++);
+	/* the name plus its terminator must fit the stack buffer */
+	if (i >= (int)sizeof(vdiname))
+		return -EINVAL;
+	memcpy(vdiname, buf, i);
+	vdiname[i] = '\0';
+
+	*snapshot_tag = '\0';
+	if (buf[i] == ':') {
+		char *p;
+		buf = &buf[i + 1];
+		snapshot_id = simple_strtol(buf, &p, 10);
+		if (buf == p) {
+			/* not a number, so treat it as a snapshot tag */
+			size_t tag_len = strlen(buf);
+
+			snapshot_id = 0;
+			if (tag_len && buf[tag_len - 1] == '\n')
+				buf[tag_len - 1] = '\0';
+			/* strlcpy always terminates, unlike strncpy */
+			strlcpy(snapshot_tag, buf, sizeof(snapshot_tag));
+		}
+	}
+
+	ret = sheep_add_device(addr, port, vdiname, snapshot_id, snapshot_tag);
+
+	return ret;
+}
+
+/*
+ * Parse "<vdiname>[:<snapshot id or tag>]" and remove the device.
+ * buf must be NUL-terminated.  Returns 0 on success, -EINVAL when a
+ * field is too long, or the error from sheep_remove_device().
+ */
+static int process_del_command(char *buf, int len)
+{
+	/* initialised: it is compared with strcmp() even without a ':' part */
+	char snapshot_tag[SD_MAX_VDI_TAG_LEN] = "";
+	char vdiname[SD_MAX_VDI_LEN];
+	int snapshot_id = 0;
+	int ret, i;
+
+	for (i = 0; buf[i] != ':' && buf[i] != '\n' && buf[i] != '\0'; i++);
+
+	if (i >= (int)sizeof(vdiname))
+		return -EINVAL;
+	memcpy(vdiname, buf, i);
+	vdiname[i] = '\0';
+
+	if (buf[i] == ':') {
+		char *p;
+
+		buf = &buf[i + 1];
+		for (i = 0; buf[i] != '\n' && buf[i] != '\0'; i++);
+		if (i >= (int)sizeof(snapshot_tag))
+			return -EINVAL;
+		memcpy(snapshot_tag, buf, i);
+		snapshot_tag[i] = '\0';
+		snapshot_id = simple_strtol(snapshot_tag, &p, 10);
+		if (snapshot_tag == p)
+			snapshot_id = 0;
+		else
+			*snapshot_tag = '\0';
+	}
+
+	ret = sheep_remove_device(vdiname, snapshot_id, snapshot_tag);
+
+	return ret;
+}
+
+/*
+ * Write handler of the control file.  Accepts one command per write:
+ * "add <args>" or "del <args>".  Returns len on success, -EINVAL for
+ * empty, oversized or malformed input, -ENOMEM on allocation failure,
+ * or the error of the command handler.
+ */
+static ssize_t sheep_proc_write(struct file *filp, const char __user *buf,
+				size_t len, loff_t *offset)
+{
+	char *kern_buf, cmd_buf[MAX_CMD_LEN];
+	int i, ret;
+
+	if (len == 0 || len > MAX_INPUT_LEN)
+		return -EINVAL;
+
+	/* one extra byte so the parsers always find a terminator */
+	kern_buf = kmalloc(len + 1, GFP_KERNEL);
+	if (!kern_buf)
+		return -ENOMEM;
+
+	if (copy_from_user(kern_buf, buf, len)) {
+		ret = -EINVAL;
+		goto out;
+	}
+	kern_buf[len] = '\0';
+
+	for (i = 0; i < len && kern_buf[i] != '\0' && kern_buf[i] != '\n' &&
+	     kern_buf[i] != ' '; i++);
+
+	/* i == MAX_CMD_LEN would put the terminator past cmd_buf */
+	if (i >= MAX_CMD_LEN || i >= len || kern_buf[i] != ' ') {
+		ret = -EINVAL;
+		goto out;
+	}
+	memcpy(cmd_buf, kern_buf, i);
+	cmd_buf[i] = '\0';
+	if (strcmp(cmd_buf, "add") == 0) {
+		ret = process_add_command(&kern_buf[i + 1], len - i - 1);
+		if (ret)
+			goto out;
+	} else if (strcmp(cmd_buf, "del") == 0) {
+		ret = process_del_command(&kern_buf[i + 1], len - i - 1);
+		if (ret)
+			goto out;
+
+	} else {
+		ret = -EINVAL;
+		goto out;
+	}
+
+	ret = len;
+out:
+	kfree(kern_buf);
+	return ret;
+}
+
+static struct file_operations sheep_proc_fops = {
+	.write = sheep_proc_write,
+};
+
+/* Create the proc control file.  Returns 0, or -ENOMEM on failure. */
+int sheep_proc_init(void)
+{
+	/*
+	 * create proc entry for sheep control; writing to it adds and
+	 * removes block devices, so only the owner may write
+	 */
+	sheep_proc_entry = create_proc_entry(PROC_ENTRY_NAME,
+				S_IFREG | S_IRUGO | S_IWUSR, NULL);
+	if (!sheep_proc_entry)
+		return -ENOMEM;
+
+	sheep_proc_entry->proc_fops = &sheep_proc_fops;
+
+	return 0;
+}
+
+/* Remove the proc control file. */
+void sheep_proc_destroy(void)
+{
+	remove_proc_entry(PROC_ENTRY_NAME, NULL);
+}
diff --git a/sheepdev/sheep.c b/sheepdev/sheep.c
new file mode 100644
index 0000000..39f00f9
--- /dev/null
+++ b/sheepdev/sheep.c
@@ -0,0 +1,186 @@
+/*
+ * Copyright (C) 2013 Taobao Inc.
+ *
+ * Levin Li <xingke....@taobao.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "sheepdev.h"
+
+/*
+ * Advance dev->req_id for the next asynchronous request.
+ *
+ * The counter wraps from UINT_MAX back to 1 rather than 0, because
+ * read_object() sends its own request with id 0.
+ */
+static void next_req_id(struct sheepdev *dev)
+{
+	if (dev->req_id == UINT_MAX)
+		dev->req_id = 1;
+	else
+		dev->req_id++;
+}
+
+/*
+ * Read datalen bytes at offset from object oid into data, using exec_req()
+ * on the device socket.  The request is sent with id 0.
+ *
+ * Returns SD_RES_SUCCESS, or SD_RES_EIO if the exchange fails or the
+ * response carries any other result.
+ */
+static int read_object(struct sheepdev *dev, uint64_t oid, void *data,
+		       unsigned int datalen, uint64_t offset)
+{
+	struct sd_req hdr;
+	/* the response is read through the same header buffer */
+	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+	int ret;
+
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.opcode = SD_OP_READ_OBJ;
+	hdr.id = 0;
+	hdr.data_length = datalen;
+
+	hdr.obj.oid = oid;
+	hdr.obj.offset = offset;
+
+	ret = exec_req(dev->sock, &hdr, data);
+
+	if (ret < 0) {
+		DBPRT("Failed to read object %llx\n",
+		      (unsigned long long)oid);
+		return SD_RES_EIO;
+	}
+
+	if (rsp->result != SD_RES_SUCCESS) {
+		DBPRT("Failed to read object %llx,%d\n",
+		      (unsigned long long)oid, rsp->result);
+		return SD_RES_EIO;
+	}
+
+	return SD_RES_SUCCESS;
+}
+
+/*
+ * Send a read request for datalen bytes at offset of object oid.  Only the
+ * request header is sent here; no response is read.
+ *
+ * The request carries the current dev->req_id, which is advanced afterwards
+ * whether or not sending succeeded.
+ *
+ * Returns SD_RES_SUCCESS, or SD_RES_EIO if send_req() fails.
+ */
+int send_read_req(struct sheepdev *dev, uint64_t oid,
+		  unsigned int datalen, uint64_t offset)
+{
+	struct sd_req hdr;
+	int ret;
+
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.opcode = SD_OP_READ_OBJ;
+	hdr.id = dev->req_id;
+	hdr.data_length = datalen;
+
+	hdr.obj.oid = oid;
+	hdr.obj.offset = offset;
+
+	ret = send_req(dev->sock, &hdr, NULL, 0);
+
+	next_req_id(dev);
+
+	if (ret < 0) {
+		DBPRT("Failed to read object %llx\n",
+		      (unsigned long long)oid);
+		return SD_RES_EIO;
+	}
+
+	return SD_RES_SUCCESS;
+}
+
+/*
+ * Send a write request for datalen bytes of data at offset of object oid.
+ *
+ * A non-zero create selects SD_OP_CREATE_AND_WRITE_OBJ instead of
+ * SD_OP_WRITE_OBJ; a non-zero cow_oid adds SD_FLAG_CMD_COW.  The number of
+ * copies is taken from dev->inode.
+ *
+ * The request carries the current dev->req_id, which is advanced afterwards
+ * whether or not sending succeeded.
+ *
+ * Returns SD_RES_SUCCESS, or SD_RES_EIO if send_req() fails.
+ */
+int send_write_req(struct sheepdev *dev, uint64_t oid, uint64_t cow_oid,
+		   void *data, unsigned int datalen, uint64_t offset,
+		   int create)
+{
+	struct sd_req hdr;
+	int ret;
+
+	memset(&hdr, 0, sizeof(hdr));
+	if (create)
+		hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
+	else
+		hdr.opcode = SD_OP_WRITE_OBJ;
+
+	hdr.id = dev->req_id;
+	hdr.data_length = datalen;
+	hdr.flags = SD_FLAG_CMD_WRITE | SD_FLAG_CMD_DIRECT;
+	if (cow_oid)
+		hdr.flags |= SD_FLAG_CMD_COW;
+
+	hdr.obj.oid = oid;
+	hdr.obj.cow_oid = cow_oid;
+	hdr.obj.offset = offset;
+	hdr.obj.copies = dev->inode->nr_copies;
+
+	ret = send_req(dev->sock, &hdr, data, datalen);
+
+	next_req_id(dev);
+
+	if (ret < 0) {
+		DBPRT("Failed to write object %llx\n",
+		      (unsigned long long)oid);
+		return SD_RES_EIO;
+	}
+
+	return SD_RES_SUCCESS;
+}
+
+/*
+ * Issue SD_OP_LOCK_VDI for vdiname / snapid / tag and store the VDI id from
+ * the response in dev->vid.
+ *
+ * Returns 0 on success, -EIO if the exchange fails or the response result
+ * is not SD_RES_SUCCESS.
+ */
+static int find_vdi_name(struct sheepdev *dev, const char *vdiname,
+			 uint32_t snapid, const char *tag)
+{
+	int ret;
+	struct sd_req hdr;
+	/* the response is read through the same header buffer */
+	struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
+	char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
+
+	/* payload: name field followed by tag field, both zero padded */
+	memset(buf, 0, sizeof(buf));
+	strncpy(buf, vdiname, SD_MAX_VDI_LEN);
+	strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
+
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.opcode = SD_OP_LOCK_VDI;
+	hdr.data_length = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
+	hdr.flags = SD_FLAG_CMD_WRITE;
+	hdr.vdi.snapid = snapid;
+
+	ret = exec_req(dev->sock, &hdr, buf);
+	if (ret)
+		return -EIO;
+
+	if (rsp->result != SD_RES_SUCCESS) {
+		DBPRT("Cannot get VDI info for %s %d %s\n",
+		      vdiname, snapid, tag);
+		return -EIO;
+	}
+
+	dev->vid = rsp->vdi.vdi_id;
+
+	return 0;
+}
+
+/*
+ * Connect to dev->ip_addr:dev->port, look up the VDI described by
+ * dev->vdiname / dev->snapshot_id / dev->snapshot_tag and read its inode.
+ *
+ * On success dev->vid, dev->size, dev->sectors, dev->snapshot_id,
+ * dev->snapshot_tag and dev->inode are filled in and 0 is returned.
+ * Returns -ENOMEM if the inode buffer cannot be allocated, -EFAULT if the
+ * connection fails and -EIO if the lookup or the inode read fails; the
+ * inode buffer is freed in all error cases.
+ *
+ * NOTE(review): dev->sock is not released here when a step after
+ * connect_to() fails -- confirm that the caller does so.
+ */
+int sheep_vdi_setup(struct sheepdev *dev)
+{
+	int ret;
+	struct sheepdog_inode *inode;
+
+	inode = vzalloc(sizeof(*inode));
+	if (!inode)
+		return -ENOMEM;
+
+	ret = connect_to(&dev->sock, dev->ip_addr, dev->port);
+	if (ret) {
+		ret = -EFAULT;
+		goto out;
+	}
+
+	ret = find_vdi_name(dev, dev->vdiname, dev->snapshot_id,
+			    dev->snapshot_tag);
+	if (ret)
+		goto out;
+
+	ret = read_object(dev, vid_to_vdi_oid(dev->vid), inode,
+			  SD_INODE_SIZE, 0);
+	if (ret != SD_RES_SUCCESS) {
+		ret = -EIO;
+		goto out;
+	}
+
+	dev->size = inode->vdi_size;
+	dev->sectors = dev->size / KERNEL_SECTOR_SIZE;
+	dev->snapshot_id = inode->snap_id;
+	strncpy(dev->snapshot_tag, inode->tag, SD_MAX_VDI_TAG_LEN);
+	/* strncpy() does not terminate when the source fills the field */
+	dev->snapshot_tag[SD_MAX_VDI_TAG_LEN - 1] = '\0';
+	dev->inode = inode;
+
+	return 0;
+out:
+	vfree(inode);
+	return ret;
+}
diff --git a/sheepdev/sheepdev.h b/sheepdev/sheepdev.h
new file mode 100644
index 0000000..fae04a5
--- /dev/null
+++ b/sheepdev/sheepdev.h
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2013 Taobao Inc.
+ *
+ * Levin Li <xingke....@taobao.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __SHEEP_H_
+#define __SHEEP_H_
+
+#include <linux/socket.h>
+#include <linux/net.h>
+#include <net/sock.h>
+#include <linux/tcp.h>
+#include <linux/socket.h>
+#include <linux/slab.h>
+#include <linux/in.h>
+#include <linux/inet.h>
+#include <linux/list.h>
+#include <linux/rbtree.h>
+#include <asm/atomic.h>
+#include <net/inet_common.h>
+#include "sheepdog_proto.h"
+
+#define SHEEP_OBJECT_SIZE (4 * 1024 * 1024)
+
+#define SHEEP_BLKDEV_NAME "sheep"
+#define PROC_ENTRY_NAME "sheep"
+#define KERNEL_SECTOR_SIZE 512
+#define SHEEP_BLKDEV_MINORS 1024
+
+/* Debug print at KERN_DEBUG level, prefixed with "sheep: ". */
+#define DBPRT(fmt, args...) printk(KERN_DEBUG "sheep: " fmt, ##args)
+
+/* Per-device state of one sheepdog VDI exported as a block device. */
+struct sheepdev {
+	struct gendisk *disk;
+	struct socket *sock;		/* set up by connect_to() */
+
+	/* VDI related */
+	char ip_addr[16];
+	char vdiname[SD_MAX_VDI_LEN];
+	char snapshot_tag[SD_MAX_VDI_TAG_LEN];
+	unsigned int snapshot_id;
+	unsigned int port;
+
+	unsigned int minor;
+	unsigned int req_id;		/* id put into the next request */
+	unsigned int vid;		/* VDI id from SD_OP_LOCK_VDI */
+	unsigned long size;		/* inode->vdi_size */
+	unsigned long sectors;		/* size / KERNEL_SECTOR_SIZE */
+	atomic_t struct_refcnt;
+	unsigned int device_refcnt;
+
+	spinlock_t dev_lock;
+	spinlock_t que_lock;
+	rwlock_t creating_lock;
+	rwlock_t sock_lock;
+
+	struct rb_root obj_state_tree;
+	struct task_struct *req_thread;
+	struct task_struct *fin_thread;
+
+	wait_queue_head_t req_wait;
+	wait_queue_head_t fin_wait;
+	wait_queue_head_t creating_wait;
+
+	struct list_head pending_list;
+	struct list_head finish_list;
+	struct list_head dev_list;
+	struct list_head deletion_list;
+
+	struct list_head sd_req_list;
+	spinlock_t sd_req_lock;
+
+	struct sheepdog_inode *inode;	/* read by sheep_vdi_setup() */
+};
+
+struct sheep_request {
+	int result;
+	uint32_t read_length;
+	struct request *req;
+	struct list_head list;
+};
+
+struct obj_request {
+	int req_id;
+	int idx; /* idx is only used when updating the inode */
+	uint64_t oid;
+	struct request *req;
+	struct list_head list;
+};
+
+#define OBJ_STATE_CREATING 1
+#define OBJ_STATE_COWING   2
+#define OBJ_STATE_OK       3
+
+struct obj_state_entry {
+	int idx;
+	int state;	/* one of the OBJ_STATE_* values */
+	struct rb_node node;
+};
+
+/* connect.c */
+int connect_to(struct socket **sock, const char *addr, int port);
+int send_req(struct socket *sock, struct sd_req *hdr, void *data,
+	     unsigned int wlen);
+int do_read(struct socket *sock, char *buf, const size_t length);
+int exec_req(struct socket *sock, struct sd_req *hdr, void *data);
+
+/* proc.c */
+int sheep_proc_init(void);
+void sheep_proc_destroy(void);
+
+/* sheep.c */
+int send_read_req(struct sheepdev *sheepdev, uint64_t oid,
+		  unsigned int datalen, uint64_t offset);
+int send_write_req(struct sheepdev *sheepdev, uint64_t oid, uint64_t cow_oid,
+		   void *data, unsigned int datalen, uint64_t offset,
+		   int create);
+int sheep_vdi_setup(struct sheepdev *sheep_dev);
+
+/* device.c */
+int sheep_add_device(const char *addr, int port, const char *vdiname,
+		     int snapshot_id, const char *snapshot_tag);
+int sheep_remove_device(const char *vdiname, int snapshot_id,
+			const char *snapshot_tag);
+
+/* Non-zero when inode->data_vdi_id[idx] equals this device's VDI id. */
+static inline int object_ready(struct sheepdev *dev, int idx)
+{
+	return dev->inode->data_vdi_id[idx] == dev->vid;
+}
+
+#endif
--
1.7.11.7

--
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog