This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7dd42617a46fcecfffbea3409391e204a40b9b1
Author: sxnan <suxuanna...@gmail.com>
AuthorDate: Tue Aug 9 21:07:17 2022 +0800

    [FLINK-28860][runtime] JobMaster wait for partition promote before close
---
 .../partition/JobMasterPartitionTracker.java       |  27 ++-
 .../partition/JobMasterPartitionTrackerImpl.java   |  89 +++-----
 .../TaskExecutorPartitionTrackerImpl.java          |  10 +
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  46 ++--
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |   3 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  33 ++-
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  20 +-
 .../TaskExecutorGatewayDecoratorBase.java          |  13 +-
 .../JobMasterPartitionTrackerImplTest.java         | 239 +++++++++++++--------
 .../partition/NoOpJobMasterPartitionTracker.java   |   7 +-
 .../TestingJobMasterPartitionTracker.java          |  31 +--
 .../jobmaster/JobMasterPartitionReleaseTest.java   |  19 +-
 .../TaskExecutorPartitionLifecycleTest.java        |  28 +--
 .../taskexecutor/TestingTaskExecutorGateway.java   |  27 +--
 .../TestingTaskExecutorGatewayBuilder.java         |  30 ++-
 15 files changed, 378 insertions(+), 244 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
index 2c116e4951e..fa6886f5076 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 /**
  * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
@@ -57,15 +59,32 @@ public interface JobMasterPartitionTracker
             Collection<ResultPartitionID> resultPartitionIds, boolean 
releaseOnShuffleMaster);
 
     /**
-     * Releases the job partitions and promotes the cluster partitions, and 
stops the tracking of
-     * partitions that were released/promoted.
+     * Promotes the given partitions, and stops the tracking of partitions 
that were promoted.
+     *
+     * @param resultPartitionIds IDs of the partitions, including both job 
partitions and cluster
+     *     partitions.
+     * @return Future that will be completed once the partitions are promoted.
      */
