[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208748740
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-   

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208749692
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2807,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using method reference instead of lambda. 
subexpression preceding :: will be evaluated only upon evaluation
+// Num supervisor, and fragmented resources have been included 
in cluster summary
+
StormMetricsRegistry.registerGauge("nimbus:total-available-memory 
(nonegative)", () -> nodeIdToResources.get().values()
 .parallelStream()
-.mapToDouble(SupervisorResources::getAvailableMem)
+.mapToDouble(supervisorResources -> 
Math.max(supervisorResources.getAvailableMem(), 0))
 .sum());
-StormMetricsRegistry.registerGauge("nimbus:available-cpu", () 
-> nodeIdToResources.get().values()
+StormMetricsRegistry.registerGauge("nimbus:available-cpu 
(nonnegative)", () -> nodeIdToResources.get().values()
--- End diff --

See Jira 
[STORM-3151](https://issues.apache.org/jira/browse/STORM-3151?filter=-2)


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208751049
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
--- End diff --

We would like to compute the distribution of scheduler latency as well as 
the longest scheduling iteration. If the scheduler is stuck in the middle of a 
scheduling iteration, the histogram won't reflect that until the iteration 
has ended, because the timer only reports the time for a complete cycle. 
Hence I added this gauge to track the longest scheduling iteration in real time.
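
A minimal sketch of that idea, assuming nothing about the rest of Nimbus: the class `SchedulingClock` and its method names are hypothetical, and timestamps are passed in explicitly so the behaviour is easy to follow. It keeps the longest completed iteration and, when asked, also considers the iteration still in flight.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the PR's bookkeeping; not the actual Nimbus code.
final class SchedulingClock {
    // Start of the iteration in flight, or null when the scheduler is idle.
    private final AtomicReference<Long> startNanos = new AtomicReference<>();
    // Longest completed iteration seen so far, in nanoseconds.
    private final AtomicLong longestCompletedNanos = new AtomicLong();

    void begin(long nowNanos) {
        startNanos.set(nowNanos);
    }

    void end(long nowNanos) {
        Long start = startNanos.getAndSet(null);
        if (start != null) {
            longestCompletedNanos.accumulateAndGet(nowNanos - start, Math::max);
        }
    }

    // What a gauge would report: the running iteration counts too.
    long longestNanos(long nowNanos) {
        Long start = startNanos.get();
        long longest = longestCompletedNanos.get();
        return start == null ? longest : Math.max(longest, nowNanos - start);
    }

    public static void main(String[] args) {
        SchedulingClock clock = new SchedulingClock();
        clock.begin(0L);
        clock.end(5_000_000L);     // a 5 ms iteration completes
        clock.begin(10_000_000L);  // a second iteration starts and hangs
        // 40 ms into the stuck iteration, the reading already reflects it.
        System.out.println(clock.longestNanos(50_000_000L));
    }
}
```

A timer or histogram updated only in `end` would still show 5 ms at that point, which is the gap the gauge is meant to close.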


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208749524
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2807,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using method reference instead of lambda. 
subexpression preceding :: will be evaluated only upon evaluation
--- End diff --

Interesting, I thought I had removed this already. Maybe it was rebased back in 
accidentally.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208749175
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-   

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208748414
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
--- End diff --

Good catch. I'll use `areEqual` instead. This method is for inspecting the 
performance (throughput) of the scheduler, specifically how many assignments, 
workers and executors have changed. It doesn't affect the actual assignment 
results.
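
To illustrate why a plain equality check is safer, here is a sketch that rebuilds the size comparison from the diff with standard-library collections instead of Guava. The class `AssignmentDiff` and its method names are hypothetical, and integers stand in for `Assignment` values. It shows one case the size comparison misses (a key that was only removed); whether that is the exact case the reviewer spotted is an assumption.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; integers stand in for the Assignment values in the diff.
final class AssignmentDiff {
    // True when the maps differ in any key or value.
    static <K, V> boolean changed(Map<K, V> existing, Map<K, V> updated) {
        return !existing.equals(updated);
    }

    // The size comparison from the diff, rebuilt without Guava for illustration.
    static <K, V> boolean changedBySize(Map<K, V> existing, Map<K, V> updated) {
        Set<Map.Entry<K, V>> common = new HashSet<>(existing.entrySet());
        common.retainAll(updated.entrySet());
        return common.size() != updated.size();
    }

    public static void main(String[] args) {
        Map<String, Integer> existing = new HashMap<>();
        existing.put("topo-a", 1);
        existing.put("topo-b", 2);
        Map<String, Integer> updated = new HashMap<>();
        updated.put("topo-a", 1); // topo-b was removed, nothing was added

        System.out.println(changed(existing, updated));       // true
        System.out.println(changedBySize(existing, updated)); // false
    }
}
```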








---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208749367
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2219,21 +2305,16 @@ private void mkAssignments(String scratchTopoId) 
throws Exception {
 newAssignments.put(topoId, newAssignment);
 }
 
-if (!newAssignments.equals(existingAssignments)) {
+boolean assignmentChanged = 
inspectSchduling(existingAssignments, newAssignments);
+if (assignmentChanged) {
 LOG.debug("RESETTING id->resources and 
id->worker-resources cache!");
-LOG.info("Fragmentation after scheduling is: {} MB, {} 
PCore CPUs", fragmentedMemory(), fragmentedCpu());
-nodeIdToResources.get().forEach((id, node) ->
-LOG.info(
-"Node Id: {} Total 
Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
-+ "CPU: {}, 
Available CPU: {}, fragmented: {}",
-id, 
node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
-
node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), 
isFragmented(node)));
 idToResources.set(new HashMap<>());
 idToWorkerResources.set(new HashMap<>());
 }
 
 //tasks figure out what tasks to talk to by looking at 
topology at runtime
 // only log/set when there's been a change to the assignment
+// TODO: why do we have loop fission here
--- End diff --

Will log in Jira


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208747514
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
--- End diff --

Okay will come up with a better name.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208746949
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2131,17 +2223,13 @@ private void mkAssignments(String scratchTopoId) 
throws Exception {
 }
 }
 // make the new assignments for topologies
-Map newSchedulerAssignments = null;
 synchronized (schedLock) {
-newSchedulerAssignments = 
computeNewSchedulerAssignments(existingAssignments, topologies, bases, 
scratchTopoId);
+Map newSchedulerAssignments =
+computeNewSchedulerAssignments(existingAssignments, 
topologies, bases, scratchTopoId);
 
+//Should probably change List to Tuple 
for better readability
--- End diff --

Again, probably better with another Jira for general code cleanup


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208746736
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evalutation and cause race 
condition?
--- End diff --

This is probably trivial, but I think it's possible for 
longestSchedulingTime to be slightly higher than the max of scheduleTopologyTimeMs.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208746266
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evalutation and cause race 
condition?
+long elapsed = -schedulingStartTime.getAndSet(null) + 
Time.nanoTime();
--- End diff --

Oh my bad. This is probably premature optimization.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208746090
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
+}
+//To millis. How should I put the constant for magic 
numbers?
--- End diff --

I'm concerned about loss of precision here, since all of them return long 
instead of double after conversion.
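
A small sketch of the concern, assuming "them" refers to conversions such as those on `java.util.concurrent.TimeUnit`: the class and helper names here are hypothetical. `TimeUnit.NANOSECONDS.toMillis` returns a `long` and truncates, whereas dividing by the nanoseconds-per-millisecond factor as a `double` keeps the fraction and also avoids a bare magic number.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR.
final class NanosToMillis {
    // Fractional milliseconds; the divisor is the number of nanoseconds in one millisecond.
    static double toMillisExact(long nanos) {
        return nanos / (double) TimeUnit.MILLISECONDS.toNanos(1);
    }

    public static void main(String[] args) {
        long nanos = 1_999_999L;
        // The long-returning conversion drops the fractional part.
        System.out.println(TimeUnit.NANOSECONDS.toMillis(nanos));
        // The double division keeps it.
        System.out.println(toMillisExact(nanos));
    }
}
```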


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208745736
  
--- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java 
---
@@ -39,6 +42,11 @@ public String getId() {
 return getNodeId() + ":" + getPort();
 }
 
+public List toList() {
+//For compatibility to call in Nimbus#mkAssignment
--- End diff --

I thought it looks cleaner on the Nimbus side. The legacy implementation 
isn't pretty to start with (a lot of List or List). If you want, I 
can file a Jira to improve `mkAssignment` in general and eliminate them 
altogether.


---


[GitHub] storm issue #2764: STORM-3147: Port ClusterSummary as metrics to StormMetric...

2018-08-08 Thread zd-project
Github user zd-project commented on the issue:

https://github.com/apache/storm/pull/2764
  
Please note that only the last commit belongs to #2764, which was 
previously out of sync with #2754 because I didn't want to have too many 
dependencies between PRs. For issues specific to #2754, I think it's better to 
review them there. @srdo 


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208744273
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -,13 +2231,23 @@ private void mkAssignments(String scratchTopoId) 
throws Exception {
 
 if (!newAssignments.equals(existingAssignments)) {
 LOG.debug("RESETTING id->resources and 
id->worker-resources cache!");
+//Should we change these logs from info to debug after 
they are port to metrics?
--- End diff --

Will log in Jira.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread zd-project
Github user zd-project commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208743919
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
 ---
@@ -52,6 +52,7 @@ public void prepare(MetricRegistry metricsRegistry, 
Map topoConf
 public void start() {
 if (reporter != null) {
 LOG.debug("Starting...");
+//TODO: will we make the period customizable?
--- End diff --

Will log in Jira.


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208723928
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2219,21 +2288,16 @@ private void mkAssignments(String scratchTopoId) 
throws Exception {
 newAssignments.put(topoId, newAssignment);
 }
 
-if (!newAssignments.equals(existingAssignments)) {
+boolean assignmentChanged = 
inspectSchduling(existingAssignments, newAssignments);
+if (assignmentChanged) {
 LOG.debug("RESETTING id->resources and 
id->worker-resources cache!");
-LOG.info("Fragmentation after scheduling is: {} MB, {} 
PCore CPUs", fragmentedMemory(), fragmentedCpu());
-nodeIdToResources.get().forEach((id, node) ->
-LOG.info(
-"Node Id: {} Total 
Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
-+ "CPU: {}, 
Available CPU: {}, fragmented: {}",
-id, 
node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
-
node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), 
isFragmented(node)));
 idToResources.set(new HashMap<>());
 idToWorkerResources.set(new HashMap<>());
 }
 
 //tasks figure out what tasks to talk to by looking at 
topology at runtime
 // only log/set when there's been a change to the assignment
+// TODO: why do we have loop fission here
--- End diff --

remove comment


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208707067
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
 ---
@@ -134,6 +150,7 @@ public Response daemonLog(@Context HttpServletRequest 
request) throws IOExceptio
  */
 @GET
 @Path("/searchLogs")
+//Seems redundant
--- End diff --

remove


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208723798
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2057,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evalutation and cause race 
condition?
--- End diff --

remove comment


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208691016
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
 ---
@@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, 
String value) throws InvalidR
 }
 }
 
+/**
+ * Find the first N matches of target string in files.
+ * @param logs all candidate log files to search
+ * @param numMatches number of matches expected
+ * @param fileOffset   
   Unclear metrics
--- End diff --

comment seems off


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208726809
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
 ---
@@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, 
String value) throws InvalidR
 }
 }
 
+/**
+ * Find the first N matches of target string in files.
+ * @param logs all candidate log files to search
+ * @param numMatches number of matches expected
+ * @param fileOffset   
   Unclear metrics
+ * @param startByteOffset number of byte to be ignored in each log file
+ * @param targetStr searched string
+ * @return all matched results
+ */
 @VisibleForTesting
-Matched findNMatches(List logs, int numMatches, int fileOffset, 
int offset, String search) {
+Matched findNMatches(List logs, int numMatches, int fileOffset, 
int startByteOffset, String targetStr) {
 logs = drop(logs, fileOffset);
+LOG.debug("{} files to scan", logs.size());
 
 List> matches = new ArrayList<>();
 int matchCount = 0;
+int scannedFiles = 0;
 
+//TODO: Unnecessarily convoluted loop that should be optimized
 while (true) {
 if (logs.isEmpty()) {
 break;
 }
 
 File firstLog = logs.get(0);
-Map theseMatches;
+Map matchInLog;
 try {
 LOG.debug("Looking through {}", firstLog);
-theseMatches = substringSearch(firstLog, search, 
numMatches - matchCount, offset);
+matchInLog = substringSearch(firstLog, targetStr, 
numMatches - matchCount, startByteOffset);
+scannedFiles++;
 } catch (InvalidRequestException e) {
 LOG.error("Can't search past end of file.", e);
-theseMatches = new HashMap<>();
+matchInLog = new HashMap<>();
 }
 
 String fileName = 
WorkerLogs.getTopologyPortWorkerLog(firstLog);
 
+//This section simply put the formatted log filename and 
corresponding port in the matching.
 final List> newMatches = new 
ArrayList<>(matches);
-Map currentFileMatch = new 
HashMap<>(theseMatches);
+Map currentFileMatch = new 
HashMap<>(matchInLog);
 currentFileMatch.put("fileName", fileName);
 Path firstLogAbsPath;
 try {
 firstLogAbsPath = firstLog.getCanonicalFile().toPath();
 } catch (IOException e) {
 throw new RuntimeException(e);
 }
+//Why do we need to start from scratch to retrieve just the 
port here?
 currentFileMatch.put("port", 
truncatePathToLastElements(firstLogAbsPath, 2).getName(0).toString());
 newMatches.add(currentFileMatch);
 
-int newCount = matchCount + 
((List)theseMatches.get("matches")).size();
+int newCount = matchCount + 
((List)matchInLog.get("matches")).size();
 
-//theseMatches is never empty! As guaranteed by the 
#get().size() method above
+//matchInLog is never empty! As guaranteed by the 
#get().size() method above
 if (newCount == matchCount) {
 // matches and matchCount is not changed
 logs = rest(logs);
-offset = 0;
+startByteOffset = 0;
 fileOffset = fileOffset + 1;
 } else if (newCount >= numMatches) {
 matches = newMatches;
 break;
 } else {
 matches = newMatches;
 logs = rest(logs);
-offset = 0;
+startByteOffset = 0;
 fileOffset = fileOffset + 1;
 matchCount = newCount;
 }
 }
 
-return new Matched(fileOffset, search, matches);
+LOG.info("scanned {} files", scannedFiles);
+//fileOffset is not being used and it behaves inconsistently 
(showing
+// (index of files search ends on - 1) if [enough matches] else 
(index of files search ends on))
+// I don't think we should expose the data to public if it's not 
used.
+// can I dropped this field or change its behavior so it's used 
for metrics [numScannedFiles]?
--- End diff --

You can file a separate jira for this


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208705884
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java
 ---
@@ -55,6 +63,8 @@ public Response downloadFile(String fileName, String 
user, boolean isDaemon) thr
 File file = new File(rootDir, fileName).getCanonicalFile();
 if (file.exists()) {
 if (isDaemon || 
resourceAuthorizer.isUserAllowedToAccessFile(user, fileName)) {
+//How should I put the constant for magic numbers?
--- End diff --

It's fine here.  


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208704852
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
 ---
@@ -223,8 +246,8 @@ void cleanupEmptyTopoDirectory(File dir) throws 
IOException {
 
 @VisibleForTesting
 FileFilter mkFileFilterForLogCleanup(long nowMillis) {
-final long cutoffAgeMillis = cleanupCutoffAgeMillis(nowMillis);
-return file -> !file.isFile() && 
lastModifiedTimeWorkerLogdir(file) <= cutoffAgeMillis;
+//Doesn't it make more sense to do file.isDirectory here?
--- End diff --

I agree. Also, the name of this method is not clear.
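
To make the difference concrete, here is a minimal sketch of how `!file.isFile()` and `file.isDirectory()` diverge. The class and method names are made up for illustration and are not part of the PR; only the two `java.io.File` calls come from the diff above.

```java
import java.io.File;

class FileFilterSketch {
    // Mirrors the original filter condition: true for anything that is not a regular file,
    // which includes paths that do not exist at all.
    static boolean notARegularFile(File f) {
        return !f.isFile();
    }

    // The stricter check proposed in the review: true only for existing directories.
    static boolean isDirectory(File f) {
        return f.isDirectory();
    }

    public static void main(String[] args) {
        File missing = new File(System.getProperty("java.io.tmpdir"), "no-such-entry-" + System.nanoTime());
        System.out.println(notARegularFile(missing)); // prints true
        System.out.println(isDirectory(missing));     // prints false
    }
}
```

The two checks agree for existing directories and regular files; they differ for missing paths, which only the `!isFile()` form accepts.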


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208706459
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
 ---
@@ -88,9 +87,14 @@ public void setLogFilePermission(String fileName) throws 
IOException {
 
 if (runAsUser && topoOwner.isPresent() && file.exists() && 
!Files.isReadable(file.toPath())) {
 LOG.debug("Setting permissions on file {} with topo-owner {}", 
fileName, topoOwner);
-ClientSupervisorUtils.processLauncherAndWait(stormConf, 
topoOwner.get(),
-Lists.newArrayList("blob", file.getCanonicalPath()), 
null,
-"setup group read permissions for file: " + fileName);
+try {
+ClientSupervisorUtils.processLauncherAndWait(stormConf, 
topoOwner.get(),
+Lists.newArrayList("blob", 
file.getCanonicalPath()), null,
+"setup group read permissions for file: " + 
fileName);
+} catch (IOException e) {
+ExceptionMeters.NUM_PERMISSION_EXCEPTIONS.mark();
--- End diff --

The exception name is not clear


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208689498
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
 ---
@@ -71,6 +78,11 @@
 
 public class LogviewerLogSearchHandler {
 private static final Logger LOG = 
LoggerFactory.getLogger(LogviewerLogSearchHandler.class);
+private static final Meter numDeepSearchNoResult = 
StormMetricsRegistry.registerMeter("logviewer:num-deep-search-no-result");
+private static final Histogram numFilesOpenedDeepSearch = 
StormMetricsRegistry.registerHistogram(
+"logviewer:num-files-opened-deep-search", new 
ExponentiallyDecayingReservoir());
--- End diff --

opened --> scanned? That would be clearer.


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208691270
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
 ---
@@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, 
String value) throws InvalidR
 }
 }
 
+/**
+ * Find the first N matches of target string in files.
+ * @param logs all candidate log files to search
+ * @param numMatches number of matches expected
+ * @param fileOffset Unclear metrics
+ * @param startByteOffset number of byte to be ignored in each log file
+ * @param targetStr searched string
+ * @return all matched results
+ */
 @VisibleForTesting
-Matched findNMatches(List logs, int numMatches, int fileOffset, 
int offset, String search) {
+Matched findNMatches(List logs, int numMatches, int fileOffset, 
int startByteOffset, String targetStr) {
 logs = drop(logs, fileOffset);
+LOG.debug("{} files to scan", logs.size());
 
 List> matches = new ArrayList<>();
 int matchCount = 0;
+int scannedFiles = 0;
 
+//TODO: Unnecessarily convoluted loop that should be optimized
--- End diff --

remove comment


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208718349
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +748,99 @@ private static int numUsedWorkers(SchedulerAssignment 
assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-Set

[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208703547
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
 ---
@@ -95,6 +102,9 @@ public LogCleaner(Map stormConf, 
WorkerLogs workerLogs, Director
 
 LOG.info("configured max total size of worker logs: {} MB, max 
total size of worker logs per directory: {} MB",
 maxSumWorkerLogsSizeMb, maxPerWorkerLogsSizeMb);
+//Switch to CachedGauge if this starts to hurt performance
+// 
https://stackoverflow.com/questions/5857199/how-to-find-out-the-size-of-file-and-directory-in-java-without-creating-the-obje
--- End diff --

remove link


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208651482
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
 ---
@@ -265,32 +269,26 @@ public Response daemonLogPage(String fileName, 
Integer start, Integer length, St
 String rootDir = daemonLogRoot;
 File file = new File(rootDir, fileName).getCanonicalFile();
 String path = file.getCanonicalPath();
-boolean isZipFile = path.endsWith(".gz");
 
 if (file.exists() && new 
File(rootDir).getCanonicalFile().equals(file.getParentFile())) {
 // all types of files included
 List logFiles = Arrays.stream(new 
File(rootDir).listFiles())
 .filter(File::isFile)
 .collect(toList());
 
-List filesStrWithoutFileParam = logFiles.stream()
-.map(File::getName).filter(fName -> 
!StringUtils.equals(fileName, fName)).collect(toList());
-
-List reorderedFilesStr = new ArrayList<>();
-reorderedFilesStr.addAll(filesStrWithoutFileParam);
+List reorderedFilesStr = logFiles.stream()
+.map(File::getName).filter(fName -> 
!StringUtils.equals(fileName, fName)).collect(toList());
 reorderedFilesStr.add(fileName);
 
 length = length != null ? Math.min(10485760, length) : 
LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
-
-String logString;
-if (isTxtFile(fileName)) {
-logString = escapeHtml(start != null ? pageFile(path, 
start, length) : pageFile(path, length));
-} else {
-logString = escapeHtml("This is a binary file and cannot 
display! You may download the full file.");
+final boolean isZipFile = path.endsWith(".gz");
+long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : 
file.length();
--- End diff --

Why not reuse `getFileLength`?


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208703040
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
 ---
@@ -186,7 +192,22 @@ private boolean isFileEligibleToSkipDelete(boolean 
forPerDir, Set active
 break;
 }
 }
+} catch (IOException e) {
+ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
--- End diff --

better to use `NUM_FILE_SCANNED_EXCEPTIONS`?


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208719630
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +748,99 @@ private static int numUsedWorkers(SchedulerAssignment 
assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-Set

[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208377660
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -48,6 +53,30 @@ public static Meter registerMeter(String name) {
 return REGISTRY.register(name, new Meter());
 }
 
+//Change the name to avoid name conflict in future Metrics release
--- End diff --

change the name?


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208394271
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
 ---
@@ -193,24 +198,23 @@ public Response logPage(String fileName, Integer 
start, Integer length, String g
 throw e.getCause();
 }
 
-List filesStrWithoutFileParam = 
logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog)
-.filter(fileStr -> !StringUtils.equals(fileName, 
fileStr)).collect(toList());
-
-List reorderedFilesStr = new ArrayList<>();
-reorderedFilesStr.addAll(filesStrWithoutFileParam);
+List reorderedFilesStr = 
logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog)
+.filter(fileStr -> !StringUtils.equals(fileName, 
fileStr)).collect(toList());
 reorderedFilesStr.add(fileName);
 
 length = length != null ? Math.min(10485760, length) : 
LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
-
-String logString;
-if (isTxtFile(fileName)) {
-logString = escapeHtml(start != null ? pageFile(path, 
start, length) : pageFile(path, length));
-} else {
-logString = escapeHtml("This is a binary file and 
cannot display! You may download the full file.");
+//This is the same as what #pageFile(String path, Integer 
tail) does
+// boolean isZipFile = path.endsWith(".gz");
+// long fileLength = isZipFile ? 
ServerUtils.zipFileSize(new File(path)) : new File(path).length();
+// return pageFile(path, Long.valueOf(fileLength - 
tail).intValue(), tail);
--- End diff --

remove comments


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208702901
  
--- Diff: 
storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java
 ---
@@ -124,6 +128,7 @@ public int deleteOldestWhileTooLarge(List dirs,
 File file = pq.poll();
 stack.push(file);
 }
+LOG.debug("pq: {}, stack: {}", pq, stack);
--- End diff --

do we still need this?


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208710506
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2890,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
+}
+//To millis. How should I put the constant for magic 
numbers?
--- End diff --

fine here. Or you can use a constant. 


---


[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on a diff in the pull request:

https://github.com/apache/storm/pull/2754#discussion_r208377360
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java
 ---
@@ -51,6 +51,7 @@ public void run() {
 private SupervisorWorkerHeartbeats getAndResetWorkerHeartbeats() {
 Map localHeartbeats;
 try {
+//TODO: This call no longer throws exceptions, do we still 
want to wrap it in try catch block?
--- End diff --

remove comment


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208714291
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-Set> 

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208704942
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
+}
+//To millis. How should I put the constant for magic 
numbers?
--- End diff --

You can use TimeUnit to do the conversion instead, which removes the need for 
magic numbers: 
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html
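
For reference, a minimal sketch of that conversion. The class and method names here are hypothetical; only `TimeUnit.NANOSECONDS.toMillis` is the real JDK call.

```java
import java.util.concurrent.TimeUnit;

class SchedulingTimeConversion {
    // Converts an elapsed duration in nanoseconds to whole milliseconds (truncating),
    // without spelling out a divisor such as 1_000_000.
    static long nanosToMillis(long elapsedNanos) {
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    public static void main(String[] args) {
        long elapsedNanos = 2_500_000_000L; // 2.5 seconds, an arbitrary sample value
        System.out.println(nanosToMillis(elapsedNanos)); // prints 2500
    }
}
```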


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208703156
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -780,15 +870,15 @@ private static int numUsedWorkers(SchedulerAssignment 
assignment) {
 key.add(ni.get_node());
 key.add(ni.get_port_iterator().next());
 List> value = new ArrayList<>(entry.getValue());
-value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
+value.sort(Comparator.comparing(a -> a.get(0)));
--- End diff --

Neat, didn't know about this method.
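
A small standalone sketch of the same idiom, assuming the executors are lists of `Long` as in the diff. The class and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class ExecutorSortSketch {
    // Returns a copy of the input sorted by the first element of each inner list,
    // equivalent to sorting with (a, b) -> a.get(0).compareTo(b.get(0)).
    static List<List<Long>> sortByFirstElement(List<List<Long>> executors) {
        List<List<Long>> sorted = new ArrayList<>(executors);
        sorted.sort(Comparator.comparing(a -> a.get(0)));
        return sorted;
    }

    public static void main(String[] args) {
        List<List<Long>> executors = Arrays.asList(
            Arrays.asList(5L, 6L), Arrays.asList(1L, 2L), Arrays.asList(3L, 4L));
        System.out.println(sortByFirstElement(executors)); // prints [[1, 2], [3, 4], [5, 6]]
    }
}
```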


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208707855
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2131,17 +2223,13 @@ private void mkAssignments(String scratchTopoId) 
throws Exception {
 }
 }
 // make the new assignments for topologies
-Map newSchedulerAssignments = null;
 synchronized (schedLock) {
-newSchedulerAssignments = 
computeNewSchedulerAssignments(existingAssignments, topologies, bases, 
scratchTopoId);
+Map newSchedulerAssignments =
+computeNewSchedulerAssignments(existingAssignments, 
topologies, bases, scratchTopoId);
 
+//Should probably change List to Tuple 
for better readability
--- End diff --

If you want to do it, go ahead IMO.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208705728
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evalutation and cause race 
condition?
+long elapsed = -schedulingStartTime.getAndSet(null) + 
Time.nanoTime();
+longestSchedulingTime.updateAndGet(t -> t > elapsed ? t : elapsed);
--- End diff --

Nit: Could use Math.max for this.
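
Something along these lines, as a sketch. It assumes the longest time is held in an `AtomicLong`; the field type in the PR may differ, and the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

class LongestSchedulingTime {
    // Assumption: the running maximum is kept in an AtomicLong, in nanoseconds.
    private final AtomicLong longestNanos = new AtomicLong(0L);

    // Atomically keeps the larger of the stored value and the new sample,
    // then returns the stored maximum.
    long record(long elapsedNanos) {
        return longestNanos.updateAndGet(t -> Math.max(t, elapsedNanos));
    }

    public static void main(String[] args) {
        LongestSchedulingTime longest = new LongestSchedulingTime();
        longest.record(5L);
        longest.record(3L);
        System.out.println(longest.record(4L)); // prints 5
    }
}
```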


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208721916
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2807,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using method reference instead of lambda. 
subexpression preceding :: will be evaluated only upon evaluation
--- End diff --

Nit: This is a hint about the language, I'd rather not have it here.
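
For anyone following along, the language behavior in question can be sketched as below: the subexpression before `::` is evaluated once, when the method reference is created, while a lambda body is re-evaluated on every call. All names are invented for the example.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class MethodRefTiming {
    // Returns {length seen through the method reference, length seen through the lambda}.
    static int[] demo() {
        AtomicReference<String> holder = new AtomicReference<>("old");
        // holder.get() runs right here, so the reference is bound to "old".
        Supplier<Integer> viaMethodRef = holder.get()::length;
        // holder.get() runs each time the supplier is invoked.
        Supplier<Integer> viaLambda = () -> holder.get().length();
        holder.set("newer");
        return new int[] {viaMethodRef.get(), viaLambda.get()};
    }

    public static void main(String[] args) {
        int[] lengths = demo();
        System.out.println(lengths[0] + " " + lengths[1]); // prints 3 5
    }
}
```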


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208721202
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2219,21 +2305,16 @@ private void mkAssignments(String scratchTopoId) 
throws Exception {
 newAssignments.put(topoId, newAssignment);
 }
 
-if (!newAssignments.equals(existingAssignments)) {
+boolean assignmentChanged = 
inspectSchduling(existingAssignments, newAssignments);
+if (assignmentChanged) {
 LOG.debug("RESETTING id->resources and 
id->worker-resources cache!");
-LOG.info("Fragmentation after scheduling is: {} MB, {} 
PCore CPUs", fragmentedMemory(), fragmentedCpu());
-nodeIdToResources.get().forEach((id, node) ->
-LOG.info(
-"Node Id: {} Total 
Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
-+ "CPU: {}, 
Available CPU: {}, fragmented: {}",
-id, 
node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
-
node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), 
isFragmented(node)));
 idToResources.set(new HashMap<>());
 idToWorkerResources.set(new HashMap<>());
 }
 
 //tasks figure out what tasks to talk to by looking at 
topology at runtime
 // only log/set when there's been a change to the assignment
+// TODO: why do we have loop fission here
--- End diff --

My guess would be for readability. If you want to refactor go ahead. 
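
For context, "loop fission" means splitting one loop that does two jobs into two loops that each do one. A generic sketch follows; it is not the code from `mkAssignments`, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class LoopFissionSketch {
    // Fused form: a single pass performs both steps for each id.
    static List<String> fused(List<Integer> ids) {
        List<String> actions = new ArrayList<>();
        for (int id : ids) {
            actions.add("log:" + id);
            actions.add("set:" + id);
        }
        return actions;
    }

    // Fissioned form: one pass per step, so all "log" actions precede all "set" actions.
    static List<String> fissioned(List<Integer> ids) {
        List<String> actions = new ArrayList<>();
        for (int id : ids) {
            actions.add("log:" + id);
        }
        for (int id : ids) {
            actions.add("set:" + id);
        }
        return actions;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2);
        System.out.println(fused(ids));     // prints [log:1, set:1, log:2, set:2]
        System.out.println(fissioned(ids)); // prints [log:1, log:2, set:1, set:2]
    }
}
```

The two forms do the same work but in a different order, so fusing them is only safe if nothing depends on that order.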


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208699933
  
--- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java 
---
@@ -39,6 +42,11 @@ public String getId() {
 return getNodeId() + ":" + getPort();
 }
 
+public List toList() {
+//For compatibility to call in Nimbus#mkAssignment
--- End diff --

This seems to only be used in one place. Why is this better than invoking 
`Arrays.asList(slot.getNodeId(), slot.getPort())`?

The reason I'm asking is that to me, there isn't an obvious WorkerSlot to 
List translation, so I think a generic toList like this decreases readability 
compared to just writing out this code in Nimbus.

Same comment for ExecutorDetails.
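
To illustrate, the inline version would look roughly like this. `Slot` is a hypothetical stand-in for `WorkerSlot` with guessed field types, and `toNodeAndPort` is an invented helper name; only `getNodeId()` and `getPort()` appear in the diff.

```java
import java.util.Arrays;
import java.util.List;

class SlotToListSketch {
    // Hypothetical stand-in for WorkerSlot; the real class and its field types may differ.
    static final class Slot {
        private final String nodeId;
        private final int port;

        Slot(String nodeId, int port) {
            this.nodeId = nodeId;
            this.port = port;
        }

        String getNodeId() {
            return nodeId;
        }

        int getPort() {
            return port;
        }
    }

    // Writing the translation at the call site makes the element order (node id, then port) explicit.
    static List<Object> toNodeAndPort(Slot slot) {
        return Arrays.asList(slot.getNodeId(), slot.getPort());
    }

    public static void main(String[] args) {
        System.out.println(toNodeAndPort(new Slot("node-1", 6700))); // prints [node-1, 6700]
    }
}
```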


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208703338
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -411,6 +431,10 @@
 private final StormTimer timer;
 private final IScheduler scheduler;
 private final IScheduler underlyingScheduler;
+//Metrics related
+private final AtomicReference schedulingStartTime = new 
AtomicReference<>(null);
--- End diff --

Nit: Consider renaming these so it's obvious what time unit it is, e.g. 
`schedulingStartTimeNanos`


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208720133
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
--- End diff --

The name is a little vague. How about `calculateAssignmentChanged` or 
something like that? Also there's a missing e in scheduling.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208718858
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-Set> 

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208709991
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
--- End diff --

Is this check correct? If existing is `A B C` and new is `B C`, 
`entriesInCommon` is `B C`, whose size equals `newAssignments.size()`, so 
`anyChanged` would be false even though `A` was removed.

Why is the check even needed? Don't we always want to remove all existing 
that aren't in new, and add all new that aren't in existing?
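
The `A B C` versus `B C` case can be reproduced with a small stand-in. This sketch uses plain `java.util` maps rather than Guava's `Maps.difference`; the helpers `inCommon`, `onlyIn` and `checkFromDiff` are hypothetical and only mirror the semantics the diff relies on (entries in common have equal keys and values, entries only on one side are keyed by absence from the other):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class AssignmentDiffSketch {
    // Entries whose key is in both maps with equal values.
    static <K, V> Map<K, V> inCommon(Map<K, V> left, Map<K, V> right) {
        Map<K, V> out = new HashMap<>();
        for (Map.Entry<K, V> e : left.entrySet()) {
            if (right.containsKey(e.getKey())
                    && Objects.equals(e.getValue(), right.get(e.getKey()))) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    // Entries whose key appears in the first map but not in the second.
    static <K, V> Map<K, V> onlyIn(Map<K, V> first, Map<K, V> second) {
        Map<K, V> out = new HashMap<>(first);
        out.keySet().removeAll(second.keySet());
        return out;
    }

    // The condition as written in the diff under review.
    static <K, V> boolean checkFromDiff(Map<K, V> existing, Map<K, V> updated) {
        return inCommon(existing, updated).size() != updated.size();
    }

    public static void main(String[] args) {
        Map<String, Integer> existing = new HashMap<>();
        existing.put("A", 1);
        existing.put("B", 2);
        existing.put("C", 3);
        Map<String, Integer> updated = new HashMap<>();
        updated.put("B", 2);
        updated.put("C", 3);

        System.out.println(checkFromDiff(existing, updated));   // false
        System.out.println(onlyIn(existing, updated).keySet()); // [A]
        System.out.println(!existing.equals(updated));          // true
    }
}
```

Under these assumptions the guard skips the removal loop for `A`, whereas iterating the only-on-left and only-on-right entries unconditionally would handle it.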


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208714561
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
 return ret;
 }
 
-private static Map, List>> 
computeNewTopoToExecToNodePort(
-Map schedAssignments, Map existingAssignments) {
-Map, List>> ret = 
computeTopoToExecToNodePort(schedAssignments);
-// Print some useful information
-if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
-for (Entry, List>> entry : 
ret.entrySet()) {
-String topoId = entry.getKey();
-Map, List> execToNodePort = 
entry.getValue();
-Assignment assignment = existingAssignments.get(topoId);
-if (assignment == null) {
-continue;
+private boolean inspectSchduling(Map 
existingAssignments,
+Map 
newAssignments) {
+assert existingAssignments != null && newAssignments != null;
+boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
+long numRemovedExec = 0;
+long numRemovedSlot = 0;
+long numAddedExec   = 0;
+long numAddedSlot   = 0;
+if (existingAssignments.isEmpty()) {
+for (Entry entry : 
newAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
+}
+} else if (newAssignments.isEmpty()) {
+for (Entry entry : 
existingAssignments.entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
+}
+} else {
+MapDifference difference = 
Maps.difference(existingAssignments, newAssignments);
+if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
+for (Entry entry : 
difference.entriesOnlyOnLeft().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
+LOG.info("Remove executors: {}", execToPort.keySet());
+numRemovedSlot += count;
+numRemovedExec += execToPort.size();
 }
-Map, NodeInfo> old = 
assignment.get_executor_node_port();
-Map, List> reassigned = new HashMap<>();
-for (Entry, List> execAndNodePort : 
execToNodePort.entrySet()) {
-NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
-String node = (String) 
execAndNodePort.getValue().get(0);
-Long port = (Long) execAndNodePort.getValue().get(1);
-if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
-|| 
!port.equals(oldAssigned.get_port_iterator().next())) {
-reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
-}
+for (Entry entry : 
difference.entriesOnlyOnRight().entrySet()) {
+final Map, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
+final long count = new 
HashSet<>(execToPort.values()).size();
+LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
+LOG.info("Assign executors: {}", execToPort.keySet());
+numAddedSlot += count;
+numAddedExec += execToPort.size();
 }
 
-if (!reassigned.isEmpty()) {
-int count = (new 
HashSet<>(execToNodePort.values())).size();
-Set> 

[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208705183
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evalutation and cause race 
condition?
+long elapsed = -schedulingStartTime.getAndSet(null) + 
Time.nanoTime();
--- End diff --

This is a weird way to write this. Why `-a + b` rather than `b - a`?
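
A sketch of the `b - a` form, assuming the start time lives in an `AtomicReference<Long>` as in the diff. The class `ElapsedSketch` and the injected `LongSupplier` clock are hypothetical and exist only so the example is self-contained. Java evaluates operands left to right within a thread, so reading the start into a local first mainly makes the intended order visible to the reader:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

final class ElapsedSketch {
    // Returns end - start and clears the stored start time.
    static long elapsedNanos(AtomicReference<Long> startRef, LongSupplier clock) {
        // Step 1: take and clear the start time.
        Long startNanos = startRef.getAndSet(null);
        if (startNanos == null) {
            throw new IllegalStateException("no start time recorded");
        }
        // Step 2: read the clock after the start time has been taken.
        long endNanos = clock.getAsLong();
        return endNanos - startNanos;
    }
}
```

A caller would pass something like `System::nanoTime` as the clock; the diff itself uses Storm's `Time.nanoTime()`.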


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208722273
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2807,16 +2897,15 @@ public void launchServer() throws Exception {
 }
 });
 
-StormMetricsRegistry.registerGauge("nimbus:num-supervisors", 
() -> state.supervisors(null).size());
-StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", 
this::fragmentedMemory);
-StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", 
this::fragmentedCpu);
-StormMetricsRegistry.registerGauge("nimbus:available-memory", 
() -> nodeIdToResources.get().values()
+//Be cautious using method reference instead of lambda. 
subexpression preceding :: will be evaluated only upon evaluation
+// Num supervisor, and fragmented resources have been included 
in cluster summary
+
StormMetricsRegistry.registerGauge("nimbus:total-available-memory 
(nonegative)", () -> nodeIdToResources.get().values()
 .parallelStream()
-.mapToDouble(SupervisorResources::getAvailableMem)
+.mapToDouble(supervisorResources -> 
Math.max(supervisorResources.getAvailableMem(), 0))
 .sum());
-StormMetricsRegistry.registerGauge("nimbus:available-cpu", () 
-> nodeIdToResources.get().values()
+StormMetricsRegistry.registerGauge("nimbus:available-cpu 
(nonnegative)", () -> nodeIdToResources.get().values()
--- End diff --

I don't know whether we need to worry about this, but do we need to change 
the metric names here? If so, I'd rather build the non-negativity into the 
name, e.g. `nimbus:available-cpu-non-negative`.
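
To illustrate the naming suggestion, here is a sketch in which the clamping is part of a plain hyphenated metric name. The class `NonNegativeGaugeSketch`, the constant and the helper are hypothetical, and a list of doubles stands in for the per-supervisor resources:

```java
import java.util.List;

final class NonNegativeGaugeSketch {
    // The suggested name: no spaces or parentheses, the clamping is part of the identifier.
    static final String AVAILABLE_CPU_NON_NEGATIVE = "nimbus:available-cpu-non-negative";

    // Sums the per-supervisor values, treating negative availability as zero.
    static double sumNonNegative(List<Double> availablePerSupervisor) {
        return availablePerSupervisor.stream()
                .mapToDouble(v -> Math.max(v, 0.0))
                .sum();
    }
}
```

A name without whitespace or parentheses may also be easier for downstream metric reporters to handle, though that is an assumption rather than something stated in the review.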


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208724033
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -2826,9 +2915,22 @@ public void launchServer() throws Exception {
 .parallelStream()
 .mapToDouble(SupervisorResources::getTotalCpu)
 .sum());
-
+
StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+Long currTime = Time.nanoTime();
+Long startTime = schedulingStartTime.get();
+//There could be race condition here but seems trivial, 
elapsed is
+// guaranteed to be no longer than real elapsed time of 
scheduling
+Long longest = longestSchedulingTime.get();
+if (startTime != null) {
+longest = currTime - startTime > longest ? currTime - 
startTime : longest;
--- End diff --

I don't understand what is happening here. Why are we potentially 
calculating a new longest here?


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r203726617
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java
 ---
@@ -52,6 +52,7 @@ public void prepare(MetricRegistry metricsRegistry, 
Map topoConf
 public void start() {
 if (reporter != null) {
 LOG.debug("Starting...");
+//TODO: will we make the period customizable?
--- End diff --

Please either raise an issue for this or remove the TODO. They have a habit 
of getting put in the code and then forgotten.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r203727117
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -,13 +2231,23 @@ private void mkAssignments(String scratchTopoId) 
throws Exception {
 
 if (!newAssignments.equals(existingAssignments)) {
 LOG.debug("RESETTING id->resources and 
id->worker-resources cache!");
+//Should we change these logs from info to debug after 
they are port to metrics?
--- End diff --

Same as above: please either raise an issue for this or remove the comment.


---


[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2764#discussion_r208706303
  
--- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1984,11 +2074,13 @@ private int fragmentedCpu() {
 Cluster cluster = new Cluster(inimbus, supervisors, 
topoToSchedAssignment, topologies, conf);
 cluster.setStatusMap(idToSchedStatus.get());
 
-long beforeSchedule = System.currentTimeMillis();
+schedulingStartTime.set(Time.nanoTime());
 scheduler.schedule(topologies, cluster);
-long scheduleTimeElapsedMs = System.currentTimeMillis() - 
beforeSchedule;
-LOG.debug("Scheduling took {} ms for {} topologies", 
scheduleTimeElapsedMs, topologies.getTopologies().size());
-scheduleTopologyTimeMs.update(scheduleTimeElapsedMs);
+//Will compiler optimize the order of evalutation and cause race 
condition?
--- End diff --

Which race are you worried about here?


---


[GitHub] storm issue #2798: STORM-3184: Mask the plaintext passwords from the logs

2018-08-08 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/storm/pull/2798
  
ping @HeartSaVioR 


---


[GitHub] storm pull request #2798: STORM-3184: Mask the plaintext passwords from the ...

2018-08-08 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/storm/pull/2798

STORM-3184: Mask the plaintext passwords from the logs

Introduce a `Password` config annotation and use it to mark configs that are
sensitive and mask the values while logging.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/storm STORM-3184

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2798.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2798


commit fc942ee10d86649db9e9b8ce3dc0a04ea23439ce
Author: Arun Mahadevan 
Date:   2018-08-07T19:13:54Z

STORM-3184: Mask the plaintext passwords from the logs

Introduce a `Password` config annotation and use it to mark configs that are
sensitive and mask the values while logging.




---


[GitHub] storm issue #2764: STORM-3147: Port ClusterSummary as metrics to StormMetric...

2018-08-08 Thread zd-project
Github user zd-project commented on the issue:

https://github.com/apache/storm/pull/2764
  
This has now been rebased on 3133.


---


[GitHub] storm pull request #2743: [STORM-3130]: Add Wrappers for Timer registration ...

2018-08-08 Thread zd-project
Github user zd-project closed the pull request at:

https://github.com/apache/storm/pull/2743


---


[GitHub] storm issue #2743: [STORM-3130]: Add Wrappers for Timer registration and tim...

2018-08-08 Thread zd-project
Github user zd-project commented on the issue:

https://github.com/apache/storm/pull/2743
  
Merged in #2710 


---


[GitHub] storm issue #2743: [STORM-3130]: Add Wrappers for Timer registration and tim...

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2743
  
merged in https://github.com/apache/storm/pull/2710


---


[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...

2018-08-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2710


---


[GitHub] storm issue #2710: STORM-3099: Extend metrics on supervisor, workers and DRP...

2018-08-08 Thread Ethanlm
Github user Ethanlm commented on the issue:

https://github.com/apache/storm/pull/2710
  
@zd-project  Could you squash all the commits? Will merge this in.


---


[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.

2018-08-08 Thread srdo
Github user srdo commented on a diff in the pull request:

https://github.com/apache/storm/pull/2790#discussion_r208577649
  
--- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/PreparableCallback.java
 ---
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.bolt;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Serializable callback for use with the KafkaProducer on KafkaBolt.
+ */
+public interface PreparableCallback extends Callback, Serializable {
+void prepare(Map topoConf, TopologyContext context);
--- End diff --

I'm a little unsure what you're asking. It's an extension of the Kafka 
Callback interface, which adds the prepare method that fits Storm. 


---