Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4916#discussion_r148482755
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
    @@ -1065,6 +1177,46 @@ private void sendUpdatePartitionInfoRpcCall(
        //  Miscellaneous
        // --------------------------------------------------------------------------------------------
     
    +   /**
    +    * Calculates the preferred locations based on the location preference constraint.
    +    *
    +    * @param locationPreferenceConstraint constraint for the location preference
    +    * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
    +    *              have been a resource assigned.
    +    */
    +   @VisibleForTesting
    +   public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
    +           final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
    +           final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
    +
    +           switch(locationPreferenceConstraint) {
    +                   case ALL:
    +                           preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
    +                           break;
    +                   case ANY:
    +                           final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(1);
    --- End diff --
    
    Maybe it's better to initialize the `ArrayList` with the number of returned 
futures, so that it never has to be resized.
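
To illustrate the suggestion, here is a minimal, self-contained sketch. It is not the code from the pull request: the class `PresizedLocations` and the helper `collectCompleted` are hypothetical names, and `String` stands in for `TaskManagerLocation`. The only point is that the list capacity is taken from the number of futures, which is the upper bound on how many results can be collected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class PresizedLocations {

    // Hypothetical helper (not part of Flink): collects the results of all
    // futures that have already completed normally, without blocking.
    static <T> List<T> collectCompleted(Collection<CompletableFuture<T>> futures) {
        // The result can never hold more elements than there are futures,
        // so this capacity guarantees that add() never triggers a resize.
        final ArrayList<T> completed = new ArrayList<>(futures.size());

        for (CompletableFuture<T> future : futures) {
            // Skip pending, failed, and cancelled futures.
            if (future.isDone() && !future.isCompletedExceptionally()) {
                // getNow does not block; the fallback value is unused here
                // because the future is already done.
                completed.add(future.getNow(null));
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("tm-1");
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(collectCompleted(Arrays.asList(done, pending)));
    }
}
```

The trade-off is a slightly larger initial allocation when only a few futures are complete, in exchange for never copying the backing array.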