-    void stopTrackingAndReleaseOrPromotePartitions(
+    CompletableFuture<Void> stopTrackingAndPromotePartitions(
             Collection<ResultPartitionID> resultPartitionIds);
 
-    /** Get all the partitions under tracking. */
+    /** Gets all the partitions under tracking. */
     Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions();
 
+    /** Gets all the non-cluster partitions under tracking. */
+    default Collection<ResultPartitionDeploymentDescriptor> 
getAllTrackedNonClusterPartitions() {
+        return getAllTrackedPartitions().stream()
+                .filter(descriptor -> 
!descriptor.getPartitionType().isPersistent())
+                .collect(Collectors.toList());
+    }
+
+    /** Gets all the cluster partitions under tracking. */
+    default Collection<ResultPartitionDeploymentDescriptor> 
getAllTrackedClusterPartitions() {
+        return getAllTrackedPartitions().stream()
+                .filter(descriptor -> 
descriptor.getPartitionType().isPersistent())
+                .collect(Collectors.toList());
+    }
+
     void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway);
 
     /** Get the shuffle descriptors of the cluster partitions ordered by 
partition number. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
index 8c8069f9760..a908581b403 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
@@ -21,17 +21,22 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -114,11 +119,15 @@ public class JobMasterPartitionTrackerImpl
     }
 
     @Override
-    public void stopTrackingAndReleaseOrPromotePartitions(
+    public CompletableFuture<Void> stopTrackingAndPromotePartitions(
             Collection<ResultPartitionID> resultPartitionIds) {
+        List<CompletableFuture<Acknowledge>> promoteFutures = new 
ArrayList<>();
         stopTrackingAndHandlePartitions(
                 resultPartitionIds,
-                (tmID, partitionDescs) -> 
internalReleaseOrPromotePartitions(tmID, partitionDescs));
+                (tmID, partitionDescs) ->
+                        promoteFutures.add(
+                                internalPromotePartitionsOnTaskExecutor(tmID, 
partitionDescs)));
+        return FutureUtils.completeAll(promoteFutures);
     }
 
     @Override
@@ -185,14 +194,25 @@ public class JobMasterPartitionTrackerImpl
         }
     }
 
-    private void internalReleaseOrPromotePartitions(
+    private CompletableFuture<Acknowledge> 
internalPromotePartitionsOnTaskExecutor(
             ResourceID potentialPartitionLocation,
-            Collection<ResultPartitionDeploymentDescriptor> 
partitionDeploymentDescriptors) {
+            Collection<ResultPartitionDeploymentDescriptor> 
clusterPartitionDeploymentDescriptors) {
+        final Set<ResultPartitionID> partitionsRequiringRpcPromoteCalls =
+                clusterPartitionDeploymentDescriptors.stream()
+                        
.filter(JobMasterPartitionTrackerImpl::isPartitionWithLocalResources)
+                        
.map(JobMasterPartitionTrackerImpl::getResultPartitionId)
+                        .collect(Collectors.toSet());
 
-        internalReleaseOrPromotePartitionsOnTaskExecutor(
-                potentialPartitionLocation, partitionDeploymentDescriptors);
-        internalReleasePartitionsOnShuffleMaster(
-                excludePersistentPartitions(partitionDeploymentDescriptors));
+        if (!partitionsRequiringRpcPromoteCalls.isEmpty()) {
+            final TaskExecutorGateway taskExecutorGateway =
+                    
taskExecutorGatewayLookup.lookup(potentialPartitionLocation).orElse(null);
+
+            if (taskExecutorGateway != null) {
+                return taskExecutorGateway.promotePartitions(
+                        jobId, partitionsRequiringRpcPromoteCalls);
+            }
+        }
+        return CompletableFuture.completedFuture(null);
     }
 
     private void internalReleasePartitionsOnTaskExecutor(
@@ -205,50 +225,13 @@ public class JobMasterPartitionTrackerImpl
                         
.map(JobMasterPartitionTrackerImpl::getResultPartitionId)
                         .collect(Collectors.toSet());
 
-        internalReleaseOrPromotePartitionsOnTaskExecutor(
-                potentialPartitionLocation,
-                partitionsRequiringRpcReleaseCalls,
-                Collections.emptySet());
-    }
-
-    private void internalReleaseOrPromotePartitionsOnTaskExecutor(
-            ResourceID potentialPartitionLocation,
-            Collection<ResultPartitionDeploymentDescriptor> 
partitionDeploymentDescriptors) {
-
-        Map<Boolean, Set<ResultPartitionID>> partitionsToReleaseByPersistence =
-                partitionDeploymentDescriptors.stream()
-                        
.filter(JobMasterPartitionTrackerImpl::isPartitionWithLocalResources)
-                        .collect(
-                                Collectors.partitioningBy(
-                                        resultPartitionDeploymentDescriptor ->
-                                                
resultPartitionDeploymentDescriptor
-                                                        .getPartitionType()
-                                                        .isPersistent(),
-                                        Collectors.mapping(
-                                                
JobMasterPartitionTrackerImpl::getResultPartitionId,
-                                                Collectors.toSet())));
-
-        internalReleaseOrPromotePartitionsOnTaskExecutor(
-                potentialPartitionLocation,
-                partitionsToReleaseByPersistence.get(false),
-                partitionsToReleaseByPersistence.get(true));
-    }
-
-    private void internalReleaseOrPromotePartitionsOnTaskExecutor(
-            ResourceID potentialPartitionLocation,
-            Set<ResultPartitionID> partitionsRequiringRpcReleaseCalls,
-            Set<ResultPartitionID> partitionsRequiringRpcPromoteCalls) {
-
-        if (!partitionsRequiringRpcReleaseCalls.isEmpty()
-                || !partitionsRequiringRpcPromoteCalls.isEmpty()) {
+        if (!partitionsRequiringRpcReleaseCalls.isEmpty()) {
             taskExecutorGatewayLookup
                     .lookup(potentialPartitionLocation)
                     .ifPresent(
                             taskExecutorGateway ->
-                                    
taskExecutorGateway.releaseOrPromotePartitions(
-                                            jobId,
-                                            partitionsRequiringRpcReleaseCalls,
-                                            
partitionsRequiringRpcPromoteCalls));
+                                    taskExecutorGateway.releasePartitions(
+                                            jobId, 
partitionsRequiringRpcReleaseCalls));
         }
     }
 
@@ -267,16 +250,6 @@ public class JobMasterPartitionTrackerImpl
                 .isPresent();
     }
 
-    private static Stream<ResultPartitionDeploymentDescriptor> 
excludePersistentPartitions(
-            Collection<ResultPartitionDeploymentDescriptor> 
partitionDeploymentDescriptors) {
-        return partitionDeploymentDescriptors.stream()
-                .filter(
-                        resultPartitionDeploymentDescriptor ->
-                                !resultPartitionDeploymentDescriptor
-                                        .getPartitionType()
-                                        .isPersistent());
-    }
-
     private static ResultPartitionID getResultPartitionId(
             ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor) {
         return 
resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
index e183ab9c2dd..17f526268e0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
@@ -25,6 +25,9 @@ import 
org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -39,6 +42,9 @@ public class TaskExecutorPartitionTrackerImpl
         extends AbstractPartitionTracker<JobID, TaskExecutorPartitionInfo>
         implements TaskExecutorPartitionTracker {
 
+    private static final Logger LOG =
+            LoggerFactory.getLogger(TaskExecutorPartitionTrackerImpl.class);
+
     private final Map<IntermediateDataSetID, DataSetEntry> clusterPartitions = 
new HashMap<>();
     private final ShuffleEnvironment<?, ?> shuffleEnvironment;
 
@@ -58,6 +64,7 @@ public class TaskExecutorPartitionTrackerImpl
     @Override
     public void stopTrackingAndReleaseJobPartitions(
             Collection<ResultPartitionID> partitionsToRelease) {
+        LOG.debug("Releasing Job Partitions {}", partitionsToRelease);
         if (partitionsToRelease.isEmpty()) {
             return;
         }
@@ -72,11 +79,14 @@ public class TaskExecutorPartitionTrackerImpl
                 CollectionUtil.project(
                         stopTrackingPartitionsFor(producingJobId),
                         PartitionTrackerEntry::getResultPartitionId);
+        LOG.debug("Releasing Job Partitions {} for job {}", partitionsForJob, 
producingJobId);
         shuffleEnvironment.releasePartitionsLocally(partitionsForJob);
     }
 
     @Override
     public void promoteJobPartitions(Collection<ResultPartitionID> 
partitionsToPromote) {
+        LOG.debug("Promoting Job Partitions {}", partitionsToPromote);
+
         if (partitionsToPromote.isEmpty()) {
             return;
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 7ce83525c6e..3dfaa1c8776 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1069,22 +1069,42 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId>
     private void jobStatusChanged(final JobStatus newJobStatus) {
         validateRunsInMainThread();
         if (newJobStatus.isGloballyTerminalState()) {
-            runAsync(
-                    () -> {
-                        Collection<ResultPartitionID> allTracked =
-                                
partitionTracker.getAllTrackedPartitions().stream()
-                                        .map(d -> 
d.getShuffleDescriptor().getResultPartitionID())
-                                        .collect(Collectors.toList());
-                        if (newJobStatus == JobStatus.FINISHED) {
-                            
partitionTracker.stopTrackingAndReleaseOrPromotePartitions(allTracked);
-                        } else {
-                            
partitionTracker.stopTrackingAndReleasePartitions(allTracked);
-                        }
-                    });
+            CompletableFuture<Void> partitionPromoteFuture;
+            if (newJobStatus == JobStatus.FINISHED) {
+                Collection<ResultPartitionID> jobPartitions =
+                        
partitionTracker.getAllTrackedNonClusterPartitions().stream()
+                                .map(d -> 
d.getShuffleDescriptor().getResultPartitionID())
+                                .collect(Collectors.toList());
+                
partitionTracker.stopTrackingAndReleasePartitions(jobPartitions);
+                Collection<ResultPartitionID> clusterPartitions =
+                        
partitionTracker.getAllTrackedClusterPartitions().stream()
+                                .map(d -> 
d.getShuffleDescriptor().getResultPartitionID())
+                                .collect(Collectors.toList());
+                partitionPromoteFuture =
+                        
partitionTracker.stopTrackingAndPromotePartitions(clusterPartitions);
+            } else {
+                Collection<ResultPartitionID> allTracked =
+                        partitionTracker.getAllTrackedPartitions().stream()
+                                .map(d -> 
d.getShuffleDescriptor().getResultPartitionID())
+                                .collect(Collectors.toList());
+                partitionTracker.stopTrackingAndReleasePartitions(allTracked);
+                partitionPromoteFuture = 
CompletableFuture.completedFuture(null);
+            }
 
             final ExecutionGraphInfo executionGraphInfo = 
schedulerNG.requestJob();
+
             futureExecutor.execute(
-                    () -> 
jobCompletionActions.jobReachedGloballyTerminalState(executionGraphInfo));
+                    () -> {
+                        try {
+                            partitionPromoteFuture.get();
+                        } catch (Throwable e) {
+                            // We do not want to fail the job in case 
partition releasing or
+                            // promoting fails. The TaskExecutors will release 
the partitions
+                            // eventually when they find out the JobMaster is 
closed.
+                            log.warn("Fail to release or promote partitions", 
e);
+                        }
+                        
jobCompletionActions.jobReachedGloballyTerminalState(executionGraphInfo);
+                    });
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index e0326bab665..eea64cd1e54 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -34,7 +34,6 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
-import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
@@ -76,7 +75,7 @@ public class RpcTaskManagerGateway implements 
TaskManagerGateway {
 
     @Override
     public void releasePartitions(JobID jobId, Set<ResultPartitionID> 
partitionIds) {
-        taskExecutorGateway.releaseOrPromotePartitions(jobId, partitionIds, 
Collections.emptySet());
+        taskExecutorGateway.releasePartitions(jobId, partitionIds);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index f4481ea9d80..a1204956776 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -906,27 +906,40 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
     }
 
     @Override
-    public void releaseOrPromotePartitions(
-            JobID jobId,
-            Set<ResultPartitionID> partitionToRelease,
-            Set<ResultPartitionID> partitionsToPromote) {
+    public void releasePartitions(JobID jobId, Set<ResultPartitionID> 
partitionIds) {
+        try {
+            partitionTracker.stopTrackingAndReleaseJobPartitions(partitionIds);
+            closeJobManagerConnectionIfNoAllocatedResources(jobId);
+        } catch (Throwable t) {
+            onFatalError(t);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> promotePartitions(
+            JobID jobId, Set<ResultPartitionID> partitionIds) {
+        CompletableFuture<Acknowledge> future = new CompletableFuture<>();
         try {
-            
partitionTracker.stopTrackingAndReleaseJobPartitions(partitionToRelease);
-            partitionTracker.promoteJobPartitions(partitionsToPromote);
+            partitionTracker.promoteJobPartitions(partitionIds);
             if (establishedResourceManagerConnection != null) {
                 establishedResourceManagerConnection
                         .getResourceManagerGateway()
                         .reportClusterPartitions(
-                                getResourceID(), 
partitionTracker.createClusterPartitionReport());
+                                getResourceID(), 
partitionTracker.createClusterPartitionReport())
+                        .thenAccept(ignore -> 
future.complete(Acknowledge.get()));
+            } else {
+                future.completeExceptionally(
+                        new RuntimeException(
+                                "Task executor is not connecting to 
ResourceManager. "
+                                        + "Fail to report cluster partition to 
ResourceManager"));
             }
             closeJobManagerConnectionIfNoAllocatedResources(jobId);
         } catch (Throwable t) {
-            // TODO: Do we still need this catch branch?
+            future.completeExceptionally(t);
             onFatalError(t);
         }
 
-        // TODO: Maybe it's better to return an Acknowledge here to notify the 
JM about the
-        // success/failure with an Exception
+        return future;
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 44c4689b3d1..25d00e89f4e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -101,16 +101,22 @@ public interface TaskExecutorGateway
             @RpcTimeout Time timeout);
 
     /**
-     * Batch release/promote intermediate result partitions.
+     * Batch release intermediate result partitions.
      *
      * @param jobId id of the job that the partitions belong to
-     * @param partitionToRelease partition ids to release
-     * @param partitionsToPromote partitions ids to promote
+     * @param partitionIds partition ids to release
      */
