gaoyunhaii commented on code in PR #19653:
URL: https://github.com/apache/flink/pull/19653#discussion_r872337757


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -299,6 +302,21 @@ private void handleTaskFailure(
         final FailureHandlingResult failureHandlingResult =
                 executionFailureHandler.getFailureHandlingResult(
                         executionVertexId, error, timestamp);
+
+        // Notify shuffle master that the cached intermediate dataset is corrupted.
+        if (failureHandlingResult.getError() instanceof CacheCorruptedException) {

Review Comment:
   I think it might not be sufficient to only consider `DefaultScheduler`. We might also have `AdaptiveBatchScheduler` for batch jobs and `AdaptiveScheduler` for streaming jobs (if we also consider the case of starting a new streaming job with cached result partitions). 
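   Just to illustrate, one option might be to pull the notification into a shared helper that every scheduler implementation can call from its failure-handling path. This is only a sketch, not the PR's API: `CacheCorruptedException#getCorruptedIntermediateDataSetIds()` and `ShuffleMaster#notifyClusterPartitionCorrupted(...)` are assumed names for illustration.
   ```java
   // Sketch only: a helper (e.g. in SchedulerBase or a utility class) shared by
   // DefaultScheduler, AdaptiveBatchScheduler and AdaptiveScheduler.
   // The exception accessor and the ShuffleMaster notification method are assumed names.
   static void notifyCacheCorruptionIfNeeded(Throwable error, ShuffleMaster<?> shuffleMaster) {
       if (error instanceof CacheCorruptedException) {
           for (IntermediateDataSetID dataSetId :
                   ((CacheCorruptedException) error).getCorruptedIntermediateDataSetIds()) {
               // Assumed notification hook; the actual method in this PR may differ.
               shuffleMaster.notifyClusterPartitionCorrupted(dataSetId);
           }
       }
   }
   ```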



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -142,6 +142,12 @@ public class JobVertex implements java.io.Serializable {
      */
     private String resultOptimizerProperties;
 
+    /**
+     * Optional, the intermediateDataSetId of the cached intermediate dataset that the job vertex
+     * consumes.
+     */
+    @Nullable private final IntermediateDataSetID intermediateDataSetID;

Review Comment:
   Each `JobVertex` may consume multiple `IntermediateDataSet`s, thus I think it is not sufficient to bookkeep only one id here?
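   Something along these lines inside `JobVertex` would allow tracking all of them; the field and method names below are placeholders, not a concrete proposal.
   ```java
   // Illustrative alternative: bookkeep every cached intermediate data set the vertex
   // consumes instead of a single nullable id. Names are placeholders.
   private final List<IntermediateDataSetID> intermediateDataSetIdsToConsume = new ArrayList<>();

   public void addIntermediateDataSetIdToConsume(IntermediateDataSetID dataSetId) {
       this.intermediateDataSetIdsToConsume.add(dataSetId);
   }

   public List<IntermediateDataSetID> getIntermediateDataSetIdsToConsume() {
       return Collections.unmodifiableList(intermediateDataSetIdsToConsume);
   }
   ```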



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java:
##########
@@ -45,6 +56,9 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 
     private final int networkBufferSize;
 
+    private final Map<IntermediateDataSetID, Collection<NettyShuffleDescriptor>>

Review Comment:
   Previously, for TaskExecutor-managed partitions, the promotion process should be implemented directly in `JobMasterPartitionTracker`, thus the `NettyShuffleMaster` should not need to manage the cluster partitions directly? 
   
   Also, I'm a bit wondering whether it is possible to unify the two processes? 
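   As a rough illustration of the alternative (the method below is hypothetical and does not exist in `JobMasterPartitionTracker` today), the lookup could sit next to the existing cluster-partition handling in the tracker:
   ```java
   // Hypothetical addition to JobMasterPartitionTracker, so that NettyShuffleMaster does
   // not have to bookkeep cluster partitions itself.
   /**
    * Returns the shuffle descriptors of the tracked cluster partitions that belong to the
    * given cached intermediate data set, or an empty collection if none are tracked.
    */
   Collection<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
           IntermediateDataSetID intermediateDataSetId);
   ```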



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java:
##########
@@ -84,6 +86,33 @@ CompletableFuture<T> registerPartitionWithProducer(
             PartitionDescriptor partitionDescriptor,
             ProducerDescriptor producerDescriptor);
 
+    /**
+     * Returns all the shuffle descriptors for the partitions in the intermediate data set with the
+     * given id.
+     *
+     * @param intermediateDataSetID The id of the intermediate data set.
+     * @return all the shuffle descriptors for the partitions in the intermediate data set, or null
+     *     if it does not exist.
+     */
+    default Collection<T> getClusterPartitionShuffleDescriptors(

Review Comment:
   Might also cc @wsry for a double check of the proposed methods here. 
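   For reference while reviewing, a caller of the proposed default method would look roughly like the sketch below; everything apart from `getClusterPartitionShuffleDescriptors` itself is a placeholder.
   ```java
   // Placeholder usage sketch of the proposed API from a scheduler-side component.
   Collection<NettyShuffleDescriptor> descriptors =
           shuffleMaster.getClusterPartitionShuffleDescriptors(intermediateDataSetId);
   if (descriptors == null) {
       // Per the javadoc, null means the cached intermediate data set does not exist
       // (for example, it was never produced or has already been released).
       throw new IllegalStateException(
               "No cached cluster partitions found for " + intermediateDataSetId);
   }
   ```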


