Github user jerrypeng commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2442#discussion_r157679086
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
 ---
    @@ -0,0 +1,623 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.scheduler.resource.strategies.scheduling;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableMap;
    +import java.util.Set;
    +import java.util.Stack;
    +import java.util.TreeMap;
    +import java.util.TreeSet;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.resource.RAS_Node;
    +import org.apache.storm.scheduler.resource.RAS_Nodes;
    +import org.apache.storm.scheduler.resource.SchedulingResult;
    +import org.apache.storm.scheduler.resource.SchedulingStatus;
    +import org.apache.storm.utils.ObjectReader;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(ConstraintSolverStrategy.class);
    +
    +    protected static class SolverResult {
    +        private final int statesSearched;
    +        private final boolean success;
    +        private final long timeTakenMillis;
    +        private final int backtracked;
    +
    +        public SolverResult(SearcherState state, boolean success) {
    +            this.statesSearched = state.getStatesSearched();
    +            this.success = success;
    +            timeTakenMillis = Time.currentTimeMillis() - 
state.startTimeMillis;
    +            backtracked = state.numBacktrack;
    +        }
    +
    +        public SchedulingResult asSchedulingResult() {
    +            if (success) {
    +                return SchedulingResult.success("Fully Scheduled by 
ConstraintSolverStrategy (" + statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, 
backtracked " + backtracked + " times)");
    +            }
    +            return 
SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES,
    +                "Cannot find scheduling that satisfies all constraints (" 
+ statesSearched
    +                    + " states traversed in " + timeTakenMillis + "ms, 
backtracked " + backtracked + " times)");
    +        }
    +    }
    +
    +    protected static class SearcherState {
    +        // Metrics
    +        // How many states searched so far.
    +        private int statesSearched = 0;
    +        // Number of times we had to backtrack.
    +        private int numBacktrack = 0;
    +        final long startTimeMillis;
    +        private final long maxEndTimeMs;
    +
    +        // Current state
    +        // The current executor we are trying to schedule
    +        private int execIndex = 0;
    +        // A map of the worker to the components in the worker to be able 
to enforce constraints.
    +        private final Map<WorkerSlot, Set<String>> workerCompAssignment;
    +        private final boolean[] okToRemoveFromWorker;
    +        // for the currently tested assignment a Map of the node to the 
components on it to be able to enforce constraints
    +        private final Map<RAS_Node, Set<String>> nodeCompAssignment;
    +        private final boolean[] okToRemoveFromNode;
    +
    +        // Static State
    +        // The list of all executors (preferably sorted to make 
assignments simpler).
    +        private final List<ExecutorDetails> execs;
    +        //The maximum number of state to search before stopping.
    +        private final int maxStatesSearched;
    +        //The topology we are scheduling
    +        private final TopologyDetails td;
    +
    +        private SearcherState(Map<WorkerSlot, Set<String>> 
workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment,
    +                              int maxStatesSearched, long maxTimeMs, 
List<ExecutorDetails> execs, TopologyDetails td) {
    +            assert !execs.isEmpty();
    +            assert execs != null;
    +
    +            this.workerCompAssignment = workerCompAssignment;
    +            this.nodeCompAssignment = nodeCompAssignment;
    +            this.maxStatesSearched = maxStatesSearched;
    +            this.execs = execs;
    +            okToRemoveFromWorker = new boolean[execs.size()];
    +            okToRemoveFromNode = new boolean[execs.size()];
    +            this.td = td;
    +            startTimeMillis = Time.currentTimeMillis();
    +            if (maxTimeMs <= 0) {
    +                maxEndTimeMs = Long.MAX_VALUE;
    +            } else {
    +                maxEndTimeMs = startTimeMillis + maxTimeMs;
    +            }
    +        }
    +
    +        public void incStatesSearched() {
    +            statesSearched++;
    +            if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) {
    +                LOG.debug("States Searched: {}", statesSearched);
    +                LOG.debug("backtrack: {}", numBacktrack);
    +            }
    +        }
    +
    +        public int getStatesSearched() {
    +            return statesSearched;
    +        }
    +
    +        public boolean areSearchLimitsExceeded() {
    +            return statesSearched > maxStatesSearched || 
Time.currentTimeMillis() > maxEndTimeMs;
    +        }
    +
    +        public SearcherState nextExecutor() {
    +            execIndex++;
    +            if (execIndex >= execs.size()) {
    +                throw new IllegalStateException("Exceeded the exec limit " 
+ execIndex + " >= " + execs.size());
    +            }
    +            return this;
    +        }
    +
    +        public boolean areAllExecsScheduled() {
    +            return execIndex == execs.size() - 1;
    +        }
    +
    +        public ExecutorDetails currentExec() {
    +            return execs.get(execIndex);
    +        }
    +
    +        public void tryToSchedule(String comp, RAS_Node node, WorkerSlot 
workerSlot) {
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Trying assignment of {} {} to {}", exec, comp, 
workerSlot);
    +            //It is possible that this component is already scheduled on 