-    void releaseOrPromotePartitions(
-            JobID jobId,
-            Set<ResultPartitionID> partitionToRelease,
-            Set<ResultPartitionID> partitionsToPromote);
+    void releasePartitions(JobID jobId, Set<ResultPartitionID> partitionIds);
+
+    /**
+     * Batch promote intermediate result partitions.
+     *
+     * @param jobId id of the job that the partitions belong to
+     * @param partitionIds partition ids to promote
+     * @return Future acknowledge that the partitions are successfully 
promoted.
+     */
+    CompletableFuture<Acknowledge> promotePartitions(
+            JobID jobId, Set<ResultPartitionID> partitionIds);
 
     /**
      * Releases all cluster partitions belong to any of the given data sets.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
index 543d56d8fa9..fa86655bd68 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
@@ -106,11 +106,14 @@ public class TaskExecutorGatewayDecoratorBase implements 
TaskExecutorGateway {
     }
 
     @Override
-    public void releaseOrPromotePartitions(
-            JobID jobId,
-            Set<ResultPartitionID> partitionToRelease,
-            Set<ResultPartitionID> partitionsToPromote) {
-        originalGateway.releaseOrPromotePartitions(jobId, partitionToRelease, 
partitionsToPromote);
+    public void releasePartitions(JobID jobId, Set<ResultPartitionID> 
partitionIds) {
+        originalGateway.releasePartitions(jobId, partitionIds);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> promotePartitions(
+            JobID jobId, Set<ResultPartitionID> partitionIds) {
+        return originalGateway.promotePartitions(jobId, partitionIds);
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
index 83af9e0c5bb..d9b6794c3dc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -47,7 +48,6 @@ import java.util.stream.Collectors;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 
@@ -91,12 +91,15 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
         final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
         final JobID jobId = new JobID();
 
-        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
+        final Queue<ReleaseCall> releaseCall = new ArrayBlockingQueue<>(4);
         final JobMasterPartitionTracker partitionTracker =
                 new JobMasterPartitionTrackerImpl(
                         jobId,
                         shuffleMaster,
-                        tmId -> Optional.of(createTaskExecutorGateway(tmId, 
releaseCalls)));
+                        tmId ->
+                                Optional.of(
+                                        createTaskExecutorGateway(
+                                                tmId, releaseCall, new 
ArrayBlockingQueue<>(4))));
 
         final ResourceID tmId = ResourceID.generate();
         final ResultPartitionID resultPartitionId = new ResultPartitionID();
@@ -110,12 +113,11 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
 
         
partitionTracker.stopTrackingAndReleasePartitions(Arrays.asList(resultPartitionId));
 
-        assertEquals(1, releaseCalls.size());
-        ReleaseCall releaseCall = releaseCalls.remove();
-        assertEquals(tmId, releaseCall.getTaskExecutorId());
-        assertEquals(jobId, releaseCall.getJobId());
-        assertThat(releaseCall.getReleasedPartitions(), 
contains(resultPartitionId));
-        assertThat(releaseCall.getPromotedPartitions(), is(empty()));
+        assertEquals(1, releaseCall.size());
+        ReleaseCall releaseOrPromoteCall = releaseCall.remove();
+        assertEquals(tmId, releaseOrPromoteCall.getTaskExecutorId());
+        assertEquals(jobId, releaseOrPromoteCall.getJobId());
+        assertThat(releaseOrPromoteCall.getReleasedPartitions(), 
contains(resultPartitionId));
         assertEquals(1, shuffleMaster.externallyReleasedPartitions.size());
         assertEquals(resultPartitionId, 
shuffleMaster.externallyReleasedPartitions.remove());
         assertThat(partitionTracker.isTrackingPartitionsFor(tmId), is(false));
@@ -126,11 +128,15 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
         final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
 
         final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
+        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
         final JobMasterPartitionTracker partitionTracker =
                 new JobMasterPartitionTrackerImpl(
                         new JobID(),
                         shuffleMaster,
-                        tmId -> Optional.of(createTaskExecutorGateway(tmId, 
releaseCalls)));
+                        tmId ->
+                                Optional.of(
+                                        createTaskExecutorGateway(
+                                                tmId, releaseCalls, 
promoteCalls)));
 
         final ResourceID tmId = ResourceID.generate();
         final ResultPartitionID resultPartitionId = new ResultPartitionID();
@@ -144,6 +150,7 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
         
partitionTracker.stopTrackingAndReleasePartitions(Arrays.asList(resultPartitionId));
 
         assertEquals(0, releaseCalls.size());
+        assertEquals(0, promoteCalls.size());
         assertEquals(1, shuffleMaster.externallyReleasedPartitions.size());
         assertThat(shuffleMaster.externallyReleasedPartitions, 
contains(resultPartitionId));
     }
@@ -152,7 +159,8 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
     public void testStopTrackingIssuesNoReleaseCalls() {
         final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
 
-        final Queue<ReleaseCall> taskExecutorReleaseCalls = new 
ArrayBlockingQueue<>(4);
+        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
+        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
         final JobMasterPartitionTrackerImpl partitionTracker =
                 new JobMasterPartitionTrackerImpl(
                         new JobID(),
@@ -160,7 +168,7 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
                         resourceId ->
                                 Optional.of(
                                         createTaskExecutorGateway(
-                                                resourceId, 
taskExecutorReleaseCalls)));
+                                                resourceId, releaseCalls, 
promoteCalls)));
 
         final ResourceID taskExecutorId1 = ResourceID.generate();
         final ResultPartitionID resultPartitionId1 = new ResultPartitionID();
@@ -172,7 +180,8 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
 
         partitionTracker.stopTrackingPartitionsFor(taskExecutorId1);
 
-        assertEquals(0, taskExecutorReleaseCalls.size());
+        assertEquals(0, releaseCalls.size());
+        assertEquals(0, promoteCalls.size());
         assertEquals(0, shuffleMaster.externallyReleasedPartitions.size());
     }
 
@@ -180,7 +189,8 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
     public void testTrackingInternalAndExternalPartitionsByTmId() {
         final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
 
-        final Queue<ReleaseCall> taskExecutorReleaseCalls = new 
ArrayBlockingQueue<>(4);
+        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
+        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
         final JobMasterPartitionTrackerImpl partitionTracker =
                 new JobMasterPartitionTrackerImpl(
                         new JobID(),
@@ -188,7 +198,7 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
                         resourceId ->
                                 Optional.of(
                                         createTaskExecutorGateway(
-                                                resourceId, 
taskExecutorReleaseCalls)));
+                                                resourceId, releaseCalls, 
promoteCalls)));
 
         final ResourceID taskExecutorId = ResourceID.generate();
         final ResultPartitionID resultPartitionId1 = new ResultPartitionID();
@@ -227,10 +237,88 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
     }
 
     @Test
-    public void testReleaseOrPromote() {
+    public void testGetJobPartitionClusterPartition() {
+        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
+
+        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
+        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
+        final JobMasterPartitionTrackerImpl partitionTracker =
+                new JobMasterPartitionTrackerImpl(
+                        new JobID(),
+                        shuffleMaster,
+                        resourceId ->
+                                Optional.of(
+                                        createTaskExecutorGateway(
+                                                resourceId, releaseCalls, 
promoteCalls)));
+
+        final ResourceID taskExecutorId = ResourceID.generate();
+        final ResultPartitionID resultPartitionId1 = new ResultPartitionID();
+        final ResultPartitionID resultPartitionId2 = new ResultPartitionID();
+
+        final ResultPartitionDeploymentDescriptor clusterPartition =
+                
AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
+                        resultPartitionId1, 
ResultPartitionType.BLOCKING_PERSISTENT, false);
+        final ResultPartitionDeploymentDescriptor jobPartition =
+                
AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor(
+                        resultPartitionId2, false);
+        partitionTracker.startTrackingPartition(taskExecutorId, 
clusterPartition);
+        partitionTracker.startTrackingPartition(taskExecutorId, jobPartition);
+
+        
Assertions.assertThat(partitionTracker.getAllTrackedNonClusterPartitions())
+                .containsExactly(jobPartition);
+        
Assertions.assertThat(partitionTracker.getAllTrackedClusterPartitions())
+                .containsExactly(clusterPartition);
+    }
+
+    @Test
+    public void testGetShuffleDescriptors() {
+        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
+        IntermediateDataSetID intermediateDataSetId = new 
IntermediateDataSetID();
+
+        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
+        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
+        final JobMasterPartitionTrackerImpl partitionTracker =
+                new JobMasterPartitionTrackerImpl(
+                        new JobID(),
+                        shuffleMaster,
+                        resourceId ->
+                                Optional.of(
+                                        createTaskExecutorGateway(
+                                                resourceId, releaseCalls, 
promoteCalls)));
+
+        TestingResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
+        partitionTracker.connectToResourceManager(resourceManagerGateway);
+        
partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
+
+        assertThat(
+                resourceManagerGateway.requestedIntermediateDataSetIds,
+                contains(intermediateDataSetId));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testGetShuffleDescriptorsBeforeConnectToResourceManager() {
+        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
+        IntermediateDataSetID intermediateDataSetId = new 
IntermediateDataSetID();
+
+        final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4);
+        final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4);
+        final JobMasterPartitionTrackerImpl partitionTracker =
+                new JobMasterPartitionTrackerImpl(
+                        new JobID(),
+                        shuffleMaster,
+                        resourceId ->
+                                Optional.of(
+                                        createTaskExecutorGateway(
+                                                resourceId, releaseCalls, 
promoteCalls)));
+        
partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
+    }
+
+    @Test
+    public void testReleaseJobPartitionPromoteClusterPartition() {
         final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
 
-        final Queue<ReleaseCall> taskExecutorReleaseOrPromoteCalls = new 
ArrayBlockingQueue<>(4);
+        final Queue<ReleaseCall> taskExecutorReleaseCalls = new 
ArrayBlockingQueue<>(4);
+        final Queue<PromoteCall> taskExecutorPromoteCalls = new 
ArrayBlockingQueue<>(4);
         final JobMasterPartitionTracker partitionTracker =
                 new JobMasterPartitionTrackerImpl(
                         new JobID(),
@@ -238,7 +326,9 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
                         resourceId ->
                                 Optional.of(
                                         createTaskExecutorGateway(
-                                                resourceId, 
taskExecutorReleaseOrPromoteCalls)));
+                                                resourceId,
+                                                taskExecutorReleaseCalls,
+                                                taskExecutorPromoteCalls)));
 
         final ResourceID taskExecutorId1 = ResourceID.generate();
         final ResultPartitionID jobPartitionId0 = new ResultPartitionID();
@@ -272,26 +362,24 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
                         clusterPartitionId1, 
ResultPartitionType.BLOCKING_PERSISTENT, false);
         partitionTracker.startTrackingPartition(taskExecutorId1, 
clusterPartition1);
 
-        partitionTracker.stopTrackingAndReleaseOrPromotePartitions(
-                Arrays.asList(
-                        jobPartitionId0,
-                        jobPartitionId1,
-                        clusterPartitionId0,
-                        clusterPartitionId1));
+        partitionTracker.stopTrackingAndReleasePartitions(
+                Arrays.asList(jobPartitionId0, jobPartitionId1));
+        partitionTracker.stopTrackingAndPromotePartitions(
+                Arrays.asList(clusterPartitionId0, clusterPartitionId1));
 
         // Exactly one call should have been made to the hosting task executor
-        assertEquals(1, taskExecutorReleaseOrPromoteCalls.size());
+        assertEquals(1, taskExecutorReleaseCalls.size());
+        assertEquals(1, taskExecutorPromoteCalls.size());
 
-        final ReleaseCall taskExecutorReleaseOrPromoteCall =
-                taskExecutorReleaseOrPromoteCalls.remove();
+        final ReleaseCall releaseCall = taskExecutorReleaseCalls.remove();
+
+        final PromoteCall promoteCall = taskExecutorPromoteCalls.remove();
 
         // One local partition released and one local partition promoted.
         assertEquals(
-                jobPartitionId0,
-                
Iterables.getOnlyElement(taskExecutorReleaseOrPromoteCall.getReleasedPartitions()));
+                jobPartitionId0, 
Iterables.getOnlyElement(releaseCall.getReleasedPartitions()));
         assertEquals(
-                clusterPartitionId0,
-                
Iterables.getOnlyElement(taskExecutorReleaseOrPromoteCall.getPromotedPartitions()));
+                clusterPartitionId0, 
Iterables.getOnlyElement(promoteCall.getPromotedPartitions()));
 
         // Both internal and external partitions will be fed into 
shuffle-master for releasing.
         Collection<ResultPartitionID> externallyReleasedPartitions =
@@ -300,58 +388,19 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
                 externallyReleasedPartitions, 
containsInAnyOrder(jobPartitionId0, jobPartitionId1));
     }
 
-    @Test
-    public void testGetShuffleDescriptors() {
-        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
-        IntermediateDataSetID intermediateDataSetId = new 
IntermediateDataSetID();
-
-        final Queue<ReleaseCall> taskExecutorReleaseCalls = new 
ArrayBlockingQueue<>(4);
-        final JobMasterPartitionTrackerImpl partitionTracker =
-                new JobMasterPartitionTrackerImpl(
-                        new JobID(),
-                        shuffleMaster,
-                        resourceId ->
-                                Optional.of(
-                                        createTaskExecutorGateway(
-                                                resourceId, 
taskExecutorReleaseCalls)));
-
-        TestingResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
-        partitionTracker.connectToResourceManager(resourceManagerGateway);
-        
partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
-
-        assertThat(
-                resourceManagerGateway.requestedIntermediateDataSetIds,
-                contains(intermediateDataSetId));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testGetShuffleDescriptorsBeforeConnectToResourceManager() {
-        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
-        IntermediateDataSetID intermediateDataSetId = new 
IntermediateDataSetID();
-
-        final Queue<ReleaseCall> taskExecutorReleaseCalls = new 
ArrayBlockingQueue<>(4);
-        final JobMasterPartitionTrackerImpl partitionTracker =
-                new JobMasterPartitionTrackerImpl(
-                        new JobID(),
-                        shuffleMaster,
-                        resourceId ->
-                                Optional.of(
-                                        createTaskExecutorGateway(
-                                                resourceId, 
taskExecutorReleaseCalls)));
-        
partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
-    }
-
     private static TaskExecutorGateway createTaskExecutorGateway(
-            ResourceID taskExecutorId, Collection<ReleaseCall> 
releaseOrPromoteCalls) {
+            ResourceID taskExecutorId,
+            Collection<ReleaseCall> releaseCalls,
+            Collection<PromoteCall> promoteCalls) {
         return new TestingTaskExecutorGatewayBuilder()
-                .setReleaseOrPromotePartitionsConsumer(
-                        (jobId, partitionsToRelease, partitionsToPromote) ->
-                                releaseOrPromoteCalls.add(
-                                        new ReleaseCall(
-                                                taskExecutorId,
-                                                jobId,
-                                                partitionsToRelease,
-                                                partitionsToPromote)))
+                .setReleasePartitionsConsumer(
+                        (jobId, partitions) ->
+                                releaseCalls.add(
+                                        new ReleaseCall(taskExecutorId, jobId, 
partitions)))
+                .setPromotePartitionsConsumer(
+                        (jobId, partitions) ->
+                                promoteCalls.add(
+                                        new PromoteCall(taskExecutorId, jobId, 
partitions)))
                 .createTestingTaskExecutorGateway();
     }
 
@@ -391,17 +440,14 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
         private final ResourceID taskExecutorId;
         private final JobID jobId;
         private final Collection<ResultPartitionID> releasedPartitions;
-        private final Collection<ResultPartitionID> promotedPartitions;
 
         private ReleaseCall(
                 ResourceID taskExecutorId,
                 JobID jobId,
-                Collection<ResultPartitionID> releasedPartitions,
-                Collection<ResultPartitionID> promotedPartitions) {
+                Collection<ResultPartitionID> releasedPartitions) {
             this.taskExecutorId = taskExecutorId;
             this.jobId = jobId;
             this.releasedPartitions = releasedPartitions;
-            this.promotedPartitions = promotedPartitions;
         }
 
         public ResourceID getTaskExecutorId() {
@@ -415,6 +461,29 @@ public class JobMasterPartitionTrackerImplTest extends 
TestLogger {
         public Collection<ResultPartitionID> getReleasedPartitions() {
             return releasedPartitions;
         }
+    }
+
+    private static class PromoteCall {
+        private final ResourceID taskExecutorId;
+        private final JobID jobId;
+        private final Collection<ResultPartitionID> promotedPartitions;
+
+        private PromoteCall(
+                ResourceID taskExecutorId,
+                JobID jobId,
+                Collection<ResultPartitionID> promotedPartitions) {
+            this.taskExecutorId = taskExecutorId;
+            this.jobId = jobId;
+            this.promotedPartitions = promotedPartitions;
+        }
+
+        public ResourceID getTaskExecutorId() {
+            return taskExecutorId;
+        }
+
+        public JobID getJobId() {
+            return jobId;
+        }
 
         public Collection<ResultPartitionID> getPromotedPartitions() {
             return promotedPartitions;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
index 130212bcf05..b3915a715d9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 /** No-op implementation of {@link JobMasterPartitionTracker}. */
 public class NoOpJobMasterPartitionTracker implements 
