Github user zd-project commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2764#discussion_r208748740
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
             return ret;
         }
     
    -    private static Map<String, Map<List<Long>, List<Object>>> 
computeNewTopoToExecToNodePort(
    -        Map<String, SchedulerAssignment> schedAssignments, Map<String, 
Assignment> existingAssignments) {
    -        Map<String, Map<List<Long>, List<Object>>> ret = 
computeTopoToExecToNodePort(schedAssignments);
    -        // Print some useful information
    -        if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
    -            for (Entry<String, Map<List<Long>, List<Object>>> entry : 
ret.entrySet()) {
    -                String topoId = entry.getKey();
    -                Map<List<Long>, List<Object>> execToNodePort = 
entry.getValue();
    -                Assignment assignment = existingAssignments.get(topoId);
    -                if (assignment == null) {
    -                    continue;
    +    private boolean inspectSchduling(Map<String, Assignment> 
existingAssignments,
    +                                            Map<String, Assignment> 
newAssignments) {
    +        assert existingAssignments != null && newAssignments != null;
    +        boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
    +        long numRemovedExec = 0;
    +        long numRemovedSlot = 0;
    +        long numAddedExec   = 0;
    +        long numAddedSlot   = 0;
    +        if (existingAssignments.isEmpty()) {
    +            for (Entry<String, Assignment> entry : 
newAssignments.entrySet()) {
    +                final Map<List<Long>, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
    +                final long count = new 
HashSet<>(execToPort.values()).size();
    +                LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
    +                LOG.info("Assign executors: {}", execToPort.keySet());
    +                numAddedSlot += count;
    +                numAddedExec += execToPort.size();
    +            }
    +        } else if (newAssignments.isEmpty()) {
    +            for (Entry<String, Assignment> entry : 
existingAssignments.entrySet()) {
    +                final Map<List<Long>, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
    +                final long count = new 
HashSet<>(execToPort.values()).size();
    +                LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
    +                LOG.info("Remove executors: {}", execToPort.keySet());
    +                numRemovedSlot += count;
    +                numRemovedExec += execToPort.size();
    +            }
    +        } else {
    +            MapDifference<String, Assignment> difference = 
Maps.difference(existingAssignments, newAssignments);
    +            if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
    +                for (Entry<String, Assignment> entry : 
difference.entriesOnlyOnLeft().entrySet()) {
    +                    final Map<List<Long>, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
    +                    final long count = new 
HashSet<>(execToPort.values()).size();
    +                    LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
    +                    LOG.info("Remove executors: {}", execToPort.keySet());
    +                    numRemovedSlot += count;
    +                    numRemovedExec += execToPort.size();
                     }
    -                Map<List<Long>, NodeInfo> old = 
assignment.get_executor_node_port();
    -                Map<List<Long>, List<Object>> reassigned = new HashMap<>();
    -                for (Entry<List<Long>, List<Object>> execAndNodePort : 
execToNodePort.entrySet()) {
    -                    NodeInfo oldAssigned = 
old.get(execAndNodePort.getKey());
    -                    String node = (String) 
execAndNodePort.getValue().get(0);
    -                    Long port = (Long) execAndNodePort.getValue().get(1);
    -                    if (oldAssigned == null || 
!oldAssigned.get_node().equals(node)
    -                        || 
!port.equals(oldAssigned.get_port_iterator().next())) {
    -                        reassigned.put(execAndNodePort.getKey(), 
execAndNodePort.getValue());
    -                    }
    +                for (Entry<String, Assignment> entry : 
difference.entriesOnlyOnRight().entrySet()) {
    +                    final Map<List<Long>, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
    +                    final long count = new 
HashSet<>(execToPort.values()).size();
    +                    LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
    +                    LOG.info("Assign executors: {}", execToPort.keySet());
    +                    numAddedSlot += count;
    +                    numAddedExec += execToPort.size();
                     }
     
    -                if (!reassigned.isEmpty()) {
    -                    int count = (new 
HashSet<>(execToNodePort.values())).size();
    -                    Set<List<Long>> reExecs = reassigned.keySet();
    -                    LOG.info("Reassigning {} to {} slots", topoId, count);
    -                    LOG.info("Reassign executors: {}", reExecs);
    +                for (Entry<String, 
MapDifference.ValueDifference<Assignment>> entry : 
difference.entriesDiffering().entrySet()) {
    --- End diff --
    
    `entriesDiffering` Returns an unmodifiable map describing keys that appear 
in both maps, but with different values. 
    
    



---

Reply via email to