Update/fix some code based on feedback from @revans2

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b09e2755
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b09e2755
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b09e2755

Branch: refs/heads/master
Commit: b09e2755c2fedcb9048afda63bea897fe1dbd12b
Parents: d709525
Author: xiaojian.fxj <[email protected]>
Authored: Wed Feb 24 11:19:06 2016 +0800
Committer: xiaojian.fxj <[email protected]>
Committed: Wed Feb 24 11:19:06 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  2 +-
 .../clj/org/apache/storm/daemon/supervisor.clj  |  4 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  8 +-
 .../org/apache/storm/cluster/ClusterUtils.java  | 25 +-----
 .../storm/cluster/IStormClusterState.java       |  4 +-
 .../storm/cluster/PaceMakerStateStorage.java    | 12 ++-
 .../storm/cluster/StormClusterStateImpl.java    | 41 +++++-----
 .../apache/storm/cluster/ZKStateStorage.java    | 80 +++++++-------------
 .../org/apache/storm/zookeeper/Zookeeper.java   |  9 ++-
 9 files changed, 74 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj 
b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 2f6587a..e524ec2 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -1676,7 +1676,7 @@
                                              [(node->host node) port])
                                     executor->node+port)
               nodeinfos (stats/extract-nodeinfos-from-hb-for-comp 
executor->host+port task->component false component_id)
-              all-pending-actions-for-topology (clojurify-profile-request 
(.getTopologyProfileRequests storm-cluster-state id true))
+              all-pending-actions-for-topology (clojurify-profile-request 
(.getTopologyProfileRequests storm-cluster-state id))
               latest-profile-actions (remove nil? (map (fn [nodeInfo]
                                                          (->> 
all-pending-actions-for-topology
                                                               (filter #(and (= 
(:host nodeInfo) (.get_node (.get_nodeInfo %)))

http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 0cee414..1446ac9 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -81,7 +81,7 @@
           (->>
             (dofor [sid (distinct storm-ids)]
 
-                   (if-let [topo-profile-actions (into [] (for [request 
(.getTopologyProfileRequests storm-cluster-state sid false)] 
(clojurify-profile-request request)))]
+                   (if-let [topo-profile-actions (into [] (for [request 
(.getTopologyProfileRequests storm-cluster-state sid)] 
(clojurify-profile-request request)))]
                       {sid topo-profile-actions}))
            (apply merge))]
       {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
@@ -607,7 +607,7 @@
           storm-cluster-state (:storm-cluster-state supervisor)
           ^ISupervisor isupervisor (:isupervisor supervisor)
           ^LocalState local-state (:local-state supervisor)
-          sync-callback (fn [& ignored] (.add event-manager (reify Runnable
+          sync-callback (fn [] (.add event-manager (reify Runnable
                                                                    (^void run 
[this]
                                                                      
(callback-supervisor)))))
           assignment-versions @(:assignment-versions supervisor)

http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj 
b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index af88f6a..110d415 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -383,7 +383,7 @@
         storm-id (:storm-id worker)]
     (fn refresh-connections
       ([]
-        (refresh-connections (fn [& ignored]
+        (refresh-connections (fn []
                 (.schedule
                   (:refresh-connections-timer worker) 0 refresh-connections))))
       ([callback]
@@ -438,7 +438,7 @@
 (defn refresh-storm-active
   ([worker]
     (refresh-storm-active
-      worker (fn [& ignored]
+      worker (fn []
                (.schedule
                  (:refresh-active-timer worker) 0 (partial 
refresh-storm-active worker)))))
   ([worker callback]
@@ -685,7 +685,7 @@
         backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger 
worker) worker backpressure-handler)
         _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) 
             (.start backpressure-thread))
-        callback (fn cb [& ignored]
+        callback (fn cb []
                    (let [throttle-on (.topologyBackpressure 
storm-cluster-state storm-id cb)]
                      (reset! (:throttle-on worker) throttle-on)))
         _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE)
@@ -757,7 +757,7 @@
                                         (dofor [e @executors] 
(.credentials-changed e new-creds))
                                         (reset! credentials new-creds))))
        check-throttle-changed (fn []
-                                (let [callback (fn cb [& ignored]
+                                (let [callback (fn cb []
                                                  (let [throttle-on 
(.topologyBackpressure (:storm-cluster-state worker) storm-id cb)]
                                                    (reset! (:throttle-on 
worker) throttle-on)))
                                       new-throttle-on (.topologyBackpressure 
(:storm-cluster-state worker) storm-id callback)]

http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java 
b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index 1095fff..96c177b 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@ -211,7 +211,6 @@ public class ClusterUtils {
             IStateStorage Storage = _instance.mkStateStorageImpl((Map) 
stateStorage, (Map) stateStorage, acls, context);
             return new StormClusterStateImpl(Storage, acls, context, true);
         }
-
     }
 
     public IStateStorage mkStateStorageImpl(Map config, Map auth_conf, 
List<ACL> acls, ClusterStateContext context) throws Exception {
@@ -237,25 +236,9 @@ public class ClusterUtils {
     }
 
     public static String stringifyError(Throwable error) {
-        String errorString = null;
-        StringWriter result = null;
-        PrintWriter printWriter = null;
-        try {
-            result = new StringWriter();
-            printWriter = new PrintWriter(result);
-            error.printStackTrace(printWriter);
-            if (result != null) {
-                errorString = result.toString();
-            }
-        } finally {
-            try {
-                if (result != null)
-                    result.close();
-                if (printWriter != null)
-                    printWriter.close();
-            } catch (Exception e) {
-            }
-        }
-        return errorString;
+        StringWriter result = new StringWriter();
+        PrintWriter printWriter = new PrintWriter(result);
+        error.printStackTrace(printWriter);
+        return result.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java 
b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index c88935e..e26c598 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@ -45,9 +45,9 @@ public interface IStormClusterState {
 
     public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String 
node, Long port);
 
-    public List<ProfileRequest> getWorkerProfileRequests(String stormId, 
NodeInfo nodeInfo, boolean isThrift);
+    public List<ProfileRequest> getWorkerProfileRequests(String stormId, 
NodeInfo nodeInfo);
 
-    public List<ProfileRequest> getTopologyProfileRequests(String stormId, 
boolean isThrift);
+    public List<ProfileRequest> getTopologyProfileRequests(String stormId);
 
     public void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java 
b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
index c29078e..c42bd38 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@ -127,7 +127,8 @@ public class PaceMakerStateStorage implements IStateStorage 
{
                 if (retry <= 0) {
                     throw Utils.wrapInRuntime(e);
                 }
-                LOG.error("{} Failed to set_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry--);
+                retry--;
+                LOG.error("{} Failed to set_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry);
             }
         }
     }
@@ -148,7 +149,8 @@ public class PaceMakerStateStorage implements IStateStorage 
{
                 if (retry <= 0) {
                     throw Utils.wrapInRuntime(e);
                 }
-                LOG.error("{} Failed to get_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry--);
+                retry--;
+                LOG.error("{} Failed to get_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry);
             }
         }
     }
@@ -169,7 +171,8 @@ public class PaceMakerStateStorage implements IStateStorage 
{
                 if (retry <= 0) {
                     throw Utils.wrapInRuntime(e);
                 }
-                LOG.error("{} Failed to get_worker_hb_children. Will make {} 
more attempts.", e.getMessage(), retry--);
+                retry--;
+                LOG.error("{} Failed to get_worker_hb_children. Will make {} 
more attempts.", e.getMessage(), retry);
             }
         }
     }
@@ -190,7 +193,8 @@ public class PaceMakerStateStorage implements IStateStorage 
{
                 if (retry <= 0) {
                     throw Utils.wrapInRuntime(e);
                 }
-                LOG.error("{} Failed to delete_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry--);
+                retry--;
+                LOG.error("{} Failed to delete_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java 
b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
index 5fa586a..bde7670 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java
@@ -106,7 +106,7 @@ public class StormClusterStateImpl implements 
IStormClusterState {
                     } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && 
size > 1) {
                         issueMapCallback(logConfigCallback, toks.get(1));
                     } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && 
size > 1) {
-                        issueMapCallback(logConfigCallback, toks.get(1));
+                        issueMapCallback(backPressureCallback, toks.get(1));
                     } else {
                         LOG.error("{} Unknown callback for subtree {}", new 
RuntimeException("Unknown callback for this path"), path);
                         Runtime.getRuntime().exit(30);
@@ -242,9 +242,9 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public List<ProfileRequest> getWorkerProfileRequests(String stormId, 
NodeInfo nodeInfo, boolean isThrift) {
+    public List<ProfileRequest> getWorkerProfileRequests(String stormId, 
NodeInfo nodeInfo) {
         List<ProfileRequest> requests = new ArrayList<>();
-        List<ProfileRequest> profileRequests = 
getTopologyProfileRequests(stormId, isThrift);
+        List<ProfileRequest> profileRequests = 
getTopologyProfileRequests(stormId);
         for (ProfileRequest profileRequest : profileRequests) {
             NodeInfo nodeInfo1 = profileRequest.get_nodeInfo();
             if (nodeInfo1.equals(nodeInfo))
@@ -254,7 +254,7 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     }
 
     @Override
-    public List<ProfileRequest> getTopologyProfileRequests(String stormId, 
boolean isThrift) {
+    public List<ProfileRequest> getTopologyProfileRequests(String stormId) {
         List<ProfileRequest> profileRequests = new ArrayList<>();
         String path = ClusterUtils.profilerConfigPath(stormId);
         if (stateStorage.node_exists(path, false)) {
@@ -382,6 +382,9 @@ public class StormClusterStateImpl implements 
IStormClusterState {
 
     @Override
     public LogConfig topologyLogConfig(String stormId, Runnable cb) {
+        if (cb != null){
+            logConfigCallback.put(stormId, cb);
+        }
         String path = ClusterUtils.logConfigPath(stormId);
         return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != 
null), LogConfig.class);
     }
@@ -625,25 +628,21 @@ public class StormClusterStateImpl implements 
IStormClusterState {
     @Override
     public List<ErrorInfo> errors(String stormId, String componentId) {
         List<ErrorInfo> errorInfos = new ArrayList<>();
-        try {
-            String path = ClusterUtils.errorPath(stormId, componentId);
-            if (stateStorage.node_exists(path, false)) {
-                List<String> childrens = stateStorage.get_children(path, 
false);
-                for (String child : childrens) {
-                    String childPath = path + ClusterUtils.ZK_SEPERATOR + 
child;
-                    ErrorInfo errorInfo = 
ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), 
ErrorInfo.class);
-                    if (errorInfo != null)
-                        errorInfos.add(errorInfo);
-                }
+        String path = ClusterUtils.errorPath(stormId, componentId);
+        if (stateStorage.node_exists(path, false)) {
+            List<String> childrens = stateStorage.get_children(path, false);
+            for (String child : childrens) {
+                String childPath = path + ClusterUtils.ZK_SEPERATOR + child;
+                ErrorInfo errorInfo = 
ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), 
ErrorInfo.class);
+                if (errorInfo != null)
+                    errorInfos.add(errorInfo);
             }
-            Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
-                public int compare(ErrorInfo arg0, ErrorInfo arg1) {
-                    return Integer.compare(arg1.get_error_time_secs(), 
arg0.get_error_time_secs());
-                }
-            });
-        } catch (Exception e) {
-            throw Utils.wrapInRuntime(e);
         }
+        Collections.sort(errorInfos, new Comparator<ErrorInfo>() {
+            public int compare(ErrorInfo arg0, ErrorInfo arg1) {
+                return Integer.compare(arg1.get_error_time_secs(), 
arg0.get_error_time_secs());
+            }
+        });
 
         return errorInfos;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java 
b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
index 56115ce..4cf0c05 100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java
@@ -53,6 +53,26 @@ public class ZKStateStorage implements IStateStorage {
     private Map authConf;
     private Map<Object, Object> conf;
 
+    private class ZkWatcherCallBack implements WatcherCallBack{
+        @Override
+        public void execute(Watcher.Event.KeeperState state, 
Watcher.Event.EventType type, String path) {
+            if (active.get()) {
+                if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
+                    LOG.debug("Received event {} : {}: {} with disconnected 
Zookeeper.", state, type, path);
+                } else {
+                    LOG.debug("Received event {} : {} : {}", state, type, 
path);
+                }
+
+                if (!type.equals(Watcher.Event.EventType.None)) {
+                    for (Map.Entry<String, ZKStateChangedCallback> e : 
callbacks.entrySet()) {
+                        ZKStateChangedCallback fn = e.getValue();
+                        fn.changed(type, path);
+                    }
+                }
+            }
+        }
+    }
+
     public ZKStateStorage(Map<Object, Object> conf, Map authConf, List<ACL> 
acls, ClusterStateContext context) throws Exception {
         this.conf = conf;
         this.authConf = authConf;
@@ -66,45 +86,9 @@ public class ZKStateStorage implements IStateStorage {
         zkTemp.close();
 
         active = new AtomicBoolean(true);
-        zkWriter = mkZk(new WatcherCallBack() {
-            @Override
-            public void execute(Watcher.Event.KeeperState state, 
Watcher.Event.EventType type, String path) {
-                if (active.get()) {
-                    if 
(!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
-                        LOG.warn("Received event {} : {}: {} with disconnected 
Zookeeper.", state, type, path);
-                    } else {
-                        LOG.info("Received event {} : {} : {}", state, type, 
path);
-                    }
-
-                    if (!type.equals(Watcher.Event.EventType.None)) {
-                        for (Map.Entry<String, ZKStateChangedCallback> e : 
callbacks.entrySet()) {
-                            ZKStateChangedCallback fn = e.getValue();
-                            fn.changed(type, path);
-                        }
-                    }
-                }
-            }
-        });
+        zkWriter = mkZk(new ZkWatcherCallBack());
         if (isNimbus) {
-            zkReader = mkZk(new WatcherCallBack() {
-                @Override
-                public void execute(Watcher.Event.KeeperState state, 
Watcher.Event.EventType type, String path) {
-                    if (active.get()) {
-                        if 
(!(state.equals(Watcher.Event.KeeperState.SyncConnected))) {
-                            LOG.warn("Received event {} : {}: {} with 
disconnected Zookeeper.", state, type, path);
-                        } else {
-                            LOG.debug("Received event {} : {} : {}", state, 
type, path);
-                        }
-
-                        if (!type.equals(Watcher.Event.EventType.None)) {
-                            for (Map.Entry<String, ZKStateChangedCallback> e : 
callbacks.entrySet()) {
-                                ZKStateChangedCallback fn = e.getValue();
-                                fn.changed(type, path);
-                            }
-                        }
-                    }
-                }
-            });
+            zkReader = mkZk(new ZkWatcherCallBack());
         } else {
             zkReader = zkWriter;
         }
@@ -157,15 +141,15 @@ public class ZKStateStorage implements IStateStorage {
 
     @Override
     public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
-        Zookeeper.mkdirs(zkWriter, parentPath(path), acls);
+        Zookeeper.mkdirs(zkWriter, Zookeeper.parentPath(path), acls);
         if (Zookeeper.exists(zkWriter, path, false)) {
             try {
                 Zookeeper.setData(zkWriter, path, data);
-            } catch (Exception e) {
+            } catch (RuntimeException e) {
                 if 
(Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
                     Zookeeper.createNode(zkWriter, path, data, 
CreateMode.EPHEMERAL, acls);
                 } else {
-                    throw Utils.wrapInRuntime(e);
+                    throw e;
                 }
             }
 
@@ -182,7 +166,7 @@ public class ZKStateStorage implements IStateStorage {
 
     @Override
     public boolean node_exists(String path, boolean watch) {
-        return Zookeeper.existsNode(zkWriter, path, watch);
+        return Zookeeper.existsNode(zkReader, path, watch);
     }
 
     @Override
@@ -204,7 +188,7 @@ public class ZKStateStorage implements IStateStorage {
         if (Zookeeper.exists(zkWriter, path, false)) {
             Zookeeper.setData(zkWriter, path, data);
         } else {
-            Zookeeper.mkdirs(zkWriter, parentPath(path), acls);
+            Zookeeper.mkdirs(zkWriter, Zookeeper.parentPath(path), acls);
             Zookeeper.createNode(zkWriter, path, data, CreateMode.PERSISTENT, 
acls);
         }
     }
@@ -257,14 +241,4 @@ public class ZKStateStorage implements IStateStorage {
     public void sync_path(String path) {
         Zookeeper.syncPath(zkWriter, path);
     }
-
-    // To be remove when finished port Util.clj
-    public static String parentPath(String path) {
-        List<String> toks = Zookeeper.tokenizePath(path);
-        int size = toks.size();
-        if (size > 0) {
-            toks.remove(size - 1);
-        }
-        return Zookeeper.toksToPath(toks);
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java 
b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index e5b2666..5e9039a 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -394,9 +394,12 @@ public class Zookeeper {
     }
 
     public static String parentPath(String path) {
-        List<String> tokens = tokenizePath(path);
-        tokens.remove(tokens.size() - 1);
-        return "/" + StringUtils.join(tokens, "/");
+        List<String> toks = Zookeeper.tokenizePath(path);
+        int size = toks.size();
+        if (size > 0) {
+            toks.remove(size - 1);
+        }
+        return Zookeeper.toksToPath(toks);
     }
 
     public static String toksToPath(List<String> toks) {

Reply via email to