PatrickRen commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1554892220


##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:
##########
@@ -20,18 +20,33 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
+    private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvironmentUtils.class);
     private static final String FLINK_CONF_DIR = "conf";
-    private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String FLINK_CONF_FILENAME = "config.yaml";
 
     public static Configuration loadFlinkConfiguration(Path flinkHome) throws Exception {
         Path flinkConfPath = flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
-        return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
+        try {
+            return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);

Review Comment:
   What about renaming the method to `ConfigurationUtils#loadConfigFile`? It no 
longer handles only the map-formatted config (`flink-conf.yaml`).



##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:
##########
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Excep
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value pairs", configPath),
                     e);
         }
     }
+
+    private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }
+
+    private static void flattenConfigMapHelper(
+            Map<String, Object> configMap, String currentPath, Map<String, String> result) {
+        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+            String updatedPath =
+                    currentPath.isEmpty() ? entry.getKey() : currentPath + "." + entry.getKey();
+            if (entry.getValue() instanceof Map) {
+                flattenConfigMapHelper((Map<String, Object>) entry.getValue(), updatedPath, result);
+            } else {

Review Comment:
   There should also be a branch that handles `List` values, as Flink's 
`GlobalConfiguration` does:
https://github.com/apache/flink/blob/5bbcf8de79ce1979412879b919299ffa5a9b62fe/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L301-L307
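
   For illustration, the leaf branch could delegate to something like the 
helper below. This is only a sketch: `LeafValueSketch` and `leafToString` are 
hypothetical names, and joining list elements with `;` is an assumption. The 
linked Flink code may serialize lists differently, so the exact string form 
should follow whatever `GlobalConfiguration` does.

```java
import java.util.List;
import java.util.StringJoiner;

class LeafValueSketch {

    /** Converts a non-Map YAML value into the string stored under a flattened key. */
    static String leafToString(Object value) {
        if (value instanceof List) {
            // Assumption: elements are joined with ';'. This separator is a
            // placeholder and may not match what Flink itself produces.
            StringJoiner joiner = new StringJoiner(";");
            for (Object element : (List<?>) value) {
                joiner.add(String.valueOf(element));
            }
            return joiner.toString();
        }
        // Scalars (String, Integer, Boolean, ...) fall back to their string form.
        return String.valueOf(value);
    }
}
```

   The `else` branch of the flattening loop would then store 
`leafToString(entry.getValue())` under the flattened key instead of treating 
every non-map value the same way.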



##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:
##########
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path configPath) throws Excep
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value pairs", configPath),
                     e);
         }
     }
+
+    private static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }

Review Comment:
   `flattenConfigMap` is only a thin wrapper around `flattenConfigMapHelper`. 
We can merge the logic into `flattenConfigMap` and drop the separate helper.
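
   One possible shape for the merged method, as a rough sketch rather than a 
required implementation: `flattenConfigMap` recurses into itself and prefixes 
the keys of the nested result, so no accumulator or path parameter is needed. 
The enclosing class name `FlattenSketch` is hypothetical, and converting leaf 
values with `String.valueOf` is an assumption about the `else` branch.

```java
import java.util.HashMap;
import java.util.Map;

class FlattenSketch {

    /** Flattens nested maps into dotted keys, e.g. {rest: {port: 8081}} -> rest.port=8081. */
    static Map<String, String> flattenConfigMap(Map<String, Object> configMap) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                // Recurse, then prepend this entry's key to every flattened child key.
                flattenConfigMap(nested)
                        .forEach(
                                (childKey, childValue) ->
                                        result.put(entry.getKey() + "." + childKey, childValue));
            } else {
                // Assumption: leaf values are stored in their plain string form.
                result.put(entry.getKey(), String.valueOf(value));
            }
        }
        return result;
    }
}
```

   This builds one intermediate map per nesting level, which should be 
negligible for a configuration file and keeps the signature to a single 
argument.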


