http://git-wip-us.apache.org/repos/asf/storm/blob/52d3b587/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java new file mode 100644 index 0000000..144872f --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java @@ -0,0 +1,2178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.stats; + +import clojure.lang.Keyword; +import clojure.lang.PersistentVector; +import clojure.lang.RT; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.BoltAggregateStats; +import org.apache.storm.generated.BoltStats; +import org.apache.storm.generated.CommonAggregateStats; +import org.apache.storm.generated.ComponentAggregateStats; +import org.apache.storm.generated.ComponentPageInfo; +import org.apache.storm.generated.ComponentType; +import org.apache.storm.generated.ErrorInfo; +import org.apache.storm.generated.ExecutorAggregateStats; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.ExecutorSpecificStats; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.ExecutorSummary; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.SpecificAggregateStats; +import org.apache.storm.generated.SpoutAggregateStats; +import org.apache.storm.generated.SpoutStats; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.TopologyPageInfo; +import org.apache.storm.generated.TopologyStats; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.internal.MultiCountStatAndMetric; +import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("unchecked, unused") +public class StatsUtil { + private static final Logger logger = LoggerFactory.getLogger(StatsUtil.class); + + private static final String TYPE = "type"; + private static final String SPOUT = "spout"; + private static final String BOLT = "bolt"; + + private static final String 
UPTIME = "uptime";
    // executor identity / placement / sizing keys
    private static final String HOST = "host";
    private static final String PORT = "port";
    private static final String NUM_TASKS = "num-tasks";
    private static final String NUM_EXECUTORS = "num-executors";
    private static final String NUM_WORKERS = "num-workers";
    private static final String CAPACITY = "capacity";
    private static final String STATS = "stats";
    private static final String EXECUTOR_STATS = "executor-stats";
    private static final String EXECUTOR_ID = "executor-id";
    // NOTE: camelCase on purpose — this key is consumed as-is downstream,
    // unlike the kebab-case keys above.
    private static final String LAST_ERROR = "lastError";

    // per-stream counter keys
    private static final String ACKED = "acked";
    private static final String FAILED = "failed";
    private static final String EXECUTED = "executed";
    private static final String EMITTED = "emitted";
    private static final String TRANSFERRED = "transferred";

    // { stream id -> latency } map keys (plural)
    private static final String EXEC_LATENCIES = "execute-latencies";
    private static final String PROC_LATENCIES = "process-latencies";
    private static final String COMP_LATENCIES = "complete-latencies";

    // single aggregated latency keys (singular)
    private static final String EXEC_LATENCY = "execute-latency";
    private static final String PROC_LATENCY = "process-latency";
    private static final String COMP_LATENCY = "complete-latency";

    // weighted latency totals, divided by count later to obtain averages
    private static final String EXEC_LAT_TOTAL = "executeLatencyTotal";
    private static final String PROC_LAT_TOTAL = "processLatencyTotal";
    private static final String COMP_LAT_TOTAL = "completeLatencyTotal";

    // { time window -> value } map keys
    private static final String WIN_TO_EMITTED = "window->emitted";
    private static final String WIN_TO_ACKED = "window->acked";
    private static final String WIN_TO_FAILED = "window->failed";
    private static final String WIN_TO_EXECUTED = "window->executed";
    private static final String WIN_TO_TRANSFERRED = "window->transferred";
    private static final String WIN_TO_EXEC_LAT = "window->execute-latency";
    private static final String WIN_TO_PROC_LAT = "window->process-latency";
    private static final String WIN_TO_COMP_LAT = 
