Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209366165 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } + private class ClusterSummaryMetricSet implements MetricSet, Runnable { + static final int CACHING_WINDOW = 5; + static final String SUMMARY = "summary"; + + private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() { + @Override + public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { + return super.put(StormMetricsRegistry.name(SUMMARY, key), value); + } + }; + private final Function<String, Histogram> registerHistogram = (name) -> { + //This histogram reflects the data distribution across only one ClusterSummary, i.e., + // data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. + // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update + final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); + clusterSummaryMetrics.put(name, histogram); + return histogram; + }; + private volatile boolean active = false; + + //NImbus metrics distribution + private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + + //Supervisor metrics distribution + private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); + private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); + private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); + private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); + private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); + private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); + private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + + //Topology metrics distribution + private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); + private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); + private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); + private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); + private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); + private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); + private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); + private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); + private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); + private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); + private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + + /** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. + */ + ClusterSummaryMetricSet() { + //Break the code if out of sync to thrift protocol + assert ClusterSummary._Fields.values().length == 3 + && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS + && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES + && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + + final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) { + @Override + protected ClusterSummary loadValue() { + try { + if (active) { + ClusterSummary newSummary = getClusterInfoImpl(); + LOG.debug("the new summary is {}", newSummary); + //Force update histogram upon each cache refresh + //This behavior relies on the fact that most common implementation of Reporter --- End diff -- I have another potentially related question: What does this gauge output to the reporter? Is it going to be the toString on the ClusterSummary, or what happens when the value is delivered to the reporter? Is the gauge just here to update the other metrics?
---