From: Masaki Saeki <saeki.mas...@po.ntts.co.jp>

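This change is a preparatory patch for adding store_driver.
It moves the store-related files (md.c, plain_store.c, and store.c, the
latter renamed to common.c) together into the new sheep/store/ folder.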

Signed-off-by: Masaki Saeki <saeki.mas...@po.ntts.co.jp>
---
 sheep/Makefile.am         |    6 +-
 sheep/md.c                |  878 ---------------------------------------------
 sheep/plain_store.c       |  759 ---------------------------------------
 sheep/store.c             |  511 --------------------------
 sheep/store/common.c      |  511 ++++++++++++++++++++++++++
 sheep/store/md.c          |  878 +++++++++++++++++++++++++++++++++++++++++++++
 sheep/store/plain_store.c |  759 +++++++++++++++++++++++++++++++++++++++
 7 files changed, 2152 insertions(+), 2150 deletions(-)
 delete mode 100644 sheep/md.c
 delete mode 100644 sheep/plain_store.c
 delete mode 100644 sheep/store.c
 create mode 100644 sheep/store/common.c
 create mode 100644 sheep/store/md.c
 create mode 100644 sheep/store/plain_store.c

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 7a08838..3ddd761 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -24,10 +24,12 @@ AM_CPPFLAGS         = -I$(top_builddir)/include -I$(top_srcdir)/include \
 
 sbin_PROGRAMS          = sheep
 
-sheep_SOURCES          = sheep.c group.c request.c gateway.c store.c vdi.c \
+sheep_SOURCES          = sheep.c group.c request.c gateway.c vdi.c \
                          journal.c ops.c recovery.c cluster/local.c \
                          object_cache.c object_list_cache.c \
-                         plain_store.c config.c migrate.c md.c
+                         store/common.c store/md.c \
+                         store/plain_store.c \
+                         config.c migrate.c
 
 if BUILD_HTTP
 sheep_SOURCES          += http/http.c http/kv.c http/s3.c http/swift.c \
diff --git a/sheep/md.c b/sheep/md.c
deleted file mode 100644
index bcdfb73..0000000
--- a/sheep/md.c
+++ /dev/null
@@ -1,878 +0,0 @@
-/*
- * Copyright (C) 2013 Taobao Inc.
- *
- * Liu Yuan <namei.u...@gmail.com>
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License version
- * 2 as published by the Free Software Foundation.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "sheep_priv.h"
-
-#define MD_VDISK_SIZE ((uint64_t)1*1024*1024*1024) /* 1G */
-
-#define NONE_EXIST_PATH "/all/disks/are/broken/,ps/əʌo7/!"
-
-struct md md = {
-       .vroot = RB_ROOT,
-       .root = RB_ROOT,
-       .lock = SD_RW_LOCK_INITIALIZER,
-};
-
-static inline uint32_t nr_online_disks(void)
-{
-       uint32_t nr;
-
-       sd_read_lock(&md.lock);
-       nr = md.nr_disks;
-       sd_rw_unlock(&md.lock);
-
-       return nr;
-}
-
-static inline int vdisk_number(const struct disk *disk)
-{
-       return DIV_ROUND_UP(disk->space, MD_VDISK_SIZE);
-}
-
-static int disk_cmp(const struct disk *d1, const struct disk *d2)
-{
-       return strcmp(d1->path, d2->path);
-}
-
-static int vdisk_cmp(const struct vdisk *d1, const struct vdisk *d2)
-{
-       return intcmp(d1->hash, d2->hash);
-}
-
-static struct vdisk *vdisk_insert(struct vdisk *new)
-{
-       return rb_insert(&md.vroot, new, rb, vdisk_cmp);
-}
-
-/* If v1_hash < hval <= v2_hash, then oid is resident in v2 */
-static struct vdisk *hval_to_vdisk(uint64_t hval)
-{
-       struct vdisk dummy = { .hash = hval };
-
-       return rb_nsearch(&md.vroot, &dummy, rb, vdisk_cmp);
-}
-
-static struct vdisk *oid_to_vdisk(uint64_t oid)
-{
-       return hval_to_vdisk(sd_hash_oid(oid));
-}
-
-static void create_vdisks(const struct disk *disk)
-{
-       uint64_t hval = sd_hash(disk->path, strlen(disk->path));
-       const struct sd_node *n = &sys->this_node;
-       uint64_t node_hval;
-       int nr;
-
-       if (is_cluster_diskmode(&sys->cinfo)) {
-               node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr));
-               hval = fnv_64a_64(node_hval, hval);
-               nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN);
-               if (0 == n->nid.port)
-                       return;
-       } else
-               nr = vdisk_number(disk);
-
-       for (int i = 0; i < nr; i++) {
-               struct vdisk *v = xmalloc(sizeof(*v));
-
-               hval = sd_hash_next(hval);
-               v->hash = hval;
-               v->disk = disk;
-               if (unlikely(vdisk_insert(v)))
-                       panic("vdisk hash collison");
-       }
-}
-
-static inline void vdisk_free(struct vdisk *v)
-{
-       rb_erase(&v->rb, &md.vroot);
-       free(v);
-}
-
-static void remove_vdisks(const struct disk *disk)
-{
-       uint64_t hval = sd_hash(disk->path, strlen(disk->path));
-       const struct sd_node *n = &sys->this_node;
-       uint64_t node_hval;
-       int nr;
-
-       if (is_cluster_diskmode(&sys->cinfo)) {
-               node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr));
-               hval = fnv_64a_64(node_hval, hval);
-               nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN);
-       } else
-               nr = vdisk_number(disk);
-
-       for (int i = 0; i < nr; i++) {
-               struct vdisk *v;
-
-               hval = sd_hash_next(hval);
-               v = hval_to_vdisk(hval);
-               sd_assert(v->hash == hval);
-
-               vdisk_free(v);
-       }
-}
-
-static inline void trim_last_slash(char *path)
-{
-       sd_assert(path[0]);
-       while (path[strlen(path) - 1] == '/')
-               path[strlen(path) - 1] = '\0';
-}
-
-static struct disk *path_to_disk(const char *path)
-{
-       struct disk key = {};
-
-       pstrcpy(key.path, sizeof(key.path), path);
-       trim_last_slash(key.path);
-
-       return rb_search(&md.root, &key, rb, disk_cmp);
-}
-
-size_t get_store_objsize(uint64_t oid)
-{
-       if (is_erasure_oid(oid)) {
-               uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
-               int d;
-               ec_policy_to_dp(policy, &d, NULL);
-               return get_vdi_object_size(oid_to_vid(oid)) / d;
-       }
-       return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
-}
-
-static int get_total_object_size(uint64_t oid, const char *wd, uint32_t epoch,
-                                uint8_t ec_index, struct vnode_info *vinfo,
-                                void *total)
-{
-       uint64_t *t = total;
-       struct stat s;
-       char path[PATH_MAX];
-
-       snprintf(path, PATH_MAX, "%s/%016" PRIx64, wd, oid);
-       if (stat(path, &s) == 0)
-               *t += s.st_blocks * SECTOR_SIZE;
-       else
-               *t += get_store_objsize(oid);
-
-       return SD_RES_SUCCESS;
-}
-
-static int64_t find_string_integer(const char *str, const char *delimiter)
-{
-       char *pos = strstr(str, delimiter), *p;
-       int64_t ret;
-
-       ret = strtoll(pos + 1, &p, 10);
-       if (ret == LLONG_MAX || p == pos + 1) {
-               sd_err("%s strtoul failed, delimiter %s, %m", str, delimiter);
-               return -1;
-       }
-
-       return ret;
-}
-
-/* If cleanup is true, temporary objects will be removed */
-static int for_each_object_in_path(const char *path,
-                                  int (*func)(uint64_t, const char *, uint32_t,
-                                              uint8_t, struct vnode_info *,
-                                              void *),
-                                  bool cleanup, struct vnode_info *vinfo,
-                                  void *arg)
-{
-       DIR *dir;
-       struct dirent *d;
-       uint64_t oid;
-       int ret = SD_RES_SUCCESS;
-       char file_name[PATH_MAX];
-
-       dir = opendir(path);
-       if (unlikely(!dir)) {
-               sd_err("failed to open %s, %m", path);
-               return SD_RES_EIO;
-       }
-
-       while ((d = readdir(dir))) {
-               uint32_t epoch = 0;
-               uint8_t ec_index = SD_MAX_COPIES;
-
-               /* skip ".", ".." and ".stale" */
-               if (unlikely(!strncmp(d->d_name, ".", 1)))
-                       continue;
-
-               sd_debug("%s, %s", path, d->d_name);
-               oid = strtoull(d->d_name, NULL, 16);
-               if (oid == 0 || oid == ULLONG_MAX)
-                       continue;
-
-               /* don't call callback against temporary objects */
-               if (is_tmp_dentry(d->d_name)) {
-                       if (cleanup) {
-                               snprintf(file_name, sizeof(file_name),
-                                               "%s/%s", path, d->d_name);
-                               sd_debug("remove tmp object %s", file_name);
-                               if (unlink(file_name) < 0)
-                                       sd_err("failed to unlink %s: %m",
-                                                       file_name);
-                       }
-                       continue;
-               }
-
-               if (is_stale_dentry(d->d_name)) {
-                       epoch = find_string_integer(d->d_name, ".");
-                       if (epoch < 0)
-                               continue;
-               }
-
-               if (is_ec_dentry(d->d_name)) {
-                       ec_index = find_string_integer(d->d_name, "_");
-                       if (ec_index < 0)
-                               continue;
-               }
-
-               ret = func(oid, path, epoch, ec_index, vinfo, arg);
-               if (ret != SD_RES_SUCCESS)
-                       break;
-       }
-       closedir(dir);
-       return ret;
-}
-
-static uint64_t get_path_free_size(const char *path, uint64_t *used)
-{
-       struct statvfs fs;
-       uint64_t size;
-
-       if (statvfs(path, &fs) < 0) {
-               sd_err("get disk %s space failed %m", path);
-               return 0;
-       }
-       size = (int64_t)fs.f_frsize * fs.f_bavail;
-
-       if (!used)
-               goto out;
-       if (for_each_object_in_path(path, get_total_object_size, false,
-                                   NULL, used)
-           != SD_RES_SUCCESS)
-               return 0;
-out:
-       return size;
-}
-
-/*
- * If path is broken during initialization or not support xattr return 0. We can
- * safely use 0 to represent failure case  because 0 space path can be
- * considered as broken path.
- */
-static uint64_t init_path_space(const char *path, bool purge)
-{
-       uint64_t size;
-       char stale[PATH_MAX];
-
-       if (!is_xattr_enabled(path)) {
-               sd_warn("multi-disk support need xattr feature for path: %s",
-                       path);
-               goto broken_path;
-       }
-
-       if (purge && purge_directory(path) < 0)
-               sd_err("failed to purge %s", path);
-
-       snprintf(stale, PATH_MAX, "%s/.stale", path);
-       if (xmkdir(stale, sd_def_dmode) < 0) {
-               sd_err("can't mkdir for %s, %m", stale);
-               goto broken_path;
-       }
-
-#define MDNAME "user.md.size"
-#define MDSIZE sizeof(uint64_t)
-       if (getxattr(path, MDNAME, &size, MDSIZE) < 0) {
-               if (errno == ENODATA) {
-                       goto create;
-               } else {
-                       sd_err("%s, %m", path);
-                       goto broken_path;
-               }
-       }
-
-       return size;
-create:
-       size = get_path_free_size(path, NULL);
-       if (!size)
-               goto broken_path;
-       if (setxattr(path, MDNAME, &size, MDSIZE, 0) < 0) {
-               sd_err("%s, %m", path);
-               goto broken_path;
-       }
-       return size;
-broken_path:
-       return 0;
-}
-
-/* We don't need lock at init stage */
-bool md_add_disk(const char *path, bool purge)
-{
-       struct disk *new;
-
-       if (path_to_disk(path)) {
-               sd_err("duplicate path %s", path);
-               return false;
-       }
-
-       if (xmkdir(path, sd_def_dmode) < 0) {
-               sd_err("can't mkdir for %s, %m", path);
-               return false;
-       }
-
-       new = xmalloc(sizeof(*new));
-       pstrcpy(new->path, PATH_MAX, path);
-       trim_last_slash(new->path);
-       new->space = init_path_space(new->path, purge);
-       if (!new->space) {
-               free(new);
-               return false;
-       }
-
-       create_vdisks(new);
-       rb_insert(&md.root, new, rb, disk_cmp);
-       md.space += new->space;
-       md.nr_disks++;
-
-       sd_info("%s, vdisk nr %d, total disk %d", new->path, vdisk_number(new),
-               md.nr_disks);
-       return true;
-}
-
-static inline void md_remove_disk(struct disk *disk)
-{
-       sd_info("%s from multi-disk array", disk->path);
-       rb_erase(&disk->rb, &md.root);
-       md.nr_disks--;
-       remove_vdisks(disk);
-       free(disk);
-}
-
-uint64_t md_init_space(void)
-{
-       return md.space;
-}
-
-static const char *md_get_object_dir_nolock(uint64_t oid)
-{
-       const struct vdisk *vd;
-
-       if (unlikely(md.nr_disks == 0))
-               return NONE_EXIST_PATH; /* To generate EIO */
-
-       vd = oid_to_vdisk(oid);
-       return vd->disk->path;
-}
-
-const char *md_get_object_dir(uint64_t oid)
-{
-       const char *p;
-
-       sd_read_lock(&md.lock);
-       p = md_get_object_dir_nolock(oid);
-       sd_rw_unlock(&md.lock);
-
-       return p;
-}
-
-struct process_path_arg {
-       const char *path;
-       struct vnode_info *vinfo;
-       int (*func)(uint64_t oid, const char *, uint32_t, uint8_t,
-                   struct vnode_info *, void *arg);
-       bool cleanup;
-       void *opaque;
-       int result;
-};
-
-static void *thread_process_path(void *arg)
-{
-       int ret;
-       struct process_path_arg *parg = (struct process_path_arg *)arg;
-
-       ret = for_each_object_in_path(parg->path, parg->func, parg->cleanup,
-                                     parg->vinfo, parg->opaque);
-       if (ret != SD_RES_SUCCESS)
-               parg->result = ret;
-
-       return arg;
-}
-
-main_fn int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
-                                     uint32_t epoch, uint8_t ec_index,
-                                     struct vnode_info *vinfo, void *arg),
-                                 bool cleanup, void *arg)
-{
-       int ret = SD_RES_SUCCESS;
-       const struct disk *disk;
-       struct process_path_arg *thread_args, *path_arg;
-       struct vnode_info *vinfo;
-       void *ret_arg;
-       sd_thread_t *thread_array;
-       int nr_thread = 0, idx = 0;
-
-       sd_read_lock(&md.lock);
-
-       rb_for_each_entry(disk, &md.root, rb) {
-               nr_thread++;
-       }
-
-       thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg));
-       thread_array = xmalloc(nr_thread * sizeof(sd_thread_t));
-
-       vinfo = get_vnode_info();
-
-       rb_for_each_entry(disk, &md.root, rb) {
-               thread_args[idx].path = disk->path;
-               thread_args[idx].vinfo = vinfo;
-               thread_args[idx].func = func;
-               thread_args[idx].cleanup = cleanup;
-               thread_args[idx].opaque = arg;
-               thread_args[idx].result = SD_RES_SUCCESS;
-               ret = sd_thread_create_with_idx("foreach wd",
-                                               thread_array + idx,
-                                               thread_process_path,
-                                               (void *)(thread_args + idx));
-               if (ret) {
-                       /*
-                        * If we can't create enough threads to process
-                        * files, the data-consistent will be broken if
-                        * we continued.
-                        */
-                       panic("Failed to create thread for path %s",
-                             disk->path);
-               }
-               idx++;
-       }
-
-       sd_debug("Create %d threads for all path", nr_thread);
-       /* wait for all threads to exit */
-       for (idx = 0; idx < nr_thread; idx++) {
-               ret = sd_thread_join(thread_array[idx], &ret_arg);
-               if (ret)
-                       sd_err("Failed to join thread");
-               if (ret_arg) {
-                       path_arg = (struct process_path_arg *)ret_arg;
-                       if (path_arg->result != SD_RES_SUCCESS)
-                               sd_err("%s, %s", path_arg->path,
-                                      sd_strerror(path_arg->result));
-               }
-       }
-
-       put_vnode_info(vinfo);
-       sd_rw_unlock(&md.lock);
-
-       free(thread_args);
-       free(thread_array);
-       return ret;
-}
-
-int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path,
-                                        uint32_t epoch, uint8_t,
-                                        struct vnode_info *, void *arg),
-                            void *arg)
-{
-       int ret = SD_RES_SUCCESS;
-       char path[PATH_MAX];
-       const struct disk *disk;
-
-       sd_read_lock(&md.lock);
-       rb_for_each_entry(disk, &md.root, rb) {
-               snprintf(path, sizeof(path), "%s/.stale", disk->path);
-               ret = for_each_object_in_path(path, func, false, NULL, arg);
-               if (ret != SD_RES_SUCCESS)
-                       break;
-       }
-       sd_rw_unlock(&md.lock);
-       return ret;
-}
-
-
-int for_each_obj_path(int (*func)(const char *path))
-{
-       int ret = SD_RES_SUCCESS;
-       const struct disk *disk;
-
-       sd_read_lock(&md.lock);
-       rb_for_each_entry(disk, &md.root, rb) {
-               ret = func(disk->path);
-               if (ret != SD_RES_SUCCESS)
-                       break;
-       }
-       sd_rw_unlock(&md.lock);
-       return ret;
-}
-
-struct md_work {
-       struct work work;
-       char path[PATH_MAX];
-};
-
-static inline void kick_recover(void)
-{
-       struct vnode_info *vinfo = get_vnode_info();
-
-       if (is_cluster_diskmode(&sys->cinfo))
-               sys->cdrv->update_node(&sys->this_node);
-       else {
-               start_recovery(vinfo, vinfo, false);
-               put_vnode_info(vinfo);
-       }
-}
-
-static void md_do_recover(struct work *work)
-{
-       struct md_work *mw = container_of(work, struct md_work, work);
-       struct disk *disk;
-       int nr = 0;
-
-       sd_write_lock(&md.lock);
-       disk = path_to_disk(mw->path);
-       if (!disk)
-               /* Just ignore the duplicate EIO of the same path */
-               goto out;
-       md_remove_disk(disk);
-       nr = md.nr_disks;
-out:
-       sd_rw_unlock(&md.lock);
-
-       if (disk) {
-               if (nr > 0) {
-                       update_node_disks();
-                       kick_recover();
-               } else {
-                       leave_cluster();
-               }
-       }
-
-       free(mw);
-}
-
-int md_handle_eio(const char *fault_path)
-{
-       struct md_work *mw;
-
-       if (nr_online_disks() == 0)
-               return SD_RES_EIO;
-
-       mw = xzalloc(sizeof(*mw));
-       mw->work.done = md_do_recover;
-       pstrcpy(mw->path, PATH_MAX, fault_path);
-       queue_work(sys->md_wqueue, &mw->work);
-
-       /* Fool the requester to retry */
-       return SD_RES_NETWORK_ERROR;
-}
-
-static inline bool md_access(const char *path)
-{
-       if (access(path, R_OK | W_OK) < 0) {
-               if (unlikely(errno != ENOENT))
-                       sd_err("failed to check %s, %m", path);
-               return false;
-       }
-
-       return true;
-}
-
-static int get_old_new_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
-                           const char *path, char *old, char *new)
-{
-       if (!epoch) {
-               if (!is_erasure_oid(oid)) {
-                       snprintf(old, PATH_MAX, "%s/%016" PRIx64, path, oid);
-                       snprintf(new, PATH_MAX, "%s/%016" PRIx64,
-                                md_get_object_dir_nolock(oid), oid);
-               } else {
-                       snprintf(old, PATH_MAX, "%s/%016" PRIx64"_%d", path,
-                                oid, ec_index);
-                       snprintf(new, PATH_MAX, "%s/%016" PRIx64"_%d",
-                                md_get_object_dir_nolock(oid), oid, ec_index);
-               }
-       } else {
-               if (!is_erasure_oid(oid)) {
-                       snprintf(old, PATH_MAX,
-                                "%s/.stale/%016"PRIx64".%"PRIu32, path,
-                                oid, epoch);
-                       snprintf(new, PATH_MAX,
-                                "%s/.stale/%016"PRIx64".%"PRIu32,
-                                md_get_object_dir_nolock(oid), oid, epoch);
-               } else {
-                       snprintf(old, PATH_MAX,
-                                "%s/.stale/%016"PRIx64"_%d.%"PRIu32, path,
-                                oid, ec_index, epoch);
-                       snprintf(new, PATH_MAX,
-                                "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
-                                md_get_object_dir_nolock(oid),
-                                oid, ec_index ,epoch);
-               }
-       }
-
-       if (!md_access(old))
-               return -1;
-
-       return 0;
-}
-
-static int md_move_object(uint64_t oid, const char *old, const char *new)
-{
-       struct strbuf buf = STRBUF_INIT;
-       int fd, ret = -1;
-       size_t sz = get_store_objsize(oid);
-
-       fd = open(old, O_RDONLY);
-       if (fd < 0) {
-               sd_err("failed to open %s", old);
-               goto out;
-       }
-
-       ret = strbuf_read(&buf, fd, sz);
-       if (ret != sz) {
-               sd_err("failed to read %s, size %zu, %d, %m", old, sz, ret);
-               ret = -1;
-               goto out_close;
-       }
-
-       if (atomic_create_and_write(new, buf.buf, buf.len, false) < 0) {
-               if (errno != EEXIST) {
-                       sd_err("failed to create %s", new);
-                       ret = -1;
-                       goto out_close;
-               }
-       }
-       unlink(old);
-       ret = 0;
-out_close:
-       close(fd);
-out:
-       strbuf_release(&buf);
-       return ret;
-}
-
-static int md_check_and_move(uint64_t oid, uint32_t epoch, uint8_t ec_index,
-                            const char *path)
-{
-       char old[PATH_MAX], new[PATH_MAX];
-
-       if (get_old_new_path(oid, epoch, ec_index, path, old, new) < 0)
-               return SD_RES_EIO;
-       /*
-        * Recovery thread and main thread might try to recover the same object.
-        * Either one succeeds, the other will fail and proceed and end up
-        * trying to move the object to where it is already in place, in this
-        * case we simply return.
-        */
-       if (!strcmp(old, new))
-               return SD_RES_SUCCESS;
-
-       /* We can't use rename(2) across device */
-       if (md_move_object(oid, old, new) < 0) {
-               sd_err("move old %s to new %s failed", old, new);
-               return SD_RES_EIO;
-       }
-
-       sd_debug("from %s to %s", old, new);
-       return SD_RES_SUCCESS;
-}
-
-static int scan_wd(uint64_t oid, uint32_t epoch, uint8_t ec_index)
-{
-       int ret = SD_RES_EIO;
-       const struct disk *disk;
-
-       sd_read_lock(&md.lock);
-       rb_for_each_entry(disk, &md.root, rb) {
-               ret = md_check_and_move(oid, epoch, ec_index, disk->path);
-               if (ret == SD_RES_SUCCESS)
-                       break;
-       }
-       sd_rw_unlock(&md.lock);
-       return ret;
-}
-
-bool md_exist(uint64_t oid, uint8_t ec_index, char *path)
-{
-       if (md_access(path))
-               return true;
-       /*
-        * We have to iterate the WD because we don't have epoch-like history
-        * track to locate the objects for multiple disk failure. Simply do
-        * hard iteration simplify the code a lot.
-        */
-       if (scan_wd(oid, 0, ec_index) == SD_RES_SUCCESS)
-               return true;
-
-       return false;
-}
-
-int md_get_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
-                     char *path)
-{
-       if (unlikely(!epoch))
-               panic("invalid 0 epoch");
-
-       if (is_erasure_oid(oid)) {
-               if (unlikely(ec_index >= SD_MAX_COPIES))
-                       panic("invalid ec index %d", ec_index);
-
-               snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
-                        md_get_object_dir(oid), oid, ec_index, epoch);
-       } else
-               snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32,
-                        md_get_object_dir(oid), oid, epoch);
-
-       if (md_access(path))
-               return SD_RES_SUCCESS;
-
-       if (scan_wd(oid, epoch, ec_index) == SD_RES_SUCCESS)
-               return SD_RES_SUCCESS;
-
-       return SD_RES_NO_OBJ;
-}
-
-uint32_t md_get_info(struct sd_md_info *info)
-{
-       uint32_t ret = sizeof(*info);
-       const struct disk *disk;
-       int i = 0;
-
-       memset(info, 0, ret);
-       sd_read_lock(&md.lock);
-       rb_for_each_entry(disk, &md.root, rb) {
-               info->disk[i].idx = i;
-               pstrcpy(info->disk[i].path, PATH_MAX, disk->path);
-               /* FIXME: better handling failure case. */
-               info->disk[i].free = get_path_free_size(info->disk[i].path,
-                                                       &info->disk[i].used);
-               i++;
-       }
-       info->nr = md.nr_disks;
-       sd_rw_unlock(&md.lock);
-       return ret;
-}
-
-static inline void md_del_disk(const char *path)
-{
-       struct disk *disk = path_to_disk(path);
-
-       if (!disk) {
-               sd_err("invalid path %s", path);
-               return;
-       }
-       md_remove_disk(disk);
-}
-
-#ifdef HAVE_DISKVNODES
-void update_node_disks(void)
-{
-       const struct disk *disk;
-       int i = 0;
-       bool rb_empty = false;
-
-       if (!sys)
-               return;
-
-       memset(sys->this_node.disks, 0, sizeof(struct disk_info) * DISK_MAX);
-       sd_read_lock(&md.lock);
-       rb_for_each_entry(disk, &md.root, rb) {
-               sys->this_node.disks[i].disk_id =
-                       sd_hash(disk->path, strlen(disk->path));
-               sys->this_node.disks[i].disk_space = disk->space;
-               i++;
-       }
-       sd_rw_unlock(&md.lock);
-
-       if (RB_EMPTY_ROOT(&md.vroot))
-               rb_empty = true;
-       sd_write_lock(&md.lock);
-       rb_for_each_entry(disk, &md.root, rb) {
-               if (!rb_empty)
-                       remove_vdisks(disk);
-               create_vdisks(disk);
-       }
-       sd_rw_unlock(&md.lock);
-}
-#else
-void update_node_disks(void)
-{
-}
-#endif
-
-static int do_plug_unplug(char *disks, bool plug)
-{
-       const char *path;
-       int old_nr, ret = SD_RES_UNKNOWN;
-
-       sd_write_lock(&md.lock);
-       old_nr = md.nr_disks;
-       path = strtok(disks, ",");
-       do {
-               if (plug) {
-                       if (!md_add_disk(path, true))
-                               sd_err("failed to add %s", path);
-               } else {
-                       md_del_disk(path);
-               }
-       } while ((path = strtok(NULL, ",")));
-
-       /* If no disks change, bail out */
-       if (old_nr == md.nr_disks)
-               goto out;
-
-       ret = SD_RES_SUCCESS;
-out:
-       sd_rw_unlock(&md.lock);
-
-       if (ret == SD_RES_SUCCESS) {
-               update_node_disks();
-               kick_recover();
-       }
-
-       return ret;
-}
-
-int md_plug_disks(char *disks)
-{
-       return do_plug_unplug(disks, true);
-}
-
-int md_unplug_disks(char *disks)
-{
-       return do_plug_unplug(disks, false);
-}
-
-uint64_t md_get_size(uint64_t *used)
-{
-       uint64_t fsize = 0;
-       const struct disk *disk;
-
-       *used = 0;
-       sd_read_lock(&md.lock);
-       rb_for_each_entry(disk, &md.root, rb) {
-               fsize += get_path_free_size(disk->path, used);
-       }
-       sd_rw_unlock(&md.lock);
-
-       return fsize + *used;
-}
-
-uint32_t md_nr_disks(void)
-{
-       return nr_online_disks();
-}
diff --git a/sheep/plain_store.c b/sheep/plain_store.c
deleted file mode 100644
index efbf129..0000000
--- a/sheep/plain_store.c
+++ /dev/null
@@ -1,759 +0,0 @@
-/*
- * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License version
- * 2 as published by the Free Software Foundation.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include <libgen.h>
-#include <linux/falloc.h>
-
-#include "sheep_priv.h"
-
-#ifndef FALLOC_FL_PUNCH_HOLE
-#define FALLOC_FL_PUNCH_HOLE 0x02
-#endif
-
-#define sector_algined(x) ({ ((x) & (SECTOR_SIZE - 1)) == 0; })
-
-static inline bool iocb_is_aligned(const struct siocb *iocb)
-{
-       return  sector_algined(iocb->offset) && sector_algined(iocb->length);
-}
-
-static int prepare_iocb(uint64_t oid, const struct siocb *iocb, bool create)
-{
-       int syncflag = create ? O_SYNC : O_DSYNC;
-       int flags = syncflag | O_RDWR;
-
-       if (uatomic_is_true(&sys->use_journal) || sys->nosync == true)
-               flags &= ~syncflag;
-
-       if (sys->backend_dio && iocb_is_aligned(iocb)) {
-               if (!is_aligned_to_pagesize(iocb->buf))
-                       panic("Memory isn't aligned to pagesize %p", iocb->buf);
-               flags |= O_DIRECT;
-       }
-
-       if (create)
-               flags |= O_CREAT | O_EXCL;
-
-       return flags;
-}
-
-static int get_store_path(uint64_t oid, uint8_t ec_index, char *path)
-{
-       if (is_erasure_oid(oid)) {
-               if (unlikely(ec_index >= SD_MAX_COPIES))
-                       panic("invalid ec_index %d", ec_index);
-               return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d",
-                               md_get_object_dir(oid), oid, ec_index);
-       }
-
-       return snprintf(path, PATH_MAX, "%s/%016" PRIx64,
-                       md_get_object_dir(oid), oid);
-}
-
-static int get_store_tmp_path(uint64_t oid, uint8_t ec_index, char *path)
-{
-       if (is_erasure_oid(oid)) {
-               if (unlikely(ec_index >= SD_MAX_COPIES))
-                       panic("invalid ec_index %d", ec_index);
-               return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d.tmp",
-                               md_get_object_dir(oid), oid, ec_index);
-       }
-
-       return snprintf(path, PATH_MAX, "%s/%016" PRIx64".tmp",
-                       md_get_object_dir(oid), oid);
-}
-
-static int get_store_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
-                               char *path)
-{
-       return md_get_stale_path(oid, epoch, ec_index, path);
-}
-
-/*
- * Check if oid is in this nodes (if oid is in the wrong place, it will be moved
- * to the correct one after this call in a MD setup.
- */
-bool default_exist(uint64_t oid, uint8_t ec_index)
-{
-       char path[PATH_MAX];
-
-       get_store_path(oid, ec_index, path);
-
-       return md_exist(oid, ec_index, path);
-}
-
-static int err_to_sderr(const char *path, uint64_t oid, int err)
-{
-       struct stat s;
-       char p[PATH_MAX], *dir;
-
-       /* Use a temporary buffer since dirname() may modify its argument. */
-       pstrcpy(p, sizeof(p), path);
-       dir = dirname(p);
-
-       sd_debug("%s", path);
-       switch (err) {
-       case ENOENT:
-               if (stat(dir, &s) < 0) {
-                       sd_err("%s corrupted", dir);
-                       return md_handle_eio(dir);
-               }
-               sd_debug("object %016" PRIx64 " not found locally", oid);
-               return SD_RES_NO_OBJ;
-       case ENOSPC:
-               /* TODO: stop automatic recovery */
-               sd_err("diskfull, oid=%"PRIx64, oid);
-               return SD_RES_NO_SPACE;
-       case EMFILE:
-       case ENFILE:
-       case EINTR:
-       case EAGAIN:
-       case EEXIST:
-               sd_err("%m, oid=%"PRIx64, oid);
-               /* make gateway try again */
-               return SD_RES_NETWORK_ERROR;
-       default:
-               sd_err("oid=%"PRIx64", %m", oid);
-               return md_handle_eio(dir);
-       }
-}
-
-static int discard(int fd, uint64_t start, uint32_t end)
-{
-       int ret = xfallocate(fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
-                            start, end - start);
-       if (ret < 0) {
-               if (errno == ENOSYS || errno == EOPNOTSUPP)
-                       sd_info("FALLOC_FL_PUNCH_HOLE is not supported "
-                               "on this filesystem");
-               else
-                       sd_err("failed to discard object, %m");
-       }
-
-       return ret;
-}
-
-/* Trim zero blocks of the beginning and end of the object. */
-static int default_trim(int fd, uint64_t oid, const struct siocb *iocb,
-                       uint64_t *poffset, uint32_t *plen)
-{
-       trim_zero_blocks(iocb->buf, poffset, plen);
-
-       if (iocb->offset < *poffset) {
-               sd_debug("discard between %d, %ld, %" PRIx64, iocb->offset,
-                        *poffset, oid);
-
-               if (discard(fd, iocb->offset, *poffset) < 0)
-                       return -1;
-       }
-
-       if (*poffset + *plen < iocb->offset + iocb->length) {
-               uint64_t end = iocb->offset + iocb->length;
-               uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
-               if (end == get_objsize(oid, object_size))
-                       /* This is necessary to punch the last block */
-                       end = round_up(end, BLOCK_SIZE);
-               sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen,
-                        end, oid);
-
-               if (discard(fd, *poffset + *plen, end) < 0)
-                       return -1;
-       }
-
-       return 0;
-}
-
-int default_write(uint64_t oid, const struct siocb *iocb)
-{
-       int flags = prepare_iocb(oid, iocb, false), fd,
-           ret = SD_RES_SUCCESS;
-       char path[PATH_MAX];
-       ssize_t size;
-       uint32_t len = iocb->length;
-       uint64_t offset = iocb->offset;
-       static bool trim_is_supported = true;
-
-       if (iocb->epoch < sys_epoch()) {
-               sd_debug("%"PRIu32" sys %"PRIu32, iocb->epoch, sys_epoch());
-               return SD_RES_OLD_NODE_VER;
-       }
-
-       if (uatomic_is_true(&sys->use_journal) &&
-           unlikely(journal_write_store(oid, iocb->buf, iocb->length,
-                                        iocb->offset, false))
-           != SD_RES_SUCCESS) {
-               sd_err("turn off journaling");
-               uatomic_set_false(&sys->use_journal);
-               flags |= O_DSYNC;
-               sync();
-       }
-
-       get_store_path(oid, iocb->ec_index, path);
-
-       /*
-        * Make sure oid is in the right place because oid might be misplaced
-        * in a wrong place, due to 'shutdown/restart with less/more disks' or
-        * any bugs. We need call err_to_sderr() to return EIO if disk is broken
-        */
-       if (!default_exist(oid, iocb->ec_index))
-               return err_to_sderr(path, oid, ENOENT);
-
-       fd = open(path, flags, sd_def_fmode);
-       if (unlikely(fd < 0))
-               return err_to_sderr(path, oid, errno);
-
-       if (trim_is_supported && is_sparse_object(oid)) {
-               if (default_trim(fd, oid, iocb, &offset, &len) < 0) {
-                       trim_is_supported = false;
-                       offset = iocb->offset;
-                       len = iocb->length;
-               }
-       }
-
-       size = xpwrite(fd, iocb->buf, len, offset);
-       if (unlikely(size != len)) {
-               sd_err("failed to write object %"PRIx64", path=%s, offset=%"
-                      PRId32", size=%"PRId32", result=%zd, %m", oid, path,
-                      iocb->offset, iocb->length, size);
-               ret = err_to_sderr(path, oid, errno);
-               goto out;
-       }
-out:
-       close(fd);
-       return ret;
-}
-
-static int make_stale_dir(const char *path)
-{
-       char p[PATH_MAX];
-
-       snprintf(p, PATH_MAX, "%s/.stale", path);
-       if (xmkdir(p, sd_def_dmode) < 0) {
-               sd_err("%s failed, %m", p);
-               return SD_RES_EIO;
-       }
-       return SD_RES_SUCCESS;
-}
-
-static int purge_dir(const char *path)
-{
-       if (purge_directory(path) < 0)
-               return SD_RES_EIO;
-
-       return SD_RES_SUCCESS;
-}
-
-static int purge_stale_dir(const char *path)
-{
-       char p[PATH_MAX];
-
-       snprintf(p, PATH_MAX, "%s/.stale", path);
-
-       if (purge_directory_async(p) < 0)
-               return SD_RES_EIO;
-
-       return SD_RES_SUCCESS;
-}
-
-int default_cleanup(void)
-{
-       int ret;
-
-       ret = for_each_obj_path(purge_stale_dir);
-       if (ret != SD_RES_SUCCESS)
-               return ret;
-
-       return SD_RES_SUCCESS;
-}
-
-static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch)
-{
-       int ret;
-       struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE);
-       struct siocb iocb = {
-               .epoch = epoch,
-               .buf = inode,
-               .length = SD_INODE_HEADER_SIZE,
-       };
-
-       ret = default_read(oid, &iocb);
-       if (ret != SD_RES_SUCCESS) {
-               sd_err("failed to read inode header %" PRIx64 " %" PRId32
-                      "wat %s", oid, epoch, wd);
-               goto out;
-       }
-       add_vdi_state_unordered(oid_to_vid(oid), inode->nr_copies,
-                     vdi_is_snapshot(inode), inode->copy_policy,
-                     inode->block_size_shift, inode->parent_vdi_id);
-
-       if (inode->name[0] == '\0')
-               atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted);
-
-       atomic_set_bit(oid_to_vid(oid), sys->vdi_inuse);
-
-       ret = SD_RES_SUCCESS;
-out:
-       free(inode);
-       return ret;
-}
-
-static int init_objlist_and_vdi_bitmap(uint64_t oid, const char *wd,
-                                      uint32_t epoch, uint8_t ec_index,
-                                      struct vnode_info *vinfo,
-                                      void *arg)
-{
-       int ret;
-       objlist_cache_insert(oid);
-
-       if (is_vdi_obj(oid)) {
-               sd_debug("found the VDI object %" PRIx64" epoch %"PRIu32
-                        " at %s", oid, epoch, wd);
-               ret = init_vdi_state(oid, wd, epoch);
-               if (ret != SD_RES_SUCCESS)
-                       return ret;
-       }
-       return SD_RES_SUCCESS;
-}
-
-int default_init(void)
-{
-       int ret;
-
-       sd_debug("use plain store driver");
-       ret = for_each_obj_path(make_stale_dir);
-       if (ret != SD_RES_SUCCESS)
-               return ret;
-
-       for_each_object_in_stale(init_objlist_and_vdi_bitmap, NULL);
-
-       return for_each_object_in_wd(init_objlist_and_vdi_bitmap, true, NULL);
-}
-
-static int default_read_from_path(uint64_t oid, const char *path,
-                                 const struct siocb *iocb)
-{
-       int flags = prepare_iocb(oid, iocb, false), fd,
-           ret = SD_RES_SUCCESS;
-       ssize_t size;
-
-       /*
-        * Make sure oid is in the right place because oid might be misplaced
-        * in a wrong place, due to 'shutdown/restart with less disks' or any
-        * bugs. We need call err_to_sderr() to return EIO if disk is broken.
-        *
-        * For stale path, get_store_stale_path already does default_exist job.
-        */
-       if (!is_stale_path(path) && !default_exist(oid, iocb->ec_index))
-               return err_to_sderr(path, oid, ENOENT);
-
-       fd = open(path, flags);
-       if (fd < 0)
-               return err_to_sderr(path, oid, errno);
-
-       size = xpread(fd, iocb->buf, iocb->length, iocb->offset);
-       if (size < 0) {
-               sd_err("failed to read object %"PRIx64", path=%s, offset=%"
-                      PRId32", size=%"PRId32", result=%zd, %m", oid, path,
-                      iocb->offset, iocb->length, size);
-               ret = err_to_sderr(path, oid, errno);
-       }
-       close(fd);
-       return ret;
-}
-
-int default_read(uint64_t oid, const struct siocb *iocb)
-{
-       int ret;
-       char path[PATH_MAX];
-
-       get_store_path(oid, iocb->ec_index, path);
-       ret = default_read_from_path(oid, path, iocb);
-
-       /*
-        * If the request is against the older epoch, try to read from
-        * the stale directory
-        */
-       if (ret == SD_RES_NO_OBJ && iocb->epoch > 0 &&
-           iocb->epoch < sys_epoch()) {
-               get_store_stale_path(oid, iocb->epoch, iocb->ec_index, path);
-               ret = default_read_from_path(oid, path, iocb);
-       }
-
-       return ret;
-}
-
-int default_create_and_write(uint64_t oid, const struct siocb *iocb)
-{
-       char path[PATH_MAX], tmp_path[PATH_MAX], *dir;
-       int flags = prepare_iocb(oid, iocb, true);
-       int ret, fd;
-       uint32_t len = iocb->length;
-       uint32_t object_size = 0;
-       size_t obj_size;
-       uint64_t offset = iocb->offset;
-
-       sd_debug("%"PRIx64, oid);
-       get_store_path(oid, iocb->ec_index, path);
-       get_store_tmp_path(oid, iocb->ec_index, tmp_path);
-
-       if (uatomic_is_true(&sys->use_journal) &&
-           journal_write_store(oid, iocb->buf, iocb->length,
-                               iocb->offset, true)
-           != SD_RES_SUCCESS) {
-               sd_err("turn off journaling");
-               uatomic_set_false(&sys->use_journal);
-               flags |= O_SYNC;
-               sync();
-       }
-
-       fd = open(tmp_path, flags, sd_def_fmode);
-       if (fd < 0) {
-               if (errno == EEXIST) {
-                       /*
-                        * This happens if node membership changes during object
-                        * creation; while gateway retries a CREATE request,
-                        * recovery process could also recover the object at the
-                        * same time.  They should try to write the same date,
-                        * so it is okay to simply return success here.
-                        */
-                       sd_debug("%s exists", tmp_path);
-                       return SD_RES_SUCCESS;
-               }
-
-               sd_err("failed to open %s: %m", tmp_path);
-               return err_to_sderr(path, oid, errno);
-       }
-
-       obj_size = get_store_objsize(oid);
-
-       trim_zero_blocks(iocb->buf, &offset, &len);
-
-       object_size = get_vdi_object_size(oid_to_vid(oid));
-
-       if (offset != 0 || len != get_objsize(oid, object_size)) {
-               if (is_sparse_object(oid))
-                       ret = xftruncate(fd, obj_size);
-               else
-                       ret = prealloc(fd, obj_size);
-               if (ret < 0) {
-                       ret = err_to_sderr(path, oid, errno);
-                       goto out;
-               }
-       }
-
-       ret = xpwrite(fd, iocb->buf, len, offset);
-       if (ret != len) {
-               sd_err("failed to write object. %m");
-               ret = err_to_sderr(path, oid, errno);
-               goto out;
-       }
-
-       ret = rename(tmp_path, path);
-       if (ret < 0) {
-               sd_err("failed to rename %s to %s: %m", tmp_path, path);
-               ret = err_to_sderr(path, oid, errno);
-               goto out;
-       }
-
-       close(fd);
-
-       if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) {
-               objlist_cache_insert(oid);
-               return SD_RES_SUCCESS;
-       }
-
-       pstrcpy(tmp_path, sizeof(tmp_path), path);
-       dir = dirname(tmp_path);
-       fd = open(dir, O_DIRECTORY | O_RDONLY);
-       if (fd < 0) {
-               sd_err("failed to open directory %s: %m", dir);
-               return err_to_sderr(path, oid, errno);
-       }
-
-       if (fsync(fd) != 0) {
-               sd_err("failed to write directory %s: %m", dir);
-               ret = err_to_sderr(path, oid, errno);
-               close(fd);
-               if (unlink(path) != 0)
-                       sd_err("failed to unlink %s: %m", path);
-               return ret;
-       }
-       close(fd);
-       objlist_cache_insert(oid);
-       return SD_RES_SUCCESS;
-
-out:
-       if (unlink(tmp_path) != 0)
-               sd_err("failed to unlink %s: %m", tmp_path);
-       close(fd);
-       return ret;
-}
-
-int default_link(uint64_t oid, uint32_t tgt_epoch)
-{
-       char path[PATH_MAX], stale_path[PATH_MAX];
-
-       sd_debug("try link %"PRIx64" from snapshot with epoch %d", oid,
-                tgt_epoch);
-
-       snprintf(path, PATH_MAX, "%s/%016"PRIx64, md_get_object_dir(oid), oid);
-       get_store_stale_path(oid, tgt_epoch, 0, stale_path);
-
-       if (link(stale_path, path) < 0) {
-               /*
-                * Recovery thread and main thread might try to recover the
-                * same object and we might get EEXIST in such case.
-                */
-               if (errno == EEXIST)
-                       goto out;
-
-               sd_debug("failed to link from %s to %s, %m", stale_path, path);
-               return err_to_sderr(path, oid, errno);
-       }
-out:
-       return SD_RES_SUCCESS;
-}
-
-/*
- * For replicated object, if any of the replica belongs to this node, we
- * consider it not stale.
- *
- * For erasure coded object, since every copy is unique and if it migrates to
- * other node(index gets changed even it has some other copy belongs to it)
- * because of hash ring changes, we consider it stale.
- */
-static bool oid_stale(uint64_t oid, int ec_index, struct vnode_info *vinfo)
-{
-       uint32_t i, nr_copies;
-       const struct sd_vnode *v;
-       bool ret = true;
-       const struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
-
-       nr_copies = get_obj_copy_number(oid, vinfo->nr_zones);
-       oid_to_vnodes(oid, &vinfo->vroot, nr_copies, obj_vnodes);
-       for (i = 0; i < nr_copies; i++) {
-               v = obj_vnodes[i];
-               if (vnode_is_local(v)) {
-                       if (ec_index < SD_MAX_COPIES) {
-                               if (i == ec_index)
-                                       ret = false;
-                       } else {
-                               ret = false;
-                       }
-                       break;
-               }
-       }
-
-       return ret;
-}
-
-static int move_object_to_stale_dir(uint64_t oid, const char *wd,
-                                   uint32_t epoch, uint8_t ec_index,
-                                   struct vnode_info *vinfo, void *arg)
-{
-       char path[PATH_MAX], stale_path[PATH_MAX];
-       uint32_t tgt_epoch = *(uint32_t *)arg;
-
-       /* ec_index from md.c is reliable so we can directly use it */
-       if (ec_index < SD_MAX_COPIES) {
-               snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d",
-                        md_get_object_dir(oid), oid, ec_index);
-               snprintf(stale_path, PATH_MAX,
-                        "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
-                        md_get_object_dir(oid), oid, ec_index, tgt_epoch);
-       } else {
-               snprintf(path, PATH_MAX, "%s/%016" PRIx64,
-                        md_get_object_dir(oid), oid);
-               snprintf(stale_path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32,
-                        md_get_object_dir(oid), oid, tgt_epoch);
-       }
-
-       if (unlikely(rename(path, stale_path)) < 0) {
-               sd_err("failed to move stale object %" PRIX64 " to %s, %m", oid,
-                      path);
-               return SD_RES_EIO;
-       }
-
-       sd_debug("moved object %"PRIx64, oid);
-       return SD_RES_SUCCESS;
-}
-
-static int check_stale_objects(uint64_t oid, const char *wd, uint32_t epoch,
-                              uint8_t ec_index, struct vnode_info *vinfo,
-                              void *arg)
-{
-       if (oid_stale(oid, ec_index, vinfo))
-               return move_object_to_stale_dir(oid, wd, 0, ec_index,
-                                               NULL, arg);
-
-       return SD_RES_SUCCESS;
-}
-
-int default_update_epoch(uint32_t epoch)
-{
-       sd_assert(epoch);
-       return for_each_object_in_wd(check_stale_objects, false, &epoch);
-}
-
-int default_format(void)
-{
-       unsigned ret;
-
-       sd_debug("try get a clean store");
-       ret = for_each_obj_path(purge_dir);
-       if (ret != SD_RES_SUCCESS)
-               return ret;
-
-       if (sys->enable_object_cache)
-               object_cache_format();
-
-       return SD_RES_SUCCESS;
-}
-
-int default_remove_object(uint64_t oid, uint8_t ec_index)
-{
-       char path[PATH_MAX];
-
-       if (uatomic_is_true(&sys->use_journal))
-               journal_remove_object(oid);
-
-       get_store_path(oid, ec_index, path);
-
-       if (unlink(path) < 0) {
-               if (errno == ENOENT)
-                       return SD_RES_NO_OBJ;
-
-               sd_err("failed, %s, %m", path);
-               return SD_RES_EIO;
-       }
-
-       return SD_RES_SUCCESS;
-}
-
-#define SHA1NAME "user.obj.sha1"
-
-static int get_object_sha1(const char *path, uint8_t *sha1)
-{
-       if (getxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE)
-           != SHA1_DIGEST_SIZE) {
-               if (errno == ENODATA)
-                       sd_debug("sha1 is not cached yet, %s", path);
-               else
-                       sd_err("fail to get xattr, %s", path);
-               return -1;
-       }
-
-       return 0;
-}
-
-static int set_object_sha1(const char *path, const uint8_t *sha1)
-{
-       int ret;
-
-       ret = setxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE, 0);
-       if (ret < 0)
-               sd_err("fail to set sha1, %s", path);
-
-       return ret;
-}
-
-static int get_object_path(uint64_t oid, uint32_t epoch, char *path,
-                          size_t size)
-{
-       if (default_exist(oid, 0)) {
-               snprintf(path, PATH_MAX, "%s/%016"PRIx64,
-                        md_get_object_dir(oid), oid);
-       } else {
-               get_store_stale_path(oid, epoch, 0, path);
-               if (access(path, F_OK) < 0) {
-                       if (errno == ENOENT)
-                               return SD_RES_NO_OBJ;
-                       return SD_RES_EIO;
-               }
-
-       }
-
-       return SD_RES_SUCCESS;
-}
-
-int default_get_hash(uint64_t oid, uint32_t epoch, uint8_t *sha1)
-{
-       int ret;
-       void *buf;
-       struct siocb iocb = {};
-       uint32_t length;
-       bool is_readonly_obj = oid_is_readonly(oid);
-       char path[PATH_MAX];
-
-       ret = get_object_path(oid, epoch, path, sizeof(path));
-       if (ret != SD_RES_SUCCESS)
-               return ret;
-
-       if (is_readonly_obj) {
-               if (get_object_sha1(path, sha1) == 0) {
-                       sd_debug("use cached sha1 digest %s",
-                                sha1_to_hex(sha1));
-                       return SD_RES_SUCCESS;
-               }
-       }
-
-       length = get_store_objsize(oid);
-       buf = valloc(length);
-       if (buf == NULL)
-               return SD_RES_NO_MEM;
-
-       iocb.epoch = epoch;
-       iocb.buf = buf;
-       iocb.length = length;
-
-       ret = default_read_from_path(oid, path, &iocb);
-       if (ret != SD_RES_SUCCESS) {
-               free(buf);
-               return ret;
-       }
-
-       get_buffer_sha1(buf, length, sha1);
-       free(buf);
-
-       sd_debug("the message digest of %"PRIx64" at epoch %d is %s", oid,
-                epoch, sha1_to_hex(sha1));
-
-       if (is_readonly_obj)
-               set_object_sha1(path, sha1);
-
-       return ret;
-}
-
-int default_purge_obj(void)
-{
-       uint32_t tgt_epoch = get_latest_epoch();
-
-       return for_each_object_in_wd(move_object_to_stale_dir, true,
-                                    &tgt_epoch);
-}
-
-static struct store_driver plain_store = {
-       .name = "plain",
-       .init = default_init,
-       .exist = default_exist,
-       .create_and_write = default_create_and_write,
-       .write = default_write,
-       .read = default_read,
-       .link = default_link,
-       .update_epoch = default_update_epoch,
-       .cleanup = default_cleanup,
-       .format = default_format,
-       .remove_object = default_remove_object,
-       .get_hash = default_get_hash,
-       .purge_obj = default_purge_obj,
-};
-
-add_store_driver(plain_store);
diff --git a/sheep/store.c b/sheep/store.c
deleted file mode 100644
index 8843fb8..0000000
--- a/sheep/store.c
+++ /dev/null
@@ -1,511 +0,0 @@
-/*
- * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
- *
- * This program is free software; you can redistribute it and/or
- * modify it under the terms of the GNU General Public License version
- * 2 as published by the Free Software Foundation.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "sheep_priv.h"
-
-char *obj_path;
-char *epoch_path;
-
-struct store_driver *sd_store;
-LIST_HEAD(store_drivers);
-
-int update_epoch_log(uint32_t epoch, struct sd_node *nodes, size_t nr_nodes)
-{
-       int ret, len, nodes_len;
-       time_t t;
-       char path[PATH_MAX], *buf;
-
-       sd_debug("update epoch: %d, %zu", epoch, nr_nodes);
-
-       /* Piggyback the epoch creation time for 'dog cluster info' */
-       time(&t);
-       nodes_len = nr_nodes * sizeof(struct sd_node);
-       len = nodes_len + sizeof(time_t);
-       buf = xmalloc(len);
-       memcpy(buf, nodes, nodes_len);
-       memcpy(buf + nodes_len, &t, sizeof(time_t));
-
-       /*
-        * rb field is unused in epoch file, zero-filling it
-        * is good for epoch file recovery because it is unified
-        */
-       for (int i = 0; i < nr_nodes; i++)
-               memset(buf + i * sizeof(struct sd_node)
-                               + offsetof(struct sd_node, rb),
-                               0, sizeof(struct rb_node));
-
-       snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
-
-       ret = atomic_create_and_write(path, buf, len, true);
-
-       free(buf);
-       return ret;
-}
-
-static int do_epoch_log_read(uint32_t epoch, struct sd_node *nodes, int len,
-                            int *nr_nodes, time_t *timestamp)
-{
-       int fd, ret, buf_len;
-       char path[PATH_MAX];
-       struct stat epoch_stat;
-
-       snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
-       fd = open(path, O_RDONLY);
-       if (fd < 0) {
-               sd_debug("failed to open epoch %"PRIu32" log, %m", epoch);
-               goto err;
-       }
-
-       memset(&epoch_stat, 0, sizeof(epoch_stat));
-       ret = fstat(fd, &epoch_stat);
-       if (ret < 0) {
-               sd_err("failed to stat epoch %"PRIu32" log, %m", epoch);
-               goto err;
-       }
-
-       buf_len = epoch_stat.st_size - sizeof(*timestamp);
-       if (buf_len < 0) {
-               sd_err("invalid epoch %"PRIu32" log", epoch);
-               goto err;
-       }
-       if (len < buf_len) {
-               close(fd);
-               return SD_RES_BUFFER_SMALL;
-       }
-
-       ret = xread(fd, nodes, buf_len);
-       if (ret < 0) {
-               sd_err("failed to read epoch %"PRIu32" log, %m", epoch);
-               goto err;
-       }
-
-       /* Broken epoch, just ignore */
-       if (ret % sizeof(struct sd_node) != 0) {
-               sd_err("invalid epoch %"PRIu32" log", epoch);
-               goto err;
-       }
-
-       *nr_nodes = ret / sizeof(struct sd_node);
-
-       if (timestamp) {
-               ret = xread(fd, timestamp, sizeof(*timestamp));
-               if (ret != sizeof(*timestamp)) {
-                       sd_err("invalid epoch %"PRIu32" log", epoch);
-                       goto err;
-               }
-       }
-
-       close(fd);
-       return SD_RES_SUCCESS;
-err:
-       if (fd >= 0)
-               close(fd);
-       return SD_RES_NO_TAG;
-}
-
-int epoch_log_read(uint32_t epoch, struct sd_node *nodes,
-                               int len, int *nr_nodes)
-{
-       return do_epoch_log_read(epoch, nodes, len, nr_nodes, NULL);
-}
-
-int epoch_log_read_with_timestamp(uint32_t epoch, struct sd_node *nodes,
-                               int len, int *nr_nodes, time_t *timestamp)
-{
-       return do_epoch_log_read(epoch, nodes, len, nr_nodes, timestamp);
-}
-
-uint32_t get_latest_epoch(void)
-{
-       DIR *dir;
-       struct dirent *d;
-       uint32_t e, epoch = 0;
-       char *p;
-
-       dir = opendir(epoch_path);
-       if (!dir)
-               panic("failed to get the latest epoch: %m");
-
-       while ((d = readdir(dir))) {
-               e = strtol(d->d_name, &p, 10);
-               if (d->d_name == p)
-                       continue;
-
-               if (strlen(d->d_name) != 8)
-                       continue;
-
-               if (e > epoch)
-                       epoch = e;
-       }
-       closedir(dir);
-
-       return epoch;
-}
-
-int lock_base_dir(const char *d)
-{
-#define LOCK_PATH "/lock"
-       char *lock_path;
-       int ret = 0;
-       int fd, len = strlen(d) + strlen(LOCK_PATH) + 1;
-
-       lock_path = xzalloc(len);
-       snprintf(lock_path, len, "%s" LOCK_PATH, d);
-
-       fd = open(lock_path, O_WRONLY|O_CREAT, sd_def_fmode);
-       if (fd < 0) {
-               sd_err("failed to open lock file %s (%m)", lock_path);
-               ret = -1;
-               goto out;
-       }
-
-       if (lockf(fd, F_TLOCK, 1) < 0) {
-               if (errno == EACCES || errno == EAGAIN)
-                       sd_err("another sheep daemon is using %s", d);
-               else
-                       sd_err("unable to get base dir lock (%m)");
-               ret = -1;
-               goto out;
-       }
-
-out:
-       free(lock_path);
-       return ret;
-}
-
-int init_base_path(const char *d)
-{
-       if (xmkdir(d, sd_def_dmode) < 0) {
-               sd_err("cannot create the directory %s (%m)", d);
-               return -1;
-       }
-
-       return 0;
-}
-
-static inline int check_path_len(const char *path)
-{
-       int len = strlen(path);
-       if (len > PATH_MAX) {
-               sd_err("insanely long object directory %s", path);
-               return -1;
-       }
-
-       return 0;
-}
-
-static int is_meta_store(const char *path)
-{
-       char conf[PATH_MAX];
-       char epoch[PATH_MAX];
-
-       snprintf(conf, PATH_MAX, "%s/config", path);
-       snprintf(epoch, PATH_MAX, "%s/epoch", path);
-       if (!access(conf, R_OK) && !access(epoch, R_OK))
-               return true;
-
-       return false;
-}
-
-static int init_obj_path(const char *base_path, char *argp)
-{
-       char *p;
-       int len;
-
-       if (check_path_len(base_path) < 0)
-               return -1;
-
-#define OBJ_PATH "/obj"
-       len = strlen(base_path) + strlen(OBJ_PATH) + 1;
-       obj_path = xzalloc(len);
-       snprintf(obj_path, len, "%s" OBJ_PATH, base_path);
-
-       /* Eat up the first component */
-       strtok(argp, ",");
-       p = strtok(NULL, ",");
-       if (!p) {
-               /*
-                * If We have only one path, meta-store and object-store share
-                * it. This is helpful to upgrade old sheep cluster to
-                * the MD-enabled.
-                */
-               md_add_disk(obj_path, false);
-       } else {
-               do {
-                       if (is_meta_store(p)) {
-                               sd_err("%s is meta-store, abort", p);
-                               return -1;
-                       }
-                       md_add_disk(p, false);
-               } while ((p = strtok(NULL, ",")));
-       }
-
-       if (md_nr_disks() <= 0) {
-               sd_err("There isn't any available disk!");
-               return -1;
-       }
-
-       return xmkdir(obj_path, sd_def_dmode);
-}
-
-static int init_epoch_path(const char *base_path)
-{
-#define EPOCH_PATH "/epoch/"
-       int len = strlen(base_path) + strlen(EPOCH_PATH) + 1;
-       epoch_path = xzalloc(len);
-       snprintf(epoch_path, len, "%s" EPOCH_PATH, base_path);
-
-       return xmkdir(epoch_path, sd_def_dmode);
-}
-
-/*
- * If the node is gateway, this function only finds the store driver.
- * Otherwise, this function initializes the backend store
- */
-int init_store_driver(bool is_gateway)
-{
-       char driver_name[STORE_LEN], *p;
-
-       pstrcpy(driver_name, sizeof(driver_name), (char *)sys->cinfo.store);
-
-       p = memchr(driver_name, '\0', STORE_LEN);
-       if (!p) {
-               /*
-                * If the driver name is not NUL terminated we are in deep
-                * trouble, let's get out here.
-                */
-               sd_debug("store name not NUL terminated");
-               return SD_RES_NO_STORE;
-       }
-
-       /*
-        * The store file might not exist in case this is a new sheep that
-        * never joined a cluster before.
-        */
-       if (p == driver_name)
-               return 0;
-
-       sd_store = find_store_driver(driver_name);
-       if (!sd_store) {
-               sd_debug("store %s not found", driver_name);
-               return SD_RES_NO_STORE;
-       }
-
-       if (is_gateway)
-               return SD_RES_SUCCESS;
-
-       return sd_store->init();
-}
-
-int init_disk_space(const char *base_path)
-{
-       int ret = SD_RES_SUCCESS;
-       uint64_t space_size = 0, mds;
-       struct statvfs fs;
-
-       if (sys->gateway_only)
-               goto out;
-
-       /* We need to init md even we don't need to update sapce */
-       mds = md_init_space();
-
-       /* If it is restarted */
-       ret = get_node_space(&space_size);
-       if (space_size != 0) {
-               sys->disk_space = space_size;
-               goto out;
-       }
-
-       /* User has specified the space at startup */
-       if (sys->disk_space) {
-               ret = set_node_space(sys->disk_space);
-               goto out;
-       }
-
-       if (mds) {
-               sys->disk_space = mds;
-       } else {
-               ret = statvfs(base_path, &fs);
-               if (ret < 0) {
-                       sd_debug("get disk space failed %m");
-                       ret = SD_RES_EIO;
-                       goto out;
-               }
-               sys->disk_space = (uint64_t)fs.f_frsize * fs.f_bavail;
-       }
-
-       ret = set_node_space(sys->disk_space);
-out:
-       sd_debug("disk free space is %" PRIu64, sys->disk_space);
-       return ret;
-}
-
-/* Initialize all the global pathnames used internally */
-int init_global_pathnames(const char *d, char *argp)
-{
-       int ret;
-
-       ret = init_obj_path(d, argp);
-       if (ret)
-               return ret;
-
-       ret = init_epoch_path(d);
-       if (ret)
-               return ret;
-
-       init_config_path(d);
-
-       return 0;
-}
-
-/* Write data to both local object cache (if enabled) and backends */
-int sd_write_object(uint64_t oid, char *data, unsigned int datalen,
-                   uint64_t offset, bool create)
-{
-       struct sd_req hdr;
-       int ret;
-
-       if (sys->enable_object_cache && object_is_cached(oid)) {
-               ret = object_cache_write(oid, data, datalen, offset,
-                                        create);
-               if (ret == SD_RES_NO_CACHE)
-                       goto forward_write;
-
-               if (ret != 0) {
-                       sd_err("write cache failed %" PRIx64 " %" PRIx32, oid,
-                              ret);
-                       return ret;
-               }
-       }
-
-forward_write:
-       if (create)
-               sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
-       else
-               sd_init_req(&hdr, SD_OP_WRITE_OBJ);
-       hdr.flags = SD_FLAG_CMD_WRITE;
-       hdr.data_length = datalen;
-
-       hdr.obj.oid = oid;
-       hdr.obj.offset = offset;
-
-       ret = exec_local_req(&hdr, data);
-       if (ret != SD_RES_SUCCESS)
-               sd_err("failed to write object %" PRIx64 ", %s", oid,
-                          sd_strerror(ret));
-
-       return ret;
-}
-
-int read_backend_object(uint64_t oid, char *data, unsigned int datalen,
-                       uint64_t offset)
-{
-       struct sd_req hdr;
-       int ret;
-
-       sd_init_req(&hdr, SD_OP_READ_OBJ);
-       hdr.data_length = datalen;
-       hdr.obj.oid = oid;
-       hdr.obj.offset = offset;
-
-       ret = exec_local_req(&hdr, data);
-       if (ret != SD_RES_SUCCESS)
-               sd_err("failed to read object %" PRIx64 ", %s", oid,
-                      sd_strerror(ret));
-       return ret;
-}
-
-/*
- * Read data firstly from local object cache(if enabled), if fail,
- * try read backends
- */
-int sd_read_object(uint64_t oid, char *data, unsigned int datalen,
-                  uint64_t offset)
-{
-       int ret;
-
-       if (sys->enable_object_cache && object_is_cached(oid)) {
-               ret = object_cache_read(oid, data, datalen, offset);
-               if (ret != SD_RES_SUCCESS) {
-                       sd_err("try forward read %" PRIx64 " %s", oid,
-                              sd_strerror(ret));
-                       goto forward_read;
-               }
-               return ret;
-       }
-
-forward_read:
-       return read_backend_object(oid, data, datalen, offset);
-}
-
-int sd_remove_object(uint64_t oid)
-{
-       struct sd_req hdr;
-       int ret;
-
-       if (sys->enable_object_cache && object_is_cached(oid)) {
-               ret = object_cache_remove(oid);
-               if (ret != SD_RES_SUCCESS)
-                       return ret;
-       }
-
-       sd_init_req(&hdr, SD_OP_REMOVE_OBJ);
-       hdr.obj.oid = oid;
-
-       ret = exec_local_req(&hdr, NULL);
-       if (ret != SD_RES_SUCCESS)
-               sd_err("failed to remove object %" PRIx64 ", %s", oid,
-                      sd_strerror(ret));
-
-       return ret;
-}
-
-int sd_discard_object(uint64_t oid)
-{
-       int ret;
-       struct sd_req hdr;
-
-       sd_init_req(&hdr, SD_OP_DISCARD_OBJ);
-       hdr.obj.oid = oid;
-
-       ret = exec_local_req(&hdr, NULL);
-       if (ret != SD_RES_SUCCESS)
-               sd_err("Failed to discard data obj %"PRIu64" %s", oid,
-                      sd_strerror(ret));
-
-       return ret;
-}
-
-int sd_dec_object_refcnt(uint64_t data_oid, uint32_t generation,
-                        uint32_t refcnt)
-{
-       struct sd_req hdr;
-       int ret;
-       uint64_t ledger_oid = data_oid_to_ledger_oid(data_oid);
-
-       sd_debug("%"PRIx64", %" PRId32 ", %" PRId32,
-                data_oid, generation, refcnt);
-
-       if (generation == 0 && refcnt == 0)
-               return sd_remove_object(data_oid);
-
-       sd_init_req(&hdr, SD_OP_DECREF_OBJ);
-       hdr.ref.oid = ledger_oid;
-       hdr.ref.generation = generation;
-       hdr.ref.count = refcnt;
-
-       ret = exec_local_req(&hdr, NULL);
-       if (ret != SD_RES_SUCCESS)
-               sd_err("failed to decrement reference %" PRIx64 ", %s",
-                      ledger_oid, sd_strerror(ret));
-
-       return ret;
-}
diff --git a/sheep/store/common.c b/sheep/store/common.c
new file mode 100644
index 0000000..8843fb8
--- /dev/null
+++ b/sheep/store/common.c
@@ -0,0 +1,511 @@
+/*
+ * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "sheep_priv.h"
+
+char *obj_path;
+char *epoch_path;
+
+struct store_driver *sd_store;
+LIST_HEAD(store_drivers);
+
+int update_epoch_log(uint32_t epoch, struct sd_node *nodes, size_t nr_nodes)
+{
+       int ret, len, nodes_len;
+       time_t t;
+       char path[PATH_MAX], *buf;
+
+       sd_debug("update epoch: %d, %zu", epoch, nr_nodes);
+
+       /* Piggyback the epoch creation time for 'dog cluster info' */
+       time(&t);
+       nodes_len = nr_nodes * sizeof(struct sd_node);
+       len = nodes_len + sizeof(time_t);
+       buf = xmalloc(len);
+       memcpy(buf, nodes, nodes_len);
+       memcpy(buf + nodes_len, &t, sizeof(time_t));
+
+       /*
+        * rb field is unused in epoch file, zero-filling it
+        * is good for epoch file recovery because it is unified
+        */
+       for (int i = 0; i < nr_nodes; i++)
+               memset(buf + i * sizeof(struct sd_node)
+                               + offsetof(struct sd_node, rb),
+                               0, sizeof(struct rb_node));
+
+       snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
+
+       ret = atomic_create_and_write(path, buf, len, true);
+
+       free(buf);
+       return ret;
+}
+
+static int do_epoch_log_read(uint32_t epoch, struct sd_node *nodes, int len,
+                            int *nr_nodes, time_t *timestamp)
+{
+       int fd, ret, buf_len;
+       char path[PATH_MAX];
+       struct stat epoch_stat;
+
+       snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch);
+       fd = open(path, O_RDONLY);
+       if (fd < 0) {
+               sd_debug("failed to open epoch %"PRIu32" log, %m", epoch);
+               goto err;
+       }
+
+       memset(&epoch_stat, 0, sizeof(epoch_stat));
+       ret = fstat(fd, &epoch_stat);
+       if (ret < 0) {
+               sd_err("failed to stat epoch %"PRIu32" log, %m", epoch);
+               goto err;
+       }
+
+       buf_len = epoch_stat.st_size - sizeof(*timestamp);
+       if (buf_len < 0) {
+               sd_err("invalid epoch %"PRIu32" log", epoch);
+               goto err;
+       }
+       if (len < buf_len) {
+               close(fd);
+               return SD_RES_BUFFER_SMALL;
+       }
+
+       ret = xread(fd, nodes, buf_len);
+       if (ret < 0) {
+               sd_err("failed to read epoch %"PRIu32" log, %m", epoch);
+               goto err;
+       }
+
+       /* Broken epoch, just ignore */
+       if (ret % sizeof(struct sd_node) != 0) {
+               sd_err("invalid epoch %"PRIu32" log", epoch);
+               goto err;
+       }
+
+       *nr_nodes = ret / sizeof(struct sd_node);
+
+       if (timestamp) {
+               ret = xread(fd, timestamp, sizeof(*timestamp));
+               if (ret != sizeof(*timestamp)) {
+                       sd_err("invalid epoch %"PRIu32" log", epoch);
+                       goto err;
+               }
+       }
+
+       close(fd);
+       return SD_RES_SUCCESS;
+err:
+       if (fd >= 0)
+               close(fd);
+       return SD_RES_NO_TAG;
+}
+
+int epoch_log_read(uint32_t epoch, struct sd_node *nodes,
+                               int len, int *nr_nodes)
+{
+       return do_epoch_log_read(epoch, nodes, len, nr_nodes, NULL);
+}
+
+int epoch_log_read_with_timestamp(uint32_t epoch, struct sd_node *nodes,
+                               int len, int *nr_nodes, time_t *timestamp)
+{
+       return do_epoch_log_read(epoch, nodes, len, nr_nodes, timestamp);
+}
+
+uint32_t get_latest_epoch(void)
+{
+       DIR *dir;
+       struct dirent *d;
+       uint32_t e, epoch = 0;
+       char *p;
+
+       dir = opendir(epoch_path);
+       if (!dir)
+               panic("failed to get the latest epoch: %m");
+
+       while ((d = readdir(dir))) {
+               e = strtol(d->d_name, &p, 10);
+               if (d->d_name == p)
+                       continue;
+
+               if (strlen(d->d_name) != 8)
+                       continue;
+
+               if (e > epoch)
+                       epoch = e;
+       }
+       closedir(dir);
+
+       return epoch;
+}
+
+int lock_base_dir(const char *d)
+{
+#define LOCK_PATH "/lock"
+       char *lock_path;
+       int ret = 0;
+       int fd, len = strlen(d) + strlen(LOCK_PATH) + 1;
+
+       lock_path = xzalloc(len);
+       snprintf(lock_path, len, "%s" LOCK_PATH, d);
+
+       fd = open(lock_path, O_WRONLY|O_CREAT, sd_def_fmode);
+       if (fd < 0) {
+               sd_err("failed to open lock file %s (%m)", lock_path);
+               ret = -1;
+               goto out;
+       }
+
+       if (lockf(fd, F_TLOCK, 1) < 0) {
+               if (errno == EACCES || errno == EAGAIN)
+                       sd_err("another sheep daemon is using %s", d);
+               else
+                       sd_err("unable to get base dir lock (%m)");
+               ret = -1;
+               goto out;
+       }
+
+out:
+       free(lock_path);
+       return ret;
+}
+
+int init_base_path(const char *d)
+{
+       if (xmkdir(d, sd_def_dmode) < 0) {
+               sd_err("cannot create the directory %s (%m)", d);
+               return -1;
+       }
+
+       return 0;
+}
+
+static inline int check_path_len(const char *path)
+{
+       int len = strlen(path);
+       if (len > PATH_MAX) {
+               sd_err("insanely long object directory %s", path);
+               return -1;
+       }
+
+       return 0;
+}
+
+static int is_meta_store(const char *path)
+{
+       char conf[PATH_MAX];
+       char epoch[PATH_MAX];
+
+       snprintf(conf, PATH_MAX, "%s/config", path);
+       snprintf(epoch, PATH_MAX, "%s/epoch", path);
+       if (!access(conf, R_OK) && !access(epoch, R_OK))
+               return true;
+
+       return false;
+}
+
+static int init_obj_path(const char *base_path, char *argp)
+{
+       char *p;
+       int len;
+
+       if (check_path_len(base_path) < 0)
+               return -1;
+
+#define OBJ_PATH "/obj"
+       len = strlen(base_path) + strlen(OBJ_PATH) + 1;
+       obj_path = xzalloc(len);
+       snprintf(obj_path, len, "%s" OBJ_PATH, base_path);
+
+       /* Eat up the first component */
+       strtok(argp, ",");
+       p = strtok(NULL, ",");
+       if (!p) {
+               /*
+                * If We have only one path, meta-store and object-store share
+                * it. This is helpful to upgrade old sheep cluster to
+                * the MD-enabled.
+                */
+               md_add_disk(obj_path, false);
+       } else {
+               do {
+                       if (is_meta_store(p)) {
+                               sd_err("%s is meta-store, abort", p);
+                               return -1;
+                       }
+                       md_add_disk(p, false);
+               } while ((p = strtok(NULL, ",")));
+       }
+
+       if (md_nr_disks() <= 0) {
+               sd_err("There isn't any available disk!");
+               return -1;
+       }
+
+       return xmkdir(obj_path, sd_def_dmode);
+}
+
+static int init_epoch_path(const char *base_path)
+{
+#define EPOCH_PATH "/epoch/"
+       int len = strlen(base_path) + strlen(EPOCH_PATH) + 1;
+       epoch_path = xzalloc(len);
+       snprintf(epoch_path, len, "%s" EPOCH_PATH, base_path);
+
+       return xmkdir(epoch_path, sd_def_dmode);
+}
+
+/*
+ * If the node is gateway, this function only finds the store driver.
+ * Otherwise, this function initializes the backend store
+ */
+int init_store_driver(bool is_gateway)
+{
+       char driver_name[STORE_LEN], *p;
+
+       pstrcpy(driver_name, sizeof(driver_name), (char *)sys->cinfo.store);
+
+       p = memchr(driver_name, '\0', STORE_LEN);
+       if (!p) {
+               /*
+                * If the driver name is not NUL terminated we are in deep
+                * trouble, let's get out here.
+                */
+               sd_debug("store name not NUL terminated");
+               return SD_RES_NO_STORE;
+       }
+
+       /*
+        * The store file might not exist in case this is a new sheep that
+        * never joined a cluster before.
+        */
+       if (p == driver_name)
+               return 0;
+
+       sd_store = find_store_driver(driver_name);
+       if (!sd_store) {
+               sd_debug("store %s not found", driver_name);
+               return SD_RES_NO_STORE;
+       }
+
+       if (is_gateway)
+               return SD_RES_SUCCESS;
+
+       return sd_store->init();
+}
+
+int init_disk_space(const char *base_path)
+{
+       int ret = SD_RES_SUCCESS;
+       uint64_t space_size = 0, mds;
+       struct statvfs fs;
+
+       if (sys->gateway_only)
+               goto out;
+
+       /* We need to init md even we don't need to update sapce */
+       mds = md_init_space();
+
+       /* If it is restarted */
+       ret = get_node_space(&space_size);
+       if (space_size != 0) {
+               sys->disk_space = space_size;
+               goto out;
+       }
+
+       /* User has specified the space at startup */
+       if (sys->disk_space) {
+               ret = set_node_space(sys->disk_space);
+               goto out;
+       }
+
+       if (mds) {
+               sys->disk_space = mds;
+       } else {
+               ret = statvfs(base_path, &fs);
+               if (ret < 0) {
+                       sd_debug("get disk space failed %m");
+                       ret = SD_RES_EIO;
+                       goto out;
+               }
+               sys->disk_space = (uint64_t)fs.f_frsize * fs.f_bavail;
+       }
+
+       ret = set_node_space(sys->disk_space);
+out:
+       sd_debug("disk free space is %" PRIu64, sys->disk_space);
+       return ret;
+}
+
+/* Initialize all the global pathnames used internally */
+int init_global_pathnames(const char *d, char *argp)
+{
+       int ret;
+
+       ret = init_obj_path(d, argp);
+       if (ret)
+               return ret;
+
+       ret = init_epoch_path(d);
+       if (ret)
+               return ret;
+
+       init_config_path(d);
+
+       return 0;
+}
+
+/* Write data to both local object cache (if enabled) and backends */
+int sd_write_object(uint64_t oid, char *data, unsigned int datalen,
+                   uint64_t offset, bool create)
+{
+       struct sd_req hdr;
+       int ret;
+
+       if (sys->enable_object_cache && object_is_cached(oid)) {
+               ret = object_cache_write(oid, data, datalen, offset,
+                                        create);
+               if (ret == SD_RES_NO_CACHE)
+                       goto forward_write;
+
+               if (ret != 0) {
+                       sd_err("write cache failed %" PRIx64 " %" PRIx32, oid,
+                              ret);
+                       return ret;
+               }
+       }
+
+forward_write:
+       if (create)
+               sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
+       else
+               sd_init_req(&hdr, SD_OP_WRITE_OBJ);
+       hdr.flags = SD_FLAG_CMD_WRITE;
+       hdr.data_length = datalen;
+
+       hdr.obj.oid = oid;
+       hdr.obj.offset = offset;
+
+       ret = exec_local_req(&hdr, data);
+       if (ret != SD_RES_SUCCESS)
+               sd_err("failed to write object %" PRIx64 ", %s", oid,
+                          sd_strerror(ret));
+
+       return ret;
+}
+
+int read_backend_object(uint64_t oid, char *data, unsigned int datalen,
+                       uint64_t offset)
+{
+       struct sd_req hdr;
+       int ret;
+
+       sd_init_req(&hdr, SD_OP_READ_OBJ);
+       hdr.data_length = datalen;
+       hdr.obj.oid = oid;
+       hdr.obj.offset = offset;
+
+       ret = exec_local_req(&hdr, data);
+       if (ret != SD_RES_SUCCESS)
+               sd_err("failed to read object %" PRIx64 ", %s", oid,
+                      sd_strerror(ret));
+       return ret;
+}
+
+/*
+ * Read data firstly from local object cache(if enabled), if fail,
+ * try read backends
+ */
+int sd_read_object(uint64_t oid, char *data, unsigned int datalen,
+                  uint64_t offset)
+{
+       int ret;
+
+       if (sys->enable_object_cache && object_is_cached(oid)) {
+               ret = object_cache_read(oid, data, datalen, offset);
+               if (ret != SD_RES_SUCCESS) {
+                       sd_err("try forward read %" PRIx64 " %s", oid,
+                              sd_strerror(ret));
+                       goto forward_read;
+               }
+               return ret;
+       }
+
+forward_read:
+       return read_backend_object(oid, data, datalen, offset);
+}
+
+int sd_remove_object(uint64_t oid)
+{
+       struct sd_req hdr;
+       int ret;
+
+       if (sys->enable_object_cache && object_is_cached(oid)) {
+               ret = object_cache_remove(oid);
+               if (ret != SD_RES_SUCCESS)
+                       return ret;
+       }
+
+       sd_init_req(&hdr, SD_OP_REMOVE_OBJ);
+       hdr.obj.oid = oid;
+
+       ret = exec_local_req(&hdr, NULL);
+       if (ret != SD_RES_SUCCESS)
+               sd_err("failed to remove object %" PRIx64 ", %s", oid,
+                      sd_strerror(ret));
+
+       return ret;
+}
+
+int sd_discard_object(uint64_t oid)
+{
+       int ret;
+       struct sd_req hdr;
+
+       sd_init_req(&hdr, SD_OP_DISCARD_OBJ);
+       hdr.obj.oid = oid;
+
+       ret = exec_local_req(&hdr, NULL);
+       if (ret != SD_RES_SUCCESS)
+               sd_err("Failed to discard data obj %"PRIu64" %s", oid,
+                      sd_strerror(ret));
+
+       return ret;
+}
+
+int sd_dec_object_refcnt(uint64_t data_oid, uint32_t generation,
+                        uint32_t refcnt)
+{
+       struct sd_req hdr;
+       int ret;
+       uint64_t ledger_oid = data_oid_to_ledger_oid(data_oid);
+
+       sd_debug("%"PRIx64", %" PRId32 ", %" PRId32,
+                data_oid, generation, refcnt);
+
+       if (generation == 0 && refcnt == 0)
+               return sd_remove_object(data_oid);
+
+       sd_init_req(&hdr, SD_OP_DECREF_OBJ);
+       hdr.ref.oid = ledger_oid;
+       hdr.ref.generation = generation;
+       hdr.ref.count = refcnt;
+
+       ret = exec_local_req(&hdr, NULL);
+       if (ret != SD_RES_SUCCESS)
+               sd_err("failed to decrement reference %" PRIx64 ", %s",
+                      ledger_oid, sd_strerror(ret));
+
+       return ret;
+}
diff --git a/sheep/store/md.c b/sheep/store/md.c
new file mode 100644
index 0000000..c07489e
--- /dev/null
+++ b/sheep/store/md.c
@@ -0,0 +1,878 @@
+/*
+ * Copyright (C) 2013 Taobao Inc.
+ *
+ * Liu Yuan <namei.u...@gmail.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "sheep_priv.h"
+
+#define MD_VDISK_SIZE ((uint64_t)1*1024*1024*1024) /* 1G */
+
+#define NONE_EXIST_PATH "/all/disks/are/broken/,ps/əʌo7/!"
+
+struct md md = {
+       .vroot = RB_ROOT,
+       .root = RB_ROOT,
+       .lock = SD_RW_LOCK_INITIALIZER,
+};
+
+static inline uint32_t nr_online_disks(void)
+{
+       uint32_t nr;
+
+       sd_read_lock(&md.lock);
+       nr = md.nr_disks;
+       sd_rw_unlock(&md.lock);
+
+       return nr;
+}
+
+static inline int vdisk_number(const struct disk *disk)
+{
+       return DIV_ROUND_UP(disk->space, MD_VDISK_SIZE);
+}
+
+static int disk_cmp(const struct disk *d1, const struct disk *d2)
+{
+       return strcmp(d1->path, d2->path);
+}
+
+static int vdisk_cmp(const struct vdisk *d1, const struct vdisk *d2)
+{
+       return intcmp(d1->hash, d2->hash);
+}
+
+static struct vdisk *vdisk_insert(struct vdisk *new)
+{
+       return rb_insert(&md.vroot, new, rb, vdisk_cmp);
+}
+
+/* If v1_hash < hval <= v2_hash, then oid is resident in v2 */
+static struct vdisk *hval_to_vdisk(uint64_t hval)
+{
+       struct vdisk dummy = { .hash = hval };
+
+       return rb_nsearch(&md.vroot, &dummy, rb, vdisk_cmp);
+}
+
+static struct vdisk *oid_to_vdisk(uint64_t oid)
+{
+       return hval_to_vdisk(sd_hash_oid(oid));
+}
+
+static void create_vdisks(const struct disk *disk)
+{
+       uint64_t hval = sd_hash(disk->path, strlen(disk->path));
+       const struct sd_node *n = &sys->this_node;
+       uint64_t node_hval;
+       int nr;
+
+       if (is_cluster_diskmode(&sys->cinfo)) {
+               node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr));
+               hval = fnv_64a_64(node_hval, hval);
+               nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN);
+               if (0 == n->nid.port)
+                       return;
+       } else
+               nr = vdisk_number(disk);
+
+       for (int i = 0; i < nr; i++) {
+               struct vdisk *v = xmalloc(sizeof(*v));
+
+               hval = sd_hash_next(hval);
+               v->hash = hval;
+               v->disk = disk;
+               if (unlikely(vdisk_insert(v)))
+                       panic("vdisk hash collison");
+       }
+}
+
+static inline void vdisk_free(struct vdisk *v)
+{
+       rb_erase(&v->rb, &md.vroot);
+       free(v);
+}
+
+static void remove_vdisks(const struct disk *disk)
+{
+       uint64_t hval = sd_hash(disk->path, strlen(disk->path));
+       const struct sd_node *n = &sys->this_node;
+       uint64_t node_hval;
+       int nr;
+
+       if (is_cluster_diskmode(&sys->cinfo)) {
+               node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr));
+               hval = fnv_64a_64(node_hval, hval);
+               nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN);
+       } else
+               nr = vdisk_number(disk);
+
+       for (int i = 0; i < nr; i++) {
+               struct vdisk *v;
+
+               hval = sd_hash_next(hval);
+               v = hval_to_vdisk(hval);
+               sd_assert(v->hash == hval);
+
+               vdisk_free(v);
+       }
+}
+
+static inline void trim_last_slash(char *path)
+{
+       sd_assert(path[0]);
+       while (path[strlen(path) - 1] == '/')
+               path[strlen(path) - 1] = '\0';
+}
+
+static struct disk *path_to_disk(const char *path)
+{
+       struct disk key = {};
+
+       pstrcpy(key.path, sizeof(key.path), path);
+       trim_last_slash(key.path);
+
+       return rb_search(&md.root, &key, rb, disk_cmp);
+}
+
+size_t get_store_objsize(uint64_t oid)
+{
+       if (is_erasure_oid(oid)) {
+               uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid));
+               int d;
+               ec_policy_to_dp(policy, &d, NULL);
+               return get_vdi_object_size(oid_to_vid(oid)) / d;
+       }
+       return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid)));
+}
+
+static int get_total_object_size(uint64_t oid, const char *wd, uint32_t epoch,
+                                uint8_t ec_index, struct vnode_info *vinfo,
+                                void *total)
+{
+       uint64_t *t = total;
+       struct stat s;
+       char path[PATH_MAX];
+
+       snprintf(path, PATH_MAX, "%s/%016" PRIx64, wd, oid);
+       if (stat(path, &s) == 0)
+               *t += s.st_blocks * SECTOR_SIZE;
+       else
+               *t += get_store_objsize(oid);
+
+       return SD_RES_SUCCESS;
+}
+
+static int64_t find_string_integer(const char *str, const char *delimiter)
+{
+       char *pos = strstr(str, delimiter), *p;
+       int64_t ret;
+
+       ret = strtoll(pos + 1, &p, 10);
+       if (ret == LLONG_MAX || p == pos + 1) {
+               sd_err("%s strtoul failed, delimiter %s, %m", str, delimiter);
+               return -1;
+       }
+
+       return ret;
+}
+
+/* If cleanup is true, temporary objects will be removed */
+static int for_each_object_in_path(const char *path,
+                                  int (*func)(uint64_t, const char *, uint32_t,
+                                              uint8_t, struct vnode_info *,
+                                              void *),
+                                  bool cleanup, struct vnode_info *vinfo,
+                                  void *arg)
+{
+       DIR *dir;
+       struct dirent *d;
+       uint64_t oid;
+       int ret = SD_RES_SUCCESS;
+       char file_name[PATH_MAX];
+
+       dir = opendir(path);
+       if (unlikely(!dir)) {
+               sd_err("failed to open %s, %m", path);
+               return SD_RES_EIO;
+       }
+
+       while ((d = readdir(dir))) {
+               uint32_t epoch = 0;
+               uint8_t ec_index = SD_MAX_COPIES;
+
+               /* skip ".", ".." and ".stale" */
+               if (unlikely(!strncmp(d->d_name, ".", 1)))
+                       continue;
+
+               sd_debug("%s, %s", path, d->d_name);
+               oid = strtoull(d->d_name, NULL, 16);
+               if (oid == 0 || oid == ULLONG_MAX)
+                       continue;
+
+               /* don't call callback against temporary objects */
+               if (is_tmp_dentry(d->d_name)) {
+                       if (cleanup) {
+                               snprintf(file_name, sizeof(file_name),
+                                               "%s/%s", path, d->d_name);
+                               sd_debug("remove tmp object %s", file_name);
+                               if (unlink(file_name) < 0)
+                                       sd_err("failed to unlink %s: %m",
+                                                       file_name);
+                       }
+                       continue;
+               }
+
+               if (is_stale_dentry(d->d_name)) {
+                       epoch = find_string_integer(d->d_name, ".");
+                       if (epoch < 0)
+                               continue;
+               }
+
+               if (is_ec_dentry(d->d_name)) {
+                       ec_index = find_string_integer(d->d_name, "_");
+                       if (ec_index < 0)
+                               continue;
+               }
+
+               ret = func(oid, path, epoch, ec_index, vinfo, arg);
+               if (ret != SD_RES_SUCCESS)
+                       break;
+       }
+       closedir(dir);
+       return ret;
+}
+
+static uint64_t get_path_free_size(const char *path, uint64_t *used)
+{
+       struct statvfs fs;
+       uint64_t size;
+
+       if (statvfs(path, &fs) < 0) {
+               sd_err("get disk %s space failed %m", path);
+               return 0;
+       }
+       size = (int64_t)fs.f_frsize * fs.f_bavail;
+
+       if (!used)
+               goto out;
+       if (for_each_object_in_path(path, get_total_object_size, false,
+                                   NULL, used)
+           != SD_RES_SUCCESS)
+               return 0;
+out:
+       return size;
+}
+
+/*
+ * If path is broken during initialization or not support xattr return 0. We can
+ * safely use 0 to represent failure case  because 0 space path can be
+ * considered as broken path.
+ */
+static uint64_t init_path_space(const char *path, bool purge)
+{
+       uint64_t size;
+       char stale[PATH_MAX];
+
+       if (!is_xattr_enabled(path)) {
+               sd_warn("multi-disk support need xattr feature for path: %s",
+                       path);
+               goto broken_path;
+       }
+
+       if (purge && purge_directory(path) < 0)
+               sd_err("failed to purge %s", path);
+
+       snprintf(stale, PATH_MAX, "%s/.stale", path);
+       if (xmkdir(stale, sd_def_dmode) < 0) {
+               sd_err("can't mkdir for %s, %m", stale);
+               goto broken_path;
+       }
+
+#define MDNAME "user.md.size"
+#define MDSIZE sizeof(uint64_t)
+       if (getxattr(path, MDNAME, &size, MDSIZE) < 0) {
+               if (errno == ENODATA) {
+                       goto create;
+               } else {
+                       sd_err("%s, %m", path);
+                       goto broken_path;
+               }
+       }
+
+       return size;
+create:
+       size = get_path_free_size(path, NULL);
+       if (!size)
+               goto broken_path;
+       if (setxattr(path, MDNAME, &size, MDSIZE, 0) < 0) {
+               sd_err("%s, %m", path);
+               goto broken_path;
+       }
+       return size;
+broken_path:
+       return 0;
+}
+
+/*
+ * Register path as a new disk of the multi-disk array. Returns true on
+ * success, and false if the path is already registered, its directory cannot
+ * be created, or init_path_space() reports no usable space.
+ *
+ * We don't need lock at init stage
+ */
+bool md_add_disk(const char *path, bool purge)
+{
+       struct disk *disk;
+
+       if (path_to_disk(path)) {
+               sd_err("duplicate path %s", path);
+               return false;
+       }
+
+       if (xmkdir(path, sd_def_dmode) < 0) {
+               sd_err("can't mkdir for %s, %m", path);
+               return false;
+       }
+
+       disk = xmalloc(sizeof(*disk));
+       pstrcpy(disk->path, PATH_MAX, path);
+       trim_last_slash(disk->path);
+       disk->space = init_path_space(disk->path, purge);
+       if (disk->space == 0) {
+               /* 0 means the path is broken, see init_path_space() */
+               free(disk);
+               return false;
+       }
+
+       create_vdisks(disk);
+       rb_insert(&md.root, disk, rb, disk_cmp);
+       md.nr_disks++;
+       md.space += disk->space;
+
+       sd_info("%s, vdisk nr %d, total disk %d", disk->path,
+               vdisk_number(disk), md.nr_disks);
+       return true;
+}
+
+/*
+ * Drop disk from the multi-disk array: unlink it from md.root, decrement
+ * md.nr_disks, remove its vdisks and free it. md.space is not adjusted here.
+ * The pointer must not be dereferenced after this returns.
+ */
+static inline void md_remove_disk(struct disk *disk)
+{
+       sd_info("%s from multi-disk array", disk->path);
+       rb_erase(&disk->rb, &md.root);
+       md.nr_disks--;
+       remove_vdisks(disk);
+       free(disk);
+}
+
+/*
+ * Return md.space, the running total to which md_add_disk() adds the space
+ * of every disk it registers. The value is read without taking md.lock.
+ */
+uint64_t md_init_space(void)
+{
+       return md.space;
+}
+
+/*
+ * Return the directory of the disk that oid maps to, or NONE_EXIST_PATH when
+ * no disk is registered. This function does not take md.lock itself.
+ */
+static const char *md_get_object_dir_nolock(uint64_t oid)
+{
+       /* To generate EIO */
+       if (unlikely(md.nr_disks == 0))
+               return NONE_EXIST_PATH;
+
+       return oid_to_vdisk(oid)->disk->path;
+}
+
+/*
+ * Locked wrapper around md_get_object_dir_nolock(): the lookup runs under the
+ * md read lock, which is released before the path pointer is returned.
+ */
+const char *md_get_object_dir(uint64_t oid)
+{
+       const char *dir;
+
+       sd_read_lock(&md.lock);
+       dir = md_get_object_dir_nolock(oid);
+       sd_rw_unlock(&md.lock);
+
+       return dir;
+}
+
+/*
+ * Argument block handed to thread_process_path(); for_each_object_in_wd()
+ * fills one per disk.
+ */
+struct process_path_arg {
+       /* directory passed to for_each_object_in_path() */
+       const char *path;
+       /* vnode info forwarded to for_each_object_in_path() */
+       struct vnode_info *vinfo;
+       /* callback forwarded to for_each_object_in_path() */
+       int (*func)(uint64_t oid, const char *, uint32_t, uint8_t,
+                   struct vnode_info *, void *arg);
+       /* cleanup flag forwarded to for_each_object_in_path() */
+       bool cleanup;
+       /* caller's opaque pointer, forwarded to for_each_object_in_path() */
+       void *opaque;
+       /* SD_RES_* outcome; only overwritten by the thread on failure */
+       int result;
+};
+
+/*
+ * Thread entry point: scan one path as described by the process_path_arg in
+ * arg, and hand the same pointer back as the thread's return value.
+ */
+static void *thread_process_path(void *arg)
+{
+       struct process_path_arg *parg = arg;
+       int res = for_each_object_in_path(parg->path, parg->func,
+                                         parg->cleanup, parg->vinfo,
+                                         parg->opaque);
+
+       /* result was pre-set by the creator; only record a failure */
+       if (res != SD_RES_SUCCESS)
+               parg->result = res;
+
+       return arg;
+}
+
+/*
+ * Run func over the objects of every registered disk, using one thread per
+ * disk, and wait for all of them. The md read lock is held for the whole
+ * operation.
+ *
+ * Returns SD_RES_SUCCESS when every per-disk scan succeeded. Otherwise it
+ * returns the first failure seen while joining the threads: the result
+ * reported by that disk's scan, or SD_RES_UNKNOWN if the thread could not be
+ * joined. All failures are logged.
+ */
+main_fn int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path,
+                                     uint32_t epoch, uint8_t ec_index,
+                                     struct vnode_info *vinfo, void *arg),
+                                 bool cleanup, void *arg)
+{
+       int ret = SD_RES_SUCCESS, err;
+       const struct disk *disk;
+       struct process_path_arg *thread_args, *path_arg;
+       struct vnode_info *vinfo;
+       void *ret_arg;
+       sd_thread_t *thread_array;
+       int nr_thread = 0, idx = 0;
+
+       sd_read_lock(&md.lock);
+
+       rb_for_each_entry(disk, &md.root, rb) {
+               nr_thread++;
+       }
+
+       thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg));
+       thread_array = xmalloc(nr_thread * sizeof(sd_thread_t));
+
+       vinfo = get_vnode_info();
+
+       rb_for_each_entry(disk, &md.root, rb) {
+               thread_args[idx].path = disk->path;
+               thread_args[idx].vinfo = vinfo;
+               thread_args[idx].func = func;
+               thread_args[idx].cleanup = cleanup;
+               thread_args[idx].opaque = arg;
+               thread_args[idx].result = SD_RES_SUCCESS;
+               err = sd_thread_create_with_idx("foreach wd",
+                                               thread_array + idx,
+                                               thread_process_path,
+                                               (void *)(thread_args + idx));
+               if (err) {
+                       /*
+                        * If we can't create enough threads to process
+                        * files, the data-consistent will be broken if
+                        * we continued.
+                        */
+                       panic("Failed to create thread for path %s",
+                             disk->path);
+               }
+               idx++;
+       }
+
+       sd_debug("Create %d threads for all path", nr_thread);
+       /* wait for all threads to exit */
+       for (idx = 0; idx < nr_thread; idx++) {
+               /* a failed join leaves ret_arg unset, so reset it each time */
+               ret_arg = NULL;
+               err = sd_thread_join(thread_array[idx], &ret_arg);
+               if (err) {
+                       sd_err("Failed to join thread");
+                       if (ret == SD_RES_SUCCESS)
+                               ret = SD_RES_UNKNOWN;
+               }
+               if (ret_arg) {
+                       path_arg = (struct process_path_arg *)ret_arg;
+                       if (path_arg->result != SD_RES_SUCCESS) {
+                               sd_err("%s, %s", path_arg->path,
+                                      sd_strerror(path_arg->result));
+                               /* keep the first failure for the caller */
+                               if (ret == SD_RES_SUCCESS)
+                                       ret = path_arg->result;
+                       }
+               }
+       }
+
+       put_vnode_info(vinfo);
+       sd_rw_unlock(&md.lock);
+
+       free(thread_args);
+       free(thread_array);
+       return ret;
+}
+
+/*
+ * Run func over the objects in the ".stale" directory of every registered
+ * disk, one disk after another, under the md read lock. Stops at the first
+ * disk whose scan does not return SD_RES_SUCCESS and returns that result.
+ */
+int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path,
+                                        uint32_t epoch, uint8_t,
+                                        struct vnode_info *, void *arg),
+                            void *arg)
+{
+       const struct disk *disk;
+       char stale_dir[PATH_MAX];
+       int ret = SD_RES_SUCCESS;
+
+       sd_read_lock(&md.lock);
+       rb_for_each_entry(disk, &md.root, rb) {
+               snprintf(stale_dir, sizeof(stale_dir), "%s/.stale",
+                        disk->path);
+               ret = for_each_object_in_path(stale_dir, func, false, NULL,
+                                             arg);
+               if (ret != SD_RES_SUCCESS)
+                       break;
+       }
+       sd_rw_unlock(&md.lock);
+       return ret;
+}
+
+
+/*
+ * Call func with the path of every registered disk while holding the md read
+ * lock. Iteration stops at the first call that does not return
+ * SD_RES_SUCCESS, and that value is returned; with no disks the result is
+ * SD_RES_SUCCESS.
+ */
+int for_each_obj_path(int (*func)(const char *path))
+{
+       int ret = SD_RES_SUCCESS;
+       const struct disk *disk;
+
+       sd_read_lock(&md.lock);
+       rb_for_each_entry(disk, &md.root, rb) {
+               ret = func(disk->path);
+               if (ret != SD_RES_SUCCESS)
+                       break;
+       }
+       sd_rw_unlock(&md.lock);
+       return ret;
+}
+
+/*
+ * Work item queued by md_handle_eio() and consumed (and freed) by
+ * md_do_recover().
+ */
+struct md_work {
+       /* embedded work descriptor; md_do_recover() recovers mw from it */
+       struct work work;
+       /* copy of the faulty path that should be removed from the array */
+       char path[PATH_MAX];
+};
+
+/*
+ * Trigger recovery after the set of disks changed. In cluster disk mode the
+ * cluster driver is asked to update this node; otherwise a local recovery is
+ * started with the current vnode info as both arguments.
+ */
+static inline void kick_recover(void)
+{
+       struct vnode_info *vinfo;
+
+       if (is_cluster_diskmode(&sys->cinfo)) {
+               sys->cdrv->update_node(&sys->this_node);
+               return;
+       }
+
+       /*
+        * Take the vnode info reference only on the path that also drops it,
+        * so that the disk-mode branch above does not leak a reference.
+        */
+       vinfo = get_vnode_info();
+       start_recovery(vinfo, vinfo, false);
+       put_vnode_info(vinfo);
+}
+
+/*
+ * Work handler for md_handle_eio(): remove the faulty disk named in the work
+ * item from the array. If other disks remain, refresh the node's disk info
+ * and kick recovery; if it was the last one, leave the cluster. The work item
+ * is freed in every case.
+ */
+static void md_do_recover(struct work *work)
+{
+       struct md_work *mw = container_of(work, struct md_work, work);
+       struct disk *disk;
+       bool removed = false;
+       int nr = 0;
+
+       sd_write_lock(&md.lock);
+       disk = path_to_disk(mw->path);
+       /* Just ignore the duplicate EIO of the same path */
+       if (disk) {
+               md_remove_disk(disk);
+               removed = true;
+               nr = md.nr_disks;
+       }
+       sd_rw_unlock(&md.lock);
+
+       if (removed && nr > 0) {
+               update_node_disks();
+               kick_recover();
+       } else if (removed) {
+               leave_cluster();
+       }
+
+       free(mw);
+}
+
+/*
+ * Report an I/O error on fault_path. With no online disk left this returns
+ * SD_RES_EIO. Otherwise removal of the faulty disk is queued on the md work
+ * queue and SD_RES_NETWORK_ERROR is returned so that the requester retries.
+ */
+int md_handle_eio(const char *fault_path)
+{
+       struct md_work *mw;
+
+       if (!nr_online_disks())
+               return SD_RES_EIO;
+
+       mw = xzalloc(sizeof(*mw));
+       pstrcpy(mw->path, PATH_MAX, fault_path);
+       mw->work.done = md_do_recover;
+       queue_work(sys->md_wqueue, &mw->work);
+
+       /* Fool the requester to retry */
+       return SD_RES_NETWORK_ERROR;
+}
+
+/*
+ * Return true if path is readable and writable. A failure other than ENOENT
+ * is logged before false is returned.
+ */
+static inline bool md_access(const char *path)
+{
+       if (access(path, R_OK | W_OK) >= 0)
+               return true;
+
+       if (unlikely(errno != ENOENT))
+               sd_err("failed to check %s, %m", path);
+       return false;
+}
+
+/*
+ * Build two candidate file names for an object: "old" under the given disk
+ * path and "new" under the directory oid currently maps to. A non-zero epoch
+ * selects the ".stale" naming (with the epoch as suffix); erasure-coded
+ * objects additionally carry "_<ec_index>". Both buffers must hold PATH_MAX
+ * bytes.
+ *
+ * Returns 0 if "old" is accessible, -1 otherwise.
+ */
+static int get_old_new_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
+                           const char *path, char *old, char *new)
+{
+       const char *dir = md_get_object_dir_nolock(oid);
+       bool erasure = is_erasure_oid(oid);
+
+       if (epoch && erasure) {
+               snprintf(old, PATH_MAX,
+                        "%s/.stale/%016"PRIx64"_%d.%"PRIu32, path,
+                        oid, ec_index, epoch);
+               snprintf(new, PATH_MAX,
+                        "%s/.stale/%016"PRIx64"_%d.%"PRIu32, dir,
+                        oid, ec_index, epoch);
+       } else if (epoch) {
+               snprintf(old, PATH_MAX,
+                        "%s/.stale/%016"PRIx64".%"PRIu32, path,
+                        oid, epoch);
+               snprintf(new, PATH_MAX,
+                        "%s/.stale/%016"PRIx64".%"PRIu32, dir,
+                        oid, epoch);
+       } else if (erasure) {
+               snprintf(old, PATH_MAX, "%s/%016" PRIx64"_%d", path,
+                        oid, ec_index);
+               snprintf(new, PATH_MAX, "%s/%016" PRIx64"_%d", dir,
+                        oid, ec_index);
+       } else {
+               snprintf(old, PATH_MAX, "%s/%016" PRIx64, path, oid);
+               snprintf(new, PATH_MAX, "%s/%016" PRIx64, dir, oid);
+       }
+
+       return md_access(old) ? 0 : -1;
+}
+
+/*
+ * Move an object file by copying: read get_store_objsize(oid) bytes from
+ * "old", write them to "new" with atomic_create_and_write(), then unlink
+ * "old". Returns 0 on success and -1 on failure.
+ */
+static int md_move_object(uint64_t oid, const char *old, const char *new)
+{
+       struct strbuf buf = STRBUF_INIT;
+       int fd, ret = -1;
+       size_t sz = get_store_objsize(oid);
+
+       fd = open(old, O_RDONLY);
+       if (fd < 0) {
+               sd_err("failed to open %s", old);
+               goto out;
+       }
+
+       /* anything but a full-size read is treated as a failure */
+       ret = strbuf_read(&buf, fd, sz);
+       if (ret != sz) {
+               sd_err("failed to read %s, size %zu, %d, %m", old, sz, ret);
+               ret = -1;
+               goto out_close;
+       }
+
+       /*
+        * EEXIST is tolerated: "new" is already there, so only the source
+        * still has to be removed.
+        */
+       if (atomic_create_and_write(new, buf.buf, buf.len, false) < 0) {
+               if (errno != EEXIST) {
+                       sd_err("failed to create %s", new);
+                       ret = -1;
+                       goto out_close;
+               }
+       }
+       /* the result of unlink() is not checked */
+       unlink(old);
+       ret = 0;
+out_close:
+       close(fd);
+out:
+       strbuf_release(&buf);
+       return ret;
+}
+
+/*
+ * If the object exists under the given disk path, move it to the directory it
+ * currently maps to. Returns SD_RES_SUCCESS when the object ends up in place
+ * and SD_RES_EIO when it is not found under path or the move fails.
+ */
+static int md_check_and_move(uint64_t oid, uint32_t epoch, uint8_t ec_index,
+                            const char *path)
+{
+       char old[PATH_MAX], new[PATH_MAX];
+
+       if (get_old_new_path(oid, epoch, ec_index, path, old, new) < 0)
+               return SD_RES_EIO;
+
+       /*
+        * Recovery thread and main thread might try to recover the same object.
+        * Whichever loses the race ends up asking to move the object onto
+        * itself; there is nothing left to do in that case.
+        */
+       if (strcmp(old, new) == 0)
+               return SD_RES_SUCCESS;
+
+       /* We can't use rename(2) across device */
+       if (md_move_object(oid, old, new) < 0) {
+               sd_err("move old %s to new %s failed", old, new);
+               return SD_RES_EIO;
+       }
+
+       sd_debug("from %s to %s", old, new);
+       return SD_RES_SUCCESS;
+}
+
+/*
+ * Try md_check_and_move() against every registered disk, under the md read
+ * lock, until one of them succeeds. Returns SD_RES_SUCCESS in that case;
+ * otherwise the result of the last attempt, or SD_RES_EIO when there is no
+ * disk at all.
+ */
+static int scan_wd(uint64_t oid, uint32_t epoch, uint8_t ec_index)
+{
+       int ret = SD_RES_EIO;
+       const struct disk *disk;
+
+       sd_read_lock(&md.lock);
+       rb_for_each_entry(disk, &md.root, rb) {
+               ret = md_check_and_move(oid, epoch, ec_index, disk->path);
+               if (ret == SD_RES_SUCCESS)
+                       break;
+       }
+       sd_rw_unlock(&md.lock);
+       return ret;
+}
+
+/*
+ * Return true if the object is accessible at path, or if it could be found on
+ * one of the disks and moved into place by scan_wd().
+ */
+bool md_exist(uint64_t oid, uint8_t ec_index, char *path)
+{
+       if (md_access(path))
+               return true;
+
+       /*
+        * We have to iterate the WD because we don't have epoch-like history
+        * track to locate the objects for multiple disk failure. Simply do
+        * hard iteration simplify the code a lot.
+        */
+       return scan_wd(oid, 0, ec_index) == SD_RES_SUCCESS;
+}
+
+/*
+ * Write the stale-object file name for (oid, epoch, ec_index) into path,
+ * which must hold PATH_MAX bytes. Panics on epoch 0 and, for erasure-coded
+ * objects, on an ec_index of SD_MAX_COPIES or more.
+ *
+ * Returns SD_RES_SUCCESS if the file is accessible there or scan_wd() could
+ * move it into place, SD_RES_NO_OBJ otherwise.
+ */
+int md_get_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
+                     char *path)
+{
+       if (unlikely(!epoch))
+               panic("invalid 0 epoch");
+
+       if (!is_erasure_oid(oid)) {
+               snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32,
+                        md_get_object_dir(oid), oid, epoch);
+       } else {
+               if (unlikely(ec_index >= SD_MAX_COPIES))
+                       panic("invalid ec index %d", ec_index);
+
+               snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
+                        md_get_object_dir(oid), oid, ec_index, epoch);
+       }
+
+       if (md_access(path) ||
+           scan_wd(oid, epoch, ec_index) == SD_RES_SUCCESS)
+               return SD_RES_SUCCESS;
+
+       return SD_RES_NO_OBJ;
+}
+
+/*
+ * Fill info with one entry per registered disk (index, path, free and used
+ * size) plus the disk count, under the md read lock. info is zeroed first.
+ * Returns sizeof(*info).
+ */
+uint32_t md_get_info(struct sd_md_info *info)
+{
+       uint32_t ret = sizeof(*info);
+       const struct disk *disk;
+       int i = 0;
+
+       memset(info, 0, ret);
+       sd_read_lock(&md.lock);
+       /*
+        * NOTE(review): i is not checked against the capacity of info->disk
+        * here - presumably the number of disks is bounded elsewhere; confirm.
+        */
+       rb_for_each_entry(disk, &md.root, rb) {
+               info->disk[i].idx = i;
+               pstrcpy(info->disk[i].path, PATH_MAX, disk->path);
+               /* FIXME: better handling failure case. */
+               info->disk[i].free = get_path_free_size(info->disk[i].path,
+                                                       &info->disk[i].used);
+               i++;
+       }
+       info->nr = md.nr_disks;
+       sd_rw_unlock(&md.lock);
+       return ret;
+}
+
+/*
+ * Remove the disk registered under path from the array; an unknown path is
+ * only logged. Takes no lock itself.
+ */
+static inline void md_del_disk(const char *path)
+{
+       struct disk *disk = path_to_disk(path);
+
+       if (disk)
+               md_remove_disk(disk);
+       else
+               sd_err("invalid path %s", path);
+}
+
+#ifdef HAVE_DISKVNODES
+/*
+ * Refresh sys->this_node.disks from the registered disks (id = hash of the
+ * path, space = recorded disk space) and then rebuild the vdisks of every
+ * disk. Does nothing while sys is not set up.
+ */
+void update_node_disks(void)
+{
+       const struct disk *disk;
+       int i = 0;
+       bool rb_empty = false;
+
+       if (!sys)
+               return;
+
+       memset(sys->this_node.disks, 0, sizeof(struct disk_info) * DISK_MAX);
+       sd_read_lock(&md.lock);
+       /*
+        * NOTE(review): i is not checked against DISK_MAX in this loop -
+        * presumably the number of disks is bounded elsewhere; confirm.
+        */
+       rb_for_each_entry(disk, &md.root, rb) {
+               sys->this_node.disks[i].disk_id =
+                       sd_hash(disk->path, strlen(disk->path));
+               sys->this_node.disks[i].disk_space = disk->space;
+               i++;
+       }
+       sd_rw_unlock(&md.lock);
+
+       /*
+        * NOTE(review): md.vroot is inspected here while md.lock is not held;
+        * verify that this cannot race with a concurrent writer.
+        */
+       if (RB_EMPTY_ROOT(&md.vroot))
+               rb_empty = true;
+       sd_write_lock(&md.lock);
+       rb_for_each_entry(disk, &md.root, rb) {
+               /* existing vdisks are dropped first unless the tree was empty */
+               if (!rb_empty)
+                       remove_vdisks(disk);
+               create_vdisks(disk);
+       }
+       sd_rw_unlock(&md.lock);
+}
+#else
+/* Without HAVE_DISKVNODES there is nothing to update. */
+void update_node_disks(void)
+{
+}
+#endif
+
+/*
+ * Add (plug == true) or remove (plug == false) every disk named in the
+ * comma-separated list "disks", under the md write lock. The list is split in
+ * place with strtok(), so the buffer is modified.
+ *
+ * Returns SD_RES_SUCCESS if the number of disks changed, in which case the
+ * node's disk info is refreshed and recovery is kicked; SD_RES_UNKNOWN
+ * otherwise, including when the list contains no path at all.
+ */
+static int do_plug_unplug(char *disks, bool plug)
+{
+       const char *path;
+       int old_nr, ret = SD_RES_UNKNOWN;
+
+       sd_write_lock(&md.lock);
+       old_nr = md.nr_disks;
+       /*
+        * strtok() returns NULL straight away for a list without any token
+        * (empty string or only commas); never pass that on as a path.
+        */
+       for (path = strtok(disks, ","); path; path = strtok(NULL, ",")) {
+               if (plug) {
+                       if (!md_add_disk(path, true))
+                               sd_err("failed to add %s", path);
+               } else {
+                       md_del_disk(path);
+               }
+       }
+
+       /* If no disks change, bail out */
+       if (old_nr != md.nr_disks)
+               ret = SD_RES_SUCCESS;
+       sd_rw_unlock(&md.lock);
+
+       if (ret == SD_RES_SUCCESS) {
+               update_node_disks();
+               kick_recover();
+       }
+
+       return ret;
+}
+
+/*
+ * Add the disks named in the comma-separated list "disks" to the array; see
+ * do_plug_unplug() for the return value. The buffer is modified.
+ */
+int md_plug_disks(char *disks)
+{
+       return do_plug_unplug(disks, true);
+}
+
+/*
+ * Remove the disks named in the comma-separated list "disks" from the array;
+ * see do_plug_unplug() for the return value. The buffer is modified.
+ */
+int md_unplug_disks(char *disks)
+{
+       return do_plug_unplug(disks, false);
+}
+
+/*
+ * Return the sum of the free sizes reported by get_path_free_size() for all
+ * registered disks plus the final value of *used. *used is reset to 0 first
+ * and then handed to get_path_free_size() for every disk.
+ */
+uint64_t md_get_size(uint64_t *used)
+{
+       uint64_t fsize = 0;
+       const struct disk *disk;
+
+       *used = 0;
+       sd_read_lock(&md.lock);
+       rb_for_each_entry(disk, &md.root, rb) {
+               /*
+                * NOTE(review): this relies on get_path_free_size() adding to
+                * *used rather than overwriting it - confirm.
+                */
+               fsize += get_path_free_size(disk->path, used);
+       }
+       sd_rw_unlock(&md.lock);
+
+       return fsize + *used;
+}
+
+/* Return the value of nr_online_disks(). */
+uint32_t md_nr_disks(void)
+{
+       return nr_online_disks();
+}
diff --git a/sheep/store/plain_store.c b/sheep/store/plain_store.c
new file mode 100644
index 0000000..efbf129
--- /dev/null
+++ b/sheep/store/plain_store.c
@@ -0,0 +1,759 @@
+/*
+ * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <libgen.h>
+#include <linux/falloc.h>
+
+#include "sheep_priv.h"
+
+#ifndef FALLOC_FL_PUNCH_HOLE
+#define FALLOC_FL_PUNCH_HOLE 0x02
+#endif
+
+/* True if x is a multiple of SECTOR_SIZE (tested by masking the low bits). */
+#define sector_algined(x) ({ ((x) & (SECTOR_SIZE - 1)) == 0; })
+
+/* True if both the offset and the length of iocb are sector aligned. */
+static inline bool iocb_is_aligned(const struct siocb *iocb)
+{
+       return  sector_algined(iocb->offset) && sector_algined(iocb->length);
+}
+
+/*
+ * Compute the open(2) flags for an object I/O request. The file is always
+ * opened O_RDWR. O_SYNC (create) or O_DSYNC (otherwise) is requested unless
+ * journaling or nosync is on. O_DIRECT is added for sector-aligned requests
+ * when backend_dio is set, and O_CREAT | O_EXCL when creating.
+ */
+static int prepare_iocb(uint64_t oid, const struct siocb *iocb, bool create)
+{
+       int flags = O_RDWR;
+
+       if (!(uatomic_is_true(&sys->use_journal) || sys->nosync == true))
+               flags |= create ? O_SYNC : O_DSYNC;
+
+       if (sys->backend_dio && iocb_is_aligned(iocb)) {
+               /* direct I/O is only attempted with a page-aligned buffer */
+               if (!is_aligned_to_pagesize(iocb->buf))
+                       panic("Memory isn't aligned to pagesize %p", iocb->buf);
+               flags |= O_DIRECT;
+       }
+
+       if (create)
+               flags |= O_CREAT | O_EXCL;
+
+       return flags;
+}
+
+/*
+ * Write the file name of an object into path (PATH_MAX bytes) and return the
+ * snprintf() result. Erasure-coded objects get a "_<ec_index>" suffix; an
+ * ec_index of SD_MAX_COPIES or more is a panic for them.
+ */
+static int get_store_path(uint64_t oid, uint8_t ec_index, char *path)
+{
+       if (!is_erasure_oid(oid))
+               return snprintf(path, PATH_MAX, "%s/%016" PRIx64,
+                               md_get_object_dir(oid), oid);
+
+       if (unlikely(ec_index >= SD_MAX_COPIES))
+               panic("invalid ec_index %d", ec_index);
+
+       return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d",
+                       md_get_object_dir(oid), oid, ec_index);
+}
+
+/*
+ * Same as get_store_path(), but for the temporary file of an object: the
+ * name carries an extra ".tmp" suffix.
+ */
+static int get_store_tmp_path(uint64_t oid, uint8_t ec_index, char *path)
+{
+       if (!is_erasure_oid(oid))
+               return snprintf(path, PATH_MAX, "%s/%016" PRIx64".tmp",
+                               md_get_object_dir(oid), oid);
+
+       if (unlikely(ec_index >= SD_MAX_COPIES))
+               panic("invalid ec_index %d", ec_index);
+
+       return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d.tmp",
+                       md_get_object_dir(oid), oid, ec_index);
+}
+
+/*
+ * Thin wrapper around md_get_stale_path(): fills path with the stale file
+ * name of the object and returns that function's SD_RES_* result.
+ */
+static int get_store_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index,
+                               char *path)
+{
+       return md_get_stale_path(oid, epoch, ec_index, path);
+}
+
+/*
+ * Check if oid is in this node. If oid is in the wrong place, it will be
+ * moved to the correct one by this call in a MD setup.
+ */
+bool default_exist(uint64_t oid, uint8_t ec_index)
+{
+       char path[PATH_MAX];
+
+       get_store_path(oid, ec_index, path);
+
+       return md_exist(oid, ec_index, path);
+}
+
+/*
+ * Translate the errno value err of a failed operation on the object file
+ * "path" into an SD_RES_* code:
+ *   - ENOENT: SD_RES_NO_OBJ, unless the containing directory itself cannot be
+ *     stat()ed, which is handed to md_handle_eio();
+ *   - ENOSPC: SD_RES_NO_SPACE;
+ *   - EMFILE, ENFILE, EINTR, EAGAIN, EEXIST: SD_RES_NETWORK_ERROR, so that
+ *     the gateway tries again;
+ *   - anything else: the result of md_handle_eio() for the directory.
+ */
+static int err_to_sderr(const char *path, uint64_t oid, int err)
+{
+       struct stat s;
+       char p[PATH_MAX], *dir;
+
+       /* Use a temporary buffer since dirname() may modify its argument. */
+       pstrcpy(p, sizeof(p), path);
+       dir = dirname(p);
+
+       sd_debug("%s", path);
+       switch (err) {
+       case ENOENT:
+               if (stat(dir, &s) < 0) {
+                       sd_err("%s corrupted", dir);
+                       return md_handle_eio(dir);
+               }
+               sd_debug("object %016" PRIx64 " not found locally", oid);
+               return SD_RES_NO_OBJ;
+       case ENOSPC:
+               /* TODO: stop automatic recovery */
+               sd_err("diskfull, oid=%"PRIx64, oid);
+               return SD_RES_NO_SPACE;
+       case EMFILE:
+       case ENFILE:
+       case EINTR:
+       case EAGAIN:
+       case EEXIST:
+               /*
+                * %m prints the global errno, which the calls made since the
+                * failure may have changed; make it describe err.
+                */
+               errno = err;
+               sd_err("%m, oid=%"PRIx64, oid);
+               /* make gateway try again */
+               return SD_RES_NETWORK_ERROR;
+       default:
+               /* same as above: let %m report the error we were given */
+               errno = err;
+               sd_err("oid=%"PRIx64", %m", oid);
+               return md_handle_eio(dir);
+       }
+}
+
+/*
+ * Punch a hole over [start, end) of fd without changing the file size.
+ * Returns the xfallocate() result. A failure is logged, at info level when
+ * the filesystem simply lacks FALLOC_FL_PUNCH_HOLE support.
+ */
+static int discard(int fd, uint64_t start, uint32_t end)
+{
+       int ret = xfallocate(fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE,
+                            start, end - start);
+
+       if (ret >= 0)
+               return ret;
+
+       if (errno == ENOSYS || errno == EOPNOTSUPP)
+               sd_info("FALLOC_FL_PUNCH_HOLE is not supported "
+                       "on this filesystem");
+       else
+               sd_err("failed to discard object, %m");
+
+       return ret;
+}
+
+/*
+ * Trim zero blocks of the beginning and end of the object.
+ *
+ * trim_zero_blocks() narrows *poffset / *plen for the buffer of iocb; the
+ * parts of the request range that fall outside the narrowed range are then
+ * discarded (hole punched) in fd. Returns 0 on success and -1 if a discard
+ * fails.
+ */
+static int default_trim(int fd, uint64_t oid, const struct siocb *iocb,
+                       uint64_t *poffset, uint32_t *plen)
+{
+       trim_zero_blocks(iocb->buf, poffset, plen);
+
+       /* leading part of the request that was trimmed away */
+       if (iocb->offset < *poffset) {
+               sd_debug("discard between %d, %ld, %" PRIx64, iocb->offset,
+                        *poffset, oid);
+
+               if (discard(fd, iocb->offset, *poffset) < 0)
+                       return -1;
+       }
+
+       /* trailing part of the request that was trimmed away */
+       if (*poffset + *plen < iocb->offset + iocb->length) {
+               uint64_t end = iocb->offset + iocb->length;
+               uint32_t object_size = get_vdi_object_size(oid_to_vid(oid));
+               if (end == get_objsize(oid, object_size))
+                       /* This is necessary to punch the last block */
+                       end = round_up(end, BLOCK_SIZE);
+               sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen,
+                        end, oid);
+
+               if (discard(fd, *poffset + *plen, end) < 0)
+                       return -1;
+       }
+
+       return 0;
+}
+
+/*
+ * Write the data of iocb into the existing object oid.
+ *
+ * Returns SD_RES_OLD_NODE_VER if the request's epoch is older than the
+ * system epoch, SD_RES_SUCCESS on success, and otherwise the code that
+ * err_to_sderr() derives from the failure. For sparse objects the zero
+ * blocks at both ends of the request are discarded instead of written, until
+ * discarding fails once.
+ */
+int default_write(uint64_t oid, const struct siocb *iocb)
+{
+       int flags = prepare_iocb(oid, iocb, false), fd,
+           ret = SD_RES_SUCCESS, err;
+       char path[PATH_MAX];
+       ssize_t size;
+       uint32_t len = iocb->length;
+       uint64_t offset = iocb->offset;
+       static bool trim_is_supported = true;
+
+       if (iocb->epoch < sys_epoch()) {
+               sd_debug("%"PRIu32" sys %"PRIu32, iocb->epoch, sys_epoch());
+               return SD_RES_OLD_NODE_VER;
+       }
+
+       /* a failing journal is switched off and synchronous I/O is used */
+       if (uatomic_is_true(&sys->use_journal) &&
+           unlikely(journal_write_store(oid, iocb->buf, iocb->length,
+                                        iocb->offset, false)
+                    != SD_RES_SUCCESS)) {
+               sd_err("turn off journaling");
+               uatomic_set_false(&sys->use_journal);
+               flags |= O_DSYNC;
+               sync();
+       }
+
+       get_store_path(oid, iocb->ec_index, path);
+
+       /*
+        * Make sure oid is in the right place because oid might be misplaced
+        * in a wrong place, due to 'shutdown/restart with less/more disks' or
+        * any bugs. We need call err_to_sderr() to return EIO if disk is broken
+        */
+       if (!default_exist(oid, iocb->ec_index))
+               return err_to_sderr(path, oid, ENOENT);
+
+       fd = open(path, flags, sd_def_fmode);
+       if (unlikely(fd < 0))
+               return err_to_sderr(path, oid, errno);
+
+       if (trim_is_supported && is_sparse_object(oid)) {
+               if (default_trim(fd, oid, iocb, &offset, &len) < 0) {
+                       /* fall back to writing the whole request, for good */
+                       trim_is_supported = false;
+                       offset = iocb->offset;
+                       len = iocb->length;
+               }
+       }
+
+       size = xpwrite(fd, iocb->buf, len, offset);
+       if (unlikely(size != len)) {
+               /* save errno before logging gets a chance to overwrite it */
+               err = errno;
+               sd_err("failed to write object %"PRIx64", path=%s, offset=%"
+                      PRId32", size=%"PRId32", result=%zd, %m", oid, path,
+                      iocb->offset, iocb->length, size);
+               ret = err_to_sderr(path, oid, err);
+       }
+
+       close(fd);
+       return ret;
+}
+
+/*
+ * Create the ".stale" directory below path. Returns SD_RES_SUCCESS, or
+ * SD_RES_EIO if xmkdir() fails.
+ */
+static int make_stale_dir(const char *path)
+{
+       char p[PATH_MAX];
+
+       snprintf(p, PATH_MAX, "%s/.stale", path);
+       if (xmkdir(p, sd_def_dmode) < 0) {
+               sd_err("%s failed, %m", p);
+               return SD_RES_EIO;
+       }
+       return SD_RES_SUCCESS;
+}
+
+/* Purge path; maps a purge_directory() failure to SD_RES_EIO. */
+static int purge_dir(const char *path)
+{
+       return purge_directory(path) < 0 ? SD_RES_EIO : SD_RES_SUCCESS;
+}
+
+/*
+ * Purge the ".stale" directory below path with purge_directory_async();
+ * a failure is mapped to SD_RES_EIO.
+ */
+static int purge_stale_dir(const char *path)
+{
+       char stale[PATH_MAX];
+
+       snprintf(stale, PATH_MAX, "%s/.stale", path);
+
+       return purge_directory_async(stale) < 0 ? SD_RES_EIO : SD_RES_SUCCESS;
+}
+
+/*
+ * Purge the ".stale" directory of every object path. Returns SD_RES_SUCCESS,
+ * or the first non-success result reported by for_each_obj_path().
+ */
+int default_cleanup(void)
+{
+       return for_each_obj_path(purge_stale_dir);
+}
+
+/*
+ * Read the inode header of the VDI object oid and register the VDI: add its
+ * state, mark it in sys->vdi_inuse and, if its name is empty, also in
+ * sys->vdi_deleted. wd is only used for the error message.
+ *
+ * Returns SD_RES_SUCCESS, or the default_read() error if the header cannot
+ * be read.
+ */
+static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch)
+{
+       int ret;
+       struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE);
+       struct siocb iocb = {
+               .epoch = epoch,
+               .buf = inode,
+               .length = SD_INODE_HEADER_SIZE,
+       };
+
+       ret = default_read(oid, &iocb);
+       if (ret != SD_RES_SUCCESS) {
+               sd_err("failed to read inode header %" PRIx64 " %" PRId32
+                      " at %s", oid, epoch, wd);
+               goto out;
+       }
+       add_vdi_state_unordered(oid_to_vid(oid), inode->nr_copies,
+                     vdi_is_snapshot(inode), inode->copy_policy,
+                     inode->block_size_shift, inode->parent_vdi_id);
+
+       /* an empty name marks the VDI as deleted */
+       if (inode->name[0] == '\0')
+               atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted);
+
+       atomic_set_bit(oid_to_vid(oid), sys->vdi_inuse);
+
+       ret = SD_RES_SUCCESS;
+out:
+       free(inode);
+       return ret;
+}
+
+/*
+ * Per-object callback used by default_init(): insert oid into the object
+ * list cache and, for VDI objects, initialize the VDI state as well.
+ * ec_index, vinfo and arg are unused.
+ */
+static int init_objlist_and_vdi_bitmap(uint64_t oid, const char *wd,
+                                      uint32_t epoch, uint8_t ec_index,
+                                      struct vnode_info *vinfo,
+                                      void *arg)
+{
+       objlist_cache_insert(oid);
+
+       if (!is_vdi_obj(oid))
+               return SD_RES_SUCCESS;
+
+       sd_debug("found the VDI object %" PRIx64" epoch %"PRIu32
+                " at %s", oid, epoch, wd);
+       return init_vdi_state(oid, wd, epoch);
+}
+
+/*
+ * Initialize the plain store: make sure every object path has a ".stale"
+ * directory, then feed all objects found in the stale directories and in the
+ * working directories to init_objlist_and_vdi_bitmap().
+ *
+ * Returns the first make_stale_dir() failure, otherwise the result of the
+ * working-directory scan.
+ */
+int default_init(void)
+{
+       int ret;
+
+       sd_debug("use plain store driver");
+       ret = for_each_obj_path(make_stale_dir);
+       if (ret != SD_RES_SUCCESS)
+               return ret;
+
+       /*
+        * NOTE(review): the result of the stale scan is dropped - presumably
+        * on purpose (best effort); confirm.
+        */
+       for_each_object_in_stale(init_objlist_and_vdi_bitmap, NULL);
+
+       return for_each_object_in_wd(init_objlist_and_vdi_bitmap, true, NULL);
+}
+
+/*
+ * Read iocb->length bytes at iocb->offset of the file "path" into iocb->buf.
+ * Returns SD_RES_SUCCESS, or the code err_to_sderr() derives from the
+ * failure.
+ */
+static int default_read_from_path(uint64_t oid, const char *path,
+                                 const struct siocb *iocb)
+{
+       int flags = prepare_iocb(oid, iocb, false), fd,
+           ret = SD_RES_SUCCESS, err;
+       ssize_t size;
+
+       /*
+        * Make sure oid is in the right place because oid might be misplaced
+        * in a wrong place, due to 'shutdown/restart with less disks' or any
+        * bugs. We need call err_to_sderr() to return EIO if disk is broken.
+        *
+        * For stale path, get_store_stale_path already does default_exist job.
+        */
+       if (!is_stale_path(path) && !default_exist(oid, iocb->ec_index))
+               return err_to_sderr(path, oid, ENOENT);
+
+       fd = open(path, flags);
+       if (fd < 0)
+               return err_to_sderr(path, oid, errno);
+
+       size = xpread(fd, iocb->buf, iocb->length, iocb->offset);
+       if (size < 0) {
+               /* save errno before logging gets a chance to overwrite it */
+               err = errno;
+               sd_err("failed to read object %"PRIx64", path=%s, offset=%"
+                      PRId32", size=%"PRId32", result=%zd, %m", oid, path,
+                      iocb->offset, iocb->length, size);
+               ret = err_to_sderr(path, oid, err);
+       }
+       close(fd);
+       return ret;
+}
+
+/*
+ * Read the object oid as described by iocb. If it is not found in the
+ * working directory and the request targets an epoch older than the system
+ * epoch, the read is retried from the stale directory.
+ */
+int default_read(uint64_t oid, const struct siocb *iocb)
+{
+       char path[PATH_MAX];
+       int ret;
+
+       get_store_path(oid, iocb->ec_index, path);
+       ret = default_read_from_path(oid, path, iocb);
+       if (ret != SD_RES_NO_OBJ)
+               return ret;
+
+       /* only requests against an older epoch fall back to the stale copy */
+       if (!(iocb->epoch > 0 && iocb->epoch < sys_epoch()))
+               return ret;
+
+       get_store_stale_path(oid, iocb->epoch, iocb->ec_index, path);
+       return default_read_from_path(oid, path, iocb);
+}
+
+/*
+ * Create the object oid and write the data of iocb into it. The data goes to
+ * a ".tmp" file first, which is then renamed to the final name. Unless
+ * journaling or nosync is on, the containing directory is fsync()ed
+ * afterwards.
+ *
+ * Returns SD_RES_SUCCESS on success, and also when the temporary file
+ * already exists. Otherwise returns the code err_to_sderr() derives from the
+ * failure.
+ */
+int default_create_and_write(uint64_t oid, const struct siocb *iocb)
+{
+       char path[PATH_MAX], tmp_path[PATH_MAX], *dir;
+       int flags = prepare_iocb(oid, iocb, true);
+       int ret, fd, err;
+       uint32_t len = iocb->length;
+       uint32_t object_size = 0;
+       size_t obj_size;
+       uint64_t offset = iocb->offset;
+
+       sd_debug("%"PRIx64, oid);
+       get_store_path(oid, iocb->ec_index, path);
+       get_store_tmp_path(oid, iocb->ec_index, tmp_path);
+
+       /* a failing journal is switched off and synchronous I/O is used */
+       if (uatomic_is_true(&sys->use_journal) &&
+           journal_write_store(oid, iocb->buf, iocb->length,
+                               iocb->offset, true)
+           != SD_RES_SUCCESS) {
+               sd_err("turn off journaling");
+               uatomic_set_false(&sys->use_journal);
+               flags |= O_SYNC;
+               sync();
+       }
+
+       fd = open(tmp_path, flags, sd_def_fmode);
+       if (fd < 0) {
+               /* save errno before logging gets a chance to overwrite it */
+               err = errno;
+               if (err == EEXIST) {
+                       /*
+                        * This happens if node membership changes during object
+                        * creation; while gateway retries a CREATE request,
+                        * recovery process could also recover the object at the
+                        * same time.  They should try to write the same data,
+                        * so it is okay to simply return success here.
+                        */
+                       sd_debug("%s exists", tmp_path);
+                       return SD_RES_SUCCESS;
+               }
+
+               sd_err("failed to open %s: %m", tmp_path);
+               return err_to_sderr(path, oid, err);
+       }
+
+       obj_size = get_store_objsize(oid);
+
+       trim_zero_blocks(iocb->buf, &offset, &len);
+
+       object_size = get_vdi_object_size(oid_to_vid(oid));
+
+       /* a partial write needs the file extended to the full object size */
+       if (offset != 0 || len != get_objsize(oid, object_size)) {
+               if (is_sparse_object(oid))
+                       ret = xftruncate(fd, obj_size);
+               else
+                       ret = prealloc(fd, obj_size);
+               if (ret < 0) {
+                       ret = err_to_sderr(path, oid, errno);
+                       goto out;
+               }
+       }
+
+       ret = xpwrite(fd, iocb->buf, len, offset);
+       if (ret != len) {
+               err = errno;
+               sd_err("failed to write object. %m");
+               ret = err_to_sderr(path, oid, err);
+               goto out;
+       }
+
+       ret = rename(tmp_path, path);
+       if (ret < 0) {
+               err = errno;
+               sd_err("failed to rename %s to %s: %m", tmp_path, path);
+               ret = err_to_sderr(path, oid, err);
+               goto out;
+       }
+
+       close(fd);
+
+       if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) {
+               objlist_cache_insert(oid);
+               return SD_RES_SUCCESS;
+       }
+
+       /* tmp_path is reused as scratch space, dirname() may modify it */
+       pstrcpy(tmp_path, sizeof(tmp_path), path);
+       dir = dirname(tmp_path);
+       fd = open(dir, O_DIRECTORY | O_RDONLY);
+       if (fd < 0) {
+               err = errno;
+               sd_err("failed to open directory %s: %m", dir);
+               return err_to_sderr(path, oid, err);
+       }
+
+       if (fsync(fd) != 0) {
+               err = errno;
+               sd_err("failed to write directory %s: %m", dir);
+               ret = err_to_sderr(path, oid, err);
+               close(fd);
+               if (unlink(path) != 0)
+                       sd_err("failed to unlink %s: %m", path);
+               return ret;
+       }
+       close(fd);
+       objlist_cache_insert(oid);
+       return SD_RES_SUCCESS;
+
+out:
+       if (unlink(tmp_path) != 0)
+               sd_err("failed to unlink %s: %m", tmp_path);
+       close(fd);
+       return ret;
+}
+
+/*
+ * Hard-link the file named by get_store_stale_path() for @oid at @tgt_epoch
+ * to "<md_get_object_dir(oid)>/<oid>".
+ *
+ * Returns SD_RES_SUCCESS when the link was created or the destination
+ * already exists; any other link(2) failure is mapped through
+ * err_to_sderr().
+ */
+int default_link(uint64_t oid, uint32_t tgt_epoch)
+{
+       char obj_path[PATH_MAX], src_path[PATH_MAX];
+
+       sd_debug("try link %"PRIx64" from snapshot with epoch %d", oid,
+                tgt_epoch);
+
+       snprintf(obj_path, sizeof(obj_path), "%s/%016"PRIx64,
+                md_get_object_dir(oid), oid);
+       get_store_stale_path(oid, tgt_epoch, 0, src_path);
+
+       if (link(src_path, obj_path) >= 0)
+               return SD_RES_SUCCESS;
+
+       /*
+        * Recovery thread and main thread might try to recover the same
+        * object at once; the loser sees EEXIST, which is not an error.
+        */
+       if (errno == EEXIST)
+               return SD_RES_SUCCESS;
+
+       sd_debug("failed to link from %s to %s, %m", src_path, obj_path);
+       return err_to_sderr(obj_path, oid, errno);
+}
+
+/*
+ * Decide whether the local copy of @oid is stale with respect to @vinfo.
+ *
+ * The vnodes the object maps to are scanned in order and only the first one
+ * that is local to this node decides the outcome:
+ *  - replicated object (ec_index >= SD_MAX_COPIES): a local vnode means the
+ *    object still belongs here, so it is not stale;
+ *  - erasure coded object (ec_index < SD_MAX_COPIES): every copy is unique,
+ *    so the object is kept only if that vnode's position equals @ec_index.
+ *    When a hash ring change moved the index elsewhere the copy is stale,
+ *    even if some other index of the object now maps to this node.
+ *
+ * Returns true if the object is stale, including the case where no vnode is
+ * local at all.
+ */
+static bool oid_stale(uint64_t oid, int ec_index, struct vnode_info *vinfo)
+{
+       const struct sd_vnode *targets[SD_MAX_COPIES];
+       uint32_t idx, copies;
+
+       copies = get_obj_copy_number(oid, vinfo->nr_zones);
+       oid_to_vnodes(oid, &vinfo->vroot, copies, targets);
+
+       for (idx = 0; idx < copies; idx++) {
+               if (!vnode_is_local(targets[idx]))
+                       continue;
+
+               /* First local vnode found: it alone settles the answer. */
+               if (ec_index < SD_MAX_COPIES)
+                       return idx != ec_index;
+
+               return false;
+       }
+
+       return true;
+}
+
+/*
+ * Move object @oid from its object directory into that directory's ".stale"
+ * subdirectory, appending the target epoch to the file name.
+ *
+ * This function is passed to for_each_object_in_wd() as a callback.
+ *
+ * @ec_index: values below SD_MAX_COPIES select the "<oid>_<ec_index>" file
+ *            name, anything else selects the plain "<oid>" name
+ * @arg:      points to the uint32_t target epoch used as the stale suffix
+ * @wd, @epoch, @vinfo: not used by this function
+ *
+ * Returns SD_RES_SUCCESS on success, SD_RES_EIO if rename(2) fails.
+ */
+static int move_object_to_stale_dir(uint64_t oid, const char *wd,
+                                   uint32_t epoch, uint8_t ec_index,
+                                   struct vnode_info *vinfo, void *arg)
+{
+       char path[PATH_MAX], stale_path[PATH_MAX];
+       uint32_t tgt_epoch = *(uint32_t *)arg;
+
+       /* ec_index from md.c is reliable so we can directly use it */
+       if (ec_index < SD_MAX_COPIES) {
+               snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d",
+                        md_get_object_dir(oid), oid, ec_index);
+               snprintf(stale_path, PATH_MAX,
+                        "%s/.stale/%016"PRIx64"_%d.%"PRIu32,
+                        md_get_object_dir(oid), oid, ec_index, tgt_epoch);
+       } else {
+               snprintf(path, PATH_MAX, "%s/%016" PRIx64,
+                        md_get_object_dir(oid), oid);
+               snprintf(stale_path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32,
+                        md_get_object_dir(oid), oid, tgt_epoch);
+       }
+
+       /*
+        * The "< 0" comparison has to sit inside unlikely(): a branch hint
+        * wrapped around the bare call is compared against 0 afterwards and,
+        * with the usual "!!"-normalising definition, can never be negative,
+        * which would hide every rename failure.
+        */
+       if (unlikely(rename(path, stale_path) < 0)) {
+               /* Report the destination, which is what "to %s" refers to. */
+               sd_err("failed to move stale object %" PRIX64 " to %s, %m", oid,
+                      stale_path);
+               return SD_RES_EIO;
+       }
+
+       sd_debug("moved object %"PRIx64, oid);
+       return SD_RES_SUCCESS;
+}
+
+/*
+ * Per-object callback handed to for_each_object_in_wd() by
+ * default_update_epoch().  Objects that oid_stale() reports as stale are
+ * moved via move_object_to_stale_dir(), with @arg passed through unchanged;
+ * all other objects are left untouched.
+ *
+ * Returns SD_RES_SUCCESS for non-stale objects, otherwise the result of the
+ * move.
+ */
+static int check_stale_objects(uint64_t oid, const char *wd, uint32_t epoch,
+                              uint8_t ec_index, struct vnode_info *vinfo,
+                              void *arg)
+{
+       if (!oid_stale(oid, ec_index, vinfo))
+               return SD_RES_SUCCESS;
+
+       return move_object_to_stale_dir(oid, wd, 0, ec_index, NULL, arg);
+}
+
+/*
+ * Run check_stale_objects() through for_each_object_in_wd(), passing a
+ * pointer to @epoch as the callback argument.  @epoch is asserted to be
+ * non-zero.
+ *
+ * Returns whatever for_each_object_in_wd() returns.
+ */
+int default_update_epoch(uint32_t epoch)
+{
+       int ret;
+
+       sd_assert(epoch);
+
+       ret = for_each_object_in_wd(check_stale_objects, false, &epoch);
+       return ret;
+}
+
+/*
+ * Clean the store: run purge_dir through for_each_obj_path() and, when the
+ * object cache is enabled, format the object cache as well.
+ *
+ * Returns SD_RES_SUCCESS, or the non-success value returned by
+ * for_each_obj_path() (in which case the object cache is not touched).
+ */
+int default_format(void)
+{
+       /* Signed, so the status keeps the same type as this function's result. */
+       int ret;
+
+       sd_debug("try get a clean store");
+       ret = for_each_obj_path(purge_dir);
+       if (ret != SD_RES_SUCCESS)
+               return ret;
+
+       if (sys->enable_object_cache)
+               object_cache_format();
+
+       return SD_RES_SUCCESS;
+}
+
+/*
+ * Delete the stored file of object @oid / @ec_index.  When journaling is in
+ * use, journal_remove_object() is called for the object first.
+ *
+ * Returns SD_RES_SUCCESS on success, SD_RES_NO_OBJ if the file does not
+ * exist, and SD_RES_EIO for any other unlink(2) failure.
+ */
+int default_remove_object(uint64_t oid, uint8_t ec_index)
+{
+       char obj_path[PATH_MAX];
+
+       if (uatomic_is_true(&sys->use_journal))
+               journal_remove_object(oid);
+
+       get_store_path(oid, ec_index, obj_path);
+
+       if (unlink(obj_path) >= 0)
+               return SD_RES_SUCCESS;
+
+       if (errno == ENOENT)
+               return SD_RES_NO_OBJ;
+
+       sd_err("failed, %s, %m", obj_path);
+       return SD_RES_EIO;
+}
+
+/* Extended attribute under which an object's SHA-1 digest is kept. */
+#define SHA1NAME "user.obj.sha1"
+
+/*
+ * Read the SHA1NAME extended attribute of @path into @sha1, which must have
+ * room for SHA1_DIGEST_SIZE bytes.
+ *
+ * Returns 0 if exactly SHA1_DIGEST_SIZE bytes were read, -1 otherwise.  A
+ * missing attribute (ENODATA) is only logged at debug level.
+ */
+static int get_object_sha1(const char *path, uint8_t *sha1)
+{
+       ssize_t got = getxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE);
+
+       if (got == SHA1_DIGEST_SIZE)
+               return 0;
+
+       if (errno != ENODATA)
+               sd_err("fail to get xattr, %s", path);
+       else
+               sd_debug("sha1 is not cached yet, %s", path);
+
+       return -1;
+}
+
+/*
+ * Store the SHA1_DIGEST_SIZE bytes at @sha1 in the SHA1NAME extended
+ * attribute of @path.
+ *
+ * Returns the setxattr(2) result; a negative result is also logged.
+ */
+static int set_object_sha1(const char *path, const uint8_t *sha1)
+{
+       const int rc = setxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE, 0);
+
+       if (rc >= 0)
+               return rc;
+
+       sd_err("fail to set sha1, %s", path);
+       return rc;
+}
+
+/*
+ * Find the file holding object @oid and write its path to @path.
+ *
+ * If default_exist() reports the object, the path is
+ * "<md_get_object_dir(oid)>/<oid>".  Otherwise the path produced by
+ * get_store_stale_path() for @epoch is used, provided that file exists.
+ *
+ * @size: capacity of @path in bytes
+ *
+ * Returns SD_RES_SUCCESS when @path names an existing object,
+ * SD_RES_NO_OBJ when the stale file does not exist, and SD_RES_EIO for any
+ * other access(2) failure.
+ */
+static int get_object_path(uint64_t oid, uint32_t epoch, char *path,
+                          size_t size)
+{
+       if (default_exist(oid, 0)) {
+               /* Bound the write by the caller's buffer, not by PATH_MAX. */
+               snprintf(path, size, "%s/%016"PRIx64,
+                        md_get_object_dir(oid), oid);
+               return SD_RES_SUCCESS;
+       }
+
+       /*
+        * NOTE(review): get_store_stale_path() takes no size argument;
+        * presumably it expects a PATH_MAX-sized buffer -- confirm.
+        */
+       get_store_stale_path(oid, epoch, 0, path);
+       if (access(path, F_OK) < 0) {
+               if (errno == ENOENT)
+                       return SD_RES_NO_OBJ;
+               return SD_RES_EIO;
+       }
+
+       return SD_RES_SUCCESS;
+}
+
+/*
+ * Produce the SHA-1 digest of object @oid (as of @epoch) in @sha1.
+ *
+ * For objects that oid_is_readonly() reports as read-only, a digest cached
+ * in the file's extended attribute is returned when present; otherwise the
+ * digest is computed and then stored in the attribute.  For all other
+ * objects the digest is computed on every call.  Computing means reading
+ * get_store_objsize(oid) bytes of the object into a temporary buffer and
+ * hashing that buffer.
+ *
+ * Returns SD_RES_SUCCESS on success, SD_RES_NO_MEM if the buffer cannot be
+ * allocated, or the error from get_object_path() /
+ * default_read_from_path().  A failure to store the digest is not reported
+ * to the caller.
+ */
+int default_get_hash(uint64_t oid, uint32_t epoch, uint8_t *sha1)
+{
+       const bool cacheable = oid_is_readonly(oid);
+       char path[PATH_MAX];
+       struct siocb iocb = {};
+       uint32_t length;
+       void *data;
+       int ret;
+
+       ret = get_object_path(oid, epoch, path, sizeof(path));
+       if (ret != SD_RES_SUCCESS)
+               return ret;
+
+       if (cacheable && get_object_sha1(path, sha1) == 0) {
+               sd_debug("use cached sha1 digest %s",
+                        sha1_to_hex(sha1));
+               return SD_RES_SUCCESS;
+       }
+
+       length = get_store_objsize(oid);
+       data = valloc(length);
+       if (!data)
+               return SD_RES_NO_MEM;
+
+       iocb.epoch = epoch;
+       iocb.buf = data;
+       iocb.length = length;
+
+       ret = default_read_from_path(oid, path, &iocb);
+       if (ret == SD_RES_SUCCESS)
+               get_buffer_sha1(data, length, sha1);
+
+       /* The buffer is no longer needed whether or not the read worked. */
+       free(data);
+       if (ret != SD_RES_SUCCESS)
+               return ret;
+
+       sd_debug("the message digest of %"PRIx64" at epoch %d is %s", oid,
+                epoch, sha1_to_hex(sha1));
+
+       /* Best effort: set_object_sha1() logs its own failures. */
+       if (cacheable)
+               set_object_sha1(path, sha1);
+
+       return ret;
+}
+
+/*
+ * Run move_object_to_stale_dir() through for_each_object_in_wd(), using the
+ * value of get_latest_epoch() as the target epoch.
+ *
+ * Returns whatever for_each_object_in_wd() returns.
+ */
+int default_purge_obj(void)
+{
+       uint32_t latest = get_latest_epoch();
+       int ret;
+
+       ret = for_each_object_in_wd(move_object_to_stale_dir, true, &latest);
+       return ret;
+}
+
+/*
+ * Operation table of the store driver named "plain": each store_driver hook
+ * is bound to the corresponding default_* implementation.
+ */
+static struct store_driver plain_store = {
+       .name = "plain",
+       .init = default_init,
+       .exist = default_exist,
+       .create_and_write = default_create_and_write,
+       .write = default_write,
+       .read = default_read,
+       .link = default_link,
+       .update_epoch = default_update_epoch,
+       .cleanup = default_cleanup,
+       .format = default_format,
+       .remove_object = default_remove_object,
+       .get_hash = default_get_hash,
+       .purge_obj = default_purge_obj,
+};
+
+/* NOTE(review): presumably registers the table with the driver list - confirm */
+add_store_driver(plain_store);
-- 
1.7.1

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
https://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to