"window->complete-latency"; + private static final String WIN_TO_COMP_LAT_WGT_AVG = "window->comp-lat-wgt-avg"; + private static final String WIN_TO_EXEC_LAT_WGT_AVG = "window->exec-lat-wgt-avg"; + private static final String WIN_TO_PROC_LAT_WGT_AVG = "window->proc-lat-wgt-avg"; + + private static final String BOLT_TO_STATS = "bolt-id->stats"; + private static final String SPOUT_TO_STATS = "spout-id->stats"; + private static final String SID_TO_OUT_STATS = "sid->output-stats"; + private static final String CID_SID_TO_IN_STATS = "cid+sid->input-stats"; + private static final String WORKERS_SET = "workers-set"; + + private static final Keyword KW_SPOUT = keyword(SPOUT); + private static final Keyword KW_BOLT = keyword(BOLT); + + public static final int TEN_MIN_IN_SECONDS = 60 * 10; + public static final String TEN_MIN_IN_SECONDS_STR = TEN_MIN_IN_SECONDS + ""; + + private static final IdentityTransformer IDENTITY = new IdentityTransformer(); + private static final ToStringTransformer TO_STRING = new ToStringTransformer(); + private static final FromGlobalStreamIdTransformer FROM_GSID = new FromGlobalStreamIdTransformer(); + private static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer(); + + + // ===================================================================================== + // update stats methods + // ===================================================================================== + + public static BoltExecutorStats mkBoltStats(int rate) { + BoltExecutorStats stats = new BoltExecutorStats(); + stats.setRate(rate); + return stats; + } + + public static SpoutExecutorStats mkSpoutStats(int rate) { + SpoutExecutorStats stats = new SpoutExecutorStats(); + stats.setRate(rate); + return stats; + } + + public static void emittedTuple(CommonStats stats, String stream) { + stats.getEmitted().incBy(stream, stats.rate); + } + + public static void transferredTuples(CommonStats stats, String stream, int amount) { + 
stats.getTransferred().incBy(stream, stats.rate * amount); + } + + public static void boltExecuteTuple(BoltExecutorStats stats, String component, String stream, long latencyMs) { + Object key = PersistentVector.create(component, stream); + stats.getExecuted().incBy(key, stats.rate); + stats.getExecuteLatencies().record(key, latencyMs); + } + + public static void boltAckedTuple(BoltExecutorStats stats, String component, String stream, long latencyMs) { + Object key = PersistentVector.create(component, stream); + stats.getAcked().incBy(key, stats.rate); + stats.getProcessLatencies().record(key, latencyMs); + } + + public static void boltFailedTuple(BoltExecutorStats stats, String component, String stream, long latencyMs) { + Object key = PersistentVector.create(component, stream); + stats.getFailed().incBy(key, stats.rate); + + } + + public static void spoutAckedTuple(SpoutExecutorStats stats, String stream, long latencyMs) { + stats.getAcked().incBy(stream, stats.rate); + stats.getCompleteLatencies().record(stream, latencyMs); + } + + public static void spoutFailedTuple(SpoutExecutorStats stats, String stream, long latencyMs) { + stats.getFailed().incBy(stream, stats.rate); + } + + private static void cleanupStat(IMetric metric) { + if (metric instanceof MultiCountStatAndMetric) { + ((MultiCountStatAndMetric) metric).close(); + } else if (metric instanceof MultiLatencyStatAndMetric) { + ((MultiLatencyStatAndMetric) metric).close(); + } + } + + public static Map renderStats(SpoutExecutorStats stats) { + cleanupSpoutStats(stats); + Map ret = new HashMap(); + ret.putAll(valueStats(stats, CommonStats.COMMON_FIELDS)); + ret.putAll(valueStats(stats, SpoutExecutorStats.SPOUT_FIELDS)); + putRawKV(ret, TYPE, KW_SPOUT); + + return ret; + } + + public static Map renderStats(BoltExecutorStats stats) { + cleanupBoltStats(stats); + Map ret = new HashMap(); + ret.putAll(valueStats(stats, CommonStats.COMMON_FIELDS)); + ret.putAll(valueStats(stats, BoltExecutorStats.BOLT_FIELDS)); + 
putRawKV(ret, TYPE, KW_BOLT); + + return ret; + } + + public static void cleanupSpoutStats(SpoutExecutorStats stats) { + cleanupCommonStats(stats); + for (String field : SpoutExecutorStats.SPOUT_FIELDS) { + cleanupStat(stats.get(field)); + } + } + + public static void cleanupBoltStats(BoltExecutorStats stats) { + cleanupCommonStats(stats); + for (String field : BoltExecutorStats.BOLT_FIELDS) { + cleanupStat(stats.get(field)); + } + } + + public static void cleanupCommonStats(CommonStats stats) { + for (String field : CommonStats.COMMON_FIELDS) { + cleanupStat(stats.get(field)); + } + } + + private static Map valueStats(CommonStats stats, String[] fields) { + Map ret = new HashMap(); + for (String field : fields) { + IMetric metric = stats.get(field); + if (metric instanceof MultiCountStatAndMetric) { + putRawKV(ret, field, ((MultiCountStatAndMetric) metric).getTimeCounts()); + } else if (metric instanceof MultiLatencyStatAndMetric) { + putRawKV(ret, field, ((MultiLatencyStatAndMetric) metric).getTimeLatAvg()); + } + } + putRawKV(ret, CommonStats.RATE, stats.getRate()); + + return ret; + } + + // ===================================================================================== + // aggregation stats methods + // ===================================================================================== + + /** + * Aggregates number executed, process latency, and execute latency across all streams. 
+ * + * @param id2execAvg { global stream id -> exec avg value }, e.g., {["split" "default"] 0.44313} + * @param id2procAvg { global stream id -> proc avg value } + * @param id2numExec { global stream id -> executed } + */ + public static Map aggBoltLatAndCount(Map id2execAvg, Map id2procAvg, Map id2numExec) { + Map ret = new HashMap(); + putRawKV(ret, EXEC_LAT_TOTAL, weightAvgAndSum(id2execAvg, id2numExec)); + putRawKV(ret, PROC_LAT_TOTAL, weightAvgAndSum(id2procAvg, id2numExec)); + putRawKV(ret, EXECUTED, sumValues(id2numExec)); + + return ret; + } + + /** + * Aggregates number acked and complete latencies across all streams. + */ + public static Map aggSpoutLatAndCount(Map id2compAvg, Map id2numAcked) { + Map ret = new HashMap(); + putRawKV(ret, COMP_LAT_TOTAL, weightAvgAndSum(id2compAvg, id2numAcked)); + putRawKV(ret, ACKED, sumValues(id2numAcked)); + + return ret; + } + + /** + * Aggregates number executed and process & execute latencies. + */ + public static Map aggBoltStreamsLatAndCount(Map id2execAvg, Map id2procAvg, Map id2numExec) { + Map ret = new HashMap(); + if (id2execAvg == null || id2procAvg == null || id2numExec == null) { + return ret; + } + for (Object k : id2execAvg.keySet()) { + Map subMap = new HashMap(); + putRawKV(subMap, EXEC_LAT_TOTAL, weightAvg(id2execAvg, id2numExec, k)); + putRawKV(subMap, PROC_LAT_TOTAL, weightAvg(id2procAvg, id2numExec, k)); + putRawKV(subMap, EXECUTED, id2numExec.get(k)); + ret.put(k, subMap); + } + return ret; + } + + /** + * Aggregates number acked and complete latencies. 
+ */ + public static Map aggSpoutStreamsLatAndCount(Map id2compAvg, Map id2acked) { + Map ret = new HashMap(); + if (id2compAvg == null || id2acked == null) { + return ret; + } + for (Object k : id2compAvg.keySet()) { + Map subMap = new HashMap(); + putRawKV(subMap, COMP_LAT_TOTAL, weightAvg(id2compAvg, id2acked, k)); + putRawKV(subMap, ACKED, id2acked.get(k)); + ret.put(k, subMap); + } + return ret; + } + + public static Map aggPreMergeCompPageBolt(Map m, String window, boolean includeSys) { + Map ret = new HashMap(); + putRawKV(ret, EXECUTOR_ID, getByKeyword(m, "exec-id")); + putRawKV(ret, HOST, getByKeyword(m, HOST)); + putRawKV(ret, PORT, getByKeyword(m, PORT)); + putRawKV(ret, UPTIME, getByKeyword(m, UPTIME)); + putRawKV(ret, NUM_EXECUTORS, 1); + putRawKV(ret, NUM_TASKS, getByKeyword(m, NUM_TASKS)); + + Map stat2win2sid2num = getMapByKeyword(m, STATS); + putRawKV(ret, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeywordOr0(m, UPTIME).intValue())); + + // calc cid+sid->input_stats + Map inputStats = new HashMap(); + Map sid2acked = (Map) windowSetConverter(getMapByKeyword(stat2win2sid2num, ACKED), TO_STRING).get(window); + Map sid2failed = (Map) windowSetConverter(getMapByKeyword(stat2win2sid2num, FAILED), TO_STRING).get(window); + putRawKV(inputStats, ACKED, sid2acked != null ? sid2acked : new HashMap()); + putRawKV(inputStats, FAILED, sid2failed != null ? 
sid2failed : new HashMap()); + + inputStats = swapMapOrder(inputStats); + + Map sid2execLat = (Map) windowSetConverter(getMapByKeyword(stat2win2sid2num, EXEC_LATENCIES), TO_STRING).get(window); + Map sid2procLat = (Map) windowSetConverter(getMapByKeyword(stat2win2sid2num, PROC_LATENCIES), TO_STRING).get(window); + Map sid2exec = (Map) windowSetConverter(getMapByKeyword(stat2win2sid2num, EXECUTED), TO_STRING).get(window); + mergeMaps(inputStats, aggBoltStreamsLatAndCount(sid2execLat, sid2procLat, sid2exec)); + putRawKV(ret, CID_SID_TO_IN_STATS, inputStats); + + // calc sid->output_stats + Map outputStats = new HashMap(); + Map sid2emitted = (Map) windowSetConverter(getMapByKeyword(stat2win2sid2num, EMITTED), TO_STRING).get(window); + Map sid2transferred = (Map) windowSetConverter(getMapByKeyword(stat2win2sid2num, TRANSFERRED), TO_STRING).get(window); + if (sid2emitted != null) { + putRawKV(outputStats, EMITTED, filterSysStreams(sid2emitted, includeSys)); + } else { + putRawKV(outputStats, EMITTED, new HashMap()); + } + if (sid2transferred != null) { + putRawKV(outputStats, TRANSFERRED, filterSysStreams(sid2transferred, includeSys)); + } else { + putRawKV(outputStats, TRANSFERRED, new HashMap()); + } + outputStats = swapMapOrder(outputStats); + putRawKV(ret, SID_TO_OUT_STATS, outputStats); + + return ret; + } + + public static Map aggPreMergeCompPageSpout(Map m, String window, boolean includeSys) { + Map ret = new HashMap(); + putRawKV(ret, EXECUTOR_ID, getByKeyword(m, "exec-id")); + putRawKV(ret, HOST, getByKeyword(m, HOST)); + putRawKV(ret, PORT, getByKeyword(m, PORT)); + putRawKV(ret, UPTIME, getByKeyword(m, UPTIME)); + putRawKV(ret, NUM_EXECUTORS, 1); + putRawKV(ret, NUM_TASKS, getByKeyword(m, NUM_TASKS)); + + Map stat2win2sid2num = getMapByKeyword(m, STATS); + + // calc sid->output-stats + Map outputStats = new HashMap(); + Map win2sid2acked = windowSetConverter(getMapByKeyword(stat2win2sid2num, ACKED), TO_STRING); + Map win2sid2failed = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, FAILED), TO_STRING); + Map win2sid2emitted = windowSetConverter(getMapByKeyword(stat2win2sid2num, EMITTED), TO_STRING); + Map win2sid2transferred = windowSetConverter(getMapByKeyword(stat2win2sid2num, TRANSFERRED), TO_STRING); + Map win2sid2compLat = windowSetConverter(getMapByKeyword(stat2win2sid2num, COMP_LATENCIES), TO_STRING); + + putRawKV(outputStats, ACKED, win2sid2acked.get(window)); + putRawKV(outputStats, FAILED, win2sid2failed.get(window)); + putRawKV(outputStats, EMITTED, filterSysStreams((Map) win2sid2emitted.get(window), includeSys)); + putRawKV(outputStats, TRANSFERRED, filterSysStreams((Map) win2sid2transferred.get(window), includeSys)); + outputStats = swapMapOrder(outputStats); + + Map sid2compLat = (Map) win2sid2compLat.get(window); + Map sid2acked = (Map) win2sid2acked.get(window); + mergeMaps(outputStats, aggSpoutStreamsLatAndCount(sid2compLat, sid2acked)); + putRawKV(ret, SID_TO_OUT_STATS, outputStats); + + return ret; + } + + public static Map aggPreMergeTopoPageBolt(Map m, String window, boolean includeSys) { + Map ret = new HashMap(); + + Map subRet = new HashMap(); + putRawKV(subRet, NUM_EXECUTORS, 1); + putRawKV(subRet, NUM_TASKS, getByKeyword(m, NUM_TASKS)); + + Map stat2win2sid2num = getMapByKeyword(m, STATS); + putRawKV(subRet, CAPACITY, computeAggCapacity(stat2win2sid2num, getByKeywordOr0(m, UPTIME).intValue())); + + for (String key : new String[]{EMITTED, TRANSFERRED, ACKED, FAILED}) { + Map stat = (Map) windowSetConverter(getMapByKeyword(stat2win2sid2num, key), TO_STRING).get(window); + if (EMITTED.equals(key) || TRANSFERRED.equals(key)) { + stat = filterSysStreams(stat, includeSys); + } + long sum = 0; + if (stat != null) { + for (Object o : stat.values()) { + sum += ((Number) o).longValue(); + } + } + putRawKV(subRet, key, sum); + } + + Map win2sid2execLat = windowSetConverter(getMapByKeyword(stat2win2sid2num, EXEC_LATENCIES), TO_STRING); + Map win2sid2procLat = 
windowSetConverter(getMapByKeyword(stat2win2sid2num, PROC_LATENCIES), TO_STRING); + Map win2sid2exec = windowSetConverter(getMapByKeyword(stat2win2sid2num, EXECUTED), TO_STRING); + subRet.putAll(aggBoltLatAndCount( + (Map) win2sid2execLat.get(window), (Map) win2sid2procLat.get(window), (Map) win2sid2exec.get(window))); + + ret.put(getByKeyword(m, "comp-id"), subRet); + return ret; + } + + public static Map aggPreMergeTopoPageSpout(Map m, String window, boolean includeSys) { + Map ret = new HashMap(); + + Map subRet = new HashMap(); + putRawKV(subRet, NUM_EXECUTORS, 1); + putRawKV(subRet, NUM_TASKS, getByKeyword(m, NUM_TASKS)); + + // no capacity for spout + Map stat2win2sid2num = getMapByKeyword(m, STATS); + for (String key : new String[]{EMITTED, TRANSFERRED, FAILED}) { + Map stat = (Map) windowSetConverter(getMapByKeyword(stat2win2sid2num, key), TO_STRING).get(window); + if (EMITTED.equals(key) || TRANSFERRED.equals(key)) { + stat = filterSysStreams(stat, includeSys); + } + long sum = 0; + if (stat != null) { + for (Object o : stat.values()) { + sum += ((Number) o).longValue(); + } + } + putRawKV(subRet, key, sum); + } + + Map win2sid2compLat = windowSetConverter(getMapByKeyword(stat2win2sid2num, COMP_LATENCIES), TO_STRING); + Map win2sid2acked = windowSetConverter(getMapByKeyword(stat2win2sid2num, ACKED), TO_STRING); + subRet.putAll(aggSpoutLatAndCount((Map) win2sid2compLat.get(window), (Map) win2sid2acked.get(window))); + + ret.put(getByKeyword(m, "comp-id"), subRet); + return ret; + } + + public static Map mergeAggCompStatsCompPageBolt(Map accBoltStats, Map boltStats) { + Map ret = new HashMap(); + + Map accIn = getMapByKeyword(accBoltStats, CID_SID_TO_IN_STATS); + Map accOut = getMapByKeyword(accBoltStats, SID_TO_OUT_STATS); + Map boltIn = getMapByKeyword(boltStats, CID_SID_TO_IN_STATS); + Map boltOut = getMapByKeyword(boltStats, SID_TO_OUT_STATS); + + int numExecutors = getByKeywordOr0(accBoltStats, NUM_EXECUTORS).intValue(); + putRawKV(ret, NUM_EXECUTORS, 
numExecutors + 1); + putRawKV(ret, NUM_TASKS, sumOr0( + getByKeywordOr0(accBoltStats, NUM_TASKS), getByKeywordOr0(boltStats, NUM_TASKS))); + + // (merge-with (partial merge-with sum-or-0) acc-out spout-out) + putRawKV(ret, SID_TO_OUT_STATS, fullMergeWithSum(accOut, boltOut)); + putRawKV(ret, CID_SID_TO_IN_STATS, fullMergeWithSum(accIn, boltIn)); + + long executed = sumStreamsLong(boltIn, EXECUTED); + putRawKV(ret, EXECUTED, executed); + + Map executorStats = new HashMap(); + putRawKV(executorStats, EXECUTOR_ID, getByKeyword(boltStats, EXECUTOR_ID)); + putRawKV(executorStats, UPTIME, getByKeyword(boltStats, UPTIME)); + putRawKV(executorStats, HOST, getByKeyword(boltStats, HOST)); + putRawKV(executorStats, PORT, getByKeyword(boltStats, PORT)); + putRawKV(executorStats, CAPACITY, getByKeyword(boltStats, CAPACITY)); + + putRawKV(executorStats, EMITTED, sumStreamsLong(boltOut, EMITTED)); + putRawKV(executorStats, TRANSFERRED, sumStreamsLong(boltOut, TRANSFERRED)); + putRawKV(executorStats, ACKED, sumStreamsLong(boltIn, ACKED)); + putRawKV(executorStats, FAILED, sumStreamsLong(boltIn, FAILED)); + putRawKV(executorStats, EXECUTED, executed); + + if (executed > 0) { + putRawKV(executorStats, EXEC_LATENCY, sumStreamsDouble(boltIn, EXEC_LAT_TOTAL) / executed); + putRawKV(executorStats, PROC_LATENCY, sumStreamsDouble(boltIn, PROC_LAT_TOTAL) / executed); + } else { + putRawKV(executorStats, EXEC_LATENCY, null); + putRawKV(executorStats, PROC_LATENCY, null); + } + List executorStatsList = ((List) getByKeyword(accBoltStats, EXECUTOR_STATS)); + executorStatsList.add(executorStats); + putRawKV(ret, EXECUTOR_STATS, executorStatsList); + + return ret; + } + + public static Map mergeAggCompStatsCompPageSpout(Map accSpoutStats, Map spoutStats) { + Map ret = new HashMap(); + + Map accOut = getMapByKeyword(accSpoutStats, SID_TO_OUT_STATS); + Map spoutOut = getMapByKeyword(spoutStats, SID_TO_OUT_STATS); + + int numExecutors = getByKeywordOr0(accSpoutStats, NUM_EXECUTORS).intValue(); + 
putRawKV(ret, NUM_EXECUTORS, numExecutors + 1); + putRawKV(ret, NUM_TASKS, sumOr0( + getByKeywordOr0(accSpoutStats, NUM_TASKS), getByKeywordOr0(spoutStats, NUM_TASKS))); + putRawKV(ret, SID_TO_OUT_STATS, fullMergeWithSum(accOut, spoutOut)); + + Map executorStats = new HashMap(); + putRawKV(executorStats, EXECUTOR_ID, getByKeyword(spoutStats, EXECUTOR_ID)); + putRawKV(executorStats, UPTIME, getByKeyword(spoutStats, UPTIME)); + putRawKV(executorStats, HOST, getByKeyword(spoutStats, HOST)); + putRawKV(executorStats, PORT, getByKeyword(spoutStats, PORT)); + + putRawKV(executorStats, EMITTED, sumStreamsLong(spoutOut, EMITTED)); + putRawKV(executorStats, TRANSFERRED, sumStreamsLong(spoutOut, TRANSFERRED)); + putRawKV(executorStats, FAILED, sumStreamsLong(spoutOut, FAILED)); + long acked = sumStreamsLong(spoutOut, ACKED); + putRawKV(executorStats, ACKED, acked); + if (acked > 0) { + putRawKV(executorStats, COMP_LATENCY, sumStreamsDouble(spoutOut, COMP_LAT_TOTAL) / acked); + } else { + putRawKV(executorStats, COMP_LATENCY, null); + } + List executorStatsList = ((List) getByKeyword(accSpoutStats, EXECUTOR_STATS)); + executorStatsList.add(executorStats); + putRawKV(ret, EXECUTOR_STATS, executorStatsList); + + return ret; + } + + public static Map mergeAggCompStatsTopoPageBolt(Map accBoltStats, Map boltStats) { + Map ret = new HashMap(); + Integer numExecutors = getByKeywordOr0(accBoltStats, NUM_EXECUTORS).intValue(); + putRawKV(ret, NUM_EXECUTORS, numExecutors + 1); + putRawKV(ret, NUM_TASKS, sumOr0( + getByKeywordOr0(accBoltStats, NUM_TASKS), getByKeywordOr0(boltStats, NUM_TASKS))); + putRawKV(ret, EMITTED, sumOr0( + getByKeywordOr0(accBoltStats, EMITTED), getByKeywordOr0(boltStats, EMITTED))); + putRawKV(ret, TRANSFERRED, sumOr0( + getByKeywordOr0(accBoltStats, TRANSFERRED), getByKeywordOr0(boltStats, TRANSFERRED))); + putRawKV(ret, EXEC_LAT_TOTAL, sumOr0( + getByKeywordOr0(accBoltStats, EXEC_LAT_TOTAL), getByKeywordOr0(boltStats, EXEC_LAT_TOTAL))); + putRawKV(ret, 
PROC_LAT_TOTAL, sumOr0( + getByKeywordOr0(accBoltStats, PROC_LAT_TOTAL), getByKeywordOr0(boltStats, PROC_LAT_TOTAL))); + putRawKV(ret, EXECUTED, sumOr0( + getByKeywordOr0(accBoltStats, EXECUTED), getByKeywordOr0(boltStats, EXECUTED))); + putRawKV(ret, ACKED, sumOr0( + getByKeywordOr0(accBoltStats, ACKED), getByKeywordOr0(boltStats, ACKED))); + putRawKV(ret, FAILED, sumOr0( + getByKeywordOr0(accBoltStats, FAILED), getByKeywordOr0(boltStats, FAILED))); + putRawKV(ret, CAPACITY, maxOr0( + getByKeywordOr0(accBoltStats, CAPACITY), getByKeywordOr0(boltStats, CAPACITY))); + + return ret; + } + + public static Map mergeAggCompStatsTopoPageSpout(Map accSpoutStats, Map spoutStats) { + Map ret = new HashMap(); + Integer numExecutors = getByKeywordOr0(accSpoutStats, NUM_EXECUTORS).intValue(); + putRawKV(ret, NUM_EXECUTORS, numExecutors + 1); + putRawKV(ret, NUM_TASKS, sumOr0( + getByKeywordOr0(accSpoutStats, NUM_TASKS), getByKeywordOr0(spoutStats, NUM_TASKS))); + putRawKV(ret, EMITTED, sumOr0( + getByKeywordOr0(accSpoutStats, EMITTED), getByKeywordOr0(spoutStats, EMITTED))); + putRawKV(ret, TRANSFERRED, sumOr0( + getByKeywordOr0(accSpoutStats, TRANSFERRED), getByKeywordOr0(spoutStats, TRANSFERRED))); + putRawKV(ret, COMP_LAT_TOTAL, sumOr0( + getByKeywordOr0(accSpoutStats, COMP_LAT_TOTAL), getByKeywordOr0(spoutStats, COMP_LAT_TOTAL))); + putRawKV(ret, ACKED, sumOr0( + getByKeywordOr0(accSpoutStats, ACKED), getByKeywordOr0(spoutStats, ACKED))); + putRawKV(ret, FAILED, sumOr0( + getByKeywordOr0(accSpoutStats, FAILED), getByKeywordOr0(spoutStats, FAILED))); + + return ret; + } + + /** + * A helper function that does the common work to aggregate stats of one + * executor with the given map for the topology page. 
+ */ + public static Map aggTopoExecStats(String window, boolean includeSys, Map accStats, Map newData, String compType) { + Map ret = new HashMap(); + + Set workerSet = (Set) getByKeyword(accStats, WORKERS_SET); + Map bolt2stats = getMapByKeyword(accStats, BOLT_TO_STATS); + Map spout2stats = getMapByKeyword(accStats, SPOUT_TO_STATS); + Map win2emitted = getMapByKeyword(accStats, WIN_TO_EMITTED); + Map win2transferred = getMapByKeyword(accStats, WIN_TO_TRANSFERRED); + Map win2compLatWgtAvg = getMapByKeyword(accStats, WIN_TO_COMP_LAT_WGT_AVG); + Map win2acked = getMapByKeyword(accStats, WIN_TO_ACKED); + Map win2failed = getMapByKeyword(accStats, WIN_TO_FAILED); + Map stats = getMapByKeyword(newData, STATS); + + boolean isSpout = compType.equals(SPOUT); + Map cid2stat2num; + if (isSpout) { + cid2stat2num = aggPreMergeTopoPageSpout(newData, window, includeSys); + } else { + cid2stat2num = aggPreMergeTopoPageBolt(newData, window, includeSys); + } + + Map w2compLatWgtAvg, w2acked; + Map compLatStats = getMapByKeyword(stats, COMP_LATENCIES); + if (isSpout) { // agg spout stats + Map mm = new HashMap(); + + Map acked = getMapByKeyword(stats, ACKED); + for (Object win : acked.keySet()) { + mm.put(win, aggSpoutLatAndCount((Map) compLatStats.get(win), (Map) acked.get(win))); + } + mm = swapMapOrder(mm); + w2compLatWgtAvg = getMapByKeyword(mm, COMP_LAT_TOTAL); + w2acked = getMapByKeyword(mm, ACKED); + } else { + w2compLatWgtAvg = null; + w2acked = aggregateCountStreams(getMapByKeyword(stats, ACKED)); + } + + workerSet.add(Lists.newArrayList(getByKeyword(newData, HOST), getByKeyword(newData, PORT))); + putRawKV(ret, WORKERS_SET, workerSet); + putRawKV(ret, BOLT_TO_STATS, bolt2stats); + putRawKV(ret, SPOUT_TO_STATS, spout2stats); + putRawKV(ret, WIN_TO_EMITTED, mergeWithSum(win2emitted, aggregateCountStreams( + filterSysStreams(getMapByKeyword(stats, EMITTED), includeSys)))); + putRawKV(ret, WIN_TO_TRANSFERRED, mergeWithSum(win2transferred, aggregateCountStreams( + 
filterSysStreams(getMapByKeyword(stats, TRANSFERRED), includeSys)))); + putRawKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSum(win2compLatWgtAvg, w2compLatWgtAvg)); + + //boolean isSpoutStat = SPOUT.equals(((Keyword) getByKeyword(stats, TYPE)).getName()); + putRawKV(ret, WIN_TO_ACKED, isSpout ? mergeWithSum(win2acked, w2acked) : win2acked); + putRawKV(ret, WIN_TO_FAILED, isSpout ? + mergeWithSum(aggregateCountStreams(getMapByKeyword(stats, FAILED)), win2failed) : win2failed); + putRawKV(ret, TYPE, getByKeyword(stats, TYPE)); + + // (merge-with merge-agg-comp-stats-topo-page-bolt/spout (acc-stats comp-key) cid->statk->num) + // (acc-stats comp-key) ==> bolt2stats/spout2stats + if (isSpout) { + Set<Object> keySet = new HashSet<>(); + keySet.addAll(spout2stats.keySet()); + keySet.addAll(cid2stat2num.keySet()); + + Map mm = new HashMap(); + for (Object k : keySet) { + mm.put(k, mergeAggCompStatsTopoPageSpout((Map) spout2stats.get(k), (Map) cid2stat2num.get(k))); + } + putRawKV(ret, SPOUT_TO_STATS, mm); + } else { + Set<Object> keySet = new HashSet<>(); + keySet.addAll(bolt2stats.keySet()); + keySet.addAll(cid2stat2num.keySet()); + + Map mm = new HashMap(); + for (Object k : keySet) { + mm.put(k, mergeAggCompStatsTopoPageBolt((Map) bolt2stats.get(k), (Map) cid2stat2num.get(k))); + } + putRawKV(ret, BOLT_TO_STATS, mm); + } + + return ret; + } + + // TODO: add last-error-fn arg to get last error + public static TopologyPageInfo aggTopoExecsStats( + String topologyId, Map exec2nodePort, Map task2component, + Map beats, StormTopology topology, String window, boolean includeSys) { + List beatList = extractDataFromHb(exec2nodePort, task2component, beats, includeSys, topology); + Map topoStats = aggregateTopoStats(window, includeSys, beatList); + topoStats = postAggregateTopoStats(task2component, exec2nodePort, topoStats); + + return thriftifyTopoPageData(topologyId, topoStats); + } + + public static Map aggregateTopoStats(String win, boolean includeSys, List data) { + Map 
initVal = new HashMap(); + putRawKV(initVal, WORKERS_SET, new HashSet()); + putRawKV(initVal, BOLT_TO_STATS, new HashMap()); + putRawKV(initVal, SPOUT_TO_STATS, new HashMap()); + putRawKV(initVal, WIN_TO_EMITTED, new HashMap()); + putRawKV(initVal, WIN_TO_TRANSFERRED, new HashMap()); + putRawKV(initVal, WIN_TO_COMP_LAT_WGT_AVG, new HashMap()); + putRawKV(initVal, WIN_TO_ACKED, new HashMap()); + putRawKV(initVal, WIN_TO_FAILED, new HashMap()); + + for (Object o : data) { + Map newData = (Map) o; + String compType = ((Keyword) getByKeyword(newData, TYPE)).getName(); + initVal = aggTopoExecStats(win, includeSys, initVal, newData, compType); + } + + return initVal; + } + + public static Map postAggregateTopoStats(Map task2comp, Map exec2nodePort, Map accData) { + Map ret = new HashMap(); + putRawKV(ret, NUM_TASKS, task2comp.size()); + putRawKV(ret, NUM_WORKERS, ((Set) getByKeyword(accData, WORKERS_SET)).size()); + putRawKV(ret, NUM_EXECUTORS, exec2nodePort.size()); + + Map bolt2stats = getMapByKeyword(accData, BOLT_TO_STATS); + Map aggBolt2stats = new HashMap(); + for (Object o : bolt2stats.entrySet()) { + Map.Entry e = (Map.Entry) o; + String id = (String) e.getKey(); + Map m = (Map) e.getValue(); + long executed = getByKeywordOr0(m, EXECUTED).longValue(); + if (executed > 0) { + double execLatencyTotal = getByKeywordOr0(m, EXEC_LAT_TOTAL).doubleValue(); + putRawKV(m, EXEC_LATENCY, execLatencyTotal / executed); + + double procLatencyTotal = getByKeywordOr0(m, PROC_LAT_TOTAL).doubleValue(); + putRawKV(m, PROC_LATENCY, procLatencyTotal / executed); + } + removeByKeyword(m, EXEC_LAT_TOTAL); + removeByKeyword(m, PROC_LAT_TOTAL); + //TODO: get last error depends on cluster.clj + putRawKV(m, "last-error", null); + + aggBolt2stats.put(id, m); + } + putRawKV(ret, BOLT_TO_STATS, aggBolt2stats); + + Map spout2stats = getMapByKeyword(accData, SPOUT_TO_STATS); + Map spoutBolt2stats = new HashMap(); + for (Object o : spout2stats.entrySet()) { + Map.Entry e = (Map.Entry) o; + 
String id = (String) e.getKey(); + Map m = (Map) e.getValue(); + long acked = getByKeywordOr0(m, ACKED).longValue(); + if (acked > 0) { + double compLatencyTotal = getByKeywordOr0(m, COMP_LAT_TOTAL).doubleValue(); + putRawKV(m, COMP_LATENCY, compLatencyTotal / acked); + } + removeByKeyword(m, COMP_LAT_TOTAL); + //TODO: get last error depends on cluster.clj + putRawKV(m, "last-error", null); + + spoutBolt2stats.put(id, m); + } + putRawKV(ret, SPOUT_TO_STATS, spoutBolt2stats); + + putRawKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKeyword(accData, WIN_TO_EMITTED))); + putRawKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKeyword(accData, WIN_TO_TRANSFERRED))); + putRawKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKeyword(accData, WIN_TO_ACKED))); + putRawKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKeyword(accData, WIN_TO_FAILED))); + putRawKV(ret, WIN_TO_COMP_LAT, computeWeightedAveragesPerWindow( + accData, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED)); + return ret; + } + + /** + * aggregate bolt stats + * + * @param statsSeq a seq of ExecutorStats + * @param includeSys whether to include system streams + * @return aggregated bolt stats + */ + public static Map aggregateBoltStats(List statsSeq, boolean includeSys) { + Map ret = new HashMap(); + + Map commonStats = preProcessStreamSummary(aggregateCommonStats(statsSeq), includeSys); + List acked = new ArrayList(); + List failed = new ArrayList(); + List executed = new ArrayList(); + List processLatencies = new ArrayList(); + List executeLatencies = new ArrayList(); + for (Object o : statsSeq) { + ExecutorStats stat = (ExecutorStats) o; + acked.add(stat.get_specific().get_bolt().get_acked()); + failed.add(stat.get_specific().get_bolt().get_failed()); + executed.add(stat.get_specific().get_bolt().get_executed()); + processLatencies.add(stat.get_specific().get_bolt().get_process_ms_avg()); + executeLatencies.add(stat.get_specific().get_bolt().get_execute_ms_avg()); + } + mergeMaps(ret, commonStats); + putRawKV(ret, ACKED, 
aggregateCounts(acked)); + putRawKV(ret, FAILED, aggregateCounts(failed)); + putRawKV(ret, EXECUTED, aggregateCounts(executed)); + putRawKV(ret, PROC_LATENCIES, aggregateAverages(processLatencies, acked)); + putRawKV(ret, EXEC_LATENCIES, aggregateAverages(executeLatencies, executed)); + + return ret; + } + + /** + * aggregate spout stats + * + * @param statsSeq a seq of ExecutorStats + * @param includeSys whether to include system streams + * @return aggregated spout stats + */ + public static Map aggregateSpoutStats(List statsSeq, boolean includeSys) { + Map ret = new HashMap(); + + Map commonStats = preProcessStreamSummary(aggregateCommonStats(statsSeq), includeSys); + List acked = new ArrayList(); + List failed = new ArrayList(); + List completeLatencies = new ArrayList(); + for (Object o : statsSeq) { + ExecutorStats stat = (ExecutorStats) o; + acked.add(stat.get_specific().get_spout().get_acked()); + failed.add(stat.get_specific().get_spout().get_failed()); + completeLatencies.add(stat.get_specific().get_spout().get_complete_ms_avg()); + } + mergeMaps(ret, commonStats); + putRawKV(ret, ACKED, aggregateCounts(acked)); + putRawKV(ret, FAILED, aggregateCounts(failed)); + putRawKV(ret, COMP_LATENCIES, aggregateAverages(completeLatencies, acked)); + + return ret; + } + + public static Map aggregateCommonStats(List statsSeq) { + Map ret = new HashMap(); + + List emitted = new ArrayList(); + List transferred = new ArrayList(); + for (Object o : statsSeq) { + ExecutorStats stat = (ExecutorStats) o; + emitted.add(stat.get_emitted()); + transferred.add(stat.get_transferred()); + } + + putRawKV(ret, EMITTED, aggregateCounts(emitted)); + putRawKV(ret, TRANSFERRED, aggregateCounts(transferred)); + return ret; + } + + public static Map preProcessStreamSummary(Map streamSummary, boolean includeSys) { + Map emitted = getMapByKeyword(streamSummary, EMITTED); + Map transferred = getMapByKeyword(streamSummary, TRANSFERRED); + + putRawKV(streamSummary, EMITTED, 
filterSysStreams(emitted, includeSys)); + putRawKV(streamSummary, TRANSFERRED, filterSysStreams(transferred, includeSys)); + + return streamSummary; + } + + public static Map aggregateCountStreams(Map stats) { + Map ret = new HashMap(); + for (Object o : stats.entrySet()) { + Map.Entry entry = (Map.Entry) o; + Map value = (Map) entry.getValue(); + long sum = 0l; + for (Object num : value.values()) { + sum += ((Number) num).longValue(); + } + ret.put(entry.getKey(), sum); + } + return ret; + } + + public static Map aggregateAverages(List avgSeq, List countSeq) { + Map ret = new HashMap(); + + Map expands = expandAveragesSeq(avgSeq, countSeq); + for (Object o : expands.entrySet()) { + Map.Entry entry = (Map.Entry) o; + Object k = entry.getKey(); + + Map tmp = new HashMap(); + Map inner = (Map) entry.getValue(); + for (Object kk : inner.keySet()) { + List vv = (List) inner.get(kk); + tmp.put(kk, valAvg(((Number) vv.get(0)).doubleValue(), ((Number) vv.get(1)).longValue())); + } + ret.put(k, tmp); + } + + return ret; + } + + public static Map aggregateAvgStreams(Map avgs, Map counts) { + Map ret = new HashMap(); + + Map expands = expandAverages(avgs, counts); + for (Object o : expands.entrySet()) { + Map.Entry e = (Map.Entry) o; + Object win = e.getKey(); + + double avgTotal = 0.0; + long cntTotal = 0l; + Map inner = (Map) e.getValue(); + for (Object kk : inner.keySet()) { + List vv = (List) inner.get(kk); + avgTotal += ((Number) vv.get(0)).doubleValue(); + cntTotal += ((Number) vv.get(1)).longValue(); + } + ret.put(win, valAvg(avgTotal, cntTotal)); + } + + return ret; + } + + public static Map spoutStreamsStats(List summs, boolean includeSys) { + List statsSeq = getFilledStats(summs); + return aggregateSpoutStreams(aggregateSpoutStats(statsSeq, includeSys)); + } + + public static Map boltStreamsStats(List summs, boolean includeSys) { + List statsSeq = getFilledStats(summs); + return aggregateBoltStreams(aggregateBoltStats(statsSeq, includeSys)); + } + + public static 
Map aggregateSpoutStreams(Map stats) { + Map ret = new HashMap(); + putRawKV(ret, ACKED, aggregateCountStreams(getMapByKeyword(stats, ACKED))); + putRawKV(ret, FAILED, aggregateCountStreams(getMapByKeyword(stats, FAILED))); + putRawKV(ret, EMITTED, aggregateCountStreams(getMapByKeyword(stats, EMITTED))); + putRawKV(ret, TRANSFERRED, aggregateCountStreams(getMapByKeyword(stats, TRANSFERRED))); + putRawKV(ret, COMP_LATENCIES, aggregateAvgStreams( + getMapByKeyword(stats, COMP_LATENCIES), getMapByKeyword(stats, ACKED))); + return ret; + } + + public static Map aggregateBoltStreams(Map stats) { + Map ret = new HashMap(); + putRawKV(ret, ACKED, aggregateCountStreams(getMapByKeyword(stats, ACKED))); + putRawKV(ret, FAILED, aggregateCountStreams(getMapByKeyword(stats, FAILED))); + putRawKV(ret, EMITTED, aggregateCountStreams(getMapByKeyword(stats, EMITTED))); + putRawKV(ret, TRANSFERRED, aggregateCountStreams(getMapByKeyword(stats, TRANSFERRED))); + putRawKV(ret, EXECUTED, aggregateCountStreams(getMapByKeyword(stats, EXECUTED))); + putRawKV(ret, PROC_LATENCIES, aggregateAvgStreams( + getMapByKeyword(stats, PROC_LATENCIES), getMapByKeyword(stats, ACKED))); + putRawKV(ret, EXEC_LATENCIES, aggregateAvgStreams( + getMapByKeyword(stats, EXEC_LATENCIES), getMapByKeyword(stats, EXECUTED))); + return ret; + } + + /** + * A helper function that aggregates windowed stats from one spout executor. 
/**
 * A helper function that aggregates windowed stats from one bolt executor into
 * the running accumulator. (NOTE(review): the original comment said "spout
 * executor" — this is the bolt variant; the spout variant follows.)
 *
 * @param accStats   accumulator holding the win->* aggregates so far
 * @param newStats   one executor's stats ({stat -> win -> stream -> value})
 * @param includeSys whether system streams count toward emitted/transferred
 * @return a new accumulator map with this executor's stats merged in
 */
