This is the skeleton of a writeback cache implementation.  The biggest
missing item is that cache flush requests aren't forwarded to other
nodes yet.  Also, it is currently enabled unconditionally instead of
through a flag.

Signed-off-by: Christoph Hellwig <[email protected]>

Index: sheepdog/include/sheepdog_proto.h
===================================================================
--- sheepdog.orig/include/sheepdog_proto.h      2011-11-15 22:55:42.339689112 +0100
+++ sheepdog/include/sheepdog_proto.h   2011-11-15 22:56:08.209688958 +0100
@@ -28,6 +28,7 @@
 #define SD_OP_RELEASE_VDI    0x13
 #define SD_OP_GET_VDI_INFO   0x14
 #define SD_OP_READ_VDIS      0x15
+#define SD_OP_FLUSH_VDI      0x16
 
 #define SD_FLAG_CMD_WRITE    0x01
 #define SD_FLAG_CMD_COW      0x02
Index: sheepdog/sheep/ops.c
===================================================================
--- sheepdog.orig/sheep/ops.c   2011-11-15 22:55:42.353022445 +0100
+++ sheepdog/sheep/ops.c        2011-11-15 22:56:08.213022292 +0100
@@ -489,6 +489,11 @@ static struct sd_op_template sd_ops[] =
        [SD_OP_REMOVE_OBJ] = {
                .type = SD_OP_TYPE_IO,
        },
+
+       [SD_OP_FLUSH_VDI] = {
+               .type = SD_OP_TYPE_IO,
+       },
+
 };
 
 struct sd_op_template *get_sd_op(uint8_t opcode)
Index: sheepdog/sheep/store.c
===================================================================
--- sheepdog.orig/sheep/store.c 2011-11-15 22:55:42.366355778 +0100
+++ sheepdog/sheep/store.c      2011-11-15 23:02:30.179686678 +0100
@@ -22,6 +22,7 @@
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <time.h>
+#include <asm/unistd.h>
 
 #include "sheep_priv.h"
 
@@ -429,12 +430,150 @@ out:
        return ret;
 }
 
