This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new da30ce53 [FLINK-28760] Enable table store support in Hive by creating auxlib directory
da30ce53 is described below

commit da30ce533b81dc4c8ddd8b37624924e469f7a39a
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Tue Aug 2 10:21:38 2022 +0800

    [FLINK-28760] Enable table store support in Hive by creating auxlib directory
    
    This closes #257
---
 docs/content/docs/engines/hive.md                  |  15 ++-
 flink-table-store-e2e-tests/pom.xml                |  12 ++-
 .../flink/table/store/tests/E2eTestBase.java       |  33 +-----
 .../flink/table/store/tests/HiveE2eTest.java       | 113 ++++++++++++++-------
 .../test/resources-filtered/docker-compose.yaml    |   7 +-
 5 files changed, 97 insertions(+), 83 deletions(-)

diff --git a/docs/content/docs/engines/hive.md b/docs/content/docs/engines/hive.md
index 92a51578..3a8927ec 100644
--- a/docs/content/docs/engines/hive.md
+++ b/docs/content/docs/engines/hive.md
@@ -49,7 +49,10 @@ You are using an unreleased version of Table Store. See [Build From Source]({{<
 
 {{< /unstable >}}
 
-Copy table store Hive connector bundle jar to a path accessible by Hive, then use `add jar /path/to/flink-table-store-hive-connector-{{< version >}}.jar` to enable table store support in Hive.
+There are several ways to add this jar to Hive.
+
+* You can create an `auxlib` folder under the root directory of Hive, and copy `flink-table-store-hive-connector-{{< version >}}.jar` into `auxlib`.
+* You can also copy this jar to a path accessible by Hive, then use `add jar /path/to/flink-table-store-hive-connector-{{< version >}}.jar` to enable table store support in Hive. However, this method is not recommended: if you're using the MR execution engine and run a join statement, you may encounter the exception `org.apache.hive.com.esotericsoftware.kryo.KryoException: unable to find class`.
 
 ## Using Table Store Hive Catalog
 
@@ -100,10 +103,7 @@ SELECT * FROM test_table;
 Run the following Hive SQL in Hive CLI to access the created table.
 
 ```sql
--- Enable table store support in Hive
-
-ADD JAR /path/to/flink-table-store-hive-connector-{{< version >}}.jar;
-
+-- Assume that flink-table-store-hive-connector-{{< version >}}.jar is already in the auxlib directory.
 -- List tables in Hive
 -- (you might need to switch to "default" database if you're not there by default)
 
@@ -130,10 +130,7 @@ OK
 To access existing table store tables, you can also register them as external tables in Hive. Run the following Hive SQL in Hive CLI.
 
 ```sql
--- Enable table store support in Hive
-
-ADD JAR /path/to/flink-table-store-hive-connector-{{< version >}}.jar;
-
+-- Assume that flink-table-store-hive-connector-{{< version >}}.jar is already in the auxlib directory.
 -- Let's use the test_table created in the above section.
 -- To create an external table, you don't need to specify any column or table properties.
 -- Pointing the location to the path of table is enough.
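
The auxlib route the docs now recommend amounts to a couple of shell steps. A minimal sketch, assuming `$HIVE_HOME` points at the Hive root directory and using a placeholder jar path:

```bash
# Minimal sketch of the auxlib approach; $HIVE_HOME and the jar path
# are assumptions, not taken from this commit.
mkdir -p "$HIVE_HOME/auxlib"
cp /path/to/flink-table-store-hive-connector-<version>.jar "$HIVE_HOME/auxlib/"
# Jars in auxlib are loaded at startup, so restart the Hive CLI or
# HiveServer2 after copying.
```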
diff --git a/flink-table-store-e2e-tests/pom.xml b/flink-table-store-e2e-tests/pom.xml
index 8ea82d04..083e3f9a 100644
--- a/flink-table-store-e2e-tests/pom.xml
+++ b/flink-table-store-e2e-tests/pom.xml
@@ -31,6 +31,10 @@ under the License.
     <artifactId>flink-table-store-e2e-tests</artifactId>
     <name>Flink Table Store : End to End Tests</name>
 
+    <properties>
+        <flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -100,7 +104,7 @@ under the License.
                             <destFileName>flink-table-store.jar</destFileName>
                             <type>jar</type>
                             <overWrite>true</overWrite>
-                            <outputDirectory>${project.build.directory}/dependencies
+                            <outputDirectory>/tmp/flink-table-store-e2e-tests-jars
                             </outputDirectory>
                         </artifactItem>
                         <artifactItem>
@@ -110,17 +114,17 @@ under the License.
                             <destFileName>flink-table-store-hive-connector.jar</destFileName>
                             <type>jar</type>
                             <overWrite>true</overWrite>
-                            <outputDirectory>${project.build.directory}/dependencies
+                            <outputDirectory>/tmp/flink-table-store-e2e-tests-jars
                             </outputDirectory>
                         </artifactItem>
                         <artifactItem>
                             <groupId>org.apache.flink</groupId>
                             <artifactId>flink-shaded-hadoop-2-uber</artifactId>
-                            <version>2.8.3-10.0</version>
+                            <version>${flink.shaded.hadoop.version}</version>
                             <destFileName>bundled-hadoop.jar</destFileName>
                             <type>jar</type>
                             <overWrite>true</overWrite>
-                            <outputDirectory>${project.build.directory}/dependencies
+                            <outputDirectory>/tmp/flink-table-store-e2e-tests-jars
                             </outputDirectory>
                         </artifactItem>
                     </artifactItems>
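
The pom change above redirects the three dependency jars from the module's build directory to a fixed host path, so the docker-compose file can bind-mount them into every container. A hypothetical spot-check, assuming the usual Maven packaging of this module:

```bash
# Assumed invocation; the exact goals and flags are not part of this commit.
mvn -pl flink-table-store-e2e-tests -am package -DskipTests
# The copied dependencies should now be in the shared host directory:
ls /tmp/flink-table-store-e2e-tests-jars
# expected: bundled-hadoop.jar  flink-table-store-hive-connector.jar  flink-table-store.jar
```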
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
index be375f42..1cdf8da8 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.tests;
 
-import org.apache.flink.table.store.tests.utils.TestUtils;
-
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
@@ -30,7 +28,6 @@ import org.testcontainers.containers.DockerComposeContainer;
 import org.testcontainers.containers.output.OutputFrame;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.MountableFile;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -64,11 +61,6 @@ public abstract class E2eTestBase {
         this.withHive = withHive;
     }
 
-    private static final String TABLE_STORE_JAR_NAME = "flink-table-store.jar";
-    protected static final String TABLE_STORE_HIVE_CONNECTOR_JAR_NAME =
-            "flink-table-store-hive-connector.jar";
-    private static final String BUNDLED_HADOOP_JAR_NAME = "bundled-hadoop.jar";
-
     protected static final String TEST_DATA_DIR = "/test-data";
     protected static final String HDFS_ROOT = "hdfs://namenode:8020";
 
@@ -121,10 +113,6 @@ public abstract class E2eTestBase {
         environment.start();
         jobManager = environment.getContainerByServiceName("jobmanager_1").get();
         jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR);
-
-        copyResource(TABLE_STORE_JAR_NAME);
-        copyResource(TABLE_STORE_HIVE_CONNECTOR_JAR_NAME);
-        copyResource(BUNDLED_HADOOP_JAR_NAME);
     }
 
     @AfterEach
@@ -134,12 +122,6 @@ public abstract class E2eTestBase {
         }
     }
 
-    private void copyResource(String resourceName) {
-        jobManager.copyFileToContainer(
-                MountableFile.forHostPath(TestUtils.getResource(resourceName).toString()),
-                TEST_DATA_DIR + "/" + resourceName);
-    }
-
     protected void writeSharedFile(String filename, String content) throws Exception {
         if (content.length() == 0 || content.charAt(content.length() - 1) != '\n') {
             content += "\n";
@@ -172,20 +154,7 @@ public abstract class E2eTestBase {
                         "su",
                         "flink",
                         "-c",
-                        "bin/sql-client.sh -f "
-                                + TEST_DATA_DIR
-                                + "/"
-                                + fileName
-                                // run with table store jar
-                                + " --jar "
-                                + TEST_DATA_DIR
-                                + "/"
-                                + TABLE_STORE_JAR_NAME
-                                // run with bundled hadoop jar
-                                + " --jar "
-                                + TEST_DATA_DIR
-                                + "/"
-                                + BUNDLED_HADOOP_JAR_NAME);
+                        "bin/sql-client.sh -f " + TEST_DATA_DIR + "/" + fileName);
         LOG.info(execResult.getStdout());
         LOG.info(execResult.getStderr());
         if (execResult.getExitCode() != 0) {
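
Dropping the --jar flags is safe here because the compose file (see the docker-compose.yaml change below) now places the bundles into /opt/flink/lib before Flink starts, where the SQL client picks them up automatically. In effect, each Flink container runs:

```bash
# What the new jobmanager/taskmanager entrypoint does before starting Flink,
# making explicit --jar flags in sql-client.sh unnecessary.
cp /jars/flink-table-store.jar /jars/bundled-hadoop.jar /opt/flink/lib
```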
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
index 9a4f7833..ef15d508 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.tests;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.ContainerState;
@@ -34,13 +35,26 @@ import static org.assertj.core.api.Assertions.assertThat;
  */
 public class HiveE2eTest extends E2eTestBase {
 
-    private static final String ADD_JAR_HQL =
-            "ADD JAR " + TEST_DATA_DIR + "/" + TABLE_STORE_HIVE_CONNECTOR_JAR_NAME + ";";
+    private static final String TABLE_STORE_HIVE_CONNECTOR_JAR_NAME =
+            "flink-table-store-hive-connector.jar";
 
     public HiveE2eTest() {
         super(false, true);
     }
 
+    @BeforeEach
+    @Override
+    public void before() throws Exception {
+        super.before();
+        getHive()
+                .execInContainer(
+                        "/bin/bash",
+                        "-c",
+                        "mkdir /opt/hive/auxlib && cp /jars/"
+                                + TABLE_STORE_HIVE_CONNECTOR_JAR_NAME
+                                + " /opt/hive/auxlib");
+    }
+
     @Test
     public void testReadExternalTable() throws Exception {
         String tableStorePkDdl =
@@ -58,6 +72,7 @@ public class HiveE2eTest extends E2eTestBase {
         runSql(
                 "INSERT INTO table_store_pk VALUES "
                         + "(1, 10, 'Hi'), "
+                        + "(1, 100, 'Hi Again'), "
                         + "(2, 20, 'Hello'), "
                         + "(3, 30, 'Table'), "
                         + "(4, 40, 'Store');",
@@ -68,28 +83,30 @@ public class HiveE2eTest extends E2eTestBase {
                         + "STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'\n"
                         + "LOCATION '"
                         + tableStorePkPath
-                        + "/default_catalog.catalog/default_database.db/table_store_pk';";
-        writeSharedFile(
-                "pk.hql",
-                // same default database name as Flink
-                ADD_JAR_HQL
-                        + "\n"
-                        + externalTablePkDdl
-                        + "\nSELECT b, a, c FROM table_store_pk ORDER BY b;");
+                        + "/default_catalog.catalog/default_database.db/table_store_pk';\n";
 
-        ContainerState hive = getHive();
-        Container.ExecResult execResult =
-                hive.execInContainer(
-                        "/opt/hive/bin/hive",
-                        "--hiveconf",
-                        "hive.root.logger=INFO,console",
-                        "-f",
-                        TEST_DATA_DIR + "/pk.hql");
-        assertThat(execResult.getStdout())
-                .isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n" + "30\t3\tTable\n" + "40\t4\tStore\n");
-        if (execResult.getExitCode() != 0) {
-            throw new AssertionError("Failed when running hive sql.");
-        }
+        checkQueryResult(
+                externalTablePkDdl + "SELECT * FROM table_store_pk ORDER BY b;",
+                "1\t10\tHi\n"
+                        + "2\t20\tHello\n"
+                        + "3\t30\tTable\n"
+                        + "4\t40\tStore\n"
+                        + "1\t100\tHi Again\n");
+        checkQueryResult(
+                externalTablePkDdl + "SELECT b, a FROM table_store_pk ORDER BY b;",
+                "10\t1\n" + "20\t2\n" + "30\t3\n" + "40\t4\n" + "100\t1\n");
+        checkQueryResult(
+                externalTablePkDdl + "SELECT * FROM table_store_pk WHERE a > 1 ORDER BY b;",
+                "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n");
+        checkQueryResult(
+                externalTablePkDdl
+                        + "SELECT a, SUM(b), MIN(c) FROM table_store_pk GROUP BY a ORDER BY a;",
+                "1\t110\tHi\n" + "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n");
+        checkQueryResult(
+                externalTablePkDdl
+                        + "SELECT T1.a, T1.b, T2.b FROM table_store_pk T1 JOIN table_store_pk T2 "
+                        + "ON T1.a = T2.a WHERE T1.a <= 2 ORDER BY T1.a, T1.b, T2.b;",
+                "1\t10\t10\n" + "1\t10\t100\n" + "1\t100\t10\n" + "1\t100\t100\n" + "2\t20\t20\n");
     }
 
     @Test
@@ -118,26 +135,50 @@ public class HiveE2eTest extends E2eTestBase {
                         "  'bucket' = '2'",
                         ");",
                         "",
-                        "INSERT INTO T VALUES (1, 10, 'Hi'), (2, 20, 'Hello');");
+                        "INSERT INTO T VALUES "
+                                + "(1, 10, 'Hi'), "
+                                + "(1, 100, 'Hi Again'), "
+                                + "(2, 20, 'Hello'), "
+                                + "(3, 30, 'Table'), "
+                                + "(4, 40, 'Store');");
         runSql(sql);
 
-        writeSharedFile(
-                "query.hql",
-                // same default database name as Flink
-                ADD_JAR_HQL + "\nSELECT b, a, c FROM t ORDER BY b;");
+        checkQueryResult(
+                "SELECT * FROM t ORDER BY b;",
+                "1\t10\tHi\n"
+                        + "2\t20\tHello\n"
+                        + "3\t30\tTable\n"
+                        + "4\t40\tStore\n"
+                        + "1\t100\tHi Again\n");
+        checkQueryResult(
+                "SELECT b, a FROM t ORDER BY b;",
+                "10\t1\n" + "20\t2\n" + "30\t3\n" + "40\t4\n" + "100\t1\n");
+        checkQueryResult(
+                "SELECT * FROM t WHERE a > 1 ORDER BY b;",
+                "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n");
+        checkQueryResult(
+                "SELECT a, SUM(b), MIN(c) FROM t GROUP BY a ORDER BY a;",
+                "1\t110\tHi\n" + "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n");
+        checkQueryResult(
+                "SELECT T1.a, T1.b, T2.b FROM t T1 JOIN t T2 "
+                        + "ON T1.a = T2.a WHERE T1.a <= 2 ORDER BY T1.a, T1.b, T2.b;",
+                "1\t10\t10\n" + "1\t10\t100\n" + "1\t100\t10\n" + "1\t100\t100\n" + "2\t20\t20\n");
+    }
 
-        ContainerState hive = getHive();
+    private void checkQueryResult(String query, String expected) throws Exception {
+        writeSharedFile("pk.hql", query);
         Container.ExecResult execResult =
-                hive.execInContainer(
-                        "/opt/hive/bin/hive",
-                        "--hiveconf",
-                        "hive.root.logger=INFO,console",
-                        "-f",
-                        TEST_DATA_DIR + "/query.hql");
-        assertThat(execResult.getStdout()).isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n");
+                getHive()
+                        .execInContainer(
+                                "/opt/hive/bin/hive",
+                                "--hiveconf",
+                                "hive.root.logger=INFO,console",
+                                "-f",
+                                TEST_DATA_DIR + "/pk.hql");
         if (execResult.getExitCode() != 0) {
             throw new AssertionError("Failed when running hive sql.");
         }
+        assertThat(execResult.getStdout()).isEqualTo(expected);
     }
 
     private ContainerState getHive() {
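
For reference, checkQueryResult drives Hive non-interactively; inside the hive container it is equivalent to the following invocation (paths exactly as in the test above):

```bash
# pk.hql holds the DDL plus the query under test; stdout is compared
# against the expected tab-separated rows.
/opt/hive/bin/hive --hiveconf hive.root.logger=INFO,console -f /test-data/pk.hql
```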
diff --git a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
index d414d99f..0e273220 100644
--- a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
+++ b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml
@@ -28,7 +28,8 @@ services:
     image: apache/flink:${flink.version}-java8
     volumes:
       - testdata:/test-data
-    entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh jobmanager"
+      - /tmp/flink-table-store-e2e-tests-jars:/jars
+    entrypoint: /bin/bash -c "cp /jars/flink-table-store.jar /jars/bundled-hadoop.jar /opt/flink/lib && /docker-entrypoint.sh jobmanager"
     env_file:
       - ./flink.env
     networks:
@@ -42,7 +43,8 @@ services:
     image: apache/flink:${flink.version}-java8
     volumes:
       - testdata:/test-data
-    entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh taskmanager"
+      - /tmp/flink-table-store-e2e-tests-jars:/jars
+    entrypoint: /bin/bash -c "cp /jars/flink-table-store.jar /jars/bundled-hadoop.jar /opt/flink/lib && /docker-entrypoint.sh taskmanager"
     env_file:
       - ./flink.env
     networks:
@@ -130,6 +132,7 @@ services:
     image: bde2020/hive:2.3.2-postgresql-metastore
     volumes:
       - testdata:/test-data
+      - /tmp/flink-table-store-e2e-tests-jars:/jars
     networks:
       testnetwork:
         aliases:
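
With these mounts in place, every service sees the host directory /tmp/flink-table-store-e2e-tests-jars as /jars. A hypothetical spot-check once the stack is up (service names as in this compose file; substitute docker-compose if that is your installed client):

```bash
# Flink containers should have copied the jars into their lib directory...
docker compose exec jobmanager ls /opt/flink/lib
# ...and the Hive container should see the connector jar under /jars.
docker compose exec hive ls /jars
```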
