This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 5a8d42345cd [FLINK-32996][Tests] Fix the 
CheckpointAfterAllTasksFinishedITCase.testFailoverAfterSomeTasksFinished fails
5a8d42345cd is described below

commit 5a8d42345cd740332bb731369039230a64c5d088
Author: JiangXin <jiangxin...@gmail.com>
AuthorDate: Fri Sep 1 18:38:22 2023 +0800

    [FLINK-32996][Tests] Fix the 
CheckpointAfterAllTasksFinishedITCase.testFailoverAfterSomeTasksFinished fails
    
    This closes #23331.
---
 .../CheckpointAfterAllTasksFinishedITCase.java     | 36 ++++++++++++++++------
 1 file changed, 27 insertions(+), 9 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java
index 7b14cfca29c..3520f186074 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.checkpointing;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -73,6 +74,8 @@ public class CheckpointAfterAllTasksFinishedITCase extends 
AbstractTestBase {
         env.setParallelism(4);
         smallResult = sharedObjects.add(new CopyOnWriteArrayList<>());
         bigResult = sharedObjects.add(new CopyOnWriteArrayList<>());
+        IntegerStreamSource.failedBefore = false;
+        IntegerStreamSource.latch = new CountDownLatch(1);
     }
 
     @Test
@@ -98,7 +101,6 @@ public class CheckpointAfterAllTasksFinishedITCase extends 
AbstractTestBase {
 
             env.setRestartStrategy(RestartStrategies.noRestart());
             env.enableCheckpointing(100);
-            IntegerStreamSource.latch = new CountDownLatch(1);
             JobGraph jobGraph = getStreamGraph(env, true, false).getJobGraph();
             miniCluster.submitJob(jobGraph).get();
 
@@ -129,40 +131,56 @@ public class CheckpointAfterAllTasksFinishedITCase 
extends AbstractTestBase {
             miniCluster.requestJobResult(restoredJobGraph.getJobID()).get();
             
assertThat(smallResult.get().size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS);
             
assertThat(bigResult.get().size()).isEqualTo(BIG_SOURCE_NUM_RECORDS);
-        } finally {
-            IntegerStreamSource.latch = null;
         }
     }
 
+    /**
+     * Tests the behaviors of the following two scenarios at which the 
subtasks that have finished
+     * should be restored to {@link 
org.apache.flink.runtime.scheduler.VertexEndOfDataListener}.
+     *
+     * <p>Some subtasks in the job have reached the end of data but failed due 
to their final
+     * checkpoint throwing exceptions.
+     *
+     * <p>Some pipelines of the job have been finished but the next checkpoint 
is not triggered, at
+     * this time a full-strategy failover happens.
+     */
     @Test
     public void testFailoverAfterSomeTasksFinished() throws Exception {
+        final Configuration config = new Configuration();
+        config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, 
"full");
+
         final MiniClusterConfiguration cfg =
                 new MiniClusterConfiguration.Builder()
                         .withRandomPorts()
                         .setNumTaskManagers(1)
                         .setNumSlotsPerTaskManager(4)
+                        .setConfiguration(config)
                         .build();
         try (MiniCluster miniCluster = new MiniCluster(cfg)) {
             miniCluster.start();
 
             env.enableCheckpointing(100);
-            IntegerStreamSource.latch = new CountDownLatch(1);
             JobGraph jobGraph = getStreamGraph(env, true, true).getJobGraph();
-            miniCluster.submitJob(jobGraph);
+            miniCluster.submitJob(jobGraph).get();
 
             CommonTestUtils.waitForSubtasksToFinish(
                     miniCluster,
                     jobGraph.getJobID(),
                     findVertexByName(jobGraph, "passA -> Sink: sinkA").getID(),
-                    false);
+                    true);
             bigResult.get().clear();
             IntegerStreamSource.latch.countDown();
 
             miniCluster.requestJobResult(jobGraph.getJobID()).get();
-            
assertThat(smallResult.get().size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS);
+
+            // We are expecting the source to be finished and then restart 
during failover, so the
+            // sink should receive records as much as double 
SMALL_SOURCE_NUM_RECORDS.
+            // However, in a few cases, a checkpoint happens to be triggered 
before failover, and
+            // the source would not restart so the sink will only receive 
SMALL_SOURCE_NUM_RECORDS
+            // records.
+            assertThat(smallResult.get().size())
+                    .isIn(SMALL_SOURCE_NUM_RECORDS, SMALL_SOURCE_NUM_RECORDS * 
2);
             
assertThat(bigResult.get().size()).isEqualTo(BIG_SOURCE_NUM_RECORDS);
-        } finally {
-            IntegerStreamSource.latch = null;
         }
     }
 

Reply via email to