On Tue, Mar 17, 2015 at 06:00:53PM +0900, Saeki Masaki wrote: > This change is a preparation patch for add store_driver. > Put files together to the new folder. > > and fix little style problem in md.c > > Signed-off-by: Masaki Saeki <[email protected]> > --- > sheep/Makefile.am | 6 +- > sheep/md.c | 878 > --------------------------------------------- > sheep/plain_store.c | 759 --------------------------------------- > sheep/store.c | 511 -------------------------- > sheep/store/common.c | 511 ++++++++++++++++++++++++++ > sheep/store/md.c | 878 > +++++++++++++++++++++++++++++++++++++++++++++ > sheep/store/plain_store.c | 759 +++++++++++++++++++++++++++++++++++++++ > 7 files changed, 2152 insertions(+), 2150 deletions(-) > delete mode 100644 sheep/md.c > delete mode 100644 sheep/plain_store.c > delete mode 100644 sheep/store.c > create mode 100644 sheep/store/common.c > create mode 100644 sheep/store/md.c > create mode 100644 sheep/store/plain_store.c > > diff --git a/sheep/Makefile.am b/sheep/Makefile.am > index 7a08838..3ddd761 100644 > --- a/sheep/Makefile.am > +++ b/sheep/Makefile.am > @@ -24,10 +24,12 @@ AM_CPPFLAGS = -I$(top_builddir)/include > -I$(top_srcdir)/include \ > > sbin_PROGRAMS = sheep > > -sheep_SOURCES = sheep.c group.c request.c gateway.c store.c > vdi.c \ > +sheep_SOURCES = sheep.c group.c request.c gateway.c vdi.c \ > journal.c ops.c recovery.c cluster/local.c \ > object_cache.c object_list_cache.c \ > - plain_store.c config.c migrate.c md.c > + store/common.c store/md.c \ > + store/plain_store.c \ > + config.c migrate.c > > if BUILD_HTTP > sheep_SOURCES += http/http.c http/kv.c http/s3.c http/swift.c > \ > diff --git a/sheep/md.c b/sheep/md.c > deleted file mode 100644 > index c00d7a5..0000000 > --- a/sheep/md.c > +++ /dev/null > @@ -1,878 +0,0 @@ > -/* > - * Copyright (C) 2013 Taobao Inc. 
> - * > - * Liu Yuan <[email protected]> > - * > - * This program is free software; you can redistribute it and/or > - * modify it under the terms of the GNU General Public License version > - * 2 as published by the Free Software Foundation. > - * > - * You should have received a copy of the GNU General Public License > - * along with this program. If not, see <http://www.gnu.org/licenses/>. > - */ > - > -#include "sheep_priv.h" > - > -#define MD_VDISK_SIZE ((uint64_t)1*1024*1024*1024) /* 1G */ > - > -#define NONE_EXIST_PATH "/all/disks/are/broken/,ps/ノ厂経7/!" > - > -struct md md = { > - .vroot = RB_ROOT, > - .root = RB_ROOT, > - .lock = SD_RW_LOCK_INITIALIZER, > -}; > - > -static inline uint32_t nr_online_disks(void) > -{ > - uint32_t nr; > - > - sd_read_lock(&md.lock); > - nr = md.nr_disks; > - sd_rw_unlock(&md.lock); > - > - return nr; > -} > - > -static inline int vdisk_number(const struct disk *disk) > -{ > - return DIV_ROUND_UP(disk->space, MD_VDISK_SIZE); > -} > - > -static int disk_cmp(const struct disk *d1, const struct disk *d2) > -{ > - return strcmp(d1->path, d2->path); > -} > - > -static int vdisk_cmp(const struct vdisk *d1, const struct vdisk *d2) > -{ > - return intcmp(d1->hash, d2->hash); > -} > - > -static struct vdisk *vdisk_insert(struct vdisk *new) > -{ > - return rb_insert(&md.vroot, new, rb, vdisk_cmp); > -} > - > -/* If v1_hash < hval <= v2_hash, then oid is resident in v2 */ > -static struct vdisk *hval_to_vdisk(uint64_t hval) > -{ > - struct vdisk dummy = { .hash = hval }; > - > - return rb_nsearch(&md.vroot, &dummy, rb, vdisk_cmp); > -} > - > -static struct vdisk *oid_to_vdisk(uint64_t oid) > -{ > - return hval_to_vdisk(sd_hash_oid(oid)); > -} > - > -static void create_vdisks(const struct disk *disk) > -{ > - uint64_t hval = sd_hash(disk->path, strlen(disk->path)); > - const struct sd_node *n = &sys->this_node; > - uint64_t node_hval; > - int nr; > - > - if (is_cluster_diskmode(&sys->cinfo)) { > - node_hval = sd_hash(&n->nid, 
offsetof(typeof(n->nid), io_addr)); > - hval = fnv_64a_64(node_hval, hval); > - nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN); > - if (0 == n->nid.port) > - return; > - } else > - nr = vdisk_number(disk); > - > - for (int i = 0; i < nr; i++) { > - struct vdisk *v = xmalloc(sizeof(*v)); > - > - hval = sd_hash_next(hval); > - v->hash = hval; > - v->disk = disk; > - if (unlikely(vdisk_insert(v))) > - panic("vdisk hash collison"); > - } > -} > - > -static inline void vdisk_free(struct vdisk *v) > -{ > - rb_erase(&v->rb, &md.vroot); > - free(v); > -} > - > -static void remove_vdisks(const struct disk *disk) > -{ > - uint64_t hval = sd_hash(disk->path, strlen(disk->path)); > - const struct sd_node *n = &sys->this_node; > - uint64_t node_hval; > - int nr; > - > - if (is_cluster_diskmode(&sys->cinfo)) { > - node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr)); > - hval = fnv_64a_64(node_hval, hval); > - nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN); > - } else > - nr = vdisk_number(disk); > - > - for (int i = 0; i < nr; i++) { > - struct vdisk *v; > - > - hval = sd_hash_next(hval); > - v = hval_to_vdisk(hval); > - assert(v->hash == hval); > - > - vdisk_free(v); > - } > -} > - > -static inline void trim_last_slash(char *path) > -{ > - assert(path[0]); > - while (path[strlen(path) - 1] == '/') > - path[strlen(path) - 1] = '\0'; > -} > - > -static struct disk *path_to_disk(const char *path) > -{ > - struct disk key = {}; > - > - pstrcpy(key.path, sizeof(key.path), path); > - trim_last_slash(key.path); > - > - return rb_search(&md.root, &key, rb, disk_cmp); > -} > - > -size_t get_store_objsize(uint64_t oid) > -{ > - if (is_erasure_oid(oid)) { > - uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid)); > - int d; > - ec_policy_to_dp(policy, &d, NULL); > - return get_vdi_object_size(oid_to_vid(oid)) / d; > - } > - return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid))); > -} > - > -static int get_total_object_size(uint64_t oid, const char *wd, uint32_t > epoch, > - 
uint8_t ec_index, struct vnode_info *vinfo, > - void *total) > -{ > - uint64_t *t = total; > - struct stat s; > - char path[PATH_MAX]; > - > - snprintf(path, PATH_MAX, "%s/%016" PRIx64, wd, oid); > - if (stat(path, &s) == 0) > - *t += s.st_blocks * SECTOR_SIZE; > - else > - *t += get_store_objsize(oid); > - > - return SD_RES_SUCCESS; > -} > - > -static int64_t find_string_integer(const char *str, const char *delimiter) > -{ > - char *pos = strstr(str, delimiter), *p; > - int64_t ret; > - > - ret = strtoll(pos + 1, &p, 10); > - if (ret == LLONG_MAX || p == pos + 1) { > - sd_err("%s strtoul failed, delimiter %s, %m", str, delimiter); > - return -1; > - } > - > - return ret; > -} > - > -/* If cleanup is true, temporary objects will be removed */ > -static int for_each_object_in_path(const char *path, > - int (*func)(uint64_t, const char *, uint32_t, > - uint8_t, struct vnode_info *, > - void *), > - bool cleanup, struct vnode_info *vinfo, > - void *arg) > -{ > - DIR *dir; > - struct dirent *d; > - uint64_t oid; > - int ret = SD_RES_SUCCESS; > - char file_name[PATH_MAX]; > - > - dir = opendir(path); > - if (unlikely(!dir)) { > - sd_err("failed to open %s, %m", path); > - return SD_RES_EIO; > - } > - > - while ((d = readdir(dir))) { > - uint32_t epoch = 0; > - uint8_t ec_index = SD_MAX_COPIES; > - > - /* skip ".", ".." 
and ".stale" */ > - if (unlikely(!strncmp(d->d_name, ".", 1))) > - continue; > - > - sd_debug("%s, %s", path, d->d_name); > - oid = strtoull(d->d_name, NULL, 16); > - if (oid == 0 || oid == ULLONG_MAX) > - continue; > - > - /* don't call callback against temporary objects */ > - if (is_tmp_dentry(d->d_name)) { > - if (cleanup) { > - snprintf(file_name, sizeof(file_name), > - "%s/%s", path, d->d_name); > - sd_debug("remove tmp object %s", file_name); > - if (unlink(file_name) < 0) > - sd_err("failed to unlink %s: %m", > - file_name); > - } > - continue; > - } > - > - if (is_stale_dentry(d->d_name)) { > - epoch = find_string_integer(d->d_name, "."); > - if (epoch < 0) > - continue; > - } > - > - if (is_ec_dentry(d->d_name)) { > - ec_index = find_string_integer(d->d_name, "_"); > - if (ec_index < 0) > - continue; > - } > - > - ret = func(oid, path, epoch, ec_index, vinfo, arg); > - if (ret != SD_RES_SUCCESS) > - break; > - } > - closedir(dir); > - return ret; > -} > - > -static uint64_t get_path_free_size(const char *path, uint64_t *used) > -{ > - struct statvfs fs; > - uint64_t size; > - > - if (statvfs(path, &fs) < 0) { > - sd_err("get disk %s space failed %m", path); > - return 0; > - } > - size = (int64_t)fs.f_frsize * fs.f_bavail; > - > - if (!used) > - goto out; > - if (for_each_object_in_path(path, get_total_object_size, false, > - NULL, used) > - != SD_RES_SUCCESS) > - return 0; > -out: > - return size; > -} > - > -/* > - * If path is broken during initialization or not support xattr return 0. We > can > - * safely use 0 to represent failure case because 0 space path can be > - * considered as broken path. 
> - */ > -static uint64_t init_path_space(const char *path, bool purge) > -{ > - uint64_t size; > - char stale[PATH_MAX]; > - > - if (!is_xattr_enabled(path)) { > - sd_warn("multi-disk support need xattr feature for path: %s", > - path); > - goto broken_path; > - } > - > - if (purge && purge_directory(path) < 0) > - sd_err("failed to purge %s", path); > - > - snprintf(stale, PATH_MAX, "%s/.stale", path); > - if (xmkdir(stale, sd_def_dmode) < 0) { > - sd_err("can't mkdir for %s, %m", stale); > - goto broken_path; > - } > - > -#define MDNAME "user.md.size" > -#define MDSIZE sizeof(uint64_t) > - if (getxattr(path, MDNAME, &size, MDSIZE) < 0) { > - if (errno == ENODATA) { > - goto create; > - } else { > - sd_err("%s, %m", path); > - goto broken_path; > - } > - } > - > - return size; > -create: > - size = get_path_free_size(path, NULL); > - if (!size) > - goto broken_path; > - if (setxattr(path, MDNAME, &size, MDSIZE, 0) < 0) { > - sd_err("%s, %m", path); > - goto broken_path; > - } > - return size; > -broken_path: > - return 0; > -} > - > -/* We don't need lock at init stage */ > -bool md_add_disk(const char *path, bool purge) > -{ > - struct disk *new; > - > - if (path_to_disk(path)) { > - sd_err("duplicate path %s", path); > - return false; > - } > - > - if (xmkdir(path, sd_def_dmode) < 0) { > - sd_err("can't mkdir for %s, %m", path); > - return false; > - } > - > - new = xmalloc(sizeof(*new)); > - pstrcpy(new->path, PATH_MAX, path); > - trim_last_slash(new->path); > - new->space = init_path_space(new->path, purge); > - if (!new->space) { > - free(new); > - return false; > - } > - > - create_vdisks(new); > - rb_insert(&md.root, new, rb, disk_cmp); > - md.space += new->space; > - md.nr_disks++; > - > - sd_info("%s, vdisk nr %d, total disk %d", new->path, vdisk_number(new), > - md.nr_disks); > - return true; > -} > - > -static inline void md_remove_disk(struct disk *disk) > -{ > - sd_info("%s from multi-disk array", disk->path); > - rb_erase(&disk->rb, &md.root); > - 
md.nr_disks--; > - remove_vdisks(disk); > - free(disk); > -} > - > -uint64_t md_init_space(void) > -{ > - return md.space; > -} > - > -static const char *md_get_object_dir_nolock(uint64_t oid) > -{ > - const struct vdisk *vd; > - > - if (unlikely(md.nr_disks == 0)) > - return NONE_EXIST_PATH; /* To generate EIO */ > - > - vd = oid_to_vdisk(oid); > - return vd->disk->path; > -} > - > -const char *md_get_object_dir(uint64_t oid) > -{ > - const char *p; > - > - sd_read_lock(&md.lock); > - p = md_get_object_dir_nolock(oid); > - sd_rw_unlock(&md.lock); > - > - return p; > -} > - > -struct process_path_arg { > - const char *path; > - struct vnode_info *vinfo; > - int (*func)(uint64_t oid, const char *, uint32_t, uint8_t, > - struct vnode_info *, void *arg); > - bool cleanup; > - void *opaque; > - int result; > -}; > - > -static void *thread_process_path(void *arg) > -{ > - int ret; > - struct process_path_arg *parg = (struct process_path_arg *)arg; > - > - ret = for_each_object_in_path(parg->path, parg->func, parg->cleanup, > - parg->vinfo, parg->opaque); > - if (ret != SD_RES_SUCCESS) > - parg->result = ret; > - > - return arg; > -} > - > -main_fn int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path, > - uint32_t epoch, uint8_t ec_index, > - struct vnode_info *vinfo, void *arg), > - bool cleanup, void *arg) > -{ > - int ret = SD_RES_SUCCESS; > - const struct disk *disk; > - struct process_path_arg *thread_args, *path_arg; > - struct vnode_info *vinfo; > - void *ret_arg; > - sd_thread_t *thread_array; > - int nr_thread = 0, idx = 0; > - > - sd_read_lock(&md.lock); > - > - rb_for_each_entry(disk, &md.root, rb) { > - nr_thread++; > - } > - > - thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg)); > - thread_array = xmalloc(nr_thread * sizeof(sd_thread_t)); > - > - vinfo = get_vnode_info(); > - > - rb_for_each_entry(disk, &md.root, rb) { > - thread_args[idx].path = disk->path; > - thread_args[idx].vinfo = vinfo; > - thread_args[idx].func = 
func; > - thread_args[idx].cleanup = cleanup; > - thread_args[idx].opaque = arg; > - thread_args[idx].result = SD_RES_SUCCESS; > - ret = sd_thread_create_with_idx("foreach wd", > - thread_array + idx, > - thread_process_path, > - (void *)(thread_args + idx)); > - if (ret) { > - /* > - * If we can't create enough threads to process > - * files, the data-consistent will be broken if > - * we continued. > - */ > - panic("Failed to create thread for path %s", > - disk->path); > - } > - idx++; > - } > - > - sd_debug("Create %d threads for all path", nr_thread); > - /* wait for all threads to exit */ > - for (idx = 0; idx < nr_thread; idx++) { > - ret = sd_thread_join(thread_array[idx], &ret_arg); > - if (ret) > - sd_err("Failed to join thread"); > - if (ret_arg) { > - path_arg = (struct process_path_arg *)ret_arg; > - if (path_arg->result != SD_RES_SUCCESS) > - sd_err("%s, %s", path_arg->path, > - sd_strerror(path_arg->result)); > - } > - } > - > - put_vnode_info(vinfo); > - sd_rw_unlock(&md.lock); > - > - free(thread_args); > - free(thread_array); > - return ret; > -} > - > -int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path, > - uint32_t epoch, uint8_t, > - struct vnode_info *, void *arg), > - void *arg) > -{ > - int ret = SD_RES_SUCCESS; > - char path[PATH_MAX]; > - const struct disk *disk; > - > - sd_read_lock(&md.lock); > - rb_for_each_entry(disk, &md.root, rb) { > - snprintf(path, sizeof(path), "%s/.stale", disk->path); > - ret = for_each_object_in_path(path, func, false, NULL, arg); > - if (ret != SD_RES_SUCCESS) > - break; > - } > - sd_rw_unlock(&md.lock); > - return ret; > -} > - > - > -int for_each_obj_path(int (*func)(const char *path)) > -{ > - int ret = SD_RES_SUCCESS; > - const struct disk *disk; > - > - sd_read_lock(&md.lock); > - rb_for_each_entry(disk, &md.root, rb) { > - ret = func(disk->path); > - if (ret != SD_RES_SUCCESS) > - break; > - } > - sd_rw_unlock(&md.lock); > - return ret; > -} > - > -struct md_work { > - struct work 
work; > - char path[PATH_MAX]; > -}; > - > -static inline void kick_recover(void) > -{ > - struct vnode_info *vinfo = get_vnode_info(); > - > - if (is_cluster_diskmode(&sys->cinfo)) > - sys->cdrv->update_node(&sys->this_node); > - else { > - start_recovery(vinfo, vinfo, false); > - put_vnode_info(vinfo); > - } > -} > - > -static void md_do_recover(struct work *work) > -{ > - struct md_work *mw = container_of(work, struct md_work, work); > - struct disk *disk; > - int nr = 0; > - > - sd_write_lock(&md.lock); > - disk = path_to_disk(mw->path); > - if (!disk) > - /* Just ignore the duplicate EIO of the same path */ > - goto out; > - md_remove_disk(disk); > - nr = md.nr_disks; > -out: > - sd_rw_unlock(&md.lock); > - > - if (disk) { > - if (nr > 0) { > - update_node_disks(); > - kick_recover(); > - } else { > - leave_cluster(); > - } > - } > - > - free(mw); > -} > - > -int md_handle_eio(const char *fault_path) > -{ > - struct md_work *mw; > - > - if (nr_online_disks() == 0) > - return SD_RES_EIO; > - > - mw = xzalloc(sizeof(*mw)); > - mw->work.done = md_do_recover; > - pstrcpy(mw->path, PATH_MAX, fault_path); > - queue_work(sys->md_wqueue, &mw->work); > - > - /* Fool the requester to retry */ > - return SD_RES_NETWORK_ERROR; > -} > - > -static inline bool md_access(const char *path) > -{ > - if (access(path, R_OK | W_OK) < 0) { > - if (unlikely(errno != ENOENT)) > - sd_err("failed to check %s, %m", path); > - return false; > - } > - > - return true; > -} > - > -static int get_old_new_path(uint64_t oid, uint32_t epoch, uint8_t ec_index, > - const char *path, char *old, char *new) > -{ > - if (!epoch) { > - if (!is_erasure_oid(oid)) { > - snprintf(old, PATH_MAX, "%s/%016" PRIx64, path, oid); > - snprintf(new, PATH_MAX, "%s/%016" PRIx64, > - md_get_object_dir_nolock(oid), oid); > - } else { > - snprintf(old, PATH_MAX, "%s/%016" PRIx64"_%d", path, > - oid, ec_index); > - snprintf(new, PATH_MAX, "%s/%016" PRIx64"_%d", > - md_get_object_dir_nolock(oid), oid, ec_index); > - } 
> - } else { > - if (!is_erasure_oid(oid)) { > - snprintf(old, PATH_MAX, > - "%s/.stale/%016"PRIx64".%"PRIu32, path, > - oid, epoch); > - snprintf(new, PATH_MAX, > - "%s/.stale/%016"PRIx64".%"PRIu32, > - md_get_object_dir_nolock(oid), oid, epoch); > - } else { > - snprintf(old, PATH_MAX, > - "%s/.stale/%016"PRIx64"_%d.%"PRIu32, path, > - oid, ec_index, epoch); > - snprintf(new, PATH_MAX, > - "%s/.stale/%016"PRIx64"_%d.%"PRIu32, > - md_get_object_dir_nolock(oid), > - oid, ec_index ,epoch); > - } > - } > - > - if (!md_access(old)) > - return -1; > - > - return 0; > -} > - > -static int md_move_object(uint64_t oid, const char *old, const char *new) > -{ > - struct strbuf buf = STRBUF_INIT; > - int fd, ret = -1; > - size_t sz = get_store_objsize(oid); > - > - fd = open(old, O_RDONLY); > - if (fd < 0) { > - sd_err("failed to open %s", old); > - goto out; > - } > - > - ret = strbuf_read(&buf, fd, sz); > - if (ret != sz) { > - sd_err("failed to read %s, size %zu, %d, %m", old, sz, ret); > - ret = -1; > - goto out_close; > - } > - > - if (atomic_create_and_write(new, buf.buf, buf.len, false) < 0) { > - if (errno != EEXIST) { > - sd_err("failed to create %s", new); > - ret = -1; > - goto out_close; > - } > - } > - unlink(old); > - ret = 0; > -out_close: > - close(fd); > -out: > - strbuf_release(&buf); > - return ret; > -} > - > -static int md_check_and_move(uint64_t oid, uint32_t epoch, uint8_t ec_index, > - const char *path) > -{ > - char old[PATH_MAX], new[PATH_MAX]; > - > - if (get_old_new_path(oid, epoch, ec_index, path, old, new) < 0) > - return SD_RES_EIO; > - /* > - * Recovery thread and main thread might try to recover the same object. > - * Either one succeeds, the other will fail and proceed and end up > - * trying to move the object to where it is already in place, in this > - * case we simply return. 
> - */ > - if (!strcmp(old, new)) > - return SD_RES_SUCCESS; > - > - /* We can't use rename(2) across device */ > - if (md_move_object(oid, old, new) < 0) { > - sd_err("move old %s to new %s failed", old, new); > - return SD_RES_EIO; > - } > - > - sd_debug("from %s to %s", old, new); > - return SD_RES_SUCCESS; > -} > - > -static int scan_wd(uint64_t oid, uint32_t epoch, uint8_t ec_index) > -{ > - int ret = SD_RES_EIO; > - const struct disk *disk; > - > - sd_read_lock(&md.lock); > - rb_for_each_entry(disk, &md.root, rb) { > - ret = md_check_and_move(oid, epoch, ec_index, disk->path); > - if (ret == SD_RES_SUCCESS) > - break; > - } > - sd_rw_unlock(&md.lock); > - return ret; > -} > - > -bool md_exist(uint64_t oid, uint8_t ec_index, char *path) > -{ > - if (md_access(path)) > - return true; > - /* > - * We have to iterate the WD because we don't have epoch-like history > - * track to locate the objects for multiple disk failure. Simply do > - * hard iteration simplify the code a lot. > - */ > - if (scan_wd(oid, 0, ec_index) == SD_RES_SUCCESS) > - return true; > - > - return false; > -} > - > -int md_get_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index, > - char *path) > -{ > - if (unlikely(!epoch)) > - panic("invalid 0 epoch"); > - > - if (is_erasure_oid(oid)) { > - if (unlikely(ec_index >= SD_MAX_COPIES)) > - panic("invalid ec index %d", ec_index); > - > - snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64"_%d.%"PRIu32, > - md_get_object_dir(oid), oid, ec_index, epoch); > - } else > - snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32, > - md_get_object_dir(oid), oid, epoch); > - > - if (md_access(path)) > - return SD_RES_SUCCESS; > - > - if (scan_wd(oid, epoch, ec_index) == SD_RES_SUCCESS) > - return SD_RES_SUCCESS; > - > - return SD_RES_NO_OBJ; > -} > - > -uint32_t md_get_info(struct sd_md_info *info) > -{ > - uint32_t ret = sizeof(*info); > - const struct disk *disk; > - int i = 0; > - > - memset(info, 0, ret); > - sd_read_lock(&md.lock); > - 
rb_for_each_entry(disk, &md.root, rb) { > - info->disk[i].idx = i; > - pstrcpy(info->disk[i].path, PATH_MAX, disk->path); > - /* FIXME: better handling failure case. */ > - info->disk[i].free = get_path_free_size(info->disk[i].path, > - &info->disk[i].used); > - i++; > - } > - info->nr = md.nr_disks; > - sd_rw_unlock(&md.lock); > - return ret; > -} > - > -static inline void md_del_disk(const char *path) > -{ > - struct disk *disk = path_to_disk(path); > - > - if (!disk) { > - sd_err("invalid path %s", path); > - return; > - } > - md_remove_disk(disk); > -} > - > -#ifdef HAVE_DISKVNODES > -void update_node_disks(void) > -{ > - const struct disk *disk; > - int i = 0; > - bool rb_empty = false; > - > - if (!sys) > - return; > - > - memset(sys->this_node.disks, 0, sizeof(struct disk_info) * DISK_MAX); > - sd_read_lock(&md.lock); > - rb_for_each_entry(disk, &md.root, rb) { > - sys->this_node.disks[i].disk_id = > - sd_hash(disk->path, strlen(disk->path)); > - sys->this_node.disks[i].disk_space = disk->space; > - i++; > - } > - sd_rw_unlock(&md.lock); > - > - if (RB_EMPTY_ROOT(&md.vroot)) > - rb_empty = true; > - sd_write_lock(&md.lock); > - rb_for_each_entry(disk, &md.root, rb) { > - if (!rb_empty) > - remove_vdisks(disk); > - create_vdisks(disk); > - } > - sd_rw_unlock(&md.lock); > -} > -#else > -void update_node_disks(void) > -{ > -} > -#endif > - > -static int do_plug_unplug(char *disks, bool plug) > -{ > - const char *path; > - int old_nr, ret = SD_RES_UNKNOWN; > - > - sd_write_lock(&md.lock); > - old_nr = md.nr_disks; > - path = strtok(disks, ","); > - do { > - if (plug) { > - if (!md_add_disk(path, true)) > - sd_err("failed to add %s", path); > - } else { > - md_del_disk(path); > - } > - } while ((path = strtok(NULL, ","))); > - > - /* If no disks change, bail out */ > - if (old_nr == md.nr_disks) > - goto out; > - > - ret = SD_RES_SUCCESS; > -out: > - sd_rw_unlock(&md.lock); > - > - if (ret == SD_RES_SUCCESS) { > - update_node_disks(); > - kick_recover(); > - } > 
- > - return ret; > -} > - > -int md_plug_disks(char *disks) > -{ > - return do_plug_unplug(disks, true); > -} > - > -int md_unplug_disks(char *disks) > -{ > - return do_plug_unplug(disks, false); > -} > - > -uint64_t md_get_size(uint64_t *used) > -{ > - uint64_t fsize = 0; > - const struct disk *disk; > - > - *used = 0; > - sd_read_lock(&md.lock); > - rb_for_each_entry(disk, &md.root, rb) { > - fsize += get_path_free_size(disk->path, used); > - } > - sd_rw_unlock(&md.lock); > - > - return fsize + *used; > -} > - > -uint32_t md_nr_disks(void) > -{ > - return nr_online_disks(); > -} > diff --git a/sheep/plain_store.c b/sheep/plain_store.c > deleted file mode 100644 > index 4c19832..0000000 > --- a/sheep/plain_store.c > +++ /dev/null > @@ -1,759 +0,0 @@ > -/* > - * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. > - * > - * This program is free software; you can redistribute it and/or > - * modify it under the terms of the GNU General Public License version > - * 2 as published by the Free Software Foundation. > - * > - * You should have received a copy of the GNU General Public License > - * along with this program. If not, see <http://www.gnu.org/licenses/>. > - */ > - > -#include <libgen.h> > -#include <linux/falloc.h> > - > -#include "sheep_priv.h" > - > -#ifndef FALLOC_FL_PUNCH_HOLE > -#define FALLOC_FL_PUNCH_HOLE 0x02 > -#endif > - > -#define sector_algined(x) ({ ((x) & (SECTOR_SIZE - 1)) == 0; }) > - > -static inline bool iocb_is_aligned(const struct siocb *iocb) > -{ > - return sector_algined(iocb->offset) && sector_algined(iocb->length); > -} > - > -static int prepare_iocb(uint64_t oid, const struct siocb *iocb, bool create) > -{ > - int syncflag = create ? 
O_SYNC : O_DSYNC; > - int flags = syncflag | O_RDWR; > - > - if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) > - flags &= ~syncflag; > - > - if (sys->backend_dio && iocb_is_aligned(iocb)) { > - if (!is_aligned_to_pagesize(iocb->buf)) > - panic("Memory isn't aligned to pagesize %p", iocb->buf); > - flags |= O_DIRECT; > - } > - > - if (create) > - flags |= O_CREAT | O_EXCL; > - > - return flags; > -} > - > -static int get_store_path(uint64_t oid, uint8_t ec_index, char *path) > -{ > - if (is_erasure_oid(oid)) { > - if (unlikely(ec_index >= SD_MAX_COPIES)) > - panic("invalid ec_index %d", ec_index); > - return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d", > - md_get_object_dir(oid), oid, ec_index); > - } > - > - return snprintf(path, PATH_MAX, "%s/%016" PRIx64, > - md_get_object_dir(oid), oid); > -} > - > -static int get_store_tmp_path(uint64_t oid, uint8_t ec_index, char *path) > -{ > - if (is_erasure_oid(oid)) { > - if (unlikely(ec_index >= SD_MAX_COPIES)) > - panic("invalid ec_index %d", ec_index); > - return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d.tmp", > - md_get_object_dir(oid), oid, ec_index); > - } > - > - return snprintf(path, PATH_MAX, "%s/%016" PRIx64".tmp", > - md_get_object_dir(oid), oid); > -} > - > -static int get_store_stale_path(uint64_t oid, uint32_t epoch, uint8_t > ec_index, > - char *path) > -{ > - return md_get_stale_path(oid, epoch, ec_index, path); > -} > - > -/* > - * Check if oid is in this nodes (if oid is in the wrong place, it will be > moved > - * to the correct one after this call in a MD setup. > - */ > -bool default_exist(uint64_t oid, uint8_t ec_index) > -{ > - char path[PATH_MAX]; > - > - get_store_path(oid, ec_index, path); > - > - return md_exist(oid, ec_index, path); > -} > - > -static int err_to_sderr(const char *path, uint64_t oid, int err) > -{ > - struct stat s; > - char p[PATH_MAX], *dir; > - > - /* Use a temporary buffer since dirname() may modify its argument. 
*/ > - pstrcpy(p, sizeof(p), path); > - dir = dirname(p); > - > - sd_debug("%s", path); > - switch (err) { > - case ENOENT: > - if (stat(dir, &s) < 0) { > - sd_err("%s corrupted", dir); > - return md_handle_eio(dir); > - } > - sd_debug("object %016" PRIx64 " not found locally", oid); > - return SD_RES_NO_OBJ; > - case ENOSPC: > - /* TODO: stop automatic recovery */ > - sd_err("diskfull, oid=%"PRIx64, oid); > - return SD_RES_NO_SPACE; > - case EMFILE: > - case ENFILE: > - case EINTR: > - case EAGAIN: > - case EEXIST: > - sd_err("%m, oid=%"PRIx64, oid); > - /* make gateway try again */ > - return SD_RES_NETWORK_ERROR; > - default: > - sd_err("oid=%"PRIx64", %m", oid); > - return md_handle_eio(dir); > - } > -} > - > -static int discard(int fd, uint64_t start, uint32_t end) > -{ > - int ret = xfallocate(fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, > - start, end - start); > - if (ret < 0) { > - if (errno == ENOSYS || errno == EOPNOTSUPP) > - sd_info("FALLOC_FL_PUNCH_HOLE is not supported " > - "on this filesystem"); > - else > - sd_err("failed to discard object, %m"); > - } > - > - return ret; > -} > - > -/* Trim zero blocks of the beginning and end of the object. 
*/ > -static int default_trim(int fd, uint64_t oid, const struct siocb *iocb, > - uint64_t *poffset, uint32_t *plen) > -{ > - trim_zero_blocks(iocb->buf, poffset, plen); > - > - if (iocb->offset < *poffset) { > - sd_debug("discard between %d, %ld, %" PRIx64, iocb->offset, > - *poffset, oid); > - > - if (discard(fd, iocb->offset, *poffset) < 0) > - return -1; > - } > - > - if (*poffset + *plen < iocb->offset + iocb->length) { > - uint64_t end = iocb->offset + iocb->length; > - uint32_t object_size = get_vdi_object_size(oid_to_vid(oid)); > - if (end == get_objsize(oid, object_size)) > - /* This is necessary to punch the last block */ > - end = round_up(end, BLOCK_SIZE); > - sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen, > - end, oid); > - > - if (discard(fd, *poffset + *plen, end) < 0) > - return -1; > - } > - > - return 0; > -} > - > -int default_write(uint64_t oid, const struct siocb *iocb) > -{ > - int flags = prepare_iocb(oid, iocb, false), fd, > - ret = SD_RES_SUCCESS; > - char path[PATH_MAX]; > - ssize_t size; > - uint32_t len = iocb->length; > - uint64_t offset = iocb->offset; > - static bool trim_is_supported = true; > - > - if (iocb->epoch < sys_epoch()) { > - sd_debug("%"PRIu32" sys %"PRIu32, iocb->epoch, sys_epoch()); > - return SD_RES_OLD_NODE_VER; > - } > - > - if (uatomic_is_true(&sys->use_journal) && > - unlikely(journal_write_store(oid, iocb->buf, iocb->length, > - iocb->offset, false)) > - != SD_RES_SUCCESS) { > - sd_err("turn off journaling"); > - uatomic_set_false(&sys->use_journal); > - flags |= O_DSYNC; > - sync(); > - } > - > - get_store_path(oid, iocb->ec_index, path); > - > - /* > - * Make sure oid is in the right place because oid might be misplaced > - * in a wrong place, due to 'shutdown/restart with less/more disks' or > - * any bugs. 
We need call err_to_sderr() to return EIO if disk is broken > - */ > - if (!default_exist(oid, iocb->ec_index)) > - return err_to_sderr(path, oid, ENOENT); > - > - fd = open(path, flags, sd_def_fmode); > - if (unlikely(fd < 0)) > - return err_to_sderr(path, oid, errno); > - > - if (trim_is_supported && is_sparse_object(oid)) { > - if (default_trim(fd, oid, iocb, &offset, &len) < 0) { > - trim_is_supported = false; > - offset = iocb->offset; > - len = iocb->length; > - } > - } > - > - size = xpwrite(fd, iocb->buf, len, offset); > - if (unlikely(size != len)) { > - sd_err("failed to write object %"PRIx64", path=%s, offset=%" > - PRId32", size=%"PRId32", result=%zd, %m", oid, path, > - iocb->offset, iocb->length, size); > - ret = err_to_sderr(path, oid, errno); > - goto out; > - } > -out: > - close(fd); > - return ret; > -} > - > -static int make_stale_dir(const char *path) > -{ > - char p[PATH_MAX]; > - > - snprintf(p, PATH_MAX, "%s/.stale", path); > - if (xmkdir(p, sd_def_dmode) < 0) { > - sd_err("%s failed, %m", p); > - return SD_RES_EIO; > - } > - return SD_RES_SUCCESS; > -} > - > -static int purge_dir(const char *path) > -{ > - if (purge_directory(path) < 0) > - return SD_RES_EIO; > - > - return SD_RES_SUCCESS; > -} > - > -static int purge_stale_dir(const char *path) > -{ > - char p[PATH_MAX]; > - > - snprintf(p, PATH_MAX, "%s/.stale", path); > - > - if (purge_directory_async(p) < 0) > - return SD_RES_EIO; > - > - return SD_RES_SUCCESS; > -} > - > -int default_cleanup(void) > -{ > - int ret; > - > - ret = for_each_obj_path(purge_stale_dir); > - if (ret != SD_RES_SUCCESS) > - return ret; > - > - return SD_RES_SUCCESS; > -} > - > -static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch) > -{ > - int ret; > - struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE); > - struct siocb iocb = { > - .epoch = epoch, > - .buf = inode, > - .length = SD_INODE_HEADER_SIZE, > - }; > - > - ret = default_read(oid, &iocb); > - if (ret != SD_RES_SUCCESS) { > - 
sd_err("failed to read inode header %" PRIx64 " %" PRId32 > - "wat %s", oid, epoch, wd); > - goto out; > - } > - add_vdi_state_unordered(oid_to_vid(oid), inode->nr_copies, > - vdi_is_snapshot(inode), inode->copy_policy, > - inode->block_size_shift, inode->parent_vdi_id); > - > - if (inode->name[0] == '\0') > - atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted); > - > - atomic_set_bit(oid_to_vid(oid), sys->vdi_inuse); > - > - ret = SD_RES_SUCCESS; > -out: > - free(inode); > - return ret; > -} > - > -static int init_objlist_and_vdi_bitmap(uint64_t oid, const char *wd, > - uint32_t epoch, uint8_t ec_index, > - struct vnode_info *vinfo, > - void *arg) > -{ > - int ret; > - objlist_cache_insert(oid); > - > - if (is_vdi_obj(oid)) { > - sd_debug("found the VDI object %" PRIx64" epoch %"PRIu32 > - " at %s", oid, epoch, wd); > - ret = init_vdi_state(oid, wd, epoch); > - if (ret != SD_RES_SUCCESS) > - return ret; > - } > - return SD_RES_SUCCESS; > -} > - > -int default_init(void) > -{ > - int ret; > - > - sd_debug("use plain store driver"); > - ret = for_each_obj_path(make_stale_dir); > - if (ret != SD_RES_SUCCESS) > - return ret; > - > - for_each_object_in_stale(init_objlist_and_vdi_bitmap, NULL); > - > - return for_each_object_in_wd(init_objlist_and_vdi_bitmap, true, NULL); > -} > - > -static int default_read_from_path(uint64_t oid, const char *path, > - const struct siocb *iocb) > -{ > - int flags = prepare_iocb(oid, iocb, false), fd, > - ret = SD_RES_SUCCESS; > - ssize_t size; > - > - /* > - * Make sure oid is in the right place because oid might be misplaced > - * in a wrong place, due to 'shutdown/restart with less disks' or any > - * bugs. We need call err_to_sderr() to return EIO if disk is broken. > - * > - * For stale path, get_store_stale_path already does default_exist job. 
> - */ > - if (!is_stale_path(path) && !default_exist(oid, iocb->ec_index)) > - return err_to_sderr(path, oid, ENOENT); > - > - fd = open(path, flags); > - if (fd < 0) > - return err_to_sderr(path, oid, errno); > - > - size = xpread(fd, iocb->buf, iocb->length, iocb->offset); > - if (size < 0) { > - sd_err("failed to read object %"PRIx64", path=%s, offset=%" > - PRId32", size=%"PRId32", result=%zd, %m", oid, path, > - iocb->offset, iocb->length, size); > - ret = err_to_sderr(path, oid, errno); > - } > - close(fd); > - return ret; > -} > - > -int default_read(uint64_t oid, const struct siocb *iocb) > -{ > - int ret; > - char path[PATH_MAX]; > - > - get_store_path(oid, iocb->ec_index, path); > - ret = default_read_from_path(oid, path, iocb); > - > - /* > - * If the request is against the older epoch, try to read from > - * the stale directory > - */ > - if (ret == SD_RES_NO_OBJ && iocb->epoch > 0 && > - iocb->epoch < sys_epoch()) { > - get_store_stale_path(oid, iocb->epoch, iocb->ec_index, path); > - ret = default_read_from_path(oid, path, iocb); > - } > - > - return ret; > -} > - > -int default_create_and_write(uint64_t oid, const struct siocb *iocb) > -{ > - char path[PATH_MAX], tmp_path[PATH_MAX], *dir; > - int flags = prepare_iocb(oid, iocb, true); > - int ret, fd; > - uint32_t len = iocb->length; > - uint32_t object_size = 0; > - size_t obj_size; > - uint64_t offset = iocb->offset; > - > - sd_debug("%"PRIx64, oid); > - get_store_path(oid, iocb->ec_index, path); > - get_store_tmp_path(oid, iocb->ec_index, tmp_path); > - > - if (uatomic_is_true(&sys->use_journal) && > - journal_write_store(oid, iocb->buf, iocb->length, > - iocb->offset, true) > - != SD_RES_SUCCESS) { > - sd_err("turn off journaling"); > - uatomic_set_false(&sys->use_journal); > - flags |= O_SYNC; > - sync(); > - } > - > - fd = open(tmp_path, flags, sd_def_fmode); > - if (fd < 0) { > - if (errno == EEXIST) { > - /* > - * This happens if node membership changes during object > - * creation; while 
gateway retries a CREATE request, > - * recovery process could also recover the object at the > - * same time. They should try to write the same date, > - * so it is okay to simply return success here. > - */ > - sd_debug("%s exists", tmp_path); > - return SD_RES_SUCCESS; > - } > - > - sd_err("failed to open %s: %m", tmp_path); > - return err_to_sderr(path, oid, errno); > - } > - > - obj_size = get_store_objsize(oid); > - > - trim_zero_blocks(iocb->buf, &offset, &len); > - > - object_size = get_vdi_object_size(oid_to_vid(oid)); > - > - if (offset != 0 || len != get_objsize(oid, object_size)) { > - if (is_sparse_object(oid)) > - ret = xftruncate(fd, obj_size); > - else > - ret = prealloc(fd, obj_size); > - if (ret < 0) { > - ret = err_to_sderr(path, oid, errno); > - goto out; > - } > - } > - > - ret = xpwrite(fd, iocb->buf, len, offset); > - if (ret != len) { > - sd_err("failed to write object. %m"); > - ret = err_to_sderr(path, oid, errno); > - goto out; > - } > - > - ret = rename(tmp_path, path); > - if (ret < 0) { > - sd_err("failed to rename %s to %s: %m", tmp_path, path); > - ret = err_to_sderr(path, oid, errno); > - goto out; > - } > - > - close(fd); > - > - if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) { > - objlist_cache_insert(oid); > - return SD_RES_SUCCESS; > - } > - > - pstrcpy(tmp_path, sizeof(tmp_path), path); > - dir = dirname(tmp_path); > - fd = open(dir, O_DIRECTORY | O_RDONLY); > - if (fd < 0) { > - sd_err("failed to open directory %s: %m", dir); > - return err_to_sderr(path, oid, errno); > - } > - > - if (fsync(fd) != 0) { > - sd_err("failed to write directory %s: %m", dir); > - ret = err_to_sderr(path, oid, errno); > - close(fd); > - if (unlink(path) != 0) > - sd_err("failed to unlink %s: %m", path); > - return ret; > - } > - close(fd); > - objlist_cache_insert(oid); > - return SD_RES_SUCCESS; > - > -out: > - if (unlink(tmp_path) != 0) > - sd_err("failed to unlink %s: %m", tmp_path); > - close(fd); > - return ret; > -} > - > -int 
default_link(uint64_t oid, uint32_t tgt_epoch) > -{ > - char path[PATH_MAX], stale_path[PATH_MAX]; > - > - sd_debug("try link %"PRIx64" from snapshot with epoch %d", oid, > - tgt_epoch); > - > - snprintf(path, PATH_MAX, "%s/%016"PRIx64, md_get_object_dir(oid), oid); > - get_store_stale_path(oid, tgt_epoch, 0, stale_path); > - > - if (link(stale_path, path) < 0) { > - /* > - * Recovery thread and main thread might try to recover the > - * same object and we might get EEXIST in such case. > - */ > - if (errno == EEXIST) > - goto out; > - > - sd_debug("failed to link from %s to %s, %m", stale_path, path); > - return err_to_sderr(path, oid, errno); > - } > -out: > - return SD_RES_SUCCESS; > -} > - > -/* > - * For replicated object, if any of the replica belongs to this node, we > - * consider it not stale. > - * > - * For erasure coded object, since every copy is unique and if it migrates to > - * other node(index gets changed even it has some other copy belongs to it) > - * because of hash ring changes, we consider it stale. 
> - */ > -static bool oid_stale(uint64_t oid, int ec_index, struct vnode_info *vinfo) > -{ > - uint32_t i, nr_copies; > - const struct sd_vnode *v; > - bool ret = true; > - const struct sd_vnode *obj_vnodes[SD_MAX_COPIES]; > - > - nr_copies = get_obj_copy_number(oid, vinfo->nr_zones); > - oid_to_vnodes(oid, &vinfo->vroot, nr_copies, obj_vnodes); > - for (i = 0; i < nr_copies; i++) { > - v = obj_vnodes[i]; > - if (vnode_is_local(v)) { > - if (ec_index < SD_MAX_COPIES) { > - if (i == ec_index) > - ret = false; > - } else { > - ret = false; > - } > - break; > - } > - } > - > - return ret; > -} > - > -static int move_object_to_stale_dir(uint64_t oid, const char *wd, > - uint32_t epoch, uint8_t ec_index, > - struct vnode_info *vinfo, void *arg) > -{ > - char path[PATH_MAX], stale_path[PATH_MAX]; > - uint32_t tgt_epoch = *(uint32_t *)arg; > - > - /* ec_index from md.c is reliable so we can directly use it */ > - if (ec_index < SD_MAX_COPIES) { > - snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d", > - md_get_object_dir(oid), oid, ec_index); > - snprintf(stale_path, PATH_MAX, > - "%s/.stale/%016"PRIx64"_%d.%"PRIu32, > - md_get_object_dir(oid), oid, ec_index, tgt_epoch); > - } else { > - snprintf(path, PATH_MAX, "%s/%016" PRIx64, > - md_get_object_dir(oid), oid); > - snprintf(stale_path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32, > - md_get_object_dir(oid), oid, tgt_epoch); > - } > - > - if (unlikely(rename(path, stale_path)) < 0) { > - sd_err("failed to move stale object %" PRIX64 " to %s, %m", oid, > - path); > - return SD_RES_EIO; > - } > - > - sd_debug("moved object %"PRIx64, oid); > - return SD_RES_SUCCESS; > -} > - > -static int check_stale_objects(uint64_t oid, const char *wd, uint32_t epoch, > - uint8_t ec_index, struct vnode_info *vinfo, > - void *arg) > -{ > - if (oid_stale(oid, ec_index, vinfo)) > - return move_object_to_stale_dir(oid, wd, 0, ec_index, > - NULL, arg); > - > - return SD_RES_SUCCESS; > -} > - > -int default_update_epoch(uint32_t epoch) > -{ > - 
assert(epoch); > - return for_each_object_in_wd(check_stale_objects, false, &epoch); > -} > - > -int default_format(void) > -{ > - unsigned ret; > - > - sd_debug("try get a clean store"); > - ret = for_each_obj_path(purge_dir); > - if (ret != SD_RES_SUCCESS) > - return ret; > - > - if (sys->enable_object_cache) > - object_cache_format(); > - > - return SD_RES_SUCCESS; > -} > - > -int default_remove_object(uint64_t oid, uint8_t ec_index) > -{ > - char path[PATH_MAX]; > - > - if (uatomic_is_true(&sys->use_journal)) > - journal_remove_object(oid); > - > - get_store_path(oid, ec_index, path); > - > - if (unlink(path) < 0) { > - if (errno == ENOENT) > - return SD_RES_NO_OBJ; > - > - sd_err("failed, %s, %m", path); > - return SD_RES_EIO; > - } > - > - return SD_RES_SUCCESS; > -} > - > -#define SHA1NAME "user.obj.sha1" > - > -static int get_object_sha1(const char *path, uint8_t *sha1) > -{ > - if (getxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE) > - != SHA1_DIGEST_SIZE) { > - if (errno == ENODATA) > - sd_debug("sha1 is not cached yet, %s", path); > - else > - sd_err("fail to get xattr, %s", path); > - return -1; > - } > - > - return 0; > -} > - > -static int set_object_sha1(const char *path, const uint8_t *sha1) > -{ > - int ret; > - > - ret = setxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE, 0); > - if (ret < 0) > - sd_err("fail to set sha1, %s", path); > - > - return ret; > -} > - > -static int get_object_path(uint64_t oid, uint32_t epoch, char *path, > - size_t size) > -{ > - if (default_exist(oid, 0)) { > - snprintf(path, PATH_MAX, "%s/%016"PRIx64, > - md_get_object_dir(oid), oid); > - } else { > - get_store_stale_path(oid, epoch, 0, path); > - if (access(path, F_OK) < 0) { > - if (errno == ENOENT) > - return SD_RES_NO_OBJ; > - return SD_RES_EIO; > - } > - > - } > - > - return SD_RES_SUCCESS; > -} > - > -int default_get_hash(uint64_t oid, uint32_t epoch, uint8_t *sha1) > -{ > - int ret; > - void *buf; > - struct siocb iocb = {}; > - uint32_t length; > - bool 
is_readonly_obj = oid_is_readonly(oid); > - char path[PATH_MAX]; > - > - ret = get_object_path(oid, epoch, path, sizeof(path)); > - if (ret != SD_RES_SUCCESS) > - return ret; > - > - if (is_readonly_obj) { > - if (get_object_sha1(path, sha1) == 0) { > - sd_debug("use cached sha1 digest %s", > - sha1_to_hex(sha1)); > - return SD_RES_SUCCESS; > - } > - } > - > - length = get_store_objsize(oid); > - buf = valloc(length); > - if (buf == NULL) > - return SD_RES_NO_MEM; > - > - iocb.epoch = epoch; > - iocb.buf = buf; > - iocb.length = length; > - > - ret = default_read_from_path(oid, path, &iocb); > - if (ret != SD_RES_SUCCESS) { > - free(buf); > - return ret; > - } > - > - get_buffer_sha1(buf, length, sha1); > - free(buf); > - > - sd_debug("the message digest of %"PRIx64" at epoch %d is %s", oid, > - epoch, sha1_to_hex(sha1)); > - > - if (is_readonly_obj) > - set_object_sha1(path, sha1); > - > - return ret; > -} > - > -int default_purge_obj(void) > -{ > - uint32_t tgt_epoch = get_latest_epoch(); > - > - return for_each_object_in_wd(move_object_to_stale_dir, true, > - &tgt_epoch); > -} > - > -static struct store_driver plain_store = { > - .name = "plain", > - .init = default_init, > - .exist = default_exist, > - .create_and_write = default_create_and_write, > - .write = default_write, > - .read = default_read, > - .link = default_link, > - .update_epoch = default_update_epoch, > - .cleanup = default_cleanup, > - .format = default_format, > - .remove_object = default_remove_object, > - .get_hash = default_get_hash, > - .purge_obj = default_purge_obj, > -}; > - > -add_store_driver(plain_store); > diff --git a/sheep/store.c b/sheep/store.c > deleted file mode 100644 > index 8843fb8..0000000 > --- a/sheep/store.c > +++ /dev/null > @@ -1,511 +0,0 @@ > -/* > - * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation. 
> - * > - * This program is free software; you can redistribute it and/or > - * modify it under the terms of the GNU General Public License version > - * 2 as published by the Free Software Foundation. > - * > - * You should have received a copy of the GNU General Public License > - * along with this program. If not, see <http://www.gnu.org/licenses/>. > - */ > - > -#include "sheep_priv.h" > - > -char *obj_path; > -char *epoch_path; > - > -struct store_driver *sd_store; > -LIST_HEAD(store_drivers); > - > -int update_epoch_log(uint32_t epoch, struct sd_node *nodes, size_t nr_nodes) > -{ > - int ret, len, nodes_len; > - time_t t; > - char path[PATH_MAX], *buf; > - > - sd_debug("update epoch: %d, %zu", epoch, nr_nodes); > - > - /* Piggyback the epoch creation time for 'dog cluster info' */ > - time(&t); > - nodes_len = nr_nodes * sizeof(struct sd_node); > - len = nodes_len + sizeof(time_t); > - buf = xmalloc(len); > - memcpy(buf, nodes, nodes_len); > - memcpy(buf + nodes_len, &t, sizeof(time_t)); > - > - /* > - * rb field is unused in epoch file, zero-filling it > - * is good for epoch file recovery because it is unified > - */ > - for (int i = 0; i < nr_nodes; i++) > - memset(buf + i * sizeof(struct sd_node) > - + offsetof(struct sd_node, rb), > - 0, sizeof(struct rb_node)); > - > - snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch); > - > - ret = atomic_create_and_write(path, buf, len, true); > - > - free(buf); > - return ret; > -} > - > -static int do_epoch_log_read(uint32_t epoch, struct sd_node *nodes, int len, > - int *nr_nodes, time_t *timestamp) > -{ > - int fd, ret, buf_len; > - char path[PATH_MAX]; > - struct stat epoch_stat; > - > - snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch); > - fd = open(path, O_RDONLY); > - if (fd < 0) { > - sd_debug("failed to open epoch %"PRIu32" log, %m", epoch); > - goto err; > - } > - > - memset(&epoch_stat, 0, sizeof(epoch_stat)); > - ret = fstat(fd, &epoch_stat); > - if (ret < 0) { > - sd_err("failed to stat 
epoch %"PRIu32" log, %m", epoch); > - goto err; > - } > - > - buf_len = epoch_stat.st_size - sizeof(*timestamp); > - if (buf_len < 0) { > - sd_err("invalid epoch %"PRIu32" log", epoch); > - goto err; > - } > - if (len < buf_len) { > - close(fd); > - return SD_RES_BUFFER_SMALL; > - } > - > - ret = xread(fd, nodes, buf_len); > - if (ret < 0) { > - sd_err("failed to read epoch %"PRIu32" log, %m", epoch); > - goto err; > - } > - > - /* Broken epoch, just ignore */ > - if (ret % sizeof(struct sd_node) != 0) { > - sd_err("invalid epoch %"PRIu32" log", epoch); > - goto err; > - } > - > - *nr_nodes = ret / sizeof(struct sd_node); > - > - if (timestamp) { > - ret = xread(fd, timestamp, sizeof(*timestamp)); > - if (ret != sizeof(*timestamp)) { > - sd_err("invalid epoch %"PRIu32" log", epoch); > - goto err; > - } > - } > - > - close(fd); > - return SD_RES_SUCCESS; > -err: > - if (fd >= 0) > - close(fd); > - return SD_RES_NO_TAG; > -} > - > -int epoch_log_read(uint32_t epoch, struct sd_node *nodes, > - int len, int *nr_nodes) > -{ > - return do_epoch_log_read(epoch, nodes, len, nr_nodes, NULL); > -} > - > -int epoch_log_read_with_timestamp(uint32_t epoch, struct sd_node *nodes, > - int len, int *nr_nodes, time_t *timestamp) > -{ > - return do_epoch_log_read(epoch, nodes, len, nr_nodes, timestamp); > -} > - > -uint32_t get_latest_epoch(void) > -{ > - DIR *dir; > - struct dirent *d; > - uint32_t e, epoch = 0; > - char *p; > - > - dir = opendir(epoch_path); > - if (!dir) > - panic("failed to get the latest epoch: %m"); > - > - while ((d = readdir(dir))) { > - e = strtol(d->d_name, &p, 10); > - if (d->d_name == p) > - continue; > - > - if (strlen(d->d_name) != 8) > - continue; > - > - if (e > epoch) > - epoch = e; > - } > - closedir(dir); > - > - return epoch; > -} > - > -int lock_base_dir(const char *d) > -{ > -#define LOCK_PATH "/lock" > - char *lock_path; > - int ret = 0; > - int fd, len = strlen(d) + strlen(LOCK_PATH) + 1; > - > - lock_path = xzalloc(len); > - 
snprintf(lock_path, len, "%s" LOCK_PATH, d); > - > - fd = open(lock_path, O_WRONLY|O_CREAT, sd_def_fmode); > - if (fd < 0) { > - sd_err("failed to open lock file %s (%m)", lock_path); > - ret = -1; > - goto out; > - } > - > - if (lockf(fd, F_TLOCK, 1) < 0) { > - if (errno == EACCES || errno == EAGAIN) > - sd_err("another sheep daemon is using %s", d); > - else > - sd_err("unable to get base dir lock (%m)"); > - ret = -1; > - goto out; > - } > - > -out: > - free(lock_path); > - return ret; > -} > - > -int init_base_path(const char *d) > -{ > - if (xmkdir(d, sd_def_dmode) < 0) { > - sd_err("cannot create the directory %s (%m)", d); > - return -1; > - } > - > - return 0; > -} > - > -static inline int check_path_len(const char *path) > -{ > - int len = strlen(path); > - if (len > PATH_MAX) { > - sd_err("insanely long object directory %s", path); > - return -1; > - } > - > - return 0; > -} > - > -static int is_meta_store(const char *path) > -{ > - char conf[PATH_MAX]; > - char epoch[PATH_MAX]; > - > - snprintf(conf, PATH_MAX, "%s/config", path); > - snprintf(epoch, PATH_MAX, "%s/epoch", path); > - if (!access(conf, R_OK) && !access(epoch, R_OK)) > - return true; > - > - return false; > -} > - > -static int init_obj_path(const char *base_path, char *argp) > -{ > - char *p; > - int len; > - > - if (check_path_len(base_path) < 0) > - return -1; > - > -#define OBJ_PATH "/obj" > - len = strlen(base_path) + strlen(OBJ_PATH) + 1; > - obj_path = xzalloc(len); > - snprintf(obj_path, len, "%s" OBJ_PATH, base_path); > - > - /* Eat up the first component */ > - strtok(argp, ","); > - p = strtok(NULL, ","); > - if (!p) { > - /* > - * If We have only one path, meta-store and object-store share > - * it. This is helpful to upgrade old sheep cluster to > - * the MD-enabled. 
> - */ > - md_add_disk(obj_path, false); > - } else { > - do { > - if (is_meta_store(p)) { > - sd_err("%s is meta-store, abort", p); > - return -1; > - } > - md_add_disk(p, false); > - } while ((p = strtok(NULL, ","))); > - } > - > - if (md_nr_disks() <= 0) { > - sd_err("There isn't any available disk!"); > - return -1; > - } > - > - return xmkdir(obj_path, sd_def_dmode); > -} > - > -static int init_epoch_path(const char *base_path) > -{ > -#define EPOCH_PATH "/epoch/" > - int len = strlen(base_path) + strlen(EPOCH_PATH) + 1; > - epoch_path = xzalloc(len); > - snprintf(epoch_path, len, "%s" EPOCH_PATH, base_path); > - > - return xmkdir(epoch_path, sd_def_dmode); > -} > - > -/* > - * If the node is gateway, this function only finds the store driver. > - * Otherwise, this function initializes the backend store > - */ > -int init_store_driver(bool is_gateway) > -{ > - char driver_name[STORE_LEN], *p; > - > - pstrcpy(driver_name, sizeof(driver_name), (char *)sys->cinfo.store); > - > - p = memchr(driver_name, '\0', STORE_LEN); > - if (!p) { > - /* > - * If the driver name is not NUL terminated we are in deep > - * trouble, let's get out here. > - */ > - sd_debug("store name not NUL terminated"); > - return SD_RES_NO_STORE; > - } > - > - /* > - * The store file might not exist in case this is a new sheep that > - * never joined a cluster before. 
> - */ > - if (p == driver_name) > - return 0; > - > - sd_store = find_store_driver(driver_name); > - if (!sd_store) { > - sd_debug("store %s not found", driver_name); > - return SD_RES_NO_STORE; > - } > - > - if (is_gateway) > - return SD_RES_SUCCESS; > - > - return sd_store->init(); > -} > - > -int init_disk_space(const char *base_path) > -{ > - int ret = SD_RES_SUCCESS; > - uint64_t space_size = 0, mds; > - struct statvfs fs; > - > - if (sys->gateway_only) > - goto out; > - > - /* We need to init md even we don't need to update sapce */ > - mds = md_init_space(); > - > - /* If it is restarted */ > - ret = get_node_space(&space_size); > - if (space_size != 0) { > - sys->disk_space = space_size; > - goto out; > - } > - > - /* User has specified the space at startup */ > - if (sys->disk_space) { > - ret = set_node_space(sys->disk_space); > - goto out; > - } > - > - if (mds) { > - sys->disk_space = mds; > - } else { > - ret = statvfs(base_path, &fs); > - if (ret < 0) { > - sd_debug("get disk space failed %m"); > - ret = SD_RES_EIO; > - goto out; > - } > - sys->disk_space = (uint64_t)fs.f_frsize * fs.f_bavail; > - } > - > - ret = set_node_space(sys->disk_space); > -out: > - sd_debug("disk free space is %" PRIu64, sys->disk_space); > - return ret; > -} > - > -/* Initialize all the global pathnames used internally */ > -int init_global_pathnames(const char *d, char *argp) > -{ > - int ret; > - > - ret = init_obj_path(d, argp); > - if (ret) > - return ret; > - > - ret = init_epoch_path(d); > - if (ret) > - return ret; > - > - init_config_path(d); > - > - return 0; > -} > - > -/* Write data to both local object cache (if enabled) and backends */ > -int sd_write_object(uint64_t oid, char *data, unsigned int datalen, > - uint64_t offset, bool create) > -{ > - struct sd_req hdr; > - int ret; > - > - if (sys->enable_object_cache && object_is_cached(oid)) { > - ret = object_cache_write(oid, data, datalen, offset, > - create); > - if (ret == SD_RES_NO_CACHE) > - goto 
forward_write; > - > - if (ret != 0) { > - sd_err("write cache failed %" PRIx64 " %" PRIx32, oid, > - ret); > - return ret; > - } > - } > - > -forward_write: > - if (create) > - sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ); > - else > - sd_init_req(&hdr, SD_OP_WRITE_OBJ); > - hdr.flags = SD_FLAG_CMD_WRITE; > - hdr.data_length = datalen; > - > - hdr.obj.oid = oid; > - hdr.obj.offset = offset; > - > - ret = exec_local_req(&hdr, data); > - if (ret != SD_RES_SUCCESS) > - sd_err("failed to write object %" PRIx64 ", %s", oid, > - sd_strerror(ret)); > - > - return ret; > -} > - > -int read_backend_object(uint64_t oid, char *data, unsigned int datalen, > - uint64_t offset) > -{ > - struct sd_req hdr; > - int ret; > - > - sd_init_req(&hdr, SD_OP_READ_OBJ); > - hdr.data_length = datalen; > - hdr.obj.oid = oid; > - hdr.obj.offset = offset; > - > - ret = exec_local_req(&hdr, data); > - if (ret != SD_RES_SUCCESS) > - sd_err("failed to read object %" PRIx64 ", %s", oid, > - sd_strerror(ret)); > - return ret; > -} > - > -/* > - * Read data firstly from local object cache(if enabled), if fail, > - * try read backends > - */ > -int sd_read_object(uint64_t oid, char *data, unsigned int datalen, > - uint64_t offset) > -{ > - int ret; > - > - if (sys->enable_object_cache && object_is_cached(oid)) { > - ret = object_cache_read(oid, data, datalen, offset); > - if (ret != SD_RES_SUCCESS) { > - sd_err("try forward read %" PRIx64 " %s", oid, > - sd_strerror(ret)); > - goto forward_read; > - } > - return ret; > - } > - > -forward_read: > - return read_backend_object(oid, data, datalen, offset); > -} > - > -int sd_remove_object(uint64_t oid) > -{ > - struct sd_req hdr; > - int ret; > - > - if (sys->enable_object_cache && object_is_cached(oid)) { > - ret = object_cache_remove(oid); > - if (ret != SD_RES_SUCCESS) > - return ret; > - } > - > - sd_init_req(&hdr, SD_OP_REMOVE_OBJ); > - hdr.obj.oid = oid; > - > - ret = exec_local_req(&hdr, NULL); > - if (ret != SD_RES_SUCCESS) > - sd_err("failed 
to remove object %" PRIx64 ", %s", oid, > - sd_strerror(ret)); > - > - return ret; > -} > - > -int sd_discard_object(uint64_t oid) > -{ > - int ret; > - struct sd_req hdr; > - > - sd_init_req(&hdr, SD_OP_DISCARD_OBJ); > - hdr.obj.oid = oid; > - > - ret = exec_local_req(&hdr, NULL); > - if (ret != SD_RES_SUCCESS) > - sd_err("Failed to discard data obj %"PRIu64" %s", oid, > - sd_strerror(ret)); > - > - return ret; > -} > - > -int sd_dec_object_refcnt(uint64_t data_oid, uint32_t generation, > - uint32_t refcnt) > -{ > - struct sd_req hdr; > - int ret; > - uint64_t ledger_oid = data_oid_to_ledger_oid(data_oid); > - > - sd_debug("%"PRIx64", %" PRId32 ", %" PRId32, > - data_oid, generation, refcnt); > - > - if (generation == 0 && refcnt == 0) > - return sd_remove_object(data_oid); > - > - sd_init_req(&hdr, SD_OP_DECREF_OBJ); > - hdr.ref.oid = ledger_oid; > - hdr.ref.generation = generation; > - hdr.ref.count = refcnt; > - > - ret = exec_local_req(&hdr, NULL); > - if (ret != SD_RES_SUCCESS) > - sd_err("failed to decrement reference %" PRIx64 ", %s", > - ledger_oid, sd_strerror(ret)); > - > - return ret; > -} > diff --git a/sheep/store/common.c b/sheep/store/common.c > new file mode 100644 > index 0000000..8843fb8 > --- /dev/null > +++ b/sheep/store/common.c > @@ -0,0 +1,511 @@ > +/* > + * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation. > + * > + * This program is free software; you can redistribute it and/or > + * modify it under the terms of the GNU General Public License version > + * 2 as published by the Free Software Foundation. > + * > + * You should have received a copy of the GNU General Public License > + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
> + */ > + > +#include "sheep_priv.h" > + > +char *obj_path; > +char *epoch_path; > + > +struct store_driver *sd_store; > +LIST_HEAD(store_drivers); > + > +int update_epoch_log(uint32_t epoch, struct sd_node *nodes, size_t nr_nodes) > +{ > + int ret, len, nodes_len; > + time_t t; > + char path[PATH_MAX], *buf; > + > + sd_debug("update epoch: %d, %zu", epoch, nr_nodes); > + > + /* Piggyback the epoch creation time for 'dog cluster info' */ > + time(&t); > + nodes_len = nr_nodes * sizeof(struct sd_node); > + len = nodes_len + sizeof(time_t); > + buf = xmalloc(len); > + memcpy(buf, nodes, nodes_len); > + memcpy(buf + nodes_len, &t, sizeof(time_t)); > + > + /* > + * rb field is unused in epoch file, zero-filling it > + * is good for epoch file recovery because it is unified > + */ > + for (int i = 0; i < nr_nodes; i++) > + memset(buf + i * sizeof(struct sd_node) > + + offsetof(struct sd_node, rb), > + 0, sizeof(struct rb_node)); > + > + snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch); > + > + ret = atomic_create_and_write(path, buf, len, true); > + > + free(buf); > + return ret; > +} > + > +static int do_epoch_log_read(uint32_t epoch, struct sd_node *nodes, int len, > + int *nr_nodes, time_t *timestamp) > +{ > + int fd, ret, buf_len; > + char path[PATH_MAX]; > + struct stat epoch_stat; > + > + snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch); > + fd = open(path, O_RDONLY); > + if (fd < 0) { > + sd_debug("failed to open epoch %"PRIu32" log, %m", epoch); > + goto err; > + } > + > + memset(&epoch_stat, 0, sizeof(epoch_stat)); > + ret = fstat(fd, &epoch_stat); > + if (ret < 0) { > + sd_err("failed to stat epoch %"PRIu32" log, %m", epoch); > + goto err; > + } > + > + buf_len = epoch_stat.st_size - sizeof(*timestamp); > + if (buf_len < 0) { > + sd_err("invalid epoch %"PRIu32" log", epoch); > + goto err; > + } > + if (len < buf_len) { > + close(fd); > + return SD_RES_BUFFER_SMALL; > + } > + > + ret = xread(fd, nodes, buf_len); > + if (ret < 0) { > + 
sd_err("failed to read epoch %"PRIu32" log, %m", epoch); > + goto err; > + } > + > + /* Broken epoch, just ignore */ > + if (ret % sizeof(struct sd_node) != 0) { > + sd_err("invalid epoch %"PRIu32" log", epoch); > + goto err; > + } > + > + *nr_nodes = ret / sizeof(struct sd_node); > + > + if (timestamp) { > + ret = xread(fd, timestamp, sizeof(*timestamp)); > + if (ret != sizeof(*timestamp)) { > + sd_err("invalid epoch %"PRIu32" log", epoch); > + goto err; > + } > + } > + > + close(fd); > + return SD_RES_SUCCESS; > +err: > + if (fd >= 0) > + close(fd); > + return SD_RES_NO_TAG; > +} > + > +int epoch_log_read(uint32_t epoch, struct sd_node *nodes, > + int len, int *nr_nodes) > +{ > + return do_epoch_log_read(epoch, nodes, len, nr_nodes, NULL); > +} > + > +int epoch_log_read_with_timestamp(uint32_t epoch, struct sd_node *nodes, > + int len, int *nr_nodes, time_t *timestamp) > +{ > + return do_epoch_log_read(epoch, nodes, len, nr_nodes, timestamp); > +} > + > +uint32_t get_latest_epoch(void) > +{ > + DIR *dir; > + struct dirent *d; > + uint32_t e, epoch = 0; > + char *p; > + > + dir = opendir(epoch_path); > + if (!dir) > + panic("failed to get the latest epoch: %m"); > + > + while ((d = readdir(dir))) { > + e = strtol(d->d_name, &p, 10); > + if (d->d_name == p) > + continue; > + > + if (strlen(d->d_name) != 8) > + continue; > + > + if (e > epoch) > + epoch = e; > + } > + closedir(dir); > + > + return epoch; > +} > + > +int lock_base_dir(const char *d) > +{ > +#define LOCK_PATH "/lock" > + char *lock_path; > + int ret = 0; > + int fd, len = strlen(d) + strlen(LOCK_PATH) + 1; > + > + lock_path = xzalloc(len); > + snprintf(lock_path, len, "%s" LOCK_PATH, d); > + > + fd = open(lock_path, O_WRONLY|O_CREAT, sd_def_fmode); > + if (fd < 0) { > + sd_err("failed to open lock file %s (%m)", lock_path); > + ret = -1; > + goto out; > + } > + > + if (lockf(fd, F_TLOCK, 1) < 0) { > + if (errno == EACCES || errno == EAGAIN) > + sd_err("another sheep daemon is using %s", d); > + else > 
+ sd_err("unable to get base dir lock (%m)"); > + ret = -1; > + goto out; > + } > + > +out: > + free(lock_path); > + return ret; > +} > + > +int init_base_path(const char *d) > +{ > + if (xmkdir(d, sd_def_dmode) < 0) { > + sd_err("cannot create the directory %s (%m)", d); > + return -1; > + } > + > + return 0; > +} > + > +static inline int check_path_len(const char *path) > +{ > + int len = strlen(path); > + if (len > PATH_MAX) { > + sd_err("insanely long object directory %s", path); > + return -1; > + } > + > + return 0; > +} > + > +static int is_meta_store(const char *path) > +{ > + char conf[PATH_MAX]; > + char epoch[PATH_MAX]; > + > + snprintf(conf, PATH_MAX, "%s/config", path); > + snprintf(epoch, PATH_MAX, "%s/epoch", path); > + if (!access(conf, R_OK) && !access(epoch, R_OK)) > + return true; > + > + return false; > +} > + > +static int init_obj_path(const char *base_path, char *argp) > +{ > + char *p; > + int len; > + > + if (check_path_len(base_path) < 0) > + return -1; > + > +#define OBJ_PATH "/obj" > + len = strlen(base_path) + strlen(OBJ_PATH) + 1; > + obj_path = xzalloc(len); > + snprintf(obj_path, len, "%s" OBJ_PATH, base_path); > + > + /* Eat up the first component */ > + strtok(argp, ","); > + p = strtok(NULL, ","); > + if (!p) { > + /* > + * If We have only one path, meta-store and object-store share > + * it. This is helpful to upgrade old sheep cluster to > + * the MD-enabled. 
> + */ > + md_add_disk(obj_path, false); > + } else { > + do { > + if (is_meta_store(p)) { > + sd_err("%s is meta-store, abort", p); > + return -1; > + } > + md_add_disk(p, false); > + } while ((p = strtok(NULL, ","))); > + } > + > + if (md_nr_disks() <= 0) { > + sd_err("There isn't any available disk!"); > + return -1; > + } > + > + return xmkdir(obj_path, sd_def_dmode); > +} > + > +static int init_epoch_path(const char *base_path) > +{ > +#define EPOCH_PATH "/epoch/" > + int len = strlen(base_path) + strlen(EPOCH_PATH) + 1; > + epoch_path = xzalloc(len); > + snprintf(epoch_path, len, "%s" EPOCH_PATH, base_path); > + > + return xmkdir(epoch_path, sd_def_dmode); > +} > + > +/* > + * If the node is gateway, this function only finds the store driver. > + * Otherwise, this function initializes the backend store > + */ > +int init_store_driver(bool is_gateway) > +{ > + char driver_name[STORE_LEN], *p; > + > + pstrcpy(driver_name, sizeof(driver_name), (char *)sys->cinfo.store); > + > + p = memchr(driver_name, '\0', STORE_LEN); > + if (!p) { > + /* > + * If the driver name is not NUL terminated we are in deep > + * trouble, let's get out here. > + */ > + sd_debug("store name not NUL terminated"); > + return SD_RES_NO_STORE; > + } > + > + /* > + * The store file might not exist in case this is a new sheep that > + * never joined a cluster before. 
> + */ > + if (p == driver_name) > + return 0; > + > + sd_store = find_store_driver(driver_name); > + if (!sd_store) { > + sd_debug("store %s not found", driver_name); > + return SD_RES_NO_STORE; > + } > + > + if (is_gateway) > + return SD_RES_SUCCESS; > + > + return sd_store->init(); > +} > + > +int init_disk_space(const char *base_path) > +{ > + int ret = SD_RES_SUCCESS; > + uint64_t space_size = 0, mds; > + struct statvfs fs; > + > + if (sys->gateway_only) > + goto out; > + > + /* We need to init md even we don't need to update sapce */ > + mds = md_init_space(); > + > + /* If it is restarted */ > + ret = get_node_space(&space_size); > + if (space_size != 0) { > + sys->disk_space = space_size; > + goto out; > + } > + > + /* User has specified the space at startup */ > + if (sys->disk_space) { > + ret = set_node_space(sys->disk_space); > + goto out; > + } > + > + if (mds) { > + sys->disk_space = mds; > + } else { > + ret = statvfs(base_path, &fs); > + if (ret < 0) { > + sd_debug("get disk space failed %m"); > + ret = SD_RES_EIO; > + goto out; > + } > + sys->disk_space = (uint64_t)fs.f_frsize * fs.f_bavail; > + } > + > + ret = set_node_space(sys->disk_space); > +out: > + sd_debug("disk free space is %" PRIu64, sys->disk_space); > + return ret; > +} > + > +/* Initialize all the global pathnames used internally */ > +int init_global_pathnames(const char *d, char *argp) > +{ > + int ret; > + > + ret = init_obj_path(d, argp); > + if (ret) > + return ret; > + > + ret = init_epoch_path(d); > + if (ret) > + return ret; > + > + init_config_path(d); > + > + return 0; > +} > + > +/* Write data to both local object cache (if enabled) and backends */ > +int sd_write_object(uint64_t oid, char *data, unsigned int datalen, > + uint64_t offset, bool create) > +{ > + struct sd_req hdr; > + int ret; > + > + if (sys->enable_object_cache && object_is_cached(oid)) { > + ret = object_cache_write(oid, data, datalen, offset, > + create); > + if (ret == SD_RES_NO_CACHE) > + goto 
forward_write; > + > + if (ret != 0) { > + sd_err("write cache failed %" PRIx64 " %" PRIx32, oid, > + ret); > + return ret; > + } > + } > + > +forward_write: > + if (create) > + sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ); > + else > + sd_init_req(&hdr, SD_OP_WRITE_OBJ); > + hdr.flags = SD_FLAG_CMD_WRITE; > + hdr.data_length = datalen; > + > + hdr.obj.oid = oid; > + hdr.obj.offset = offset; > + > + ret = exec_local_req(&hdr, data); > + if (ret != SD_RES_SUCCESS) > + sd_err("failed to write object %" PRIx64 ", %s", oid, > + sd_strerror(ret)); > + > + return ret; > +} > + > +int read_backend_object(uint64_t oid, char *data, unsigned int datalen, > + uint64_t offset) > +{ > + struct sd_req hdr; > + int ret; > + > + sd_init_req(&hdr, SD_OP_READ_OBJ); > + hdr.data_length = datalen; > + hdr.obj.oid = oid; > + hdr.obj.offset = offset; > + > + ret = exec_local_req(&hdr, data); > + if (ret != SD_RES_SUCCESS) > + sd_err("failed to read object %" PRIx64 ", %s", oid, > + sd_strerror(ret)); > + return ret; > +} > + > +/* > + * Read data firstly from local object cache(if enabled), if fail, > + * try read backends > + */ > +int sd_read_object(uint64_t oid, char *data, unsigned int datalen, > + uint64_t offset) > +{ > + int ret; > + > + if (sys->enable_object_cache && object_is_cached(oid)) { > + ret = object_cache_read(oid, data, datalen, offset); > + if (ret != SD_RES_SUCCESS) { > + sd_err("try forward read %" PRIx64 " %s", oid, > + sd_strerror(ret)); > + goto forward_read; > + } > + return ret; > + } > + > +forward_read: > + return read_backend_object(oid, data, datalen, offset); > +} > + > +int sd_remove_object(uint64_t oid) > +{ > + struct sd_req hdr; > + int ret; > + > + if (sys->enable_object_cache && object_is_cached(oid)) { > + ret = object_cache_remove(oid); > + if (ret != SD_RES_SUCCESS) > + return ret; > + } > + > + sd_init_req(&hdr, SD_OP_REMOVE_OBJ); > + hdr.obj.oid = oid; > + > + ret = exec_local_req(&hdr, NULL); > + if (ret != SD_RES_SUCCESS) > + sd_err("failed 
to remove object %" PRIx64 ", %s", oid, > + sd_strerror(ret)); > + > + return ret; > +} > + > +int sd_discard_object(uint64_t oid) > +{ > + int ret; > + struct sd_req hdr; > + > + sd_init_req(&hdr, SD_OP_DISCARD_OBJ); > + hdr.obj.oid = oid; > + > + ret = exec_local_req(&hdr, NULL); > + if (ret != SD_RES_SUCCESS) > + sd_err("Failed to discard data obj %"PRIu64" %s", oid, > + sd_strerror(ret)); > + > + return ret; > +} > + > +int sd_dec_object_refcnt(uint64_t data_oid, uint32_t generation, > + uint32_t refcnt) > +{ > + struct sd_req hdr; > + int ret; > + uint64_t ledger_oid = data_oid_to_ledger_oid(data_oid); > + > + sd_debug("%"PRIx64", %" PRId32 ", %" PRId32, > + data_oid, generation, refcnt); > + > + if (generation == 0 && refcnt == 0) > + return sd_remove_object(data_oid); > + > + sd_init_req(&hdr, SD_OP_DECREF_OBJ); > + hdr.ref.oid = ledger_oid; > + hdr.ref.generation = generation; > + hdr.ref.count = refcnt; > + > + ret = exec_local_req(&hdr, NULL); > + if (ret != SD_RES_SUCCESS) > + sd_err("failed to decrement reference %" PRIx64 ", %s", > + ledger_oid, sd_strerror(ret)); > + > + return ret; > +} > diff --git a/sheep/store/md.c b/sheep/store/md.c > new file mode 100644 > index 0000000..87ab759 > --- /dev/null > +++ b/sheep/store/md.c > @@ -0,0 +1,878 @@ > +/* > + * Copyright (C) 2013 Taobao Inc. > + * > + * Liu Yuan <[email protected]> > + * > + * This program is free software; you can redistribute it and/or > + * modify it under the terms of the GNU General Public License version > + * 2 as published by the Free Software Foundation. > + * > + * You should have received a copy of the GNU General Public License > + * along with this program. If not, see <http://www.gnu.org/licenses/>. > + */ > + > +#include "sheep_priv.h" > + > +#define MD_VDISK_SIZE ((uint64_t)1*1024*1024*1024) /* 1G */ > + > +#define NONE_EXIST_PATH "/all/disks/are/broken/,ps/ノ厂経7/!"
Why was NONE_EXIST_PATH changed? Also, I don't think embedding Japanese characters in otherwise-ASCII source code is a good idea. By the way, please use git format-patch to generate a series of patches and send them with git send-email, so that they arrive as a chain of patches under a single topic. For now, your patches are split into different topics in my Mutt client. Thanks, Yuan -- sheepdog mailing list [email protected] https://lists.wpkg.org/mailman/listinfo/sheepdog