JobMasterPartitionTracker {
@@ -50,8 +51,10 @@ public class NoOpJobMasterPartitionTracker implements 
JobMasterPartitionTracker
             Collection<ResultPartitionID> resultPartitionIds, boolean 
releaseOnShuffleMaster) {}
 
     @Override
-    public void stopTrackingAndReleaseOrPromotePartitions(
-            Collection<ResultPartitionID> resultPartitionIds) {}
+    public CompletableFuture<Void> stopTrackingAndPromotePartitions(
+            Collection<ResultPartitionID> resultPartitionIds) {
+        return null;
+    }
 
     @Override
     public Collection<ResultPartitionDeploymentDescriptor> 
getAllTrackedPartitions() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
index 8378befcfe0..59d6bd935fd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -41,8 +42,9 @@ public class TestingJobMasterPartitionTracker implements 
JobMasterPartitionTrack
             startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {};
     private Consumer<Collection<ResultPartitionID>> 
stopTrackingAndReleasePartitionsConsumer =
             ignored -> {};
-    private Consumer<Collection<ResultPartitionID>>
-            stopTrackingAndReleaseOrPromotePartitionsConsumer = ignored -> {};
+
+    private Consumer<Collection<ResultPartitionID>> 
stopTrackingAndPromotePartitionsConsumer =
+            ignored -> {};
     private Consumer<Collection<ResultPartitionID>> 
