This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a9f2e20375f [FLINK-27523][runtime] Runtime supports producing and consuming cached intermediate results
a9f2e20375f is described below

commit a9f2e20375f669a5f42944c3ba12a903a4624e43
Author: sxnan <suxuanna...@gmail.com>
AuthorDate: Wed May 25 10:55:41 2022 +0800

    [FLINK-27523][runtime] Runtime supports producing and consuming cached intermediate results
    
    This closes #19653.
---
 .../TaskDeploymentDescriptorFactory.java           |  76 +++++-
 .../executiongraph/DefaultExecutionGraph.java      |   7 +
 .../executiongraph/EdgeManagerBuildUtil.java       |   6 +-
 .../InternalExecutionGraphAccessor.java            |   8 +-
 .../RegionPartitionGroupReleaseStrategy.java       |   3 +-
 .../network/partition/ClusterPartitionManager.java |  23 ++
 .../io/network/partition/DataSetMetaInfo.java      |  19 ++
 .../partition/JobMasterPartitionTracker.java       |  10 +
 .../partition/JobMasterPartitionTrackerImpl.java   |  37 +++
 .../partition/ResourceManagerPartitionTracker.java |  10 +
 .../ResourceManagerPartitionTrackerImpl.java       |  21 +-
 .../partition/TaskExecutorPartitionInfo.java       |  15 +-
 .../TaskExecutorPartitionTrackerImpl.java          |  21 +-
 .../apache/flink/runtime/jobgraph/JobVertex.java   |  29 ++-
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   1 +
 .../runtime/resourcemanager/ResourceManager.java   |  19 ++
 ...achedIntermediateDataSetCorruptedException.java |  44 ++++
 .../flink/runtime/scheduler/DefaultScheduler.java  |  34 ++-
 .../scheduler/strategy/ConsumedPartitionGroup.java |  24 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   7 +-
 .../partition/ClusterPartitionReport.java          |  22 +-
 .../TaskDeploymentDescriptorFactoryTest.java       |   3 +-
 .../JobMasterPartitionTrackerImplTest.java         |  58 +++++
 .../partition/NoOpJobMasterPartitionTracker.java   |  13 +
 .../NoOpResourceManagerPartitionTracker.java       |   8 +
 .../ResourceManagerPartitionTrackerImplTest.java   |  67 ++++-
 .../TaskExecutorPartitionTrackerImplTest.java      |  80 +++++-
 .../TestingJobMasterPartitionTracker.java          |  13 +
 .../jobmaster/JobIntermediateDatasetReuseTest.java | 270 +++++++++++++++++++++
 .../ResourceManagerPartitionLifecycleTest.java     |  36 ++-
 .../utils/TestingResourceManagerGateway.java       |  15 ++
 .../adapter/DefaultExecutionVertexTest.java        |   4 +-
 .../runtime/scheduler/adaptive/ExecutingTest.java  |   9 +
 .../strategy/TestingSchedulingExecutionVertex.java |   8 +-
 .../strategy/TestingSchedulingTopology.java        |   3 +-
 .../runtime/taskexecutor/TaskExecutorTest.java     |  19 +-
 36 files changed, 971 insertions(+), 71 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index dbe20738531..6da8f4fabca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -39,11 +39,13 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobType;
+import org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException;
 import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.CompressedSerializedValue;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
@@ -51,7 +53,9 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
 
@@ -71,6 +75,8 @@ public class TaskDeploymentDescriptorFactory {
     private final Function<IntermediateResultPartitionID, IntermediateResultPartition>
             resultPartitionRetriever;
     private final BlobWriter blobWriter;
+    private final Map<IntermediateDataSetID, ShuffleDescriptor[]>
+            consumedClusterPartitionShuffleDescriptors;
 
     private TaskDeploymentDescriptorFactory(
             ExecutionAttemptID executionId,
@@ -81,7 +87,9 @@ public class TaskDeploymentDescriptorFactory {
             List<ConsumedPartitionGroup> consumedPartitionGroups,
             Function<IntermediateResultPartitionID, IntermediateResultPartition>
                     resultPartitionRetriever,
-            BlobWriter blobWriter) {
+            BlobWriter blobWriter,
+            Map<IntermediateDataSetID, ShuffleDescriptor[]>
+                    consumedClusterPartitionShuffleDescriptors) {
         this.executionId = executionId;
         this.serializedJobInformation = serializedJobInformation;
         this.taskInfo = taskInfo;
@@ -90,6 +98,8 @@ public class TaskDeploymentDescriptorFactory {
         this.consumedPartitionGroups = consumedPartitionGroups;
         this.resultPartitionRetriever = resultPartitionRetriever;
         this.blobWriter = blobWriter;
+        this.consumedClusterPartitionShuffleDescriptors =
+                consumedClusterPartitionShuffleDescriptors;
     }
 
     public TaskDeploymentDescriptor createDeploymentDescriptor(
@@ -137,6 +147,19 @@ public class TaskDeploymentDescriptorFactory {
                                     consumedIntermediateResult, consumedPartitionGroup)));
         }
 
+        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptor[]> entry :
+                consumedClusterPartitionShuffleDescriptors.entrySet()) {
+            // For FLIP-205, the JobGraph generating side ensures that the cluster partition is
+            // produced with only one subpartition. Therefore, we always consume the partition with
+            // subpartition index 0.
+            inputGates.add(
+                    new InputGateDeploymentDescriptor(
+                            entry.getKey(),
+                            ResultPartitionType.BLOCKING_PERSISTENT,
+                            0,
+                            entry.getValue()));
+        }
+
         return inputGates;
     }
 
@@ -231,9 +254,22 @@ public class TaskDeploymentDescriptorFactory {
     }
 
     public static TaskDeploymentDescriptorFactory fromExecutionVertex(
-            ExecutionVertex executionVertex) throws IOException {
+            ExecutionVertex executionVertex)
+            throws IOException, CachedIntermediateDataSetCorruptedException {
         InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
+        Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors;
+        try {
+            clusterPartitionShuffleDescriptors =
+                    getClusterPartitionShuffleDescriptors(executionVertex);
+        } catch (Throwable e) {
+            throw new CachedIntermediateDataSetCorruptedException(
+                    e,
+                    executionVertex
+                            .getJobVertex()
+                            .getJobVertex()
+                            .getIntermediateDataSetIdsToConsume());
+        }
 
         return new TaskDeploymentDescriptorFactory(
                 executionVertex.getCurrentExecutionAttempt().getAttemptId(),
@@ -244,7 +280,41 @@ public class TaskDeploymentDescriptorFactory {
                 internalExecutionGraphAccessor.getPartitionLocationConstraint(),
                 executionVertex.getAllConsumedPartitionGroups(),
                 internalExecutionGraphAccessor::getResultPartitionOrThrow,
-                internalExecutionGraphAccessor.getBlobWriter());
+                internalExecutionGraphAccessor.getBlobWriter(),
+                clusterPartitionShuffleDescriptors);
+    }
+
+    private static Map<IntermediateDataSetID, ShuffleDescriptor[]>
+            getClusterPartitionShuffleDescriptors(ExecutionVertex executionVertex) {
+        final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
+                executionVertex.getExecutionGraphAccessor();
+        final List<IntermediateDataSetID> consumedClusterDataSetIds =
+                executionVertex.getJobVertex().getJobVertex().getIntermediateDataSetIdsToConsume();
+        Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors =
+                new HashMap<>();
+
+        for (IntermediateDataSetID consumedClusterDataSetId : consumedClusterDataSetIds) {
+            List<? extends ShuffleDescriptor> shuffleDescriptors =
+                    internalExecutionGraphAccessor.getClusterPartitionShuffleDescriptors(
+                            consumedClusterDataSetId);
+
+            // For FLIP-205, the job graph generating side makes sure that the producer and consumer
+            // of the cluster partition have the same parallelism and each consumer Task consumes
+            // one output partition of the producer.
+            Preconditions.checkState(
+                    executionVertex.getTotalNumberOfParallelSubtasks() == shuffleDescriptors.size(),
+                    "The parallelism (%s) of the cache consuming job vertex is "
+                            + "different from the number of shuffle descriptors (%s) of the intermediate data set",
+                    executionVertex.getTotalNumberOfParallelSubtasks(),
+                    shuffleDescriptors.size());
+
+            clusterPartitionShuffleDescriptors.put(
+                    consumedClusterDataSetId,
+                    new ShuffleDescriptor[] {
+                        shuffleDescriptors.get(executionVertex.getParallelSubtaskIndex())
+                    });
+        }
+        return clusterPartitionShuffleDescriptors;
     }
 
     private static MaybeOffloaded<JobInformation> getSerializedJobInformation(
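
A minimal, self-contained sketch of the per-subtask selection implemented above (plain Java stand-ins rather than Flink's ExecutionVertex/ShuffleDescriptor types; all names are hypothetical):

    import java.util.Arrays;
    import java.util.List;

    class SubtaskDescriptorSelection {
        // Descriptors are ordered by partition number, so a subtask's index
        // doubles as the partition number it consumes.
        static <D> D selectForSubtask(List<D> descriptorsByPartitionNumber,
                                      int subtaskIndex, int parallelism) {
            if (descriptorsByPartitionNumber.size() != parallelism) {
                throw new IllegalStateException("parallelism != descriptor count");
            }
            return descriptorsByPartitionNumber.get(subtaskIndex);
        }

        public static void main(String[] args) {
            List<String> descriptors = Arrays.asList("p0", "p1", "p2");
            System.out.println(selectForSubtask(descriptors, 1, 3)); // p1
        }
    }
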
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 713f185b8cf..a1e91a2d801 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -73,6 +73,7 @@ import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
@@ -1602,4 +1603,10 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
     public ExecutionGraphID getExecutionGraphID() {
         return executionGraphId;
     }
+
+    @Override
+    public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID) {
+        return partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetID);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
index 9ba16aa8eb9..3aa6e59de0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/EdgeManagerBuildUtil.java
@@ -182,7 +182,8 @@ public class EdgeManagerBuildUtil {
             IntermediateResultPartitionID consumedPartitionId,
             IntermediateResult intermediateResult) {
         ConsumedPartitionGroup consumedPartitionGroup =
-                ConsumedPartitionGroup.fromSinglePartition(consumedPartitionId);
+                ConsumedPartitionGroup.fromSinglePartition(
+                        consumedPartitionId, intermediateResult.getResultType());
         registerConsumedPartitionGroupToEdgeManager(consumedPartitionGroup, intermediateResult);
         return consumedPartitionGroup;
     }
@@ -191,7 +192,8 @@ public class EdgeManagerBuildUtil {
             List<IntermediateResultPartitionID> consumedPartitions,
             IntermediateResult intermediateResult) {
         ConsumedPartitionGroup consumedPartitionGroup =
-                ConsumedPartitionGroup.fromMultiplePartitions(consumedPartitions);
+                ConsumedPartitionGroup.fromMultiplePartitions(
+                        consumedPartitions, intermediateResult.getResultType());
         registerConsumedPartitionGroupToEdgeManager(consumedPartitionGroup, intermediateResult);
         return consumedPartitionGroup;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
index 2e2cd3733bf..8ef23b2d834 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java
@@ -26,9 +26,11 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.SerializedValue;
@@ -64,7 +66,7 @@ public interface InternalExecutionGraphAccessor {
     @Nonnull
     ComponentMainThreadExecutor getJobMasterMainThreadExecutor();
 
-    ShuffleMaster<?> getShuffleMaster();
+    ShuffleMaster<? extends ShuffleDescriptor> getShuffleMaster();
 
     JobMasterPartitionTracker getPartitionTracker();
 
@@ -114,4 +116,8 @@ public interface InternalExecutionGraphAccessor {
     boolean isDynamic();
 
     ExecutionGraphID getExecutionGraphID();
+
+    /** Get the shuffle descriptors of the cluster partitions ordered by partition number. */
+    List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID intermediateResultPartition);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
index 06490de9027..a837df72808 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
@@ -147,7 +147,8 @@ public class RegionPartitionGroupReleaseStrategy
         for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) {
             final ConsumerRegionGroupExecutionView consumerRegionGroup =
                     partitionGroupConsumerRegions.get(consumedPartitionGroup);
-            if (consumerRegionGroup.isFinished()) {
+            if (consumerRegionGroup.isFinished()
+                    && !consumedPartitionGroup.getResultPartitionType().isPersistent()) {
                 // At present, there's only one ConsumerVertexGroup for each
                 // ConsumedPartitionGroup, so if a ConsumedPartitionGroup is fully consumed, all
                 // its partitions are releasable.
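
A small sketch of the release guard added above, with a hypothetical enum standing in for Flink's ResultPartitionType:

    enum PartitionType { PIPELINED, BLOCKING, BLOCKING_PERSISTENT }

    class ReleaseGuard {
        // Release only partitions that are fully consumed AND not persistent;
        // persistent (cached) results must outlive the consuming region.
        static boolean shouldRelease(boolean consumerRegionFinished, PartitionType type) {
            return consumerRegionFinished && type != PartitionType.BLOCKING_PERSISTENT;
        }
    }
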
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java
index 2e2aa6722e6..9a08e3cc569 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ClusterPartitionManager.java
@@ -17,8 +17,12 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -39,4 +43,23 @@ public interface ClusterPartitionManager {
      * @return future that is completed once all partitions have been released
      */
     CompletableFuture<Void> releaseClusterPartitions(IntermediateDataSetID dataSetToRelease);
+
+    /**
+     * Reports the status of the cluster partitions on the given task executor.
+     *
+     * @param taskExecutorId The id of the task executor.
+     * @param clusterPartitionReport The status of the cluster partitions.
+     * @return future that is completed once the report has been processed.
+     */
+    CompletableFuture<Void> reportClusterPartitions(
+            ResourceID taskExecutorId, ClusterPartitionReport clusterPartitionReport);
+
+    /**
+     * Get the shuffle descriptors of the cluster partitions ordered by partition number.
+     *
+     * @param intermediateDataSetID The id of the dataset.
+     * @return shuffle descriptors of the cluster partitions.
+     */
+    CompletableFuture<List<ShuffleDescriptor>> getClusterPartitionsShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java
index cb2a39f3159..ae446ad5bf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/DataSetMetaInfo.java
@@ -18,9 +18,14 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Comparator;
+import java.util.Map;
 import java.util.OptionalInt;
+import java.util.SortedMap;
+import java.util.TreeMap;
 
 /** Container for meta-data of a data set. */
 public final class DataSetMetaInfo {
@@ -28,6 +33,10 @@ public final class DataSetMetaInfo {
 
     private final int numRegisteredPartitions;
     private final int numTotalPartitions;
+    private final SortedMap<ResultPartitionID, ShuffleDescriptor>
+            shuffleDescriptorsOrderByPartitionId =
+                    new TreeMap<>(
+                            Comparator.comparingInt(o -> o.getPartitionId().getPartitionNumber()));
 
     private DataSetMetaInfo(int numRegisteredPartitions, int numTotalPartitions) {
         this.numRegisteredPartitions = numRegisteredPartitions;
@@ -44,6 +53,16 @@ public final class DataSetMetaInfo {
         return numTotalPartitions;
     }
 
+    public DataSetMetaInfo addShuffleDescriptors(
+            Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors) {
+        this.shuffleDescriptorsOrderByPartitionId.putAll(shuffleDescriptors);
+        return this;
+    }
+
+    public Map<ResultPartitionID, ShuffleDescriptor> getShuffleDescriptors() {
+        return this.shuffleDescriptorsOrderByPartitionId;
+    }
+
     static DataSetMetaInfo withoutNumRegisteredPartitions(int numTotalPartitions) {
         return new DataSetMetaInfo(UNKNOWN, numTotalPartitions);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
index cb7c2bef862..2c116e4951e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTracker.java
@@ -19,8 +19,12 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 
 import java.util.Collection;
+import java.util.List;
 
 /**
  * Utility for tracking partitions and issuing release calls to task executors and shuffle masters.
@@ -61,4 +65,10 @@ public interface JobMasterPartitionTracker
 
     /** Get all the partitions under tracking. */
     Collection<ResultPartitionDeploymentDescriptor> getAllTrackedPartitions();
+
+    void connectToResourceManager(ResourceManagerGateway resourceManagerGateway);
+
+    /** Get the shuffle descriptors of the cluster partitions ordered by partition number. */
+    List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
index 0e6a6c38810..8c8069f9760 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImpl.java
@@ -20,11 +20,15 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,6 +56,9 @@ public class JobMasterPartitionTrackerImpl
     private final ShuffleMaster<?> shuffleMaster;
 
     private final PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup;
+    private ResourceManagerGateway resourceManagerGateway;
+    private final Map<IntermediateDataSetID, List<ShuffleDescriptor>>
+            clusterPartitionShuffleDescriptors;
 
     public JobMasterPartitionTrackerImpl(
             JobID jobId,
@@ -61,6 +68,7 @@ public class JobMasterPartitionTrackerImpl
         this.jobId = Preconditions.checkNotNull(jobId);
         this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
         this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
+        this.clusterPartitionShuffleDescriptors = new HashMap<>();
     }
 
     @Override
@@ -118,6 +126,35 @@ public class JobMasterPartitionTrackerImpl
         return partitionInfos.values().stream().map(PartitionInfo::getMetaInfo).collect(toList());
     }
 
+    @Override
+    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {
+        this.resourceManagerGateway = resourceManagerGateway;
+    }
+
+    @Override
+    public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID) {
+        return clusterPartitionShuffleDescriptors.computeIfAbsent(
+                intermediateDataSetID, this::requestShuffleDescriptorsFromResourceManager);
+    }
+
+    private List<ShuffleDescriptor> requestShuffleDescriptorsFromResourceManager(
+            IntermediateDataSetID intermediateDataSetID) {
+        Preconditions.checkNotNull(
+                resourceManagerGateway, "JobMaster is not connected to ResourceManager");
+        try {
+            return this.resourceManagerGateway
+                    .getClusterPartitionsShuffleDescriptors(intermediateDataSetID)
+                    .get();
+        } catch (Throwable e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to get shuffle descriptors of intermediate dataset %s from ResourceManager",
+                            intermediateDataSetID),
+                    e);
+        }
+    }
+
     private void stopTrackingAndHandlePartitions(
             Collection<ResultPartitionID> resultPartitionIds,
             BiConsumer<ResourceID, Collection<ResultPartitionDeploymentDescriptor>>
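
The tracker above fetches descriptors from the ResourceManager once per dataset and then serves them locally. A generic sketch of that computeIfAbsent caching pattern, assuming a remote lookup that returns a CompletableFuture (all names hypothetical):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    class RemoteLookupCache<K, V> {
        private final Map<K, List<V>> cache = new HashMap<>();
        private final Function<K, CompletableFuture<List<V>>> remoteLookup;

        RemoteLookupCache(Function<K, CompletableFuture<List<V>>> remoteLookup) {
            this.remoteLookup = remoteLookup;
        }

        // The first call per key blocks on the RPC future; later calls are local.
        List<V> get(K key) {
            return cache.computeIfAbsent(key, k -> {
                try {
                    return remoteLookup.apply(k).get();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to fetch descriptors for " + k, e);
                }
            });
        }
    }
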
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
index dc8fb41b1d3..b6ee0d765e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTracker.java
@@ -19,8 +19,10 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -64,4 +66,12 @@ public interface ResourceManagerPartitionTracker {
      * @return tracked datasets
      */
     Map<IntermediateDataSetID, DataSetMetaInfo> listDataSets();
+
+    /**
+     * Returns all the shuffle descriptors of cluster partitions for the intermediate dataset.
+     *
+     * @param dataSetID The id of the intermediate dataset.
+     * @return the shuffle descriptors.
+     */
+    List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(IntermediateDataSetID dataSetID);
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java
index be8534a7723..72a322f0118 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImpl.java
@@ -20,16 +20,19 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
 import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -100,6 +103,17 @@ public class ResourceManagerPartitionTrackerImpl implements ResourceManagerParti
         return partitionReleaseCompletionFuture;
     }
 
+    @Override
+    public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID dataSetID) {
+        final DataSetMetaInfo dataSetMetaInfo = this.dataSetMetaInfo.get(dataSetID);
+        if (dataSetMetaInfo == null) {
+            return Collections.emptyList();
+        }
+
+        return new ArrayList<>(dataSetMetaInfo.getShuffleDescriptors().values());
+    }
+
     private void internalProcessClusterPartitionReport(
             ResourceID taskExecutorId, ClusterPartitionReport clusterPartitionReport) {
         final Set<IntermediateDataSetID> dataSetsWithLostPartitions =
@@ -194,13 +208,16 @@ public class ResourceManagerPartitionTrackerImpl implements ResourceManagerParti
                                             if (dataSetMetaInfo == null) {
                                                return DataSetMetaInfo
                                                        .withoutNumRegisteredPartitions(
-                                                                entry.getNumTotalPartitions());
+                                                                entry.getNumTotalPartitions())
+                                                        .addShuffleDescriptors(
+                                                                entry.getShuffleDescriptors());
                                             } else {
                                                // double check that the meta data is consistent
                                                Preconditions.checkState(
                                                        dataSetMetaInfo.getNumTotalPartitions()
                                                                == entry.getNumTotalPartitions());
-                                                return dataSetMetaInfo;
+                                                return dataSetMetaInfo.addShuffleDescriptors(
+                                                        entry.getShuffleDescriptors());
                                             }
                                         }));
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java
index db0618a8453..ddbfbc5fb78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionInfo.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 
 import java.util.Objects;
 
@@ -28,17 +29,17 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /** Encapsulates meta-information the TaskExecutor requires to be kept for each partition. */
 public final class TaskExecutorPartitionInfo {
 
-    private final ResultPartitionID resultPartitionId;
     private final IntermediateDataSetID intermediateDataSetId;
+    private final ShuffleDescriptor shuffleDescriptor;
 
     private final int numberOfPartitions;
 
     public TaskExecutorPartitionInfo(
-            ResultPartitionID resultPartitionId,
+            ShuffleDescriptor shuffleDescriptor,
             IntermediateDataSetID intermediateDataSetId,
             int numberOfPartitions) {
-        this.resultPartitionId = checkNotNull(resultPartitionId);
         this.intermediateDataSetId = checkNotNull(intermediateDataSetId);
+        this.shuffleDescriptor = checkNotNull(shuffleDescriptor);
         checkArgument(numberOfPartitions > 0);
         this.numberOfPartitions = numberOfPartitions;
     }
@@ -48,7 +49,7 @@ public final class TaskExecutorPartitionInfo {
     }
 
     public ResultPartitionID getResultPartitionId() {
-        return resultPartitionId;
+        return shuffleDescriptor.getResultPartitionID();
     }
 
     public int getNumberOfPartitions() {
@@ -77,8 +78,12 @@ public final class TaskExecutorPartitionInfo {
     public static TaskExecutorPartitionInfo from(
             ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
         return new TaskExecutorPartitionInfo(
-                resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID(),
+                resultPartitionDeploymentDescriptor.getShuffleDescriptor(),
                 resultPartitionDeploymentDescriptor.getResultId(),
                 resultPartitionDeploymentDescriptor.getTotalNumberOfPartitions());
     }
+
+    public ShuffleDescriptor getShuffleDescriptor() {
+        return shuffleDescriptor;
+    }
 }
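
A tiny sketch of the refactoring above: keep the richer descriptor object and derive the partition id from it, instead of storing two fields that must stay in sync (hypothetical Descriptor interface):

    class PartitionInfoSketch {
        interface Descriptor {
            String resultPartitionId();
        }

        private final Descriptor descriptor;

        PartitionInfoSketch(Descriptor descriptor) {
            this.descriptor = descriptor;
        }

        // Derived from the descriptor rather than duplicated as a field.
        String getResultPartitionId() {
            return descriptor.resultPartitionId();
        }
    }
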
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
index c34f77000d4..e183ab9c2dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImpl.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
 import org.apache.flink.util.CollectionUtil;
@@ -26,7 +27,6 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -91,7 +91,7 @@ public class TaskExecutorPartitionTrackerImpl
                     clusterPartitions.computeIfAbsent(
                             dataSetMetaInfo.getIntermediateDataSetId(),
                             ignored -> new DataSetEntry(dataSetMetaInfo.getNumberOfPartitions()));
-            dataSetEntry.addPartition(partitionTrackerEntry.getResultPartitionId());
+            dataSetEntry.addPartition(partitionTrackerEntry.getMetaInfo().getShuffleDescriptor());
         }
     }
 
@@ -121,8 +121,8 @@ public class TaskExecutorPartitionTrackerImpl
                                 entry ->
                                         new ClusterPartitionReport.ClusterPartitionReportEntry(
                                                 entry.getKey(),
-                                                entry.getValue().getPartitionIds(),
-                                                entry.getValue().getTotalNumberOfPartitions()))
+                                                entry.getValue().getTotalNumberOfPartitions(),
+                                                entry.getValue().getShuffleDescriptors()))
                         .collect(Collectors.toList());
 
         return new ClusterPartitionReport(reportEntries);
@@ -130,23 +130,28 @@ public class TaskExecutorPartitionTrackerImpl
 
     private static class DataSetEntry {
 
-        private final Set<ResultPartitionID> partitionIds = new HashSet<>();
+        private final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors =
+                new HashMap<>();
         private final int totalNumberOfPartitions;
 
         private DataSetEntry(int totalNumberOfPartitions) {
             this.totalNumberOfPartitions = totalNumberOfPartitions;
         }
 
-        void addPartition(ResultPartitionID resultPartitionId) {
-            partitionIds.add(resultPartitionId);
+        void addPartition(ShuffleDescriptor shuffleDescriptor) {
+            shuffleDescriptors.put(shuffleDescriptor.getResultPartitionID(), shuffleDescriptor);
         }
 
         public Set<ResultPartitionID> getPartitionIds() {
-            return partitionIds;
+            return shuffleDescriptors.keySet();
         }
 
         public int getTotalNumberOfPartitions() {
             return totalNumberOfPartitions;
         }
+
+        public Map<ResultPartitionID, ShuffleDescriptor> getShuffleDescriptors() {
+            return shuffleDescriptors;
+        }
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 843ed765daf..d54056ef4f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -142,6 +142,11 @@ public class JobVertex implements java.io.Serializable {
      */
     private String resultOptimizerProperties;
 
+    /**
+     * The IDs of the cached intermediate datasets that the job vertex consumes.
+     */
+    private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>();
+
     // --------------------------------------------------------------------------------------------
 
     /**
@@ -467,10 +472,6 @@ public class JobVertex implements java.io.Serializable {
     }
 
     // --------------------------------------------------------------------------------------------
-    private IntermediateDataSet createAndAddResultDataSet(ResultPartitionType partitionType) {
-        return createAndAddResultDataSet(new IntermediateDataSetID(), partitionType);
-    }
-
     public IntermediateDataSet createAndAddResultDataSet(
             IntermediateDataSetID id, ResultPartitionType partitionType) {
 
@@ -481,8 +482,18 @@ public class JobVertex implements java.io.Serializable {
 
     public JobEdge connectNewDataSetAsInput(
             JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) {
+        return connectNewDataSetAsInput(
+                input, distPattern, partitionType, new IntermediateDataSetID());
+    }
 
-        IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
+    public JobEdge connectNewDataSetAsInput(
+            JobVertex input,
+            DistributionPattern distPattern,
+            ResultPartitionType partitionType,
+            IntermediateDataSetID intermediateDataSetId) {
+
+        IntermediateDataSet dataSet =
+                input.createAndAddResultDataSet(intermediateDataSetId, partitionType);
 
         JobEdge edge = new JobEdge(dataSet, this, distPattern);
         this.inputs.add(edge);
@@ -568,6 +579,14 @@ public class JobVertex implements java.io.Serializable {
         this.resultOptimizerProperties = resultOptimizerProperties;
     }
 
+    public void addIntermediateDataSetIdToConsume(IntermediateDataSetID intermediateDataSetId) {
+        intermediateDataSetIdsToConsume.add(intermediateDataSetId);
+    }
+
+    public List<IntermediateDataSetID> getIntermediateDataSetIdsToConsume() {
+        return intermediateDataSetIdsToConsume;
+    }
+
     // --------------------------------------------------------------------------------------------
 
     @Override
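
A sketch of the overload pattern introduced above, under the assumption that callers caching results need a stable, caller-supplied dataset id while other callers keep getting fresh ids (String ids are stand-ins):

    import java.util.UUID;

    class ConnectSketch {
        // Old entry point: callers that do not care get a fresh, random id.
        static String connectNewDataSetAsInput(String producer) {
            return connectNewDataSetAsInput(producer, UUID.randomUUID().toString());
        }

        // New overload: callers that cache results pass a well-known id, so the
        // same dataset can be located again by a later consumer.
        static String connectNewDataSetAsInput(String producer, String dataSetId) {
            return producer + " -> " + dataSetId;
        }
    }
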
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 54b1954cc57..476802ca7f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1125,6 +1125,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId>
                             resourceManagerGateway, resourceManagerResourceId);
 
             slotPoolService.connectToResourceManager(resourceManagerGateway);
+            partitionTracker.connectToResourceManager(resourceManagerGateway);
 
             resourceManagerHeartbeatManager.monitorTarget(
                     resourceManagerResourceId,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3cd99ca0cf7..634b56f1e32 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcServiceUtils;
 import org.apache.flink.runtime.security.token.DelegationTokenManager;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.slots.ResourceRequirement;
 import org.apache.flink.runtime.slots.ResourceRequirements;
 import org.apache.flink.runtime.taskexecutor.FileType;
@@ -72,6 +73,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationRejection;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
+import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.concurrent.FutureUtils;
@@ -81,6 +83,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -777,6 +780,22 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
         return clusterPartitionTracker.releaseClusterPartitions(dataSetId);
     }
 
+    @Override
+    public CompletableFuture<Void> reportClusterPartitions(
+            ResourceID taskExecutorId, ClusterPartitionReport clusterPartitionReport) {
+        clusterPartitionTracker.processTaskExecutorClusterPartitionReport(
+                taskExecutorId, clusterPartitionReport);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<List<ShuffleDescriptor>> getClusterPartitionsShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID) {
+        return CompletableFuture.completedFuture(
+                clusterPartitionTracker.getClusterPartitionShuffleDescriptors(
+                        intermediateDataSetID));
+    }
+
     @Override
     public CompletableFuture<Map<IntermediateDataSetID, DataSetMetaInfo>> listDataSets() {
         return CompletableFuture.completedFuture(clusterPartitionTracker.listDataSets());
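
Both new RPC methods are thin wrappers around the synchronous cluster partition tracker. A hypothetical mini-tracker sketching that report/query round trip (String ids stand in for Flink's id and descriptor types):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    class MiniClusterPartitionTracker {
        private final Map<String, List<String>> descriptorsByDataSet = new HashMap<>();

        // Task executors push their hosted cluster partitions to the RM.
        CompletableFuture<Void> reportClusterPartitions(
                String taskExecutorId, Map<String, List<String>> report) {
            report.forEach((dataSet, descriptors) ->
                    descriptorsByDataSet
                            .computeIfAbsent(dataSet, k -> new ArrayList<>())
                            .addAll(descriptors));
            return CompletableFuture.completedFuture(null);
        }

        // Job masters pull the descriptors for a dataset they want to consume.
        CompletableFuture<List<String>> getClusterPartitionsShuffleDescriptors(String dataSetId) {
            return CompletableFuture.completedFuture(
                    descriptorsByDataSet.getOrDefault(dataSetId, new ArrayList<>()));
        }
    }
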
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/CachedIntermediateDataSetCorruptedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/CachedIntermediateDataSetCorruptedException.java
new file mode 100644
index 00000000000..1740a79fd31
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/CachedIntermediateDataSetCorruptedException.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.throwable.ThrowableAnnotation;
+import org.apache.flink.runtime.throwable.ThrowableType;
+
+import java.util.List;
+
+/** Indicates that some task failed to consume a cached intermediate dataset. */
+@ThrowableAnnotation(ThrowableType.NonRecoverableError)
+public class CachedIntermediateDataSetCorruptedException extends JobException {
+    private final List<IntermediateDataSetID> corruptedIntermediateDataSetID;
+
+    public CachedIntermediateDataSetCorruptedException(
+            Throwable cause, List<IntermediateDataSetID> corruptedIntermediateDataSetID) {
+        super(
+                String.format(
+                        "Corrupted intermediate dataset IDs: %s", corruptedIntermediateDataSetID),
+                cause);
+        this.corruptedIntermediateDataSetID = corruptedIntermediateDataSetID;
+    }
+
+    public List<IntermediateDataSetID> getCorruptedIntermediateDataSetID() {
+        return corruptedIntermediateDataSetID;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 9a1f88feb66..e7869b6c9a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -36,6 +36,9 @@ import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHa
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
 import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionException;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
@@ -62,6 +65,7 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -293,15 +297,39 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio
 
     private void handleTaskFailure(
             final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
+        Throwable revisedError =
+                maybeTranslateToCachedIntermediateDataSetException(error, executionVertexId);
         final long timestamp = System.currentTimeMillis();
-        setGlobalFailureCause(error, timestamp);
-        notifyCoordinatorsAboutTaskFailure(executionVertexId, error);
+        setGlobalFailureCause(revisedError, timestamp);
+        notifyCoordinatorsAboutTaskFailure(executionVertexId, revisedError);
         final FailureHandlingResult failureHandlingResult =
                 executionFailureHandler.getFailureHandlingResult(
-                        executionVertexId, error, timestamp);
+                        executionVertexId, revisedError, timestamp);
         maybeRestartTasks(failureHandlingResult);
     }
 
+    private Throwable maybeTranslateToCachedIntermediateDataSetException(
+            @Nullable Throwable cause, ExecutionVertexID failedVertex) {
+        if (!(cause instanceof PartitionException)) {
+            return cause;
+        }
+
+        final List<IntermediateDataSetID> intermediateDataSetIdsToConsume =
+                getExecutionJobVertex(failedVertex.getJobVertexId())
+                        .getJobVertex()
+                        .getIntermediateDataSetIdsToConsume();
+        final IntermediateResultPartitionID failedPartitionId =
+                ((PartitionException) cause).getPartitionId().getPartitionId();
+
+        if (!intermediateDataSetIdsToConsume.contains(
+                failedPartitionId.getIntermediateDataSetID())) {
+            return cause;
+        }
+
+        return new CachedIntermediateDataSetCorruptedException(
+                cause, Collections.singletonList(failedPartitionId.getIntermediateDataSetID()));
+    }
+
     private void notifyCoordinatorsAboutTaskFailure(
             final ExecutionVertexID executionVertexId, @Nullable final Throwable error) {
         final ExecutionJobVertex jobVertex =
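
A self-contained sketch of the failure translation above, with stand-in exception types (not Flink's PartitionException or CachedIntermediateDataSetCorruptedException): a partition failure is upgraded to a corruption error only when the failed partition belongs to a cached dataset the vertex consumes.

    import java.util.Arrays;
    import java.util.List;

    class FailureTranslationSketch {
        static class PartitionException extends RuntimeException {
            final String dataSetId;
            PartitionException(String dataSetId) { this.dataSetId = dataSetId; }
        }

        static class CachedDataSetCorruptedException extends RuntimeException {
            CachedDataSetCorruptedException(Throwable cause, String dataSetId) {
                super("Corrupted cached dataset: " + dataSetId, cause);
            }
        }

        // Non-partition failures and failures on ordinary datasets pass through.
        static Throwable maybeTranslate(Throwable cause, List<String> cachedDataSetIds) {
            if (!(cause instanceof PartitionException)) {
                return cause;
            }
            String dataSetId = ((PartitionException) cause).dataSetId;
            return cachedDataSetIds.contains(dataSetId)
                    ? new CachedDataSetCorruptedException(cause, dataSetId)
                    : cause;
        }

        public static void main(String[] args) {
            Throwable t = maybeTranslate(new PartitionException("ds-1"), Arrays.asList("ds-1"));
            System.out.println(t.getMessage()); // Corrupted cached dataset: ds-1
        }
    }
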
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
index 790f3b436ae..6e4672c48e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ConsumedPartitionGroup.java
@@ -19,8 +19,10 @@
 package org.apache.flink.runtime.scheduler.strategy;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 import java.util.Iterator;
@@ -38,11 +40,16 @@ public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartit
 
     private final IntermediateDataSetID intermediateDataSetID;
 
-    private ConsumedPartitionGroup(List<IntermediateResultPartitionID> resultPartitions) {
+    private final ResultPartitionType resultPartitionType;
+
+    private ConsumedPartitionGroup(
+            List<IntermediateResultPartitionID> resultPartitions,
+            ResultPartitionType resultPartitionType) {
         checkArgument(
                 resultPartitions.size() > 0,
                 "The size of result partitions in the ConsumedPartitionGroup should be larger than 0.");
         this.intermediateDataSetID = resultPartitions.get(0).getIntermediateDataSetID();
+        this.resultPartitionType = Preconditions.checkNotNull(resultPartitionType);
 
         // Sanity check: all the partitions in one ConsumedPartitionGroup should have the same
         // IntermediateDataSetID
@@ -56,13 +63,16 @@ public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartit
     }
 
     public static ConsumedPartitionGroup fromMultiplePartitions(
-            List<IntermediateResultPartitionID> resultPartitions) {
-        return new ConsumedPartitionGroup(resultPartitions);
+            List<IntermediateResultPartitionID> resultPartitions,
+            ResultPartitionType resultPartitionType) {
+        return new ConsumedPartitionGroup(resultPartitions, resultPartitionType);
     }
 
     public static ConsumedPartitionGroup fromSinglePartition(
-            IntermediateResultPartitionID resultPartition) {
-        return new ConsumedPartitionGroup(Collections.singletonList(resultPartition));
+            IntermediateResultPartitionID resultPartition,
+            ResultPartitionType resultPartitionType) {
+        return new ConsumedPartitionGroup(
+                Collections.singletonList(resultPartition), resultPartitionType);
     }
 
     @Override
@@ -103,4 +113,8 @@ public class ConsumedPartitionGroup implements Iterable<IntermediateResultPartit
     public boolean areAllPartitionsFinished() {
         return unfinishedPartitions.get() == 0;
     }
+
+    public ResultPartitionType getResultPartitionType() {
+        return resultPartitionType;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 8e7ec03781a..55bc617f5b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -907,7 +907,12 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
         try {
             partitionTracker.stopTrackingAndReleaseJobPartitions(partitionToRelease);
             partitionTracker.promoteJobPartitions(partitionsToPromote);
-
+            if (establishedResourceManagerConnection != null) {
+                establishedResourceManagerConnection
+                        .getResourceManagerGateway()
+                        .reportClusterPartitions(
+                                getResourceID(), partitionTracker.createClusterPartitionReport());
+            }
             closeJobManagerConnectionIfNoAllocatedResources(jobId);
         } catch (Throwable t) {
             // TODO: Do we still need this catch branch?
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java
index 72f54808c54..a85ccf6efc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/partition/ClusterPartitionReport.java
@@ -19,10 +19,12 @@ package org.apache.flink.runtime.taskexecutor.partition;
 
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -56,22 +58,22 @@ public class ClusterPartitionReport implements Serializable {
         private static final long serialVersionUID = -666517548300250601L;
 
         private final IntermediateDataSetID dataSetId;
-        private final Set<ResultPartitionID> hostedPartitions;
+        private final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors;
         private final int numTotalPartitions;
 
         public ClusterPartitionReportEntry(
                 IntermediateDataSetID dataSetId,
-                Set<ResultPartitionID> hostedPartitions,
-                int numTotalPartitions) {
+                int numTotalPartitions,
+                Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors) {
             Preconditions.checkNotNull(dataSetId);
-            Preconditions.checkNotNull(hostedPartitions);
-            Preconditions.checkArgument(!hostedPartitions.isEmpty());
+            Preconditions.checkNotNull(shuffleDescriptors);
+            Preconditions.checkArgument(!shuffleDescriptors.isEmpty());
             Preconditions.checkArgument(numTotalPartitions > 0);
-            Preconditions.checkState(hostedPartitions.size() <= numTotalPartitions);
+            Preconditions.checkState(shuffleDescriptors.size() <= numTotalPartitions);
 
             this.dataSetId = dataSetId;
-            this.hostedPartitions = hostedPartitions;
             this.numTotalPartitions = numTotalPartitions;
+            this.shuffleDescriptors = shuffleDescriptors;
         }
 
         public IntermediateDataSetID getDataSetId() {
@@ -79,11 +81,15 @@ public class ClusterPartitionReport implements Serializable {
         }
 
         public Set<ResultPartitionID> getHostedPartitions() {
-            return hostedPartitions;
+            return shuffleDescriptors.keySet();
         }
 
         public int getNumTotalPartitions() {
             return numTotalPartitions;
         }
+
+        public Map<ResultPartitionID, ShuffleDescriptor> getShuffleDescriptors() {
+            return shuffleDescriptors;
+        }
     }
 }
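
A report entry now carries the full ShuffleDescriptor per hosted partition instead of a bare ID set, so the ResourceManager can later hand descriptors straight to a consuming job. A sketch of building an entry against the new constructor; hostedPartitionIds and descriptorFor are hypothetical stand-ins for the task executor's bookkeeping:

    // New parameter order: (dataSetId, numTotalPartitions, shuffleDescriptors).
    Map<ResultPartitionID, ShuffleDescriptor> descriptors = new HashMap<>();
    for (ResultPartitionID id : hostedPartitionIds) {
        descriptors.put(id, descriptorFor(id)); // hypothetical descriptor lookup
    }
    ClusterPartitionReport.ClusterPartitionReportEntry entry =
            new ClusterPartitionReport.ClusterPartitionReportEntry(
                    dataSetId, numTotalPartitions, descriptors);
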
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
index 75bceb6882b..7dba9971c56 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
@@ -161,7 +162,7 @@ public class TaskDeploymentDescriptorFactoryTest extends TestLogger {
     }
 
     private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex ev)
-            throws IOException {
+            throws IOException, CachedIntermediateDataSetCorruptedException {
 
         return TaskDeploymentDescriptorFactory.fromExecutionVertex(ev)
                 .createDeploymentDescriptor(new AllocationID(), null, Collections.emptyList());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
index 1f287c4a777..83af9e0c5bb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/JobMasterPartitionTrackerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ProducerDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -35,6 +36,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -297,6 +300,47 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger {
                 externallyReleasedPartitions, containsInAnyOrder(jobPartitionId0, jobPartitionId1));
     }
 
+    @Test
+    public void testGetShuffleDescriptors() {
+        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
+        IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID();
+
+        final Queue<ReleaseCall> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4);
+        final JobMasterPartitionTrackerImpl partitionTracker =
+                new JobMasterPartitionTrackerImpl(
+                        new JobID(),
+                        shuffleMaster,
+                        resourceId ->
+                                Optional.of(
+                                        createTaskExecutorGateway(
+                                                resourceId, taskExecutorReleaseCalls)));
+
+        TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+        partitionTracker.connectToResourceManager(resourceManagerGateway);
+        partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
+
+        assertThat(
+                resourceManagerGateway.requestedIntermediateDataSetIds,
+                contains(intermediateDataSetId));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testGetShuffleDescriptorsBeforeConnectToResourceManager() {
+        final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster();
+        IntermediateDataSetID intermediateDataSetId = new IntermediateDataSetID();
+
+        final Queue<ReleaseCall> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4);
+        final JobMasterPartitionTrackerImpl partitionTracker =
+                new JobMasterPartitionTrackerImpl(
+                        new JobID(),
+                        shuffleMaster,
+                        resourceId ->
+                                Optional.of(
+                                        createTaskExecutorGateway(
+                                                resourceId, taskExecutorReleaseCalls)));
+        partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
+    }
+
     private static TaskExecutorGateway createTaskExecutorGateway(
             ResourceID taskExecutorId, Collection<ReleaseCall> releaseOrPromoteCalls) {
         return new TestingTaskExecutorGatewayBuilder()
@@ -329,6 +373,20 @@ public class JobMasterPartitionTrackerImplTest extends TestLogger {
         }
     }
 
+    private static class TestingResourceManagerGateway
+            extends org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway {
+
+        private final List<IntermediateDataSetID> requestedIntermediateDataSetIds =
+                new ArrayList<>();
+
+        @Override
+        public CompletableFuture<List<ShuffleDescriptor>> getClusterPartitionsShuffleDescriptors(
+                IntermediateDataSetID intermediateDataSetID) {
+            requestedIntermediateDataSetIds.add(intermediateDataSetID);
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+    }
+
     private static class ReleaseCall {
         private final ResourceID taskExecutorId;
         private final JobID jobId;
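
The two new tests above pin down the calling contract of the tracker additions: cluster-partition descriptors can only be fetched after the tracker has been connected to a ResourceManager, and skipping that step is expected to fail with a NullPointerException. A minimal sketch of the intended call order:

    // Connect first, then resolve the cached dataset's shuffle descriptors.
    partitionTracker.connectToResourceManager(resourceManagerGateway);
    List<ShuffleDescriptor> descriptors =
            partitionTracker.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
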
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
index 8572f2e63b0..087a29613e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpJobMasterPartitionTracker.java
@@ -19,9 +19,13 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 
 /** No-op implementation of {@link JobMasterPartitionTracker}. */
 public enum NoOpJobMasterPartitionTracker implements JobMasterPartitionTracker {
@@ -53,6 +57,15 @@ public enum NoOpJobMasterPartitionTracker implements JobMasterPartitionTracker {
         return Collections.emptyList();
     }
 
+    @Override
+    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {}
+
+    @Override
+    public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID) {
+        return Collections.emptyList();
+    }
+
     @Override
     public Collection<PartitionTrackerEntry<ResourceID, ResultPartitionDeploymentDescriptor>>
             stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java
index 003cdb76f1a..be31e2bc131 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpResourceManagerPartitionTracker.java
@@ -19,9 +19,11 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -49,6 +51,12 @@ public enum NoOpResourceManagerPartitionTracker implements ResourceManagerPartit
         return Collections.emptyMap();
     }
 
+    @Override
+    public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID dataSetID) {
+        return Collections.emptyList();
+    }
+
     @SuppressWarnings(
             "unused") // unused parameter allows usage as a 
ResourceManagerPartitionTrackerFactory
     public static ResourceManagerPartitionTracker get(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java
index c3925894d3e..83246ac5b20 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResourceManagerPartitionTrackerImplTest.java
@@ -19,7 +19,10 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
 import org.apache.flink.util.TestLogger;
 
@@ -28,11 +31,12 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
@@ -187,6 +191,39 @@ public class ResourceManagerPartitionTrackerImplTest extends TestLogger {
         assertThat(tracker.areAllMapsEmpty(), is(true));
     }
 
+    @Test
+    public void testGetClusterPartitionShuffleDescriptors() {
+        final ResourceManagerPartitionTrackerImpl tracker =
+                new ResourceManagerPartitionTrackerImpl(new TestClusterPartitionReleaser());
+
+        assertThat(tracker.listDataSets().size(), is(0));
+
+        List<ResultPartitionID> resultPartitionIDS = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            resultPartitionIDS.add(
+                    new ResultPartitionID(
+                            new IntermediateResultPartitionID(DATA_SET_ID, i),
+                            ExecutionAttemptID.randomId()));
+        }
+
+        for (ResultPartitionID resultPartitionID : resultPartitionIDS) {
+            report(tracker, TASK_EXECUTOR_ID_1, DATA_SET_ID, 100, resultPartitionID);
+        }
+
+        final List<ShuffleDescriptor> shuffleDescriptors =
+                tracker.getClusterPartitionShuffleDescriptors(DATA_SET_ID);
+        assertThat(shuffleDescriptors.size(), is(100));
+        assertThat(
+                shuffleDescriptors.stream()
+                        .map(ShuffleDescriptor::getResultPartitionID)
+                        .collect(Collectors.toList()),
+                contains(resultPartitionIDS.toArray()));
+
+        reportEmpty(tracker, TASK_EXECUTOR_ID_1);
+        reportEmpty(tracker, TASK_EXECUTOR_ID_2);
+        assertThat(tracker.areAllMapsEmpty(), is(true));
+    }
+
     private static void reportEmpty(
             ResourceManagerPartitionTracker tracker, ResourceID... taskExecutorIds) {
         for (ResourceID taskExecutorId : taskExecutorIds) {
@@ -210,12 +247,34 @@ public class ResourceManagerPartitionTrackerImplTest extends TestLogger {
             IntermediateDataSetID dataSetId,
             int numTotalPartitions,
             ResultPartitionID... partitionId) {
+        final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors =
+                Arrays.stream(partitionId)
+                        .map(TestShuffleDescriptor::new)
+                        .collect(
+                                Collectors.toMap(
+                                        TestShuffleDescriptor::getResultPartitionID, d -> d));
         return new ClusterPartitionReport(
                 Collections.singletonList(
                         new ClusterPartitionReport.ClusterPartitionReportEntry(
-                                dataSetId,
-                                new HashSet<>(Arrays.asList(partitionId)),
-                                numTotalPartitions)));
+                                dataSetId, numTotalPartitions, shuffleDescriptors)));
+    }
+
+    private static class TestShuffleDescriptor implements ShuffleDescriptor {
+        private final ResultPartitionID resultPartitionID;
+
+        TestShuffleDescriptor(ResultPartitionID resultPartitionID) {
+            this.resultPartitionID = resultPartitionID;
+        }
+
+        @Override
+        public ResultPartitionID getResultPartitionID() {
+            return resultPartitionID;
+        }
+
+        @Override
+        public Optional<ResourceID> storesLocalResourcesOn() {
+            return Optional.empty();
+        }
     }
 
     private static class TestClusterPartitionReleaser
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java
index a8272949996..800a638176a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TaskExecutorPartitionTrackerImplTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -26,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
 import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
@@ -39,6 +41,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.CoreMatchers.not;
@@ -66,10 +69,16 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger {
 
         partitionTracker.startTrackingPartition(
                 jobId,
-                new TaskExecutorPartitionInfo(clusterPartitionId, dataSetId, numberOfPartitions));
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(clusterPartitionId),
+                        dataSetId,
+                        numberOfPartitions));
         partitionTracker.startTrackingPartition(
                 jobId,
-                new TaskExecutorPartitionInfo(jobPartitionId, dataSetId, numberOfPartitions + 1));
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(jobPartitionId),
+                        dataSetId,
+                        numberOfPartitions + 1));
 
         partitionTracker.promoteJobPartitions(Collections.singleton(clusterPartitionId));
 
@@ -97,10 +106,16 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger {
                 new TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment);
         partitionTracker.startTrackingPartition(
                 new JobID(),
-                new TaskExecutorPartitionInfo(resultPartitionId1, new IntermediateDataSetID(), 1));
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(resultPartitionId1),
+                        new IntermediateDataSetID(),
+                        1));
         partitionTracker.startTrackingPartition(
                 new JobID(),
-                new TaskExecutorPartitionInfo(resultPartitionId2, new IntermediateDataSetID(), 1));
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(resultPartitionId2),
+                        new IntermediateDataSetID(),
+                        1));
         partitionTracker.stopTrackingAndReleaseJobPartitions(
                 Collections.singleton(resultPartitionId1));
 
@@ -123,10 +138,16 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger {
                 new TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment);
         partitionTracker.startTrackingPartition(
                 jobId1,
-                new TaskExecutorPartitionInfo(resultPartitionId1, new IntermediateDataSetID(), 1));
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(resultPartitionId1),
+                        new IntermediateDataSetID(),
+                        1));
         partitionTracker.startTrackingPartition(
                 jobId2,
-                new TaskExecutorPartitionInfo(resultPartitionId2, new IntermediateDataSetID(), 1));
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(resultPartitionId2),
+                        new IntermediateDataSetID(),
+                        1));
         partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId1);
 
         assertThat(shuffleReleaseFuture.get(), hasItem(resultPartitionId1));
@@ -147,10 +168,16 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger {
                 new TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment);
         partitionTracker.startTrackingPartition(
                 jobId,
-                new TaskExecutorPartitionInfo(resultPartitionId1, new IntermediateDataSetID(), 1));
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(resultPartitionId1),
+                        new IntermediateDataSetID(),
+                        1));
         partitionTracker.startTrackingPartition(
                 jobId,
-                new TaskExecutorPartitionInfo(resultPartitionId2, new IntermediateDataSetID(), 1));
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(resultPartitionId2),
+                        new IntermediateDataSetID(),
+                        1));
         partitionTracker.promoteJobPartitions(Collections.singleton(resultPartitionId1));
 
         partitionTracker.stopTrackingAndReleaseJobPartitionsFor(jobId);
@@ -171,10 +198,16 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger {
                 new TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment);
         partitionTracker.startTrackingPartition(
                 new JobID(),
-                new TaskExecutorPartitionInfo(resultPartitionId1, new IntermediateDataSetID(), 1));
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(resultPartitionId1),
+                        new IntermediateDataSetID(),
+                        1));
         partitionTracker.startTrackingPartition(
                 new JobID(),
-                new TaskExecutorPartitionInfo(resultPartitionId2, new IntermediateDataSetID(), 1));
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(resultPartitionId2),
+                        new IntermediateDataSetID(),
+                        1));
         partitionTracker.promoteJobPartitions(Collections.singleton(resultPartitionId1));
 
         partitionTracker.stopTrackingAndReleaseAllClusterPartitions();
@@ -197,9 +230,13 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger {
         final TaskExecutorPartitionTracker partitionTracker =
                 new TaskExecutorPartitionTrackerImpl(testingShuffleEnvironment);
         partitionTracker.startTrackingPartition(
-                new JobID(), new TaskExecutorPartitionInfo(resultPartitionId1, dataSetId1, 1));
+                new JobID(),
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(resultPartitionId1), dataSetId1, 1));
         partitionTracker.startTrackingPartition(
-                new JobID(), new TaskExecutorPartitionInfo(resultPartitionId2, dataSetId2, 1));
+                new JobID(),
+                new TaskExecutorPartitionInfo(
+                        new TestingShuffleDescriptor(resultPartitionId2), dataSetId2, 1));
         partitionTracker.promoteJobPartitions(Collections.singleton(resultPartitionId1));
 
         partitionTracker.stopTrackingAndReleaseClusterPartitions(Collections.singleton(dataSetId1));
@@ -268,4 +305,23 @@ public class TaskExecutorPartitionTrackerImplTest extends TestLogger {
             backingShuffleEnvironment.close();
         }
     }
+
+    private static class TestingShuffleDescriptor implements ShuffleDescriptor {
+        private final ResultPartitionID resultPartitionID;
+
+        private TestingShuffleDescriptor(ResultPartitionID resultPartitionID) {
+
+            this.resultPartitionID = resultPartitionID;
+        }
+
+        @Override
+        public ResultPartitionID getResultPartitionID() {
+            return resultPartitionID;
+        }
+
+        @Override
+        public Optional<ResourceID> storesLocalResourcesOn() {
+            return Optional.empty();
+        }
+    }
 }
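
The repeated mechanical updates above follow from the TaskExecutorPartitionInfo change: it is now constructed from the full ShuffleDescriptor rather than a bare ResultPartitionID. A sketch of tracking a partition under the new signature, reusing the test's TestingShuffleDescriptor:

    // New parameter order: (shuffleDescriptor, dataSetId, numberOfPartitions).
    partitionTracker.startTrackingPartition(
            jobId,
            new TaskExecutorPartitionInfo(
                    new TestingShuffleDescriptor(resultPartitionId),
                    new IntermediateDataSetID(),
                    1));
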
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
index 8aecfd88180..8378befcfe0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingJobMasterPartitionTracker.java
@@ -19,9 +19,13 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -126,6 +130,15 @@ public class TestingJobMasterPartitionTracker implements JobMasterPartitionTrack
         return getAllTrackedPartitionsSupplier.get();
     }
 
+    @Override
+    public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) {}
+
+    @Override
+    public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID) {
+        return Collections.emptyList();
+    }
+
     @Override
     public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) {
         return isTrackingPartitionsForFunction.apply(producingTaskExecutorId);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java
new file mode 100644
index 00000000000..907bef037ae
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobIntermediateDatasetReuseTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.scheduler.CachedIntermediateDataSetCorruptedException;
+import org.apache.flink.types.IntValue;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Integration tests for reusing persisted intermediate datasets. */
+public class JobIntermediateDatasetReuseTest {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(JobIntermediateDatasetReuseTest.class);
+
+    @Test
+    public void testClusterPartitionReuse() throws Exception {
+        internalTestClusterPartitionReuse(1, 1, jobResult -> assertTrue(jobResult.isSuccess()));
+    }
+
+    @Test
+    public void testClusterPartitionReuseMultipleParallelism() throws Exception {
+        internalTestClusterPartitionReuse(64, 64, jobResult -> assertTrue(jobResult.isSuccess()));
+    }
+
+    @Test
+    public void testClusterPartitionReuseWithMoreConsumerParallelismThrowException()
+            throws Exception {
+        internalTestClusterPartitionReuse(
+                1,
+                2,
+                jobResult -> {
+                    assertFalse(jobResult.isSuccess());
+                    assertNotNull(getCachedIntermediateDataSetCorruptedException(jobResult));
+                });
+    }
+
+    @Test
+    public void testClusterPartitionReuseWithLessConsumerParallelismThrowException()
+            throws Exception {
+        internalTestClusterPartitionReuse(
+                2,
+                1,
+                jobResult -> {
+                    assertFalse(jobResult.isSuccess());
+                    assertNotNull(getCachedIntermediateDataSetCorruptedException(jobResult));
+                });
+    }
+
+    private void internalTestClusterPartitionReuse(
+            int producerParallelism,
+            int consumerParallelism,
+            Consumer<JobResult> jobResultVerification)
+            throws Exception {
+        final TestingMiniClusterConfiguration miniClusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder().build();
+
+        try (TestingMiniCluster miniCluster =
+                TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
+            miniCluster.start();
+
+            IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+            final JobGraph firstJobGraph =
+                    createFirstJobGraph(producerParallelism, intermediateDataSetID);
+            miniCluster.submitJob(firstJobGraph).get();
+            CompletableFuture<JobResult> jobResultFuture =
+                    miniCluster.requestJobResult(firstJobGraph.getJobID());
+            JobResult jobResult = jobResultFuture.get();
+            assertTrue(jobResult.isSuccess());
+
+            final JobGraph secondJobGraph =
+                    createSecondJobGraph(consumerParallelism, intermediateDataSetID);
+            miniCluster.submitJob(secondJobGraph).get();
+            jobResultFuture = miniCluster.requestJobResult(secondJobGraph.getJobID());
+            jobResult = jobResultFuture.get();
+            jobResultVerification.accept(jobResult);
+        }
+    }
+
+    @Test
+    public void testClusterPartitionReuseWithTMFail() throws Exception {
+        final TestingMiniClusterConfiguration miniClusterConfiguration =
+                TestingMiniClusterConfiguration.newBuilder().build();
+
+        try (TestingMiniCluster miniCluster =
+                TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
+            miniCluster.start();
+
+            IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+            final JobGraph firstJobGraph = createFirstJobGraph(1, intermediateDataSetID);
+            miniCluster.submitJob(firstJobGraph).get();
+            CompletableFuture<JobResult> jobResultFuture =
+                    miniCluster.requestJobResult(firstJobGraph.getJobID());
+            JobResult jobResult = jobResultFuture.get();
+            assertTrue(jobResult.isSuccess());
+
+            miniCluster.terminateTaskManager(0);
+            miniCluster.startTaskManager();
+
+            final JobGraph secondJobGraph = createSecondJobGraph(1, intermediateDataSetID);
+            final ExecutionConfig executionConfig = new ExecutionConfig();
+            executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1024, 1000));
+            secondJobGraph.setExecutionConfig(executionConfig);
+            miniCluster.submitJob(secondJobGraph).get();
+            jobResultFuture = miniCluster.requestJobResult(secondJobGraph.getJobID());
+            jobResult = jobResultFuture.get();
+            assertFalse(jobResult.isSuccess());
+            final CachedIntermediateDataSetCorruptedException exception =
+                    getCachedIntermediateDataSetCorruptedException(jobResult);
+            assertNotNull(exception);
+            assertEquals(
+                    intermediateDataSetID, exception.getCorruptedIntermediateDataSetID().get(0));
+
+            firstJobGraph.setJobID(new JobID());
+            miniCluster.submitJob(firstJobGraph).get();
+            jobResultFuture = miniCluster.requestJobResult(firstJobGraph.getJobID());
+            jobResult = jobResultFuture.get();
+            assertTrue(jobResult.isSuccess());
+
+            secondJobGraph.setJobID(new JobID());
+            miniCluster.submitJob(secondJobGraph).get();
+            jobResultFuture = miniCluster.requestJobResult(secondJobGraph.getJobID());
+            jobResult = jobResultFuture.get();
+            assertTrue(jobResult.isSuccess());
+        }
+    }
+
+    private CachedIntermediateDataSetCorruptedException
+            getCachedIntermediateDataSetCorruptedException(JobResult jobResult) {
+        assertTrue(jobResult.getSerializedThrowable().isPresent());
+        Throwable throwable =
+                jobResult
+                        .getSerializedThrowable()
+                        .get()
+                        .deserializeError(Thread.currentThread().getContextClassLoader());
+        while (throwable != null) {
+            if (throwable instanceof CachedIntermediateDataSetCorruptedException) {
+                return (CachedIntermediateDataSetCorruptedException) throwable;
+            }
+            throwable = throwable.getCause();
+        }
+        return null;
+    }
+
+    private JobGraph createSecondJobGraph(
+            int parallelism, IntermediateDataSetID intermediateDataSetID) {
+        final JobVertex receiver = new JobVertex("Receiver 2", null);
+        receiver.setParallelism(parallelism);
+        receiver.setInvokableClass(Receiver.class);
+        receiver.addIntermediateDataSetIdToConsume(intermediateDataSetID);
+
+        return new JobGraph(null, "Second Job", receiver);
+    }
+
+    private JobGraph createFirstJobGraph(
+            int parallelism, IntermediateDataSetID intermediateDataSetID) {
+        final JobVertex sender = new JobVertex("Sender");
+        sender.setParallelism(parallelism);
+        sender.setInvokableClass(Sender.class);
+
+        final JobVertex receiver = new JobVertex("Receiver");
+        receiver.setParallelism(parallelism);
+        receiver.setInvokableClass(Receiver.class);
+
+        receiver.connectNewDataSetAsInput(
+                sender,
+                DistributionPattern.POINTWISE,
+                ResultPartitionType.BLOCKING_PERSISTENT,
+                intermediateDataSetID);
+
+        return new JobGraph(null, "First Job", sender, receiver);
+    }
+
+    /**
+     * Basic sender {@link AbstractInvokable} which sends 100 records, based on its subtask
+     * index, downstream.
+     */
+    public static class Sender extends AbstractInvokable {
+
+        public Sender(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            int index = getIndexInSubtaskGroup();
+            final RecordWriter<IntValue> writer =
+                    new RecordWriterBuilder<IntValue>().build(getEnvironment().getWriter(0));
+
+            try {
+                for (int i = index; i < index + 100; ++i) {
+                    writer.emit(new IntValue(i));
+                    LOG.debug("Sender({}) emit {}", index, i);
+                }
+                writer.flushAll();
+            } finally {
+                writer.close();
+            }
+        }
+    }
+
+    /**
+     * Basic receiver {@link AbstractInvokable} which verifies the sent elements from the {@link
+     * Sender}.
+     */
+    public static class Receiver extends AbstractInvokable {
+
+        public Receiver(Environment environment) {
+            super(environment);
+        }
+
+        @Override
+        public void invoke() throws Exception {
+            int index = getIndexInSubtaskGroup();
+            final RecordReader<IntValue> reader =
+                    new RecordReader<>(
+                            getEnvironment().getInputGate(0),
+                            IntValue.class,
+                            getEnvironment().getTaskManagerInfo().getTmpDirectories());
+            for (int i = index; i < index + 100; ++i) {
+                final int value = reader.next().getValue();
+                LOG.debug("Receiver({}) received {}", index, value);
+                assertEquals(i, value);
+            }
+
+            assertNull(reader.next());
+        }
+    }
+}
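
Stripped of the test harness, the reuse contract exercised above is: a first job writes a BLOCKING_PERSISTENT result, and a later job consumes the cached result by ID alone, with matching parallelism. A compact sketch of the wiring (vertex names illustrative):

    // Producer job: persist the intermediate result beyond job completion.
    IntermediateDataSetID datasetId = new IntermediateDataSetID();
    receiver.connectNewDataSetAsInput(
            sender, DistributionPattern.POINTWISE,
            ResultPartitionType.BLOCKING_PERSISTENT, datasetId);

    // Consumer job, submitted later: reference the cached dataset by ID. Per
    // the parallelism-mismatch tests above, consumer parallelism must match
    // the producer's or the job fails with
    // CachedIntermediateDataSetCorruptedException.
    JobVertex consumer = new JobVertex("Consumer");
    consumer.setParallelism(producerParallelism);
    consumer.addIntermediateDataSetIdToConsume(datasetId);
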
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
index 997a3ba59a8..c527fdeca31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
@@ -45,10 +46,12 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
@@ -226,14 +229,20 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
             IntermediateDataSetID dataSetId,
             int numTotalPartitions,
             ResultPartitionID... partitionIds) {
+
+        final Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors =
+                Arrays.stream(partitionIds)
+                        .map(TestingShuffleDescriptor::new)
+                        .collect(
+                                Collectors.toMap(
+                                        TestingShuffleDescriptor::getResultPartitionID, d -> d));
+
         return new TaskExecutorHeartbeatPayload(
                 new SlotReport(),
                 new ClusterPartitionReport(
                         Collections.singletonList(
                                 new ClusterPartitionReport.ClusterPartitionReportEntry(
-                                        dataSetId,
-                                        new HashSet<>(Arrays.asList(partitionIds)),
-                                        numTotalPartitions))));
+                                        dataSetId, numTotalPartitions, shuffleDescriptors))));
     }
 
     @FunctionalInterface
@@ -249,4 +258,23 @@ public class ResourceManagerPartitionLifecycleTest extends TestLogger {
                 ResourceID taskExecutorId2)
                 throws Exception;
     }
+
+    private static class TestingShuffleDescriptor implements ShuffleDescriptor {
+
+        private final ResultPartitionID resultPartitionID;
+
+        private TestingShuffleDescriptor(ResultPartitionID resultPartitionID) {
+            this.resultPartitionID = resultPartitionID;
+        }
+
+        @Override
+        public ResultPartitionID getResultPartitionID() {
+            return resultPartitionID;
+        }
+
+        @Override
+        public Optional<ResourceID> storesLocalResourcesOn() {
+            return Optional.empty();
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index b3a38bd8140..9e9740245e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -46,18 +46,21 @@ import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorEx
 import org.apache.flink.runtime.rest.messages.LogInfo;
 import org.apache.flink.runtime.rest.messages.ThreadDumpInfo;
 import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.slots.ResourceRequirements;
 import org.apache.flink.runtime.taskexecutor.FileType;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
+import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.QuadFunction;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -492,4 +495,16 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {
             IntermediateDataSetID dataSetToRelease) {
         return CompletableFuture.completedFuture(null);
     }
+
+    @Override
+    public CompletableFuture<Void> reportClusterPartitions(
+            ResourceID taskExecutorId, ClusterPartitionReport clusterPartitionReport) {
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<List<ShuffleDescriptor>> getClusterPartitionsShuffleDescriptors(
+            IntermediateDataSetID intermediateDataSetID) {
+        return CompletableFuture.completedFuture(null);
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
index 491740f9bd6..f97116e58b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionVertexTest.java
@@ -81,7 +81,9 @@ public class DefaultExecutionVertexTest extends TestLogger {
 
         List<ConsumedPartitionGroup> consumedPartitionGroups =
                 Collections.singletonList(
-                        ConsumedPartitionGroup.fromSinglePartition(intermediateResultPartitionId));
+                        ConsumedPartitionGroup.fromSinglePartition(
+                                intermediateResultPartitionId,
+                                schedulingResultPartition.getResultType()));
         Map<IntermediateResultPartitionID, DefaultResultPartition> resultPartitionById =
                 Collections.singletonMap(intermediateResultPartitionId, schedulingResultPartition);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
index ce7936757b8..c5131d7513e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.failover.flip1.partitionrelease.PartitionGroupReleaseStrategy;
 import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -63,6 +64,7 @@ import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry
 import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.testutils.TestingUtils;
@@ -1021,5 +1023,12 @@ public class ExecutingTest extends TestLogger {
         public ExecutionGraphID getExecutionGraphID() {
             return new ExecutionGraphID();
         }
+
+        @Override
+        public List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
+                IntermediateDataSetID intermediateResultPartition) {
+            throw new UnsupportedOperationException(
+                    "This method is not supported by the 
MockInternalExecutionGraphAccessor.");
+        }
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
index 32c81b6768b..4621680d360 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.scheduler.strategy;
 
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.IterableUtils;
@@ -91,7 +92,8 @@ public class TestingSchedulingExecutionVertex implements SchedulingExecutionVert
 
     void addConsumedPartition(TestingSchedulingResultPartition consumedPartition) {
         final ConsumedPartitionGroup consumedPartitionGroup =
-                ConsumedPartitionGroup.fromSinglePartition(consumedPartition.getId());
+                ConsumedPartitionGroup.fromSinglePartition(
+                        consumedPartition.getId(), consumedPartition.getResultType());
 
         consumedPartition.registerConsumedPartitionGroup(consumedPartitionGroup);
         if (consumedPartition.getState() == ResultPartitionState.CONSUMABLE) {
@@ -143,6 +145,8 @@ public class TestingSchedulingExecutionVertex implements SchedulingExecutionVert
                 Map<IntermediateResultPartitionID, TestingSchedulingResultPartition>
                         resultPartitionsById) {
             this.resultPartitionsById.putAll(resultPartitionsById);
+            final ResultPartitionType resultType =
+                    resultPartitionsById.values().iterator().next().getResultType();
 
             for (ConsumedPartitionGroup partitionGroup : consumedPartitionGroups) {
                 List<IntermediateResultPartitionID> partitionIds =
@@ -151,7 +155,7 @@ public class TestingSchedulingExecutionVertex implements SchedulingExecutionVert
                     partitionIds.add(partitionId);
                 }
                 this.consumedPartitionGroups.add(
-                        ConsumedPartitionGroup.fromMultiplePartitions(partitionIds));
+                        ConsumedPartitionGroup.fromMultiplePartitions(partitionIds, resultType));
             }
             return this;
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
index 82cf47fe01b..82463a1a2cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
@@ -352,7 +352,8 @@ public class TestingSchedulingTopology implements SchedulingTopology {
                     ConsumedPartitionGroup.fromMultiplePartitions(
                             resultPartitions.stream()
                                     .map(TestingSchedulingResultPartition::getId)
-                                    .collect(Collectors.toList()));
+                                    .collect(Collectors.toList()),
+                            resultPartitions.get(0).getResultType());
             Map<IntermediateResultPartitionID, TestingSchedulingResultPartition>
                     consumedPartitionById =
                             resultPartitions.stream()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index b35de9f7f88..ea2fec1b5ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -78,6 +78,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
@@ -138,6 +139,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -696,11 +698,24 @@ public class TaskExecutorTest extends TestLogger {
 
     private static TaskExecutorPartitionTracker createPartitionTrackerWithFixedPartitionReport(
             ShuffleEnvironment<?, ?> shuffleEnvironment) {
+        ResultPartitionID resultPartitionID = new ResultPartitionID();
         final ClusterPartitionReport.ClusterPartitionReportEntry clusterPartitionReportEntry =
                 new ClusterPartitionReport.ClusterPartitionReportEntry(
                         new IntermediateDataSetID(),
-                        Collections.singleton(new ResultPartitionID()),
-                        4);
+                        4,
+                        Collections.singletonMap(
+                                resultPartitionID,
+                                new ShuffleDescriptor() {
+                                    @Override
+                                    public ResultPartitionID getResultPartitionID() {
+                                        return resultPartitionID;
+                                    }
+
+                                    @Override
+                                    public Optional<ResourceID> storesLocalResourcesOn() {
+                                        return Optional.empty();
+                                    }
+                                }));
 
         final ClusterPartitionReport clusterPartitionReport =
                 new ClusterPartitionReport(Collections.singletonList(clusterPartitionReportEntry));
