From: Masaki Saeki <saeki.mas...@po.ntts.co.jp> This change is a preparatory patch for adding store_driver. It moves the store-related files together into a new folder.
Signed-off-by: Masaki Saeki <saeki.mas...@po.ntts.co.jp> --- sheep/Makefile.am | 6 +- sheep/md.c | 878 --------------------------------------------- sheep/plain_store.c | 759 --------------------------------------- sheep/store.c | 511 -------------------------- sheep/store/common.c | 511 ++++++++++++++++++++++++++ sheep/store/md.c | 878 +++++++++++++++++++++++++++++++++++++++++++++ sheep/store/plain_store.c | 759 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 2152 insertions(+), 2150 deletions(-) delete mode 100644 sheep/md.c delete mode 100644 sheep/plain_store.c delete mode 100644 sheep/store.c create mode 100644 sheep/store/common.c create mode 100644 sheep/store/md.c create mode 100644 sheep/store/plain_store.c diff --git a/sheep/Makefile.am b/sheep/Makefile.am index 7a08838..3ddd761 100644 --- a/sheep/Makefile.am +++ b/sheep/Makefile.am @@ -24,10 +24,12 @@ AM_CPPFLAGS = -I$(top_builddir)/include -I$(top_srcdir)/include \ sbin_PROGRAMS = sheep -sheep_SOURCES = sheep.c group.c request.c gateway.c store.c vdi.c \ +sheep_SOURCES = sheep.c group.c request.c gateway.c vdi.c \ journal.c ops.c recovery.c cluster/local.c \ object_cache.c object_list_cache.c \ - plain_store.c config.c migrate.c md.c + store/common.c store/md.c \ + store/plain_store.c \ + config.c migrate.c if BUILD_HTTP sheep_SOURCES += http/http.c http/kv.c http/s3.c http/swift.c \ diff --git a/sheep/md.c b/sheep/md.c deleted file mode 100644 index bcdfb73..0000000 --- a/sheep/md.c +++ /dev/null @@ -1,878 +0,0 @@ -/* - * Copyright (C) 2013 Taobao Inc. - * - * Liu Yuan <namei.u...@gmail.com> - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License version - * 2 as published by the Free Software Foundation. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. 
- */ - -#include "sheep_priv.h" - -#define MD_VDISK_SIZE ((uint64_t)1*1024*1024*1024) /* 1G */ - -#define NONE_EXIST_PATH "/all/disks/are/broken/,ps/əʌo7/!" - -struct md md = { - .vroot = RB_ROOT, - .root = RB_ROOT, - .lock = SD_RW_LOCK_INITIALIZER, -}; - -static inline uint32_t nr_online_disks(void) -{ - uint32_t nr; - - sd_read_lock(&md.lock); - nr = md.nr_disks; - sd_rw_unlock(&md.lock); - - return nr; -} - -static inline int vdisk_number(const struct disk *disk) -{ - return DIV_ROUND_UP(disk->space, MD_VDISK_SIZE); -} - -static int disk_cmp(const struct disk *d1, const struct disk *d2) -{ - return strcmp(d1->path, d2->path); -} - -static int vdisk_cmp(const struct vdisk *d1, const struct vdisk *d2) -{ - return intcmp(d1->hash, d2->hash); -} - -static struct vdisk *vdisk_insert(struct vdisk *new) -{ - return rb_insert(&md.vroot, new, rb, vdisk_cmp); -} - -/* If v1_hash < hval <= v2_hash, then oid is resident in v2 */ -static struct vdisk *hval_to_vdisk(uint64_t hval) -{ - struct vdisk dummy = { .hash = hval }; - - return rb_nsearch(&md.vroot, &dummy, rb, vdisk_cmp); -} - -static struct vdisk *oid_to_vdisk(uint64_t oid) -{ - return hval_to_vdisk(sd_hash_oid(oid)); -} - -static void create_vdisks(const struct disk *disk) -{ - uint64_t hval = sd_hash(disk->path, strlen(disk->path)); - const struct sd_node *n = &sys->this_node; - uint64_t node_hval; - int nr; - - if (is_cluster_diskmode(&sys->cinfo)) { - node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr)); - hval = fnv_64a_64(node_hval, hval); - nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN); - if (0 == n->nid.port) - return; - } else - nr = vdisk_number(disk); - - for (int i = 0; i < nr; i++) { - struct vdisk *v = xmalloc(sizeof(*v)); - - hval = sd_hash_next(hval); - v->hash = hval; - v->disk = disk; - if (unlikely(vdisk_insert(v))) - panic("vdisk hash collison"); - } -} - -static inline void vdisk_free(struct vdisk *v) -{ - rb_erase(&v->rb, &md.vroot); - free(v); -} - -static void remove_vdisks(const 
struct disk *disk) -{ - uint64_t hval = sd_hash(disk->path, strlen(disk->path)); - const struct sd_node *n = &sys->this_node; - uint64_t node_hval; - int nr; - - if (is_cluster_diskmode(&sys->cinfo)) { - node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr)); - hval = fnv_64a_64(node_hval, hval); - nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN); - } else - nr = vdisk_number(disk); - - for (int i = 0; i < nr; i++) { - struct vdisk *v; - - hval = sd_hash_next(hval); - v = hval_to_vdisk(hval); - sd_assert(v->hash == hval); - - vdisk_free(v); - } -} - -static inline void trim_last_slash(char *path) -{ - sd_assert(path[0]); - while (path[strlen(path) - 1] == '/') - path[strlen(path) - 1] = '\0'; -} - -static struct disk *path_to_disk(const char *path) -{ - struct disk key = {}; - - pstrcpy(key.path, sizeof(key.path), path); - trim_last_slash(key.path); - - return rb_search(&md.root, &key, rb, disk_cmp); -} - -size_t get_store_objsize(uint64_t oid) -{ - if (is_erasure_oid(oid)) { - uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid)); - int d; - ec_policy_to_dp(policy, &d, NULL); - return get_vdi_object_size(oid_to_vid(oid)) / d; - } - return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid))); -} - -static int get_total_object_size(uint64_t oid, const char *wd, uint32_t epoch, - uint8_t ec_index, struct vnode_info *vinfo, - void *total) -{ - uint64_t *t = total; - struct stat s; - char path[PATH_MAX]; - - snprintf(path, PATH_MAX, "%s/%016" PRIx64, wd, oid); - if (stat(path, &s) == 0) - *t += s.st_blocks * SECTOR_SIZE; - else - *t += get_store_objsize(oid); - - return SD_RES_SUCCESS; -} - -static int64_t find_string_integer(const char *str, const char *delimiter) -{ - char *pos = strstr(str, delimiter), *p; - int64_t ret; - - ret = strtoll(pos + 1, &p, 10); - if (ret == LLONG_MAX || p == pos + 1) { - sd_err("%s strtoul failed, delimiter %s, %m", str, delimiter); - return -1; - } - - return ret; -} - -/* If cleanup is true, temporary objects will be removed */ 
-static int for_each_object_in_path(const char *path, - int (*func)(uint64_t, const char *, uint32_t, - uint8_t, struct vnode_info *, - void *), - bool cleanup, struct vnode_info *vinfo, - void *arg) -{ - DIR *dir; - struct dirent *d; - uint64_t oid; - int ret = SD_RES_SUCCESS; - char file_name[PATH_MAX]; - - dir = opendir(path); - if (unlikely(!dir)) { - sd_err("failed to open %s, %m", path); - return SD_RES_EIO; - } - - while ((d = readdir(dir))) { - uint32_t epoch = 0; - uint8_t ec_index = SD_MAX_COPIES; - - /* skip ".", ".." and ".stale" */ - if (unlikely(!strncmp(d->d_name, ".", 1))) - continue; - - sd_debug("%s, %s", path, d->d_name); - oid = strtoull(d->d_name, NULL, 16); - if (oid == 0 || oid == ULLONG_MAX) - continue; - - /* don't call callback against temporary objects */ - if (is_tmp_dentry(d->d_name)) { - if (cleanup) { - snprintf(file_name, sizeof(file_name), - "%s/%s", path, d->d_name); - sd_debug("remove tmp object %s", file_name); - if (unlink(file_name) < 0) - sd_err("failed to unlink %s: %m", - file_name); - } - continue; - } - - if (is_stale_dentry(d->d_name)) { - epoch = find_string_integer(d->d_name, "."); - if (epoch < 0) - continue; - } - - if (is_ec_dentry(d->d_name)) { - ec_index = find_string_integer(d->d_name, "_"); - if (ec_index < 0) - continue; - } - - ret = func(oid, path, epoch, ec_index, vinfo, arg); - if (ret != SD_RES_SUCCESS) - break; - } - closedir(dir); - return ret; -} - -static uint64_t get_path_free_size(const char *path, uint64_t *used) -{ - struct statvfs fs; - uint64_t size; - - if (statvfs(path, &fs) < 0) { - sd_err("get disk %s space failed %m", path); - return 0; - } - size = (int64_t)fs.f_frsize * fs.f_bavail; - - if (!used) - goto out; - if (for_each_object_in_path(path, get_total_object_size, false, - NULL, used) - != SD_RES_SUCCESS) - return 0; -out: - return size; -} - -/* - * If path is broken during initialization or not support xattr return 0. 
We can - * safely use 0 to represent failure case because 0 space path can be - * considered as broken path. - */ -static uint64_t init_path_space(const char *path, bool purge) -{ - uint64_t size; - char stale[PATH_MAX]; - - if (!is_xattr_enabled(path)) { - sd_warn("multi-disk support need xattr feature for path: %s", - path); - goto broken_path; - } - - if (purge && purge_directory(path) < 0) - sd_err("failed to purge %s", path); - - snprintf(stale, PATH_MAX, "%s/.stale", path); - if (xmkdir(stale, sd_def_dmode) < 0) { - sd_err("can't mkdir for %s, %m", stale); - goto broken_path; - } - -#define MDNAME "user.md.size" -#define MDSIZE sizeof(uint64_t) - if (getxattr(path, MDNAME, &size, MDSIZE) < 0) { - if (errno == ENODATA) { - goto create; - } else { - sd_err("%s, %m", path); - goto broken_path; - } - } - - return size; -create: - size = get_path_free_size(path, NULL); - if (!size) - goto broken_path; - if (setxattr(path, MDNAME, &size, MDSIZE, 0) < 0) { - sd_err("%s, %m", path); - goto broken_path; - } - return size; -broken_path: - return 0; -} - -/* We don't need lock at init stage */ -bool md_add_disk(const char *path, bool purge) -{ - struct disk *new; - - if (path_to_disk(path)) { - sd_err("duplicate path %s", path); - return false; - } - - if (xmkdir(path, sd_def_dmode) < 0) { - sd_err("can't mkdir for %s, %m", path); - return false; - } - - new = xmalloc(sizeof(*new)); - pstrcpy(new->path, PATH_MAX, path); - trim_last_slash(new->path); - new->space = init_path_space(new->path, purge); - if (!new->space) { - free(new); - return false; - } - - create_vdisks(new); - rb_insert(&md.root, new, rb, disk_cmp); - md.space += new->space; - md.nr_disks++; - - sd_info("%s, vdisk nr %d, total disk %d", new->path, vdisk_number(new), - md.nr_disks); - return true; -} - -static inline void md_remove_disk(struct disk *disk) -{ - sd_info("%s from multi-disk array", disk->path); - rb_erase(&disk->rb, &md.root); - md.nr_disks--; - remove_vdisks(disk); - free(disk); -} - 
-uint64_t md_init_space(void) -{ - return md.space; -} - -static const char *md_get_object_dir_nolock(uint64_t oid) -{ - const struct vdisk *vd; - - if (unlikely(md.nr_disks == 0)) - return NONE_EXIST_PATH; /* To generate EIO */ - - vd = oid_to_vdisk(oid); - return vd->disk->path; -} - -const char *md_get_object_dir(uint64_t oid) -{ - const char *p; - - sd_read_lock(&md.lock); - p = md_get_object_dir_nolock(oid); - sd_rw_unlock(&md.lock); - - return p; -} - -struct process_path_arg { - const char *path; - struct vnode_info *vinfo; - int (*func)(uint64_t oid, const char *, uint32_t, uint8_t, - struct vnode_info *, void *arg); - bool cleanup; - void *opaque; - int result; -}; - -static void *thread_process_path(void *arg) -{ - int ret; - struct process_path_arg *parg = (struct process_path_arg *)arg; - - ret = for_each_object_in_path(parg->path, parg->func, parg->cleanup, - parg->vinfo, parg->opaque); - if (ret != SD_RES_SUCCESS) - parg->result = ret; - - return arg; -} - -main_fn int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path, - uint32_t epoch, uint8_t ec_index, - struct vnode_info *vinfo, void *arg), - bool cleanup, void *arg) -{ - int ret = SD_RES_SUCCESS; - const struct disk *disk; - struct process_path_arg *thread_args, *path_arg; - struct vnode_info *vinfo; - void *ret_arg; - sd_thread_t *thread_array; - int nr_thread = 0, idx = 0; - - sd_read_lock(&md.lock); - - rb_for_each_entry(disk, &md.root, rb) { - nr_thread++; - } - - thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg)); - thread_array = xmalloc(nr_thread * sizeof(sd_thread_t)); - - vinfo = get_vnode_info(); - - rb_for_each_entry(disk, &md.root, rb) { - thread_args[idx].path = disk->path; - thread_args[idx].vinfo = vinfo; - thread_args[idx].func = func; - thread_args[idx].cleanup = cleanup; - thread_args[idx].opaque = arg; - thread_args[idx].result = SD_RES_SUCCESS; - ret = sd_thread_create_with_idx("foreach wd", - thread_array + idx, - thread_process_path, - (void 
*)(thread_args + idx)); - if (ret) { - /* - * If we can't create enough threads to process - * files, the data-consistent will be broken if - * we continued. - */ - panic("Failed to create thread for path %s", - disk->path); - } - idx++; - } - - sd_debug("Create %d threads for all path", nr_thread); - /* wait for all threads to exit */ - for (idx = 0; idx < nr_thread; idx++) { - ret = sd_thread_join(thread_array[idx], &ret_arg); - if (ret) - sd_err("Failed to join thread"); - if (ret_arg) { - path_arg = (struct process_path_arg *)ret_arg; - if (path_arg->result != SD_RES_SUCCESS) - sd_err("%s, %s", path_arg->path, - sd_strerror(path_arg->result)); - } - } - - put_vnode_info(vinfo); - sd_rw_unlock(&md.lock); - - free(thread_args); - free(thread_array); - return ret; -} - -int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path, - uint32_t epoch, uint8_t, - struct vnode_info *, void *arg), - void *arg) -{ - int ret = SD_RES_SUCCESS; - char path[PATH_MAX]; - const struct disk *disk; - - sd_read_lock(&md.lock); - rb_for_each_entry(disk, &md.root, rb) { - snprintf(path, sizeof(path), "%s/.stale", disk->path); - ret = for_each_object_in_path(path, func, false, NULL, arg); - if (ret != SD_RES_SUCCESS) - break; - } - sd_rw_unlock(&md.lock); - return ret; -} - - -int for_each_obj_path(int (*func)(const char *path)) -{ - int ret = SD_RES_SUCCESS; - const struct disk *disk; - - sd_read_lock(&md.lock); - rb_for_each_entry(disk, &md.root, rb) { - ret = func(disk->path); - if (ret != SD_RES_SUCCESS) - break; - } - sd_rw_unlock(&md.lock); - return ret; -} - -struct md_work { - struct work work; - char path[PATH_MAX]; -}; - -static inline void kick_recover(void) -{ - struct vnode_info *vinfo = get_vnode_info(); - - if (is_cluster_diskmode(&sys->cinfo)) - sys->cdrv->update_node(&sys->this_node); - else { - start_recovery(vinfo, vinfo, false); - put_vnode_info(vinfo); - } -} - -static void md_do_recover(struct work *work) -{ - struct md_work *mw = container_of(work, 
struct md_work, work); - struct disk *disk; - int nr = 0; - - sd_write_lock(&md.lock); - disk = path_to_disk(mw->path); - if (!disk) - /* Just ignore the duplicate EIO of the same path */ - goto out; - md_remove_disk(disk); - nr = md.nr_disks; -out: - sd_rw_unlock(&md.lock); - - if (disk) { - if (nr > 0) { - update_node_disks(); - kick_recover(); - } else { - leave_cluster(); - } - } - - free(mw); -} - -int md_handle_eio(const char *fault_path) -{ - struct md_work *mw; - - if (nr_online_disks() == 0) - return SD_RES_EIO; - - mw = xzalloc(sizeof(*mw)); - mw->work.done = md_do_recover; - pstrcpy(mw->path, PATH_MAX, fault_path); - queue_work(sys->md_wqueue, &mw->work); - - /* Fool the requester to retry */ - return SD_RES_NETWORK_ERROR; -} - -static inline bool md_access(const char *path) -{ - if (access(path, R_OK | W_OK) < 0) { - if (unlikely(errno != ENOENT)) - sd_err("failed to check %s, %m", path); - return false; - } - - return true; -} - -static int get_old_new_path(uint64_t oid, uint32_t epoch, uint8_t ec_index, - const char *path, char *old, char *new) -{ - if (!epoch) { - if (!is_erasure_oid(oid)) { - snprintf(old, PATH_MAX, "%s/%016" PRIx64, path, oid); - snprintf(new, PATH_MAX, "%s/%016" PRIx64, - md_get_object_dir_nolock(oid), oid); - } else { - snprintf(old, PATH_MAX, "%s/%016" PRIx64"_%d", path, - oid, ec_index); - snprintf(new, PATH_MAX, "%s/%016" PRIx64"_%d", - md_get_object_dir_nolock(oid), oid, ec_index); - } - } else { - if (!is_erasure_oid(oid)) { - snprintf(old, PATH_MAX, - "%s/.stale/%016"PRIx64".%"PRIu32, path, - oid, epoch); - snprintf(new, PATH_MAX, - "%s/.stale/%016"PRIx64".%"PRIu32, - md_get_object_dir_nolock(oid), oid, epoch); - } else { - snprintf(old, PATH_MAX, - "%s/.stale/%016"PRIx64"_%d.%"PRIu32, path, - oid, ec_index, epoch); - snprintf(new, PATH_MAX, - "%s/.stale/%016"PRIx64"_%d.%"PRIu32, - md_get_object_dir_nolock(oid), - oid, ec_index ,epoch); - } - } - - if (!md_access(old)) - return -1; - - return 0; -} - -static int 
md_move_object(uint64_t oid, const char *old, const char *new) -{ - struct strbuf buf = STRBUF_INIT; - int fd, ret = -1; - size_t sz = get_store_objsize(oid); - - fd = open(old, O_RDONLY); - if (fd < 0) { - sd_err("failed to open %s", old); - goto out; - } - - ret = strbuf_read(&buf, fd, sz); - if (ret != sz) { - sd_err("failed to read %s, size %zu, %d, %m", old, sz, ret); - ret = -1; - goto out_close; - } - - if (atomic_create_and_write(new, buf.buf, buf.len, false) < 0) { - if (errno != EEXIST) { - sd_err("failed to create %s", new); - ret = -1; - goto out_close; - } - } - unlink(old); - ret = 0; -out_close: - close(fd); -out: - strbuf_release(&buf); - return ret; -} - -static int md_check_and_move(uint64_t oid, uint32_t epoch, uint8_t ec_index, - const char *path) -{ - char old[PATH_MAX], new[PATH_MAX]; - - if (get_old_new_path(oid, epoch, ec_index, path, old, new) < 0) - return SD_RES_EIO; - /* - * Recovery thread and main thread might try to recover the same object. - * Either one succeeds, the other will fail and proceed and end up - * trying to move the object to where it is already in place, in this - * case we simply return. 
- */ - if (!strcmp(old, new)) - return SD_RES_SUCCESS; - - /* We can't use rename(2) across device */ - if (md_move_object(oid, old, new) < 0) { - sd_err("move old %s to new %s failed", old, new); - return SD_RES_EIO; - } - - sd_debug("from %s to %s", old, new); - return SD_RES_SUCCESS; -} - -static int scan_wd(uint64_t oid, uint32_t epoch, uint8_t ec_index) -{ - int ret = SD_RES_EIO; - const struct disk *disk; - - sd_read_lock(&md.lock); - rb_for_each_entry(disk, &md.root, rb) { - ret = md_check_and_move(oid, epoch, ec_index, disk->path); - if (ret == SD_RES_SUCCESS) - break; - } - sd_rw_unlock(&md.lock); - return ret; -} - -bool md_exist(uint64_t oid, uint8_t ec_index, char *path) -{ - if (md_access(path)) - return true; - /* - * We have to iterate the WD because we don't have epoch-like history - * track to locate the objects for multiple disk failure. Simply do - * hard iteration simplify the code a lot. - */ - if (scan_wd(oid, 0, ec_index) == SD_RES_SUCCESS) - return true; - - return false; -} - -int md_get_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index, - char *path) -{ - if (unlikely(!epoch)) - panic("invalid 0 epoch"); - - if (is_erasure_oid(oid)) { - if (unlikely(ec_index >= SD_MAX_COPIES)) - panic("invalid ec index %d", ec_index); - - snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64"_%d.%"PRIu32, - md_get_object_dir(oid), oid, ec_index, epoch); - } else - snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32, - md_get_object_dir(oid), oid, epoch); - - if (md_access(path)) - return SD_RES_SUCCESS; - - if (scan_wd(oid, epoch, ec_index) == SD_RES_SUCCESS) - return SD_RES_SUCCESS; - - return SD_RES_NO_OBJ; -} - -uint32_t md_get_info(struct sd_md_info *info) -{ - uint32_t ret = sizeof(*info); - const struct disk *disk; - int i = 0; - - memset(info, 0, ret); - sd_read_lock(&md.lock); - rb_for_each_entry(disk, &md.root, rb) { - info->disk[i].idx = i; - pstrcpy(info->disk[i].path, PATH_MAX, disk->path); - /* FIXME: better handling failure case. 
*/ - info->disk[i].free = get_path_free_size(info->disk[i].path, - &info->disk[i].used); - i++; - } - info->nr = md.nr_disks; - sd_rw_unlock(&md.lock); - return ret; -} - -static inline void md_del_disk(const char *path) -{ - struct disk *disk = path_to_disk(path); - - if (!disk) { - sd_err("invalid path %s", path); - return; - } - md_remove_disk(disk); -} - -#ifdef HAVE_DISKVNODES -void update_node_disks(void) -{ - const struct disk *disk; - int i = 0; - bool rb_empty = false; - - if (!sys) - return; - - memset(sys->this_node.disks, 0, sizeof(struct disk_info) * DISK_MAX); - sd_read_lock(&md.lock); - rb_for_each_entry(disk, &md.root, rb) { - sys->this_node.disks[i].disk_id = - sd_hash(disk->path, strlen(disk->path)); - sys->this_node.disks[i].disk_space = disk->space; - i++; - } - sd_rw_unlock(&md.lock); - - if (RB_EMPTY_ROOT(&md.vroot)) - rb_empty = true; - sd_write_lock(&md.lock); - rb_for_each_entry(disk, &md.root, rb) { - if (!rb_empty) - remove_vdisks(disk); - create_vdisks(disk); - } - sd_rw_unlock(&md.lock); -} -#else -void update_node_disks(void) -{ -} -#endif - -static int do_plug_unplug(char *disks, bool plug) -{ - const char *path; - int old_nr, ret = SD_RES_UNKNOWN; - - sd_write_lock(&md.lock); - old_nr = md.nr_disks; - path = strtok(disks, ","); - do { - if (plug) { - if (!md_add_disk(path, true)) - sd_err("failed to add %s", path); - } else { - md_del_disk(path); - } - } while ((path = strtok(NULL, ","))); - - /* If no disks change, bail out */ - if (old_nr == md.nr_disks) - goto out; - - ret = SD_RES_SUCCESS; -out: - sd_rw_unlock(&md.lock); - - if (ret == SD_RES_SUCCESS) { - update_node_disks(); - kick_recover(); - } - - return ret; -} - -int md_plug_disks(char *disks) -{ - return do_plug_unplug(disks, true); -} - -int md_unplug_disks(char *disks) -{ - return do_plug_unplug(disks, false); -} - -uint64_t md_get_size(uint64_t *used) -{ - uint64_t fsize = 0; - const struct disk *disk; - - *used = 0; - sd_read_lock(&md.lock); - rb_for_each_entry(disk, 
&md.root, rb) { - fsize += get_path_free_size(disk->path, used); - } - sd_rw_unlock(&md.lock); - - return fsize + *used; -} - -uint32_t md_nr_disks(void) -{ - return nr_online_disks(); -} diff --git a/sheep/plain_store.c b/sheep/plain_store.c deleted file mode 100644 index efbf129..0000000 --- a/sheep/plain_store.c +++ /dev/null @@ -1,759 +0,0 @@ -/* - * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License version - * 2 as published by the Free Software Foundation. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include <libgen.h> -#include <linux/falloc.h> - -#include "sheep_priv.h" - -#ifndef FALLOC_FL_PUNCH_HOLE -#define FALLOC_FL_PUNCH_HOLE 0x02 -#endif - -#define sector_algined(x) ({ ((x) & (SECTOR_SIZE - 1)) == 0; }) - -static inline bool iocb_is_aligned(const struct siocb *iocb) -{ - return sector_algined(iocb->offset) && sector_algined(iocb->length); -} - -static int prepare_iocb(uint64_t oid, const struct siocb *iocb, bool create) -{ - int syncflag = create ? 
O_SYNC : O_DSYNC; - int flags = syncflag | O_RDWR; - - if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) - flags &= ~syncflag; - - if (sys->backend_dio && iocb_is_aligned(iocb)) { - if (!is_aligned_to_pagesize(iocb->buf)) - panic("Memory isn't aligned to pagesize %p", iocb->buf); - flags |= O_DIRECT; - } - - if (create) - flags |= O_CREAT | O_EXCL; - - return flags; -} - -static int get_store_path(uint64_t oid, uint8_t ec_index, char *path) -{ - if (is_erasure_oid(oid)) { - if (unlikely(ec_index >= SD_MAX_COPIES)) - panic("invalid ec_index %d", ec_index); - return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d", - md_get_object_dir(oid), oid, ec_index); - } - - return snprintf(path, PATH_MAX, "%s/%016" PRIx64, - md_get_object_dir(oid), oid); -} - -static int get_store_tmp_path(uint64_t oid, uint8_t ec_index, char *path) -{ - if (is_erasure_oid(oid)) { - if (unlikely(ec_index >= SD_MAX_COPIES)) - panic("invalid ec_index %d", ec_index); - return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d.tmp", - md_get_object_dir(oid), oid, ec_index); - } - - return snprintf(path, PATH_MAX, "%s/%016" PRIx64".tmp", - md_get_object_dir(oid), oid); -} - -static int get_store_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index, - char *path) -{ - return md_get_stale_path(oid, epoch, ec_index, path); -} - -/* - * Check if oid is in this nodes (if oid is in the wrong place, it will be moved - * to the correct one after this call in a MD setup. - */ -bool default_exist(uint64_t oid, uint8_t ec_index) -{ - char path[PATH_MAX]; - - get_store_path(oid, ec_index, path); - - return md_exist(oid, ec_index, path); -} - -static int err_to_sderr(const char *path, uint64_t oid, int err) -{ - struct stat s; - char p[PATH_MAX], *dir; - - /* Use a temporary buffer since dirname() may modify its argument. 
*/ - pstrcpy(p, sizeof(p), path); - dir = dirname(p); - - sd_debug("%s", path); - switch (err) { - case ENOENT: - if (stat(dir, &s) < 0) { - sd_err("%s corrupted", dir); - return md_handle_eio(dir); - } - sd_debug("object %016" PRIx64 " not found locally", oid); - return SD_RES_NO_OBJ; - case ENOSPC: - /* TODO: stop automatic recovery */ - sd_err("diskfull, oid=%"PRIx64, oid); - return SD_RES_NO_SPACE; - case EMFILE: - case ENFILE: - case EINTR: - case EAGAIN: - case EEXIST: - sd_err("%m, oid=%"PRIx64, oid); - /* make gateway try again */ - return SD_RES_NETWORK_ERROR; - default: - sd_err("oid=%"PRIx64", %m", oid); - return md_handle_eio(dir); - } -} - -static int discard(int fd, uint64_t start, uint32_t end) -{ - int ret = xfallocate(fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, - start, end - start); - if (ret < 0) { - if (errno == ENOSYS || errno == EOPNOTSUPP) - sd_info("FALLOC_FL_PUNCH_HOLE is not supported " - "on this filesystem"); - else - sd_err("failed to discard object, %m"); - } - - return ret; -} - -/* Trim zero blocks of the beginning and end of the object. 
*/ -static int default_trim(int fd, uint64_t oid, const struct siocb *iocb, - uint64_t *poffset, uint32_t *plen) -{ - trim_zero_blocks(iocb->buf, poffset, plen); - - if (iocb->offset < *poffset) { - sd_debug("discard between %d, %ld, %" PRIx64, iocb->offset, - *poffset, oid); - - if (discard(fd, iocb->offset, *poffset) < 0) - return -1; - } - - if (*poffset + *plen < iocb->offset + iocb->length) { - uint64_t end = iocb->offset + iocb->length; - uint32_t object_size = get_vdi_object_size(oid_to_vid(oid)); - if (end == get_objsize(oid, object_size)) - /* This is necessary to punch the last block */ - end = round_up(end, BLOCK_SIZE); - sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen, - end, oid); - - if (discard(fd, *poffset + *plen, end) < 0) - return -1; - } - - return 0; -} - -int default_write(uint64_t oid, const struct siocb *iocb) -{ - int flags = prepare_iocb(oid, iocb, false), fd, - ret = SD_RES_SUCCESS; - char path[PATH_MAX]; - ssize_t size; - uint32_t len = iocb->length; - uint64_t offset = iocb->offset; - static bool trim_is_supported = true; - - if (iocb->epoch < sys_epoch()) { - sd_debug("%"PRIu32" sys %"PRIu32, iocb->epoch, sys_epoch()); - return SD_RES_OLD_NODE_VER; - } - - if (uatomic_is_true(&sys->use_journal) && - unlikely(journal_write_store(oid, iocb->buf, iocb->length, - iocb->offset, false)) - != SD_RES_SUCCESS) { - sd_err("turn off journaling"); - uatomic_set_false(&sys->use_journal); - flags |= O_DSYNC; - sync(); - } - - get_store_path(oid, iocb->ec_index, path); - - /* - * Make sure oid is in the right place because oid might be misplaced - * in a wrong place, due to 'shutdown/restart with less/more disks' or - * any bugs. 
We need call err_to_sderr() to return EIO if disk is broken - */ - if (!default_exist(oid, iocb->ec_index)) - return err_to_sderr(path, oid, ENOENT); - - fd = open(path, flags, sd_def_fmode); - if (unlikely(fd < 0)) - return err_to_sderr(path, oid, errno); - - if (trim_is_supported && is_sparse_object(oid)) { - if (default_trim(fd, oid, iocb, &offset, &len) < 0) { - trim_is_supported = false; - offset = iocb->offset; - len = iocb->length; - } - } - - size = xpwrite(fd, iocb->buf, len, offset); - if (unlikely(size != len)) { - sd_err("failed to write object %"PRIx64", path=%s, offset=%" - PRId32", size=%"PRId32", result=%zd, %m", oid, path, - iocb->offset, iocb->length, size); - ret = err_to_sderr(path, oid, errno); - goto out; - } -out: - close(fd); - return ret; -} - -static int make_stale_dir(const char *path) -{ - char p[PATH_MAX]; - - snprintf(p, PATH_MAX, "%s/.stale", path); - if (xmkdir(p, sd_def_dmode) < 0) { - sd_err("%s failed, %m", p); - return SD_RES_EIO; - } - return SD_RES_SUCCESS; -} - -static int purge_dir(const char *path) -{ - if (purge_directory(path) < 0) - return SD_RES_EIO; - - return SD_RES_SUCCESS; -} - -static int purge_stale_dir(const char *path) -{ - char p[PATH_MAX]; - - snprintf(p, PATH_MAX, "%s/.stale", path); - - if (purge_directory_async(p) < 0) - return SD_RES_EIO; - - return SD_RES_SUCCESS; -} - -int default_cleanup(void) -{ - int ret; - - ret = for_each_obj_path(purge_stale_dir); - if (ret != SD_RES_SUCCESS) - return ret; - - return SD_RES_SUCCESS; -} - -static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch) -{ - int ret; - struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE); - struct siocb iocb = { - .epoch = epoch, - .buf = inode, - .length = SD_INODE_HEADER_SIZE, - }; - - ret = default_read(oid, &iocb); - if (ret != SD_RES_SUCCESS) { - sd_err("failed to read inode header %" PRIx64 " %" PRId32 - "wat %s", oid, epoch, wd); - goto out; - } - add_vdi_state_unordered(oid_to_vid(oid), inode->nr_copies, - 
vdi_is_snapshot(inode), inode->copy_policy, - inode->block_size_shift, inode->parent_vdi_id); - - if (inode->name[0] == '\0') - atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted); - - atomic_set_bit(oid_to_vid(oid), sys->vdi_inuse); - - ret = SD_RES_SUCCESS; -out: - free(inode); - return ret; -} - -static int init_objlist_and_vdi_bitmap(uint64_t oid, const char *wd, - uint32_t epoch, uint8_t ec_index, - struct vnode_info *vinfo, - void *arg) -{ - int ret; - objlist_cache_insert(oid); - - if (is_vdi_obj(oid)) { - sd_debug("found the VDI object %" PRIx64" epoch %"PRIu32 - " at %s", oid, epoch, wd); - ret = init_vdi_state(oid, wd, epoch); - if (ret != SD_RES_SUCCESS) - return ret; - } - return SD_RES_SUCCESS; -} - -int default_init(void) -{ - int ret; - - sd_debug("use plain store driver"); - ret = for_each_obj_path(make_stale_dir); - if (ret != SD_RES_SUCCESS) - return ret; - - for_each_object_in_stale(init_objlist_and_vdi_bitmap, NULL); - - return for_each_object_in_wd(init_objlist_and_vdi_bitmap, true, NULL); -} - -static int default_read_from_path(uint64_t oid, const char *path, - const struct siocb *iocb) -{ - int flags = prepare_iocb(oid, iocb, false), fd, - ret = SD_RES_SUCCESS; - ssize_t size; - - /* - * Make sure oid is in the right place because oid might be misplaced - * in a wrong place, due to 'shutdown/restart with less disks' or any - * bugs. We need call err_to_sderr() to return EIO if disk is broken. - * - * For stale path, get_store_stale_path already does default_exist job. 
- */ - if (!is_stale_path(path) && !default_exist(oid, iocb->ec_index)) - return err_to_sderr(path, oid, ENOENT); - - fd = open(path, flags); - if (fd < 0) - return err_to_sderr(path, oid, errno); - - size = xpread(fd, iocb->buf, iocb->length, iocb->offset); - if (size < 0) { - sd_err("failed to read object %"PRIx64", path=%s, offset=%" - PRId32", size=%"PRId32", result=%zd, %m", oid, path, - iocb->offset, iocb->length, size); - ret = err_to_sderr(path, oid, errno); - } - close(fd); - return ret; -} - -int default_read(uint64_t oid, const struct siocb *iocb) -{ - int ret; - char path[PATH_MAX]; - - get_store_path(oid, iocb->ec_index, path); - ret = default_read_from_path(oid, path, iocb); - - /* - * If the request is against the older epoch, try to read from - * the stale directory - */ - if (ret == SD_RES_NO_OBJ && iocb->epoch > 0 && - iocb->epoch < sys_epoch()) { - get_store_stale_path(oid, iocb->epoch, iocb->ec_index, path); - ret = default_read_from_path(oid, path, iocb); - } - - return ret; -} - -int default_create_and_write(uint64_t oid, const struct siocb *iocb) -{ - char path[PATH_MAX], tmp_path[PATH_MAX], *dir; - int flags = prepare_iocb(oid, iocb, true); - int ret, fd; - uint32_t len = iocb->length; - uint32_t object_size = 0; - size_t obj_size; - uint64_t offset = iocb->offset; - - sd_debug("%"PRIx64, oid); - get_store_path(oid, iocb->ec_index, path); - get_store_tmp_path(oid, iocb->ec_index, tmp_path); - - if (uatomic_is_true(&sys->use_journal) && - journal_write_store(oid, iocb->buf, iocb->length, - iocb->offset, true) - != SD_RES_SUCCESS) { - sd_err("turn off journaling"); - uatomic_set_false(&sys->use_journal); - flags |= O_SYNC; - sync(); - } - - fd = open(tmp_path, flags, sd_def_fmode); - if (fd < 0) { - if (errno == EEXIST) { - /* - * This happens if node membership changes during object - * creation; while gateway retries a CREATE request, - * recovery process could also recover the object at the - * same time. 
They should try to write the same date, - * so it is okay to simply return success here. - */ - sd_debug("%s exists", tmp_path); - return SD_RES_SUCCESS; - } - - sd_err("failed to open %s: %m", tmp_path); - return err_to_sderr(path, oid, errno); - } - - obj_size = get_store_objsize(oid); - - trim_zero_blocks(iocb->buf, &offset, &len); - - object_size = get_vdi_object_size(oid_to_vid(oid)); - - if (offset != 0 || len != get_objsize(oid, object_size)) { - if (is_sparse_object(oid)) - ret = xftruncate(fd, obj_size); - else - ret = prealloc(fd, obj_size); - if (ret < 0) { - ret = err_to_sderr(path, oid, errno); - goto out; - } - } - - ret = xpwrite(fd, iocb->buf, len, offset); - if (ret != len) { - sd_err("failed to write object. %m"); - ret = err_to_sderr(path, oid, errno); - goto out; - } - - ret = rename(tmp_path, path); - if (ret < 0) { - sd_err("failed to rename %s to %s: %m", tmp_path, path); - ret = err_to_sderr(path, oid, errno); - goto out; - } - - close(fd); - - if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) { - objlist_cache_insert(oid); - return SD_RES_SUCCESS; - } - - pstrcpy(tmp_path, sizeof(tmp_path), path); - dir = dirname(tmp_path); - fd = open(dir, O_DIRECTORY | O_RDONLY); - if (fd < 0) { - sd_err("failed to open directory %s: %m", dir); - return err_to_sderr(path, oid, errno); - } - - if (fsync(fd) != 0) { - sd_err("failed to write directory %s: %m", dir); - ret = err_to_sderr(path, oid, errno); - close(fd); - if (unlink(path) != 0) - sd_err("failed to unlink %s: %m", path); - return ret; - } - close(fd); - objlist_cache_insert(oid); - return SD_RES_SUCCESS; - -out: - if (unlink(tmp_path) != 0) - sd_err("failed to unlink %s: %m", tmp_path); - close(fd); - return ret; -} - -int default_link(uint64_t oid, uint32_t tgt_epoch) -{ - char path[PATH_MAX], stale_path[PATH_MAX]; - - sd_debug("try link %"PRIx64" from snapshot with epoch %d", oid, - tgt_epoch); - - snprintf(path, PATH_MAX, "%s/%016"PRIx64, md_get_object_dir(oid), oid); - 
get_store_stale_path(oid, tgt_epoch, 0, stale_path); - - if (link(stale_path, path) < 0) { - /* - * Recovery thread and main thread might try to recover the - * same object and we might get EEXIST in such case. - */ - if (errno == EEXIST) - goto out; - - sd_debug("failed to link from %s to %s, %m", stale_path, path); - return err_to_sderr(path, oid, errno); - } -out: - return SD_RES_SUCCESS; -} - -/* - * For replicated object, if any of the replica belongs to this node, we - * consider it not stale. - * - * For erasure coded object, since every copy is unique and if it migrates to - * other node(index gets changed even it has some other copy belongs to it) - * because of hash ring changes, we consider it stale. - */ -static bool oid_stale(uint64_t oid, int ec_index, struct vnode_info *vinfo) -{ - uint32_t i, nr_copies; - const struct sd_vnode *v; - bool ret = true; - const struct sd_vnode *obj_vnodes[SD_MAX_COPIES]; - - nr_copies = get_obj_copy_number(oid, vinfo->nr_zones); - oid_to_vnodes(oid, &vinfo->vroot, nr_copies, obj_vnodes); - for (i = 0; i < nr_copies; i++) { - v = obj_vnodes[i]; - if (vnode_is_local(v)) { - if (ec_index < SD_MAX_COPIES) { - if (i == ec_index) - ret = false; - } else { - ret = false; - } - break; - } - } - - return ret; -} - -static int move_object_to_stale_dir(uint64_t oid, const char *wd, - uint32_t epoch, uint8_t ec_index, - struct vnode_info *vinfo, void *arg) -{ - char path[PATH_MAX], stale_path[PATH_MAX]; - uint32_t tgt_epoch = *(uint32_t *)arg; - - /* ec_index from md.c is reliable so we can directly use it */ - if (ec_index < SD_MAX_COPIES) { - snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d", - md_get_object_dir(oid), oid, ec_index); - snprintf(stale_path, PATH_MAX, - "%s/.stale/%016"PRIx64"_%d.%"PRIu32, - md_get_object_dir(oid), oid, ec_index, tgt_epoch); - } else { - snprintf(path, PATH_MAX, "%s/%016" PRIx64, - md_get_object_dir(oid), oid); - snprintf(stale_path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32, - md_get_object_dir(oid), 
oid, tgt_epoch); - } - - if (unlikely(rename(path, stale_path)) < 0) { - sd_err("failed to move stale object %" PRIX64 " to %s, %m", oid, - path); - return SD_RES_EIO; - } - - sd_debug("moved object %"PRIx64, oid); - return SD_RES_SUCCESS; -} - -static int check_stale_objects(uint64_t oid, const char *wd, uint32_t epoch, - uint8_t ec_index, struct vnode_info *vinfo, - void *arg) -{ - if (oid_stale(oid, ec_index, vinfo)) - return move_object_to_stale_dir(oid, wd, 0, ec_index, - NULL, arg); - - return SD_RES_SUCCESS; -} - -int default_update_epoch(uint32_t epoch) -{ - sd_assert(epoch); - return for_each_object_in_wd(check_stale_objects, false, &epoch); -} - -int default_format(void) -{ - unsigned ret; - - sd_debug("try get a clean store"); - ret = for_each_obj_path(purge_dir); - if (ret != SD_RES_SUCCESS) - return ret; - - if (sys->enable_object_cache) - object_cache_format(); - - return SD_RES_SUCCESS; -} - -int default_remove_object(uint64_t oid, uint8_t ec_index) -{ - char path[PATH_MAX]; - - if (uatomic_is_true(&sys->use_journal)) - journal_remove_object(oid); - - get_store_path(oid, ec_index, path); - - if (unlink(path) < 0) { - if (errno == ENOENT) - return SD_RES_NO_OBJ; - - sd_err("failed, %s, %m", path); - return SD_RES_EIO; - } - - return SD_RES_SUCCESS; -} - -#define SHA1NAME "user.obj.sha1" - -static int get_object_sha1(const char *path, uint8_t *sha1) -{ - if (getxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE) - != SHA1_DIGEST_SIZE) { - if (errno == ENODATA) - sd_debug("sha1 is not cached yet, %s", path); - else - sd_err("fail to get xattr, %s", path); - return -1; - } - - return 0; -} - -static int set_object_sha1(const char *path, const uint8_t *sha1) -{ - int ret; - - ret = setxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE, 0); - if (ret < 0) - sd_err("fail to set sha1, %s", path); - - return ret; -} - -static int get_object_path(uint64_t oid, uint32_t epoch, char *path, - size_t size) -{ - if (default_exist(oid, 0)) { - snprintf(path, PATH_MAX, 
"%s/%016"PRIx64, - md_get_object_dir(oid), oid); - } else { - get_store_stale_path(oid, epoch, 0, path); - if (access(path, F_OK) < 0) { - if (errno == ENOENT) - return SD_RES_NO_OBJ; - return SD_RES_EIO; - } - - } - - return SD_RES_SUCCESS; -} - -int default_get_hash(uint64_t oid, uint32_t epoch, uint8_t *sha1) -{ - int ret; - void *buf; - struct siocb iocb = {}; - uint32_t length; - bool is_readonly_obj = oid_is_readonly(oid); - char path[PATH_MAX]; - - ret = get_object_path(oid, epoch, path, sizeof(path)); - if (ret != SD_RES_SUCCESS) - return ret; - - if (is_readonly_obj) { - if (get_object_sha1(path, sha1) == 0) { - sd_debug("use cached sha1 digest %s", - sha1_to_hex(sha1)); - return SD_RES_SUCCESS; - } - } - - length = get_store_objsize(oid); - buf = valloc(length); - if (buf == NULL) - return SD_RES_NO_MEM; - - iocb.epoch = epoch; - iocb.buf = buf; - iocb.length = length; - - ret = default_read_from_path(oid, path, &iocb); - if (ret != SD_RES_SUCCESS) { - free(buf); - return ret; - } - - get_buffer_sha1(buf, length, sha1); - free(buf); - - sd_debug("the message digest of %"PRIx64" at epoch %d is %s", oid, - epoch, sha1_to_hex(sha1)); - - if (is_readonly_obj) - set_object_sha1(path, sha1); - - return ret; -} - -int default_purge_obj(void) -{ - uint32_t tgt_epoch = get_latest_epoch(); - - return for_each_object_in_wd(move_object_to_stale_dir, true, - &tgt_epoch); -} - -static struct store_driver plain_store = { - .name = "plain", - .init = default_init, - .exist = default_exist, - .create_and_write = default_create_and_write, - .write = default_write, - .read = default_read, - .link = default_link, - .update_epoch = default_update_epoch, - .cleanup = default_cleanup, - .format = default_format, - .remove_object = default_remove_object, - .get_hash = default_get_hash, - .purge_obj = default_purge_obj, -}; - -add_store_driver(plain_store); diff --git a/sheep/store.c b/sheep/store.c deleted file mode 100644 index 8843fb8..0000000 --- a/sheep/store.c +++ /dev/null 
@@ -1,511 +0,0 @@ -/* - * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License version - * 2 as published by the Free Software Foundation. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "sheep_priv.h" - -char *obj_path; -char *epoch_path; - -struct store_driver *sd_store; -LIST_HEAD(store_drivers); - -int update_epoch_log(uint32_t epoch, struct sd_node *nodes, size_t nr_nodes) -{ - int ret, len, nodes_len; - time_t t; - char path[PATH_MAX], *buf; - - sd_debug("update epoch: %d, %zu", epoch, nr_nodes); - - /* Piggyback the epoch creation time for 'dog cluster info' */ - time(&t); - nodes_len = nr_nodes * sizeof(struct sd_node); - len = nodes_len + sizeof(time_t); - buf = xmalloc(len); - memcpy(buf, nodes, nodes_len); - memcpy(buf + nodes_len, &t, sizeof(time_t)); - - /* - * rb field is unused in epoch file, zero-filling it - * is good for epoch file recovery because it is unified - */ - for (int i = 0; i < nr_nodes; i++) - memset(buf + i * sizeof(struct sd_node) - + offsetof(struct sd_node, rb), - 0, sizeof(struct rb_node)); - - snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch); - - ret = atomic_create_and_write(path, buf, len, true); - - free(buf); - return ret; -} - -static int do_epoch_log_read(uint32_t epoch, struct sd_node *nodes, int len, - int *nr_nodes, time_t *timestamp) -{ - int fd, ret, buf_len; - char path[PATH_MAX]; - struct stat epoch_stat; - - snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch); - fd = open(path, O_RDONLY); - if (fd < 0) { - sd_debug("failed to open epoch %"PRIu32" log, %m", epoch); - goto err; - } - - memset(&epoch_stat, 0, sizeof(epoch_stat)); - ret = fstat(fd, &epoch_stat); - if (ret < 0) { - sd_err("failed to stat epoch %"PRIu32" log, %m", epoch); - goto 
err; - } - - buf_len = epoch_stat.st_size - sizeof(*timestamp); - if (buf_len < 0) { - sd_err("invalid epoch %"PRIu32" log", epoch); - goto err; - } - if (len < buf_len) { - close(fd); - return SD_RES_BUFFER_SMALL; - } - - ret = xread(fd, nodes, buf_len); - if (ret < 0) { - sd_err("failed to read epoch %"PRIu32" log, %m", epoch); - goto err; - } - - /* Broken epoch, just ignore */ - if (ret % sizeof(struct sd_node) != 0) { - sd_err("invalid epoch %"PRIu32" log", epoch); - goto err; - } - - *nr_nodes = ret / sizeof(struct sd_node); - - if (timestamp) { - ret = xread(fd, timestamp, sizeof(*timestamp)); - if (ret != sizeof(*timestamp)) { - sd_err("invalid epoch %"PRIu32" log", epoch); - goto err; - } - } - - close(fd); - return SD_RES_SUCCESS; -err: - if (fd >= 0) - close(fd); - return SD_RES_NO_TAG; -} - -int epoch_log_read(uint32_t epoch, struct sd_node *nodes, - int len, int *nr_nodes) -{ - return do_epoch_log_read(epoch, nodes, len, nr_nodes, NULL); -} - -int epoch_log_read_with_timestamp(uint32_t epoch, struct sd_node *nodes, - int len, int *nr_nodes, time_t *timestamp) -{ - return do_epoch_log_read(epoch, nodes, len, nr_nodes, timestamp); -} - -uint32_t get_latest_epoch(void) -{ - DIR *dir; - struct dirent *d; - uint32_t e, epoch = 0; - char *p; - - dir = opendir(epoch_path); - if (!dir) - panic("failed to get the latest epoch: %m"); - - while ((d = readdir(dir))) { - e = strtol(d->d_name, &p, 10); - if (d->d_name == p) - continue; - - if (strlen(d->d_name) != 8) - continue; - - if (e > epoch) - epoch = e; - } - closedir(dir); - - return epoch; -} - -int lock_base_dir(const char *d) -{ -#define LOCK_PATH "/lock" - char *lock_path; - int ret = 0; - int fd, len = strlen(d) + strlen(LOCK_PATH) + 1; - - lock_path = xzalloc(len); - snprintf(lock_path, len, "%s" LOCK_PATH, d); - - fd = open(lock_path, O_WRONLY|O_CREAT, sd_def_fmode); - if (fd < 0) { - sd_err("failed to open lock file %s (%m)", lock_path); - ret = -1; - goto out; - } - - if (lockf(fd, F_TLOCK, 1) < 0) 
{ - if (errno == EACCES || errno == EAGAIN) - sd_err("another sheep daemon is using %s", d); - else - sd_err("unable to get base dir lock (%m)"); - ret = -1; - goto out; - } - -out: - free(lock_path); - return ret; -} - -int init_base_path(const char *d) -{ - if (xmkdir(d, sd_def_dmode) < 0) { - sd_err("cannot create the directory %s (%m)", d); - return -1; - } - - return 0; -} - -static inline int check_path_len(const char *path) -{ - int len = strlen(path); - if (len > PATH_MAX) { - sd_err("insanely long object directory %s", path); - return -1; - } - - return 0; -} - -static int is_meta_store(const char *path) -{ - char conf[PATH_MAX]; - char epoch[PATH_MAX]; - - snprintf(conf, PATH_MAX, "%s/config", path); - snprintf(epoch, PATH_MAX, "%s/epoch", path); - if (!access(conf, R_OK) && !access(epoch, R_OK)) - return true; - - return false; -} - -static int init_obj_path(const char *base_path, char *argp) -{ - char *p; - int len; - - if (check_path_len(base_path) < 0) - return -1; - -#define OBJ_PATH "/obj" - len = strlen(base_path) + strlen(OBJ_PATH) + 1; - obj_path = xzalloc(len); - snprintf(obj_path, len, "%s" OBJ_PATH, base_path); - - /* Eat up the first component */ - strtok(argp, ","); - p = strtok(NULL, ","); - if (!p) { - /* - * If We have only one path, meta-store and object-store share - * it. This is helpful to upgrade old sheep cluster to - * the MD-enabled. 
- */ - md_add_disk(obj_path, false); - } else { - do { - if (is_meta_store(p)) { - sd_err("%s is meta-store, abort", p); - return -1; - } - md_add_disk(p, false); - } while ((p = strtok(NULL, ","))); - } - - if (md_nr_disks() <= 0) { - sd_err("There isn't any available disk!"); - return -1; - } - - return xmkdir(obj_path, sd_def_dmode); -} - -static int init_epoch_path(const char *base_path) -{ -#define EPOCH_PATH "/epoch/" - int len = strlen(base_path) + strlen(EPOCH_PATH) + 1; - epoch_path = xzalloc(len); - snprintf(epoch_path, len, "%s" EPOCH_PATH, base_path); - - return xmkdir(epoch_path, sd_def_dmode); -} - -/* - * If the node is gateway, this function only finds the store driver. - * Otherwise, this function initializes the backend store - */ -int init_store_driver(bool is_gateway) -{ - char driver_name[STORE_LEN], *p; - - pstrcpy(driver_name, sizeof(driver_name), (char *)sys->cinfo.store); - - p = memchr(driver_name, '\0', STORE_LEN); - if (!p) { - /* - * If the driver name is not NUL terminated we are in deep - * trouble, let's get out here. - */ - sd_debug("store name not NUL terminated"); - return SD_RES_NO_STORE; - } - - /* - * The store file might not exist in case this is a new sheep that - * never joined a cluster before. 
- */ - if (p == driver_name) - return 0; - - sd_store = find_store_driver(driver_name); - if (!sd_store) { - sd_debug("store %s not found", driver_name); - return SD_RES_NO_STORE; - } - - if (is_gateway) - return SD_RES_SUCCESS; - - return sd_store->init(); -} - -int init_disk_space(const char *base_path) -{ - int ret = SD_RES_SUCCESS; - uint64_t space_size = 0, mds; - struct statvfs fs; - - if (sys->gateway_only) - goto out; - - /* We need to init md even we don't need to update sapce */ - mds = md_init_space(); - - /* If it is restarted */ - ret = get_node_space(&space_size); - if (space_size != 0) { - sys->disk_space = space_size; - goto out; - } - - /* User has specified the space at startup */ - if (sys->disk_space) { - ret = set_node_space(sys->disk_space); - goto out; - } - - if (mds) { - sys->disk_space = mds; - } else { - ret = statvfs(base_path, &fs); - if (ret < 0) { - sd_debug("get disk space failed %m"); - ret = SD_RES_EIO; - goto out; - } - sys->disk_space = (uint64_t)fs.f_frsize * fs.f_bavail; - } - - ret = set_node_space(sys->disk_space); -out: - sd_debug("disk free space is %" PRIu64, sys->disk_space); - return ret; -} - -/* Initialize all the global pathnames used internally */ -int init_global_pathnames(const char *d, char *argp) -{ - int ret; - - ret = init_obj_path(d, argp); - if (ret) - return ret; - - ret = init_epoch_path(d); - if (ret) - return ret; - - init_config_path(d); - - return 0; -} - -/* Write data to both local object cache (if enabled) and backends */ -int sd_write_object(uint64_t oid, char *data, unsigned int datalen, - uint64_t offset, bool create) -{ - struct sd_req hdr; - int ret; - - if (sys->enable_object_cache && object_is_cached(oid)) { - ret = object_cache_write(oid, data, datalen, offset, - create); - if (ret == SD_RES_NO_CACHE) - goto forward_write; - - if (ret != 0) { - sd_err("write cache failed %" PRIx64 " %" PRIx32, oid, - ret); - return ret; - } - } - -forward_write: - if (create) - sd_init_req(&hdr, 
SD_OP_CREATE_AND_WRITE_OBJ); - else - sd_init_req(&hdr, SD_OP_WRITE_OBJ); - hdr.flags = SD_FLAG_CMD_WRITE; - hdr.data_length = datalen; - - hdr.obj.oid = oid; - hdr.obj.offset = offset; - - ret = exec_local_req(&hdr, data); - if (ret != SD_RES_SUCCESS) - sd_err("failed to write object %" PRIx64 ", %s", oid, - sd_strerror(ret)); - - return ret; -} - -int read_backend_object(uint64_t oid, char *data, unsigned int datalen, - uint64_t offset) -{ - struct sd_req hdr; - int ret; - - sd_init_req(&hdr, SD_OP_READ_OBJ); - hdr.data_length = datalen; - hdr.obj.oid = oid; - hdr.obj.offset = offset; - - ret = exec_local_req(&hdr, data); - if (ret != SD_RES_SUCCESS) - sd_err("failed to read object %" PRIx64 ", %s", oid, - sd_strerror(ret)); - return ret; -} - -/* - * Read data firstly from local object cache(if enabled), if fail, - * try read backends - */ -int sd_read_object(uint64_t oid, char *data, unsigned int datalen, - uint64_t offset) -{ - int ret; - - if (sys->enable_object_cache && object_is_cached(oid)) { - ret = object_cache_read(oid, data, datalen, offset); - if (ret != SD_RES_SUCCESS) { - sd_err("try forward read %" PRIx64 " %s", oid, - sd_strerror(ret)); - goto forward_read; - } - return ret; - } - -forward_read: - return read_backend_object(oid, data, datalen, offset); -} - -int sd_remove_object(uint64_t oid) -{ - struct sd_req hdr; - int ret; - - if (sys->enable_object_cache && object_is_cached(oid)) { - ret = object_cache_remove(oid); - if (ret != SD_RES_SUCCESS) - return ret; - } - - sd_init_req(&hdr, SD_OP_REMOVE_OBJ); - hdr.obj.oid = oid; - - ret = exec_local_req(&hdr, NULL); - if (ret != SD_RES_SUCCESS) - sd_err("failed to remove object %" PRIx64 ", %s", oid, - sd_strerror(ret)); - - return ret; -} - -int sd_discard_object(uint64_t oid) -{ - int ret; - struct sd_req hdr; - - sd_init_req(&hdr, SD_OP_DISCARD_OBJ); - hdr.obj.oid = oid; - - ret = exec_local_req(&hdr, NULL); - if (ret != SD_RES_SUCCESS) - sd_err("Failed to discard data obj %"PRIu64" %s", oid, - 
sd_strerror(ret)); - - return ret; -} - -int sd_dec_object_refcnt(uint64_t data_oid, uint32_t generation, - uint32_t refcnt) -{ - struct sd_req hdr; - int ret; - uint64_t ledger_oid = data_oid_to_ledger_oid(data_oid); - - sd_debug("%"PRIx64", %" PRId32 ", %" PRId32, - data_oid, generation, refcnt); - - if (generation == 0 && refcnt == 0) - return sd_remove_object(data_oid); - - sd_init_req(&hdr, SD_OP_DECREF_OBJ); - hdr.ref.oid = ledger_oid; - hdr.ref.generation = generation; - hdr.ref.count = refcnt; - - ret = exec_local_req(&hdr, NULL); - if (ret != SD_RES_SUCCESS) - sd_err("failed to decrement reference %" PRIx64 ", %s", - ledger_oid, sd_strerror(ret)); - - return ret; -} diff --git a/sheep/store/common.c b/sheep/store/common.c new file mode 100644 index 0000000..8843fb8 --- /dev/null +++ b/sheep/store/common.c @@ -0,0 +1,511 @@ +/* + * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +#include "sheep_priv.h" + +char *obj_path; +char *epoch_path; + +struct store_driver *sd_store; +LIST_HEAD(store_drivers); + +int update_epoch_log(uint32_t epoch, struct sd_node *nodes, size_t nr_nodes) +{ + int ret, len, nodes_len; + time_t t; + char path[PATH_MAX], *buf; + + sd_debug("update epoch: %d, %zu", epoch, nr_nodes); + + /* Piggyback the epoch creation time for 'dog cluster info' */ + time(&t); + nodes_len = nr_nodes * sizeof(struct sd_node); + len = nodes_len + sizeof(time_t); + buf = xmalloc(len); + memcpy(buf, nodes, nodes_len); + memcpy(buf + nodes_len, &t, sizeof(time_t)); + + /* + * rb field is unused in epoch file, zero-filling it + * is good for epoch file recovery because it is unified + */ + for (int i = 0; i < nr_nodes; i++) + memset(buf + i * sizeof(struct sd_node) + + offsetof(struct sd_node, rb), + 0, sizeof(struct rb_node)); + + snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch); + + ret = atomic_create_and_write(path, buf, len, true); + + free(buf); + return ret; +} + +static int do_epoch_log_read(uint32_t epoch, struct sd_node *nodes, int len, + int *nr_nodes, time_t *timestamp) +{ + int fd, ret, buf_len; + char path[PATH_MAX]; + struct stat epoch_stat; + + snprintf(path, sizeof(path), "%s%08u", epoch_path, epoch); + fd = open(path, O_RDONLY); + if (fd < 0) { + sd_debug("failed to open epoch %"PRIu32" log, %m", epoch); + goto err; + } + + memset(&epoch_stat, 0, sizeof(epoch_stat)); + ret = fstat(fd, &epoch_stat); + if (ret < 0) { + sd_err("failed to stat epoch %"PRIu32" log, %m", epoch); + goto err; + } + + buf_len = epoch_stat.st_size - sizeof(*timestamp); + if (buf_len < 0) { + sd_err("invalid epoch %"PRIu32" log", epoch); + goto err; + } + if (len < buf_len) { + close(fd); + return SD_RES_BUFFER_SMALL; + } + + ret = xread(fd, nodes, buf_len); + if (ret < 0) { + sd_err("failed to read epoch %"PRIu32" log, %m", epoch); + goto err; + } + + /* Broken epoch, just ignore */ + if (ret % sizeof(struct sd_node) != 0) { + 
sd_err("invalid epoch %"PRIu32" log", epoch); + goto err; + } + + *nr_nodes = ret / sizeof(struct sd_node); + + if (timestamp) { + ret = xread(fd, timestamp, sizeof(*timestamp)); + if (ret != sizeof(*timestamp)) { + sd_err("invalid epoch %"PRIu32" log", epoch); + goto err; + } + } + + close(fd); + return SD_RES_SUCCESS; +err: + if (fd >= 0) + close(fd); + return SD_RES_NO_TAG; +} + +int epoch_log_read(uint32_t epoch, struct sd_node *nodes, + int len, int *nr_nodes) +{ + return do_epoch_log_read(epoch, nodes, len, nr_nodes, NULL); +} + +int epoch_log_read_with_timestamp(uint32_t epoch, struct sd_node *nodes, + int len, int *nr_nodes, time_t *timestamp) +{ + return do_epoch_log_read(epoch, nodes, len, nr_nodes, timestamp); +} + +uint32_t get_latest_epoch(void) +{ + DIR *dir; + struct dirent *d; + uint32_t e, epoch = 0; + char *p; + + dir = opendir(epoch_path); + if (!dir) + panic("failed to get the latest epoch: %m"); + + while ((d = readdir(dir))) { + e = strtol(d->d_name, &p, 10); + if (d->d_name == p) + continue; + + if (strlen(d->d_name) != 8) + continue; + + if (e > epoch) + epoch = e; + } + closedir(dir); + + return epoch; +} + +int lock_base_dir(const char *d) +{ +#define LOCK_PATH "/lock" + char *lock_path; + int ret = 0; + int fd, len = strlen(d) + strlen(LOCK_PATH) + 1; + + lock_path = xzalloc(len); + snprintf(lock_path, len, "%s" LOCK_PATH, d); + + fd = open(lock_path, O_WRONLY|O_CREAT, sd_def_fmode); + if (fd < 0) { + sd_err("failed to open lock file %s (%m)", lock_path); + ret = -1; + goto out; + } + + if (lockf(fd, F_TLOCK, 1) < 0) { + if (errno == EACCES || errno == EAGAIN) + sd_err("another sheep daemon is using %s", d); + else + sd_err("unable to get base dir lock (%m)"); + ret = -1; + goto out; + } + +out: + free(lock_path); + return ret; +} + +int init_base_path(const char *d) +{ + if (xmkdir(d, sd_def_dmode) < 0) { + sd_err("cannot create the directory %s (%m)", d); + return -1; + } + + return 0; +} + +static inline int check_path_len(const char 
*path) +{ + int len = strlen(path); + if (len > PATH_MAX) { + sd_err("insanely long object directory %s", path); + return -1; + } + + return 0; +} + +static int is_meta_store(const char *path) +{ + char conf[PATH_MAX]; + char epoch[PATH_MAX]; + + snprintf(conf, PATH_MAX, "%s/config", path); + snprintf(epoch, PATH_MAX, "%s/epoch", path); + if (!access(conf, R_OK) && !access(epoch, R_OK)) + return true; + + return false; +} + +static int init_obj_path(const char *base_path, char *argp) +{ + char *p; + int len; + + if (check_path_len(base_path) < 0) + return -1; + +#define OBJ_PATH "/obj" + len = strlen(base_path) + strlen(OBJ_PATH) + 1; + obj_path = xzalloc(len); + snprintf(obj_path, len, "%s" OBJ_PATH, base_path); + + /* Eat up the first component */ + strtok(argp, ","); + p = strtok(NULL, ","); + if (!p) { + /* + * If We have only one path, meta-store and object-store share + * it. This is helpful to upgrade old sheep cluster to + * the MD-enabled. + */ + md_add_disk(obj_path, false); + } else { + do { + if (is_meta_store(p)) { + sd_err("%s is meta-store, abort", p); + return -1; + } + md_add_disk(p, false); + } while ((p = strtok(NULL, ","))); + } + + if (md_nr_disks() <= 0) { + sd_err("There isn't any available disk!"); + return -1; + } + + return xmkdir(obj_path, sd_def_dmode); +} + +static int init_epoch_path(const char *base_path) +{ +#define EPOCH_PATH "/epoch/" + int len = strlen(base_path) + strlen(EPOCH_PATH) + 1; + epoch_path = xzalloc(len); + snprintf(epoch_path, len, "%s" EPOCH_PATH, base_path); + + return xmkdir(epoch_path, sd_def_dmode); +} + +/* + * If the node is gateway, this function only finds the store driver. 
+ * Otherwise, this function initializes the backend store + */ +int init_store_driver(bool is_gateway) +{ + char driver_name[STORE_LEN], *p; + + pstrcpy(driver_name, sizeof(driver_name), (char *)sys->cinfo.store); + + p = memchr(driver_name, '\0', STORE_LEN); + if (!p) { + /* + * If the driver name is not NUL terminated we are in deep + * trouble, let's get out here. + */ + sd_debug("store name not NUL terminated"); + return SD_RES_NO_STORE; + } + + /* + * The store file might not exist in case this is a new sheep that + * never joined a cluster before. + */ + if (p == driver_name) + return 0; + + sd_store = find_store_driver(driver_name); + if (!sd_store) { + sd_debug("store %s not found", driver_name); + return SD_RES_NO_STORE; + } + + if (is_gateway) + return SD_RES_SUCCESS; + + return sd_store->init(); +} + +int init_disk_space(const char *base_path) +{ + int ret = SD_RES_SUCCESS; + uint64_t space_size = 0, mds; + struct statvfs fs; + + if (sys->gateway_only) + goto out; + + /* We need to init md even we don't need to update sapce */ + mds = md_init_space(); + + /* If it is restarted */ + ret = get_node_space(&space_size); + if (space_size != 0) { + sys->disk_space = space_size; + goto out; + } + + /* User has specified the space at startup */ + if (sys->disk_space) { + ret = set_node_space(sys->disk_space); + goto out; + } + + if (mds) { + sys->disk_space = mds; + } else { + ret = statvfs(base_path, &fs); + if (ret < 0) { + sd_debug("get disk space failed %m"); + ret = SD_RES_EIO; + goto out; + } + sys->disk_space = (uint64_t)fs.f_frsize * fs.f_bavail; + } + + ret = set_node_space(sys->disk_space); +out: + sd_debug("disk free space is %" PRIu64, sys->disk_space); + return ret; +} + +/* Initialize all the global pathnames used internally */ +int init_global_pathnames(const char *d, char *argp) +{ + int ret; + + ret = init_obj_path(d, argp); + if (ret) + return ret; + + ret = init_epoch_path(d); + if (ret) + return ret; + + init_config_path(d); + + return 0; +} 
+ +/* Write data to both local object cache (if enabled) and backends */ +int sd_write_object(uint64_t oid, char *data, unsigned int datalen, + uint64_t offset, bool create) +{ + struct sd_req hdr; + int ret; + + if (sys->enable_object_cache && object_is_cached(oid)) { + ret = object_cache_write(oid, data, datalen, offset, + create); + if (ret == SD_RES_NO_CACHE) + goto forward_write; + + if (ret != 0) { + sd_err("write cache failed %" PRIx64 " %" PRIx32, oid, + ret); + return ret; + } + } + +forward_write: + if (create) + sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ); + else + sd_init_req(&hdr, SD_OP_WRITE_OBJ); + hdr.flags = SD_FLAG_CMD_WRITE; + hdr.data_length = datalen; + + hdr.obj.oid = oid; + hdr.obj.offset = offset; + + ret = exec_local_req(&hdr, data); + if (ret != SD_RES_SUCCESS) + sd_err("failed to write object %" PRIx64 ", %s", oid, + sd_strerror(ret)); + + return ret; +} + +int read_backend_object(uint64_t oid, char *data, unsigned int datalen, + uint64_t offset) +{ + struct sd_req hdr; + int ret; + + sd_init_req(&hdr, SD_OP_READ_OBJ); + hdr.data_length = datalen; + hdr.obj.oid = oid; + hdr.obj.offset = offset; + + ret = exec_local_req(&hdr, data); + if (ret != SD_RES_SUCCESS) + sd_err("failed to read object %" PRIx64 ", %s", oid, + sd_strerror(ret)); + return ret; +} + +/* + * Read data firstly from local object cache(if enabled), if fail, + * try read backends + */ +int sd_read_object(uint64_t oid, char *data, unsigned int datalen, + uint64_t offset) +{ + int ret; + + if (sys->enable_object_cache && object_is_cached(oid)) { + ret = object_cache_read(oid, data, datalen, offset); + if (ret != SD_RES_SUCCESS) { + sd_err("try forward read %" PRIx64 " %s", oid, + sd_strerror(ret)); + goto forward_read; + } + return ret; + } + +forward_read: + return read_backend_object(oid, data, datalen, offset); +} + +int sd_remove_object(uint64_t oid) +{ + struct sd_req hdr; + int ret; + + if (sys->enable_object_cache && object_is_cached(oid)) { + ret = 
object_cache_remove(oid); + if (ret != SD_RES_SUCCESS) + return ret; + } + + sd_init_req(&hdr, SD_OP_REMOVE_OBJ); + hdr.obj.oid = oid; + + ret = exec_local_req(&hdr, NULL); + if (ret != SD_RES_SUCCESS) + sd_err("failed to remove object %" PRIx64 ", %s", oid, + sd_strerror(ret)); + + return ret; +} + +int sd_discard_object(uint64_t oid) +{ + int ret; + struct sd_req hdr; + + sd_init_req(&hdr, SD_OP_DISCARD_OBJ); + hdr.obj.oid = oid; + + ret = exec_local_req(&hdr, NULL); + if (ret != SD_RES_SUCCESS) + sd_err("Failed to discard data obj %"PRIu64" %s", oid, + sd_strerror(ret)); + + return ret; +} + +int sd_dec_object_refcnt(uint64_t data_oid, uint32_t generation, + uint32_t refcnt) +{ + struct sd_req hdr; + int ret; + uint64_t ledger_oid = data_oid_to_ledger_oid(data_oid); + + sd_debug("%"PRIx64", %" PRId32 ", %" PRId32, + data_oid, generation, refcnt); + + if (generation == 0 && refcnt == 0) + return sd_remove_object(data_oid); + + sd_init_req(&hdr, SD_OP_DECREF_OBJ); + hdr.ref.oid = ledger_oid; + hdr.ref.generation = generation; + hdr.ref.count = refcnt; + + ret = exec_local_req(&hdr, NULL); + if (ret != SD_RES_SUCCESS) + sd_err("failed to decrement reference %" PRIx64 ", %s", + ledger_oid, sd_strerror(ret)); + + return ret; +} diff --git a/sheep/store/md.c b/sheep/store/md.c new file mode 100644 index 0000000..c07489e --- /dev/null +++ b/sheep/store/md.c @@ -0,0 +1,878 @@ +/* + * Copyright (C) 2013 Taobao Inc. + * + * Liu Yuan <namei.u...@gmail.com> + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "sheep_priv.h" + +#define MD_VDISK_SIZE ((uint64_t)1*1024*1024*1024) /* 1G */ + +#define NONE_EXIST_PATH "/all/disks/are/broken/,ps/əʌo7/!" 
+ +struct md md = { + .vroot = RB_ROOT, + .root = RB_ROOT, + .lock = SD_RW_LOCK_INITIALIZER, +}; + +static inline uint32_t nr_online_disks(void) +{ + uint32_t nr; + + sd_read_lock(&md.lock); + nr = md.nr_disks; + sd_rw_unlock(&md.lock); + + return nr; +} + +static inline int vdisk_number(const struct disk *disk) +{ + return DIV_ROUND_UP(disk->space, MD_VDISK_SIZE); +} + +static int disk_cmp(const struct disk *d1, const struct disk *d2) +{ + return strcmp(d1->path, d2->path); +} + +static int vdisk_cmp(const struct vdisk *d1, const struct vdisk *d2) +{ + return intcmp(d1->hash, d2->hash); +} + +static struct vdisk *vdisk_insert(struct vdisk *new) +{ + return rb_insert(&md.vroot, new, rb, vdisk_cmp); +} + +/* If v1_hash < hval <= v2_hash, then oid is resident in v2 */ +static struct vdisk *hval_to_vdisk(uint64_t hval) +{ + struct vdisk dummy = { .hash = hval }; + + return rb_nsearch(&md.vroot, &dummy, rb, vdisk_cmp); +} + +static struct vdisk *oid_to_vdisk(uint64_t oid) +{ + return hval_to_vdisk(sd_hash_oid(oid)); +} + +static void create_vdisks(const struct disk *disk) +{ + uint64_t hval = sd_hash(disk->path, strlen(disk->path)); + const struct sd_node *n = &sys->this_node; + uint64_t node_hval; + int nr; + + if (is_cluster_diskmode(&sys->cinfo)) { + node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr)); + hval = fnv_64a_64(node_hval, hval); + nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN); + if (0 == n->nid.port) + return; + } else + nr = vdisk_number(disk); + + for (int i = 0; i < nr; i++) { + struct vdisk *v = xmalloc(sizeof(*v)); + + hval = sd_hash_next(hval); + v->hash = hval; + v->disk = disk; + if (unlikely(vdisk_insert(v))) + panic("vdisk hash collison"); + } +} + +static inline void vdisk_free(struct vdisk *v) +{ + rb_erase(&v->rb, &md.vroot); + free(v); +} + +static void remove_vdisks(const struct disk *disk) +{ + uint64_t hval = sd_hash(disk->path, strlen(disk->path)); + const struct sd_node *n = &sys->this_node; + uint64_t node_hval; + int nr; + + 
if (is_cluster_diskmode(&sys->cinfo)) { + node_hval = sd_hash(&n->nid, offsetof(typeof(n->nid), io_addr)); + hval = fnv_64a_64(node_hval, hval); + nr = DIV_ROUND_UP(disk->space, WEIGHT_MIN); + } else + nr = vdisk_number(disk); + + for (int i = 0; i < nr; i++) { + struct vdisk *v; + + hval = sd_hash_next(hval); + v = hval_to_vdisk(hval); + sd_assert(v->hash == hval); + + vdisk_free(v); + } +} + +static inline void trim_last_slash(char *path) +{ + sd_assert(path[0]); + while (path[strlen(path) - 1] == '/') + path[strlen(path) - 1] = '\0'; +} + +static struct disk *path_to_disk(const char *path) +{ + struct disk key = {}; + + pstrcpy(key.path, sizeof(key.path), path); + trim_last_slash(key.path); + + return rb_search(&md.root, &key, rb, disk_cmp); +} + +size_t get_store_objsize(uint64_t oid) +{ + if (is_erasure_oid(oid)) { + uint8_t policy = get_vdi_copy_policy(oid_to_vid(oid)); + int d; + ec_policy_to_dp(policy, &d, NULL); + return get_vdi_object_size(oid_to_vid(oid)) / d; + } + return get_objsize(oid, get_vdi_object_size(oid_to_vid(oid))); +} + +static int get_total_object_size(uint64_t oid, const char *wd, uint32_t epoch, + uint8_t ec_index, struct vnode_info *vinfo, + void *total) +{ + uint64_t *t = total; + struct stat s; + char path[PATH_MAX]; + + snprintf(path, PATH_MAX, "%s/%016" PRIx64, wd, oid); + if (stat(path, &s) == 0) + *t += s.st_blocks * SECTOR_SIZE; + else + *t += get_store_objsize(oid); + + return SD_RES_SUCCESS; +} + +static int64_t find_string_integer(const char *str, const char *delimiter) +{ + char *pos = strstr(str, delimiter), *p; + int64_t ret; + + ret = strtoll(pos + 1, &p, 10); + if (ret == LLONG_MAX || p == pos + 1) { + sd_err("%s strtoul failed, delimiter %s, %m", str, delimiter); + return -1; + } + + return ret; +} + +/* If cleanup is true, temporary objects will be removed */ +static int for_each_object_in_path(const char *path, + int (*func)(uint64_t, const char *, uint32_t, + uint8_t, struct vnode_info *, + void *), + bool cleanup, 
struct vnode_info *vinfo, + void *arg) +{ + DIR *dir; + struct dirent *d; + uint64_t oid; + int ret = SD_RES_SUCCESS; + char file_name[PATH_MAX]; + + dir = opendir(path); + if (unlikely(!dir)) { + sd_err("failed to open %s, %m", path); + return SD_RES_EIO; + } + + while ((d = readdir(dir))) { + uint32_t epoch = 0; + uint8_t ec_index = SD_MAX_COPIES; + + /* skip ".", ".." and ".stale" */ + if (unlikely(!strncmp(d->d_name, ".", 1))) + continue; + + sd_debug("%s, %s", path, d->d_name); + oid = strtoull(d->d_name, NULL, 16); + if (oid == 0 || oid == ULLONG_MAX) + continue; + + /* don't call callback against temporary objects */ + if (is_tmp_dentry(d->d_name)) { + if (cleanup) { + snprintf(file_name, sizeof(file_name), + "%s/%s", path, d->d_name); + sd_debug("remove tmp object %s", file_name); + if (unlink(file_name) < 0) + sd_err("failed to unlink %s: %m", + file_name); + } + continue; + } + + if (is_stale_dentry(d->d_name)) { + epoch = find_string_integer(d->d_name, "."); + if (epoch < 0) + continue; + } + + if (is_ec_dentry(d->d_name)) { + ec_index = find_string_integer(d->d_name, "_"); + if (ec_index < 0) + continue; + } + + ret = func(oid, path, epoch, ec_index, vinfo, arg); + if (ret != SD_RES_SUCCESS) + break; + } + closedir(dir); + return ret; +} + +static uint64_t get_path_free_size(const char *path, uint64_t *used) +{ + struct statvfs fs; + uint64_t size; + + if (statvfs(path, &fs) < 0) { + sd_err("get disk %s space failed %m", path); + return 0; + } + size = (int64_t)fs.f_frsize * fs.f_bavail; + + if (!used) + goto out; + if (for_each_object_in_path(path, get_total_object_size, false, + NULL, used) + != SD_RES_SUCCESS) + return 0; +out: + return size; +} + +/* + * If path is broken during initialization or not support xattr return 0. We can + * safely use 0 to represent failure case because 0 space path can be + * considered as broken path. 
+ */ +static uint64_t init_path_space(const char *path, bool purge) +{ + uint64_t size; + char stale[PATH_MAX]; + + if (!is_xattr_enabled(path)) { + sd_warn("multi-disk support need xattr feature for path: %s", + path); + goto broken_path; + } + + if (purge && purge_directory(path) < 0) + sd_err("failed to purge %s", path); + + snprintf(stale, PATH_MAX, "%s/.stale", path); + if (xmkdir(stale, sd_def_dmode) < 0) { + sd_err("can't mkdir for %s, %m", stale); + goto broken_path; + } + +#define MDNAME "user.md.size" +#define MDSIZE sizeof(uint64_t) + if (getxattr(path, MDNAME, &size, MDSIZE) < 0) { + if (errno == ENODATA) { + goto create; + } else { + sd_err("%s, %m", path); + goto broken_path; + } + } + + return size; +create: + size = get_path_free_size(path, NULL); + if (!size) + goto broken_path; + if (setxattr(path, MDNAME, &size, MDSIZE, 0) < 0) { + sd_err("%s, %m", path); + goto broken_path; + } + return size; +broken_path: + return 0; +} + +/* We don't need lock at init stage */ +bool md_add_disk(const char *path, bool purge) +{ + struct disk *new; + + if (path_to_disk(path)) { + sd_err("duplicate path %s", path); + return false; + } + + if (xmkdir(path, sd_def_dmode) < 0) { + sd_err("can't mkdir for %s, %m", path); + return false; + } + + new = xmalloc(sizeof(*new)); + pstrcpy(new->path, PATH_MAX, path); + trim_last_slash(new->path); + new->space = init_path_space(new->path, purge); + if (!new->space) { + free(new); + return false; + } + + create_vdisks(new); + rb_insert(&md.root, new, rb, disk_cmp); + md.space += new->space; + md.nr_disks++; + + sd_info("%s, vdisk nr %d, total disk %d", new->path, vdisk_number(new), + md.nr_disks); + return true; +} + +static inline void md_remove_disk(struct disk *disk) +{ + sd_info("%s from multi-disk array", disk->path); + rb_erase(&disk->rb, &md.root); + md.nr_disks--; + remove_vdisks(disk); + free(disk); +} + +uint64_t md_init_space(void) +{ + return md.space; +} + +static const char *md_get_object_dir_nolock(uint64_t 
oid) +{ + const struct vdisk *vd; + + if (unlikely(md.nr_disks == 0)) + return NONE_EXIST_PATH; /* To generate EIO */ + + vd = oid_to_vdisk(oid); + return vd->disk->path; +} + +const char *md_get_object_dir(uint64_t oid) +{ + const char *p; + + sd_read_lock(&md.lock); + p = md_get_object_dir_nolock(oid); + sd_rw_unlock(&md.lock); + + return p; +} + +struct process_path_arg { + const char *path; + struct vnode_info *vinfo; + int (*func)(uint64_t oid, const char *, uint32_t, uint8_t, + struct vnode_info *, void *arg); + bool cleanup; + void *opaque; + int result; +}; + +static void *thread_process_path(void *arg) +{ + int ret; + struct process_path_arg *parg = (struct process_path_arg *)arg; + + ret = for_each_object_in_path(parg->path, parg->func, parg->cleanup, + parg->vinfo, parg->opaque); + if (ret != SD_RES_SUCCESS) + parg->result = ret; + + return arg; +} + +main_fn int for_each_object_in_wd(int (*func)(uint64_t oid, const char *path, + uint32_t epoch, uint8_t ec_index, + struct vnode_info *vinfo, void *arg), + bool cleanup, void *arg) +{ + int ret = SD_RES_SUCCESS; + const struct disk *disk; + struct process_path_arg *thread_args, *path_arg; + struct vnode_info *vinfo; + void *ret_arg; + sd_thread_t *thread_array; + int nr_thread = 0, idx = 0; + + sd_read_lock(&md.lock); + + rb_for_each_entry(disk, &md.root, rb) { + nr_thread++; + } + + thread_args = xmalloc(nr_thread * sizeof(struct process_path_arg)); + thread_array = xmalloc(nr_thread * sizeof(sd_thread_t)); + + vinfo = get_vnode_info(); + + rb_for_each_entry(disk, &md.root, rb) { + thread_args[idx].path = disk->path; + thread_args[idx].vinfo = vinfo; + thread_args[idx].func = func; + thread_args[idx].cleanup = cleanup; + thread_args[idx].opaque = arg; + thread_args[idx].result = SD_RES_SUCCESS; + ret = sd_thread_create_with_idx("foreach wd", + thread_array + idx, + thread_process_path, + (void *)(thread_args + idx)); + if (ret) { + /* + * If we can't create enough threads to process + * files, the 
data-consistent will be broken if + * we continued. + */ + panic("Failed to create thread for path %s", + disk->path); + } + idx++; + } + + sd_debug("Create %d threads for all path", nr_thread); + /* wait for all threads to exit */ + for (idx = 0; idx < nr_thread; idx++) { + ret = sd_thread_join(thread_array[idx], &ret_arg); + if (ret) + sd_err("Failed to join thread"); + if (ret_arg) { + path_arg = (struct process_path_arg *)ret_arg; + if (path_arg->result != SD_RES_SUCCESS) + sd_err("%s, %s", path_arg->path, + sd_strerror(path_arg->result)); + } + } + + put_vnode_info(vinfo); + sd_rw_unlock(&md.lock); + + free(thread_args); + free(thread_array); + return ret; +} + +int for_each_object_in_stale(int (*func)(uint64_t oid, const char *path, + uint32_t epoch, uint8_t, + struct vnode_info *, void *arg), + void *arg) +{ + int ret = SD_RES_SUCCESS; + char path[PATH_MAX]; + const struct disk *disk; + + sd_read_lock(&md.lock); + rb_for_each_entry(disk, &md.root, rb) { + snprintf(path, sizeof(path), "%s/.stale", disk->path); + ret = for_each_object_in_path(path, func, false, NULL, arg); + if (ret != SD_RES_SUCCESS) + break; + } + sd_rw_unlock(&md.lock); + return ret; +} + + +int for_each_obj_path(int (*func)(const char *path)) +{ + int ret = SD_RES_SUCCESS; + const struct disk *disk; + + sd_read_lock(&md.lock); + rb_for_each_entry(disk, &md.root, rb) { + ret = func(disk->path); + if (ret != SD_RES_SUCCESS) + break; + } + sd_rw_unlock(&md.lock); + return ret; +} + +struct md_work { + struct work work; + char path[PATH_MAX]; +}; + +static inline void kick_recover(void) +{ + struct vnode_info *vinfo = get_vnode_info(); + + if (is_cluster_diskmode(&sys->cinfo)) + sys->cdrv->update_node(&sys->this_node); + else { + start_recovery(vinfo, vinfo, false); + put_vnode_info(vinfo); + } +} + +static void md_do_recover(struct work *work) +{ + struct md_work *mw = container_of(work, struct md_work, work); + struct disk *disk; + int nr = 0; + + sd_write_lock(&md.lock); + disk = 
path_to_disk(mw->path); + if (!disk) + /* Just ignore the duplicate EIO of the same path */ + goto out; + md_remove_disk(disk); + nr = md.nr_disks; +out: + sd_rw_unlock(&md.lock); + + if (disk) { + if (nr > 0) { + update_node_disks(); + kick_recover(); + } else { + leave_cluster(); + } + } + + free(mw); +} + +int md_handle_eio(const char *fault_path) +{ + struct md_work *mw; + + if (nr_online_disks() == 0) + return SD_RES_EIO; + + mw = xzalloc(sizeof(*mw)); + mw->work.done = md_do_recover; + pstrcpy(mw->path, PATH_MAX, fault_path); + queue_work(sys->md_wqueue, &mw->work); + + /* Fool the requester to retry */ + return SD_RES_NETWORK_ERROR; +} + +static inline bool md_access(const char *path) +{ + if (access(path, R_OK | W_OK) < 0) { + if (unlikely(errno != ENOENT)) + sd_err("failed to check %s, %m", path); + return false; + } + + return true; +} + +static int get_old_new_path(uint64_t oid, uint32_t epoch, uint8_t ec_index, + const char *path, char *old, char *new) +{ + if (!epoch) { + if (!is_erasure_oid(oid)) { + snprintf(old, PATH_MAX, "%s/%016" PRIx64, path, oid); + snprintf(new, PATH_MAX, "%s/%016" PRIx64, + md_get_object_dir_nolock(oid), oid); + } else { + snprintf(old, PATH_MAX, "%s/%016" PRIx64"_%d", path, + oid, ec_index); + snprintf(new, PATH_MAX, "%s/%016" PRIx64"_%d", + md_get_object_dir_nolock(oid), oid, ec_index); + } + } else { + if (!is_erasure_oid(oid)) { + snprintf(old, PATH_MAX, + "%s/.stale/%016"PRIx64".%"PRIu32, path, + oid, epoch); + snprintf(new, PATH_MAX, + "%s/.stale/%016"PRIx64".%"PRIu32, + md_get_object_dir_nolock(oid), oid, epoch); + } else { + snprintf(old, PATH_MAX, + "%s/.stale/%016"PRIx64"_%d.%"PRIu32, path, + oid, ec_index, epoch); + snprintf(new, PATH_MAX, + "%s/.stale/%016"PRIx64"_%d.%"PRIu32, + md_get_object_dir_nolock(oid), + oid, ec_index, epoch); + } + } + + if (!md_access(old)) + return -1; + + return 0; +} + +static int md_move_object(uint64_t oid, const char *old, const char *new) +{ + struct strbuf buf = STRBUF_INIT; + int 
fd, ret = -1; + size_t sz = get_store_objsize(oid); + + fd = open(old, O_RDONLY); + if (fd < 0) { + sd_err("failed to open %s", old); + goto out; + } + + ret = strbuf_read(&buf, fd, sz); + if (ret != sz) { + sd_err("failed to read %s, size %zu, %d, %m", old, sz, ret); + ret = -1; + goto out_close; + } + + if (atomic_create_and_write(new, buf.buf, buf.len, false) < 0) { + if (errno != EEXIST) { + sd_err("failed to create %s", new); + ret = -1; + goto out_close; + } + } + unlink(old); + ret = 0; +out_close: + close(fd); +out: + strbuf_release(&buf); + return ret; +} + +static int md_check_and_move(uint64_t oid, uint32_t epoch, uint8_t ec_index, + const char *path) +{ + char old[PATH_MAX], new[PATH_MAX]; + + if (get_old_new_path(oid, epoch, ec_index, path, old, new) < 0) + return SD_RES_EIO; + /* + * Recovery thread and main thread might try to recover the same object. + * Either one succeeds, the other will fail and proceed and end up + * trying to move the object to where it is already in place, in this + * case we simply return. + */ + if (!strcmp(old, new)) + return SD_RES_SUCCESS; + + /* We can't use rename(2) across device */ + if (md_move_object(oid, old, new) < 0) { + sd_err("move old %s to new %s failed", old, new); + return SD_RES_EIO; + } + + sd_debug("from %s to %s", old, new); + return SD_RES_SUCCESS; +} + +static int scan_wd(uint64_t oid, uint32_t epoch, uint8_t ec_index) +{ + int ret = SD_RES_EIO; + const struct disk *disk; + + sd_read_lock(&md.lock); + rb_for_each_entry(disk, &md.root, rb) { + ret = md_check_and_move(oid, epoch, ec_index, disk->path); + if (ret == SD_RES_SUCCESS) + break; + } + sd_rw_unlock(&md.lock); + return ret; +} + +bool md_exist(uint64_t oid, uint8_t ec_index, char *path) +{ + if (md_access(path)) + return true; + /* + * We have to iterate the WD because we don't have epoch-like history + * track to locate the objects for multiple disk failure. Simply do + * hard iteration simplify the code a lot. 
+ */ + if (scan_wd(oid, 0, ec_index) == SD_RES_SUCCESS) + return true; + + return false; +} + +int md_get_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index, + char *path) +{ + if (unlikely(!epoch)) + panic("invalid 0 epoch"); + + if (is_erasure_oid(oid)) { + if (unlikely(ec_index >= SD_MAX_COPIES)) + panic("invalid ec index %d", ec_index); + + snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64"_%d.%"PRIu32, + md_get_object_dir(oid), oid, ec_index, epoch); + } else + snprintf(path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32, + md_get_object_dir(oid), oid, epoch); + + if (md_access(path)) + return SD_RES_SUCCESS; + + if (scan_wd(oid, epoch, ec_index) == SD_RES_SUCCESS) + return SD_RES_SUCCESS; + + return SD_RES_NO_OBJ; +} + +uint32_t md_get_info(struct sd_md_info *info) +{ + uint32_t ret = sizeof(*info); + const struct disk *disk; + int i = 0; + + memset(info, 0, ret); + sd_read_lock(&md.lock); + rb_for_each_entry(disk, &md.root, rb) { + info->disk[i].idx = i; + pstrcpy(info->disk[i].path, PATH_MAX, disk->path); + /* FIXME: better handling failure case. 
*/ + info->disk[i].free = get_path_free_size(info->disk[i].path, + &info->disk[i].used); + i++; + } + info->nr = md.nr_disks; + sd_rw_unlock(&md.lock); + return ret; +} + +static inline void md_del_disk(const char *path) +{ + struct disk *disk = path_to_disk(path); + + if (!disk) { + sd_err("invalid path %s", path); + return; + } + md_remove_disk(disk); +} + +#ifdef HAVE_DISKVNODES +void update_node_disks(void) +{ + const struct disk *disk; + int i = 0; + bool rb_empty = false; + + if (!sys) + return; + + memset(sys->this_node.disks, 0, sizeof(struct disk_info) * DISK_MAX); + sd_read_lock(&md.lock); + rb_for_each_entry(disk, &md.root, rb) { + sys->this_node.disks[i].disk_id = + sd_hash(disk->path, strlen(disk->path)); + sys->this_node.disks[i].disk_space = disk->space; + i++; + } + sd_rw_unlock(&md.lock); + + if (RB_EMPTY_ROOT(&md.vroot)) + rb_empty = true; + sd_write_lock(&md.lock); + rb_for_each_entry(disk, &md.root, rb) { + if (!rb_empty) + remove_vdisks(disk); + create_vdisks(disk); + } + sd_rw_unlock(&md.lock); +} +#else +void update_node_disks(void) +{ +} +#endif + +static int do_plug_unplug(char *disks, bool plug) +{ + const char *path; + int old_nr, ret = SD_RES_UNKNOWN; + + sd_write_lock(&md.lock); + old_nr = md.nr_disks; + path = strtok(disks, ","); + do { + if (plug) { + if (!md_add_disk(path, true)) + sd_err("failed to add %s", path); + } else { + md_del_disk(path); + } + } while ((path = strtok(NULL, ","))); + + /* If no disks change, bail out */ + if (old_nr == md.nr_disks) + goto out; + + ret = SD_RES_SUCCESS; +out: + sd_rw_unlock(&md.lock); + + if (ret == SD_RES_SUCCESS) { + update_node_disks(); + kick_recover(); + } + + return ret; +} + +int md_plug_disks(char *disks) +{ + return do_plug_unplug(disks, true); +} + +int md_unplug_disks(char *disks) +{ + return do_plug_unplug(disks, false); +} + +uint64_t md_get_size(uint64_t *used) +{ + uint64_t fsize = 0; + const struct disk *disk; + + *used = 0; + sd_read_lock(&md.lock); + rb_for_each_entry(disk, 
&md.root, rb) { + fsize += get_path_free_size(disk->path, used); + } + sd_rw_unlock(&md.lock); + + return fsize + *used; +} + +uint32_t md_nr_disks(void) +{ + return nr_online_disks(); +} diff --git a/sheep/store/plain_store.c b/sheep/store/plain_store.c new file mode 100644 index 0000000..efbf129 --- /dev/null +++ b/sheep/store/plain_store.c @@ -0,0 +1,759 @@ +/* + * Copyright (C) 2012 Nippon Telegraph and Telephone Corporation. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include <libgen.h> +#include <linux/falloc.h> + +#include "sheep_priv.h" + +#ifndef FALLOC_FL_PUNCH_HOLE +#define FALLOC_FL_PUNCH_HOLE 0x02 +#endif + +#define sector_algined(x) ({ ((x) & (SECTOR_SIZE - 1)) == 0; }) + +static inline bool iocb_is_aligned(const struct siocb *iocb) +{ + return sector_algined(iocb->offset) && sector_algined(iocb->length); +} + +static int prepare_iocb(uint64_t oid, const struct siocb *iocb, bool create) +{ + int syncflag = create ? 
O_SYNC : O_DSYNC; + int flags = syncflag | O_RDWR; + + if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) + flags &= ~syncflag; + + if (sys->backend_dio && iocb_is_aligned(iocb)) { + if (!is_aligned_to_pagesize(iocb->buf)) + panic("Memory isn't aligned to pagesize %p", iocb->buf); + flags |= O_DIRECT; + } + + if (create) + flags |= O_CREAT | O_EXCL; + + return flags; +} + +static int get_store_path(uint64_t oid, uint8_t ec_index, char *path) +{ + if (is_erasure_oid(oid)) { + if (unlikely(ec_index >= SD_MAX_COPIES)) + panic("invalid ec_index %d", ec_index); + return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d", + md_get_object_dir(oid), oid, ec_index); + } + + return snprintf(path, PATH_MAX, "%s/%016" PRIx64, + md_get_object_dir(oid), oid); +} + +static int get_store_tmp_path(uint64_t oid, uint8_t ec_index, char *path) +{ + if (is_erasure_oid(oid)) { + if (unlikely(ec_index >= SD_MAX_COPIES)) + panic("invalid ec_index %d", ec_index); + return snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d.tmp", + md_get_object_dir(oid), oid, ec_index); + } + + return snprintf(path, PATH_MAX, "%s/%016" PRIx64".tmp", + md_get_object_dir(oid), oid); +} + +static int get_store_stale_path(uint64_t oid, uint32_t epoch, uint8_t ec_index, + char *path) +{ + return md_get_stale_path(oid, epoch, ec_index, path); +} + +/* + * Check if oid is in this nodes (if oid is in the wrong place, it will be moved + * to the correct one after this call in a MD setup. + */ +bool default_exist(uint64_t oid, uint8_t ec_index) +{ + char path[PATH_MAX]; + + get_store_path(oid, ec_index, path); + + return md_exist(oid, ec_index, path); +} + +static int err_to_sderr(const char *path, uint64_t oid, int err) +{ + struct stat s; + char p[PATH_MAX], *dir; + + /* Use a temporary buffer since dirname() may modify its argument. 
*/ + pstrcpy(p, sizeof(p), path); + dir = dirname(p); + + sd_debug("%s", path); + switch (err) { + case ENOENT: + if (stat(dir, &s) < 0) { + sd_err("%s corrupted", dir); + return md_handle_eio(dir); + } + sd_debug("object %016" PRIx64 " not found locally", oid); + return SD_RES_NO_OBJ; + case ENOSPC: + /* TODO: stop automatic recovery */ + sd_err("diskfull, oid=%"PRIx64, oid); + return SD_RES_NO_SPACE; + case EMFILE: + case ENFILE: + case EINTR: + case EAGAIN: + case EEXIST: + sd_err("%m, oid=%"PRIx64, oid); + /* make gateway try again */ + return SD_RES_NETWORK_ERROR; + default: + sd_err("oid=%"PRIx64", %m", oid); + return md_handle_eio(dir); + } +} + +static int discard(int fd, uint64_t start, uint32_t end) +{ + int ret = xfallocate(fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, + start, end - start); + if (ret < 0) { + if (errno == ENOSYS || errno == EOPNOTSUPP) + sd_info("FALLOC_FL_PUNCH_HOLE is not supported " + "on this filesystem"); + else + sd_err("failed to discard object, %m"); + } + + return ret; +} + +/* Trim zero blocks of the beginning and end of the object. 
*/ +static int default_trim(int fd, uint64_t oid, const struct siocb *iocb, + uint64_t *poffset, uint32_t *plen) +{ + trim_zero_blocks(iocb->buf, poffset, plen); + + if (iocb->offset < *poffset) { + sd_debug("discard between %d, %ld, %" PRIx64, iocb->offset, + *poffset, oid); + + if (discard(fd, iocb->offset, *poffset) < 0) + return -1; + } + + if (*poffset + *plen < iocb->offset + iocb->length) { + uint64_t end = iocb->offset + iocb->length; + uint32_t object_size = get_vdi_object_size(oid_to_vid(oid)); + if (end == get_objsize(oid, object_size)) + /* This is necessary to punch the last block */ + end = round_up(end, BLOCK_SIZE); + sd_debug("discard between %ld, %ld, %" PRIx64, *poffset + *plen, + end, oid); + + if (discard(fd, *poffset + *plen, end) < 0) + return -1; + } + + return 0; +} + +int default_write(uint64_t oid, const struct siocb *iocb) +{ + int flags = prepare_iocb(oid, iocb, false), fd, + ret = SD_RES_SUCCESS; + char path[PATH_MAX]; + ssize_t size; + uint32_t len = iocb->length; + uint64_t offset = iocb->offset; + static bool trim_is_supported = true; + + if (iocb->epoch < sys_epoch()) { + sd_debug("%"PRIu32" sys %"PRIu32, iocb->epoch, sys_epoch()); + return SD_RES_OLD_NODE_VER; + } + + if (uatomic_is_true(&sys->use_journal) && + unlikely(journal_write_store(oid, iocb->buf, iocb->length, + iocb->offset, false)) + != SD_RES_SUCCESS) { + sd_err("turn off journaling"); + uatomic_set_false(&sys->use_journal); + flags |= O_DSYNC; + sync(); + } + + get_store_path(oid, iocb->ec_index, path); + + /* + * Make sure oid is in the right place because oid might be misplaced + * in a wrong place, due to 'shutdown/restart with less/more disks' or + * any bugs. 
We need call err_to_sderr() to return EIO if disk is broken + */ + if (!default_exist(oid, iocb->ec_index)) + return err_to_sderr(path, oid, ENOENT); + + fd = open(path, flags, sd_def_fmode); + if (unlikely(fd < 0)) + return err_to_sderr(path, oid, errno); + + if (trim_is_supported && is_sparse_object(oid)) { + if (default_trim(fd, oid, iocb, &offset, &len) < 0) { + trim_is_supported = false; + offset = iocb->offset; + len = iocb->length; + } + } + + size = xpwrite(fd, iocb->buf, len, offset); + if (unlikely(size != len)) { + sd_err("failed to write object %"PRIx64", path=%s, offset=%" + PRId32", size=%"PRId32", result=%zd, %m", oid, path, + iocb->offset, iocb->length, size); + ret = err_to_sderr(path, oid, errno); + goto out; + } +out: + close(fd); + return ret; +} + +static int make_stale_dir(const char *path) +{ + char p[PATH_MAX]; + + snprintf(p, PATH_MAX, "%s/.stale", path); + if (xmkdir(p, sd_def_dmode) < 0) { + sd_err("%s failed, %m", p); + return SD_RES_EIO; + } + return SD_RES_SUCCESS; +} + +static int purge_dir(const char *path) +{ + if (purge_directory(path) < 0) + return SD_RES_EIO; + + return SD_RES_SUCCESS; +} + +static int purge_stale_dir(const char *path) +{ + char p[PATH_MAX]; + + snprintf(p, PATH_MAX, "%s/.stale", path); + + if (purge_directory_async(p) < 0) + return SD_RES_EIO; + + return SD_RES_SUCCESS; +} + +int default_cleanup(void) +{ + int ret; + + ret = for_each_obj_path(purge_stale_dir); + if (ret != SD_RES_SUCCESS) + return ret; + + return SD_RES_SUCCESS; +} + +static int init_vdi_state(uint64_t oid, const char *wd, uint32_t epoch) +{ + int ret; + struct sd_inode *inode = xzalloc(SD_INODE_HEADER_SIZE); + struct siocb iocb = { + .epoch = epoch, + .buf = inode, + .length = SD_INODE_HEADER_SIZE, + }; + + ret = default_read(oid, &iocb); + if (ret != SD_RES_SUCCESS) { + sd_err("failed to read inode header %" PRIx64 " %" PRId32 + "wat %s", oid, epoch, wd); + goto out; + } + add_vdi_state_unordered(oid_to_vid(oid), inode->nr_copies, + 
vdi_is_snapshot(inode), inode->copy_policy, + inode->block_size_shift, inode->parent_vdi_id); + + if (inode->name[0] == '\0') + atomic_set_bit(oid_to_vid(oid), sys->vdi_deleted); + + atomic_set_bit(oid_to_vid(oid), sys->vdi_inuse); + + ret = SD_RES_SUCCESS; +out: + free(inode); + return ret; +} + +static int init_objlist_and_vdi_bitmap(uint64_t oid, const char *wd, + uint32_t epoch, uint8_t ec_index, + struct vnode_info *vinfo, + void *arg) +{ + int ret; + objlist_cache_insert(oid); + + if (is_vdi_obj(oid)) { + sd_debug("found the VDI object %" PRIx64" epoch %"PRIu32 + " at %s", oid, epoch, wd); + ret = init_vdi_state(oid, wd, epoch); + if (ret != SD_RES_SUCCESS) + return ret; + } + return SD_RES_SUCCESS; +} + +int default_init(void) +{ + int ret; + + sd_debug("use plain store driver"); + ret = for_each_obj_path(make_stale_dir); + if (ret != SD_RES_SUCCESS) + return ret; + + for_each_object_in_stale(init_objlist_and_vdi_bitmap, NULL); + + return for_each_object_in_wd(init_objlist_and_vdi_bitmap, true, NULL); +} + +static int default_read_from_path(uint64_t oid, const char *path, + const struct siocb *iocb) +{ + int flags = prepare_iocb(oid, iocb, false), fd, + ret = SD_RES_SUCCESS; + ssize_t size; + + /* + * Make sure oid is in the right place because oid might be misplaced + * in a wrong place, due to 'shutdown/restart with less disks' or any + * bugs. We need call err_to_sderr() to return EIO if disk is broken. + * + * For stale path, get_store_stale_path already does default_exist job. 
+ */ + if (!is_stale_path(path) && !default_exist(oid, iocb->ec_index)) + return err_to_sderr(path, oid, ENOENT); + + fd = open(path, flags); + if (fd < 0) + return err_to_sderr(path, oid, errno); + + size = xpread(fd, iocb->buf, iocb->length, iocb->offset); + if (size < 0) { + sd_err("failed to read object %"PRIx64", path=%s, offset=%" + PRId32", size=%"PRId32", result=%zd, %m", oid, path, + iocb->offset, iocb->length, size); + ret = err_to_sderr(path, oid, errno); + } + close(fd); + return ret; +} + +int default_read(uint64_t oid, const struct siocb *iocb) +{ + int ret; + char path[PATH_MAX]; + + get_store_path(oid, iocb->ec_index, path); + ret = default_read_from_path(oid, path, iocb); + + /* + * If the request is against the older epoch, try to read from + * the stale directory + */ + if (ret == SD_RES_NO_OBJ && iocb->epoch > 0 && + iocb->epoch < sys_epoch()) { + get_store_stale_path(oid, iocb->epoch, iocb->ec_index, path); + ret = default_read_from_path(oid, path, iocb); + } + + return ret; +} + +int default_create_and_write(uint64_t oid, const struct siocb *iocb) +{ + char path[PATH_MAX], tmp_path[PATH_MAX], *dir; + int flags = prepare_iocb(oid, iocb, true); + int ret, fd; + uint32_t len = iocb->length; + uint32_t object_size = 0; + size_t obj_size; + uint64_t offset = iocb->offset; + + sd_debug("%"PRIx64, oid); + get_store_path(oid, iocb->ec_index, path); + get_store_tmp_path(oid, iocb->ec_index, tmp_path); + + if (uatomic_is_true(&sys->use_journal) && + journal_write_store(oid, iocb->buf, iocb->length, + iocb->offset, true) + != SD_RES_SUCCESS) { + sd_err("turn off journaling"); + uatomic_set_false(&sys->use_journal); + flags |= O_SYNC; + sync(); + } + + fd = open(tmp_path, flags, sd_def_fmode); + if (fd < 0) { + if (errno == EEXIST) { + /* + * This happens if node membership changes during object + * creation; while gateway retries a CREATE request, + * recovery process could also recover the object at the + * same time. 
They should try to write the same date, + * so it is okay to simply return success here. + */ + sd_debug("%s exists", tmp_path); + return SD_RES_SUCCESS; + } + + sd_err("failed to open %s: %m", tmp_path); + return err_to_sderr(path, oid, errno); + } + + obj_size = get_store_objsize(oid); + + trim_zero_blocks(iocb->buf, &offset, &len); + + object_size = get_vdi_object_size(oid_to_vid(oid)); + + if (offset != 0 || len != get_objsize(oid, object_size)) { + if (is_sparse_object(oid)) + ret = xftruncate(fd, obj_size); + else + ret = prealloc(fd, obj_size); + if (ret < 0) { + ret = err_to_sderr(path, oid, errno); + goto out; + } + } + + ret = xpwrite(fd, iocb->buf, len, offset); + if (ret != len) { + sd_err("failed to write object. %m"); + ret = err_to_sderr(path, oid, errno); + goto out; + } + + ret = rename(tmp_path, path); + if (ret < 0) { + sd_err("failed to rename %s to %s: %m", tmp_path, path); + ret = err_to_sderr(path, oid, errno); + goto out; + } + + close(fd); + + if (uatomic_is_true(&sys->use_journal) || sys->nosync == true) { + objlist_cache_insert(oid); + return SD_RES_SUCCESS; + } + + pstrcpy(tmp_path, sizeof(tmp_path), path); + dir = dirname(tmp_path); + fd = open(dir, O_DIRECTORY | O_RDONLY); + if (fd < 0) { + sd_err("failed to open directory %s: %m", dir); + return err_to_sderr(path, oid, errno); + } + + if (fsync(fd) != 0) { + sd_err("failed to write directory %s: %m", dir); + ret = err_to_sderr(path, oid, errno); + close(fd); + if (unlink(path) != 0) + sd_err("failed to unlink %s: %m", path); + return ret; + } + close(fd); + objlist_cache_insert(oid); + return SD_RES_SUCCESS; + +out: + if (unlink(tmp_path) != 0) + sd_err("failed to unlink %s: %m", tmp_path); + close(fd); + return ret; +} + +int default_link(uint64_t oid, uint32_t tgt_epoch) +{ + char path[PATH_MAX], stale_path[PATH_MAX]; + + sd_debug("try link %"PRIx64" from snapshot with epoch %d", oid, + tgt_epoch); + + snprintf(path, PATH_MAX, "%s/%016"PRIx64, md_get_object_dir(oid), oid); + 
get_store_stale_path(oid, tgt_epoch, 0, stale_path); + + if (link(stale_path, path) < 0) { + /* + * Recovery thread and main thread might try to recover the + * same object and we might get EEXIST in such case. + */ + if (errno == EEXIST) + goto out; + + sd_debug("failed to link from %s to %s, %m", stale_path, path); + return err_to_sderr(path, oid, errno); + } +out: + return SD_RES_SUCCESS; +} + +/* + * For replicated object, if any of the replica belongs to this node, we + * consider it not stale. + * + * For erasure coded object, since every copy is unique and if it migrates to + * other node(index gets changed even it has some other copy belongs to it) + * because of hash ring changes, we consider it stale. + */ +static bool oid_stale(uint64_t oid, int ec_index, struct vnode_info *vinfo) +{ + uint32_t i, nr_copies; + const struct sd_vnode *v; + bool ret = true; + const struct sd_vnode *obj_vnodes[SD_MAX_COPIES]; + + nr_copies = get_obj_copy_number(oid, vinfo->nr_zones); + oid_to_vnodes(oid, &vinfo->vroot, nr_copies, obj_vnodes); + for (i = 0; i < nr_copies; i++) { + v = obj_vnodes[i]; + if (vnode_is_local(v)) { + if (ec_index < SD_MAX_COPIES) { + if (i == ec_index) + ret = false; + } else { + ret = false; + } + break; + } + } + + return ret; +} + +static int move_object_to_stale_dir(uint64_t oid, const char *wd, + uint32_t epoch, uint8_t ec_index, + struct vnode_info *vinfo, void *arg) +{ + char path[PATH_MAX], stale_path[PATH_MAX]; + uint32_t tgt_epoch = *(uint32_t *)arg; + + /* ec_index from md.c is reliable so we can directly use it */ + if (ec_index < SD_MAX_COPIES) { + snprintf(path, PATH_MAX, "%s/%016"PRIx64"_%d", + md_get_object_dir(oid), oid, ec_index); + snprintf(stale_path, PATH_MAX, + "%s/.stale/%016"PRIx64"_%d.%"PRIu32, + md_get_object_dir(oid), oid, ec_index, tgt_epoch); + } else { + snprintf(path, PATH_MAX, "%s/%016" PRIx64, + md_get_object_dir(oid), oid); + snprintf(stale_path, PATH_MAX, "%s/.stale/%016"PRIx64".%"PRIu32, + md_get_object_dir(oid), 
oid, tgt_epoch); + } + + if (unlikely(rename(path, stale_path)) < 0) { + sd_err("failed to move stale object %" PRIX64 " to %s, %m", oid, + path); + return SD_RES_EIO; + } + + sd_debug("moved object %"PRIx64, oid); + return SD_RES_SUCCESS; +} + +static int check_stale_objects(uint64_t oid, const char *wd, uint32_t epoch, + uint8_t ec_index, struct vnode_info *vinfo, + void *arg) +{ + if (oid_stale(oid, ec_index, vinfo)) + return move_object_to_stale_dir(oid, wd, 0, ec_index, + NULL, arg); + + return SD_RES_SUCCESS; +} + +int default_update_epoch(uint32_t epoch) +{ + sd_assert(epoch); + return for_each_object_in_wd(check_stale_objects, false, &epoch); +} + +int default_format(void) +{ + unsigned ret; + + sd_debug("try get a clean store"); + ret = for_each_obj_path(purge_dir); + if (ret != SD_RES_SUCCESS) + return ret; + + if (sys->enable_object_cache) + object_cache_format(); + + return SD_RES_SUCCESS; +} + +int default_remove_object(uint64_t oid, uint8_t ec_index) +{ + char path[PATH_MAX]; + + if (uatomic_is_true(&sys->use_journal)) + journal_remove_object(oid); + + get_store_path(oid, ec_index, path); + + if (unlink(path) < 0) { + if (errno == ENOENT) + return SD_RES_NO_OBJ; + + sd_err("failed, %s, %m", path); + return SD_RES_EIO; + } + + return SD_RES_SUCCESS; +} + +#define SHA1NAME "user.obj.sha1" + +static int get_object_sha1(const char *path, uint8_t *sha1) +{ + if (getxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE) + != SHA1_DIGEST_SIZE) { + if (errno == ENODATA) + sd_debug("sha1 is not cached yet, %s", path); + else + sd_err("fail to get xattr, %s", path); + return -1; + } + + return 0; +} + +static int set_object_sha1(const char *path, const uint8_t *sha1) +{ + int ret; + + ret = setxattr(path, SHA1NAME, sha1, SHA1_DIGEST_SIZE, 0); + if (ret < 0) + sd_err("fail to set sha1, %s", path); + + return ret; +} + +static int get_object_path(uint64_t oid, uint32_t epoch, char *path, + size_t size) +{ + if (default_exist(oid, 0)) { + snprintf(path, PATH_MAX, 
"%s/%016"PRIx64, + md_get_object_dir(oid), oid); + } else { + get_store_stale_path(oid, epoch, 0, path); + if (access(path, F_OK) < 0) { + if (errno == ENOENT) + return SD_RES_NO_OBJ; + return SD_RES_EIO; + } + + } + + return SD_RES_SUCCESS; +} + +int default_get_hash(uint64_t oid, uint32_t epoch, uint8_t *sha1) +{ + int ret; + void *buf; + struct siocb iocb = {}; + uint32_t length; + bool is_readonly_obj = oid_is_readonly(oid); + char path[PATH_MAX]; + + ret = get_object_path(oid, epoch, path, sizeof(path)); + if (ret != SD_RES_SUCCESS) + return ret; + + if (is_readonly_obj) { + if (get_object_sha1(path, sha1) == 0) { + sd_debug("use cached sha1 digest %s", + sha1_to_hex(sha1)); + return SD_RES_SUCCESS; + } + } + + length = get_store_objsize(oid); + buf = valloc(length); + if (buf == NULL) + return SD_RES_NO_MEM; + + iocb.epoch = epoch; + iocb.buf = buf; + iocb.length = length; + + ret = default_read_from_path(oid, path, &iocb); + if (ret != SD_RES_SUCCESS) { + free(buf); + return ret; + } + + get_buffer_sha1(buf, length, sha1); + free(buf); + + sd_debug("the message digest of %"PRIx64" at epoch %d is %s", oid, + epoch, sha1_to_hex(sha1)); + + if (is_readonly_obj) + set_object_sha1(path, sha1); + + return ret; +} + +int default_purge_obj(void) +{ + uint32_t tgt_epoch = get_latest_epoch(); + + return for_each_object_in_wd(move_object_to_stale_dir, true, + &tgt_epoch); +} + +static struct store_driver plain_store = { + .name = "plain", + .init = default_init, + .exist = default_exist, + .create_and_write = default_create_and_write, + .write = default_write, + .read = default_read, + .link = default_link, + .update_epoch = default_update_epoch, + .cleanup = default_cleanup, + .format = default_format, + .remove_object = default_remove_object, + .get_hash = default_get_hash, + .purge_obj = default_purge_obj, +}; + +add_store_driver(plain_store); -- 1.7.1 -- sheepdog mailing list sheepdog@lists.wpkg.org https://lists.wpkg.org/mailman/listinfo/sheepdog