Implement a distributed lock using ZooKeeper (see: http://zookeeper.apache.org/doc/trunk/recipes.html). The routine is: 1. Create a sequential ephemeral znode in the lock directory (the lock-id is used as the directory name). 2. The thread whose znode has the smallest path becomes the owner of the lock; the other threads wait on a pthread_mutex_t. 3. If the owner of the lock releases it (or the owner is killed by accident), ZooKeeper will trigger zk_watcher(), which will wake up all waiting threads to compete to become the new owner of the lock.
We add ->local_lock for zk_mutex to avoid many threads in one sheepdog daemon to create too many files in lock directory. Signed-off-by: Robin Dong <san...@taobao.com> --- sheep/Makefile.am | 2 +- sheep/http/http.c | 37 ++++++++- sheep/http/lock.c | 218 ++++++++++++++++++++++++++++++++++++++++++++++++++++ sheep/http/lock.h | 35 ++++++++ sheep/sheep.c | 2 +- sheep/sheep_priv.h | 4 +- 6 files changed, 291 insertions(+), 7 deletions(-) create mode 100644 sheep/http/lock.c create mode 100644 sheep/http/lock.h diff --git a/sheep/Makefile.am b/sheep/Makefile.am index 552e86a..578e183 100644 --- a/sheep/Makefile.am +++ b/sheep/Makefile.am @@ -30,7 +30,7 @@ sheep_SOURCES = sheep.c group.c request.c gateway.c store.c vdi.c \ plain_store.c config.c migrate.c md.c if BUILD_HTTP -sheep_SOURCES += http/http.c http/kv.c http/s3.c http/swift.c +sheep_SOURCES += http/http.c http/kv.c http/s3.c http/swift.c http/lock.c endif if BUILD_COROSYNC sheep_SOURCES += cluster/corosync.c diff --git a/sheep/http/http.c b/sheep/http/http.c index 04ef364..c1fc5a8 100644 --- a/sheep/http/http.c +++ b/sheep/http/http.c @@ -14,6 +14,7 @@ /* This files implement RESTful interface to sheepdog storage via fastcgi */ #include "http.h" +#include "lock.h" #include "sheep_priv.h" #include "option.h" @@ -342,11 +343,12 @@ static struct option_parser http_opt_parsers[] = { { NULL, NULL }, }; -int http_init(const char *options) +int http_init(const char *options, const char *cdrv_option) { pthread_t t; - int err; - char *s, address[HOST_NAME_MAX + 8]; + int err, timeout = LOCKS_TIMEOUT; + char *s, address[HOST_NAME_MAX + 8], *hosts, *to; + const char *p = cdrv_option; s = strdup(options); if (s == NULL) { @@ -354,6 +356,35 @@ int http_init(const char *options) return -1; } + if (!cdrv_option) { + sd_err("You must specify zookeeper servers for zk_locks_init"); + return -1; + } + + to = strstr(cdrv_option, "="); + if (to) { + if (sscanf(++to, "%u", &timeout) != 1) { + sd_err("Invalid paramter for timeout"); + 
return -1; + } + p = strstr(cdrv_option, "timeout"); + if (!p) { + sd_err("Invalid parameter for timeout"); + return -1; + } + p--; + } + hosts = strndup(cdrv_option, p - cdrv_option); + if (hosts == NULL) { + sd_emerg("OOM"); + return -1; + } + + if (zk_locks_init(hosts, timeout)) { + sd_debug("Failed to init zookeeper locks"); + return -1; + } + if (option_parse(s, ",", http_opt_parsers) < 0) return -1; diff --git a/sheep/http/lock.c b/sheep/http/lock.c new file mode 100644 index 0000000..790c6c1 --- /dev/null +++ b/sheep/http/lock.c @@ -0,0 +1,218 @@ +#include <zookeeper/zookeeper.h> +#include "lock.h" + +#define MAX_MUTEX_NR 4096 +#define LOCK_ZNODE "/sheepdog_lock" +#define WAIT_TIME 1 /* seconds */ + +/* iterate child znodes */ +#define FOR_EACH_ZNODE(parent, path, strs) \ + for ((strs)->data += (strs)->count; \ + (strs)->count-- ? \ + snprintf(path, sizeof(path), "%s/%s", parent, \ + *--(strs)->data) : (free((strs)->data), 0); \ + free(*(strs)->data)) + +static zhandle_t *zhandle; +/* + * when a seq file is deleted, we nned to wake up + * the lock who wait on the lock-id + */ +static struct zk_mutex **mutex_array; + +/* Wait a while when create, delete or get_children fail on zookeeper */ +static void zk_wait(void) +{ + sleep(WAIT_TIME); +} + +/* zookeeper will call this function when same events happends */ +static void zk_watcher(zhandle_t *zh, int type, int state, const char *path, + void *ctx) +{ + char str[MAX_NODE_STR_LEN]; + uint32_t lock_id; + + sd_debug("watch event type: %d state: %d", type, state); + + if (type == ZOO_DELETED_EVENT) { + sscanf(path, LOCK_ZNODE "/%u/%s", &lock_id, str); + if (mutex_array && mutex_array[lock_id]) { + pthread_mutex_unlock(&(mutex_array[lock_id]->wait)); + sd_debug("release lock %u %s", lock_id, str); + } + } +} + +/* get smallest seq file in a zookeeper directory */ +static int zk_get_lowest_seq(const char *parent, char *lowest_seq_path, + int path_len) +{ + char path[MAX_NODE_STR_LEN], *p, *tmp; + struct 
String_vector strs; + int rc, lowest_seq = INT_MAX , seq; + + while (true) { + rc = zoo_get_children(zhandle, parent, 1, &strs); + if (rc == ZOK) + break; + sd_err("failed to get children %s, %s", parent, zerror(rc)); + zk_wait(); + } + FOR_EACH_ZNODE(parent, path, &strs) { + p = strrchr(path, '/'); + seq = strtol(++p, &tmp, 10); + if (seq < lowest_seq) + lowest_seq = seq; + } + snprintf(lowest_seq_path, path_len, "%s/%010"PRId32, + parent, lowest_seq); + sd_debug("get lowest path: %s", lowest_seq_path); + return 0; +} + +int zk_locks_init(const char *hosts, int timeout) +{ + int rc; + + sd_debug("hosts %s timeout %d", hosts, timeout); + mutex_array = xzalloc(sizeof(struct zk_mutex *) * MAX_MUTEX_NR); + + zhandle = zookeeper_init(hosts, zk_watcher, timeout, NULL, NULL, 0); + if (!zhandle) { + sd_err("failed to connect to zk server for mutex"); + return -1; + } + + rc = zoo_create(zhandle, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE, + 0, NULL, 0); + if (rc != ZOK && rc != ZNODEEXISTS) { + sd_err("failed to create %s %s", LOCK_ZNODE, zerror(rc)); + return -1; + } + + return 0; +} + +void zk_locks_close(void) +{ + zookeeper_close(zhandle); + free(mutex_array); +} + +static int zk_create_lock_path(struct zk_mutex *mutex) +{ + char path[MAX_NODE_STR_LEN]; + int rc; + + snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u", mutex->id); + rc = zoo_create(zhandle, path, "", 0, &ZOO_OPEN_ACL_UNSAFE, + 0, NULL, 0); + if (rc != ZOK && rc != ZNODEEXISTS) { + sd_err("failed to create path: %s, %s", path, zerror(rc)); + return rc; + } + + return 0; +} + +int zk_init_mutex(struct zk_mutex *mutex, uint32_t id) +{ + int rc; + + if (id > MAX_MUTEX_NR) { + sd_err("lock-id is too large!"); + rc = -1; + goto err; + } + + mutex_array[id] = mutex; + mutex->id = id; + rc = zk_create_lock_path(mutex); + if (rc) + goto err; + + rc = pthread_mutex_init(&mutex->wait, NULL); + if (rc) { + sd_err("failed to init mutex->wait"); + goto err; + } + + rc = pthread_mutex_init(&mutex->local_lock, NULL); + if 
(rc) { + sd_err("failed to init mutex->local_lock"); + goto err; + } + + return 0; +err: + mutex_array[id] = NULL; + return rc; +} + +void zk_lock_mutex(struct zk_mutex *mutex) +{ + int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL; + int rc; + char *my_path; + char parent[MAX_NODE_STR_LEN]; + char lowest_seq_path[MAX_NODE_STR_LEN]; + + /* + * if many threads use locks with same id, we should use + * ->local_lock to avoid the only zookeeper handler to + * create many seq-ephemeral files. + */ + pthread_mutex_lock(&mutex->local_lock); + + my_path = mutex->ephemeral_path; + + snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u/", mutex->id); + while (true) { + rc = zoo_create(zhandle, parent, "", 0, &ZOO_OPEN_ACL_UNSAFE, + flags, my_path, MAX_NODE_STR_LEN); + if (rc == ZOK) + break; + sd_err("failed to create path:%s, %s", my_path, zerror(rc)); + zk_wait(); + } + sd_debug("create path %s success", my_path); + + /* create node ok now */ + snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u", mutex->id); + while (true) { + zk_get_lowest_seq(parent, lowest_seq_path, MAX_NODE_STR_LEN); + + /* I got the lock */ + if (!strncmp(lowest_seq_path, my_path, strlen(my_path))) { + sd_debug("I am master now. 
%s", lowest_seq_path); + return; + } + + rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL); + if (rc == ZOK) { + sd_debug("call zoo_exits success %s", lowest_seq_path); + pthread_mutex_lock(&mutex->wait); + } else { + sd_err("failed to call zoo_exists %s", zerror(rc)); + if (rc != ZNONODE) + zk_wait(); + } + } +} + +void zk_unlock_mutex(struct zk_mutex *mutex) +{ + int rc; + + while (true) { + rc = zoo_delete(zhandle, mutex->ephemeral_path, -1); + if (rc == ZOK || rc == ZNONODE) + break; + sd_err("failed to delete %s %s", mutex->ephemeral_path, + zerror(rc)); + zk_wait(); + } + + pthread_mutex_unlock(&mutex->local_lock); +} diff --git a/sheep/http/lock.h b/sheep/http/lock.h new file mode 100644 index 0000000..2dc47f6 --- /dev/null +++ b/sheep/http/lock.h @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2013 Robin Dong <san...@taobao.com> + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ +#ifndef __SHEEP_HTTP_LOCK_H__ +#define __SHEEP_HTTP_LOCK_H__ + +#define MAX_LOCK_NAME 64 +#define LOCKS_TIMEOUT 30000 /* millisecond */ + +#include <stdint.h> +#include <pthread.h> +#include "sheep.h" + +struct zk_mutex { + uint32_t id; /* id of this mutex */ + pthread_mutex_t wait; + pthread_mutex_t local_lock; + char ephemeral_path[MAX_NODE_STR_LEN]; +}; + +int zk_locks_init(const char *hosts, int timeout); +void zk_locks_close(void); + +int zk_init_mutex(struct zk_mutex *mutex, uint32_t id); +void zk_lock_mutex(struct zk_mutex *mutex); +void zk_unlock_mutex(struct zk_mutex *mutex); + +#endif /* __SHEEP_HTTP_LOCK_H__ */ diff --git a/sheep/sheep.c b/sheep/sheep.c index 9d9afa0..f465fa6 100644 --- a/sheep/sheep.c +++ b/sheep/sheep.c @@ -820,7 +820,7 @@ int main(int argc, char **argv) if (ret) exit(1); - if (http_options && http_init(http_options) != 0) + if (http_options && http_init(http_options, sys->cdrv_option) != 0) exit(1); if (pid_file && (create_pidfile(pid_file) != 0)) { diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h index d333573..8887497 100644 --- a/sheep/sheep_priv.h +++ b/sheep/sheep_priv.h @@ -472,9 +472,9 @@ uint64_t md_get_size(uint64_t *used); /* http.c */ #ifdef HAVE_HTTP -int http_init(const char *options); +int http_init(const char *options, const char *cdrv_option); #else -static inline int http_init(const char *options) +static inline int http_init(const char *options, const char *cdev_option) { sd_notice("http service is not complied"); return 0; -- 1.7.1 -- sheepdog mailing list sheepdog@lists.wpkg.org http://lists.wpkg.org/mailman/listinfo/sheepdog