stopTrackingPartitionsConsumer = ignored -> {};
     private Supplier<Collection<ResultPartitionDeploymentDescriptor>>
             getAllTrackedPartitionsSupplier = () -> Collections.emptyList();
@@ -73,13 +75,6 @@ public class TestingJobMasterPartitionTracker implements 
JobMasterPartitionTrack
         this.stopTrackingAndReleasePartitionsConsumer = 
stopTrackingAndReleasePartitionsConsumer;
     }
 
-    public void setStopTrackingAndReleaseOrPromotePartitionsConsumer(
-            Consumer<Collection<ResultPartitionID>>
-                    stopTrackingAndReleaseOrPromotePartitionsConsumer) {
-        this.stopTrackingAndReleaseOrPromotePartitionsConsumer =
-                stopTrackingAndReleaseOrPromotePartitionsConsumer;
-    }
-
     public void setStopTrackingPartitionsConsumer(
             Consumer<Collection<ResultPartitionID>> 
stopTrackingPartitionsConsumer) {
         this.stopTrackingPartitionsConsumer = stopTrackingPartitionsConsumer;
@@ -91,6 +86,11 @@ public class TestingJobMasterPartitionTracker implements 
JobMasterPartitionTrack
         this.getAllTrackedPartitionsSupplier = getAllTrackedPartitionsSupplier;
     }
 
