MOBIN-F commented on code in PR #3918:
URL: https://github.com/apache/flink-cdc/pull/3918#discussion_r2043451274
##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:
##########
@@ -17,38 +17,215 @@
package org.apache.flink.cdc.cli.utils;
-import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.cli.CliFrontendOptions;
+import org.apache.flink.cdc.cli.parser.YamlPipelineDefinitionParser;
+import org.apache.flink.cdc.common.utils.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
+import org.apache.commons.cli.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.cdc.cli.CliFrontendOptions.FLINK_CONFIG;
+import static
org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_CLAIM_MODE;
+import static
org.apache.flink.cdc.cli.CliFrontendOptions.SAVEPOINT_PATH_OPTION;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.TARGET;
+import static org.apache.flink.cdc.cli.CliFrontendOptions.USE_MINI_CLUSTER;
+import static
org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.LOCAL;
+import static
org.apache.flink.cdc.composer.flink.deployment.ComposeDeployment.REMOTE;
+import static
org.apache.flink.runtime.jobgraph.SavepointConfigOptions.RESTORE_MODE;
+import static
org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE;
+import static
org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_PATH;
/** Utilities for handling Flink configuration and environment. */
public class FlinkEnvironmentUtils {
private static final Logger LOG =
LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
+ private static final String FLINK_HOME_ENV_VAR = "FLINK_HOME";
private static final String FLINK_CONF_DIR = "conf";
private static final String LEGACY_FLINK_CONF_FILENAME = "flink-conf.yaml";
private static final String FLINK_CONF_FILENAME = "config.yaml";
- public static Configuration loadFlinkConfiguration(Path flinkHome) throws
Exception {
+ /**
+ * Load and merge flink configuration from flink_homećcommand line and
flink pipeline.yaml.flink
+ * config priority: pipeline.yaml > command line > flink_home.
+ */
+ public static Configuration mergeFlinkConfigurationsWithPriority(
+ Path pipelineDefPath, CommandLine commandLine) throws Exception {
+ Configuration flinkConfig = loadBaseFlinkConfig(commandLine);
+
+ mergeCommandLineFlinkConfig(commandLine, flinkConfig);
+
+ mergePipelineFlinkConfig(pipelineDefPath, flinkConfig);
+
+ applySavepointSettings(flinkConfig);
+ return flinkConfig;
+ }
+
+ private static Configuration loadBaseFlinkConfig(CommandLine commandLine)
throws Exception {
+ Path flinkHome = getFlinkHome(commandLine);
+ Map<String, String> flinkConfigurationMap;
Path flinkConfPath =
new Path(
flinkHome,
Joiner.on(File.separator).join(FLINK_CONF_DIR,
FLINK_CONF_FILENAME));
if (flinkConfPath.getFileSystem().exists(flinkConfPath)) {
- return ConfigurationUtils.loadConfigFile(flinkConfPath);
+ flinkConfigurationMap =
ConfigurationUtils.loadConfigFile(flinkConfPath).toMap();
} else {
- return ConfigurationUtils.loadConfigFile(
- new Path(
- flinkHome,
- Joiner.on(File.separator)
- .join(FLINK_CONF_DIR,
LEGACY_FLINK_CONF_FILENAME)),
- true);
+ flinkConfigurationMap =
+ ConfigurationUtils.loadConfigFile(
+ new Path(
+ flinkHome,
+ Joiner.on(File.separator)
+ .join(
+ FLINK_CONF_DIR,
+
LEGACY_FLINK_CONF_FILENAME)),
+ true)
+ .toMap();
+ }
+ return Configuration.fromMap(flinkConfigurationMap);
+ }
+
+ private static void mergeCommandLineFlinkConfig(
+ CommandLine commandLine, Configuration flinkConfig) {
+ Properties commandLineProperties =
commandLine.getOptionProperties(FLINK_CONFIG.getOpt());
+ // Use "remote" as the default target
+ String target =
+ commandLine.hasOption(USE_MINI_CLUSTER)
+ ? LOCAL.getName()
+ : commandLine.getOptionValue(TARGET, REMOTE.getName());
+ flinkConfig.set(DeploymentOptions.TARGET, target);
+
+ Optional.ofNullable(commandLine.getOptionValue(SAVEPOINT_PATH_OPTION))
+ .ifPresent(value -> flinkConfig.set(SAVEPOINT_PATH, value));
+
+ Optional.ofNullable(commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE))
+ .ifPresent(
+ value -> {
+ RestoreMode restoreMode =
+
org.apache.flink.configuration.ConfigurationUtils.convertValue(
+ value,
+
org.apache.flink.cdc.cli.utils.ConfigurationUtils
+ .getClaimModeClass());
+ flinkConfig.set(RESTORE_MODE, restoreMode);
+ });
+
+ if (commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION)) {
+ flinkConfig.set(SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);
+ }
+
+ LOG.info("Dynamic flink config items found from command-line: {}",
commandLineProperties);
+ commandLineProperties.forEach(
+ (key, value) -> {
+ String keyStr = key.toString();
+ String valueStr = value.toString();
+ if (StringUtils.isNullOrWhitespaceOnly(keyStr)
+ || StringUtils.isNullOrWhitespaceOnly(valueStr)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "null or white space argument for key
or value: %s=%s",
+ key, value));
+ }
+ flinkConfig.setString(keyStr.trim(), valueStr.trim());
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]