Github user zd-project commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2764#discussion_r209337293
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -4716,4 +4746,186 @@ public IScheduler getForcedScheduler() {
     
         }
     
    +    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
    +        static final int CACHING_WINDOW = 5;
    +        static final String SUMMARY = "summary";
    +
    +        private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
    +            @Override
    +            public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
    +                return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
    +            }
    +        };
    +        private final Function<String, Histogram> registerHistogram = (name) -> {
    +            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
    +            clusterSummaryMetrics.put(name, histogram);
    +            return histogram;
    +        };
    +        private volatile boolean active = false;
    +
    +        //Nimbus metrics distribution
    +        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
    +
    +        //Supervisor metrics distribution
    +        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
    +        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
    +        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
    +        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
    +        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
    +        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
    +        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
    +
    +        //Topology metrics distribution
    +        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
    +        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
    +        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
    +        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
    +        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
    +        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
    +        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
    +        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
    +        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
    +        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
    +        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
    +
    +        ClusterSummaryMetricSet() {
    +            //Break the code if out of sync with the thrift protocol
    +            assert ClusterSummary._Fields.values().length == 3
    +                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
    +                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
    +                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
    +
    +            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
    +                @Override
    +                protected ClusterSummary loadValue() {
    +                    try {
    +                        if (active) {
    +                            ClusterSummary newSummary = getClusterInfoImpl();
    +                            LOG.info("the new summary is {}", newSummary);
    +                            //This is ugly but I can't think of a better way to update the histograms only once per caching window.
    +                            // It also kind of depends on the implementation detail that gauges get updated before histograms.
    +                            updateHistogram(newSummary);
    +                            return newSummary;
    +                        } else {
    +                            return null;
    +                        }
    +                    } catch (Exception e) {
    +                        LOG.warn("Get cluster info exception.", e);
    +                        throw new RuntimeException(e);
    +                    }
    +                }
    +            };
    +
    +            clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new 
DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
    +                @Override
    +                protected Long transform(ClusterSummary clusterSummary) {
    +                    return 
clusterSummary.get_nimbuses().stream().filter(NimbusSummary::is_isLeader).count();
    +                }
    +            });
    +            clusterSummaryMetrics.put("cluster:num-nimbuses", new 
DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
    +                @Override
    +                protected Integer transform(ClusterSummary clusterSummary) 
{
    +                    return clusterSummary.get_nimbuses_size();
    +                }
    +            });
    +            clusterSummaryMetrics.put("cluster:num-supervisors", new 
DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
    +                @Override
    +                protected Integer transform(ClusterSummary clusterSummary) 
{
    +                    return clusterSummary.get_supervisors_size();
    +                }
    +            });
    +            clusterSummaryMetrics.put("cluster:num-topologies", new 
DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
    +                @Override
    +                protected Integer transform(ClusterSummary clusterSummary) 
{
    +                    return clusterSummary.get_topologies_size();
    +                }
    +            });
    +            clusterSummaryMetrics.put("cluster:num-total-workers", new 
DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
    +                @Override
    +                protected Integer transform(ClusterSummary clusterSummary) 
{
    +                    return 
clusterSummary.get_supervisors().stream().mapToInt(SupervisorSummary::get_num_workers).sum();
    +                }
    +            });
    +            clusterSummaryMetrics.put("cluster:num-total-used-workers", 
new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
    +                @Override
    +                protected Integer transform(ClusterSummary clusterSummary) 
{
    +                    return 
clusterSummary.get_supervisors().stream().mapToInt(SupervisorSummary::get_num_used_workers).sum();
    +                }
    +            });
    +            
clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new 
DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
    +                @Override
    +                protected Double transform(ClusterSummary clusterSummary) {
    +                    return clusterSummary.get_supervisors().stream()
    +                        //Filtered negative value
    +                        .mapToDouble(supervisorSummary -> 
Math.max(supervisorSummary.get_fragmented_mem(), 0)).sum();
    +                }
    +            });
    +            
clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new 
DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
    +                @Override
    +                protected Double transform(ClusterSummary clusterSummary) {
    +                    return clusterSummary.get_supervisors().stream()
    +                        //Filtered negative value
    +                        .mapToDouble(supervisorSummary -> 
Math.max(supervisorSummary.get_fragmented_cpu(), 0)).sum();
    +                }
    +            });
    +        }
    +
    +        private void updateHistogram(ClusterSummary newSummary) {
    +            for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
    +                nimbusUptime.update(nimbusSummary.get_uptime_secs());
    +            }
    +            for (SupervisorSummary summary : newSummary.get_supervisors()) {
    +                supervisorsUptime.update(summary.get_uptime_secs());
    +                supervisorsNumWorkers.update(summary.get_num_workers());
    +                supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
    +                supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
    +                supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
    +                supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
    +                supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
    +            }
    +            for (TopologySummary summary : newSummary.get_topologies()) {
    +                topologiesNumTasks.update(summary.get_num_tasks());
    +                topologiesNumExecutors.update(summary.get_num_executors());
    +                topologiesNumWorker.update(summary.get_num_workers());
    +                topologiesUptime.update(summary.get_uptime_secs());
    +                topologiesReplicationCount.update(summary.get_replication_count());
    +                topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
    +                topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
    +                topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
    +                topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
    +                topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
    +                topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
    +            }
    +        }
    +
    +        //This is not thread safe
    --- End diff ---
    
    Like I said, this implementation isn't pretty, and I hope it can be replaced with a better
    implementation later. It is not thread safe, but in the current implementation it is guaranteed
    to be called by only one thread at a time, and I don't see it being used anywhere else. So I
    think it should be good.
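
    To spell out why only one thread calls it: Dropwizard's CachedGauge gates loadValue() behind a
    compare-and-set on its reload deadline, so within a caching window at most one thread recomputes
    the value (and therefore runs updateHistogram); every other caller just reads the cached summary.
    A rough sketch of that gating pattern, with made-up names (CasGatedGauge is not a real class in
    either codebase), looks like this:

        // Sketch only (not Storm or Dropwizard source): CAS-gated reload in the
        // style of com.codahale.metrics.CachedGauge. Only the thread that wins
        // the compareAndSet runs loadValue(), so the histogram update inside it
        // is effectively single-threaded per caching window.
        abstract class CasGatedGauge<T> {
            private final java.util.concurrent.atomic.AtomicLong reloadAt =
                new java.util.concurrent.atomic.AtomicLong(0);
            private final long timeoutNanos;
            private volatile T value;

            CasGatedGauge(long timeout, java.util.concurrent.TimeUnit unit) {
                this.timeoutNanos = unit.toNanos(timeout);
            }

            protected abstract T loadValue();

            public T getValue() {
                if (shouldLoad()) {
                    value = loadValue(); // at most one caller per window reaches this
                }
                return value;
            }

            private boolean shouldLoad() {
                for (;;) {
                    long now = System.nanoTime();
                    long current = reloadAt.get();
                    if (current > now) {
                        return false; // still inside the caching window, reuse cached value
                    }
                    if (reloadAt.compareAndSet(current, now + timeoutNanos)) {
                        return true; // this thread won the CAS and will reload
                    }
                }
            }
        }

    Of course that only holds as long as nothing outside the gauge calls updateHistogram directly,
    which is the assumption here.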

