Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2623#discussion_r179571966 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java --- @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) { List<ExecutorDetails> execsScheduled = new LinkedList<>(); Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>(); - for (Component component : componentMap.values()) { - compToExecsToSchedule.put(component.getId(), new LinkedList<ExecutorDetails>()); + for (Map.Entry<String, Component> componentEntry: componentMap.entrySet()) { + Component component = componentEntry.getValue(); + compToExecsToSchedule.put(component.getId(), new LinkedList<>()); for (ExecutorDetails exec : component.getExecs()) { if (unassignedExecutors.contains(exec)) { compToExecsToSchedule.get(component.getId()).add(exec); + LOG.info("{} has unscheduled executor {}", component.getId(), exec); } } } - Set<Component> sortedComponents = sortComponents(componentMap); - sortedComponents.addAll(componentMap.values()); + List<Component> sortedComponents = topologicalSortComponents(componentMap); - for (Component currComp : sortedComponents) { - Map<String, Component> neighbors = new HashMap<String, Component>(); - for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) { - neighbors.put(compId, componentMap.get(compId)); + for (Component currComp: sortedComponents) { + int numExecs = compToExecsToSchedule.get(currComp.getId()).size(); + for (int i = 0; i < numExecs; i++) { + execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule)); } - Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors); - Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.getId()); - - boolean flag = false; - do { - flag = false; - if (!currCompExesToSched.isEmpty()) { - 
execsScheduled.add(currCompExesToSched.poll()); - flag = true; - } + } + + LOG.info("The ordering result is {}", execsScheduled); + + return execsScheduled; + } - for (Component neighborComp : sortedNeighbors) { - Queue<ExecutorDetails> neighborCompExesToSched = - compToExecsToSchedule.get(neighborComp.getId()); - if (!neighborCompExesToSched.isEmpty()) { - execsScheduled.add(neighborCompExesToSched.poll()); - flag = true; + private List<ExecutorDetails> takeExecutors(Component currComp, int numExecs, + final Map<String, Component> componentMap, + final Map<String, Queue<ExecutorDetails>> compToExecsToSchedule) { + List<ExecutorDetails> execsScheduled = new ArrayList<>(); + Queue<ExecutorDetails> currQueue = compToExecsToSchedule.get((currComp.getId())); + Set<String> sortedChildren = getSortedChildren(currComp, componentMap); + + execsScheduled.add(currQueue.poll()); + + for (String childId: sortedChildren) { + Component childComponent = componentMap.get(childId); + Queue<ExecutorDetails> childQueue = compToExecsToSchedule.get(childId); + int childNumExecs = childQueue.size(); + if (childNumExecs == 0) { + continue; + } + int numExecsToTake = 1; + if (isShuffleFromParentToChild(currComp, childComponent)) { + // if it's shuffle grouping, truncate + numExecsToTake = Math.max(1, childNumExecs / numExecs); + } // otherwise, one-by-one + + for (int i = 0; i < numExecsToTake; i++) { + execsScheduled.addAll(takeExecutors(childComponent, childNumExecs, componentMap, compToExecsToSchedule)); + } + } + + return execsScheduled; + } + + private Set<String> getSortedChildren(Component component, final Map<String, Component> componentMap) { + Set<String> children = component.getChildren(); + Set<String> sortedChildren = + new TreeSet<String>((o1, o2) -> { + Component child1 = componentMap.get(o1); + Component child2 = componentMap.get(o2); + boolean child1IsShuffle = isShuffleFromParentToChild(component, child1); + boolean child2IsShuffle = isShuffleFromParentToChild(component, 
child2); + + if (child1IsShuffle && child2IsShuffle) { + return o1.compareTo(o2); + } else if (child1IsShuffle) { + return 1; + } else { + return -1; + } + }); + sortedChildren.addAll(children); + return sortedChildren; + } + + private boolean isShuffleFromParentToChild(Component parent, Component child) { --- End diff -- Nit: could we rename this from `isShuffleFromParentToChild` to something more like `hasLocalityAwareGroupingFromParentToChild`? I know it is longer, but in the future we may want to extend this to groupings other than just shuffle.
---