zhuzhurk commented on code in PR #24771:
URL: https://github.com/apache/flink/pull/24771#discussion_r1602975736


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1931,53 +1931,133 @@ void testGetAllPartitionWithMetrics() throws Exception 
{
 
             NettyShuffleDescriptor shuffleDescriptor =
                     NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
-            DefaultShuffleMetrics shuffleMetrics =
+            DefaultShuffleMetrics shuffleMetrics1 =
                     new DefaultShuffleMetrics(new ResultPartitionBytes(new 
long[] {1, 2, 3}));
-            Collection<PartitionWithMetrics> defaultPartitionWithMetrics =
+            Collection<PartitionWithMetrics> defaultPartitionWithMetrics1 =
                     Collections.singletonList(
-                            new DefaultPartitionWithMetrics(shuffleDescriptor, 
shuffleMetrics));
-            final TestingTaskExecutorGateway taskExecutorGateway =
+                            new DefaultPartitionWithMetrics(shuffleDescriptor, 
shuffleMetrics1));
+            DefaultShuffleMetrics shuffleMetrics2 =
+                    new DefaultShuffleMetrics(new ResultPartitionBytes(new 
long[] {4, 5, 6}));
+            Collection<PartitionWithMetrics> defaultPartitionWithMetrics2 =
+                    Collections.singletonList(
+                            new DefaultPartitionWithMetrics(shuffleDescriptor, 
shuffleMetrics2));
+            DefaultShuffleMetrics shuffleMetrics3 =
+                    new DefaultShuffleMetrics(new ResultPartitionBytes(new 
long[] {7, 8, 9}));
+            Collection<PartitionWithMetrics> defaultPartitionWithMetrics3 =
+                    Collections.singletonList(
+                            new DefaultPartitionWithMetrics(shuffleDescriptor, 
shuffleMetrics3));
+
+            // start fetch and retain partitions and then register tm1
+            final TestingTaskExecutorGateway taskExecutorGateway1 =
                     new TestingTaskExecutorGatewayBuilder()
                             .setRequestPartitionWithMetricsFunction(
                                     ignored ->
                                             CompletableFuture.completedFuture(
-                                                    
defaultPartitionWithMetrics))
+                                                    
defaultPartitionWithMetrics1))
+                            .setAddress("tm1")
                             .createTestingTaskExecutorGateway();
 
-            final LocalUnresolvedTaskManagerLocation 
taskManagerUnresolvedLocation =
-                    new LocalUnresolvedTaskManagerLocation();
-            final Collection<SlotOffer> slotOffers =
-                    registerSlotsAtJobMaster(
-                            1,
-                            jobMasterGateway,
-                            jobGraph.getJobID(),
-                            taskExecutorGateway,
-                            taskManagerUnresolvedLocation);
-            assertThat(slotOffers).hasSize(1);
+            registerSlotsAtJobMaster(
+                    1,
+                    jobMasterGateway,
+                    jobGraph.getJobID(),
+                    taskExecutorGateway1,
+                    new LocalUnresolvedTaskManagerLocation());
 
-            waitUntilAllExecutionsAreScheduledOrDeployed(jobMasterGateway);
+            jobMaster.startFetchAndRetainPartitionWithMetricsOnTaskManager();
 
