joyCurry30 commented on code in PR #3918:
URL: https://github.com/apache/flink-cdc/pull/3918#discussion_r2022005529
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##########
@@ -62,6 +64,7 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
private static final String TRANSFORM_KEY = "transform";
private static final String PIPELINE_KEY = "pipeline";
private static final String MODEL_KEY = "model";
+ private static final String FLINK_KEY = "flink";
Review Comment:
I don’t think “flink” is a good choice as a key. Would it be better to use
“configuration” or something else instead? What do you think? @lvyanquan
##########
docs/content.zh/docs/deployment/yarn.md:
##########
@@ -147,19 +144,28 @@ Job Description: Sync MySQL Database to Doris
你可以通过 Flink Web UI 找到一个名为 `Sync MySQL Database to Doris` 的作业。
-# Yarn Application 模式
+## Yarn Application 模式
Yarn Application 模式是 Yarn 集群上运行 Flink 作业的推荐模式。对资源的管理和分配更加灵活,可以更好地利用集群资源。
-通过Cli将作业提交至 Flink Yarn Application 集群。
-```bash
-cd /path/flink-cdc-*
-./bin/flink-cdc.sh -t yarn-application -Dexecution.checkpointing.interval=2min
mysql-to-doris.yaml
+修改 mysql-to-doris.yaml 作业的运行方式为 Yarn Application 模式:
+```yaml
+...
+pipeline:
+ name: Sync MySQL Database to Doris
+ parallelism: 2
+ flink:
+ execution.target: yarn-application
+ execution.checkpointing.interval: 2min
+ #如果需要从savepoint恢复,则配置以下参数
+ #execution.savepoint.path: hdfs:///flink/savepoint-1537
```
-或者从savepoint恢复Flink-CDC作业:
+
+通过 Cli 将作业提交至 Flink Yarn Application 集群。
```bash
cd /path/flink-cdc-*
-./bin/flink-cdc.sh -t yarn-application -s hdfs:///flink/savepoint-1537
-Dexecution.checkpointing.interval=2min mysql-to-doris.yaml
Review Comment:
I think it’s better to keep this example to show how to submit a job using
the command line, and then show another YAML file for submitting jobs with
Flink configurations.
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:
##########
@@ -17,38 +17,215 @@
package org.apache.flink.cdc.cli.utils;
-import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.cli.CliFrontendOptions;
+import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
+import org.apache.flink.cdc.common.utils.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.cdc.cli.CliFrontendOptions.FLINK_CONFIG;
+import static
org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_CLAIM_MODE;
+import static
org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_PATH_OPTION;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.USE_MINI_CLUSTER;
+import static
org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.LOCAL;
+import static
org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.REMOTE;
+import static
org.apache.flink.runtime.jobgraph.SavepointConfigOptions.RESTORE_MODE;
+import static
org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE;
+import static
org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_PATH;
/** Utilities for handling Flink configuration and environment. */
public class FlinkEnvironmentUtils {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
+ private static final String FLINK_HOME_ENV_VAR = "FLINK_HOME";
private static final String FLINK_CONF_DIR = "conf";
private static final String LEGACY_FLINK_CONF_FILENAME = "flink-conf.yaml";
private static final String FLINK_CONF_FILENAME = "config.yaml";
- public static Configuration loadFlinkConfiguration(Path flinkHome) throws
Exception {
+ /**
+     * Load and merge Flink configuration from FLINK_HOME, the command line, and the
+     * "flink" section of pipeline.yaml. Config priority: pipeline.yaml > command line > FLINK_HOME.
+ */
+ public static Configuration mergeFlinkConfigurationsWithPriority(
+ Path pipelineDefPath, CommandLine commandLine) throws Exception {
+ Configuration flinkConfig = loadBaseFlinkConfig(commandLine);
+
+ mergeCommandLineFlinkConfig(commandLine, flinkConfig);
+
+ mergePipelineFlinkConfig(pipelineDefPath, flinkConfig);
+
+ applySavepointSettings(flinkConfig);
+ return flinkConfig;
+ }
+
+ private static Configuration loadBaseFlinkConfig(CommandLine commandLine)
throws Exception {
+ Path flinkHome = getFlinkHome(commandLine);
+ Map<String, String> flinkConfigurationMap;
Path flinkConfPath =
new Path(
flinkHome,
Joiner.on(File.separator).join(FLINK_CONF_DIR,
FLINK_CONF_FILENAME));
if (flinkConfPath.getFileSystem().exists(flinkConfPath)) {
- return ConfigurationUtils.loadConfigFile(flinkConfPath);
+ flinkConfigurationMap =
ConfigurationUtils.loadConfigFile(flinkConfPath).toMap();
} else {
- return ConfigurationUtils.loadConfigFile(
- new Path(
- flinkHome,
- Joiner.on(File.separator)
- .join(FLINK_CONF_DIR,
LEGACY_FLINK_CONF_FILENAME)),
- true);
+ flinkConfigurationMap =
+ ConfigurationUtils.loadConfigFile(
+ new Path(
+ flinkHome,
+ Joiner.on(File.separator)
+ .join(
+ FLINK_CONF_DIR,
+
LEGACY_FLINK_CONF_FILENAME)),
+ true)
+ .toMap();
+ }
+ return Configuration.fromMap(flinkConfigurationMap);
+ }
+
+ private static void mergeCommandLineFlinkConfig(
+ CommandLine commandLine, Configuration flinkConfig) {
+ Properties commandLineProperties =
commandLine.getOptionProperties(FLINK_CONFIG.getOpt());
+ // Use "remote" as the default target
+ String target =
+ commandLine.hasOption(USE_MINI_CLUSTER)
+ ? LOCAL.getName()
+ : commandLine.getOptionValue(TARGET, REMOTE.getName());
+ flinkConfig.set(DeploymentOptions.TARGET, target);
+
+ Optional.ofNullable(commandLine.getOptionValue(SAVEPOINT_PATH_OPTION))
+ .ifPresent(value -> flinkConfig.set(SAVEPOINT_PATH, value));
+
+ Optional.ofNullable(commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE))
+ .ifPresent(
+ value -> {
+ RestoreMode restoreMode =
+
org.apache.flink.configuration.ConfigurationUtils.convertValue(
+ value,
+
org.apache.flink.cdc.cli.utils.ConfigurationUtils
+ .getClaimModeClass());
+ flinkConfig.set(RESTORE_MODE, restoreMode);
+ });
+
+ if (commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION)) {
+ flinkConfig.set(SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);
+ }
+
+ LOG.info("Dynamic flink config items found from command-line: {}",
commandLineProperties);
+ commandLineProperties.forEach(
+ (key, value) -> {
+ String keyStr = key.toString();
+ String valueStr = value.toString();
+ if (StringUtils.isNullOrWhitespaceOnly(keyStr)
+ || StringUtils.isNullOrWhitespaceOnly(valueStr)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "null or white space argument for key
or value: %s=%s",
+ key, value));
+ }
+ flinkConfig.setString(keyStr.trim(), valueStr.trim());
Review Comment:
Please extract this code into a separate function.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]