Author: ivank Date: Thu Oct 25 13:35:28 2012 New Revision: 1402146 URL: http://svn.apache.org/viewvc?rev=1402146&view=rev Log: BOOKKEEPER-424: Bookie start is failing intermittently when zkclient connection delays (rakeshr via ivank)
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Modified: zookeeper/bookkeeper/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1402146&r1=1402145&r2=1402146&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/CHANGES.txt (original) +++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct 25 13:35:28 2012 @@ -96,6 +96,8 @@ Trunk (unreleased changes) BOOKKEEPER-436: Journal#rollLog may leak file handler (umamahesh via ivank) + BOOKKEEPER-424: Bookie start is failing intermittently when zkclient connection delays (rakeshr via ivank) + hedwig-protocol: BOOKKEEPER-394: CompositeException message is not useful (Stu Hood via sijie) Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java?rev=1402146&r1=1402145&r2=1402146&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java Thu Oct 25 13:35:28 2012 @@ -50,6 +50,8 @@ import org.apache.bookkeeper.jmx.BKMBean import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,7 +96,6 @@ public class Bookie extends Thread { // ZooKeeper client instance for the Bookie ZooKeeper zk; - private volatile boolean isZkExpired = true; // Running flag private volatile boolean running = false; @@ -467,9 +468,11 @@ public class Bookie extends Thread { } catch (IOException ioe) { LOG.error("Exception while replaying journals, shutting down", ioe); shutdown(ExitCode.BOOKIE_EXCEPTION); + return; } catch (BookieException be) { LOG.error("Exception while replaying journals, shutting down", be); shutdown(ExitCode.BOOKIE_EXCEPTION); + return; } // start bookie thread super.start(); @@ -574,10 +577,10 @@ public class Bookie extends Thread { /** * Instantiate the ZooKeeper client for the Bookie. */ - private ZooKeeper instantiateZookeeperClient(ServerConfiguration conf) throws IOException { + private ZooKeeper instantiateZookeeperClient(ServerConfiguration conf) + throws IOException, InterruptedException, KeeperException { if (conf.getZkServers() == null) { LOG.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!"); - isZkExpired = false; return null; } // Create the ZooKeeper client instance @@ -715,33 +718,21 @@ public class Bookie extends Thread { * @return zk client instance */ private ZooKeeper newZookeeper(final String zkServers, - final int sessionTimeout) throws IOException { - ZooKeeper newZk = new ZooKeeper(zkServers, sessionTimeout, - new Watcher() { + final int sessionTimeout) throws IOException, InterruptedException, + KeeperException { + ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout()) { @Override public void process(WatchedEvent event) { - // handle session disconnects and expires - if (event.getType() - .equals(Watcher.Event.EventType.None)) { - if (event.getState().equals( - Watcher.Event.KeeperState.Disconnected)) { - LOG.warn("ZK client has been disconnected to the ZK server!"); - } else if (event.getState().equals( - Watcher.Event.KeeperState.SyncConnected)) { - LOG.info("ZK client has been reconnected to the ZK server!"); - } - } // Check for expired connection. - if (event.getState().equals( - Watcher.Event.KeeperState.Expired)) { + if (event.getState().equals(Watcher.Event.KeeperState.Expired)) { LOG.error("ZK client connection to the ZK server has expired!"); - isZkExpired = true; shutdown(ExitCode.ZK_EXPIRED); + } else { + super.process(event); } } - }); - isZkExpired = false; - return newZk; + }; + return ZkUtils.createConnectedZookeeperClient(zkServers, w); } public boolean isRunning() { Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java?rev=1402146&r1=1402145&r2=1402146&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java Thu Oct 25 13:35:28 2012 @@ -34,6 +34,8 @@ import org.apache.bookkeeper.meta.Ledger import org.apache.bookkeeper.meta.LedgerManagerFactory; import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.util.OrderedSafeExecutor; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; @@ -120,24 +122,9 @@ public class BookKeeper { public BookKeeper(final ClientConfiguration conf) throws IOException, InterruptedException, KeeperException { this.conf = conf; - - final CountDownLatch zkConnectLatch = new CountDownLatch(1); - this.zk = new ZooKeeper(conf.getZkServers(), conf.getZkTimeout(), - new Watcher() { - @Override - public void process(WatchedEvent event) { - // countdown the latch on all events, even if we haven't - // successfully connected. - zkConnectLatch.countDown(); - - // TODO: handle session disconnects and expires - LOG.debug("Process: {} {}", event.getType(), event.getPath()); - } - }); - if (!zkConnectLatch.await(conf.getZkTimeout(), TimeUnit.MILLISECONDS) - || !zk.getState().isConnected()) { - throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); - } + ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout()); + this.zk = ZkUtils + .createConnectedZookeeperClient(conf.getZkServers(), w); this.channelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java?rev=1402146&r1=1402145&r2=1402146&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java Thu Oct 25 13:35:28 2012 @@ -40,6 +40,7 @@ import org.apache.bookkeeper.proto.Bookk import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -118,8 +119,8 @@ public class BookKeeperAdmin { */ public BookKeeperAdmin(ClientConfiguration conf) throws IOException, InterruptedException, KeeperException { // Create the ZooKeeper client instance - zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), - conf.getZkTimeout()); + ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout()); + zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), w); // Create the bookie path bookiesPath = conf.getZkAvailableBookiesPath(); // Create the BookKeeper client instance @@ -693,9 +694,9 @@ public class BookKeeperAdmin { */ public static boolean format(ClientConfiguration conf, boolean isInteractive, boolean force) throws Exception { - - ZooKeeper zkc = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), - conf.getZkTimeout()); + ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout()); + ZooKeeper zkc = ZkUtils.createConnectedZookeeperClient( + conf.getZkServers(), w); BookKeeper bkc = null; try { boolean ledgerRootExists = null != zkc.exists( Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java?rev=1402146&r1=1402145&r2=1402146&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java Thu Oct 25 13:35:28 2012 @@ -32,6 +32,7 @@ import org.apache.bookkeeper.conf.Server import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.HelpFormatter; @@ -64,9 +65,7 @@ public class AutoRecoveryMain { InterruptedException, KeeperException, UnavailableException, CompatibilityException { this.conf = conf; - zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), - conf.getZkTimeout()); - Watcher watcher = new Watcher() { + ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(conf.getZkTimeout()) { @Override public void process(WatchedEvent event) { // Check for expired connection. @@ -74,10 +73,12 @@ public class AutoRecoveryMain { LOG.error("ZK client connection to the" + " ZK server has expired!"); shutdown(ExitCode.ZK_EXPIRED); + } else { + super.process(event); } } }; - zk.register(watcher); + zk = ZkUtils.createConnectedZookeeperClient(conf.getZkServers(), w); auditorElector = new AuditorElector( getMyBookieAddress(conf).toString(), conf, zk); replicationWorker = new ReplicationWorker(zk, conf, Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java?rev=1402146&r1=1402145&r2=1402146&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java Thu Oct 25 13:35:28 2012 @@ -24,28 +24,20 @@ package org.apache.bookkeeper.util; import java.io.File; import java.io.IOException; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.KeeperException.Code; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.Watcher.Event.EventType; -import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.ZooKeeper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Provided utilites for zookeeper access, etc. */ public class ZkUtils { - private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class); /** * Create zookeeper path recursively @@ -111,31 +103,17 @@ public class ZkUtils { * @param timeout * Session timeout. */ - public static ZooKeeper createConnectedZookeeperClient(String servers, int timeout) - throws IOException, InterruptedException, KeeperException { + public static ZooKeeper createConnectedZookeeperClient(String servers, + ZooKeeperWatcherBase w) throws IOException, InterruptedException, + KeeperException { if (servers == null || servers.isEmpty()) { throw new IllegalArgumentException("servers cannot be empty"); } - final CountDownLatch zkConnectLatch = new CountDownLatch(1); - Watcher connectWatcher = new Watcher() { - - @Override - public void process(WatchedEvent event) { - if (LOG.isDebugEnabled()) { - LOG.debug("Process: " + event.getType() + " " - + event.getPath()); - } - if (event.getType().equals(EventType.None) - && event.getState() == KeeperState.SyncConnected) { - if (zkConnectLatch.getCount() > 0) { - zkConnectLatch.countDown(); - } - } - } - }; - final ZooKeeper newZk = new ZooKeeper(servers, timeout, connectWatcher); - if (!zkConnectLatch.await(timeout, TimeUnit.MILLISECONDS) - || !newZk.getState().isConnected()) { + final ZooKeeper newZk = new ZooKeeper(servers, w.getZkSessionTimeOut(), + w); + w.waitForConnection(); + // Re-checking zookeeper connection status + if (!newZk.getState().isConnected()) { throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS); } return newZk; Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java?rev=1402146&r1=1402145&r2=1402146&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java Thu Oct 25 13:35:28 2012 @@ -36,6 +36,7 @@ import org.apache.bookkeeper.bookie.Book import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.test.ZooKeeperUtil; +import org.apache.commons.io.FileUtils; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; @@ -88,7 +89,7 @@ public class BookieInitializationTest { * timeout when previous reg node exists in zk. On zNode delete event, * should continue startup */ - @Test + @Test(timeout = 20000) public void testBookieRegistration() throws Exception { File tmpDir = File.createTempFile("bookie", "test"); tmpDir.delete(); @@ -154,7 +155,7 @@ public class BookieInitializationTest { * KeeperException.NodeExistsException if the znode still exists even after * the zk session timeout. */ - @Test + @Test(timeout = 30000) public void testRegNodeExistsAfterSessionTimeOut() throws Exception { File tmpDir = File.createTempFile("bookie", "test"); tmpDir.delete(); @@ -209,7 +210,7 @@ public class BookieInitializationTest { * Verify duplicate bookie server startup. Should throw * java.net.BindException if already BK server is running */ - @Test + @Test(timeout = 20000) public void testDuplicateBookieServerStartup() throws Exception { File tmpDir = File.createTempFile("bookie", "test"); tmpDir.delete(); @@ -234,6 +235,31 @@ public class BookieInitializationTest { } } + /** + * Verify bookie start behaviour when ZK Server is not running. + */ + @Test(timeout = 20000) + public void testStartBookieWithoutZKServer() throws Exception { + zkutil.killServer(); + + File tmpDir = File.createTempFile("bookie", "test"); + tmpDir.delete(); + tmpDir.mkdir(); + + final ServerConfiguration conf = new ServerConfiguration() + .setZkServers(zkutil.getZooKeeperConnectString()) + .setZkTimeout(5000).setJournalDirName(tmpDir.getPath()) + .setLedgerDirNames(new String[] { tmpDir.getPath() }); + try { + new Bookie(conf); + fail("Should throw ConnectionLossException as ZKServer is not running!"); + } catch (KeeperException.ConnectionLossException e) { + // expected behaviour + } finally { + FileUtils.deleteDirectory(tmpDir); + } + } + private void createNewZKClient() throws Exception { // create a zookeeper client LOG.debug("Instantiate ZK Client"); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java?rev=1402146&r1=1402145&r2=1402146&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestLedgerUnderreplicationManager.java Thu Oct 25 13:35:28 2012 @@ -52,6 +52,7 @@ import org.apache.bookkeeper.replication import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; import org.apache.bookkeeper.test.ZooKeeperUtil; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.commons.lang.StringUtils; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -94,10 +95,12 @@ public class TestLedgerUnderreplicationM executor = Executors.newCachedThreadPool(); + ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000); zkc1 = ZkUtils.createConnectedZookeeperClient( - zkUtil.getZooKeeperConnectString(), 10000); + zkUtil.getZooKeeperConnectString(), w); + w = new ZooKeeperWatcherBase(10000); zkc2 = ZkUtils.createConnectedZookeeperClient( - zkUtil.getZooKeeperConnectString(), 10000); + zkUtil.getZooKeeperConnectString(), w); lmf1 = LedgerManagerFactory.newLedgerManagerFactory(conf, zkc1); lmf2 = LedgerManagerFactory.newLedgerManagerFactory(conf, zkc2); basePath = conf.getZkLedgersRootPath() + '/' Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java?rev=1402146&r1=1402145&r2=1402146&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java Thu Oct 25 13:35:28 2012 @@ -39,6 +39,7 @@ import org.apache.bookkeeper.meta.Ledger import org.apache.bookkeeper.meta.ZkLedgerUnderreplicationManager; import org.apache.bookkeeper.test.MultiLedgerManagerTestCase; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.junit.Test; @@ -224,8 +225,9 @@ public class TestReplicationWorker exten InetSocketAddress newBkAddr2 = new InetSocketAddress(InetAddress .getLocalHost().getHostAddress(), startNewBookie2); LOG.info("New Bookie addr :" + newBkAddr2); + ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000); ZooKeeper zkc1 = ZkUtils.createConnectedZookeeperClient( - zkUtil.getZooKeeperConnectString(), 10000); + zkUtil.getZooKeeperConnectString(), w); ReplicationWorker rw2 = new ReplicationWorker(zkc1, baseConf, newBkAddr2); rw1.start(); Modified: zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java?rev=1402146&r1=1402145&r2=1402146&view=diff ============================================================================== --- zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java (original) +++ zookeeper/bookkeeper/trunk/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java Thu Oct 25 13:35:28 2012 @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.commons.io.FileUtils; import java.util.concurrent.CountDownLatch; @@ -90,8 +91,9 @@ public class ZooKeeperUtil { // create a zookeeper client LOG.debug("Instantiate ZK Client"); + ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000); zkc = ZkUtils.createConnectedZookeeperClient( - getZooKeeperConnectString(), 10000); + getZooKeeperConnectString(), w); // initialize the zk client with values zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);