skymilong commented on code in PR #3194:
URL: https://github.com/apache/flink-cdc/pull/3194#discussion_r1552023129


##########
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/FlinkEnvironmentUtils.java:
##########
@@ -20,17 +20,24 @@
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.composer.flink.FlinkPipelineComposer;
 
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.List;
 
 /** Utilities for handling Flink configuration and environment. */
 public class FlinkEnvironmentUtils {
 
     private static final String FLINK_CONF_DIR = "conf";
-    private static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
+    private static final String NEW_FLINK_CONF_FILENAME = "config.yaml";
 
     public static Configuration loadFlinkConfiguration(Path flinkHome) throws 
Exception {
-        Path flinkConfPath = 
flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
+        Path flinkConfPath = 
flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME);
+        // If the old version of the configuration file does not exist, then 
attempt to use the new
+        // version of the file name.
+        if (!Files.exists(flinkConfPath)) {
+            flinkConfPath = 
flinkHome.resolve(FLINK_CONF_DIR).resolve(NEW_FLINK_CONF_FILENAME);
+        }
         return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);

Review Comment:
   Thank you for your feedback. I have rechecked the code and I agree with you. 
The new config.yaml is not in a map format anymore, so rewriting the parsing 
logic would be necessary. I am willing to take on this task and submit the 
necessary changes.
   
   In addition, I think it might be beneficial to handle the old version of the 
configuration file through exception handling. Here is a possible way to do so:
   
   ```java
   private static final String OLD_FLINK_CONF_FILENAME = "flink-conf.yaml";
   private static final String FLINK_CONF_FILENAME = "config.yaml";
   public static Configuration loadFlinkConfiguration(Path flinkHome) throws 
Exception {
       Path flinkConfPath = 
flinkHome.resolve(FLINK_CONF_DIR).resolve(FLINK_CONF_FILENAME);
       try {
           return ConfigurationUtils.loadMapFormattedConfig(flinkConfPath);
       } catch (Exception e) {
           LOG.warn(
                   "Failed to load the new configuration file:{}. Trying to 
load the old configuration file:{}.",
                   FLINK_CONF_FILENAME,
                   OLD_FLINK_CONF_FILENAME);
           return ConfigurationUtils.loadMapFormattedConfig(
                   
flinkHome.resolve(FLINK_CONF_DIR).resolve(OLD_FLINK_CONF_FILENAME));
       }
   }
   ```
   This is my idea for exception handling. We first attempt to load the new 
configuration file. If an exception occurs (which could be due to the absence 
of the new configuration file), we then try to load the old configuration file. 
This approach not only ensures backward compatibility and allows for a smoother 
transition between different versions of Flink, but also results in two log 
notifications from the `loadMapFormattedConfig` method. I think this will make 
the notifications more user-friendly. Do you think this approach is feasible? 
Or do you have any better suggestions or ideas? I look forward to your feedback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to