This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new f057286 [FLINK-13431][hive] Fix nameNode HA configuration was not loaded when running HiveConnector on Yarn f057286 is described below commit f057286da1ae454cda2ee820e171aad936c294bc Author: Hongtao Zhang <553780...@qq.com> AuthorDate: Fri Jul 26 19:08:24 2019 +0800 [FLINK-13431][hive] Fix nameNode HA configuration was not loaded when running HiveConnector on Yarn This closes #9237 (cherry picked from commit a9d393167cc3927f55b196baa918deea29695225) --- .../flink/table/catalog/hive/HiveCatalog.java | 10 +++- .../hive/factories/HiveCatalogFactoryTest.java | 55 ++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index 3b4218d..19ffa0d 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog.hive; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.batch.connectors.hive.HiveTableFactory; import org.apache.flink.table.api.TableSchema; @@ -150,7 +151,14 @@ public class HiveCatalog extends AbstractCatalog { String.format("Failed to get hive-site.xml from %s", hiveConfDir), e); } - return new HiveConf(); + // create HiveConf from hadoop configuration + return new HiveConf(HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration()), + HiveConf.class); + } + + @VisibleForTesting + public HiveConf getHiveConf() { + return hiveConf; } @VisibleForTesting diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java index feaa868..1339a94 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.catalog.hive.factories; +import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.HiveTestUtils; @@ -27,8 +28,16 @@ import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.TableFactoryService; import org.apache.flink.util.TestLogger; +import org.apache.hadoop.hive.conf.HiveConf; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -37,6 +46,8 @@ import static org.junit.Assert.assertEquals; * Test for {@link HiveCatalog} created by {@link HiveCatalogFactory}. */ public class HiveCatalogFactoryTest extends TestLogger { + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); @Test public void test() { @@ -54,9 +65,53 @@ public class HiveCatalogFactoryTest extends TestLogger { checkEquals(expectedCatalog, (HiveCatalog) actualCatalog); } + @Test + public void testLoadHDFSConfigFromEnv() throws IOException { + final String k1 = "what is connector?"; + final String v1 = "Hive"; + final String catalogName = "HiveCatalog"; + + // set HADOOP_CONF_DIR env + final File hadoopConfDir = tempFolder.newFolder(); + final File hdfsSiteFile = new File(hadoopConfDir, "hdfs-site.xml"); + writeProperty(hdfsSiteFile, k1, v1); + final Map<String, String> originalEnv = System.getenv(); + final Map<String, String> newEnv = new HashMap<>(originalEnv); + newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath()); + CommonTestUtils.setEnv(newEnv); + + // create HiveCatalog use the Hadoop Configuration + final CatalogDescriptor catalogDescriptor = new HiveCatalogDescriptor(); + final Map<String, String> properties = catalogDescriptor.toProperties(); + final HiveConf hiveConf; + try { + final HiveCatalog hiveCatalog = (HiveCatalog) TableFactoryService.find(CatalogFactory.class, properties) + .createCatalog(catalogName, properties); + hiveConf = hiveCatalog.getHiveConf(); + } finally { + // set the Env back + CommonTestUtils.setEnv(originalEnv); + } + //validate the result + assertEquals(v1, hiveConf.get(k1, null)); + } + private static void checkEquals(HiveCatalog c1, HiveCatalog c2) { // Only assert a few selected properties for now assertEquals(c1.getName(), c2.getName()); assertEquals(c1.getDefaultDatabase(), c2.getDefaultDatabase()); } + + private static void writeProperty(File file, String key, String value) throws IOException { + try (PrintStream out = new PrintStream(new FileOutputStream(file))) { + out.println("<?xml version=\"1.0\"?>"); + out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>"); + out.println("<configuration>"); + out.println("\t<property>"); + out.println("\t\t<name>" + key + "</name>"); + out.println("\t\t<value>" + value + "</value>"); + out.println("\t</property>"); + out.println("</configuration>"); + } + } }