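The current implementation of Sheepdog journaling only supports
objects.  This patch extends it: the journal code moves into its own
file (sheep/journal.c), and each journal entry now records the path of
its target file, so a write to any file can be journaled.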

Signed-off-by: MORITA Kazutaka <[email protected]>
---
 sheep/Makefile.am  |    3 +-
 sheep/journal.c    |  310 ++++++++++++++++++++++++++++++++++++++++
 sheep/sheep.c      |    2 -
 sheep/sheep_priv.h |   62 +--------
 sheep/store.c      |  403 +---------------------------------------------------
 5 files changed, 322 insertions(+), 458 deletions(-)
 create mode 100644 sheep/journal.c

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index ad92ea0..2b9d58f 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -23,7 +23,8 @@ INCLUDES              = -I$(top_builddir)/include 
-I$(top_srcdir)/include $(libcpg_CFLAGS) $
 
 sbin_PROGRAMS          = sheep
 
-sheep_SOURCES          = sheep.c group.c sdnet.c store.c vdi.c work.c 
cluster/corosync.c
+sheep_SOURCES          = sheep.c group.c sdnet.c store.c vdi.c work.c 
journal.c \
+                         cluster/corosync.c
 sheep_LDADD            = $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a 
-lpthread
 sheep_DEPENDENCIES     = ../lib/libsheepdog.a
 
diff --git a/sheep/journal.c b/sheep/journal.c
new file mode 100644
index 0000000..19ac259
--- /dev/null
+++ b/sheep/journal.c
@@ -0,0 +1,310 @@
+/*
+ * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <dirent.h>
+#include <errno.h>
+
+#include "sheep_priv.h"
+
+/* Magic value written after the journal payload to mark it complete */
+#define JRNL_END_MARK           0x87654321UL
+/* True if var equals the end mark; var is parenthesized so any expression works */
+#define IS_END_MARK_SET(var)    ((var) == JRNL_END_MARK)
+
+/*
+ * Journal header for data object.
+ *
+ * Written at offset 0 of a journal file; the payload follows it and the
+ * end mark follows the payload.
+ */
+struct jrnl_head {
+       uint64_t offset;        /* offset in the target file to write to */
+       uint64_t size;          /* payload length in bytes */
+       char target_path[256];  /* path of the file the payload is for */
+};
+
+/* In-memory state for one journal file and the target it applies to */
+struct jrnl_descriptor {
+       struct jrnl_head head;  /* header written to / read from the journal */
+       void *data;             /* payload buffer used when writing */
+       int fd;      /* Open file fd */
+       int target_fd;          /* fd of the file the payload is written to */
+       char path[256];         /* path of the journal file itself */
+};
+
+/* Journal APIs */
+/*
+ * Open an existing journal file read-only and record its path in jd.
+ *
+ * Returns SD_RES_SUCCESS on success, SD_RES_NO_OBJ if the file does not
+ * exist, and SD_RES_UNKNOWN on any other failure, including a path that
+ * does not fit in jd->path.  On failure jd->fd is negative.
+ */
+static int jrnl_open(struct jrnl_descriptor *jd, const char *path)
+{
+       size_t len = strlen(path);
+       int err;
+
+       if (len >= sizeof(jd->path)) {
+               /*
+                * jrnl_recover() passes jd to jrnl_remove() even after a
+                * failed open, so leave a valid (empty) string behind.
+                */
+               jd->path[0] = '\0';
+               jd->fd = -1;
+               eprintf("journal path is too long, %s\n", path);
+               return SD_RES_UNKNOWN;
+       }
+       memcpy(jd->path, path, len + 1);
+
+       jd->fd = open(path, O_RDONLY);
+       if (jd->fd < 0) {
+               /* sample errno before logging, which may overwrite it */
+               err = errno;
+               eprintf("failed to open %s, %s\n", jd->path, strerror(err));
+               return err == ENOENT ? SD_RES_NO_OBJ : SD_RES_UNKNOWN;
+       }
+
+       return SD_RES_SUCCESS;
+}
+
+/*
+ * Close the journal file and mark the descriptor as closed (fd = -1).
+ * The result of close() is not inspected; this always returns 0.
+ */
+static int jrnl_close(struct jrnl_descriptor *jd)
+{
+       int journal_fd = jd->fd;
+
+       jd->fd = -1;
+       close(journal_fd);
+
+       return 0;
+}
+
+/*
+ * Create a uniquely named journal file with mkostemp(), opened O_SYNC.
+ *
+ * The template is jrnl_dir followed directly by "XXXXXX", so jrnl_dir is
+ * presumably expected to end with '/'.  The resulting name is stored in
+ * jd->path and the descriptor in jd->fd.
+ *
+ * Returns SD_RES_SUCCESS, or SD_RES_UNKNOWN if the file cannot be created.
+ */
+static int jrnl_create(struct jrnl_descriptor *jd, const char *jrnl_dir)
+{
+       int journal_fd;
+
+       snprintf(jd->path, sizeof(jd->path), "%sXXXXXX", jrnl_dir);
+
+       journal_fd = mkostemp(jd->path, O_SYNC);
+       jd->fd = journal_fd;
+       if (journal_fd >= 0)
+               return SD_RES_SUCCESS;
+
+       eprintf("failed to create %s, %s\n", jd->path, strerror(errno));
+       return SD_RES_UNKNOWN;
+}
+
+/*
+ * Unlink the journal file named by jd->path.
+ *
+ * Returns SD_RES_SUCCESS, or SD_RES_EIO (after logging) if unlink() fails.
+ */
+static int jrnl_remove(struct jrnl_descriptor *jd)
+{
+       if (unlink(jd->path) == 0)
+               return SD_RES_SUCCESS;
+
+       eprintf("failed to remove %s, %m\n", jd->path);
+       return SD_RES_EIO;
+}
+
+/*
+ * Check whether the journal file carries the end mark after its payload.
+ *
+ * The mark is looked up at sizeof(jd->head) + jd->head.size, so jd->head
+ * must already hold the header of this journal file.
+ *
+ * Returns non-zero if the mark is present, 0 if it is missing or cannot
+ * be read in full.
+ */
+static int jrnl_has_end_mark(struct jrnl_descriptor *jd)
+{
+       uint32_t end_mark = 0;
+       ssize_t ret;
+
+       ret = pread64(jd->fd, &end_mark, sizeof(end_mark),
+                     sizeof(jd->head) + jd->head.size);
+       /* a failed or short read means the journal is incomplete */
+       if (ret != (ssize_t)sizeof(end_mark))
+               return 0;
+
+       return IS_END_MARK_SET(end_mark);
+}
+
+/*
+ * Write jd->head at offset 0 of the journal file.
+ *
+ * Returns SD_RES_SUCCESS if the whole header was written, SD_RES_NO_SPACE
+ * if errno is ENOSPC, and SD_RES_EIO otherwise.
+ */
+static int jrnl_write_header(struct jrnl_descriptor *jd)
+{
+       ssize_t written = pwrite64(jd->fd, &jd->head, sizeof(jd->head), 0);
+
+       if (written == sizeof(jd->head))
+               return SD_RES_SUCCESS;
+
+       return errno == ENOSPC ? SD_RES_NO_SPACE : SD_RES_EIO;
+}
+
+/*
+ * Write the payload (jd->data, jd->head.size bytes) to the journal file,
+ * directly after the header.
+ *
+ * Returns SD_RES_SUCCESS if the whole payload was written, SD_RES_NO_SPACE
+ * if errno is ENOSPC, and SD_RES_EIO otherwise.
+ */
+static int jrnl_write_data(struct jrnl_descriptor *jd)
+{
+       const uint64_t payload_len = jd->head.size;
+       ssize_t written;
+
+       written = pwrite64(jd->fd, jd->data, payload_len, sizeof(jd->head));
+       if (written == payload_len)
+               return SD_RES_SUCCESS;
+
+       return errno == ENOSPC ? SD_RES_NO_SPACE : SD_RES_EIO;
+}
+
+/*
+ * Append the end mark to the journal file, right after the payload
+ * (offset sizeof(header) + head.size).
+ *
+ * Returns SD_RES_SUCCESS if the whole mark was written, SD_RES_NO_SPACE
+ * if errno is ENOSPC, and SD_RES_EIO otherwise.
+ */
+static int jrnl_write_end_mark(struct jrnl_descriptor *jd)
+{
+       uint32_t mark = JRNL_END_MARK;
+       ssize_t written;
+
+       written = pwrite64(jd->fd, &mark, sizeof(mark),
+                          sizeof(jd->head) + jd->head.size);
+       if (written == sizeof(mark))
+               return SD_RES_SUCCESS;
+
+       if (errno == ENOSPC)
+               return SD_RES_NO_SPACE;
+
+       return SD_RES_EIO;
+}
+
+/*
+ * Replay one journal file onto its target.
+ *
+ * Reads the header from jd->fd into jd->head, reads head.size bytes of
+ * payload and writes them to jd->target_fd at head.offset.
+ *
+ * Returns 0 on success, SD_RES_NO_MEM if the buffer cannot be allocated,
+ * SD_RES_NO_SPACE if the target write fails with ENOSPC, and SD_RES_EIO
+ * for any other failure (short read, oversized payload, write error).
+ */
+static int jrnl_apply_to_target_object(struct jrnl_descriptor *jd)
+{
+       /* FIXME: handle larger size */
+       const size_t buf_len = (1 << 22);
+       char *buf;
+       int res = SD_RES_EIO;
+       ssize_t retsize;
+
+       buf = zalloc(buf_len);
+       if (!buf) {
+               eprintf("failed to allocate memory\n");
+               return SD_RES_NO_MEM;
+       }
+
+       retsize = pread64(jd->fd, &jd->head, sizeof(jd->head), 0);
+       if (retsize != (ssize_t)sizeof(jd->head)) {
+               eprintf("failed to read the journal header, %s\n", jd->path);
+               goto out;
+       }
+
+       /* head.size comes from disk; never read past the buffer */
+       if (jd->head.size > buf_len) {
+               eprintf("journal data is too large, %s\n", jd->path);
+               goto out;
+       }
+
+       retsize = pread64(jd->fd, buf, jd->head.size, sizeof(jd->head));
+       if (retsize < 0 || (uint64_t)retsize != jd->head.size) {
+               eprintf("failed to read the journal data, %s\n", jd->path);
+               goto out;
+       }
+
+       /* Flush out journal to disk (vdi object) */
+       retsize = pwrite64(jd->target_fd, buf, jd->head.size, jd->head.offset);
+       if (retsize < 0 || (uint64_t)retsize != jd->head.size) {
+               res = (errno == ENOSPC) ? SD_RES_NO_SPACE : SD_RES_EIO;
+               goto out;
+       }
+
+       res = 0;
+out:
+       /* Clean up */
+       free(buf);
+
+       return res;
+}
+
+/*
+ * Write the payload (jd->data, head.size bytes) to the target file at
+ * head.offset.
+ *
+ * Returns 0 if the whole payload was written, SD_RES_NO_SPACE if errno
+ * is ENOSPC, and SD_RES_EIO otherwise.
+ */
+static int jrnl_commit_data(struct jrnl_descriptor *jd)
+{
+       const uint64_t payload_len = jd->head.size;
+       ssize_t written;
+
+       written = pwrite64(jd->target_fd, jd->data, payload_len,
+                          jd->head.offset);
+       if (written == payload_len)
+               return 0;
+
+       return errno == ENOSPC ? SD_RES_NO_SPACE : SD_RES_EIO;
+}
+
+/*
+ * Perform a journaled write of count bytes from buf to fd at offset.
+ *
+ * The write is first recorded in a new journal file under jrnl_dir
+ * (header, payload, end mark), then applied to fd, and finally the
+ * journal file is closed and removed.  path is the path of the file
+ * behind fd; it is stored in the journal header and must fit in its
+ * 256-byte field.
+ *
+ * Returns 0 on success or the SD_RES_* code of the first step that
+ * failed.  On failure the journal fd is closed; the journal file itself
+ * is left on disk.
+ *
+ * We cannot use this function for concurrent write operations
+ */
+int jrnl_perform(int fd, void *buf, size_t count, off_t offset,
+                const char *path, const char *jrnl_dir)
+{
+       int ret;
+       size_t path_len = strlen(path);
+       struct jrnl_descriptor jd;
+
+       memset(&jd, 0, sizeof(jd));
+       jd.fd = -1;
+       jd.target_fd = fd;
+
+       /* refuse a path that would overflow the header field */
+       if (path_len >= sizeof(jd.head.target_path)) {
+               eprintf("target path is too long, %s\n", path);
+               return SD_RES_UNKNOWN;
+       }
+
+       jd.head.offset = offset;
+       jd.head.size = count;
+       memcpy(jd.head.target_path, path, path_len + 1);
+
+       jd.data = buf;
+
+       ret = jrnl_create(&jd, jrnl_dir);
+       if (ret)
+               goto out;
+
+       ret = jrnl_write_header(&jd);
+       if (ret)
+               goto err_close;
+
+       ret = jrnl_write_data(&jd);
+       if (ret)
+               goto err_close;
+
+       ret = jrnl_write_end_mark(&jd);
+       if (ret)
+               goto err_close;
+
+       ret = jrnl_commit_data(&jd);
+       if (ret)
+               goto err_close;
+
+       ret = jrnl_close(&jd);
+       if (ret)
+               goto out;
+
+       ret = jrnl_remove(&jd);
+
+out:
+       return ret;
+
+err_close:
+       /* do not leak the journal fd when a step fails */
+       jrnl_close(&jd);
+       return ret;
+}
+
+/*
+ * Replay and remove the journal files found in jrnl_dir.
+ *
+ * For every directory entry except "." and "..", the journal header is
+ * read, and if the end mark is present the payload is applied to the
+ * file named in the header.  The journal file is removed afterwards
+ * whether or not it could be applied.  jrnl_dir is concatenated with
+ * the entry name as-is, so it is presumably expected to end with '/'.
+ *
+ * Returns 0 once the directory has been scanned, -1 if it cannot be
+ * opened.
+ */
+int jrnl_recover(const char *jrnl_dir)
+{
+       DIR *dir;
+       struct dirent *d;
+       char jrnl_file_path[PATH_MAX];
+
+       eprintf("Openning the directory %s.\n", jrnl_dir);
+       dir = opendir(jrnl_dir);
+       if (!dir)
+               return -1;
+
+       vprintf(SDOG_NOTICE, "start jrnl_recovery.\n");
+       while ((d = readdir(dir))) {
+               int ret;
+               ssize_t retsize;
+               struct jrnl_descriptor jd;
+
+               if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
+                       continue;
+
+               ret = snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%s",
+                              jrnl_dir, d->d_name);
+               /* the path is copied into jd.path, so it must fit there */
+               if (ret < 0 || (size_t)ret >= sizeof(jd.path)) {
+                       eprintf("journal file path is too long, %s%s\n",
+                               jrnl_dir, d->d_name);
+                       continue;
+               }
+
+               memset(&jd, 0, sizeof(jd));
+               jd.target_fd = -1;
+
+               ret = jrnl_open(&jd, jrnl_file_path);
+               if (ret) {
+                       eprintf("Unable to open the journal file, %s, for reading.\n",
+                               jrnl_file_path);
+                       goto end_while_3;
+               }
+
+               /* the header supplies both the end mark offset and the target */
+               retsize = pread64(jd.fd, &jd.head, sizeof(jd.head), 0);
+               if (retsize != (ssize_t)sizeof(jd.head)) {
+                       eprintf("Unable to read the journal header, %s.\n",
+                               jrnl_file_path);
+                       goto end_while_2;
+               }
+               /* on-disk contents are not trusted to be NUL-terminated */
+               jd.head.target_path[sizeof(jd.head.target_path) - 1] = '\0';
+
+               if (!jrnl_has_end_mark(&jd))
+                       goto end_while_2;
+
+               jd.target_fd = open(jd.head.target_path, O_SYNC | O_RDWR);
+               if (jd.target_fd < 0) {
+                       eprintf("Unable to open the object file, %s, to recover.\n",
+                               jd.head.target_path);
+                       goto end_while_2;
+               }
+               ret = jrnl_apply_to_target_object(&jd);
+               if (ret)
+                       eprintf("Unable to recover the object, %s.\n",
+                               jd.head.target_path);
+
+               close(jd.target_fd);
+               jd.target_fd = -1;
+end_while_2:
+               jrnl_close(&jd);
+end_while_3:
+               vprintf(SDOG_INFO, "recovered the object in journal, %s\n",
+                       jrnl_file_path);
+               jrnl_remove(&jd);
+       }
+       closedir(dir);
+       vprintf(SDOG_NOTICE, "end jrnl_recovery.\n");
+
+       return 0;
+}
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 27859ca..0deac09 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -185,8 +185,6 @@ int main(int argc, char **argv)
        if (ret)
                exit(1);
 
-       jrnl_recover();
-
        ret = init_event(EPOLL_SIZE);
        if (ret)
                exit(1);
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index e2fcb40..cccb986 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -242,65 +242,9 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int 
node_idx,
                 uint32_t epoch, int worker_idx);
 
 /* Journal */
-#define JRNL_TYPE_VDI        0
-#define JRNL_MAX_TYPES       1
-
-#define SET_END_MARK            1UL
-#define UNSET_END_MARK          0UL
-#define IS_END_MARK_SET(var)    (var == 1UL)
-
-/* Journal header for data object */
-struct jrnl_vdi_head {
-       uint32_t jh_type;
-       uint32_t pad;
-       uint64_t jh_offset;
-       uint64_t jh_size;
-};
-
-struct jrnl_file_desc {
-       uint32_t  jf_epoch;   /* epoch */
-       uint64_t  jf_oid;     /* Object id */
-       int       jf_fd;      /* Open file fd */
-       int       jf_target_fd;
-} jrnl_file_desc_t;
-
-struct jrnl_descriptor {
-       void                    *jd_head;
-       void                    *jd_data;
-       int                     jd_end_mark;
-       struct jrnl_file_desc   jd_jfd;
-#define jdf_epoch               jd_jfd.jf_epoch
-#define jdf_oid                 jd_jfd.jf_oid
-#define jdf_fd                  jd_jfd.jf_fd
-#define jdf_target_fd           jd_jfd.jf_target_fd
-} jrnl_desc_t;
-
-struct jrnl_handler {
-       int (*has_end_mark)(struct jrnl_descriptor *jd);
-       int (*write_header)(struct jrnl_descriptor *jd);
-       int (*write_data)(struct jrnl_descriptor *jd);
-       int (*write_end_mark)(struct jrnl_descriptor *jd);
-       int (*apply_to_target_object)(struct jrnl_file_desc *jfd);
-       int (*commit_data)(struct jrnl_descriptor *jd);
-};
-
-inline uint32_t jrnl_get_type(struct jrnl_descriptor *jd);
-int jrnl_get_type_from_file(struct jrnl_file_desc *jfd, uint32_t *jrnl_type);
-int jrnl_exists(struct jrnl_file_desc *jfd);
-int jrnl_update_epoch_store(uint32_t epoch);
-int jrnl_open(struct jrnl_file_desc *jfd, int aflags);
-int jrnl_create(struct jrnl_file_desc *jfd);
-int jrnl_remove(struct jrnl_file_desc *jfd);
-inline int jrnl_close(struct jrnl_file_desc *jfd);
-
-inline int jrnl_has_end_mark(struct jrnl_descriptor *jd);
-inline int jrnl_write_header(struct jrnl_descriptor *jd);
-inline int jrnl_write_data(struct jrnl_descriptor *jd);
-inline int jrnl_write_end_mark(struct jrnl_descriptor *jd);
-inline int jrnl_apply_to_target_object(struct jrnl_file_desc *jfd);
-inline int jrnl_commit_data(struct jrnl_descriptor *jd);
-int jrnl_perform(struct jrnl_descriptor *jd);
-int jrnl_recover(void);
+int jrnl_perform(int fd, void *buf, size_t count, off_t offset,
+                const char *path, const char *jrnl_dir);
+int jrnl_recover(const char *jrnl_dir);
 
 static inline int is_myself(uint8_t *addr, uint16_t port)
 {
diff --git a/sheep/store.c b/sheep/store.c
index abaab9f..8eb2d5b 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -35,26 +35,6 @@ static char *jrnl_path;
 static mode_t def_dmode = S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP | S_IWGRP | 
S_IXGRP;
 static mode_t def_fmode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
 
-/* Journal internal data structures */
-/* Journal Handlers for Data Object */
-static int jrnl_vdi_has_end_mark(struct jrnl_descriptor *jd);
-static int jrnl_vdi_write_header(struct jrnl_descriptor *jd);
-static int jrnl_vdi_write_data(struct jrnl_descriptor *jd);
-static int jrnl_vdi_write_end_mark(struct jrnl_descriptor *jd);
-static int jrnl_vdi_apply_to_target_object(struct jrnl_file_desc *jfd);
-static int jrnl_vdi_commit_data(struct jrnl_descriptor *jd);
-
-static struct jrnl_handler jrnl_handlers[JRNL_MAX_TYPES] = {
-       {
-               .has_end_mark = jrnl_vdi_has_end_mark,
-               .write_header = jrnl_vdi_write_header,
-               .write_data = jrnl_vdi_write_data,
-               .write_end_mark = jrnl_vdi_write_end_mark,
-               .apply_to_target_object = jrnl_vdi_apply_to_target_object,
-               .commit_data = jrnl_vdi_commit_data
-       }
-};
-
 static int obj_cmp(const void *oid1, const void *oid2)
 {
        const uint64_t hval1 = fnv_64a_buf((void *)oid1, sizeof(uint64_t), 
FNV1A_64_INIT);
@@ -565,8 +545,6 @@ static int store_queue_request_local(struct request *req, 
uint32_t epoch)
        uint64_t oid = hdr->oid;
        uint32_t opcode = hdr->opcode;
        char path[1024], *buf = NULL;
-       struct jrnl_descriptor jd;
-       struct jrnl_vdi_head jh;
 
        dprintf("%x, %" PRIx64" , %u\n", opcode, oid, epoch);
 
@@ -703,20 +681,10 @@ static int store_queue_request_local(struct request *req, 
uint32_t epoch)
                }
 
                if (is_vdi_obj(oid)) {
-                       jd.jdf_epoch = epoch;
-                       jd.jdf_oid = oid;
-                       jd.jdf_target_fd = fd;
-
-                       memset(&jh, 0, sizeof(jh));
-                       jh.jh_type = JRNL_TYPE_VDI;
-                       jh.jh_offset = hdr->offset;
-                       jh.jh_size = hdr->data_length;
-
-                       jd.jd_head = &jh;
-                       jd.jd_data = req->data;
-                       jd.jd_end_mark = SET_END_MARK;
-
-                       ret = jrnl_perform(&jd);
+                       snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, 
obj_path,
+                                epoch, oid);
+                       ret = jrnl_perform(fd, req->data, hdr->data_length,
+                                          hdr->offset, path, jrnl_path);
                        if (ret)
                                goto out;
                } else {
@@ -2028,10 +1996,13 @@ static int init_jrnl_path(const char *base_path)
        /* Error during directory creation */
        if (ret)
                return ret;
+
        /* If journal is newly created */
        if (new)
                return 0;
 
+       jrnl_recover(jrnl_path);
+
        return 0;
 }
 
@@ -2087,363 +2058,3 @@ int get_global_nr_copies(uint32_t *copies)
 {
        return attr(epoch_path, ANAME_COPIES, copies, sizeof(*copies), 0);
 }
-
-/* Journal APIs */
-int jrnl_exists(struct jrnl_file_desc *jfd)
-{
-       int ret;
-       char path[1024];
-       struct stat s;
-
-       snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path,
-                jfd->jf_epoch, jfd->jf_oid);
-
-       ret = stat(path, &s);
-       if (ret)
-               return 1;
-
-       return 0;
-}
-
-int jrnl_update_epoch_store(uint32_t epoch)
-{
-       char new[1024];
-       struct stat s;
-
-       snprintf(new, sizeof(new), "%s%08u/", jrnl_path, epoch);
-       if (stat(new, &s) < 0)
-               if (errno == ENOENT)
-                       mkdir(new, def_dmode);
-
-       return 0;
-}
-
-int jrnl_open(struct jrnl_file_desc *jfd, int aflags)
-{
-       char path[1024];
-       int flags = aflags;
-       int fd, ret;
-
-
-       jrnl_update_epoch_store(jfd->jf_epoch);
-       snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path,
-                jfd->jf_epoch, jfd->jf_oid);
-
-       fd = open(path, flags, def_fmode);
-       if (fd < 0) {
-               eprintf("failed to open %s, %s\n", path, strerror(errno));
-               if (errno == ENOENT)
-                       ret = SD_RES_NO_OBJ;
-               else
-                       ret = SD_RES_UNKNOWN;
-       } else {
-               jfd->jf_fd = fd;
-               ret = SD_RES_SUCCESS;
-       }
-
-       return ret;
-}
-
-int jrnl_close(struct jrnl_file_desc *jfd)
-{
-       close(jfd->jf_fd);
-       jfd->jf_fd = -1;
-
-       return 0;
-}
-
-int jrnl_create(struct jrnl_file_desc *jfd)
-{
-       return jrnl_open(jfd, O_RDWR | O_CREAT);
-}
-
-inline uint32_t jrnl_get_type(struct jrnl_descriptor *jd)
-{
-       return *((uint32_t *) jd->jd_head);
-}
-
-int jrnl_get_type_from_file(struct jrnl_file_desc *jfd, uint32_t *jrnl_type)
-{
-       ssize_t retsize;
-
-       retsize = pread64(jfd->jf_fd, jrnl_type, sizeof(*jrnl_type), 0);
-
-       if (retsize != sizeof(*jrnl_type))
-               return SD_RES_EIO;
-       else
-               return SD_RES_SUCCESS;
-}
-
-int jrnl_remove(struct jrnl_file_desc *jfd)
-{
-       char path[1024];
-       int ret;
-
-       snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, jrnl_path,
-                jfd->jf_epoch, jfd->jf_oid);
-       ret = unlink(path);
-       if (ret) {
-               eprintf("failed to remove %s, %s\n", path, strerror(errno));
-               ret = SD_RES_EIO;
-       } else
-               ret = SD_RES_SUCCESS;
-
-       return ret;
-}
-
-inline int jrnl_has_end_mark(struct jrnl_descriptor *jd)
-{
-       return jrnl_handlers[jrnl_get_type(jd)].has_end_mark(jd);
-}
-
-inline int jrnl_write_header(struct jrnl_descriptor *jd)
-{
-       return jrnl_handlers[jrnl_get_type(jd)].write_header(jd);
-}
-
-inline int jrnl_write_data(struct jrnl_descriptor *jd)
-{
-       return jrnl_handlers[jrnl_get_type(jd)].write_data(jd);
-}
-
-inline int jrnl_write_end_mark(struct jrnl_descriptor *jd)
-{
-       return jrnl_handlers[jrnl_get_type(jd)].write_end_mark(jd);
-}
-
-inline int jrnl_apply_to_target_object(struct jrnl_file_desc *jfd)
-{
-       int ret;
-       uint32_t jrnl_type;
-
-       ret = jrnl_get_type_from_file(jfd, &jrnl_type);
-       if (ret)
-               return ret;
-
-       return jrnl_handlers[jrnl_type].apply_to_target_object(jfd);
-}
-
-inline int jrnl_commit_data(struct jrnl_descriptor *jd)
-{
-       return jrnl_handlers[jrnl_get_type(jd)].commit_data(jd);
-}
-
-int jrnl_perform(struct jrnl_descriptor *jd)
-{
-       int ret;
-
-       ret = jrnl_create(&jd->jd_jfd);
-       if (ret)
-               goto out;
-
-       ret = jrnl_write_header(jd);
-       if (ret)
-               goto out;
-
-       ret = jrnl_write_data(jd);
-       if (ret)
-               goto out;
-
-       ret = jrnl_write_end_mark(jd);
-       if (ret)
-               goto out;
-
-       ret = jrnl_commit_data(jd);
-       if (ret)
-               goto out;
-
-       ret = jrnl_close(&jd->jd_jfd);
-       if (ret)
-               goto out;
-
-       ret = jrnl_remove(&jd->jd_jfd);
-
-out:
-       return ret;
-}
-
-int jrnl_recover(void)
-{
-       DIR *dir;
-       struct dirent *d;
-       char jrnl_dir[PATH_MAX], jrnl_file_path[PATH_MAX], 
obj_file_path[PATH_MAX];
-       int epoch;
-
-       epoch = get_latest_epoch();
-       if (epoch < 0)
-               return 1;
-
-       snprintf(jrnl_dir, sizeof(jrnl_dir), "%s%08u/", jrnl_path, epoch);
-
-       eprintf("Openning the directory %s.\n", jrnl_dir);
-       dir = opendir(jrnl_dir);
-       if (!dir)
-               return -1;
-
-       vprintf(SDOG_NOTICE, "start jrnl_recovery.\n");
-       while ((d = readdir(dir))) {
-               int ret;
-               struct jrnl_file_desc jfd;
-
-               if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
-                       continue;
-
-               jfd.jf_epoch = epoch;
-               sscanf(d->d_name, "%" PRIx64, &jfd.jf_oid);
-               snprintf(jrnl_file_path, sizeof(jrnl_file_path), "%s%016" 
PRIx64,
-                        jrnl_dir, jfd.jf_oid);
-               snprintf(obj_file_path, sizeof(obj_file_path), "%s%08u/%016" 
PRIx64,
-                        obj_path, epoch, jfd.jf_oid);
-               ret = jrnl_open(&jfd, O_RDONLY);
-               if (ret) {
-                       eprintf("Unable to open the journal file, %s, for 
reading.\n",
-                               jrnl_file_path);
-                       goto end_while_3;
-               }
-               jfd.jf_target_fd = ob_open(epoch, jfd.jf_oid, 0, &ret);
-               if (ret) {
-                       eprintf("Unable to open the object file, %s, to 
recover.\n",
-                               obj_file_path);
-                       goto end_while_2;
-               }
-               ret = jrnl_apply_to_target_object(&jfd);
-               if (ret)
-                       eprintf("Unable to recover the object, %s.\n",
-                               obj_file_path);
-
-               close(jfd.jf_target_fd);
-               jfd.jf_target_fd = -1;
-end_while_2:
-               jrnl_close(&jfd);
-end_while_3:
-               vprintf(SDOG_INFO, "recovered the object in journal, %s\n",
-                       jrnl_file_path);
-               jrnl_remove(&jfd);
-       }
-       closedir(dir);
-       vprintf(SDOG_NOTICE, "end jrnl_recovery.\n");
-
-       return 0;
-}
-
-/* VDI data journalling functions */
-static int jrnl_vdi_has_end_mark(struct jrnl_descriptor *jd)
-{
-       ssize_t ret;
-       uint32_t end_mark = UNSET_END_MARK;
-       struct jrnl_vdi_head *head = (struct jrnl_vdi_head *) jd->jd_head;
-
-       ret = pread64(jd->jdf_fd, &end_mark, sizeof(end_mark),
-                     sizeof(*head) + head->jh_size);
-
-       return IS_END_MARK_SET(end_mark) ? SET_END_MARK : UNSET_END_MARK;
-}
-
-static int jrnl_vdi_write_header(struct jrnl_descriptor *jd)
-{
-       ssize_t ret;
-       struct jrnl_vdi_head *head = (struct jrnl_vdi_head *) jd->jd_head;
-
-       ret = pwrite64(jd->jdf_fd, head, sizeof(*head), 0);
-
-       if (ret != sizeof(*head)) {
-               if (errno == ENOSPC)
-                       ret = SD_RES_NO_SPACE;
-               else
-                       ret = SD_RES_EIO;
-       } else
-               ret = SD_RES_SUCCESS;
-
-       return ret;
-}
-
-static int jrnl_vdi_write_data(struct jrnl_descriptor *jd)
-{
-       ssize_t ret;
-       struct jrnl_vdi_head *head = (struct jrnl_vdi_head *) jd->jd_head;
-
-       ret = pwrite64(jd->jdf_fd, jd->jd_data, head->jh_size, sizeof(*head));
-
-       if (ret != head->jh_size) {
-               if (errno == ENOSPC)
-                       ret = SD_RES_NO_SPACE;
-               else
-                       ret = SD_RES_EIO;
-       } else
-               ret = SD_RES_SUCCESS;
-
-       return ret;
-}
-
-static int jrnl_vdi_write_end_mark(struct jrnl_descriptor *jd)
-{
-       ssize_t retsize;
-       int ret;
-       uint32_t end_mark = SET_END_MARK;
-       struct jrnl_vdi_head *head = (struct jrnl_vdi_head *) jd->jd_head;
-
-       retsize = pwrite64(jd->jdf_fd, &end_mark, sizeof(end_mark),
-                          sizeof(*head) + head->jh_size);
-
-       if (retsize != sizeof(end_mark)) {
-               if (errno == ENOSPC)
-                       ret = SD_RES_NO_SPACE;
-               else
-                       ret = SD_RES_EIO;
-       } else
-               ret = SD_RES_SUCCESS;
-
-       jd->jd_end_mark = end_mark;
-
-       return ret;
-}
-
-static int jrnl_vdi_apply_to_target_object(struct jrnl_file_desc *jfd)
-{
-       char *buf = NULL;
-       int buf_len, res = 0;
-       ssize_t retsize;
-       struct jrnl_vdi_head jh;
-
-       /* FIXME: handle larger size */
-       buf_len = (1 << 22);
-       buf = zalloc(buf_len);
-       if (!buf) {
-               eprintf("failed to allocate memory\n");
-               return SD_RES_NO_MEM;
-       }
-
-       /* Flush out journal to disk (vdi object) */
-       retsize = pread64(jfd->jf_fd, &jh, sizeof(jh), 0);
-       retsize = pread64(jfd->jf_fd, buf, jh.jh_size, sizeof(jh));
-       retsize = pwrite64(jfd->jf_target_fd, buf, jh.jh_size, jh.jh_offset);
-       if (retsize != jh.jh_size) {
-               if (errno == ENOSPC)
-                       res = SD_RES_NO_SPACE;
-               else
-                       res = SD_RES_EIO;
-       }
-
-       /* Clean up */
-       free(buf);
-
-       return res;
-}
-
-static int jrnl_vdi_commit_data(struct jrnl_descriptor *jd)
-{
-       int ret = 0;
-       ssize_t retsize;
-       struct jrnl_vdi_head *head = (struct jrnl_vdi_head *) jd->jd_head;
-
-       retsize = pwrite64(jd->jdf_target_fd, jd->jd_data, head->jh_size,
-                          head->jh_offset);
-       if (retsize != head->jh_size) {
-               if (errno == ENOSPC)
-                       ret = SD_RES_NO_SPACE;
-               else
-                       ret = SD_RES_EIO;
-       }
-
-       return ret;
-}
-- 
1.7.2.5

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to