Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2764#discussion_r208991078
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
     
         }
     
    +    //enum NotPorted {
    +    //    //Declared in StormConf. I don't see the value in reporting it.
    +    //    SUPERVISOR_TOTAL_RESOURCE,
    +    //    //May be able to aggregate based on status.
    +    //    TOPOLOGY_STATUS,
    +    //    TOPOLOGY_SCHED_STATUS,
    +    //    //May be aggregated, as well as other distinct values
    +    //    NUM_DISTINCT_NIMBUS_VERSION;
    +    //}
    +
    +    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
    +        static final int CACHING_WINDOW = 5;
    +        static final int PORTED_METRICS = 25;
    +        static final String SUMMARY = "summary";
    +
    +        private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
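    +        //Creates a histogram backed by a sliding time window of CACHING_WINDOW / 2 seconds and registers it under the given name.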
    +        private final Function<String, Histogram> registerHistogram = (name) -> {
    +            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
    +            ported.put(name, histogram);
    +            return histogram;
    +        };
    +        private volatile boolean active = false;
    +
    +        //Nimbus metrics distribution
    +        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
    +
    +        //Supervisor metrics distribution
    +        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
    +        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
    +        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
    +        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
    +        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU");
    +        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
    +        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU");
    +
    +        //Topology metrics distribution
    +        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
    +        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
    +        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
    +        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
    +        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
    +        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
    +        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
    +        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU");
    +        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
    +        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
    +        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU");
    +
    +        ClusterSummaryMetricSet() {
    +            //Fail fast if this falls out of sync with the Thrift protocol
    +            assert ClusterSummary._Fields.values().length == 3
    +                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
    +                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
    +                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
    +
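    +            //Cache the expensive cluster summary; loadValue() runs at most once per CACHING_WINDOW seconds.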
    +            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
    +                @Override
    +                protected ClusterSummary loadValue() {
    +                    try {
    +                        if (active) {
    +                            ClusterSummary newSummary = getClusterInfoImpl();
    +                            LOG.info("the new summary is {}", newSummary);
    +                            //This is ugly, but I can't think of a better way to update the histograms only once per caching window.
    +                            // It also depends on the implementation detail that gauges get updated before histograms.
    +                            updateHistogram(newSummary);
    +                            return newSummary;
    +                        } else {
    +                            return null;
    +                        }
    +                    } catch (Exception e) {
    +                        LOG.warn("Failed to get cluster info.", e);
    +                        throw new RuntimeException(e);
    +                    }
    +                }
    +            };
    +
    +            ported.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
    +                @Override
    +                protected Long transform(ClusterSummary clusterSummary) {
    +                    return clusterSummary.get_nimbuses().stream().filter(NimbusSummary::is_isLeader).count();
    --- End diff ---
    
    Will this always be 1?
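    
    To make the question concrete, here is a minimal, self-contained sketch of the CachedGauge/DerivativeGauge pattern used above. FakeNimbusSummary is a hypothetical stand-in for Storm's NimbusSummary; only the two dropwizard-metrics classes are real. The derived gauge just counts leaders in whatever snapshot the cached gauge last loaded, so it reads 1 only while exactly one nimbus reports itself as leader:
    
        import java.util.Arrays;
        import java.util.List;
        import java.util.concurrent.TimeUnit;
    
        import com.codahale.metrics.CachedGauge;
        import com.codahale.metrics.DerivativeGauge;
    
        public class LeaderCountSketch {
    
            //Hypothetical stand-in for Storm's NimbusSummary, for illustration only.
            static class FakeNimbusSummary {
                private final boolean leader;
    
                FakeNimbusSummary(boolean leader) {
                    this.leader = leader;
                }
    
                boolean isLeader() {
                    return leader;
                }
            }
    
            public static void main(String[] args) {
                //Base gauge: reloads the (expensive) snapshot at most once per 5-second window.
                CachedGauge<List<FakeNimbusSummary>> cachedSummary =
                    new CachedGauge<List<FakeNimbusSummary>>(5, TimeUnit.SECONDS) {
                        @Override
                        protected List<FakeNimbusSummary> loadValue() {
                            //Assumed state: leader election has settled on one leader.
                            return Arrays.asList(new FakeNimbusSummary(true),
                                                 new FakeNimbusSummary(false));
                        }
                    };
    
                //Derived gauge: transforms the cached value without triggering another load.
                DerivativeGauge<List<FakeNimbusSummary>, Long> leaderCount =
                    new DerivativeGauge<List<FakeNimbusSummary>, Long>(cachedSummary) {
                        @Override
                        protected Long transform(List<FakeNimbusSummary> summaries) {
                            return summaries.stream()
                                            .filter(FakeNimbusSummary::isLeader)
                                            .count();
                        }
                    };
    
                System.out.println(leaderCount.getValue()); //prints 1 for this snapshot
            }
        }
    
    So the value should be 1 in steady state, but a snapshot taken mid-failover could plausibly report 0 (no leader elected yet), which may be exactly what makes the metric worth having.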

