[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208748740 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208749692 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2807,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation +// Num supervisor, and fragmented resources have been included in cluster summary + StormMetricsRegistry.registerGauge("nimbus:total-available-memory (nonegative)", () -> nodeIdToResources.get().values() .parallelStream() -.mapToDouble(SupervisorResources::getAvailableMem) +.mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0)) .sum()); -StormMetricsRegistry.registerGauge("nimbus:available-cpu", () -> nodeIdToResources.get().values() +StormMetricsRegistry.registerGauge("nimbus:available-cpu (nonnegative)", () -> nodeIdToResources.get().values() --- End diff -- See Jira [STORM-3151](https://issues.apache.org/jira/browse/STORM-3151?filter=-2) ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208751049 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; --- End diff -- We would like to compute the distribution of scheduler latency as well as the longest scheduling iteration. If the scheduler is stuck in the middle of a scheduling iteration, the histogram won't reflect that until the scheduling iteration has ended, because the timer only reports the time for a complete cycle. Hence I added this gauge to track the longest scheduling iteration in real time. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208749524 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2807,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation --- End diff -- Interesting, I thought I removed this already. Maybe it was rebased back in accidentally. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208749175 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208748414 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { --- End diff -- Good catch. I'll use `areEqual` instead. This method is for inspecting the performance (throughput) of the scheduler; specifically, how many assignments, workers and executors are changed. It doesn't affect any actual assignment results. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208749367 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2219,21 +2305,16 @@ private void mkAssignments(String scratchTopoId) throws Exception { newAssignments.put(topoId, newAssignment); } -if (!newAssignments.equals(existingAssignments)) { +boolean assignmentChanged = inspectSchduling(existingAssignments, newAssignments); +if (assignmentChanged) { LOG.debug("RESETTING id->resources and id->worker-resources cache!"); -LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu()); -nodeIdToResources.get().forEach((id, node) -> -LOG.info( -"Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used " -+ "CPU: {}, Available CPU: {}, fragmented: {}", -id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(), - node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node))); idToResources.set(new HashMap<>()); idToWorkerResources.set(new HashMap<>()); } //tasks figure out what tasks to talk to by looking at topology at runtime // only log/set when there's been a change to the assignment +// TODO: why do we have loop fission here --- End diff -- Will log in Jira ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208747514 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, --- End diff -- Okay will come up with a better name. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208746949 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2131,17 +2223,13 @@ private void mkAssignments(String scratchTopoId) throws Exception { } } // make the new assignments for topologies -Map newSchedulerAssignments = null; synchronized (schedLock) { -newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId); +Map newSchedulerAssignments = +computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId); +//Should probably change List to Tuple for better readability --- End diff -- Again, probably better with another Jira for general code cleanup ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208746736 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? --- End diff -- This is probably trivial, but I think it's possible to have longestSchedulingTime slightly higher than the max of scheduleTopologyTimeMs. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208746266 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? +long elapsed = -schedulingStartTime.getAndSet(null) + Time.nanoTime(); --- End diff -- Oh my bad. This is probably premature optimization. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208746090 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; +} +//To millis. How should I put the constant for magic numbers? --- End diff -- I'm concerned about loss of precision here, since all of them return long instead of double after conversion. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208745736 --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java --- @@ -39,6 +42,11 @@ public String getId() { return getNodeId() + ":" + getPort(); } +public List toList() { +//For compatibility to call in Nimbus#mkAssignment --- End diff -- I thought it looks cleaner on the Nimbus side. The legacy implementation isn't pretty to start with (a lot of List or List). If you want, I can file a Jira to improve `mkAssignment` in general, to eliminate them altogether. ---
[GitHub] storm issue #2764: STORM-3147: Port ClusterSummary as metrics to StormMetric...
Github user zd-project commented on the issue: https://github.com/apache/storm/pull/2764 Please note that only the last commit is under #2764, which was previously out of sync with #2754 because I don't want to have too many dependencies between PRs. For issues specific to #2754, I think it's better to review them there. @srdo ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208744273 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -,13 +2231,23 @@ private void mkAssignments(String scratchTopoId) throws Exception { if (!newAssignments.equals(existingAssignments)) { LOG.debug("RESETTING id->resources and id->worker-resources cache!"); +//Should we change these logs from info to debug after they are port to metrics? --- End diff -- Will log in Jira. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208743919 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java --- @@ -52,6 +52,7 @@ public void prepare(MetricRegistry metricsRegistry, Map topoConf public void start() { if (reporter != null) { LOG.debug("Starting..."); +//TODO: will we make the period customizable? --- End diff -- Will log in Jira. ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208723928 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2219,21 +2288,16 @@ private void mkAssignments(String scratchTopoId) throws Exception { newAssignments.put(topoId, newAssignment); } -if (!newAssignments.equals(existingAssignments)) { +boolean assignmentChanged = inspectSchduling(existingAssignments, newAssignments); +if (assignmentChanged) { LOG.debug("RESETTING id->resources and id->worker-resources cache!"); -LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu()); -nodeIdToResources.get().forEach((id, node) -> -LOG.info( -"Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used " -+ "CPU: {}, Available CPU: {}, fragmented: {}", -id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(), - node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node))); idToResources.set(new HashMap<>()); idToWorkerResources.set(new HashMap<>()); } //tasks figure out what tasks to talk to by looking at topology at runtime // only log/set when there's been a change to the assignment +// TODO: why do we have loop fission here --- End diff -- remove comment ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208707067 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java --- @@ -134,6 +150,7 @@ public Response daemonLog(@Context HttpServletRequest request) throws IOExceptio */ @GET @Path("/searchLogs") +//Seems redundant --- End diff -- remove ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208723798 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2057,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? --- End diff -- remove comment ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208691016 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java --- @@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, String value) throws InvalidR } } +/** + * Find the first N matches of target string in files. + * @param logs all candidate log files to search + * @param numMatches number of matches expected + * @param fileOffset Unclear metrics --- End diff -- comment seems off ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208726809 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java --- @@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, String value) throws InvalidR } } +/** + * Find the first N matches of target string in files. + * @param logs all candidate log files to search + * @param numMatches number of matches expected + * @param fileOffset Unclear metrics + * @param startByteOffset number of byte to be ignored in each log file + * @param targetStr searched string + * @return all matched results + */ @VisibleForTesting -Matched findNMatches(List logs, int numMatches, int fileOffset, int offset, String search) { +Matched findNMatches(List logs, int numMatches, int fileOffset, int startByteOffset, String targetStr) { logs = drop(logs, fileOffset); +LOG.debug("{} files to scan", logs.size()); List> matches = new ArrayList<>(); int matchCount = 0; +int scannedFiles = 0; +//TODO: Unnecessarily convoluted loop that should be optimized while (true) { if (logs.isEmpty()) { break; } File firstLog = logs.get(0); -Map theseMatches; +Map matchInLog; try { LOG.debug("Looking through {}", firstLog); -theseMatches = substringSearch(firstLog, search, numMatches - matchCount, offset); +matchInLog = substringSearch(firstLog, targetStr, numMatches - matchCount, startByteOffset); +scannedFiles++; } catch (InvalidRequestException e) { LOG.error("Can't search past end of file.", e); -theseMatches = new HashMap<>(); +matchInLog = new HashMap<>(); } String fileName = WorkerLogs.getTopologyPortWorkerLog(firstLog); +//This section simply put the formatted log filename and corresponding port in the matching. 
final List> newMatches = new ArrayList<>(matches); -Map currentFileMatch = new HashMap<>(theseMatches); +Map currentFileMatch = new HashMap<>(matchInLog); currentFileMatch.put("fileName", fileName); Path firstLogAbsPath; try { firstLogAbsPath = firstLog.getCanonicalFile().toPath(); } catch (IOException e) { throw new RuntimeException(e); } +//Why do we need to start from scratch to retrieve just the port here? currentFileMatch.put("port", truncatePathToLastElements(firstLogAbsPath, 2).getName(0).toString()); newMatches.add(currentFileMatch); -int newCount = matchCount + ((List)theseMatches.get("matches")).size(); +int newCount = matchCount + ((List)matchInLog.get("matches")).size(); -//theseMatches is never empty! As guaranteed by the #get().size() method above +//matchInLog is never empty! As guaranteed by the #get().size() method above if (newCount == matchCount) { // matches and matchCount is not changed logs = rest(logs); -offset = 0; +startByteOffset = 0; fileOffset = fileOffset + 1; } else if (newCount >= numMatches) { matches = newMatches; break; } else { matches = newMatches; logs = rest(logs); -offset = 0; +startByteOffset = 0; fileOffset = fileOffset + 1; matchCount = newCount; } } -return new Matched(fileOffset, search, matches); +LOG.info("scanned {} files", scannedFiles); +//fileOffset is not being used and it behaves inconsistently (showing +// (index of files search ends on - 1) if [enough matches] else (index of files search ends on)) +// I don't think we should expose the data to public if it's not used. +// can I dropped this field or change its behavior so it's used for metrics [numScannedFiles]? --- End diff -- You can file a separate jira for this ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208705884 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java --- @@ -55,6 +63,8 @@ public Response downloadFile(String fileName, String user, boolean isDaemon) thr File file = new File(rootDir, fileName).getCanonicalFile(); if (file.exists()) { if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(user, fileName)) { +//How should I put the constant for magic numbers? --- End diff -- It's fine here. ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208704852 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java --- @@ -223,8 +246,8 @@ void cleanupEmptyTopoDirectory(File dir) throws IOException { @VisibleForTesting FileFilter mkFileFilterForLogCleanup(long nowMillis) { -final long cutoffAgeMillis = cleanupCutoffAgeMillis(nowMillis); -return file -> !file.isFile() && lastModifiedTimeWorkerLogdir(file) <= cutoffAgeMillis; +//Doesn't it make more sense to do file.isDirectory here? --- End diff -- I agree. Also the name of this method is not clear ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208706459 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java --- @@ -88,9 +87,14 @@ public void setLogFilePermission(String fileName) throws IOException { if (runAsUser && topoOwner.isPresent() && file.exists() && !Files.isReadable(file.toPath())) { LOG.debug("Setting permissions on file {} with topo-owner {}", fileName, topoOwner); -ClientSupervisorUtils.processLauncherAndWait(stormConf, topoOwner.get(), -Lists.newArrayList("blob", file.getCanonicalPath()), null, -"setup group read permissions for file: " + fileName); +try { +ClientSupervisorUtils.processLauncherAndWait(stormConf, topoOwner.get(), +Lists.newArrayList("blob", file.getCanonicalPath()), null, +"setup group read permissions for file: " + fileName); +} catch (IOException e) { +ExceptionMeters.NUM_PERMISSION_EXCEPTIONS.mark(); --- End diff -- The exception name is not clear ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208689498 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java --- @@ -71,6 +78,11 @@ public class LogviewerLogSearchHandler { private static final Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class); +private static final Meter numDeepSearchNoResult = StormMetricsRegistry.registerMeter("logviewer:num-deep-search-no-result"); +private static final Histogram numFilesOpenedDeepSearch = StormMetricsRegistry.registerHistogram( +"logviewer:num-files-opened-deep-search", new ExponentiallyDecayingReservoir()); --- End diff -- Rename "opened" --> "scanned"? That would be clearer. ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208691270 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java --- @@ -388,63 +414,83 @@ private Integer tryParseIntParam(String paramName, String value) throws InvalidR } } +/** + * Find the first N matches of target string in files. + * @param logs all candidate log files to search + * @param numMatches number of matches expected + * @param fileOffset Unclear metrics + * @param startByteOffset number of byte to be ignored in each log file + * @param targetStr searched string + * @return all matched results + */ @VisibleForTesting -Matched findNMatches(List logs, int numMatches, int fileOffset, int offset, String search) { +Matched findNMatches(List logs, int numMatches, int fileOffset, int startByteOffset, String targetStr) { logs = drop(logs, fileOffset); +LOG.debug("{} files to scan", logs.size()); List> matches = new ArrayList<>(); int matchCount = 0; +int scannedFiles = 0; +//TODO: Unnecessarily convoluted loop that should be optimized --- End diff -- remove comment ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208718349 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +748,99 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -Set
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208703547 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java --- @@ -95,6 +102,9 @@ public LogCleaner(Map stormConf, WorkerLogs workerLogs, Director LOG.info("configured max total size of worker logs: {} MB, max total size of worker logs per directory: {} MB", maxSumWorkerLogsSizeMb, maxPerWorkerLogsSizeMb); +//Switch to CachedGauge if this starts to hurt performance +// https://stackoverflow.com/questions/5857199/how-to-find-out-the-size-of-file-and-directory-in-java-without-creating-the-obje --- End diff -- remove link ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208651482 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java --- @@ -265,32 +269,26 @@ public Response daemonLogPage(String fileName, Integer start, Integer length, St String rootDir = daemonLogRoot; File file = new File(rootDir, fileName).getCanonicalFile(); String path = file.getCanonicalPath(); -boolean isZipFile = path.endsWith(".gz"); if (file.exists() && new File(rootDir).getCanonicalFile().equals(file.getParentFile())) { // all types of files included List logFiles = Arrays.stream(new File(rootDir).listFiles()) .filter(File::isFile) .collect(toList()); -List filesStrWithoutFileParam = logFiles.stream() -.map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList()); - -List reorderedFilesStr = new ArrayList<>(); -reorderedFilesStr.addAll(filesStrWithoutFileParam); +List reorderedFilesStr = logFiles.stream() +.map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList()); reorderedFilesStr.add(fileName); length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE; - -String logString; -if (isTxtFile(fileName)) { -logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length)); -} else { -logString = escapeHtml("This is a binary file and cannot display! You may download the full file."); +final boolean isZipFile = path.endsWith(".gz"); +long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length(); --- End diff -- why not reuse `getFileLength` ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208703040 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java --- @@ -186,7 +192,22 @@ private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set active break; } } +} catch (IOException e) { +ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark(); --- End diff -- better to use `NUM_FILE_SCANNED_EXCEPTIONS`? ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208719630 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +748,99 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -Set
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208377660 --- Diff: storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -48,6 +53,30 @@ public static Meter registerMeter(String name) { return REGISTRY.register(name, new Meter()); } +//Change the name to avoid name conflict in future Metrics release --- End diff -- change the name? ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208394271 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java --- @@ -193,24 +198,23 @@ public Response logPage(String fileName, Integer start, Integer length, String g throw e.getCause(); } -List filesStrWithoutFileParam = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog) -.filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList()); - -List reorderedFilesStr = new ArrayList<>(); -reorderedFilesStr.addAll(filesStrWithoutFileParam); +List reorderedFilesStr = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog) +.filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList()); reorderedFilesStr.add(fileName); length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE; - -String logString; -if (isTxtFile(fileName)) { -logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length)); -} else { -logString = escapeHtml("This is a binary file and cannot display! You may download the full file."); +//This is the same as what #pageFile(String path, Integer tail) does +// boolean isZipFile = path.endsWith(".gz"); +// long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length(); +// return pageFile(path, Long.valueOf(fileLength - tail).intValue(), tail); --- End diff -- remove comments ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208702901 --- Diff: storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DirectoryCleaner.java --- @@ -124,6 +128,7 @@ public int deleteOldestWhileTooLarge(List dirs, File file = pq.poll(); stack.push(file); } +LOG.debug("pq: {}, stack: {}", pq, stack); --- End diff -- do we still need this? ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208710506 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2890,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; +} +//To millis. How should I put the constant for magic numbers? --- End diff -- fine here. Or you can use a constant. ---
[GitHub] storm pull request #2754: STORM-3133: Extend metrics on Nimbus and LogViewer
Github user Ethanlm commented on a diff in the pull request: https://github.com/apache/storm/pull/2754#discussion_r208377360 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/ReportWorkerHeartbeats.java --- @@ -51,6 +51,7 @@ public void run() { private SupervisorWorkerHeartbeats getAndResetWorkerHeartbeats() { Map localHeartbeats; try { +//TODO: This call no longer throws exceptions, do we still want to wrap it in try catch block? --- End diff -- remove comment ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208714291 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -Set>
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208704942 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; +} +//To millis. How should I put the constant for magic numbers? --- End diff -- You can use TimeUnit instead to do conversions, removing the need for magic numbers https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/TimeUnit.html ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208703156 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -780,15 +870,15 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { key.add(ni.get_node()); key.add(ni.get_port_iterator().next()); List> value = new ArrayList<>(entry.getValue()); -value.sort((a, b) -> a.get(0).compareTo(b.get(0))); +value.sort(Comparator.comparing(a -> a.get(0))); --- End diff -- Neat, didn't know about this method. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208707855 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2131,17 +2223,13 @@ private void mkAssignments(String scratchTopoId) throws Exception { } } // make the new assignments for topologies -Map newSchedulerAssignments = null; synchronized (schedLock) { -newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId); +Map newSchedulerAssignments = +computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId); +//Should probably change List to Tuple for better readability --- End diff -- If you want to do it, go ahead IMO. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208705728 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? +long elapsed = -schedulingStartTime.getAndSet(null) + Time.nanoTime(); +longestSchedulingTime.updateAndGet(t -> t > elapsed ? t : elapsed); --- End diff -- Nit: Could use Math.max for this. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208721916 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2807,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation --- End diff -- Nit: This is a hint about the language, I'd rather not have it here. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208721202 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2219,21 +2305,16 @@ private void mkAssignments(String scratchTopoId) throws Exception { newAssignments.put(topoId, newAssignment); } -if (!newAssignments.equals(existingAssignments)) { +boolean assignmentChanged = inspectSchduling(existingAssignments, newAssignments); +if (assignmentChanged) { LOG.debug("RESETTING id->resources and id->worker-resources cache!"); -LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu()); -nodeIdToResources.get().forEach((id, node) -> -LOG.info( -"Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used " -+ "CPU: {}, Available CPU: {}, fragmented: {}", -id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(), - node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node))); idToResources.set(new HashMap<>()); idToWorkerResources.set(new HashMap<>()); } //tasks figure out what tasks to talk to by looking at topology at runtime // only log/set when there's been a change to the assignment +// TODO: why do we have loop fission here --- End diff -- My guess would be for readability. If you want to refactor go ahead. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208699933 --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java --- @@ -39,6 +42,11 @@ public String getId() { return getNodeId() + ":" + getPort(); } +public List toList() { +//For compatibility to call in Nimbus#mkAssignment --- End diff -- This seems to only be used in one place. Why is this better than invoking `Arrays.asList(slot.getNodeId(), slot.getPort())`? The reason I'm asking is that to me, there isn't an obvious WorkerSlot to List translation, so I think a generic toList like this decreases readability compared to just writing out this code in Nimbus. Same comment for ExecutorDetails. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208703338 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -411,6 +431,10 @@ private final StormTimer timer; private final IScheduler scheduler; private final IScheduler underlyingScheduler; +//Metrics related +private final AtomicReference schedulingStartTime = new AtomicReference<>(null); --- End diff -- Nit: Consider renaming these so it's obvious what time unit it is, e.g. `schedulingStartTimeNanos` ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208720133 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, --- End diff -- The name is a little vague. How about `calculateAssignmentChanged` or something like that? Also there's a missing e in scheduling. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208718858 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -Set>
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208709991 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { --- End diff -- Is this check correct? If existing is `A B C` and new is `B C`, entriesInCommon is `B C`, so this would be false. Why is the check even needed? Don't we always want to remove all existing that aren't in new, and add all new that aren't in existing? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208714561 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } -private static Map, List>> computeNewTopoToExecToNodePort( -Map schedAssignments, Map existingAssignments) { -Map, List>> ret = computeTopoToExecToNodePort(schedAssignments); -// Print some useful information -if (existingAssignments != null && !existingAssignments.isEmpty()) { -for (Entry, List>> entry : ret.entrySet()) { -String topoId = entry.getKey(); -Map, List> execToNodePort = entry.getValue(); -Assignment assignment = existingAssignments.get(topoId); -if (assignment == null) { -continue; +private boolean inspectSchduling(Map existingAssignments, +Map newAssignments) { +assert existingAssignments != null && newAssignments != null; +boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); +long numRemovedExec = 0; +long numRemovedSlot = 0; +long numAddedExec = 0; +long numAddedSlot = 0; +if (existingAssignments.isEmpty()) { +for (Entry entry : newAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); +} +} else if (newAssignments.isEmpty()) { +for (Entry entry : existingAssignments.entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); +} +} else { +MapDifference 
difference = Maps.difference(existingAssignments, newAssignments); +if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { +for (Entry entry : difference.entriesOnlyOnLeft().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Removing {} from {} slots", entry.getKey(), count); +LOG.info("Remove executors: {}", execToPort.keySet()); +numRemovedSlot += count; +numRemovedExec += execToPort.size(); } -Map, NodeInfo> old = assignment.get_executor_node_port(); -Map, List> reassigned = new HashMap<>(); -for (Entry, List> execAndNodePort : execToNodePort.entrySet()) { -NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); -String node = (String) execAndNodePort.getValue().get(0); -Long port = (Long) execAndNodePort.getValue().get(1); -if (oldAssigned == null || !oldAssigned.get_node().equals(node) -|| !port.equals(oldAssigned.get_port_iterator().next())) { -reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); -} +for (Entry entry : difference.entriesOnlyOnRight().entrySet()) { +final Map, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); +final long count = new HashSet<>(execToPort.values()).size(); +LOG.info("Assigning {} to {} slots", entry.getKey(), count); +LOG.info("Assign executors: {}", execToPort.keySet()); +numAddedSlot += count; +numAddedExec += execToPort.size(); } -if (!reassigned.isEmpty()) { -int count = (new HashSet<>(execToNodePort.values())).size(); -Set>
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208705183 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? +long elapsed = -schedulingStartTime.getAndSet(null) + Time.nanoTime(); --- End diff -- This is a weird way to write this. Why `-a + b` rather than `b - a`? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208722273 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2807,16 +2897,15 @@ public void launchServer() throws Exception { } }); -StormMetricsRegistry.registerGauge("nimbus:num-supervisors", () -> state.supervisors(null).size()); -StormMetricsRegistry.registerGauge("nimbus:fragmented-memory", this::fragmentedMemory); -StormMetricsRegistry.registerGauge("nimbus:fragmented-cpu", this::fragmentedCpu); -StormMetricsRegistry.registerGauge("nimbus:available-memory", () -> nodeIdToResources.get().values() +//Be cautious using method reference instead of lambda. subexpression preceding :: will be evaluated only upon evaluation +// Num supervisor, and fragmented resources have been included in cluster summary + StormMetricsRegistry.registerGauge("nimbus:total-available-memory (nonegative)", () -> nodeIdToResources.get().values() .parallelStream() -.mapToDouble(SupervisorResources::getAvailableMem) +.mapToDouble(supervisorResources -> Math.max(supervisorResources.getAvailableMem(), 0)) .sum()); -StormMetricsRegistry.registerGauge("nimbus:available-cpu", () -> nodeIdToResources.get().values() +StormMetricsRegistry.registerGauge("nimbus:available-cpu (nonnegative)", () -> nodeIdToResources.get().values() --- End diff -- I don't know whether we need to worry about this, but do we need to change the metric names here? If so, I'd rather build the non-negativity into the name, e.g. `nimbus:available-cpu-non-negative`. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208724033 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -2826,9 +2915,22 @@ public void launchServer() throws Exception { .parallelStream() .mapToDouble(SupervisorResources::getTotalCpu) .sum()); - + StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> { +Long currTime = Time.nanoTime(); +Long startTime = schedulingStartTime.get(); +//There could be race condition here but seems trivial, elapsed is +// guaranteed to be no longer than real elapsed time of scheduling +Long longest = longestSchedulingTime.get(); +if (startTime != null) { +longest = currTime - startTime > longest ? currTime - startTime : longest; --- End diff -- I don't understand what is happening here. Why are we potentially calculating a new longest here? ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r203726617 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/metrics/reporters/ConsolePreparableReporter.java --- @@ -52,6 +52,7 @@ public void prepare(MetricRegistry metricsRegistry, Map topoConf public void start() { if (reporter != null) { LOG.debug("Starting..."); +//TODO: will we make the period customizable? --- End diff -- Please either raise an issue for this or remove the TODO. TODOs have a habit of being put in the code and then forgotten. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r203727117 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -,13 +2231,23 @@ private void mkAssignments(String scratchTopoId) throws Exception { if (!newAssignments.equals(existingAssignments)) { LOG.debug("RESETTING id->resources and id->worker-resources cache!"); +//Should we change these logs from info to debug after they are port to metrics? --- End diff -- Same as above: please either raise an issue for this or remove the comment. ---
[GitHub] storm pull request #2764: STORM-3147: Port ClusterSummary as metrics to Stor...
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208706303 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -1984,11 +2074,13 @@ private int fragmentedCpu() { Cluster cluster = new Cluster(inimbus, supervisors, topoToSchedAssignment, topologies, conf); cluster.setStatusMap(idToSchedStatus.get()); -long beforeSchedule = System.currentTimeMillis(); +schedulingStartTime.set(Time.nanoTime()); scheduler.schedule(topologies, cluster); -long scheduleTimeElapsedMs = System.currentTimeMillis() - beforeSchedule; -LOG.debug("Scheduling took {} ms for {} topologies", scheduleTimeElapsedMs, topologies.getTopologies().size()); -scheduleTopologyTimeMs.update(scheduleTimeElapsedMs); +//Will compiler optimize the order of evalutation and cause race condition? --- End diff -- Which race are you worried about here? ---
[GitHub] storm issue #2798: STORM-3184: Mask the plaintext passwords from the logs
Github user arunmahadevan commented on the issue: https://github.com/apache/storm/pull/2798 ping @HeartSaVioR ---
[GitHub] storm pull request #2798: STORM-3184: Mask the plaintext passwords from the ...
GitHub user arunmahadevan opened a pull request: https://github.com/apache/storm/pull/2798 STORM-3184: Mask the plaintext passwords from the logs Introduce a `Password` config annotation and use it to mark configs that are sensitive and mask the values while logging. You can merge this pull request into a Git repository by running: $ git pull https://github.com/arunmahadevan/storm STORM-3184 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2798.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2798 commit fc942ee10d86649db9e9b8ce3dc0a04ea23439ce Author: Arun Mahadevan Date: 2018-08-07T19:13:54Z STORM-3184: Mask the plaintext passwords from the logs Introduce a `Password` config annotation and use it to mark configs that are sensitive and mask the values while logging. ---
[GitHub] storm issue #2764: STORM-3147: Port ClusterSummary as metrics to StormMetric...
Github user zd-project commented on the issue: https://github.com/apache/storm/pull/2764 This has been rebased on 3133 now. ---
[GitHub] storm pull request #2743: [STORM-3130]: Add Wrappers for Timer registration ...
Github user zd-project closed the pull request at: https://github.com/apache/storm/pull/2743 ---
[GitHub] storm issue #2743: [STORM-3130]: Add Wrappers for Timer registration and tim...
Github user zd-project commented on the issue: https://github.com/apache/storm/pull/2743 Merged in #2710 ---
[GitHub] storm issue #2743: [STORM-3130]: Add Wrappers for Timer registration and tim...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2743 merged in https://github.com/apache/storm/pull/2710 ---
[GitHub] storm pull request #2710: STORM-3099: Extend metrics on supervisor, workers ...
Github user asfgit closed the pull request at: https://github.com/apache/storm/pull/2710 ---
[GitHub] storm issue #2710: STORM-3099: Extend metrics on supervisor, workers and DRP...
Github user Ethanlm commented on the issue: https://github.com/apache/storm/pull/2710 @zd-project Could you squash all the commits? Will merge this in. ---
[GitHub] storm pull request #2790: STORM-3175 - Allow usage of custom Callback.
Github user srdo commented on a diff in the pull request: https://github.com/apache/storm/pull/2790#discussion_r208577649 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/PreparableCallback.java --- @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.bolt; + +import java.io.Serializable; +import java.util.Map; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.storm.task.TopologyContext; + +/** + * Serializable callback for use with the KafkaProducer on KafkaBolt. + */ +public interface PreparableCallback extends Callback, Serializable { +void prepare(Map topoConf, TopologyContext context); --- End diff -- I'm a little unsure what you're asking. It's an extension of the Kafka Callback interface, which adds the prepare method that fits Storm. ---