[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209381686 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +//This histogram reflects the data distribution across only one ClusterSummary, i.e., +// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. +// Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +/** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. 
+ */ +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209367723 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +//This histogram reflects the data distribution across only one ClusterSummary, i.e., +// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. +// Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +/** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. 
+ */ +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209367613 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +//This histogram reflects the data distribution across only one ClusterSummary, i.e., +// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. +// Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +/** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. 
+ */ +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209367421 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +//This histogram reflects the data distribution across only one ClusterSummary, i.e., +// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. +// Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +/** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. 
+ */ +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209367121 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +//This histogram reflects the data distribution across only one ClusterSummary, i.e., +// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. +// Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +/** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. 
+ */ +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209366165 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +//This histogram reflects the data distribution across only one ClusterSummary, i.e., +// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. +// Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +/** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. 
+ */ +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209365696 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +//This histogram reflects the data distribution across only one ClusterSummary, i.e., +// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. +// Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +/** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. 
+ */ +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209363589 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +//This histogram reflects the data distribution across only one ClusterSummary, i.e., +// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. +// Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +/** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. 
+ */ +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209354005 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4744,206 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +//This histogram reflects the data distribution across only one ClusterSummary, i.e., +// data distribution across all entities of a type (e.g., data from all nimbus/topologies) at one moment. +// Hence we use half of the CACHING_WINDOW time to ensure it retains only data from the most recent update +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = 
registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +/** + * Constructor to put all items in ClusterSummary in MetricSet as a metric. + * All metrics are derived from a cached ClusterSummary object, + * expired {@link ClusterSummaryMetricSet#CACHING_WINDOW} seconds after first query in a while from reporters. + * In case of {@link com.codahale.metrics.ScheduledReporter}, CACHING_WINDOW should be set shorter than + * reporting interval to avoid outdated reporting. 
+ */ +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209348428 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4745,194 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is {}", newSummary); --- End diff -- This better to be `LOG.debug`? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209341987 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2865,16 +2890,14 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +// Num supervisor, and fragmented resources have been included in cluster summary --- End diff -- Please remove; unless you're looking at the diff, you won't know what this means ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209337293 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4746,186 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is {}", newSummary); +//This is ugly but I can't think of a better way to update histogram only once per caching +// It also kind of depends on the implementation that gauges gets updated before histograms +updateHistogram(newSummary); +return newSummary; +} else { +return null; +
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209336348 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2905,6 +2929,10 @@ public void launchServer() throws Exception { throw new RuntimeException(e); } }); + +//Should we make the delaySecs and recurSecs in sync with any conf value? --- End diff -- Okay, we can do it later. Filed https://issues.apache.org/jira/browse/STORM-3192. Please remove the comment since it seems like it wouldn't be useful outside this PR. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209335050 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2905,6 +2929,10 @@ public void launchServer() throws Exception { throw new RuntimeException(e); } }); + +//Should we make the delaySecs and recurSecs in sync with any conf value? --- End diff -- I would very much love to. However, based on the current Nimbus implementation there's no way to do it. There is, however, one leadership change callback buried in Zookeeper-side code (See `LeaderListenerCallback`), which communicates through IStormClusterState. It would be great if you can open a PR to improve Nimbus in general, along with [STORM-3187](https://issues.apache.org/jira/browse/STORM-3187). Otherwise we kind of have to use busy waiting here. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209333494 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2905,6 +2929,10 @@ public void launchServer() throws Exception { throw new RuntimeException(e); } }); + +//Should we make the delaySecs and recurSecs in sync with any conf value? --- End diff -- Why not update the metricset active flag when Nimbus becomes active instead of polling for changes? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209333088 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4746,186 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is {}", newSummary); +//This is ugly but I can't think of a better way to update histogram only once per caching +// It also kind of depends on the implementation that gauges gets updated before histograms +updateHistogram(newSummary); +return newSummary; +} else { +return null; +
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209331742 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4716,4 +4746,186 @@ public IScheduler getForcedScheduler() { } +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final String SUMMARY = "summary"; + +private final Map clusterSummaryMetrics = new HashMap() { +@Override +public com.codahale.metrics.Metric put(String key, com.codahale.metrics.Metric value) { +return super.put(StormMetricsRegistry.name(SUMMARY, key), value); +} +}; +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +clusterSummaryMetrics.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = 
registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is {}", newSummary); +//This is ugly but I can't think of a better way to update histogram only once per caching +// It also kind of depends on the implementation that gauges gets updated before histograms +updateHistogram(newSummary); +return newSummary; +} else { +return null; +
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209312212 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; --- End diff -- fixed ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209311621 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} --- End diff -- Jira issued. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209311426 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { --- End diff -- Yes, I agree. The current implementation isn't ideal. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209306763 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { --- End diff -- That can be solved by making `getClusterInfoImpl` package private, but I see your point. Maybe we just leave it here for now, and I'll see if we can do something with it when I look at the non-static registry branch. It's not like it hurts anything to have it here, I'd just rather have it in another file if possible. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209305125 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { --- End diff -- Additionally, we can't call `getClusterInfoImpl` directly, so we have to use the public thrift API `getClusterInfo`. This will disrupt the accuracy of its meter "nimbus:num-getClusterInfo-calls", because I think it's for tracking public API invocation only. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209304885 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 --- End diff -- Okay, that makes sense. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209303203 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 --- End diff -- The intention is we fixed it in development stage so it'll just work in production, since thrift protocol is compiled, not evaluated at runtime. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209302512 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; --- End diff -- Okay ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209154625 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-cpu"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-cpu"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-cpu"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-cpu"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 --- End diff -- Keep in mind that asserts are only evaluated in test mode, or if people run with assertions enabled. If you want to always validate this, use e.g. `Validate`. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209154387 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { --- End diff -- Then `isLeader` can be made public or protected, and Nimbus can be injected into this class via a constructor. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209100932 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { --- End diff -- ClusterSummaryMetricSet rely on `isLeader` from Nimbus. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209084882 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4652,4 +4745,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { --- End diff -- Can this class be moved to another file? Nimbus is already huge. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209083160 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { --- End diff -- I'd still rather get rid of it, because it's a redundant branch point, and less indentation is great. Personal preference though, so up to you. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209081264 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? +long elapsed = -schedulingStartTime.getAndSet(null) + Time.nanoTime(); --- End diff -- Okay, but I don't see how that fixes the race? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209064682 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? --- End diff -- You're right. I don't see a good fix. I agree that the race is not that important, I'd maybe just remove the comments, since users won't see them here. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209059400 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? +long elapsed = -schedulingStartTime.getAndSet(null) + Time.nanoTime(); --- End diff -- Now I remembered. I did this to alter the evaluation order so that `#getAndSet` is evaluated first. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209048016 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? --- End diff -- If no code reordering happens, the gauge should always evaluate `currTime` first, and has to get startTime from `schedulingStartTimeNs` before it is set to null. So if we guarantee that elapsed is evaluated after that, longest-scheduling-time-ms will not exceed the real longest scheduling time. That being said, I think the race here should be pretty negligible, especially if we discard the decimals in the ns-to-ms conversion. Meanwhile, I did hear complaints about hanging schedulers before, so I say we keep the partial measurement and remove the comments, or just note: "please be aware that it's normal to see minor jitter in the longest scheduling time due to a race condition." ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209045056 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? --- End diff -- If we keep the update and race, I think we need to either remove the comments about the race, or expand them to provide the example you posted. Leaving the comment as is won't help someone reading it later understand what race is there. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209044589 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? --- End diff -- Right, that makes sense. I don't think it requires reordering though, the sequence you posted can happen without the compiler reordering anything. I don't think it really hurts anything to have this race, but maybe we should consider removing the longest update in the gauge. Unless the scheduler is outright hanging, I'm not sure what the value is for us to report a partial measurement like that? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209042043 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2918,6 +2943,10 @@ public void launchServer() throws Exception { } }); } + +//Should we make the delaySecs and recurSecs in sync with any conf value? +// They should be around the reporting interval, but it's not configurable +timer.scheduleRecurring(5, 5, clusterMetricSet); --- End diff -- Just to be in sync with the caching frequency ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209041653 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is {}", newSummary); +//This is ugly but I can't think of a better way to update histogram only once per caching +// It also kind of depends on the implementation that 
gauges gets updated before histograms +
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209039674 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2918,6 +2943,10 @@ public void launchServer() throws Exception { } }); } + +//Should we make the delaySecs and recurSecs in sync with any conf value? +// They should be around the reporting interval, but it's not configurable +timer.scheduleRecurring(5, 5, clusterMetricSet); --- End diff -- It's a random number, really. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209034728 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? --- End diff -- If the compiler is allowed to hoist code, we can end up with this case: 1) the gauge "longest-scheduling-time-ms" acquires the scheduling start time 2) elapsed captures the end time, updating the `schedulingDuration` Timer and `longestSchedulingTime` 3) the gauge acquires the longest time from `longestSchedulingTime` and `currTime` 4) since the start time isn't null, the program executes the statement ```java longest = currTime - startTime > longest ? currTime - startTime : longest; ``` So it'll be slightly longer than the value of `longestSchedulingTime` for this round. Which means we might see jitter in the gauge value ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209029787 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; --- End diff -- There has been inconsistency in how I should name the metrics. In previous commits I had all of the metrics in the metric set starting with the prefix "summary.". But I'm not sure. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209029163 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is {}", newSummary); +//This is ugly but I can't think of a better way to update histogram only once per caching +// It also kind of depends on the implementation that 
gauges gets updated before histograms +
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209029068 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol --- End diff -- People probably won't notice this part of code when they change thrift protocol. We should break the code to remind them to update this part as well. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209028432 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2871,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation +// Num supervisor, and fragmented resources have been included in cluster summary + StormMetricsRegistry.registerGauge("nimbus:total-available-memory (nonegative)", () -> nodeIdToResources.get().values() --- End diff -- See https://issues.apache.org/jira/browse/STORM-3151?filter=-2 ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209016305 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is {}", newSummary); +//This is ugly but I can't think of a better way to update histogram only once per caching +// It also kind of depends on the implementation that 
gauges gets updated before histograms +
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208965124 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} --- End diff -- you can file a separate jira to discuss about it and remove it from this pr ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208987159 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); --- End diff -- use lowercase for "CPU" to be consistent with other names? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208984273 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2871,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation +// Num supervisor, and fragmented resources have been included in cluster summary + StormMetricsRegistry.registerGauge("nimbus:total-available-memory (nonegative)", () -> nodeIdToResources.get().values() --- End diff -- why do we need `non-negative` ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r209007420 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4723,4 +4754,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; --- End diff -- this is not used ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208986783 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); --- End diff -- `ported` is not clear out of context of this PR. better to use something like `clusterSummaryMetrics` ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208965019 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2918,6 +2943,10 @@ public void launchServer() throws Exception { } }); } + +//Should we make the delaySecs and recurSecs in sync with any conf value? +// They should be around the reporting interval, but it's not configurable +timer.scheduleRecurring(5, 5, clusterMetricSet); --- End diff -- why do we set to 5 and 5 ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208988762 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol --- End diff -- assertion is good. Just curious in which case this will happen ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208991078 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; +static final String SUMMARY = "summary"; + +private final Map ported = new HashMap<>(PORTED_METRICS); +private final Function registerHistogram = (name) -> { +final Histogram histogram = new Histogram(new SlidingTimeWindowReservoir(CACHING_WINDOW / 2, TimeUnit.SECONDS)); +ported.put(name, histogram); +return histogram; +}; +private volatile boolean active = false; + +//NImbus metrics distribution +private final Histogram nimbusUptime = registerHistogram.apply("nimbuses:uptime-secs"); + +//Supervisor metrics distribution +private final Histogram supervisorsUptime = registerHistogram.apply("supervisors:uptime-secs"); +private final Histogram supervisorsNumWorkers = registerHistogram.apply("supervisors:num-workers"); +private final Histogram supervisorsNumUsedWorkers = registerHistogram.apply("supervisors:num-used-workers"); +private final Histogram supervisorsUsedMem = registerHistogram.apply("supervisors:used-mem"); +private final Histogram supervisorsUsedCpu = registerHistogram.apply("supervisors:used-CPU"); +private final Histogram supervisorsFragmentedMem = registerHistogram.apply("supervisors:fragmented-mem"); +private final Histogram supervisorsFragmentedCpu = registerHistogram.apply("supervisors:fragmented-CPU"); + +//Topology metrics 
distribution +private final Histogram topologiesNumTasks = registerHistogram.apply("topologies:num-tasks"); +private final Histogram topologiesNumExecutors = registerHistogram.apply("topologies:num-executors"); +private final Histogram topologiesNumWorker = registerHistogram.apply("topologies:num-workers"); +private final Histogram topologiesUptime = registerHistogram.apply("topologies:uptime-secs"); +private final Histogram topologiesReplicationCount = registerHistogram.apply("topologies:replication-count"); +private final Histogram topologiesRequestedMemOnHeap = registerHistogram.apply("topologies:requested-mem-on-heap"); +private final Histogram topologiesRequestedMemOffHeap = registerHistogram.apply("topologies:requested-mem-off-heap"); +private final Histogram topologiesRequestedCpu = registerHistogram.apply("topologies:requested-CPU"); +private final Histogram topologiesAssignedMemOnHeap = registerHistogram.apply("topologies:assigned-mem-on-heap"); +private final Histogram topologiesAssignedMemOffHeap = registerHistogram.apply("topologies:assigned-mem-off-heap"); +private final Histogram topologiesAssignedCpu = registerHistogram.apply("topologies:assigned-CPU"); + +ClusterSummaryMetricSet() { +//Break the code if out of sync to thrift protocol +assert ClusterSummary._Fields.values().length == 3 +&& ClusterSummary._Fields.findByName("supervisors") == ClusterSummary._Fields.SUPERVISORS +&& ClusterSummary._Fields.findByName("topologies") == ClusterSummary._Fields.TOPOLOGIES +&& ClusterSummary._Fields.findByName("nimbuses") == ClusterSummary._Fields.NIMBUSES; + +final CachedGauge cachedSummary = new CachedGauge(CACHING_WINDOW, TimeUnit.SECONDS) { +@Override +protected ClusterSummary loadValue() { +try { +if (active) { +ClusterSummary newSummary = getClusterInfoImpl(); +LOG.info("the new summary is {}", newSummary); +//This is ugly but I can't think of a better way to update histogram only once per caching +// It also kind of depends on the implementation that 
gauges gets updated before histograms +
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208986545 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -4729,4 +4760,192 @@ public IScheduler getForcedScheduler() { } +//enum NotPorted { +////Declared in StormConf. I don't see the value in reporting so. +//SUPERVISOR_TOTAL_RESOURCE, +////May be able to aggregate based on status; +//TOPOLOGY_STATUS, +//TOPOLOGY_SCHED_STATUS, +////May be aggregated, as well as other distinct values +//NUM_DISTINCT_NIMBUS_VERSION; +//} + +private class ClusterSummaryMetricSet implements MetricSet, Runnable { +static final int CACHING_WINDOW = 5; +static final int PORTED_METRICS = 25; --- End diff -- Don't need constant since it's only an initial capacity of map ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208998511 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, --- End diff -- Sure, sounds good. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208997165 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, --- End diff -- I don't think we should use `calculateAssignmentChanged` as it doesn't really calculate anything for scheduling process so much as inspect/audit the throughput of scheduling. I would prefer naming `auditAssignmentChanges` or `reviewAssignmentChanges`, etc. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208987578 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { --- End diff -- I see what you mean. It appeared to me that this improved the readability when I first wrote this block. Like you said, if the `if` here is evaluated false, all three loops will be skipped anyway, with or without the if guard. So I thought we could just wrap it in an `if` statement to make this clear, and also get rid of some loop overhead. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208982864 --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java --- @@ -39,6 +42,11 @@ public String getId() { return getNodeId() + ":" + getPort(); } +public List toList() { +//For compatibility to call in Nimbus#mkAssignment --- End diff -- Will remove the comment and log it in Jira ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208976776 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { --- End diff -- I think we're talking past each other a little. What I'm saying is that this code block looks to me like ``` if (A == B) { for (a only in A) {} for (b only in B) {} for (c in A or B where the value is different) {} } ``` What I'm saying is that the `if` guard is not necessary, because if the `if` returns false, we'd be skipping all 3 for loops anyway, because the iterated sets are empty. If that's the case, we can still update `anyChanged`, but it shouldn't be happening in an `if`. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208975129 --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java --- @@ -39,6 +42,11 @@ public String getId() { return getNodeId() + ":" + getPort(); } +public List toList() { +//For compatibility to call in Nimbus#mkAssignment --- End diff -- Eh, it's probably fine. If you want to keep this method, I don't think the comment should be here though. It won't make sense outside the context of this PR. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208972356 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -Set>
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208971197 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; +} +//To millis. How should I put the constant for magic numbers? --- End diff -- Yes, or if the decimals are meaningful the gauge should be using nanoseconds instead of millis. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208970512 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; --- End diff -- Okay, that makes sense. Thanks for explaining. Could you add a comment to this effect here? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208970103 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2807,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation +// Num supervisor, and fragmented resources have been included in cluster summary + StormMetricsRegistry.registerGauge("nimbus:total-available-memory (nonegative)", () -> nodeIdToResources.get().values() .parallelStream() -.mapToDouble(SupervisorResources::getAvailableMem) +.mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0)) .sum()); -StormMetricsRegistry.registerGauge("nimbus:available-cpu", () -> nodeIdToResources.get().values() +StormMetricsRegistry.registerGauge("nimbus:available-cpu (nonnegative)", () -> nodeIdToResources.get().values() --- End diff -- Thanks. Sorry I was unclear. What I meant was that I don't know if we need to worry about not changing the names of existing metrics (backward compatibility). I'm not sure why we want to change the name to include `(nonnegative)`, and if we want to include it, why not use the same syntax as the rest of the name (words separated by `-`)? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208970052 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { --- End diff -- We don't know whether two non-empty maps are equal or not until we do `Maps#difference`. Then I can assign the variable `anyChanged` to remember if any changes in ScheduledAssignments have been made, which is the return value as well as the determinant of whether I should add more logging ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208968787 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2807,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation --- End diff -- Yes, I think I've seen it before too :) ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208968475 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? --- End diff -- How? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208968113 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -Set>
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208967564 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { --- End diff -- Makes sense, but what I'm asking is why is this check here at all? if the maps are equal, the for loops below will all be skipped anyway, because entriesOnlyOnLeft/Right and entriesDiffering are all empty, right? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208967428 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; +} +//To millis. How should I put the constant for magic numbers? --- End diff -- Because the decimals are too trivial? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208966535 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; +} +//To millis. How should I put the constant for magic numbers? --- End diff -- Sorry, I get what you're saying. Disregard the above. If you're making the gauge measure milliseconds it probably still makes sense to use whole milliseconds rather than a floating point representation. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208965327 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; +} +//To millis. How should I put the constant for magic numbers? --- End diff -- Why would there be a loss of precision? The input number is a long, which is an integral type (not floating point). ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208748740 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208749692 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2807,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation +// Num supervisor, and fragmented resources have been included in cluster summary + StormMetricsRegistry.registerGauge("nimbus:total-available-memory (nonegative)", () -> nodeIdToResources.get().values() .parallelStream() -.mapToDouble(SupervisorResources::getAvailableMem) +.mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0)) .sum()); -StormMetricsRegistry.registerGauge("nimbus:available-cpu", () -> nodeIdToResources.get().values() +StormMetricsRegistry.registerGauge("nimbus:available-cpu (nonnegative)", () -> nodeIdToResources.get().values() --- End diff -- See Jira [STORM-3151](https://issues.apache.org/jira/browse/STORM-3151?filter=-2) ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208751049 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; --- End diff -- We would like to compute the distribution of scheduler latency as well as the longest scheduling iteration. If a scheduler is stuck in the middle of a scheduling iteration, the histogram won't reflect that until the scheduling iteration has ended because timer only report the time for a complete cycle. Hence I added this gauge to track the longest scheduling iteration in real time. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208749524 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2807,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation --- End diff -- Interesting, I thought I removed this already. Maybe it's rebased back in accidentally. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208749175 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208748414 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { --- End diff -- Good catch. I'll use `areEqual` instead. This method is for inspecting the performance (throughput) of scheduler, specifically, how many assignments, workers and executors are changed. It doesn't affect any actual assignments results. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208749367 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2219,21 +2305,16 @@ private void mkAssignments(String scratchTopoId) throws Exception { newAssignments.put(topoId, newAssignment); } -if (!newAssignments.equals(existingAssignments)) { +boolean assignmentChanged = inspectSchduling(existingAssignments, newAssignments); +if (assignmentChanged) { LOG.debug("RESETTING id->resources and id->worker-resources cache!"); -LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu()); -nodeIdToResources.get().forEach((id, node) -> -LOG.info( -"Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used " -+ "CPU: {}, Available CPU: {}, fragmented: {}", -id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(), - node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node))); idToResources.set(new HashMap<>()); idToWorkerResources.set(new HashMap<>()); } //tasks figure out what tasks to talk to by looking at topology at runtime // only log/set when there's been a change to the assignment +// TODO: why do we have loop fission here --- End diff -- Will log in Jira ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208747514 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, --- End diff -- Okay will come up with a better name. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208746949 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2131,17 +2223,13 @@ private void mkAssignments(String scratchTopoId) throws Exception { } } // make the new assignments for topologies -Map newSchedulerAssignments = null; synchronized (schedLock) { -newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId); +Map newSchedulerAssignments = +computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId); +//Should probably change List to Tuple for better readability --- End diff -- Again, probably better with another Jira for general code cleanup ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208746736 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? --- End diff -- This is probably trivial, but I think it's possible to have longestSchdulingTime slightly higher max of scheduleTopologyTimeMs. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208746266 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? +long elapsed = -schedulingStartTime.getAndSet(null) + Time.nanoTime(); --- End diff -- Oh my bad. This is probably premature optimization. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208746090 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; +} +//To millis. How should I put the constant for magic numbers? --- End diff -- I'm concerned about loss of precision here since all of them return long instead of double after conversion. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208745736 --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java --- @@ -39,6 +42,11 @@ public String getId() { return getNodeId() + ":" + getPort(); } +public List toList() { +//For compatibility to call in Nimbus#mkAssignment --- End diff -- I thought it looks cleaner on the Nimbus side. The legacy implementation isn't pretty to start with. (a lot of List or List) If you want I can file a Jira to improve `mkAssignment` in general, to eliminate them altogether. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208744273 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -,13 +2231,23 @@ private void mkAssignments(String scratchTopoId) throws Exception { if (!newAssignments.equals(existingAssignments)) { LOG.debug("RESETTING id->resources and id->worker-resources cache!"); +//Should we change these logs from info to debug after they are port to metrics? --- End diff -- Will log in Jira. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208743919 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java --- @@ -52,6 +52,7 @@ public void prepare(MetricRegistry metricsRegistry, Map topoConf public void start() { if (reporter != null) { LOG.debug("Starting..."); +//TODO: will we make the period customizable? --- End diff -- Will log in Jira. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208714291 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -Set>
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208704942 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; +} +//To millis. How should I put the constant for magic numbers? --- End diff -- You can use TimeUnit instead to do conversions, removing the need for magic numbers https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208721202 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2219,21 +2305,16 @@ private void mkAssignments(String scratchTopoId) throws Exception { newAssignments.put(topoId, newAssignment); } -if (!newAssignments.equals(existingAssignments)) { +boolean assignmentChanged = inspectSchduling(existingAssignments, newAssignments); +if (assignmentChanged) { LOG.debug("RESETTING id->resources and id->worker-resources cache!"); -LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu()); -nodeIdToResources.get().forEach((id, node) -> -LOG.info( -"Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used " -+ "CPU: {}, Available CPU: {}, fragmented: {}", -id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(), - node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node))); idToResources.set(new HashMap<>()); idToWorkerResources.set(new HashMap<>()); } //tasks figure out what tasks to talk to by looking at topology at runtime // only log/set when there's been a change to the assignment +// TODO: why do we have loop fission here --- End diff -- My guess would be for readability. If you want to refactor go ahead. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208699933 --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java --- @@ -39,6 +42,11 @@ public String getId() { return getNodeId() + ":" + getPort(); } +public List toList() { +//For compatibility to call in Nimbus#mkAssignment --- End diff -- This seems to only be used in one place. Why is this better than invoking `Arrays.asList(slot.getNodeId(), slot.getPort())`? The reason I'm asking is that to me, there isn't an obvious WorkerSlot to List translation, so I think a generic toList like this decreases readability compared to just writing out this code in Nimbus. Same comment for ExecutorDetails. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208703156 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -780,15 +870,15 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { key.add(ni.get_node()); key.add(ni.get_port_iterator().next()); List> value = new ArrayList<>(entry.getValue()); -value.sort((a, b) -> a.get(0).compareTo(b.get(0))); +value.sort(Comparator.comparing(a -> a.get(0))); --- End diff -- Neat, didn't know about this method. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208707855 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2131,17 +2223,13 @@ private void mkAssignments(String scratchTopoId) throws Exception { } } // make the new assignments for topologies -Map newSchedulerAssignments = null; synchronized (schedLock) { -newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId); +Map newSchedulerAssignments = +computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId); +//Should probably change List to Tuple for better readability --- End diff -- If you want to do it, go ahead IMO. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208705728 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? +long elapsed = -schedulingStartTime.getAndSet(null) + Time.nanoTime(); +longestSchedulingTime.updateAndGet(t -> t > elapsed ? t : elapsed); --- End diff -- Nit: Could use Math.max for this. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208721916 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2807,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation --- End diff -- Nit: This is a hint about the language, I'd rather not have it here. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208703338 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -411,6 +431,10 @@ private final StormTimer timer; private final IScheduler scheduler; private final IScheduler underlyingScheduler; +//Metrics related +private final AtomicReference schedulingStartTime = new AtomicReference<>(null); --- End diff -- Nit: Consider renaming these so it's obvious what time unit it is, e.g. `schedulingStartTimeNanos` ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208720133 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, --- End diff -- The name is a little vague. How about `calculateAssignmentChanged` or something like that? Also there's a missing e in scheduling. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208718858 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -Set>
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208709991 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { --- End diff -- Is this check correct? If existing is `A B C` and new is `B C`, entriesInCommon is `B C`, so this would be false. Why is the check even needed? Don't we always want to remove all existing that aren't in new, and add all new that aren't in existing? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208714561 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -Set>