This is an automated email from the ASF dual-hosted git repository. lindong pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push: new 5a8d42345cd [FLINK-32996][Tests] Fix the CheckpointAfterAllTasksFinishedITCase.testFailoverAfterSomeTasksFinished fails 5a8d42345cd is described below commit 5a8d42345cd740332bb731369039230a64c5d088 Author: JiangXin <jiangxin...@gmail.com> AuthorDate: Fri Sep 1 18:38:22 2023 +0800 [FLINK-32996][Tests] Fix the CheckpointAfterAllTasksFinishedITCase.testFailoverAfterSomeTasksFinished fails This closes #23331. --- .../CheckpointAfterAllTasksFinishedITCase.java | 36 ++++++++++++++++------ 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java index 7b14cfca29c..3520f186074 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.test.checkpointing; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -73,6 +74,8 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase { env.setParallelism(4); smallResult = sharedObjects.add(new CopyOnWriteArrayList<>()); bigResult = sharedObjects.add(new CopyOnWriteArrayList<>()); + IntegerStreamSource.failedBefore = false; + IntegerStreamSource.latch = new CountDownLatch(1); } @Test @@ -98,7 +101,6 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase { env.setRestartStrategy(RestartStrategies.noRestart()); env.enableCheckpointing(100); - IntegerStreamSource.latch = new CountDownLatch(1); JobGraph jobGraph = getStreamGraph(env, true, false).getJobGraph(); miniCluster.submitJob(jobGraph).get(); @@ -129,40 +131,56 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase { miniCluster.requestJobResult(restoredJobGraph.getJobID()).get(); assertThat(smallResult.get().size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS); assertThat(bigResult.get().size()).isEqualTo(BIG_SOURCE_NUM_RECORDS); - } finally { - IntegerStreamSource.latch = null; } } + /** + * Tests the behaviors of the following two scenarios at which the subtasks that have finished + * should be restored to {@link org.apache.flink.runtime.scheduler.VertexEndOfDataListener}. + * + * <p>Some subtasks in the job have reached the end of data but failed due to their final + * checkpoint throwing exceptions. + * + * <p>Some pipelines of the job have been finished but the next checkpoint is not triggered, at + * this time a full-strategy failover happens. + */ @Test public void testFailoverAfterSomeTasksFinished() throws Exception { + final Configuration config = new Configuration(); + config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full"); + final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder() .withRandomPorts() .setNumTaskManagers(1) .setNumSlotsPerTaskManager(4) + .setConfiguration(config) .build(); try (MiniCluster miniCluster = new MiniCluster(cfg)) { miniCluster.start(); env.enableCheckpointing(100); - IntegerStreamSource.latch = new CountDownLatch(1); JobGraph jobGraph = getStreamGraph(env, true, true).getJobGraph(); - miniCluster.submitJob(jobGraph); + miniCluster.submitJob(jobGraph).get(); CommonTestUtils.waitForSubtasksToFinish( miniCluster, jobGraph.getJobID(), findVertexByName(jobGraph, "passA -> Sink: sinkA").getID(), - false); + true); bigResult.get().clear(); IntegerStreamSource.latch.countDown(); miniCluster.requestJobResult(jobGraph.getJobID()).get(); - assertThat(smallResult.get().size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS); + + // We are expecting the source to be finished and then restart during failover, so the + // sink should receive records as much as double SMALL_SOURCE_NUM_RECORDS. + // However, in a few cases, a checkpoint happens to be triggered before failover, and + // the source would not restart so the sink will only receive SMALL_SOURCE_NUM_RECORDS + // records. + assertThat(smallResult.get().size()) + .isIn(SMALL_SOURCE_NUM_RECORDS, SMALL_SOURCE_NUM_RECORDS * 2); assertThat(bigResult.get().size()).isEqualTo(BIG_SOURCE_NUM_RECORDS); - } finally { - IntegerStreamSource.latch = null; } }