Repository: storm Updated Branches: refs/heads/jstorm-import 27fb31c1c -> 7eaf06513
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/zk/Zookeeper.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/Zookeeper.java b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/Zookeeper.java index ed9989d..ca40e16 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/Zookeeper.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/Zookeeper.java @@ -53,8 +53,7 @@ public class Zookeeper { private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class); - public CuratorFramework mkClient(Map conf, List<String> servers, - Object port, String root) { + public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root) { return mkClient(conf, servers, port, root, new DefaultWatcherCallBack()); } @@ -63,64 +62,50 @@ public class Zookeeper { * * @return */ - public CuratorFramework mkClient(Map conf, List<String> servers, - Object port, String root, final WatcherCallBack watcher) { + public CuratorFramework mkClient(Map conf, List<String> servers, Object port, String root, final WatcherCallBack watcher) { CuratorFramework fk = Utils.newCurator(conf, servers, port, root); fk.getCuratorListenable().addListener(new CuratorListener() { @Override - public void eventReceived(CuratorFramework _fk, CuratorEvent e) - throws Exception { + public void eventReceived(CuratorFramework _fk, CuratorEvent e) throws Exception { if (e.getType().equals(CuratorEventType.WATCHED)) { WatchedEvent event = e.getWatchedEvent(); - watcher.execute(event.getState(), event.getType(), - event.getPath()); + watcher.execute(event.getState(), event.getType(), event.getPath()); } } }); - fk.getUnhandledErrorListenable().addListener( - new UnhandledErrorListener() { - @Override - public void unhandledError(String msg, Throwable error) { - String errmsg = - "Unrecoverable Zookeeper error, halting process: " - + msg; - LOG.error(errmsg, error); - JStormUtils.halt_process(1, - "Unrecoverable Zookeeper error"); - - } - }); + fk.getUnhandledErrorListenable().addListener(new UnhandledErrorListener() { + @Override + public void unhandledError(String msg, Throwable error) { + String errmsg = "Unrecoverable Zookeeper error, halting process: " + msg; + LOG.error(errmsg, error); + JStormUtils.halt_process(1, "Unrecoverable Zookeeper error"); + + } + }); fk.start(); return fk; } - public String createNode(CuratorFramework zk, String path, byte[] data, - org.apache.zookeeper.CreateMode mode) throws Exception { + public String createNode(CuratorFramework zk, String path, byte[] data, org.apache.zookeeper.CreateMode mode) throws Exception { String npath = PathUtils.normalize_path(path); - return zk.create().withMode(mode).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) - .forPath(npath, data); + return zk.create().withMode(mode).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(npath, data); } - public String createNode(CuratorFramework zk, String path, byte[] data) - throws Exception { - return createNode(zk, path, data, - org.apache.zookeeper.CreateMode.PERSISTENT); + public String createNode(CuratorFramework zk, String path, byte[] data) throws Exception { + return createNode(zk, path, data, org.apache.zookeeper.CreateMode.PERSISTENT); } - public boolean existsNode(CuratorFramework zk, String path, boolean watch) - throws Exception { + public boolean existsNode(CuratorFramework zk, String path, boolean watch) throws Exception { Stat stat = null; if (watch) { - stat = - zk.checkExists().watched() - .forPath(PathUtils.normalize_path(path)); + stat = zk.checkExists().watched().forPath(PathUtils.normalize_path(path)); } else { stat = zk.checkExists().forPath(PathUtils.normalize_path(path)); } @@ -147,8 +132,7 @@ public class Zookeeper { mkdirs(zk, PathUtils.parent_path(npath)); try { - createNode(zk, npath, JStormUtils.barr((byte) 7), - org.apache.zookeeper.CreateMode.PERSISTENT); + createNode(zk, npath, JStormUtils.barr((byte) 7), org.apache.zookeeper.CreateMode.PERSISTENT); } catch (KeeperException e) { ;// this can happen when multiple clients doing mkdir at same // time @@ -158,8 +142,7 @@ public class Zookeeper { } - public byte[] getData(CuratorFramework zk, String path, boolean watch) - throws Exception { + public byte[] getData(CuratorFramework zk, String path, boolean watch) throws Exception { String npath = PathUtils.normalize_path(path); try { if (existsNode(zk, npath, watch)) { @@ -176,8 +159,7 @@ public class Zookeeper { return null; } - public List<String> getChildren(CuratorFramework zk, String path, - boolean watch) throws Exception { + public List<String> getChildren(CuratorFramework zk, String path, boolean watch) throws Exception { String npath = PathUtils.normalize_path(path); @@ -188,41 +170,26 @@ public class Zookeeper { } } - public Stat setData(CuratorFramework zk, String path, byte[] data) - throws Exception { + public Stat setData(CuratorFramework zk, String path, byte[] data) throws Exception { String npath = PathUtils.normalize_path(path); return zk.setData().forPath(npath, data); } - public boolean exists(CuratorFramework zk, String path, boolean watch) - throws Exception { + public boolean exists(CuratorFramework zk, String path, boolean watch) throws Exception { return existsNode(zk, path, watch); } - public void deletereRcursive(CuratorFramework zk, String path) - throws Exception { + public void deletereRcursive(CuratorFramework zk, String path) throws Exception { String npath = PathUtils.normalize_path(path); if (existsNode(zk, npath, false)) { - - List<String> childs = getChildren(zk, npath, false); - - for (String child : childs) { - - String childFullPath = PathUtils.full_path(npath, child); - - deletereRcursive(zk, childFullPath); - } - - deleteNode(zk, npath); + zk.delete().guaranteed().deletingChildrenIfNeeded().forPath(npath); } } - public static Factory mkInprocessZookeeper(String localdir, int port) - throws IOException, InterruptedException { - LOG.info("Starting inprocess zookeeper at port " + port + " and dir " - + localdir); + public static Factory mkInprocessZookeeper(String localdir, int port) throws IOException, InterruptedException { + LOG.info("Starting inprocess zookeeper at port " + port + " and dir " + localdir); File localfile = new File(localdir); ZooKeeperServer zk = new ZooKeeperServer(localfile, localfile, 2000); Factory factory = new Factory(new InetSocketAddress(port), 0);
