GitHub user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2442#discussion_r154193911 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java --- @@ -0,0 +1,623 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.scheduler.resource.strategies.scheduling; + +import com.google.common.annotations.VisibleForTesting; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.Stack; +import java.util.TreeMap; +import java.util.TreeSet; +import org.apache.storm.Config; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.ExecutorDetails; +import org.apache.storm.scheduler.SchedulerAssignment; +import org.apache.storm.scheduler.TopologyDetails; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.resource.RAS_Node; +import org.apache.storm.scheduler.resource.RAS_Nodes; +import org.apache.storm.scheduler.resource.SchedulingResult; +import org.apache.storm.scheduler.resource.SchedulingStatus; +import org.apache.storm.utils.ObjectReader; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConstraintSolverStrategy extends BaseResourceAwareStrategy { + private static final Logger LOG = LoggerFactory.getLogger(ConstraintSolverStrategy.class); + + protected static class SolverResult { + private final int statesSearched; + private final boolean success; + private final long timeTakenMillis; + private final int backtracked; + + public SolverResult(SearcherState state, boolean success) { + this.statesSearched = state.getStatesSearched(); + this.success = success; + timeTakenMillis = Time.currentTimeMillis() - state.startTimeMillis; + backtracked = state.numBacktrack; + } + + public SchedulingResult asSchedulingResult() { + if (success) { + return SchedulingResult.success("Fully Scheduled by ConstraintSolverStrategy (" + statesSearched + + " states traversed in " + timeTakenMillis + 
"ms, backtracked " + backtracked + " times)"); + } + return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, + "Cannot find scheduling that satisfies all constraints (" + statesSearched + + " states traversed in " + timeTakenMillis + "ms, backtracked " + backtracked + " times)"); + } + } + + protected static class SearcherState { + // Metrics + // How many states searched so far. + private int statesSearched = 0; + // Number of times we had to backtrack. + private int numBacktrack = 0; + final long startTimeMillis; + private final long maxEndTimeMs; + + // Current state + // The current executor we are trying to schedule + private int execIndex = 0; + // A map of the worker to the components in the worker to be able to enforce constraints. + private final Map<WorkerSlot, Set<String>> workerCompAssignment; + private final boolean[] okToRemoveFromWorker; + // for the currently tested assignment a Map of the node to the components on it to be able to enforce constraints + private final Map<RAS_Node, Set<String>> nodeCompAssignment; + private final boolean[] okToRemoveFromNode; + + // Static State + // The list of all executors (preferably sorted to make assignments simpler). + private final List<ExecutorDetails> execs; + //The maximum number of state to search before stopping. 
+ private final int maxStatesSearched; + //The topology we are scheduling + private final TopologyDetails td; + + private SearcherState(Map<WorkerSlot, Set<String>> workerCompAssignment, Map<RAS_Node, Set<String>> nodeCompAssignment, + int maxStatesSearched, long maxTimeMs, List<ExecutorDetails> execs, TopologyDetails td) { + assert !execs.isEmpty(); + assert execs != null; + + this.workerCompAssignment = workerCompAssignment; + this.nodeCompAssignment = nodeCompAssignment; + this.maxStatesSearched = maxStatesSearched; + this.execs = execs; + okToRemoveFromWorker = new boolean[execs.size()]; + okToRemoveFromNode = new boolean[execs.size()]; + this.td = td; + startTimeMillis = Time.currentTimeMillis(); + if (maxTimeMs <= 0) { + maxEndTimeMs = Long.MAX_VALUE; + } else { + maxEndTimeMs = startTimeMillis + maxTimeMs; + } + } + + public void incStatesSearched() { + statesSearched++; + if (LOG.isDebugEnabled() && statesSearched % 1_000 == 0) { + LOG.debug("States Searched: {}", statesSearched); + LOG.debug("backtrack: {}", numBacktrack); + } + } + + public int getStatesSearched() { + return statesSearched; + } + + public boolean areSearchLimitsExceeded() { + return statesSearched > maxStatesSearched || Time.currentTimeMillis() > maxEndTimeMs; + } + + public SearcherState nextExecutor() { + execIndex++; + if (execIndex >= execs.size()) { + throw new IllegalStateException("Exceeded the exec limit " + execIndex + " >= " + execs.size()); + } + return this; + } + + public boolean areAllExecsScheduled() { + return execIndex == execs.size() - 1; + } + + public ExecutorDetails currentExec() { + return execs.get(execIndex); + } + + public void tryToSchedule(String comp, RAS_Node node, WorkerSlot workerSlot) { + ExecutorDetails exec = currentExec(); + LOG.trace("Trying assignment of {} {} to {}", exec, comp, workerSlot); + //It is possible that this component is already scheduled on this node or worker. 
If so when we backtrack we cannot remove it + okToRemoveFromWorker[execIndex] = workerCompAssignment.computeIfAbsent(workerSlot, (k) -> new HashSet<>()).add(comp); + okToRemoveFromNode[execIndex] = nodeCompAssignment.get(node).add(comp); + node.assignSingleExecutor(workerSlot, exec, td); + } + + public void backtrack(String comp, RAS_Node node, WorkerSlot workerSlot) { + execIndex--; + if (execIndex < 0) { + throw new IllegalStateException("Internal Error exec index became negative"); + } + numBacktrack++; + ExecutorDetails exec = currentExec(); + LOG.trace("Backtracking {} {} from {}", exec, comp, workerSlot); + if (okToRemoveFromWorker[execIndex]) { + workerCompAssignment.get(workerSlot).remove(comp); + } + if (okToRemoveFromNode[execIndex]) { --- End diff -- As a defensive measure, we may want to reset this flag (okToRemoveFromNode[execIndex]) back to false once it has been checked, so that a stale value cannot cause the component to be removed a second time.
---