public static Map aggBoltExecWinStats(Map accStats, Map newStats, boolean includeSys) {
    Map ret = new HashMap();

    // per window: collapse per-stream execute/process latencies and executed
    // counts into weighted-latency totals plus an executed count
    Map m = new HashMap();
    for (Object win : getMapByKeyword(newStats, EXECUTED).keySet()) {
        m.put(win, aggBoltLatAndCount(
                (Map) (getMapByKeyword(newStats, EXEC_LATENCIES)).get(win),
                (Map) (getMapByKeyword(newStats, PROC_LATENCIES)).get(win),
                (Map) (getMapByKeyword(newStats, EXECUTED)).get(win)));
    }
    // {win -> {stat -> val}} becomes {stat -> {win -> val}}
    m = swapMapOrder(m);

    Map win2execLatWgtAvg = getMapByKeyword(m, EXEC_LAT_TOTAL);
    Map win2procLatWgtAvg = getMapByKeyword(m, PROC_LAT_TOTAL);
    Map win2executed = getMapByKeyword(m, EXECUTED);

    // emitted/transferred: drop system streams (when requested), sum the
    // per-stream counts per window, then add to the running totals
    Map emitted = getMapByKeyword(newStats, EMITTED);
    emitted = mergeWithSum(aggregateCountStreams(filterSysStreams(emitted, includeSys)),
            getMapByKeyword(accStats, WIN_TO_EMITTED));
    putRawKV(ret, WIN_TO_EMITTED, emitted);

    Map transferred = getMapByKeyword(newStats, TRANSFERRED);
    transferred = mergeWithSum(aggregateCountStreams(filterSysStreams(transferred, includeSys)),
            getMapByKeyword(accStats, WIN_TO_TRANSFERRED));
    putRawKV(ret, WIN_TO_TRANSFERRED, transferred);

    // remaining aggregates are plain per-window additive merges
    putRawKV(ret, WIN_TO_EXEC_LAT_WGT_AVG, mergeWithSum(
            getMapByKeyword(accStats, WIN_TO_EXEC_LAT_WGT_AVG), win2execLatWgtAvg));
    putRawKV(ret, WIN_TO_PROC_LAT_WGT_AVG, mergeWithSum(
            getMapByKeyword(accStats, WIN_TO_PROC_LAT_WGT_AVG), win2procLatWgtAvg));
    putRawKV(ret, WIN_TO_EXECUTED, mergeWithSum(
            getMapByKeyword(accStats, WIN_TO_EXECUTED), win2executed));
    putRawKV(ret, WIN_TO_ACKED, mergeWithSum(
            aggregateCountStreams(getMapByKeyword(newStats, ACKED)), getMapByKeyword(accStats, WIN_TO_ACKED)));
    putRawKV(ret, WIN_TO_FAILED, mergeWithSum(
            aggregateCountStreams(getMapByKeyword(newStats, FAILED)), getMapByKeyword(accStats, WIN_TO_FAILED)));

    return ret;
}
+ */ + public static Map aggSpoutExecWinStats(Map accStats, Map newStats, boolean includeSys) { + Map ret = new HashMap(); + + Map m = new HashMap(); + for (Object win : getMapByKeyword(newStats, ACKED).keySet()) { + m.put(win, aggSpoutLatAndCount( + (Map) (getMapByKeyword(newStats, COMP_LATENCIES)).get(win), + (Map) (getMapByKeyword(newStats, ACKED)).get(win))); + } + m = swapMapOrder(m); + + Map win2compLatWgtAvg = getMapByKeyword(m, COMP_LAT_TOTAL); + Map win2acked = getMapByKeyword(m, ACKED); + + Map emitted = getMapByKeyword(newStats, EMITTED); + emitted = mergeWithSum(aggregateCountStreams(filterSysStreams(emitted, includeSys)), + getMapByKeyword(accStats, WIN_TO_EMITTED)); + putRawKV(ret, WIN_TO_EMITTED, emitted); + + Map transferred = getMapByKeyword(newStats, TRANSFERRED); + transferred = mergeWithSum(aggregateCountStreams(filterSysStreams(transferred, includeSys)), + getMapByKeyword(accStats, WIN_TO_TRANSFERRED)); + putRawKV(ret, WIN_TO_TRANSFERRED, transferred); + + putRawKV(ret, WIN_TO_COMP_LAT_WGT_AVG, mergeWithSum( + getMapByKeyword(accStats, WIN_TO_COMP_LAT_WGT_AVG), win2compLatWgtAvg)); + putRawKV(ret, WIN_TO_ACKED, mergeWithSum( + getMapByKeyword(accStats, WIN_TO_ACKED), win2acked)); + putRawKV(ret, WIN_TO_FAILED, mergeWithSum( + aggregateCountStreams(getMapByKeyword(newStats, FAILED)), getMapByKeyword(accStats, WIN_TO_FAILED))); + + return ret; + } + + + /** + * aggregate counts + * + * @param countsSeq a seq of {win -> GlobalStreamId -> value} + */ + public static Map aggregateCounts(List countsSeq) { + Map ret = new HashMap(); + for (Object counts : countsSeq) { + for (Object o : ((Map) counts).entrySet()) { + Map.Entry e = (Map.Entry) o; + Object win = e.getKey(); + Map stream2count = (Map) e.getValue(); + + if (!ret.containsKey(win)) { + ret.put(win, stream2count); + } else { + Map existing = (Map) ret.get(win); + for (Object oo : stream2count.entrySet()) { + Map.Entry ee = (Map.Entry) oo; + Object stream = ee.getKey(); + if 
(!existing.containsKey(stream)) { + existing.put(stream, ee.getValue()); + } else { + existing.put(stream, (Long) ee.getValue() + (Long) existing.get(stream)); + } + } + } + } + } + return ret; + } + + public static Map aggregateCompStats(String window, boolean includeSys, List data, String compType) { + boolean isSpout = SPOUT.equals(compType); + + Map initVal = new HashMap(); + putRawKV(initVal, WIN_TO_ACKED, new HashMap()); + putRawKV(initVal, WIN_TO_FAILED, new HashMap()); + putRawKV(initVal, WIN_TO_EMITTED, new HashMap()); + putRawKV(initVal, WIN_TO_TRANSFERRED, new HashMap()); + + Map stats = new HashMap(); + putRawKV(stats, EXECUTOR_STATS, new ArrayList()); + putRawKV(stats, SID_TO_OUT_STATS, new HashMap()); + if (isSpout) { + putRawKV(initVal, TYPE, KW_SPOUT); + putRawKV(initVal, WIN_TO_COMP_LAT_WGT_AVG, new HashMap()); + } else { + putRawKV(initVal, TYPE, KW_BOLT); + putRawKV(initVal, WIN_TO_EXECUTED, new HashMap()); + putRawKV(stats, CID_SID_TO_IN_STATS, new HashMap()); + putRawKV(initVal, WIN_TO_EXEC_LAT_WGT_AVG, new HashMap()); + putRawKV(initVal, WIN_TO_PROC_LAT_WGT_AVG, new HashMap()); + } + putRawKV(initVal, STATS, stats); + + for (Object o : data) { + initVal = aggCompExecStats(window, includeSys, initVal, (Map) o, compType); + } + + return initVal; + } + + /** + * Combines the aggregate stats of one executor with the given map, selecting + * the appropriate window and including system components as specified. 
/**
 * Post-processes the accumulated per-component stats: converts accumulated
 * weighted-latency totals into averages, strips the internal total keys,
 * and stringifies the window keys. The bolt and spout branches differ in
 * which latency/count pairs they finalize.
 *
 * @param task2component {task id -> component id} (currently unused here)
 * @param exec2hostPort  {executor -> host+port} (currently unused here)
 * @param accData        accumulator produced by aggCompExecStats
 * @return the finished component stats map
 */
