Github user zd-project commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2764#discussion_r208970052
  
    --- Diff: 
storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -735,39 +756,108 @@ private static int 
numUsedWorkers(SchedulerAssignment assignment) {
             return ret;
         }
     
    -    private static Map<String, Map<List<Long>, List<Object>>> 
computeNewTopoToExecToNodePort(
    -        Map<String, SchedulerAssignment> schedAssignments, Map<String, 
Assignment> existingAssignments) {
    -        Map<String, Map<List<Long>, List<Object>>> ret = 
computeTopoToExecToNodePort(schedAssignments);
    -        // Print some useful information
    -        if (existingAssignments != null && !existingAssignments.isEmpty()) 
{
    -            for (Entry<String, Map<List<Long>, List<Object>>> entry : 
ret.entrySet()) {
    -                String topoId = entry.getKey();
    -                Map<List<Long>, List<Object>> execToNodePort = 
entry.getValue();
    -                Assignment assignment = existingAssignments.get(topoId);
    -                if (assignment == null) {
    -                    continue;
    +    private boolean inspectSchduling(Map<String, Assignment> 
existingAssignments,
    +                                            Map<String, Assignment> 
newAssignments) {
    +        assert existingAssignments != null && newAssignments != null;
    +        boolean anyChanged = existingAssignments.isEmpty() ^ 
newAssignments.isEmpty();
    +        long numRemovedExec = 0;
    +        long numRemovedSlot = 0;
    +        long numAddedExec   = 0;
    +        long numAddedSlot   = 0;
    +        if (existingAssignments.isEmpty()) {
    +            for (Entry<String, Assignment> entry : 
newAssignments.entrySet()) {
    +                final Map<List<Long>, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
    +                final long count = new 
HashSet<>(execToPort.values()).size();
    +                LOG.info("Assigning {} to {} slots", entry.getKey(), 
count);
    +                LOG.info("Assign executors: {}", execToPort.keySet());
    +                numAddedSlot += count;
    +                numAddedExec += execToPort.size();
    +            }
    +        } else if (newAssignments.isEmpty()) {
    +            for (Entry<String, Assignment> entry : 
existingAssignments.entrySet()) {
    +                final Map<List<Long>, NodeInfo> execToPort = 
entry.getValue().get_executor_node_port();
    +                final long count = new 
HashSet<>(execToPort.values()).size();
    +                LOG.info("Removing {} from {} slots", entry.getKey(), 
count);
    +                LOG.info("Remove executors: {}", execToPort.keySet());
    +                numRemovedSlot += count;
    +                numRemovedExec += execToPort.size();
    +            }
    +        } else {
    +            MapDifference<String, Assignment> difference = 
Maps.difference(existingAssignments, newAssignments);
    +            if (anyChanged = (difference.entriesInCommon().size() != 
newAssignments.size())) {
    --- End diff --
    
    We don't know whether two non-empty maps are equal until we call 
`Maps#difference`. I then assign the result to the variable `anyChanged` to 
record whether any changes in ScheduledAssignments have been made; that value 
is both the method's return value and what determines whether I should add more logging.


---

Reply via email to