From: Robin Dong <san...@taobao.com>

Using only lock_id as parameter of lock interface and reduce function to 
onlytwo:

        lock(uint64_t lock_id)
        unlock(uint64_t lock_id)

The ->lock will create and acquire the distributed lock and ->unlock will 
release it.
It is more convient for end user.
We use existed hash table to store the
"<lock_id, cluster_lock*>" and release all the resources of a distributed lock 
after
all threads call "->unlock".

v2 --> v3:
        1. loop to retry deleting znode in zk_unlock().

Signed-off-by: Robin Dong <san...@taobao.com>
---
 include/sheep.h           |  12 ----
 sheep/cluster.h           |  31 ++++----
 sheep/cluster/corosync.c  |  10 +--
 sheep/cluster/local.c     |  10 +--
 sheep/cluster/zookeeper.c | 176 +++++++++++++++++++++++++++++-----------------
 5 files changed, 130 insertions(+), 109 deletions(-)

diff --git a/include/sheep.h b/include/sheep.h
index e5726e8..293e057 100644
--- a/include/sheep.h
+++ b/include/sheep.h
@@ -255,18 +255,6 @@ static inline void nodes_to_buffer(struct rb_root *nroot, 
void *buffer)
 
 #define MAX_NODE_STR_LEN 256
 
