[flink] 03/03: [FLINK-28860][runtime] JobMaster wait for partition promote before close

2022-09-07 Thread gaoyunhaii
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f599a8c444ab44660824a7b3e0a08a635c22d3f4
Author: sxnan 
AuthorDate: Tue Aug 9 21:07:17 2022 +0800

[FLINK-28860][runtime] JobMaster wait for partition promote before close
---
 .../partition/JobMasterPartitionTracker.java   |  27 ++-
 .../partition/JobMasterPartitionTrackerImpl.java   |  89 +++-
 .../TaskExecutorPartitionTrackerImpl.java  |  10 +
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  46 ++--
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |   3 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  33 ++-
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  20 +-
 .../TaskExecutorGatewayDecoratorBase.java  |  13 +-
 .../JobMasterPartitionTrackerImplTest.java | 239 +
 .../partition/NoOpJobMasterPartitionTracker.java   |   7 +-
 .../TestingJobMasterPartitionTracker.java  |  31 +--
 .../jobmaster/JobMasterPartitionReleaseTest.java   |  19 +-
 .../TaskExecutorPartitionLifecycleTest.java|  28 +--
 .../taskexecutor/TestingTaskExecutorGateway.java   |  27 +--
 .../TestingTaskExecutorGatewayBuilder.java |  30 ++-
 15 files changed, 378 insertions(+), 244 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
index 2c116e4951e..fa6886f5076 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 /**
  * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
@@ -57,15 +59,32 @@ public interface JobMasterPartitionTracker
 Collection<ResultPartitionID> resultPartitionIds, boolean 
releaseOnShuffleMaster);
 
 /**
- * Releases the job partitions and promotes the cluster partitions, and 
stops the tracking of
- * partitions that were released/promoted.
+ * Promotes the given partitions, and stops the tracking of partitions 
that were promoted.
+ *
+ * @param resultPartitionIds ID of the partition containing both job 
partitions and cluster
+ * partitions.
+ * @return Future that will be completed if the partitions are promoted.
  */
-void stopTrackingAndReleaseOrPromotePartitions(
+CompletableFuture<Void> stopTrackingAndPromotePartitions(
 Collection<ResultPartitionID> resultPartitionIds);
 
-/** Get all the partitions under tracking. */
+/** Gets all the partitions under tracking. */
 Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions();
 
+/** Gets all the non-cluster partitions under tracking. */
+default Collection<ResultPartitionDeploymentDescriptor> 
getAllTrackedNonClusterPartitions() {
+return getAllTrackedPartitions().stream()
+.filter(descriptor -> 
!descriptor.getPartitionType().isPersistent())
+.collect(Collectors.toList());
+}
+
+/** Gets all the cluster partitions under tracking. */
+default Collection<ResultPartitionDeploymentDescriptor> 
getAllTrackedClusterPartitions() {
+return getAllTrackedPartitions().stream()
+.filter(descriptor -> 
descriptor.getPartitionType().isPersistent())
+.collect(Collectors.toList());
+}
+
 void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway);
 
 /** Get the shuffle descriptors of the cluster partitions ordered by 
partition number. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
index 8c8069f9760..a908581b403 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
@@ -21,17 +21,22 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskexecutor.Tas

[flink] 03/03: [FLINK-28860][runtime] JobMaster wait for partition promote before close

2022-09-07 Thread gaoyunhaii
This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7dd42617a46fcecfffbea3409391e204a40b9b1
Author: sxnan 
AuthorDate: Tue Aug 9 21:07:17 2022 +0800

[FLINK-28860][runtime] JobMaster wait for partition promote before close
---
 .../partition/JobMasterPartitionTracker.java   |  27 ++-
 .../partition/JobMasterPartitionTrackerImpl.java   |  89 +++-
 .../TaskExecutorPartitionTrackerImpl.java  |  10 +
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  46 ++--
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |   3 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  33 ++-
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  20 +-
 .../TaskExecutorGatewayDecoratorBase.java  |  13 +-
 .../JobMasterPartitionTrackerImplTest.java | 239 +
 .../partition/NoOpJobMasterPartitionTracker.java   |   7 +-
 .../TestingJobMasterPartitionTracker.java  |  31 +--
 .../jobmaster/JobMasterPartitionReleaseTest.java   |  19 +-
 .../TaskExecutorPartitionLifecycleTest.java|  28 +--
 .../taskexecutor/TestingTaskExecutorGateway.java   |  27 +--
 .../TestingTaskExecutorGatewayBuilder.java |  30 ++-
 15 files changed, 378 insertions(+), 244 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
index 2c116e4951e..fa6886f5076 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 /**
  * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
@@ -57,15 +59,32 @@ public interface JobMasterPartitionTracker
 Collection<ResultPartitionID> resultPartitionIds, boolean 
releaseOnShuffleMaster);
 
 /**
- * Releases the job partitions and promotes the cluster partitions, and 
stops the tracking of
- * partitions that were released/promoted.
+ * Promotes the given partitions, and stops the tracking of partitions 
that were promoted.
+ *
+ * @param resultPartitionIds ID of the partition containing both job 
partitions and cluster
+ * partitions.
+ * @return Future that will be completed if the partitions are promoted.
  */
-void stopTrackingAndReleaseOrPromotePartitions(
+CompletableFuture<Void> stopTrackingAndPromotePartitions(
 Collection<ResultPartitionID> resultPartitionIds);
 
-/** Get all the partitions under tracking. */
+/** Gets all the partitions under tracking. */
 Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions();
 
+/** Gets all the non-cluster partitions under tracking. */
+default Collection<ResultPartitionDeploymentDescriptor> 
getAllTrackedNonClusterPartitions() {
+return getAllTrackedPartitions().stream()
+.filter(descriptor -> 
!descriptor.getPartitionType().isPersistent())
+.collect(Collectors.toList());
+}
+
+/** Gets all the cluster partitions under tracking. */
+default Collection<ResultPartitionDeploymentDescriptor> 
getAllTrackedClusterPartitions() {
+return getAllTrackedPartitions().stream()
+.filter(descriptor -> 
descriptor.getPartitionType().isPersistent())
+.collect(Collectors.toList());
+}
+
 void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway);
 
 /** Get the shuffle descriptors of the cluster partitions ordered by 
partition number. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
index 8c8069f9760..a908581b403 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
@@ -21,17 +21,22 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskexecutor.TaskExecu