+    public void setStopTrackingAndPromotePartitionsConsumer(
+            Consumer<Collection<ResultPartitionID>> 
stopTrackingAndPromotePartitionsConsumer) {
+        this.stopTrackingAndPromotePartitionsConsumer = 
stopTrackingAndPromotePartitionsConsumer;
+    }
+
     @Override
     public void startTrackingPartition(
             ResourceID producingTaskExecutorId,
@@ -112,6 +112,13 @@ public class TestingJobMasterPartitionTracker implements 
JobMasterPartitionTrack
         stopTrackingAndReleasePartitionsConsumer.accept(resultPartitionIds);
     }
 
+    @Override
+    public CompletableFuture<Void> stopTrackingAndPromotePartitions(
+            Collection<ResultPartitionID> resultPartitionIds) {
+        stopTrackingAndPromotePartitionsConsumer.accept(resultPartitionIds);
+        return CompletableFuture.completedFuture(null);
+    }
+
     @Override
     public Collection<PartitionTrackerEntry<ResourceID, 
ResultPartitionDeploymentDescriptor>>
             stopTrackingPartitions(Collection<ResultPartitionID> 
resultPartitionIds) {
@@ -119,12 +126,6 @@ public class TestingJobMasterPartitionTracker implements 
JobMasterPartitionTrack
         return Collections.emptyList();
     }
 