public static Map postAggregateCompStats(Map task2component, Map exec2hostPort, Map accData) {
    Map ret = new HashMap();

    String compType = ((Keyword) getByKeyword(accData, TYPE)).getName();
    Map stats = getMapByKeyword(accData, STATS);
    Integer numTasks = getByKeywordOr0(stats, NUM_TASKS).intValue();
    Integer numExecutors = getByKeywordOr0(stats, NUM_EXECUTORS).intValue();
    Map outStats = getMapByKeyword(stats, SID_TO_OUT_STATS);

    putRawKV(ret, TYPE, keyword(compType));
    putRawKV(ret, NUM_TASKS, numTasks);
    putRawKV(ret, NUM_EXECUTORS, numExecutors);
    putRawKV(ret, EXECUTOR_STATS, getByKeyword(stats, EXECUTOR_STATS));
    // window keys are stringified for the returned map
    putRawKV(ret, WIN_TO_EMITTED, mapKeyStr(getMapByKeyword(accData, WIN_TO_EMITTED)));
    putRawKV(ret, WIN_TO_TRANSFERRED, mapKeyStr(getMapByKeyword(accData, WIN_TO_TRANSFERRED)));
    putRawKV(ret, WIN_TO_ACKED, mapKeyStr(getMapByKeyword(accData, WIN_TO_ACKED)));
    putRawKV(ret, WIN_TO_FAILED, mapKeyStr(getMapByKeyword(accData, WIN_TO_FAILED)));

    if (BOLT.equals(compType)) {
        Map inStats = getMapByKeyword(stats, CID_SID_TO_IN_STATS);

        // per input stream: execute/process latency averages weighted by executed
        Map inStats2 = new HashMap();
        for (Object o : inStats.entrySet()) {
            Map.Entry e = (Map.Entry) o;
            Object k = e.getKey();
            Map v = (Map) e.getValue();
            long executed = getByKeywordOr0(v, EXECUTED).longValue();
            if (executed > 0) {
                double executeLatencyTotal = getByKeywordOr0(v, EXEC_LAT_TOTAL).doubleValue();
                double processLatencyTotal = getByKeywordOr0(v, PROC_LAT_TOTAL).doubleValue();
                putRawKV(v, EXEC_LATENCY, executeLatencyTotal / executed);
                putRawKV(v, PROC_LATENCY, processLatencyTotal / executed);
            } else {
                // nothing executed yet -> report zero latency, not NaN
                putRawKV(v, EXEC_LATENCY, 0.0);
                putRawKV(v, PROC_LATENCY, 0.0);
            }
            removeByKeyword(v, EXEC_LAT_TOTAL);
            removeByKeyword(v, PROC_LAT_TOTAL);
            inStats2.put(k, v);
        }
        putRawKV(ret, CID_SID_TO_IN_STATS, inStats2);

        putRawKV(ret, SID_TO_OUT_STATS, outStats);
        putRawKV(ret, WIN_TO_EXECUTED, mapKeyStr(getMapByKeyword(accData, WIN_TO_EXECUTED)));
        putRawKV(ret, WIN_TO_EXEC_LAT, computeWeightedAveragesPerWindow(
                accData, WIN_TO_EXEC_LAT_WGT_AVG, WIN_TO_EXECUTED));
        putRawKV(ret, WIN_TO_PROC_LAT, computeWeightedAveragesPerWindow(
                accData, WIN_TO_PROC_LAT_WGT_AVG, WIN_TO_EXECUTED));
    } else {
        // spout: per output stream, complete latency averaged over acked
        Map outStats2 = new HashMap();
        for (Object o : outStats.entrySet()) {
            Map.Entry e = (Map.Entry) o;
            Object k = e.getKey();
            Map v = (Map) e.getValue();
            long acked = getByKeywordOr0(v, ACKED).longValue();
            if (acked > 0) {
                double compLatencyTotal = getByKeywordOr0(v, COMP_LAT_TOTAL).doubleValue();
                putRawKV(v, COMP_LATENCY, compLatencyTotal / acked);
            } else {
                putRawKV(v, COMP_LATENCY, 0.0);
            }
            removeByKeyword(v, COMP_LAT_TOTAL);
            outStats2.put(k, v);
        }
        putRawKV(ret, SID_TO_OUT_STATS, outStats2);
        putRawKV(ret, WIN_TO_COMP_LAT, computeWeightedAveragesPerWindow(
                accData, WIN_TO_COMP_LAT_WGT_AVG, WIN_TO_ACKED));
    }

    return ret;
}
+ Map compStats = aggregateCompStats(window, includeSys, beatList, componentType(topology, componentId).getName()); + compStats = postAggregateCompStats(task2component, exec2hostPort, compStats); + return thriftifyCompPageData(topologyId, topology, componentId, compStats); + } + + + // ===================================================================================== + // clojurify stats methods + // ===================================================================================== + + /** + * called in converter.clj + */ + public static Map clojurifyStats(Map stats) { + Map ret = new HashMap(); + for (Object o : stats.entrySet()) { + Map.Entry entry = (Map.Entry) o; + ExecutorInfo executorInfo = (ExecutorInfo) entry.getKey(); + ExecutorStats executorStats = (ExecutorStats) entry.getValue(); + + ret.put(Lists.newArrayList(executorInfo.get_task_start(), executorInfo.get_task_end()), + clojurifyExecutorStats(executorStats)); + } + return ret; + } + + public static Map clojurifyExecutorStats(ExecutorStats stats) { + Map ret = new HashMap(); + + putRawKV(ret, EMITTED, stats.get_emitted()); + putRawKV(ret, TRANSFERRED, stats.get_transferred()); + putRawKV(ret, "rate", stats.get_rate()); + + if (stats.get_specific().is_set_bolt()) { + mergeMaps(ret, clojurifySpecificStats(stats.get_specific().get_bolt())); + putRawKV(ret, TYPE, KW_BOLT); + } else { + mergeMaps(ret, clojurifySpecificStats(stats.get_specific().get_spout())); + putRawKV(ret, TYPE, KW_SPOUT); + } + + return ret; + } + + public static Map clojurifySpecificStats(SpoutStats stats) { + Map ret = new HashMap(); + putRawKV(ret, ACKED, stats.get_acked()); + putRawKV(ret, FAILED, stats.get_failed()); + putRawKV(ret, COMP_LATENCIES, stats.get_complete_ms_avg()); + + return ret; + } + + public static Map clojurifySpecificStats(BoltStats stats) { + Map ret = new HashMap(); + + Map acked = windowSetConverter(stats.get_acked(), FROM_GSID, IDENTITY); + Map failed = windowSetConverter(stats.get_failed(), FROM_GSID, 
IDENTITY); + Map processAvg = windowSetConverter(stats.get_process_ms_avg(), FROM_GSID, IDENTITY); + Map executed = windowSetConverter(stats.get_executed(), FROM_GSID, IDENTITY); + Map executeAvg = windowSetConverter(stats.get_execute_ms_avg(), FROM_GSID, IDENTITY); + + putRawKV(ret, ACKED, acked); + putRawKV(ret, FAILED, failed); + putRawKV(ret, PROC_LATENCIES, processAvg); + putRawKV(ret, EXECUTED, executed); + putRawKV(ret, EXEC_LATENCIES, executeAvg); + + return ret; + } + + /** + * caller: nimbus.clj + */ + public static List extractNodeInfosFromHbForComp( + Map exec2hostPort, Map task2component, boolean includeSys, String compId) { + List ret = new ArrayList(); + + Set<List> hostPorts = new HashSet<>(); + for (Object o : exec2hostPort.entrySet()) { + Map.Entry entry = (Map.Entry) o; + List key = (List) entry.getKey(); + List value = (List) entry.getValue(); + + Integer start = ((Number) key.get(0)).intValue(); + String host = (String) value.get(0); + Integer port = (Integer) value.get(1); + String comp = (String) task2component.get(start); + if ((compId == null || compId.equals(comp)) && (includeSys || !Utils.isSystemId(comp))) { + hostPorts.add(Lists.newArrayList(host, port)); + } + } + + for (List hostPort : hostPorts) { + Map m = new HashMap(); + putRawKV(m, HOST, hostPort.get(0)); + putRawKV(m, PORT, hostPort.get(1)); + ret.add(m); + } + + return ret; + } + + public static List extractDataFromHb(Map executor2hostPort, Map task2component, Map beats, + boolean includeSys, StormTopology topology) { + return extractDataFromHb(executor2hostPort, task2component, beats, includeSys, topology, null); + } + + public static List extractDataFromHb(Map executor2hostPort, Map task2component, Map beats, + boolean includeSys, StormTopology topology, String compId) { + List ret = new ArrayList(); + for (Object o : executor2hostPort.entrySet()) { + Map.Entry entry = (Map.Entry) o; + List key = (List) entry.getKey(); + List value = (List) entry.getValue(); + + Integer start 
= ((Number) key.get(0)).intValue(); + Integer end = ((Number) key.get(1)).intValue(); + + String host = (String) value.get(0); + Integer port = ((Number) value.get(1)).intValue(); + + Map beat = (Map) beats.get(key); + if (beat == null) { + continue; + } + String id = (String) task2component.get(start); + + Map m = new HashMap(); + if ((compId == null || compId.equals(id)) && (includeSys || !Utils.isSystemId(id))) { + putRawKV(m, "exec-id", entry.getKey()); + putRawKV(m, "comp-id", id); + putRawKV(m, NUM_TASKS, end - start + 1); + putRawKV(m, HOST, host); + putRawKV(m, PORT, port); + putRawKV(m, UPTIME, beat.get(keyword(UPTIME))); + putRawKV(m, STATS, beat.get(keyword(STATS))); + + Keyword type = componentType(topology, compId); + if (type != null) { + putRawKV(m, TYPE, type); + } else { + putRawKV(m, TYPE, getByKeyword(getMapByKeyword(beat, STATS), TYPE)); + } + ret.add(m); + } + } + return ret; + } + + private static Map computeWeightedAveragesPerWindow(Map accData, String wgtAvgKey, String divisorKey) { + Map ret = new HashMap(); + for (Object o : getMapByKeyword(accData, wgtAvgKey).entrySet()) { + Map.Entry e = (Map.Entry) o; + Object window = e.getKey(); + double wgtAvg = ((Number) e.getValue()).doubleValue(); + long divisor = ((Number) getMapByKeyword(accData, divisorKey).get(window)).longValue(); + if (divisor > 0) { + ret.put(window.toString(), wgtAvg / divisor); + } + } + return ret; + } + + + /** + * caller: core.clj + * + * @param executorSumms a list of ExecutorSummary + * @return max bolt capacity + */ + public static double computeBoltCapacity(List executorSumms) { + double max = 0.0; + for (Object o : executorSumms) { + ExecutorSummary summary = (ExecutorSummary) o; + double capacity = computeExecutorCapacity(summary); + if (capacity > max) { + max = capacity; + } + } + return max; + } + + public static double computeExecutorCapacity(ExecutorSummary summ) { + ExecutorStats stats = summ.get_stats(); + if (stats == null) { + return 0.0; + } else { + 
Map m = aggregateBoltStats(Lists.newArrayList(stats), true); + m = swapMapOrder(aggregateBoltStreams(m)); + Map data = getMapByKeyword(m, TEN_MIN_IN_SECONDS_STR); + + int uptime = summ.get_uptime_secs(); + int win = Math.min(uptime, TEN_MIN_IN_SECONDS); + long executed = getByKeywordOr0(data, EXECUTED).longValue(); + double latency = getByKeywordOr0(data, EXEC_LATENCIES).doubleValue(); + if (win > 0) { + return executed * latency / (1000 * win); + } + return 0.0; + } + } + + /** + * filter ExecutorSummary whose stats is null + * + * @param summs a list of ExecutorSummary + * @return filtered summs + */ + public static List getFilledStats(List summs) { + for (Iterator itr = summs.iterator(); itr.hasNext(); ) { + ExecutorSummary summ = (ExecutorSummary) itr.next(); + if (summ.get_stats() == null) { + itr.remove(); + } + } + return summs; + } + + private static Map mapKeyStr(Map m) { + Map ret = new HashMap(); + for (Object k : m.keySet()) { + ret.put(k.toString(), m.get(k)); + } + return ret; + } + + private static long sumStreamsLong(Map m, String key) { + long sum = 0; + if (m == null) { + return sum; + } + for (Object v : m.values()) { + Map sub = (Map) v; + for (Object o : sub.entrySet()) { + Map.Entry e = (Map.Entry) o; + if (((Keyword) e.getKey()).getName().equals(key)) { + sum += ((Number) e.getValue()).longValue(); + } + } + } + return sum; + } + + private static double sumStreamsDouble(Map m, String key) { + double sum = 0; + if (m == null) { + return sum; + } + for (Object v : m.values()) { + Map sub = (Map) v; + for (Object o : sub.entrySet()) { + Map.Entry e = (Map.Entry) o; + if (((Keyword) e.getKey()).getName().equals(key)) { + sum += ((Number) e.getValue()).doubleValue(); + } + } + } + return sum; + } + + /** + * same as clojure's (merge-with merge m1 m2) + */ + private static Map mergeMaps(Map m1, Map m2) { + if (m2 == null) { + return m1; + } + for (Object o : m2.entrySet()) { + Map.Entry entry = (Map.Entry) o; + Object k = entry.getKey(); + + Map 
/**
 * filter system streams from stats
 * <p>
 * NOTE: filtering happens in place — the inner {stream id -> value} maps of
 * the argument are mutated (system-stream entries removed) and the same map
 * instance is returned.
 *
 * @param stats      { win -> stream id -> value }
 * @param includeSys when true, stats is returned untouched
 * @return the (possibly mutated) stats map
 */
private static Map filterSysStreams(Map stats, boolean includeSys) {
    if (!includeSys) {
        for (Object win : stats.keySet()) {
            Map stream2stat = (Map) stats.get(win);
            // Iterator.remove is required: deleting from keySet() while
            // iterating would throw ConcurrentModificationException
            for (Iterator itr = stream2stat.keySet().iterator(); itr.hasNext(); ) {
                Object key = itr.next();
                // non-String keys (presumably GlobalStreamId — confirm) are
                // never treated as system streams
                if (key instanceof String && Utils.isSystemId((String) key)) {
                    itr.remove();
                }
            }
        }
    }
    return stats;
}
null) { + allKeys.addAll(m2.keySet()); + } + + for (Object k : allKeys) { + Map mm1 = (m1 != null) ? (Map) m1.get(k) : null; + Map mm2 = (m2 != null) ? (Map) m2.get(k) : null; + if (mm1 == null && mm2 == null) { + continue; + } else if (mm1 == null) { + ret.put(k, mm2); + } else if (mm2 == null) { + ret.put(k, mm1); + } else { + Map tmp = new HashMap(); + for (Object kk : mm1.keySet()) { + List seq1 = (List) mm1.get(kk); + List seq2 = (List) mm2.get(kk); + List sums = new ArrayList(); + for (int i = 0; i < seq1.size(); i++) { + sums.add(add((Number) seq1.get(i), (Number) seq2.get(i))); + } + tmp.put(kk, sums); + } + ret.put(k, tmp); + } + } + return ret; + } + + // ===================================================================================== + // thriftify stats methods + // ===================================================================================== + + private static TopologyPageInfo thriftifyTopoPageData(String topologyId, Map data) { + TopologyPageInfo ret = new TopologyPageInfo(topologyId); + Integer numTasks = getByKeywordOr0(data, NUM_TASKS).intValue(); + Integer numWorkers = getByKeywordOr0(data, NUM_WORKERS).intValue(); + Integer numExecutors = getByKeywordOr0(data, NUM_EXECUTORS).intValue(); + Map spout2stats = getMapByKeyword(data, SPOUT_TO_STATS); + Map bolt2stats = getMapByKeyword(data, BOLT_TO_STATS); + Map win2emitted = getMapByKeyword(data, WIN_TO_EMITTED); + Map win2transferred = getMapByKeyword(data, WIN_TO_TRANSFERRED); + Map win2compLatency = getMapByKeyword(data, WIN_TO_COMP_LAT); + Map win2acked = getMapByKeyword(data, WIN_TO_ACKED); + Map win2failed = getMapByKeyword(data, WIN_TO_FAILED); + + Map<String, ComponentAggregateStats> spoutAggStats = new HashMap<>(); + for (Object o : spout2stats.entrySet()) { + Map.Entry e = (Map.Entry) o; + String id = (String) e.getKey(); + Map v = (Map) e.getValue(); + putRawKV(v, TYPE, KW_SPOUT); + + spoutAggStats.put(id, thriftifySpoutAggStats(v)); + } + + Map<String, ComponentAggregateStats> 
boltAggStats = new HashMap<>(); + for (Object o : bolt2stats.entrySet()) { + Map.Entry e = (Map.Entry) o; + String id = (String) e.getKey(); + Map v = (Map) e.getValue(); + putRawKV(v, TYPE, KW_BOLT); + + boltAggStats.put(id, thriftifyBoltAggStats(v)); + } + + TopologyStats topologyStats = new TopologyStats(); + topologyStats.set_window_to_acked(win2acked); + topologyStats.set_window_to_emitted(win2emitted); + topologyStats.set_window_to_failed(win2failed); + topologyStats.set_window_to_transferred(win2transferred); + topologyStats.set_window_to_complete_latencies_ms(win2compLatency); + + ret.set_num_tasks(numTasks); + ret.set_num_workers(numWorkers); + ret.set_num_executors(numExecutors); + ret.set_id_to_spout_agg_stats(spoutAggStats); + ret.set_id_to_bolt_agg_stats(boltAggStats); + ret.set_topology_stats(topologyStats); + + return ret; + } + + private static ComponentAggregateStats thriftifySpoutAggStats(Map m) { + ComponentAggregateStats stats = new ComponentAggregateStats(); + stats.set_type(ComponentType.SPOUT); + stats.set_last_error((ErrorInfo) getByKeyword(m, LAST_ERROR)); + thriftifyCommonAggStats(stats, m); + + SpoutAggregateStats spoutAggStats = new SpoutAggregateStats(); + spoutAggStats.set_complete_latency_ms(getByKeywordOr0(m, COMP_LATENCY).doubleValue()); + SpecificAggregateStats specificStats = SpecificAggregateStats.spout(spoutAggStats); + + stats.set_specific_stats(specificStats); + return stats; + } + + private static ComponentAggregateStats thriftifyBoltAggStats(Map m) { + ComponentAggregateStats stats = new ComponentAggregateStats(); + stats.set_type(ComponentType.BOLT); + stats.set_last_error((ErrorInfo) getByKeyword(m, LAST_ERROR)); + thriftifyCommonAggStats(stats, m); + + BoltAggregateStats boltAggStats = new BoltAggregateStats(); + boltAggStats.set_execute_latency_ms(getByKeywordOr0(m, EXEC_LATENCY).doubleValue()); + boltAggStats.set_process_latency_ms(getByKeywordOr0(m, PROC_LATENCY).doubleValue()); + 
boltAggStats.set_executed(getByKeywordOr0(m, EXECUTED).longValue()); + boltAggStats.set_capacity(getByKeywordOr0(m, CAPACITY).doubleValue()); + SpecificAggregateStats specificStats = SpecificAggregateStats.bolt(boltAggStats); + + stats.set_specific_stats(specificStats); + return stats; + } + + private static ExecutorAggregateStats thriftifyExecAggStats(String compId, Keyword compType, Map m) { + ExecutorAggregateStats stats = new ExecutorAggregateStats(); + + ExecutorSummary executorSummary = new ExecutorSummary(); + List executor = (List) getByKeyword(m, EXECUTOR_ID); + executorSummary.set_executor_info(new ExecutorInfo(((Number) executor.get(0)).intValue(), + ((Number) executor.get(1)).intValue())); + executorSummary.set_component_id(compId); + executorSummary.set_host((String) getByKeyword(m, HOST)); + executorSummary.set_port(getByKeywordOr0(m, PORT).intValue()); + int uptime = getByKeywordOr0(m, UPTIME).intValue(); + executorSummary.set_uptime_secs(uptime); + stats.set_exec_summary(executorSummary); + + if (compType.getName().equals(SPOUT)) { + stats.set_stats(thriftifySpoutAggStats(m)); + } else { + stats.set_stats(thriftifyBoltAggStats(m)); + } + + return stats; + } + + private static Map thriftifyBoltOutputStats(Map id2outStats) { + Map ret = new HashMap(); + for (Object k : id2outStats.keySet()) { + ret.put(k, thriftifyBoltAggStats((Map) id2outStats.get(k))); + } + return ret; + } + + private static Map thriftifySpoutOutputStats(Map id2outStats) { + Map ret = new HashMap(); + for (Object k : id2outStats.keySet()) { + ret.put(k, thriftifySpoutAggStats((Map) id2outStats.get(k))); + } + return ret; + } + + private static Map thriftifyBoltInputStats(Map cidSid2inputStats) { + Map ret = new HashMap(); + for (Object e : cidSid2inputStats.entrySet()) { + Map.Entry entry = (Map.Entry) e; + ret.put(toGlobalStreamId((List) entry.getKey()), + thriftifyBoltAggStats((Map) entry.getValue())); + } + return ret; + } + + private static ComponentAggregateStats 
/**
 * Converts the post-aggregated component stats map into a thrift
 * ComponentPageInfo for the UI. The window stats and the per-stream
 * input/output stats differ between spouts and bolts.
 *
 * @param topologyId id of the topology
 * @param topology   the topology structure (currently unused in the body)
 * @param compId     id of the component being rendered
 * @param data       map produced by postAggregateCompStats
 * @return the populated ComponentPageInfo
 */
