zhangshenghang commented on code in PR #7040:
URL: https://github.com/apache/seatunnel/pull/7040#discussion_r1669436408
##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java:
##########
@@ -185,157 +82,15 @@ private void createStreamTableEnvironment() {
EnvironmentUtil.initTableEnvironmentConfiguration(this.config,
config.getConfiguration());
}
- private void createStreamEnvironment() {
- Configuration configuration = new Configuration();
- EnvironmentUtil.initConfiguration(config, configuration);
- environment =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
- setTimeCharacteristic();
-
- setCheckpoint();
-
- EnvironmentUtil.setRestartStrategy(config, environment.getConfig());
-
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.BUFFER_TIMEOUT_MILLIS)) {
- long timeout = config.getLong(ConfigKeyName.BUFFER_TIMEOUT_MILLIS);
- environment.setBufferTimeout(timeout);
- }
-
- if (config.hasPath(EnvCommonOptions.PARALLELISM.key())) {
- int parallelism =
config.getInt(EnvCommonOptions.PARALLELISM.key());
- environment.setParallelism(parallelism);
- } else if (config.hasPath(ConfigKeyName.PARALLELISM)) {
- log.warn(
- "the parameter 'execution.parallelism' will be deprecated,
please use common parameter 'parallelism' to set it");
- int parallelism = config.getInt(ConfigKeyName.PARALLELISM);
- environment.setParallelism(parallelism);
- }
-
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.MAX_PARALLELISM)) {
- int max = config.getInt(ConfigKeyName.MAX_PARALLELISM);
- environment.setMaxParallelism(max);
- }
-
- if (this.jobMode.equals(JobMode.BATCH)) {
- environment.setRuntimeMode(RuntimeExecutionMode.BATCH);
- }
- }
-
- private void setTimeCharacteristic() {
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.TIME_CHARACTERISTIC)) {
- String timeType =
config.getString(ConfigKeyName.TIME_CHARACTERISTIC);
- switch (timeType.toLowerCase()) {
- case "event-time":
-
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- break;
- case "ingestion-time":
-
environment.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
- break;
- case "processing-time":
-
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
- break;
- default:
- log.warn(
- "set time-characteristic failed, unknown
time-characteristic [{}],only support
event-time,ingestion-time,processing-time",
- timeType);
- break;
- }
- }
- }
-
- private void setCheckpoint() {
- if (jobMode == JobMode.BATCH) {
- log.warn(
- "Disabled Checkpointing. In flink execution environment,
checkpointing is not supported and not needed when executing jobs in BATCH
mode");
- }
- long interval = 0;
- if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
- interval =
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
- } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
- log.warn(
- "the parameter 'execution.checkpoint.interval' will be
deprecated, please use common parameter 'checkpoint.interval' to set it");
- interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
- }
-
- if (interval > 0) {
- CheckpointConfig checkpointConfig =
environment.getCheckpointConfig();
- environment.enableCheckpointing(interval);
-
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.CHECKPOINT_MODE)) {
- String mode = config.getString(ConfigKeyName.CHECKPOINT_MODE);
- switch (mode.toLowerCase()) {
- case "exactly-once":
-
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- break;
- case "at-least-once":
-
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
- break;
- default:
- log.warn(
- "set checkpoint.mode failed, unknown
checkpoint.mode [{}],only support exactly-once,at-least-once",
- mode);
- break;
- }
- }
-
- if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
- long timeout =
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
- checkpointConfig.setCheckpointTimeout(timeout);
- } else if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.CHECKPOINT_TIMEOUT)) {
- long timeout =
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
- checkpointConfig.setCheckpointTimeout(timeout);
- }
-
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.CHECKPOINT_DATA_URI)) {
- String uri =
config.getString(ConfigKeyName.CHECKPOINT_DATA_URI);
- StateBackend fsStateBackend = new FsStateBackend(uri);
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.STATE_BACKEND)) {
- String stateBackend =
config.getString(ConfigKeyName.STATE_BACKEND);
- if ("rocksdb".equalsIgnoreCase(stateBackend)) {
- StateBackend rocksDBStateBackend =
- new RocksDBStateBackend(fsStateBackend,
TernaryBoolean.TRUE);
- environment.setStateBackend(rocksDBStateBackend);
- }
- } else {
- environment.setStateBackend(fsStateBackend);
- }
- }
-
- if (EnvironmentUtil.hasPathAndWaring(
- config, ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS)) {
- int max =
config.getInt(ConfigKeyName.MAX_CONCURRENT_CHECKPOINTS);
- checkpointConfig.setMaxConcurrentCheckpoints(max);
- }
-
- if (EnvironmentUtil.hasPathAndWaring(config,
ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
- boolean cleanup =
config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
- if (cleanup) {
- checkpointConfig.enableExternalizedCheckpoints(
-
CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
- } else {
- checkpointConfig.enableExternalizedCheckpoints(
-
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
- }
- }
-
- if (EnvironmentUtil.hasPathAndWaring(
- config, ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS)) {
- long minPause =
config.getLong(ConfigKeyName.MIN_PAUSE_BETWEEN_CHECKPOINTS);
- checkpointConfig.setMinPauseBetweenCheckpoints(minPause);
- }
-
- if (EnvironmentUtil.hasPathAndWaring(
- config, ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS)) {
- int failNum =
config.getInt(ConfigKeyName.FAIL_ON_CHECKPOINTING_ERRORS);
- checkpointConfig.setTolerableCheckpointFailureNumber(failNum);
- }
- }
- }
-
- public void registerResultTable(Config config, DataStream<Row> dataStream,
String name) {
- StreamTableEnvironment tableEnvironment =
this.getStreamTableEnvironment();
- if (!TableUtil.tableExists(tableEnvironment, name)) {
- tableEnvironment.createTemporaryView(
- name, tableEnvironment.fromChangelogStream(dataStream));
+ protected void setCheckpoint() {
+ super.setCheckpoint();
+ CheckpointConfig checkpointConfig = environment.getCheckpointConfig();
+ if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
Review Comment:
@Hisoka-X `flink13-starter` use
`config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())`
`flink-starter-common` use `EnvironmentUtil.hasPathAndWaring(config,
EnvCommonOptions.CHECKPOINT_TIMEOUT.key())`
The method used in the previous code was different. In the
`flink-starter-common` model, a log prompt
```
log.warn(
"the parameter '{}' will be deprecated, please use the
'flink.' prefix with the flink official configuration item to set it",
configKey);
```
The developers may have designed it this way, so I didn't make any changes.
If they need to be unified, I can modify it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]