This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c4d6344ab8e [FLINK-38403][tests] Fix the unexpected test that the
second job does not restore from checkpoint (#27254)
c4d6344ab8e is described below
commit c4d6344ab8eff81358375ed048d9f993c49b0851
Author: Rui Fan <[email protected]>
AuthorDate: Thu Dec 4 17:49:20 2025 +0100
[FLINK-38403][tests] Fix the unexpected test that the second job does not
restore from checkpoint (#27254)
---
.../UnalignedCheckpointRescaleITCase.java | 61 ++++++++++++++--------
.../checkpointing/UnalignedCheckpointTestBase.java | 39 ++++++++++----
2 files changed, 69 insertions(+), 31 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
index 4a97e17cf7b..5be756a0efa 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointRescaleITCase.java
@@ -20,6 +20,7 @@
package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -54,13 +55,13 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.File;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import static
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.equalTo;
/** Integration test for performing rescale of unaligned checkpoint. */
@@ -366,14 +367,7 @@ public class UnalignedCheckpointRescaleITCase extends
UnalignedCheckpointTestBas
})
.name("long-to-string-map")
.uid("long-to-string-map")
- .map(
- new FailingMapper<>(
- state -> false,
- state ->
- state.completedCheckpoints >=
minCheckpoints / 2
- && state.runNumber ==
0,
- state -> false,
- state -> false))
+ .map(getFailingMapper(minCheckpoints))
.name("failing-map")
.uid("failing-map")
.setParallelism(parallelism)
@@ -394,14 +388,7 @@ public class UnalignedCheckpointRescaleITCase extends
UnalignedCheckpointTestBas
DataStream<Long> combinedSource, long minCheckpoints, boolean
slotSharing) {
combinedSource
.shuffle()
- .map(
- new FailingMapper<>(
- state -> false,
- state ->
- state.completedCheckpoints >=
minCheckpoints / 2
- && state.runNumber == 0,
- state -> false,
- state -> false))
+ .map(getFailingMapper(minCheckpoints))
.name("failing-map")
.uid("failing-map")
.slotSharingGroup(slotSharing ? "default" : "failing-map")
@@ -418,6 +405,25 @@ public class UnalignedCheckpointRescaleITCase extends
UnalignedCheckpointTestBas
.slotSharingGroup(slotSharing ? "default" : "sink");
}
+ /**
+ * Creates a FailingMapper that only fails during snapshot operations.
+ *
+ * <p>Only fails during snapshotState() when completedCheckpoints >=
minCheckpoints/2 AND
+ * runNumber == 0. After the job fails over internally, runNumber becomes the attempt number
+ * (> 0), so the failure condition is no longer satisfied. This ensures the mapper fails exactly
+ * once during the initial run to trigger a job failover, but never fails again after failing
+ * over and recovering from the checkpoint.
+ */
+ private static <T> FailingMapper<T> getFailingMapper(long
minCheckpoints) {
+ return new FailingMapper<>(
+ state -> false,
+ state ->
+ state.completedCheckpoints >= minCheckpoints / 2
+ && state.runNumber == 0,
+ state -> false,
+ state -> false);
+ }
+
DataStream<Long> createSourcePipeline(
StreamExecutionEnvironment env,
int minCheckpoints,
@@ -611,21 +617,34 @@ public class UnalignedCheckpointRescaleITCase extends
UnalignedCheckpointTestBas
this.sourceSleepMs = sourceSleepMs;
}
+ /**
+ * Tests unaligned checkpoint rescaling behavior.
+ *
+ * <p>Prescale phase: the job fails via FailingMapper once completedCheckpoints >=
+ * minCheckpoints/2, generating the checkpoint used by the rescale test.
+ *
+ * <p>Postscale phase: the job restores from that checkpoint with a different parallelism,
+ * fails over once, and finishes after the source has generated all records.
+ */
@Test
public void shouldRescaleUnalignedCheckpoint() throws Exception {
final UnalignedSettings prescaleSettings =
new UnalignedSettings(topology)
.setParallelism(oldParallelism)
.setExpectedFailures(1)
- .setSourceSleepMs(sourceSleepMs);
+ .setSourceSleepMs(sourceSleepMs)
+ .setExpectedFinalJobStatus(JobStatus.FAILED);
prescaleSettings.setGenerateCheckpoint(true);
- final File checkpointDir = super.execute(prescaleSettings);
-
+ final String checkpointDir = super.execute(prescaleSettings);
+ assertThat(checkpointDir)
+ .as("First job must generate a checkpoint for rescale test to
be valid.")
+ .isNotNull();
// resume
final UnalignedSettings postscaleSettings =
new UnalignedSettings(topology)
.setParallelism(newParallelism)
- .setExpectedFailures(1);
+ .setExpectedFailures(1)
+ .setExpectedFinalJobStatus(JobStatus.FINISHED);
postscaleSettings.setRestoreCheckpoint(checkpointDir);
super.execute(postscaleSettings);
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index d2de69d6ec3..cd3e1e71b40 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -19,6 +19,7 @@ package org.apache.flink.test.checkpointing;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.accumulators.LongCounter;
@@ -57,13 +58,12 @@ import
org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.runtime.throwable.ThrowableAnnotation;
-import org.apache.flink.runtime.throwable.ThrowableType;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.Collector;
@@ -73,6 +73,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
+import org.assertj.core.api.Fail;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -103,6 +104,7 @@ import java.util.stream.IntStream;
import static
org.apache.flink.shaded.guava33.com.google.common.collect.Iterables.getOnlyElement;
import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* Base class for tests related to unaligned checkpoints.
@@ -150,7 +152,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
}
@Nullable
- protected File execute(UnalignedSettings settings) throws Exception {
+ protected String execute(UnalignedSettings settings) throws Exception {
final File checkpointDir = temp.newFolder();
Configuration conf = settings.getConfiguration(checkpointDir);
@@ -179,6 +181,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(conf);
settings.configure(env);
+ JobID jobID = null;
try {
// print the test parameters to help debugging when the case is
stuck
System.out.println(
@@ -186,25 +189,34 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
final CompletableFuture<JobSubmissionResult> result =
miniCluster.getMiniCluster().submitJob(streamGraph.getJobGraph());
- final JobID jobID = result.get().getJobID();
+ jobID = result.get().getJobID();
checkCounters(
miniCluster
.getMiniCluster()
.requestJobResult(jobID)
.get()
.toJobExecutionResult(getClass().getClassLoader()));
+ if (settings.expectedFinalJobStatus != null) {
+ assertThat(miniCluster.getMiniCluster().getJobStatus(jobID))
+ .succeedsWithin(Duration.ofMinutes(1))
+ .isEqualTo(settings.expectedFinalJobStatus);
+ }
System.out.println(
"Finished " + getClass().getCanonicalName() + "#" +
name.getMethodName() + ".");
if (settings.generateCheckpoint) {
return CommonTestUtils.getLatestCompletedCheckpointPath(
jobID, miniCluster.getMiniCluster())
- .map(File::new)
- .orElseThrow(() -> new AssertionError("Could not
generate checkpoint"));
+ .orElseGet(() -> Fail.fail("Could not generate
checkpoint"));
}
} catch (Exception e) {
if (ExceptionUtils.findThrowable(e,
TestException.class).isEmpty()) {
throw e;
}
+ if (settings.generateCheckpoint) {
+ return CommonTestUtils.getLatestCompletedCheckpointPath(
+ jobID, miniCluster.getMiniCluster())
+ .orElseGet(() -> Fail.fail("Could not generate
checkpoint"));
+ }
} finally {
miniCluster.after();
}
@@ -680,7 +692,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
protected static class UnalignedSettings {
private int parallelism;
private final int minCheckpoints = 10;
- @Nullable private File restoreCheckpoint;
+ @Nullable private String restoreCheckpoint;
private boolean generateCheckpoint = false;
int expectedFailures = 0;
int tolerableCheckpointFailures = 0;
@@ -691,6 +703,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
private int failuresAfterSourceFinishes = 0;
private ChannelType channelType = ChannelType.MIXED;
private long sourceSleepMs = 0;
+ @Nullable private JobStatus expectedFinalJobStatus = null;
public UnalignedSettings(DagCreator dagCreator) {
this.dagCreator = dagCreator;
@@ -701,7 +714,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
return this;
}
- public UnalignedSettings setRestoreCheckpoint(File restoreCheckpoint) {
+ public UnalignedSettings setRestoreCheckpoint(String
restoreCheckpoint) {
this.restoreCheckpoint = restoreCheckpoint;
return this;
}
@@ -746,6 +759,11 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
return this;
}
+ public UnalignedSettings setExpectedFinalJobStatus(JobStatus
expectedFinalJobStatus) {
+ this.expectedFinalJobStatus = expectedFinalJobStatus;
+ return this;
+ }
+
public void configure(StreamExecutionEnvironment env) {
env.enableCheckpointing(Math.max(100L, parallelism * 50L));
env.getCheckpointConfig()
@@ -754,6 +772,8 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
env.getCheckpointConfig()
.setTolerableCheckpointFailureNumber(tolerableCheckpointFailures);
env.setParallelism(parallelism);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(
+ env, generateCheckpoint ? expectedFailures / 2 :
expectedFailures, 100L);
env.getCheckpointConfig().enableUnalignedCheckpoints(true);
// for custom partitioner
env.getCheckpointConfig().setForceUnalignedCheckpoints(true);
@@ -772,7 +792,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
conf.set(StateBackendOptions.STATE_BACKEND, "hashmap");
conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir.toURI().toString());
if (restoreCheckpoint != null) {
- conf.set(StateRecoveryOptions.SAVEPOINT_PATH,
restoreCheckpoint.toURI().toString());
+ conf.set(StateRecoveryOptions.SAVEPOINT_PATH,
restoreCheckpoint);
}
conf.set(
@@ -1132,7 +1152,6 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
return value;
}
- @ThrowableAnnotation(ThrowableType.NonRecoverableError)
static class TestException extends Exception {
public TestException(String s) {
super(s);