this node or worker.  If so when we backtrack we cannot remove it
    +            okToRemoveFromWorker[execIndex] = 
workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new 
HashSet<>()).add(comp);
    +            okToRemoveFromNode[execIndex] = 
nodeCompAssignment.get(node).add(comp);
    +            node.assignSingleExecutor(workerSlot, exec, td);
    +        }
    +
    +        public void backtrack(String comp, RAS_Node node, WorkerSlot 
workerSlot) {
    +            execIndex--;
    +            if (execIndex < 0) {
    +                throw new IllegalStateException("Internal Error exec index 
became negative");
    +            }
    +            numBacktrack++;
    +            ExecutorDetails exec = currentExec();
    +            LOG.trace("Backtracking {} {} from {}", exec, comp, 
workerSlot);
    +            if (okToRemoveFromWorker[execIndex]) {
    +                workerCompAssignment.get(workerSlot).remove(comp);
    +            }
    +            if (okToRemoveFromNode[execIndex]) {
    +                nodeCompAssignment.get(node).remove(comp);
    +            }
    +            node.freeSingleExecutor(exec, td);
    +        }
    +    }
    +
    +    private Map<String, RAS_Node> nodes;
    +    private Map<ExecutorDetails, String> execToComp;
    +    private Map<String, Set<ExecutorDetails>> compToExecs;
    +    private List<String> favoredNodes;
    +    private List<String> unFavoredNodes;
    +
    +    //constraints and spreads
    +    private Map<String, Map<String, Integer>> constraintMatrix;
    +    private HashSet<String> spreadComps = new HashSet<>();
    +
    +    //hard coded max number of states to search
    +    public static final int MAX_STATE_SEARCH = 100_000;
    +
    +    @Override
    +    public SchedulingResult schedule(Cluster cluster, TopologyDetails td) {
    +        prepare(cluster);
    +        LOG.debug("Scheduling {}", td.getId());
    +        nodes = RAS_Nodes.getAllNodesFrom(cluster);
    +        Map<WorkerSlot, Set<String>> workerCompAssignment = new 
HashMap<>();
    +        Map<RAS_Node, Set<String>> nodeCompAssignment = new HashMap<>();
    +        //set max number of states to search
    +        final int maxStateSearch = Math.min(MAX_STATE_SEARCH,
    +            
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_DEPTH_TRAVERSAL)));
    +
    +        final long maxTimeMs =
    +            
ObjectReader.getInt(td.getConf().get(Config.TOPOLOGY_CONSTRAINTS_MAX_TIME_SECS),
 -1).intValue() * 1000L;
    +
    +        favoredNodes = (List<String>) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_FAVORED_NODES);
    +        unFavoredNodes = (List<String>) 
td.getConf().get(Config.TOPOLOGY_SCHEDULER_UNFAVORED_NODES);
    +
    +        //get mapping of execs to components
    +        execToComp = td.getExecutorToComponent();
    +        //get mapping of components to executors
    +        compToExecs = getCompToExecs(execToComp);
    +
    +        //get topology constraints
    +        constraintMatrix = getConstraintMap(td, compToExecs.keySet());
    +
    +        //get spread components
    +        spreadComps = getSpreadComps(td);
    +
    +        ArrayList<ExecutorDetails> sortedExecs = new ArrayList<>();
    +        //get a sorted list of unassigned executors based on number of 
