zhuzhurk commented on code in PR #24771: URL: https://github.com/apache/flink/pull/24771#discussion_r1602975736
########## flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java: ########## @@ -1931,53 +1931,133 @@ void testGetAllPartitionWithMetrics() throws Exception { NettyShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal(); - DefaultShuffleMetrics shuffleMetrics = + DefaultShuffleMetrics shuffleMetrics1 = new DefaultShuffleMetrics(new ResultPartitionBytes(new long[] {1, 2, 3})); - Collection<PartitionWithMetrics> defaultPartitionWithMetrics = + Collection<PartitionWithMetrics> defaultPartitionWithMetrics1 = Collections.singletonList( - new DefaultPartitionWithMetrics(shuffleDescriptor, shuffleMetrics)); - final TestingTaskExecutorGateway taskExecutorGateway = + new DefaultPartitionWithMetrics(shuffleDescriptor, shuffleMetrics1)); + DefaultShuffleMetrics shuffleMetrics2 = + new DefaultShuffleMetrics(new ResultPartitionBytes(new long[] {4, 5, 6})); + Collection<PartitionWithMetrics> defaultPartitionWithMetrics2 = + Collections.singletonList( + new DefaultPartitionWithMetrics(shuffleDescriptor, shuffleMetrics2)); + DefaultShuffleMetrics shuffleMetrics3 = + new DefaultShuffleMetrics(new ResultPartitionBytes(new long[] {7, 8, 9})); + Collection<PartitionWithMetrics> defaultPartitionWithMetrics3 = + Collections.singletonList( + new DefaultPartitionWithMetrics(shuffleDescriptor, shuffleMetrics3)); + + // start fetch and retain partitions and then register tm1 + final TestingTaskExecutorGateway taskExecutorGateway1 = new TestingTaskExecutorGatewayBuilder() .setRequestPartitionWithMetricsFunction( ignored -> CompletableFuture.completedFuture( - defaultPartitionWithMetrics)) + defaultPartitionWithMetrics1)) + .setAddress("tm1") .createTestingTaskExecutorGateway(); - final LocalUnresolvedTaskManagerLocation taskManagerUnresolvedLocation = - new LocalUnresolvedTaskManagerLocation(); - final Collection<SlotOffer> slotOffers = - registerSlotsAtJobMaster( - 1, - jobMasterGateway, - jobGraph.getJobID(), - 
taskExecutorGateway, - taskManagerUnresolvedLocation); - assertThat(slotOffers).hasSize(1); + registerSlotsAtJobMaster( + 1, + jobMasterGateway, + jobGraph.getJobID(), + taskExecutorGateway1, + new LocalUnresolvedTaskManagerLocation()); - waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway); + jobMaster.startFetchAndRetainPartitionWithMetricsOnTaskManager(); - PartitionWithMetrics metrics = - jobMasterGateway - .getAllPartitionWithMetricsOnTaskManagers() - .get() - .iterator() - .next(); - PartitionWithMetrics expectedMetrics = defaultPartitionWithMetrics.iterator().next(); - - assertThat(metrics.getPartitionMetrics().getPartitionBytes().getSubpartitionBytes()) - .isEqualTo( - expectedMetrics - .getPartitionMetrics() - .getPartitionBytes() - .getSubpartitionBytes()); - assertThat(metrics.getPartition().getResultPartitionID()) - .isEqualTo(expectedMetrics.getPartition().getResultPartitionID()); - assertThat(metrics.getPartition().isUnknown()) - .isEqualTo(expectedMetrics.getPartition().isUnknown()); - assertThat(metrics.getPartition().storesLocalResourcesOn()) - .isEqualTo(expectedMetrics.getPartition().storesLocalResourcesOn()); - } + verifyPartitionMetrics(jobMasterGateway, defaultPartitionWithMetrics1); + + // register tm2 + TestingTaskExecutorGateway taskExecutorGateway2 = + new TestingTaskExecutorGatewayBuilder() + .setRequestPartitionWithMetricsFunction( + ignored -> + CompletableFuture.completedFuture( + defaultPartitionWithMetrics2)) + .setAddress("tm2") + .createTestingTaskExecutorGateway(); + registerSlotsAtJobMaster( + 1, + jobMasterGateway, + jobGraph.getJobID(), + taskExecutorGateway2, + new LocalUnresolvedTaskManagerLocation()); + + Collection<PartitionWithMetrics> expectedMetrics = + new ArrayList<>(defaultPartitionWithMetrics1); + expectedMetrics.addAll(defaultPartitionWithMetrics2); + verifyPartitionMetrics(jobMasterGateway, expectedMetrics); + + // stop fetch and retain partitions and then register tm3 + TestingTaskExecutorGateway 
taskExecutorGateway3 = + new TestingTaskExecutorGatewayBuilder() + .setRequestPartitionWithMetricsFunction( + ignored -> + CompletableFuture.completedFuture( + defaultPartitionWithMetrics3)) + .setAddress("tm3") + .createTestingTaskExecutorGateway(); + + jobMaster.stopFetchAndRetainPartitionWithMetricsOnTaskManager(); + + registerSlotsAtJobMaster( + 1, + jobMasterGateway, + jobGraph.getJobID(), + taskExecutorGateway3, + new LocalUnresolvedTaskManagerLocation()); + + verifyPartitionMetrics(jobMasterGateway, expectedMetrics); + } + } + + private static void verifyPartitionMetrics( + JobMasterGateway jobMasterGateway, Collection<PartitionWithMetrics> expectedMetrics) + throws InterruptedException, ExecutionException { + Collection<PartitionWithMetrics> partitionWithMetrics = Review Comment: It's better to convert them into maps and compare partitions with the same id. Note that you will need to assign different partition descriptors to different `partitionWithMetrics`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java: ########## @@ -275,6 +278,15 @@ public boolean isParallelismConfigured() { return parallelismConfigured; } + public void setDynamicParallelism(int parallelism) { + this.parallelism = parallelism; Review Comment: maybe reuse `setParallelism()`? ########## flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContext.java: ########## @@ -45,4 +45,8 @@ CompletableFuture<?> stopTrackingAndReleasePartitions( * all TaskManagers. */ CompletableFuture<Collection<PartitionWithMetrics>> getAllPartitionWithMetricsOnTaskManagers(); + + void notifyPartitionRecoveryStarted(); Review Comment: Comments should be added. 
########## flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionFailoverStrategyTest.java: ########## @@ -285,6 +286,9 @@ void testRegionFailoverForMultipleVerticesRegions() throws Exception { * * Component 1: 1; component 2: 2 */ + @Disabled( + "To support job recovery, filtering the CREATED execution vertex not in strategy, instead of in " + + "DefaultScheduler#restartTasksWithDelay. This case can be covered by BatchFineGrainedRecoveryITCase") Review Comment: We should remove this test if it is no longer needed. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java: ########## @@ -335,15 +335,37 @@ private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult } private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) { - final Set<ExecutionVertexID> verticesToRestart = - failureHandlingResult.getVerticesToRestart(); + final boolean globalRecovery = failureHandlingResult.isGlobalFailure(); + + final Set<ExecutionVertexID> verticesToRestart; + if (!globalRecovery) { + // we do not need to restart tasks which are already in the initial state + // we do not do this for global failures because global failures can happen when all + // tasks are still CREATED and thus the scheduling can get blocked + verticesToRestart = + failureHandlingResult.getVerticesToRestart().stream() + .filter( + executionVertexID -> + getExecutionVertex(executionVertexID) + .getExecutionState() + != ExecutionState.CREATED) + .collect(Collectors.toSet()); + + checkState(failureHandlingResult.getFailedExecution().isPresent()); + log.info( + "{} tasks should be restarted to recover the failed task {}. ", + verticesToRestart.size(), + failureHandlingResult.getFailedExecution().get().getVertex().getID()); Review Comment: The log seems redundant with the logs below?
########## flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java: ########## @@ -139,4 +139,8 @@ default void snapshotState( /** Restores the state of the shuffle master from the provided snapshots. */ default void restoreState(List<ShuffleMasterSnapshot> snapshots) {} + + default void notifyPartitionRecoveryStarted(JobID jobId) {} Review Comment: Comments should be added. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java: ########## @@ -335,15 +335,37 @@ private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult } private void restartTasksWithDelay(final FailureHandlingResult failureHandlingResult) { - final Set<ExecutionVertexID> verticesToRestart = - failureHandlingResult.getVerticesToRestart(); + final boolean globalRecovery = failureHandlingResult.isGlobalFailure(); + + final Set<ExecutionVertexID> verticesToRestart; + if (!globalRecovery) { + // we do not need to restart tasks which are already in the initial state + // we do not do this for global failures because global failures can happen when all + // tasks are still CREATED and thus the scheduling can get blocked Review Comment: Why would this happen? ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java: ########## @@ -159,6 +159,9 @@ public class JobVertex implements java.io.Serializable { private boolean parallelismConfigured = false; + /** Indicates whether the parallelism of this job vertex is dynamic decided. */ Review Comment: dynamic decided -> decided dynamically -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org