Hi,
Sorry if the subject is not appropriately titled.
I'm trying to implement a redis-failover solution using zookeeper.
I've been working with the Python binding for zk.
Basically, I have a znode called /master; a watch is set on it so
that, whenever the master changes, self.master is updated.
There is another znode called /errors; a watch is set on it via
get_children, with the errors_watcher function as the callback.
My code is supposed to continuously loop and create a child znode on
/errors whenever an error is detected.
The function errors_watcher counts the number of children of znode
/errors; if the count exceeds a certain threshold, it writes a new master
'ip:port' to the znode /master, which triggers master_watcher and
updates self.master. I use Python's threading.Condition() to block for
certain operations. For instance, initially, when znode /master is
created, I wait() for master_watcher to be called, which updates
self.master and releases the lock. This works as expected. However, the
problem is that when znode /master is changed from within
errors_watcher and I then wait() for master_watcher to be called (to
update self.master and release the lock), the code just keeps waiting:
master_watcher is never called. However, if I don't wait after
setting znode /master from within errors_watcher, master_watcher is
called and it updates self.master.
It'll be really helpful if somebody could point out what's wrong? Is
it zk or is my understanding of threading.Condition() incorrect?
Or both :)
Thanks for your help
The code snippet below simulates the problem.
class ZKtest:
    """Simulate a ZooKeeper-coordinated master failover.

    Two znodes are used:

    * ``/master`` holds the current master as an ``'ip:port'`` string and is
      watched by :meth:`master_watcher`, which mirrors it into
      ``self.master``.
    * ``/errors`` collects one sequential child per simulated error and is
      watched by :meth:`errors_watcher`, which promotes a new master once
      more than ``MAX_ERRORS`` children exist.

    All shared state (``self.connected``, ``self.master``) is guarded by the
    single condition variable ``self.conn_cv``.  Watchers only update state
    and notify; only the application thread (``run`` / ``create_znodes``)
    ever waits on the condition.

    ``print(...)`` is always called with one argument so the statements
    behave the same under Python 2 and Python 3.
    """

    # Master written to /master when the simulation starts.
    INITIAL_MASTER = '127.0.0.1:6379'
    # Master written to /master once too many errors have accumulated.
    FAILOVER_MASTER = '127.0.0.1:6380'
    # A failover is triggered when /errors has more children than this.
    MAX_ERRORS = 5

    def __init__(self, zk_server):
        """Prepare the simulation; no connection is opened until ``run()``.

        :param zk_server: ZooKeeper connection string, e.g. ``'host:port'``.
        """
        # The log file must stay open for as long as the client logs to it.
        zk.set_log_stream(open('zk.log', 'w'))
        self.master = None
        self.zk_server = zk_server
        self.connected = False
        self.handle = None
        # Condition's default lock is re-entrant, which lets run() call
        # create_znodes() while already holding it.
        self.conn_cv = threading.Condition()

    def global_watcher(self, handle, event, state, path):
        """Session watcher: record the connection state and wake waiters."""
        with self.conn_cv:
            print('global watcher called')
            # Only a CONNECTED session counts; other session events
            # (connecting, expired, ...) must not be reported as connected.
            self.connected = (state == zk.CONNECTED_STATE)
            self.conn_cv.notifyAll()

    def master_watcher(self, handle, event, state, path):
        """Watch on /master: re-read the znode, re-arm the watch, notify."""
        with self.conn_cv:
            print('master watcher called')
            # Passing the watcher again re-arms the one-shot watch.
            master = zk.get(self.handle, path, self.master_watcher)[0]
            self.master = master
            print('Master is %s' % (master,))
            self.conn_cv.notifyAll()

    def errors_watcher(self, handle, event, state, path):
        """Watch on /errors children: fail over after too many errors."""
        with self.conn_cv:
            print('error watcher called')
            # Passing the watcher again re-arms the one-shot child watch.
            children = zk.get_children(self.handle, '/errors',
                                       self.errors_watcher)
            errors = len(children)
            print('Current errors %d' % errors)
            if errors > self.MAX_ERRORS:
                print('Set new master, update znode /master')
                zk.set(self.handle, '/master', self.FAILOVER_MASTER)
                # Do NOT conn_cv.wait() here for master_watcher.  The
                # ZooKeeper client delivers watcher callbacks one at a time
                # from its own dispatch thread, and this method is running
                # on that thread: while it blocks, master_watcher cannot be
                # invoked, so the wait would never return.  The waiting is
                # done by the application thread in run() instead.
            self.conn_cv.notifyAll()

    def create_znodes(self):
        """Create or reset /master and /errors and install their watches.

        Blocks until ``master_watcher`` has populated ``self.master``.
        Must be called from the application thread, never from a watcher.
        """
        with self.conn_cv:
            # exists() installs master_watcher, so the create/set below
            # triggers it.
            master = zk.exists(self.handle, '/master', self.master_watcher)
            if not master:
                print('Creating znode /master')
                zk.create(self.handle, '/master', self.INITIAL_MASTER,
                          [ZOO_OPEN_ACL_UNSAFE])
            else:
                print('Updating znode /master')
                zk.set(self.handle, '/master', self.INITIAL_MASTER,
                       master['version'])

            # Wait until master_watcher has updated self.master.  The loop
            # guards against wake-ups caused by other notifiers (for example
            # global_watcher) that do not set self.master.
            while self.master is None:
                self.conn_cv.wait()
            print(self.master)

            errors = zk.exists(self.handle, '/errors')
            if not errors:
                print('Creating znode /errors')
                zk.create(self.handle, '/errors', 'Errors follow',
                          [ZOO_OPEN_ACL_UNSAFE])
            else:
                print('Purge previous errors')
                for err in zk.get_children(self.handle, '/errors'):
                    zk.delete(self.handle, '/errors/' + err)

            # Set a watch for children of znode /errors.
            zk.get_children(self.handle, '/errors', self.errors_watcher)

    def run(self):
        """Connect, set up the znodes, then create errors until failover.

        Returns once ``self.master`` equals ``FAILOVER_MASTER``.
        """
        with self.conn_cv:
            self.handle = zk.init(self.zk_server, self.global_watcher)
            # Keep waiting on the same handle: the client keeps retrying the
            # connection by itself.  Calling zk.init() again here would leak
            # the first handle and, without a watcher, the new handle could
            # never report that it connected.
            while not self.connected:
                print('Not Connected, retry in 5')
                self.conn_cv.wait(5)

            self.create_znodes()

            while self.master != self.FAILOVER_MASTER:
                print('Current Master %s' % (self.master,))
                # Simulate errors until the master becomes FAILOVER_MASTER.
                zk.create(self.handle, '/errors/', 'Error!',
                          [ZOO_OPEN_ACL_UNSAFE], zk.SEQUENCE)
                # Released while waiting, so the watchers can run and notify.
                self.conn_cv.wait()
if __name__ == '__main__':
    # Run the failover simulation against a ZooKeeper server on localhost.
    ZKtest('127.0.0.1:2181').run()