This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c4d6344ab8e [FLINK-38403][tests] Fix the unexpected test that the second job does not restore from checkpoint (#27254)
c4d6344ab8e is described below

commit c4d6344ab8eff81358375ed048d9f993c49b0851
Author: Rui Fan <[email protected]>
AuthorDate: Thu Dec 4 17:49:20 2025 +0100

    [FLINK-38403][tests] Fix the unexpected test that the second job does not restore from checkpoint (#27254)
---
 .../UnalignedCheckpointRescaleITCase.java          | 61 ++++++++++++++--------
 .../checkpointing/UnalignedCheckpointTestBase.java | 39 ++++++++++----
 2 files changed, 69 insertions(+), 31 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
index 4a97e17cf7b..5be756a0efa 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
@@ -20,6 +20,7 @@
 package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -54,13 +55,13 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.File;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collections;
 
 import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 
 /** Integration test for performing rescale of unaligned checkpoint. */
@@ -366,14 +367,7 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas
                                 })
                         .name("long-to-string-map")
                         .uid("long-to-string-map")
-                        .map(
-                                new FailingMapper<>(
-                                        state -> false,
-                                        state ->
-                                                state.completedCheckpoints >= minCheckpoints / 2
-                                                        && state.runNumber == 0,
-                                        state -> false,
-                                        state -> false))
+                        .map(getFailingMapper(minCheckpoints))
                         .name("failing-map")
                         .uid("failing-map")
                         .setParallelism(parallelism)
@@ -394,14 +388,7 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas
                 DataStream<Long> combinedSource, long minCheckpoints, boolean slotSharing) {
             combinedSource
                     .shuffle()
-                    .map(
-                            new FailingMapper<>(
-                                    state -> false,
-                                    state ->
-                                            state.completedCheckpoints >= minCheckpoints / 2
-                                                    && state.runNumber == 0,
-                                    state -> false,
-                                    state -> false))
+                    .map(getFailingMapper(minCheckpoints))
                     .name("failing-map")
                     .uid("failing-map")
                     .slotSharingGroup(slotSharing ? "default" : "failing-map")
@@ -418,6 +405,25 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas
                     .slotSharingGroup(slotSharing ? "default" : "sink");
         }
 
+        /**
+         * Creates a FailingMapper that only fails during snapshot operations.
+         *
+         * <p>Only fails during snapshotState() when completedCheckpoints >= minCheckpoints/2 AND
+         * runNumber == 0. After job failovers internally, runNumber becomes attemptNumber > 0, so
+         * failure condition is no longer satisfied. This ensures the mapper fails exactly once
+         * during initial run to trigger job failover, but never fails again after failing over and
+         * recovery from checkpoint.
+         */
+        private static <T> FailingMapper<T> getFailingMapper(long minCheckpoints) {
+            return new FailingMapper<>(
+                    state -> false,
+                    state ->
+                            state.completedCheckpoints >= minCheckpoints / 2
+                                    && state.runNumber == 0,
+                    state -> false,
+                    state -> false);
+        }
+
         DataStream<Long> createSourcePipeline(
                 StreamExecutionEnvironment env,
                 int minCheckpoints,
@@ -611,21 +617,34 @@ public class UnalignedCheckpointRescaleITCase extends UnalignedCheckpointTestBas
         this.sourceSleepMs = sourceSleepMs;
     }
 
+    /**
+     * Tests unaligned checkpoint rescaling behavior.
+     *
+     * <p>Prescale phase: Job fails when completedCheckpoints >= minCheckpoints/2 via FailingMapper.
+     * Generates checkpoint for rescale test.
+     *
+     * <p>Postscale phase: Job restores from checkpoint with different parallelism, failovers once,
+     * and finishes after source generates all records.
+     */
     @Test
     public void shouldRescaleUnalignedCheckpoint() throws Exception {
         final UnalignedSettings prescaleSettings =
                 new UnalignedSettings(topology)
                         .setParallelism(oldParallelism)
                         .setExpectedFailures(1)
-                        .setSourceSleepMs(sourceSleepMs);
+                        .setSourceSleepMs(sourceSleepMs)
+                        .setExpectedFinalJobStatus(JobStatus.FAILED);
         prescaleSettings.setGenerateCheckpoint(true);
-        final File checkpointDir = super.execute(prescaleSettings);
-
+        final String checkpointDir = super.execute(prescaleSettings);
+        assertThat(checkpointDir)
+                .as("First job must generate a checkpoint for rescale test to be valid.")
+                .isNotNull();
         // resume
         final UnalignedSettings postscaleSettings =
                 new UnalignedSettings(topology)
                         .setParallelism(newParallelism)
-                        .setExpectedFailures(1);
+                        .setExpectedFailures(1)
+                        .setExpectedFinalJobStatus(JobStatus.FINISHED);
         postscaleSettings.setRestoreCheckpoint(checkpointDir);
         super.execute(postscaleSettings);
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index d2de69d6ec3..cd3e1e71b40 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -19,6 +19,7 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
@@ -57,13 +58,12 @@ import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.runtime.throwable.ThrowableAnnotation;
-import org.apache.flink.runtime.throwable.ThrowableType;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
 import org.apache.flink.util.Collector;
@@ -73,6 +73,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
 
+import org.assertj.core.api.Fail;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -103,6 +104,7 @@ import java.util.stream.IntStream;
 
 import static org.apache.flink.shaded.guava33.com.google.common.collect.Iterables.getOnlyElement;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Base class for tests related to unaligned checkpoints.
@@ -150,7 +152,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
     }
 
     @Nullable
