bipinprasad commented on a change in pull request #3215: Storm3585 - New compact Constraint config including maxCoLocationCnt
URL: https://github.com/apache/storm/pull/3215#discussion_r387904802
##########
File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
##########
@@ -134,48 +252,51 @@ private static boolean checkConstraintsSatisfied(Cluster cluster, TopologyDetail
         return workerToNodes;
     }
-    private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo) {
+    private static boolean checkSpreadSchedulingValid(Cluster cluster, TopologyDetails topo, ConstraintConfig constraintConfig) {
         LOG.info("Checking for a valid scheduling...");
         assert (cluster.getAssignmentById(topo.getId()) != null);
-        Map<ExecutorDetails, WorkerSlot> result = cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
+        if (constraintConfig == null) {
+            constraintConfig = new ConstraintConfig(topo);
+        }
         Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
-        Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new HashMap<>();
-        Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
-        Map<RasNode, HashSet<String>> nodeCompMap = new HashMap<>();
+        Map<String, Map<String, Integer>> nodeCompMap = new HashMap<>(); // this is the critical count
         Map<WorkerSlot, RasNode> workerToNodes = workerToNodes(cluster);
         boolean ret = true;
-        HashSet<String> spreadComps = getSpreadComps(topo);
-        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) {
+        Map<String, Integer> spreadCompCnts = constraintConfig.maxCoLocationCnts;
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : cluster.getAssignmentById(topo.getId()).getExecutorToSlot().entrySet()) {
             ExecutorDetails exec = entry.getKey();
+            String comp = execToComp.get(exec);
             WorkerSlot worker = entry.getValue();
             RasNode node = workerToNodes.get(worker);
-
-            if (workerExecMap.computeIfAbsent(worker, (k) -> new HashSet<>()).contains(exec)) {
-                LOG.error("Incorrect Scheduling: Found duplicate in scheduling");
-                return false;
-            }
-            workerExecMap.get(worker).add(exec);
-            String comp = execToComp.get(exec);
-            workerCompMap.computeIfAbsent(worker, (k) -> new HashSet<>()).add(comp);
-            if (spreadComps.contains(comp)) {
-                if (nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>()).contains(comp)) {
-                    LOG.error("Incorrect Scheduling: Spread for Component: {} {} on node {} not satisfied {}",
-                              comp, exec, node.getId(), nodeCompMap.get(node));
+            String nodeId = node.getId();
+
+            if (spreadCompCnts.containsKey(comp)) {
+                int allowedColocationMaxCnt = spreadCompCnts.get(comp);
+                Map<String, Integer> oneNodeCompMap = nodeCompMap.computeIfAbsent(nodeId, (k) -> new HashMap<>());
+                oneNodeCompMap.put(comp, oneNodeCompMap.getOrDefault(comp, 0) + 1);
+                if (allowedColocationMaxCnt < oneNodeCompMap.get(comp)) {
+                    LOG.error("Incorrect Scheduling: MaxCoLocationCnt for Component: {} {} on node {} not satisfied, cnt {} > allowed {}",
+                            comp, exec, nodeId, oneNodeCompMap.get(comp), allowedColocationMaxCnt);
                     ret = false;
                 }
             }
-            nodeCompMap.computeIfAbsent(node, (k) -> new HashSet<>()).add(comp);
+        }
+        if (!ret) {
+            LOG.error("Incorrect MaxCoLocationCnts: Node-Component-Cnt {}", nodeCompMap);
         }
         return ret;
     }
     /**
      * Check if resource constraints satisfied.
      */
-    private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo) {
+    private static boolean checkResourcesCorrect(Cluster cluster, TopologyDetails topo, ConstraintConfig constraintConfig) {

Review comment:
removed unused param

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services