-/* structure for distributed lock */
-struct cluster_lock {
-       struct hlist_node hnode;
-       /* id is passed by users to represent a lock handle */
-       uint64_t id;
-       /* wait for the release of id by other lock owner */
-       pthread_mutex_t wait_release;
-       /* lock for different threads of the same node on the same id */
-       pthread_mutex_t id_lock;
-       char lock_path[MAX_NODE_STR_LEN];
-};
-
 static inline const char *node_to_str(const struct sd_node *id)
 {
        static __thread char str[MAX_NODE_STR_LEN];
diff --git a/sheep/cluster.h b/sheep/cluster.h
index 08df91c..0693633 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -109,33 +109,32 @@ struct cluster_driver {
        int (*unblock)(void *msg, size_t msg_len);
 
        /*
-        * Init a distributed mutually exclusive lock to avoid race condition
-        * when the whole sheepdog cluster process one exclusive resource.
+        * Acquire the distributed lock.
         *
-        * This function use 'lock_id' as the id of this distributed lock.
-        * A thread can create many locks in one sheep daemon.
+        * Create a distributed mutually exclusive lock to avoid race condition
+        * and try to acquire the lock.
         *
-        * Returns SD_RES_XXX
-        */
-       int (*init_lock)(struct cluster_lock *lock, uint64_t lock_id);
-
-       /*
-        * Acquire the distributed lock.
+        * This function use 'lock_id' as the id of this distributed lock.
+        * A thread can acquire many locks with different lock_id in one
+        * sheep daemon.
         *
-        * The cluster_lock referenced by 'lock' shall be locked by calling
-        * cluster->lock(). If the cluster_lock is already locked, the calling
-        * thread shall block until the cluster_lock becomes available.
+        * The cluster lock referenced by 'lock' shall be locked by calling
+        * cluster->lock(). If the cluster lock is already locked, the calling
+        * thread shall block until the cluster lock becomes available.
         */
-       void (*lock)(struct cluster_lock *lock);
+       void (*lock)(uint64_t lock_id);
 
        /*
         * Release the distributed lock.
         *
-        * If the owner of the cluster_lock release it (or the owner is
+        * If the owner of the cluster lock release it (or the owner is
         * killed by accident), zookeeper will trigger zk_watch() which will
         * wake up all waiting threads to compete new owner of the lock
+        *
+        * After all thread unlock, all the resource of this distributed lock
+        * will be released.
         */
-       void (*unlock)(struct cluster_lock *lock);
+       void (*unlock)(uint64_t lock_id);
 
        /*
         * Update the specific node in the driver's private copy of nodes
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 19fc73c..6974dd9 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -774,16 +774,11 @@ again:
        return 0;
 }
 
-static int corosync_init_lock(struct cluster_lock *cluster_lock, uint64_t id)
-{
-       return -1;
-}
-
-static void corosync_lock(struct cluster_lock *cluster_lock)
+static void corosync_lock(uint64_t lock_id)
 {
 }
 
-static void corosync_unlock(struct cluster_lock *cluster_lock)
+static void corosync_unlock(uint64_t lock_id)
 {
 }
 
@@ -807,7 +802,6 @@ static struct cluster_driver cdrv_corosync = {
        .notify         = corosync_notify,
        .block          = corosync_block,
        .unblock        = corosync_unblock,
-       .init_lock      = corosync_init_lock,
        .lock           = corosync_lock,
        .unlock         = corosync_unlock,
        .update_node    = corosync_update_node,
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 4c4d83b..6d0af68 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -547,16 +547,11 @@ static int local_init(const char *option)
        return 0;
 }
 
-static int local_init_lock(struct cluster_lock *cluster_lock, uint64_t id)
+static void local_lock(uint64_t lock_id)
 {
-       return -1;
 }
 
-static void local_lock(struct cluster_lock *cluster_lock)
-{
-}
-
-static void local_unlock(struct cluster_lock *cluster_lock)
+static void local_unlock(uint64_t lock_id)
 {
 }
 
@@ -579,7 +574,6 @@ static struct cluster_driver cdrv_local = {
        .notify         = local_notify,
        .block          = local_block,
        .unblock        = local_unblock,
-       .init_lock      = local_init_lock,
        .lock           = local_lock,
        .unlock         = local_unlock,
        .update_node    = local_update_node,
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 9e4ffa5..52d0daf 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -32,6 +32,23 @@
 #define MASTER_ZNONE BASE_ZNODE "/master"
 #define LOCK_ZNODE BASE_ZNODE "/lock"
 
+static inline ZOOAPI int zk_init_node(const char *path);
+static inline ZOOAPI int zk_delete_node(const char *path, int version);
+
+/* structure for distributed lock */
+struct cluster_lock {
+       struct hlist_node hnode;
+       /* id is passed by users to represent a lock handle */
+       uint64_t id;
+       /* referenced by different threads in one sheepdog daemon */
+       uint64_t ref;
+       /* wait for the release of id by other lock owner */
+       pthread_mutex_t wait_wakeup;
+       /* lock for different threads of the same node on the same id */
+       pthread_mutex_t id_lock;
+       char lock_path[MAX_NODE_STR_LEN];
+};
+
 #define WAIT_TIME      1               /* second */
 
 #define HASH_BUCKET_NR 1021
@@ -43,49 +60,118 @@ static pthread_mutex_t table_locks[HASH_BUCKET_NR];
  * cluster_lock->id_lock so we don't need to add lock here
  */
 
-static void lock_table_del(uint64_t lock_id)
+static int lock_table_lookup_wakeup(uint64_t lock_id)
 {
        uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
+       int res = -1;
        struct hlist_node *iter;
        struct cluster_lock *lock;
 
        pthread_mutex_lock(table_locks + hval);
        hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
                if (lock->id == lock_id) {
-                       hlist_del(iter);
+                       pthread_mutex_unlock(&lock->wait_wakeup);
+                       res = 0;
                        break;
                }
        }
        pthread_mutex_unlock(table_locks + hval);
+       return res;
 }
 
-static void lock_table_add(uint64_t lock_id,
-                          struct cluster_lock *cluster_lock)
+static struct cluster_lock *lock_table_lookup_acquire(uint64_t lock_id)
 {
        uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
+       int rc;
+       struct hlist_node *iter;
+       struct cluster_lock *lock, *ret_lock = NULL;
+       char path[MAX_NODE_STR_LEN];
 
        pthread_mutex_lock(table_locks + hval);
-       hlist_add_head(&(cluster_lock->hnode), cluster_locks_table + hval);
+       hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
+               if (lock->id == lock_id) {
+                       ret_lock = lock;
+                       ret_lock->ref++;
+                       break;
+               }
+       }
+
+       if (!ret_lock) {
+               /* create lock and add it to hash table */
+               ret_lock = xzalloc(sizeof(*ret_lock));
+               ret_lock->id = lock_id;
+               ret_lock->ref = 1;
+               snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu",
+                        ret_lock->id);
+               rc = zk_init_node(path);
+               if (rc)
+                       panic("Failed to init node %s", path);
+
+               rc = pthread_mutex_init(&ret_lock->wait_wakeup, NULL);
+               if (rc)
+                       panic("failed to init cluster_lock->wait_wakeup");
+
+               rc = pthread_mutex_init(&ret_lock->id_lock, NULL);
+               if (rc)
+                       panic("failed to init cluster_lock->id_lock");
+
+               hlist_add_head(&(ret_lock->hnode), cluster_locks_table + hval);
+       }
        pthread_mutex_unlock(table_locks + hval);
