http://git-wip-us.apache.org/repos/asf/storm/blob/4c246d1c/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java index 351e830..7650ab1 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java +++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java @@ -17,8 +17,6 @@ */ package org.apache.storm.stats; -import clojure.lang.Keyword; -import clojure.lang.RT; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.HashMap; @@ -27,10 +25,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.storm.cluster.ExecutorBeat; import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.generated.Bolt; import org.apache.storm.generated.BoltAggregateStats; import org.apache.storm.generated.BoltStats; +import org.apache.storm.generated.ClusterWorkerHeartbeat; import org.apache.storm.generated.CommonAggregateStats; import org.apache.storm.generated.ComponentAggregateStats; import org.apache.storm.generated.ComponentPageInfo; @@ -48,19 +48,18 @@ import org.apache.storm.generated.SpoutStats; import org.apache.storm.generated.StormTopology; import org.apache.storm.generated.TopologyPageInfo; import org.apache.storm.generated.TopologyStats; +import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@SuppressWarnings("unchecked, unused") +@SuppressWarnings("unchecked") public class StatsUtil { private static final Logger logger = LoggerFactory.getLogger(StatsUtil.class); public static final String TYPE = "type"; private static final String SPOUT = "spout"; private static final String BOLT = "bolt"; - public static final Keyword KW_SPOUT = keyword(SPOUT); - public static final Keyword KW_BOLT = keyword(BOLT); private static final String UPTIME = "uptime"; private static final String HOST = "host"; @@ -73,7 +72,10 @@ public class StatsUtil { private static final String EXECUTOR_STATS = "executor-stats"; private static final String EXECUTOR_ID = "executor-id"; private static final String LAST_ERROR = "lastError"; + private static final String HEARTBEAT = "heartbeat"; + private static final String TIME_SECS = "time-secs"; + private static final String RATE = "rate"; private static final String ACKED = "acked"; private static final String FAILED = "failed"; private static final String EXECUTED = "executed"; @@ -130,8 +132,10 @@ public class StatsUtil { * @param id2procAvg { global stream id -> proc avg value } * @param id2numExec { global stream id -> executed } */ - public static Map aggBoltLatAndCount(Map id2execAvg, Map id2procAvg, Map id2numExec) { - Map ret = new HashMap(); + public static Map<String, Number> aggBoltLatAndCount(Map<List<String>, Double> id2execAvg, + Map<List<String>, Double> id2procAvg, + Map<List<String>, Long> id2numExec) { + Map<String, Number> ret = new HashMap<>(); putKV(ret, EXEC_LAT_TOTAL, weightAvgAndSum(id2execAvg, id2numExec)); putKV(ret, PROC_LAT_TOTAL, weightAvgAndSum(id2procAvg, id2numExec)); putKV(ret, EXECUTED, sumValues(id2numExec)); @@ -142,8 +146,8 @@ public class StatsUtil { /** * Aggregates number acked and complete latencies across all streams. */ - public static Map aggSpoutLatAndCount(Map id2compAvg, Map id2numAcked) { - Map ret = new HashMap(); + public static Map<String, Number> aggSpoutLatAndCount(Map<String, Double> id2compAvg, Map<String, Long> id2numAcked) { + Map<String, Number> ret = new HashMap<>(); putKV(ret, COMP_LAT_TOTAL, weightAvgAndSum(id2compAvg, id2numAcked)); putKV(ret, ACKED, sumValues(id2numAcked)); @@ -185,7 +189,7 @@ public class StatsUtil { return ret; } - public static Map aggPreMergeCompPageBolt(Map m, String window, boolean includeSys) { + public static Map aggPreMergeCompPageBolt(Map<String, Object> m, String window, boolean includeSys) { Map ret = new HashMap(); putKV(ret, EXECUTOR_ID, getByKey(m, "exec-id")); putKV(ret, HOST, getByKey(m, HOST)); @@ -195,7 +199,7 @@ public class StatsUtil { putKV(ret, NUM_TASKS, getByKey(m, NUM_TASKS)); Map stat2win2sid2num = getMapByKey(m, STATS); - putKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeywordOr0(m, UPTIME).intValue())); + putKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeyOr0(m, UPTIME).intValue())); // calc cid+sid->input_stats Map inputStats = new HashMap(); @@ -232,8 +236,8 @@ public class StatsUtil { return ret; } - public static Map aggPreMergeCompPageSpout(Map m, String window, boolean includeSys) { - Map ret = new HashMap(); + public static Map<String, Object> aggPreMergeCompPageSpout(Map<String, Object> m, String window, boolean includeSys) { + Map<String, Object> ret = new HashMap<>(); putKV(ret, EXECUTOR_ID, getByKey(m, "exec-id")); putKV(ret, HOST, getByKey(m, HOST)); putKV(ret, PORT, getByKey(m, PORT)); @@ -265,97 +269,111 @@ public class StatsUtil { return ret; } - public static Map aggPreMergeTopoPageBolt(Map m, String window, boolean includeSys) { - Map ret = new HashMap(); + public static <K, V extends Number> Map<String, Object> aggPreMergeTopoPageBolt( + Map<String, Object> m, String window, boolean includeSys) { + Map<String, Object> ret = new HashMap<>(); - Map subRet = new HashMap(); + Map<String, Object> subRet = new HashMap<>(); putKV(subRet, NUM_EXECUTORS, 1); putKV(subRet, NUM_TASKS, getByKey(m, NUM_TASKS)); - Map stat2win2sid2num = getMapByKey(m, STATS); - putKV(subRet, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeywordOr0(m, UPTIME).intValue())); + Map<String, Object> stat2win2sid2num = getMapByKey(m, STATS); + putKV(subRet, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeyOr0(m, UPTIME).intValue())); for (String key : new String[]{EMITTED, TRANSFERRED, ACKED, FAILED}) { - Map stat = (Map) windowSetConverter(getMapByKey(stat2win2sid2num, key), TO_STRING).get(window); + Map<String, Map<K, V>> stat = windowSetConverter(getMapByKey(stat2win2sid2num, key), TO_STRING); if (EMITTED.equals(key) || TRANSFERRED.equals(key)) { stat = filterSysStreams(stat, includeSys); } + Map<K, V> winStat = stat.get(window); long sum = 0; - if (stat != null) { - for (Object o : stat.values()) { - sum += ((Number) o).longValue(); + if (winStat != null) { + for (V v : winStat.values()) { + sum += v.longValue(); } } putKV(subRet, key, sum); } - Map win2sid2execLat = windowSetConverter(getMapByKey(stat2win2sid2num, EXEC_LATENCIES), TO_STRING); - Map win2sid2procLat = windowSetConverter(getMapByKey(stat2win2sid2num, PROC_LATENCIES), TO_STRING); - Map win2sid2exec = windowSetConverter(getMapByKey(stat2win2sid2num, EXECUTED), TO_STRING); + Map<String, Map<List<String>, Double>> win2sid2execLat = + windowSetConverter(getMapByKey(stat2win2sid2num, EXEC_LATENCIES), TO_STRING); + Map<String, Map<List<String>, Double>> win2sid2procLat = + windowSetConverter(getMapByKey(stat2win2sid2num, PROC_LATENCIES), TO_STRING); + Map<String, Map<List<String>, Long>> win2sid2exec = + windowSetConverter(getMapByKey(stat2win2sid2num, EXECUTED), TO_STRING); subRet.putAll(aggBoltLatAndCount( - (Map) win2sid2execLat.get(window), (Map) win2sid2procLat.get(window), (Map) win2sid2exec.get(window))); + win2sid2execLat.get(window), win2sid2procLat.get(window), win2sid2exec.get(window))); - ret.put(getByKey(m, "comp-id"), subRet); + ret.put((String) getByKey(m, "comp-id"), subRet); return ret; } - public static Map aggPreMergeTopoPageSpout(Map m, String window, boolean includeSys) { - Map ret = new HashMap(); + /** + * returns { comp id -> comp-stats } + */ + public static <K, V extends Number> Map<String, Object> aggPreMergeTopoPageSpout( + Map<String, Object> m, String window, boolean includeSys) { + Map<String, Object> ret = new HashMap<>(); - Map subRet = new HashMap(); + Map<String, Object> subRet = new HashMap<>(); putKV(subRet, NUM_EXECUTORS, 1); putKV(subRet, NUM_TASKS, getByKey(m, NUM_TASKS)); // no capacity for spout - Map stat2win2sid2num = getMapByKey(m, STATS); + Map<String, Map<String, Map<String, V>>> stat2win2sid2num = getMapByKey(m, STATS); for (String key : new String[]{EMITTED, TRANSFERRED, FAILED}) { - Map stat = (Map) windowSetConverter(getMapByKey(stat2win2sid2num, key), TO_STRING).get(window); + Map<String, Map<K, V>> stat = windowSetConverter(stat2win2sid2num.get(key), TO_STRING); if (EMITTED.equals(key) || TRANSFERRED.equals(key)) { stat = filterSysStreams(stat, includeSys); } + Map<K, V> winStat = stat.get(window); long sum = 0; - if (stat != null) { - for (Object o : stat.values()) { - sum += ((Number) o).longValue(); + if (winStat != null) { + for (V v : winStat.values()) { + sum += v.longValue(); } } putKV(subRet, key, sum); } - Map win2sid2compLat = windowSetConverter(getMapByKey(stat2win2sid2num, COMP_LATENCIES), TO_STRING); - Map win2sid2acked = windowSetConverter(getMapByKey(stat2win2sid2num, ACKED), TO_STRING); - subRet.putAll(aggSpoutLatAndCount((Map) win2sid2compLat.get(window), (Map) win2sid2acked.get(window))); + Map<String, Map<String, Double>> win2sid2compLat = + windowSetConverter(getMapByKey(stat2win2sid2num, COMP_LATENCIES), TO_STRING); + Map<String, Map<String, Long>> win2sid2acked = + windowSetConverter(getMapByKey(stat2win2sid2num, ACKED), TO_STRING); + subRet.putAll(aggSpoutLatAndCount(win2sid2compLat.get(window), win2sid2acked.get(window))); - ret.put(getByKey(m, "comp-id"), subRet); + ret.put((String) getByKey(m, "comp-id"), subRet); return ret; } - public static Map mergeAggCompStatsCompPageBolt(Map accBoltStats, Map boltStats) { - Map ret = new HashMap(); + public static Map<String, Object> mergeAggCompStatsCompPageBolt( + Map<String, Object> accBoltStats, Map<String, Object> boltStats) { + Map<String, Object> ret = new HashMap<>(); - Map accIn = getMapByKey(accBoltStats, CID_SID_TO_IN_STATS); - Map accOut = getMapByKey(accBoltStats, SID_TO_OUT_STATS); - Map boltIn = getMapByKey(boltStats, CID_SID_TO_IN_STATS); - Map boltOut = getMapByKey(boltStats, SID_TO_OUT_STATS); + Map<List<String>, Map<String, ?>> accIn = getMapByKey(accBoltStats, CID_SID_TO_IN_STATS); + Map<String, Map<String, ?>> accOut = getMapByKey(accBoltStats, SID_TO_OUT_STATS); + Map<List<String>, Map<String, ?>> boltIn = getMapByKey(boltStats, CID_SID_TO_IN_STATS); + Map<String, Map<String, ?>> boltOut = getMapByKey(boltStats, SID_TO_OUT_STATS); - int numExecutors = getByKeywordOr0(accBoltStats, NUM_EXECUTORS).intValue(); + int numExecutors = getByKeyOr0(accBoltStats, NUM_EXECUTORS).intValue(); putKV(ret, NUM_EXECUTORS, numExecutors + 1); putKV(ret, NUM_TASKS, sumOr0( - getByKeywordOr0(accBoltStats, NUM_TASKS), getByKeywordOr0(boltStats, NUM_TASKS))); + getByKeyOr0(accBoltStats, NUM_TASKS), getByKeyOr0(boltStats, NUM_TASKS))); // (merge-with (partial merge-with sum-or-0) acc-out spout-out) putKV(ret, SID_TO_OUT_STATS, fullMergeWithSum(accOut, boltOut)); + // {component id -> metric -> value}, note that input may contain both long and double values putKV(ret, CID_SID_TO_IN_STATS, fullMergeWithSum(accIn, boltIn)); long executed = sumStreamsLong(boltIn, EXECUTED); putKV(ret, EXECUTED, executed); - Map executorStats = new HashMap(); - putKV(executorStats, EXECUTOR_ID, getByKey(boltStats, EXECUTOR_ID)); - putKV(executorStats, UPTIME, getByKey(boltStats, UPTIME)); - putKV(executorStats, HOST, getByKey(boltStats, HOST)); - putKV(executorStats, PORT, getByKey(boltStats, PORT)); - putKV(executorStats, CAPACITY, getByKey(boltStats, CAPACITY)); + Map<String, Object> executorStats = new HashMap<>(); + putKV(executorStats, EXECUTOR_ID, boltStats.get(EXECUTOR_ID)); + putKV(executorStats, UPTIME, boltStats.get(UPTIME)); + putKV(executorStats, HOST, boltStats.get(HOST)); + putKV(executorStats, PORT, boltStats.get(PORT)); + putKV(executorStats, CAPACITY, boltStats.get(CAPACITY)); putKV(executorStats, EMITTED, sumStreamsLong(boltOut, EMITTED)); putKV(executorStats, TRANSFERRED, sumStreamsLong(boltOut, TRANSFERRED)); @@ -377,16 +395,18 @@ public class StatsUtil { return ret; } - public static Map mergeAggCompStatsCompPageSpout(Map accSpoutStats, Map spoutStats) { - Map ret = new HashMap(); + public static Map<String, Object> mergeAggCompStatsCompPageSpout( + Map<String, Object> accSpoutStats, Map<String, Object> spoutStats) { + Map<String, Object> ret = new HashMap<>(); - Map accOut = getMapByKey(accSpoutStats, SID_TO_OUT_STATS); - Map spoutOut = getMapByKey(spoutStats, SID_TO_OUT_STATS); + // {stream id -> metric -> value}, note that sid->out-stats may contain both long and double values + Map<String, Map<String, ?>> accOut = getMapByKey(accSpoutStats, SID_TO_OUT_STATS); + Map<String, Map<String, ?>> spoutOut = getMapByKey(spoutStats, SID_TO_OUT_STATS); - int numExecutors = getByKeywordOr0(accSpoutStats, NUM_EXECUTORS).intValue(); + int numExecutors = getByKeyOr0(accSpoutStats, NUM_EXECUTORS).intValue(); putKV(ret, NUM_EXECUTORS, numExecutors + 1); putKV(ret, NUM_TASKS, sumOr0( - getByKeywordOr0(accSpoutStats, NUM_TASKS), getByKeywordOr0(spoutStats, NUM_TASKS))); + getByKeyOr0(accSpoutStats, NUM_TASKS), getByKeyOr0(spoutStats, NUM_TASKS))); putKV(ret, SID_TO_OUT_STATS, fullMergeWithSum(accOut, spoutOut)); Map executorStats = new HashMap(); @@ -412,48 +432,50 @@ public class StatsUtil { return ret; } - public static Map mergeAggCompStatsTopoPageBolt(Map accBoltStats, Map boltStats) { - Map ret = new HashMap(); - Integer numExecutors = getByKeywordOr0(accBoltStats, NUM_EXECUTORS).intValue(); + public static Map<String, Object> mergeAggCompStatsTopoPageBolt(Map<String, Object> accBoltStats, Map<String, Object> boltStats) { + Map<String, Object> ret = new HashMap<>(); + + Integer numExecutors = getByKeyOr0(accBoltStats, NUM_EXECUTORS).intValue(); putKV(ret, NUM_EXECUTORS, numExecutors + 1); - putKV(ret, NUM_TASKS, sumOr0( - getByKeywordOr0(accBoltStats, NUM_TASKS), getByKeywordOr0(boltStats, NUM_TASKS))); - putKV(ret, EMITTED, sumOr0( - getByKeywordOr0(accBoltStats, EMITTED), getByKeywordOr0(boltStats, EMITTED))); - putKV(ret, TRANSFERRED, sumOr0( - getByKeywordOr0(accBoltStats, TRANSFERRED), getByKeywordOr0(boltStats, TRANSFERRED))); - putKV(ret, EXEC_LAT_TOTAL, sumOr0( - getByKeywordOr0(accBoltStats, EXEC_LAT_TOTAL), getByKeywordOr0(boltStats, EXEC_LAT_TOTAL))); - putKV(ret, PROC_LAT_TOTAL, sumOr0( - getByKeywordOr0(accBoltStats, PROC_LAT_TOTAL), getByKeywordOr0(boltStats, PROC_LAT_TOTAL))); - putKV(ret, EXECUTED, sumOr0( - getByKeywordOr0(accBoltStats, EXECUTED), getByKeywordOr0(boltStats, EXECUTED))); - putKV(ret, ACKED, sumOr0( - getByKeywordOr0(accBoltStats, ACKED), getByKeywordOr0(boltStats, ACKED))); - putKV(ret, FAILED, sumOr0( - getByKeywordOr0(accBoltStats, FAILED), getByKeywordOr0(boltStats, FAILED))); - putKV(ret, CAPACITY, maxOr0( - getByKeywordOr0(accBoltStats, CAPACITY), getByKeywordOr0(boltStats, CAPACITY))); + putKV(ret, NUM_TASKS, + sumOr0(getByKeyOr0(accBoltStats, NUM_TASKS), getByKeyOr0(boltStats, NUM_TASKS))); + putKV(ret, EMITTED, + sumOr0(getByKeyOr0(accBoltStats, EMITTED), getByKeyOr0(boltStats, EMITTED))); + putKV(ret, TRANSFERRED, + sumOr0(getByKeyOr0(accBoltStats, TRANSFERRED), getByKeyOr0(boltStats, TRANSFERRED))); + putKV(ret, EXEC_LAT_TOTAL, + sumOr0(getByKeyOr0(accBoltStats, EXEC_LAT_TOTAL), getByKeyOr0(boltStats, EXEC_LAT_TOTAL))); + putKV(ret, PROC_LAT_TOTAL, + sumOr0(getByKeyOr0(accBoltStats, PROC_LAT_TOTAL), getByKeyOr0(boltStats, PROC_LAT_TOTAL))); + putKV(ret, EXECUTED, + sumOr0(getByKeyOr0(accBoltStats, EXECUTED), getByKeyOr0(boltStats, EXECUTED))); + putKV(ret, ACKED, + sumOr0(getByKeyOr0(accBoltStats, ACKED), getByKeyOr0(boltStats, ACKED))); + putKV(ret, FAILED, + sumOr0(getByKeyOr0(accBoltStats, FAILED), getByKeyOr0(boltStats, FAILED))); + putKV(ret, CAPACITY, + maxOr0(getByKeyOr0(accBoltStats, CAPACITY), getByKeyOr0(boltStats, CAPACITY))); return ret; } - public static Map mergeAggCompStatsTopoPageSpout(Map accSpoutStats, Map spoutStats) { - Map ret = new HashMap(); - Integer numExecutors = getByKeywordOr0(accSpoutStats, NUM_EXECUTORS).intValue(); + public static Map<String, Object> mergeAggCompStatsTopoPageSpout(Map<String, Object> accSpoutStats, Map<String, Object> spoutStats) { + Map<String, Object> ret = new HashMap<>(); + + Integer numExecutors = getByKeyOr0(accSpoutStats, NUM_EXECUTORS).intValue(); putKV(ret, NUM_EXECUTORS, numExecutors + 1); - putKV(ret, NUM_TASKS, sumOr0( - getByKeywordOr0(accSpoutStats, NUM_TASKS), getByKeywordOr0(spoutStats, NUM_TASKS))); - putKV(ret, EMITTED, sumOr0( - getByKeywordOr0(accSpoutStats, EMITTED), getByKeywordOr0(spoutStats, EMITTED))); - putKV(ret, TRANSFERRED, sumOr0( - getByKeywordOr0(accSpoutStats, TRANSFERRED), getByKeywordOr0(spoutStats, TRANSFERRED))); - putKV(ret, COMP_LAT_TOTAL, sumOr0( - getByKeywordOr0(accSpoutStats, COMP_LAT_TOTAL), getByKeywordOr0(spoutStats, COMP_LAT_TOTAL))); - putKV(ret, ACKED, sumOr0( - getByKeywordOr0(accSpoutStats, ACKED), getByKeywordOr0(spoutStats, ACKED))); - putKV(ret, FAILED, sumOr0( - getByKeywordOr0(accSpoutStats, FAILED), getByKeywordOr0(spoutStats, FAILED))); + putKV(ret, NUM_TASKS, + sumOr0(getByKeyOr0(accSpoutStats, NUM_TASKS), getByKeyOr0(spoutStats, NUM_TASKS))); + putKV(ret, EMITTED, + sumOr0(getByKeyOr0(accSpoutStats, EMITTED), getByKeyOr0(spoutStats, EMITTED))); + putKV(ret, TRANSFERRED, + sumOr0(getByKeyOr0(accSpoutStats, TRANSFERRED), getByKeyOr0(spoutStats, TRANSFERRED))); + putKV(ret, COMP_LAT_TOTAL, + sumOr0(getByKeyOr0(accSpoutStats, COMP_LAT_TOTAL), getByKeyOr0(spoutStats, COMP_LAT_TOTAL))); + putKV(ret, ACKED, + sumOr0(getByKeyOr0(accSpoutStats, ACKED), getByKeyOr0(spoutStats, ACKED))); + putKV(ret, FAILED, + sumOr0(getByKeyOr0(accSpoutStats, FAILED), getByKeyOr0(spoutStats, FAILED))); return ret; } @@ -462,10 +484,11 @@ public class StatsUtil { * A helper function that does the common work to aggregate stats of one * executor with the given map for the topology page. */ - public static Map aggTopoExecStats(String window, boolean includeSys, Map accStats, Map newData, String compType) { - Map ret = new HashMap(); + public static Map<String, Object> aggTopoExecStats( + String window, boolean includeSys, Map<String, Object> accStats, Map<String, Object> newData, String compType) { + Map<String, Object> ret = new HashMap<>(); - Set workerSet = (Set) getByKey(accStats, WORKERS_SET); + Set workerSet = (Set) accStats.get(WORKERS_SET); Map bolt2stats = getMapByKey(accStats, BOLT_TO_STATS); Map spout2stats = getMapByKey(accStats, SPOUT_TO_STATS); Map win2emitted = getMapByKey(accStats, WIN_TO_EMITTED); @@ -473,16 +496,17 @@ public class StatsUtil { Map win2compLatWgtAvg = getMapByKey(accStats, WIN_TO_COMP_LAT_WGT_AVG); Map win2acked = getMapByKey(accStats, WIN_TO_ACKED); Map win2failed = getMapByKey(accStats, WIN_TO_FAILED); - Map stats = getMapByKey(newData, STATS); boolean isSpout = compType.equals(SPOUT); - Map cid2stat2num; + // component id -> stats + Map<String, Object> cid2stats; if (isSpout) { - cid2stat2num = aggPreMergeTopoPageSpout(newData, window, includeSys); + cid2stats = aggPreMergeTopoPageSpout(newData, window, includeSys); } else { - cid2stat2num = aggPreMergeTopoPageBolt(newData, window, includeSys); + cid2stats = aggPreMergeTopoPageBolt(newData, window, includeSys); } + Map stats = getMapByKey(newData, STATS); Map w2compLatWgtAvg, w2acked; Map compLatStats = getMapByKey(stats, COMP_LATENCIES); if (isSpout) { // agg spout stats @@ -504,38 +528,38 @@ public class StatsUtil { putKV(ret, WORKERS_SET, workerSet); putKV(ret, BOLT_TO_STATS, bolt2stats); putKV(ret, SPOUT_TO_STATS, spout2stats); - putKV(ret, WIN_TO_EMITTED, mergeWithSum(win2emitted, aggregateCountStreams( + putKV(ret, WIN_TO_EMITTED, mergeWithSumLong(win2emitted, aggregateCountStreams( filterSysStreams(getMapByKey(stats, EMITTED), includeSys)))); - putKV(ret, WIN_TO_TRANSFERRED, mergeWithSum(win2transferred, aggregateCountStreams( + putKV(ret, WIN_TO_TRANSFERRED, mergeWithSumLong(win2transferred, aggregateCountStreams( filterSysStreams(getMapByKey(stats, TRANSFERRED), includeSys)))); - putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSum(win2compLatWgtAvg, w2compLatWgtAvg)); + putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSumDouble(win2compLatWgtAvg, w2compLatWgtAvg)); //boolean isSpoutStat = SPOUT.equals(((Keyword) getByKey(stats, TYPE)).getName()); - putKV(ret, WIN_TO_ACKED, isSpout ? mergeWithSum(win2acked, w2acked) : win2acked); + putKV(ret, WIN_TO_ACKED, isSpout ? mergeWithSumLong(win2acked, w2acked) : win2acked); putKV(ret, WIN_TO_FAILED, isSpout ? - mergeWithSum(aggregateCountStreams(getMapByKey(stats, FAILED)), win2failed) : win2failed); + mergeWithSumLong(aggregateCountStreams(getMapByKey(stats, FAILED)), win2failed) : win2failed); putKV(ret, TYPE, getByKey(stats, TYPE)); // (merge-with merge-agg-comp-stats-topo-page-bolt/spout (acc-stats comp-key) cid->statk->num) // (acc-stats comp-key) ==> bolt2stats/spout2stats if (isSpout) { - Set<Object> keySet = new HashSet<>(); + Set<String> keySet = new HashSet<>(); keySet.addAll(spout2stats.keySet()); - keySet.addAll(cid2stat2num.keySet()); + keySet.addAll(cid2stats.keySet()); Map mm = new HashMap(); - for (Object k : keySet) { - mm.put(k, mergeAggCompStatsTopoPageSpout((Map) spout2stats.get(k), (Map) cid2stat2num.get(k))); + for (String k : keySet) { + mm.put(k, mergeAggCompStatsTopoPageSpout((Map) spout2stats.get(k), (Map) cid2stats.get(k))); } putKV(ret, SPOUT_TO_STATS, mm); } else { - Set<Object> keySet = new HashSet<>(); + Set<String> keySet = new HashSet<>(); keySet.addAll(bolt2stats.keySet()); - keySet.addAll(cid2stat2num.keySet()); + keySet.addAll(cid2stats.keySet()); Map mm = new HashMap(); - for (Object k : keySet) { - mm.put(k, mergeAggCompStatsTopoPageBolt((Map) bolt2stats.get(k), (Map) cid2stat2num.get(k))); + for (String k : keySet) { + mm.put(k, mergeAggCompStatsTopoPageBolt((Map) bolt2stats.get(k), (Map) cid2stats.get(k))); } putKV(ret, BOLT_TO_STATS, mm); } @@ -543,18 +567,30 @@ public class StatsUtil { return ret; } + /** + * aggregate topo executors stats + * TODO: change clojure maps to java HashMap's when nimbus.clj is translated to java + * + * @param topologyId topology id + * @param exec2nodePort executor -> host+port, note it's a clojure map + * @param task2component task -> component, note it's a clojure map + * @param beats executor[start, end] -> executor heartbeat, note it's a java HashMap + * @param topology storm topology + * @param window the window to be aggregated + * @param includeSys whether to include system streams + * @param clusterState cluster state + * @return TopologyPageInfo thrift structure + */ public static TopologyPageInfo aggTopoExecsStats( - String topologyId, Map exec2nodePort, Map task2component, - Map beats, StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) { - List beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology); - Map topoStats = aggregateTopoStats(window, includeSys, beatList); - topoStats = postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState); - - return thriftifyTopoPageData(topologyId, topoStats); + String topologyId, Map exec2nodePort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, + StormTopology topology, String window, boolean includeSys, IStormClusterState clusterState) { + List<Map<String, Object>> beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology); + Map<String, Object> topoStats = aggregateTopoStats(window, includeSys, beatList); + return postAggregateTopoStats(task2component, exec2nodePort, topoStats, topologyId, clusterState); } - public static Map aggregateTopoStats(String win, boolean includeSys, List data) { - Map initVal = new HashMap(); + public static Map<String, Object> aggregateTopoStats(String win, boolean includeSys, List<Map<String, Object>> heartbeats) { + Map<String, Object> initVal = new HashMap<>(); putKV(initVal, WORKERS_SET, new HashSet()); putKV(initVal, BOLT_TO_STATS, new HashMap()); putKV(initVal, SPOUT_TO_STATS, new HashMap()); @@ -564,68 +600,72 @@ public class StatsUtil { putKV(initVal, WIN_TO_ACKED, new HashMap()); putKV(initVal, WIN_TO_FAILED, new HashMap()); - for (Object o : data) { - Map newData = (Map) o; - String compType = ((Keyword) getByKey(newData, TYPE)).getName(); - initVal = aggTopoExecStats(win, includeSys, initVal, newData, compType); + for (Map<String, Object> heartbeat : heartbeats) { + String compType = (String) getByKey(heartbeat, TYPE); + initVal = aggTopoExecStats(win, includeSys, initVal, heartbeat, compType); } return initVal; } - public static Map postAggregateTopoStats( - Map task2comp, Map exec2nodePort, Map accData, String topologyId, IStormClusterState clusterState) { - Map ret = new HashMap(); - putKV(ret, NUM_TASKS, task2comp.size()); - putKV(ret, NUM_WORKERS, ((Set) getByKey(accData, WORKERS_SET)).size()); - putKV(ret, NUM_EXECUTORS, exec2nodePort != null ? exec2nodePort.size() : 0); + public static TopologyPageInfo postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map<String, Object> accData, + String topologyId, IStormClusterState clusterState) { + TopologyPageInfo ret = new TopologyPageInfo(topologyId); + + ret.set_num_tasks(task2comp.size()); + ret.set_num_workers(((Set) getByKey(accData, WORKERS_SET)).size()); + ret.set_num_executors(exec2nodePort != null ? exec2nodePort.size() : 0); Map bolt2stats = getMapByKey(accData, BOLT_TO_STATS); - Map aggBolt2stats = new HashMap(); + Map<String, ComponentAggregateStats> aggBolt2stats = new HashMap<>(); for (Object o : bolt2stats.entrySet()) { Map.Entry e = (Map.Entry) o; String id = (String) e.getKey(); Map m = (Map) e.getValue(); - long executed = getByKeywordOr0(m, EXECUTED).longValue(); + long executed = getByKeyOr0(m, EXECUTED).longValue(); if (executed > 0) { - double execLatencyTotal = getByKeywordOr0(m, EXEC_LAT_TOTAL).doubleValue(); + double execLatencyTotal = getByKeyOr0(m, EXEC_LAT_TOTAL).doubleValue(); putKV(m, EXEC_LATENCY, execLatencyTotal / executed); - double procLatencyTotal = getByKeywordOr0(m, PROC_LAT_TOTAL).doubleValue(); + double procLatencyTotal = getByKeyOr0(m, PROC_LAT_TOTAL).doubleValue(); putKV(m, PROC_LATENCY, procLatencyTotal / executed); } remove(m, EXEC_LAT_TOTAL); remove(m, PROC_LAT_TOTAL); putKV(m, "last-error", getLastError(clusterState, topologyId, id)); - aggBolt2stats.put(id, m); + aggBolt2stats.put(id, thriftifyBoltAggStats(m)); } - putKV(ret, BOLT_TO_STATS, aggBolt2stats); Map spout2stats = getMapByKey(accData, SPOUT_TO_STATS); - Map spoutBolt2stats = new HashMap(); + Map<String, ComponentAggregateStats> aggSpout2stats = new HashMap<>(); for (Object o : spout2stats.entrySet()) { Map.Entry e = (Map.Entry) o; String id = (String) e.getKey(); Map m = (Map) e.getValue(); - long acked = getByKeywordOr0(m, ACKED).longValue(); + long acked = getByKeyOr0(m, ACKED).longValue(); if (acked > 0) { - double compLatencyTotal = getByKeywordOr0(m, COMP_LAT_TOTAL).doubleValue(); + double compLatencyTotal = getByKeyOr0(m, COMP_LAT_TOTAL).doubleValue(); putKV(m, COMP_LATENCY, compLatencyTotal / acked); } remove(m, COMP_LAT_TOTAL); putKV(m, "last-error", getLastError(clusterState, topologyId, id)); - spoutBolt2stats.put(id, m); + aggSpout2stats.put(id, thriftifySpoutAggStats(m)); } - putKV(ret, SPOUT_TO_STATS, spoutBolt2stats); - putKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKey(accData, WIN_TO_EMITTED))); - putKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKey(accData, WIN_TO_TRANSFERRED))); - putKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKey(accData, WIN_TO_ACKED))); - putKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKey(accData, WIN_TO_FAILED))); - putKV(ret, WIN_TO_COMP_LAT, computeWeightedAveragesPerWindow( + TopologyStats topologyStats = new TopologyStats(); + topologyStats.set_window_to_acked(mapKeyStr(getMapByKey(accData, WIN_TO_ACKED))); + topologyStats.set_window_to_emitted(mapKeyStr(getMapByKey(accData, WIN_TO_EMITTED))); + topologyStats.set_window_to_failed(mapKeyStr(getMapByKey(accData, WIN_TO_FAILED))); + topologyStats.set_window_to_transferred(mapKeyStr(getMapByKey(accData, WIN_TO_TRANSFERRED))); + topologyStats.set_window_to_complete_latencies_ms(computeWeightedAveragesPerWindow( accData, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED)); + + ret.set_topology_stats(topologyStats); + ret.set_id_to_spout_agg_stats(aggSpout2stats); + ret.set_id_to_bolt_agg_stats(aggBolt2stats); + return ret; } @@ -636,17 +676,19 @@ public class StatsUtil { * @param includeSys whether to include system streams * @return aggregated bolt stats */ - public static Map aggregateBoltStats(List statsSeq, boolean includeSys) { - Map ret = new HashMap(); - - Map commonStats = preProcessStreamSummary(aggregateCommonStats(statsSeq), includeSys); - List acked = new ArrayList(); - List failed = new ArrayList(); - List executed = new ArrayList(); - List processLatencies = new ArrayList(); - List executeLatencies = new ArrayList(); - for (Object o : statsSeq) { - ExecutorStats stat = (ExecutorStats) o; + public static <T> Map<String, Map> aggregateBoltStats(List<ExecutorSummary> statsSeq, boolean includeSys) { + Map<String, Map> ret = new HashMap<>(); + + Map<String, Map<String, Map<T, Long>>> commonStats = aggregateCommonStats(statsSeq); + commonStats = preProcessStreamSummary(commonStats, includeSys); + + List<Map<String, Map<GlobalStreamId, Long>>> acked = new ArrayList<>(); + List<Map<String, Map<GlobalStreamId, Long>>> failed = new ArrayList<>(); + List<Map<String, Map<GlobalStreamId, Long>>> executed = new ArrayList<>(); + List<Map<String, Map<GlobalStreamId, Double>>> processLatencies = new ArrayList<>(); + List<Map<String, Map<GlobalStreamId, Double>>> executeLatencies = new ArrayList<>(); + for (ExecutorSummary summary : statsSeq) { + ExecutorStats stat = summary.get_stats(); acked.add(stat.get_specific().get_bolt().get_acked()); failed.add(stat.get_specific().get_bolt().get_failed()); executed.add(stat.get_specific().get_bolt().get_executed()); @@ -670,20 +712,23 @@ public class StatsUtil { * @param includeSys whether to include system streams * @return aggregated spout stats */ - public static Map aggregateSpoutStats(List statsSeq, boolean includeSys) { - Map ret = new HashMap(); - - Map commonStats = preProcessStreamSummary(aggregateCommonStats(statsSeq), includeSys); - List acked = new ArrayList(); - List failed = new ArrayList(); - List completeLatencies = new ArrayList(); - for (Object o : statsSeq) { - ExecutorStats stat = (ExecutorStats) o; - acked.add(stat.get_specific().get_spout().get_acked()); - failed.add(stat.get_specific().get_spout().get_failed()); - completeLatencies.add(stat.get_specific().get_spout().get_complete_ms_avg()); - } - mergeMaps(ret, commonStats); + public static Map<String, Map> aggregateSpoutStats(List<ExecutorSummary> statsSeq, boolean includeSys) { + // actually Map<String, Map<String, Map<String, Long/Double>>> + Map<String, Map> ret = new HashMap<>(); + + Map<String, Map<String, Map<String, Long>>> commonStats = aggregateCommonStats(statsSeq); + commonStats = preProcessStreamSummary(commonStats, includeSys); + + List<Map<String, Map<String, Long>>> acked = new ArrayList<>(); + List<Map<String, Map<String, Long>>> failed = new ArrayList<>(); + List<Map<String, Map<String, Double>>> completeLatencies = new ArrayList<>(); + for (ExecutorSummary summary : statsSeq) { + ExecutorStats stats = summary.get_stats(); + acked.add(stats.get_specific().get_spout().get_acked()); + failed.add(stats.get_specific().get_spout().get_failed()); + completeLatencies.add(stats.get_specific().get_spout().get_complete_ms_avg()); + } + ret.putAll(commonStats); putKV(ret, ACKED, aggregateCounts(acked)); putKV(ret, FAILED, aggregateCounts(failed)); putKV(ret, COMP_LATENCIES, aggregateAverages(completeLatencies, acked)); @@ -691,25 +736,25 @@ public class StatsUtil { return ret; } - public static Map aggregateCommonStats(List statsSeq) { - Map ret = new HashMap(); + public static <T> Map<String, Map<String, Map<T, Long>>> aggregateCommonStats(List<ExecutorSummary> statsSeq) { + Map<String, Map<String, Map<T, Long>>> ret = new HashMap<>(); - List emitted = new ArrayList(); - List transferred = new ArrayList(); - for (Object o : statsSeq) { - ExecutorStats stat = (ExecutorStats) o; - emitted.add(stat.get_emitted()); - transferred.add(stat.get_transferred()); + List<Map<String, Map<String, Long>>> emitted = new ArrayList<>(); + List<Map<String, Map<String, Long>>> transferred = new ArrayList<>(); + for (ExecutorSummary summ : statsSeq) { + emitted.add(summ.get_stats().get_emitted()); + transferred.add(summ.get_stats().get_transferred()); } - putKV(ret, EMITTED, aggregateCounts(emitted)); putKV(ret, TRANSFERRED, aggregateCounts(transferred)); + return ret; } - public static Map preProcessStreamSummary(Map streamSummary, boolean includeSys) { - Map emitted = getMapByKey(streamSummary, EMITTED); - Map transferred = getMapByKey(streamSummary, TRANSFERRED); + public static <T> Map<String, Map<String, Map<T, Long>>> preProcessStreamSummary( + Map<String, Map<String, Map<T, Long>>> streamSummary, boolean includeSys) { + Map<String, Map<T, Long>> emitted = getMapByKey(streamSummary, EMITTED); + Map<String, Map<T, Long>> transferred = getMapByKey(streamSummary, TRANSFERRED); putKV(streamSummary, EMITTED, filterSysStreams(emitted, includeSys)); putKV(streamSummary, TRANSFERRED, filterSysStreams(transferred, includeSys)); @@ -717,32 +762,32 @@ public class StatsUtil { return streamSummary; } - public static Map aggregateCountStreams(Map stats) { - Map ret = new HashMap(); - for (Object o : stats.entrySet()) { - Map.Entry entry = (Map.Entry) o; - Map value = (Map) entry.getValue(); + public static <K, V extends Number> Map<String, Long> aggregateCountStreams( + Map<String, Map<K, V>> stats) { + Map<String, Long> ret = new HashMap<>(); + for (Map.Entry<String, Map<K, V>> entry : stats.entrySet()) { + Map<K, V> value = entry.getValue(); long sum = 0l; - for (Object num : value.values()) { - sum += ((Number) num).longValue(); + for (V num : value.values()) { + sum += num.longValue(); } ret.put(entry.getKey(), sum); } return ret; } - public static Map aggregateAverages(List avgSeq, List countSeq) { - Map ret = new HashMap(); + public static <K> Map<String, Map<K, Double>> aggregateAverages(List<Map<String, Map<K, Double>>> avgSeq, + List<Map<String, Map<K, Long>>> countSeq) { + Map<String, Map<K, Double>> ret = new HashMap<>(); - Map expands = expandAveragesSeq(avgSeq, countSeq); - for (Object o : expands.entrySet()) { - Map.Entry entry = (Map.Entry) o; - Object k = entry.getKey(); + Map<String, Map<K, List>> expands = expandAveragesSeq(avgSeq, countSeq); + for (Map.Entry<String, Map<K, List>> entry : expands.entrySet()) { + String k = entry.getKey(); - Map tmp = new HashMap(); - Map inner = (Map) entry.getValue(); - for (Object kk : inner.keySet()) { - List vv = (List) inner.get(kk); + Map<K, Double> tmp = new HashMap<>(); + Map<K, List> inner = entry.getValue(); + for (K kk : inner.keySet()) { + List vv = inner.get(kk); tmp.put(kk, valAvg(((Number) vv.get(0)).doubleValue(), ((Number) vv.get(1)).longValue())); } ret.put(k, tmp); @@ -751,19 +796,19 @@ public class StatsUtil { return ret; } - public static Map aggregateAvgStreams(Map avgs, Map counts) { - Map ret = new HashMap(); + public static <K> Map<String, Double> aggregateAvgStreams( + Map<String, Map<K, Double>> avgs, Map<String, Map<K, Long>> counts) { + Map<String, Double> ret = new HashMap<>(); - Map expands = expandAverages(avgs, counts); - for (Object o : expands.entrySet()) { - Map.Entry e = (Map.Entry) o; - Object win = e.getKey(); + Map<String, Map<K, List>> expands = expandAverages(avgs, counts); + for (Map.Entry<String, Map<K, List>> entry : expands.entrySet()) { + String win = entry.getKey(); double avgTotal = 0.0; long cntTotal = 0l; - Map inner = (Map) e.getValue(); - for (Object kk : inner.keySet()) { - List vv = (List) inner.get(kk); + Map<K, List> inner = entry.getValue(); + for (K kk : inner.keySet()) { + List vv = inner.get(kk); avgTotal += ((Number) vv.get(0)).doubleValue(); cntTotal += ((Number) vv.get(1)).longValue(); } @@ -773,18 +818,25 @@ public class StatsUtil { return ret; } - public static Map spoutStreamsStats(List summs, boolean includeSys) { - List statsSeq = getFilledStats(summs); + public static Map<String, Map> spoutStreamsStats(List<ExecutorSummary> summs, boolean includeSys) { + if (summs == null) { + return new HashMap<>(); + } + List<ExecutorSummary> statsSeq = getFilledStats(summs); return aggregateSpoutStreams(aggregateSpoutStats(statsSeq, includeSys)); } - public static Map boltStreamsStats(List summs, boolean includeSys) { - List statsSeq = getFilledStats(summs); + public static Map<String, Map> boltStreamsStats(List<ExecutorSummary> summs, boolean includeSys) { + if (summs == null) { + return new HashMap<>(); + } + List<ExecutorSummary> statsSeq = getFilledStats(summs); return aggregateBoltStreams(aggregateBoltStats(statsSeq, includeSys)); } - public static Map aggregateSpoutStreams(Map stats) { - Map ret = new HashMap(); + public static Map<String, Map> aggregateSpoutStreams(Map<String, Map> stats) { + // actual ret is Map<String, Map<String, Long/Double>> + Map<String, Map> ret = new HashMap<>(); putKV(ret, ACKED, aggregateCountStreams(getMapByKey(stats, ACKED))); putKV(ret, FAILED, aggregateCountStreams(getMapByKey(stats, FAILED))); putKV(ret, EMITTED, aggregateCountStreams(getMapByKey(stats, EMITTED))); @@ -794,8 +846,8 @@ public class StatsUtil { return ret; } - public static Map aggregateBoltStreams(Map stats) { - Map ret = new HashMap(); + public static Map<String, Map> aggregateBoltStreams(Map<String, Map> stats) { + Map<String, Map> ret = new HashMap<>(); putKV(ret, ACKED, aggregateCountStreams(getMapByKey(stats, ACKED))); putKV(ret, FAILED, aggregateCountStreams(getMapByKey(stats, FAILED))); putKV(ret, EMITTED, aggregateCountStreams(getMapByKey(stats, EMITTED))); @@ -811,41 +863,42 @@ public class StatsUtil { /** * A helper function that aggregates windowed stats from one spout executor. */ - public static Map aggBoltExecWinStats(Map accStats, Map newStats, boolean includeSys) { - Map ret = new HashMap(); + public static Map<String, Object> aggBoltExecWinStats( + Map<String, Object> accStats, Map<String, Object> newStats, boolean includeSys) { + Map<String, Object> ret = new HashMap<>(); - Map m = new HashMap(); + Map<String, Map<String, Number>> m = new HashMap<>(); for (Object win : getMapByKey(newStats, EXECUTED).keySet()) { - m.put(win, aggBoltLatAndCount( + m.put((String) win, aggBoltLatAndCount( (Map) (getMapByKey(newStats, EXEC_LATENCIES)).get(win), (Map) (getMapByKey(newStats, PROC_LATENCIES)).get(win), (Map) (getMapByKey(newStats, EXECUTED)).get(win))); } m = swapMapOrder(m); - Map win2execLatWgtAvg = getMapByKey(m, EXEC_LAT_TOTAL); - Map win2procLatWgtAvg = getMapByKey(m, PROC_LAT_TOTAL); - Map win2executed = getMapByKey(m, EXECUTED); + Map<String, Double> win2execLatWgtAvg = getMapByKey(m, EXEC_LAT_TOTAL); + Map<String, Double> win2procLatWgtAvg = getMapByKey(m, PROC_LAT_TOTAL); + Map<String, Long> win2executed = getMapByKey(m, EXECUTED); - Map emitted = getMapByKey(newStats, EMITTED); - emitted = mergeWithSum(aggregateCountStreams(filterSysStreams(emitted, includeSys)), + Map<String, Map<String, Long>> emitted = getMapByKey(newStats, EMITTED); + Map<String, Long> win2emitted = mergeWithSumLong(aggregateCountStreams(filterSysStreams(emitted, includeSys)), getMapByKey(accStats, WIN_TO_EMITTED)); - putKV(ret, WIN_TO_EMITTED, emitted); + putKV(ret, WIN_TO_EMITTED, win2emitted); - Map transferred = getMapByKey(newStats, TRANSFERRED); - transferred = mergeWithSum(aggregateCountStreams(filterSysStreams(transferred, includeSys)), + Map<String, Map<String, Long>> transferred = getMapByKey(newStats, TRANSFERRED); + Map<String, Long> win2transferred = mergeWithSumLong(aggregateCountStreams(filterSysStreams(transferred, includeSys)), getMapByKey(accStats, WIN_TO_TRANSFERRED)); - putKV(ret, WIN_TO_TRANSFERRED, transferred); + putKV(ret, WIN_TO_TRANSFERRED, win2transferred); - putKV(ret, WIN_TO_EXEC_LAT_WGT_AVG, mergeWithSum( + putKV(ret, WIN_TO_EXEC_LAT_WGT_AVG, mergeWithSumDouble( getMapByKey(accStats, WIN_TO_EXEC_LAT_WGT_AVG), win2execLatWgtAvg)); - putKV(ret, WIN_TO_PROC_LAT_WGT_AVG, mergeWithSum( + putKV(ret, WIN_TO_PROC_LAT_WGT_AVG, mergeWithSumDouble( getMapByKey(accStats, WIN_TO_PROC_LAT_WGT_AVG), win2procLatWgtAvg)); - putKV(ret, WIN_TO_EXECUTED, mergeWithSum( + putKV(ret, WIN_TO_EXECUTED, mergeWithSumLong( getMapByKey(accStats, WIN_TO_EXECUTED), win2executed)); - putKV(ret, WIN_TO_ACKED, mergeWithSum( + putKV(ret, WIN_TO_ACKED, mergeWithSumLong( aggregateCountStreams(getMapByKey(newStats, ACKED)), getMapByKey(accStats, WIN_TO_ACKED))); - putKV(ret, WIN_TO_FAILED, mergeWithSum( + putKV(ret, WIN_TO_FAILED, mergeWithSumLong( aggregateCountStreams(getMapByKey(newStats, FAILED)), getMapByKey(accStats, WIN_TO_FAILED))); return ret; @@ -854,36 +907,37 @@ public class StatsUtil { /** * A helper function that aggregates windowed stats from one spout executor. */ - public static Map aggSpoutExecWinStats(Map accStats, Map newStats, boolean includeSys) { - Map ret = new HashMap(); + public static Map<String, Object> aggSpoutExecWinStats( + Map<String, Object> accStats, Map<String, Object> beat, boolean includeSys) { + Map<String, Object> ret = new HashMap<>(); - Map m = new HashMap(); - for (Object win : getMapByKey(newStats, ACKED).keySet()) { - m.put(win, aggSpoutLatAndCount( - (Map) (getMapByKey(newStats, COMP_LATENCIES)).get(win), - (Map) (getMapByKey(newStats, ACKED)).get(win))); + Map<String, Map<String, Number>> m = new HashMap<>(); + for (Object win : getMapByKey(beat, ACKED).keySet()) { + m.put((String) win, aggSpoutLatAndCount( + (Map<String, Double>) (getMapByKey(beat, COMP_LATENCIES)).get(win), + (Map<String, Long>) (getMapByKey(beat, ACKED)).get(win))); } m = swapMapOrder(m); - Map win2compLatWgtAvg = getMapByKey(m, COMP_LAT_TOTAL); - Map win2acked = getMapByKey(m, ACKED); + Map<String, Double> win2compLatWgtAvg = getMapByKey(m, COMP_LAT_TOTAL); + Map<String, Long> win2acked = getMapByKey(m, ACKED); - Map emitted = getMapByKey(newStats, EMITTED); - emitted = mergeWithSum(aggregateCountStreams(filterSysStreams(emitted, includeSys)), + Map<String, Map<String, Long>> emitted = getMapByKey(beat, EMITTED); + Map<String, Long> win2emitted = mergeWithSumLong(aggregateCountStreams(filterSysStreams(emitted, includeSys)), getMapByKey(accStats, WIN_TO_EMITTED)); - putKV(ret, WIN_TO_EMITTED, emitted); + putKV(ret, WIN_TO_EMITTED, win2emitted); - Map transferred = getMapByKey(newStats, TRANSFERRED); - transferred = mergeWithSum(aggregateCountStreams(filterSysStreams(transferred, includeSys)), + Map<String, Map<String, Long>> transferred = getMapByKey(beat, TRANSFERRED); + Map<String, Long> win2transferred = mergeWithSumLong(aggregateCountStreams(filterSysStreams(transferred, includeSys)), getMapByKey(accStats, WIN_TO_TRANSFERRED)); - putKV(ret, WIN_TO_TRANSFERRED, transferred); + putKV(ret, WIN_TO_TRANSFERRED, win2transferred); - putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSum( + putKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSumDouble( getMapByKey(accStats, WIN_TO_COMP_LAT_WGT_AVG), win2compLatWgtAvg)); - putKV(ret, WIN_TO_ACKED, mergeWithSum( + putKV(ret, WIN_TO_ACKED, mergeWithSumLong( getMapByKey(accStats, WIN_TO_ACKED), win2acked)); - putKV(ret, WIN_TO_FAILED, mergeWithSum( - aggregateCountStreams(getMapByKey(newStats, FAILED)), getMapByKey(accStats, WIN_TO_FAILED))); + putKV(ret, WIN_TO_FAILED, mergeWithSumLong( + aggregateCountStreams(getMapByKey(beat, FAILED)), getMapByKey(accStats, WIN_TO_FAILED))); return ret; } @@ -894,25 +948,23 @@ public class StatsUtil { * * @param countsSeq a seq of {win -> GlobalStreamId -> value} */ - public static Map aggregateCounts(List countsSeq) { - Map ret = new HashMap(); - for (Object counts : countsSeq) { - for (Object o : ((Map) counts).entrySet()) { - Map.Entry e = (Map.Entry) o; - Object win = e.getKey(); - Map stream2count = (Map) e.getValue(); + public static <T> Map<String, Map<T, Long>> aggregateCounts(List<Map<String, Map<T, Long>>> countsSeq) { + Map<String, Map<T, Long>> ret = new HashMap<>(); + for (Map<String, Map<T, Long>> counts : countsSeq) { + for (Map.Entry<String, Map<T, Long>> entry : counts.entrySet()) { + String win = entry.getKey(); + Map<T, Long> stream2count = entry.getValue(); if (!ret.containsKey(win)) { ret.put(win, stream2count); } else { - Map existing = (Map) ret.get(win); - for (Object oo : stream2count.entrySet()) { - Map.Entry ee = (Map.Entry) oo; - Object stream = ee.getKey(); + Map<T, Long> existing = ret.get(win); + for (Map.Entry<T, Long> subEntry : stream2count.entrySet()) { + T stream = subEntry.getKey(); if (!existing.containsKey(stream)) { - existing.put(stream, ee.getValue()); + existing.put(stream, subEntry.getValue()); } else { - existing.put(stream, (Long) ee.getValue() + (Long) existing.get(stream)); + existing.put(stream, subEntry.getValue() + existing.get(stream)); } } } @@ -921,23 +973,24 @@ public class StatsUtil { return ret; } - public static Map aggregateCompStats(String window, boolean includeSys, List data, String compType) { + public static Map<String, Object> aggregateCompStats(String window, boolean includeSys, + List<Map<String, Object>> beats, String compType) { boolean isSpout = SPOUT.equals(compType); - Map initVal = new HashMap(); + Map<String, Object> initVal = new HashMap<>(); putKV(initVal, WIN_TO_ACKED, new HashMap()); putKV(initVal, WIN_TO_FAILED, new HashMap()); putKV(initVal, WIN_TO_EMITTED, new HashMap()); putKV(initVal, WIN_TO_TRANSFERRED, new HashMap()); - Map stats = new HashMap(); + Map<String, Object> stats = new HashMap(); putKV(stats, EXECUTOR_STATS, new ArrayList()); putKV(stats, SID_TO_OUT_STATS, new HashMap()); if (isSpout) { - putKV(initVal, TYPE, KW_SPOUT); + putKV(initVal, TYPE, SPOUT); putKV(initVal, WIN_TO_COMP_LAT_WGT_AVG, new HashMap()); } else { - putKV(initVal, TYPE, KW_BOLT); + putKV(initVal, TYPE, BOLT); putKV(initVal, WIN_TO_EXECUTED, new HashMap()); putKV(stats, CID_SID_TO_IN_STATS, new HashMap()); putKV(initVal, WIN_TO_EXEC_LAT_WGT_AVG, new HashMap()); @@ -945,8 +998,8 @@ public class StatsUtil { } putKV(initVal, STATS, stats); - for (Object o : data) { - initVal = aggCompExecStats(window, includeSys, initVal, (Map) o, compType); + for (Map<String, Object> beat : beats) { + initVal = aggCompExecStats(window, includeSys, initVal, beat, compType); } return initVal; @@ -956,41 +1009,50 @@ public class StatsUtil { * Combines the aggregate stats of one executor with the given map, selecting * the appropriate window and including system components as specified. */ - public static Map aggCompExecStats(String window, boolean includeSys, Map accStats, Map newData, String compType) { - Map ret = new HashMap(); + public static Map<String, Object> aggCompExecStats(String window, boolean includeSys, Map<String, Object> accStats, + Map<String, Object> beat, String compType) { + Map<String, Object> ret = new HashMap<>(); if (SPOUT.equals(compType)) { - ret.putAll(aggSpoutExecWinStats(accStats, getMapByKey(newData, STATS), includeSys)); + ret.putAll(aggSpoutExecWinStats(accStats, getMapByKey(beat, STATS), includeSys)); putKV(ret, STATS, mergeAggCompStatsCompPageSpout( getMapByKey(accStats, STATS), - aggPreMergeCompPageSpout(newData, window, includeSys))); + aggPreMergeCompPageSpout(beat, window, includeSys))); } else { - ret.putAll(aggBoltExecWinStats(accStats, getMapByKey(newData, STATS), includeSys)); + ret.putAll(aggBoltExecWinStats(accStats, getMapByKey(beat, STATS), includeSys)); putKV(ret, STATS, mergeAggCompStatsCompPageBolt( getMapByKey(accStats, STATS), - aggPreMergeCompPageBolt(newData, window, includeSys))); + aggPreMergeCompPageBolt(beat, window, includeSys))); } - putKV(ret, TYPE, keyword(compType)); + putKV(ret, TYPE, compType); return ret; } - public static Map postAggregateCompStats(Map task2component, Map exec2hostPort, Map accData) { - Map ret = new HashMap(); + /** + * post aggregate component stats + * + * @param task2component task -> component, note it's a clojure map + * @param exec2hostPort executor -> host+port, note it's a clojure map + * @param compStats accumulated comp stats + * @return + */ + public static Map<String, Object> postAggregateCompStats(Map task2component, Map exec2hostPort, Map<String, Object> compStats) { + Map<String, Object> ret = new HashMap<>(); - String compType = ((Keyword) getByKey(accData, TYPE)).getName(); - Map stats = getMapByKey(accData, STATS); - Integer numTasks = getByKeywordOr0(stats, NUM_TASKS).intValue(); - Integer numExecutors = getByKeywordOr0(stats, NUM_EXECUTORS).intValue(); + String compType = (String) compStats.get(TYPE); + Map stats = getMapByKey(compStats, STATS); + Integer numTasks = getByKeyOr0(stats, NUM_TASKS).intValue(); + Integer numExecutors = getByKeyOr0(stats, NUM_EXECUTORS).intValue(); Map outStats = getMapByKey(stats, SID_TO_OUT_STATS); - putKV(ret, TYPE, keyword(compType)); + putKV(ret, TYPE, compType); putKV(ret, NUM_TASKS, numTasks); putKV(ret, NUM_EXECUTORS, numExecutors); putKV(ret, EXECUTOR_STATS, getByKey(stats, EXECUTOR_STATS)); - putKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKey(accData, WIN_TO_EMITTED))); - putKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKey(accData, WIN_TO_TRANSFERRED))); - putKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKey(accData, WIN_TO_ACKED))); - putKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKey(accData, WIN_TO_FAILED))); + putKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKey(compStats, WIN_TO_EMITTED))); + putKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKey(compStats, WIN_TO_TRANSFERRED))); + putKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKey(compStats, WIN_TO_ACKED))); + putKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKey(compStats, WIN_TO_FAILED))); if (BOLT.equals(compType)) { Map inStats = getMapByKey(stats, CID_SID_TO_IN_STATS); @@ -1000,10 +1062,10 @@ public class StatsUtil { Map.Entry e = (Map.Entry) o; Object k = e.getKey(); Map v = (Map) e.getValue(); - long executed = getByKeywordOr0(v, EXECUTED).longValue(); + long executed = getByKeyOr0(v, EXECUTED).longValue(); if (executed > 0) { - double executeLatencyTotal = getByKeywordOr0(v, EXEC_LAT_TOTAL).doubleValue(); - double processLatencyTotal = getByKeywordOr0(v, PROC_LAT_TOTAL).doubleValue(); + double executeLatencyTotal = getByKeyOr0(v, EXEC_LAT_TOTAL).doubleValue(); + double processLatencyTotal = getByKeyOr0(v, PROC_LAT_TOTAL).doubleValue(); putKV(v, EXEC_LATENCY, executeLatencyTotal / executed); putKV(v, PROC_LATENCY, processLatencyTotal / executed); } else { @@ -1017,20 +1079,20 @@ public class StatsUtil { putKV(ret, CID_SID_TO_IN_STATS, inStats2); putKV(ret, SID_TO_OUT_STATS, outStats); - putKV(ret, WIN_TO_EXECUTED, mapKeyStr(getMapByKey(accData, WIN_TO_EXECUTED))); + putKV(ret, WIN_TO_EXECUTED, mapKeyStr(getMapByKey(compStats, WIN_TO_EXECUTED))); putKV(ret, WIN_TO_EXEC_LAT, computeWeightedAveragesPerWindow( - accData, WIN_TO_EXEC_LAT_WGT_AVG, WIN_TO_EXECUTED)); + compStats, WIN_TO_EXEC_LAT_WGT_AVG, WIN_TO_EXECUTED)); putKV(ret, WIN_TO_PROC_LAT, computeWeightedAveragesPerWindow( - accData, WIN_TO_PROC_LAT_WGT_AVG, WIN_TO_EXECUTED)); + compStats, WIN_TO_PROC_LAT_WGT_AVG, WIN_TO_EXECUTED)); } else { Map outStats2 = new HashMap(); for (Object o : outStats.entrySet()) { Map.Entry e = (Map.Entry) o; Object k = e.getKey(); Map v = (Map) e.getValue(); - long acked = getByKeywordOr0(v, ACKED).longValue(); + long acked = getByKeyOr0(v, ACKED).longValue(); if (acked > 0) { - double compLatencyTotal = getByKeywordOr0(v, COMP_LAT_TOTAL).doubleValue(); + double compLatencyTotal = getByKeyOr0(v, COMP_LAT_TOTAL).doubleValue(); putKV(v, COMP_LATENCY, compLatencyTotal / acked); } else { putKV(v, COMP_LATENCY, 0.0); @@ -1040,60 +1102,103 @@ public class StatsUtil { } putKV(ret, SID_TO_OUT_STATS, outStats2); putKV(ret, WIN_TO_COMP_LAT, computeWeightedAveragesPerWindow( - accData, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED)); + compStats, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED)); } return ret; } public static ComponentPageInfo aggCompExecsStats( - Map exec2hostPort, Map task2component, Map beats, String window, boolean includeSys, - String topologyId, StormTopology topology, String componentId) { + Map exec2hostPort, Map task2component, Map<List<Integer>, Map<String, Object>> beats, + String window, boolean includeSys, String topologyId, StormTopology topology, String componentId) { - List beatList = extractDataFromHb(exec2hostPort, task2component, beats, includeSys, topology, componentId); - Map compStats = aggregateCompStats(window, includeSys, beatList, componentType(topology, componentId).getName()); + List<Map<String, Object>> beatList = + extractDataFromHb(exec2hostPort, task2component, beats, includeSys, topology, componentId); + Map<String, Object> compStats = aggregateCompStats(window, includeSys, beatList, componentType(topology, componentId)); compStats = postAggregateCompStats(task2component, exec2hostPort, compStats); return thriftifyCompPageData(topologyId, topology, componentId, compStats); } // ===================================================================================== - // clojurify stats methods + // convert thrift stats to java maps // ===================================================================================== - public static Map clojurifyStats(Map stats) { - Map ret = new HashMap(); - for (Object o : stats.entrySet()) { - Map.Entry entry = (Map.Entry) o; - ExecutorInfo executorInfo = (ExecutorInfo) entry.getKey(); - ExecutorStats executorStats = (ExecutorStats) entry.getValue(); + public static Map<List<Integer>, Map<String, Object>> convertExecutorBeats(Map<ExecutorInfo, ExecutorBeat> beats) { + Map<List<Integer>, Map<String, Object>> ret = new HashMap<>(); + for (Map.Entry<ExecutorInfo, ExecutorBeat> beat : beats.entrySet()) { + ExecutorInfo executorInfo = beat.getKey(); + ExecutorBeat executorBeat = beat.getValue(); + ret.put(Lists.newArrayList(executorInfo.get_task_start(), executorInfo.get_task_end()), + convertZkExecutorHb(executorBeat)); + } + + return ret; + } + + /** + * convert thrift ExecutorBeat into a java HashMap + */ + public static Map<String, Object> convertZkExecutorHb(ExecutorBeat beat) { + Map<String, Object> ret = new HashMap<>(); + if (beat != null) { + ret.put(TIME_SECS, beat.getTimeSecs()); + ret.put(UPTIME, beat.getUptime()); + ret.put(STATS, convertExecutorStats(beat.getStats())); + } + + return ret; + } + + public static Map<String, Object> convertZkWorkerHb(ClusterWorkerHeartbeat workerHb) { + Map<String, Object> ret = new HashMap<>(); + if (workerHb != null) { + ret.put("storm-id", workerHb.get_storm_id()); + ret.put(EXECUTOR_STATS, convertExecutorsStats(workerHb.get_executor_stats())); + ret.put(UPTIME, workerHb.get_uptime_secs()); + ret.put(TIME_SECS, workerHb.get_time_secs()); + } + return ret; + } + + /** + * convert executors stats into a HashMap, note that ExecutorStats are remained unchanged + */ + public static Map<List<Integer>, ExecutorStats> convertExecutorsStats(Map<ExecutorInfo, ExecutorStats> stats) { + Map<List<Integer>, ExecutorStats> ret = new HashMap<>(); + for (Map.Entry<ExecutorInfo, ExecutorStats> entry : stats.entrySet()) { + ExecutorInfo executorInfo = entry.getKey(); + ExecutorStats executorStats = entry.getValue(); ret.put(Lists.newArrayList(executorInfo.get_task_start(), executorInfo.get_task_end()), - clojurifyExecutorStats(executorStats)); + executorStats); } return ret; } - public static Map clojurifyExecutorStats(ExecutorStats stats) { - Map ret = new HashMap(); + /** + * convert thrift ExecutorStats structure into a java HashMap + */ + public static Map<String, Object> convertExecutorStats(ExecutorStats stats) { + Map<String, Object> ret = new HashMap<>(); putKV(ret, EMITTED, stats.get_emitted()); putKV(ret, TRANSFERRED, stats.get_transferred()); - putKV(ret, "rate", stats.get_rate()); + putKV(ret, RATE, stats.get_rate()); if (stats.get_specific().is_set_bolt()) { - mergeMaps(ret, clojurifySpecificStats(stats.get_specific().get_bolt())); - putKV(ret, TYPE, KW_BOLT); + ret.putAll(convertSpecificStats(stats.get_specific().get_bolt())); + putKV(ret, TYPE, BOLT); } else { - mergeMaps(ret, clojurifySpecificStats(stats.get_specific().get_spout())); - putKV(ret, TYPE, KW_SPOUT); + ret.putAll(convertSpecificStats(stats.get_specific().get_spout())); + putKV(ret, TYPE, SPOUT); } return ret; } - public static Map clojurifySpecificStats(SpoutStats stats) { - Map ret = new HashMap(); + private static Map<String, Object> convertSpecificStats(SpoutStats stats) { + Map<String, Object> ret = new HashMap<>(); putKV(ret, ACKED, stats.get_acked()); putKV(ret, FAILED, stats.get_failed()); putKV(ret, COMP_LATENCIES, stats.get_complete_ms_avg()); @@ -1101,8 +1206,8 @@ public class StatsUtil { return ret; } - public static Map clojurifySpecificStats(BoltStats stats) { - Map ret = new HashMap(); + private static Map<String, Object> convertSpecificStats(BoltStats stats) { + Map<String, Object> ret = new HashMap<>(); Map acked = windowSetConverter(stats.get_acked(), FROM_GSID, IDENTITY); Map failed = windowSetConverter(stats.get_failed(), FROM_GSID, IDENTITY); @@ -1119,9 +1224,9 @@ public class StatsUtil { return ret; } - public static List extractNodeInfosFromHbForComp( + public static List<Map<String, Object>> extractNodeInfosFromHbForComp( Map exec2hostPort, Map task2component, boolean includeSys, String compId) { - List ret = new ArrayList(); + List<Map<String, Object>> ret = new ArrayList<>(); Set<List> hostPorts = new HashSet<>(); for (Object o : exec2hostPort.entrySet()) { @@ -1139,7 +1244,7 @@ public class StatsUtil { } for (List hostPort : hostPorts) { - Map m = new HashMap(); + Map<String, Object> m = new HashMap<>(); putKV(m, HOST, hostPort.get(0)); putKV(m, PORT, hostPort.get(1)); ret.add(m); @@ -1148,32 +1253,108 @@ public class StatsUtil { return ret; } + + // ===================================================================================== + // heartbeats related + // ===================================================================================== + + /** + * update all executor heart beats + * TODO: should move this method to nimbus when nimbus.clj is translated + * + * @param cache existing heart beats cache + * @param executorBeats new heart beats + * @param executors all executors + * @param timeout timeout + * @return a HashMap of updated executor heart beats + */ + public static Map<List<Integer>, Object> updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache, + Map<List<Integer>, Map<String, Object>> executorBeats, + Set<List<Integer>> executors, Integer timeout) { + Map<List<Integer>, Object> ret = new HashMap<>(); + if (cache == null && executorBeats == null) { + return ret; + } + + if (cache == null) { + cache = new HashMap<>(); + } + if (executorBeats == null) { + executorBeats = new HashMap<>(); + } + + for (List<Integer> executor : executors) { + ret.put(executor, updateExecutorCache(cache.get(executor), executorBeats.get(executor), timeout)); + } + + return ret; + } + + // TODO: should move this method to nimbus when nimbus.clj is translated + public static Map<String, Object> updateExecutorCache( + Map<String, Object> currBeat, Map<String, Object> newBeat, Integer timeout) { + Map<String, Object> ret = new HashMap<>(); + + Integer lastNimbusTime = null, lastReportedTime = null; + if (currBeat != null) { + lastNimbusTime = (Integer) currBeat.get("nimbus-time"); + lastReportedTime = (Integer) currBeat.get("executor-reported-time"); + } + + Integer reportedTime = null; + if (newBeat != null) { + reportedTime = (Integer) newBeat.get(TIME_SECS); + } + + if (reportedTime == null) { + if (lastReportedTime != null) { + reportedTime = lastReportedTime; + } else { + reportedTime = 0; + } + } + + if (lastNimbusTime == null || !reportedTime.equals(lastReportedTime)) { + lastNimbusTime = Time.currentTimeSecs(); + } + + ret.put("is-timed-out", Time.deltaSecs(lastNimbusTime) >= timeout); + ret.put("nimbus-time", lastNimbusTime); + ret.put("executor-reported-time", reportedTime); + ret.put(HEARTBEAT, newBeat); + + return ret; + } + + /** * extracts a list of executor data from heart beats */ - public static List<Map<String, Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map beats, + public static List<Map<String, Object>> extractDataFromHb(Map executor2hostPort, Map task2component, + Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology) { return extractDataFromHb(executor2hostPort, task2component, beats, includeSys, topology, null); } - public static List<Map<String, Object>> extractDataFromHb(Map executor2hostPort, Map task2component, Map beats, + public static List<Map<String, Object>> extractDataFromHb(Map executor2hostPort, Map task2component, + Map<List<Integer>, Map<String, Object>> beats, boolean includeSys, StormTopology topology, String compId) { List<Map<String, Object>> ret = new ArrayList<>(); - if (executor2hostPort == null) { + if (executor2hostPort == null || beats == null) { return ret; } for (Object o : executor2hostPort.entrySet()) { Map.Entry entry = (Map.Entry) o; - List key = (List) entry.getKey(); - List value = (List) entry.getValue(); + List executor = (List) entry.getKey(); + List hostPort = (List) entry.getValue(); - Integer start = ((Number) key.get(0)).intValue(); - Integer end = ((Number) key.get(1)).intValue(); + Integer start = ((Number) executor.get(0)).intValue(); + Integer end = ((Number) executor.get(1)).intValue(); - String host = (String) value.get(0); - Integer port = ((Number) value.get(1)).intValue(); + String host = (String) hostPort.get(0); + Integer port = ((Number) hostPort.get(1)).intValue(); - Map beat = (Map) beats.get(key); + Map<String, Object> beat = beats.get(convertExecutor(executor)); if (beat == null) { continue; } @@ -1186,14 +1367,16 @@ public class StatsUtil { putKV(m, NUM_TASKS, end - start + 1); putKV(m, HOST, host); putKV(m, PORT, port); - putKV(m, UPTIME, beat.get(keyword(UPTIME))); - putKV(m, STATS, beat.get(keyword(STATS))); - Keyword type = componentType(topology, compId); + Map stats = getMapByKey(getMapByKey(beat, (HEARTBEAT)), STATS); + putKV(m, UPTIME, getMapByKey(beat, HEARTBEAT).get(UPTIME)); + putKV(m, STATS, stats); + + String type = componentType(topology, compId); if (type != null) { putKV(m, TYPE, type); } else { - putKV(m, TYPE, getByKey(getMapByKey(beat, STATS), TYPE)); + putKV(m, TYPE, stats.get(TYPE)); } ret.add(m); } @@ -1201,8 +1384,9 @@ public class StatsUtil { return ret; } - private static Map computeWeightedAveragesPerWindow(Map accData, String wgtAvgKey, String divisorKey) { - Map ret = new HashMap(); + private static Map<String, Double> computeWeightedAveragesPerWindow(Map<String, Object> accData, + String wgtAvgKey, String divisorKey) { + Map<String, Double> ret = new HashMap<>(); for (Object o : getMapByKey(accData, wgtAvgKey).entrySet()) { Map.Entry e = (Map.Entry) o; Object window = e.getKey(); @@ -1216,16 +1400,31 @@ public class StatsUtil { } + public static Set<List<Integer>> convertExecutors(Set executors) { + Set<List<Integer>> convertedExecutors = new HashSet<>(); + for (Object executor : executors) { + List l = (List) executor; + convertedExecutors.add(convertExecutor(l)); + } + return convertedExecutors; + } + + /** + * convert a clojure executor to java List<Integer> + */ + public static List<Integer> convertExecutor(List executor) { + return Lists.newArrayList(((Number) executor.get(0)).intValue(), ((Number) executor.get(1)).intValue()); + } + /** * computes max bolt capacity * * @param executorSumms a list of ExecutorSummary * @return max bolt capacity */ - public static double computeBoltCapacity(List executorSumms) { + public static double computeBoltCapacity(List<ExecutorSummary> executorSumms) { double max = 0.0; - for (Object o : executorSumms) { - ExecutorSummary summary = (ExecutorSummary) o; + for (ExecutorSummary summary : executorSumms) { double capacity = computeExecutorCapacity(summary); if (capacity > max) { max = capacity; @@ -1234,19 +1433,22 @@ public class StatsUtil { return max; } - public static double computeExecutorCapacity(ExecutorSummary summ) { - ExecutorStats stats = summ.get_stats(); + public static double computeExecutorCapacity(ExecutorSummary summary) { + ExecutorStats stats = summary.get_stats(); if (stats == null) { return 0.0; } else { - Map m = aggregateBoltStats(Lists.newArrayList(stats), true); + // Map<String, Map<String/GlobalStreamId, Long/Double>> {win -> stream -> value} + Map<String, Map> m = aggregateBoltStats(Lists.newArrayList(summary), true); + // {metric -> win -> value} ==> {win -> metric -> value} m = swapMapOrder(aggregateBoltStreams(m)); + // {metric -> value} Map data = getMapByKey(m, TEN_MIN_IN_SECONDS_STR); - int uptime = summ.get_uptime_secs(); + int uptime = summary.get_uptime_secs(); int win = Math.min(uptime, TEN_MIN_IN_SECONDS); - long executed = getByKeywordOr0(data, EXECUTED).longValue(); - double latency = getByKeywordOr0(data, EXEC_LATENCIES).doubleValue(); + long executed = getByKeyOr0(data, EXECUTED).longValue(); + double latency = getByKeyOr0(data, EXEC_LATENCIES).doubleValue(); if (win > 0) { return executed * latency / (1000 * win); } @@ -1260,35 +1462,33 @@ public class StatsUtil { * @param summs a list of ExecutorSummary * @return filtered summs */ - public static List getFilledStats(List summs) { - for (Iterator itr = summs.iterator(); itr.hasNext(); ) { - ExecutorSummary summ = (ExecutorSummary) itr.next(); - if (summ.get_stats() == null) { - itr.remove(); + public static List<ExecutorSummary> getFilledStats(List<ExecutorSummary> summs) { + List<ExecutorSummary> ret = new ArrayList<>(); + for (ExecutorSummary summ : summs) { + if (summ.get_stats() != null) { + ret.add(summ); } } - return summs; + return ret; } - private static Map mapKeyStr(Map m) { - Map ret = new HashMap(); - for (Object k : m.keySet()) { - ret.put(k.toString(), m.get(k)); + private static <K, V> Map<String, V> mapKeyStr(Map<K, V> m) { + Map<String, V> ret = new HashMap<>(); + for (Map.Entry<K, V> entry : m.entrySet()) { + ret.put(entry.getKey().toString(), entry.getValue()); } return ret; } - private static long sumStreamsLong(Map m, String key) { + private static <K1, K2> long sumStreamsLong(Map<K1, Map<K2, ?>> m, String key) { long sum = 0; if (m == null) { return sum; } - for (Object v : m.values()) { - Map sub = (Map) v; - for (Object o : sub.entrySet()) { - Map.Entry e = (Map.Entry) o; - if (((Keyword) e.getKey()).getName().equals(key)) { - sum += ((Number) e.getValue()).longValue(); + for (Map<K2, ?> v : m.values()) { + for (Map.Entry<K2, ?> entry : v.entrySet()) { + if (entry.getKey().equals(key)) { + sum += ((Number) entry.getValue()).longValue(); } } } @@ -1304,7 +1504,7 @@ public class StatsUtil { Map sub = (Map) v; for (Object o : sub.entrySet()) { Map.Entry e = (Map.Entry) o; - if (((Keyword) e.getKey()).getName().equals(key)) { + if (e.getKey().equals(key)) { sum += ((Number) e.getValue()).doubleValue(); } } @@ -1340,21 +1540,15 @@ public class StatsUtil { * @param includeSys whether to filter system streams * @return filtered stats */ - private static Map filterSysStreams(Map stats, boolean includeSys) { + private static <K, V> Map<String, Map<K, V>> filterSysStreams(Map<String, Map<K, V>> stats, boolean includeSys) { if (!includeSys) { - for (Iterator itr = stats.keySet().iterator(); itr.hasNext(); ) { - Object winOrStream = itr.next(); - if (isWindow(winOrStream)) { - Map stream2stat = (Map) stats.get(winOrStream); - for (Iterator subItr = stream2stat.keySet().iterator(); subItr.hasNext(); ) { - Object key = subItr.next(); - if (key instanceof String && Utils.isSystemId((String) key)) { - subItr.remove(); - } - } - } else { - if (winOrStream instanceof String && Utils.isSystemId((String) winOrStream)) { - itr.remove(); + for (Iterator<String> itr = stats.keySet().iterator(); itr.hasNext(); ) { + String winOrStream = itr.next(); + Map<K, V> stream2stat = stats.get(winOrStream); + for (Iterator subItr = stream2stat.keySet().iterator(); subItr.hasNext(); ) { + Object key = subItr.next(); + if (key instanceof String && Utils.isSystemId((String) key)) { + subItr.remove(); } } } @@ -1362,15 +1556,12 @@ public class StatsUtil { return stats; } - private static boolean isWindow(Object key) { - return key.equals("600") || key.equals("10800") || key.equals("86400") || key.equals(":all-time"); - } - /** * equals to clojure's: (merge-with (partial merge-with sum-or-0) acc-out spout-out) */ - private static Map fullMergeWithSum(Map m1, Map m2) { - Set<Object> allKeys = new HashSet<>(); + private static <K1, K2> Map<K1, Map<K2, Number>> fullMergeWithSum(Map<K1, Map<K2, ?>> m1, + Map<K1, Map<K2, ?>> m2) { + Set<K1> allKeys = new HashSet<>(); if (m1 != null) { allKeys.addAll(m1.keySet()); } @@ -1378,14 +1569,14 @@ public class StatsUtil { allKeys.addAll(m2.keySet()); } - Map ret = new HashMap(); - for (Object k : allKeys) { - Map mm1 = null, mm2 = null; + Map<K1, Map<K2, Number>> ret = new HashMap<>(); + for (K1 k : allKeys) { + Map<K2, ?> mm1 = null, mm2 = null; if (m1 != null) { - mm1 = (Map) m1.get(k); + mm1 = m1.get(k); } if (m2 != null) { - mm2 = (Map) m2.get(k); + mm2 = m2.get(k); } ret.put(k, mergeWithSum(mm1, mm2)); } @@ -1393,10 +1584,10 @@ public class StatsUtil { return ret; } - private static Map mergeWithSum(Map m1, Map m2) { - Map ret = new HashMap(); + private static <K> Map<K, Number> mergeWithSum(Map<K, ?> m1, Map<K, ?> m2) { + Map<K, Number> ret = new HashMap<>(); - Set<Object> allKeys = new HashSet<>(); + Set<K> allKeys = new HashSet<>(); if (m1 != null) { allKeys.addAll(m1.keySet()); } @@ -1404,10 +1595,52 @@ public class StatsUtil { allKeys.addAll(m2.keySet()); } - for (Object k : allKeys) { + for (K k : allKeys) { Number n1 = getOr0(m1, k); Number n2 = getOr0(m2, k); - ret.put(k, add(n1, n2)); + if (n1 instanceof Long) { + ret.put(k, n1.longValue() + n2.longValue()); + } else { + ret.put(k, n1.doubleValue() + n2.doubleValue()); + } + } + return ret; + } + + private static <K> Map mergeWithSumLong(Map<K, Long> m1, Map<K, Long> m2) { + Map<K, Long> ret = new HashMap<>(); + + Set<K> allKeys = new HashSet<>(); + if (m1 != null) { + allKeys.addAll(m1.keySet()); + } + if (m2 != null) { + allKeys.addAll(m2.keySet()); + } + + for (K k : allKeys) { + Number n1 = getOr0(m1, k); + Number n2 = getOr0(m2, k); + ret.put(k, n1.longValue() + n2.longValue()); + } + return ret; + } + + private static <K> Map mergeWithSumDouble(Map<K, Double> m1, Map<K, Double> m2) { + Map<K, Double> ret = new HashMap<>(); + + Set<K> allKeys = new HashSet<>(); + if (m1 != null) { + allKeys.addAll(m1.keySet()); + } + if (m2 != null) { + allKeys.addAll(m2.keySet()); + } + + for (K k : allKeys) { + Number n1 = getOr0(m1, k); + Number n2 = getOr0(m2, k); + ret.put(k, n1.doubleValue() + n2.doubleValue()); } return ret; } @@ -1416,10 +1649,11 @@ public class StatsUtil { * this method merges 2 two-level-deep maps, which is different from mergeWithSum, and we expect the two maps * have the same keys */ - private static Map mergeWithAddPair(Map m1, Map m2) { - Map ret = new HashMap(); + private static <K> Map<String, Map<K, List>> mergeWithAddPair(Map<String, Map<K, List>> m1, + Map<String, Map<K, List>> m2) { + Map<String, Map<K, List>> ret = new HashMap<>(); - Set<Object> allKeys = new HashSet<>(); + Set<String> allKeys = new HashSet<>(); if (m1 != null) { allKeys.addAll(m1.keySet()); } @@ -1427,9 +1661,9 @@ public class StatsUtil { allKeys.addAll(m2.keySet()); } - for (Object k : allKeys) { - Map mm1 = (m1 != null) ? (Map) m1.get(k) : null; - Map mm2 = (m2 != null) ? (Map) m2.get(k) : null; + for (String k : allKeys) { + Map<K, List> mm1 = (m1 != null) ? m1.get(k) : null; + Map<K, List> mm2 = (m2 != null) ? m2.get(k) : null; if (mm1 == null && mm2 == null) { continue; } else if (mm1 == null) { @@ -1437,13 +1671,17 @@ public class StatsUtil { } else if (mm2 == null) { ret.put(k, mm1); } else { - Map tmp = new HashMap(); - for (Object kk : mm1.keySet()) { - List seq1 = (List) mm1.get(kk); - List seq2 = (List) mm2.get(kk); + Map<K, List> tmp = new HashMap<>(); + for (K kk : mm1.keySet()) { + List seq1 = mm1.get(kk); + List seq2 = mm2.get(kk); List sums = new ArrayList(); for (int i = 0; i < seq1.size(); i++) { - sums.add(add((Number) seq1.get(i), (Number) seq2.get(i))); + if (seq1.get(i) instanceof Long) { + sums.add(((Number) seq1.get(i)).longValue() + ((Number) seq2.get(i)).longValue()); + } else { + sums.add(((Number) seq1.get(i)).doubleValue() + ((Number) seq2.get(i)).doubleValue()); + } } tmp.put(kk, sums); } @@ -1457,65 +1695,36 @@ public class StatsUtil { // thriftify stats methods // ===================================================================================== - private static TopologyPageInfo thriftifyTopoPageData(String topologyId, Map data) { - TopologyPageInfo ret = new TopologyPageInfo(topologyId); - Integer numTasks = getByKeywordOr0(data, NUM_TASKS).intValue(); - Integer numWorkers = getByKeywordOr0(data, NUM_WORKERS).intValue(); - Integer numExecutors = getByKeywordOr0(data, NUM_EXECUTORS).intValue(); - Map spout2stats = getMapByKey(data, SPOUT_TO_STATS); - Map bolt2stats = getMapByKey(data, BOLT_TO_STATS); - Map win2emitted = getMapByKey(data, WIN_TO_EMITTED); - Map win2transferred = getMapByKey(data, WIN_TO_TRANSFERRED); - Map win2compLatency = getMapByKey(data, WIN_TO_COMP_LAT); - Map win2acked = getMapByKey(data, WIN_TO_ACKED); - Map win2failed = getMapByKey(data, WIN_TO_FAILED); - - Map<String, ComponentAggregateStats> spoutAggStats = new HashMap<>(); - for (Object o : spout2stats.entrySet()) { - Map.Entry e = (Map.Entry) o; - String id = (String) e.getKey(); - Map v = (Map) e.getValue(); - putKV(v, TYPE, KW_SPOUT); - - spoutAggStats.put(id, thriftifySpoutAggStats(v)); - } + public static ClusterWorkerHeartbeat thriftifyZkWorkerH
<TRUNCATED>
