Implement the distributed lock by zookeeper (refer:
http://zookeeper.apache.org/doc/trunk/recipes.html)
The routine is:
        1. create a seq-ephemeral znode in the lock directory (the lock-id is
           used as the directory name)
        2. the znode with the smallest sequence number owns the lock; the
           other threads sleep until they are woken up
        3. if the owner of the lock releases it (or the owner is killed by
           accident), zookeeper will trigger zk_watcher(), which wakes up the
           waiting threads so they can compete to become the new owner

We add ->local_lock for zk_mutex to avoid many threads in one sheepdog daemon
creating too many files in the lock directory.

Signed-off-by: Robin Dong <san...@taobao.com>
---
 sheep/Makefile.am  |    2 +-
 sheep/http/http.c  |   37 ++++++++-
 sheep/http/lock.c  |  218 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 sheep/http/lock.h  |   35 ++++++++
 sheep/sheep.c      |    2 +-
 sheep/sheep_priv.h |    4 +-
 6 files changed, 291 insertions(+), 7 deletions(-)
 create mode 100644 sheep/http/lock.c
 create mode 100644 sheep/http/lock.h

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 552e86a..578e183 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -30,7 +30,7 @@ sheep_SOURCES         = sheep.c group.c request.c gateway.c store.c vdi.c \
                          plain_store.c config.c migrate.c md.c
 
 if BUILD_HTTP
-sheep_SOURCES          += http/http.c http/kv.c http/s3.c http/swift.c
+sheep_SOURCES          += http/http.c http/kv.c http/s3.c http/swift.c http/lock.c
 endif
 if BUILD_COROSYNC
 sheep_SOURCES          += cluster/corosync.c
diff --git a/sheep/http/http.c b/sheep/http/http.c
index 04ef364..c1fc5a8 100644
--- a/sheep/http/http.c
+++ b/sheep/http/http.c
@@ -14,6 +14,7 @@
 /* This files implement RESTful interface to sheepdog storage via fastcgi */
 
 #include "http.h"
+#include "lock.h"
 #include "sheep_priv.h"
 #include "option.h"
 
@@ -342,11 +343,12 @@ static struct option_parser http_opt_parsers[] = {
        { NULL, NULL },
 };
 
-int http_init(const char *options)
+int http_init(const char *options, const char *cdrv_option)
 {
        pthread_t t;
-       int err;
-       char *s, address[HOST_NAME_MAX + 8];
+       int err, timeout = LOCKS_TIMEOUT;
+       char *s, address[HOST_NAME_MAX + 8], *hosts, *to;
+       const char *p = cdrv_option;
 
        s = strdup(options);
        if (s == NULL) {
@@ -354,6 +356,35 @@ int http_init(const char *options)
                return -1;
        }
 
+       if (!cdrv_option) {
+               sd_err("You must specify zookeeper servers for zk_locks_init");
+               return -1;
+       }
+
+       to = strstr(cdrv_option, "=");
+       if (to) {
+               if (sscanf(++to, "%u", &timeout) != 1) {
+                       sd_err("Invalid paramter for timeout");
+                       return -1;
+               }
+               p = strstr(cdrv_option, "timeout");
+               if (!p) {
+                       sd_err("Invalid parameter for timeout");
+                       return -1;
+               }
+               p--;
+       }
+       hosts = strndup(cdrv_option, p - cdrv_option);
+       if (hosts == NULL) {
+               sd_emerg("OOM");
+               return -1;
+       }
+
+       if (zk_locks_init(hosts, timeout)) {
+               sd_debug("Failed to init zookeeper locks");
+               return -1;
+       }
+
        if (option_parse(s, ",", http_opt_parsers) < 0)
                return -1;
 
diff --git a/sheep/http/lock.c b/sheep/http/lock.c
new file mode 100644
index 0000000..790c6c1
--- /dev/null
+++ b/sheep/http/lock.c
@@ -0,0 +1,344 @@
+/*
+ * Copyright (C) 2013 Robin Dong <san...@taobao.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/*
+ * Distributed locks on top of ZooKeeper, following the "Locks" recipe in
+ * http://zookeeper.apache.org/doc/trunk/recipes.html:
+ *
+ *  1. create a sequential-ephemeral znode under LOCK_ZNODE/<lock-id>/
+ *  2. the child with the smallest sequence number owns the lock; every
+ *     other locker watches that child and sleeps
+ *  3. when the watched child is deleted, zk_watcher() wakes the sleepers,
+ *     which then re-check who owns the lock
+ */
+#include <inttypes.h>
+#include <pthread.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <zookeeper/zookeeper.h>
+
+#include "lock.h"
+
+#define MAX_MUTEX_NR	4096		/* valid lock-ids: 0 .. MAX_MUTEX_NR - 1 */
+#define LOCK_ZNODE	"/sheepdog_lock"
+#define WAIT_TIME	1		/* seconds */
+
+/* iterate child znodes of @parent, freeing @strs as we go */
+#define FOR_EACH_ZNODE(parent, path, strs)				\
+	for ((strs)->data += (strs)->count;				\
+	     (strs)->count-- ?						\
+		     snprintf(path, sizeof(path), "%s/%s", parent,	\
+			      *--(strs)->data) : (free((strs)->data), 0); \
+	     free(*(strs)->data))
+
+static zhandle_t *zhandle;
+
+/*
+ * Wake-up bookkeeping for threads sleeping in zk_lock_mutex().
+ *
+ * zk_watcher() is invoked by the ZooKeeper client library rather than by
+ * the sleeping thread, so it must not unlock a pthread mutex that the
+ * sleeper locked (unlocking a mutex the caller does not own is undefined).
+ * Instead, every deletion event under LOCK_ZNODE/<id> bumps
+ * wake_events[id] and broadcasts wake_cond.  A sleeper samples
+ * wake_events[id] before arming its watch and sleeps until the counter
+ * changes, so an event that fires before it starts waiting is not lost.
+ */
+static pthread_mutex_t wake_lock = PTHREAD_MUTEX_INITIALIZER;
+static pthread_cond_t wake_cond = PTHREAD_COND_INITIALIZER;
+static uint64_t wake_events[MAX_MUTEX_NR];
+
+/* Wait a while when create, delete or get_children fail on zookeeper */
+static void zk_wait(void)
+{
+	sleep(WAIT_TIME);
+}
+
+/* Return the number of deletion events recorded so far for lock @id. */
+static uint64_t zk_wake_count(uint32_t id)
+{
+	uint64_t count;
+
+	pthread_mutex_lock(&wake_lock);
+	count = wake_events[id];
+	pthread_mutex_unlock(&wake_lock);
+	return count;
+}
+
+/* Sleep until the deletion-event count for lock @id differs from @seen. */
+static void zk_sleep_until_woken(uint32_t id, uint64_t seen)
+{
+	pthread_mutex_lock(&wake_lock);
+	while (wake_events[id] == seen)
+		pthread_cond_wait(&wake_cond, &wake_lock);
+	pthread_mutex_unlock(&wake_lock);
+}
+
+/* Global watcher registered in zk_locks_init(); called for every event */
+static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
+		       void *ctx)
+{
+	uint32_t lock_id;
+	int seq_off = 0;
+
+	sd_debug("watch event type: %d state: %d", type, state);
+
+	if (type != ZOO_DELETED_EVENT || !path)
+		return;
+
+	/*
+	 * Expect LOCK_ZNODE "/<lock-id>/<seq>".  seq_off is only written once
+	 * the '/' after the id has matched, so 0 means there is no <seq>.
+	 * Out-of-range ids are dropped instead of indexing past wake_events[].
+	 */
+	if (sscanf(path, LOCK_ZNODE "/%" SCNu32 "/%n",
+		   &lock_id, &seq_off) != 1 ||
+	    seq_off == 0 || lock_id >= MAX_MUTEX_NR) {
+		sd_debug("ignore deletion of %s", path);
+		return;
+	}
+
+	pthread_mutex_lock(&wake_lock);
+	wake_events[lock_id]++;
+	pthread_cond_broadcast(&wake_cond);
+	pthread_mutex_unlock(&wake_lock);
+	sd_debug("release lock %" PRIu32 " %s", lock_id, path + seq_off);
+}
+
+/*
+ * Find the child of @parent with the smallest sequence number and store its
+ * full path in @lowest_seq_path (a buffer of @path_len bytes).  Retries
+ * zoo_get_children() until it succeeds.
+ *
+ * Returns 0 on success, or -1 if @parent has no children, in which case
+ * @lowest_seq_path is set to "".
+ */
+static int zk_get_lowest_seq(const char *parent, char *lowest_seq_path,
+			     int path_len)
+{
+	char path[MAX_NODE_STR_LEN], *p;
+	struct String_vector strs;
+	long seq, lowest_seq = 0;
+	bool found = false;
+	int rc;
+
+	while (true) {
+		/* no child watch: waiters only watch the owner's znode */
+		rc = zoo_get_children(zhandle, parent, 0, &strs);
+		if (rc == ZOK)
+			break;
+		sd_err("failed to get children %s, %s", parent, zerror(rc));
+		zk_wait();
+	}
+	FOR_EACH_ZNODE(parent, path, &strs) {
+		p = strrchr(path, '/');
+		seq = strtol(p + 1, NULL, 10);
+		if (!found || seq < lowest_seq) {
+			lowest_seq = seq;
+			found = true;
+		}
+	}
+
+	if (!found) {
+		if (path_len > 0)
+			lowest_seq_path[0] = '\0';
+		return -1;
+	}
+
+	snprintf(lowest_seq_path, path_len, "%s/%010ld", parent, lowest_seq);
+	sd_debug("get lowest path: %s", lowest_seq_path);
+	return 0;
+}
+
+/*
+ * Connect to the ZooKeeper servers in @hosts (handed to zookeeper_init()
+ * together with @timeout, in milliseconds) and make sure LOCK_ZNODE exists.
+ * Returns 0 on success, -1 on failure.
+ */
+int zk_locks_init(const char *hosts, int timeout)
+{
+	int rc;
+
+	sd_debug("hosts %s timeout %d", hosts, timeout);
+
+	zhandle = zookeeper_init(hosts, zk_watcher, timeout, NULL, NULL, 0);
+	if (!zhandle) {
+		sd_err("failed to connect to zk server for mutex");
+		return -1;
+	}
+
+	rc = zoo_create(zhandle, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+			0, NULL, 0);
+	if (rc != ZOK && rc != ZNODEEXISTS) {
+		sd_err("failed to create %s %s", LOCK_ZNODE, zerror(rc));
+		zk_locks_close();	/* don't leak the half-initialized handle */
+		return -1;
+	}
+
+	return 0;
+}
+
+/* Close the handle opened by zk_locks_init(); calling it twice is harmless. */
+void zk_locks_close(void)
+{
+	if (zhandle) {
+		zookeeper_close(zhandle);
+		zhandle = NULL;
+	}
+}
+
+/* Create the per-lock directory LOCK_ZNODE/<id>; an existing one is fine. */
+static int zk_create_lock_path(struct zk_mutex *mutex)
+{
+	char path[MAX_NODE_STR_LEN];
+	int rc;
+
+	snprintf(path, sizeof(path), LOCK_ZNODE "/%" PRIu32, mutex->id);
+	rc = zoo_create(zhandle, path, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+			0, NULL, 0);
+	if (rc != ZOK && rc != ZNODEEXISTS) {
+		sd_err("failed to create path: %s, %s", path, zerror(rc));
+		return rc;
+	}
+
+	return 0;
+}
+
+/*
+ * Bind @mutex to lock-id @id (which must be < MAX_MUTEX_NR), create the
+ * lock's znode directory and initialize the process-local pthread mutexes.
+ * Returns 0 on success, non-zero on failure.
+ */
+int zk_init_mutex(struct zk_mutex *mutex, uint32_t id)
+{
+	int rc;
+
+	/* ids index wake_events[], so MAX_MUTEX_NR itself is out of range */
+	if (id >= MAX_MUTEX_NR) {
+		sd_err("lock-id is too large!");
+		return -1;
+	}
+
+	mutex->id = id;
+	rc = zk_create_lock_path(mutex);
+	if (rc)
+		return rc;
+
+	/* ->wait no longer carries wake-ups, but keep the struct fully valid */
+	rc = pthread_mutex_init(&mutex->wait, NULL);
+	if (rc) {
+		sd_err("failed to init mutex->wait");
+		return rc;
+	}
+
+	rc = pthread_mutex_init(&mutex->local_lock, NULL);
+	if (rc) {
+		sd_err("failed to init mutex->local_lock");
+		pthread_mutex_destroy(&mutex->wait);
+		return rc;
+	}
+
+	return 0;
+}
+
+/*
+ * Acquire the distributed lock @mutex, blocking until this process owns it.
+ * ZooKeeper errors are retried every WAIT_TIME seconds.  Release the lock
+ * with zk_unlock_mutex().
+ */
+void zk_lock_mutex(struct zk_mutex *mutex)
+{
+	int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
+	int rc;
+	uint64_t seen;
+	char *my_path;
+	char parent[MAX_NODE_STR_LEN];
+	char lowest_seq_path[MAX_NODE_STR_LEN];
+
+	/*
+	 * Serialize users of this zk_mutex inside the process, so the single
+	 * zookeeper handle creates at most one seq-ephemeral znode for it.
+	 */
+	pthread_mutex_lock(&mutex->local_lock);
+
+	my_path = mutex->ephemeral_path;
+
+	snprintf(parent, sizeof(parent), LOCK_ZNODE "/%" PRIu32 "/", mutex->id);
+	while (true) {
+		rc = zoo_create(zhandle, parent, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+				flags, my_path, MAX_NODE_STR_LEN);
+		if (rc == ZOK)
+			break;
+		/* my_path is only meaningful after a successful create */
+		sd_err("failed to create path:%s, %s", parent, zerror(rc));
+		zk_wait();
+	}
+	sd_debug("create path %s success", my_path);
+
+	/* create node ok now */
+	snprintf(parent, sizeof(parent), LOCK_ZNODE "/%" PRIu32, mutex->id);
+	while (true) {
+		if (zk_get_lowest_seq(parent, lowest_seq_path,
+				      sizeof(lowest_seq_path)) < 0) {
+			/*
+			 * Not even our own znode is left.  NOTE(review): it is
+			 * not re-created here; at least back off, don't spin.
+			 */
+			zk_wait();
+			continue;
+		}
+
+		/* I got the lock */
+		if (!strcmp(lowest_seq_path, my_path)) {
+			sd_debug("I am master now. %s", lowest_seq_path);
+			return;
+		}
+
+		/*
+		 * Sample the wake count *before* arming the watch, so that a
+		 * deletion racing with zoo_exists() still wakes us up.
+		 */
+		seen = zk_wake_count(mutex->id);
+		rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
+		if (rc == ZOK) {
+			sd_debug("call zoo_exists success %s", lowest_seq_path);
+			zk_sleep_until_woken(mutex->id, seen);
+		} else if (rc != ZNONODE) {
+			sd_err("failed to call zoo_exists %s", zerror(rc));
+			zk_wait();
+		}
+		/* on ZNONODE the owner just went away: re-check immediately */
+	}
+}
+
+/*
+ * Release a lock taken with zk_lock_mutex(): delete our seq-ephemeral
+ * znode (retrying until it is gone) and drop ->local_lock.
+ */
+void zk_unlock_mutex(struct zk_mutex *mutex)
+{
+	int rc;
+
+	while (true) {
+		rc = zoo_delete(zhandle, mutex->ephemeral_path, -1);
+		if (rc == ZOK || rc == ZNONODE)
+			break;
+		sd_err("failed to delete %s %s", mutex->ephemeral_path,
+		       zerror(rc));
+		zk_wait();
+	}
+
+	pthread_mutex_unlock(&mutex->local_lock);
+}
diff --git a/sheep/http/lock.h b/sheep/http/lock.h
new file mode 100644
index 0000000..2dc47f6
--- /dev/null
+++ b/sheep/http/lock.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2013 Robin Dong <san...@taobao.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#ifndef __SHEEP_HTTP_LOCK_H__
+#define __SHEEP_HTTP_LOCK_H__
+
+#define MAX_LOCK_NAME 64               /* NOTE(review): unused in this patch */
+#define LOCKS_TIMEOUT 30000            /* default zk timeout, milliseconds */
+
+#include <stdint.h>
+#include <pthread.h>
+#include "sheep.h"
+
+struct zk_mutex {
+       uint32_t id; /* lock-id; also names this lock's znode directory */
+       pthread_mutex_t wait;
+       pthread_mutex_t local_lock; /* serializes same-lock callers in-process */
+       char ephemeral_path[MAX_NODE_STR_LEN]; /* znode made by zk_lock_mutex() */
+};
+
+int zk_locks_init(const char *hosts, int timeout); /* 0 on success */
+void zk_locks_close(void);
+
+int zk_init_mutex(struct zk_mutex *mutex, uint32_t id); /* 0 on success */
+void zk_lock_mutex(struct zk_mutex *mutex); /* blocks until acquired */
+void zk_unlock_mutex(struct zk_mutex *mutex);
+
+#endif /* __SHEEP_HTTP_LOCK_H__ */
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 9d9afa0..f465fa6 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -820,7 +820,7 @@ int main(int argc, char **argv)
        if (ret)
                exit(1);
 
-       if (http_options && http_init(http_options) != 0)
+       if (http_options && http_init(http_options, sys->cdrv_option) != 0)
                exit(1);
 
        if (pid_file && (create_pidfile(pid_file) != 0)) {
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index d333573..8887497 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -472,9 +472,9 @@ uint64_t md_get_size(uint64_t *used);
 
 /* http.c */
 #ifdef HAVE_HTTP
-int http_init(const char *options);
+int http_init(const char *options, const char *cdrv_option);
 #else
-static inline int http_init(const char *options)
+static inline int http_init(const char *options, const char *cdev_option)
 {
        sd_notice("http service is not complied");
        return 0;
-- 
1.7.1

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to