-    @Override
-    public void stopTrackingAndReleaseOrPromotePartitions(
-            Collection<ResultPartitionID> resultPartitionIds) {
-        
stopTrackingAndReleaseOrPromotePartitionsConsumer.accept(resultPartitionIds);
-    }
-
     @Override
     public Collection<ResultPartitionDeploymentDescriptor> 
getAllTrackedPartitions() {
         return getAllTrackedPartitionsSupplier.get();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java
index 5180d1aeb26..2a8e95f4ed9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java
@@ -64,6 +64,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
@@ -208,8 +210,8 @@ public class JobMasterPartitionReleaseTest extends 
TestLogger {
         private final CompletableFuture<Collection<ResultPartitionID>> 
partitionsForRelease =
                 new CompletableFuture<>();
 
-        private final CompletableFuture<Collection<ResultPartitionID>>
-                partitionsForReleaseOrPromote = new CompletableFuture<>();
+        private final CompletableFuture<Collection<ResultPartitionID>> 
clusterPartitionsForPromote =
+                new CompletableFuture<>();
 
         private final JobMaster jobMaster;
 
@@ -235,8 +237,8 @@ public class JobMasterPartitionReleaseTest extends 
TestLogger {
                     taskExecutorIdForStopTracking::complete);
             partitionTracker.setStopTrackingAndReleasePartitionsConsumer(
                     partitionsForRelease::complete);
-            
partitionTracker.setStopTrackingAndReleaseOrPromotePartitionsConsumer(
-                    partitionsForReleaseOrPromote::complete);
+            partitionTracker.setStopTrackingAndPromotePartitionsConsumer(
+                    clusterPartitionsForPromote::complete);
 
             Configuration configuration = new Configuration();
             configuration.setString(
@@ -313,7 +315,14 @@ public class JobMasterPartitionReleaseTest extends 
TestLogger {
         }
 
         public CompletableFuture<Collection<ResultPartitionID>> 
getPartitionsForReleaseOrPromote() {
-            return partitionsForReleaseOrPromote;
+            return partitionsForRelease.thenCombine(
+                    clusterPartitionsForPromote,
+                    (resultPartitionIds, resultPartitionIds2) -> {
+                        Set<ResultPartitionID> res = new HashSet<>();
+                        res.addAll(resultPartitionIds);
+                        res.addAll(resultPartitionIds2);
+                        return res;
+                    });
         }
 
         public void close() throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index d07a0434b14..0238ec9e5c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -149,20 +149,16 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
     public void testJobMasterConnectionTerminationAfterExternalRelease() 
throws Exception {
         testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion(
                 ((taskExecutorGateway, jobID, resultPartitionID) ->
-                        taskExecutorGateway.releaseOrPromotePartitions(
-                                jobID,
-                                Collections.singleton(resultPartitionID),
-                                Collections.emptySet())));
+                        taskExecutorGateway.releasePartitions(
+                                jobID, 
Collections.singleton(resultPartitionID))));
     }
 
     @Test
     public void testJobMasterConnectionTerminationAfterExternalPromotion() 
throws Exception {
         testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion(
                 ((taskExecutorGateway, jobID, resultPartitionID) ->
-                        taskExecutorGateway.releaseOrPromotePartitions(
-                                jobID,
-                                Collections.emptySet(),
-                                Collections.singleton(resultPartitionID))));
+                        taskExecutorGateway.promotePartitions(
+                                jobID, 
Collections.singleton(resultPartitionID))));
     }
 
     private void 
testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion(
@@ -225,8 +221,8 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
             trackerIsTrackingPartitions.set(true);
             assertThat(firstReleasePartitionsCallFuture.isDone(), is(false));
 
-            taskExecutorGateway.releaseOrPromotePartitions(
-                    jobId, Collections.singleton(new ResultPartitionID()), 
Collections.emptySet());
+            taskExecutorGateway.releasePartitions(
+                    jobId, Collections.singleton(new ResultPartitionID()));
 
             // at this point we only know that the TE has entered 
releasePartitions; we cannot be
             // certain whether it
@@ -276,10 +272,8 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                                     .getShuffleDescriptor()
                                     .getResultPartitionID();
 
-                    taskExecutorGateway.releaseOrPromotePartitions(
-                            jobId,
-                            Collections.singleton(resultPartitionId),
-                            Collections.emptySet());
+                    taskExecutorGateway.releasePartitions(
+                            jobId, Collections.singleton(resultPartitionId));
 
                     assertThat(releasePartitionsFuture.get(), 
hasItems(resultPartitionId));
                 });
@@ -299,10 +293,8 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                                     .getShuffleDescriptor()
                                     .getResultPartitionID();
 
-                    taskExecutorGateway.releaseOrPromotePartitions(
-                            jobId,
-                            Collections.emptySet(),
-                            Collections.singleton(resultPartitionId));
+                    taskExecutorGateway.promotePartitions(
+                            jobId, Collections.singleton(resultPartitionId));
 
                     assertThat(promotePartitionsFuture.get(), 
hasItems(resultPartitionId));
                 });
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 24c56284405..c699fab7b69 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -47,7 +47,6 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.QuadFunction;
-import org.apache.flink.util.function.TriConsumer;
 import org.apache.flink.util.function.TriFunction;
 
 import java.util.Collection;
