bipinprasad commented on a change in pull request #3215: Storm3585 - New 
compact Constraint config including maxCoLocationCnt
URL: https://github.com/apache/storm/pull/3215#discussion_r387904802
 
 

 ##########
 File path: 
storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java
 ##########
 @@ -134,48 +252,51 @@ private static boolean checkConstraintsSatisfied(Cluster 
cluster, TopologyDetail
         return workerToNodes;
     }
 
-    private static boolean checkSpreadSchedulingValid(Cluster cluster, 
TopologyDetails topo) {
+    private static boolean checkSpreadSchedulingValid(Cluster cluster, 
TopologyDetails topo, ConstraintConfig constraintConfig) {
         LOG.info("Checking for a valid scheduling...");
         assert (cluster.getAssignmentById(topo.getId()) != null);
-        Map<ExecutorDetails, WorkerSlot> result = 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot();
+        if (constraintConfig == null) {
+            constraintConfig = new ConstraintConfig(topo);
+        }
         Map<ExecutorDetails, String> execToComp = 
topo.getExecutorToComponent();
-        Map<WorkerSlot, HashSet<ExecutorDetails>> workerExecMap = new 
HashMap<>();
-        Map<WorkerSlot, HashSet<String>> workerCompMap = new HashMap<>();
-        Map<RasNode, HashSet<String>> nodeCompMap = new HashMap<>();
+        Map<String, Map<String, Integer>> nodeCompMap = new HashMap<>(); // 
this is the critical count
         Map<WorkerSlot, RasNode> workerToNodes = workerToNodes(cluster);
         boolean ret = true;
 
-        HashSet<String> spreadComps = getSpreadComps(topo);
-        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : result.entrySet()) 
{
+        Map<String, Integer> spreadCompCnts = 
constraintConfig.maxCoLocationCnts;
+        for (Map.Entry<ExecutorDetails, WorkerSlot> entry : 
cluster.getAssignmentById(topo.getId()).getExecutorToSlot().entrySet()) {
             ExecutorDetails exec = entry.getKey();
+            String comp = execToComp.get(exec);
             WorkerSlot worker = entry.getValue();
             RasNode node = workerToNodes.get(worker);
-
-            if (workerExecMap.computeIfAbsent(worker, (k) -> new 
HashSet<>()).contains(exec)) {
-                LOG.error("Incorrect Scheduling: Found duplicate in 
scheduling");
-                return false;
-            }
-            workerExecMap.get(worker).add(exec);
-            String comp = execToComp.get(exec);
-            workerCompMap.computeIfAbsent(worker, (k) -> new 
HashSet<>()).add(comp);
-            if (spreadComps.contains(comp)) {
-                if (nodeCompMap.computeIfAbsent(node, (k) -> new 
HashSet<>()).contains(comp)) {
-                    LOG.error("Incorrect Scheduling: Spread for Component: {} 
{} on node {} not satisfied {}",
-                              comp, exec, node.getId(), nodeCompMap.get(node));
+            String nodeId = node.getId();
+
+            if (spreadCompCnts.containsKey(comp)) {
+                int allowedColocationMaxCnt = spreadCompCnts.get(comp);
+                Map<String, Integer> oneNodeCompMap = 
nodeCompMap.computeIfAbsent(nodeId, (k) -> new HashMap<>());
+                oneNodeCompMap.put(comp, oneNodeCompMap.getOrDefault(comp, 0) 
+ 1);
+                if (allowedColocationMaxCnt < oneNodeCompMap.get(comp)) {
+                    LOG.error("Incorrect Scheduling: MaxCoLocationCnt for 
Component: {} {} on node {} not satisfied, cnt {} > allowed {}",
+                            comp, exec, nodeId, oneNodeCompMap.get(comp), 
allowedColocationMaxCnt);
                     ret = false;
                 }
             }
-            nodeCompMap.computeIfAbsent(node, (k) -> new 
HashSet<>()).add(comp);
+        }
+        if (!ret) {
+            LOG.error("Incorrect MaxCoLocationCnts: Node-Component-Cnt {}", 
nodeCompMap);
         }
         return ret;
     }
 
     /**
      * Check if resource constraints satisfied.
      */
-    private static boolean checkResourcesCorrect(Cluster cluster, 
TopologyDetails topo) {
+    private static boolean checkResourcesCorrect(Cluster cluster, 
TopologyDetails topo, ConstraintConfig constraintConfig) {
 
 Review comment:
   Removed the unused parameter.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to