Hi,

I am a first-time Zookeeper(ZK) user. I want to use ZK to maintain active
nodes (node membership) in a cluster. When a node joins or dies, I'd like
other nodes to be notified as soon as possible.

The zookeeper cluster includes 3 nodes, running at node0, node1 and node2
(leader). I reduced the tick length and sync limit, as follows.

# The number of milliseconds of each tick
tickTime=100
# The number of ticks that the initial synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=1

I have written a simple C++ test program to get myself familiar with the ZK
C API. The test program creates a znode for itself under /Nodes and then
listens for changes to the child nodes of /Nodes. For testing, I start the
same test program on multiple servers and then kill each instance in
turn. The test program prints a timestamp whenever it receives a child
node change event or a ctrl+c from the user.

The following output shows that when node0 was killed at 1617601651.582
sec, node1 and node2 got the notice at 1617601653.520 sec. When node1 was
killed at 1617601656.088 sec, node2 was notified at 1617601657.611 sec. There
is a delay of roughly 1.5 to 2 seconds between when a node is killed and when
the other live nodes receive that change. On the other hand, when a new node
joins, that change is propagated to the other nodes quickly (node1 started at
1617601643.715 sec and node0 received an update at 1617601643.731 sec). Is
there a way to reduce the delay in propagating the change when an ephemeral
child node is removed?  Help is appreciated!

*node0:*
1617601636.917 s: watcher SESSION_EVENT, state = CONNECTED_STATE
zkWatcher: Connected to zkserver
/Nodes: rc = 0
node0
1617601643.731 s: watcher CHILD_EVENT, state = CONNECTED_STATE for path
/Nodes
re-register for childlist again.
/Nodes: rc = 0
node0
node1
1617601646.654 s: watcher CHILD_EVENT, state = CONNECTED_STATE for path
/Nodes
re-register for childlist again.
/Nodes: rc = 0
node2
node0
node1
^C1617601651.582 s, shutdown...

*node1:*

1617601643.715 s: watcher SESSION_EVENT, state = CONNECTED_STATE
zkWatcher: Connected to zkserver
/Nodes: rc = 0
node0
node1
1617601646.654 s: watcher CHILD_EVENT, state = CONNECTED_STATE for path
/Nodes
re-register for childlist again.
/Nodes: rc = 0
node2
node0
node1
1617601653.520 s: watcher CHILD_EVENT, state = CONNECTED_STATE for path
/Nodes
re-register for childlist again.
/Nodes: rc = 0
node2
node1
^C1617601656.088 s, shutdown...

*node2:*

1617601646.637 s: watcher SESSION_EVENT, state = CONNECTED_STATE
zkWatcher: Connected to zkserver
/Nodes: rc = 0
node2
node0
node1
1617601653.520 s: watcher CHILD_EVENT, state = CONNECTED_STATE for path
/Nodes
re-register for childlist again.
/Nodes: rc = 0
node2
node1
1617601657.611 s: watcher CHILD_EVENT, state = CONNECTED_STATE for path
/Nodes
re-register for childlist again.
/Nodes: rc = 0
node2
^C1617601660.254 s, shutdown...


###################
// My test program.
// To compile, run "g++ -std=c++17 -Og -g -o zktest zktest.cpp -lzookeeper_mt"

#include <error.h> // for handling errno from zookeeper.
#include <sys/time.h> // for gettimeofday
#include <unistd.h> // for gethostname()

#include <chrono>
#include <climits> // for HOST_NAME_MAX
#include <csignal>
#include <cstdio> // for fprintf, snprintf
#include <cstdlib> // for exit, free
#include <cstring>
#include <iostream>
#include <string>
#include <thread>

#include <zookeeper/zookeeper.h>


// Returns this machine's host name cut at the first '.', e.g.
// "node0.zk-cluster.example.net" -> "node0" (specific to the emulab
// deployment). A name without a dot is returned unchanged.
// Returns an empty string if gethostname() fails.
std::string getShortHostName() {
    // Zero-initialise so the buffer is a valid C string on every path.
    char hostname[HOST_NAME_MAX + 1] = {};
    if (gethostname(hostname, sizeof(hostname)) != 0) {
        return std::string();
    }
    // A truncated name is not guaranteed to be null-terminated; force it.
    hostname[HOST_NAME_MAX] = '\0';

    const std::string hostNameStr(hostname);

    // find() yields npos when there is no dot, and substr(0, npos) keeps
    // the whole string, so no special case is needed.
    return hostNameStr.substr(0, hostNameStr.find('.'));
}

