Repository: storm
Updated Branches:
  refs/heads/jstorm-import 27fb31c1c -> 7eaf06513


http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/zk/Zookeeper.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/Zookeeper.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/Zookeeper.java
index ed9989d..ca40e16 100755
--- a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/Zookeeper.java
+++ b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/Zookeeper.java
@@ -53,8 +53,7 @@ public class Zookeeper {
 
     private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class);
 
-    public CuratorFramework mkClient(Map conf, List<String> servers,
-            Object port, String root) {
+    public CuratorFramework mkClient(Map conf, List<String> servers, Object 
port, String root) {
         return mkClient(conf, servers, port, root, new 
DefaultWatcherCallBack());
     }
 
@@ -63,64 +62,50 @@ public class Zookeeper {
      * 
      * @return
      */
-    public CuratorFramework mkClient(Map conf, List<String> servers,
-            Object port, String root, final WatcherCallBack watcher) {
+    public CuratorFramework mkClient(Map conf, List<String> servers, Object 
port, String root, final WatcherCallBack watcher) {
 
         CuratorFramework fk = Utils.newCurator(conf, servers, port, root);
 
         fk.getCuratorListenable().addListener(new CuratorListener() {
             @Override
-            public void eventReceived(CuratorFramework _fk, CuratorEvent e)
-                    throws Exception {
+            public void eventReceived(CuratorFramework _fk, CuratorEvent e) 
throws Exception {
                 if (e.getType().equals(CuratorEventType.WATCHED)) {
                     WatchedEvent event = e.getWatchedEvent();
 
-                    watcher.execute(event.getState(), event.getType(),
-                            event.getPath());
+                    watcher.execute(event.getState(), event.getType(), 
event.getPath());
                 }
 
             }
         });
 
-        fk.getUnhandledErrorListenable().addListener(
-                new UnhandledErrorListener() {
-                    @Override
-                    public void unhandledError(String msg, Throwable error) {
-                        String errmsg =
-                                "Unrecoverable Zookeeper error, halting 
process: "
-                                        + msg;
-                        LOG.error(errmsg, error);
-                        JStormUtils.halt_process(1,
-                                "Unrecoverable Zookeeper error");
-
-                    }
-                });
+        fk.getUnhandledErrorListenable().addListener(new 
UnhandledErrorListener() {
+            @Override
+            public void unhandledError(String msg, Throwable error) {
+                String errmsg = "Unrecoverable Zookeeper error, halting 
process: " + msg;
+                LOG.error(errmsg, error);
+                JStormUtils.halt_process(1, "Unrecoverable Zookeeper error");
+
+            }
+        });
         fk.start();
         return fk;
     }
 
-    public String createNode(CuratorFramework zk, String path, byte[] data,
-            org.apache.zookeeper.CreateMode mode) throws Exception {
+    public String createNode(CuratorFramework zk, String path, byte[] data, 
org.apache.zookeeper.CreateMode mode) throws Exception {
 
         String npath = PathUtils.normalize_path(path);
 
-        return zk.create().withMode(mode).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
-                .forPath(npath, data);
+        return 
zk.create().withMode(mode).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(npath, 
data);
     }
 
-    public String createNode(CuratorFramework zk, String path, byte[] data)
-            throws Exception {
-        return createNode(zk, path, data,
-                org.apache.zookeeper.CreateMode.PERSISTENT);
+    public String createNode(CuratorFramework zk, String path, byte[] data) 
throws Exception {
+        return createNode(zk, path, data, 
org.apache.zookeeper.CreateMode.PERSISTENT);
     }
 
-    public boolean existsNode(CuratorFramework zk, String path, boolean watch)
-            throws Exception {
+    public boolean existsNode(CuratorFramework zk, String path, boolean watch) 
throws Exception {
         Stat stat = null;
         if (watch) {
-            stat =
-                    zk.checkExists().watched()
-                            .forPath(PathUtils.normalize_path(path));
+            stat = 
zk.checkExists().watched().forPath(PathUtils.normalize_path(path));
         } else {
             stat = zk.checkExists().forPath(PathUtils.normalize_path(path));
         }
@@ -147,8 +132,7 @@ public class Zookeeper {
 
         mkdirs(zk, PathUtils.parent_path(npath));
         try {
-            createNode(zk, npath, JStormUtils.barr((byte) 7),
-                    org.apache.zookeeper.CreateMode.PERSISTENT);
+            createNode(zk, npath, JStormUtils.barr((byte) 7), 
org.apache.zookeeper.CreateMode.PERSISTENT);
         } catch (KeeperException e) {
             ;// this can happen when multiple clients doing mkdir at same
              // time
@@ -158,8 +142,7 @@ public class Zookeeper {
 
     }
 
-    public byte[] getData(CuratorFramework zk, String path, boolean watch)
-            throws Exception {
+    public byte[] getData(CuratorFramework zk, String path, boolean watch) 
throws Exception {
         String npath = PathUtils.normalize_path(path);
         try {
             if (existsNode(zk, npath, watch)) {
@@ -176,8 +159,7 @@ public class Zookeeper {
         return null;
     }
 
-    public List<String> getChildren(CuratorFramework zk, String path,
-            boolean watch) throws Exception {
+    public List<String> getChildren(CuratorFramework zk, String path, boolean 
watch) throws Exception {
 
         String npath = PathUtils.normalize_path(path);
 
@@ -188,41 +170,26 @@ public class Zookeeper {
         }
     }
 
-    public Stat setData(CuratorFramework zk, String path, byte[] data)
-            throws Exception {
+    public Stat setData(CuratorFramework zk, String path, byte[] data) throws 
Exception {
         String npath = PathUtils.normalize_path(path);
         return zk.setData().forPath(npath, data);
     }
 
-    public boolean exists(CuratorFramework zk, String path, boolean watch)
-            throws Exception {
+    public boolean exists(CuratorFramework zk, String path, boolean watch) 
throws Exception {
         return existsNode(zk, path, watch);
     }
 
-    public void deletereRcursive(CuratorFramework zk, String path)
-            throws Exception {
+    public void deletereRcursive(CuratorFramework zk, String path) throws 
Exception {
 
         String npath = PathUtils.normalize_path(path);
 
         if (existsNode(zk, npath, false)) {
-
-            List<String> childs = getChildren(zk, npath, false);
-
-            for (String child : childs) {
-
-                String childFullPath = PathUtils.full_path(npath, child);
-
-                deletereRcursive(zk, childFullPath);
-            }
-
-            deleteNode(zk, npath);
+            zk.delete().guaranteed().deletingChildrenIfNeeded().forPath(npath);
         }
     }
 
-    public static Factory mkInprocessZookeeper(String localdir, int port)
-            throws IOException, InterruptedException {
-        LOG.info("Starting inprocess zookeeper at port " + port + " and dir "
-                + localdir);
+    public static Factory mkInprocessZookeeper(String localdir, int port) 
throws IOException, InterruptedException {
+        LOG.info("Starting inprocess zookeeper at port " + port + " and dir " 
+ localdir);
         File localfile = new File(localdir);
         ZooKeeperServer zk = new ZooKeeperServer(localfile, localfile, 2000);
         Factory factory = new Factory(new InetSocketAddress(port), 0);

Reply via email to