GitHub user agresch commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2630#discussion_r180801771
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
    @@ -100,49 +180,88 @@ private static void markFailedTopology(User u, 
Cluster c, TopologyDetails td, St
             u.markTopoUnsuccess(td);
         }
     
    +    private void cancelAllPendingClusterStateChanged() {
    +        if (!schedulingInBackground.isEmpty()) {
    +            LOG.warn("Canceling scheduling of {} cluster state changed", 
schedulingInBackground.keySet());
    +            for (SchedulingPending sp : schedulingInBackground.values()) {
    +                sp.cancel();
    +            }
    +            schedulingInBackground.clear();
    +        }
    +    }
    +
         private void scheduleTopology(TopologyDetails td, Cluster cluster, 
final User topologySubmitter,
                                       List<TopologyDetails> orderedTopologies) 
{
    +        LOG.debug("Scheduling {}", td.getId());
    +        SchedulingPending sp = schedulingInBackground.get(td.getId());
    +        if (sp == null) {
    +            IStrategy rasStrategy;
    +            String strategyConf = (String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
    +            try {
    +                String strategy = strategyConf;
    +                if (strategy.startsWith("backtype.storm")) {
    +                    // Storm supports to launch workers of older version.
    +                    // If the config of TOPOLOGY_SCHEDULER_STRATEGY comes 
from the older version, replace the package name.
    +                    strategy = strategy.replace("backtype.storm", 
"org.apache.storm");
    +                    LOG.debug("Replace backtype.storm with 
org.apache.storm for Config.TOPOLOGY_SCHEDULER_STRATEGY");
    +                }
    +                rasStrategy = 
ReflectionUtils.newSchedulerStrategyInstance(strategy, conf);
    +                rasStrategy.prepare(conf);
    +            } catch (DisallowedStrategyException e) {
    +                markFailedTopology(topologySubmitter, cluster, td,
    +                    "Unsuccessful in scheduling - " + e.getAttemptedClass()
    +                        + " is not an allowed strategy. Please make sure 
your "
    +                        + Config.TOPOLOGY_SCHEDULER_STRATEGY
    +                        + " config is one of the allowed strategies: "
    +                        + e.getAllowedStrategies(), e);
    +                return;
    +            } catch (RuntimeException e) {
    +                markFailedTopology(topologySubmitter, cluster, td,
    +                    "Unsuccessful in scheduling - failed to create 
instance of topology strategy "
    +                        + strategyConf
    +                        + ". Please check logs for details", e);
    +                return;
    +            }
    +
    +            sp = new SchedulingPending(rasStrategy, 0);
    +        }
    +
             //A copy of cluster that we can modify, but does not get committed 
back to cluster unless scheduling succeeds
             Cluster workingState = new Cluster(cluster);
             RAS_Nodes nodes = new RAS_Nodes(workingState);
    -        IStrategy rasStrategy = null;
    -        String strategyConf = (String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
    -        try {
    -            String strategy = (String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
    -            if (strategy.startsWith("backtype.storm")) {
    -                // Storm supports to launch workers of older version.
    -                // If the config of TOPOLOGY_SCHEDULER_STRATEGY comes from 
the older version, replace the package name.
    -                strategy = strategy.replace("backtype.storm", 
"org.apache.storm");
    -                LOG.debug("Replace backtype.storm with org.apache.storm 
for Config.TOPOLOGY_SCHEDULER_STRATEGY");
    -            }
    -            rasStrategy = 
ReflectionUtils.newSchedulerStrategyInstance(strategy, conf);
    -            rasStrategy.prepare(conf);
    -        } catch (DisallowedStrategyException e) {
    -            markFailedTopology(topologySubmitter, cluster, td,
    -                "Unsuccessful in scheduling - " + e.getAttemptedClass()
    -                    + " is not an allowed strategy. Please make sure your "
    -                    + Config.TOPOLOGY_SCHEDULER_STRATEGY
    -                    + " config is one of the allowed strategies: "
    -                    + e.getAllowedStrategies(), e);
    -            return;
    -        } catch (RuntimeException e) {
    -            markFailedTopology(topologySubmitter, cluster, td,
    -                "Unsuccessful in scheduling - failed to create instance of 
topology strategy "
    -                    + strategyConf
    -                    + ". Please check logs for details", e);
    -            return;
    -        }
     
    -        for (int i = 0; i < maxSchedulingAttempts; i++) {
    +        for (int i = sp.getAttempt(); i < maxSchedulingAttempts; i++) {
                 SingleTopologyCluster toSchedule = new 
SingleTopologyCluster(workingState, td.getId());
                 try {
    -                SchedulingResult result = rasStrategy.schedule(toSchedule, 
td);
    +                SchedulingResult result;
    +                Future<SchedulingResult> schedulingFuture = 
sp.scheduleIfNeeded(background, toSchedule, td);
    +                try {
    +                    result = 
schedulingFuture.get(schedulingForegroundTimeoutSeconds, TimeUnit.SECONDS);
    +                    sp.resetFuture();
    +                } catch (TimeoutException te) {
    +                    long elapsedTimeSecs = (Time.currentTimeMillis() - 
sp.getStartTime())/1000;
    +                    if (elapsedTimeSecs >= 
schedulingBackgroundTimeoutSeconds) {
    --- End diff --
    
    What would be wrong with just letting this remain running?


---

Reply via email to