Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2764#discussion_r209333088
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4716,4 +4746,186 @@ public IScheduler getForcedScheduler() {
}
+    private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+        static final int CACHING_WINDOW = 5;
+        static final String SUMMARY = "summary";
+
+        private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
+            @Override
+            public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+                return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
+            }
+        };
+        private final Function<String, Histogram> registerHistogram = (name) -> {
+            final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+            clusterSummaryMetrics.put(name, histogram);
+            return histogram;
+        };
+        private volatile boolean active = false;
+
+        //Nimbus metrics distribution
+        private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
+
+        //Supervisor metrics distribution
+        private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
+        private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
+        private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
+        private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
+        private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
+        private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
+        private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
+
+        //Topology metrics distribution
+        private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
+        private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
+        private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
+        private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
+        private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
+        private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
+        private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
+        private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
+        private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
+        private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
+        private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+
+        ClusterSummaryMetricSet() {
+            //Fail fast if this falls out of sync with the Thrift protocol
+            assert ClusterSummary._Fields.values().length == 3
+                && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
+                && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
+                && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
+
+            final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+                @Override
+                protected ClusterSummary loadValue() {
+                    try {
+                        if (active) {
+                            ClusterSummary newSummary = getClusterInfoImpl();
+                            LOG.info("the new summary is {}", newSummary);
+                            //This is ugly but I can't think of a better way to update the histograms only once per caching window.
+                            //It also kind of depends on the implementation detail that gauges get updated before histograms.
+                            updateHistogram(newSummary);
+                            return newSummary;
+                        } else {
+                            return null;
+                        }
+                    } catch (Exception e) {
+                        LOG.warn("Get cluster info exception.", e);
+                        throw new RuntimeException(e);
+                    }
+                }
+            };
+
+            clusterSummaryMetrics.put("cluster:num-nimbus-leaders", new DerivativeGauge<ClusterSummary, Long>(cachedSummary) {
+                @Override
+                protected Long transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_nimbuses().stream().filter(NimbusSummary::is_isLeader).count();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-nimbuses", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_nimbuses_size();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-supervisors", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors_size();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-topologies", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_topologies_size();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-total-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream().mapToInt(SupervisorSummary::get_num_workers).sum();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:num-total-used-workers", new DerivativeGauge<ClusterSummary, Integer>(cachedSummary) {
+                @Override
+                protected Integer transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream().mapToInt(SupervisorSummary::get_num_used_workers).sum();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:total-fragmented-memory-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
+                @Override
+                protected Double transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream()
+                        //Filter out negative values
+                        .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_mem(), 0)).sum();
+                }
+            });
+            clusterSummaryMetrics.put("cluster:total-fragmented-cpu-non-negative", new DerivativeGauge<ClusterSummary, Double>(cachedSummary) {
+                @Override
+                protected Double transform(ClusterSummary clusterSummary) {
+                    return clusterSummary.get_supervisors().stream()
+                        //Filter out negative values
+                        .mapToDouble(supervisorSummary -> Math.max(supervisorSummary.get_fragmented_cpu(), 0)).sum();
+                }
+            });
+        }
+
+        private void updateHistogram(ClusterSummary newSummary) {
+            for (NimbusSummary nimbusSummary : newSummary.get_nimbuses()) {
+                nimbusUptime.update(nimbusSummary.get_uptime_secs());
+            }
+            for (SupervisorSummary summary : newSummary.get_supervisors()) {
+                supervisorsUptime.update(summary.get_uptime_secs());
+                supervisorsNumWorkers.update(summary.get_num_workers());
+                supervisorsNumUsedWorkers.update(summary.get_num_used_workers());
+                supervisorsUsedMem.update(Math.round(summary.get_used_mem()));
+                supervisorsUsedCpu.update(Math.round(summary.get_used_cpu()));
+                supervisorsFragmentedMem.update(Math.round(summary.get_fragmented_mem()));
+                supervisorsFragmentedCpu.update(Math.round(summary.get_fragmented_cpu()));
+            }
+            for (TopologySummary summary : newSummary.get_topologies()) {
+                topologiesNumTasks.update(summary.get_num_tasks());
+                topologiesNumExecutors.update(summary.get_num_executors());
+                topologiesNumWorker.update(summary.get_num_workers());
+                topologiesUptime.update(summary.get_uptime_secs());
+                topologiesReplicationCount.update(summary.get_replication_count());
+                topologiesRequestedMemOnHeap.update(Math.round(summary.get_requested_memonheap()));
+                topologiesRequestedMemOffHeap.update(Math.round(summary.get_requested_memoffheap()));
+                topologiesRequestedCpu.update(Math.round(summary.get_requested_cpu()));
+                topologiesAssignedMemOnHeap.update(Math.round(summary.get_assigned_memonheap()));
+                topologiesAssignedMemOffHeap.update(Math.round(summary.get_assigned_memoffheap()));
+                topologiesAssignedCpu.update(Math.round(summary.get_assigned_cpu()));
+            }
+        }
+
+        //This is not thread safe
--- End diff --
Why not make this thread safe? Isn't the only issue here that `active` isn't an `AtomicBoolean`?
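
Roughly what I have in mind is the sketch below. It only swaps the flag itself; `setActive` and its call sites aren't part of this diff, so that name and where it gets called are my guesses:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: replace the volatile boolean with an AtomicBoolean so the
// activation state is read and written through one thread-safe object.
private final AtomicBoolean active = new AtomicBoolean(false);

// Hypothetical setter, invoked on leadership changes (not shown in this diff).
void setActive(boolean isActive) {
    active.set(isActive);
}

// Inside CachedGauge.loadValue(), the check becomes:
if (active.get()) {
    // ... build and cache the new ClusterSummary ...
}
```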
---