http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java 
b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
index 351e830..7650ab1 100644
--- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
+++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java
@@ -17,8 +17,6 @@
  */
 package org.apache.storm.stats;
 
-import clojure.lang.Keyword;
-import clojure.lang.RT;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -27,10 +25,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.storm.cluster.ExecutorBeat;
 import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.BoltAggregateStats;
 import org.apache.storm.generated.BoltStats;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
 import org.apache.storm.generated.CommonAggregateStats;
 import org.apache.storm.generated.ComponentAggregateStats;
 import org.apache.storm.generated.ComponentPageInfo;
@@ -48,19 +48,18 @@ import org.apache.storm.generated.SpoutStats;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.TopologyPageInfo;
 import org.apache.storm.generated.TopologyStats;
+import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("unchecked, unused")
+@SuppressWarnings("unchecked")
 public class StatsUtil {
     private static final Logger logger = 
LoggerFactory.getLogger(StatsUtil.class);
 
     public static final String TYPE = "type";
     private static final String SPOUT = "spout";
     private static final String BOLT = "bolt";
-    public static final Keyword KW_SPOUT = keyword(SPOUT);
-    public static final Keyword KW_BOLT = keyword(BOLT);
 
     private static final String UPTIME = "uptime";
     private static final String HOST = "host";
@@ -73,7 +72,10 @@ public class StatsUtil {
     private static final String EXECUTOR_STATS = "executor-stats";
     private static final String EXECUTOR_ID = "executor-id";
     private static final String LAST_ERROR = "lastError";
+    private static final String HEARTBEAT = "heartbeat";
+    private static final String TIME_SECS = "time-secs";
 
+    private static final String RATE = "rate";
     private static final String ACKED = "acked";
     private static final String FAILED = "failed";
     private static final String EXECUTED = "executed";
@@ -130,8 +132,10 @@ public class StatsUtil {
      * @param id2procAvg { global stream id -> proc avg value }
      * @param id2numExec { global stream id -> executed }
      */
-    public static Map aggBoltLatAndCount(Map id2execAvg, Map id2procAvg, Map 
id2numExec) {
-        Map ret = new HashMap();
+    public static Map<String, Number> aggBoltLatAndCount(Map<List<String>, 
Double> id2execAvg,
+                                                         Map<List<String>, 
Double> id2procAvg,
+                                                         Map<List<String>, 
Long> id2numExec) {
+        Map<String, Number> ret = new HashMap<>();
         putKV(ret, EXEC_LAT_TOTAL, weightAvgAndSum(id2execAvg, id2numExec));
         putKV(ret, PROC_LAT_TOTAL, weightAvgAndSum(id2procAvg, id2numExec));
         putKV(ret, EXECUTED, sumValues(id2numExec));
@@ -142,8 +146,8 @@ public class StatsUtil {
     /**
      * Aggregates number acked and complete latencies across all streams.
      */
-    public static Map aggSpoutLatAndCount(Map id2compAvg, Map id2numAcked) {
-        Map ret = new HashMap();
+    public static Map<String, Number> aggSpoutLatAndCount(Map<String, Double> 
id2compAvg, Map<String, Long> id2numAcked) {
+        Map<String, Number> ret = new HashMap<>();
         putKV(ret, COMP_LAT_TOTAL, weightAvgAndSum(id2compAvg, id2numAcked));
         putKV(ret, ACKED, sumValues(id2numAcked));
 
@@ -185,7 +189,7 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map aggPreMergeCompPageBolt(Map m, String window, boolean 
includeSys) {
+    public static Map aggPreMergeCompPageBolt(Map<String, Object> m, String 
window, boolean includeSys) {
         Map ret = new HashMap();
         putKV(ret, EXECUTOR_ID, getByKey(m, "exec-id"));
         putKV(ret, HOST, getByKey(m, HOST));
@@ -195,7 +199,7 @@ public class StatsUtil {
         putKV(ret, NUM_TASKS, getByKey(m, NUM_TASKS));
 
         Map stat2win2sid2num = getMapByKey(m, STATS);
-        putKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, 
getByKeywordOr0(m, UPTIME).intValue()));
+        putKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, 
getByKeyOr0(m, UPTIME).intValue()));
 
         // calc cid+sid->input_stats
         Map inputStats = new HashMap();
@@ -232,8 +236,8 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map aggPreMergeCompPageSpout(Map m, String window, boolean 
includeSys) {
-        Map ret = new HashMap();
+    public static Map<String, Object> aggPreMergeCompPageSpout(Map<String, 
Object> m, String window, boolean includeSys) {
+        Map<String, Object> ret = new HashMap<>();
         putKV(ret, EXECUTOR_ID, getByKey(m, "exec-id"));
         putKV(ret, HOST, getByKey(m, HOST));
         putKV(ret, PORT, getByKey(m, PORT));
@@ -265,97 +269,111 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map aggPreMergeTopoPageBolt(Map m, String window, boolean 
includeSys) {
-        Map ret = new HashMap();
+    public static <K, V extends Number> Map<String, Object> 
aggPreMergeTopoPageBolt(
+            Map<String, Object> m, String window, boolean includeSys) {
+        Map<String, Object> ret = new HashMap<>();
 
-        Map subRet = new HashMap();
+        Map<String, Object> subRet = new HashMap<>();
         putKV(subRet, NUM_EXECUTORS, 1);
         putKV(subRet, NUM_TASKS, getByKey(m, NUM_TASKS));
 
-        Map stat2win2sid2num = getMapByKey(m, STATS);
-        putKV(subRet, CAPACITY, computeAggCapacity(stat2win2sid2num, 
getByKeywordOr0(m, UPTIME).intValue()));
+        Map<String, Object> stat2win2sid2num = getMapByKey(m, STATS);
+        putKV(subRet, CAPACITY, computeAggCapacity(stat2win2sid2num, 
getByKeyOr0(m, UPTIME).intValue()));
 
         for (String key : new String[]{EMITTED, TRANSFERRED, ACKED, FAILED}) {
-            Map stat = (Map) windowSetConverter(getMapByKey(stat2win2sid2num, 
key), TO_STRING).get(window);
+            Map<String, Map<K, V>> stat = 
windowSetConverter(getMapByKey(stat2win2sid2num, key), TO_STRING);
             if (EMITTED.equals(key) || TRANSFERRED.equals(key)) {
                 stat = filterSysStreams(stat, includeSys);
             }
+            Map<K, V> winStat = stat.get(window);
             long sum = 0;
-            if (stat != null) {
-                for (Object o : stat.values()) {
-                    sum += ((Number) o).longValue();
+            if (winStat != null) {
+                for (V v : winStat.values()) {
+                    sum += v.longValue();
                 }
             }
             putKV(subRet, key, sum);
         }
 
-        Map win2sid2execLat = windowSetConverter(getMapByKey(stat2win2sid2num, 
EXEC_LATENCIES), TO_STRING);
-        Map win2sid2procLat = windowSetConverter(getMapByKey(stat2win2sid2num, 
PROC_LATENCIES), TO_STRING);
-        Map win2sid2exec = windowSetConverter(getMapByKey(stat2win2sid2num, 
EXECUTED), TO_STRING);
+        Map<String, Map<List<String>, Double>> win2sid2execLat =
+                windowSetConverter(getMapByKey(stat2win2sid2num, 
EXEC_LATENCIES), TO_STRING);
+        Map<String, Map<List<String>, Double>> win2sid2procLat =
+                windowSetConverter(getMapByKey(stat2win2sid2num, 
PROC_LATENCIES), TO_STRING);
+        Map<String, Map<List<String>, Long>> win2sid2exec =
+                windowSetConverter(getMapByKey(stat2win2sid2num, EXECUTED), 
TO_STRING);
         subRet.putAll(aggBoltLatAndCount(
-                (Map) win2sid2execLat.get(window), (Map) 
win2sid2procLat.get(window), (Map) win2sid2exec.get(window)));
+                win2sid2execLat.get(window), win2sid2procLat.get(window), 
win2sid2exec.get(window)));
 
-        ret.put(getByKey(m, "comp-id"), subRet);
+        ret.put((String) getByKey(m, "comp-id"), subRet);
         return ret;
     }
 
-    public static Map aggPreMergeTopoPageSpout(Map m, String window, boolean 
includeSys) {
-        Map ret = new HashMap();
+    /**
+     * returns { comp id -> comp-stats }
+     */
+    public static <K, V extends Number> Map<String, Object> 
aggPreMergeTopoPageSpout(
+            Map<String, Object> m, String window, boolean includeSys) {
+        Map<String, Object> ret = new HashMap<>();
 
-        Map subRet = new HashMap();
+        Map<String, Object> subRet = new HashMap<>();
         putKV(subRet, NUM_EXECUTORS, 1);
         putKV(subRet, NUM_TASKS, getByKey(m, NUM_TASKS));
 
         // no capacity for spout
-        Map stat2win2sid2num = getMapByKey(m, STATS);
+        Map<String, Map<String, Map<String, V>>> stat2win2sid2num = 
getMapByKey(m, STATS);
         for (String key : new String[]{EMITTED, TRANSFERRED, FAILED}) {
-            Map stat = (Map) windowSetConverter(getMapByKey(stat2win2sid2num, 
key), TO_STRING).get(window);
+            Map<String, Map<K, V>> stat = 
windowSetConverter(stat2win2sid2num.get(key), TO_STRING);
             if (EMITTED.equals(key) || TRANSFERRED.equals(key)) {
                 stat = filterSysStreams(stat, includeSys);
             }
+            Map<K, V> winStat = stat.get(window);
             long sum = 0;
-            if (stat != null) {
-                for (Object o : stat.values()) {
-                    sum += ((Number) o).longValue();
+            if (winStat != null) {
+                for (V v : winStat.values()) {
+                    sum += v.longValue();
                 }
             }
             putKV(subRet, key, sum);
         }
 
-        Map win2sid2compLat = windowSetConverter(getMapByKey(stat2win2sid2num, 
COMP_LATENCIES), TO_STRING);
-        Map win2sid2acked = windowSetConverter(getMapByKey(stat2win2sid2num, 
ACKED), TO_STRING);
-        subRet.putAll(aggSpoutLatAndCount((Map) win2sid2compLat.get(window), 
(Map) win2sid2acked.get(window)));
+        Map<String, Map<String, Double>> win2sid2compLat =
+                windowSetConverter(getMapByKey(stat2win2sid2num, 
COMP_LATENCIES), TO_STRING);
+        Map<String, Map<String, Long>> win2sid2acked =
+                windowSetConverter(getMapByKey(stat2win2sid2num, ACKED), 
TO_STRING);
+        subRet.putAll(aggSpoutLatAndCount(win2sid2compLat.get(window), 
win2sid2acked.get(window)));
 
-        ret.put(getByKey(m, "comp-id"), subRet);
+        ret.put((String) getByKey(m, "comp-id"), subRet);
         return ret;
     }
 
-    public static Map mergeAggCompStatsCompPageBolt(Map accBoltStats, Map 
boltStats) {
-        Map ret = new HashMap();
+    public static Map<String, Object> mergeAggCompStatsCompPageBolt(
+            Map<String, Object> accBoltStats, Map<String, Object> boltStats) {
+        Map<String, Object> ret = new HashMap<>();
 
-        Map accIn = getMapByKey(accBoltStats, CID_SID_TO_IN_STATS);
-        Map accOut = getMapByKey(accBoltStats, SID_TO_OUT_STATS);
-        Map boltIn = getMapByKey(boltStats, CID_SID_TO_IN_STATS);
-        Map boltOut = getMapByKey(boltStats, SID_TO_OUT_STATS);
+        Map<List<String>, Map<String, ?>> accIn = getMapByKey(accBoltStats, 
CID_SID_TO_IN_STATS);
+        Map<String, Map<String, ?>> accOut = getMapByKey(accBoltStats, 
SID_TO_OUT_STATS);
+        Map<List<String>, Map<String, ?>> boltIn = getMapByKey(boltStats, 
CID_SID_TO_IN_STATS);
+        Map<String, Map<String, ?>> boltOut = getMapByKey(boltStats, 
SID_TO_OUT_STATS);
 
-        int numExecutors = getByKeywordOr0(accBoltStats, 
NUM_EXECUTORS).intValue();
+        int numExecutors = getByKeyOr0(accBoltStats, NUM_EXECUTORS).intValue();
         putKV(ret, NUM_EXECUTORS, numExecutors + 1);
         putKV(ret, NUM_TASKS, sumOr0(
-                getByKeywordOr0(accBoltStats, NUM_TASKS), 
getByKeywordOr0(boltStats, NUM_TASKS)));
+                getByKeyOr0(accBoltStats, NUM_TASKS), getByKeyOr0(boltStats, 
NUM_TASKS)));
 
         // (merge-with (partial merge-with sum-or-0) acc-out spout-out)
         putKV(ret, SID_TO_OUT_STATS, fullMergeWithSum(accOut, boltOut));
+        // {component id -> metric -> value}, note that input may contain both 
long and double values
         putKV(ret, CID_SID_TO_IN_STATS, fullMergeWithSum(accIn, boltIn));
 
         long executed = sumStreamsLong(boltIn, EXECUTED);
         putKV(ret, EXECUTED, executed);
 
-        Map executorStats = new HashMap();
-        putKV(executorStats, EXECUTOR_ID, getByKey(boltStats, EXECUTOR_ID));
-        putKV(executorStats, UPTIME, getByKey(boltStats, UPTIME));
-        putKV(executorStats, HOST, getByKey(boltStats, HOST));
-        putKV(executorStats, PORT, getByKey(boltStats, PORT));
-        putKV(executorStats, CAPACITY, getByKey(boltStats, CAPACITY));
+        Map<String, Object> executorStats = new HashMap<>();
+        putKV(executorStats, EXECUTOR_ID, boltStats.get(EXECUTOR_ID));
+        putKV(executorStats, UPTIME, boltStats.get(UPTIME));
+        putKV(executorStats, HOST, boltStats.get(HOST));
+        putKV(executorStats, PORT, boltStats.get(PORT));
+        putKV(executorStats, CAPACITY, boltStats.get(CAPACITY));
 
         putKV(executorStats, EMITTED, sumStreamsLong(boltOut, EMITTED));
         putKV(executorStats, TRANSFERRED, sumStreamsLong(boltOut, 
TRANSFERRED));
@@ -377,16 +395,18 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map mergeAggCompStatsCompPageSpout(Map accSpoutStats, Map 
spoutStats) {
-        Map ret = new HashMap();
+    public static Map<String, Object> mergeAggCompStatsCompPageSpout(
+            Map<String, Object> accSpoutStats, Map<String, Object> spoutStats) 
{
+        Map<String, Object> ret = new HashMap<>();
 
-        Map accOut = getMapByKey(accSpoutStats, SID_TO_OUT_STATS);
-        Map spoutOut = getMapByKey(spoutStats, SID_TO_OUT_STATS);
+        // {stream id -> metric -> value}, note that sid->out-stats may 
contain both long and double values
+        Map<String, Map<String, ?>> accOut = getMapByKey(accSpoutStats, 
SID_TO_OUT_STATS);
+        Map<String, Map<String, ?>> spoutOut = getMapByKey(spoutStats, 
SID_TO_OUT_STATS);
 
-        int numExecutors = getByKeywordOr0(accSpoutStats, 
NUM_EXECUTORS).intValue();
+        int numExecutors = getByKeyOr0(accSpoutStats, 
NUM_EXECUTORS).intValue();
         putKV(ret, NUM_EXECUTORS, numExecutors + 1);
         putKV(ret, NUM_TASKS, sumOr0(
-                getByKeywordOr0(accSpoutStats, NUM_TASKS), 
getByKeywordOr0(spoutStats, NUM_TASKS)));
+                getByKeyOr0(accSpoutStats, NUM_TASKS), getByKeyOr0(spoutStats, 
NUM_TASKS)));
         putKV(ret, SID_TO_OUT_STATS, fullMergeWithSum(accOut, spoutOut));
 
         Map executorStats = new HashMap();
@@ -412,48 +432,50 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map mergeAggCompStatsTopoPageBolt(Map accBoltStats, Map 
boltStats) {
-        Map ret = new HashMap();
-        Integer numExecutors = getByKeywordOr0(accBoltStats, 
NUM_EXECUTORS).intValue();
+    public static Map<String, Object> 
mergeAggCompStatsTopoPageBolt(Map<String, Object> accBoltStats, Map<String, 
Object> boltStats) {
+        Map<String, Object> ret = new HashMap<>();
+
+        Integer numExecutors = getByKeyOr0(accBoltStats, 
NUM_EXECUTORS).intValue();
         putKV(ret, NUM_EXECUTORS, numExecutors + 1);
-        putKV(ret, NUM_TASKS, sumOr0(
-                getByKeywordOr0(accBoltStats, NUM_TASKS), 
getByKeywordOr0(boltStats, NUM_TASKS)));
-        putKV(ret, EMITTED, sumOr0(
-                getByKeywordOr0(accBoltStats, EMITTED), 
getByKeywordOr0(boltStats, EMITTED)));
-        putKV(ret, TRANSFERRED, sumOr0(
-                getByKeywordOr0(accBoltStats, TRANSFERRED), 
getByKeywordOr0(boltStats, TRANSFERRED)));
-        putKV(ret, EXEC_LAT_TOTAL, sumOr0(
-                getByKeywordOr0(accBoltStats, EXEC_LAT_TOTAL), 
getByKeywordOr0(boltStats, EXEC_LAT_TOTAL)));
-        putKV(ret, PROC_LAT_TOTAL, sumOr0(
-                getByKeywordOr0(accBoltStats, PROC_LAT_TOTAL), 
getByKeywordOr0(boltStats, PROC_LAT_TOTAL)));
-        putKV(ret, EXECUTED, sumOr0(
-                getByKeywordOr0(accBoltStats, EXECUTED), 
getByKeywordOr0(boltStats, EXECUTED)));
-        putKV(ret, ACKED, sumOr0(
-                getByKeywordOr0(accBoltStats, ACKED), 
getByKeywordOr0(boltStats, ACKED)));
-        putKV(ret, FAILED, sumOr0(
-                getByKeywordOr0(accBoltStats, FAILED), 
getByKeywordOr0(boltStats, FAILED)));
-        putKV(ret, CAPACITY, maxOr0(
-                getByKeywordOr0(accBoltStats, CAPACITY), 
getByKeywordOr0(boltStats, CAPACITY)));
+        putKV(ret, NUM_TASKS,
+                sumOr0(getByKeyOr0(accBoltStats, NUM_TASKS), 
getByKeyOr0(boltStats, NUM_TASKS)));
+        putKV(ret, EMITTED,
+                sumOr0(getByKeyOr0(accBoltStats, EMITTED), 
getByKeyOr0(boltStats, EMITTED)));
+        putKV(ret, TRANSFERRED,
+                sumOr0(getByKeyOr0(accBoltStats, TRANSFERRED), 
getByKeyOr0(boltStats, TRANSFERRED)));
+        putKV(ret, EXEC_LAT_TOTAL,
+                sumOr0(getByKeyOr0(accBoltStats, EXEC_LAT_TOTAL), 
getByKeyOr0(boltStats, EXEC_LAT_TOTAL)));
+        putKV(ret, PROC_LAT_TOTAL,
+                sumOr0(getByKeyOr0(accBoltStats, PROC_LAT_TOTAL), 
getByKeyOr0(boltStats, PROC_LAT_TOTAL)));
+        putKV(ret, EXECUTED,
+                sumOr0(getByKeyOr0(accBoltStats, EXECUTED), 
getByKeyOr0(boltStats, EXECUTED)));
+        putKV(ret, ACKED,
+                sumOr0(getByKeyOr0(accBoltStats, ACKED), 
getByKeyOr0(boltStats, ACKED)));
+        putKV(ret, FAILED,
+                sumOr0(getByKeyOr0(accBoltStats, FAILED), 
getByKeyOr0(boltStats, FAILED)));
+        putKV(ret, CAPACITY,
+                maxOr0(getByKeyOr0(accBoltStats, CAPACITY), 
getByKeyOr0(boltStats, CAPACITY)));
 
         return ret;
     }
 
-    public static Map mergeAggCompStatsTopoPageSpout(Map accSpoutStats, Map 
spoutStats) {
-        Map ret = new HashMap();
-        Integer numExecutors = getByKeywordOr0(accSpoutStats, 
NUM_EXECUTORS).intValue();
+    public static Map<String, Object> 
mergeAggCompStatsTopoPageSpout(Map<String, Object> accSpoutStats, Map<String, 
Object> spoutStats) {
+        Map<String, Object> ret = new HashMap<>();
+
+        Integer numExecutors = getByKeyOr0(accSpoutStats, 
NUM_EXECUTORS).intValue();
         putKV(ret, NUM_EXECUTORS, numExecutors + 1);
-        putKV(ret, NUM_TASKS, sumOr0(
-                getByKeywordOr0(accSpoutStats, NUM_TASKS), 
getByKeywordOr0(spoutStats, NUM_TASKS)));
-        putKV(ret, EMITTED, sumOr0(
-                getByKeywordOr0(accSpoutStats, EMITTED), 
getByKeywordOr0(spoutStats, EMITTED)));
-        putKV(ret, TRANSFERRED, sumOr0(
-                getByKeywordOr0(accSpoutStats, TRANSFERRED), 
getByKeywordOr0(spoutStats, TRANSFERRED)));
-        putKV(ret, COMP_LAT_TOTAL, sumOr0(
-                getByKeywordOr0(accSpoutStats, COMP_LAT_TOTAL), 
getByKeywordOr0(spoutStats, COMP_LAT_TOTAL)));
-        putKV(ret, ACKED, sumOr0(
-                getByKeywordOr0(accSpoutStats, ACKED), 
getByKeywordOr0(spoutStats, ACKED)));
-        putKV(ret, FAILED, sumOr0(
-                getByKeywordOr0(accSpoutStats, FAILED), 
getByKeywordOr0(spoutStats, FAILED)));
+        putKV(ret, NUM_TASKS,
+                sumOr0(getByKeyOr0(accSpoutStats, NUM_TASKS), 
getByKeyOr0(spoutStats, NUM_TASKS)));
+        putKV(ret, EMITTED,
+                sumOr0(getByKeyOr0(accSpoutStats, EMITTED), 
getByKeyOr0(spoutStats, EMITTED)));
+        putKV(ret, TRANSFERRED,
+                sumOr0(getByKeyOr0(accSpoutStats, TRANSFERRED), 
getByKeyOr0(spoutStats, TRANSFERRED)));
+        putKV(ret, COMP_LAT_TOTAL,
+                sumOr0(getByKeyOr0(accSpoutStats, COMP_LAT_TOTAL), 
getByKeyOr0(spoutStats, COMP_LAT_TOTAL)));
+        putKV(ret, ACKED,
+                sumOr0(getByKeyOr0(accSpoutStats, ACKED), 
getByKeyOr0(spoutStats, ACKED)));
+        putKV(ret, FAILED,
+                sumOr0(getByKeyOr0(accSpoutStats, FAILED), 
getByKeyOr0(spoutStats, FAILED)));
 
         return ret;
     }
@@ -462,10 +484,11 @@ public class StatsUtil {
      * A helper function that does the common work to aggregate stats of one
      * executor with the given map for the topology page.
      */
-    public static Map aggTopoExecStats(String window, boolean includeSys, Map 
accStats, Map newData, String compType) {
-        Map ret = new HashMap();
+    public static Map<String, Object> aggTopoExecStats(
+            String window, boolean includeSys, Map<String, Object> accStats, 
Map<String, Object> newData, String compType) {
+        Map<String, Object> ret = new HashMap<>();
 
-        Set workerSet = (Set) getByKey(accStats, WORKERS_SET);
+        Set workerSet = (Set) accStats.get(WORKERS_SET);
         Map bolt2stats = getMapByKey(accStats, BOLT_TO_STATS);
         Map spout2stats = getMapByKey(accStats, SPOUT_TO_STATS);
         Map win2emitted = getMapByKey(accStats, WIN_TO_EMITTED);
@@ -473,16 +496,17 @@ public class StatsUtil {
         Map win2compLatWgtAvg = getMapByKey(accStats, WIN_TO_COMP_LAT_WGT_AVG);
         Map win2acked = getMapByKey(accStats, WIN_TO_ACKED);
         Map win2failed = getMapByKey(accStats, WIN_TO_FAILED);
-        Map stats = getMapByKey(newData, STATS);
 
         boolean isSpout = compType.equals(SPOUT);
-        Map cid2stat2num;
+        // component id -> stats
+        Map<String, Object> cid2stats;
         if (isSpout) {
-            cid2stat2num = aggPreMergeTopoPageSpout(newData, window, 
includeSys);
+            cid2stats = aggPreMergeTopoPageSpout(newData, window, includeSys);
         } else {
-            cid2stat2num = aggPreMergeTopoPageBolt(newData, window, 
includeSys);
+            cid2stats = aggPreMergeTopoPageBolt(newData, window, includeSys);
         }
 
+        Map stats = getMapByKey(newData, STATS);
         Map w2compLatWgtAvg, w2acked;
         Map compLatStats = getMapByKey(stats, COMP_LATENCIES);
         if (isSpout) { // agg spout stats
@@ -504,38 +528,38 @@ public class StatsUtil {
         putKV(ret, WORKERS_SET, workerSet);
         putKV(ret, BOLT_TO_STATS, bolt2stats);
         putKV(ret, SPOUT_TO_STATS, spout2stats);
-        putKV(ret, WIN_TO_EMITTED, mergeWithSum(win2emitted, 
aggregateCountStreams(
+        putKV(ret, WIN_TO_EMITTED, mergeWithSumLong(win2emitted, 
aggregateCountStreams(
                 filterSysStreams(getMapByKey(stats, EMITTED), includeSys))));
-        putKV(ret, WIN_TO_TRANSFERRED, mergeWithSum(win2transferred, 
aggregateCountStreams(
+        putKV(ret, WIN_TO_TRANSFERRED, mergeWithSumLong(win2transferred, 
aggregateCountStreams(
                 filterSysStreams(getMapByKey(stats, TRANSFERRED), 
includeSys))));
-        putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSum(win2compLatWgtAvg, 
w2compLatWgtAvg));
+        putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, 
mergeWithSumDouble(win2compLatWgtAvg, w2compLatWgtAvg));
 
         //boolean isSpoutStat = SPOUT.equals(((Keyword) getByKey(stats, 
TYPE)).getName());
-        putKV(ret, WIN_TO_ACKED, isSpout ? mergeWithSum(win2acked, w2acked) : 
win2acked);
+        putKV(ret, WIN_TO_ACKED, isSpout ? mergeWithSumLong(win2acked, 
w2acked) : win2acked);
         putKV(ret, WIN_TO_FAILED, isSpout ?
-                mergeWithSum(aggregateCountStreams(getMapByKey(stats, 
FAILED)), win2failed) : win2failed);
+                mergeWithSumLong(aggregateCountStreams(getMapByKey(stats, 
FAILED)), win2failed) : win2failed);
         putKV(ret, TYPE, getByKey(stats, TYPE));
 
         // (merge-with merge-agg-comp-stats-topo-page-bolt/spout (acc-stats 
comp-key) cid->statk->num)
         // (acc-stats comp-key) ==> bolt2stats/spout2stats
         if (isSpout) {
-            Set<Object> keySet = new HashSet<>();
+            Set<String> keySet = new HashSet<>();
             keySet.addAll(spout2stats.keySet());
-            keySet.addAll(cid2stat2num.keySet());
+            keySet.addAll(cid2stats.keySet());
 
             Map mm = new HashMap();
-            for (Object k : keySet) {
-                mm.put(k, mergeAggCompStatsTopoPageSpout((Map) 
spout2stats.get(k), (Map) cid2stat2num.get(k)));
+            for (String k : keySet) {
+                mm.put(k, mergeAggCompStatsTopoPageSpout((Map) 
spout2stats.get(k), (Map) cid2stats.get(k)));
             }
             putKV(ret, SPOUT_TO_STATS, mm);
         } else {
-            Set<Object> keySet = new HashSet<>();
+            Set<String> keySet = new HashSet<>();
             keySet.addAll(bolt2stats.keySet());
-            keySet.addAll(cid2stat2num.keySet());
+            keySet.addAll(cid2stats.keySet());
 
             Map mm = new HashMap();
-            for (Object k : keySet) {
-                mm.put(k, mergeAggCompStatsTopoPageBolt((Map) 
bolt2stats.get(k), (Map) cid2stat2num.get(k)));
+            for (String k : keySet) {
+                mm.put(k, mergeAggCompStatsTopoPageBolt((Map) 
bolt2stats.get(k), (Map) cid2stats.get(k)));
             }
             putKV(ret, BOLT_TO_STATS, mm);
         }
@@ -543,18 +567,30 @@ public class StatsUtil {
         return ret;
     }
 
+    /**
+     * aggregate topo executors stats
+     * TODO: change clojure maps to java HashMap's when nimbus.clj is 
translated to java
+     *
+     * @param topologyId     topology id
+     * @param exec2nodePort  executor -> host+port, note it's a clojure map
+     * @param task2component task -> component, note it's a clojure map
+     * @param beats          executor[start, end] -> executor heartbeat, note 
it's a java HashMap
+     * @param topology       storm topology
+     * @param window         the window to be aggregated
+     * @param includeSys     whether to include system streams
+     * @param clusterState   cluster state
+     * @return TopologyPageInfo thrift structure
+     */
     public static TopologyPageInfo aggTopoExecsStats(
-            String topologyId, Map exec2nodePort, Map task2component,
-            Map beats, StormTopology topology, String window, boolean 
includeSys, IStormClusterState clusterState) {
-        List beatList = extractDataFromHb(exec2nodePort, task2component, 
beats, includeSys, topology);
-        Map topoStats = aggregateTopoStats(window, includeSys, beatList);
-        topoStats = postAggregateTopoStats(task2component, exec2nodePort, 
topoStats, topologyId, clusterState);
-
-        return thriftifyTopoPageData(topologyId, topoStats);
+            String topologyId, Map exec2nodePort, Map task2component, 
Map<List<Integer>, Map<String, Object>> beats,
+            StormTopology topology, String window, boolean includeSys, 
IStormClusterState clusterState) {
+        List<Map<String, Object>> beatList = extractDataFromHb(exec2nodePort, 
task2component, beats, includeSys, topology);
+        Map<String, Object> topoStats = aggregateTopoStats(window, includeSys, 
beatList);
+        return postAggregateTopoStats(task2component, exec2nodePort, 
topoStats, topologyId, clusterState);
     }
 
-    public static Map aggregateTopoStats(String win, boolean includeSys, List 
data) {
-        Map initVal = new HashMap();
+    public static Map<String, Object> aggregateTopoStats(String win, boolean 
includeSys, List<Map<String, Object>> heartbeats) {
+        Map<String, Object> initVal = new HashMap<>();
         putKV(initVal, WORKERS_SET, new HashSet());
         putKV(initVal, BOLT_TO_STATS, new HashMap());
         putKV(initVal, SPOUT_TO_STATS, new HashMap());
@@ -564,68 +600,72 @@ public class StatsUtil {
         putKV(initVal, WIN_TO_ACKED, new HashMap());
         putKV(initVal, WIN_TO_FAILED, new HashMap());
 
-        for (Object o : data) {
-            Map newData = (Map) o;
-            String compType = ((Keyword) getByKey(newData, TYPE)).getName();
-            initVal = aggTopoExecStats(win, includeSys, initVal, newData, 
compType);
+        for (Map<String, Object> heartbeat : heartbeats) {
+            String compType = (String) getByKey(heartbeat, TYPE);
+            initVal = aggTopoExecStats(win, includeSys, initVal, heartbeat, 
compType);
         }
 
         return initVal;
     }
 
-    public static Map postAggregateTopoStats(
-            Map task2comp, Map exec2nodePort, Map accData, String topologyId, 
IStormClusterState clusterState) {
-        Map ret = new HashMap();
-        putKV(ret, NUM_TASKS, task2comp.size());
-        putKV(ret, NUM_WORKERS, ((Set) getByKey(accData, WORKERS_SET)).size());
-        putKV(ret, NUM_EXECUTORS, exec2nodePort != null ? exec2nodePort.size() 
: 0);
+    public static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map 
exec2nodePort, Map<String, Object> accData,
+                                                          String topologyId, 
IStormClusterState clusterState) {
+        TopologyPageInfo ret = new TopologyPageInfo(topologyId);
+
+        ret.set_num_tasks(task2comp.size());
+        ret.set_num_workers(((Set) getByKey(accData, WORKERS_SET)).size());
+        ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 
0);
 
         Map bolt2stats = getMapByKey(accData, BOLT_TO_STATS);
-        Map aggBolt2stats = new HashMap();
+        Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<>();
         for (Object o : bolt2stats.entrySet()) {
             Map.Entry e = (Map.Entry) o;
             String id = (String) e.getKey();
             Map m = (Map) e.getValue();
-            long executed = getByKeywordOr0(m, EXECUTED).longValue();
+            long executed = getByKeyOr0(m, EXECUTED).longValue();
             if (executed > 0) {
-                double execLatencyTotal = getByKeywordOr0(m, 
EXEC_LAT_TOTAL).doubleValue();
+                double execLatencyTotal = getByKeyOr0(m, 
EXEC_LAT_TOTAL).doubleValue();
                 putKV(m, EXEC_LATENCY, execLatencyTotal / executed);
 
-                double procLatencyTotal = getByKeywordOr0(m, 
PROC_LAT_TOTAL).doubleValue();
+                double procLatencyTotal = getByKeyOr0(m, 
PROC_LAT_TOTAL).doubleValue();
                 putKV(m, PROC_LATENCY, procLatencyTotal / executed);
             }
             remove(m, EXEC_LAT_TOTAL);
             remove(m, PROC_LAT_TOTAL);
             putKV(m, "last-error", getLastError(clusterState, topologyId, id));
 
-            aggBolt2stats.put(id, m);
+            aggBolt2stats.put(id, thriftifyBoltAggStats(m));
         }
-        putKV(ret, BOLT_TO_STATS, aggBolt2stats);
 
         Map spout2stats = getMapByKey(accData, SPOUT_TO_STATS);
-        Map spoutBolt2stats = new HashMap();
+        Map<String, ComponentAggregateStats> aggSpout2stats = new HashMap<>();
         for (Object o : spout2stats.entrySet()) {
             Map.Entry e = (Map.Entry) o;
             String id = (String) e.getKey();
             Map m = (Map) e.getValue();
-            long acked = getByKeywordOr0(m, ACKED).longValue();
+            long acked = getByKeyOr0(m, ACKED).longValue();
             if (acked > 0) {
-                double compLatencyTotal = getByKeywordOr0(m, 
COMP_LAT_TOTAL).doubleValue();
+                double compLatencyTotal = getByKeyOr0(m, 
COMP_LAT_TOTAL).doubleValue();
                 putKV(m, COMP_LATENCY, compLatencyTotal / acked);
             }
             remove(m, COMP_LAT_TOTAL);
             putKV(m, "last-error", getLastError(clusterState, topologyId, id));
 
-            spoutBolt2stats.put(id, m);
+            aggSpout2stats.put(id, thriftifySpoutAggStats(m));
         }
-        putKV(ret, SPOUT_TO_STATS, spoutBolt2stats);
 
-        putKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKey(accData, 
WIN_TO_EMITTED)));
-        putKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKey(accData, 
WIN_TO_TRANSFERRED)));
-        putKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKey(accData, 
WIN_TO_ACKED)));
-        putKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKey(accData, 
WIN_TO_FAILED)));
-        putKV(ret, WIN_TO_COMP_LAT, computeWeightedAveragesPerWindow(
+        TopologyStats topologyStats = new TopologyStats();
+        topologyStats.set_window_to_acked(mapKeyStr(getMapByKey(accData, 
WIN_TO_ACKED)));
+        topologyStats.set_window_to_emitted(mapKeyStr(getMapByKey(accData, 
WIN_TO_EMITTED)));
+        topologyStats.set_window_to_failed(mapKeyStr(getMapByKey(accData, 
WIN_TO_FAILED)));
+        topologyStats.set_window_to_transferred(mapKeyStr(getMapByKey(accData, 
WIN_TO_TRANSFERRED)));
+        
topologyStats.set_window_to_complete_latencies_ms(computeWeightedAveragesPerWindow(
                 accData, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED));
+
+        ret.set_topology_stats(topologyStats);
+        ret.set_id_to_spout_agg_stats(aggSpout2stats);
+        ret.set_id_to_bolt_agg_stats(aggBolt2stats);
+
         return ret;
     }
 
@@ -636,17 +676,19 @@ public class StatsUtil {
      * @param includeSys whether to include system streams
      * @return aggregated bolt stats
      */
-    public static Map aggregateBoltStats(List statsSeq, boolean includeSys) {
-        Map ret = new HashMap();
-
-        Map commonStats = 
preProcessStreamSummary(aggregateCommonStats(statsSeq), includeSys);
-        List acked = new ArrayList();
-        List failed = new ArrayList();
-        List executed = new ArrayList();
-        List processLatencies = new ArrayList();
-        List executeLatencies = new ArrayList();
-        for (Object o : statsSeq) {
-            ExecutorStats stat = (ExecutorStats) o;
+    public static <T> Map<String, Map> 
aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys) {
+        Map<String, Map> ret = new HashMap<>();
+
+        Map<String, Map<String, Map<T, Long>>> commonStats = 
aggregateCommonStats(statsSeq);
+        commonStats = preProcessStreamSummary(commonStats, includeSys);
+
+        List<Map<String, Map<GlobalStreamId, Long>>> acked = new ArrayList<>();
+        List<Map<String, Map<GlobalStreamId, Long>>> failed = new 
ArrayList<>();
+        List<Map<String, Map<GlobalStreamId, Long>>> executed = new 
ArrayList<>();
+        List<Map<String, Map<GlobalStreamId, Double>>> processLatencies = new 
ArrayList<>();
+        List<Map<String, Map<GlobalStreamId, Double>>> executeLatencies = new 
ArrayList<>();
+        for (ExecutorSummary summary : statsSeq) {
+            ExecutorStats stat = summary.get_stats();
             acked.add(stat.get_specific().get_bolt().get_acked());
             failed.add(stat.get_specific().get_bolt().get_failed());
             executed.add(stat.get_specific().get_bolt().get_executed());
@@ -670,20 +712,23 @@ public class StatsUtil {
      * @param includeSys whether to include system streams
      * @return aggregated spout stats
      */
-    public static Map aggregateSpoutStats(List statsSeq, boolean includeSys) {
-        Map ret = new HashMap();
-
-        Map commonStats = 
preProcessStreamSummary(aggregateCommonStats(statsSeq), includeSys);
-        List acked = new ArrayList();
-        List failed = new ArrayList();
-        List completeLatencies = new ArrayList();
-        for (Object o : statsSeq) {
-            ExecutorStats stat = (ExecutorStats) o;
-            acked.add(stat.get_specific().get_spout().get_acked());
-            failed.add(stat.get_specific().get_spout().get_failed());
-            
completeLatencies.add(stat.get_specific().get_spout().get_complete_ms_avg());
-        }
-        mergeMaps(ret, commonStats);
+    public static Map<String, Map> aggregateSpoutStats(List<ExecutorSummary> 
statsSeq, boolean includeSys) {
+        // actually Map<String, Map<String, Map<String, Long/Double>>>
+        Map<String, Map> ret = new HashMap<>();
+
+        Map<String, Map<String, Map<String, Long>>> commonStats = 
aggregateCommonStats(statsSeq);
+        commonStats = preProcessStreamSummary(commonStats, includeSys);
+
+        List<Map<String, Map<String, Long>>> acked = new ArrayList<>();
+        List<Map<String, Map<String, Long>>> failed = new ArrayList<>();
+        List<Map<String, Map<String, Double>>> completeLatencies = new 
ArrayList<>();
+        for (ExecutorSummary summary : statsSeq) {
+            ExecutorStats stats = summary.get_stats();
+            acked.add(stats.get_specific().get_spout().get_acked());
+            failed.add(stats.get_specific().get_spout().get_failed());
+            
completeLatencies.add(stats.get_specific().get_spout().get_complete_ms_avg());
+        }
+        ret.putAll(commonStats);
         putKV(ret, ACKED, aggregateCounts(acked));
         putKV(ret, FAILED, aggregateCounts(failed));
         putKV(ret, COMP_LATENCIES, aggregateAverages(completeLatencies, 
acked));
@@ -691,25 +736,25 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map aggregateCommonStats(List statsSeq) {
-        Map ret = new HashMap();
+    public static <T> Map<String, Map<String, Map<T, Long>>> 
aggregateCommonStats(List<ExecutorSummary> statsSeq) {
+        Map<String, Map<String, Map<T, Long>>> ret = new HashMap<>();
 
-        List emitted = new ArrayList();
-        List transferred = new ArrayList();
-        for (Object o : statsSeq) {
-            ExecutorStats stat = (ExecutorStats) o;
-            emitted.add(stat.get_emitted());
-            transferred.add(stat.get_transferred());
+        List<Map<String, Map<String, Long>>> emitted = new ArrayList<>();
+        List<Map<String, Map<String, Long>>> transferred = new ArrayList<>();
+        for (ExecutorSummary summ : statsSeq) {
+            emitted.add(summ.get_stats().get_emitted());
+            transferred.add(summ.get_stats().get_transferred());
         }
-
         putKV(ret, EMITTED, aggregateCounts(emitted));
         putKV(ret, TRANSFERRED, aggregateCounts(transferred));
+
         return ret;
     }
 
-    public static Map preProcessStreamSummary(Map streamSummary, boolean 
includeSys) {
-        Map emitted = getMapByKey(streamSummary, EMITTED);
-        Map transferred = getMapByKey(streamSummary, TRANSFERRED);
+    public static <T> Map<String, Map<String, Map<T, Long>>> 
preProcessStreamSummary(
+            Map<String, Map<String, Map<T, Long>>> streamSummary, boolean 
includeSys) {
+        Map<String, Map<T, Long>> emitted = getMapByKey(streamSummary, 
EMITTED);
+        Map<String, Map<T, Long>> transferred = getMapByKey(streamSummary, 
TRANSFERRED);
 
         putKV(streamSummary, EMITTED, filterSysStreams(emitted, includeSys));
         putKV(streamSummary, TRANSFERRED, filterSysStreams(transferred, 
includeSys));
@@ -717,32 +762,32 @@ public class StatsUtil {
         return streamSummary;
     }
 
-    public static Map aggregateCountStreams(Map stats) {
-        Map ret = new HashMap();
-        for (Object o : stats.entrySet()) {
-            Map.Entry entry = (Map.Entry) o;
-            Map value = (Map) entry.getValue();
+    public static <K, V extends Number> Map<String, Long> 
aggregateCountStreams(
+            Map<String, Map<K, V>> stats) {
+        Map<String, Long> ret = new HashMap<>();
+        for (Map.Entry<String, Map<K, V>> entry : stats.entrySet()) {
+            Map<K, V> value = entry.getValue();
             long sum = 0l;
-            for (Object num : value.values()) {
-                sum += ((Number) num).longValue();
+            for (V num : value.values()) {
+                sum += num.longValue();
             }
             ret.put(entry.getKey(), sum);
         }
         return ret;
     }
 
-    public static Map aggregateAverages(List avgSeq, List countSeq) {
-        Map ret = new HashMap();
+    public static <K> Map<String, Map<K, Double>> 
aggregateAverages(List<Map<String, Map<K, Double>>> avgSeq,
+                                                                    
List<Map<String, Map<K, Long>>> countSeq) {
+        Map<String, Map<K, Double>> ret = new HashMap<>();
 
-        Map expands = expandAveragesSeq(avgSeq, countSeq);
-        for (Object o : expands.entrySet()) {
-            Map.Entry entry = (Map.Entry) o;
-            Object k = entry.getKey();
+        Map<String, Map<K, List>> expands = expandAveragesSeq(avgSeq, 
countSeq);
+        for (Map.Entry<String, Map<K, List>> entry : expands.entrySet()) {
+            String k = entry.getKey();
 
-            Map tmp = new HashMap();
-            Map inner = (Map) entry.getValue();
-            for (Object kk : inner.keySet()) {
-                List vv = (List) inner.get(kk);
+            Map<K, Double> tmp = new HashMap<>();
+            Map<K, List> inner = entry.getValue();
+            for (K kk : inner.keySet()) {
+                List vv = inner.get(kk);
                 tmp.put(kk, valAvg(((Number) vv.get(0)).doubleValue(), 
((Number) vv.get(1)).longValue()));
             }
             ret.put(k, tmp);
@@ -751,19 +796,19 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map aggregateAvgStreams(Map avgs, Map counts) {
-        Map ret = new HashMap();
+    public static <K> Map<String, Double> aggregateAvgStreams(
+            Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>> 
counts) {
+        Map<String, Double> ret = new HashMap<>();
 
-        Map expands = expandAverages(avgs, counts);
-        for (Object o : expands.entrySet()) {
-            Map.Entry e = (Map.Entry) o;
-            Object win = e.getKey();
+        Map<String, Map<K, List>> expands = expandAverages(avgs, counts);
+        for (Map.Entry<String, Map<K, List>> entry : expands.entrySet()) {
+            String win = entry.getKey();
 
             double avgTotal = 0.0;
             long cntTotal = 0l;
-            Map inner = (Map) e.getValue();
-            for (Object kk : inner.keySet()) {
-                List vv = (List) inner.get(kk);
+            Map<K, List> inner = entry.getValue();
+            for (K kk : inner.keySet()) {
+                List vv = inner.get(kk);
                 avgTotal += ((Number) vv.get(0)).doubleValue();
                 cntTotal += ((Number) vv.get(1)).longValue();
             }
@@ -773,18 +818,25 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map spoutStreamsStats(List summs, boolean includeSys) {
-        List statsSeq = getFilledStats(summs);
+    public static Map<String, Map> spoutStreamsStats(List<ExecutorSummary> 
summs, boolean includeSys) {
+        if (summs == null) {
+            return new HashMap<>();
+        }
+        List<ExecutorSummary> statsSeq = getFilledStats(summs);
         return aggregateSpoutStreams(aggregateSpoutStats(statsSeq, 
includeSys));
     }
 
-    public static Map boltStreamsStats(List summs, boolean includeSys) {
-        List statsSeq = getFilledStats(summs);
+    public static Map<String, Map> boltStreamsStats(List<ExecutorSummary> 
summs, boolean includeSys) {
+        if (summs == null) {
+            return new HashMap<>();
+        }
+        List<ExecutorSummary> statsSeq = getFilledStats(summs);
         return aggregateBoltStreams(aggregateBoltStats(statsSeq, includeSys));
     }
 
-    public static Map aggregateSpoutStreams(Map stats) {
-        Map ret = new HashMap();
+    public static Map<String, Map> aggregateSpoutStreams(Map<String, Map> 
stats) {
+        // actual ret is Map<String, Map<String, Long/Double>>
+        Map<String, Map> ret = new HashMap<>();
         putKV(ret, ACKED, aggregateCountStreams(getMapByKey(stats, ACKED)));
         putKV(ret, FAILED, aggregateCountStreams(getMapByKey(stats, FAILED)));
         putKV(ret, EMITTED, aggregateCountStreams(getMapByKey(stats, 
EMITTED)));
@@ -794,8 +846,8 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map aggregateBoltStreams(Map stats) {
-        Map ret = new HashMap();
+    public static Map<String, Map> aggregateBoltStreams(Map<String, Map> 
stats) {
+        Map<String, Map> ret = new HashMap<>();
         putKV(ret, ACKED, aggregateCountStreams(getMapByKey(stats, ACKED)));
         putKV(ret, FAILED, aggregateCountStreams(getMapByKey(stats, FAILED)));
         putKV(ret, EMITTED, aggregateCountStreams(getMapByKey(stats, 
EMITTED)));
@@ -811,41 +863,42 @@ public class StatsUtil {
     /**
      * A helper function that aggregates windowed stats from one spout 
executor.
      */
-    public static Map aggBoltExecWinStats(Map accStats, Map newStats, boolean 
includeSys) {
-        Map ret = new HashMap();
+    public static Map<String, Object> aggBoltExecWinStats(
+            Map<String, Object> accStats, Map<String, Object> newStats, 
boolean includeSys) {
+        Map<String, Object> ret = new HashMap<>();
 
-        Map m = new HashMap();
+        Map<String, Map<String, Number>> m = new HashMap<>();
         for (Object win : getMapByKey(newStats, EXECUTED).keySet()) {
-            m.put(win, aggBoltLatAndCount(
+            m.put((String) win, aggBoltLatAndCount(
                     (Map) (getMapByKey(newStats, EXEC_LATENCIES)).get(win),
                     (Map) (getMapByKey(newStats, PROC_LATENCIES)).get(win),
                     (Map) (getMapByKey(newStats, EXECUTED)).get(win)));
         }
         m = swapMapOrder(m);
 
-        Map win2execLatWgtAvg = getMapByKey(m, EXEC_LAT_TOTAL);
-        Map win2procLatWgtAvg = getMapByKey(m, PROC_LAT_TOTAL);
-        Map win2executed = getMapByKey(m, EXECUTED);
+        Map<String, Double> win2execLatWgtAvg = getMapByKey(m, EXEC_LAT_TOTAL);
+        Map<String, Double> win2procLatWgtAvg = getMapByKey(m, PROC_LAT_TOTAL);
+        Map<String, Long> win2executed = getMapByKey(m, EXECUTED);
 
-        Map emitted = getMapByKey(newStats, EMITTED);
-        emitted = mergeWithSum(aggregateCountStreams(filterSysStreams(emitted, 
includeSys)),
+        Map<String, Map<String, Long>> emitted = getMapByKey(newStats, 
EMITTED);
+        Map<String, Long> win2emitted = 
mergeWithSumLong(aggregateCountStreams(filterSysStreams(emitted, includeSys)),
                 getMapByKey(accStats, WIN_TO_EMITTED));
-        putKV(ret, WIN_TO_EMITTED, emitted);
+        putKV(ret, WIN_TO_EMITTED, win2emitted);
 
-        Map transferred = getMapByKey(newStats, TRANSFERRED);
-        transferred = 
mergeWithSum(aggregateCountStreams(filterSysStreams(transferred, includeSys)),
+        Map<String, Map<String, Long>> transferred = getMapByKey(newStats, 
TRANSFERRED);
+        Map<String, Long> win2transferred = 
mergeWithSumLong(aggregateCountStreams(filterSysStreams(transferred, 
includeSys)),
                 getMapByKey(accStats, WIN_TO_TRANSFERRED));
-        putKV(ret, WIN_TO_TRANSFERRED, transferred);
+        putKV(ret, WIN_TO_TRANSFERRED, win2transferred);
 
-        putKV(ret, WIN_TO_EXEC_LAT_WGT_AVG, mergeWithSum(
+        putKV(ret, WIN_TO_EXEC_LAT_WGT_AVG, mergeWithSumDouble(
                 getMapByKey(accStats, WIN_TO_EXEC_LAT_WGT_AVG), 
win2execLatWgtAvg));
-        putKV(ret, WIN_TO_PROC_LAT_WGT_AVG, mergeWithSum(
+        putKV(ret, WIN_TO_PROC_LAT_WGT_AVG, mergeWithSumDouble(
                 getMapByKey(accStats, WIN_TO_PROC_LAT_WGT_AVG), 
win2procLatWgtAvg));
-        putKV(ret, WIN_TO_EXECUTED, mergeWithSum(
+        putKV(ret, WIN_TO_EXECUTED, mergeWithSumLong(
                 getMapByKey(accStats, WIN_TO_EXECUTED), win2executed));
-        putKV(ret, WIN_TO_ACKED, mergeWithSum(
+        putKV(ret, WIN_TO_ACKED, mergeWithSumLong(
                 aggregateCountStreams(getMapByKey(newStats, ACKED)), 
getMapByKey(accStats, WIN_TO_ACKED)));
-        putKV(ret, WIN_TO_FAILED, mergeWithSum(
+        putKV(ret, WIN_TO_FAILED, mergeWithSumLong(
                 aggregateCountStreams(getMapByKey(newStats, FAILED)), 
getMapByKey(accStats, WIN_TO_FAILED)));
 
         return ret;
@@ -854,36 +907,37 @@ public class StatsUtil {
     /**
      * A helper function that aggregates windowed stats from one spout 
executor.
      */
-    public static Map aggSpoutExecWinStats(Map accStats, Map newStats, boolean 
includeSys) {
-        Map ret = new HashMap();
+    public static Map<String, Object> aggSpoutExecWinStats(
+            Map<String, Object> accStats, Map<String, Object> beat, boolean 
includeSys) {
+        Map<String, Object> ret = new HashMap<>();
 
-        Map m = new HashMap();
-        for (Object win : getMapByKey(newStats, ACKED).keySet()) {
-            m.put(win, aggSpoutLatAndCount(
-                    (Map) (getMapByKey(newStats, COMP_LATENCIES)).get(win),
-                    (Map) (getMapByKey(newStats, ACKED)).get(win)));
+        Map<String, Map<String, Number>> m = new HashMap<>();
+        for (Object win : getMapByKey(beat, ACKED).keySet()) {
+            m.put((String) win, aggSpoutLatAndCount(
+                    (Map<String, Double>) (getMapByKey(beat, 
COMP_LATENCIES)).get(win),
+                    (Map<String, Long>) (getMapByKey(beat, ACKED)).get(win)));
         }
         m = swapMapOrder(m);
 
-        Map win2compLatWgtAvg = getMapByKey(m, COMP_LAT_TOTAL);
-        Map win2acked = getMapByKey(m, ACKED);
+        Map<String, Double> win2compLatWgtAvg = getMapByKey(m, COMP_LAT_TOTAL);
+        Map<String, Long> win2acked = getMapByKey(m, ACKED);
 
-        Map emitted = getMapByKey(newStats, EMITTED);
-        emitted = mergeWithSum(aggregateCountStreams(filterSysStreams(emitted, 
includeSys)),
+        Map<String, Map<String, Long>> emitted = getMapByKey(beat, EMITTED);
+        Map<String, Long> win2emitted = 
mergeWithSumLong(aggregateCountStreams(filterSysStreams(emitted, includeSys)),
                 getMapByKey(accStats, WIN_TO_EMITTED));
-        putKV(ret, WIN_TO_EMITTED, emitted);
+        putKV(ret, WIN_TO_EMITTED, win2emitted);
 
-        Map transferred = getMapByKey(newStats, TRANSFERRED);
-        transferred = 
mergeWithSum(aggregateCountStreams(filterSysStreams(transferred, includeSys)),
+        Map<String, Map<String, Long>> transferred = getMapByKey(beat, 
TRANSFERRED);
+        Map<String, Long> win2transferred = 
mergeWithSumLong(aggregateCountStreams(filterSysStreams(transferred, 
includeSys)),
                 getMapByKey(accStats, WIN_TO_TRANSFERRED));
-        putKV(ret, WIN_TO_TRANSFERRED, transferred);
+        putKV(ret, WIN_TO_TRANSFERRED, win2transferred);
 
-        putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSum(
+        putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSumDouble(
                 getMapByKey(accStats, WIN_TO_COMP_LAT_WGT_AVG), 
win2compLatWgtAvg));
-        putKV(ret, WIN_TO_ACKED, mergeWithSum(
+        putKV(ret, WIN_TO_ACKED, mergeWithSumLong(
                 getMapByKey(accStats, WIN_TO_ACKED), win2acked));
-        putKV(ret, WIN_TO_FAILED, mergeWithSum(
-                aggregateCountStreams(getMapByKey(newStats, FAILED)), 
getMapByKey(accStats, WIN_TO_FAILED)));
+        putKV(ret, WIN_TO_FAILED, mergeWithSumLong(
+                aggregateCountStreams(getMapByKey(beat, FAILED)), 
getMapByKey(accStats, WIN_TO_FAILED)));
 
         return ret;
     }
@@ -894,25 +948,23 @@ public class StatsUtil {
      *
      * @param countsSeq a seq of {win -> GlobalStreamId -> value}
      */
-    public static Map aggregateCounts(List countsSeq) {
-        Map ret = new HashMap();
-        for (Object counts : countsSeq) {
-            for (Object o : ((Map) counts).entrySet()) {
-                Map.Entry e = (Map.Entry) o;
-                Object win = e.getKey();
-                Map stream2count = (Map) e.getValue();
+    public static <T> Map<String, Map<T, Long>> 
aggregateCounts(List<Map<String, Map<T, Long>>> countsSeq) {
+        Map<String, Map<T, Long>> ret = new HashMap<>();
+        for (Map<String, Map<T, Long>> counts : countsSeq) {
+            for (Map.Entry<String, Map<T, Long>> entry : counts.entrySet()) {
+                String win = entry.getKey();
+                Map<T, Long> stream2count = entry.getValue();
 
                 if (!ret.containsKey(win)) {
                     ret.put(win, stream2count);
                 } else {
-                    Map existing = (Map) ret.get(win);
-                    for (Object oo : stream2count.entrySet()) {
-                        Map.Entry ee = (Map.Entry) oo;
-                        Object stream = ee.getKey();
+                    Map<T, Long> existing = ret.get(win);
+                    for (Map.Entry<T, Long> subEntry : 
stream2count.entrySet()) {
+                        T stream = subEntry.getKey();
                         if (!existing.containsKey(stream)) {
-                            existing.put(stream, ee.getValue());
+                            existing.put(stream, subEntry.getValue());
                         } else {
-                            existing.put(stream, (Long) ee.getValue() + (Long) 
existing.get(stream));
+                            existing.put(stream, subEntry.getValue() + 
existing.get(stream));
                         }
                     }
                 }
@@ -921,23 +973,24 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map aggregateCompStats(String window, boolean includeSys, 
List data, String compType) {
+    public static Map<String, Object> aggregateCompStats(String window, 
boolean includeSys,
+                                                         List<Map<String, 
Object>> beats, String compType) {
         boolean isSpout = SPOUT.equals(compType);
 
-        Map initVal = new HashMap();
+        Map<String, Object> initVal = new HashMap<>();
         putKV(initVal, WIN_TO_ACKED, new HashMap());
         putKV(initVal, WIN_TO_FAILED, new HashMap());
         putKV(initVal, WIN_TO_EMITTED, new HashMap());
         putKV(initVal, WIN_TO_TRANSFERRED, new HashMap());
 
-        Map stats = new HashMap();
+        Map<String, Object> stats = new HashMap();
         putKV(stats, EXECUTOR_STATS, new ArrayList());
         putKV(stats, SID_TO_OUT_STATS, new HashMap());
         if (isSpout) {
-            putKV(initVal, TYPE, KW_SPOUT);
+            putKV(initVal, TYPE, SPOUT);
             putKV(initVal, WIN_TO_COMP_LAT_WGT_AVG, new HashMap());
         } else {
-            putKV(initVal, TYPE, KW_BOLT);
+            putKV(initVal, TYPE, BOLT);
             putKV(initVal, WIN_TO_EXECUTED, new HashMap());
             putKV(stats, CID_SID_TO_IN_STATS, new HashMap());
             putKV(initVal, WIN_TO_EXEC_LAT_WGT_AVG, new HashMap());
@@ -945,8 +998,8 @@ public class StatsUtil {
         }
         putKV(initVal, STATS, stats);
 
-        for (Object o : data) {
-            initVal = aggCompExecStats(window, includeSys, initVal, (Map) o, 
compType);
+        for (Map<String, Object> beat : beats) {
+            initVal = aggCompExecStats(window, includeSys, initVal, beat, 
compType);
         }
 
         return initVal;
@@ -956,41 +1009,50 @@ public class StatsUtil {
      * Combines the aggregate stats of one executor with the given map, 
selecting
      * the appropriate window and including system components as specified.
      */
-    public static Map aggCompExecStats(String window, boolean includeSys, Map 
accStats, Map newData, String compType) {
-        Map ret = new HashMap();
+    public static Map<String, Object> aggCompExecStats(String window, boolean 
includeSys, Map<String, Object> accStats,
+                                                       Map<String, Object> 
beat, String compType) {
+        Map<String, Object> ret = new HashMap<>();
         if (SPOUT.equals(compType)) {
-            ret.putAll(aggSpoutExecWinStats(accStats, getMapByKey(newData, 
STATS), includeSys));
+            ret.putAll(aggSpoutExecWinStats(accStats, getMapByKey(beat, 
STATS), includeSys));
             putKV(ret, STATS, mergeAggCompStatsCompPageSpout(
                     getMapByKey(accStats, STATS),
-                    aggPreMergeCompPageSpout(newData, window, includeSys)));
+                    aggPreMergeCompPageSpout(beat, window, includeSys)));
         } else {
-            ret.putAll(aggBoltExecWinStats(accStats, getMapByKey(newData, 
STATS), includeSys));
+            ret.putAll(aggBoltExecWinStats(accStats, getMapByKey(beat, STATS), 
includeSys));
             putKV(ret, STATS, mergeAggCompStatsCompPageBolt(
                     getMapByKey(accStats, STATS),
-                    aggPreMergeCompPageBolt(newData, window, includeSys)));
+                    aggPreMergeCompPageBolt(beat, window, includeSys)));
         }
-        putKV(ret, TYPE, keyword(compType));
+        putKV(ret, TYPE, compType);
 
         return ret;
     }
 
-    public static Map postAggregateCompStats(Map task2component, Map 
exec2hostPort, Map accData) {
-        Map ret = new HashMap();
+    /**
+     * post aggregate component stats
+     *
+     * @param task2component task -> component, note it's a clojure map
+     * @param exec2hostPort  executor -> host+port, note it's a clojure map
+     * @param compStats      accumulated comp stats
+     * @return
+     */
+    public static Map<String, Object> postAggregateCompStats(Map 
task2component, Map exec2hostPort, Map<String, Object> compStats) {
+        Map<String, Object> ret = new HashMap<>();
 
-        String compType = ((Keyword) getByKey(accData, TYPE)).getName();
-        Map stats = getMapByKey(accData, STATS);
-        Integer numTasks = getByKeywordOr0(stats, NUM_TASKS).intValue();
-        Integer numExecutors = getByKeywordOr0(stats, 
NUM_EXECUTORS).intValue();
+        String compType = (String) compStats.get(TYPE);
+        Map stats = getMapByKey(compStats, STATS);
+        Integer numTasks = getByKeyOr0(stats, NUM_TASKS).intValue();
+        Integer numExecutors = getByKeyOr0(stats, NUM_EXECUTORS).intValue();
         Map outStats = getMapByKey(stats, SID_TO_OUT_STATS);
 
-        putKV(ret, TYPE, keyword(compType));
+        putKV(ret, TYPE, compType);
         putKV(ret, NUM_TASKS, numTasks);
         putKV(ret, NUM_EXECUTORS, numExecutors);
         putKV(ret, EXECUTOR_STATS, getByKey(stats, EXECUTOR_STATS));
-        putKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKey(accData, 
WIN_TO_EMITTED)));
-        putKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKey(accData, 
WIN_TO_TRANSFERRED)));
-        putKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKey(accData, 
WIN_TO_ACKED)));
-        putKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKey(accData, 
WIN_TO_FAILED)));
+        putKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKey(compStats, 
WIN_TO_EMITTED)));
+        putKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKey(compStats, 
WIN_TO_TRANSFERRED)));
+        putKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKey(compStats, 
WIN_TO_ACKED)));
+        putKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKey(compStats, 
WIN_TO_FAILED)));
 
         if (BOLT.equals(compType)) {
             Map inStats = getMapByKey(stats, CID_SID_TO_IN_STATS);
@@ -1000,10 +1062,10 @@ public class StatsUtil {
                 Map.Entry e = (Map.Entry) o;
                 Object k = e.getKey();
                 Map v = (Map) e.getValue();
-                long executed = getByKeywordOr0(v, EXECUTED).longValue();
+                long executed = getByKeyOr0(v, EXECUTED).longValue();
                 if (executed > 0) {
-                    double executeLatencyTotal = getByKeywordOr0(v, 
EXEC_LAT_TOTAL).doubleValue();
-                    double processLatencyTotal = getByKeywordOr0(v, 
PROC_LAT_TOTAL).doubleValue();
+                    double executeLatencyTotal = getByKeyOr0(v, 
EXEC_LAT_TOTAL).doubleValue();
+                    double processLatencyTotal = getByKeyOr0(v, 
PROC_LAT_TOTAL).doubleValue();
                     putKV(v, EXEC_LATENCY, executeLatencyTotal / executed);
                     putKV(v, PROC_LATENCY, processLatencyTotal / executed);
                 } else {
@@ -1017,20 +1079,20 @@ public class StatsUtil {
             putKV(ret, CID_SID_TO_IN_STATS, inStats2);
 
             putKV(ret, SID_TO_OUT_STATS, outStats);
-            putKV(ret, WIN_TO_EXECUTED, mapKeyStr(getMapByKey(accData, 
WIN_TO_EXECUTED)));
+            putKV(ret, WIN_TO_EXECUTED, mapKeyStr(getMapByKey(compStats, 
WIN_TO_EXECUTED)));
             putKV(ret, WIN_TO_EXEC_LAT, computeWeightedAveragesPerWindow(
-                    accData, WIN_TO_EXEC_LAT_WGT_AVG, WIN_TO_EXECUTED));
+                    compStats, WIN_TO_EXEC_LAT_WGT_AVG, WIN_TO_EXECUTED));
             putKV(ret, WIN_TO_PROC_LAT, computeWeightedAveragesPerWindow(
-                    accData, WIN_TO_PROC_LAT_WGT_AVG, WIN_TO_EXECUTED));
+                    compStats, WIN_TO_PROC_LAT_WGT_AVG, WIN_TO_EXECUTED));
         } else {
             Map outStats2 = new HashMap();
             for (Object o : outStats.entrySet()) {
                 Map.Entry e = (Map.Entry) o;
                 Object k = e.getKey();
                 Map v = (Map) e.getValue();
-                long acked = getByKeywordOr0(v, ACKED).longValue();
+                long acked = getByKeyOr0(v, ACKED).longValue();
                 if (acked > 0) {
-                    double compLatencyTotal = getByKeywordOr0(v, 
COMP_LAT_TOTAL).doubleValue();
+                    double compLatencyTotal = getByKeyOr0(v, 
COMP_LAT_TOTAL).doubleValue();
                     putKV(v, COMP_LATENCY, compLatencyTotal / acked);
                 } else {
                     putKV(v, COMP_LATENCY, 0.0);
@@ -1040,60 +1102,103 @@ public class StatsUtil {
             }
             putKV(ret, SID_TO_OUT_STATS, outStats2);
             putKV(ret, WIN_TO_COMP_LAT, computeWeightedAveragesPerWindow(
-                    accData, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED));
+                    compStats, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED));
         }
 
         return ret;
     }
 
     public static ComponentPageInfo aggCompExecsStats(
-            Map exec2hostPort, Map task2component, Map beats, String window, 
boolean includeSys,
-            String topologyId, StormTopology topology, String componentId) {
+            Map exec2hostPort, Map task2component, Map<List<Integer>, 
Map<String, Object>> beats,
+            String window, boolean includeSys, String topologyId, 
StormTopology topology, String componentId) {
 
-        List beatList = extractDataFromHb(exec2hostPort, task2component, 
beats, includeSys, topology, componentId);
-        Map compStats = aggregateCompStats(window, includeSys, beatList, 
componentType(topology, componentId).getName());
+        List<Map<String, Object>> beatList =
+                extractDataFromHb(exec2hostPort, task2component, beats, 
includeSys, topology, componentId);
+        Map<String, Object> compStats = aggregateCompStats(window, includeSys, 
beatList, componentType(topology, componentId));
         compStats = postAggregateCompStats(task2component, exec2hostPort, 
compStats);
         return thriftifyCompPageData(topologyId, topology, componentId, 
compStats);
     }
 
 
     // 
=====================================================================================
-    // clojurify stats methods
+    // convert thrift stats to java maps
     // 
=====================================================================================
 
-    public static Map clojurifyStats(Map stats) {
-        Map ret = new HashMap();
-        for (Object o : stats.entrySet()) {
-            Map.Entry entry = (Map.Entry) o;
-            ExecutorInfo executorInfo = (ExecutorInfo) entry.getKey();
-            ExecutorStats executorStats = (ExecutorStats) entry.getValue();
+    public static Map<List<Integer>, Map<String, Object>> 
convertExecutorBeats(Map<ExecutorInfo, ExecutorBeat> beats) {
+        Map<List<Integer>, Map<String, Object>> ret = new HashMap<>();
+        for (Map.Entry<ExecutorInfo, ExecutorBeat> beat : beats.entrySet()) {
+            ExecutorInfo executorInfo = beat.getKey();
+            ExecutorBeat executorBeat = beat.getValue();
+            ret.put(Lists.newArrayList(executorInfo.get_task_start(), 
executorInfo.get_task_end()),
+                    convertZkExecutorHb(executorBeat));
+        }
+
+        return ret;
+    }
+
+    /**
+     * convert thrift ExecutorBeat into a java HashMap
+     */
+    public static Map<String, Object> convertZkExecutorHb(ExecutorBeat beat) {
+        Map<String, Object> ret = new HashMap<>();
+        if (beat != null) {
+            ret.put(TIME_SECS, beat.getTimeSecs());
+            ret.put(UPTIME, beat.getUptime());
+            ret.put(STATS, convertExecutorStats(beat.getStats()));
+        }
+
+        return ret;
+    }
+
+    public static Map<String, Object> convertZkWorkerHb(ClusterWorkerHeartbeat 
workerHb) {
+        Map<String, Object> ret = new HashMap<>();
+        if (workerHb != null) {
+            ret.put("storm-id", workerHb.get_storm_id());
+            ret.put(EXECUTOR_STATS, 
convertExecutorsStats(workerHb.get_executor_stats()));
+            ret.put(UPTIME, workerHb.get_uptime_secs());
+            ret.put(TIME_SECS, workerHb.get_time_secs());
+        }
+        return ret;
+    }
+
+    /**
+     * convert executors stats into a HashMap, note that ExecutorStats are 
remained unchanged
+     */
+    public static Map<List<Integer>, ExecutorStats> 
convertExecutorsStats(Map<ExecutorInfo, ExecutorStats> stats) {
+        Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
+        for (Map.Entry<ExecutorInfo, ExecutorStats> entry : stats.entrySet()) {
+            ExecutorInfo executorInfo = entry.getKey();
+            ExecutorStats executorStats = entry.getValue();
 
             ret.put(Lists.newArrayList(executorInfo.get_task_start(), 
executorInfo.get_task_end()),
-                    clojurifyExecutorStats(executorStats));
+                    executorStats);
         }
         return ret;
     }
 
-    public static Map clojurifyExecutorStats(ExecutorStats stats) {
-        Map ret = new HashMap();
+    /**
+     * convert thrift ExecutorStats structure into a java HashMap
+     */
+    public static Map<String, Object> convertExecutorStats(ExecutorStats 
stats) {
+        Map<String, Object> ret = new HashMap<>();
 
         putKV(ret, EMITTED, stats.get_emitted());
         putKV(ret, TRANSFERRED, stats.get_transferred());
-        putKV(ret, "rate", stats.get_rate());
+        putKV(ret, RATE, stats.get_rate());
 
         if (stats.get_specific().is_set_bolt()) {
-            mergeMaps(ret, 
clojurifySpecificStats(stats.get_specific().get_bolt()));
-            putKV(ret, TYPE, KW_BOLT);
+            ret.putAll(convertSpecificStats(stats.get_specific().get_bolt()));
+            putKV(ret, TYPE, BOLT);
         } else {
-            mergeMaps(ret, 
clojurifySpecificStats(stats.get_specific().get_spout()));
-            putKV(ret, TYPE, KW_SPOUT);
+            ret.putAll(convertSpecificStats(stats.get_specific().get_spout()));
+            putKV(ret, TYPE, SPOUT);
         }
 
         return ret;
     }
 
-    public static Map clojurifySpecificStats(SpoutStats stats) {
-        Map ret = new HashMap();
+    private static Map<String, Object> convertSpecificStats(SpoutStats stats) {
+        Map<String, Object> ret = new HashMap<>();
         putKV(ret, ACKED, stats.get_acked());
         putKV(ret, FAILED, stats.get_failed());
         putKV(ret, COMP_LATENCIES, stats.get_complete_ms_avg());
@@ -1101,8 +1206,8 @@ public class StatsUtil {
         return ret;
     }
 
-    public static Map clojurifySpecificStats(BoltStats stats) {
-        Map ret = new HashMap();
+    private static Map<String, Object> convertSpecificStats(BoltStats stats) {
+        Map<String, Object> ret = new HashMap<>();
 
         Map acked = windowSetConverter(stats.get_acked(), FROM_GSID, IDENTITY);
         Map failed = windowSetConverter(stats.get_failed(), FROM_GSID, 
IDENTITY);
@@ -1119,9 +1224,9 @@ public class StatsUtil {
         return ret;
     }
 
-    public static List extractNodeInfosFromHbForComp(
+    public static List<Map<String, Object>> extractNodeInfosFromHbForComp(
             Map exec2hostPort, Map task2component, boolean includeSys, String 
compId) {
-        List ret = new ArrayList();
+        List<Map<String, Object>> ret = new ArrayList<>();
 
         Set<List> hostPorts = new HashSet<>();
         for (Object o : exec2hostPort.entrySet()) {
@@ -1139,7 +1244,7 @@ public class StatsUtil {
         }
 
         for (List hostPort : hostPorts) {
-            Map m = new HashMap();
+            Map<String, Object> m = new HashMap<>();
             putKV(m, HOST, hostPort.get(0));
             putKV(m, PORT, hostPort.get(1));
             ret.add(m);
@@ -1148,32 +1253,108 @@ public class StatsUtil {
         return ret;
     }
 
+
+    // 
=====================================================================================
+    // heartbeats related
+    // 
=====================================================================================
+
+    /**
+     * update all executor heart beats
+     * TODO: should move this method to nimbus when nimbus.clj is translated
+     *
+     * @param cache         existing heart beats cache
+     * @param executorBeats new heart beats
+     * @param executors     all executors
+     * @param timeout       timeout
+     * @return a HashMap of updated executor heart beats
+     */
+    public static Map<List<Integer>, Object> 
updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache,
+                                                                  
Map<List<Integer>, Map<String, Object>> executorBeats,
+                                                                  
Set<List<Integer>> executors, Integer timeout) {
+        Map<List<Integer>, Object> ret = new HashMap<>();
+        if (cache == null && executorBeats == null) {
+            return ret;
+        }
+
+        if (cache == null) {
+            cache = new HashMap<>();
+        }
+        if (executorBeats == null) {
+            executorBeats = new HashMap<>();
+        }
+
+        for (List<Integer> executor : executors) {
+            ret.put(executor, updateExecutorCache(cache.get(executor), 
executorBeats.get(executor), timeout));
+        }
+
+        return ret;
+    }
+
+    // TODO: should move this method to nimbus when nimbus.clj is translated
+    public static Map<String, Object> updateExecutorCache(
+            Map<String, Object> currBeat, Map<String, Object> newBeat, Integer 
timeout) {
+        Map<String, Object> ret = new HashMap<>();
+
+        Integer lastNimbusTime = null, lastReportedTime = null;
+        if (currBeat != null) {
+            lastNimbusTime = (Integer) currBeat.get("nimbus-time");
+            lastReportedTime = (Integer) 
currBeat.get("executor-reported-time");
+        }
+
+        Integer reportedTime = null;
+        if (newBeat != null) {
+            reportedTime = (Integer) newBeat.get(TIME_SECS);
+        }
+
+        if (reportedTime == null) {
+            if (lastReportedTime != null) {
+                reportedTime = lastReportedTime;
+            } else {
+                reportedTime = 0;
+            }
+        }
+
+        if (lastNimbusTime == null || !reportedTime.equals(lastReportedTime)) {
+            lastNimbusTime = Time.currentTimeSecs();
+        }
+
+        ret.put("is-timed-out", Time.deltaSecs(lastNimbusTime) >= timeout);
+        ret.put("nimbus-time", lastNimbusTime);
+        ret.put("executor-reported-time", reportedTime);
+        ret.put(HEARTBEAT, newBeat);
+
+        return ret;
+    }
+
+
     /**
      * extracts a list of executor data from heart beats
      */
-    public static List<Map<String, Object>> extractDataFromHb(Map 
executor2hostPort, Map task2component, Map beats,
+    public static List<Map<String, Object>> extractDataFromHb(Map 
executor2hostPort, Map task2component,
+                                                              
Map<List<Integer>, Map<String, Object>> beats,
                                                               boolean 
includeSys, StormTopology topology) {
         return extractDataFromHb(executor2hostPort, task2component, beats, 
includeSys, topology, null);
     }
 
-    public static List<Map<String, Object>> extractDataFromHb(Map 
executor2hostPort, Map task2component, Map beats,
+    public static List<Map<String, Object>> extractDataFromHb(Map 
executor2hostPort, Map task2component,
+                                                              
Map<List<Integer>, Map<String, Object>> beats,
                                                               boolean 
includeSys, StormTopology topology, String compId) {
         List<Map<String, Object>> ret = new ArrayList<>();
-        if (executor2hostPort == null) {
+        if (executor2hostPort == null || beats == null) {
             return ret;
         }
         for (Object o : executor2hostPort.entrySet()) {
             Map.Entry entry = (Map.Entry) o;
-            List key = (List) entry.getKey();
-            List value = (List) entry.getValue();
+            List executor = (List) entry.getKey();
+            List hostPort = (List) entry.getValue();
 
-            Integer start = ((Number) key.get(0)).intValue();
-            Integer end = ((Number) key.get(1)).intValue();
+            Integer start = ((Number) executor.get(0)).intValue();
+            Integer end = ((Number) executor.get(1)).intValue();
 
-            String host = (String) value.get(0);
-            Integer port = ((Number) value.get(1)).intValue();
+            String host = (String) hostPort.get(0);
+            Integer port = ((Number) hostPort.get(1)).intValue();
 
-            Map beat = (Map) beats.get(key);
+            Map<String, Object> beat = beats.get(convertExecutor(executor));
             if (beat == null) {
                 continue;
             }
@@ -1186,14 +1367,16 @@ public class StatsUtil {
                 putKV(m, NUM_TASKS, end - start + 1);
                 putKV(m, HOST, host);
                 putKV(m, PORT, port);
-                putKV(m, UPTIME, beat.get(keyword(UPTIME)));
-                putKV(m, STATS, beat.get(keyword(STATS)));
 
-                Keyword type = componentType(topology, compId);
+                Map stats = getMapByKey(getMapByKey(beat, (HEARTBEAT)), STATS);
+                putKV(m, UPTIME, getMapByKey(beat, HEARTBEAT).get(UPTIME));
+                putKV(m, STATS, stats);
+
+                String type = componentType(topology, compId);
                 if (type != null) {
                     putKV(m, TYPE, type);
                 } else {
-                    putKV(m, TYPE, getByKey(getMapByKey(beat, STATS), TYPE));
+                    putKV(m, TYPE, stats.get(TYPE));
                 }
                 ret.add(m);
             }
@@ -1201,8 +1384,9 @@ public class StatsUtil {
         return ret;
     }
 
-    private static Map computeWeightedAveragesPerWindow(Map accData, String 
wgtAvgKey, String divisorKey) {
-        Map ret = new HashMap();
+    private static Map<String, Double> 
computeWeightedAveragesPerWindow(Map<String, Object> accData,
+                                                                        String 
wgtAvgKey, String divisorKey) {
+        Map<String, Double> ret = new HashMap<>();
         for (Object o : getMapByKey(accData, wgtAvgKey).entrySet()) {
             Map.Entry e = (Map.Entry) o;
             Object window = e.getKey();
@@ -1216,16 +1400,31 @@ public class StatsUtil {
     }
 
 
+    public static Set<List<Integer>> convertExecutors(Set executors) {
+        Set<List<Integer>> convertedExecutors = new HashSet<>();
+        for (Object executor : executors) {
+            List l = (List) executor;
+            convertedExecutors.add(convertExecutor(l));
+        }
+        return convertedExecutors;
+    }
+
+    /**
+     * convert a clojure executor to java List<Integer>
+     */
+    public static List<Integer> convertExecutor(List executor) {
+        return Lists.newArrayList(((Number) executor.get(0)).intValue(), 
((Number) executor.get(1)).intValue());
+    }
+
     /**
      * computes max bolt capacity
      *
      * @param executorSumms a list of ExecutorSummary
      * @return max bolt capacity
      */
-    public static double computeBoltCapacity(List executorSumms) {
+    public static double computeBoltCapacity(List<ExecutorSummary> 
executorSumms) {
         double max = 0.0;
-        for (Object o : executorSumms) {
-            ExecutorSummary summary = (ExecutorSummary) o;
+        for (ExecutorSummary summary : executorSumms) {
             double capacity = computeExecutorCapacity(summary);
             if (capacity > max) {
                 max = capacity;
@@ -1234,19 +1433,22 @@ public class StatsUtil {
         return max;
     }
 
-    public static double computeExecutorCapacity(ExecutorSummary summ) {
-        ExecutorStats stats = summ.get_stats();
+    public static double computeExecutorCapacity(ExecutorSummary summary) {
+        ExecutorStats stats = summary.get_stats();
         if (stats == null) {
             return 0.0;
         } else {
-            Map m = aggregateBoltStats(Lists.newArrayList(stats), true);
+            // Map<String, Map<String/GlobalStreamId, Long/Double>> {win -> 
stream -> value}
+            Map<String, Map> m = 
aggregateBoltStats(Lists.newArrayList(summary), true);
+            // {metric -> win -> value} ==> {win -> metric -> value}
             m = swapMapOrder(aggregateBoltStreams(m));
+            // {metric -> value}
             Map data = getMapByKey(m, TEN_MIN_IN_SECONDS_STR);
 
-            int uptime = summ.get_uptime_secs();
+            int uptime = summary.get_uptime_secs();
             int win = Math.min(uptime, TEN_MIN_IN_SECONDS);
-            long executed = getByKeywordOr0(data, EXECUTED).longValue();
-            double latency = getByKeywordOr0(data, 
EXEC_LATENCIES).doubleValue();
+            long executed = getByKeyOr0(data, EXECUTED).longValue();
+            double latency = getByKeyOr0(data, EXEC_LATENCIES).doubleValue();
             if (win > 0) {
                 return executed * latency / (1000 * win);
             }
@@ -1260,35 +1462,33 @@ public class StatsUtil {
      * @param summs a list of ExecutorSummary
      * @return filtered summs
      */
-    public static List getFilledStats(List summs) {
-        for (Iterator itr = summs.iterator(); itr.hasNext(); ) {
-            ExecutorSummary summ = (ExecutorSummary) itr.next();
-            if (summ.get_stats() == null) {
-                itr.remove();
+    public static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> 
summs) {
+        List<ExecutorSummary> ret = new ArrayList<>();
+        for (ExecutorSummary summ : summs) {
+            if (summ.get_stats() != null) {
+                ret.add(summ);
             }
         }
-        return summs;
+        return ret;
     }
 
-    private static Map mapKeyStr(Map m) {
-        Map ret = new HashMap();
-        for (Object k : m.keySet()) {
-            ret.put(k.toString(), m.get(k));
+    private static <K, V> Map<String, V> mapKeyStr(Map<K, V> m) {
+        Map<String, V> ret = new HashMap<>();
+        for (Map.Entry<K, V> entry : m.entrySet()) {
+            ret.put(entry.getKey().toString(), entry.getValue());
         }
         return ret;
     }
 
-    private static long sumStreamsLong(Map m, String key) {
+    private static <K1, K2> long sumStreamsLong(Map<K1, Map<K2, ?>> m, String 
key) {
         long sum = 0;
         if (m == null) {
             return sum;
         }
-        for (Object v : m.values()) {
-            Map sub = (Map) v;
-            for (Object o : sub.entrySet()) {
-                Map.Entry e = (Map.Entry) o;
-                if (((Keyword) e.getKey()).getName().equals(key)) {
-                    sum += ((Number) e.getValue()).longValue();
+        for (Map<K2, ?> v : m.values()) {
+            for (Map.Entry<K2, ?> entry : v.entrySet()) {
+                if (entry.getKey().equals(key)) {
+                    sum += ((Number) entry.getValue()).longValue();
                 }
             }
         }
@@ -1304,7 +1504,7 @@ public class StatsUtil {
             Map sub = (Map) v;
             for (Object o : sub.entrySet()) {
                 Map.Entry e = (Map.Entry) o;
-                if (((Keyword) e.getKey()).getName().equals(key)) {
+                if (e.getKey().equals(key)) {
                     sum += ((Number) e.getValue()).doubleValue();
                 }
             }
@@ -1340,21 +1540,15 @@ public class StatsUtil {
      * @param includeSys whether to filter system streams
      * @return filtered stats
      */
-    private static Map filterSysStreams(Map stats, boolean includeSys) {
+    private static <K, V> Map<String, Map<K, V>> filterSysStreams(Map<String, 
Map<K, V>> stats, boolean includeSys) {
         if (!includeSys) {
-            for (Iterator itr = stats.keySet().iterator(); itr.hasNext(); ) {
-                Object winOrStream = itr.next();
-                if (isWindow(winOrStream)) {
-                    Map stream2stat = (Map) stats.get(winOrStream);
-                    for (Iterator subItr = stream2stat.keySet().iterator(); 
subItr.hasNext(); ) {
-                        Object key = subItr.next();
-                        if (key instanceof String && Utils.isSystemId((String) 
key)) {
-                            subItr.remove();
-                        }
-                    }
-                } else {
-                    if (winOrStream instanceof String && 
Utils.isSystemId((String) winOrStream)) {
-                        itr.remove();
+            for (Iterator<String> itr = stats.keySet().iterator(); 
itr.hasNext(); ) {
+                String winOrStream = itr.next();
+                Map<K, V> stream2stat = stats.get(winOrStream);
+                for (Iterator subItr = stream2stat.keySet().iterator(); 
subItr.hasNext(); ) {
+                    Object key = subItr.next();
+                    if (key instanceof String && Utils.isSystemId((String) 
key)) {
+                        subItr.remove();
                     }
                 }
             }
@@ -1362,15 +1556,12 @@ public class StatsUtil {
         return stats;
     }
 
-    private static boolean isWindow(Object key) {
-        return key.equals("600") || key.equals("10800") || key.equals("86400") 
|| key.equals(":all-time");
-    }
-
     /**
      * equals to clojure's: (merge-with (partial merge-with sum-or-0) acc-out 
spout-out)
      */
-    private static Map fullMergeWithSum(Map m1, Map m2) {
-        Set<Object> allKeys = new HashSet<>();
+    private static <K1, K2> Map<K1, Map<K2, Number>> fullMergeWithSum(Map<K1, 
Map<K2, ?>> m1,
+                                                                      Map<K1, 
Map<K2, ?>> m2) {
+        Set<K1> allKeys = new HashSet<>();
         if (m1 != null) {
             allKeys.addAll(m1.keySet());
         }
@@ -1378,14 +1569,14 @@ public class StatsUtil {
             allKeys.addAll(m2.keySet());
         }
 
-        Map ret = new HashMap();
-        for (Object k : allKeys) {
-            Map mm1 = null, mm2 = null;
+        Map<K1, Map<K2, Number>> ret = new HashMap<>();
+        for (K1 k : allKeys) {
+            Map<K2, ?> mm1 = null, mm2 = null;
             if (m1 != null) {
-                mm1 = (Map) m1.get(k);
+                mm1 = m1.get(k);
             }
             if (m2 != null) {
-                mm2 = (Map) m2.get(k);
+                mm2 = m2.get(k);
             }
             ret.put(k, mergeWithSum(mm1, mm2));
         }
@@ -1393,10 +1584,10 @@ public class StatsUtil {
         return ret;
     }
 
-    private static Map mergeWithSum(Map m1, Map m2) {
-        Map ret = new HashMap();
+    private static <K> Map<K, Number> mergeWithSum(Map<K, ?> m1, Map<K, ?> m2) 
{
+        Map<K, Number> ret = new HashMap<>();
 
-        Set<Object> allKeys = new HashSet<>();
+        Set<K> allKeys = new HashSet<>();
         if (m1 != null) {
             allKeys.addAll(m1.keySet());
         }
@@ -1404,10 +1595,52 @@ public class StatsUtil {
             allKeys.addAll(m2.keySet());
         }
 
-        for (Object k : allKeys) {
+        for (K k : allKeys) {
             Number n1 = getOr0(m1, k);
             Number n2 = getOr0(m2, k);
-            ret.put(k, add(n1, n2));
+            if (n1 instanceof Long) {
+                ret.put(k, n1.longValue() + n2.longValue());
+            } else {
+                ret.put(k, n1.doubleValue() + n2.doubleValue());
+            }
+        }
+        return ret;
+    }
+
+    private static <K> Map mergeWithSumLong(Map<K, Long> m1, Map<K, Long> m2) {
+        Map<K, Long> ret = new HashMap<>();
+
+        Set<K> allKeys = new HashSet<>();
+        if (m1 != null) {
+            allKeys.addAll(m1.keySet());
+        }
+        if (m2 != null) {
+            allKeys.addAll(m2.keySet());
+        }
+
+        for (K k : allKeys) {
+            Number n1 = getOr0(m1, k);
+            Number n2 = getOr0(m2, k);
+            ret.put(k, n1.longValue() + n2.longValue());
+        }
+        return ret;
+    }
+
+    private static <K> Map mergeWithSumDouble(Map<K, Double> m1, Map<K, 
Double> m2) {
+        Map<K, Double> ret = new HashMap<>();
+
+        Set<K> allKeys = new HashSet<>();
+        if (m1 != null) {
+            allKeys.addAll(m1.keySet());
+        }
+        if (m2 != null) {
+            allKeys.addAll(m2.keySet());
+        }
+
+        for (K k : allKeys) {
+            Number n1 = getOr0(m1, k);
+            Number n2 = getOr0(m2, k);
+            ret.put(k, n1.doubleValue() + n2.doubleValue());
         }
         return ret;
     }
@@ -1416,10 +1649,11 @@ public class StatsUtil {
      * this method merges 2 two-level-deep maps, which is different from 
mergeWithSum, and we expect the two maps
      * have the same keys
      */
-    private static Map mergeWithAddPair(Map m1, Map m2) {
-        Map ret = new HashMap();
+    private static <K> Map<String, Map<K, List>> mergeWithAddPair(Map<String, 
Map<K, List>> m1,
+                                                                  Map<String, 
Map<K, List>> m2) {
+        Map<String, Map<K, List>> ret = new HashMap<>();
 
-        Set<Object> allKeys = new HashSet<>();
+        Set<String> allKeys = new HashSet<>();
         if (m1 != null) {
             allKeys.addAll(m1.keySet());
         }
@@ -1427,9 +1661,9 @@ public class StatsUtil {
             allKeys.addAll(m2.keySet());
         }
 
-        for (Object k : allKeys) {
-            Map mm1 = (m1 != null) ? (Map) m1.get(k) : null;
-            Map mm2 = (m2 != null) ? (Map) m2.get(k) : null;
+        for (String k : allKeys) {
+            Map<K, List> mm1 = (m1 != null) ? m1.get(k) : null;
+            Map<K, List> mm2 = (m2 != null) ? m2.get(k) : null;
             if (mm1 == null && mm2 == null) {
                 continue;
             } else if (mm1 == null) {
@@ -1437,13 +1671,17 @@ public class StatsUtil {
             } else if (mm2 == null) {
                 ret.put(k, mm1);
             } else {
-                Map tmp = new HashMap();
-                for (Object kk : mm1.keySet()) {
-                    List seq1 = (List) mm1.get(kk);
-                    List seq2 = (List) mm2.get(kk);
+                Map<K, List> tmp = new HashMap<>();
+                for (K kk : mm1.keySet()) {
+                    List seq1 = mm1.get(kk);
+                    List seq2 = mm2.get(kk);
                     List sums = new ArrayList();
                     for (int i = 0; i < seq1.size(); i++) {
-                        sums.add(add((Number) seq1.get(i), (Number) 
seq2.get(i)));
+                        if (seq1.get(i) instanceof Long) {
+                            sums.add(((Number) seq1.get(i)).longValue() + 
((Number) seq2.get(i)).longValue());
+                        } else {
+                            sums.add(((Number) seq1.get(i)).doubleValue() + 
((Number) seq2.get(i)).doubleValue());
+                        }
                     }
                     tmp.put(kk, sums);
                 }
@@ -1457,65 +1695,36 @@ public class StatsUtil {
     // thriftify stats methods
     // 
=====================================================================================
 
-    private static TopologyPageInfo thriftifyTopoPageData(String topologyId, 
Map data) {
-        TopologyPageInfo ret = new TopologyPageInfo(topologyId);
-        Integer numTasks = getByKeywordOr0(data, NUM_TASKS).intValue();
-        Integer numWorkers = getByKeywordOr0(data, NUM_WORKERS).intValue();
-        Integer numExecutors = getByKeywordOr0(data, NUM_EXECUTORS).intValue();
-        Map spout2stats = getMapByKey(data, SPOUT_TO_STATS);
-        Map bolt2stats = getMapByKey(data, BOLT_TO_STATS);
-        Map win2emitted = getMapByKey(data, WIN_TO_EMITTED);
-        Map win2transferred = getMapByKey(data, WIN_TO_TRANSFERRED);
-        Map win2compLatency = getMapByKey(data, WIN_TO_COMP_LAT);
-        Map win2acked = getMapByKey(data, WIN_TO_ACKED);
-        Map win2failed = getMapByKey(data, WIN_TO_FAILED);
-
-        Map<String, ComponentAggregateStats> spoutAggStats = new HashMap<>();
-        for (Object o : spout2stats.entrySet()) {
-            Map.Entry e = (Map.Entry) o;
-            String id = (String) e.getKey();
-            Map v = (Map) e.getValue();
-            putKV(v, TYPE, KW_SPOUT);
-
-            spoutAggStats.put(id, thriftifySpoutAggStats(v));
-        }
+    public static ClusterWorkerHeartbeat thriftifyZkWorkerH

<TRUNCATED>

Reply via email to