added last-error to stats

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5564c0f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5564c0f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5564c0f

Branch: refs/heads/master
Commit: e5564c0f888e40af2726a645d24cfad0aaeed26a
Parents: 8801348
Author: 卫乐 <[email protected]>
Authored: Thu Feb 25 15:06:59 2016 +0800
Committer: 卫乐 <[email protected]>
Committed: Thu Feb 25 15:06:59 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |  8 ++----
 .../jvm/org/apache/storm/stats/StatsUtil.java   | 26 ++++++++++++--------
 2 files changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e5564c0f/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index a0e652b..f58353a 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -2109,19 +2109,15 @@
         [this ^String topo-id ^String window ^boolean include-sys?]
         (mark! nimbus:num-getTopologyPageInfo-calls)
         (let [info (get-common-topo-info topo-id "getTopologyPageInfo")
-
               exec->node+port (:executor->node+port (:assignment info))
-              last-err-fn (partial get-last-error
-                                   (:storm-cluster-state info)
-                                   topo-id)
-              ;;TODO: add last-error-fn to aggTopoExecsStats method
               topo-page-info (StatsUtil/aggTopoExecsStats topo-id
                                                          exec->node+port
                                                          (:task->component info)
                                                          (:beats info)
                                                          (:topology info)
                                                          window
-                                                         include-sys?)]
+                                                         include-sys?
+                                                         (:storm-cluster-state info))]
           (when-let [owner (:owner (:base info))]
             (.set_owner topo-page-info owner))
           (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]

http://git-wip-us.apache.org/repos/asf/storm/blob/e5564c0f/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
index 22ececf..c06d7db 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
@@ -18,7 +18,6 @@
 package org.apache.storm.stats;
 
 import clojure.lang.Keyword;
-import clojure.lang.PersistentVector;
 import clojure.lang.RT;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
@@ -28,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.BoltAggregateStats;
 import org.apache.storm.generated.BoltStats;
@@ -543,13 +543,12 @@ public class StatsUtil {
         return ret;
     }
 
-    // TODO: add last-error-fn arg to get last error
     public static TopologyPageInfo aggTopoExecsStats(
             String topologyId, Map exec2nodePort, Map task2component,
-            Map beats, StormTopology topology, String window, boolean includeSys) {
+            Map beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) {
         List beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology);
         Map topoStats = aggregateTopoStats(window, includeSys, beatList);
-        topoStats = postAggregateTopoStats(task2component, exec2nodePort, topoStats);
+        topoStats = postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState);
 
         return thriftifyTopoPageData(topologyId, topoStats);
     }
@@ -574,7 +573,8 @@ public class StatsUtil {
         return initVal;
     }
 
-    public static Map postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map accData) {
+    public static Map postAggregateTopoStats(
+            Map task2comp, Map exec2nodePort, Map accData, String topologyId, IStormClusterState clusterState) {
         Map ret = new HashMap();
         putRawKV(ret, NUM_TASKS, task2comp.size());
         putRawKV(ret, NUM_WORKERS, ((Set) getByKeyword(accData, WORKERS_SET)).size());
@@ -596,8 +596,7 @@ public class StatsUtil {
             }
             removeByKeyword(m, EXEC_LAT_TOTAL);
             removeByKeyword(m, PROC_LAT_TOTAL);
-            //TODO: get last error depends on cluster.clj
-            putRawKV(m, "last-error", null);
+            putRawKV(m, "last-error", getLastError(clusterState, topologyId, id));
 
             aggBolt2stats.put(id, m);
         }
@@ -615,8 +614,7 @@ public class StatsUtil {
                 putRawKV(m, COMP_LATENCY, compLatencyTotal / acked);
             }
             removeByKeyword(m, COMP_LAT_TOTAL);
-            //TODO: get last error depends on cluster.clj
-            putRawKV(m, "last-error", null);
+            putRawKV(m, "last-error", getLastError(clusterState, topologyId, id));
 
             spoutBolt2stats.put(id, m);
         }
@@ -1493,6 +1491,7 @@ public class StatsUtil {
     }
 
     private static ComponentAggregateStats thriftifySpoutAggStats(Map m) {
+        logger.warn("spout agg stats:{}", m);
         ComponentAggregateStats stats = new ComponentAggregateStats();
         stats.set_type(ComponentType.SPOUT);
         stats.set_last_error((ErrorInfo) getByKeyword(m, LAST_ERROR));
@@ -1958,7 +1957,10 @@ public class StatsUtil {
         return t / c;
     }
 
-    public static String floatStr(double n) {
+    public static String floatStr(Double n) {
+        if (n == null) {
+            return "0";
+        }
         return String.format("%.3f", n);
     }
 
@@ -1970,6 +1972,10 @@ public class StatsUtil {
         return RT.keyword(null, key);
     }
 
+    private static ErrorInfo getLastError(IStormClusterState stormClusterState, String stormId, String compId) {
+        return stormClusterState.lastError(stormId, compId);
+    }
+
     interface KeyTransformer<T> {
         T transform(Object key);
     }

Reply via email to