-            PartitionWithMetrics metrics =
-                    jobMasterGateway
-                            .getAllPartitionWithMetricsOnTaskManagers()
-                            .get()
-                            .iterator()
-                            .next();
-            PartitionWithMetrics expectedMetrics = 
defaultPartitionWithMetrics.iterator().next();
-
-            
assertThat(metrics.getPartitionMetrics().getPartitionBytes().getSubpartitionBytes())
-                    .isEqualTo(
-                            expectedMetrics
-                                    .getPartitionMetrics()
-                                    .getPartitionBytes()
-                                    .getSubpartitionBytes());
-            assertThat(metrics.getPartition().getResultPartitionID())
-                    
.isEqualTo(expectedMetrics.getPartition().getResultPartitionID());
-            assertThat(metrics.getPartition().isUnknown())
-                    .isEqualTo(expectedMetrics.getPartition().isUnknown());
-            assertThat(metrics.getPartition().storesLocalResourcesOn())
-                    
.isEqualTo(expectedMetrics.getPartition().storesLocalResourcesOn());
-        }
+            verifyPartitionMetrics(jobMasterGateway, 
defaultPartitionWithMetrics1);
+
+            // register tm2
+            TestingTaskExecutorGateway taskExecutorGateway2 =
+                    new TestingTaskExecutorGatewayBuilder()
+                            .setRequestPartitionWithMetricsFunction(
+                                    ignored ->
+                                            CompletableFuture.completedFuture(
+                                                    
defaultPartitionWithMetrics2))
+                            .setAddress("tm2")
+                            .createTestingTaskExecutorGateway();
+            registerSlotsAtJobMaster(
+                    1,
+                    jobMasterGateway,
+                    jobGraph.getJobID(),
+                    taskExecutorGateway2,
+                    new LocalUnresolvedTaskManagerLocation());
+
+            Collection<PartitionWithMetrics> expectedMetrics =
+                    new ArrayList<>(defaultPartitionWithMetrics1);
+            expectedMetrics.addAll(defaultPartitionWithMetrics2);
+            verifyPartitionMetrics(jobMasterGateway, expectedMetrics);
+
+            // stop fetch and retain partitions and then register tm3
+            TestingTaskExecutorGateway taskExecutorGateway3 =
+                    new TestingTaskExecutorGatewayBuilder()
+                            .setRequestPartitionWithMetricsFunction(
+                                    ignored ->
+                                            CompletableFuture.completedFuture(
+                                                    
defaultPartitionWithMetrics3))
+                            .setAddress("tm3")
+                            .createTestingTaskExecutorGateway();
+
+            jobMaster.stopFetchAndRetainPartitionWithMetricsOnTaskManager();
+
+            registerSlotsAtJobMaster(
+                    1,
+                    jobMasterGateway,
+                    jobGraph.getJobID(),
+                    taskExecutorGateway3,
+                    new LocalUnresolvedTaskManagerLocation());
+
+            verifyPartitionMetrics(jobMasterGateway, expectedMetrics);
+        }
+    }
+
+    private static void verifyPartitionMetrics(
+            JobMasterGateway jobMasterGateway, 
Collection<PartitionWithMetrics> expectedMetrics)
+            throws InterruptedException, ExecutionException {
+        Collection<PartitionWithMetrics> partitionWithMetrics =

Review Comment:
   It's better to convert them into maps and compare partitions with the same 
id.
   Note that you will need to assign different partition descriptors to 
different `partitionWithMetrics`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -275,6 +278,15 @@ public boolean isParallelismConfigured() {
         return parallelismConfigured;
     }
 
+    public void setDynamicParallelism(int parallelism) {
+        this.parallelism = parallelism;

Review Comment:
   maybe reuse `setParallelism()`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/JobShuffleContext.java:
##########
@@ -45,4 +45,8 @@ CompletableFuture<?> stopTrackingAndReleasePartitions(
      * all TaskManagers.
      */
     CompletableFuture<Collection<PartitionWithMetrics>> 
getAllPartitionWithMetricsOnTaskManagers();
+
+    void notifyPartitionRecoveryStarted();

Review Comment:
   Comments should be added.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/RestartPipelinedRegionFailoverStrategyTest.java:
##########
@@ -285,6 +286,9 @@ void testRegionFailoverForMultipleVerticesRegions() throws 
Exception {
      *
      * Component 1: 1; component 2: 2
      */
+    @Disabled(
+            "To support job recovery, filtering the CREATED execution vertex 
not in strategy, instead of in "
+                    + "DefaultScheduler#restartTasksWithDelay. This case can 
be covered by BatchFineGrainedRecoveryITCase")

Review Comment:
   We should remove this test if it is no longer needed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -335,15 +335,37 @@ private void maybeRestartTasks(final 
FailureHandlingResult failureHandlingResult
     }
 
     private void restartTasksWithDelay(final FailureHandlingResult 
failureHandlingResult) {
-        final Set<ExecutionVertexID> verticesToRestart =
-                failureHandlingResult.getVerticesToRestart();
+        final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
+
+        final Set<ExecutionVertexID> verticesToRestart;
+        if (!globalRecovery) {
+            // we do not need to restart tasks which are already in the 
initial state
+            // we do not do this for global failures because global failures 
can happen when all
+            // tasks are still CREATED and thus the scheduling can get blocked
+            verticesToRestart =
+                    failureHandlingResult.getVerticesToRestart().stream()
+                            .filter(
+                                    executionVertexID ->
+                                            
getExecutionVertex(executionVertexID)
+                                                            
.getExecutionState()
+                                                    != ExecutionState.CREATED)
+                            .collect(Collectors.toSet());
+
+            checkState(failureHandlingResult.getFailedExecution().isPresent());
+            log.info(
+                    "{} tasks should be restarted to recover the failed task 
{}. ",
+                    verticesToRestart.size(),
+                    
failureHandlingResult.getFailedExecution().get().getVertex().getID());

Review Comment:
   The log seems redundant to the logs below?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMaster.java:
##########
@@ -139,4 +139,8 @@ default void snapshotState(
 
     /** Restores the state of the shuffle master from the provided snapshots. 
*/
     default void restoreState(List<ShuffleMasterSnapshot> snapshots) {}
+
+    default void notifyPartitionRecoveryStarted(JobID jobId) {}

Review Comment:
   Comments should be added.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java:
##########
@@ -335,15 +335,37 @@ private void maybeRestartTasks(final 
FailureHandlingResult failureHandlingResult
     }
 
     private void restartTasksWithDelay(final FailureHandlingResult 
failureHandlingResult) {
-        final Set<ExecutionVertexID> verticesToRestart =
-                failureHandlingResult.getVerticesToRestart();
+        final boolean globalRecovery = failureHandlingResult.isGlobalFailure();
+
+        final Set<ExecutionVertexID> verticesToRestart;
+        if (!globalRecovery) {
+            // we do not need to restart tasks which are already in the 
initial state
+            // we do not do this for global failures because global failures 
can happen when all
+            // tasks are still CREATED and thus the scheduling can get blocked

Review Comment:
   Why would this happen?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java:
##########
@@ -159,6 +159,9 @@ public class JobVertex implements java.io.Serializable {
 
     private boolean parallelismConfigured = false;
 
+    /** Indicates whether the parallelism of this job vertex is dynamic 
decided. */

Review Comment:
   dynamic decided -> decided dynamically



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to