Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208970052 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } - private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort( - Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) { - Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments); - // Print some useful information - if (existingAssignments != null && !existingAssignments.isEmpty()) { - for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) { - String topoId = entry.getKey(); - Map<List<Long>, List<Object>> execToNodePort = entry.getValue(); - Assignment assignment = existingAssignments.get(topoId); - if (assignment == null) { - continue; + private boolean inspectSchduling(Map<String, Assignment> existingAssignments, + Map<String, Assignment> newAssignments) { + assert existingAssignments != null && newAssignments != null; + boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); + long numRemovedExec = 0; + long numRemovedSlot = 0; + long numAddedExec = 0; + long numAddedSlot = 0; + if (existingAssignments.isEmpty()) { + for (Entry<String, Assignment> entry : newAssignments.entrySet()) { + final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); + final long count = new HashSet<>(execToPort.values()).size(); + LOG.info("Assigning {} to {} slots", entry.getKey(), count); + LOG.info("Assign executors: {}", execToPort.keySet()); + numAddedSlot += count; + numAddedExec += execToPort.size(); + } + } else if (newAssignments.isEmpty()) { + for (Entry<String, Assignment> entry : existingAssignments.entrySet()) { + final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); + final long count = new HashSet<>(execToPort.values()).size(); + LOG.info("Removing {} from {} slots", entry.getKey(), count); + LOG.info("Remove executors: {}", execToPort.keySet()); + numRemovedSlot += count; + numRemovedExec += execToPort.size(); + } + } else { + MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments); + if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { --- End diff -- We don't know whether two non empty maps are equal or not until we do `Maps#difference`. Then I can assign the variable `anyChanged` to remember if any changes in ScheduledAssignments have been made, which is the return value as well as the determinant if I should add more loggings
---