constraints
    +        Set<ExecutorDetails> unassignedExecutors = new 
HashSet<>(cluster.getUnassignedExecutors(td));
    +        for (ExecutorDetails exec1 : getSortedExecs(spreadComps, 
constraintMatrix, compToExecs)) {
    +            if (unassignedExecutors.contains(exec1)) {
    +                sortedExecs.add(exec1);
    +            }
    +        }
    +
    +        //initialize structures
    +        for (RAS_Node node : nodes.values()) {
    +            nodeCompAssignment.put(node, new HashSet<>());
    +        }
    +        //populate with existing assignments
    +        SchedulerAssignment existingAssignment = 
cluster.getAssignmentById(td.getId());
    +        if (existingAssignment != null) {
    +            for (Map.Entry<ExecutorDetails, WorkerSlot> entry1 : 
existingAssignment.getExecutorToSlot().entrySet()) {
    +                ExecutorDetails exec1 = entry1.getKey();
    +                String compId = execToComp.get(exec1);
    +                WorkerSlot ws = entry1.getValue();
    +                RAS_Node node = nodes.get(ws.getNodeId());
    +                //populate node to component Assignments
    +                nodeCompAssignment.get(node).add(compId);
    +                //populate worker to comp assignments
    +                workerCompAssignment.computeIfAbsent(ws, (k) -> new 
HashSet<>()).add(compId);
    +            }
    +        }
    +
    +        //early detection/early fail
    +        if (!checkSchedulingFeasibility()) {
    +            //Scheduling Status set to FAIL_OTHER so no eviction policy 
will be attempted to make space for this topology
    +            return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, 
"Scheduling not feasible!");
    +        }
    +        return backtrackSearch(new SearcherState(workerCompAssignment, 
nodeCompAssignment, maxStateSearch, maxTimeMs, sortedExecs, td))
    +            .asSchedulingResult();
    +    }
    +
    +    private boolean checkSchedulingFeasibility() {
    +        for (String comp : spreadComps) {
    +            int numExecs = compToExecs.get(comp).size();
    +            if (numExecs > nodes.size()) {
    +                LOG.error("Unsatisfiable constraint: Component: {} marked 
as spread has {} executors which is larger "
    +                    + "than number of nodes: {}", comp, numExecs, 
nodes.size());
    +                return false;
    +            }
    +        }
    +        if (execToComp.size() >= MAX_STATE_SEARCH) {
    +            LOG.error("Number of executors is greater than the maximum 
number of states allowed to be searched.  "
    +                + "# of executors: {} Max states to search: {}", 
execToComp.size(), MAX_STATE_SEARCH);
    +            return false;
    +        }
    +        return true;
    +    }
    +    
    +    @Override
    +    protected TreeSet<ObjectResources> sortObjectResources(
    +        final AllResources allResources, ExecutorDetails exec, 
TopologyDetails topologyDetails,
    +        final ExistingScheduleFunc existingScheduleFunc) {
    +        return 
GenericResourceAwareStrategy.sortObjectResourcesImpl(allResources, exec, 
topologyDetails, existingScheduleFunc);
    +    }
    +
    +    // Backtracking algorithm does not take into account the ordering of 
