This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new f057286  [FLINK-13431][hive] Fix nameNode HA configuration was not loaded when running HiveConnector on Yarn
f057286 is described below

commit f057286da1ae454cda2ee820e171aad936c294bc
Author: Hongtao Zhang <553780...@qq.com>
AuthorDate: Fri Jul 26 19:08:24 2019 +0800

    [FLINK-13431][hive] Fix nameNode HA configuration was not loaded when running HiveConnector on Yarn
    
    This closes #9237
    
    (cherry picked from commit a9d393167cc3927f55b196baa918deea29695225)
---
 .../flink/table/catalog/hive/HiveCatalog.java      | 10 +++-
 .../hive/factories/HiveCatalogFactoryTest.java     | 55 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index 3b4218d..19ffa0d 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog.hive;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.batch.connectors.hive.HiveTableFactory;
 import org.apache.flink.table.api.TableSchema;
@@ -150,7 +151,14 @@ public class HiveCatalog extends AbstractCatalog {
                                String.format("Failed to get hive-site.xml from %s", hiveConfDir), e);
                }
 
-               return new HiveConf();
+               // create HiveConf from hadoop configuration
+               return new HiveConf(HadoopUtils.getHadoopConfiguration(new org.apache.flink.configuration.Configuration()),
+                       HiveConf.class);
+       }
+
+       @VisibleForTesting
+       public HiveConf getHiveConf() {
+               return hiveConf;
        }
 
        @VisibleForTesting
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
index feaa868..1339a94 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/factories/HiveCatalogFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog.hive.factories;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
@@ -27,8 +28,16 @@ import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -37,6 +46,8 @@ import static org.junit.Assert.assertEquals;
  * Test for {@link HiveCatalog} created by {@link HiveCatalogFactory}.
  */
 public class HiveCatalogFactoryTest extends TestLogger {
+       @Rule
+       public final TemporaryFolder tempFolder = new TemporaryFolder();
 
        @Test
        public void test() {
@@ -54,9 +65,53 @@ public class HiveCatalogFactoryTest extends TestLogger {
                checkEquals(expectedCatalog, (HiveCatalog) actualCatalog);
        }
 
+       @Test
+       public void testLoadHDFSConfigFromEnv() throws IOException {
+               final String k1 = "what is connector?";
+               final String v1 = "Hive";
+               final String catalogName = "HiveCatalog";
+
+               // set HADOOP_CONF_DIR env
+               final File hadoopConfDir = tempFolder.newFolder();
+               final File hdfsSiteFile = new File(hadoopConfDir, "hdfs-site.xml");
+               writeProperty(hdfsSiteFile, k1, v1);
+               final Map<String, String> originalEnv = System.getenv();
+               final Map<String, String> newEnv = new HashMap<>(originalEnv);
+               newEnv.put("HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath());
+               CommonTestUtils.setEnv(newEnv);
+
+               // create HiveCatalog use the Hadoop Configuration
+               final CatalogDescriptor catalogDescriptor = new HiveCatalogDescriptor();
+               final Map<String, String> properties = catalogDescriptor.toProperties();
+               final HiveConf hiveConf;
+               try {
+                       final HiveCatalog hiveCatalog = (HiveCatalog) TableFactoryService.find(CatalogFactory.class, properties)
+                               .createCatalog(catalogName, properties);
+                       hiveConf = hiveCatalog.getHiveConf();
+               } finally {
+                       // set the Env back
+                       CommonTestUtils.setEnv(originalEnv);
+               }
+               //validate the result
+               assertEquals(v1, hiveConf.get(k1, null));
+       }
+
        private static void checkEquals(HiveCatalog c1, HiveCatalog c2) {
                // Only assert a few selected properties for now
                assertEquals(c1.getName(), c2.getName());
                assertEquals(c1.getDefaultDatabase(), c2.getDefaultDatabase());
        }
+
+       private static void writeProperty(File file, String key, String value) throws IOException {
+               try (PrintStream out = new PrintStream(new FileOutputStream(file))) {
+                       out.println("<?xml version=\"1.0\"?>");
+                       out.println("<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>");
+                       out.println("<configuration>");
+                       out.println("\t<property>");
+                       out.println("\t\t<name>" + key + "</name>");
+                       out.println("\t\t<value>" + value + "</value>");
+                       out.println("\t</property>");
+                       out.println("</configuration>");
+               }
+       }
 }

Reply via email to