Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2442#discussion_r154194951 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java --- @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.scheduler.resource.strategies.scheduling; + +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.Stack; +import java.util.TreeMap; +import java.util.TreeSet; +import org.apache.storm.Config; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.ExecutorDetails; +import org.apache.storm.scheduler.SchedulerAssignment; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.resource.RAS_Node; +import org.apache.storm.scheduler.resource.RAS_Nodes; +import org.apache.storm.scheduler.resource.SchedulingResult; +import org.apache.storm.scheduler.resource.SchedulingStatus; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { + private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class); + + protected static class SolverResult { + private final int statesSearched; + private final boolean success; + private final long timeTakenMillis; + private final int backtracked; + + public SolverResult(SearcherState state, boolean success) { + this.statesSearched = state.getStatesSearched(); + this.success = success; + timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis; + backtracked = state.numBacktrack; + } + + public SchedulingResult asSchedulingResult() { + if (success) { + return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched + + " states traversed in " + timeTakenMillis + 
"ms, backtracked " + backtracked + " times)"); + } + return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, + "Cannot find scheduling that satisfies all constraints (" + statesSearched + + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)"); + } + } + + protected static class SearcherState { + // Metrics + // How many states searched so far. + private int statesSearched = 0; + // Number of times we had to backtrack. + private int numBacktrack = 0; + final long startTimeMillis; + private final long maxEndTimeMs; + + // Current state + // The current executor we are trying to schedule + private int execIndex = 0; + // A map of the worker to the components in the worker to be able to enforce constraints. + private final Map<WorkerSlot, Set<String>> workerCompAssignment; + private final boolean[] okToRemoveFromWorker; + // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints + private final Map<RAS_Node, Set<String>> nodeCompAssignment; + private final boolean[] okToRemoveFromNode; + + // Static State + // The list of all executors (preferably sorted to make assignments simpler). + private final List<ExecutorDetails> execs; + //The maximum number of state to search before stopping. 
+ private final int maxStatesSearched; + //The topology we are scheduling + private final TopologyDetails td; + + private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment, + int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) { + assert !execs.isEmpty(); + assert execs != null; + + this.workerCompAssignment = workerCompAssignment; + this.nodeCompAssignment = nodeCompAssignment; + this.maxStatesSearched = maxStatesSearched; + this.execs = execs; + okToRemoveFromWorker = new boolean[execs.size()]; + okToRemoveFromNode = new boolean[execs.size()]; + this.td = td; + startTimeMillis = Time.currentTimeMillis(); + if (maxTimeMs <= 0) { + maxEndTimeMs = Long.MAX_VALUE; + } else { + maxEndTimeMs = startTimeMillis + maxTimeMs; + } + } + + public void incStatesSearched() { + statesSearched++; + if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) { + LOG.debug("States Searched: {}", statesSearched); + LOG.debug("backtrack: {}", numBacktrack); + } + } + + public int getStatesSearched() { + return statesSearched; + } + + public boolean areSearchLimitsExceeded() { + return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs; + } + + public SearcherState nextExecutor() { + execIndex++; + if (execIndex >= execs.size()) { + throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size()); + } + return this; + } + + public boolean areAllExecsScheduled() { + return execIndex == execs.size() - 1; + } + + public ExecutorDetails currentExec() { + return execs.get(execIndex); + } + + public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) { + ExecutorDetails exec = currentExec(); + LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot); + //It is possible that this component is already scheduled on this node or worker. 
If so when we backtrack we cannot remove it + okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp); + okToRemoveFromNode[execIndex] = nodeCompAssignment.get(node).add(comp); + node.assignSingleExecutor(workerSlot, exec, td); + } + + public void backtrack(String comp, RAS_Node node, WorkerSlot workerSlot) { + execIndex--; + if (execIndex < 0) { + throw new IllegalStateException("Internal Error exec index became negative"); + } + numBacktrack++; + ExecutorDetails exec = currentExec(); + LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot); + if (okToRemoveFromWorker[execIndex]) { + workerCompAssignment.get(workerSlot).remove(comp); + } + if (okToRemoveFromNode[execIndex]) { + nodeCompAssignment.get(node).remove(comp); + } + node.freeSingleExecutor(exec, td); + } + } + + private Map<String, RAS_Node> nodes; + private Map<ExecutorDetails, String> execToComp; + private Map<String, Set<ExecutorDetails>> compToExecs; + private List<String> favoredNodes; + private List<String> unFavoredNodes; + + //constraints and spreads + private Map<String, Map<String, Integer>> constraintMatrix; + private HashSet<String> spreadComps = new HashSet<>(); + + //hard coded max number of states to search + public static final int MAX_STATE_SEARCH = 100_000; + + @Override + public SchedulingResult schedule(Cluster cluster, TopologyDetails td) { + prepare(cluster); + LOG.debug("Scheduling {}", td.getId()); + nodes = RAS_Nodes.getAllNodesFrom(cluster); + Map<WorkerSlot, Set<String>> workerCompAssignment = new HashMap<>(); + Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>(); + //set max number of states to search + final int maxStateSearch = Math.min(MAX_STATE_SEARCH, + ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL))); + + final long maxTimeMs = + ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_TIME_SECS), -1).intValue() * 1000L; + + favoredNodes = 
(List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES); + unFavoredNodes = (List<String>) td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES); + + //get mapping of execs to components + execToComp = td.getExecutorToComponent(); + //get mapping of components to executors + compToExecs = getCompToExecs(execToComp); + + //get topology constraints + constraintMatrix = getConstraintMap(td, compToExecs.keySet()); + + //get spread components + spreadComps = getSpreadComps(td); + + ArrayList<ExecutorDetails> sortedExecs = new ArrayList<>(); + //get a sorted list of unassigned executors based on number of constraints + Set<ExecutorDetails> unassignedExecutors = new HashSet<>(cluster.getUnassignedExecutors(td)); + for (ExecutorDetails exec1 : getSortedExecs(spreadComps, constraintMatrix, compToExecs)) { + if (unassignedExecutors.contains(exec1)) { + sortedExecs.add(exec1); + } + } + + //initialize structures + for (RAS_Node node : nodes.values()) { + nodeCompAssignment.put(node, new HashSet<>()); + } + //populate with existing assignments + SchedulerAssignment existingAssignment = cluster.getAssignmentById(td.getId()); + if (existingAssignment != null) { + for (Map.Entry<ExecutorDetails, WorkerSlot> entry1 : existingAssignment.getExecutorToSlot().entrySet()) { + ExecutorDetails exec1 = entry1.getKey(); + String compId = execToComp.get(exec1); + WorkerSlot ws = entry1.getValue(); + RAS_Node node = nodes.get(ws.getNodeId()); + //populate node to component Assignments + nodeCompAssignment.get(node).add(compId); + //populate worker to comp assignments + workerCompAssignment.computeIfAbsent(ws, (k) -> new HashSet<>()).add(compId); + } + } + + //early detection/early fail + if (!checkSchedulingFeasibility()) { + //Scheduling Status set to FAIL_OTHER so no eviction policy will be attempted to make space for this topology + return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, "Scheduling not feasible!"); + } + return backtrackSearch(new 
SearcherState(workerCompAssignment, nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td)) + .asSchedulingResult(); + } + + private boolean checkSchedulingFeasibility() { + for (String comp : spreadComps) { + int numExecs = compToExecs.get(comp).size(); + if (numExecs > nodes.size()) { + LOG.error("Unsatisfiable constraint: Component: {} marked as spread has {} executors which is larger " + + "than number of nodes: {}", comp, numExecs, nodes.size()); + return false; + } + } + if (execToComp.size() >= MAX_STATE_SEARCH) { + LOG.error("Number of executors is greater than the maximum number of states allowed to be searched. " + + "# of executors: {} Max states to search: {}", execToComp.size(), MAX_STATE_SEARCH); + return false; + } + return true; + } + + @Override + protected TreeSet<ObjectResources> sortObjectResources( + final AllResources allResources, ExecutorDetails exec, TopologyDetails topologyDetails, + final ExistingScheduleFunc existingScheduleFunc) { + return GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, topologyDetails, existingScheduleFunc); + } + + // Backtracking algorithm does not take into account the ordering of executors in worker to reduce traversal space + @VisibleForTesting + protected SolverResult backtrackSearch(SearcherState state) { + state.incStatesSearched(); + if (state.areSearchLimitsExceeded()) { + LOG.warn("Limits Exceeded"); + return new SolverResult(state, false); + } + + ExecutorDetails exec = state.currentExec(); + List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, favoredNodes, unFavoredNodes); + + for (ObjectResources nodeResources: sortedNodes) { + RAS_Node node = nodes.get(nodeResources.id); + for (WorkerSlot workerSlot : node.getSlotsAvailbleTo(state.td)) { + if (isExecAssignmentToWorkerValid(workerSlot, state)) { + String comp = execToComp.get(exec); + + state.tryToSchedule(comp, node, workerSlot); + + if (state.areAllExecsScheduled()) { + //Everything is scheduled correctly, 
so no need to search any more. + return new SolverResult(state, true); + } + + SolverResult results = backtrackSearch(state.nextExecutor()); + if (results.success) { + //We found a good result we are done. + return results; + } + + if (state.areSearchLimitsExceeded()) { + //No need to search more it is not going to help. + return new SolverResult(state, false); + } + + //backtracking (If we ever get here there really isn't a lot of hope that we will find a scheduling) + state.backtrack(comp, node, workerSlot); + } + } + } + //Tried all of the slots and none of them worked. + return new SolverResult(state, false); + } + + /** + * Check if any constraints are violated if exec is scheduled on worker. + * @return true if scheduling exec on worker does not violate any constraints, returns false if it does + */ + public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, SearcherState state) { + final ExecutorDetails exec = state.currentExec(); + //check resources + RAS_Node node = nodes.get(worker.getNodeId()); + if (!node.wouldFit(worker, exec, state.td)) { + LOG.trace("{} would not fit in resources available on {}", exec, worker); + return false; + } + + //check if exec can be on worker based on user defined component exclusions + String execComp = execToComp.get(exec); + Set<String> components = state.workerCompAssignment.get(worker); + if (components != null) { + for (String comp : components) { + if (constraintMatrix.get(execComp).get(comp) != 0) { + LOG.trace("{} found {} constraint violation {} on {}", exec, execComp, comp, worker); + return false; + } + } + } + + //check if exec satisfy spread + if (spreadComps.contains(execComp)) { + if (state.nodeCompAssignment.get(node).contains(execComp)) { + LOG.trace("{} Found spread violation {} on node {}", exec, execComp, node.getId()); + return false; + } + } + return true; + } + + static Map<String, Map<String, Integer>> getConstraintMap(TopologyDetails topo, Set<String> comps) { + Map<String, Map<String, 
Integer>> matrix = new HashMap<>(); + for (String comp : comps) { + matrix.put(comp, new HashMap<>()); + for (String comp2 : comps) { + matrix.get(comp).put(comp2, 0); + } + } + List<List<String>> constraints = (List<List<String>>) topo.getConf().get(Config.TOPOLOGY_CONSTRAINTS); + if (constraints != null) { + for (List<String> constraintPair : constraints) { + String comp1 = constraintPair.get(0); + String comp2 = constraintPair.get(1); + if (!matrix.containsKey(comp1)) { + LOG.warn("Comp: {} declared in constraints is not valid!", comp1); + continue; + } + if (!matrix.containsKey(comp2)) { + LOG.warn("Comp: {} declared in constraints is not valid!", comp2); + continue; + } + matrix.get(comp1).put(comp2, 1); + matrix.get(comp2).put(comp1, 1); + } + } + return matrix; + } + + /** + * Determines if a scheduling is valid and all constraints are satisfied. + */ + @VisibleForTesting + public static boolean validateSolution(Cluster cluster, TopologyDetails td) { + return checkSpreadSchedulingValid(cluster, td) + && checkConstraintsSatisfied(cluster, td) + && checkResourcesCorrect(cluster, td); + } + + /** + * Check if constraints are satisfied. 
+ */ + private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetails topo) { + LOG.info("Checking constraints..."); + Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot(); + Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent(); + //get topology constraints + Map<String, Map<String, Integer>> constraintMatrix = getConstraintMap(topo, new HashSet<>(topo.getExecutorToComponent().values())); + + Map<WorkerSlot, List<String>> workerCompMap = new HashMap<>(); + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) { + WorkerSlot worker = entry.getValue(); + ExecutorDetails exec = entry.getKey(); + String comp = execToComp.get(exec); + if (!workerCompMap.containsKey(worker)) { + workerCompMap.put(worker, new LinkedList<>()); + } + workerCompMap.get(worker).add(comp); + } + for (Map.Entry<WorkerSlot, List<String>> entry : workerCompMap.entrySet()) { + List<String> comps = entry.getValue(); + for (int i = 0; i < comps.size(); i++) { + for (int j = 0; j < comps.size(); j++) { + if (i != j && constraintMatrix.get(comps.get(i)).get(comps.get(j)) == 1) { + LOG.error("Incorrect Scheduling: worker exclusion for Component {} and {} not satisfied on WorkerSlot: {}", + comps.get(i), comps.get(j), entry.getKey()); + return false; + } + } + } + } + return true; + } + + private static Map<WorkerSlot, RAS_Node> workerToNodes(Cluster cluster) { + Map<WorkerSlot, RAS_Node> workerToNodes = new HashMap<>(); + for (RAS_Node node: RAS_Nodes.getAllNodesFrom(cluster).values()) { + for (WorkerSlot s : node.getUsedSlots()) { + workerToNodes.put(s, node); + } + } + return workerToNodes; + } + + private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) { + LOG.info("Checking for a valid scheduling..."); + Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot(); + Map<ExecutorDetails, String> execToComp = 
topo.getExecutorToComponent(); + Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>(); + Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>(); + Map<RAS_Node, HashSet<String>> nodeCompMap = new HashMap<>(); + Map<WorkerSlot, RAS_Node> workerToNodes = workerToNodes(cluster); + boolean ret = true; + + HashSet<String> spreadComps = getSpreadComps(topo); + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) { + ExecutorDetails exec = entry.getKey(); + WorkerSlot worker = entry.getValue(); + RAS_Node node = workerToNodes.get(worker); + + if (!workerExecMap.containsKey(worker)) { + workerExecMap.put(worker, new HashSet<>()); + workerCompMap.put(worker, new HashSet<>()); + } + + if (!nodeCompMap.containsKey(node)) { + nodeCompMap.put(node, new HashSet<>()); + } + if (workerExecMap.get(worker).contains(exec)) { + LOG.error("Incorrect Scheduling: Found duplicate in scheduling"); + return false; + } + workerExecMap.get(worker).add(exec); + String comp = execToComp.get(exec); + workerCompMap.get(worker).add(comp); + if (spreadComps.contains(comp)) { + if (nodeCompMap.get(node).contains(comp)) { + LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}", + comp, exec, node.getId(), nodeCompMap.get(node)); + ret = false; + } + } + nodeCompMap.get(node).add(comp); + } + return ret; + } + + /** + * Check if resource constraints satisfied. 
+ */ + private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) { + LOG.info("Checking Resources..."); + Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot(); + Map<RAS_Node, Collection<ExecutorDetails>> nodeToExecs = new HashMap<>(); + Map<ExecutorDetails, WorkerSlot> mergedExecToWorker = new HashMap<>(); + Map<String, RAS_Node> nodes = RAS_Nodes.getAllNodesFrom(cluster); + //merge with existing assignments + if (cluster.getAssignmentById(topo.getId()) != null + && cluster.getAssignmentById(topo.getId()).getExecutorToSlot() != null) { + mergedExecToWorker.putAll(cluster.getAssignmentById(topo.getId()).getExecutorToSlot()); + } + mergedExecToWorker.putAll(result); + + for (Map.Entry<ExecutorDetails, WorkerSlot> entry : mergedExecToWorker.entrySet()) { + ExecutorDetails exec = entry.getKey(); + WorkerSlot worker = entry.getValue(); + RAS_Node node = nodes.get(worker.getNodeId()); + + if (node.getAvailableMemoryResources() < 0.0 && node.getAvailableCpuResources() < 0.0) { + LOG.error("Incorrect Scheduling: found node with negative available resources"); + return false; + } + if (!nodeToExecs.containsKey(node)) { + nodeToExecs.put(node, new LinkedList<>()); + } + nodeToExecs.get(node).add(exec); + } + + for (Map.Entry<RAS_Node, Collection<ExecutorDetails>> entry : nodeToExecs.entrySet()) { + RAS_Node node = entry.getKey(); + Collection<ExecutorDetails> execs = entry.getValue(); + double cpuUsed = 0.0; + double memoryUsed = 0.0; + for (ExecutorDetails exec : execs) { + cpuUsed += topo.getTotalCpuReqTask(exec); + memoryUsed += topo.getTotalMemReqTask(exec); + } + if (node.getAvailableCpuResources() != (node.getTotalCpuResources() - cpuUsed)) { + LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of cpu. 
Expected: {}" + + " Actual: {} Executors scheduled on node: {}", + node.getId(), (node.getTotalCpuResources() - cpuUsed), node.getAvailableCpuResources(), execs); + return false; + } + if (node.getAvailableMemoryResources() != (node.getTotalMemoryResources() - memoryUsed)) { + LOG.error("Incorrect Scheduling: node {} has consumed incorrect amount of memory. Expected: {}" + + " Actual: {} Executors scheduled on node: {}", + node.getId(), (node.getTotalMemoryResources() - memoryUsed), node.getAvailableMemoryResources(), execs); + return false; + } + } + return true; + } + + private Map<String, Set<ExecutorDetails>> getCompToExecs(Map<ExecutorDetails, String> executorToComp) { + Map<String, Set<ExecutorDetails>> retMap = new HashMap<>(); + for (Map.Entry<ExecutorDetails, String> entry : executorToComp.entrySet()) { + ExecutorDetails exec = entry.getKey(); + String comp = entry.getValue(); + if (!retMap.containsKey(comp)) { + retMap.put(comp, new HashSet<>()); + } + retMap.get(comp).add(exec); --- End diff -- Should we make this more Java 8-like? For example, `retMap.computeIfAbsent(comp, k -> new HashSet<>()).add(exec);` would replace the containsKey/put/get sequence, matching the `computeIfAbsent` usage already in this file.
---