Ethanlm commented on a change in pull request #3346:
URL: https://github.com/apache/storm/pull/3346#discussion_r524462437



##########
File path: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java
##########
@@ -428,16 +465,39 @@ protected SchedulingResult scheduleExecutorsOnNodes(List<ExecutorDetails> ordere
                     }
                     progressIdxForExec[execIndex]++;
 
+                    int numBoundAckerAssigned
+                        = assignBoundAckersForNewWorkerSlot(exec, node, workerSlot);
+                    if (numBoundAckerAssigned == -1) {
+                        // This only happens when trying to assign bound ackers to the worker slot and failed.
+                        // Free the entire worker slot and put those bound ackers back to unassigned list
+                        LOG.debug("Failed to assign bound acker for exec: {} of topo: {} to worker: {}.  Backtracking.",
+                            exec, topoName, workerSlot);
+                        searcherState.freeWorkerSlotWithBoundAckers(node, workerSlot);
+                        continue;
+                    }
+
                     if (!isExecAssignmentToWorkerValid(exec, workerSlot)) {
+                        // This only happens when this exec can not fit in the workerSlot
+                        // and this is not the first exec to this workerSlot.
+                        // So just go to next workerSlot and don't free the worker.
+                        if (numBoundAckerAssigned > 0) {
+                            LOG.debug("Failed to assign exec: {} of topo: {} with bound ackers to worker: {}.  Backtracking.",
+                                exec, topoName, workerSlot);
+                            searcherState.freeWorkerSlotWithBoundAckers(node, workerSlot);
+                        }
                         continue;
                     }
 
                     searcherState.incStatesSearched();
                     searcherState.assignCurrentExecutor(execToComp, node, workerSlot);
+                    if (numBoundAckerAssigned > 0) {
+                        // This exec with its bounded ackers have all been successfully assigned
+                        searcherState.getExecsWithBoundAckers().add(exec);
+                    }
                     if (searcherState.areAllExecsScheduled()) {
                         //Everything is scheduled correctly, so no need to search any more.
                         LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}",
-                                loopCnt, System.currentTimeMillis() - startTimeMilli,
+                                loopCnt, Time.currentTimeMillis() - startTimeMilli,

Review comment:
   `Time` returns the real clock time when it is not running in a unit test.
   I am not sure why we would need to use `System` here but not in the second place.
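
   To illustrate the point, here is a minimal sketch of a clock that falls back to the system clock unless a test has switched it into simulated mode. `SimulatableClock` and all of its method names are hypothetical and chosen only for this example; this is not Storm's `Time` implementation, just the general pattern as I understand it.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; NOT the actual org.apache.storm.utils.Time class.
final class SimulatableClock {
    // null means "not simulating": reads go straight to the system clock.
    private static volatile AtomicLong simulatedMillis = null;

    private SimulatableClock() {
    }

    // A unit test would call this to take control of time.
    static void startSimulating(long startMillis) {
        simulatedMillis = new AtomicLong(startMillis);
    }

    // Return to wall-clock behavior.
    static void stopSimulating() {
        simulatedMillis = null;
    }

    // Move simulated time forward; only meaningful while simulating.
    static void advance(long deltaMillis) {
        AtomicLong sim = simulatedMillis;
        if (sim == null) {
            throw new IllegalStateException("advance() called while not simulating");
        }
        sim.addAndGet(deltaMillis);
    }

    // Simulated time in tests, system clock time otherwise.
    static long currentTimeMillis() {
        AtomicLong sim = simulatedMillis;
        return sim != null ? sim.get() : System.currentTimeMillis();
    }
}
```

   With a clock like this, an elapsed-time computation such as `currentTimeMillis() - startTimeMilli` behaves the same as with `System.currentTimeMillis()` in production, as long as both timestamps are read from the same clock.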




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