-    protected File execute(UnalignedSettings settings) throws Exception {
+    protected String execute(UnalignedSettings settings) throws Exception {
         final File checkpointDir = temp.newFolder();
         Configuration conf = settings.getConfiguration(checkpointDir);
 
@@ -179,6 +181,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         final StreamExecutionEnvironment env =
                 StreamExecutionEnvironment.getExecutionEnvironment(conf);
         settings.configure(env);
+        JobID jobID = null;
         try {
             // print the test parameters to help debugging when the case is stuck
             System.out.println(
@@ -186,25 +189,34 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             final CompletableFuture<JobSubmissionResult> result =
                     miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph());
 
-            final JobID jobID = result.get().getJobID();
+            jobID = result.get().getJobID();
             checkCounters(
                     miniCluster
                             .getMiniCluster()
                             .requestJobResult(jobID)
                             .get()
                             .toJobExecutionResult(getClass().getClassLoader()));
+            if (settings.expectedFinalJobStatus != null) {
+                assertThat(miniCluster.getMiniCluster().getJobStatus(jobID))
+                        .succeedsWithin(Duration.ofMinutes(1))
+                        .isEqualTo(settings.expectedFinalJobStatus);
+            }
             System.out.println(
                     "Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
             if (settings.generateCheckpoint) {
                 return CommonTestUtils.getLatestCompletedCheckpointPath(
                                 jobID, miniCluster.getMiniCluster())
-                        .map(File::new)
-                        .orElseThrow(() -> new AssertionError("Could not generate checkpoint"));
+                        .orElseGet(() -> Fail.fail("Could not generate checkpoint"));
             }
         } catch (Exception e) {
             if (ExceptionUtils.findThrowable(e, TestException.class).isEmpty()) {
                 throw e;
             }
+            if (settings.generateCheckpoint) {
+                return CommonTestUtils.getLatestCompletedCheckpointPath(
+                                jobID, miniCluster.getMiniCluster())
+                        .orElseGet(() -> Fail.fail("Could not generate checkpoint"));
+            }
         } finally {
             miniCluster.after();
         }
@@ -680,7 +692,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
     protected static class UnalignedSettings {
         private int parallelism;
         private final int minCheckpoints = 10;
-        @Nullable private File restoreCheckpoint;
+        @Nullable private String restoreCheckpoint;
         private boolean generateCheckpoint = false;
         int expectedFailures = 0;
         int tolerableCheckpointFailures = 0;
@@ -691,6 +703,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         private int failuresAfterSourceFinishes = 0;
         private ChannelType channelType = ChannelType.MIXED;
         private long sourceSleepMs = 0;
+        @Nullable private JobStatus expectedFinalJobStatus = null;
 
         public UnalignedSettings(DagCreator dagCreator) {
             this.dagCreator = dagCreator;
@@ -701,7 +714,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             return this;
         }
 
-        public UnalignedSettings setRestoreCheckpoint(File restoreCheckpoint) {
+        public UnalignedSettings setRestoreCheckpoint(String restoreCheckpoint) {
             this.restoreCheckpoint = restoreCheckpoint;
             return this;
         }
@@ -746,6 +759,11 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             return this;
         }
 
+        public UnalignedSettings setExpectedFinalJobStatus(JobStatus expectedFinalJobStatus) {
+            this.expectedFinalJobStatus = expectedFinalJobStatus;
+            return this;
+        }
+
         public void configure(StreamExecutionEnvironment env) {
             env.enableCheckpointing(Math.max(100L, parallelism * 50L));
             env.getCheckpointConfig()
@@ -754,6 +772,8 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             env.getCheckpointConfig()
                     .setTolerableCheckpointFailureNumber(tolerableCheckpointFailures);
             env.setParallelism(parallelism);
+            RestartStrategyUtils.configureFixedDelayRestartStrategy(
+                    env, generateCheckpoint ? expectedFailures / 2 : expectedFailures, 100L);
             env.getCheckpointConfig().enableUnalignedCheckpoints(true);
             // for custom partitioner
             env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
@@ -772,7 +792,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
             conf.set(StateBackendOptions.STATE_BACKEND, "hashmap");
             conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
             if (restoreCheckpoint != null) {
-                conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
+                conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint);
             }
 
             conf.set(
@@ -1132,7 +1152,6 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger {
         return value;
     }
 
-    @ThrowableAnnotation(ThrowableType.NonRecoverableError)
     static class TestException extends Exception {
         public TestException(String s) {
             super(s);

Reply via email to