This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b7dd42617a46fcecfffbea3409391e204a40b9b1 Author: sxnan <suxuanna...@gmail.com> AuthorDate: Tue Aug 9 21:07:17 2022 +0800 [FLINK-28860][runtime] JobMaster wait for partition promote before close --- .../partition/JobMasterPartitionTracker.java | 27 ++- .../partition/JobMasterPartitionTrackerImpl.java | 89 +++----- .../TaskExecutorPartitionTrackerImpl.java | 10 + .../apache/flink/runtime/jobmaster/JobMaster.java | 46 ++-- .../runtime/jobmaster/RpcTaskManagerGateway.java | 3 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 33 ++- .../runtime/taskexecutor/TaskExecutorGateway.java | 20 +- .../TaskExecutorGatewayDecoratorBase.java | 13 +- .../JobMasterPartitionTrackerImplTest.java | 239 +++++++++++++-------- .../partition/NoOpJobMasterPartitionTracker.java | 7 +- .../TestingJobMasterPartitionTracker.java | 31 +-- .../jobmaster/JobMasterPartitionReleaseTest.java | 19 +- .../TaskExecutorPartitionLifecycleTest.java | 28 +-- .../taskexecutor/TestingTaskExecutorGateway.java | 27 +-- .../TestingTaskExecutorGatewayBuilder.java | 30 ++- 15 files changed, 378 insertions(+), 244 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java index 2c116e4951e..fa6886f5076 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; /** * Utility for tracking partitions and issuing release calls to task executors and shuffle masters. 
@@ -57,15 +59,32 @@ public interface JobMasterPartitionTracker Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster); /** - * Releases the job partitions and promotes the cluster partitions, and stops the tracking of - * partitions that were released/promoted. + * Promotes the given partitions, and stops the tracking of partitions that were promoted. + * + * @param resultPartitionIds IDs of the partitions to promote; these are expected to be + * cluster partitions. + * @return Future that will be completed once the partitions are promoted. */ - void stopTrackingAndReleaseOrPromotePartitions( + CompletableFuture<Void> stopTrackingAndPromotePartitions( Collection<ResultPartitionID> resultPartitionIds); - /** Get all the partitions under tracking. */ + /** Gets all the partitions under tracking. */ Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions(); + /** Gets all the non-cluster partitions under tracking. */ + default Collection<ResultPartitionDeploymentDescriptor> getAllTrackedNonClusterPartitions() { + return getAllTrackedPartitions().stream() + .filter(descriptor -> !descriptor.getPartitionType().isPersistent()) + .collect(Collectors.toList()); + } + + /** Gets all the cluster partitions under tracking. */ + default Collection<ResultPartitionDeploymentDescriptor> getAllTrackedClusterPartitions() { + return getAllTrackedPartitions().stream() + .filter(descriptor -> descriptor.getPartitionType().isPersistent()) + .collect(Collectors.toList()); + } + void connectToResourceManager(ResourceManagerGateway resourceManagerGateway); /** Get the shuffle descriptors of the cluster partitions ordered by partition number. 
*/ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java index 8c8069f9760..a908581b403 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java @@ -21,17 +21,22 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -114,11 +119,15 @@ public class JobMasterPartitionTrackerImpl } @Override - public void stopTrackingAndReleaseOrPromotePartitions( + public CompletableFuture<Void> stopTrackingAndPromotePartitions( Collection<ResultPartitionID> resultPartitionIds) { + List<CompletableFuture<Acknowledge>> promoteFutures = new ArrayList<>(); stopTrackingAndHandlePartitions( resultPartitionIds, - (tmID, partitionDescs) -> internalReleaseOrPromotePartitions(tmID, partitionDescs)); + (tmID, 
partitionDescs) -> + promoteFutures.add( + internalPromotePartitionsOnTaskExecutor(tmID, partitionDescs))); + return FutureUtils.completeAll(promoteFutures); } @Override @@ -185,14 +194,25 @@ public class JobMasterPartitionTrackerImpl } } - private void internalReleaseOrPromotePartitions( + private CompletableFuture<Acknowledge> internalPromotePartitionsOnTaskExecutor( ResourceID potentialPartitionLocation, - Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) { + Collection<ResultPartitionDeploymentDescriptor> clusterPartitionDeploymentDescriptors) { + final Set<ResultPartitionID> partitionsRequiringRpcPromoteCalls = + clusterPartitionDeploymentDescriptors.stream() + .filter(JobMasterPartitionTrackerImpl::isPartitionWithLocalResources) + .map(JobMasterPartitionTrackerImpl::getResultPartitionId) + .collect(Collectors.toSet()); - internalReleaseOrPromotePartitionsOnTaskExecutor( - potentialPartitionLocation, partitionDeploymentDescriptors); - internalReleasePartitionsOnShuffleMaster( - excludePersistentPartitions(partitionDeploymentDescriptors)); + if (!partitionsRequiringRpcPromoteCalls.isEmpty()) { + final TaskExecutorGateway taskExecutorGateway = + taskExecutorGatewayLookup.lookup(potentialPartitionLocation).orElse(null); + + if (taskExecutorGateway != null) { + return taskExecutorGateway.promotePartitions( + jobId, partitionsRequiringRpcPromoteCalls); + } + } + return CompletableFuture.completedFuture(null); } private void internalReleasePartitionsOnTaskExecutor( @@ -205,50 +225,13 @@ public class JobMasterPartitionTrackerImpl .map(JobMasterPartitionTrackerImpl::getResultPartitionId) .collect(Collectors.toSet()); - internalReleaseOrPromotePartitionsOnTaskExecutor( - potentialPartitionLocation, - partitionsRequiringRpcReleaseCalls, - Collections.emptySet()); - } - - private void internalReleaseOrPromotePartitionsOnTaskExecutor( - ResourceID potentialPartitionLocation, - Collection<ResultPartitionDeploymentDescriptor> 
partitionDeploymentDescriptors) { - - Map<Boolean, Set<ResultPartitionID>> partitionsToReleaseByPersistence = - partitionDeploymentDescriptors.stream() - .filter(JobMasterPartitionTrackerImpl::isPartitionWithLocalResources) - .collect( - Collectors.partitioningBy( - resultPartitionDeploymentDescriptor -> - resultPartitionDeploymentDescriptor - .getPartitionType() - .isPersistent(), - Collectors.mapping( - JobMasterPartitionTrackerImpl::getResultPartitionId, - Collectors.toSet()))); - - internalReleaseOrPromotePartitionsOnTaskExecutor( - potentialPartitionLocation, - partitionsToReleaseByPersistence.get(false), - partitionsToReleaseByPersistence.get(true)); - } - - private void internalReleaseOrPromotePartitionsOnTaskExecutor( - ResourceID potentialPartitionLocation, - Set<ResultPartitionID> partitionsRequiringRpcReleaseCalls, - Set<ResultPartitionID> partitionsRequiringRpcPromoteCalls) { - - if (!partitionsRequiringRpcReleaseCalls.isEmpty() - || !partitionsRequiringRpcPromoteCalls.isEmpty()) { + if (!partitionsRequiringRpcReleaseCalls.isEmpty()) { taskExecutorGatewayLookup .lookup(potentialPartitionLocation) .ifPresent( taskExecutorGateway -> - taskExecutorGateway.releaseOrPromotePartitions( - jobId, - partitionsRequiringRpcReleaseCalls, - partitionsRequiringRpcPromoteCalls)); + taskExecutorGateway.releasePartitions( + jobId, partitionsRequiringRpcReleaseCalls)); } } @@ -267,16 +250,6 @@ public class JobMasterPartitionTrackerImpl .isPresent(); } - private static Stream<ResultPartitionDeploymentDescriptor> excludePersistentPartitions( - Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) { - return partitionDeploymentDescriptors.stream() - .filter( - resultPartitionDeploymentDescriptor -> - !resultPartitionDeploymentDescriptor - .getPartitionType() - .isPersistent()); - } - private static ResultPartitionID getResultPartitionId( ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) { return 
resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java index e183ab9c2dd..17f526268e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java @@ -25,6 +25,9 @@ import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -39,6 +42,9 @@ public class TaskExecutorPartitionTrackerImpl extends AbstractPartitionTracker<JobID, TaskExecutorPartitionInfo> implements TaskExecutorPartitionTracker { + private static final Logger LOG = + LoggerFactory.getLogger(TaskExecutorPartitionTrackerImpl.class); + private final Map<IntermediateDataSetID, DataSetEntry> clusterPartitions = new HashMap<>(); private final ShuffleEnvironment<?, ?> shuffleEnvironment; @@ -58,6 +64,7 @@ public class TaskExecutorPartitionTrackerImpl @Override public void stopTrackingAndReleaseJobPartitions( Collection<ResultPartitionID> partitionsToRelease) { + LOG.debug("Releasing Job Partitions {}", partitionsToRelease); if (partitionsToRelease.isEmpty()) { return; } @@ -72,11 +79,14 @@ public class TaskExecutorPartitionTrackerImpl CollectionUtil.project( stopTrackingPartitionsFor(producingJobId), PartitionTrackerEntry::getResultPartitionId); + LOG.debug("Releasing Job Partitions {} for job {}", partitionsForJob, producingJobId); shuffleEnvironment.releasePartitionsLocally(partitionsForJob); } @Override public void 
promoteJobPartitions(Collection<ResultPartitionID> partitionsToPromote) { + LOG.debug("Promoting Job Partitions {}", partitionsToPromote); + if (partitionsToPromote.isEmpty()) { return; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 7ce83525c6e..3dfaa1c8776 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -1069,22 +1069,42 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> private void jobStatusChanged(final JobStatus newJobStatus) { validateRunsInMainThread(); if (newJobStatus.isGloballyTerminalState()) { - runAsync( - () -> { - Collection<ResultPartitionID> allTracked = - partitionTracker.getAllTrackedPartitions().stream() - .map(d -> d.getShuffleDescriptor().getResultPartitionID()) - .collect(Collectors.toList()); - if (newJobStatus == JobStatus.FINISHED) { - partitionTracker.stopTrackingAndReleaseOrPromotePartitions(allTracked); - } else { - partitionTracker.stopTrackingAndReleasePartitions(allTracked); - } - }); + CompletableFuture<Void> partitionPromoteFuture; + if (newJobStatus == JobStatus.FINISHED) { + Collection<ResultPartitionID> jobPartitions = + partitionTracker.getAllTrackedNonClusterPartitions().stream() + .map(d -> d.getShuffleDescriptor().getResultPartitionID()) + .collect(Collectors.toList()); + partitionTracker.stopTrackingAndReleasePartitions(jobPartitions); + Collection<ResultPartitionID> clusterPartitions = + partitionTracker.getAllTrackedClusterPartitions().stream() + .map(d -> d.getShuffleDescriptor().getResultPartitionID()) + .collect(Collectors.toList()); + partitionPromoteFuture = + partitionTracker.stopTrackingAndPromotePartitions(clusterPartitions); + } else { + Collection<ResultPartitionID> allTracked = + partitionTracker.getAllTrackedPartitions().stream() + .map(d 
-> d.getShuffleDescriptor().getResultPartitionID()) + .collect(Collectors.toList()); + partitionTracker.stopTrackingAndReleasePartitions(allTracked); + partitionPromoteFuture = CompletableFuture.completedFuture(null); + } final ExecutionGraphInfo executionGraphInfo = schedulerNG.requestJob(); + futureExecutor.execute( - () -> jobCompletionActions.jobReachedGloballyTerminalState(executionGraphInfo)); + () -> { + try { + partitionPromoteFuture.get(); + } catch (Throwable e) { + // We do not want to fail the job if releasing or promoting the + // partitions fails. The TaskExecutors will release the partitions + // eventually when they find out the JobMaster is closed. + log.warn("Fail to release or promote partitions", e); + } + jobCompletionActions.jobReachedGloballyTerminalState(executionGraphInfo); + }); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java index e0326bab665..eea64cd1e54 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java @@ -34,7 +34,6 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; -import java.util.Collections; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -76,7 +75,7 @@ public class RpcTaskManagerGateway implements TaskManagerGateway { @Override public void releasePartitions(JobID jobId, Set<ResultPartitionID> partitionIds) { - taskExecutorGateway.releaseOrPromotePartitions(jobId, partitionIds, Collections.emptySet()); + taskExecutorGateway.releasePartitions(jobId, partitionIds); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index f4481ea9d80..a1204956776 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -906,27 +906,40 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } @Override - public void releaseOrPromotePartitions( - JobID jobId, - Set<ResultPartitionID> partitionToRelease, - Set<ResultPartitionID> partitionsToPromote) { + public void releasePartitions(JobID jobId, Set<ResultPartitionID> partitionIds) { + try { + partitionTracker.stopTrackingAndReleaseJobPartitions(partitionIds); + closeJobManagerConnectionIfNoAllocatedResources(jobId); + } catch (Throwable t) { + onFatalError(t); + } + } + + @Override + public CompletableFuture<Acknowledge> promotePartitions( + JobID jobId, Set<ResultPartitionID> partitionIds) { + CompletableFuture<Acknowledge> future = new CompletableFuture<>(); try { - partitionTracker.stopTrackingAndReleaseJobPartitions(partitionToRelease); - partitionTracker.promoteJobPartitions(partitionsToPromote); + partitionTracker.promoteJobPartitions(partitionIds); if (establishedResourceManagerConnection != null) { establishedResourceManagerConnection .getResourceManagerGateway() .reportClusterPartitions( - getResourceID(), partitionTracker.createClusterPartitionReport()); + getResourceID(), partitionTracker.createClusterPartitionReport()) + .thenAccept(ignore -> future.complete(Acknowledge.get())); + } else { + future.completeExceptionally( + new RuntimeException( + "Task executor is not connecting to ResourceManager. " + + "Fail to report cluster partition to ResourceManager")); } closeJobManagerConnectionIfNoAllocatedResources(jobId); } catch (Throwable t) { - // TODO: Do we still need this catch branch? 
+ future.completeExceptionally(t); onFatalError(t); } - // TODO: Maybe it's better to return an Acknowledge here to notify the JM about the - // success/failure with an Exception + return future; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 44c4689b3d1..25d00e89f4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -101,16 +101,22 @@ public interface TaskExecutorGateway @RpcTimeout Time timeout); /** - * Batch release/promote intermediate result partitions. + * Batch release intermediate result partitions. * * @param jobId id of the job that the partitions belong to - * @param partitionToRelease partition ids to release - * @param partitionsToPromote partitions ids to promote + * @param partitionIds partition ids to release */ - void releaseOrPromotePartitions( - JobID jobId, - Set<ResultPartitionID> partitionToRelease, - Set<ResultPartitionID> partitionsToPromote); + void releasePartitions(JobID jobId, Set<ResultPartitionID> partitionIds); + + /** + * Batch promote intermediate result partitions. + * + * @param jobId id of the job that the partitions belong to + * @param partitionIds partition ids to promote + * @return Future acknowledge that the partitions are successfully promoted. + */ + CompletableFuture<Acknowledge> promotePartitions( + JobID jobId, Set<ResultPartitionID> partitionIds); /** * Releases all cluster partitions belong to any of the given data sets. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java index 543d56d8fa9..fa86655bd68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java @@ -106,11 +106,14 @@ public class TaskExecutorGatewayDecoratorBase implements TaskExecutorGateway { } @Override - public void releaseOrPromotePartitions( - JobID jobId, - Set<ResultPartitionID> partitionToRelease, - Set<ResultPartitionID> partitionsToPromote) { - originalGateway.releaseOrPromotePartitions(jobId, partitionToRelease, partitionsToPromote); + public void releasePartitions(JobID jobId, Set<ResultPartitionID> partitionIds) { + originalGateway.releasePartitions(jobId, partitionIds); + } + + @Override + public CompletableFuture<Acknowledge> promotePartitions( + JobID jobId, Set<ResultPartitionID> partitionIds) { + return originalGateway.promotePartitions(jobId, partitionIds); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java index 83af9e0c5bb..d9b6794c3dc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java @@ -31,6 +31,7 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; +import org.assertj.core.api.Assertions; import org.junit.Test; import java.util.ArrayList; @@ -47,7 +48,6 @@ import java.util.stream.Collectors; import static 
org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; @@ -91,12 +91,15 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); final JobID jobId = new JobID(); - final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4); + final Queue<ReleaseCall> releaseCall = new ArrayBlockingQueue<>(4); final JobMasterPartitionTracker partitionTracker = new JobMasterPartitionTrackerImpl( jobId, shuffleMaster, - tmId -> Optional.of(createTaskExecutorGateway(tmId, releaseCalls))); + tmId -> + Optional.of( + createTaskExecutorGateway( + tmId, releaseCall, new ArrayBlockingQueue<>(4)))); final ResourceID tmId = ResourceID.generate(); final ResultPartitionID resultPartitionId = new ResultPartitionID(); @@ -110,12 +113,11 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { partitionTracker.stopTrackingAndReleasePartitions(Arrays.asList(resultPartitionId)); - assertEquals(1, releaseCalls.size()); - ReleaseCall releaseCall = releaseCalls.remove(); - assertEquals(tmId, releaseCall.getTaskExecutorId()); - assertEquals(jobId, releaseCall.getJobId()); - assertThat(releaseCall.getReleasedPartitions(), contains(resultPartitionId)); - assertThat(releaseCall.getPromotedPartitions(), is(empty())); + assertEquals(1, releaseCall.size()); + ReleaseCall releaseOrPromoteCall = releaseCall.remove(); + assertEquals(tmId, releaseOrPromoteCall.getTaskExecutorId()); + assertEquals(jobId, releaseOrPromoteCall.getJobId()); + assertThat(releaseOrPromoteCall.getReleasedPartitions(), contains(resultPartitionId)); assertEquals(1, shuffleMaster.externallyReleasedPartitions.size()); assertEquals(resultPartitionId, shuffleMaster.externallyReleasedPartitions.remove()); 
assertThat(partitionTracker.isTrackingPartitionsFor(tmId), is(false)); @@ -126,11 +128,15 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4); + final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4); final JobMasterPartitionTracker partitionTracker = new JobMasterPartitionTrackerImpl( new JobID(), shuffleMaster, - tmId -> Optional.of(createTaskExecutorGateway(tmId, releaseCalls))); + tmId -> + Optional.of( + createTaskExecutorGateway( + tmId, releaseCalls, promoteCalls))); final ResourceID tmId = ResourceID.generate(); final ResultPartitionID resultPartitionId = new ResultPartitionID(); @@ -144,6 +150,7 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { partitionTracker.stopTrackingAndReleasePartitions(Arrays.asList(resultPartitionId)); assertEquals(0, releaseCalls.size()); + assertEquals(0, promoteCalls.size()); assertEquals(1, shuffleMaster.externallyReleasedPartitions.size()); assertThat(shuffleMaster.externallyReleasedPartitions, contains(resultPartitionId)); } @@ -152,7 +159,8 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { public void testStopTrackingIssuesNoReleaseCalls() { final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); - final Queue<ReleaseCall> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); + final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4); + final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4); final JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl( new JobID(), @@ -160,7 +168,7 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { resourceId -> Optional.of( createTaskExecutorGateway( - resourceId, taskExecutorReleaseCalls))); + resourceId, releaseCalls, promoteCalls))); final ResourceID taskExecutorId1 = ResourceID.generate(); 
final ResultPartitionID resultPartitionId1 = new ResultPartitionID(); @@ -172,7 +180,8 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { partitionTracker.stopTrackingPartitionsFor(taskExecutorId1); - assertEquals(0, taskExecutorReleaseCalls.size()); + assertEquals(0, releaseCalls.size()); + assertEquals(0, promoteCalls.size()); assertEquals(0, shuffleMaster.externallyReleasedPartitions.size()); } @@ -180,7 +189,8 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { public void testTrackingInternalAndExternalPartitionsByTmId() { final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); - final Queue<ReleaseCall> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); + final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4); + final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4); final JobMasterPartitionTrackerImpl partitionTracker = new JobMasterPartitionTrackerImpl( new JobID(), @@ -188,7 +198,7 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { resourceId -> Optional.of( createTaskExecutorGateway( - resourceId, taskExecutorReleaseCalls))); + resourceId, releaseCalls, promoteCalls))); final ResourceID taskExecutorId = ResourceID.generate(); final ResultPartitionID resultPartitionId1 = new ResultPartitionID(); @@ -227,10 +237,88 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { } @Test - public void testReleaseOrPromote() { + public void testGetJobPartitionClusterPartition() { + final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); + + final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4); + final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4); + final JobMasterPartitionTrackerImpl partitionTracker = + new JobMasterPartitionTrackerImpl( + new JobID(), + shuffleMaster, + resourceId -> + Optional.of( + createTaskExecutorGateway( + resourceId, releaseCalls, promoteCalls))); + + final ResourceID 
taskExecutorId = ResourceID.generate(); + final ResultPartitionID resultPartitionId1 = new ResultPartitionID(); + final ResultPartitionID resultPartitionId2 = new ResultPartitionID(); + + final ResultPartitionDeploymentDescriptor clusterPartition = + AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor( + resultPartitionId1, ResultPartitionType.BLOCKING_PERSISTENT, false); + final ResultPartitionDeploymentDescriptor jobPartition = + AbstractPartitionTrackerTest.createResultPartitionDeploymentDescriptor( + resultPartitionId2, false); + partitionTracker.startTrackingPartition(taskExecutorId, clusterPartition); + partitionTracker.startTrackingPartition(taskExecutorId, jobPartition); + + Assertions.assertThat(partitionTracker.getAllTrackedNonClusterPartitions()) + .containsExactly(jobPartition); + Assertions.assertThat(partitionTracker.getAllTrackedClusterPartitions()) + .containsExactly(clusterPartition); + } + + @Test + public void testGetShuffleDescriptors() { + final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); + IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID(); + + final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4); + final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4); + final JobMasterPartitionTrackerImpl partitionTracker = + new JobMasterPartitionTrackerImpl( + new JobID(), + shuffleMaster, + resourceId -> + Optional.of( + createTaskExecutorGateway( + resourceId, releaseCalls, promoteCalls))); + + TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + partitionTracker.connectToResourceManager(resourceManagerGateway); + partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId); + + assertThat( + resourceManagerGateway.requestedIntermediateDataSetIds, + contains(intermediateDataSetId)); + } + + @Test(expected = NullPointerException.class) + public void testGetShuffleDescriptorsBeforeConnectToResourceManager() { 
+ final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); + IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID(); + + final Queue<ReleaseCall> releaseCalls = new ArrayBlockingQueue<>(4); + final Queue<PromoteCall> promoteCalls = new ArrayBlockingQueue<>(4); + final JobMasterPartitionTrackerImpl partitionTracker = + new JobMasterPartitionTrackerImpl( + new JobID(), + shuffleMaster, + resourceId -> + Optional.of( + createTaskExecutorGateway( + resourceId, releaseCalls, promoteCalls))); + partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId); + } + + @Test + public void testReleaseJobPartitionPromoteClusterPartition() { final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); - final Queue<ReleaseCall> taskExecutorReleaseOrPromoteCalls = new ArrayBlockingQueue<>(4); + final Queue<ReleaseCall> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); + final Queue<PromoteCall> taskExecutorPromoteCalls = new ArrayBlockingQueue<>(4); final JobMasterPartitionTracker partitionTracker = new JobMasterPartitionTrackerImpl( new JobID(), @@ -238,7 +326,9 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { resourceId -> Optional.of( createTaskExecutorGateway( - resourceId, taskExecutorReleaseOrPromoteCalls))); + resourceId, + taskExecutorReleaseCalls, + taskExecutorPromoteCalls))); final ResourceID taskExecutorId1 = ResourceID.generate(); final ResultPartitionID jobPartitionId0 = new ResultPartitionID(); @@ -272,26 +362,24 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { clusterPartitionId1, ResultPartitionType.BLOCKING_PERSISTENT, false); partitionTracker.startTrackingPartition(taskExecutorId1, clusterPartition1); - partitionTracker.stopTrackingAndReleaseOrPromotePartitions( - Arrays.asList( - jobPartitionId0, - jobPartitionId1, - clusterPartitionId0, - clusterPartitionId1)); + partitionTracker.stopTrackingAndReleasePartitions( + Arrays.asList(jobPartitionId0, 
jobPartitionId1)); + partitionTracker.stopTrackingAndPromotePartitions( + Arrays.asList(clusterPartitionId0, clusterPartitionId1)); // Exactly one call should have been made to the hosting task executor - assertEquals(1, taskExecutorReleaseOrPromoteCalls.size()); + assertEquals(1, taskExecutorReleaseCalls.size()); + assertEquals(1, taskExecutorPromoteCalls.size()); - final ReleaseCall taskExecutorReleaseOrPromoteCall = - taskExecutorReleaseOrPromoteCalls.remove(); + final ReleaseCall releaseCall = taskExecutorReleaseCalls.remove(); + + final PromoteCall promoteCall = taskExecutorPromoteCalls.remove(); // One local partition released and one local partition promoted. assertEquals( - jobPartitionId0, - Iterables.getOnlyElement(taskExecutorReleaseOrPromoteCall.getReleasedPartitions())); + jobPartitionId0, Iterables.getOnlyElement(releaseCall.getReleasedPartitions())); assertEquals( - clusterPartitionId0, - Iterables.getOnlyElement(taskExecutorReleaseOrPromoteCall.getPromotedPartitions())); + clusterPartitionId0, Iterables.getOnlyElement(promoteCall.getPromotedPartitions())); // Both internal and external partitions will be fed into shuffle-master for releasing. 
Collection<ResultPartitionID> externallyReleasedPartitions = @@ -300,58 +388,19 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { externallyReleasedPartitions, containsInAnyOrder(jobPartitionId0, jobPartitionId1)); } - @Test - public void testGetShuffleDescriptors() { - final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); - IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID(); - - final Queue<ReleaseCall> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); - final JobMasterPartitionTrackerImpl partitionTracker = - new JobMasterPartitionTrackerImpl( - new JobID(), - shuffleMaster, - resourceId -> - Optional.of( - createTaskExecutorGateway( - resourceId, taskExecutorReleaseCalls))); - - TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); - partitionTracker.connectToResourceManager(resourceManagerGateway); - partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId); - - assertThat( - resourceManagerGateway.requestedIntermediateDataSetIds, - contains(intermediateDataSetId)); - } - - @Test(expected = NullPointerException.class) - public void testGetShuffleDescriptorsBeforeConnectToResourceManager() { - final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); - IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID(); - - final Queue<ReleaseCall> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); - final JobMasterPartitionTrackerImpl partitionTracker = - new JobMasterPartitionTrackerImpl( - new JobID(), - shuffleMaster, - resourceId -> - Optional.of( - createTaskExecutorGateway( - resourceId, taskExecutorReleaseCalls))); - partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId); - } - private static TaskExecutorGateway createTaskExecutorGateway( - ResourceID taskExecutorId, Collection<ReleaseCall> releaseOrPromoteCalls) { + ResourceID taskExecutorId, + Collection<ReleaseCall> releaseCalls, + 
Collection<PromoteCall> promoteCalls) { return new TestingTaskExecutorGatewayBuilder() - .setReleaseOrPromotePartitionsConsumer( - (jobId, partitionsToRelease, partitionsToPromote) -> - releaseOrPromoteCalls.add( - new ReleaseCall( - taskExecutorId, - jobId, - partitionsToRelease, - partitionsToPromote))) + .setReleasePartitionsConsumer( + (jobId, partitions) -> + releaseCalls.add( + new ReleaseCall(taskExecutorId, jobId, partitions))) + .setPromotePartitionsConsumer( + (jobId, partitions) -> + promoteCalls.add( + new PromoteCall(taskExecutorId, jobId, partitions))) .createTestingTaskExecutorGateway(); } @@ -391,17 +440,14 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { private final ResourceID taskExecutorId; private final JobID jobId; private final Collection<ResultPartitionID> releasedPartitions; - private final Collection<ResultPartitionID> promotedPartitions; private ReleaseCall( ResourceID taskExecutorId, JobID jobId, - Collection<ResultPartitionID> releasedPartitions, - Collection<ResultPartitionID> promotedPartitions) { + Collection<ResultPartitionID> releasedPartitions) { this.taskExecutorId = taskExecutorId; this.jobId = jobId; this.releasedPartitions = releasedPartitions; - this.promotedPartitions = promotedPartitions; } public ResourceID getTaskExecutorId() { @@ -415,6 +461,29 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { public Collection<ResultPartitionID> getReleasedPartitions() { return releasedPartitions; } + } + + private static class PromoteCall { + private final ResourceID taskExecutorId; + private final JobID jobId; + private final Collection<ResultPartitionID> promotedPartitions; + + private PromoteCall( + ResourceID taskExecutorId, + JobID jobId, + Collection<ResultPartitionID> promotedPartitions) { + this.taskExecutorId = taskExecutorId; + this.jobId = jobId; + this.promotedPartitions = promotedPartitions; + } + + public ResourceID getTaskExecutorId() { + return taskExecutorId; + } + + public 
JobID getJobId() { + return jobId; + } public Collection<ResultPartitionID> getPromotedPartitions() { return promotedPartitions; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java index 130212bcf05..b3915a715d9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; /** No-op implementation of {@link JobMasterPartitionTracker}. */ public class NoOpJobMasterPartitionTracker implements JobMasterPartitionTracker { @@ -50,8 +51,10 @@ public class NoOpJobMasterPartitionTracker implements JobMasterPartitionTracker Collection<ResultPartitionID> resultPartitionIds, boolean releaseOnShuffleMaster) {} @Override - public void stopTrackingAndReleaseOrPromotePartitions( - Collection<ResultPartitionID> resultPartitionIds) {} + public CompletableFuture<Void> stopTrackingAndPromotePartitions( + Collection<ResultPartitionID> resultPartitionIds) { + return null; + } @Override public Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java index 8378befcfe0..59d6bd935fd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -41,8 +42,9 @@ public class TestingJobMasterPartitionTracker implements JobMasterPartitionTrack startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {}; private Consumer<Collection<ResultPartitionID>> stopTrackingAndReleasePartitionsConsumer = ignored -> {}; - private Consumer<Collection<ResultPartitionID>> - stopTrackingAndReleaseOrPromotePartitionsConsumer = ignored -> {}; + + private Consumer<Collection<ResultPartitionID>> stopTrackingAndPromotePartitionsConsumer = + ignored -> {}; private Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer = ignored -> {}; private Supplier<Collection<ResultPartitionDeploymentDescriptor>> getAllTrackedPartitionsSupplier = () -> Collections.emptyList(); @@ -73,13 +75,6 @@ public class TestingJobMasterPartitionTracker implements JobMasterPartitionTrack this.stopTrackingAndReleasePartitionsConsumer = stopTrackingAndReleasePartitionsConsumer; } - public void setStopTrackingAndReleaseOrPromotePartitionsConsumer( - Consumer<Collection<ResultPartitionID>> - stopTrackingAndReleaseOrPromotePartitionsConsumer) { - this.stopTrackingAndReleaseOrPromotePartitionsConsumer = - stopTrackingAndReleaseOrPromotePartitionsConsumer; - } - public void setStopTrackingPartitionsConsumer( Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer) { this.stopTrackingPartitionsConsumer = stopTrackingPartitionsConsumer; @@ -91,6 +86,11 @@ public class TestingJobMasterPartitionTracker implements JobMasterPartitionTrack this.getAllTrackedPartitionsSupplier = getAllTrackedPartitionsSupplier; } + 
public void setStopTrackingAndPromotePartitionsConsumer( + Consumer<Collection<ResultPartitionID>> stopTrackingAndPromotePartitionsConsumer) { + this.stopTrackingAndPromotePartitionsConsumer = stopTrackingAndPromotePartitionsConsumer; + } + @Override public void startTrackingPartition( ResourceID producingTaskExecutorId, @@ -112,6 +112,13 @@ public class TestingJobMasterPartitionTracker implements JobMasterPartitionTrack stopTrackingAndReleasePartitionsConsumer.accept(resultPartitionIds); } + @Override + public CompletableFuture<Void> stopTrackingAndPromotePartitions( + Collection<ResultPartitionID> resultPartitionIds) { + stopTrackingAndPromotePartitionsConsumer.accept(resultPartitionIds); + return CompletableFuture.completedFuture(null); + } + @Override public Collection<PartitionTrackerEntry<ResourceID, ResultPartitionDeploymentDescriptor>> stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) { @@ -119,12 +126,6 @@ public class TestingJobMasterPartitionTracker implements JobMasterPartitionTrack return Collections.emptyList(); } - @Override - public void stopTrackingAndReleaseOrPromotePartitions( - Collection<ResultPartitionID> resultPartitionIds) { - stopTrackingAndReleaseOrPromotePartitionsConsumer.accept(resultPartitionIds); - } - @Override public Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions() { return getAllTrackedPartitionsSupplier.get(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java index 5180d1aeb26..2a8e95f4ed9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterPartitionReleaseTest.java @@ -64,6 +64,8 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import 
java.util.Collections; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Function; @@ -208,8 +210,8 @@ public class JobMasterPartitionReleaseTest extends TestLogger { private final CompletableFuture<Collection<ResultPartitionID>> partitionsForRelease = new CompletableFuture<>(); - private final CompletableFuture<Collection<ResultPartitionID>> - partitionsForReleaseOrPromote = new CompletableFuture<>(); + private final CompletableFuture<Collection<ResultPartitionID>> clusterPartitionsForPromote = + new CompletableFuture<>(); private final JobMaster jobMaster; @@ -235,8 +237,8 @@ public class JobMasterPartitionReleaseTest extends TestLogger { taskExecutorIdForStopTracking::complete); partitionTracker.setStopTrackingAndReleasePartitionsConsumer( partitionsForRelease::complete); - partitionTracker.setStopTrackingAndReleaseOrPromotePartitionsConsumer( - partitionsForReleaseOrPromote::complete); + partitionTracker.setStopTrackingAndPromotePartitionsConsumer( + clusterPartitionsForPromote::complete); Configuration configuration = new Configuration(); configuration.setString( @@ -313,7 +315,14 @@ public class JobMasterPartitionReleaseTest extends TestLogger { } public CompletableFuture<Collection<ResultPartitionID>> getPartitionsForReleaseOrPromote() { - return partitionsForReleaseOrPromote; + return partitionsForRelease.thenCombine( + clusterPartitionsForPromote, + (resultPartitionIds, resultPartitionIds2) -> { + Set<ResultPartitionID> res = new HashSet<>(); + res.addAll(resultPartitionIds); + res.addAll(resultPartitionIds2); + return res; + }); } public void close() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java index d07a0434b14..0238ec9e5c3 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -149,20 +149,16 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { public void testJobMasterConnectionTerminationAfterExternalRelease() throws Exception { testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion( ((taskExecutorGateway, jobID, resultPartitionID) -> - taskExecutorGateway.releaseOrPromotePartitions( - jobID, - Collections.singleton(resultPartitionID), - Collections.emptySet()))); + taskExecutorGateway.releasePartitions( + jobID, Collections.singleton(resultPartitionID)))); } @Test public void testJobMasterConnectionTerminationAfterExternalPromotion() throws Exception { testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion( ((taskExecutorGateway, jobID, resultPartitionID) -> - taskExecutorGateway.releaseOrPromotePartitions( - jobID, - Collections.emptySet(), - Collections.singleton(resultPartitionID)))); + taskExecutorGateway.promotePartitions( + jobID, Collections.singleton(resultPartitionID)))); } private void testJobMasterConnectionTerminationAfterExternalReleaseOrPromotion( @@ -225,8 +221,8 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { trackerIsTrackingPartitions.set(true); assertThat(firstReleasePartitionsCallFuture.isDone(), is(false)); - taskExecutorGateway.releaseOrPromotePartitions( - jobId, Collections.singleton(new ResultPartitionID()), Collections.emptySet()); + taskExecutorGateway.releasePartitions( + jobId, Collections.singleton(new ResultPartitionID())); // at this point we only know that the TE has entered releasePartitions; we cannot be // certain whether it @@ -276,10 +272,8 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { .getShuffleDescriptor() .getResultPartitionID(); - taskExecutorGateway.releaseOrPromotePartitions( - 
jobId, - Collections.singleton(resultPartitionId), - Collections.emptySet()); + taskExecutorGateway.releasePartitions( + jobId, Collections.singleton(resultPartitionId)); assertThat(releasePartitionsFuture.get(), hasItems(resultPartitionId)); }); @@ -299,10 +293,8 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { .getShuffleDescriptor() .getResultPartitionID(); - taskExecutorGateway.releaseOrPromotePartitions( - jobId, - Collections.emptySet(), - Collections.singleton(resultPartitionId)); + taskExecutorGateway.promotePartitions( + jobId, Collections.singleton(resultPartitionId)); assertThat(promotePartitionsFuture.get(), hasItems(resultPartitionId)); }); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index 24c56284405..c699fab7b69 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -47,7 +47,6 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.QuadFunction; -import org.apache.flink.util.function.TriConsumer; import org.apache.flink.util.function.TriFunction; import java.util.Collection; @@ -92,9 +91,8 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { private final Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier; - private final TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>> - releaseOrPromotePartitionsConsumer; - + private BiConsumer<JobID, Set<ResultPartitionID>> releasePartitionsConsumer; + private BiConsumer<JobID, Set<ResultPartitionID>> promotePartitionsConsumer; private final Consumer<Collection<IntermediateDataSetID>> 
releaseClusterPartitionsConsumer; private final TriFunction< @@ -144,8 +142,8 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { Consumer<Exception> disconnectResourceManagerConsumer, Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction, Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier, - TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>> - releaseOrPromotePartitionsConsumer, + BiConsumer<JobID, Set<ResultPartitionID>> releasePartitionsConsumer, + BiConsumer<JobID, Set<ResultPartitionID>> promotePartitionsConsumer, Consumer<Collection<IntermediateDataSetID>> releaseClusterPartitionsConsumer, TriFunction< ExecutionAttemptID, @@ -178,7 +176,8 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { this.disconnectResourceManagerConsumer = disconnectResourceManagerConsumer; this.cancelTaskFunction = cancelTaskFunction; this.canBeReleasedSupplier = canBeReleasedSupplier; - this.releaseOrPromotePartitionsConsumer = releaseOrPromotePartitionsConsumer; + this.releasePartitionsConsumer = releasePartitionsConsumer; + this.promotePartitionsConsumer = promotePartitionsConsumer; this.releaseClusterPartitionsConsumer = releaseClusterPartitionsConsumer; this.operatorEventHandler = operatorEventHandler; this.requestThreadDumpSupplier = requestThreadDumpSupplier; @@ -221,11 +220,15 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { } @Override - public void releaseOrPromotePartitions( - JobID jobId, - Set<ResultPartitionID> partitionToRelease, - Set<ResultPartitionID> partitionsToPromote) { - releaseOrPromotePartitionsConsumer.accept(jobId, partitionToRelease, partitionsToPromote); + public void releasePartitions(JobID jobId, Set<ResultPartitionID> partitionIds) { + releasePartitionsConsumer.accept(jobId, partitionIds); + } + + @Override + public CompletableFuture<Acknowledge> promotePartitions( + JobID jobId, Set<ResultPartitionID> partitionIds) { + 
promotePartitionsConsumer.accept(jobId, partitionIds); + return CompletableFuture.completedFuture(Acknowledge.get()); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java index 4f519d18309..283a6c9ef35 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java @@ -81,7 +81,12 @@ public class TestingTaskExecutorGatewayBuilder { NOOP_CANCEL_TASK_FUNCTION = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); private static final TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>> - NOOP_RELEASE_PARTITIONS_CONSUMER = (ignoredA, ignoredB, ignoredC) -> {}; + NOOP_RELEASE_OR_PROMOTE_PARTITIONS_CONSUMER = (ignoredA, ignoredB, ignoredC) -> {}; + + private static final BiConsumer<JobID, Set<ResultPartitionID>> + NOOP_RELEASE_PARTITIONS_CONSUMER = (ignoredA, ignoredB) -> {}; + private static final BiConsumer<JobID, Set<ResultPartitionID>> + NOOP_PROMOTE_PARTITIONS_CONSUMER = (ignoredA, ignoredB) -> {}; private static final TriFunction< ExecutionAttemptID, OperatorID, @@ -131,8 +136,11 @@ public class TestingTaskExecutorGatewayBuilder { NOOP_CANCEL_TASK_FUNCTION; private Supplier<CompletableFuture<Boolean>> canBeReleasedSupplier = () -> CompletableFuture.completedFuture(true); - private TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>> - releaseOrPromotePartitionsConsumer = NOOP_RELEASE_PARTITIONS_CONSUMER; + + private BiConsumer<JobID, Set<ResultPartitionID>> releasePartitionsConsumer = + NOOP_RELEASE_PARTITIONS_CONSUMER; + private BiConsumer<JobID, Set<ResultPartitionID>> promotePartitionsConsumer = + NOOP_PROMOTE_PARTITIONS_CONSUMER; private 
Consumer<Collection<IntermediateDataSetID>> releaseClusterPartitionsConsumer = ignored -> {}; private TriFunction< @@ -239,10 +247,15 @@ public class TestingTaskExecutorGatewayBuilder { return this; } - public TestingTaskExecutorGatewayBuilder setReleaseOrPromotePartitionsConsumer( - TriConsumer<JobID, Set<ResultPartitionID>, Set<ResultPartitionID>> - releasePartitionsConsumer) { - this.releaseOrPromotePartitionsConsumer = releasePartitionsConsumer; + public TestingTaskExecutorGatewayBuilder setReleasePartitionsConsumer( + BiConsumer<JobID, Set<ResultPartitionID>> releasePartitionsConsumer) { + this.releasePartitionsConsumer = releasePartitionsConsumer; + return this; + } + + public TestingTaskExecutorGatewayBuilder setPromotePartitionsConsumer( + BiConsumer<JobID, Set<ResultPartitionID>> promotePartitionsConsumer) { + this.promotePartitionsConsumer = promotePartitionsConsumer; return this; } @@ -307,7 +320,8 @@ public class TestingTaskExecutorGatewayBuilder { disconnectResourceManagerConsumer, cancelTaskFunction, canBeReleasedSupplier, - releaseOrPromotePartitionsConsumer, + releasePartitionsConsumer, + promotePartitionsConsumer, releaseClusterPartitionsConsumer, operatorEventHandler, requestThreadDumpSupplier,