// copy pasted from zookeeper-client/zookeeper-client-c/src/cli.c
// Translates a ZooKeeper connection-state code into a printable name.
// Adapted from zookeeper-client/zookeeper-client-c/src/cli.c.
// Returns "INVALID_STATE" when the value matches none of the known states.
static const char* state2String(int state){
    struct StateName {
        int code;
        const char* label;
    };

    // A lookup table instead of a switch, so this works whether or not the
    // ZOO_* values are compile-time constants. Entries are scanned in order,
    // so the first match wins.
    const StateName knownStates[] = {
        {0,                         "CLOSED_STATE"},
        {ZOO_CONNECTING_STATE,      "CONNECTING_STATE"},
        {ZOO_ASSOCIATING_STATE,     "ASSOCIATING_STATE"},
        {ZOO_CONNECTED_STATE,       "CONNECTED_STATE"},
        {ZOO_READONLY_STATE,        "READONLY_STATE"},
        {ZOO_EXPIRED_SESSION_STATE, "EXPIRED_SESSION_STATE"},
        {ZOO_AUTH_FAILED_STATE,     "AUTH_FAILED_STATE"},
    };

    for (const StateName& entry : knownStates) {
        if (entry.code == state) {
            return entry.label;
        }
    }
    return "INVALID_STATE";
}

// Translates a ZooKeeper watcher event type into a printable name.
// Note: the parameter is called `state` but holds an event *type*
// (it is compared against the ZOO_*_EVENT values only).
// Returns "UNKNOWN_EVENT_TYPE" when the value matches none of them.
static const char* type2String(int state){
    struct EventName {
        int code;
        const char* label;
    };

    // Scanned in order, so the first matching entry wins.
    const EventName knownEvents[] = {
        {ZOO_CREATED_EVENT,     "CREATED_EVENT"},
        {ZOO_DELETED_EVENT,     "DELETED_EVENT"},
        {ZOO_CHANGED_EVENT,     "CHANGED_EVENT"},
        {ZOO_CHILD_EVENT,       "CHILD_EVENT"},
        {ZOO_SESSION_EVENT,     "SESSION_EVENT"},
        {ZOO_NOTWATCHING_EVENT, "NOTWATCHING_EVENT"},
    };

    for (const EventName& entry : knownEvents) {
        if (entry.code == state) {
            return entry.label;
        }
    }
    return "UNKNOWN_EVENT_TYPE";
}

// Completion callback used with zoo_acreate().
//   rc    - result code of the create request; ZOK and ZNODEEXISTS are both
//           treated as success, anything else terminates the process.
//   value - string result supplied by the library (unused here).
//   data  - context pointer given to zoo_acreate(); every caller in this file
//           passes a strdup()'ed path, so this callback owns it and frees it.
void stringCompletion(int rc, const char *value, const void *data) {
    (void)value;
    const char *path = static_cast<const char*>(data);

    if (rc != ZOK && rc != ZNODEEXISTS) {
        fprintf(stderr, "ZK Error: rc = %d with path = %s\n", rc,
                (path ? path : "null"));
        exit(EXIT_FAILURE);
    }

    // Release the strdup()'ed context, as stringsCompletion does.
    // free(nullptr) is a no-op, so a null context is fine.
    free(const_cast<void*>(data));
}

// Completion callback used with zoo_aget_children().
//   rc      - result code of the request; printed, not otherwise inspected.
//   strings - list of child names; may be null, in which case nothing is listed.
//   data    - context pointer given to zoo_aget_children(); printed as a C
//             string (callers in this file pass a strdup()'ed path) and then
//             freed here.
void stringsCompletion(int rc, const struct String_vector *strings, const void *data) {
    fprintf(stderr, "%s: rc = %d\n", (char*)data, rc);

    if (strings != nullptr) {
        int idx = 0;
        while (idx < strings->count) {
            fprintf(stderr, "\t%s\n", strings->data[idx]);
            ++idx;
        }
    }

    // The callback owns the context string.
    free(const_cast<void*>(data));
}

// Formats a timeval as seconds with three decimal places,
// e.g. {1617601651, 582000} -> "1617601651.582".
//   tv - timestamp to format; it is only read.
std::string formatTS(struct timeval &tv) {
    char buf[30];
    // snprintf bounds the write to the buffer and always null-terminates.
    snprintf(buf, sizeof(buf), "%.3f", tv.tv_sec + tv.tv_usec / 1000000.0);
    return std::string(buf);
}

