From: Robin Dong <san...@taobao.com> Use only lock_id as the parameter of the lock interface, reducing the functions to only two:
lock(uint64_t lock_id) unlock(uint64_t lock_id) The ->lock will create and acquire the distributed lock and ->unlock will release it, which is more convenient for end users. We use the existing hash table to store the "<lock_id, cluster_lock*>" and release all the resources of a distributed lock after all threads call "->unlock". v3 --> v4: 1. move new function after zk_helpers. 2. if the znode does not exist, zk_unlock will succeed. v2 --> v3: 1. loop to retry deleting znode in zk_unlock(). Signed-off-by: Robin Dong <san...@taobao.com> --- include/sheep.h | 12 --- sheep/cluster.h | 31 +++--- sheep/cluster/corosync.c | 10 +- sheep/cluster/local.c | 10 +- sheep/cluster/zookeeper.c | 252 +++++++++++++++++++++++++++------------- 5 files changed, 168 insertions(+), 147 deletions(-) diff --git a/include/sheep.h b/include/sheep.h index e5726e8..293e057 100644 --- a/include/sheep.h +++ b/include/sheep.h @@ -255,18 +255,6 @@ static inline void nodes_to_buffer(struct rb_root *nroot, void *buffer) #define MAX_NODE_STR_LEN 256 -/* structure for distributed lock */ -struct cluster_lock { - struct hlist_node hnode; - /* id is passed by users to represent a lock handle */ - uint64_t id; - /* wait for the release of id by other lock owner */ - pthread_mutex_t wait_release; - /* lock for different threads of the same node on the same id */ - pthread_mutex_t id_lock; - char lock_path[MAX_NODE_STR_LEN]; -}; - static inline const char *node_to_str(const struct sd_node *id) { static __thread char str[MAX_NODE_STR_LEN]; diff --git a/sheep/cluster.h b/sheep/cluster.h index 08df91c..0693633 100644 --- a/sheep/cluster.h +++ b/sheep/cluster.h @@ -109,33 +109,32 @@ struct cluster_driver { int (*unblock)(void *msg, size_t msg_len); /* - * Init a distributed mutually exclusive lock to avoid race condition - * when the whole sheepdog cluster process one exclusive resource. + * Acquire the distributed lock. * - * This function use 'lock_id' as the id of this distributed lock. 
- * A thread can create many locks in one sheep daemon. + * Create a distributed mutually exclusive lock to avoid race condition + * and try to acquire the lock. * - * Returns SD_RES_XXX - */ - int (*init_lock)(struct cluster_lock *lock, uint64_t lock_id); - - /* - * Acquire the distributed lock. + * This function use 'lock_id' as the id of this distributed lock. + * A thread can acquire many locks with different lock_id in one + * sheep daemon. * - * The cluster_lock referenced by 'lock' shall be locked by calling - * cluster->lock(). If the cluster_lock is already locked, the calling - * thread shall block until the cluster_lock becomes available. + * The cluster lock referenced by 'lock' shall be locked by calling + * cluster->lock(). If the cluster lock is already locked, the calling + * thread shall block until the cluster lock becomes available. */ - void (*lock)(struct cluster_lock *lock); + void (*lock)(uint64_t lock_id); /* * Release the distributed lock. * - * If the owner of the cluster_lock release it (or the owner is + * If the owner of the cluster lock release it (or the owner is * killed by accident), zookeeper will trigger zk_watch() which will * wake up all waiting threads to compete new owner of the lock + * + * After all thread unlock, all the resource of this distributed lock + * will be released. 
*/ - void (*unlock)(struct cluster_lock *lock); + void (*unlock)(uint64_t lock_id); /* * Update the specific node in the driver's private copy of nodes diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c index 19fc73c..6974dd9 100644 --- a/sheep/cluster/corosync.c +++ b/sheep/cluster/corosync.c @@ -774,16 +774,11 @@ again: return 0; } -static int corosync_init_lock(struct cluster_lock *cluster_lock, uint64_t id) -{ - return -1; -} - -static void corosync_lock(struct cluster_lock *cluster_lock) +static void corosync_lock(uint64_t lock_id) { } -static void corosync_unlock(struct cluster_lock *cluster_lock) +static void corosync_unlock(uint64_t lock_id) { } @@ -807,7 +802,6 @@ static struct cluster_driver cdrv_corosync = { .notify = corosync_notify, .block = corosync_block, .unblock = corosync_unblock, - .init_lock = corosync_init_lock, .lock = corosync_lock, .unlock = corosync_unlock, .update_node = corosync_update_node, diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c index 4c4d83b..6d0af68 100644 --- a/sheep/cluster/local.c +++ b/sheep/cluster/local.c @@ -547,16 +547,11 @@ static int local_init(const char *option) return 0; } -static int local_init_lock(struct cluster_lock *cluster_lock, uint64_t id) +static void local_lock(uint64_t lock_id) { - return -1; } -static void local_lock(struct cluster_lock *cluster_lock) -{ -} - -static void local_unlock(struct cluster_lock *cluster_lock) +static void local_unlock(uint64_t lock_id) { } @@ -579,7 +574,6 @@ static struct cluster_driver cdrv_local = { .notify = local_notify, .block = local_block, .unblock = local_unblock, - .init_lock = local_init_lock, .lock = local_lock, .unlock = local_unlock, .update_node = local_update_node, diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 9e4ffa5..08d83d8 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -32,6 +32,23 @@ #define MASTER_ZNONE BASE_ZNODE "/master" #define LOCK_ZNODE BASE_ZNODE "/lock" +static inline 
ZOOAPI int zk_init_node(const char *path); +static inline ZOOAPI int zk_delete_node(const char *path, int version); + +/* structure for distributed lock */ +struct cluster_lock { + struct hlist_node hnode; + /* id is passed by users to represent a lock handle */ + uint64_t id; + /* referenced by different threads in one sheepdog daemon */ + uint64_t ref; + /* wait for the release of id by other lock owner */ + pthread_mutex_t wait_wakeup; + /* lock for different threads of the same node on the same id */ + pthread_mutex_t id_lock; + char lock_path[MAX_NODE_STR_LEN]; +}; + #define WAIT_TIME 1 /* second */ #define HASH_BUCKET_NR 1021 @@ -39,56 +56,6 @@ static struct hlist_head *cluster_locks_table; static pthread_mutex_t table_locks[HASH_BUCKET_NR]; /* - * All the operations of the lock table is protected by - * cluster_lock->id_lock so we don't need to add lock here - */ - -static void lock_table_del(uint64_t lock_id) -{ - uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR; - struct hlist_node *iter; - struct cluster_lock *lock; - - pthread_mutex_lock(table_locks + hval); - hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) { - if (lock->id == lock_id) { - hlist_del(iter); - break; - } - } - pthread_mutex_unlock(table_locks + hval); -} - -static void lock_table_add(uint64_t lock_id, - struct cluster_lock *cluster_lock) -{ - uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR; - - pthread_mutex_lock(table_locks + hval); - hlist_add_head(&(cluster_lock->hnode), cluster_locks_table + hval); - pthread_mutex_unlock(table_locks + hval); -} - -static int lock_table_lookup_release(uint64_t lock_id) -{ - uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR; - int res = -1; - struct hlist_node *iter; - struct cluster_lock *lock; - - pthread_mutex_lock(table_locks + hval); - hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) { - if (lock->id == lock_id) { - pthread_mutex_unlock(&lock->wait_release); - res = 0; - break; - } - } - 
pthread_mutex_unlock(table_locks + hval); - return res; -} - -/* * Wait a while when create, delete or get_children fail on * zookeeper lock so it will not print too much loop log */ @@ -183,6 +150,7 @@ static struct zk_node this_node; switch (rc) { \ case ZNONODE: \ case ZNODEEXISTS: \ + case ZNOTEMPTY: \ break; \ case ZINVALIDSTATE: \ case ZSESSIONEXPIRED: \ @@ -332,6 +300,125 @@ static inline ZOOAPI int zk_get_children(const char *path, return rc; } +/* + * All the operations of the lock table is protected by + * cluster_lock->id_lock so we don't need to add lock here + */ + +static int lock_table_lookup_wakeup(uint64_t lock_id) +{ + uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR; + int res = -1; + struct hlist_node *iter; + struct cluster_lock *lock; + + pthread_mutex_lock(table_locks + hval); + hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) { + if (lock->id == lock_id) { + pthread_mutex_unlock(&lock->wait_wakeup); + res = 0; + break; + } + } + pthread_mutex_unlock(table_locks + hval); + return res; +} + +static struct cluster_lock *lock_table_lookup_acquire(uint64_t lock_id) +{ + uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR; + int rc; + struct hlist_node *iter; + struct cluster_lock *lock, *ret_lock = NULL; + char path[MAX_NODE_STR_LEN]; + + pthread_mutex_lock(table_locks + hval); + hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) { + if (lock->id == lock_id) { + ret_lock = lock; + ret_lock->ref++; + break; + } + } + + if (!ret_lock) { + /* create lock and add it to hash table */ + ret_lock = xzalloc(sizeof(*ret_lock)); + ret_lock->id = lock_id; + ret_lock->ref = 1; + snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", + ret_lock->id); + rc = zk_init_node(path); + if (rc) + panic("Failed to init node %s", path); + + rc = pthread_mutex_init(&ret_lock->wait_wakeup, NULL); + if (rc) + panic("failed to init cluster_lock->wait_wakeup"); + + rc = pthread_mutex_init(&ret_lock->id_lock, NULL); + if (rc) + 
panic("failed to init cluster_lock->id_lock"); + + hlist_add_head(&(ret_lock->hnode), cluster_locks_table + hval); + } + pthread_mutex_unlock(table_locks + hval); + + /* + * if many threads use locks with same id, we should use + * ->id_lock to avoid the only zookeeper handler to + * create many seq-ephemeral files. + */ + pthread_mutex_lock(&ret_lock->id_lock); + return ret_lock; +} + +static void lock_table_lookup_release(uint64_t lock_id) +{ + uint64_t hval = sd_hash_64(lock_id) % HASH_BUCKET_NR; + int rc; + struct hlist_node *iter; + struct cluster_lock *lock; + char path[MAX_NODE_STR_LEN]; + + pthread_mutex_lock(table_locks + hval); + hlist_for_each_entry(lock, iter, cluster_locks_table + hval, hnode) { + if (lock->id != lock_id) + continue; + while (true) { + rc = zk_delete_node(lock->lock_path, -1); + if (rc == ZOK || rc == ZNONODE) + break; + sd_err("Failed to delete path: %s %s", lock->lock_path, + zerror(rc)); + zk_wait(); + } + lock->lock_path[0] = '\0'; + pthread_mutex_unlock(&lock->id_lock); + lock->ref--; + if (!lock->ref) { + hlist_del(iter); + /* free all resource used by this lock */ + pthread_mutex_destroy(&lock->id_lock); + pthread_mutex_destroy(&lock->wait_wakeup); + snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", + lock->id); + /* + * If deletion of directory 'lock_id' fail, we only get + * a * empty directory in zookeeper. That's unharmful + * so we don't need to retry it. 
+ */ + rc = zk_delete_node(path, -1); + if (rc != ZOK) + sd_err("Failed to delete path: %s %s", path, + zerror(rc)); + free(lock); + } + break; + } + pthread_mutex_unlock(table_locks + hval); +} + /* ZooKeeper-based queue give us an totally ordered events */ static int efd; static int32_t queue_pos; @@ -585,7 +672,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path, } /* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */ - sd_debug("path:%s, type:%d", path, type); + sd_debug("path:%s, type:%d, state:%d", path, type, state); if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) { ret = sscanf(path, MEMBER_ZNODE "/%s", str); if (ret == 1) @@ -598,7 +685,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path, /* process distributed lock */ ret = sscanf(path, LOCK_ZNODE "/%lu/%s", &lock_id, str); if (ret == 2) { - ret = lock_table_lookup_release(lock_id); + ret = lock_table_lookup_wakeup(lock_id); if (ret) sd_debug("release lock %lu %s", lock_id, str); return; @@ -1142,39 +1229,13 @@ kick_block_event: kick_block_event(); } -static int zk_init_lock(struct cluster_lock *cluster_lock, uint64_t lock_id) -{ - int rc = 0; - char path[MAX_NODE_STR_LEN]; - - cluster_lock->id = lock_id; - snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%lu", cluster_lock->id); - rc = zk_init_node(path); - if (rc) - goto out; - - rc = pthread_mutex_init(&cluster_lock->wait_release, NULL); - if (rc) { - sd_err("failed to init cluster_lock->wait_release"); - goto out; - } - - rc = pthread_mutex_init(&cluster_lock->id_lock, NULL); - if (rc) { - sd_err("failed to init cluster_lock->id_lock"); - goto out; - } -out: - return rc; -} - /* * This operation will create a seq-ephemeral znode in lock directory * of zookeeper (use lock-id as dir name). 
The smallest file path in * this directory wil be the owner of the lock; the other threads will - * wait on a pthread_mutex_t (cluster_lock->wait_release) + * wait on a pthread_mutex_t (cluster_lock->wait_wakeup) */ -static void zk_lock(struct cluster_lock *cluster_lock) +static void zk_lock(uint64_t lock_id) { int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL; int rc, len = MAX_NODE_STR_LEN; @@ -1182,15 +1243,9 @@ static void zk_lock(struct cluster_lock *cluster_lock) char parent[MAX_NODE_STR_LEN]; char lowest_seq_path[MAX_NODE_STR_LEN]; char owner_name[MAX_NODE_STR_LEN]; + struct cluster_lock *cluster_lock; - /* - * if many threads use locks with same id, we should use - * ->id_lock to avoid the only zookeeper handler to - * create many seq-ephemeral files. - */ - pthread_mutex_lock(&cluster_lock->id_lock); - - lock_table_add(cluster_lock->id, cluster_lock); + cluster_lock = lock_table_lookup_acquire(lock_id); my_path = cluster_lock->lock_path; @@ -1223,28 +1278,20 @@ static void zk_lock(struct cluster_lock *cluster_lock) rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL); if (rc == ZOK) { sd_debug("call zoo_exits success %s", lowest_seq_path); - pthread_mutex_lock(&cluster_lock->wait_release); + pthread_mutex_lock(&cluster_lock->wait_wakeup); } else { - sd_err("failed to call zoo_exists %s", zerror(rc)); + sd_debug("failed to call zoo_exists %s", zerror(rc)); if (rc != ZNONODE) zk_wait(); } } } -static void zk_unlock(struct cluster_lock *cluster_lock) +static void zk_unlock(uint64_t lock_id) { - int rc; - rc = zk_delete_node(cluster_lock->lock_path, -1); - if (rc != ZOK) - sd_err("Failed to delete path: %s %s", - cluster_lock->lock_path, zerror(rc)); - - lock_table_del(cluster_lock->id); - pthread_mutex_unlock(&cluster_lock->id_lock); + lock_table_lookup_release(lock_id); } - static int zk_init(const char *option) { char *hosts, *to, *p; @@ -1324,9 +1371,8 @@ static struct cluster_driver cdrv_zookeeper = { .notify = zk_notify, .block = zk_block, .unblock = zk_unblock, - 
.init_lock = zk_init_lock, - .lock = zk_lock, - .unlock = zk_unlock, + .lock = zk_lock, + .unlock = zk_unlock, .update_node = zk_update_node, .get_local_addr = get_local_addr, }; -- 1.7.12.4 -- sheepdog mailing list sheepdog@lists.wpkg.org http://lists.wpkg.org/mailman/listinfo/sheepdog