Github user zd-project commented on a diff in the pull request: https://github.com/apache/storm/pull/2764#discussion_r208748740 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java --- @@ -735,39 +756,108 @@ private static int numUsedWorkers(SchedulerAssignment assignment) { return ret; } - private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort( - Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) { - Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments); - // Print some useful information - if (existingAssignments != null && !existingAssignments.isEmpty()) { - for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) { - String topoId = entry.getKey(); - Map<List<Long>, List<Object>> execToNodePort = entry.getValue(); - Assignment assignment = existingAssignments.get(topoId); - if (assignment == null) { - continue; + private boolean inspectSchduling(Map<String, Assignment> existingAssignments, + Map<String, Assignment> newAssignments) { + assert existingAssignments != null && newAssignments != null; + boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty(); + long numRemovedExec = 0; + long numRemovedSlot = 0; + long numAddedExec = 0; + long numAddedSlot = 0; + if (existingAssignments.isEmpty()) { + for (Entry<String, Assignment> entry : newAssignments.entrySet()) { + final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); + final long count = new HashSet<>(execToPort.values()).size(); + LOG.info("Assigning {} to {} slots", entry.getKey(), count); + LOG.info("Assign executors: {}", execToPort.keySet()); + numAddedSlot += count; + numAddedExec += execToPort.size(); + } + } else if (newAssignments.isEmpty()) { + for (Entry<String, Assignment> entry : existingAssignments.entrySet()) { + final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); + final long count = new HashSet<>(execToPort.values()).size(); + LOG.info("Removing {} from {} slots", entry.getKey(), count); + LOG.info("Remove executors: {}", execToPort.keySet()); + numRemovedSlot += count; + numRemovedExec += execToPort.size(); + } + } else { + MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments); + if (anyChanged = (difference.entriesInCommon().size() != newAssignments.size())) { + for (Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) { + final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); + final long count = new HashSet<>(execToPort.values()).size(); + LOG.info("Removing {} from {} slots", entry.getKey(), count); + LOG.info("Remove executors: {}", execToPort.keySet()); + numRemovedSlot += count; + numRemovedExec += execToPort.size(); } - Map<List<Long>, NodeInfo> old = assignment.get_executor_node_port(); - Map<List<Long>, List<Object>> reassigned = new HashMap<>(); - for (Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) { - NodeInfo oldAssigned = old.get(execAndNodePort.getKey()); - String node = (String) execAndNodePort.getValue().get(0); - Long port = (Long) execAndNodePort.getValue().get(1); - if (oldAssigned == null || !oldAssigned.get_node().equals(node) - || !port.equals(oldAssigned.get_port_iterator().next())) { - reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue()); - } + for (Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) { + final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port(); + final long count = new HashSet<>(execToPort.values()).size(); + LOG.info("Assigning {} to {} slots", entry.getKey(), count); + LOG.info("Assign executors: {}", execToPort.keySet()); + numAddedSlot += count; + numAddedExec += execToPort.size(); } - if (!reassigned.isEmpty()) { - int count = (new HashSet<>(execToNodePort.values())).size(); - Set<List<Long>> reExecs = reassigned.keySet(); - LOG.info("Reassigning {} to {} slots", topoId, count); - LOG.info("Reassign executors: {}", reExecs); + for (Entry<String, MapDifference.ValueDifference<Assignment>> entry : difference.entriesDiffering().entrySet()) { --- End diff -- `entriesDiffering` Returns an unmodifiable map describing keys that appear in both maps, but with different values.
---