Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148482755 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java --- @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall( // Miscellaneous // -------------------------------------------------------------------------------------------- + /** + * Calculates the preferred locations based on the location preference constraint. + * + * @param locationPreferenceConstraint constraint for the location preference + * @return Future containing the collection of preferred locations. This might not be completed if not all inputs + * have been a resource assigned. + */ + @VisibleForTesting + public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) { + final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs(); + final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture; + + switch(locationPreferenceConstraint) { + case ALL: + preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures); + break; + case ANY: + final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(1); --- End diff -- Maybe it's better to initialize the array with the number of returned futures to avoid resizing completely.
---