This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e7267cae4d [Improve][Fink] NO_CDC source support checkpoint (#10094)
e7267cae4d is described below

commit e7267cae4dd710f22850442ae3f9936e9b7de7cc
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Nov 26 21:58:34 2025 +0800

    [Improve][Fink] NO_CDC source support checkpoint (#10094)
    
    Co-authored-by: zengyi <[email protected]>
---
 .../core/starter/execution/RuntimeEnvironment.java | 10 ++---
 .../starter/execution/RuntimeEnvironmentTest.java  | 18 ++++++++
 .../execution/AbstractFlinkRuntimeEnvironment.java | 51 +++++++++++++++++-----
 .../starter/flink/execution/FlinkExecution.java    | 17 +++++---
 .../seatunnel/engine/e2e/CheckpointEnableIT.java   | 27 +++++-------
 5 files changed, 84 insertions(+), 39 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
index 5128f436a1..8112cb5ad9 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironment.java
@@ -61,13 +61,13 @@ public interface RuntimeEnvironment {
     }
 
     static boolean getEnableCheckpoint(Config config) {
-        boolean enableCheckpoint;
         Config envConfig = config.getConfig("env");
+        long checkpointInterval = -1;
         if (envConfig.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
-            enableCheckpoint = 
envConfig.getInt(EnvCommonOptions.CHECKPOINT_INTERVAL.key()) > 0;
-        } else {
-            enableCheckpoint = false;
+            checkpointInterval = 
envConfig.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
+        } else if (envConfig.hasPath("execution.checkpoint.interval")) {
+            checkpointInterval = 
envConfig.getLong("execution.checkpoint.interval");
         }
-        return enableCheckpoint || getJobMode(config) == JobMode.STREAMING;
+        return checkpointInterval > 0 || getJobMode(config) == 
JobMode.STREAMING;
     }
 }
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java
 
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java
index 9b77b3c1a9..60e5e8dafd 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/execution/RuntimeEnvironmentTest.java
@@ -45,5 +45,23 @@ public class RuntimeEnvironmentTest {
                                 + "  checkpoint.interval = 10\n"
                                 + "}");
         Assertions.assertTrue(RuntimeEnvironment.getEnableCheckpoint(config));
+
+        config =
+                ConfigFactory.parseString(
+                        "env {\n"
+                                + "  parallelism = 1\n"
+                                + "  job.mode = \"BATCH\"\n"
+                                + "  execution.checkpoint.interval = 10\n"
+                                + "}");
+        Assertions.assertTrue(RuntimeEnvironment.getEnableCheckpoint(config));
+
+        config =
+                ConfigFactory.parseString(
+                        "env {\n"
+                                + "  parallelism = 1\n"
+                                + "  job.mode = \"BATCH\"\n"
+                                + "  checkpoint.interval = 0\n"
+                                + "}");
+        Assertions.assertFalse(RuntimeEnvironment.getEnableCheckpoint(config));
     }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
index 39710b60b9..c6b4648e00 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java
@@ -46,6 +46,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -56,6 +57,8 @@ public abstract class AbstractFlinkRuntimeEnvironment 
implements RuntimeEnvironm
     protected JobMode jobMode;
     protected String jobName = Constants.LOGO;
 
+    private static final long DEFAULT_CHECKPOINT_INTERVAL_MS = 10000L;
+
     protected AbstractFlinkRuntimeEnvironment(Config config) {
         this.initialize(config);
     }
@@ -77,19 +80,23 @@ public abstract class AbstractFlinkRuntimeEnvironment 
implements RuntimeEnvironm
     }
 
     protected void setCheckpoint() {
-        if (jobMode == JobMode.BATCH) {
-            log.warn(
-                    "Disabled Checkpointing. In flink execution environment, 
checkpointing is not supported and not needed when executing jobs in BATCH 
mode");
+        OptionalLong intervalOpt = resolveCheckpointInterval(true);
+        boolean hasExplicitInterval = intervalOpt.isPresent();
+        boolean positiveInterval = intervalOpt.isPresent() && 
intervalOpt.getAsLong() > 0;
+        long interval = intervalOpt.orElse(DEFAULT_CHECKPOINT_INTERVAL_MS);
+
+        if (jobMode == JobMode.BATCH && !positiveInterval) {
+            log.info(
+                    "Checkpoint is disabled for batch job because 
'checkpoint.interval' is not set or <= 0.");
+            return;
         }
-        long interval;
-        if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
-            interval = 
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
-        } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+
+        if (hasExplicitInterval && !positiveInterval) {
             log.warn(
-                    "the parameter 'execution.checkpoint.interval' will be 
deprecated, please use common parameter 'checkpoint.interval' to set it");
-            interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
-        } else {
-            interval = 10000L;
+                    "checkpoint.interval is set to {} which is not positive, 
fallback to default {} ms for streaming job.",
+                    interval,
+                    DEFAULT_CHECKPOINT_INTERVAL_MS);
+            interval = DEFAULT_CHECKPOINT_INTERVAL_MS;
         }
 
         CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
@@ -195,8 +202,28 @@ public abstract class AbstractFlinkRuntimeEnvironment 
implements RuntimeEnvironm
         }
 
         if (this.jobMode.equals(JobMode.BATCH)) {
-            environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+            OptionalLong intervalOpt = resolveCheckpointInterval(false);
+            if (intervalOpt.isPresent() && intervalOpt.getAsLong() > 0) {
+                log.info(
+                        "Flink batch runtime does not support checkpoint-based 
restore; 'checkpoint.interval' > 0 will make this batch job run in streaming 
runtime.");
+            } else {
+                environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
+            }
+        }
+    }
+
+    protected OptionalLong resolveCheckpointInterval(boolean warnLegacy) {
+        if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
+            return 
OptionalLong.of(config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key()));
+        }
+        if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
+            if (warnLegacy) {
+                log.warn(
+                        "the parameter 'execution.checkpoint.interval' will be 
deprecated, please use common parameter 'checkpoint.interval' to set it");
+            }
+            return 
OptionalLong.of(config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL));
         }
