Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2400#discussion_r148852799
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 ---
    @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) {
     
         @Override
         public void schedule(Topologies topologies, Cluster cluster) {
    -        //initialize data structures
    -        for (TopologyDetails td : cluster.getTopologies()) {
    +        Map<String, User> userMap = getUsers(cluster);
    +        List<TopologyDetails> orderedTopologies = new 
ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap));
    +        LOG.info("Ordered list of topologies is: {}", 
orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList()));
    +        for (TopologyDetails td : orderedTopologies) {
                 if (!cluster.needsSchedulingRas(td)) {
                     //cluster forgets about its previous status, so if it is 
scheduled just leave it.
                     cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    -            }
    -        }
    -        Map<String, User> userMap = getUsers(cluster);
    -
    -        while (true) {
    -            TopologyDetails td;
    -            try {
    -                //Call scheduling priority strategy
    -                td = 
schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running priority strategy 
{}. No topologies will be scheduled!",
    -                        schedulingPrioritystrategy.getClass().getName(), 
ex);
    -                break;
    -            }
    -            if (td == null) {
    -                break;
    -            }
    -            User submitter = userMap.get(td.getTopologySubmitter());
    -            if (cluster.needsSchedulingRas(td)) {
    -                scheduleTopology(td, cluster, submitter, userMap);
                 } else {
    -                LOG.warn("Topology {} is already fully scheduled!", 
td.getName());
    -                cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled");
    +                User submitter = userMap.get(td.getTopologySubmitter());
    +                scheduleTopology(td, cluster, submitter, 
orderedTopologies);
                 }
             }
         }
     
    +    private static void markFailedTopology(User u, Cluster c, 
TopologyDetails td, String message) {
    +        markFailedTopology(u, c, td, message, null);
    +    }
     
    -    public void scheduleTopology(TopologyDetails td, Cluster cluster, 
final User topologySubmitter,
    -                                 Map<String, User> userMap) {
    +    private static void markFailedTopology(User u, Cluster c, 
TopologyDetails td, String message, Throwable t) {
    +        c.setStatus(td, message);
    +        String realMessage = td.getId() + " " + message;
    +        if (t != null) {
    +            LOG.error(realMessage, t);
    +        } else {
    +            LOG.error(realMessage);
    +        }
    +        u.markTopoUnsuccess(td);
    +    }
    +
    +    private void scheduleTopology(TopologyDetails td, Cluster cluster, 
final User topologySubmitter,
    +                                  List<TopologyDetails> orderedTopologies) 
{
             //A copy of cluster that we can modify, but does not get committed 
back to cluster unless scheduling succeeds
             Cluster workingState = new Cluster(cluster);
    +        RAS_Nodes nodes = new RAS_Nodes(workingState);
             IStrategy rasStrategy = null;
             String strategyConf = (String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY);
             try {
    -            rasStrategy = (IStrategy) 
ReflectionUtils.newSchedulerStrategyInstance((String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
    +            rasStrategy = 
ReflectionUtils.newSchedulerStrategyInstance((String) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf);
                 rasStrategy.prepare(conf);
             } catch (DisallowedStrategyException e) {
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " 
+ e.getAttemptedClass()
    -                    + " is not an allowed strategy. Please make sure your 
" + Config.TOPOLOGY_SCHEDULER_STRATEGY
    -                    + " config is one of the allowed strategies: " + 
e.getAllowedStrategies().toString());
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - " + e.getAttemptedClass()
    +                    + " is not an allowed strategy. Please make sure your "
    +                    + Config.TOPOLOGY_SCHEDULER_STRATEGY
    +                    + " config is one of the allowed strategies: "
    +                    + e.getAllowedStrategies(), e);
                 return;
             } catch (RuntimeException e) {
    -            LOG.error("failed to create instance of IStrategy: {} Topology 
{} will not be scheduled.",
    -                    strategyConf, td.getName(), e);
    -            topologySubmitter.markTopoUnsuccess(td);
    -            cluster.setStatus(td.getId(), "Unsuccessful in scheduling - 
failed to create instance of topology strategy "
    -                    + strategyConf + ". Please check logs for details");
    +            markFailedTopology(topologySubmitter, cluster, td,
    +                "Unsuccessful in scheduling - failed to create instance of 
topology strategy "
    +                    + strategyConf
    +                    + ". Please check logs for details", e);
                 return;
             }
     
    -        while (true) {
    -            // A copy of the cluster that restricts the strategy to only 
modify a single topology
    +        for (int i = 0; i < maxSchedulingAttempts; i++) {
                 SingleTopologyCluster toSchedule = new 
SingleTopologyCluster(workingState, td.getId());
    -            SchedulingResult result = null;
                 try {
    -                result = rasStrategy.schedule(toSchedule, td);
    -            } catch (Exception ex) {
    -                LOG.error("Exception thrown when running strategy {} to 
schedule topology {}."
    -                        + " Topology will not be scheduled!", 
rasStrategy.getClass().getName(), td.getName(), ex);
    -                topologySubmitter.markTopoUnsuccess(td);
    -                cluster.setStatus(td.getId(), "Unsuccessful in scheduling 
- Exception thrown when running strategy {}"
    -                        + rasStrategy.getClass().getName() + ". Please 
check logs for details");
    -            }
    -            LOG.debug("scheduling result: {}", result);
    -            if (result != null) {
    -                if (result.isSuccess()) {
    -                    try {
    +                SchedulingResult result = rasStrategy.schedule(toSchedule, 
td);
    +                LOG.debug("scheduling result: {}", result);
    +                if (result != null) {
    +                    if (result.isSuccess()) {
                             cluster.updateFrom(toSchedule);
                             cluster.setStatus(td.getId(), "Running - " + 
result.getMessage());
    -                    } catch (Exception ex) {
    -                        LOG.error("Unsuccessful attempting to assign 
executors to nodes.", ex);
    -                        topologySubmitter.markTopoUnsuccess(td);
    -                        cluster.setStatus(td.getId(), "Unsuccessful in 
scheduling - "
    -                                + "IllegalStateException thrown when 
attempting to assign executors to nodes. Please check"
    -                                + " log for details.");
    -                    }
    -                    return;
    -                } else {
    -                    if (result.getStatus() == 
SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    -                        boolean madeSpace = false;
    +                        //DONE
    +                        return;
    +                    } else if (result.getStatus() == 
SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
    +                        LOG.info("Not enough resources to schedule {}", 
td.getName());
    +                        List<TopologyDetails> reversedList = 
ImmutableList.copyOf(orderedTopologies).reverse();
                             try {
    -                            //need to re prepare since scheduling state 
might have been restored
    -                            madeSpace = 
evictionStrategy.makeSpaceForTopo(td, workingState, userMap);
    +                            boolean evictedSomething = false;
    +                            LOG.debug("attempting to make space for topo 
{} from user {}", td.getName(), td.getTopologySubmitter());
    +                            int tdIndex = reversedList.indexOf(td);
    +                            double cpuNeeded = td.getTotalRequestedCpu();
    +                            double memoryNeeded = 
td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap();
    +                            SchedulerAssignment assignment = 
cluster.getAssignmentById(td.getId());
    +                            if (assignment != null) {
    +                                cpuNeeded -= getCpuUsed(assignment);
    +                                memoryNeeded -= getMemoryUsed(assignment);
    +                            }
    +                            cluster.getTopologyResourcesMap();
    +                            for (int index = 0; index < tdIndex; index++) {
    +                                TopologyDetails topologyEvict = 
reversedList.get(index);
    +                                SchedulerAssignment evictAssignemnt = 
workingState.getAssignmentById(topologyEvict.getId());
    +                                if (evictAssignemnt != null && 
!evictAssignemnt.getSlots().isEmpty()) {
    +                                    Collection<WorkerSlot> workersToEvict 
= workingState.getUsedSlotsByTopologyId(topologyEvict.getId());
    +
    +                                    LOG.debug("Evicting Topology {} with 
workers: {} from user {}", topologyEvict.getName(), workersToEvict,
    +                                        
topologyEvict.getTopologySubmitter());
    +                                    cpuNeeded -= 
getCpuUsed(evictAssignemnt);
    +                                    memoryNeeded -= 
getMemoryUsed(evictAssignemnt);
    +                                    evictedSomething = true;
    +                                    nodes.freeSlots(workersToEvict);
    +                                    if (cpuNeeded <= 0 && memoryNeeded <= 
0) {
    +                                        //We evicted enough topologies to 
have a hope of scheduling, so try it now, and don't evict more
    +                                        // than is needed
    +                                        break;
    +                                    }
    +                                }
    +                            }
    +
    +                            if (!evictedSomething) {
    +                                markFailedTopology(topologySubmitter, 
cluster, td,
    +                                    "Not enough resources to schedule - " 
+ result.getErrorMessage());
    +                                return;
    +                            }
                             } catch (Exception ex) {
    -                            LOG.error("Exception thrown when running 
eviction strategy {} to schedule topology {}."
    -                                            + " No evictions will be 
done!", evictionStrategy.getClass().getName(),
    -                                    td.getName(), ex);
    -                            topologySubmitter.markTopoUnsuccess(td);
    -                            return;
    -                        }
    -                        if (!madeSpace) {
    -                            LOG.debug("Could not make space for topo {} 
will move to attempted", td);
    -                            topologySubmitter.markTopoUnsuccess(td);
    -                            cluster.setStatus(td.getId(), "Not enough 
resources to schedule - "
    -                                    + result.getErrorMessage());
    -                            return;
    +                            LOG.error("Exception thrown when running 
eviction to schedule topology {}."
    +                                    + " No evictions will be done! Error: 
{}",
    +                                td.getName(), ex.getClass().getName(), ex);
                             }
    +                        //Only place we fall though to do the loop over 
again...
                             continue;
                         } else {
    --- End diff --
    
    Should comment that this is for all other failed status


---

Reply via email to