This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a9f2e20375f [FLINK-27523][runtime] Runtime supports producing and consuming cached intermediate results a9f2e20375f is described below commit a9f2e20375f669a5f42944c3ba12a903a4624e43 Author: sxnan <suxuanna...@gmail.com> AuthorDate: Wed May 25 10:55:41 2022 +0800 [FLINK-27523][runtime] Runtime supports producing and consuming cached intermediate results This closes #19653. --- .../TaskDeploymentDescriptorFactory.java | 76 +++++- .../executiongraph/DefaultExecutionGraph.java | 7 + .../executiongraph/EdgeManagerBuildUtil.java | 6 +- .../InternalExecutionGraphAccessor.java | 8 +- .../RegionPartitionGroupReleaseStrategy.java | 3 +- .../network/partition/ClusterPartitionManager.java | 23 ++ .../io/network/partition/DataSetMetaInfo.java | 19 ++ .../partition/JobMasterPartitionTracker.java | 10 + .../partition/JobMasterPartitionTrackerImpl.java | 37 +++ .../partition/ResourceManagerPartitionTracker.java | 10 + .../ResourceManagerPartitionTrackerImpl.java | 21 +- .../partition/TaskExecutorPartitionInfo.java | 15 +- .../TaskExecutorPartitionTrackerImpl.java | 21 +- .../apache/flink/runtime/jobgraph/JobVertex.java | 29 ++- .../apache/flink/runtime/jobmaster/JobMaster.java | 1 + .../runtime/resourcemanager/ResourceManager.java | 19 ++ ...achedIntermediateDataSetCorruptedException.java | 44 ++++ .../flink/runtime/scheduler/DefaultScheduler.java | 34 ++- .../scheduler/strategy/ConsumedPartitionGroup.java | 24 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 7 +- .../partition/ClusterPartitionReport.java | 22 +- .../TaskDeploymentDescriptorFactoryTest.java | 3 +- .../JobMasterPartitionTrackerImplTest.java | 58 +++++ .../partition/NoOpJobMasterPartitionTracker.java | 13 + .../NoOpResourceManagerPartitionTracker.java | 8 + .../ResourceManagerPartitionTrackerImplTest.java | 67 ++++- .../TaskExecutorPartitionTrackerImplTest.java | 80 +++++- .../TestingJobMasterPartitionTracker.java | 13 + 
.../jobmaster/JobIntermediateDatasetReuseTest.java | 270 +++++++++++++++++++++ .../ResourceManagerPartitionLifecycleTest.java | 36 ++- .../utils/TestingResourceManagerGateway.java | 15 ++ .../adapter/DefaultExecutionVertexTest.java | 4 +- .../runtime/scheduler/adaptive/ExecutingTest.java | 9 + .../strategy/TestingSchedulingExecutionVertex.java | 8 +- .../strategy/TestingSchedulingTopology.java | 3 +- .../runtime/taskexecutor/TaskExecutorTest.java | 19 +- 36 files changed, 971 insertions(+), 71 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java index dbe20738531..6da8f4fabca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java @@ -39,11 +39,13 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobType; +import org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor; import org.apache.flink.types.Either; import org.apache.flink.util.CompressedSerializedValue; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import javax.annotation.Nullable; @@ -51,7 +53,9 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import 
java.util.Optional; import java.util.function.Function; @@ -71,6 +75,8 @@ public class TaskDeploymentDescriptorFactory { private final Function<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitionRetriever; private final BlobWriter blobWriter; + private final Map<IntermediateDataSetID, ShuffleDescriptor[]> + consumedClusterPartitionShuffleDescriptors; private TaskDeploymentDescriptorFactory( ExecutionAttemptID executionId, @@ -81,7 +87,9 @@ public class TaskDeploymentDescriptorFactory { List<ConsumedPartitionGroup> consumedPartitionGroups, Function<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitionRetriever, - BlobWriter blobWriter) { + BlobWriter blobWriter, + Map<IntermediateDataSetID, ShuffleDescriptor[]> + consumedClusterPartitionShuffleDescriptors) { this.executionId = executionId; this.serializedJobInformation = serializedJobInformation; this.taskInfo = taskInfo; @@ -90,6 +98,8 @@ public class TaskDeploymentDescriptorFactory { this.consumedPartitionGroups = consumedPartitionGroups; this.resultPartitionRetriever = resultPartitionRetriever; this.blobWriter = blobWriter; + this.consumedClusterPartitionShuffleDescriptors = + consumedClusterPartitionShuffleDescriptors; } public TaskDeploymentDescriptor createDeploymentDescriptor( @@ -137,6 +147,19 @@ public class TaskDeploymentDescriptorFactory { consumedIntermediateResult, consumedPartitionGroup))); } + for (Map.Entry<IntermediateDataSetID, ShuffleDescriptor[]> entry : + consumedClusterPartitionShuffleDescriptors.entrySet()) { + // For FLIP-205, the JobGraph generating side ensure that the cluster partition is + // produced with only one subpartition. Therefore, we always consume the partition with + // subpartition index of 0. 
+ inputGates.add( + new InputGateDeploymentDescriptor( + entry.getKey(), + ResultPartitionType.BLOCKING_PERSISTENT, + 0, + entry.getValue())); + } + return inputGates; } @@ -231,9 +254,22 @@ public class TaskDeploymentDescriptorFactory { } public static TaskDeploymentDescriptorFactory fromExecutionVertex( - ExecutionVertex executionVertex) throws IOException { + ExecutionVertex executionVertex) + throws IOException, CachedIntermediateDataSetCorruptedException { InternalExecutionGraphAccessor internalExecutionGraphAccessor = executionVertex.getExecutionGraphAccessor(); + Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors; + try { + clusterPartitionShuffleDescriptors = + getClusterPartitionShuffleDescriptors(executionVertex); + } catch (Throwable e) { + throw new CachedIntermediateDataSetCorruptedException( + e, + executionVertex + .getJobVertex() + .getJobVertex() + .getIntermediateDataSetIdsToConsume()); + } return new TaskDeploymentDescriptorFactory( executionVertex.getCurrentExecutionAttempt().getAttemptId(), @@ -244,7 +280,41 @@ public class TaskDeploymentDescriptorFactory { internalExecutionGraphAccessor.getPartitionLocationConstraint(), executionVertex.getAllConsumedPartitionGroups(), internalExecutionGraphAccessor::getResultPartitionOrThrow, - internalExecutionGraphAccessor.getBlobWriter()); + internalExecutionGraphAccessor.getBlobWriter(), + clusterPartitionShuffleDescriptors); + } + + private static Map<IntermediateDataSetID, ShuffleDescriptor[]> + getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) { + final InternalExecutionGraphAccessor internalExecutionGraphAccessor = + executionVertex.getExecutionGraphAccessor(); + final List<IntermediateDataSetID> consumedClusterDataSetIds = + executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume(); + Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors = + new HashMap<>(); + + for (IntermediateDataSetID 
consumedClusterDataSetId : consumedClusterDataSetIds) { + List<? extends ShuffleDescriptor> shuffleDescriptors = + internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors( + consumedClusterDataSetId); + + // For FLIP-205, the job graph generating side makes sure that the producer and consumer + // of the cluster partition have the same parallelism and each consumer Task consumes + // one output partition of the producer. + Preconditions.checkState( + executionVertex.getTotalNumberOfParallelSubtasks() == shuffleDescriptors.size(), + "The parallelism (%s) of the cache consuming job vertex is " + + "different from the number of shuffle descriptors (%s) of the intermediate data set", + executionVertex.getTotalNumberOfParallelSubtasks(), + shuffleDescriptors.size()); + + clusterPartitionShuffleDescriptors.put( + consumedClusterDataSetId, + new ShuffleDescriptor[] { + shuffleDescriptors.get(executionVertex.getParallelSubtaskIndex()) + }); + } + return clusterPartitionShuffleDescriptors; } private static MaybeOffloaded<JobInformation> getSerializedJobInformation( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index 713f185b8cf..a1e91a2d801 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -73,6 +73,7 @@ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleMaster; import 
org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackend; @@ -1602,4 +1603,10 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG public ExecutionGraphID getExecutionGraphID() { return executionGraphId; } + + @Override + public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors( + IntermediateDataSetID intermediateDataSetID) { + return partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetID); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java index 9ba16aa8eb9..3aa6e59de0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java @@ -182,7 +182,8 @@ public class EdgeManagerBuildUtil { IntermediateResultPartitionID consumedPartitionId, IntermediateResult intermediateResult) { ConsumedPartitionGroup consumedPartitionGroup = - ConsumedPartitionGroup.fromSinglePartition(consumedPartitionId); + ConsumedPartitionGroup.fromSinglePartition( + consumedPartitionId, intermediateResult.getResultType()); registerConsumedPartitionGroupToEdgeManager(consumedPartitionGroup, intermediateResult); return consumedPartitionGroup; } @@ -191,7 +192,8 @@ public class EdgeManagerBuildUtil { List<IntermediateResultPartitionID> consumedPartitions, IntermediateResult intermediateResult) { ConsumedPartitionGroup consumedPartitionGroup = - ConsumedPartitionGroup.fromMultiplePartitions(consumedPartitions); + ConsumedPartitionGroup.fromMultiplePartitions( + consumedPartitions, intermediateResult.getResultType()); registerConsumedPartitionGroupToEdgeManager(consumedPartitionGroup, intermediateResult); return consumedPartitionGroup; } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java index 2e2cd3733bf..8ef23b2d834 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java @@ -26,9 +26,11 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.types.Either; import org.apache.flink.util.SerializedValue; @@ -64,7 +66,7 @@ public interface InternalExecutionGraphAccessor { @Nonnull ComponentMainThreadExecutor getJobMasterMainThreadExecutor(); - ShuffleMaster<?> getShuffleMaster(); + ShuffleMaster<? extends ShuffleDescriptor> getShuffleMaster(); JobMasterPartitionTracker getPartitionTracker(); @@ -114,4 +116,8 @@ public interface InternalExecutionGraphAccessor { boolean isDynamic(); ExecutionGraphID getExecutionGraphID(); + + /** Get the shuffle descriptors of the cluster partitions ordered by partition number. 
*/ + List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors( + IntermediateDataSetID intermediateResultPartition); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java index 06490de9027..a837df72808 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java @@ -147,7 +147,8 @@ public class RegionPartitionGroupReleaseStrategy for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) { final ConsumerRegionGroupExecutionView consumerRegionGroup = partitionGroupConsumerRegions.get(consumedPartitionGroup); - if (consumerRegionGroup.isFinished()) { + if (consumerRegionGroup.isFinished() + && !consumedPartitionGroup.getResultPartitionType().isPersistent()) { // At present, there's only one ConsumerVertexGroup for each // ConsumedPartitionGroup, so if a ConsumedPartitionGroup is fully consumed, all // its partitions are releasable. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java index 2e2aa6722e6..9a08e3cc569 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java @@ -17,8 +17,12 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -39,4 +43,23 @@ public interface ClusterPartitionManager { * @return future that is completed once all partitions have been released */ CompletableFuture<Void> releaseClusterPartitions(IntermediateDataSetID dataSetToRelease); + + /** + * Report the cluster partitions status in the task executor. + * + * @param taskExecutorId The id of the task executor. + * @param clusterPartitionReport The status of the cluster partitions. + * @return future that is completed once the report have been processed. + */ + CompletableFuture<Void> reportClusterPartitions( + ResourceID taskExecutorId, ClusterPartitionReport clusterPartitionReport); + + /** + * Get the shuffle descriptors of the cluster partitions ordered by partition number. + * + * @param intermediateDataSetID The id of the dataset. + * @return shuffle descriptors of the cluster partitions. 
+ */ + CompletableFuture<List<ShuffleDescriptor>> getClusterPartitionsShuffleDescriptors( + IntermediateDataSetID intermediateDataSetID); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java index cb2a39f3159..ae446ad5bf4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java @@ -18,9 +18,14 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.util.Preconditions; +import java.util.Comparator; +import java.util.Map; import java.util.OptionalInt; +import java.util.SortedMap; +import java.util.TreeMap; /** Container for meta-data of a data set. */ public final class DataSetMetaInfo { @@ -28,6 +33,10 @@ public final class DataSetMetaInfo { private final int numRegisteredPartitions; private final int numTotalPartitions; + private final SortedMap<ResultPartitionID, ShuffleDescriptor> + shuffleDescriptorsOrderByPartitionId = + new TreeMap<>( + Comparator.comparingInt(o -> o.getPartitionId().getPartitionNumber())); private DataSetMetaInfo(int numRegisteredPartitions, int numTotalPartitions) { this.numRegisteredPartitions = numRegisteredPartitions; @@ -44,6 +53,16 @@ public final class DataSetMetaInfo { return numTotalPartitions; } + public DataSetMetaInfo addShuffleDescriptors( + Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors) { + this.shuffleDescriptorsOrderByPartitionId.putAll(shuffleDescriptors); + return this; + } + + public Map<ResultPartitionID, ShuffleDescriptor> getShuffleDescriptors() { + return this.shuffleDescriptorsOrderByPartitionId; + } + static DataSetMetaInfo withoutNumRegisteredPartitions(int 
numTotalPartitions) { return new DataSetMetaInfo(UNKNOWN, numTotalPartitions); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java index cb7c2bef862..2c116e4951e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java @@ -19,8 +19,12 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.util.Collection; +import java.util.List; /** * Utility for tracking partitions and issuing release calls to task executors and shuffle masters. @@ -61,4 +65,10 @@ public interface JobMasterPartitionTracker /** Get all the partitions under tracking. */ Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions(); + + void connectToResourceManager(ResourceManagerGateway resourceManagerGateway); + + /** Get the shuffle descriptors of the cluster partitions ordered by partition number. 
*/ + List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors( + IntermediateDataSetID intermediateDataSetID); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java index 0e6a6c38810..8c8069f9760 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java @@ -20,11 +20,15 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.util.Preconditions; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -52,6 +56,9 @@ public class JobMasterPartitionTrackerImpl private final ShuffleMaster<?> shuffleMaster; private final PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup; + private ResourceManagerGateway resourceManagerGateway; + private final Map<IntermediateDataSetID, List<ShuffleDescriptor>> + clusterPartitionShuffleDescriptors; public JobMasterPartitionTrackerImpl( JobID jobId, @@ -61,6 +68,7 @@ public class JobMasterPartitionTrackerImpl this.jobId = Preconditions.checkNotNull(jobId); this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster); this.taskExecutorGatewayLookup = taskExecutorGatewayLookup; + 
this.clusterPartitionShuffleDescriptors = new HashMap<>(); } @Override @@ -118,6 +126,35 @@ public class JobMasterPartitionTrackerImpl return partitionInfos.values().stream().map(PartitionInfo::getMetaInfo).collect(toList()); } + @Override + public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) { + this.resourceManagerGateway = resourceManagerGateway; + } + + @Override + public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors( + IntermediateDataSetID intermediateDataSetID) { + return clusterPartitionShuffleDescriptors.computeIfAbsent( + intermediateDataSetID, this::requestShuffleDescriptorsFromResourceManager); + } + + private List<ShuffleDescriptor> requestShuffleDescriptorsFromResourceManager( + IntermediateDataSetID intermediateDataSetID) { + Preconditions.checkNotNull( + resourceManagerGateway, "JobMaster is not connected to ResourceManager"); + try { + return this.resourceManagerGateway + .getClusterPartitionsShuffleDescriptors(intermediateDataSetID) + .get(); + } catch (Throwable e) { + throw new RuntimeException( + String.format( + "Failed to get shuffle descriptors of intermediate dataset %s from ResourceManager", + intermediateDataSetID), + e); + } + } + private void stopTrackingAndHandlePartitions( Collection<ResultPartitionID> resultPartitionIds, BiConsumer<ResourceID, Collection<ResultPartitionDeploymentDescriptor>> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java index dc8fb41b1d3..b6ee0d765e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java @@ -19,8 +19,10 @@ package org.apache.flink.runtime.io.network.partition; import 
org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -64,4 +66,12 @@ public interface ResourceManagerPartitionTracker { * @return tracked datasets */ Map<IntermediateDataSetID, DataSetMetaInfo> listDataSets(); + + /** + * Returns all the shuffle descriptors of cluster partitions for the intermediate dataset. + * + * @param dataSetID The id of the intermediate dataset. + * @return the shuffle descriptors. + */ + List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(IntermediateDataSetID dataSetID); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java index be8534a7723..72a322f0118 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java @@ -20,16 +20,19 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import 
java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -100,6 +103,17 @@ public class ResourceManagerPartitionTrackerImpl implements ResourceManagerParti return partitionReleaseCompletionFuture; } + @Override + public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors( + IntermediateDataSetID dataSetID) { + final DataSetMetaInfo dataSetMetaInfo = this.dataSetMetaInfo.get(dataSetID); + if (dataSetMetaInfo == null) { + return Collections.emptyList(); + } + + return new ArrayList<>(dataSetMetaInfo.getShuffleDescriptors().values()); + } + private void internalProcessClusterPartitionReport( ResourceID taskExecutorId, ClusterPartitionReport clusterPartitionReport) { final Set<IntermediateDataSetID> dataSetsWithLostPartitions = @@ -194,13 +208,16 @@ public class ResourceManagerPartitionTrackerImpl implements ResourceManagerParti if (dataSetMetaInfo == null) { return DataSetMetaInfo .withoutNumRegisteredPartitions( - entry.getNumTotalPartitions()); + entry.getNumTotalPartitions()) + .addShuffleDescriptors( + entry.getShuffleDescriptors()); } else { // double check that the meta data is consistent Preconditions.checkState( dataSetMetaInfo.getNumTotalPartitions() == entry.getNumTotalPartitions()); - return dataSetMetaInfo; + return dataSetMetaInfo.addShuffleDescriptors( + entry.getShuffleDescriptors()); } })); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java index db0618a8453..ddbfbc5fb78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition; import 
org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.util.Objects; @@ -28,17 +29,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** Encapsulates meta-information the TaskExecutor requires to be kept for each partition. */ public final class TaskExecutorPartitionInfo { - private final ResultPartitionID resultPartitionId; private final IntermediateDataSetID intermediateDataSetId; + private final ShuffleDescriptor shuffleDescriptor; private final int numberOfPartitions; public TaskExecutorPartitionInfo( - ResultPartitionID resultPartitionId, + ShuffleDescriptor shuffleDescriptor, IntermediateDataSetID intermediateDataSetId, int numberOfPartitions) { - this.resultPartitionId = checkNotNull(resultPartitionId); this.intermediateDataSetId = checkNotNull(intermediateDataSetId); + this.shuffleDescriptor = checkNotNull(shuffleDescriptor); checkArgument(numberOfPartitions > 0); this.numberOfPartitions = numberOfPartitions; } @@ -48,7 +49,7 @@ public final class TaskExecutorPartitionInfo { } public ResultPartitionID getResultPartitionId() { - return resultPartitionId; + return shuffleDescriptor.getResultPartitionID(); } public int getNumberOfPartitions() { @@ -77,8 +78,12 @@ public final class TaskExecutorPartitionInfo { public static TaskExecutorPartitionInfo from( ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) { return new TaskExecutorPartitionInfo( - resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID(), + resultPartitionDeploymentDescriptor.getShuffleDescriptor(), resultPartitionDeploymentDescriptor.getResultId(), resultPartitionDeploymentDescriptor.getTotalNumberOfPartitions()); } + + public ShuffleDescriptor getShuffleDescriptor() { + return shuffleDescriptor; + } } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java index c34f77000d4..e183ab9c2dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; import org.apache.flink.util.CollectionUtil; @@ -26,7 +27,6 @@ import org.apache.flink.util.Preconditions; import java.util.Collection; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -91,7 +91,7 @@ public class TaskExecutorPartitionTrackerImpl clusterPartitions.computeIfAbsent( dataSetMetaInfo.getIntermediateDataSetId(), ignored -> new DataSetEntry(dataSetMetaInfo.getNumberOfPartitions())); - dataSetEntry.addPartition(partitionTrackerEntry.getResultPartitionId()); + dataSetEntry.addPartition(partitionTrackerEntry.getMetaInfo().getShuffleDescriptor()); } } @@ -121,8 +121,8 @@ public class TaskExecutorPartitionTrackerImpl entry -> new ClusterPartitionReport.ClusterPartitionReportEntry( entry.getKey(), - entry.getValue().getPartitionIds(), - entry.getValue().getTotalNumberOfPartitions())) + entry.getValue().getTotalNumberOfPartitions(), + entry.getValue().getShuffleDescriptors())) .collect(Collectors.toList()); return new ClusterPartitionReport(reportEntries); @@ -130,23 +130,28 @@ public class TaskExecutorPartitionTrackerImpl 
private static class DataSetEntry { - private final Set<ResultPartitionID> partitionIds = new HashSet<>(); + private final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors = + new HashMap<>(); private final int totalNumberOfPartitions; private DataSetEntry(int totalNumberOfPartitions) { this.totalNumberOfPartitions = totalNumberOfPartitions; } - void addPartition(ResultPartitionID resultPartitionId) { - partitionIds.add(resultPartitionId); + void addPartition(ShuffleDescriptor shuffleDescriptor) { + shuffleDescriptors.put(shuffleDescriptor.getResultPartitionID(), shuffleDescriptor); } public Set<ResultPartitionID> getPartitionIds() { - return partitionIds; + return shuffleDescriptors.keySet(); } public int getTotalNumberOfPartitions() { return totalNumberOfPartitions; } + + public Map<ResultPartitionID, ShuffleDescriptor> getShuffleDescriptors() { + return shuffleDescriptors; + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index 843ed765daf..d54056ef4f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -142,6 +142,11 @@ public class JobVertex implements java.io.Serializable { */ private String resultOptimizerProperties; + /** + * The IDs of the cached intermediate datasets that the job vertex consumes.
+ */ + private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>(); + // -------------------------------------------------------------------------------------------- /** @@ -467,10 +472,6 @@ public class JobVertex implements java.io.Serializable { } // -------------------------------------------------------------------------------------------- - private IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) { - return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType); - } - public IntermediateDataSet createAndAddResultDataSet( IntermediateDataSetID id, ResultPartitionType partitionType) { @@ -481,8 +482,18 @@ public class JobVertex implements java.io.Serializable { public JobEdge connectNewDataSetAsInput( JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) { + return connectNewDataSetAsInput( + input, distPattern, partitionType, new IntermediateDataSetID()); + } - IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType); + public JobEdge connectNewDataSetAsInput( + JobVertex input, + DistributionPattern distPattern, + ResultPartitionType partitionType, + IntermediateDataSetID intermediateDataSetId) { + + IntermediateDataSet dataSet = + input.createAndAddResultDataSet(intermediateDataSetId, partitionType); JobEdge edge = new JobEdge(dataSet, this, distPattern); this.inputs.add(edge); @@ -568,6 +579,14 @@ public class JobVertex implements java.io.Serializable { this.resultOptimizerProperties = resultOptimizerProperties; } + public void addIntermediateDataSetIdToConsume(IntermediateDataSetID intermediateDataSetId) { + intermediateDataSetIdsToConsume.add(intermediateDataSetId); + } + + public List<IntermediateDataSetID> getIntermediateDataSetIdsToConsume() { + return intermediateDataSetIdsToConsume; + } + // -------------------------------------------------------------------------------------------- @Override diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 54b1954cc57..476802ca7f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -1125,6 +1125,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> resourceManagerGateway, resourceManagerResourceId); slotPoolService.connectToResourceManager(resourceManagerGateway); + partitionTracker.connectToResourceManager(resourceManagerGateway); resourceManagerHeartbeatManager.monitorTarget( resourceManagerResourceId, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 3cd99ca0cf7..634b56f1e32 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -63,6 +63,7 @@ import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcServiceUtils; import org.apache.flink.runtime.security.token.DelegationTokenManager; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.slots.ResourceRequirement; import org.apache.flink.runtime.slots.ResourceRequirements; import org.apache.flink.runtime.taskexecutor.FileType; @@ -72,6 +73,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationRejection; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway; +import 
org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.concurrent.FutureUtils; @@ -81,6 +83,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -777,6 +780,22 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> return clusterPartitionTracker.releaseClusterPartitions(dataSetId); } + @Override + public CompletableFuture<Void> reportClusterPartitions( + ResourceID taskExecutorId, ClusterPartitionReport clusterPartitionReport) { + clusterPartitionTracker.processTaskExecutorClusterPartitionReport( + taskExecutorId, clusterPartitionReport); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture<List<ShuffleDescriptor>> getClusterPartitionsShuffleDescriptors( + IntermediateDataSetID intermediateDataSetID) { + return CompletableFuture.completedFuture( + clusterPartitionTracker.getClusterPartitionShuffleDescriptors( + intermediateDataSetID)); + } + @Override public CompletableFuture<Map<IntermediateDataSetID, DataSetMetaInfo>> listDataSets() { return CompletableFuture.completedFuture(clusterPartitionTracker.listDataSets()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/CachedIntermediateDataSetCorruptedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/CachedIntermediateDataSetCorruptedException.java new file mode 100644 index 00000000000..1740a79fd31 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/CachedIntermediateDataSetCorruptedException.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.throwable.ThrowableAnnotation; +import org.apache.flink.runtime.throwable.ThrowableType; + +import java.util.List; + +/** Indicates that some task failed to consume a cached intermediate dataset.
*/ +@ThrowableAnnotation(ThrowableType.NonRecoverableError) +public class CachedIntermediateDataSetCorruptedException extends JobException { + private final List<IntermediateDataSetID> corruptedIntermediateDataSetID; + + public CachedIntermediateDataSetCorruptedException( + Throwable cause, List<IntermediateDataSetID> corruptedIntermediateDataSetID) { + super( + String.format( + "Corrupted intermediate dataset IDs: %s", corruptedIntermediateDataSetID), + cause); + this.corruptedIntermediateDataSetID = corruptedIntermediateDataSetID; + } + + public List<IntermediateDataSetID> getCorruptedIntermediateDataSetID() { + return corruptedIntermediateDataSetID; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 9a1f88feb66..e7869b6c9a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -36,6 +36,9 @@ import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHa import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy; import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult; import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy; +import org.apache.flink.runtime.io.network.partition.PartitionException; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; @@ -62,6 +65,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import 
java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -293,15 +297,39 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio private void handleTaskFailure( final ExecutionVertexID executionVertexId, @Nullable final Throwable error) { + Throwable revisedError = + maybeTranslateToCachedIntermediateDataSetException(error, executionVertexId); final long timestamp = System.currentTimeMillis(); - setGlobalFailureCause(error, timestamp); - notifyCoordinatorsAboutTaskFailure(executionVertexId, error); + setGlobalFailureCause(revisedError, timestamp); + notifyCoordinatorsAboutTaskFailure(executionVertexId, revisedError); final FailureHandlingResult failureHandlingResult = executionFailureHandler.getFailureHandlingResult( - executionVertexId, error, timestamp); + executionVertexId, revisedError, timestamp); maybeRestartTasks(failureHandlingResult); } + private Throwable maybeTranslateToCachedIntermediateDataSetException( + @Nullable Throwable cause, ExecutionVertexID failedVertex) { + if (!(cause instanceof PartitionException)) { + return cause; + } + + final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = + getExecutionJobVertex(failedVertex.getJobVertexId()) + .getJobVertex() + .getIntermediateDataSetIdsToConsume(); + final IntermediateResultPartitionID failedPartitionId = + ((PartitionException) cause).getPartitionId().getPartitionId(); + + if (!intermediateDataSetIdsToConsume.contains( + failedPartitionId.getIntermediateDataSetID())) { + return cause; + } + + return new CachedIntermediateDataSetCorruptedException( + cause, Collections.singletonList(failedPartitionId.getIntermediateDataSetID())); + } + private void notifyCoordinatorsAboutTaskFailure( final ExecutionVertexID executionVertexId, @Nullable final Throwable error) { final ExecutionJobVertex jobVertex = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java index 790f3b436ae..6e4672c48e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java @@ -19,8 +19,10 @@ package org.apache.flink.runtime.scheduler.strategy; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.util.Preconditions; import java.util.Collections; import java.util.Iterator; @@ -38,11 +40,16 @@ public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartit private final IntermediateDataSetID intermediateDataSetID; - private ConsumedPartitionGroup(List<IntermediateResultPartitionID> resultPartitions) { + private final ResultPartitionType resultPartitionType; + + private ConsumedPartitionGroup( + List<IntermediateResultPartitionID> resultPartitions, + ResultPartitionType resultPartitionType) { checkArgument( resultPartitions.size() > 0, "The size of result partitions in the ConsumedPartitionGroup should be larger than 0."); this.intermediateDataSetID = resultPartitions.get(0).getIntermediateDataSetID(); + this.resultPartitionType = Preconditions.checkNotNull(resultPartitionType); // Sanity check: all the partitions in one ConsumedPartitionGroup should have the same // IntermediateDataSetID @@ -56,13 +63,16 @@ public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartit } public static ConsumedPartitionGroup fromMultiplePartitions( - List<IntermediateResultPartitionID> resultPartitions) { - return new ConsumedPartitionGroup(resultPartitions); + List<IntermediateResultPartitionID> resultPartitions, + ResultPartitionType 
resultPartitionType) { + return new ConsumedPartitionGroup(resultPartitions, resultPartitionType); } public static ConsumedPartitionGroup fromSinglePartition( - IntermediateResultPartitionID resultPartition) { - return new ConsumedPartitionGroup(Collections.singletonList(resultPartition)); + IntermediateResultPartitionID resultPartition, + ResultPartitionType resultPartitionType) { + return new ConsumedPartitionGroup( + Collections.singletonList(resultPartition), resultPartitionType); } @Override @@ -103,4 +113,8 @@ public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartit public boolean areAllPartitionsFinished() { return unfinishedPartitions.get() == 0; } + + public ResultPartitionType getResultPartitionType() { + return resultPartitionType; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 8e7ec03781a..55bc617f5b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -907,7 +907,12 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { try { partitionTracker.stopTrackingAndReleaseJobPartitions(partitionToRelease); partitionTracker.promoteJobPartitions(partitionsToPromote); - + if (establishedResourceManagerConnection != null) { + establishedResourceManagerConnection + .getResourceManagerGateway() + .reportClusterPartitions( + getResourceID(), partitionTracker.createClusterPartitionReport()); + } closeJobManagerConnectionIfNoAllocatedResources(jobId); } catch (Throwable t) { // TODO: Do we still need this catch branch? 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java index 72f54808c54..a85ccf6efc7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java @@ -19,10 +19,12 @@ package org.apache.flink.runtime.taskexecutor.partition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.util.Preconditions; import java.io.Serializable; import java.util.Collection; +import java.util.Map; import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -56,22 +58,22 @@ public class ClusterPartitionReport implements Serializable { private static final long serialVersionUID = -666517548300250601L; private final IntermediateDataSetID dataSetId; - private final Set<ResultPartitionID> hostedPartitions; + private final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors; private final int numTotalPartitions; public ClusterPartitionReportEntry( IntermediateDataSetID dataSetId, - Set<ResultPartitionID> hostedPartitions, - int numTotalPartitions) { + int numTotalPartitions, + Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors) { Preconditions.checkNotNull(dataSetId); - Preconditions.checkNotNull(hostedPartitions); - Preconditions.checkArgument(!hostedPartitions.isEmpty()); + Preconditions.checkNotNull(shuffleDescriptors); + Preconditions.checkArgument(!shuffleDescriptors.isEmpty()); Preconditions.checkArgument(numTotalPartitions > 0); - Preconditions.checkState(hostedPartitions.size() <= numTotalPartitions); + 
Preconditions.checkState(shuffleDescriptors.size() <= numTotalPartitions); this.dataSetId = dataSetId; - this.hostedPartitions = hostedPartitions; this.numTotalPartitions = numTotalPartitions; + this.shuffleDescriptors = shuffleDescriptors; } public IntermediateDataSetID getDataSetId() { @@ -79,11 +81,15 @@ public class ClusterPartitionReport implements Serializable { } public Set<ResultPartitionID> getHostedPartitions() { - return hostedPartitions; + return shuffleDescriptors.keySet(); } public int getNumTotalPartitions() { return numTotalPartitions; } + + public Map<ResultPartitionID, ShuffleDescriptor> getShuffleDescriptors() { + return shuffleDescriptors; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java index 75bceb6882b..7dba9971c56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorResource; @@ -161,7 +162,7 @@ public class TaskDeploymentDescriptorFactoryTest extends TestLogger { } private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex ev) - throws IOException { + throws IOException, CachedIntermediateDataSetCorruptedException { return 
TaskDeploymentDescriptorFactory.fromExecutionVertex(ev) .createDeploymentDescriptor(new AllocationID(), null, Collections.emptyList()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java index 1f287c4a777..83af9e0c5bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.shuffle.PartitionDescriptor; import org.apache.flink.runtime.shuffle.ProducerDescriptor; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; @@ -35,6 +36,8 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; @@ -297,6 +300,47 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { externallyReleasedPartitions, containsInAnyOrder(jobPartitionId0, jobPartitionId1)); } + @Test + public void testGetShuffleDescriptors() { + final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); + IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID(); + + final Queue<ReleaseCall> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); + final JobMasterPartitionTrackerImpl partitionTracker = + new JobMasterPartitionTrackerImpl( + new 
JobID(), + shuffleMaster, + resourceId -> + Optional.of( + createTaskExecutorGateway( + resourceId, taskExecutorReleaseCalls))); + + TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + partitionTracker.connectToResourceManager(resourceManagerGateway); + partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId); + + assertThat( + resourceManagerGateway.requestedIntermediateDataSetIds, + contains(intermediateDataSetId)); + } + + @Test(expected = NullPointerException.class) + public void testGetShuffleDescriptorsBeforeConnectToResourceManager() { + final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); + IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID(); + + final Queue<ReleaseCall> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); + final JobMasterPartitionTrackerImpl partitionTracker = + new JobMasterPartitionTrackerImpl( + new JobID(), + shuffleMaster, + resourceId -> + Optional.of( + createTaskExecutorGateway( + resourceId, taskExecutorReleaseCalls))); + partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId); + } + private static TaskExecutorGateway createTaskExecutorGateway( ResourceID taskExecutorId, Collection<ReleaseCall> releaseOrPromoteCalls) { return new TestingTaskExecutorGatewayBuilder() @@ -329,6 +373,20 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger { } } + private static class TestingResourceManagerGateway + extends org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway { + + private final List<IntermediateDataSetID> requestedIntermediateDataSetIds = + new ArrayList<>(); + + @Override + public CompletableFuture<List<ShuffleDescriptor>> getClusterPartitionsShuffleDescriptors( + IntermediateDataSetID intermediateDataSetID) { + requestedIntermediateDataSetIds.add(intermediateDataSetID); + return CompletableFuture.completedFuture(Collections.emptyList()); + } + } + private static 
class ReleaseCall { private final ResourceID taskExecutorId; private final JobID jobId; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java index 8572f2e63b0..087a29613e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java @@ -19,9 +19,13 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.util.Collection; import java.util.Collections; +import java.util.List; /** No-op implementation of {@link JobMasterPartitionTracker}. 
*/ public enum NoOpJobMasterPartitionTracker implements JobMasterPartitionTracker { @@ -53,6 +57,15 @@ public enum NoOpJobMasterPartitionTracker implements JobMasterPartitionTracker { return Collections.emptyList(); } + @Override + public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {} + + @Override + public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors( + IntermediateDataSetID intermediateDataSetID) { + return Collections.emptyList(); + } + @Override public Collection<PartitionTrackerEntry<ResourceID, ResultPartitionDeploymentDescriptor>> stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java index 003cdb76f1a..be31e2bc131 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java @@ -19,9 +19,11 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -49,6 +51,12 @@ public enum NoOpResourceManagerPartitionTracker implements ResourceManagerPartit return Collections.emptyMap(); } + @Override + public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors( + IntermediateDataSetID dataSetID) { + return Collections.emptyList(); + } + @SuppressWarnings( "unused") // unused 
parameter allows usage as a ResourceManagerPartitionTrackerFactory public static ResourceManagerPartitionTracker get( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java index c3925894d3e..83246ac5b20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java @@ -19,7 +19,10 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; import org.apache.flink.util.TestLogger; @@ -28,11 +31,12 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; @@ -187,6 +191,39 @@ public class ResourceManagerPartitionTrackerImplTest extends TestLogger { assertThat(tracker.areAllMapsEmpty(), is(true)); } + @Test + public void testGetClusterPartitionShuffleDescriptors() { + final ResourceManagerPartitionTrackerImpl tracker = + new ResourceManagerPartitionTrackerImpl(new 
TestClusterPartitionReleaser()); + + assertThat(tracker.listDataSets().size(), is(0)); + + List<ResultPartitionID> resultPartitionIDS = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + resultPartitionIDS.add( + new ResultPartitionID( + new IntermediateResultPartitionID(DATA_SET_ID, i), + ExecutionAttemptID.randomId())); + } + + for (ResultPartitionID resultPartitionID : resultPartitionIDS) { + report(tracker, TASK_EXECUTOR_ID_1, DATA_SET_ID, 100, resultPartitionID); + } + + final List<ShuffleDescriptor> shuffleDescriptors = + tracker.getClusterPartitionShuffleDescriptors(DATA_SET_ID); + assertThat(shuffleDescriptors.size(), is(100)); + assertThat( + shuffleDescriptors.stream() + .map(ShuffleDescriptor::getResultPartitionID) + .collect(Collectors.toList()), + contains(resultPartitionIDS.toArray())); + + reportEmpty(tracker, TASK_EXECUTOR_ID_1); + reportEmpty(tracker, TASK_EXECUTOR_ID_2); + assertThat(tracker.areAllMapsEmpty(), is(true)); + } + private static void reportEmpty( ResourceManagerPartitionTracker tracker, ResourceID... taskExecutorIds) { for (ResourceID taskExecutorId : taskExecutorIds) { @@ -210,12 +247,34 @@ public class ResourceManagerPartitionTrackerImplTest extends TestLogger { IntermediateDataSetID dataSetId, int numTotalPartitions, ResultPartitionID... 
partitionId) { + final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors = + Arrays.stream(partitionId) + .map(TestShuffleDescriptor::new) + .collect( + Collectors.toMap( + TestShuffleDescriptor::getResultPartitionID, d -> d)); return new ClusterPartitionReport( Collections.singletonList( new ClusterPartitionReport.ClusterPartitionReportEntry( - dataSetId, - new HashSet<>(Arrays.asList(partitionId)), - numTotalPartitions))); + dataSetId, numTotalPartitions, shuffleDescriptors))); + } + + private static class TestShuffleDescriptor implements ShuffleDescriptor { + private final ResultPartitionID resultPartitionID; + + TestShuffleDescriptor(ResultPartitionID resultPartitionID) { + this.resultPartitionID = resultPartitionID; + } + + @Override + public ResultPartitionID getResultPartitionID() { + return resultPartitionID; + } + + @Override + public Optional<ResourceID> storesLocalResourcesOn() { + return Optional.empty(); + } } private static class TestClusterPartitionReleaser diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java index a8272949996..800a638176a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.api.common.JobID; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -26,6 +27,7 @@ 
import org.apache.flink.runtime.executiongraph.PartitionInfo; import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext; import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; @@ -39,6 +41,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import static org.hamcrest.CoreMatchers.not; @@ -66,10 +69,16 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger { partitionTracker.startTrackingPartition( jobId, - new TaskExecutorPartitionInfo(clusterPartitionId, dataSetId, numberOfPartitions)); + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(clusterPartitionId), + dataSetId, + numberOfPartitions)); partitionTracker.startTrackingPartition( jobId, - new TaskExecutorPartitionInfo(jobPartitionId, dataSetId, numberOfPartitions + 1)); + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(jobPartitionId), + dataSetId, + numberOfPartitions + 1)); partitionTracker.promoteJobPartitions(Collections.singleton(clusterPartitionId)); @@ -97,10 +106,16 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger { new TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment); partitionTracker.startTrackingPartition( new JobID(), - new TaskExecutorPartitionInfo(resultPartitionId1, new IntermediateDataSetID(), 1)); + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(resultPartitionId1), + new IntermediateDataSetID(), + 1)); partitionTracker.startTrackingPartition( new JobID(), - new TaskExecutorPartitionInfo(resultPartitionId2, 
new IntermediateDataSetID(), 1)); + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(resultPartitionId2), + new IntermediateDataSetID(), + 1)); partitionTracker.stopTrackingAndReleaseJobPartitions( Collections.singleton(resultPartitionId1)); @@ -123,10 +138,16 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger { new TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment); partitionTracker.startTrackingPartition( jobId1, - new TaskExecutorPartitionInfo(resultPartitionId1, new IntermediateDataSetID(), 1)); + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(resultPartitionId1), + new IntermediateDataSetID(), + 1)); partitionTracker.startTrackingPartition( jobId2, - new TaskExecutorPartitionInfo(resultPartitionId2, new IntermediateDataSetID(), 1)); + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(resultPartitionId2), + new IntermediateDataSetID(), + 1)); partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId1); assertThat(shuffleReleaseFuture.get(), hasItem(resultPartitionId1)); @@ -147,10 +168,16 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger { new TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment); partitionTracker.startTrackingPartition( jobId, - new TaskExecutorPartitionInfo(resultPartitionId1, new IntermediateDataSetID(), 1)); + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(resultPartitionId1), + new IntermediateDataSetID(), + 1)); partitionTracker.startTrackingPartition( jobId, - new TaskExecutorPartitionInfo(resultPartitionId2, new IntermediateDataSetID(), 1)); + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(resultPartitionId2), + new IntermediateDataSetID(), + 1)); partitionTracker.promoteJobPartitions(Collections.singleton(resultPartitionId1)); partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId); @@ -171,10 +198,16 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger { new 
TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment); partitionTracker.startTrackingPartition( new JobID(), - new TaskExecutorPartitionInfo(resultPartitionId1, new IntermediateDataSetID(), 1)); + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(resultPartitionId1), + new IntermediateDataSetID(), + 1)); partitionTracker.startTrackingPartition( new JobID(), - new TaskExecutorPartitionInfo(resultPartitionId2, new IntermediateDataSetID(), 1)); + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(resultPartitionId2), + new IntermediateDataSetID(), + 1)); partitionTracker.promoteJobPartitions(Collections.singleton(resultPartitionId1)); partitionTracker.stopTrackingAndReleaseAllClusterPartitions(); @@ -197,9 +230,13 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger { final TaskExecutorPartitionTracker partitionTracker = new TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment); partitionTracker.startTrackingPartition( - new JobID(), new TaskExecutorPartitionInfo(resultPartitionId1, dataSetId1, 1)); + new JobID(), + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(resultPartitionId1), dataSetId1, 1)); partitionTracker.startTrackingPartition( - new JobID(), new TaskExecutorPartitionInfo(resultPartitionId2, dataSetId2, 1)); + new JobID(), + new TaskExecutorPartitionInfo( + new TestingShuffleDescriptor(resultPartitionId2), dataSetId2, 1)); partitionTracker.promoteJobPartitions(Collections.singleton(resultPartitionId1)); partitionTracker.stopTrackingAndReleaseClusterPartitions(Collections.singleton(dataSetId1)); @@ -268,4 +305,23 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger { backingShuffleEnvironment.close(); } } + + private static class TestingShuffleDescriptor implements ShuffleDescriptor { + private final ResultPartitionID resultPartitionID; + + private TestingShuffleDescriptor(ResultPartitionID resultPartitionID) { + + this.resultPartitionID = resultPartitionID; + } + + 
@Override + public ResultPartitionID getResultPartitionID() { + return resultPartitionID; + } + + @Override + public Optional<ResourceID> storesLocalResourcesOn() { + return Optional.empty(); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java index 8aecfd88180..8378befcfe0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java @@ -19,9 +19,13 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -126,6 +130,15 @@ public class TestingJobMasterPartitionTracker implements JobMasterPartitionTrack return getAllTrackedPartitionsSupplier.get(); } + @Override + public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {} + + @Override + public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors( + IntermediateDataSetID intermediateDataSetID) { + return Collections.emptyList(); + } + @Override public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) { return isTrackingPartitionsForFunction.apply(producingTaskExecutorId); diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java new file mode 100644 index 00000000000..907bef037ae --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.reader.RecordReader; +import org.apache.flink.runtime.io.network.api.writer.RecordWriter; +import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.minicluster.TestingMiniCluster; +import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration; +import org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException; +import org.apache.flink.types.IntValue; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** Integration tests for reusing persisted intermediate dataset */ +public class JobIntermediateDatasetReuseTest { + + private static final Logger LOG = + LoggerFactory.getLogger(JobIntermediateDatasetReuseTest.class); + + @Test + public void testClusterPartitionReuse() throws Exception { + internalTestClusterPartitionReuse(1, 1, jobResult -> assertTrue(jobResult.isSuccess())); + } + + @Test + public void testClusterPartitionReuseMultipleParallelism() 
throws Exception { + internalTestClusterPartitionReuse(64, 64, jobResult -> assertTrue(jobResult.isSuccess())); + } + + @Test + public void testClusterPartitionReuseWithMoreConsumerParallelismThrowException() + throws Exception { + internalTestClusterPartitionReuse( + 1, + 2, + jobResult -> { + assertFalse(jobResult.isSuccess()); + assertNotNull(getCachedIntermediateDataSetCorruptedException(jobResult)); + }); + } + + @Test + public void testClusterPartitionReuseWithLessConsumerParallelismThrowException() + throws Exception { + internalTestClusterPartitionReuse( + 2, + 1, + jobResult -> { + assertFalse(jobResult.isSuccess()); + assertNotNull(getCachedIntermediateDataSetCorruptedException(jobResult)); + }); + } + + private void internalTestClusterPartitionReuse( + int producerParallelism, + int consumerParallelism, + Consumer<JobResult> jobResultVerification) + throws Exception { + final TestingMiniClusterConfiguration miniClusterConfiguration = + TestingMiniClusterConfiguration.newBuilder().build(); + + try (TestingMiniCluster miniCluster = + TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) { + miniCluster.start(); + + IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID(); + final JobGraph firstJobGraph = + createFirstJobGraph(producerParallelism, intermediateDataSetID); + miniCluster.submitJob(firstJobGraph).get(); + CompletableFuture<JobResult> jobResultFuture = + miniCluster.requestJobResult(firstJobGraph.getJobID()); + JobResult jobResult = jobResultFuture.get(); + assertTrue(jobResult.isSuccess()); + + final JobGraph secondJobGraph = + createSecondJobGraph(consumerParallelism, intermediateDataSetID); + miniCluster.submitJob(secondJobGraph).get(); + jobResultFuture = miniCluster.requestJobResult(secondJobGraph.getJobID()); + jobResult = jobResultFuture.get(); + jobResultVerification.accept(jobResult); + } + } + + @Test + public void testClusterPartitionReuseWithTMFail() throws Exception { + final 
TestingMiniClusterConfiguration miniClusterConfiguration = + TestingMiniClusterConfiguration.newBuilder().build(); + + try (TestingMiniCluster miniCluster = + TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) { + miniCluster.start(); + + IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID(); + final JobGraph firstJobGraph = createFirstJobGraph(1, intermediateDataSetID); + miniCluster.submitJob(firstJobGraph).get(); + CompletableFuture<JobResult> jobResultFuture = + miniCluster.requestJobResult(firstJobGraph.getJobID()); + JobResult jobResult = jobResultFuture.get(); + assertTrue(jobResult.isSuccess()); + + miniCluster.terminateTaskManager(0); + miniCluster.startTaskManager(); + + final JobGraph secondJobGraph = createSecondJobGraph(1, intermediateDataSetID); + final ExecutionConfig executionConfig = new ExecutionConfig(); + executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1024, 1000)); + secondJobGraph.setExecutionConfig(executionConfig); + miniCluster.submitJob(secondJobGraph).get(); + jobResultFuture = miniCluster.requestJobResult(secondJobGraph.getJobID()); + jobResult = jobResultFuture.get(); + assertFalse(jobResult.isSuccess()); + final CachedIntermediateDataSetCorruptedException exception = + getCachedIntermediateDataSetCorruptedException(jobResult); + assertNotNull(exception); + assertEquals( + intermediateDataSetID, exception.getCorruptedIntermediateDataSetID().get(0)); + + firstJobGraph.setJobID(new JobID()); + miniCluster.submitJob(firstJobGraph).get(); + jobResultFuture = miniCluster.requestJobResult(firstJobGraph.getJobID()); + jobResult = jobResultFuture.get(); + assertTrue(jobResult.isSuccess()); + + secondJobGraph.setJobID(new JobID()); + miniCluster.submitJob(secondJobGraph).get(); + jobResultFuture = miniCluster.requestJobResult(secondJobGraph.getJobID()); + jobResult = jobResultFuture.get(); + assertTrue(jobResult.isSuccess()); + } + } + + private CachedIntermediateDataSetCorruptedException 
+ getCachedIntermediateDataSetCorruptedException(JobResult jobResult) { + assertTrue(jobResult.getSerializedThrowable().isPresent()); + Throwable throwable = + jobResult + .getSerializedThrowable() + .get() + .deserializeError(Thread.currentThread().getContextClassLoader()); + while (throwable != null) { + if (throwable instanceof CachedIntermediateDataSetCorruptedException) { + return (CachedIntermediateDataSetCorruptedException) throwable; + } + throwable = throwable.getCause(); + } + return null; + } + + private JobGraph createSecondJobGraph( + int parallelism, IntermediateDataSetID intermediateDataSetID) { + final JobVertex receiver = new JobVertex("Receiver 2", null); + receiver.setParallelism(parallelism); + receiver.setInvokableClass(Receiver.class); + receiver.addIntermediateDataSetIdToConsume(intermediateDataSetID); + + return new JobGraph(null, "Second Job", receiver); + } + + private JobGraph createFirstJobGraph( + int parallelism, IntermediateDataSetID intermediateDataSetID) { + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(parallelism); + sender.setInvokableClass(Sender.class); + + final JobVertex receiver = new JobVertex("Receiver"); + receiver.setParallelism(parallelism); + receiver.setInvokableClass(Receiver.class); + + receiver.connectNewDataSetAsInput( + sender, + DistributionPattern.POINTWISE, + ResultPartitionType.BLOCKING_PERSISTENT, + intermediateDataSetID); + + return new JobGraph(null, "First Job", sender, receiver); + } + + /** + * Basic sender {@link AbstractInvokable} which sends 100 record base on its index to down + * stream. 
+ */ + public static class Sender extends AbstractInvokable { + + public Sender(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + int index = getIndexInSubtaskGroup(); + final RecordWriter<IntValue> writer = + new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0)); + + try { + for (int i = index; i < index + 100; ++i) { + writer.emit(new IntValue(i)); + LOG.debug("Sender({}) emit {}", index, i); + } + writer.flushAll(); + } finally { + writer.close(); + } + } + } + + /** + * Basic receiver {@link AbstractInvokable} which verifies the sent elements from the {@link + * Sender}. + */ + public static class Receiver extends AbstractInvokable { + + public Receiver(Environment environment) { + super(environment); + } + + @Override + public void invoke() throws Exception { + int index = getIndexInSubtaskGroup(); + final RecordReader<IntValue> reader = + new RecordReader<>( + getEnvironment().getInputGate(0), + IntValue.class, + getEnvironment().getTaskManagerInfo().getTmpDirectories()); + for (int i = index; i < index + 100; ++i) { + final int value = reader.next().getValue(); + LOG.debug("Receiver({}) received {}", index, value); + assertEquals(i, value); + } + + assertNull(reader.next()); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java index 997a3ba59a8..c527fdeca31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.registration.RegistrationResponse; import 
org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload; @@ -45,10 +46,12 @@ import org.junit.Test; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; +import java.util.Map; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.instanceOf; @@ -226,14 +229,20 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger { IntermediateDataSetID dataSetId, int numTotalPartitions, ResultPartitionID... partitionIds) { + + final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors = + Arrays.stream(partitionIds) + .map(TestingShuffleDescriptor::new) + .collect( + Collectors.toMap( + TestingShuffleDescriptor::getResultPartitionID, d -> d)); + return new TaskExecutorHeartbeatPayload( new SlotReport(), new ClusterPartitionReport( Collections.singletonList( new ClusterPartitionReport.ClusterPartitionReportEntry( - dataSetId, - new HashSet<>(Arrays.asList(partitionIds)), - numTotalPartitions)))); + dataSetId, numTotalPartitions, shuffleDescriptors)))); } @FunctionalInterface @@ -249,4 +258,23 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger { ResourceID taskExecutorId2) throws Exception; } + + private static class TestingShuffleDescriptor implements ShuffleDescriptor { + + private final ResultPartitionID resultPartitionID; + + private TestingShuffleDescriptor(ResultPartitionID resultPartitionID) { + this.resultPartitionID = resultPartitionID; + } + + @Override + public ResultPartitionID 
getResultPartitionID() { + return resultPartitionID; + } + + @Override + public Optional<ResourceID> storesLocalResourcesOn() { + return Optional.empty(); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index b3a38bd8140..9e9740245e5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -46,18 +46,21 @@ import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorEx import org.apache.flink.runtime.rest.messages.LogInfo; import org.apache.flink.runtime.rest.messages.ThreadDumpInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.slots.ResourceRequirements; import org.apache.flink.runtime.taskexecutor.FileType; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway; +import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.QuadFunction; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -492,4 +495,16 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { IntermediateDataSetID dataSetToRelease) { return 
CompletableFuture.completedFuture(null); } + + @Override + public CompletableFuture<Void> reportClusterPartitions( + ResourceID taskExecutorId, ClusterPartitionReport clusterPartitionReport) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture<List<ShuffleDescriptor>> getClusterPartitionsShuffleDescriptors( + IntermediateDataSetID intermediateDataSetID) { + return CompletableFuture.completedFuture(null); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java index 491740f9bd6..f97116e58b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java @@ -81,7 +81,9 @@ public class DefaultExecutionVertexTest extends TestLogger { List<ConsumedPartitionGroup> consumedPartitionGroups = Collections.singletonList( - ConsumedPartitionGroup.fromSinglePartition(intermediateResultPartitionId)); + ConsumedPartitionGroup.fromSinglePartition( + intermediateResultPartitionId, + schedulingResultPartition.getResultType())); Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionById = Collections.singletonMap(intermediateResultPartitionId, schedulingResultPartition); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index ce7936757b8..c5131d7513e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -52,6 +52,7 @@ import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import 
org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy; import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -63,6 +64,7 @@ import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.testutils.TestingUtils; @@ -1021,5 +1023,12 @@ public class ExecutingTest extends TestLogger { public ExecutionGraphID getExecutionGraphID() { return new ExecutionGraphID(); } + + @Override + public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors( + IntermediateDataSetID intermediateResultPartition) { + throw new UnsupportedOperationException( + "This method is not supported by the MockInternalExecutionGraphAccessor."); + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java index 32c81b6768b..4621680d360 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.scheduler.strategy; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.IterableUtils; @@ -91,7 +92,8 @@ public class TestingSchedulingExecutionVertex implements SchedulingExecutionVert void addConsumedPartition(TestingSchedulingResultPartition consumedPartition) { final ConsumedPartitionGroup consumedPartitionGroup = - ConsumedPartitionGroup.fromSinglePartition(consumedPartition.getId()); + ConsumedPartitionGroup.fromSinglePartition( + consumedPartition.getId(), consumedPartition.getResultType()); consumedPartition.registerConsumedPartitionGroup(consumedPartitionGroup); if (consumedPartition.getState() == ResultPartitionState.CONSUMABLE) { @@ -143,6 +145,8 @@ public class TestingSchedulingExecutionVertex implements SchedulingExecutionVert Map<IntermediateResultPartitionID, TestingSchedulingResultPartition> resultPartitionsById) { this.resultPartitionsById.putAll(resultPartitionsById); + final ResultPartitionType resultType = + resultPartitionsById.values().iterator().next().getResultType(); for (ConsumedPartitionGroup partitionGroup : consumedPartitionGroups) { List<IntermediateResultPartitionID> partitionIds = @@ -151,7 +155,7 @@ public class TestingSchedulingExecutionVertex implements SchedulingExecutionVert partitionIds.add(partitionId); } this.consumedPartitionGroups.add( - ConsumedPartitionGroup.fromMultiplePartitions(partitionIds)); + ConsumedPartitionGroup.fromMultiplePartitions(partitionIds, resultType)); } return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java index 82cf47fe01b..82463a1a2cb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java @@ -352,7 +352,8 @@ public class TestingSchedulingTopology implements SchedulingTopology { ConsumedPartitionGroup.fromMultiplePartitions( resultPartitions.stream() .map(TestingSchedulingResultPartition::getId) - .collect(Collectors.toList())); + .collect(Collectors.toList()), + resultPartitions.get(0).getResultType()); Map<IntermediateResultPartitionID, TestingSchedulingResultPartition> consumedPartitionById = resultPartitions.stream() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index b35de9f7f88..ea2fec1b5ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -78,6 +78,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager; import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager; @@ -138,6 +139,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Queue; import java.util.UUID; import 
java.util.concurrent.ArrayBlockingQueue; @@ -696,11 +698,24 @@ public class TaskExecutorTest extends TestLogger { private static TaskExecutorPartitionTracker createPartitionTrackerWithFixedPartitionReport( ShuffleEnvironment<?, ?> shuffleEnvironment) { + ResultPartitionID resultPartitionID = new ResultPartitionID(); final ClusterPartitionReport.ClusterPartitionReportEntry clusterPartitionReportEntry = new ClusterPartitionReport.ClusterPartitionReportEntry( new IntermediateDataSetID(), - Collections.singleton(new ResultPartitionID()), - 4); + 4, + Collections.singletonMap( + resultPartitionID, + new ShuffleDescriptor() { + @Override + public ResultPartitionID getResultPartitionID() { + return resultPartitionID; + } + + @Override + public Optional<ResourceID> storesLocalResourcesOn() { + return Optional.empty(); + } + })); final ClusterPartitionReport clusterPartitionReport = new ClusterPartitionReport(Collections.singletonList(clusterPartitionReportEntry));