PatrickRen commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1557144006


##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java:
##########
@@ -37,15 +38,34 @@ public static Configuration loadMapFormattedConfig(Path 
configPath) throws Excep
         }
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         try {
-            Map<String, String> configMap =
+            Map<String, Object> configMap =
                     mapper.readValue(
-                            configPath.toFile(), new TypeReference<Map<String, 
String>>() {});
-            return Configuration.fromMap(configMap);
+                            configPath.toFile(), new TypeReference<Map<String, 
Object>>() {});
+            return Configuration.fromMap(flattenConfigMap(configMap));
         } catch (Exception e) {
             throw new IllegalStateException(
                     String.format(
                             "Failed to load config file \"%s\" to key-value 
pairs", configPath),
                     e);
         }
     }
+
+    private static Map<String, String> flattenConfigMap(Map<String, Object> 
configMap) {
+        Map<String, String> result = new HashMap<>();
+        flattenConfigMapHelper(configMap, "", result);
+        return result;
+    }
+
+    private static void flattenConfigMapHelper(
+            Map<String, Object> configMap, String currentPath, Map<String, 
String> result) {
+        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
+            String updatedPath =
+                    currentPath.isEmpty() ? entry.getKey() : currentPath + "." 
+ entry.getKey();
+            if (entry.getValue() instanceof Map) {
+                flattenConfigMapHelper((Map<String, Object>) entry.getValue(), 
updatedPath, result);
+            } else {

Review Comment:
   `GlobalConfiguration` in Flink is marked as `@Internal`, so it's better we 
build our own wheel instead of referencing: 
   
   
https://github.com/apache/flink/blob/68cc61a86187021c61e7f51ccff8c5912125d013/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L43
   
   `@Internal` means there's no promise on compatibility. If we reference the 
function, I'm afraid there might be some compatibility and compiling issues 
with Flink in the future. The format of `flink-conf.yaml` and `config.yaml` are 
public APIs (because they are user-facing), so we can trust its stability. 
Therefore I prefer to rewrite the parsing logic on our side.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to