+        return OptionalLong.empty();
     }
 
     private void setTimeCharacteristic() {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index ae2c5039d4..bc4bf4a1db 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -52,6 +52,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -135,11 +136,17 @@ public class FlinkExecution implements TaskExecution {
                 "Flink Execution Plan: {}",
                 
flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
         LOGGER.info("Flink job name: {}", 
flinkRuntimeEnvironment.getJobName());
-        if (!flinkRuntimeEnvironment.isStreaming()) {
-            flinkRuntimeEnvironment
-                    .getStreamExecutionEnvironment()
-                    .setRuntimeMode(RuntimeExecutionMode.BATCH);
-            LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
+        if (flinkRuntimeEnvironment.getJobMode() == JobMode.BATCH) {
+            OptionalLong checkpointInterval =
+                    flinkRuntimeEnvironment.resolveCheckpointInterval(false);
+            boolean enableCheckpointForBatch =
+                    checkpointInterval.isPresent() && 
checkpointInterval.getAsLong() > 0;
+            if (!enableCheckpointForBatch) {
+                flinkRuntimeEnvironment
+                        .getStreamExecutionEnvironment()
+                        .setRuntimeMode(RuntimeExecutionMode.BATCH);
+                LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
+            }
         }
         try {
             final long jobStartTime = System.currentTimeMillis();
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
index fff0646bcb..39d5f136d0 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
 import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
-import org.apache.seatunnel.e2e.common.container.TestContainerId;
 import 
org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
@@ -208,9 +207,10 @@ public class CheckpointEnableIT extends TestSuiteBase {
     public void testFlinkCheckpointEnable(AbstractTestFlinkContainer container)
             throws IOException, InterruptedException {
         /**
-         * In flink execution environment, checkpoint is not supported and not 
needed when executing
-         * jobs in BATCH mode. So it is only necessary to determine whether 
flink has enabled
-         * checkpoint by configuring tasks with 'checkpoint.interval'.
+         * In Flink execution environment, batch jobs normally do not enable 
checkpointing. When
+         * 'checkpoint.interval' is configured for a batch job, SeaTunnel will 
submit it in
+         * streaming runtime with the same checkpoint interval. This test 
verifies that Flink has
+         * enabled checkpointing and uses the configured interval.
          */
         Container.ExecResult enableExecResult =
                 container.executeJob(
@@ -228,19 +228,12 @@ public class CheckpointEnableIT extends TestSuiteBase {
                                         jobId)),
                         String.class,
                         Object.class);
-        /**
-         * when the checkpoint interval is 0x7fffffffffffffff, indicates that 
checkpoint is
-         * disabled. reference {@link
-         * org.apache.flink.runtime.jobgraph.JobGraph#isCheckpointingEnabled()}
-         */
-        if (container.identifier().equals(TestContainerId.FLINK_1_13)
-                || container.identifier().equals(TestContainerId.FLINK_1_14)
-                || container.identifier().equals(TestContainerId.FLINK_1_15)
-                || container.identifier().equals(TestContainerId.FLINK_1_16)) {
-            Assertions.assertEquals(Long.MAX_VALUE, 
jobConfig.getOrDefault("interval", 0L));
-        } else {
-            Assertions.assertEquals(0, jobConfig.getOrDefault("interval", 0));
-        }
+        Object intervalObject = jobConfig.get("interval");
+        Assertions.assertNotNull(intervalObject);
+        long interval = ((Number) intervalObject).longValue();
+        // the value here should be consistent with `checkpoint.interval` in
+        // batch_fakesource_to_localfile_checkpoint_enable.conf
+        Assertions.assertEquals(1000L, interval);
     }
 
     @TestTemplate

Reply via email to