This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
     new adcf3974  [FLINK-28760] Enable table store support in Hive by creating auxlib directory

adcf3974 is described below

commit adcf39744c7a2c7b3ec143d1d217eaa02efa10a6
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Tue Aug 2 10:21:38 2022 +0800

    [FLINK-28760] Enable table store support in Hive by creating auxlib directory

    This closes #257
---
 docs/content/docs/engines/hive.md                  |  15 ++-
 flink-table-store-e2e-tests/pom.xml                |  12 ++-
 .../flink/table/store/tests/E2eTestBase.java       |  33 +-----
 .../flink/table/store/tests/HiveE2eTest.java       | 113 ++++++++++++++-------
 .../test/resources-filtered/docker-compose.yaml    |   7 +-
 5 files changed, 97 insertions(+), 83 deletions(-)

diff --git a/docs/content/docs/engines/hive.md b/docs/content/docs/engines/hive.md
index 92a51578..3a8927ec 100644
--- a/docs/content/docs/engines/hive.md
+++ b/docs/content/docs/engines/hive.md
@@ -49,7 +49,10 @@ You are using an unreleased version of Table Store. See [Build From Source]({{<
 
 {{< /unstable >}}
 
-Copy table store Hive connector bundle jar to a path accessible by Hive, then use `add jar /path/to/flink-table-store-hive-connector-{{< version >}}.jar` to enable table store support in Hive.
+There are several ways to add this jar to Hive.
+
+* You can create an `auxlib` folder under the root directory of Hive, and copy `flink-table-store-hive-connector-{{< version >}}.jar` into `auxlib`.
+* You can also copy this jar to a path accessible by Hive, then use `add jar /path/to/flink-table-store-hive-connector-{{< version >}}.jar` to enable table store support in Hive. Note that this method is not recommended. If you're using the MR execution engine and running a join statement, you may be faced with the exception `org.apache.hive.com.esotericsoftware.kryo.kryoexception: unable to find class`.
 
 ## Using Table Store Hive Catalog
 
@@ -100,10 +103,7 @@ SELECT * FROM test_table;
 Run the following Hive SQL in Hive CLI to access the created table.
 
 ```sql
--- Enable table store support in Hive
-
-ADD JAR /path/to/flink-table-store-hive-connector-{{< version >}}.jar;
-
+-- Assume that flink-table-store-hive-connector-{{< version >}}.jar is already in auxlib directory.
 -- List tables in Hive
 -- (you might need to switch to "default" database if you're not there by default)
 
@@ -130,10 +130,7 @@ OK
 To access existing table store table, you can also register them as external tables in Hive. Run the following Hive SQL in Hive CLI.
 
 ```sql
--- Enable table store support in Hive
-
-ADD JAR /path/to/flink-table-store-hive-connector-{{< version >}}.jar;
-
+-- Assume that flink-table-store-hive-connector-{{< version >}}.jar is already in auxlib directory.
 -- Let's use the test_table created in the above section.
 -- To create an external table, you don't need to specify any column or table properties.
 -- Pointing the location to the path of table is enough.
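The auxlib route described in the docs amounts to creating one directory and copying one jar. As a minimal illustrative sketch in Java (not part of this commit; /opt/hive and the jar path are assumptions to adapt to your installation):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Minimal sketch of the auxlib setup described in hive.md. The Hive root
// (/opt/hive) and the jar location are assumed paths; substitute your own.
public class AuxlibSetup {
    public static void main(String[] args) throws IOException {
        Path auxlib = Paths.get("/opt/hive/auxlib");
        Files.createDirectories(auxlib); // no-op if the directory already exists
        Path jar = Paths.get("/path/to/flink-table-store-hive-connector.jar");
        Files.copy(jar, auxlib.resolve(jar.getFileName()), StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Jars under auxlib are picked up when Hive starts, so the classpath is complete before any job is serialized; this is presumably why the ADD JAR route can hit the Kryo class-not-found issue on the MR engine while the auxlib route does not.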
diff --git a/flink-table-store-e2e-tests/pom.xml b/flink-table-store-e2e-tests/pom.xml
index 85a62b5f..4be0beaf 100644
--- a/flink-table-store-e2e-tests/pom.xml
+++ b/flink-table-store-e2e-tests/pom.xml
@@ -31,6 +31,10 @@ under the License.
     <artifactId>flink-table-store-e2e-tests</artifactId>
     <name>Flink Table Store : End to End Tests</name>
 
+    <properties>
+        <flink.shaded.hadoop.version>2.8.3-10.0</flink.shaded.hadoop.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -100,7 +104,7 @@ under the License.
                                     <destFileName>flink-table-store.jar</destFileName>
                                     <type>jar</type>
                                     <overWrite>true</overWrite>
-                                    <outputDirectory>${project.build.directory}/dependencies
+                                    <outputDirectory>/tmp/flink-table-store-e2e-tests-jars
                                     </outputDirectory>
                                 </artifactItem>
                                 <artifactItem>
@@ -110,17 +114,17 @@
                                     <destFileName>flink-table-store-hive-connector.jar</destFileName>
                                     <type>jar</type>
                                     <overWrite>true</overWrite>
-                                    <outputDirectory>${project.build.directory}/dependencies
+                                    <outputDirectory>/tmp/flink-table-store-e2e-tests-jars
                                     </outputDirectory>
                                 </artifactItem>
                                 <artifactItem>
                                     <groupId>org.apache.flink</groupId>
                                     <artifactId>flink-shaded-hadoop-2-uber</artifactId>
-                                    <version>2.8.3-10.0</version>
+                                    <version>${flink.shaded.hadoop.version}</version>
                                     <destFileName>bundled-hadoop.jar</destFileName>
                                     <type>jar</type>
                                     <overWrite>true</overWrite>
-                                    <outputDirectory>${project.build.directory}/dependencies
+                                    <outputDirectory>/tmp/flink-table-store-e2e-tests-jars
                                     </outputDirectory>
                                 </artifactItem>
                             </artifactItems>
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
index be375f42..1cdf8da8 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.tests;
 
-import org.apache.flink.table.store.tests.utils.TestUtils;
-
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.slf4j.Logger;
@@ -30,7 +28,6 @@ import org.testcontainers.containers.DockerComposeContainer;
 import org.testcontainers.containers.output.OutputFrame;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.MountableFile;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -64,11 +61,6 @@ public abstract class E2eTestBase {
         this.withHive = withHive;
     }
 
-    private static final String TABLE_STORE_JAR_NAME = "flink-table-store.jar";
-    protected static final String TABLE_STORE_HIVE_CONNECTOR_JAR_NAME =
-            "flink-table-store-hive-connector.jar";
-    private static final String BUNDLED_HADOOP_JAR_NAME = "bundled-hadoop.jar";
-
     protected static final String TEST_DATA_DIR = "/test-data";
     protected static final String HDFS_ROOT = "hdfs://namenode:8020";
 
@@ -121,10 +113,6 @@ public abstract class E2eTestBase {
         environment.start();
         jobManager = environment.getContainerByServiceName("jobmanager_1").get();
         jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR);
-
-        copyResource(TABLE_STORE_JAR_NAME);
-        copyResource(TABLE_STORE_HIVE_CONNECTOR_JAR_NAME);
-        copyResource(BUNDLED_HADOOP_JAR_NAME);
     }
 
     @AfterEach
@@ -134,12 +122,6 @@ public abstract class E2eTestBase {
         }
     }
 
-    private void copyResource(String resourceName) {
-        jobManager.copyFileToContainer(
-                MountableFile.forHostPath(TestUtils.getResource(resourceName).toString()),
-                TEST_DATA_DIR + "/" + resourceName);
-    }
-
     protected void writeSharedFile(String filename, String content) throws Exception {
         if (content.length() == 0 || content.charAt(content.length() - 1) != '\n') {
             content += "\n";
@@ -172,20 +154,7 @@ public abstract class E2eTestBase {
                         "su",
                         "flink",
                         "-c",
-                        "bin/sql-client.sh -f "
-                                + TEST_DATA_DIR
-                                + "/"
-                                + fileName
-                                // run with table store jar
-                                + " --jar "
-                                + TEST_DATA_DIR
-                                + "/"
-                                + TABLE_STORE_JAR_NAME
-                                // run with bundled hadoop jar
-                                + " --jar "
-                                + TEST_DATA_DIR
-                                + "/"
-                                + BUNDLED_HADOOP_JAR_NAME);
+                        "bin/sql-client.sh -f " + TEST_DATA_DIR + "/" + fileName);
         LOG.info(execResult.getStdout());
         LOG.info(execResult.getStderr());
         if (execResult.getExitCode() != 0) {
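The refactor above drops the per-test copyFileToContainer calls; the Maven build now stages the jars in /tmp/flink-table-store-e2e-tests-jars and docker-compose mounts that directory into each service (see the docker-compose.yaml changes further down). A hedged sketch of the same bind-mount idea with a standalone Testcontainers container (the alpine image and paths are illustrative assumptions, not from this commit):

```java
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;

// Sketch: mount the staged jar directory into a container read-only instead of
// copying files in after startup. Image tag and paths are assumptions.
public class JarMountSketch {
    public static void main(String[] args) throws Exception {
        try (GenericContainer<?> container =
                new GenericContainer<>("alpine:3.16")
                        .withCommand("sleep", "infinity")
                        .withFileSystemBind(
                                "/tmp/flink-table-store-e2e-tests-jars",
                                "/jars",
                                BindMode.READ_ONLY)) {
            container.start();
            // The jars staged by the Maven build are now visible under /jars.
            System.out.println(container.execInContainer("ls", "/jars").getStdout());
        }
    }
}
```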
diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
index 9a4f7833..ef15d508 100644
--- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
+++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.tests;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.ContainerState;
@@ -34,13 +35,26 @@ import static org.assertj.core.api.Assertions.assertThat;
  */
 public class HiveE2eTest extends E2eTestBase {
 
-    private static final String ADD_JAR_HQL =
-            "ADD JAR " + TEST_DATA_DIR + "/" + TABLE_STORE_HIVE_CONNECTOR_JAR_NAME + ";";
+    private static final String TABLE_STORE_HIVE_CONNECTOR_JAR_NAME =
+            "flink-table-store-hive-connector.jar";
 
     public HiveE2eTest() {
         super(false, true);
     }
 
+    @BeforeEach
+    @Override
+    public void before() throws Exception {
+        super.before();
+        getHive()
+                .execInContainer(
+                        "/bin/bash",
+                        "-c",
+                        "mkdir /opt/hive/auxlib && cp /jars/"
+                                + TABLE_STORE_HIVE_CONNECTOR_JAR_NAME
+                                + " /opt/hive/auxlib");
+    }
+
     @Test
     public void testReadExternalTable() throws Exception {
         String tableStorePkDdl =
@@ -58,6 +72,7 @@ public class HiveE2eTest extends E2eTestBase {
         runSql(
                 "INSERT INTO table_store_pk VALUES "
                         + "(1, 10, 'Hi'), "
+                        + "(1, 100, 'Hi Again'), "
                         + "(2, 20, 'Hello'), "
                         + "(3, 30, 'Table'), "
                         + "(4, 40, 'Store');",
@@ -68,28 +83,30 @@
                 + "STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'\n"
                 + "LOCATION '"
                 + tableStorePkPath
-                + "/default_catalog.catalog/default_database.db/table_store_pk';";
-        writeSharedFile(
-                "pk.hql",
-                // same default database name as Flink
-                ADD_JAR_HQL
-                        + "\n"
-                        + externalTablePkDdl
-                        + "\nSELECT b, a, c FROM table_store_pk ORDER BY b;");
+                + "/default_catalog.catalog/default_database.db/table_store_pk';\n";
 
-        ContainerState hive = getHive();
-        Container.ExecResult execResult =
-                hive.execInContainer(
-                        "/opt/hive/bin/hive",
-                        "--hiveconf",
-                        "hive.root.logger=INFO,console",
-                        "-f",
-                        TEST_DATA_DIR + "/pk.hql");
-        assertThat(execResult.getStdout())
-                .isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n" + "30\t3\tTable\n" + "40\t4\tStore\n");
-        if (execResult.getExitCode() != 0) {
-            throw new AssertionError("Failed when running hive sql.");
-        }
+        checkQueryResult(
+                externalTablePkDdl + "SELECT * FROM table_store_pk ORDER BY b;",
+                "1\t10\tHi\n"
+                        + "2\t20\tHello\n"
+                        + "3\t30\tTable\n"
+                        + "4\t40\tStore\n"
+                        + "1\t100\tHi Again\n");
+        checkQueryResult(
+                externalTablePkDdl + "SELECT b, a FROM table_store_pk ORDER BY b;",
+                "10\t1\n" + "20\t2\n" + "30\t3\n" + "40\t4\n" + "100\t1\n");
+        checkQueryResult(
+                externalTablePkDdl + "SELECT * FROM table_store_pk WHERE a > 1 ORDER BY b;",
+                "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n");
+        checkQueryResult(
+                externalTablePkDdl
+                        + "SELECT a, SUM(b), MIN(c) FROM table_store_pk GROUP BY a ORDER BY a;",
+                "1\t110\tHi\n" + "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n");
+        checkQueryResult(
+                externalTablePkDdl
+                        + "SELECT T1.a, T1.b, T2.b FROM table_store_pk T1 JOIN table_store_pk T2 "
+                        + "ON T1.a = T2.a WHERE T1.a <= 2 ORDER BY T1.a, T1.b, T2.b;",
+                "1\t10\t10\n" + "1\t10\t100\n" + "1\t100\t10\n" + "1\t100\t100\n" + "2\t20\t20\n");
     }
 
     @Test
@@ -118,26 +135,50 @@
                         "    'bucket' = '2'",
                         ");",
                         "",
-                        "INSERT INTO T VALUES (1, 10, 'Hi'), (2, 20, 'Hello');");
+                        "INSERT INTO T VALUES "
+                                + "(1, 10, 'Hi'), "
+                                + "(1, 100, 'Hi Again'), "
+                                + "(2, 20, 'Hello'), "
+                                + "(3, 30, 'Table'), "
+                                + "(4, 40, 'Store');");
         runSql(sql);
 
-        writeSharedFile(
-                "query.hql",
-                // same default database name as Flink
-                ADD_JAR_HQL + "\nSELECT b, a, c FROM t ORDER BY b;");
+        checkQueryResult(
+                "SELECT * FROM t ORDER BY b;",
+                "1\t10\tHi\n"
+                        + "2\t20\tHello\n"
+                        + "3\t30\tTable\n"
+                        + "4\t40\tStore\n"
+                        + "1\t100\tHi Again\n");
+        checkQueryResult(
+                "SELECT b, a FROM t ORDER BY b;",
+                "10\t1\n" + "20\t2\n" + "30\t3\n" + "40\t4\n" + "100\t1\n");
+        checkQueryResult(
+                "SELECT * FROM t WHERE a > 1 ORDER BY b;",
+                "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n");
+        checkQueryResult(
+                "SELECT a, SUM(b), MIN(c) FROM t GROUP BY a ORDER BY a;",
+                "1\t110\tHi\n" + "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n");
+        checkQueryResult(
+                "SELECT T1.a, T1.b, T2.b FROM t T1 JOIN t T2 "
+                        + "ON T1.a = T2.a WHERE T1.a <= 2 ORDER BY T1.a, T1.b, T2.b;",
+                "1\t10\t10\n" + "1\t10\t100\n" + "1\t100\t10\n" + "1\t100\t100\n" + "2\t20\t20\n");
+    }
 
-        ContainerState hive = getHive();
+    private void checkQueryResult(String query, String expected) throws Exception {
+        writeSharedFile("pk.hql", query);
         Container.ExecResult execResult =
-                hive.execInContainer(
-                        "/opt/hive/bin/hive",
-                        "--hiveconf",
-                        "hive.root.logger=INFO,console",
-                        "-f",
-                        TEST_DATA_DIR + "/query.hql");
-        assertThat(execResult.getStdout()).isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n");
+                getHive()
+                        .execInContainer(
+                                "/opt/hive/bin/hive",
+                                "--hiveconf",
+                                "hive.root.logger=INFO,console",
+                                "-f",
+                                TEST_DATA_DIR + "/pk.hql");
         if (execResult.getExitCode() != 0) {
             throw new AssertionError("Failed when running hive sql.");
         }
+        assertThat(execResult.getStdout()).isEqualTo(expected);
     }
 
     private ContainerState getHive() {
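Factoring the run-and-assert logic into checkQueryResult makes each additional Hive query a short call, which is how this commit grows coverage from a single projection query to projection, filter, aggregation, and join. A hypothetical further call (not in this commit) against the t table, using the helper shown above, could look like:

```java
// Hypothetical extra check, not part of the commit: key a=1 appears twice
// ((1, 10, 'Hi') and (1, 100, 'Hi Again')), the other keys once each.
checkQueryResult(
        "SELECT a, COUNT(*) FROM t GROUP BY a ORDER BY a;",
        "1\t2\n" + "2\t1\n" + "3\t1\n" + "4\t1\n");
```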
"4\t40\tStore\n"); + checkQueryResult( + externalTablePkDdl + + "SELECT T1.a, T1.b, T2.b FROM table_store_pk T1 JOIN table_store_pk T2 " + + "ON T1.a = T2.a WHERE T1.a <= 2 ORDER BY T1.a, T1.b, T2.b;", + "1\t10\t10\n" + "1\t10\t100\n" + "1\t100\t10\n" + "1\t100\t100\n" + "2\t20\t20\n"); } @Test @@ -118,26 +135,50 @@ public class HiveE2eTest extends E2eTestBase { " 'bucket' = '2'", ");", "", - "INSERT INTO T VALUES (1, 10, 'Hi'), (2, 20, 'Hello');"); + "INSERT INTO T VALUES " + + "(1, 10, 'Hi'), " + + "(1, 100, 'Hi Again'), " + + "(2, 20, 'Hello'), " + + "(3, 30, 'Table'), " + + "(4, 40, 'Store');"); runSql(sql); - writeSharedFile( - "query.hql", - // same default database name as Flink - ADD_JAR_HQL + "\nSELECT b, a, c FROM t ORDER BY b;"); + checkQueryResult( + "SELECT * FROM t ORDER BY b;", + "1\t10\tHi\n" + + "2\t20\tHello\n" + + "3\t30\tTable\n" + + "4\t40\tStore\n" + + "1\t100\tHi Again\n"); + checkQueryResult( + "SELECT b, a FROM t ORDER BY b;", + "10\t1\n" + "20\t2\n" + "30\t3\n" + "40\t4\n" + "100\t1\n"); + checkQueryResult( + "SELECT * FROM t WHERE a > 1 ORDER BY b;", + "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n"); + checkQueryResult( + "SELECT a, SUM(b), MIN(c) FROM t GROUP BY a ORDER BY a;", + "1\t110\tHi\n" + "2\t20\tHello\n" + "3\t30\tTable\n" + "4\t40\tStore\n"); + checkQueryResult( + "SELECT T1.a, T1.b, T2.b FROM t T1 JOIN t T2 " + + "ON T1.a = T2.a WHERE T1.a <= 2 ORDER BY T1.a, T1.b, T2.b;", + "1\t10\t10\n" + "1\t10\t100\n" + "1\t100\t10\n" + "1\t100\t100\n" + "2\t20\t20\n"); + } - ContainerState hive = getHive(); + private void checkQueryResult(String query, String expected) throws Exception { + writeSharedFile("pk.hql", query); Container.ExecResult execResult = - hive.execInContainer( - "/opt/hive/bin/hive", - "--hiveconf", - "hive.root.logger=INFO,console", - "-f", - TEST_DATA_DIR + "/query.hql"); - assertThat(execResult.getStdout()).isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n"); + getHive() + .execInContainer( + "/opt/hive/bin/hive", + "--hiveconf", + "hive.root.logger=INFO,console", + "-f", + TEST_DATA_DIR + "/pk.hql"); if (execResult.getExitCode() != 0) { throw new AssertionError("Failed when running hive sql."); } + assertThat(execResult.getStdout()).isEqualTo(expected); } private ContainerState getHive() { diff --git a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml index d414d99f..0e273220 100644 --- a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml +++ b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml @@ -28,7 +28,8 @@ services: image: apache/flink:${flink.version}-java8 volumes: - testdata:/test-data - entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh jobmanager" + - /tmp/flink-table-store-e2e-tests-jars:/jars + entrypoint: /bin/bash -c "cp /jars/flink-table-store.jar /jars/bundled-hadoop.jar /opt/flink/lib && /docker-entrypoint.sh jobmanager" env_file: - ./flink.env networks: @@ -42,7 +43,8 @@ services: image: apache/flink:${flink.version}-java8 volumes: - testdata:/test-data - entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh taskmanager" + - 
@@ -130,6 +132,7 @@ services:
     image: bde2020/hive:2.3.2-postgresql-metastore
     volumes:
       - testdata:/test-data
+      - /tmp/flink-table-store-e2e-tests-jars:/jars
    networks:
       testnetwork:
         aliases:
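For completeness, a hedged sketch of how a test reaches one of these services and the mounted /jars directory through Testcontainers' compose support. The compose file path and the "hive-server" service name are assumptions for illustration; E2eTestBase wires up the real names, and the compose file above still needs Maven resource filtering to substitute ${flink.version}:

```java
import java.io.File;

import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.DockerComposeContainer;

// Sketch only: start the compose environment and list the mounted jars in the
// Hive service. File path and service name are assumptions.
public class ComposeSketch {
    public static void main(String[] args) throws Exception {
        try (DockerComposeContainer<?> env =
                new DockerComposeContainer<>(
                        new File("src/test/resources-filtered/docker-compose.yaml"))) {
            env.start();
            ContainerState hive =
                    env.getContainerByServiceName("hive-server_1")
                            .orElseThrow(IllegalStateException::new);
            // The /jars mount from the compose file is visible inside the container.
            System.out.println(hive.execInContainer("ls", "/jars").getStdout());
        }
    }
}
```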