Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2400#discussion_r148852532 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java --- @@ -62,123 +70,144 @@ public void prepare(Map<String, Object> conf) { @Override public void schedule(Topologies topologies, Cluster cluster) { - //initialize data structures - for (TopologyDetails td : cluster.getTopologies()) { + Map<String, User> userMap = getUsers(cluster); + List<TopologyDetails> orderedTopologies = new ArrayList<>(schedulingPriorityStrategy.getOrderedTopologies(cluster, userMap)); + LOG.info("Ordered list of topologies is: {}", orderedTopologies.stream().map((t) -> t.getId()).collect(Collectors.toList())); + for (TopologyDetails td : orderedTopologies) { if (!cluster.needsSchedulingRas(td)) { //cluster forgets about its previous status, so if it is scheduled just leave it. cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled"); - } - } - Map<String, User> userMap = getUsers(cluster); - - while (true) { - TopologyDetails td; - try { - //Call scheduling priority strategy - td = schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap); - } catch (Exception ex) { - LOG.error("Exception thrown when running priority strategy {}. 
No topologies will be scheduled!", - schedulingPrioritystrategy.getClass().getName(), ex); - break; - } - if (td == null) { - break; - } - User submitter = userMap.get(td.getTopologySubmitter()); - if (cluster.needsSchedulingRas(td)) { - scheduleTopology(td, cluster, submitter, userMap); } else { - LOG.warn("Topology {} is already fully scheduled!", td.getName()); - cluster.setStatusIfAbsent(td.getId(), "Fully Scheduled"); + User submitter = userMap.get(td.getTopologySubmitter()); + scheduleTopology(td, cluster, submitter, orderedTopologies); } } } + private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message) { + markFailedTopology(u, c, td, message, null); + } - public void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter, - Map<String, User> userMap) { + private static void markFailedTopology(User u, Cluster c, TopologyDetails td, String message, Throwable t) { + c.setStatus(td, message); + String realMessage = td.getId() + " " + message; + if (t != null) { + LOG.error(realMessage, t); + } else { + LOG.error(realMessage); + } + u.markTopoUnsuccess(td); + } + + private void scheduleTopology(TopologyDetails td, Cluster cluster, final User topologySubmitter, + List<TopologyDetails> orderedTopologies) { //A copy of cluster that we can modify, but does not get committed back to cluster unless scheduling succeeds Cluster workingState = new Cluster(cluster); + RAS_Nodes nodes = new RAS_Nodes(workingState); IStrategy rasStrategy = null; String strategyConf = (String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY); try { - rasStrategy = (IStrategy) ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf); + rasStrategy = ReflectionUtils.newSchedulerStrategyInstance((String) td.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), conf); rasStrategy.prepare(conf); } catch (DisallowedStrategyException e) { - topologySubmitter.markTopoUnsuccess(td); 
- cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + e.getAttemptedClass() - + " is not an allowed strategy. Please make sure your " + Config.TOPOLOGY_SCHEDULER_STRATEGY - + " config is one of the allowed strategies: " + e.getAllowedStrategies().toString()); + markFailedTopology(topologySubmitter, cluster, td, + "Unsuccessful in scheduling - " + e.getAttemptedClass() + + " is not an allowed strategy. Please make sure your " + + Config.TOPOLOGY_SCHEDULER_STRATEGY + + " config is one of the allowed strategies: " + + e.getAllowedStrategies(), e); return; } catch (RuntimeException e) { - LOG.error("failed to create instance of IStrategy: {} Topology {} will not be scheduled.", - strategyConf, td.getName(), e); - topologySubmitter.markTopoUnsuccess(td); - cluster.setStatus(td.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy " - + strategyConf + ". Please check logs for details"); + markFailedTopology(topologySubmitter, cluster, td, + "Unsuccessful in scheduling - failed to create instance of topology strategy " + + strategyConf + + ". Please check logs for details", e); return; } - while (true) { - // A copy of the cluster that restricts the strategy to only modify a single topology + for (int i = 0; i < maxSchedulingAttempts; i++) { SingleTopologyCluster toSchedule = new SingleTopologyCluster(workingState, td.getId()); - SchedulingResult result = null; try { - result = rasStrategy.schedule(toSchedule, td); - } catch (Exception ex) { - LOG.error("Exception thrown when running strategy {} to schedule topology {}." - + " Topology will not be scheduled!", rasStrategy.getClass().getName(), td.getName(), ex); - topologySubmitter.markTopoUnsuccess(td); - cluster.setStatus(td.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}" - + rasStrategy.getClass().getName() + ". 
Please check logs for details"); - } - LOG.debug("scheduling result: {}", result); - if (result != null) { - if (result.isSuccess()) { - try { + SchedulingResult result = rasStrategy.schedule(toSchedule, td); + LOG.debug("scheduling result: {}", result); + if (result != null) { + if (result.isSuccess()) { cluster.updateFrom(toSchedule); cluster.setStatus(td.getId(), "Running - " + result.getMessage()); - } catch (Exception ex) { - LOG.error("Unsuccessful attempting to assign executors to nodes.", ex); - topologySubmitter.markTopoUnsuccess(td); - cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " - + "IllegalStateException thrown when attempting to assign executors to nodes. Please check" - + " log for details."); - } - return; - } else { - if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) { - boolean madeSpace = false; + //DONE + return; + } else if (result.getStatus() == SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) { + LOG.info("Not enough resources to schedule {}", td.getName()); + List<TopologyDetails> reversedList = ImmutableList.copyOf(orderedTopologies).reverse(); try { - //need to re prepare since scheduling state might have been restored - madeSpace = evictionStrategy.makeSpaceForTopo(td, workingState, userMap); + boolean evictedSomething = false; + LOG.debug("attempting to make space for topo {} from user {}", td.getName(), td.getTopologySubmitter()); + int tdIndex = reversedList.indexOf(td); + double cpuNeeded = td.getTotalRequestedCpu(); + double memoryNeeded = td.getTotalRequestedMemOffHeap() + td.getTotalRequestedMemOnHeap(); + SchedulerAssignment assignment = cluster.getAssignmentById(td.getId()); + if (assignment != null) { + cpuNeeded -= getCpuUsed(assignment); + memoryNeeded -= getMemoryUsed(assignment); + } + cluster.getTopologyResourcesMap(); + for (int index = 0; index < tdIndex; index++) { + TopologyDetails topologyEvict = reversedList.get(index); + SchedulerAssignment evictAssignemnt = 
workingState.getAssignmentById(topologyEvict.getId()); + if (evictAssignemnt != null && !evictAssignemnt.getSlots().isEmpty()) { + Collection<WorkerSlot> workersToEvict = workingState.getUsedSlotsByTopologyId(topologyEvict.getId()); + + LOG.debug("Evicting Topology {} with workers: {} from user {}", topologyEvict.getName(), workersToEvict, + topologyEvict.getTopologySubmitter()); + cpuNeeded -= getCpuUsed(evictAssignemnt); + memoryNeeded -= getMemoryUsed(evictAssignemnt); + evictedSomething = true; + nodes.freeSlots(workersToEvict); + if (cpuNeeded <= 0 && memoryNeeded <= 0) { + //We evicted enough topologies to have a hope of scheduling, so try it now, and don't evict more + // than is needed + break; + } + } + } + + if (!evictedSomething) { + markFailedTopology(topologySubmitter, cluster, td, + "Not enough resources to schedule - " + result.getErrorMessage()); + return; + } } catch (Exception ex) { - LOG.error("Exception thrown when running eviction strategy {} to schedule topology {}." - + " No evictions will be done!", evictionStrategy.getClass().getName(), - td.getName(), ex); - topologySubmitter.markTopoUnsuccess(td); - return; - } - if (!madeSpace) { - LOG.debug("Could not make space for topo {} will move to attempted", td); - topologySubmitter.markTopoUnsuccess(td); - cluster.setStatus(td.getId(), "Not enough resources to schedule - " - + result.getErrorMessage()); - return; + LOG.error("Exception thrown when running eviction to schedule topology {}." + + " No evictions will be done! Error: {}", + td.getName(), ex.getClass().getName(), ex); --- End diff -- Why do we need the exception class name? The exception itself is already passed to the logger as the last argument, so its class name will appear in the logged stack trace.
---