I believe the issue is that there is a single thread delivering watcher callbacks. If you block that thread (for example, by calling wait() inside a watcher) then the next watch event can't be delivered.
Patrick On Fri, May 4, 2012 at 4:06 AM, guru singh <[email protected]> wrote: > Hi, > > Sorry if the subject is not appropriately titled. > > I'm trying to implement a redis-failover solution using zookeeper. > I've been working with the python binding for zk > Basically, I have a znode called /master, a watch is set on this so > that, whenever master changes, self.master is updated > There is another znode called /errors, a watch is set on this via > get_children to errors_watcher function. > My code is supposed to continuously loop and create a child znode on > /errors, whenever an error is detected. > The function errors_watcher, counts the number of children for znode > /errors, if it exceeds a certain length, it writes a new master > 'ip:port' to the znode /master, this calls the master watcher and > updates self.master. I use python's threading.Condition() to block for > certain operations, for instance initially when znode /master is > created, I wait() for master_watcher to be called which updates > self.master and releases the lock. This works as expected, however the > problem is that when znode /master is changed from within > errors_watcher, if I wait() for master_watcher to be called, updating > self.master and then releasing the lock, the code just keeps waiting, > the master_watcher is never called. However, if I don't wait after > setting znode /master from within errors_watcher, master_watcher is > called and it updates self.master. > > It'd be really helpful if somebody could point out what's wrong. Is > it zk or is my understanding of threading.Condition() incorrect? > Or both :) > Thanks for your help > > The code snippet below simulates the problem. 
> > class ZKtest: > > def __init__(self,zk_server): > zk.set_log_stream(open('zk.log','w')) > self.master = None > self.zk_server = zk_server > self.connected = False > self.conn_cv = threading.Condition() > > def global_watcher(self,handle,event,state,path): > self.conn_cv.acquire() > print 'global watcher called' > self.connected = True > self.conn_cv.notifyAll() > self.conn_cv.release() > > def master_watcher(self,handle,event,state,path): > self.conn_cv.acquire() > print 'master watcher called' > master = zk.get(self.handle,path,self.master_watcher)[0] > self.master = master > print 'Master is %s' %(master) > self.conn_cv.notifyAll() > self.conn_cv.release() > > def errors_watcher(self,handle,event,state,path): > self.conn_cv.acquire() > print 'error watcher called' > errors = > len(zk.get_children(self.handle,'/errors',self.errors_watcher)) > print 'Current errors %d' %(errors) > if errors > 5 : > print 'Set new master, update znode /master' > zk.set(self.handle,'/master','127.0.0.1:6380') > #self.conn_cv.wait() <-- Why doesn't this return?? 
> self.conn_cv.notifyAll() > self.conn_cv.release() > > > def create_znodes(self): > self.conn_cv.acquire() > master = zk.exists(self.handle,'/master',self.master_watcher) > if not master: > print 'Creating znode /master' > zk.create(self.handle,'/master','127.0.0.1:6379', > [ZOO_OPEN_ACL_UNSAFE]) > else : > print 'Updating znode /master' > zk.set(self.handle,'/master','127.0.0.1:6379',master['version']) > self.conn_cv.wait() # wait until master_watcher has updated > self.master, this returns after master_watcher is called > print self.master # should not be None, since master_watcher updates it > errors = zk.exists(self.handle,'/errors') > if not errors: > print 'Creating znode /errors' > zk.create(self.handle,'/errors','Errors follow', > [ZOO_OPEN_ACL_UNSAFE]) > else : > print 'Purge previous errors' > for err in zk.get_children(self.handle,'/errors'): > zk.delete(self.handle,'/errors/'+err) > err = zk.get_children(self.handle,'/errors',self.errors_watcher) > # set a watch for children of znode /errors > self.conn_cv.release() > > > def run(self): > self.conn_cv.acquire() > self.handle = zk.init(self.zk_server,self.global_watcher) > if not self.connected: > while not self.connected : > print 'Not Connected, retry in 5' > self.conn_cv.wait(5) > self.handle = zk.init(self.zk_server) > self.create_znodes() > while self.master != '127.0.0.1:6380': > print 'Current Master %s' %(self.master) > # simulate errors, until master is not 127.0.0.1:6380 > zk.create(self.handle,'/errors/','Error!',[ZOO_OPEN_ACL_UNSAFE], > zk.SEQUENCE) > self.conn_cv.wait() > self.conn_cv.release() > > > if __name__ == '__main__' : > zkt = ZKtest('127.0.0.1:2181') > zkt.run()