+
+       /*
+        * if many threads use locks with same id, we should use
+        * ->id_lock to avoid the only zookeeper handler to
+        * create many seq-ephemeral files.
+        */
+       pthread_mutex_lock(&ret_lock->id_lock);
+       return ret_lock;
 }
 
-static int lock_table_lookup_release(uint64_t lock_id)
+static void lock_table_lookup_release(uint64_t lock_id)
 {
        uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR;
-       int res = -1;
+       int rc;
        struct hlist_node *iter;
        struct cluster_lock *lock;
+       char path[MAX_NODE_STR_LEN];
 
        pthread_mutex_lock(table_locks + hval);
        hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) {
-               if (lock->id == lock_id) {
-                       pthread_mutex_unlock(&lock->wait_release);
-                       res = 0;
-                       break;
+               if (lock->id != lock_id)
+                       continue;
+               while (true) {
+                       rc = zk_delete_node(lock->lock_path, -1);
+                       if (rc == ZOK)
+                               break;
+                       sd_err("Failed to delete path: %s %s", lock->lock_path,
+                              zerror(rc));
+                       zk_wait();
+               }
+               lock->lock_path[0] = '\0';
+               pthread_mutex_unlock(&lock->id_lock);
+               lock->ref--;
+               if (!lock->ref) {
+                       hlist_del(iter);
+                       /* free all resource used by this lock */
+                       pthread_mutex_destroy(&lock->id_lock);
+                       pthread_mutex_destroy(&lock->wait_wakeup);
+                       snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu",
+                                lock->id);
+                       /*
+                        * If deletion of directory 'lock_id' fail, we only get
+                        * a * empty directory in zookeeper. That's unharmful
+                        * so we don't need to retry it.
+                        */
+                       rc = zk_delete_node(path, -1);
+                       if (rc != ZOK)
+                               sd_err("Failed to delete path: %s %s", path,
+                                     zerror(rc));
+                       free(lock);
                }
+               break;
        }
        pthread_mutex_unlock(table_locks + hval);
-       return res;
 }
 
 /*
@@ -183,6 +269,7 @@ static struct zk_node this_node;
        switch (rc) {                                                   \
        case ZNONODE:                                                   \
        case ZNODEEXISTS:                                               \
+       case ZNOTEMPTY:                                                 \
                break;                                                  \
        case ZINVALIDSTATE:                                             \
        case ZSESSIONEXPIRED:                                           \
@@ -585,7 +672,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, 
const char *path,
        }
 
 /* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */
-       sd_debug("path:%s, type:%d", path, type);
+       sd_debug("path:%s, type:%d, state:%d", path, type, state);
        if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
                ret = sscanf(path, MEMBER_ZNODE "/%s", str);
                if (ret == 1)
@@ -598,7 +685,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, 
const char *path,
                /* process distributed lock */
                ret = sscanf(path, LOCK_ZNODE "/%lu/%s", &lock_id, str);
                if (ret == 2) {
-                       ret = lock_table_lookup_release(lock_id);
+                       ret = lock_table_lookup_wakeup(lock_id);
                        if (ret)
                                sd_debug("release lock %lu %s", lock_id, str);
                        return;
@@ -1142,39 +1229,13 @@ kick_block_event:
        kick_block_event();
 }
 
