Github user zd-project commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2764#discussion_r209365696
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() {
     
         }
     
    +    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
    +        static final int CACHING_WINDOW = 5;
    +        static final String SUMMARY = "summary";
    +
    +        private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
    +            @Override
    +            public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
    +                return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
    +            }
    +        };
    +        private final Function<String, Histogram> registerHistogram = (name) -> {
    +            //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
    +            // the data distribution across all entities of a type (e.g., data from all nimbuses/topologies) at one moment.
    +            // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update.
    +            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
    +            clusterSummaryMetrics.put(name, histogram);
    +            return histogram;
    +        };
    +        private volatile boolean active = false;
    +
    +        //Nimbus metrics distribution
    +        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
    +
    +        //Supervisor metrics distribution
    +        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
    +        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
    +        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
    +        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
    +        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
    +        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
    +        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
    +
    +        //Topology metrics distribution
    +        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
    +        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
    +        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
    +        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
    +        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
    +        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
    +        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
    +        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
    +        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
    +        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
    +        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
    +
    +        /**
    +         * Constructor that registers every item in ClusterSummary as a metric in this MetricSet.
    +         * All metrics are derived from a cached ClusterSummary object that expires
    +         * {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after the first reporter query.
    +         * With a {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
    +         * the reporting interval to avoid reporting outdated data.
    +         */
    +        ClusterSummaryMetricSet() {
    +            //Break the code if out of sync with the Thrift protocol
    +            assert ClusterSummary._Fields.values().length == 3
    +                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
    +                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
    +                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
    +
    +            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
    +                @Override
    +                protected ClusterSummary loadValue() {
    +                    try {
    +                        if (active) {
    +                            ClusterSummary newSummary = getClusterInfoImpl();
    +                            LOG.debug("the new summary is {}", newSummary);
    +                            //Force update histograms upon each cache refresh
    +                            //This behavior relies on the fact that the most common implementation of Reporter
    --- End diff --
    
    We could also develop a passive histogram that simply wraps around a gauge, but that's probably better filed as a separate JIRA.
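
    For reference, here is a minimal sketch of what such a passive, gauge-backed histogram could look
    like with metrics-core 3.x (the GaugeBackedHistogram name and the Supplier wiring are illustrative,
    not part of this PR): the reservoir ignores update() and pulls the current values from a supplier
    whenever a reporter asks for a snapshot, so it can never be staler than the underlying gauge.

        import com.codahale.metrics.Histogram;
        import com.codahale.metrics.Reservoir;
        import com.codahale.metrics.Snapshot;
        import com.codahale.metrics.UniformSnapshot;
        import java.util.function.Supplier;

        /** A "passive" histogram that recomputes its snapshot from a gauge-like supplier on demand. */
        class GaugeBackedHistogram extends Histogram {
            private final Supplier<long[]> values;

            GaugeBackedHistogram(Supplier<long[]> values) {
                super(new Reservoir() {
                    @Override
                    public int size() {
                        return values.get().length;
                    }

                    @Override
                    public void update(long value) {
                        //Passive: values come from the supplier, pushed updates are ignored
                    }

                    @Override
                    public Snapshot getSnapshot() {
                        //Recomputed on demand from the supplier's current values
                        return new UniformSnapshot(values.get());
                    }
                });
                this.values = values;
            }

            @Override
            public long getCount() {
                //update() is never called, so derive the count from the supplier as well
                return values.get().length;
            }
        }

    Wired to the cached summary in this diff, it would look something like this (assuming the generated
    Thrift getters get_supervisors()/get_uptime_secs()):

        Histogram supervisorsUptime = new GaugeBackedHistogram(() ->
            cachedSummary.getValue().get_supervisors().stream()
                .mapToLong(SupervisorSummary::get_uptime_secs)
                .toArray());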

