wanglijie95 commented on code in PR #22686:
URL: https://github.com/apache/flink/pull/22686#discussion_r1226372595


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -1730,14 +1705,13 @@ public MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() {
         return markPartitionFinishedStrategy;
     }
 
-    @Override
-    public boolean isNonFinishedHybridPartitionShouldBeUnknown() {
-        return nonFinishedHybridPartitionShouldBeUnknown;
-    }
-
     @Override
     public JobVertexInputInfo getJobVertexInputInfo(
             JobVertexID jobVertexId, IntermediateDataSetID resultId) {
         return vertexInputInfoStore.get(jobVertexId, resultId);
     }
+

Review Comment:
   Add `@Override`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java:
##########
@@ -121,6 +121,14 @@ public static DefaultExecutionGraph buildGraph(
         // create a new execution graph, if none exists so far
         final DefaultExecutionGraph executionGraph;
         try {
+            TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory =

Review Comment:
   I think it would be better to wrap this in a separate try-catch block, because the 
exception message of the current block is "Could not create the ExecutionGraph", which 
would be misleading if creating the `TaskDeploymentDescriptorFactory` fails.
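
   A minimal sketch of the shape I have in mind. All names here (`JobSetupException`, `createFactory`, `createGraph`, and the factory error message) are hypothetical stand-ins rather than the real Flink types; only the "Could not create the ExecutionGraph" message comes from the existing block.

```java
public class SeparateTryCatchSketch {

    /** Hypothetical stand-in for the exception type thrown by the builder. */
    static class JobSetupException extends Exception {
        JobSetupException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical placeholder for creating the TaskDeploymentDescriptorFactory.
    static String createFactory(boolean fail) {
        if (fail) {
            throw new IllegalStateException("factory setup failed");
        }
        return "factory";
    }

    // Hypothetical placeholder for creating the execution graph itself.
    static String createGraph(String factory, boolean fail) {
        if (fail) {
            throw new IllegalStateException("graph setup failed");
        }
        return "graph(" + factory + ")";
    }

    static String buildGraph(boolean failFactory, boolean failGraph) throws JobSetupException {
        // First block: a failure here is reported as a factory failure.
        final String factory;
        try {
            factory = createFactory(failFactory);
        } catch (Exception e) {
            throw new JobSetupException(
                    "Could not create the TaskDeploymentDescriptorFactory", e);
        }

        // Second block: keeps the existing message for graph creation only.
        try {
            return createGraph(factory, failGraph);
        } catch (Exception e) {
            throw new JobSetupException("Could not create the ExecutionGraph", e);
        }
    }
}
```

   With this split, each failure surfaces with a message that names the step that actually failed.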



##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -156,20 +141,32 @@ private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors
                             subpartitionRange,
                             consumedPartitionGroup.size(),
                             getConsumedPartitionShuffleDescriptors(
-                                    consumedIntermediateResult, consumedPartitionGroup)));
+                                    consumedIntermediateResult,
+                                    consumedPartitionGroup,
+                                    executionVertex.getExecutionGraphAccessor())));
         }
 
-        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> entry :
-                consumedClusterPartitionShuffleDescriptors.entrySet()) {
-            // For FLIP-205, the JobGraph generating side ensure that the cluster partition is
-            // produced with only one subpartition. Therefore, we always consume the partition with
-            // subpartition index of 0.
-            inputGates.add(
-                    new InputGateDeploymentDescriptor(
-                            entry.getKey(),
-                            ResultPartitionType.BLOCKING_PERSISTENT,
-                            0,
-                            entry.getValue()));
+        try {
+            for (Map.Entry<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> entry :

Review Comment:
   I'd prefer to keep the same logic as before and only put the 
`getClusterPartitionShuffleDescriptors` call in a try-catch block.
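
   Roughly like the sketch below, where only the lookup sits inside the `try` and the loop stays as it was. The types, the signature of `getClusterPartitionShuffleDescriptors`, and the exception and message used here are simplified assumptions for illustration, not the real Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NarrowTryScopeSketch {

    // Hypothetical stand-in for the lookup that may fail; the real method
    // has a different signature and returns Flink-specific types.
    static Map<String, String[]> getClusterPartitionShuffleDescriptors(boolean fail) {
        if (fail) {
            throw new IllegalStateException("cluster partition lost");
        }
        Map<String, String[]> result = new LinkedHashMap<>();
        result.put("dataset-1", new String[] {"descriptor-a"});
        return result;
    }

    static List<String> createInputGates(boolean fail) {
        // Only the call that can fail is guarded.
        final Map<String, String[]> descriptors;
        try {
            descriptors = getClusterPartitionShuffleDescriptors(fail);
        } catch (RuntimeException e) {
            // Assumed wrapper exception and message, for illustration only.
            throw new IllegalStateException(
                    "Could not get cluster partition shuffle descriptors", e);
        }

        // The loop is unchanged and stays outside the try block.
        List<String> inputGates = new ArrayList<>();
        for (Map.Entry<String, String[]> entry : descriptors.entrySet()) {
            // Always consume subpartition index 0, as in the original comment.
            inputGates.add(entry.getKey() + "@0:" + entry.getValue().length);
        }
        return inputGates;
    }
}
```

   This keeps the diff small and avoids accidentally catching exceptions thrown while building the input gates.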


