Update/fix some code based on feedback from @revans2
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b09e2755 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b09e2755 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b09e2755 Branch: refs/heads/master Commit: b09e2755c2fedcb9048afda63bea897fe1dbd12b Parents: d709525 Author: xiaojian.fxj <[email protected]> Authored: Wed Feb 24 11:19:06 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Wed Feb 24 11:19:06 2016 +0800 ---------------------------------------------------------------------- .../src/clj/org/apache/storm/daemon/nimbus.clj | 2 +- .../clj/org/apache/storm/daemon/supervisor.clj | 4 +- .../src/clj/org/apache/storm/daemon/worker.clj | 8 +- .../org/apache/storm/cluster/ClusterUtils.java | 25 +----- .../storm/cluster/IStormClusterState.java | 4 +- .../storm/cluster/PaceMakerStateStorage.java | 12 ++- .../storm/cluster/StormClusterStateImpl.java | 41 +++++----- .../apache/storm/cluster/ZKStateStorage.java | 80 +++++++------------- .../org/apache/storm/zookeeper/Zookeeper.java | 9 ++- 9 files changed, 74 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index 2f6587a..e524ec2 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -1676,7 +1676,7 @@ [(node->host node) port]) executor->node+port) nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id) - all-pending-actions-for-topology (clojurify-profile-request (.getTopologyProfileRequests storm-cluster-state id true)) + 
all-pending-actions-for-topology (clojurify-profile-request (.getTopologyProfileRequests storm-cluster-state id)) latest-profile-actions (remove nil? (map (fn [nodeInfo] (->> all-pending-actions-for-topology (filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %))) http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index 0cee414..1446ac9 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -81,7 +81,7 @@ (->> (dofor [sid (distinct storm-ids)] - (if-let [topo-profile-actions (into [] (for [request (.getTopologyProfileRequests storm-cluster-state sid false)] (clojurify-profile-request request)))] + (if-let [topo-profile-actions (into [] (for [request (.getTopologyProfileRequests storm-cluster-state sid)] (clojurify-profile-request request)))] {sid topo-profile-actions})) (apply merge))] {:assignments (into {} (for [[k v] new-assignments] [k (:data v)])) @@ -607,7 +607,7 @@ storm-cluster-state (:storm-cluster-state supervisor) ^ISupervisor isupervisor (:isupervisor supervisor) ^LocalState local-state (:local-state supervisor) - sync-callback (fn [& ignored] (.add event-manager (reify Runnable + sync-callback (fn [] (.add event-manager (reify Runnable (^void run [this] (callback-supervisor))))) assignment-versions @(:assignment-versions supervisor) http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index af88f6a..110d415 100644 --- 
a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -383,7 +383,7 @@ storm-id (:storm-id worker)] (fn refresh-connections ([] - (refresh-connections (fn [& ignored] + (refresh-connections (fn [] (.schedule (:refresh-connections-timer worker) 0 refresh-connections)))) ([callback] @@ -438,7 +438,7 @@ (defn refresh-storm-active ([worker] (refresh-storm-active - worker (fn [& ignored] + worker (fn [] (.schedule (:refresh-active-timer worker) 0 (partial refresh-storm-active worker))))) ([worker callback] @@ -685,7 +685,7 @@ backpressure-thread (WorkerBackpressureThread. (:backpressure-trigger worker) worker backpressure-handler) _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) (.start backpressure-thread)) - callback (fn cb [& ignored] + callback (fn cb [] (let [throttle-on (.topologyBackpressure storm-cluster-state storm-id cb)] (reset! (:throttle-on worker) throttle-on))) _ (if ((:storm-conf worker) TOPOLOGY-BACKPRESSURE-ENABLE) @@ -757,7 +757,7 @@ (dofor [e @executors] (.credentials-changed e new-creds)) (reset! credentials new-creds)))) check-throttle-changed (fn [] - (let [callback (fn cb [& ignored] + (let [callback (fn cb [] (let [throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id cb)] (reset! 
(:throttle-on worker) throttle-on))) new-throttle-on (.topologyBackpressure (:storm-cluster-state worker) storm-id callback)] http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java index 1095fff..96c177b 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java @@ -211,7 +211,6 @@ public class ClusterUtils { IStateStorage Storage = _instance.mkStateStorageImpl((Map) stateStorage, (Map) stateStorage, acls, context); return new StormClusterStateImpl(Storage, acls, context, true); } - } public IStateStorage mkStateStorageImpl(Map config, Map auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception { @@ -237,25 +236,9 @@ public class ClusterUtils { } public static String stringifyError(Throwable error) { - String errorString = null; - StringWriter result = null; - PrintWriter printWriter = null; - try { - result = new StringWriter(); - printWriter = new PrintWriter(result); - error.printStackTrace(printWriter); - if (result != null) { - errorString = result.toString(); - } - } finally { - try { - if (result != null) - result.close(); - if (printWriter != null) - printWriter.close(); - } catch (Exception e) { - } - } - return errorString; + StringWriter result = new StringWriter(); + PrintWriter printWriter = new PrintWriter(result); + error.printStackTrace(printWriter); + return result.toString(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java 
b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java index c88935e..e26c598 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java @@ -45,9 +45,9 @@ public interface IStormClusterState { public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port); - public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift); + public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo); - public List<ProfileRequest> getTopologyProfileRequests(String stormId, boolean isThrift); + public List<ProfileRequest> getTopologyProfileRequests(String stormId); public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest); http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java index c29078e..c42bd38 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java @@ -127,7 +127,8 @@ public class PaceMakerStateStorage implements IStateStorage { if (retry <= 0) { throw Utils.wrapInRuntime(e); } - LOG.error("{} Failed to set_worker_hb. Will make {} more attempts.", e.getMessage(), retry--); + retry--; + LOG.error("{} Failed to set_worker_hb. Will make {} more attempts.", e.getMessage(), retry); } } } @@ -148,7 +149,8 @@ public class PaceMakerStateStorage implements IStateStorage { if (retry <= 0) { throw Utils.wrapInRuntime(e); } - LOG.error("{} Failed to get_worker_hb. 
Will make {} more attempts.", e.getMessage(), retry--); + retry--; + LOG.error("{} Failed to get_worker_hb. Will make {} more attempts.", e.getMessage(), retry); } } } @@ -169,7 +171,8 @@ public class PaceMakerStateStorage implements IStateStorage { if (retry <= 0) { throw Utils.wrapInRuntime(e); } - LOG.error("{} Failed to get_worker_hb_children. Will make {} more attempts.", e.getMessage(), retry--); + retry--; + LOG.error("{} Failed to get_worker_hb_children. Will make {} more attempts.", e.getMessage(), retry); } } } @@ -190,7 +193,8 @@ public class PaceMakerStateStorage implements IStateStorage { if (retry <= 0) { throw Utils.wrapInRuntime(e); } - LOG.error("{} Failed to delete_worker_hb. Will make {} more attempts.", e.getMessage(), retry--); + retry--; + LOG.error("{} Failed to delete_worker_hb. Will make {} more attempts.", e.getMessage(), retry); } } } http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java index 5fa586a..bde7670 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java @@ -106,7 +106,7 @@ public class StormClusterStateImpl implements IStormClusterState { } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) { issueMapCallback(logConfigCallback, toks.get(1)); } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) { - issueMapCallback(logConfigCallback, toks.get(1)); + issueMapCallback(backPressureCallback, toks.get(1)); } else { LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path); Runtime.getRuntime().exit(30); @@ -242,9 +242,9 @@ public class 
StormClusterStateImpl implements IStormClusterState { } @Override - public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift) { + public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo) { List<ProfileRequest> requests = new ArrayList<>(); - List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId, isThrift); + List<ProfileRequest> profileRequests = getTopologyProfileRequests(stormId); for (ProfileRequest profileRequest : profileRequests) { NodeInfo nodeInfo1 = profileRequest.get_nodeInfo(); if (nodeInfo1.equals(nodeInfo)) @@ -254,7 +254,7 @@ public class StormClusterStateImpl implements IStormClusterState { } @Override - public List<ProfileRequest> getTopologyProfileRequests(String stormId, boolean isThrift) { + public List<ProfileRequest> getTopologyProfileRequests(String stormId) { List<ProfileRequest> profileRequests = new ArrayList<>(); String path = ClusterUtils.profilerConfigPath(stormId); if (stateStorage.node_exists(path, false)) { @@ -382,6 +382,9 @@ public class StormClusterStateImpl implements IStormClusterState { @Override public LogConfig topologyLogConfig(String stormId, Runnable cb) { + if (cb != null){ + logConfigCallback.put(stormId, cb); + } String path = ClusterUtils.logConfigPath(stormId); return ClusterUtils.maybeDeserialize(stateStorage.get_data(path, cb != null), LogConfig.class); } @@ -625,25 +628,21 @@ public class StormClusterStateImpl implements IStormClusterState { @Override public List<ErrorInfo> errors(String stormId, String componentId) { List<ErrorInfo> errorInfos = new ArrayList<>(); - try { - String path = ClusterUtils.errorPath(stormId, componentId); - if (stateStorage.node_exists(path, false)) { - List<String> childrens = stateStorage.get_children(path, false); - for (String child : childrens) { - String childPath = path + ClusterUtils.ZK_SEPERATOR + child; - ErrorInfo errorInfo = 
ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class); - if (errorInfo != null) - errorInfos.add(errorInfo); - } + String path = ClusterUtils.errorPath(stormId, componentId); + if (stateStorage.node_exists(path, false)) { + List<String> childrens = stateStorage.get_children(path, false); + for (String child : childrens) { + String childPath = path + ClusterUtils.ZK_SEPERATOR + child; + ErrorInfo errorInfo = ClusterUtils.maybeDeserialize(stateStorage.get_data(childPath, false), ErrorInfo.class); + if (errorInfo != null) + errorInfos.add(errorInfo); } - Collections.sort(errorInfos, new Comparator<ErrorInfo>() { - public int compare(ErrorInfo arg0, ErrorInfo arg1) { - return Integer.compare(arg1.get_error_time_secs(), arg0.get_error_time_secs()); - } - }); - } catch (Exception e) { - throw Utils.wrapInRuntime(e); } + Collections.sort(errorInfos, new Comparator<ErrorInfo>() { + public int compare(ErrorInfo arg0, ErrorInfo arg1) { + return Integer.compare(arg1.get_error_time_secs(), arg0.get_error_time_secs()); + } + }); return errorInfos; } http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java index 56115ce..4cf0c05 100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/ZKStateStorage.java @@ -53,6 +53,26 @@ public class ZKStateStorage implements IStateStorage { private Map authConf; private Map<Object, Object> conf; + private class ZkWatcherCallBack implements WatcherCallBack{ + @Override + public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) { + if (active.get()) { + if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) { + 
LOG.debug("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path); + } else { + LOG.debug("Received event {} : {} : {}", state, type, path); + } + + if (!type.equals(Watcher.Event.EventType.None)) { + for (Map.Entry<String, ZKStateChangedCallback> e : callbacks.entrySet()) { + ZKStateChangedCallback fn = e.getValue(); + fn.changed(type, path); + } + } + } + } + } + public ZKStateStorage(Map<Object, Object> conf, Map authConf, List<ACL> acls, ClusterStateContext context) throws Exception { this.conf = conf; this.authConf = authConf; @@ -66,45 +86,9 @@ public class ZKStateStorage implements IStateStorage { zkTemp.close(); active = new AtomicBoolean(true); - zkWriter = mkZk(new WatcherCallBack() { - @Override - public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) { - if (active.get()) { - if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) { - LOG.warn("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path); - } else { - LOG.info("Received event {} : {} : {}", state, type, path); - } - - if (!type.equals(Watcher.Event.EventType.None)) { - for (Map.Entry<String, ZKStateChangedCallback> e : callbacks.entrySet()) { - ZKStateChangedCallback fn = e.getValue(); - fn.changed(type, path); - } - } - } - } - }); + zkWriter = mkZk(new ZkWatcherCallBack()); if (isNimbus) { - zkReader = mkZk(new WatcherCallBack() { - @Override - public void execute(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String path) { - if (active.get()) { - if (!(state.equals(Watcher.Event.KeeperState.SyncConnected))) { - LOG.warn("Received event {} : {}: {} with disconnected Zookeeper.", state, type, path); - } else { - LOG.debug("Received event {} : {} : {}", state, type, path); - } - - if (!type.equals(Watcher.Event.EventType.None)) { - for (Map.Entry<String, ZKStateChangedCallback> e : callbacks.entrySet()) { - ZKStateChangedCallback fn = e.getValue(); - fn.changed(type, path); - } - } - } 
- } - }); + zkReader = mkZk(new ZkWatcherCallBack()); } else { zkReader = zkWriter; } @@ -157,15 +141,15 @@ public class ZKStateStorage implements IStateStorage { @Override public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) { - Zookeeper.mkdirs(zkWriter, parentPath(path), acls); + Zookeeper.mkdirs(zkWriter, Zookeeper.parentPath(path), acls); if (Zookeeper.exists(zkWriter, path, false)) { try { Zookeeper.setData(zkWriter, path, data); - } catch (Exception e) { + } catch (RuntimeException e) { if (Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) { Zookeeper.createNode(zkWriter, path, data, CreateMode.EPHEMERAL, acls); } else { - throw Utils.wrapInRuntime(e); + throw e; } } @@ -182,7 +166,7 @@ public class ZKStateStorage implements IStateStorage { @Override public boolean node_exists(String path, boolean watch) { - return Zookeeper.existsNode(zkWriter, path, watch); + return Zookeeper.existsNode(zkReader, path, watch); } @Override @@ -204,7 +188,7 @@ public class ZKStateStorage implements IStateStorage { if (Zookeeper.exists(zkWriter, path, false)) { Zookeeper.setData(zkWriter, path, data); } else { - Zookeeper.mkdirs(zkWriter, parentPath(path), acls); + Zookeeper.mkdirs(zkWriter, Zookeeper.parentPath(path), acls); Zookeeper.createNode(zkWriter, path, data, CreateMode.PERSISTENT, acls); } } @@ -257,14 +241,4 @@ public class ZKStateStorage implements IStateStorage { public void sync_path(String path) { Zookeeper.syncPath(zkWriter, path); } - - // To be remove when finished port Util.clj - public static String parentPath(String path) { - List<String> toks = Zookeeper.tokenizePath(path); - int size = toks.size(); - if (size > 0) { - toks.remove(size - 1); - } - return Zookeeper.toksToPath(toks); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/b09e2755/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java ---------------------------------------------------------------------- diff --git 
a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java index e5b2666..5e9039a 100644 --- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java +++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java @@ -394,9 +394,12 @@ public class Zookeeper { } public static String parentPath(String path) { - List<String> tokens = tokenizePath(path); - tokens.remove(tokens.size() - 1); - return "/" + StringUtils.join(tokens, "/"); + List<String> toks = Zookeeper.tokenizePath(path); + int size = toks.size(); + if (size > 0) { + toks.remove(size - 1); + } + return Zookeeper.toksToPath(toks); } public static String toksToPath(List<String> toks) {
