[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209381686
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() {
 
 }
 
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+    static final int CACHING_WINDOW = 5;
+    static final String SUMMARY = "summary";
+
+    private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
+        @Override
+        public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+            return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
+        }
+    };
+    private final Function<String, Histogram> registerHistogram = (name) -> {
+        //This histogram reflects the data distribution across only one ClusterSummary, i.e.,
+        // the data distribution across all entities of a type (e.g., data from all nimbuses/topologies) at one moment.
+        // Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update.
+        final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+        clusterSummaryMetrics.put(name, histogram);
+        return histogram;
+    };
+    private volatile boolean active = false;
+
+    //Nimbus metrics distribution
+    private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
+
+    //Supervisor metrics distribution
+    private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
+    private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
+    private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
+    private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
+    private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
+    private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
+    private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
+
+    //Topology metrics distribution
+    private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
+    private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
+    private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
+    private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
+    private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
+    private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
+    private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
+    private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
+    private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
+    private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
+    private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+
+    /**
+     * Constructor which registers every item in ClusterSummary in the MetricSet as a metric.
+     * All metrics are derived from a cached ClusterSummary object, which expires
+     * {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after the first reporter query in a given window.
+     * With a {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than
+     * the reporting interval to avoid outdated reporting.
+     */
+    ClusterSummaryMetricSet() {
+        //Break the code if out of sync with the Thrift protocol
+        assert ClusterSummary._Fields.values().length == 3
+            && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
+            && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
+            && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
+
+        final CachedGauge<ClusterSummary> cachedSummary
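
For readers unfamiliar with Dropwizard's CachedGauge, here is a minimal standalone sketch of the caching behavior the Javadoc above describes: loadValue() runs at most once per caching window, and getValue() calls inside the window are served the cached result. This is a sketch only; the String payload stands in for Storm's ClusterSummary, and the class name is made up.

    import java.util.concurrent.TimeUnit;
    import com.codahale.metrics.CachedGauge;

    // Sketch: recompute an expensive snapshot at most once per 5-second window.
    class SummaryGaugeSketch extends CachedGauge<String> {
        SummaryGaugeSketch() {
            super(5, TimeUnit.SECONDS); // same value as CACHING_WINDOW above
        }

        @Override
        protected String loadValue() {
            // Called lazily on the first getValue() after expiry; callers in
            // between receive the cached snapshot.
            return "snapshot@" + System.nanoTime();
        }
    }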

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209367723
  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209367613
  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209367421
  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209367121
  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209366165
  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209365696
  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209363589
  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209354005
  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209348428
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4716,4 +4745,194 @@ public IScheduler getForcedScheduler() {
 
 }
 
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+    static final int CACHING_WINDOW = 5;
+    static final String SUMMARY = "summary";
+
+    private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
+        @Override
+        public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+            return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
+        }
+    };
+    private final Function<String, Histogram> registerHistogram = (name) -> {
+        final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+        clusterSummaryMetrics.put(name, histogram);
+        return histogram;
+    };
+    private volatile boolean active = false;
+
+    //Nimbus metrics distribution
+    private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
+
+    //Supervisor metrics distribution
+    private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
+    private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
+    private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
+    private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
+    private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
+    private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
+    private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
+
+    //Topology metrics distribution
+    private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
+    private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
+    private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
+    private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
+    private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
+    private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
+    private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
+    private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
+    private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
+    private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
+    private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+
+    ClusterSummaryMetricSet() {
+        //Break the code if out of sync with the Thrift protocol
+        assert ClusterSummary._Fields.values().length == 3
+            && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
+            && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
+            && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
+
+        final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+            @Override
+            protected ClusterSummary loadValue() {
+                try {
+                    if (active) {
+                        ClusterSummary newSummary = getClusterInfoImpl();
+                        LOG.info("the new summary is {}", newSummary);
--- End diff --

Would this be better as `LOG.debug`?


---
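
A minimal sketch of the suggested change; with SLF4J's parameterized logging the message is only rendered when the debug level is enabled, so no isDebugEnabled() guard is needed:

    // Demote the once-per-window summary dump from info to debug so routine
    // metric refreshes don't flood the nimbus log.
    LOG.debug("the new summary is {}", newSummary);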


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209341987
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2865,16 +2890,14 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values()
+// Num supervisor, and fragmented resources have been included in cluster summary
--- End diff --

Please remove this comment; unless you're looking at the diff, you won't know what it means.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209337293
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4716,4 +4746,186 @@ public IScheduler getForcedScheduler() {
 
 }
 
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+    static final int CACHING_WINDOW = 5;
+    static final String SUMMARY = "summary";
+
+    private final Map<String, com.codahale.metrics.Metric> clusterSummaryMetrics = new HashMap<String, com.codahale.metrics.Metric>() {
+        @Override
+        public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) {
+            return super.put(StormMetricsRegistry.name(SUMMARY, key), value);
+        }
+    };
+    private final Function<String, Histogram> registerHistogram = (name) -> {
+        final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+        clusterSummaryMetrics.put(name, histogram);
+        return histogram;
+    };
+    private volatile boolean active = false;
+
+    //Nimbus metrics distribution
+    private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs");
+
+    //Supervisor metrics distribution
+    private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs");
+    private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers");
+    private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers");
+    private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem");
+    private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu");
+    private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem");
+    private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu");
+
+    //Topology metrics distribution
+    private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks");
+    private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors");
+    private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers");
+    private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs");
+    private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count");
+    private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap");
+    private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap");
+    private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu");
+    private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap");
+    private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap");
+    private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu");
+
+    ClusterSummaryMetricSet() {
+        //Break the code if out of sync with the Thrift protocol
+        assert ClusterSummary._Fields.values().length == 3
+            && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
+            && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
+            && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES;
+
+        final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+            @Override
+            protected ClusterSummary loadValue() {
+                try {
+                    if (active) {
+                        ClusterSummary newSummary = getClusterInfoImpl();
+                        LOG.info("the new summary is {}", newSummary);
+                        //This is ugly but I can't think of a better way to update the histograms only once per caching window.
+                        // It also kind of depends on the implementation detail that gauges get updated before histograms.
+                        updateHistogram(newSummary);
+                        return newSummary;
+                    } else {
+                        return null;
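
For orientation, a hypothetical sketch of what updateHistogram could look like; its real body is cut off in this excerpt, and the getters (get_supervisors(), get_uptime_secs(), and so on) are assumed from Storm's Thrift-generated ClusterSummary, SupervisorSummary, and TopologySummary classes:

    // Hypothetical sketch, not the PR's actual implementation: one
    // ClusterSummary snapshot becomes one batch of histogram updates, so each
    // sliding window holds a single snapshot's distribution.
    private void updateHistogram(ClusterSummary newSummary) {
        for (SupervisorSummary supervisor : newSummary.get_supervisors()) {
            supervisorsUptime.update(supervisor.get_uptime_secs());
            supervisorsNumWorkers.update(supervisor.get_num_workers());
            supervisorsNumUsedWorkers.update(supervisor.get_num_used_workers());
        }
        for (TopologySummary topology : newSummary.get_topologies()) {
            topologiesNumTasks.update(topology.get_num_tasks());
            topologiesNumExecutors.update(topology.get_num_executors());
        }
    }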

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209336348
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2905,6 +2929,10 @@ public void launchServer() throws Exception {
 throw new RuntimeException(e);
 }
 });
+
+//Should we make the delaySecs and recurSecs in sync with any conf value?
--- End diff --

Okay, we can do it later. Filed 
https://issues.apache.org/jira/browse/STORM-3192. Please remove the comment 
since it seems like it wouldn't be useful outside this PR. 


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209335050
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2905,6 +2929,10 @@ public void launchServer() throws Exception {
 throw new RuntimeException(e);
 }
 });
+
+//Should we make the delaySecs and recurSecs in sync with any conf value?
--- End diff --

I would very much love to. However, based on the current Nimbus implementation there's no way to do it. There is, however, a leadership change callback buried in the ZooKeeper-side code (see `LeaderListenerCallback`), which communicates through IStormClusterState. It'd be great if you could open a PR to improve Nimbus in general, along with [STORM-3187](https://issues.apache.org/jira/browse/STORM-3187). Otherwise we kind of have to use busy waiting here.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209333494
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2905,6 +2929,10 @@ public void launchServer() throws Exception {
 throw new RuntimeException(e);
 }
 });
+
+//Should we make the delaySecs and recurSecs in sync with any conf value?
--- End diff --

Why not update the metricset active flag when Nimbus becomes active instead 
of polling for changes?


---
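
A minimal sketch of the callback-driven alternative discussed in this exchange; the LeadershipListener interface and its wiring are illustrative assumptions, since Storm's actual hook lives in `LeaderListenerCallback` rather than in an API shaped like this:

    // Hypothetical leadership hook; not Storm's real API.
    interface LeadershipListener {
        void onLeadershipChange(boolean isLeader);
    }

    class SummaryMetricsActivator implements LeadershipListener {
        private volatile boolean active = false;

        @Override
        public void onLeadershipChange(boolean isLeader) {
            // Flip the flag on the leadership event itself instead of
            // re-checking nimbus state on a timer (busy waiting).
            active = isLeader;
        }

        boolean isActive() {
            return active;
        }
    }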


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209333088
  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209331742
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4716,4 +4746,186 @@ public IScheduler getForcedScheduler() {
 
 }
 
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final String SUMMARY = "summary";
+
+private final Map 
clusterSummaryMetrics = new HashMap() {
+@Override
+public com.codahale.metrics.Metric put(String key, 
com.codahale.metrics.Metric value) {
+return super.put(StormMetricsRegistry.name(SUMMARY, key), 
value);
+}
+};
+private final Function registerHistogram = 
(name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+clusterSummaryMetrics.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//NImbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-cpu");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-cpu");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-cpu");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-cpu");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync with the thrift protocol
+assert ClusterSummary._Fields.values().length == 3
+&& ClusterSummary._Fields.findByName("supervisors") == 
ClusterSummary._Fields.SUPERVISORS
+&& ClusterSummary._Fields.findByName("topologies") == 
ClusterSummary._Fields.TOPOLOGIES
+&& ClusterSummary._Fields.findByName("nimbuses") == 
ClusterSummary._Fields.NIMBUSES;
+
+final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+@Override
+protected ClusterSummary loadValue() {
+try {
+if (active) {
+ClusterSummary newSummary = 
getClusterInfoImpl();
+LOG.info("the new summary is {}", newSummary);
+//This is ugly but I can't think of a better way to update the histograms only once per caching window
+// It also kind of depends on the implementation detail that gauges get updated before histograms
+updateHistogram(newSummary);
+return newSummary;
+} else {
+return null;
+

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209312212
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
--- End diff --

fixed


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209311621
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
--- End diff --

Jira issued.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209311426
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
--- End diff --

Yes, I agree. The current implementation isn't ideal.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209306763
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
--- End diff --

That can be solved by making `getClusterInfoImpl` package private, but I 
see your point. Maybe we just leave it here for now, and I'll see if we can do 
something with it when I look at the non-static registry branch. It's not like 
it hurts anything to have it here, I'd just rather have it in another file if 
possible.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209305125
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
--- End diff --

Additionally, we can't call `getClusterInfoImpl` directly, so we have to 
use the public thrift API `getClusterInfo`. This will disrupt the accuracy of 
its meter "nimbus:num-getClusterInfo-calls", because I think that meter is meant 
to track public API invocations only.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209304885
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
+private final Function<String, Histogram> registerHistogram = (name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//Nimbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-cpu");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-cpu");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-cpu");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-cpu");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync with the thrift protocol
+assert ClusterSummary._Fields.values().length == 3
--- End diff --

Okay, that makes sense.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209303203
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
+private final Function<String, Histogram> registerHistogram = (name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//Nimbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-cpu");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-cpu");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-cpu");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-cpu");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync with the thrift protocol
+assert ClusterSummary._Fields.values().length == 3
--- End diff --

The intention is that we fix it at the development stage so it'll just work in 
production, since the thrift protocol is compiled, not evaluated at runtime.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209302512
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
--- End diff --

Okay


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209154625
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
+private final Function<String, Histogram> registerHistogram = (name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//Nimbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-cpu");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-cpu");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-cpu");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-cpu");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync with the thrift protocol
+assert ClusterSummary._Fields.values().length == 3
--- End diff --

Keep in mind that asserts are only evaluated in test mode, or if people run 
with assertions enabled. If you want to always validate this, use e.g. 
`Validate`.
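
For illustration, a minimal sketch of that suggestion, assuming Apache Commons Lang's `Validate` is available on the classpath (an assumption, not something this PR establishes):

```java
import org.apache.commons.lang.Validate;

ClusterSummaryMetricSet() {
    // Unlike assert, this check runs regardless of the -ea JVM flag.
    Validate.isTrue(ClusterSummary._Fields.values().length == 3
            && ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS
            && ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES
            && ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES,
        "ClusterSummaryMetricSet is out of sync with the ClusterSummary thrift definition");
    // ... rest of the constructor unchanged
}
```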


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-10 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209154387
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
--- End diff --

Then `isLeader` can be made public or protected, and Nimbus can be injected 
into this class via a constructor. 
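
A rough sketch of that shape (the class skeleton below is illustrative; only `isLeader` comes from the discussion, and its exact signature is an assumption):

```java
import java.util.Collections;
import java.util.Map;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;

// In its own file instead of nested inside Nimbus.
public class ClusterSummaryMetricSet implements MetricSet, Runnable {
    private final Nimbus nimbus; // injected, replacing the implicit outer-instance reference

    public ClusterSummaryMetricSet(Nimbus nimbus) {
        this.nimbus = nimbus;
    }

    @Override
    public Map<String, Metric> getMetrics() {
        return Collections.emptyMap(); // histogram registration elided
    }

    @Override
    public void run() {
        try {
            if (nimbus.isLeader()) { // possible once isLeader is public or protected
                // refresh the cached ClusterSummary here
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```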


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209100932
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
--- End diff --

ClusterSummaryMetricSet relies on `isLeader` from Nimbus.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209084882
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
--- End diff --

Can this class be moved to another file? Nimbus is already huge.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209083160
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-        Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-        // Print some useful information
-        if (existingAssignments != null && !existingAssignments.isEmpty()) {
-            for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-                String topoId = entry.getKey();
-                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-                Assignment assignment = existingAssignments.get(topoId);
-                if (assignment == null) {
-                    continue;
+    private boolean inspectScheduling(Map<String, Assignment> existingAssignments,
+                                      Map<String, Assignment> newAssignments) {
+        assert existingAssignments != null && newAssignments != null;
+        boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
+        long numRemovedExec = 0;
+        long numRemovedSlot = 0;
+        long numAddedExec   = 0;
+        long numAddedSlot   = 0;
+        if (existingAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                LOG.info("Assign executors: {}", execToPort.keySet());
+                numAddedSlot += count;
+                numAddedExec += execToPort.size();
+            }
+        } else if (newAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                LOG.info("Remove executors: {}", execToPort.keySet());
+                numRemovedSlot += count;
+                numRemovedExec += execToPort.size();
+            }
+        } else {
+            MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
+            if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) {
--- End diff --

I'd still rather get rid of it, because it's a redundant branch point, and 
less indentation is great. Personal preference though, so up to you. 


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209081264
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evaluation and cause race condition?
+long elapsed = -schedulingStartTime.getAndSet(null) + 
Time.nanoTime();
--- End diff --

Okay, but I don't see how that fixes the race?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209064682
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evaluation and cause race condition?
--- End diff --

You're right. I don't see a good fix. I agree that the race is not that 
important; I'd maybe just remove the comments, since users won't see them here.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209059400
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evaluation and cause race condition?
+long elapsed = -schedulingStartTime.getAndSet(null) + 
Time.nanoTime();
--- End diff --

Now I remembered. I did this to alter the evaluation order so that 
`#getAndSet` is evaluated first.
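
Java evaluates the operands of a binary operator left to right, which is what the rewrite relies on; a small self-contained sketch (assuming `schedulingStartTime` is an `AtomicReference<Long>` of nanosecond timestamps, and using `System.nanoTime()` in place of Storm's `Time.nanoTime()`):

```java
import java.util.concurrent.atomic.AtomicReference;

class EvaluationOrderSketch {
    private final AtomicReference<Long> schedulingStartTime = new AtomicReference<>();

    // Original shape: the end timestamp is taken first, so the start time
    // stays visible to a concurrent gauge reader a little longer.
    long elapsedOriginal() {
        return System.nanoTime() - schedulingStartTime.getAndSet(null);
    }

    // Rewritten shape: getAndSet(null) runs first, clearing the start time
    // before the end timestamp is read.
    long elapsedRewritten() {
        return -schedulingStartTime.getAndSet(null) + System.nanoTime();
    }
}
```

Both methods assume a start time was set beforehand; calling them with a null start time would throw on unboxing.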


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209048016
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evaluation and cause race condition?
--- End diff --

If no code reordering happens, the gauge should always evaluate `currTime` 
first, and it has to read the start time from `schedulingStartTimeNs` before it 
is set to null. So if we guarantee that `elapsed` is evaluated after that, 
longest-scheduling-time-ms will not exceed the real longest scheduling time.

That being said, I think the race here should be pretty negligible, especially 
if we discard the decimals in the ns-to-ms conversion. Meanwhile, I have heard 
complaints about hanging schedulers before, so I say we keep the partial 
measurement and remove the comments, or just note "please be aware that it's 
normal to see minor jitter in the longest scheduling time due to the race 
condition."


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209045056
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evaluation and cause race condition?
--- End diff --

If we keep the update and race, I think we need to either remove the 
comments about the race, or expand them to provide the example you posted. 
Leaving the comment as is won't help someone reading it later understand what 
race is there.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209044589
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evaluation and cause race condition?
--- End diff --

Right, that makes sense. I don't think it requires reordering, though; the 
sequence you posted can happen without the compiler reordering anything.

I don't think it really hurts anything to have this race, but maybe we 
should consider removing the longest update in the gauge. Unless the scheduler 
is outright hanging, I'm not sure what the value is for us to report a partial 
measurement like that? 


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209042043
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2918,6 +2943,10 @@ public void launchServer() throws Exception {
 }
 });
 }
+
+//Should we make the delaySecs and recurSecs in sync with any 
conf value?
+// They should be around the reporting interval, but it's not 
configurable
+timer.scheduleRecurring(5, 5, clusterMetricSet);
--- End diff --

Just to be in sync with the caching frequency


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209041653
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
+private final Function<String, Histogram> registerHistogram = (name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//Nimbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-CPU");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-CPU");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-CPU");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync with the thrift protocol
+assert ClusterSummary._Fields.values().length == 3
+&& ClusterSummary._Fields.findByName("supervisors") == 
ClusterSummary._Fields.SUPERVISORS
+&& ClusterSummary._Fields.findByName("topologies") == 
ClusterSummary._Fields.TOPOLOGIES
+&& ClusterSummary._Fields.findByName("nimbuses") == 
ClusterSummary._Fields.NIMBUSES;
+
+final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+@Override
+protected ClusterSummary loadValue() {
+try {
+if (active) {
+ClusterSummary newSummary = 
getClusterInfoImpl();
+LOG.info("the new summary is {}", newSummary);
+//This is ugly but I can't think of a better way to update the histograms only once per caching window
+// It also kind of depends on the implementation detail that gauges get updated before histograms
+  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209039674
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2918,6 +2943,10 @@ public void launchServer() throws Exception {
 }
 });
 }
+
+//Should we make the delaySecs and recurSecs in sync with any 
conf value?
+// They should be around the reporting interval, but it's not 
configurable
+timer.scheduleRecurring(5, 5, clusterMetricSet);
--- End diff --

It's a random number, really.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209034728
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evaluation and cause race condition?
--- End diff --

If the compiler is allowed to hoist code, we can end up with this case: 
1) the gauge "longest-scheduling-time-ms" acquires the scheduling start time 
2) `elapsed` captures the end time and updates the `schedulingDuration` timer and 
`longestSchedulingTime`
3) the gauge acquires the longest time from `longestSchedulingTime` and `currTime` 
4) since the start time isn't null, the program executes the statement 

```java
longest = currTime - startTime > longest ? currTime - startTime : longest;
```
So it'll be slightly longer than the value of `longestSchedulingTime` for 
this round. Which means we might see jitter in the gauge value.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209029787
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
--- End diff --

There has been inconsistency in how I should name the metrics. In 
previous commits I had all of the metrics in the metric set starting with the 
prefix "summary.". But I'm not sure.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209029163
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
+private final Function<String, Histogram> registerHistogram = (name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//Nimbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-CPU");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-CPU");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-CPU");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync with the thrift protocol
+assert ClusterSummary._Fields.values().length == 3
+&& ClusterSummary._Fields.findByName("supervisors") == 
ClusterSummary._Fields.SUPERVISORS
+&& ClusterSummary._Fields.findByName("topologies") == 
ClusterSummary._Fields.TOPOLOGIES
+&& ClusterSummary._Fields.findByName("nimbuses") == 
ClusterSummary._Fields.NIMBUSES;
+
+final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+@Override
+protected ClusterSummary loadValue() {
+try {
+if (active) {
+ClusterSummary newSummary = 
getClusterInfoImpl();
+LOG.info("the new summary is {}", newSummary);
+//This is ugly but I can't think of a better way to update the histograms only once per caching window
+// It also kind of depends on the implementation detail that gauges get updated before histograms
+  

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209029068
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
+private final Function<String, Histogram> registerHistogram = (name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//Nimbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-CPU");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-CPU");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-CPU");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync with the thrift protocol
--- End diff --

People probably won't notice this part of the code when they change the thrift 
protocol. We should break the code to remind them to update this part as well.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209028432
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2871,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using a method reference instead of a lambda: the subexpression preceding :: is evaluated only once, when the method reference itself is evaluated
+// Num supervisors and fragmented resources have been included in the cluster summary
+
StormMetricsRegistry.registerGauge("nimbus:total-available-memory 
(nonegative)", () -> nodeIdToResources.get().values()
--- End diff --

See https://issues.apache.org/jira/browse/STORM-3151?filter=-2
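
As an aside on the quoted comment about method references, a self-contained sketch of the capture semantics (plain JDK, nothing Storm-specific):

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class MethodRefCapture {
    public static void main(String[] args) {
        AtomicReference<String> holder = new AtomicReference<>("old");

        // The receiver expression holder.get() is evaluated once, right here,
        // so "old" is captured permanently.
        Supplier<Integer> viaMethodRef = holder.get()::length;

        // The lambda re-evaluates holder.get() on every call.
        Supplier<Integer> viaLambda = () -> holder.get().length();

        holder.set("replacement");

        System.out.println(viaMethodRef.get()); // 3, from the captured "old"
        System.out.println(viaLambda.get());    // 11, from "replacement"
    }
}
```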


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209016305
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
+private final Function<String, Histogram> registerHistogram = (name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//Nimbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-CPU");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-CPU");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-CPU");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync with the thrift protocol
+assert ClusterSummary._Fields.values().length == 3
+&& ClusterSummary._Fields.findByName("supervisors") == 
ClusterSummary._Fields.SUPERVISORS
+&& ClusterSummary._Fields.findByName("topologies") == 
ClusterSummary._Fields.TOPOLOGIES
+&& ClusterSummary._Fields.findByName("nimbuses") == 
ClusterSummary._Fields.NIMBUSES;
+
+final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+@Override
+protected ClusterSummary loadValue() {
+try {
+if (active) {
+ClusterSummary newSummary = 
getClusterInfoImpl();
+LOG.info("the new summary is {}", newSummary);
+//This is ugly but I can't think of a better way to update the histograms only once per caching window
+// It also kind of depends on the implementation detail that gauges get updated before histograms
+

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208965124
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
--- End diff --

you can file a separate JIRA to discuss it and remove it from this PR


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208987159
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
+private final Function<String, Histogram> registerHistogram = (name) -> {
+final Histogram histogram = new Histogram(new 
SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//Nimbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
--- End diff --

use lowercase for "CPU" to be consistent with other names?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208984273
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2871,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using a method reference instead of a lambda: the subexpression preceding :: is evaluated only once, when the method reference itself is evaluated
+// Num supervisors and fragmented resources have been included in the cluster summary
+
StormMetricsRegistry.registerGauge("nimbus:total-available-memory 
(nonegative)", () -> nodeIdToResources.get().values()
--- End diff --

why do we need `non-negative`?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r209007420
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
--- End diff --

this is not used


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208986783
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
--- End diff --

`ported` is not clear out of the context of this PR. Better to use something 
like `clusterSummaryMetrics`.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208965019
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2918,6 +2943,10 @@ public void launchServer() throws Exception {
 }
 });
 }
+
+//Should we make the delaySecs and recurSecs in sync with any 
conf value?
+// They should be around the reporting interval, but it's not 
configurable
+timer.scheduleRecurring(5, 5, clusterMetricSet);
--- End diff --

Why do we set these to 5 and 5?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208988762
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
+private final Function<String, Histogram> registerHistogram = (name) -> {
+final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//NImbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-CPU");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-CPU");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-CPU");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync to thrift protocol
--- End diff --

The assertion is good. Just curious in which case this would happen.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208991078
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
+static final String SUMMARY = "summary";
+
+private final Map<String, com.codahale.metrics.Metric> ported = new HashMap<>(PORTED_METRICS);
+private final Function<String, Histogram> registerHistogram = (name) -> {
+final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS));
+ported.put(name, histogram);
+return histogram;
+};
+private volatile boolean active = false;
+
+//NImbus metrics distribution
+private final Histogram nimbusUptime = 
registerHistogram.apply("nimbuses:uptime-secs");
+
+//Supervisor metrics distribution
+private final Histogram supervisorsUptime = 
registerHistogram.apply("supervisors:uptime-secs");
+private final Histogram supervisorsNumWorkers = 
registerHistogram.apply("supervisors:num-workers");
+private final Histogram supervisorsNumUsedWorkers = 
registerHistogram.apply("supervisors:num-used-workers");
+private final Histogram supervisorsUsedMem = 
registerHistogram.apply("supervisors:used-mem");
+private final Histogram supervisorsUsedCpu = 
registerHistogram.apply("supervisors:used-CPU");
+private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem");
+private final Histogram supervisorsFragmentedCpu = 
registerHistogram.apply("supervisors:fragmented-CPU");
+
+//Topology metrics distribution
+private final Histogram topologiesNumTasks = 
registerHistogram.apply("topologies:num-tasks");
+private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors");
+private final Histogram topologiesNumWorker = 
registerHistogram.apply("topologies:num-workers");
+private final Histogram topologiesUptime = 
registerHistogram.apply("topologies:uptime-secs");
+private final Histogram topologiesReplicationCount = 
registerHistogram.apply("topologies:replication-count");
+private final Histogram topologiesRequestedMemOnHeap = 
registerHistogram.apply("topologies:requested-mem-on-heap");
+private final Histogram topologiesRequestedMemOffHeap = 
registerHistogram.apply("topologies:requested-mem-off-heap");
+private final Histogram topologiesRequestedCpu = 
registerHistogram.apply("topologies:requested-CPU");
+private final Histogram topologiesAssignedMemOnHeap = 
registerHistogram.apply("topologies:assigned-mem-on-heap");
+private final Histogram topologiesAssignedMemOffHeap = 
registerHistogram.apply("topologies:assigned-mem-off-heap");
+private final Histogram topologiesAssignedCpu = 
registerHistogram.apply("topologies:assigned-CPU");
+
+ClusterSummaryMetricSet() {
+//Break the code if out of sync to thrift protocol
+assert ClusterSummary._Fields.values().length == 3
+&& ClusterSummary._Fields.findByName("supervisors") == 
ClusterSummary._Fields.SUPERVISORS
+&& ClusterSummary._Fields.findByName("topologies") == 
ClusterSummary._Fields.TOPOLOGIES
+&& ClusterSummary._Fields.findByName("nimbuses") == 
ClusterSummary._Fields.NIMBUSES;
+
+final CachedGauge<ClusterSummary> cachedSummary = new CachedGauge<ClusterSummary>(CACHING_WINDOW, TimeUnit.SECONDS) {
+@Override
+protected ClusterSummary loadValue() {
+try {
+if (active) {
+ClusterSummary newSummary = 
getClusterInfoImpl();
+LOG.info("the new summary is {}", newSummary);
+//This is ugly but I can't think of a better 
way to update histogram only once per caching
+// It also kind of depends on the 
implementation that gauges gets updated before histograms
+

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208986545
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() {
 
 }
 
+//enum NotPorted {
+////Declared in StormConf. I don't see the value in reporting so.
+//SUPERVISOR_TOTAL_RESOURCE,
+////May be able to aggregate based on status;
+//TOPOLOGY_STATUS,
+//TOPOLOGY_SCHED_STATUS,
+////May be aggregated, as well as other distinct values
+//NUM_DISTINCT_NIMBUS_VERSION;
+//}
+
+private class ClusterSummaryMetricSet implements MetricSet, Runnable {
+static final int CACHING_WINDOW = 5;
+static final int PORTED_METRICS = 25;
--- End diff --

We don't need a constant here, since it's only the initial capacity of the map.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208998511
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
--- End diff --

Sure, sounds good.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208997165
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
--- End diff --

I don't think we should use `calculateAssignmentChanged`, as it doesn't 
really calculate anything for the scheduling process so much as inspect/audit the 
throughput of scheduling. I would prefer a name like `auditAssignmentChanges` or 
`reviewAssignmentChanges`.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208987578
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+Map<String, Assignment> newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
--- End diff --

I see what you mean. I thought this improved readability when I first wrote 
this block. Like you said, if the `if` here evaluates to false, all three loops 
will be skipped anyway, with or without the guard. So I thought we could just 
wrap them in an `if` statement to make this clear, and also avoid some loop 
overhead.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208982864
  
--- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java 
---
@@ -39,6 +42,11 @@ public String getId() {
 return getNodeId() + ":" + getPort();
 }
 
+public List toList() {
+//For compatibility to call in Nimbus#mkAssignment
--- End diff --

Will remove the comment and log it in Jira


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208976776
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+Map<String, Assignment> newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
--- End diff --

I think we're talking past each other a little.

What I'm saying is that this code block looks to me like
```
if (A != B) {
  for (a only in A) {}
  for (b only in B) {}
  for (c in A or B where the value is different) {}
}
```

The `if` guard is not necessary, because if the condition is false, we'd be 
skipping all 3 for loops anyway, since the iterated sets are empty. If that's 
the case, we can still update `anyChanged`, but it shouldn't happen inside an `if`.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208975129
  
--- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java 
---
@@ -39,6 +42,11 @@ public String getId() {
 return getNodeId() + ":" + getPort();
 }
 
+public List toList() {
+//For compatibility to call in Nimbus#mkAssignment
--- End diff --

Eh, it's probably fine. If you want to keep this method, I don't think the 
comment should be here though. It won't make sense outside the context of this 
PR. 


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208972356
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+Map<String, Assignment> newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-Set> 

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208971197
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
+}
+//To millis. How should I put the constant for magic 
numbers?
--- End diff --

Yes, or if the decimals are meaningful the gauge should be using 
nanoseconds instead of millis.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208970512
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
--- End diff --

Okay, that makes sense. Thanks for explaining. Could you add a comment to 
this effect here?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208970103
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2807,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using method reference instead of lambda. 
subexpression preceding :: will be evaluated only upon evaluation
+// Num supervisor, and fragmented resources have been included 
in cluster summary
+
StormMetricsRegistry.registerGauge("nimbus:total-available-memory 
(nonegative)", () -> nodeIdToResources.get().values()
 .parallelStream()
-.mapToDouble(SupervisorResources::getAvailableMem)
+.mapToDouble(supervisorResources -> 
Math.max(supervisorResources.getAvailableMem(), 0))
 .sum());
-StormMetricsRegistry.registerGauge("nimbus:available-cpu", () 
-> nodeIdToResources.get().values()
+StormMetricsRegistry.registerGauge("nimbus:available-cpu 
(nonnegative)", () -> nodeIdToResources.get().values()
--- End diff --

Thanks. Sorry, I was unclear. What I meant was that I don't know whether we 
need to worry about keeping the names of existing metrics unchanged (backward 
compatibility). I'm not sure why we want to change the name to include 
`(nonnegative)`, and if we do want to include it, why not use the same syntax as 
the rest of the name (words separated by `-`)?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208970052
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+Map<String, Assignment> newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
--- End diff --

We don't know whether two non-empty maps are equal until we do 
`Maps#difference`. Then I can assign the variable `anyChanged` to remember whether 
any changes to the scheduled assignments have been made; it is the return value 
as well as the determinant of whether I should add more logging.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208968787
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2807,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using method reference instead of lambda. 
subexpression preceding :: will be evaluated only upon evaluation
--- End diff --

Yes, I think I've seen it before too :)


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208968475
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evalutation and cause race 
condition?
--- End diff --

How?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208968113
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+Map<String, Assignment> newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-Set> 

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208967564
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+Map<String, Assignment> newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
--- End diff --

Makes sense, but what I'm asking is why this check is here at all. If the 
maps are equal, the for loops below will all be skipped anyway, because 
entriesOnlyOnLeft/Right and entriesDiffering are all empty, right?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208967428
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
+}
+//To millis. How should I put the constant for magic 
numbers?
--- End diff --

Because the decimals are too trivial?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208966535
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
+}
+//To millis. How should I put the constant for magic 
numbers?
--- End diff --

Sorry, I get what you're saying. Disregard the above. If you're making the 
gauge measure milliseconds, it probably still makes sense to use whole 
milliseconds rather than a floating-point representation.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-09 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208965327
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
+}
+//To millis. How should I put the constant for magic 
numbers?
--- End diff --

Why would there be a loss of precision? The input number is a long, which 
is an integral type (not floating point). 


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208748740
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+Map<String, Assignment> newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-   

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208749692
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2807,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using method reference instead of lambda. 
subexpression preceding :: will be evaluated only upon evaluation
+// Num supervisor, and fragmented resources have been included 
in cluster summary
+
StormMetricsRegistry.registerGauge("nimbus:total-available-memory 
(nonegative)", () -> nodeIdToResources.get().values()
 .parallelStream()
-.mapToDouble(SupervisorResources::getAvailableMem)
+.mapToDouble(supervisorResources -> 
Math.max(supervisorResources.getAvailableMem(), 0))
 .sum());
-StormMetricsRegistry.registerGauge("nimbus:available-cpu", () 
-> nodeIdToResources.get().values()
+StormMetricsRegistry.registerGauge("nimbus:available-cpu 
(nonnegative)", () -> nodeIdToResources.get().values()
--- End diff --

See Jira 
[STORM-3151](https://issues.apache.org/jira/browse/STORM-3151?filter=-2)


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208751049
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
--- End diff --

We would like to compute the distribution of scheduler latency as well as 
the longest scheduling iteration. If a scheduler is stuck in the middle of a 
scheduling iteration, the histogram won't reflect that until the iteration has 
ended, because the timer only reports the time for a complete cycle. Hence I 
added this gauge to track the longest scheduling iteration in real time.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208749524
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2807,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using method reference instead of lambda. 
subexpression preceding :: will be evaluated only upon evaluation
--- End diff --

Interesting, I thought I had removed this already. Maybe it was rebased back in 
accidentally.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208749175
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+Map<String, Assignment> newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-   

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208748414
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) {
-for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-String topoId = entry.getKey();
-Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+Map<String, Assignment> newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
--- End diff --

Good catch. I'll use `areEqual` instead. This method is for inspecting the 
performance (throughput) of the scheduler, specifically how many assignments, 
workers, and executors have changed. It doesn't affect any actual assignment 
results.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208749367
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2219,21 +2305,16 @@ private void mkAssignments(String scratchTopoId) throws Exception {
             newAssignments.put(topoId, newAssignment);
         }
 
-        if (!newAssignments.equals(existingAssignments)) {
+        boolean assignmentChanged = inspectSchduling(existingAssignments, newAssignments);
+        if (assignmentChanged) {
             LOG.debug("RESETTING id->resources and id->worker-resources cache!");
-            LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
-            nodeIdToResources.get().forEach((id, node) ->
-                LOG.info("Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
-                         + "CPU: {}, Available CPU: {}, fragmented: {}",
-                         id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
-                         node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
             idToResources.set(new HashMap<>());
             idToWorkerResources.set(new HashMap<>());
         }
 
         //tasks figure out what tasks to talk to by looking at topology at runtime
         // only log/set when there's been a change to the assignment
+        // TODO: why do we have loop fission here
--- End diff --

Will log in Jira


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208747514
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) {
         return ret;
     }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-        Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-        // Print some useful information
-        if (existingAssignments != null && !existingAssignments.isEmpty()) {
-            for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-                String topoId = entry.getKey();
-                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-                Assignment assignment = existingAssignments.get(topoId);
-                if (assignment == null) {
-                    continue;
+    private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
--- End diff --

Okay, will come up with a better name.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208746949
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2131,17 +2223,13 @@ private void mkAssignments(String scratchTopoId) throws Exception {
             }
         }
         // make the new assignments for topologies
-        Map<String, SchedulerAssignment> newSchedulerAssignments = null;
         synchronized (schedLock) {
-            newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
+            Map<String, SchedulerAssignment> newSchedulerAssignments =
+                computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
 
+            //Should probably change List<Object> to Tuple for better readability
--- End diff --

Again, probably better handled in another Jira for general code cleanup.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208746736
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
         Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf);
         cluster.setStatusMap(idToSchedStatus.get());
 
-        long beforeSchedule = System.currentTimeMillis();
+        schedulingStartTime.set(Time.nanoTime());
         scheduler.schedule(topologies, cluster);
-        long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule;
-        LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size());
-        scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+        //Will compiler optimize the order of evaluation and cause race condition?
--- End diff --

This is probably trivial, but I think it's possible for longestSchedulingTime 
to end up slightly higher than the max of scheduleTopologyTimeMs.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208746266
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
         Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf);
         cluster.setStatusMap(idToSchedStatus.get());
 
-        long beforeSchedule = System.currentTimeMillis();
+        schedulingStartTime.set(Time.nanoTime());
         scheduler.schedule(topologies, cluster);
-        long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule;
-        LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size());
-        scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+        //Will compiler optimize the order of evaluation and cause race condition?
+        long elapsed = -schedulingStartTime.getAndSet(null) + Time.nanoTime();
--- End diff --

Oh, my bad. This is probably premature optimization.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208746090
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
                     .parallelStream()
                     .mapToDouble(SupervisorResources::getTotalCpu)
                     .sum());
-
+            StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+                Long currTime = Time.nanoTime();
+                Long startTime = schedulingStartTime.get();
+                //There could be race condition here but seems trivial, elapsed is
+                // guaranteed to be no longer than real elapsed time of scheduling
+                Long longest = longestSchedulingTime.get();
+                if (startTime != null) {
+                    longest = currTime - startTime > longest ? currTime - startTime : longest;
+                }
+                //To millis. How should I put the constant for magic numbers?
--- End diff --

I'm concerned about loss of precision here, since all of these conversions 
return long instead of double.
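
To make the precision point concrete, a small sketch with a hypothetical elapsed time (not code from the PR): `TimeUnit` conversions return `long` and truncate, while dividing by a `double` constant keeps the fraction.

    import java.util.concurrent.TimeUnit;

    public class PrecisionSketch {
        public static void main(String[] args) {
            long elapsedNanos = 1_234_567L; // hypothetical: ~1.23 ms of scheduling time

            // TimeUnit conversion returns long, so sub-millisecond detail is lost.
            System.out.println(TimeUnit.NANOSECONDS.toMillis(elapsedNanos)); // 1

            // Dividing by a double constant preserves it.
            System.out.println(elapsedNanos / 1_000_000.0);                  // 1.234567
        }
    }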


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208745736
  
--- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java ---
@@ -39,6 +42,11 @@ public String getId() {
         return getNodeId() + ":" + getPort();
     }
 
+    public List<Object> toList() {
+        //For compatibility to call in Nimbus#mkAssignment
--- End diff --

I thought it looked cleaner on the Nimbus side. The legacy implementation 
isn't pretty to start with (a lot of List<Object> or List<Long>). If you want, 
I can file a Jira to improve `mkAssignment` in general and eliminate them 
altogether.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208744273
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -,13 +2231,23 @@ private void mkAssignments(String scratchTopoId) throws Exception {
 
         if (!newAssignments.equals(existingAssignments)) {
             LOG.debug("RESETTING id->resources and id->worker-resources cache!");
+            //Should we change these logs from info to debug after they are ported to metrics?
--- End diff --

Will log in Jira.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208743919
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java ---
@@ -52,6 +52,7 @@ public void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf
     public void start() {
         if (reporter != null) {
             LOG.debug("Starting...");
+            //TODO: will we make the period customizable?
--- End diff --

Will log in Jira.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208714291
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) {
         return ret;
     }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-        Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-        // Print some useful information
-        if (existingAssignments != null && !existingAssignments.isEmpty()) {
-            for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-                String topoId = entry.getKey();
-                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-                Assignment assignment = existingAssignments.get(topoId);
-                if (assignment == null) {
-                    continue;
+    private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+                                     Map<String, Assignment> newAssignments) {
+        assert existingAssignments != null && newAssignments != null;
+        boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
+        long numRemovedExec = 0;
+        long numRemovedSlot = 0;
+        long numAddedExec   = 0;
+        long numAddedSlot   = 0;
+        if (existingAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                LOG.info("Assign executors: {}", execToPort.keySet());
+                numAddedSlot += count;
+                numAddedExec += execToPort.size();
+            }
+        } else if (newAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                LOG.info("Remove executors: {}", execToPort.keySet());
+                numRemovedSlot += count;
+                numRemovedExec += execToPort.size();
+            }
+        } else {
+            MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
+            if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) {
+                for (Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                    final long count = new HashSet<>(execToPort.values()).size();
+                    LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                    LOG.info("Remove executors: {}", execToPort.keySet());
+                    numRemovedSlot += count;
+                    numRemovedExec += execToPort.size();
                 }
-                Map<List<Long>, NodeInfo> old = assignment.get_executor_node_port();
-                Map<List<Long>, List<Object>> reassigned = new HashMap<>();
-                for (Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) {
-                    NodeInfo oldAssigned = old.get(execAndNodePort.getKey());
-                    String node = (String) execAndNodePort.getValue().get(0);
-                    Long port = (Long) execAndNodePort.getValue().get(1);
-                    if (oldAssigned == null || !oldAssigned.get_node().equals(node)
-                        || !port.equals(oldAssigned.get_port_iterator().next())) {
-                        reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue());
-                    }
+                for (Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                    final long count = new HashSet<>(execToPort.values()).size();
+                    LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                    LOG.info("Assign executors: {}", execToPort.keySet());
+                    numAddedSlot += count;
+                    numAddedExec += execToPort.size();
                 }
 
-                if (!reassigned.isEmpty()) {
-                    int count = (new HashSet<>(execToNodePort.values())).size();
-                    Set<List<Long>>

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208704942
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
                     .parallelStream()
                     .mapToDouble(SupervisorResources::getTotalCpu)
                     .sum());
-
+            StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+                Long currTime = Time.nanoTime();
+                Long startTime = schedulingStartTime.get();
+                //There could be race condition here but seems trivial, elapsed is
+                // guaranteed to be no longer than real elapsed time of scheduling
+                Long longest = longestSchedulingTime.get();
+                if (startTime != null) {
+                    longest = currTime - startTime > longest ? currTime - startTime : longest;
+                }
+                //To millis. How should I put the constant for magic numbers?
--- End diff --

You can use `TimeUnit` to do the conversions instead, removing the need for 
magic numbers: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html
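
A short sketch of the suggestion (variable names are illustrative, not from the PR):

    import java.util.concurrent.TimeUnit;

    public class TimeUnitSketch {
        public static void main(String[] args) {
            long elapsedNanos = 1_500_000_000L; // hypothetical elapsed scheduling time

            // Named conversions instead of dividing by magic constants:
            long elapsedMs   = TimeUnit.NANOSECONDS.toMillis(elapsedNanos);  // 1500
            long elapsedSecs = TimeUnit.NANOSECONDS.toSeconds(elapsedNanos); // 1

            System.out.println(elapsedMs + " ms, " + elapsedSecs + " s");
        }
    }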


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208721202
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2219,21 +2305,16 @@ private void mkAssignments(String scratchTopoId) throws Exception {
             newAssignments.put(topoId, newAssignment);
         }
 
-        if (!newAssignments.equals(existingAssignments)) {
+        boolean assignmentChanged = inspectSchduling(existingAssignments, newAssignments);
+        if (assignmentChanged) {
             LOG.debug("RESETTING id->resources and id->worker-resources cache!");
-            LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
-            nodeIdToResources.get().forEach((id, node) ->
-                LOG.info("Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
-                         + "CPU: {}, Available CPU: {}, fragmented: {}",
-                         id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
-                         node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
             idToResources.set(new HashMap<>());
             idToWorkerResources.set(new HashMap<>());
         }
 
         //tasks figure out what tasks to talk to by looking at topology at runtime
         // only log/set when there's been a change to the assignment
+        // TODO: why do we have loop fission here
--- End diff --

My guess would be for readability. If you want to refactor, go ahead.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208699933
  
--- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java ---
@@ -39,6 +42,11 @@ public String getId() {
         return getNodeId() + ":" + getPort();
     }
 
+    public List<Object> toList() {
+        //For compatibility to call in Nimbus#mkAssignment
--- End diff --

This seems to only be used in one place. Why is this better than invoking 
`Arrays.asList(slot.getNodeId(), slot.getPort())`?

The reason I'm asking is that, to me, there isn't an obvious WorkerSlot-to-List<Object> 
translation, so I think a generic toList like this decreases readability 
compared to just writing the code out in Nimbus.

Same comment for ExecutorDetails.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208703156
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -780,15 +870,15 @@ private static int numUsedWorkers(SchedulerAssignment assignment) {
             key.add(ni.get_node());
             key.add(ni.get_port_iterator().next());
             List<List<Long>> value = new ArrayList<>(entry.getValue());
-            value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
+            value.sort(Comparator.comparing(a -> a.get(0)));
--- End diff --

Neat, didn't know about this method.
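
For anyone else who hadn't seen it, a tiny sketch (made-up data) showing the two forms are equivalent:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class ComparingSketch {
        public static void main(String[] args) {
            List<List<Long>> value = new ArrayList<>(Arrays.asList(
                Arrays.asList(3L, 6700L),
                Arrays.asList(1L, 6701L)));

            // Same ordering as value.sort((a, b) -> a.get(0).compareTo(b.get(0)))
            value.sort(Comparator.comparing(a -> a.get(0)));

            System.out.println(value); // [[1, 6701], [3, 6700]]
        }
    }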


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208707855
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2131,17 +2223,13 @@ private void mkAssignments(String scratchTopoId) throws Exception {
             }
         }
         // make the new assignments for topologies
-        Map<String, SchedulerAssignment> newSchedulerAssignments = null;
         synchronized (schedLock) {
-            newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
+            Map<String, SchedulerAssignment> newSchedulerAssignments =
+                computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
 
+            //Should probably change List<Object> to Tuple for better readability
--- End diff --

If you want to do it, go ahead IMO.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208705728
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
         Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf);
         cluster.setStatusMap(idToSchedStatus.get());
 
-        long beforeSchedule = System.currentTimeMillis();
+        schedulingStartTime.set(Time.nanoTime());
         scheduler.schedule(topologies, cluster);
-        long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule;
-        LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size());
-        scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+        //Will compiler optimize the order of evaluation and cause race condition?
+        long elapsed = -schedulingStartTime.getAndSet(null) + Time.nanoTime();
+        longestSchedulingTime.updateAndGet(t -> t > elapsed ? t : elapsed);
--- End diff --

Nit: Could use Math.max for this.
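
In code, the nit amounts to the following (a sketch; the atomic is assumed here to be an `AtomicLong` holding nanos, which may differ from the PR's actual field type):

    import java.util.concurrent.atomic.AtomicLong;

    public class MaxSketch {
        public static void main(String[] args) {
            AtomicLong longestSchedulingTime = new AtomicLong(0);
            long elapsed = 42_000_000L; // hypothetical nanos

            // Reads the same as t -> t > elapsed ? t : elapsed
            longestSchedulingTime.updateAndGet(t -> Math.max(t, elapsed));

            System.out.println(longestSchedulingTime.get()); // 42000000
        }
    }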


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208721916
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2807,16 +2897,15 @@ public void launchServer() throws Exception {
             }
         });
 
-        StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size());
-        StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory);
-        StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu);
-        StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values()
+        //Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation
--- End diff --

Nit: This is a hint about the language, I'd rather not have it here.
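
For reference, the language point being hinted at, as a minimal standalone sketch (not PR code): in `expr::method` the receiver `expr` is evaluated once when the reference is created, while a lambda re-evaluates it on every call, which matters for gauges that should re-read state on each poll.

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    public class MethodRefSketch {
        public static void main(String[] args) {
            AtomicReference<StringBuilder> ref = new AtomicReference<>(new StringBuilder("old"));

            // ref.get() is evaluated HERE, once; the supplier stays bound to "old".
            Supplier<String> boundRef = ref.get()::toString;

            // The lambda calls ref.get() on every invocation.
            Supplier<String> lambda = () -> ref.get().toString();

            ref.set(new StringBuilder("new"));
            System.out.println(boundRef.get()); // old
            System.out.println(lambda.get());   // new
        }
    }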


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208703338
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -411,6 +431,10 @@
     private final StormTimer timer;
     private final IScheduler scheduler;
     private final IScheduler underlyingScheduler;
+    //Metrics related
+    private final AtomicReference<Long> schedulingStartTime = new AtomicReference<>(null);
--- End diff --

Nit: Consider renaming these so it's obvious what time unit it is, e.g. 
`schedulingStartTimeNanos`


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208720133
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) {
         return ret;
     }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-        Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-        // Print some useful information
-        if (existingAssignments != null && !existingAssignments.isEmpty()) {
-            for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-                String topoId = entry.getKey();
-                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-                Assignment assignment = existingAssignments.get(topoId);
-                if (assignment == null) {
-                    continue;
+    private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
--- End diff --

The name is a little vague. How about `calculateAssignmentChanged` or 
something like that? Also there's a missing e in scheduling.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208718858
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) {
         return ret;
     }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-        Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-        // Print some useful information
-        if (existingAssignments != null && !existingAssignments.isEmpty()) {
-            for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-                String topoId = entry.getKey();
-                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-                Assignment assignment = existingAssignments.get(topoId);
-                if (assignment == null) {
-                    continue;
+    private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+                                     Map<String, Assignment> newAssignments) {
+        assert existingAssignments != null && newAssignments != null;
+        boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
+        long numRemovedExec = 0;
+        long numRemovedSlot = 0;
+        long numAddedExec   = 0;
+        long numAddedSlot   = 0;
+        if (existingAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                LOG.info("Assign executors: {}", execToPort.keySet());
+                numAddedSlot += count;
+                numAddedExec += execToPort.size();
+            }
+        } else if (newAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                LOG.info("Remove executors: {}", execToPort.keySet());
+                numRemovedSlot += count;
+                numRemovedExec += execToPort.size();
+            }
+        } else {
+            MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
+            if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) {
+                for (Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                    final long count = new HashSet<>(execToPort.values()).size();
+                    LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                    LOG.info("Remove executors: {}", execToPort.keySet());
+                    numRemovedSlot += count;
+                    numRemovedExec += execToPort.size();
                 }
-                Map<List<Long>, NodeInfo> old = assignment.get_executor_node_port();
-                Map<List<Long>, List<Object>> reassigned = new HashMap<>();
-                for (Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) {
-                    NodeInfo oldAssigned = old.get(execAndNodePort.getKey());
-                    String node = (String) execAndNodePort.getValue().get(0);
-                    Long port = (Long) execAndNodePort.getValue().get(1);
-                    if (oldAssigned == null || !oldAssigned.get_node().equals(node)
-                        || !port.equals(oldAssigned.get_port_iterator().next())) {
-                        reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue());
-                    }
+                for (Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                    final long count = new HashSet<>(execToPort.values()).size();
+                    LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                    LOG.info("Assign executors: {}", execToPort.keySet());
+                    numAddedSlot += count;
+                    numAddedExec += execToPort.size();
                }
 
-                if (!reassigned.isEmpty()) {
-                    int count = (new HashSet<>(execToNodePort.values())).size();
-                    Set<List<Long>>

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208709991
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) {
         return ret;
     }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-        Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-        // Print some useful information
-        if (existingAssignments != null && !existingAssignments.isEmpty()) {
-            for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-                String topoId = entry.getKey();
-                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-                Assignment assignment = existingAssignments.get(topoId);
-                if (assignment == null) {
-                    continue;
+    private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+                                     Map<String, Assignment> newAssignments) {
+        assert existingAssignments != null && newAssignments != null;
+        boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
+        long numRemovedExec = 0;
+        long numRemovedSlot = 0;
+        long numAddedExec   = 0;
+        long numAddedSlot   = 0;
+        if (existingAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                LOG.info("Assign executors: {}", execToPort.keySet());
+                numAddedSlot += count;
+                numAddedExec += execToPort.size();
+            }
+        } else if (newAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                LOG.info("Remove executors: {}", execToPort.keySet());
+                numRemovedSlot += count;
+                numRemovedExec += execToPort.size();
+            }
+        } else {
+            MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
+            if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) {
--- End diff --

Is this check correct? If existing is `A B C` and new is `B C`, 
entriesInCommon is `B C`, so this would be false.

Why is the check even needed? Don't we always want to remove all existing 
that aren't in new, and add all new that aren't in existing?
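
The `A B C` vs `B C` case, spelled out as a runnable sketch (made-up values) against Guava's `MapDifference`:

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.MapDifference;
    import com.google.common.collect.Maps;

    import java.util.Map;

    public class GuardSketch {
        public static void main(String[] args) {
            Map<String, Integer> existing = ImmutableMap.of("A", 1, "B", 2, "C", 3);
            Map<String, Integer> updated  = ImmutableMap.of("B", 2, "C", 3);

            MapDifference<String, Integer> diff = Maps.difference(existing, updated);

            // The guard: entriesInCommon() is {B=2, C=3}, size 2 == updated.size(),
            // so it reports "unchanged" even though A was removed.
            System.out.println(diff.entriesInCommon().size() != updated.size()); // false
            System.out.println(diff.areEqual());                                 // false: they DO differ
        }
    }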


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208714561
  
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) {
         return ret;
     }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-        Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-        // Print some useful information
-        if (existingAssignments != null && !existingAssignments.isEmpty()) {
-            for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-                String topoId = entry.getKey();
-                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-                Assignment assignment = existingAssignments.get(topoId);
-                if (assignment == null) {
-                    continue;
+    private boolean inspectSchduling(Map<String, Assignment> existingAssignments,
+                                     Map<String, Assignment> newAssignments) {
+        assert existingAssignments != null && newAssignments != null;
+        boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
+        long numRemovedExec = 0;
+        long numRemovedSlot = 0;
+        long numAddedExec   = 0;
+        long numAddedSlot   = 0;
+        if (existingAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                LOG.info("Assign executors: {}", execToPort.keySet());
+                numAddedSlot += count;
+                numAddedExec += execToPort.size();
+            }
+        } else if (newAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                LOG.info("Remove executors: {}", execToPort.keySet());
+                numRemovedSlot += count;
+                numRemovedExec += execToPort.size();
+            }
+        } else {
+            MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
+            if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) {
+                for (Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                    final long count = new HashSet<>(execToPort.values()).size();
+                    LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                    LOG.info("Remove executors: {}", execToPort.keySet());
+                    numRemovedSlot += count;
+                    numRemovedExec += execToPort.size();
                }
-                Map<List<Long>, NodeInfo> old = assignment.get_executor_node_port();
-                Map<List<Long>, List<Object>> reassigned = new HashMap<>();
-                for (Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) {
-                    NodeInfo oldAssigned = old.get(execAndNodePort.getKey());
-                    String node = (String) execAndNodePort.getValue().get(0);
-                    Long port = (Long) execAndNodePort.getValue().get(1);
-                    if (oldAssigned == null || !oldAssigned.get_node().equals(node)
-                        || !port.equals(oldAssigned.get_port_iterator().next())) {
-                        reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue());
-                    }
+                for (Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                    final long count = new HashSet<>(execToPort.values()).size();
+                    LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                    LOG.info("Assign executors: {}", execToPort.keySet());
+                    numAddedSlot += count;
+                    numAddedExec += execToPort.size();
                }
 
-                if (!reassigned.isEmpty()) {
-                    int count = (new HashSet<>(execToNodePort.values())).size();
-                    Set<List<Long>>

  1   2   >