This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e7267cae4d [Improve][Flink] NO_CDC source support checkpoint (#10094)
e7267cae4d is described below
commit e7267cae4dd710f22850442ae3f9936e9b7de7cc
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Nov 26 21:58:34 2025 +0800
[Improve][Flink] NO_CDC source support checkpoint (#10094)
Co-authored-by: zengyi <[email protected]>
---
.../core/starter/execution/RuntimeEnvironment.java | 10 ++---
.../starter/execution/RuntimeEnvironmentTest.java | 18 ++++++++
.../execution/AbstractFlinkRuntimeEnvironment.java | 51 +++++++++++++++++-----
.../starter/flink/execution/FlinkExecution.java | 17 +++++---
.../seatunnel/engine/e2e/CheckpointEnableIT.java | 27 +++++-------
5 files changed, 84 insertions(+), 39 deletions(-)
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
index 5128f436a1..8112cb5ad9 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
@@ -61,13 +61,13 @@ public interface RuntimeEnvironment {
}
static boolean getEnableCheckpoint(Config config) {
- boolean enableCheckpoint;
Config envConfig = config.getConfig("env");
+ long checkpointInterval = -1;
if (envConfig.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
- enableCheckpoint =
envConfig.getInt(EnvCommonOptions.CHECKPOINT_INTERVAL.key()) > 0;
- } else {
- enableCheckpoint = false;
+ checkpointInterval =
envConfig.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
+ } else if (envConfig.hasPath("execution.checkpoint.interval")) {
+ checkpointInterval =
envConfig.getLong("execution.checkpoint.interval");
}
- return enableCheckpoint || getJobMode(config) == JobMode.STREAMING;
+ return checkpointInterval > 0 || getJobMode(config) ==
JobMode.STREAMING;
}
}
diff --git
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java
index 9b77b3c1a9..60e5e8dafd 100644
---
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java
+++
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java
@@ -45,5 +45,23 @@ public class RuntimeEnvironmentTest {
+ " checkpoint.interval = 10\n"
+ "}");
Assertions.assertTrue(RuntimeEnvironment.getEnableCheckpoint(config));
+
+ config =
+ ConfigFactory.parseString(
+ "env {\n"
+ + " parallelism = 1\n"
+ + " job.mode = \"BATCH\"\n"
+ + " execution.checkpoint.interval = 10\n"
+ + "}");
+ Assertions.assertTrue(RuntimeEnvironment.getEnableCheckpoint(config));
+
+ config =
+ ConfigFactory.parseString(
+ "env {\n"
+ + " parallelism = 1\n"
+ + " job.mode = \"BATCH\"\n"
+ + " checkpoint.interval = 0\n"
+ + "}");
+ Assertions.assertFalse(RuntimeEnvironment.getEnableCheckpoint(config));
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
index 39710b60b9..c6b4648e00 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
@@ -46,6 +46,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.OptionalLong;
import java.util.stream.Collectors;
@Slf4j
@@ -56,6 +57,8 @@ public abstract class AbstractFlinkRuntimeEnvironment
implements RuntimeEnvironm
protected JobMode jobMode;
protected String jobName = Constants.LOGO;
+ private static final long DEFAULT_CHECKPOINT_INTERVAL_MS = 10000L;
+
protected AbstractFlinkRuntimeEnvironment(Config config) {
this.initialize(config);
}
@@ -77,19 +80,23 @@ public abstract class AbstractFlinkRuntimeEnvironment
implements RuntimeEnvironm
}
protected void setCheckpoint() {
- if (jobMode == JobMode.BATCH) {
- log.warn(
- "Disabled Checkpointing. In flink execution environment,
checkpointing is not supported and not needed when executing jobs in BATCH
mode");
+ OptionalLong intervalOpt = resolveCheckpointInterval(true);
+ boolean hasExplicitInterval = intervalOpt.isPresent();
+ boolean positiveInterval = intervalOpt.isPresent() &&
intervalOpt.getAsLong() > 0;
+ long interval = intervalOpt.orElse(DEFAULT_CHECKPOINT_INTERVAL_MS);
+
+ if (jobMode == JobMode.BATCH && !positiveInterval) {
+ log.info(
+ "Checkpoint is disabled for batch job because
'checkpoint.interval' is not set or <= 0.");
+ return;
}
- long interval;
- if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
- interval =
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
- } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+
+ if (hasExplicitInterval && !positiveInterval) {
log.warn(
- "the parameter 'execution.checkpoint.interval' will be
deprecated, please use common parameter 'checkpoint.interval' to set it");
- interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
- } else {
- interval = 10000L;
+ "checkpoint.interval is set to {} which is not positive,
fallback to default {} ms for streaming job.",
+ interval,
+ DEFAULT_CHECKPOINT_INTERVAL_MS);
+ interval = DEFAULT_CHECKPOINT_INTERVAL_MS;
}
CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
@@ -195,8 +202,28 @@ public abstract class AbstractFlinkRuntimeEnvironment
implements RuntimeEnvironm
}
if (this.jobMode.equals(JobMode.BATCH)) {
- environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ OptionalLong intervalOpt = resolveCheckpointInterval(false);
+ if (intervalOpt.isPresent() && intervalOpt.getAsLong() > 0) {
+ log.info(
+ "Flink batch runtime does not support checkpoint-based
restore; 'checkpoint.interval' > 0 will make this batch job run in streaming
runtime.");
+ } else {
+ environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ }
+ }
+ }
+
+ protected OptionalLong resolveCheckpointInterval(boolean warnLegacy) {
+ if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+ return
OptionalLong.of(config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+ }
+ if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+ if (warnLegacy) {
+ log.warn(
+ "the parameter 'execution.checkpoint.interval' will be
deprecated, please use common parameter 'checkpoint.interval' to set it");
+ }
+ return
OptionalLong.of(config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL));
}
+ return OptionalLong.empty();
}
private void setTimeCharacteristic() {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index ae2c5039d4..bc4bf4a1db 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -52,6 +52,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -135,11 +136,17 @@ public class FlinkExecution implements TaskExecution {
"Flink Execution Plan: {}",
flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
LOGGER.info("Flink job name: {}",
flinkRuntimeEnvironment.getJobName());
- if (!flinkRuntimeEnvironment.isStreaming()) {
- flinkRuntimeEnvironment
- .getStreamExecutionEnvironment()
- .setRuntimeMode(RuntimeExecutionMode.BATCH);
- LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
+ if (flinkRuntimeEnvironment.getJobMode() == JobMode.BATCH) {
+ OptionalLong checkpointInterval =
+ flinkRuntimeEnvironment.resolveCheckpointInterval(false);
+ boolean enableCheckpointForBatch =
+ checkpointInterval.isPresent() &&
checkpointInterval.getAsLong() > 0;
+ if (!enableCheckpointForBatch) {
+ flinkRuntimeEnvironment
+ .getStreamExecutionEnvironment()
+ .setRuntimeMode(RuntimeExecutionMode.BATCH);
+ LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
+ }
}
try {
final long jobStartTime = System.currentTimeMillis();
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index fff0646bcb..39d5f136d0 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.container.TestContainerId;
import
org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
@@ -208,9 +207,10 @@ public class CheckpointEnableIT extends TestSuiteBase {
public void testFlinkCheckpointEnable(AbstractTestFlinkContainer container)
throws IOException, InterruptedException {
/**
- * In flink execution environment, checkpoint is not supported and not
needed when executing
- * jobs in BATCH mode. So it is only necessary to determine whether
flink has enabled
- * checkpoint by configuring tasks with 'checkpoint.interval'.
+ * In Flink execution environment, batch jobs normally do not enable
checkpointing. When
+ * 'checkpoint.interval' is configured for a batch job, SeaTunnel will
submit it in
+ * streaming runtime with the same checkpoint interval. This test
verifies that Flink has
+ * enabled checkpointing and uses the configured interval.
*/
Container.ExecResult enableExecResult =
container.executeJob(
@@ -228,19 +228,12 @@ public class CheckpointEnableIT extends TestSuiteBase {
jobId)),
String.class,
Object.class);
- /**
- * when the checkpoint interval is 0x7fffffffffffffff, indicates that
checkpoint is
- * disabled. reference {@link
- * org.apache.flink.runtime.jobgraph.JobGraph#isCheckpointingEnabled()}
- */
- if (container.identifier().equals(TestContainerId.FLINK_1_13)
- || container.identifier().equals(TestContainerId.FLINK_1_14)
- || container.identifier().equals(TestContainerId.FLINK_1_15)
- || container.identifier().equals(TestContainerId.FLINK_1_16)) {
- Assertions.assertEquals(Long.MAX_VALUE,
jobConfig.getOrDefault("interval", 0L));
- } else {
- Assertions.assertEquals(0, jobConfig.getOrDefault("interval", 0));
- }
+ Object intervalObject = jobConfig.get("interval");
+ Assertions.assertNotNull(intervalObject);
+ long interval = ((Number) intervalObject).longValue();
+ // the value here should be consistent with `checkpoint.interval` in
+ // batch_fakesource_to_localfile_checkpoint_enable.conf
+ Assertions.assertEquals(1000L, interval);
}
@TestTemplate