[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2918#discussion_r239306840
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java ---
@@ -79,25 +81,46 @@
         if (shortage.areAnyOverZero() || shortageSlots > 0) {
             LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
-            //release earliest blacklist
-            for (String supervisor : blacklistedNodeIds) {
-                SupervisorDetails sd = availableSupervisors.get(supervisor);
-                if (sd != null) {
-                    NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
-                    int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                    readyToRemove.add(supervisor);
-                    shortage.remove(sdAvailable, cluster.getResourceMetrics());
-                    shortageSlots -= sdAvailableSlots;
-                    LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
-                        sdAvailable, sdAvailableSlots, shortage, shortageSlots);
-                    if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
-                        // we have enough resources now...
-                        break;
+
+            //release earliest blacklist - but release all supervisors on a given blacklisted host.
+            Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+            for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                for (String supervisorId : supervisorIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisorId);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId,
+                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
--- End diff --
@agresch Got your idea. In my personal opinion, for production, if a node has a blacklisted supervisor (meaning the supervisor has not sent any heartbeats for some time), in most cases it is because the machine itself has a problem (known causes so far include a full disk or a broken network connection), so the safer approach is to never schedule workers onto a node that has any blacklisted supervisors on it. If you want to make use of a healthy supervisor on a node that also has blacklisted supervisors, there should at least be a check that the supervisor really is healthy, which we could do by checking its heartbeats. ---
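A minimal sketch of the heartbeat check described above. The names here (`isSupervisorHealthy`, `supervisorHeartbeatMillis`, `maxHeartbeatAgeMillis`) are hypothetical, not Storm APIs; they only make the idea concrete.

```java
// Hypothetical helper, not part of Storm: decide whether a supervisor on a partially
// blacklisted node looks healthy enough to receive work, based on its last heartbeat.
// Assumes java.util.Map and a map of supervisor id -> last heartbeat timestamp in millis.
private boolean isSupervisorHealthy(String supervisorId,
                                    Map<String, Long> supervisorHeartbeatMillis,
                                    long maxHeartbeatAgeMillis) {
    Long lastHeartbeat = supervisorHeartbeatMillis.get(supervisorId);
    if (lastHeartbeat == null) {
        // Never heard from this supervisor: treat it as unhealthy.
        return false;
    }
    // Healthy only if the most recent heartbeat is recent enough.
    return System.currentTimeMillis() - lastHeartbeat <= maxHeartbeatAgeMillis;
}
```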
[GitHub] storm pull request #2920: STORM-3297 prevent supervisor restart when no nimb...
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2920#discussion_r239235259
--- Diff: storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java ---
@@ -24,7 +24,7 @@
     public void processWorkerMetrics(Map<String, Object> conf, WorkerMetrics metrics) throws MetricException {
         try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
             client.getClient().processWorkerMetrics(metrics);
-        } catch (TException e) {
--- End diff --
done ---
[GitHub] storm pull request #2920: STORM-3297 prevent supervisor restart when no nimb...
Github user kishorvpatil commented on a diff in the pull request: https://github.com/apache/storm/pull/2920#discussion_r239231045
--- Diff: storm-server/src/main/java/org/apache/storm/metricstore/NimbusMetricProcessor.java ---
@@ -24,7 +24,7 @@
     public void processWorkerMetrics(Map<String, Object> conf, WorkerMetrics metrics) throws MetricException {
         try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
             client.getClient().processWorkerMetrics(metrics);
-        } catch (TException e) {
--- End diff --
Could we, instead of handling all exceptions, be more specific and capture `NimbusLeaderNotFoundException`? `catch (TException | NimbusLeaderNotFoundException e)` ---
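A minimal sketch of how the suggested multi-catch might read in `NimbusMetricProcessor.processWorkerMetrics`; the wrapping in `MetricException` and the message text are assumptions here, not the final patch.

```java
public void processWorkerMetrics(Map<String, Object> conf, WorkerMetrics metrics) throws MetricException {
    try (NimbusClient client = NimbusClient.getConfiguredClient(conf)) {
        client.getClient().processWorkerMetrics(metrics);
    } catch (TException | NimbusLeaderNotFoundException e) {
        // Wrap the failure so the caller sees a MetricException instead of the supervisor dying.
        throw new MetricException("Failed to process metrics", e);
    }
}
```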
[GitHub] storm issue #2917: [STORM-3294] Upgrade jetty version to latest stable 9.4.1...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2917 Ok, just wanted to be sure that there wasn't an issue to document. +1 ---
[GitHub] storm pull request #2920: STORM-3297 prevent supervisor restart when no nimb...
GitHub user agresch opened a pull request: https://github.com/apache/storm/pull/2920 STORM-3297 prevent supervisor restart when no nimbus leader exists You can merge this pull request into a Git repository by running: $ git pull https://github.com/agresch/storm agresch_processWorkerMetrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2920.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2920 commit d8e649d2139a7fb7e71c022a28eefd40f684308e Author: Aaron Gresch Date: 2018-12-05T17:27:43Z STORM-3297 prevent supervisor restart when no nimbus leader exists ---
NimbusLeaderNotFoundException
I was wondering whether there would be support for changing https://git.ouroath.com/storm/storm/blob/master/storm-client/src/jvm/org/apache/storm/utils/NimbusLeaderNotFoundException.java into a checked exception.
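A rough illustration of what that change would imply; this is not the actual class, just its shape. Switching the superclass from `RuntimeException` to `Exception` makes the exception checked, so every caller of `NimbusClient.getConfiguredClient(...)` would then have to declare or handle it. The constructors shown are assumptions.

```java
// Hypothetical sketch only: today NimbusLeaderNotFoundException extends RuntimeException;
// making it checked would mean extending Exception instead.
public class NimbusLeaderNotFoundException extends Exception {
    public NimbusLeaderNotFoundException(String message) {
        super(message);
    }

    public NimbusLeaderNotFoundException(String message, Throwable cause) {
        super(message, cause);
    }
}
```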
[GitHub] storm issue #2917: [STORM-3294] Upgrade jetty version to latest stable 9.4.1...
Github user kishorvpatil commented on the issue: https://github.com/apache/storm/pull/2917 @srdo There was no explicit issue. Just wanted to upgrade to the latest jetty minor version. ---
[GitHub] storm issue #2917: [STORM-3294] Upgrade jetty version to latest stable 9.4.1...
Github user srdo commented on the issue: https://github.com/apache/storm/pull/2917 Looks good. Could you update the commit message to contain the issue number? Is this upgrade fixing a known issue? ---
[GitHub] storm pull request #2919: STORM-3296: Upgrade curator-test to avoid CURATOR-...
GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/2919 STORM-3296: Upgrade curator-test to avoid CURATOR-409 https://issues.apache.org/jira/browse/STORM-3296 You can merge this pull request into a Git repository by running: $ git pull https://github.com/srdo/storm STORM-3296 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/2919.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2919 commit 3c4646a14dff42fc23809192d56efcb29a14525e Author: Stig Rohde Døssing Date: 2018-12-05T13:56:41Z STORM-3296: Upgrade curator-test to avoid CURATOR-409 ---
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2918#discussion_r239066404
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java ---
@@ -79,25 +81,46 @@
         if (shortage.areAnyOverZero() || shortageSlots > 0) {
             LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
-            //release earliest blacklist
-            for (String supervisor : blacklistedNodeIds) {
-                SupervisorDetails sd = availableSupervisors.get(supervisor);
-                if (sd != null) {
-                    NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
-                    int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                    readyToRemove.add(supervisor);
-                    shortage.remove(sdAvailable, cluster.getResourceMetrics());
-                    shortageSlots -= sdAvailableSlots;
-                    LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
-                        sdAvailable, sdAvailableSlots, shortage, shortageSlots);
-                    if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
-                        // we have enough resources now...
-                        break;
+
+            //release earliest blacklist - but release all supervisors on a given blacklisted host.
+            Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+            for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                for (String supervisorId : supervisorIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisorId);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId,
+                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
--- End diff --
If there are two blacklisted supervisors on a host and we release only one, the other supervisor will remain blacklisted. That single blacklisted supervisor will be returned here: https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java#L167 Then this code will see that the host both supervisors are on is blacklisted, preventing any scheduling on that node: https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java#L167 I think a better long-term solution is to blacklist supervisors instead of hosts, but that touches a lot more code. In the short term this is a relatively small tradeoff to allow blacklisting to work with multiple supervisors per host. ---
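A rough sketch of the host-grouping helper the diff refers to; the real `createHostToSupervisorMap` in the PR may differ in signature and ordering. Using a `LinkedHashMap` here is only an assumption, to keep the "release earliest blacklisted first" order, and `Cluster.getHost(supervisorId)` is assumed to resolve the supervisor's host name.

```java
// Sketch of grouping blacklisted supervisor ids by the host they run on.
private Map<String, Set<String>> createHostToSupervisorMap(Collection<String> blacklistedNodeIds, Cluster cluster) {
    Map<String, Set<String>> hostToSupervisorIds = new LinkedHashMap<>();
    for (String supervisorId : blacklistedNodeIds) {
        String host = cluster.getHost(supervisorId);
        if (host != null) {
            // All supervisors on the same host end up in one set, so they are released together.
            hostToSupervisorIds.computeIfAbsent(host, h -> new HashSet<>()).add(supervisorId);
        }
    }
    return hostToSupervisorIds;
}
```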
[GitHub] storm pull request #2918: STORM-3295 allow blacklist scheduling to function ...
Github user danny0405 commented on a diff in the pull request: https://github.com/apache/storm/pull/2918#discussion_r239039664
--- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/RasBlacklistStrategy.java ---
@@ -79,25 +81,46 @@
         if (shortage.areAnyOverZero() || shortageSlots > 0) {
             LOG.info("Need {} and {} slots more. Releasing some blacklisted nodes to cover it.", shortage, shortageSlots);
-            //release earliest blacklist
-            for (String supervisor : blacklistedNodeIds) {
-                SupervisorDetails sd = availableSupervisors.get(supervisor);
-                if (sd != null) {
-                    NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
-                    int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
-                    readyToRemove.add(supervisor);
-                    shortage.remove(sdAvailable, cluster.getResourceMetrics());
-                    shortageSlots -= sdAvailableSlots;
-                    LOG.debug("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisor,
-                        sdAvailable, sdAvailableSlots, shortage, shortageSlots);
-                    if (!shortage.areAnyOverZero() && shortageSlots <= 0) {
-                        // we have enough resources now...
-                        break;
+
+            //release earliest blacklist - but release all supervisors on a given blacklisted host.
+            Map<String, Set<String>> hostToSupervisorIds = createHostToSupervisorMap(blacklistedNodeIds, cluster);
+            for (Set<String> supervisorIds : hostToSupervisorIds.values()) {
+                for (String supervisorId : supervisorIds) {
+                    SupervisorDetails sd = availableSupervisors.get(supervisorId);
+                    if (sd != null) {
+                        NormalizedResourcesWithMemory sdAvailable = cluster.getAvailableResources(sd);
+                        int sdAvailableSlots = cluster.getAvailablePorts(sd).size();
+                        readyToRemove.add(supervisorId);
+                        shortage.remove(sdAvailable, cluster.getResourceMetrics());
+                        shortageSlots -= sdAvailableSlots;
+                        LOG.info("Releasing {} with {} and {} slots leaving {} and {} slots to go", supervisorId,
+                            sdAvailable, sdAvailableSlots, shortage, shortageSlots);
--- End diff --
So what is the purpose of releasing blacklisted supervisors grouped by node? What if the node is stuck or broken? If we shuffled the release across nodes, maybe the risk would be spread more evenly. ---
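To make the "shuffle the release by nodes" question concrete, here is a hypothetical fragment, not part of the PR: visit the blacklisted hosts in random order instead of blacklist order, so the released capacity is spread across machines.

```java
// Hypothetical only: pick blacklisted hosts in random order before releasing them.
// Assumes java.util.* and the host-to-supervisors map sketched earlier.
private List<String> hostsInRandomOrder(Map<String, Set<String>> hostToSupervisorIds) {
    List<String> hosts = new ArrayList<>(hostToSupervisorIds.keySet());
    Collections.shuffle(hosts);
    return hosts;
}
```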