+static int forward_flush_obj_req(struct request *req, int idx)
+{
+       struct sd_obj_req hdr = *(struct sd_obj_req *)&req->rq;
+       struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+       int ret;
+
+       /*
+        * Only the local store is flushed; remote forwarding is still #if 0.
+        * XXX: need to find all nodes that have objects for this VDI.
+        */
+#if 0
+       int i, n, nr, fd;
+       unsigned wlen, rlen;
+       char name[128];
+       struct sheepdog_vnode_list_entry *e;
+       uint64_t oid = hdr.oid;
+       int copies;
+       struct pollfd pfds[SD_MAX_REDUNDANCY];
+       int nr_fds, local = 0;
+
+       dprintf("forward_flush_obj_req %"PRIx64"\n", oid);
+       e = req->entry;
+       nr = req->nr_vnodes;
+
+       copies = hdr.copies;
+
+       /* temporary hack */
+       if (!copies)
+               copies = sys->nr_sobjs;
+       if (copies > req->nr_zones)
+               copies = req->nr_zones;
+
+       nr_fds = 0;
+       memset(pfds, 0, sizeof(pfds));
+       for (i = 0; i < ARRAY_SIZE(pfds); i++)
+               pfds[i].fd = -1;
+
+       hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
+
+       wlen = hdr.data_length;
+       rlen = 0;
+
+       for (i = 0; i < copies; i++) {
+               n = obj_to_sheep(e, nr, oid, i);
+
+               addr_to_str(name, sizeof(name), e[n].addr, 0);
+
+               if (is_myself(e[n].addr, e[n].port)) {
+                       local = 1;
+                       continue;
+               }
+
+               fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch, idx);
+               if (fd < 0) {
+                       eprintf("failed to connect to %s:%"PRIu32"\n", name, e[n].port);
+                       ret = SD_RES_NETWORK_ERROR;
+                       goto out;
+               }
+
+               ret = send_req(fd, (struct sd_req *)&hdr, req->data, &wlen);
+               if (ret) { /* network errors */
+                       ret = SD_RES_NETWORK_ERROR;
+                       dprintf("fail %"PRIu32"\n", ret);
+                       goto out;
+               }
+
+               pfds[nr_fds].fd = fd;
+               pfds[nr_fds].events = POLLIN;
+               nr_fds++;
+       }
+
+       if (local) {
+#endif
+               ret = store_queue_request_local(req, hdr.epoch);
+               rsp->result = ret;
+
+#if 0
+               if (nr_fds == 0) {
+                       eprintf("exit %"PRIu32"\n", ret);
+                       goto out;
+               }
+
+               if (rsp->result != SD_RES_SUCCESS) {
+                       eprintf("fail (local) %"PRIu32"\n", ret);
+                       goto out;
+               }
+       }
+
+       ret = SD_RES_SUCCESS;
+again:
+       if (poll(pfds, nr_fds, -1) < 0) {
+               if (errno == EINTR)
+                       goto again;
+
+               ret = SD_RES_EIO;
+       }
+
+       for (i = 0; i < nr_fds; i++) {
+               if (pfds[i].fd < 0)
+                       break;
+
+               if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP || pfds[i].revents & POLLNVAL) {
+                       ret = SD_RES_NETWORK_ERROR;
+                       break;
+               }
+
+               if (!(pfds[i].revents & POLLIN))
+                       continue;
+
+               if (do_read(pfds[i].fd, rsp, sizeof(*rsp))) {
+                       eprintf("failed to read a response: %m\n");
+                       ret = SD_RES_NETWORK_ERROR;
+                       break;
+               }
+
+               if (rsp->result != SD_RES_SUCCESS) {
+                       eprintf("fail %"PRIu32"\n", rsp->result);
+                       ret = rsp->result;
+               }
+
+               break;
+       }
+       if (i < nr_fds) {
+               nr_fds--;
+               memmove(pfds + i, pfds + i + 1, sizeof(*pfds) * (nr_fds - i));
+       }
+
+       dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
+
+       if (nr_fds > 0) {
+               goto again;
+       }
+out:
+#endif
+       return ret;
+}
+
+
 static int ob_open(uint32_t epoch, uint64_t oid, int flags, int *ret)
 {
        char path[1024];
        int fd;
 
-       flags |= O_DSYNC | O_RDWR;
+       flags |= O_RDWR;
        if (sys->use_directio && is_data_obj(oid))
                flags |= O_DIRECT;
 
@@ -782,6 +921,33 @@ out:
        return ret;
 }
 
+static int store_flush_cache(struct request *req, uint32_t epoch)
+{
+       int ret = SD_RES_SUCCESS;
+       int fd;
+
+       /*
+        * Flush by calling syncfs(2) on the filesystem that holds obj_path.
+        * XXX: either check during startup that the sheep daemon uses only
+        * a single filesystem, or find a way to iterate over all of them.
+        */
+       fd = open(obj_path, O_DIRECTORY | O_RDONLY);
+       if (fd < 0) {
+               fprintf(stderr, "open failed\n");
+               return SD_RES_EIO;
+       }
+
+       if (syscall(__NR_syncfs, fd) < 0) {
+               fprintf(stderr, "syncfs returned %d\n", errno);
+               ret = SD_RES_EIO;
+       } else {
+               fprintf(stderr, "syncfs done\n");
+       }
+
+       close(fd);
+       return ret;
+}
+
 static int store_queue_request_local(struct request *req, uint32_t epoch)
 {
        struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
@@ -790,6 +956,9 @@ static int store_queue_request_local(str
        dprintf("%x, %" PRIx64" , %u\n", hdr->opcode, hdr->oid, epoch);
 
        switch (hdr->opcode) {
+       case SD_OP_FLUSH_VDI:
+               ret = store_flush_cache(req, epoch);
+               break;
        case SD_OP_WRITE_OBJ:
                ret = store_write_obj(req, epoch);
                break;
@@ -900,7 +1069,9 @@ void store_queue_request(struct work *wo
                                goto out;
                }
 
-               if (hdr->flags & SD_FLAG_CMD_WRITE)
+               if (hdr->opcode == SD_OP_FLUSH_VDI)
+                       ret = forward_flush_obj_req(req, idx);
+               else if (hdr->flags & SD_FLAG_CMD_WRITE)
                        ret = forward_write_obj_req(req, idx);
                else
                        ret = forward_read_obj_req(req, idx);
-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to