Github user zd-project commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2764#discussion_r209365696

    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() {
             }
     
    +    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
    +        static final int CACHING_WINDOW = 5;
    +        static final String SUMMARY = "summary";
    +
    +        private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
    +            @Override
    +            public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
    +                return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
    +            }
    +        };
    +        private final Function<String, Histogram> registerHistogram = (name) -> {
    +            //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
    +            // the data distribution across all entities of a type (e.g., data from all nimbuses/topologies) at one moment.
    +            // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update.
    +            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
    +            clusterSummaryMetrics.put(name, histogram);
    +            return histogram;
    +        };
    +        private volatile boolean active = false;
    +
    +        //Nimbus metrics distribution
    +        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
    +
    +        //Supervisor metrics distribution
    +        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
    +        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
    +        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
    +        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
    +        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
    +        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
    +        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
    +
    +        //Topology metrics distribution
    +        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
    +        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
    +        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
    +        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
    +        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
    +        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
    +        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
    +        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
    +        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
    +        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
    +        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
    +
    +        /**
    +         * Constructor to put all items in ClusterSummary in MetricSet as a metric.
    +         * All metrics are derived from a cached ClusterSummary object, which expires
    +         * {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after the first query from reporters.
    +         * For a {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than the
    +         * reporting interval to avoid reporting outdated values.
    +         */
    +        ClusterSummaryMetricSet() {
    +            //Fail fast if out of sync with the thrift protocol
    +            assert ClusterSummary._Fields.values().length == 3
    +                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
    +                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
    +                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
    +
    +            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
    +                @Override
    +                protected ClusterSummary loadValue() {
    +                    try {
    +                        if (active) {
    +                            ClusterSummary newSummary = getClusterInfoImpl();
    +                            LOG.debug("the new summary is {}", newSummary);
    +                            //Force an update of the histograms upon each cache refresh.
    +                            //This behavior relies on the fact that the most common implementation of Reporter
    --- End diff --
    
    We can also develop a passive histogram that simply wraps around a gauge. It's better to file another Jira for that, then.
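    A minimal sketch of what such a gauge-backed ("passive") histogram could look like,
    assuming the gauge exposes the per-entity values as a long[]; the GaugeBackedReservoir
    name and the gauge shape are illustrative, not existing Storm or codahale-metrics API:

        import com.codahale.metrics.Gauge;
        import com.codahale.metrics.Histogram;
        import com.codahale.metrics.Reservoir;
        import com.codahale.metrics.Snapshot;
        import com.codahale.metrics.UniformSnapshot;

        //Reservoir that pulls its values from a gauge at snapshot time,
        //so the histogram never has to be actively updated on cache refresh.
        class GaugeBackedReservoir implements Reservoir {
            private final Gauge<long[]> source;

            GaugeBackedReservoir(Gauge<long[]> source) {
                this.source = source;
            }

            @Override
            public int size() {
                return source.getValue().length;
            }

            @Override
            public void update(long value) {
                //Passive: values come from the gauge, direct updates are ignored
            }

            @Override
            public Snapshot getSnapshot() {
                return new UniformSnapshot(source.getValue());
            }
        }

    It would be registered the same way as the active histograms above, e.g.
    new Histogram(new GaugeBackedReservoir(uptimeValuesGauge)), and would simply read
    whatever the cached gauge holds at report time instead of being pushed to.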
---