executors in worker to reduce traversal space
    +    @VisibleForTesting
    +    protected SolverResult backtrackSearch(SearcherState state) {
    +        state.incStatesSearched();
    +        if (state.areSearchLimitsExceeded()) {
    +            LOG.warn("Limits Exceeded");
    +            return new SolverResult(state, false);
    +        }
    +
    +        ExecutorDetails exec = state.currentExec();
    +        List<ObjectResources> sortedNodes = sortAllNodes(state.td, exec, 
favoredNodes, unFavoredNodes);
    +
    +        for (ObjectResources nodeResources: sortedNodes) {
    +            RAS_Node node = nodes.get(nodeResources.id);
    +            for (WorkerSlot workerSlot : 
node.getSlotsAvailbleTo(state.td)) {
    +                if (isExecAssignmentToWorkerValid(workerSlot, state)) {
    +                    String comp = execToComp.get(exec);
    +
    +                    state.tryToSchedule(comp, node, workerSlot);
    +
    +                    if (state.areAllExecsScheduled()) {
    +                        //Everything is scheduled correctly, so no need to 
search any more.
    +                        return new SolverResult(state, true);
    +                    }
    +
    +                    SolverResult results = 
backtrackSearch(state.nextExecutor());
    +                    if (results.success) {
    +                        //We found a good result we are done.
    +                        return results;
    +                    }
    +
    +                    if (state.areSearchLimitsExceeded()) {
    +                        //No need to search more it is not going to help.
    +                        return new SolverResult(state, false);
    +                    }
    +
    +                    //backtracking (If we ever get here there really isn't 
a lot of hope that we will find a scheduling)
    +                    state.backtrack(comp, node, workerSlot);
    +                }
    +            }
    +        }
    +        //Tried all of the slots and none of them worked.
    +        return new SolverResult(state, false);
    +    }
    +
    +    /**
    +     * Check if any constraints are violated if exec is scheduled on 
worker.
    +     * @return true if scheduling exec on worker does not violate any 
constraints, returns false if it does
    +     */
    +    public boolean isExecAssignmentToWorkerValid(WorkerSlot worker, 
SearcherState state) {
    +        final ExecutorDetails exec = state.currentExec();
    +        //check resources
    +        RAS_Node node = nodes.get(worker.getNodeId());
    +        if (!node.wouldFit(worker, exec, state.td)) {
    +            LOG.trace("{} would not fit in resources available on {}", 
exec, worker);
    +            return false;
    +        }
    +
    +        //check if exec can be on worker based on user defined component 
exclusions
    +        String execComp = execToComp.get(exec);
    +        Set<String> components = state.workerCompAssignment.get(worker);
    +        if (components != null) {
    +            for (String comp : components) {
    +                if (constraintMatrix.get(execComp).get(comp) != 0) {
    +                    LOG.trace("{} found {} constraint violation {} on {}", 
exec, execComp, comp, worker);
    +                    return false;
    +                }
    +            }
    +        }
    +
    +        //check if exec satisfy spread
    +        if (spreadComps.contains(execComp)) {
    +            if (state.nodeCompAssignment.get(node).contains(execComp)) {
    +                LOG.trace("{} Found spread violation {} on node {}", exec, 
execComp, node.getId());
    +                return false;
    +            }
    +        }
    +        return true;
    +    }
    +
    +    static Map<String, Map<String, Integer>> 
getConstraintMap(TopologyDetails topo, Set<String> comps) {
    +        Map<String, Map<String, Integer>> matrix = new HashMap<>();
    +        for (String comp : comps) {
    +            matrix.put(comp, new HashMap<>());
    +            for (String comp2 : comps) {
    +                matrix.get(comp).put(comp2, 0);
    +            }
    +        }
    +        List<List<String>> constraints = (List<List<String>>) 
topo.getConf().get(Config.TOPOLOGY_CONSTRAINTS);
    +        if (constraints != null) {
    +            for (List<String> constraintPair : constraints) {
    +                String comp1 = constraintPair.get(0);
    +                String comp2 = constraintPair.get(1);
    +                if (!matrix.containsKey(comp1)) {
    +                    LOG.warn("Comp: {} declared in constraints is not 
valid!", comp1);
    --- End diff --
    
    This means the user (probably unintentionally) specified a component that 
doesn't exist. In my experience, users rarely check the logs unless something is 
obviously wrong with their topology, so they will likely miss this error. We 
should at least surface a warning in the topology status; even better, we could 
add client-side validation that checks that every component listed in 
Config.TOPOLOGY_RAS_CONSTRAINTS is a valid component of the topology.


---

Reply via email to