wanglijie95 commented on code in PR #22686:
URL: https://github.com/apache/flink/pull/22686#discussion_r1226372595

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -1730,14 +1705,13 @@ public MarkPartitionFinishedStrategy getMarkPartitionFinishedStrategy() {
         return markPartitionFinishedStrategy;
     }
 
-    @Override
-    public boolean isNonFinishedHybridPartitionShouldBeUnknown() {
-        return nonFinishedHybridPartitionShouldBeUnknown;
-    }
-
     @Override
     public JobVertexInputInfo getJobVertexInputInfo(
             JobVertexID jobVertexId, IntermediateDataSetID resultId) {
         return vertexInputInfoStore.get(jobVertexId, resultId);
     }
+

Review Comment:
   Add `@Override`


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java:
##########
@@ -121,6 +121,14 @@ public static DefaultExecutionGraph buildGraph(
         // create a new execution graph, if none exists so far
         final DefaultExecutionGraph executionGraph;
         try {
+            TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory =

Review Comment:
   I think it would be better to add a separate try-catch block, because the exception message of the current block is "Could not create the ExecutionGraph".


##########
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java:
##########
@@ -156,20 +141,32 @@ private List<InputGateDeploymentDescriptor> createInputGateDeploymentDescriptors
                             subpartitionRange,
                             consumedPartitionGroup.size(),
                             getConsumedPartitionShuffleDescriptors(
-                                    consumedIntermediateResult, consumedPartitionGroup)));
+                                    consumedIntermediateResult,
+                                    consumedPartitionGroup,
+                                    executionVertex.getExecutionGraphAccessor())));
         }
 
-        for (Map.Entry<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> entry :
-                consumedClusterPartitionShuffleDescriptors.entrySet()) {
-            // For FLIP-205, the JobGraph generating side ensure that the cluster partition is
-            // produced with only one subpartition. Therefore, we always consume the partition with
-            // subpartition index of 0.
-            inputGates.add(
-                    new InputGateDeploymentDescriptor(
-                            entry.getKey(),
-                            ResultPartitionType.BLOCKING_PERSISTENT,
-                            0,
-                            entry.getValue()));
+        try {
+            for (Map.Entry<IntermediateDataSetID, ShuffleDescriptorAndIndex[]> entry :

Review Comment:
   I prefer to keep the same logic as before and only put `getClusterPartitionShuffleDescriptors` in the try-catch block.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
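
The last two review comments both ask for a narrower try-catch scope: wrap only the call that can throw, give that step its own error message, and leave the surrounding loop as it was. A minimal sketch of that pattern follows. It is not the Flink code: `NarrowTryCatchSketch`, `loadDescriptors`, `buildInputGates`, and the message text are hypothetical names chosen for illustration, and plain strings stand in for the descriptor types.

```java
import java.util.ArrayList;
import java.util.List;

public class NarrowTryCatchSketch {

    /** Hypothetical stand-in for a lookup that may fail. */
    static List<String> loadDescriptors(boolean fail) throws Exception {
        if (fail) {
            throw new Exception("lookup failed");
        }
        return List.of("descriptor-0", "descriptor-1");
    }

    /** Only the throwing call sits inside the try block; the loop keeps its original shape. */
    static List<String> buildInputGates(boolean fail) {
        final List<String> descriptors;
        try {
            descriptors = loadDescriptors(fail);
        } catch (Exception e) {
            // A dedicated message for this step, rather than one inherited from a wider block.
            throw new IllegalStateException("Could not load descriptors", e);
        }

        // Unchanged iteration logic, outside the try block.
        List<String> inputGates = new ArrayList<>();
        for (String descriptor : descriptors) {
            inputGates.add("gate(" + descriptor + ")");
        }
        return inputGates;
    }

    public static void main(String[] args) {
        System.out.println(buildInputGates(false));
    }
}
```

Keeping the try block this small means a failure in the lookup is reported with a message that names the lookup, and an unrelated exception thrown while building the gates is not mislabeled.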