fix possible NPE & ClassCastException
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/faaacaee Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/faaacaee Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/faaacaee Branch: refs/heads/master Commit: faaacaee046bfa4f458c19cade678515a021d836 Parents: abe9b67 Author: å«ä¹ <[email protected]> Authored: Tue Mar 1 11:47:51 2016 +0800 Committer: å«ä¹ <[email protected]> Committed: Tue Mar 1 11:47:51 2016 +0800 ---------------------------------------------------------------------- .../jvm/org/apache/storm/stats/StatsUtil.java | 23 +++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/faaacaee/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java index efdf8e0..0ed2af9 100644 --- a/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java +++ b/storm-core/src/jvm/org/apache/storm/stats/StatsUtil.java @@ -578,7 +578,7 @@ public class StatsUtil { Map ret = new HashMap(); putRawKV(ret, NUM_TASKS, task2comp.size()); putRawKV(ret, NUM_WORKERS, ((Set) getByKeyword(accData, WORKERS_SET)).size()); - putRawKV(ret, NUM_EXECUTORS, exec2nodePort.size()); + putRawKV(ret, NUM_EXECUTORS, exec2nodePort != null ? exec2nodePort.size() : 0); Map bolt2stats = getMapByKeyword(accData, BOLT_TO_STATS); Map aggBolt2stats = new HashMap(); @@ -1339,11 +1339,18 @@ public class StatsUtil { */ private static Map filterSysStreams(Map stats, boolean includeSys) { if (!includeSys) { - for (Object win : stats.keySet()) { - Map stream2stat = (Map) stats.get(win); - for (Iterator itr = stream2stat.keySet().iterator(); itr.hasNext(); ) { - Object key = itr.next(); - if (key instanceof String && Utils.isSystemId((String) key)) { + for (Iterator itr = stats.keySet().iterator(); itr.hasNext(); ) { + Object winOrStream = itr.next(); + if (isWindow(winOrStream)) { + Map stream2stat = (Map) stats.get(winOrStream); + for (Iterator subItr = stream2stat.keySet().iterator(); subItr.hasNext(); ) { + Object key = subItr.next(); + if (key instanceof String && Utils.isSystemId((String) key)) { + subItr.remove(); + } + } + } else { + if (winOrStream instanceof String && Utils.isSystemId((String) winOrStream)) { itr.remove(); } } @@ -1352,6 +1359,10 @@ public class StatsUtil { return stats; } + private static boolean isWindow(Object key) { + return key.equals("600") || key.equals("10800") || key.equals("86400") || key.equals(":all-time"); + } + /** * equals to clojure's: (merge-with (partial merge-with sum-or-0) acc-out spout-out) */
