Github user agresch commented on a diff in the pull request: https://github.com/apache/storm/pull/2630#discussion_r180801771 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java --- @@ -100,49 +180,88 @@ private static void markFailedTopology(User u, Cluster c, TopologyDetails td, St u.markTopoUnsuccess(td); } + private void cancelAllPendingClusterStateChanged() { + if (!schedulingInBackground.isEmpty()) { + LOG.warn("Canceling scheduling of {} cluster state changed", schedulingInBackground.keySet()); + for (SchedulingPending sp : schedulingInBackground.values()) { + sp.cancel(); + } + schedulingInBackground.clear(); + } + } + private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter, List<TopologyDetails> orderedTopologies) { + LOG.debug("Scheduling {}", td.getId()); + SchedulingPending sp = schedulingInBackground.get(td.getId()); + if (sp == null) { + IStrategy rasStrategy; + String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY); + try { + String strategy = strategyConf; + if (strategy.startsWith("backtype.storm")) { + // Storm supports to launch workers of older version. + // If the config of TOPOLOGY_SCHEDULER_STRATEGY comes from the older version, replace the package name. + strategy = strategy.replace("backtype.storm", "org.apache.storm"); + LOG.debug("Replace backtype.storm with org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY"); + } + rasStrategy = ReflectionUtils.newSchedulerStrategyInstance(strategy, conf); + rasStrategy.prepare(conf); + } catch (DisallowedStrategyException e) { + markFailedTopology(topologySubmitter, cluster, td, + "Unsuccessful in scheduling - " + e.getAttemptedClass() + + " is not an allowed strategy. 
Please make sure your " + + Config.TOPOLOGY_SCHEDULER_STRATEGY + + " config is one of the allowed strategies: " + + e.getAllowedStrategies(), e); + return; + } catch (RuntimeException e) { + markFailedTopology(topologySubmitter, cluster, td, + "Unsuccessful in scheduling - failed to create instance of topology strategy " + + strategyConf + + ". Please check logs for details", e); + return; + } + + sp = new SchedulingPending(rasStrategy, 0); + } + //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds Cluster workingState = new Cluster(cluster); RAS_Nodes nodes = new RAS_Nodes(workingState); - IStrategy rasStrategy = null; - String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY); - try { - String strategy = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY); - if (strategy.startsWith("backtype.storm")) { - // Storm supports to launch workers of older version. - // If the config of TOPOLOGY_SCHEDULER_STRATEGY comes from the older version, replace the package name. - strategy = strategy.replace("backtype.storm", "org.apache.storm"); - LOG.debug("Replace backtype.storm with org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY"); - } - rasStrategy = ReflectionUtils.newSchedulerStrategyInstance(strategy, conf); - rasStrategy.prepare(conf); - } catch (DisallowedStrategyException e) { - markFailedTopology(topologySubmitter, cluster, td, - "Unsuccessful in scheduling - " + e.getAttemptedClass() - + " is not an allowed strategy. Please make sure your " - + Config.TOPOLOGY_SCHEDULER_STRATEGY - + " config is one of the allowed strategies: " - + e.getAllowedStrategies(), e); - return; - } catch (RuntimeException e) { - markFailedTopology(topologySubmitter, cluster, td, - "Unsuccessful in scheduling - failed to create instance of topology strategy " - + strategyConf - + ". 
Please check logs for details", e); - return; - } - for (int i = 0; i < maxSchedulingAttempts; i++) { + for (int i = sp.getAttempt(); i < maxSchedulingAttempts; i++) { SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId()); try { - SchedulingResult result = rasStrategy.schedule(toSchedule, td); + SchedulingResult result; + Future<SchedulingResult> schedulingFuture = sp.scheduleIfNeeded(background, toSchedule, td); + try { + result = schedulingFuture.get(schedulingForegroundTimeoutSeconds, TimeUnit.SECONDS); + sp.resetFuture(); + } catch (TimeoutException te) { + long elapsedTimeSecs = (Time.currentTimeMillis() - sp.getStartTime())/1000; + if (elapsedTimeSecs >= schedulingBackgroundTimeoutSeconds) { --- End diff -- What would be wrong with just letting this scheduling task remain running?
---