-static int zk_init_lock(struct cluster_lock *cluster_lock, uint64_t lock_id)
-{
-       int rc = 0;
-       char path[MAX_NODE_STR_LEN];
-
-       cluster_lock->id = lock_id;
-       snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", cluster_lock->id);
-       rc = zk_init_node(path);
-       if (rc)
-               goto out;
-
-       rc = pthread_mutex_init(&cluster_lock->wait_release, NULL);
-       if (rc) {
-               sd_err("failed to init cluster_lock->wait_release");
-               goto out;
-       }
-
-       rc = pthread_mutex_init(&cluster_lock->id_lock, NULL);
-       if (rc) {
-               sd_err("failed to init cluster_lock->id_lock");
-               goto out;
-       }
-out:
-       return rc;
-}
-
 /*
  * This operation will create a seq-ephemeral znode in lock directory
  * of zookeeper (use lock-id as dir name). The smallest file path in
  * this directory wil be the owner of the lock; the other threads will
- * wait on a pthread_mutex_t (cluster_lock->wait_release)
+ * wait on a pthread_mutex_t (cluster_lock->wait_wakeup)
  */
-static void zk_lock(struct cluster_lock *cluster_lock)
+static void zk_lock(uint64_t lock_id)
 {
        int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
        int rc, len = MAX_NODE_STR_LEN;
@@ -1182,15 +1243,9 @@ static void zk_lock(struct cluster_lock *cluster_lock)
        char parent[MAX_NODE_STR_LEN];
        char lowest_seq_path[MAX_NODE_STR_LEN];
        char owner_name[MAX_NODE_STR_LEN];
+       struct cluster_lock *cluster_lock;
 
-       /*
-        * if many threads use locks with same id, we should use
-        * ->id_lock to avoid the only zookeeper handler to
-        * create many seq-ephemeral files.
-        */
-       pthread_mutex_lock(&cluster_lock->id_lock);
-
-       lock_table_add(cluster_lock->id, cluster_lock);
+       cluster_lock = lock_table_lookup_acquire(lock_id);
 
        my_path = cluster_lock->lock_path;
 
@@ -1223,28 +1278,20 @@ static void zk_lock(struct cluster_lock *cluster_lock)
                rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
                if (rc == ZOK) {
                        sd_debug("call zoo_exits success %s", lowest_seq_path);
-                       pthread_mutex_lock(&cluster_lock->wait_release);
+                       pthread_mutex_lock(&cluster_lock->wait_wakeup);
                } else {
-                       sd_err("failed to call zoo_exists %s", zerror(rc));
+                       sd_debug("failed to call zoo_exists %s", zerror(rc));
                        if (rc != ZNONODE)
                                zk_wait();
                }
        }
 }
 
-static void zk_unlock(struct cluster_lock *cluster_lock)
+static void zk_unlock(uint64_t lock_id)
 {
-       int rc;
-       rc = zk_delete_node(cluster_lock->lock_path, -1);
-       if (rc != ZOK)
-               sd_err("Failed to delete path: %s %s",
-                      cluster_lock->lock_path, zerror(rc));
-
-       lock_table_del(cluster_lock->id);
-       pthread_mutex_unlock(&cluster_lock->id_lock);
+       lock_table_lookup_release(lock_id);
 }
 
-
 static int zk_init(const char *option)
 {
        char *hosts, *to, *p;
@@ -1324,9 +1371,8 @@ static struct cluster_driver cdrv_zookeeper = {
        .notify     = zk_notify,
        .block      = zk_block,
        .unblock    = zk_unblock,
-       .init_lock  = zk_init_lock,
-       .lock       = zk_lock,
-       .unlock     = zk_unlock,
+       .lock         = zk_lock,
+       .unlock       = zk_unlock,
        .update_node  = zk_update_node,
        .get_local_addr = get_local_addr,
 };
-- 
1.7.12.4

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to