GitHub user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2442#discussion_r154195212
--- Diff:
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
---
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.resource.strategies.scheduling;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.Stack;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+import org.apache.storm.scheduler.resource.RAS_Node;
+import org.apache.storm.scheduler.resource.RAS_Nodes;
+import org.apache.storm.scheduler.resource.SchedulingResult;
+import org.apache.storm.scheduler.resource.SchedulingStatus;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
+ private static final Logger LOG =
LoggerFactory.getLogger(ConstraintSolverStrategy.class);
+
+ protected static class SolverResult {
+ private final int statesSearched;
+ private final boolean success;
+ private final long timeTakenMillis;
+ private final int backtracked;
+
+ public SolverResult(SearcherState state, boolean success) {
+ this.statesSearched = state.getStatesSearched();
+ this.success = success;
+ timeTakenMillis = Time.currentTimeMillis() -
state.startTimeMillis;
+ backtracked = state.numBacktrack;
+ }
+
+ public SchedulingResult asSchedulingResult() {
+ if (success) {
+ return SchedulingResult.success("Fully Scheduled by
ConstraintSolverStrategy (" + statesSearched
+ + " states traversed in " + timeTakenMillis + "ms,
backtracked " + backtracked + " times)");
+ }
+ return
SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
+ "Cannot find scheduling that satisfies all constraints ("
+ statesSearched
+ + " states traversed in " + timeTakenMillis + "ms,
backtracked " + backtracked + " times)");
+ }
+ }
+
+ protected static class SearcherState {
+ // Metrics
+ // How many states searched so far.
+ private int statesSearched = 0;
+ // Number of times we had to backtrack.
+ private int numBacktrack = 0;
+ final long startTimeMillis;
+ private final long maxEndTimeMs;
+
+ // Current state
+ // The current executor we are trying to schedule
+ private int execIndex = 0;
+ // A map of the worker to the components in the worker to be able
to enforce constraints.
+ private final Map<WorkerSlot, Set<String>> workerCompAssignment;
+ private final boolean[] okToRemoveFromWorker;
+ // for the currently tested assignment a Map of the node to the
components on it to be able to enforce constraints
+ private final Map<RAS_Node, Set<String>> nodeCompAssignment;
+ private final boolean[] okToRemoveFromNode;
+
+ // Static State
+ // The list of all executors (preferably sorted to make
assignments simpler).
+ private final List<ExecutorDetails> execs;
+ //The maximum number of state to search before stopping.
+ private final int maxStatesSearched;
+ //The topology we are scheduling
+ private final TopologyDetails td;
+
+ private SearcherState(Map<WorkerSlot, Set<String>>
workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
+ int maxStatesSearched, long maxTimeMs,
List<ExecutorDetails> execs, TopologyDetails td) {
+ assert !execs.isEmpty();
+ assert execs != null;
+
+ this.workerCompAssignment = workerCompAssignment;
+ this.nodeCompAssignment = nodeCompAssignment;
+ this.maxStatesSearched = maxStatesSearched;
+ this.execs = execs;
+ okToRemoveFromWorker = new boolean[execs.size()];
+ okToRemoveFromNode = new boolean[execs.size()];
+ this.td = td;
+ startTimeMillis = Time.currentTimeMillis();
+ if (maxTimeMs <= 0) {
+ maxEndTimeMs = Long.MAX_VALUE;
+ } else {
+ maxEndTimeMs = startTimeMillis + maxTimeMs;
+ }
+ }
+
+ public void incStatesSearched() {
+ statesSearched++;
+ if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
+ LOG.debug("States Searched: {}", statesSearched);
+ LOG.debug("backtrack: {}", numBacktrack);
+ }
+ }
+
+ public int getStatesSearched() {
+ return statesSearched;
+ }
+
+ public boolean areSearchLimitsExceeded() {
+ return statesSearched > maxStatesSearched ||
Time.currentTimeMillis() > maxEndTimeMs;
+ }
+
+ public SearcherState nextExecutor() {
+ execIndex++;
+ if (execIndex >= execs.size()) {
+ throw new IllegalStateException("Exceeded the exec limit "
+ execIndex + " >= " + execs.size());
+ }
+ return this;
+ }
+
+ public boolean areAllExecsScheduled() {
+ return execIndex == execs.size() - 1;
+ }
+
+ public ExecutorDetails currentExec() {
+ return execs.get(execIndex);
+ }
+
+ public void tryToSchedule(String comp, RAS_Node node, WorkerSlot
workerSlot) {
+ ExecutorDetails exec = currentExec();
+ LOG.trace("Trying assignment of {} {} to {}", exec, comp,
workerSlot);
+ //It is possible that this component is already scheduled on
this node or worker. If so when we backtrack we cannot remove it
+ okToRemoveFromWorker[execIndex] =
workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new
HashSet<>()).add(comp);
+ okToRemoveFromNode[execIndex] =
nodeCompAssignment.get(node).add(comp);
+ node.assignSingleExecutor(workerSlot, exec, td);
+ }
+
+ public void backtrack(String comp, RAS_Node node, WorkerSlot
workerSlot) {
+ execIndex--;
+ if (execIndex < 0) {
+ throw new IllegalStateException("Internal Error exec index
became negative");
+ }
+ numBacktrack++;
+ ExecutorDetails exec = currentExec();
+ LOG.trace("Backtracking {} {} from {}", exec, comp,
workerSlot);
+ if (okToRemoveFromWorker[execIndex]) {
+ workerCompAssignment.get(workerSlot).remove(comp);
+ }
+ if (okToRemoveFromNode[execIndex]) {
+ nodeCompAssignment.get(node).remove(comp);
+ }
+ node.freeSingleExecutor(exec, td);
+ }
+ }
+
+ private Map<String, RAS_Node> nodes;
+ private Map<ExecutorDetails, String> execToComp;
+ private Map<String, Set<ExecutorDetails>> compToExecs;
+ private List<String> favoredNodes;
+ private List<String> unFavoredNodes;
+
+ //constraints and spreads
+ private Map<String, Map<String, Integer>> constraintMatrix;
+ private HashSet<String> spreadComps = new HashSet<>();
+
+ //hard coded max number of states to search
+ public static final int MAX_STATE_SEARCH = 100_000;
+
+ @Override
+ public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
+ prepare(cluster);
+ LOG.debug("Scheduling {}", td.getId());
+ nodes = RAS_Nodes.getAllNodesFrom(cluster);
+ Map<WorkerSlot, Set<String>> workerCompAssignment = new
HashMap<>();
+ Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
+ //set max number of states to search
+ final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
+
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL)));
+
+ final long maxTimeMs =
+
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_TIME_SECS),
-1).intValue() * 1000L;
+
+ favoredNodes = (List<String>)
td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
+ unFavoredNodes = (List<String>)
td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
+
+ //get mapping of execs to components
+ execToComp = td.getExecutorToComponent();
+ //get mapping of components to executors
+ compToExecs = getCompToExecs(execToComp);
+
+ //get topology constraints
+ constraintMatrix = getConstraintMap(td, compToExecs.keySet());
+
+ //get spread components
+ spreadComps = getSpreadComps(td);
+
+ ArrayList<ExecutorDetails> sortedExecs = new ArrayList<>();
+ //get a sorted list of unassigned executors based on number of
constraints
+ Set<ExecutorDetails> unassignedExecutors = new
HashSet<>(cluster.getUnassignedExecutors(td));
+ for (ExecutorDetails exec1 : getSortedExecs(spreadComps,
constraintMatrix, compToExecs)) {
+ if (unassignedExecutors.contains(exec1)) {
+ sortedExecs.add(exec1);
+ }
+ }
+
+ //initialize structures
+ for (RAS_Node node : nodes.values()) {
+ nodeCompAssignment.put(node, new HashSet<>());
+ }
+ //populate with existing assignments
+ SchedulerAssignment existingAssignment =
cluster.getAssignmentById(td.getId());
+ if (existingAssignment != null) {
+ for (Map.Entry<ExecutorDetails, WorkerSlot> entry1 :
existingAssignment.getExecutorToSlot().entrySet()) {
+ ExecutorDetails exec1 = entry1.getKey();
+ String compId = execToComp.get(exec1);
+ WorkerSlot ws = entry1.getValue();
+ RAS_Node node = nodes.get(ws.getNodeId());
+ //populate node to component Assignments
+ nodeCompAssignment.get(node).add(compId);
+ //populate worker to comp assignments
+ workerCompAssignment.computeIfAbsent(ws, (k) -> new
HashSet<>()).add(compId);
+ }
+ }
+
+ //early detection/early fail
+ if (!checkSchedulingFeasibility()) {
+ //Scheduling Status set to FAIL_OTHER so no eviction policy
will be attempted to make space for this topology
+ return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER,
"Scheduling not feasible!");
+ }
+ return backtrackSearch(new SearcherState(workerCompAssignment,
nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td))
+ .asSchedulingResult();
+ }
+
+ private boolean checkSchedulingFeasibility() {
+ for (String comp : spreadComps) {
+ int numExecs = compToExecs.get(comp).size();
+ if (numExecs > nodes.size()) {
+ LOG.error("Unsatisfiable constraint: Component: {} marked
as spread has {} executors which is larger "
+ + "than number of nodes: {}", comp, numExecs,
nodes.size());
+ return false;
+ }
+ }
+ if (execToComp.size() >= MAX_STATE_SEARCH) {
+ LOG.error("Number of executors is greater than the maximum
number of states allowed to be searched. "
+ + "# of executors: {} Max states to search: {}",
execToComp.size(), MAX_STATE_SEARCH);
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ protected TreeSet<ObjectResources> sortObjectResources(
+ final AllResources allResources, ExecutorDetails exec,
TopologyDetails topologyDetails,
+ final ExistingScheduleFunc existingScheduleFunc) {
+ return
GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec,
topologyDetails, existingScheduleFunc);
+ }
+
+ // Backtracking algorithm does not take into account the ordering of
executors in worker to reduce traversal space
+ @VisibleForTesting
+ protected SolverResult backtrackSearch(SearcherState state) {
+ state.incStatesSearched();
+ if (state.areSearchLimitsExceeded()) {
+ LOG.warn("Limits Exceeded");
+ return new SolverResult(state, false);
+ }
+
+ ExecutorDetails exec = state.currentExec();
+ List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec,
favoredNodes, unFavoredNodes);
+
+ for (ObjectResources nodeResources: sortedNodes) {
+ RAS_Node node = nodes.get(nodeResources.id);
+ for (WorkerSlot workerSlot :
node.getSlotsAvailbleTo(state.td)) {
+ if (isExecAssignmentToWorkerValid(workerSlot, state)) {
+ String comp = execToComp.get(exec);
+
+ state.tryToSchedule(comp, node, workerSlot);
+
+ if (state.areAllExecsScheduled()) {
+ //Everything is scheduled correctly, so no need to
search any more.
+ return new SolverResult(state, true);
+ }
+
+ SolverResult results =
backtrackSearch(state.nextExecutor());
+ if (results.success) {
+ //We found a good result we are done.
+ return results;
+ }
+
+ if (state.areSearchLimitsExceeded()) {
+ //No need to search more it is not going to help.
+ return new SolverResult(state, false);
+ }
+
+ //backtracking (If we ever get here there really isn't
a lot of hope that we will find a scheduling)
+ state.backtrack(comp, node, workerSlot);
+ }
+ }
+ }
+ //Tried all of the slots and none of them worked.
+ return new SolverResult(state, false);
+ }
+
+ /**
+ * Check if any constraints are violated if exec is scheduled on
worker.
+ * @return true if scheduling exec on worker does not violate any
constraints, returns false if it does
+ */
+ public boolean isExecAssignmentToWorkerValid(WorkerSlot worker,
SearcherState state) {
+ final ExecutorDetails exec = state.currentExec();
+ //check resources
+ RAS_Node node = nodes.get(worker.getNodeId());
+ if (!node.wouldFit(worker, exec, state.td)) {
+ LOG.trace("{} would not fit in resources available on {}",
exec, worker);
+ return false;
+ }
+
+ //check if exec can be on worker based on user defined component
exclusions
+ String execComp = execToComp.get(exec);
+ Set<String> components = state.workerCompAssignment.get(worker);
+ if (components != null) {
+ for (String comp : components) {
+ if (constraintMatrix.get(execComp).get(comp) != 0) {
+ LOG.trace("{} found {} constraint violation {} on {}",
exec, execComp, comp, worker);
+ return false;
+ }
+ }
+ }
+
+ //check if exec satisfy spread
+ if (spreadComps.contains(execComp)) {
+ if (state.nodeCompAssignment.get(node).contains(execComp)) {
+ LOG.trace("{} Found spread violation {} on node {}", exec,
execComp, node.getId());
+ return false;
+ }
+ }
+ return true;
+ }
+
+ static Map<String, Map<String, Integer>>
getConstraintMap(TopologyDetails topo, Set<String> comps) {
+ Map<String, Map<String, Integer>> matrix = new HashMap<>();
+ for (String comp : comps) {
+ matrix.put(comp, new HashMap<>());
+ for (String comp2 : comps) {
+ matrix.get(comp).put(comp2, 0);
+ }
+ }
+ List<List<String>> constraints = (List<List<String>>)
topo.getConf().get(Config.TOPOLOGY_CONSTRAINTS);
+ if (constraints != null) {
+ for (List<String> constraintPair : constraints) {
+ String comp1 = constraintPair.get(0);
+ String comp2 = constraintPair.get(1);
+ if (!matrix.containsKey(comp1)) {
+ LOG.warn("Comp: {} declared in constraints is not
valid!", comp1);
--- End diff --
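Should this be fatal? Right now a component named in the constraints that does not exist in the topology is only logged as a warning; failing instead would surface the misconfiguration to the user.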
---