[flink] 03/03: [FLINK-28860][runtime] JobMaster wait for partition promote before close
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git commit f599a8c444ab44660824a7b3e0a08a635c22d3f4 Author: sxnan AuthorDate: Tue Aug 9 21:07:17 2022 +0800 [FLINK-28860][runtime] JobMaster wait for partition promote before close --- .../partition/JobMasterPartitionTracker.java | 27 ++- .../partition/JobMasterPartitionTrackerImpl.java | 89 +++- .../TaskExecutorPartitionTrackerImpl.java | 10 + .../apache/flink/runtime/jobmaster/JobMaster.java | 46 ++-- .../runtime/jobmaster/RpcTaskManagerGateway.java | 3 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 33 ++- .../runtime/taskexecutor/TaskExecutorGateway.java | 20 +- .../TaskExecutorGatewayDecoratorBase.java | 13 +- .../JobMasterPartitionTrackerImplTest.java | 239 + .../partition/NoOpJobMasterPartitionTracker.java | 7 +- .../TestingJobMasterPartitionTracker.java | 31 +-- .../jobmaster/JobMasterPartitionReleaseTest.java | 19 +- .../TaskExecutorPartitionLifecycleTest.java| 28 +-- .../taskexecutor/TestingTaskExecutorGateway.java | 27 +-- .../TestingTaskExecutorGatewayBuilder.java | 30 ++- 15 files changed, 378 insertions(+), 244 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java index 2c116e4951e..fa6886f5076 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; /** * Utility for tracking partitions and issuing release calls to 
task executors and shuffle masters. @@ -57,15 +59,32 @@ public interface JobMasterPartitionTracker Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster); /** - * Releases the job partitions and promotes the cluster partitions, and stops the tracking of - * partitions that were released/promoted. + * Promotes the given partitions, and stops the tracking of partitions that were promoted. + * + * @param resultPartitionIds ID of the partition containing both job partitions and cluster + * partitions. + * @return Future that will be completed if the partitions are promoted. */ -void stopTrackingAndReleaseOrPromotePartitions( +CompletableFuture<Void> stopTrackingAndPromotePartitions( Collection<ResultPartitionID> resultPartitionIds); -/** Get all the partitions under tracking. */ +/** Gets all the partitions under tracking. */ Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions(); +/** Gets all the non-cluster partitions under tracking. */ +default Collection<ResultPartitionDeploymentDescriptor> getAllTrackedNonClusterPartitions() { +return getAllTrackedPartitions().stream() +.filter(descriptor -> !descriptor.getPartitionType().isPersistent()) +.collect(Collectors.toList()); +} + +/** Gets all the cluster partitions under tracking. */ +default Collection<ResultPartitionDeploymentDescriptor> getAllTrackedClusterPartitions() { +return getAllTrackedPartitions().stream() +.filter(descriptor -> descriptor.getPartitionType().isPersistent()) +.collect(Collectors.toList()); +} + void connectToResourceManager(ResourceManagerGateway resourceManagerGateway); /** Get the shuffle descriptors of the cluster partitions ordered by partition number. 
*/ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java index 8c8069f9760..a908581b403 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java @@ -21,17 +21,22 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.taskexecutor.Tas
[flink] 03/03: [FLINK-28860][runtime] JobMaster wait for partition promote before close
This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b7dd42617a46fcecfffbea3409391e204a40b9b1 Author: sxnan AuthorDate: Tue Aug 9 21:07:17 2022 +0800 [FLINK-28860][runtime] JobMaster wait for partition promote before close --- .../partition/JobMasterPartitionTracker.java | 27 ++- .../partition/JobMasterPartitionTrackerImpl.java | 89 +++- .../TaskExecutorPartitionTrackerImpl.java | 10 + .../apache/flink/runtime/jobmaster/JobMaster.java | 46 ++-- .../runtime/jobmaster/RpcTaskManagerGateway.java | 3 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 33 ++- .../runtime/taskexecutor/TaskExecutorGateway.java | 20 +- .../TaskExecutorGatewayDecoratorBase.java | 13 +- .../JobMasterPartitionTrackerImplTest.java | 239 + .../partition/NoOpJobMasterPartitionTracker.java | 7 +- .../TestingJobMasterPartitionTracker.java | 31 +-- .../jobmaster/JobMasterPartitionReleaseTest.java | 19 +- .../TaskExecutorPartitionLifecycleTest.java| 28 +-- .../taskexecutor/TestingTaskExecutorGateway.java | 27 +-- .../TestingTaskExecutorGatewayBuilder.java | 30 ++- 15 files changed, 378 insertions(+), 244 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java index 2c116e4951e..fa6886f5076 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; /** * Utility for tracking partitions and issuing release calls to task 
executors and shuffle masters. @@ -57,15 +59,32 @@ public interface JobMasterPartitionTracker Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster); /** - * Releases the job partitions and promotes the cluster partitions, and stops the tracking of - * partitions that were released/promoted. + * Promotes the given partitions, and stops the tracking of partitions that were promoted. + * + * @param resultPartitionIds ID of the partition containing both job partitions and cluster + * partitions. + * @return Future that will be completed if the partitions are promoted. */ -void stopTrackingAndReleaseOrPromotePartitions( +CompletableFuture<Void> stopTrackingAndPromotePartitions( Collection<ResultPartitionID> resultPartitionIds); -/** Get all the partitions under tracking. */ +/** Gets all the partitions under tracking. */ Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions(); +/** Gets all the non-cluster partitions under tracking. */ +default Collection<ResultPartitionDeploymentDescriptor> getAllTrackedNonClusterPartitions() { +return getAllTrackedPartitions().stream() +.filter(descriptor -> !descriptor.getPartitionType().isPersistent()) +.collect(Collectors.toList()); +} + +/** Gets all the cluster partitions under tracking. */ +default Collection<ResultPartitionDeploymentDescriptor> getAllTrackedClusterPartitions() { +return getAllTrackedPartitions().stream() +.filter(descriptor -> descriptor.getPartitionType().isPersistent()) +.collect(Collectors.toList()); +} + void connectToResourceManager(ResourceManagerGateway resourceManagerGateway); /** Get the shuffle descriptors of the cluster partitions ordered by partition number. 
*/ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java index 8c8069f9760..a908581b403 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java @@ -21,17 +21,22 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.taskexecutor.TaskExecu