This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
     new f91c709  [FLINK-11135][configuration] Reorder Hadoop config loading in HadoopUtils
f91c709 is described below

commit f91c7095339f1d77b41e63b12916f4a949ce30c1
Author: Paul Lin <paullin3...@gmail.com>
AuthorDate: Wed Jan 8 10:22:40 2020 +0800

    [FLINK-11135][configuration] Reorder Hadoop config loading in HadoopUtils

    This closes #7314 .
---
 .../org/apache/flink/runtime/util/HadoopUtils.java |  89 +++++++++++-------
 .../runtime/fs/hdfs/HadoopConfigLoadingTest.java   | 104 ++++++++++++++++++++-
 2 files changed, 154 insertions(+), 39 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 599ea4e..f9244d3 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -50,63 +50,62 @@ public class HadoopUtils {
 
 		// Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
 		// from the classpath
+
 		Configuration result = new HdfsConfiguration();
 		boolean foundHadoopConfiguration = false;
 
 		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
-		// the hdfs configuration
-		// Try to load HDFS configuration from Hadoop's own configuration files
-		// 1. approach: Flink configuration
+		// the hdfs configuration.
+		// The properties of a newly added resource will override the ones in previous resources, so a configuration
+		// file with higher priority should be added later.
+
+		// Approach 1: HADOOP_HOME environment variables
+		String[] possibleHadoopConfPaths = new String[2];
+
+		final String hadoopHome = System.getenv("HADOOP_HOME");
+		if (hadoopHome != null) {
+			LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
+			possibleHadoopConfPaths[0] = hadoopHome + "/conf";
+			possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
+		}
+
+		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
+			if (possibleHadoopConfPath != null) {
+				foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
+			}
+		}
+
+		// Approach 2: Flink configuration (deprecated)
 		final String hdfsDefaultPath =
 			flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
-
 		if (hdfsDefaultPath != null) {
 			result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
-			LOG.debug("Using hdfs-default configuration-file path form Flink config: {}", hdfsDefaultPath);
+			LOG.debug("Using hdfs-default configuration-file path from Flink config: {}", hdfsDefaultPath);
 			foundHadoopConfiguration = true;
-		} else {
-			LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
 		}
 
 		final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
 		if (hdfsSitePath != null) {
 			result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
-			LOG.debug("Using hdfs-site configuration-file path form Flink config: {}", hdfsSitePath);
+			LOG.debug("Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
 			foundHadoopConfiguration = true;
-		} else {
-			LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
 		}
 
-		// 2. Approach environment variables
-		String[] possibleHadoopConfPaths = new String[4];
-		possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
-		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
-
-		final String hadoopHome = System.getenv("HADOOP_HOME");
-		if (hadoopHome != null) {
-			possibleHadoopConfPaths[2] = hadoopHome + "/conf";
-			possibleHadoopConfPaths[3] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
+		final String hadoopConfigPath = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+		if (hadoopConfigPath != null) {
+			LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
+			foundHadoopConfiguration = addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
 		}
 
-		for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
-			if (possibleHadoopConfPath != null) {
-				if (new File(possibleHadoopConfPath).exists()) {
-					if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
-						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
-						LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
-						foundHadoopConfiguration = true;
-					}
-					if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
-						result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
-						LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
-						foundHadoopConfiguration = true;
-					}
-				}
-			}
+		// Approach 3: HADOOP_CONF_DIR environment variable
+		String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
+		if (hadoopConfDir != null) {
+			LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
+			foundHadoopConfiguration = addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
 		}
 
 		if (!foundHadoopConfiguration) {
-			LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
+			LOG.warn("Could not find Hadoop configuration via any of the supported methods " +
 				"(Flink configuration, environment variables).");
 		}
 
@@ -162,4 +161,24 @@ public class HadoopUtils {
 		int min = Integer.parseInt(versionParts[1]);
 		return Tuple2.of(maj, min);
 	}
+
+	/**
+	 * Search Hadoop configuration files in the given path, and add them to the configuration if found.
+	 */
+	private static boolean addHadoopConfIfFound(Configuration configuration, String possibleHadoopConfPath) {
+		boolean foundHadoopConfiguration = false;
+		if (new File(possibleHadoopConfPath).exists()) {
+			if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
+				configuration.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
+				LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
+				foundHadoopConfiguration = true;
+			}
+			if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
+				configuration.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
+				LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
+				foundHadoopConfiguration = true;
+			}
+		}
+		return foundHadoopConfiguration;
+	}
 }
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
index bb3d088..c736243 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopConfigLoadingTest.java
@@ -180,15 +180,111 @@ public class HadoopConfigLoadingTest {
 		assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
 	}
 
+	@Test
+	public void loadOverlappingConfig() throws Exception {
+		final String k1 = "key1";
+		final String k2 = "key2";
+		final String k3 = "key3";
+		final String k4 = "key4";
+		final String k5 = "key5";
+
+		final String v1 = "from HADOOP_CONF_DIR";
+		final String v2 = "from Flink config `fs.hdfs.hadoopconf`";
+		final String v3 = "from Flink config `fs.hdfs.hdfsdefault`";
+		final String v4 = "from HADOOP_HOME/etc/hadoop";
+		final String v5 = "from HADOOP_HOME/conf";
+
+		final File hadoopConfDir = tempFolder.newFolder("hadoopConfDir");
+		final File hadoopConfEntryDir = tempFolder.newFolder("hadoopConfEntryDir");
+		final File legacyConfDir = tempFolder.newFolder("legacyConfDir");
+		final File hadoopHome = tempFolder.newFolder("hadoopHome");
+
+		final File hadoopHomeConf = new File(hadoopHome, "conf");
+		final File hadoopHomeEtc = new File(hadoopHome, "etc/hadoop");
+
+		assertTrue(hadoopHomeConf.mkdirs());
+		assertTrue(hadoopHomeEtc.mkdirs());
+
+		final File file1 = new File(hadoopConfDir, "core-site.xml");
+		final File file2 = new File(hadoopConfEntryDir, "core-site.xml");
+		final File file3 = new File(legacyConfDir, "core-site.xml");
+		final File file4 = new File(hadoopHomeEtc, "core-site.xml");
+		final File file5 = new File(hadoopHomeConf, "core-site.xml");
+
+		printConfig(file1, k1, v1);
+
+		Map<String, String> properties2 = new HashMap<>();
+		properties2.put(k1, v2);
+		properties2.put(k2, v2);
+		printConfigs(file2, properties2);
+
+		Map<String, String> properties3 = new HashMap<>();
+		properties3.put(k1, v3);
+		properties3.put(k2, v3);
+		properties3.put(k3, v3);
+		printConfigs(file3, properties3);
+
+		Map<String, String> properties4 = new HashMap<>();
+		properties4.put(k1, v4);
+		properties4.put(k2, v4);
+		properties4.put(k3, v4);
+		properties4.put(k4, v4);
+		printConfigs(file4, properties4);
+
+		Map<String, String> properties5 = new HashMap<>();
+		properties5.put(k1, v5);
+		properties5.put(k2, v5);
+		properties5.put(k3, v5);
+		properties5.put(k4, v5);
+		properties5.put(k5, v5);
+		printConfigs(file5, properties5);
+
+		final Configuration cfg = new Configuration();
+		cfg.setString(ConfigConstants.PATH_HADOOP_CONFIG, hadoopConfEntryDir.getAbsolutePath());
+		cfg.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, file3.getAbsolutePath());
+
+		final org.apache.hadoop.conf.Configuration hadoopConf;
+
+		final Map<String, String> originalEnv = System.getenv();
+		final Map<String, String> newEnv = new HashMap<>(originalEnv);
+		newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
+		newEnv.put("HADOOP_HOME", hadoopHome.getAbsolutePath());
+		try {
+			CommonTestUtils.setEnv(newEnv);
+			hadoopConf = HadoopUtils.getHadoopConfiguration(cfg);
+		}
+		finally {
+			CommonTestUtils.setEnv(originalEnv);
+		}
+
+		// contains extra entries
+		assertEquals(v1, hadoopConf.get(k1, null));
+		assertEquals(v2, hadoopConf.get(k2, null));
+		assertEquals(v3, hadoopConf.get(k3, null));
+		assertEquals(v4, hadoopConf.get(k4, null));
+		assertEquals(v5, hadoopConf.get(k5, null));
+
+		// also contains classpath defaults
+		assertEquals(IN_CP_CONFIG_VALUE, hadoopConf.get(IN_CP_CONFIG_KEY, null));
+	}
+
 	private static void printConfig(File file, String key, String value) throws IOException {
+		Map<String, String> map = new HashMap<>(1);
+		map.put(key, value);
+		printConfigs(file, map);
+	}
+
+	private static void printConfigs(File file, Map<String, String> properties) throws IOException {
 		try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
 			out.println("<?xml version=\"1.0\"?>");
 			out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
 			out.println("<configuration>");
-			out.println("\t<property>");
-			out.println("\t\t<name>" + key + "</name>");
-			out.println("\t\t<value>" + value + "</value>");
-			out.println("\t</property>");
+			for (Map.Entry<String, String> entry: properties.entrySet()) {
+				out.println("\t<property>");
+				out.println("\t\t<name>" + entry.getKey() + "</name>");
+				out.println("\t\t<value>" + entry.getValue() + "</value>");
+				out.println("\t</property>");
+			}
 			out.println("</configuration>");
 		}
 	}
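
The reordering above relies on one Hadoop behavior: when several resources are added to an org.apache.hadoop.conf.Configuration, properties from a resource added later override the same properties from resources added earlier (unless a property is marked final). The short sketch below illustrates only that rule; the two file paths, the class name, and the fs.defaultFS key are hypothetical examples and not part of this commit.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class AddResourceOrderExample {

        public static void main(String[] args) {
            // Hypothetical locations; any two XML config files that define the same key will do.
            Path lowerPriority = new Path("/opt/hadoop/conf/core-site.xml");
            Path higherPriority = new Path("/etc/hadoop/conf/core-site.xml");

            Configuration conf = new Configuration();

            // Resources added later override properties from earlier resources,
            // which is why the patch registers higher-priority locations last.
            conf.addResource(lowerPriority);
            conf.addResource(higherPriority);

            // If both files set fs.defaultFS, the value from higherPriority wins.
            System.out.println(conf.get("fs.defaultFS"));
        }
    }

With the new ordering, this is exactly the precedence that loadOverlappingConfig asserts: HADOOP_CONF_DIR over the Flink option fs.hdfs.hadoopconf, over fs.hdfs.hdfsdefault, over HADOOP_HOME/etc/hadoop, over HADOOP_HOME/conf.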