Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209381686 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } + private class ClusterSummaryMetricSet implements MetricSet, Runnable { + static final int CACHING_WINDOW = 5; + static final String SUMMARY = "summary"; + + private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() { + @Override + public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { + return super.put(StormMetricsRegistry.name(SUMMARY, key), value); + } + }; + private final Function<String, Histogram> registerHistogram = (name) -> { + //This histogram reflects the data distribution across only one ClusterSummary, i.e., + // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. + // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update + final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); + clusterSummaryMetrics.put(name, histogram); + return histogram; + }; + private volatile boolean active = false; + + //NImbus metrics distribution + private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + + //Supervisor metrics distribution + private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); + private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); + private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); + private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); + private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); + private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); + private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + + //Topology metrics distribution + private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); + private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); + private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); + private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); + private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); + private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); + private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); + private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); + private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); + private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); + private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + + /** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. + */ + ClusterSummaryMetricSet() { + //Break the code if out of sync to thrift protocol + assert ClusterSummary._Fields.values().length == 3 + && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS + && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES + && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + + final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) { + @Override + protected ClusterSummary loadValue() { + try { + if (active) { + ClusterSummary newSummary = getClusterInfoImpl(); + LOG.debug("the new summary is {}", newSummary); + //Force update histogram upon each cache refresh + //This behavior relies on the fact that most common implementation of Reporter --- End diff -- How about making a repeating timer task that fetches the cluster summary, updates the histograms, and updates fields that are then read by the gauges? The reason I don't like this is that it makes a lot of assumptions about the internals of code we don't control. If the reporter looks at histograms before gauges, you get the issue you mention (and keep in mind anyone can implement a reporter). If the DerivativeGauge were implemented differently (e.g. if the parent gauge pushed the value to the child, rather than the other way around) this also would break.
---