Github user zd-project commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2764#discussion_r209337293

--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4716,4 +4746,186 @@ public IScheduler getForcedScheduler() {
         }
 
+    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+        static final int CACHING_WINDOW = 5;
+        static final String SUMMARY = "summary";
+
+        private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
+            @Override
+            public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+                return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
+            }
+        };
+        private final Function<String, Histogram> registerHistogram = (name) -> {
+            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+            clusterSummaryMetrics.put(name, histogram);
+            return histogram;
+        };
+        private volatile boolean active = false;
+
+        //Nimbus metrics distribution
+        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
+
+        //Supervisor metrics distribution
+        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
+        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
+        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
+        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
+        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
+        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
+        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
+
+        //Topology metrics distribution
+        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
+        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
+        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
+        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
+        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
+        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
+        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
+        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
+        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
+        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
+        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+
+        ClusterSummaryMetricSet() {
+            //Break the code if out of sync with the thrift protocol
+            assert ClusterSummary._Fields.values().length == 3
+                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
+                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
+                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
+
+            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+                @Override
+                protected ClusterSummary loadValue() {
+                    try {
+                        if (active) {
+                            ClusterSummary newSummary = getClusterInfoImpl();
+                            LOG.info("the new summary is {}", newSummary);
+                            //This is ugly but I can't think of a better way to update the histograms only once per caching window.
+                            //It also kind of depends on the implementation detail that gauges get updated before histograms.
+                            updateHistogram(newSummary);
+                            return newSummary;
+                        } else {
+                            return null;
+                        }
+                    } catch (Exception e) {
+                        LOG.warn("Get cluster info exception.", e);
+                        throw new RuntimeException(e);
+                    }
+                }
+            };
+
+            clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
+                @Override
+                protected Long transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_nimbuses().stream().filter(NimbusSummary::is_isLeader).count();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_nimbuses_size();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors_size();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_topologies_size();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream().mapToInt(SupervisorSummary::get_num_workers).sum();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream().mapToInt(SupervisorSummary::get_num_used_workers).sum();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
+                @Override
+                protected Double transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream()
+                        //Filter out negative values
+                        .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0)).sum();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
+                @Override
+                protected Double transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream()
+                        //Filter out negative values
+                        .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0)).sum();
+                }
+            });
+        }
+
+        private void updateHistogram(ClusterSummary newSummary) {
+            for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
+                nimbusUptime.update(nimbusSummary.get_uptime_secs());
+            }
+            for (SupervisorSummary summary : newSummary.get_supervisors()) {
+                supervisorsUptime.update(summary.get_uptime_secs());
+                supervisorsNumWorkers.update(summary.get_num_workers());
+                supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
+                supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
+                supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
+                supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
+                supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
+            }
+            for (TopologySummary summary : newSummary.get_topologies()) {
+                topologiesNumTasks.update(summary.get_num_tasks());
+                topologiesNumExecutors.update(summary.get_num_executors());
+                topologiesNumWorker.update(summary.get_num_workers());
+                topologiesUptime.update(summary.get_uptime_secs());
+                topologiesReplicationCount.update(summary.get_replication_count());
+                topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
+                topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
+                topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
+                topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
+                topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
+                topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
+            }
+        }
+
+        //This is not thread safe
--- End diff --

Like I said, this implementation isn't pretty, and I hope it can be replaced with a better one later. It is not thread safe, but with the current implementation it is guaranteed to be called by only one thread at a time, and I don't see us using it anywhere else. So I think it should be fine.
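To make that single-thread assumption concrete, here is a minimal sketch of the pattern (the class and wiring below are hypothetical, for illustration only, and not Storm's actual reporter setup): as long as the `CachedGauge` is only polled from a single scheduled thread, `loadValue()` and the histogram updates inside it can never run concurrently, so the missing synchronization never matters.

```java
import com.codahale.metrics.CachedGauge;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the single-threaded polling guarantee.
class SingleThreadedSummaryPolling {
    public static void main(String[] args) {
        // Stand-in for the cluster summary gauge; loadValue() is where the
        // non-thread-safe histogram updates would happen.
        CachedGauge<Long> cachedSummary = new CachedGauge<Long>(5, TimeUnit.SECONDS) {
            @Override
            protected Long loadValue() {
                return System.currentTimeMillis();
            }
        };

        // A single reporter thread is the only caller of getValue(), so
        // loadValue() can never be entered by two threads at once.
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
        reporter.scheduleAtFixedRate(
            () -> System.out.println("summary = " + cachedSummary.getValue()),
            0, 5, TimeUnit.SECONDS);
    }
}
```

If a second reporter were ever attached, that guarantee would go away, which is why I agree this should eventually be replaced with something safer.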