// zk watcher function would process events
// Watcher callback registered through zookeeper_init(). It logs every event
// with a timestamp and then reacts to session and child events.
//   zkH        - handle the event was delivered on
//   type       - event type, compared against the ZOO_*_EVENT values
//   state      - connection state, compared against the ZOO_*_STATE values
//   path       - znode path the event refers to; may be null or empty
//   watcherCtx - context pointer; unused here
void zkWatcher(zhandle_t *zkH, int type, int state, const char *path, void *watcherCtx)
{
    (void)watcherCtx;

    struct timeval tv;
    gettimeofday(&tv, nullptr);

    std::cout << formatTS(tv) << " s: watcher " << type2String(type)
              << ", state = " << state2String(state);
    if (path && strlen(path) > 0) {
        std::cout << " for path " << path;
    }
    std::cout << std::endl;

    if (type == ZOO_SESSION_EVENT) {
        // state refers to states of the zookeeper connection.
        // To keep it simple, we demonstrate these 3:
        // ZOO_EXPIRED_SESSION_STATE, ZOO_CONNECTED_STATE, ZOO_NOTCONNECTED_STATE.
        // If you are using ACLs, you should also be aware of the
        // authentication failure state - ZOO_AUTH_FAILED_STATE.
        if (state == ZOO_CONNECTED_STATE) {
            std::cout << "zkWatcher: Connected to zkserver" << std::endl;
        } else if (state == ZOO_NOTCONNECTED_STATE) {
            std::cout << "zkWatcher: Not connected to zkserver " << std::endl;
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
            std::cout << "zkWatcher: zk session expired" << std::endl;
            // NOTE(review): the caller that created the handle is not told it
            // has been closed here - confirm nothing uses it afterwards.
            zookeeper_close(zkH);
        }
    } else if (type == ZOO_CHILD_EVENT) {
        std::cout << "re-register for childlist again." << std::endl;

        // strdup(nullptr) is undefined, so only copy a real path. The copy is
        // the completion context and is freed by stringsCompletion.
        char* ctx = (path != nullptr) ? strdup(path) : nullptr;

        // watch = 1 re-arms the watch so the next child change is reported too.
        int rc = zoo_aget_children(zkH, path, 1, stringsCompletion, ctx);
        if (rc) {
            // NOTE(review): ctx is not freed on this path because it is unclear
            // whether the library can still invoke the completion after a
            // failed call - confirm before adding a free() here.
            std::cout << "zoo_aget_children error: rc=" << rc << std::endl;
        }
    }
}

// Creates a ZooKeeper handle for the given server list, with zkWatcher as
// the watcher callback.
//   zkServers - server list string handed to zookeeper_init()
// Returns the new handle, or null (after logging to stderr) when
// zookeeper_init() fails.
zhandle_t* zkConnect(std::string zkServers) {
    // Third argument of zookeeper_init(); presumably the requested session
    // timeout in milliseconds - confirm against zookeeper.h.
    const int timeoutArg = 10000;

    zhandle_t* handle = zookeeper_init(zkServers.c_str(), zkWatcher, timeoutArg,
                                       nullptr, nullptr, 0);
    if (handle == nullptr) {
        std::cerr << "Error in connecting zk servers. " << std::endl;
    }
    return handle;
}


// SIGINT handler: prints the current timestamp and terminates the process,
// using the signal number as the exit status.
// NOTE(review): iostream output and exit() are not async-signal-safe; this
// looks acceptable for a throw-away test but should be confirmed before reuse.
void shutdown(int signum)
{
    struct timeval now;
    gettimeofday(&now, nullptr);

    const std::string stamp = formatTS(now);
    std::cout << stamp << " s, shutdown..." << std::endl;

    exit(signum);
}

int main(int argc, char **argv) {

// register a handler for ctrl+C for graceful shutdown
signal(SIGINT, shutdown);

using namespace std::literals::chrono_literals;
std::string zkservers = "node2.zk-cluster.logkv.emulab.net:2181"; //
zookeeper server list ("127.0.0.1:2181,127.0.0.1:3001,127.0.0.1:3002")

zhandle_t* zk = zkConnect(zkservers);
std::cout << "connected to zk servers" << std::endl;

// First, create the root node.
std::string rootPath {"/Nodes"};
int rc = zoo_acreate(zk, rootPath.c_str(), "", 0, &ZOO_OPEN_ACL_UNSAFE,
ZOO_PERSISTENT, stringCompletion, strdup(rootPath.c_str()));
if(rc) {
std::cerr << "zoo_acreate error: rc=" << rc << ", path=" << rootPath << std
::endl;
exit(EXIT_FAILURE);
}
// create a ephemeral node for itself in /Nodes
std::string hostname = getShortHostName();
std::cout << "hostname: " << hostname << std::endl;
std::string hostPath = rootPath + "/" + hostname;
rc = zoo_acreate(zk, hostPath.c_str(), "", 0, &ZOO_OPEN_ACL_UNSAFE,
ZOO_EPHEMERAL, stringCompletion, strdup(hostPath.c_str()));
if(rc) {
std::cerr << "zoo_acreate error: rc=" << rc << ", path=" << hostPath << std
::endl;
exit(EXIT_FAILURE);
}

// add a watcher for /Nodes, to listen for changes for child nodes
rc = zoo_aget_children(zk, rootPath.c_str(), 1, stringsCompletion, strdup(
rootPath.c_str()));
if(rc) {
std::cerr << "zoo_aget_children error: rc=" << rc << ", path=" << rootPath
<< std::endl;
exit(EXIT_FAILURE);
}
std::this_thread::sleep_for(60s);
//zookeeper_close(zk);
}



-Xing

Reply via email to