added last-error to stats
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5564c0f Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5564c0f Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5564c0f Branch: refs/heads/master Commit: e5564c0f888e40af2726a645d24cfad0aaeed26a Parents: 8801348 Author: å«ä¹ <[email protected]> Authored: Thu Feb 25 15:06:59 2016 +0800 Committer: å«ä¹ <[email protected]> Committed: Thu Feb 25 15:06:59 2016 +0800 ---------------------------------------------------------------------- .../src/clj/org/apache/storm/daemon/nimbus.clj | 8 ++---- .../jvm/org/apache/storm/stats/StatsUtil.java | 26 ++++++++++++-------- 2 files changed, 18 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/e5564c0f/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index a0e652b..f58353a 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -2109,19 +2109,15 @@ [this ^String topo-id ^String window ^boolean include-sys?] (mark! nimbus:num-getTopologyPageInfo-calls) (let [info (get-common-topo-info topo-id "getTopologyPageInfo") - exec->node+port (:executor->node+port (:assignment info)) - last-err-fn (partial get-last-error - (:storm-cluster-state info) - topo-id) - ;;TODO: add last-error-fn to aggTopoExecsStats method topo-page-info (StatsUtil/aggTopoExecsStats topo-id exec->node+port (:task->component info) (:beats info) (:topology info) window - include-sys?)] + include-sys? + (:storm-cluster-state info))] (when-let [owner (:owner (:base info))] (.set_owner topo-page-info owner)) (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)] http://git-wip-us.apache.org/repos/asf/storm/blob/e5564c0f/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java index 22ececf..c06d7db 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java +++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java @@ -18,7 +18,6 @@ package org.apache.storm.stats; import clojure.lang.Keyword; -import clojure.lang.PersistentVector; import clojure.lang.RT; import com.google.common.collect.Lists; import java.util.ArrayList; @@ -28,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.generated.Bolt; import org.apache.storm.generated.BoltAggregateStats; import org.apache.storm.generated.BoltStats; @@ -543,13 +543,12 @@ public class StatsUtil { return ret; } - // TODO: add last-error-fn arg to get last error public static TopologyPageInfo aggTopoExecsStats( String topologyId, Map exec2nodePort, Map task2component, - Map beats, StormTopology topology, String window, boolean includeSys) { + Map beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) { List beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology); Map topoStats = aggregateTopoStats(window, includeSys, beatList); - topoStats = postAggregateTopoStats(task2component, exec2nodePort, topoStats); + topoStats = postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState); return thriftifyTopoPageData(topologyId, topoStats); } @@ -574,7 +573,8 @@ public class StatsUtil { return initVal; } - public static Map postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map accData) { + public static Map postAggregateTopoStats( + Map task2comp, Map exec2nodePort, Map accData, String topologyId, IStormClusterState clusterState) { Map ret = new HashMap(); putRawKV(ret, NUM_TASKS, task2comp.size()); putRawKV(ret, NUM_WORKERS, ((Set) getByKeyword(accData, WORKERS_SET)).size()); @@ -596,8 +596,7 @@ public class StatsUtil { } removeByKeyword(m, EXEC_LAT_TOTAL); removeByKeyword(m, PROC_LAT_TOTAL); - //TODO: get last error depends on cluster.clj - putRawKV(m, "last-error", null); + putRawKV(m, "last-error", getLastError(clusterState, topologyId, id)); aggBolt2stats.put(id, m); } @@ -615,8 +614,7 @@ public class StatsUtil { putRawKV(m, COMP_LATENCY, compLatencyTotal / acked); } removeByKeyword(m, COMP_LAT_TOTAL); - //TODO: get last error depends on cluster.clj - putRawKV(m, "last-error", null); + putRawKV(m, "last-error", getLastError(clusterState, topologyId, id)); spoutBolt2stats.put(id, m); } @@ -1493,6 +1491,7 @@ public class StatsUtil { } private static ComponentAggregateStats thriftifySpoutAggStats(Map m) { + logger.warn("spout agg stats:{}", m); ComponentAggregateStats stats = new ComponentAggregateStats(); stats.set_type(ComponentType.SPOUT); stats.set_last_error((ErrorInfo) getByKeyword(m, LAST_ERROR)); @@ -1958,7 +1957,10 @@ public class StatsUtil { return t / c; } - public static String floatStr(double n) { + public static String floatStr(Double n) { + if (n == null) { + return "0"; + } return String.format("%.3f", n); } @@ -1970,6 +1972,10 @@ public class StatsUtil { return RT.keyword(null, key); } + private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) { + return stormClusterState.lastError(stormId, compId); + } + interface KeyTransformer<T> { T transform(Object key); }
