Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/4949#discussion_r192406775 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotProfile.java --- @@ -203,68 +206,104 @@ public LocalityAwareRequirementsToSlotMatcher(@Nonnull Collection<TaskManagerLoc } // we build up two indexes, one for resource id and one for host names of the preferred locations. - HashSet<ResourceID> preferredResourceIDs = new HashSet<>(locationPreferences.size()); - HashSet<String> preferredFQHostNames = new HashSet<>(locationPreferences.size()); + Map<ResourceID, Integer> preferredResourceIDs = new HashMap<>(locationPreferences.size()); + Map<String, Integer> preferredFQHostNames = new HashMap<>(locationPreferences.size()); for (TaskManagerLocation locationPreference : locationPreferences) { - preferredResourceIDs.add(locationPreference.getResourceID()); - preferredFQHostNames.add(locationPreference.getFQDNHostname()); + Integer oldVal = preferredResourceIDs.getOrDefault(locationPreference.getResourceID(), 0); + preferredResourceIDs.put(locationPreference.getResourceID(), oldVal + 1); + + oldVal = preferredFQHostNames.getOrDefault(locationPreference.getFQDNHostname(), 0); + preferredFQHostNames.put(locationPreference.getFQDNHostname(), oldVal + 1); } Iterator<IN> iterator = candidates.iterator(); - IN matchByHostName = null; IN matchByAdditionalRequirements = null; + final Map<IN, CandidateMatchedResult> candidateMatchedResults = new HashMap<>(); --- End diff -- Checked, just remember the currently highest ranked result is enough. ð
---