Good point, Jordan. I added a JIRA for this: https://issues.apache.org/jira/browse/ZOOKEEPER-1464
Patrick

On Wed, May 9, 2012 at 1:27 PM, Jordan Zimmerman <[email protected]> wrote:
> Interesting - this issue has come up several times with Curator users. I
> ended up writing a Tech Note on it.
>
> https://github.com/Netflix/curator/wiki/Tech-Note-1
>
> -JZ
>
> On 5/9/12 1:23 PM, "Patrick Hunt" <[email protected]> wrote:
>
>>I believe the issue is that there is a single thread updating
>>watchers. If you block that thread then the event can't be delivered.
>>
>>Patrick
>>
>>On Fri, May 4, 2012 at 4:06 AM, guru singh <[email protected]> wrote:
>>> Hi,
>>>
>>> Sorry if the subject is not appropriately titled.
>>>
>>> I'm trying to implement a redis-failover solution using zookeeper.
>>> I've been working with the python binding for zk.
>>> Basically, I have a znode called /master; a watch is set on this so
>>> that, whenever master changes, self.master is updated.
>>> There is another znode called /errors; a watch is set on this via
>>> get_children to the errors_watcher function.
>>> My code is supposed to continuously loop and create a child znode on
>>> /errors whenever an error is detected.
>>> The function errors_watcher counts the number of children for znode
>>> /errors; if it exceeds a certain length, it writes a new master
>>> 'ip:port' to the znode /master. This calls the master watcher and
>>> updates self.master. I use python's threading.Condition() to block for
>>> certain operations, for instance initially when znode /master is
>>> created, I wait() for master_watcher to be called, which updates
>>> self.master and releases the lock. This works as expected. However, the
>>> problem is that when znode /master is changed from within
>>> errors_watcher, if I wait() for master_watcher to be called (updating
>>> self.master and then releasing the lock), the code just keeps waiting;
>>> the master_watcher is never called. However, if I don't wait after
>>> setting znode /master from within errors_watcher, master_watcher is
>>> called and it updates self.master.
>>>
>>> It'll be really helpful if somebody could point out what's wrong. Is
>>> it zk or is my understanding of threading.Condition() incorrect?
>>> Or both :)
>>> Thanks for your help
>>>
>>> The code snippet below simulates the problem.
>>>
>>> class ZKtest:
>>>
>>>     def __init__(self,zk_server):
>>>         zk.set_log_stream(open('zk.log','w'))
>>>         self.master = None
>>>         self.zk_server = zk_server
>>>         self.connected = False
>>>         self.conn_cv = threading.Condition()
>>>
>>>     def global_watcher(self,handle,event,state,path):
>>>         self.conn_cv.acquire()
>>>         print 'global watcher called'
>>>         self.connected = True
>>>         self.conn_cv.notifyAll()
>>>         self.conn_cv.release()
>>>
>>>     def master_watcher(self,handle,event,state,path):
>>>         self.conn_cv.acquire()
>>>         print 'master watcher called'
>>>         master = zk.get(self.handle,path,self.master_watcher)[0]
>>>         self.master = master
>>>         print 'Master is %s' %(master)
>>>         self.conn_cv.notifyAll()
>>>         self.conn_cv.release()
>>>
>>>     def errors_watcher(self,handle,event,state,path):
>>>         self.conn_cv.acquire()
>>>         print 'error watcher called'
>>>         errors = len(zk.get_children(self.handle,'/errors',
>>>                                      self.errors_watcher))
>>>         print 'Current errors %d' %(errors)
>>>         if errors > 5 :
>>>             print 'Set new master, update znode /master'
>>>             zk.set(self.handle,'/master','127.0.0.1:6380')
>>>             #self.conn_cv.wait() <-- Why doesn't this return??
>>>         self.conn_cv.notifyAll()
>>>         self.conn_cv.release()
>>>
>>>
>>>     def create_znodes(self):
>>>         self.conn_cv.acquire()
>>>         master = zk.exists(self.handle,'/master',self.master_watcher)
>>>         if not master:
>>>             print 'Creating znode /master'
>>>             zk.create(self.handle,'/master','127.0.0.1:6379',
>>>                       [ZOO_OPEN_ACL_UNSAFE])
>>>         else :
>>>             print 'Updating znode /master'
>>>             zk.set(self.handle,'/master','127.0.0.1:6379',
>>>                    master['version'])
>>>         # wait until master_watcher has updated self.master;
>>>         # this returns after master_watcher is called
>>>         self.conn_cv.wait()
>>>         # should not be None, since master_watcher updates it
>>>         print self.master
>>>         errors = zk.exists(self.handle,'/errors')
>>>         if not errors:
>>>             print 'Creating znode /errors'
>>>             zk.create(self.handle,'/errors','Errors follow',
>>>                       [ZOO_OPEN_ACL_UNSAFE])
>>>         else :
>>>             print 'Purge previous errors'
>>>             for err in zk.get_children(self.handle,'/errors'):
>>>                 zk.delete(self.handle,'/errors/'+err)
>>>         # set a watch for children of znode /errors
>>>         err = zk.get_children(self.handle,'/errors',self.errors_watcher)
>>>         self.conn_cv.release()
>>>
>>>
>>>     def run(self):
>>>         self.conn_cv.acquire()
>>>         self.handle = zk.init(self.zk_server,self.global_watcher)
>>>         if not self.connected:
>>>             while not self.connected :
>>>                 print 'Not Connected, retry in 5'
>>>                 self.conn_cv.wait(5)
>>>                 self.handle = zk.init(self.zk_server)
>>>         self.create_znodes()
>>>         while self.master != '127.0.0.1:6380':
>>>             print 'Current Master %s' %(self.master)
>>>             # simulate errors, until master is not 127.0.0.1:6380
>>>             zk.create(self.handle,'/errors/','Error!',
>>>                       [ZOO_OPEN_ACL_UNSAFE],zk.SEQUENCE)
>>>             self.conn_cv.wait()
>>>         self.conn_cv.release()
>>>
>>>
>>> if __name__ == '__main__' :
>>>     zkt = ZKtest('127.0.0.1:2181')
>>>     zkt.run()
>>
>

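For anyone reading this thread later, the single-thread explanation above can be reproduced without ZooKeeper. The sketch below is only a simulation under stated assumptions: `SingleThreadDispatcher`, `fire` and `run_scenario` are made-up names, not part of any ZooKeeper binding, and the dispatcher simply runs every callback on one thread, as described for watcher delivery. A timeout is used so the demo terminates; the original snippet calls wait() with no timeout, so there it would hang indefinitely.

```python
import queue
import threading


class SingleThreadDispatcher:
    """Toy stand-in (hypothetical) for a client that delivers all
    watch callbacks on one dedicated thread."""

    def __init__(self):
        self._events = queue.Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def fire(self, callback):
        # Queue a callback for delivery on the dispatch thread.
        self._events.put(callback)

    def _loop(self):
        while True:
            callback = self._events.get()
            callback()  # nothing else is delivered until this returns


def run_scenario(block_inside_watcher, timeout=0.5):
    """Return True if master_watcher ran while the waiter was waiting."""
    dispatcher = SingleThreadDispatcher()
    master_updated = threading.Event()
    finished = threading.Event()
    result = {}

    def master_watcher():
        master_updated.set()

    def wait_for_master():
        result["seen"] = master_updated.wait(timeout)
        finished.set()

    def errors_watcher():
        # Stands in for zk.set() on /master triggering the master watch.
        dispatcher.fire(master_watcher)
        if block_inside_watcher:
            # Waiting here blocks the only dispatch thread, so
            # master_watcher cannot be delivered until the wait gives up.
            wait_for_master()
        else:
            # Hand the wait to another thread and return promptly,
            # leaving the dispatch thread free to run master_watcher.
            threading.Thread(target=wait_for_master, daemon=True).start()

    dispatcher.fire(errors_watcher)
    finished.wait(timeout * 4)
    return result.get("seen")


if __name__ == "__main__":
    print("wait inside watcher, master seen:", run_scenario(True))
    print("wait on another thread, master seen:", run_scenario(False))
```

The design point is that a watcher callback should do its work and return; anything that needs to wait for a later event is better done on a thread the dispatcher does not own.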