This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new f91c709  [FLINK-11135][configuration] Reorder Hadoop config loading in HadoopUtils
f91c709 is described below

commit f91c7095339f1d77b41e63b12916f4a949ce30c1
Author: Paul Lin <paullin3...@gmail.com>
AuthorDate: Wed Jan 8 10:22:40 2020 +0800

    [FLINK-11135][configuration] Reorder Hadoop config loading in HadoopUtils
    
    This closes #7314.
---
 .../org/apache/flink/runtime/util/HadoopUtils.java |  89 +++++++++++-------
 .../runtime/fs/hdfs/HadoopConfigLoadingTest.java   | 104 ++++++++++++++++++++-
 2 files changed, 154 insertions(+), 39 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 599ea4e..f9244d3 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -50,63 +50,62 @@ public class HadoopUtils {
 
                // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
                // from the classpath
+
                Configuration result = new HdfsConfiguration();
                boolean foundHadoopConfiguration = false;
 
                // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
-               // the hdfs configuration
-               // Try to load HDFS configuration from Hadoop's own configuration files
-               // 1. approach: Flink configuration
+               // the hdfs configuration.
+               // The properties of a newly added resource will override the ones in previous resources, so a configuration
+               // file with higher priority should be added later.
+
+               // Approach 1: HADOOP_HOME environment variables
+               String[] possibleHadoopConfPaths = new String[2];
+
+               final String hadoopHome = System.getenv("HADOOP_HOME");
+               if (hadoopHome != null) {
+                       LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
+                       possibleHadoopConfPaths[0] = hadoopHome + "/conf";
+                       possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
+               }
+
+               for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+                       if (possibleHadoopConfPath != null) {
+                               foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
+                       }
+               }
+
+               // Approach 2: Flink configuration (deprecated)
                final String hdfsDefaultPath =
                        flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
-
                if (hdfsDefaultPath != null) {
                        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
-                       LOG.debug("Using hdfs-default configuration-file path form Flink config: {}", hdfsDefaultPath);
+                       LOG.debug("Using hdfs-default configuration-file path from Flink config: {}", hdfsDefaultPath);
                        foundHadoopConfiguration = true;
-               } else {
-                       LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
                }
 
                final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
                if (hdfsSitePath != null) {
                        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
-                       LOG.debug("Using hdfs-site configuration-file path form Flink config: {}", hdfsSitePath);
+                       LOG.debug("Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
                        foundHadoopConfiguration = true;
-               } else {
-                       LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
                }
 
-               // 2. Approach environment variables
-               String[] possibleHadoopConfPaths = new String[4];
-               possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-               possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
-
-               final String hadoopHome = System.getenv("HADOOP_HOME");
-               if (hadoopHome != null) {
-                       possibleHadoopConfPaths[2] = hadoopHome + "/conf";
-                       possibleHadoopConfPaths[3] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
+               final String hadoopConfigPath = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+               if (hadoopConfigPath != null) {
+                       LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
+                       foundHadoopConfiguration = addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
                }
 
-               for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
-                       if (possibleHadoopConfPath != null) {
-                               if (new File(possibleHadoopConfPath).exists()) {
-                                       if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
-                                               result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-                                               LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
-                                               foundHadoopConfiguration = true;
-                                       }
-                                       if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
-                                               result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-                                               LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
-                                               foundHadoopConfiguration = true;
-                                       }
-                               }
-                       }
+               // Approach 3: HADOOP_CONF_DIR environment variable
+               String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
+               if (hadoopConfDir != null) {
+                       LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
+                       foundHadoopConfiguration = addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
                }
 
                if (!foundHadoopConfiguration) {
-                       LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
+                       LOG.warn("Could not find Hadoop configuration via any of the supported methods " +
                                "(Flink configuration, environment variables).");
                }
 
@@ -162,4 +161,24 @@ public class HadoopUtils {
                int min = Integer.parseInt(versionParts[1]);
                return Tuple2.of(maj, min);
        }
+
+       /**
+        * Search Hadoop configuration files in the given path, and add them to the configuration if found.
+        */
+       private static boolean addHadoopConfIfFound(Configuration configuration, String possibleHadoopConfPath) {
+               boolean foundHadoopConfiguration = false;
+               if (new File(possibleHadoopConfPath).exists()) {
+                       if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+                               configuration.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+                               LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
+                               foundHadoopConfiguration = true;
+                       }
+                       if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+                               configuration.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+                               LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
+                               foundHadoopConfiguration = true;
+                       }
+               }
+               return foundHadoopConfiguration;
+       }
 }
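
For readers skimming the patch: the reordering above relies on the resource-override
behavior of org.apache.hadoop.conf.Configuration described in the new comment, namely
that a resource added later overrides keys set by resources added earlier, so
higher-priority files must be added last. A minimal standalone sketch of that behavior
(the class name and paths are hypothetical, not taken from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class ResourceOrderSketch {
        public static void main(String[] args) {
            // Start empty, without loading *-default.xml from the classpath.
            Configuration conf = new Configuration(false);
            // Lower-priority file first ...
            conf.addResource(new Path("/opt/hadoop/etc/hadoop/core-site.xml")); // hypothetical path
            // ... higher-priority file last; on overlapping keys its values win.
            conf.addResource(new Path("/etc/hadoop/conf/core-site.xml")); // hypothetical path
            System.out.println(conf.get("fs.defaultFS"));
        }
    }
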
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
index bb3d088..c736243 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
@@ -180,15 +180,111 @@ public class HadoopConfigLoadingTest {
                assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
        }
 
+       @Test
+       public void loadOverlappingConfig() throws Exception {
+               final String k1 = "key1";
+               final String k2 = "key2";
+               final String k3 = "key3";
+               final String k4 = "key4";
+               final String k5 = "key5";
+
+               final String v1 = "from HADOOP_CONF_DIR";
+               final String v2 = "from Flink config `fs.hdfs.hadoopconf`";
+               final String v3 = "from Flink config `fs.hdfs.hdfsdefault`";
+               final String v4 = "from HADOOP_HOME/etc/hadoop";
+               final String v5 = "from HADOOP_HOME/conf";
+
+               final File hadoopConfDir = tempFolder.newFolder("hadoopConfDir");
+               final File hadoopConfEntryDir = tempFolder.newFolder("hadoopConfEntryDir");
+               final File legacyConfDir = tempFolder.newFolder("legacyConfDir");
+               final File hadoopHome = tempFolder.newFolder("hadoopHome");
+
+               final File hadoopHomeConf = new File(hadoopHome, "conf");
+               final File hadoopHomeEtc = new File(hadoopHome, "etc/hadoop");
+
+               assertTrue(hadoopHomeConf.mkdirs());
+               assertTrue(hadoopHomeEtc.mkdirs());
+
+               final File file1 = new File(hadoopConfDir, "core-site.xml");
+               final File file2 = new File(hadoopConfEntryDir, "core-site.xml");
+               final File file3 = new File(legacyConfDir, "core-site.xml");
+               final File file4 = new File(hadoopHomeEtc, "core-site.xml");
+               final File file5 = new File(hadoopHomeConf, "core-site.xml");
+
+               printConfig(file1, k1, v1);
+
+               Map<String, String> properties2 = new HashMap<>();
+               properties2.put(k1, v2);
+               properties2.put(k2, v2);
+               printConfigs(file2, properties2);
+
+               Map<String, String> properties3 = new HashMap<>();
+               properties3.put(k1, v3);
+               properties3.put(k2, v3);
+               properties3.put(k3, v3);
+               printConfigs(file3, properties3);
+
+               Map<String, String> properties4 = new HashMap<>();
+               properties4.put(k1, v4);
+               properties4.put(k2, v4);
+               properties4.put(k3, v4);
+               properties4.put(k4, v4);
+               printConfigs(file4, properties4);
+
+               Map<String, String> properties5 = new HashMap<>();
+               properties5.put(k1, v5);
+               properties5.put(k2, v5);
+               properties5.put(k3, v5);
+               properties5.put(k4, v5);
+               properties5.put(k5, v5);
+               printConfigs(file5, properties5);
+
+               final Configuration cfg = new Configuration();
+               cfg.setString(ConfigConstants.PATH_HADOOP_CONFIG, hadoopConfEntryDir.getAbsolutePath());
+               cfg.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, file3.getAbsolutePath());
+
+               final org.apache.hadoop.conf.Configuration hadoopConf;
+
+               final Map<String, String> originalEnv = System.getenv();
+               final Map<String, String> newEnv = new HashMap<>(originalEnv);
+               newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
+               newEnv.put("HADOOP_HOME", hadoopHome.getAbsolutePath());
+               try {
+                       CommonTestUtils.setEnv(newEnv);
+                       hadoopConf = HadoopUtils.getHadoopConfiguration(cfg);
+               }
+               finally {
+                       CommonTestUtils.setEnv(originalEnv);
+               }
+
+               // contains extra entries
+               assertEquals(v1, hadoopConf.get(k1, null));
+               assertEquals(v2, hadoopConf.get(k2, null));
+               assertEquals(v3, hadoopConf.get(k3, null));
+               assertEquals(v4, hadoopConf.get(k4, null));
+               assertEquals(v5, hadoopConf.get(k5, null));
+
+               // also contains classpath defaults
+               assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+       }
+
        private static void printConfig(File file, String key, String value) throws IOException {
+               Map<String, String> map = new HashMap<>(1);
+               map.put(key, value);
+               printConfigs(file, map);
+       }
+
+       private static void printConfigs(File file, Map<String, String> properties) throws IOException {
                try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
                        out.println("<?xml version=\"1.0\"?>");
                        out.println("<?xml-stylesheet type=\"text/xsl\" 
href=\"configuration.xsl\"?>");
                        out.println("<configuration>");
-                       out.println("\t<property>");
-                       out.println("\t\t<name>" + key + "</name>");
-                       out.println("\t\t<value>" + value + "</value>");
-                       out.println("\t</property>");
+                       for (Map.Entry<String, String> entry: properties.entrySet()) {
+                               out.println("\t<property>");
+                               out.println("\t\t<name>" + entry.getKey() + "</name>");
+                               out.println("\t\t<value>" + entry.getValue() + "</value>");
+                               out.println("\t</property>");
+                       }
                        out.println("</configuration>");
                }
        }
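
Taken together, the effective precedence after this change is, from lowest to highest:
HADOOP_HOME/conf, HADOOP_HOME/etc/hadoop, the deprecated Flink keys fs.hdfs.hdfsdefault
and fs.hdfs.hdfssite, then fs.hdfs.hadoopconf, and finally HADOOP_CONF_DIR; this is
exactly the layering that loadOverlappingConfig asserts above. A caller-side sketch of
the API under test, assuming an illustrative class name and path that are not part of
the commit:

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.util.HadoopUtils;

    public class HadoopConfPrecedenceSketch {
        public static void main(String[] args) {
            Configuration flinkConf = new Configuration();
            // The deprecated key still works, but any file found via
            // fs.hdfs.hadoopconf or HADOOP_CONF_DIR is added later and
            // therefore wins on overlapping keys.
            flinkConf.setString(ConfigConstants.HDFS_SITE_CONFIG, "/opt/conf/hdfs-site.xml"); // hypothetical path
            org.apache.hadoop.conf.Configuration hadoopConf =
                    HadoopUtils.getHadoopConfiguration(flinkConf);
            System.out.println(hadoopConf.get("dfs.replication"));
        }
    }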
