Interesting: this issue has come up several times with Curator users, so I ended up writing a Tech Note on it.
https://github.com/Netflix/curator/wiki/Tech-Note-1

-JZ

On 5/9/12 1:23 PM, "Patrick Hunt" <[email protected]> wrote:

>I believe the issue is that a single thread delivers all watcher
>callbacks. If you block that thread, the event can't be delivered.
>
>Patrick
>
>On Fri, May 4, 2012 at 4:06 AM, guru singh <[email protected]> wrote:
>> Hi,
>>
>> Sorry if the subject is not appropriately titled.
>>
>> I'm trying to implement a Redis failover solution using ZooKeeper,
>> and I've been working with the Python binding for ZooKeeper.
>>
>> I have a znode called /master with a watch set on it, so that whenever
>> the master changes, self.master is updated. There is another znode
>> called /errors; a watch is set on it via get_children, with
>> errors_watcher as the callback. My code continuously loops and creates
>> a child znode under /errors whenever an error is detected.
>>
>> The function errors_watcher counts the children of /errors. If the
>> count exceeds a threshold, it writes a new master 'ip:port' to
>> /master, which triggers master_watcher, which updates self.master.
>>
>> I use Python's threading.Condition() to block for certain operations.
>> For instance, when /master is first created, I wait() for
>> master_watcher to be called; it updates self.master and releases the
>> lock. This works as expected.
>>
>> The problem is that when /master is changed from within
>> errors_watcher and I then wait() there for master_watcher to update
>> self.master and release the lock, the code just keeps waiting:
>> master_watcher is never called. If I don't wait after setting /master
>> from within errors_watcher, master_watcher is called and updates
>> self.master.
>>
>> It would be really helpful if somebody could point out what's wrong.
>> Is it zk, or is my understanding of threading.Condition() incorrect?
>> Or both :)
>>
>> Thanks for your help.
>>
>> The code snippet below simulates the problem.
>>
>> import threading
>> import zookeeper as zk  # assumed: ZooKeeper's Python binding, imported as zk
>>
>> # assumed definition; the original snippet does not show where it comes from
>> ZOO_OPEN_ACL_UNSAFE = {"perms": 0x1f, "scheme": "world", "id": "anyone"}
>>
>> class ZKtest:
>>
>>     def __init__(self,zk_server):
>>         zk.set_log_stream(open('zk.log','w'))
>>         self.master = None
>>         self.zk_server = zk_server
>>         self.connected = False
>>         self.conn_cv = threading.Condition()
>>
>>     def global_watcher(self,handle,event,state,path):
>>         self.conn_cv.acquire()
>>         print 'global watcher called'
>>         self.connected = True
>>         self.conn_cv.notifyAll()
>>         self.conn_cv.release()
>>
>>     def master_watcher(self,handle,event,state,path):
>>         self.conn_cv.acquire()
>>         print 'master watcher called'
>>         master = zk.get(self.handle,path,self.master_watcher)[0]
>>         self.master = master
>>         print 'Master is %s' %(master)
>>         self.conn_cv.notifyAll()
>>         self.conn_cv.release()
>>
>>     def errors_watcher(self,handle,event,state,path):
>>         self.conn_cv.acquire()
>>         print 'error watcher called'
>>         errors = len(zk.get_children(self.handle,'/errors',self.errors_watcher))
>>         print 'Current errors %d' %(errors)
>>         if errors > 5 :
>>             print 'Set new master, update znode /master'
>>             zk.set(self.handle,'/master','127.0.0.1:6380')
>>             #self.conn_cv.wait() <-- Why doesn't this return??
>>         self.conn_cv.notifyAll()
>>         self.conn_cv.release()
>>
>>     def create_znodes(self):
>>         self.conn_cv.acquire()
>>         master = zk.exists(self.handle,'/master',self.master_watcher)
>>         if not master:
>>             print 'Creating znode /master'
>>             zk.create(self.handle,'/master','127.0.0.1:6379',
>>                       [ZOO_OPEN_ACL_UNSAFE])
>>         else :
>>             print 'Updating znode /master'
>>             zk.set(self.handle,'/master','127.0.0.1:6379',master['version'])
>>         # wait until master_watcher has updated self.master;
>>         # this returns after master_watcher is called
>>         self.conn_cv.wait()
>>         print self.master # should not be None, since master_watcher updates it
>>         errors = zk.exists(self.handle,'/errors')
>>         if not errors:
>>             print 'Creating znode /errors'
>>             zk.create(self.handle,'/errors','Errors follow',
>>                       [ZOO_OPEN_ACL_UNSAFE])
>>         else :
>>             print 'Purge previous errors'
>>             for err in zk.get_children(self.handle,'/errors'):
>>                 zk.delete(self.handle,'/errors/'+err)
>>         # set a watch for children of znode /errors
>>         err = zk.get_children(self.handle,'/errors',self.errors_watcher)
>>         self.conn_cv.release()
>>
>>     def run(self):
>>         self.conn_cv.acquire()
>>         self.handle = zk.init(self.zk_server,self.global_watcher)
>>         if not self.connected:
>>             while not self.connected :
>>                 print 'Not Connected, retry in 5'
>>                 self.conn_cv.wait(5)
>>                 self.handle = zk.init(self.zk_server)
>>         self.create_znodes()
>>         while self.master != '127.0.0.1:6380':
>>             print 'Current Master %s' %(self.master)
>>             # simulate errors, until master is not 127.0.0.1:6380
>>             zk.create(self.handle,'/errors/','Error!',[ZOO_OPEN_ACL_UNSAFE],
>>                       zk.SEQUENCE)
>>             self.conn_cv.wait()
>>         self.conn_cv.release()
>>
>>
>> if __name__ == '__main__' :
>>     zkt = ZKtest('127.0.0.1:2181')
>>     zkt.run()
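To make Patrick's point concrete, here is a minimal Python 3 sketch that does not talk to ZooKeeper at all. It assumes, as Patrick describes, that the client runs every watcher callback on one dispatch thread. EventDispatcher, blocking_in_watcher and blocking_in_main_thread are hypothetical names for illustration, not part of the ZooKeeper binding or Curator. The first function mirrors errors_watcher with its wait() uncommented: the callback queues the master_watcher event and then waits for it on the very thread that would deliver it, so the wait can only end by timing out. The second keeps the watcher non-blocking and does the waiting on the main thread, which is what ZKtest.run() already does.

```python
import queue
import threading


class EventDispatcher:
    """Hypothetical stand-in for the client's watcher thread: every
    queued callback runs, one at a time, on a single background thread."""

    def __init__(self):
        self._events = queue.Queue()
        threading.Thread(target=self._loop, daemon=True).start()

    def fire(self, callback, *args):
        # Queue a watcher event; it runs only once the thread is free.
        self._events.put((callback, args))

    def _loop(self):
        while True:
            callback, args = self._events.get()
            callback(*args)


def blocking_in_watcher(timeout=1.0):
    """Mirror errors_watcher with its wait() uncommented."""
    dispatcher = EventDispatcher()
    cv = threading.Condition()
    state = {"master": None, "updated": None}
    finished = threading.Event()

    def master_watcher(value):
        with cv:
            state["master"] = value
            cv.notify_all()

    def errors_watcher():
        with cv:
            # Setting /master queues a master_watcher event...
            dispatcher.fire(master_watcher, "127.0.0.1:6380")
            # ...but this wait blocks the only thread that can deliver it,
            # so it can only end by timing out.
            state["updated"] = cv.wait_for(
                lambda: state["master"] is not None, timeout=timeout)
        finished.set()

    dispatcher.fire(errors_watcher)
    finished.wait()
    return state["updated"]


def blocking_in_main_thread(timeout=1.0):
    """Keep watchers non-blocking and wait on the caller's thread instead."""
    dispatcher = EventDispatcher()
    cv = threading.Condition()
    state = {"master": None}

    def master_watcher(value):
        with cv:
            state["master"] = value
            cv.notify_all()

    def errors_watcher():
        # Trigger the change and return immediately; never wait here.
        dispatcher.fire(master_watcher, "127.0.0.1:6380")

    dispatcher.fire(errors_watcher)
    with cv:
        # The main thread waits, leaving the watcher thread free to run
        # master_watcher.
        updated = cv.wait_for(lambda: state["master"] is not None,
                              timeout=timeout)
    return updated, state["master"]


if __name__ == "__main__":
    print(blocking_in_watcher(timeout=0.5))
    print(blocking_in_main_thread())
```

Run as a script, blocking_in_watcher(timeout=0.5) returns False after half a second, while blocking_in_main_thread() returns (True, '127.0.0.1:6380'). In terms of the original code, this corresponds to leaving the wait() in errors_watcher commented out and letting run() block on conn_cv instead.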