private static ComponentPageInfo thriftifyCompPageData(
        String topologyId, StormTopology topology, String compId, Map data) {
    ComponentPageInfo ret = new ComponentPageInfo();
    ret.set_component_id(compId);

    // phase 1: gather the {stat -> {win -> value}} maps
    Map win2stats = new HashMap();
    putRawKV(win2stats, EMITTED, getMapByKeyword(data, WIN_TO_EMITTED));
    putRawKV(win2stats, TRANSFERRED, getMapByKeyword(data, WIN_TO_TRANSFERRED));
    putRawKV(win2stats, ACKED, getMapByKeyword(data, WIN_TO_ACKED));
    putRawKV(win2stats, FAILED, getMapByKeyword(data, WIN_TO_FAILED));

    Keyword type = (Keyword) getByKeyword(data, TYPE);
    String compType = type.getName();
    if (compType.equals(SPOUT)) {
        ret.set_component_type(ComponentType.SPOUT);
        putRawKV(win2stats, COMP_LATENCY, getMapByKeyword(data, WIN_TO_COMP_LAT));
    } else {
        ret.set_component_type(ComponentType.BOLT);
        putRawKV(win2stats, EXEC_LATENCY, getMapByKeyword(data, WIN_TO_EXEC_LAT));
        putRawKV(win2stats, PROC_LATENCY, getMapByKeyword(data, WIN_TO_PROC_LAT));
        putRawKV(win2stats, EXECUTED, getMapByKeyword(data, WIN_TO_EXECUTED));
    }
    // {stat -> {win -> value}} becomes {win -> {stat -> value}}
    win2stats = swapMapOrder(win2stats);

    // per-executor rows for the component page
    List<ExecutorAggregateStats> execStats = new ArrayList<>();
    List executorStats = (List) getByKeyword(data, EXECUTOR_STATS);
    if (executorStats != null) {
        for (Object o : executorStats) {
            execStats.add(thriftifyExecAggStats(compId, type, (Map) o));
        }
    }

    // phase 2: thriftify each per-window stat map; bolts additionally get
    // per-input-stream stats, spouts have no input streams
    Map gsid2inputStats, sid2outputStats;
    if (compType.equals(SPOUT)) {
        Map tmp = new HashMap();
        for (Object k : win2stats.keySet()) {
            tmp.put(k, thriftifySpoutAggStats((Map) win2stats.get(k)));
        }
        win2stats = tmp;
        gsid2inputStats = null;
        sid2outputStats = thriftifySpoutOutputStats(getMapByKeyword(data, SID_TO_OUT_STATS));
    } else {
        Map tmp = new HashMap();
        for (Object k : win2stats.keySet()) {
            tmp.put(k, thriftifyBoltAggStats((Map) win2stats.get(k)));
        }
        win2stats = tmp;
        gsid2inputStats = thriftifyBoltInputStats(getMapByKeyword(data, CID_SID_TO_IN_STATS));
        sid2outputStats = thriftifyBoltOutputStats(getMapByKeyword(data, SID_TO_OUT_STATS));
    }
    ret.set_num_executors(getByKeywordOr0(data, NUM_EXECUTORS).intValue());
    ret.set_num_tasks(getByKeywordOr0(data, NUM_TASKS).intValue());
    ret.set_topology_id(topologyId);
    // topology name is filled in by the caller — not available here
    ret.set_topology_name(null);
    ret.set_window_to_stats(win2stats);
    ret.set_sid_to_output_stats(sid2outputStats);
    ret.set_exec_stats(execStats);
    ret.set_gsid_to_input_stats(gsid2inputStats);

    return ret;
}
TO_STRING)); + ret.set_rate(((Number) getByKeyword(stats, "rate")).doubleValue()); + + return ret; + } + + private static ExecutorSpecificStats thriftifySpecificStats(Map stats) { + ExecutorSpecificStats specificStats = new ExecutorSpecificStats(); + + String compType = ((Keyword) getByKeyword(stats, TYPE)).getName(); + if (BOLT.equals(compType)) { + BoltStats boltStats = new BoltStats(); + boltStats.set_acked(windowSetConverter(getMapByKeyword(stats, ACKED), TO_GSID, TO_STRING)); + boltStats.set_executed(windowSetConverter(getMapByKeyword(stats, EXECUTED), TO_GSID, TO_STRING)); + boltStats.set_execute_ms_avg(windowSetConverter(getMapByKeyword(stats, EXEC_LATENCIES), TO_GSID, TO_STRING)); + boltStats.set_failed(windowSetConverter(getMapByKeyword(stats, FAILED), TO_GSID, TO_STRING)); + boltStats.set_process_ms_avg(windowSetConverter(getMapByKeyword(stats, PROC_LATENCIES), TO_GSID, TO_STRING)); + specificStats.set_bolt(boltStats); + } else { + SpoutStats spoutStats = new SpoutStats(); + spoutStats.set_acked(windowSetConverter(getMapByKeyword(stats, ACKED), TO_STRING, TO_STRING)); + spoutStats.set_failed(windowSetConverter(getMapByKeyword(stats, FAILED), TO_STRING, TO_STRING)); + spoutStats.set_complete_ms_avg(windowSetConverter(getMapByKeyword(stats, COMP_LATENCIES), TO_STRING, TO_STRING)); + specificStats.set_spout(spoutStats); + } + return specificStats; + } + + + // ===================================================================================== + // helper methods + // ===================================================================================== + + private static GlobalStreamId toGlobalStreamId(List list) { + return new GlobalStreamId((String) list.get(0), (String) list.get(1)); + } + + /** + * Returns true if x is a number that is not NaN or Infinity, false otherwise + */ + private static boolean isValidNumber(Object x) { + return x != null && x instanceof Number && + !Double.isNaN(((Number) x).doubleValue()) && + !Double.isInfinite(((Number) 
x).doubleValue()); + } + + /** + * the value of m is as follows: + * <pre> + * #org.apache.storm.stats.CommonStats { + * :executed { + * ":all-time" {["split" "default"] 18727460}, + * "600" {["split" "default"] 11554}, + * "10800" {["split" "default"] 207269}, + * "86400" {["split" "default"] 1659614}}, + * :execute-latencies { + * ":all-time" {["split" "default"] 0.5874528633354443}, + * "600" {["split" "default"] 0.6140350877192983}, + * "10800" {["split" "default"] 0.5864434687156971}, + * "86400" {["split" "default"] 0.5815376460556336}} + * } + * </pre> + */ + private static double computeAggCapacity(Map m, Integer uptime) { + if (uptime != null) { + Map execAvg = (Map) ((Map) getByKeyword(m, EXEC_LATENCIES)).get(TEN_MIN_IN_SECONDS_STR); + Map exec = (Map) ((Map) getByKeyword(m, EXECUTED)).get(TEN_MIN_IN_SECONDS_STR); + + Set<Object> allKeys = new HashSet<>(); + if (execAvg != null) { + allKeys.addAll(execAvg.keySet()); + } + if (exec != null) { + allKeys.addAll(exec.keySet()); + } + + double totalAvg = 0; + for (Object k : allKeys) { + double avg = getOr0(execAvg, k).doubleValue(); + long cnt = getOr0(exec, k).longValue(); + totalAvg += avg * cnt; + } + return totalAvg / (Math.min(uptime, TEN_MIN_IN_SECONDS) * 1000); + } + return 0.0; + } + + private static Number getOr0(Map m, Object k) { + if (m == null) { + return 0; + } + + Number n = (Number) m.get(k); + if (n == null) { + return 0; + } + return n; + } + + private static Number getByKeywordOr0(Map m, String k) { + if (m == null) { + return 0; + } + + Number n = (Number) m.get(keyword(k)); + if (n == null) { + return 0; + } + return n; + } + + private static Double weightAvgAndSum(Map id2Avg, Map id2num) { + double ret = 0; + if (id2Avg == null || id2num == null) { + return ret; + } + + for (Object o : id2Avg.entrySet()) { + Map.Entry entry = (Map.Entry) o; + Object k = entry.getKey(); + double v = ((Number) entry.getValue()).doubleValue(); + long n = ((Number) id2num.get(k)).longValue(); + ret += 
productOr0(v, n); + } + return ret; + } + + private static double weightAvg(Map id2Avg, Map id2num, Object key) { + if (id2Avg == null || id2num == null) { + return 0.0; + } + return productOr0(id2Avg.get(key), id2num.get(key)); + } + + public static Keyword componentType(StormTopology topology, String compId) { + if (compId == null) { + return null; + } + + Map<String, Bolt> bolts = topology.get_bolts(); + if (Utils.isSystemId(compId) || bolts.containsKey(compId)) { + return KW_BOLT; + } + return KW_SPOUT; + } + + public static void putRawKV(Map map, String k, Object v) { + map.put(keyword(k), v); + } + + private static void removeByKeyword(Map map, String k) { + map.remove(keyword(k)); + } + + public static Object getByKeyword(Map map, String key) { + return map.get(keyword(key)); + } + + public static Map getMapByKeyword(Map map, String key) { + if (map == null) { + return null; + } + return (Map) map.get(keyword(key)); + } + + private static Number add(Number n1, Number n2) { + if (n1 instanceof Long || n1 instanceof Integer) { + return n1.longValue() + n2.longValue(); + } + return n1.doubleValue() + n2.doubleValue(); + } + + private static long sumValues(Map m) { + long ret = 0L; + if (m == null) { + return ret; + } + + for (Object o : m.values()) { + ret += ((Number) o).longValue(); + } + return ret; + } + + private static Number sumOr0(Object a, Object b) { + if (isValidNumber(a) && isValidNumber(b)) { + if (a instanceof Long || a instanceof Integer) { + return ((Number) a).longValue() + ((Number) b).longValue(); + } else { + return ((Number) a).doubleValue() + ((Number) b).doubleValue(); + } + } + return 0; + } + + private static double productOr0(Object a, Object b) { + if (isValidNumber(a) && isValidNumber(b)) { + return ((Number) a).doubleValue() * ((Number) b).doubleValue(); + } + return 0; + } + + private static double maxOr0(Object a, Object b) { + if (isValidNumber(a) && isValidNumber(b)) { + return Math.max(((Number) a).doubleValue(), ((Number) 
b).doubleValue()); + } + return 0; + } + + /** + * For a nested map, rearrange data such that the top-level keys become the + * nested map's keys and vice versa. + * Example: + * {:a {:X :banana, :Y :pear}, :b {:X :apple, :Y :orange}} + * -> {:Y {:a :pear, :b :orange}, :X {:a :banana, :b :apple}}" + */ + private static Map swapMapOrder(Map m) { + if (m.size() == 0) { + return m; + } + + Map ret = new HashMap(); + for (Object k1 : m.keySet()) { + Map v = (Map) m.get(k1); + if (v != null) { + for (Object k2 : v.keySet()) { + Map subRet = (Map) ret.get(k2); + if (subRet == null) { + subRet = new HashMap(); + ret.put(k2, subRet); + } + subRet.put(k1, v.get(k2)); + } + } + } + return ret; + } + + /** + * @param avgs a PersistentHashMap of values: { win -> GlobalStreamId -> value } + * @param counts a PersistentHashMap of values: { win -> GlobalStreamId -> value } + * @return a PersistentHashMap of values: {win -> GlobalStreamId -> [cnt*avg, cnt]} + */ + private static Map expandAverages(Map avgs, Map counts) { + Map ret = new HashMap(); + + for (Object win : counts.keySet()) { + Map inner = new HashMap(); + + Map stream2cnt = (Map) counts.get(win); + for (Object stream : stream2cnt.keySet()) { + Long cnt = (Long) stream2cnt.get(stream); + Double avg = (Double) ((Map) avgs.get(win)).get(stream); + if (avg == null) { + avg = 0.0; + } + inner.put(stream, Lists.newArrayList(cnt * avg, cnt)); + } + ret.put(win, inner); + } + + return ret; + } + + /** + * first zip the two seqs, then do expand-average, then merge with sum + * + * @param avgSeq list of avgs like: [{win -> Glob
<TRUNCATED>
