Hisoka-X commented on code in PR #7040:
URL: https://github.com/apache/seatunnel/pull/7040#discussion_r1668058602


##########
seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java:
##########
@@ -54,7 +54,7 @@ public interface EnvCommonOptions {
     Option<Long> CHECKPOINT_INTERVAL =
             Options.key("checkpoint.interval")
                     .longType()
-                    .noDefaultValue()
+                    .defaultValue(10000L)

Review Comment:
   Hi, we should not change the default value. Leaving `checkpoint.interval` 
unset is not the same as giving it a default value: the two cases behave 
differently.
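
   A minimal sketch of the difference, assuming the `interval > 0` guard from 
the Flink starter diff is what decides whether checkpointing is enabled. The 
class and method names are hypothetical and are not part of SeaTunnel.

```java
import java.util.OptionalLong;

// Hypothetical helper, not SeaTunnel code: models an option with and without a default.
final class CheckpointIntervalSketch {
    private CheckpointIntervalSketch() {}

    // No default: an unset option stays absent, so the caller can still tell
    // that the user never asked for checkpointing.
    static OptionalLong resolveWithoutDefault(Long configured) {
        return configured == null ? OptionalLong.empty() : OptionalLong.of(configured);
    }

    // With a default: an unset option becomes indistinguishable from an explicit value.
    static long resolveWithDefault(Long configured, long defaultValue) {
        return configured == null ? defaultValue : configured;
    }

    // Mirrors the `interval > 0` guard shown in the diff.
    static boolean checkpointingEnabled(long interval) {
        return interval > 0;
    }
}
```

   Under these assumptions, an unset option resolved without a default falls 
back to 0 and leaves checkpointing off, while the same unset option resolved 
with a default of 10000 turns it on.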



##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java:
##########
@@ -247,87 +247,84 @@ private void setCheckpoint() {
             log.warn(
                     "Disabled Checkpointing. In flink execution environment, checkpointing is not supported and not needed when executing jobs in BATCH mode");
         }
-        long interval = 0;
+        long interval;
         if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
             interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
         } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
             log.warn(
                     "the parameter 'execution.checkpoint.interval' will be deprecated, please use common parameter 'checkpoint.interval' to set it");
             interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
+        } else {
+            interval = EnvCommonOptions.CHECKPOINT_INTERVAL.defaultValue();
         }
 
-        if (interval > 0) {
-            CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
-            environment.enableCheckpointing(interval);
-
-            if (EnvironmentUtil.hasPathAndWaring(config, ConfigKeyName.CHECKPOINT_MODE)) {
-                String mode = config.getString(ConfigKeyName.CHECKPOINT_MODE);
-                switch (mode.toLowerCase()) {
-                    case "exactly-once":
-                        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-                        break;
-                    case "at-least-once":
-                        checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
-                        break;
-                    default:
-                        log.warn(
-                                "set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
-                                mode);
-                        break;
-                }
-            }
+        CheckpointConfig checkpointConfig = environment.getCheckpointConfig();

Review Comment:
   How about doing some refactoring? Currently there are two copies of the same 
processing logic.
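
   One possible shape for such a refactoring, sketched under assumptions: the 
`checkpoint.mode` parsing from the removed block is pulled into a single 
helper that every call site can share. `CheckpointModeSketch`, `Mode`, and 
`parseMode` are hypothetical names, and `Mode` is only a stand-in for Flink's 
`CheckpointingMode` so the sketch compiles without Flink on the classpath.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical helper, not SeaTunnel code: one place that maps the config string to a mode.
final class CheckpointModeSketch {
    private CheckpointModeSketch() {}

    // Stand-in for Flink's CheckpointingMode so this compiles on its own.
    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    // Returns the parsed mode, or empty when the value is missing or unknown,
    // so the caller decides how to log the warning.
    static Optional<Mode> parseMode(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        switch (raw.toLowerCase(Locale.ROOT)) {
            case "exactly-once":
                return Optional.of(Mode.EXACTLY_ONCE);
            case "at-least-once":
                return Optional.of(Mode.AT_LEAST_ONCE);
            default:
                return Optional.empty();
        }
    }
}
```

   Each caller would then apply the result with something like 
`parseMode(mode).ifPresent(...)` and log the "unknown checkpoint.mode" 
warning in the empty case, instead of repeating the switch.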