@@ -92,9 +91,8 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
 
     private final Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier;
 
-    private final TriConsumer<JobID, Set<ResultPartitionID>, 
Set<ResultPartitionID>>
-            releaseOrPromotePartitionsConsumer;
-
+    private BiConsumer<JobID, Set<ResultPartitionID>> 
releasePartitionsConsumer;
+    private BiConsumer<JobID, Set<ResultPartitionID>> 
promotePartitionsConsumer;
     private final Consumer<Collection<IntermediateDataSetID>> 
releaseClusterPartitionsConsumer;
 
     private final TriFunction<
@@ -144,8 +142,8 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
             Consumer<Exception> disconnectResourceManagerConsumer,
             Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> 
cancelTaskFunction,
             Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier,
-            TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>>
-                    releaseOrPromotePartitionsConsumer,
+            BiConsumer<JobID, Set<ResultPartitionID>> 
releasePartitionsConsumer,
+            BiConsumer<JobID, Set<ResultPartitionID>> 
promotePartitionsConsumer,
             Consumer<Collection<IntermediateDataSetID>> 
releaseClusterPartitionsConsumer,
             TriFunction<
                             ExecutionAttemptID,
@@ -178,7 +176,8 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
         this.disconnectResourceManagerConsumer = 
disconnectResourceManagerConsumer;
         this.cancelTaskFunction = cancelTaskFunction;
         this.canBeReleasedSupplier = canBeReleasedSupplier;
-        this.releaseOrPromotePartitionsConsumer = 
releaseOrPromotePartitionsConsumer;
+        this.releasePartitionsConsumer = releasePartitionsConsumer;
+        this.promotePartitionsConsumer = promotePartitionsConsumer;
         this.releaseClusterPartitionsConsumer = 
releaseClusterPartitionsConsumer;
         this.operatorEventHandler = operatorEventHandler;
         this.requestThreadDumpSupplier = requestThreadDumpSupplier;
@@ -221,11 +220,15 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
     }
 
     @Override
-    public void releaseOrPromotePartitions(
-            JobID jobId,
-            Set<ResultPartitionID> partitionToRelease,
-            Set<ResultPartitionID> partitionsToPromote) {
-        releaseOrPromotePartitionsConsumer.accept(jobId, partitionToRelease, 
partitionsToPromote);
+    public void releasePartitions(JobID jobId, Set<ResultPartitionID> 
partitionIds) {
+        releasePartitionsConsumer.accept(jobId, partitionIds);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> promotePartitions(
+            JobID jobId, Set<ResultPartitionID> partitionIds) {
+        promotePartitionsConsumer.accept(jobId, partitionIds);
+        return CompletableFuture.completedFuture(Acknowledge.get());
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 4f519d18309..283a6c9ef35 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -81,7 +81,12 @@ public class TestingTaskExecutorGatewayBuilder {
             NOOP_CANCEL_TASK_FUNCTION =
                     ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
     private static final TriConsumer<JobID, Set<ResultPartitionID>, 
Set<ResultPartitionID>>
-            NOOP_RELEASE_PARTITIONS_CONSUMER = (ignoredA, ignoredB, ignoredC) 
-> {};
+            NOOP_RELEASE_OR_PROMOTE_PARTITIONS_CONSUMER = (ignoredA, ignoredB, 
ignoredC) -> {};
+
+    private static final BiConsumer<JobID, Set<ResultPartitionID>>
+            NOOP_RELEASE_PARTITIONS_CONSUMER = (ignoredA, ignoredB) -> {};
+    private static final BiConsumer<JobID, Set<ResultPartitionID>>
+            NOOP_PROMOTE_PARTITIONS_CONSUMER = (ignoredA, ignoredB) -> {};
     private static final TriFunction<
                     ExecutionAttemptID,
                     OperatorID,
@@ -131,8 +136,11 @@ public class TestingTaskExecutorGatewayBuilder {
             NOOP_CANCEL_TASK_FUNCTION;
     private Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier =
             () -> CompletableFuture.completedFuture(true);
-    private TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>>
-            releaseOrPromotePartitionsConsumer = 
NOOP_RELEASE_PARTITIONS_CONSUMER;
+
+    private BiConsumer<JobID, Set<ResultPartitionID>> 
releasePartitionsConsumer =
+            NOOP_RELEASE_PARTITIONS_CONSUMER;
+    private BiConsumer<JobID, Set<ResultPartitionID>> 
promotePartitionsConsumer =
+            NOOP_PROMOTE_PARTITIONS_CONSUMER;
     private Consumer<Collection<IntermediateDataSetID>> 
releaseClusterPartitionsConsumer =
             ignored -> {};
     private TriFunction<
@@ -239,10 +247,15 @@ public class TestingTaskExecutorGatewayBuilder {
         return this;
     }
 
-    public TestingTaskExecutorGatewayBuilder 
setReleaseOrPromotePartitionsConsumer(
-            TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>>
-                    releasePartitionsConsumer) {
-        this.releaseOrPromotePartitionsConsumer = releasePartitionsConsumer;
+    public TestingTaskExecutorGatewayBuilder setReleasePartitionsConsumer(
+            BiConsumer<JobID, Set<ResultPartitionID>> 
releasePartitionsConsumer) {
+        this.releasePartitionsConsumer = releasePartitionsConsumer;
+        return this;
+    }
+
+    public TestingTaskExecutorGatewayBuilder setPromotePartitionsConsumer(
+            BiConsumer<JobID, Set<ResultPartitionID>> 
promotePartitionsConsumer) {
+        this.promotePartitionsConsumer = promotePartitionsConsumer;
         return this;
     }
 
@@ -307,7 +320,8 @@ public class TestingTaskExecutorGatewayBuilder {
                 disconnectResourceManagerConsumer,
                 cancelTaskFunction,
                 canBeReleasedSupplier,
-                releaseOrPromotePartitionsConsumer,
+                releasePartitionsConsumer,
+                promotePartitionsConsumer,
                 releaseClusterPartitionsConsumer,
                 operatorEventHandler,
                 requestThreadDumpSupplier,

Reply via email to