+
+static const char base_znode[] = "/sheepdog";
+static char queue_znode[MAX_NODE_STR_LEN] = "";
+static const char queue_znode_post[] = "/queue";
+static char member_znode[MAX_NODE_STR_LEN] = "";
+static const char member_znode_post[] = "/member";
+static char master_znode[MAX_NODE_STR_LEN] = "";
+static const char master_znode_post[] = "/master";
+static char lock_znode[MAX_NODE_STR_LEN] = "";
+static const char lock_znode_post[] = "/lock";
-#define BASE_ZNODE "/sheepdog"
-#define QUEUE_ZNODE BASE_ZNODE "/queue"
-#define MEMBER_ZNODE BASE_ZNODE "/member"
-#define MASTER_ZNODE BASE_ZNODE "/master"
-#define LOCK_ZNODE BASE_ZNODE "/lock"
static int zk_timeout = SESSION_TIMEOUT;
static int my_master_seq;
+static char sd_domain[MAX_NODE_STR_LEN] = SD_DEFAULT_DOMAIN;
/* structure for distributed lock */
struct cluster_lock {
@@ -347,7 +354,7 @@ static struct cluster_lock
*lock_table_lookup_acquire(uint64_t lock_id)
ret_lock = xzalloc(sizeof(*ret_lock));
ret_lock->id = lock_id;
ret_lock->ref = 1;
- snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64,
+ snprintf(path, MAX_NODE_STR_LEN, "%s/%"PRIu64, lock_znode,
ret_lock->id);
rc = zk_init_node(path);
if (rc)
@@ -399,8 +406,8 @@ static void lock_table_lookup_release(uint64_t lock_id)
/* free all resource used by this lock */
sd_destroy_mutex(&lock->id_lock);
sem_destroy(&lock->wait_wakeup);
- snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64,
- lock->id);
+ snprintf(path, MAX_NODE_STR_LEN, "%s/%"PRIu64,
+ lock_znode, lock->id);
/*
* If deletion of directory 'lock_id' fail, we only get
* a * empty directory in zookeeper. That's unharmful
@@ -458,7 +465,7 @@ static int zk_queue_peek(bool *peek)
int rc;
char path[MAX_NODE_STR_LEN];
- snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
+ snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos);
rc = zk_node_exists(path);
switch (rc) {
@@ -483,7 +490,8 @@ static int zk_find_seq_node(uint64_t id, char *seq_path,
int seq_path_len,
for (int seq = queue_pos; ; seq++) {
struct zk_event ev;
- snprintf(seq_path, seq_path_len, QUEUE_ZNODE"/%010"PRId32, seq);
+ snprintf(seq_path, seq_path_len, "%s/%010"PRId32,
+ queue_znode, seq);
len = offsetof(typeof(ev), id) + sizeof(ev.id);
rc = zk_get_data(seq_path, &ev, &len);
switch (rc) {
@@ -513,7 +521,7 @@ static int zk_queue_push(struct zk_event *ev)
bool found;
len = offsetof(typeof(*ev), buf) + ev->buf_len;
- snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
+ snprintf(path, sizeof(path), "%s/", queue_znode);
again:
rc = zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf), false);
switch (rc) {
@@ -537,7 +545,10 @@ again:
if (first_push) {
int32_t seq;
- sscanf(buf, QUEUE_ZNODE "/%"PRId32, &seq);
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
+ strcpy(temp_char_form, queue_znode);
+ strcat(temp_char_form, "/%"PRId32);
+ sscanf(buf, temp_char_form, &seq);
queue_pos = seq;
eventfd_xwrite(efd, 1);
first_push = false;
@@ -568,7 +579,7 @@ static int push_join_response(struct zk_event *ev)
queue_pos--;
len = offsetof(typeof(*ev), buf) + ev->buf_len;
- snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
+ snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos);
RETURN_IF_ERROR(zk_set_data(path, (char *)ev, len, -1), "");
sd_debug("update path:%s, queue_pos:%010" PRId32 ", len:%d", path,
@@ -582,7 +593,7 @@ static int zk_queue_pop_advance(struct zk_event *ev)
char path[MAX_NODE_STR_LEN];
len = sizeof(*ev);
- snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
+ snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos);
RETURN_IF_ERROR(zk_get_data(path, ev, &len), "path %s", path);
sd_debug("%s, type:%d, len:%d, pos:%" PRId32, path, ev->type, len,
@@ -641,10 +652,16 @@ static inline void build_node_list(void)
static int zk_queue_init(void)
{
- RETURN_IF_ERROR(zk_init_node(BASE_ZNODE), "path %s", BASE_ZNODE);
- RETURN_IF_ERROR(zk_init_node(MASTER_ZNODE), "path %s", MASTER_ZNODE);
- RETURN_IF_ERROR(zk_init_node(QUEUE_ZNODE), "path %s", QUEUE_ZNODE);
- RETURN_IF_ERROR(zk_init_node(MEMBER_ZNODE), "path %s", MEMBER_ZNODE);
+ char sd_domain_znode[128] = "";
+ strcpy(sd_domain_znode, base_znode);
+ strcat(sd_domain_znode, "/");
+ strcat(sd_domain_znode, sd_domain);
+ RETURN_IF_ERROR(zk_init_node(base_znode), "path %s", base_znode);
+ RETURN_IF_ERROR(zk_init_node(sd_domain_znode),
+ "path %s", sd_domain_znode);
+ RETURN_IF_ERROR(zk_init_node(master_znode), "path %s", master_znode);
+ RETURN_IF_ERROR(zk_init_node(queue_znode), "path %s", queue_znode);
+ RETURN_IF_ERROR(zk_init_node(member_znode), "path %s", member_znode);
return ZOK;
}
@@ -692,6 +709,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
char str[MAX_NODE_STR_LEN], *p;
uint64_t lock_id;
int ret;
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
/*
@@ -705,7 +723,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state,
const char *path,
/* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */
sd_debug("path:%s, type:%d, state:%d", path, type, state);
if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
- ret = sscanf(path, MEMBER_ZNODE "/%s", str);
+ strcpy(temp_char_form, member_znode);
+ strcat(temp_char_form, "/%s");
+ ret = sscanf(path, temp_char_form, str);
if (ret == 1)
zk_node_exists(path);
/* kick off the event handler */
@@ -714,7 +734,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state,
const char *path,
struct zk_node *n;
/* process distributed lock */
- ret = sscanf(path, LOCK_ZNODE "/%"PRIu64"/%s", &lock_id, str);
+ strcpy(temp_char_form, lock_znode);
+ strcat(temp_char_form, "/%"PRIu64"/%s");
+ ret = sscanf(path, temp_char_form, &lock_id, str);
if (ret == 2) {
ret = lock_table_lookup_wakeup(lock_id);
if (ret)
@@ -723,13 +745,17 @@ static void zk_watcher(zhandle_t *zh, int type, int
state, const char *path,
return;
}
- ret = sscanf(path, MASTER_ZNODE "/%s", str);
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/%s");
+ ret = sscanf(path, temp_char_form, str);
if (ret == 1) {
zk_compete_master();
return;
}
- ret = sscanf(path, MEMBER_ZNODE "/%s", str);
+ strcpy(temp_char_form, member_znode);
+ strcat(temp_char_form, "/%s");
+ ret = sscanf(path, temp_char_form, str);
if (ret != 1)
return;
p = strrchr(path, '/');
@@ -815,19 +841,23 @@ static int zk_find_master(int *master_seq, char
*master_name)
{
int rc, len = MAX_NODE_STR_LEN;
char master_compete_path[MAX_NODE_STR_LEN];
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
if (*master_seq < 0) {
- RETURN_IF_ERROR(zk_get_least_seq(MASTER_ZNODE,
+ RETURN_IF_ERROR(zk_get_least_seq(master_znode,
master_compete_path,
MAX_NODE_STR_LEN, master_name,
&len), "");
- sscanf(master_compete_path, MASTER_ZNODE "/%"PRId32,
- master_seq);
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/%"PRId32);
+ sscanf(master_compete_path, temp_char_form, master_seq);
return ZOK;
} else {
while (true) {
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/%010"PRId32);
snprintf(master_compete_path, len,
- MASTER_ZNODE "/%010"PRId32, *master_seq);
+ temp_char_form, *master_seq);
rc = zk_get_data(master_compete_path, master_name,
&len);
switch (rc) {
@@ -854,10 +884,12 @@ static int zk_verify_last_sheep_join(int seq, int
*last_sheep)
{
int rc, len = MAX_NODE_STR_LEN;
char path[MAX_NODE_STR_LEN], name[MAX_NODE_STR_LEN];
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
for (*last_sheep = seq - 1; *last_sheep >= 0; (*last_sheep)--) {
- snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNODE "/%010"PRId32,
- *last_sheep);
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/%010"PRId32);
+ snprintf(path, MAX_NODE_STR_LEN, temp_char_form, *last_sheep);
rc = zk_get_data(path, name, &len);
switch (rc) {
case ZNONODE:
@@ -871,8 +903,9 @@ static int zk_verify_last_sheep_join(int seq, int
*last_sheep)
if (!strcmp(name, node_to_str(&this_node.node)))
continue;
-
- snprintf(path, MAX_NODE_STR_LEN, MEMBER_ZNODE "/%s", name);
+ strcpy(temp_char_form, member_znode);
+ strcat(temp_char_form, "/%s");
+ snprintf(path, MAX_NODE_STR_LEN, temp_char_form, name);
rc = zk_node_exists(path);
switch (rc) {
case ZOK:
@@ -909,23 +942,28 @@ static void zk_compete_master(void)
goto out_unlock;
if (!joined) {
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/");
sd_debug("start to compete master for the first time");
do {
if (uatomic_is_true(&stop))
goto out_unlock;
/* duplicate sequential node has no side-effect */
- rc = zk_create_seq_node(MASTER_ZNODE "/",
+ rc = zk_create_seq_node(temp_char_form,
node_to_str(&this_node.node),
MAX_NODE_STR_LEN,
my_compete_path,
MAX_NODE_STR_LEN, true);
} while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
- CHECK_ZK_RC(rc, MASTER_ZNODE "/");
+ CHECK_ZK_RC(rc, temp_char_form);
if (rc != ZOK)
goto out_unlock;
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/%"PRId32);
sd_debug("my compete path: %s", my_compete_path);
- sscanf(my_compete_path, MASTER_ZNODE "/%"PRId32,
+ sscanf(my_compete_path, temp_char_form,
&my_seq);
}
@@ -964,10 +1002,12 @@ static int zk_join(const struct sd_node *myself,
{
int rc;
char path[MAX_NODE_STR_LEN];
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
this_node.node = *myself;
-
- snprintf(path, sizeof(path), MEMBER_ZNODE "/%s", node_to_str(myself));
+ strcpy(temp_char_form, member_znode);
+ strcat(temp_char_form, "/%s");
+ snprintf(path, sizeof(path), temp_char_form, node_to_str(myself));
rc = zk_node_exists(path);
if (rc == ZOK) {
sd_err("Previous zookeeper session exist, shoot myself. Please "
@@ -985,17 +1025,21 @@ static int zk_join(const struct sd_node *myself,
static int zk_leave(void)
{
char path[PATH_MAX];
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
sd_info("leaving from cluster");
uatomic_set_true(&stop);
if (uatomic_is_true(&is_master)) {
- snprintf(path, sizeof(path), MASTER_ZNODE "/%010"PRId32,
- my_master_seq);
+ strcpy(temp_char_form, master_znode);
+ strcat(temp_char_form, "/%010"PRId32);
+ snprintf(path, sizeof(path), temp_char_form, my_master_seq);
zk_delete_node(path, -1);
}
- snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
+ strcpy(temp_char_form, member_znode);
+ strcat(temp_char_form, "/%s");
+ snprintf(path, sizeof(path), temp_char_form,
node_to_str(&this_node.node));
add_event(EVENT_LEAVE, &this_node, NULL, 0);
lock_table_remove_znodes();
@@ -1038,9 +1082,9 @@ static void watch_all_nodes(void)
struct String_vector strs;
char path[MAX_NODE_STR_LEN];
- RETURN_VOID_IF_ERROR(zk_get_children(MEMBER_ZNODE, &strs), "");
+ RETURN_VOID_IF_ERROR(zk_get_children(member_znode, &strs), "");
- FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
+ FOR_EACH_ZNODE(member_znode, path, &strs) {
RETURN_VOID_IF_ERROR(zk_node_exists(path), "");
}
}
@@ -1066,6 +1110,7 @@ static void zk_handle_accept(struct zk_event *ev)
{
char path[MAX_NODE_STR_LEN];
int rc;
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
sd_debug("ACCEPT");
if (node_eq(&ev->sender.node, &this_node.node))
@@ -1074,7 +1119,9 @@ static void zk_handle_accept(struct zk_event *ev)
sd_debug("%s", node_to_str(&ev->sender.node));
- snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
+ strcpy(temp_char_form, member_znode);
+ strcat(temp_char_form, "/%s");
+ snprintf(path, sizeof(path), temp_char_form,
node_to_str(&ev->sender.node));
if (node_eq(&ev->sender.node, &this_node.node)) {
joined = true;
@@ -1286,18 +1333,22 @@ static void zk_lock(uint64_t lock_id)
char lowest_seq_path[MAX_NODE_STR_LEN];
char owner_name[MAX_NODE_STR_LEN];
struct cluster_lock *cluster_lock;
+ char temp_char_form[MAX_NODE_STR_LEN] = "";
cluster_lock = lock_table_lookup_acquire(lock_id);
my_path = cluster_lock->lock_path;
- snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64"/",
+ strcpy(temp_char_form, lock_znode);
+ strcat(temp_char_form, "/%"PRIu64"/");
+ snprintf(parent, MAX_NODE_STR_LEN, temp_char_form,
cluster_lock->id);
/*
* It need using path without end of '/' to create node of lock_id in
* zookeeper's API, so we use 'parent_node'.
*/
- snprintf(parent_node, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64,
+ temp_char_form[strlen(temp_char_form)-1] = '\0';
+ snprintf(parent_node, MAX_NODE_STR_LEN, temp_char_form,
cluster_lock->id);
create_seq_node:
/* compete owner of lock is just like zk_compete_master() */
@@ -1359,7 +1410,8 @@ static void zk_unlock(uint64_t lock_id)
static int zk_init(const char *option)
{
- char *hosts, *to, *p;
+ char hosts[MAX_NODE_STR_LEN];
+ const char *pt, *pd;
int ret, interval, retry = 0, max_retry;
if (!option) {
@@ -1367,17 +1419,29 @@ static int zk_init(const char *option)
return -1;
}
- hosts = strtok((char *)option, "=");
- if ((to = strtok(NULL, "="))) {
- if (sscanf(to, "%u", &zk_timeout) != 1) {
- sd_err("Invalid parameter for timeout");
- return -1;
- }
- p = strstr(hosts, "timeout");
- *--p = '\0';
+ pt = strstr(option, "timeout=");
+ pd = strstr(option, "domain=");
+ if (pt == NULL && pd == NULL) {
+ strcpy(hosts, option);
+ } else if (pt) {
+ int i = 0;
+ while (option != pt)
+ hosts[i++] = *option++;
+ hosts[i-1] = '\0';
+ sscanf(pt, "timeout=%d", &zk_timeout);
+ if (pd)
+ sscanf(pd, "domain=%s", sd_domain);
+ } else {
+ int i = 0;
+ while (option != pd)
+ hosts[i++] = *option++;
+ hosts[i-1] = '\0';
+ sscanf(pd, "domain=%s", sd_domain);
}
- sd_debug("version %d.%d.%d, address %s, timeout %d", ZOO_MAJOR_VERSION,
- ZOO_MINOR_VERSION, ZOO_PATCH_VERSION, hosts, zk_timeout);
+
+ sd_debug("version %d.%d.%d, address %s, timeout %d, sheepdog domain %s",
+ ZOO_MAJOR_VERSION, ZOO_MINOR_VERSION, ZOO_PATCH_VERSION,
+ hosts, zk_timeout, sd_domain);
zhandle = zookeeper_init(hosts, zk_watcher, zk_timeout, NULL, NULL, 0);
if (!zhandle) {
sd_err("failed to initialize zk server %s", option);
@@ -1398,6 +1462,18 @@ static int zk_init(const char *option)
uatomic_set_false(&stop);
uatomic_set_false(&is_master);
+ strcpy(master_znode, base_znode);
+ strcat(master_znode, "/");
+ strcat(master_znode, sd_domain);
+ strcat(master_znode, master_znode_post);
+ strcpy(queue_znode, base_znode);
+ strcat(queue_znode, "/");
+ strcat(queue_znode, sd_domain);
+ strcat(queue_znode, queue_znode_post);
+ strcpy(member_znode, base_znode);
+ strcat(member_znode, "/");
+ strcat(member_znode, sd_domain);
+ strcat(member_znode, member_znode_post);
if (zk_queue_init() != ZOK)
return -1;
@@ -1421,9 +1497,13 @@ static int zk_init(const char *option)
sd_init_mutex(table_locks + i);
}
- ret = zk_init_node(LOCK_ZNODE);
+ strcpy(lock_znode, base_znode);
+ strcat(lock_znode, "/");
+ strcat(lock_znode, sd_domain);
+ strcat(lock_znode, lock_znode_post);
+ ret = zk_init_node(lock_znode);
if (ret != ZOK) {
- sd_err("Failed to create %s %s", LOCK_ZNODE, zerror(ret));
+ sd_err("Failed to create %s %s", lock_znode, zerror(ret));
free(cluster_locks_table);
return -1;
}
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 7d5fa0f..0cbe69b 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -68,12 +68,16 @@ static const char cluster_help[] =
"\tlocal: use local driver\n"
"\tcorosync: use corosync driver\n"
"\tzookeeper: use zookeeper driver, need extra arguments\n"
-"\n\tzookeeper arguments: address-list,timeout=value (default as 3000)\n"
+"\n\tzookeeper arguments: address-list,timeout=value(default as 3000)"
+",domain=value(default as sd_domain_default)\n"
"\nExample:\n\t"
-"$ sheep -c zookeeper:IP1:PORT1,IP2:PORT2,IP3:PORT3,timeout=1000 ...\n"
+"$ sheep -c zookeeper:IP1:PORT1,IP2:PORT2,IP3:PORT3,timeout=1000,domain=sheep_01
...\n"
"This tries to use 3 node zookeeper cluster, which can be reached by\n"
"IP1:PORT1, IP2:PORT2, IP3:PORT3 to manage membership and broadcast message\n"
-"and set the timeout of node heartbeat as 1000 milliseconds\n";
+"and set the timeout of node heartbeat as 1000 milliseconds\n"
+"and join the domain sheep_01.\n"
+"Notice that timeout should be followed by domain "
+"if both are given explicitly.\n";