This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new cb688c5b [FLINK-28067] Introduce HiveCatalog cb688c5b is described below commit cb688c5bed4e02ceff608d85a6578a73f4fe2c1f Author: tsreaper <tsreape...@gmail.com> AuthorDate: Thu Jun 30 17:05:10 2022 +0800 [FLINK-28067] Introduce HiveCatalog This closes #181 --- .../flink/table/store/connector/FlinkCatalog.java | 12 +- .../table/store/connector/FlinkCatalogFactory.java | 82 ++-- .../table/store/connector/FlinkCatalogTest.java | 5 +- .../table/store/file/catalog/AbstractCatalog.java | 39 ++ .../flink/table/store/file/catalog/Catalog.java | 5 - .../table/store/file/catalog/CatalogFactory.java | 29 ++ .../store/file/catalog/FileSystemCatalog.java | 34 +- .../file/catalog/FileSystemCatalogFactory.java | 38 ++ ...e.flink.table.store.file.catalog.CatalogFactory | 16 + flink-table-store-dist/pom.xml | 7 + flink-table-store-e2e-tests/pom.xml | 6 +- .../flink/table/store/tests/E2eTestBase.java | 5 +- .../flink/table/store/tests/HiveE2eTest.java | 61 ++- .../test/resources-filtered/docker-compose.yaml | 8 +- .../{ => flink-table-store-hive-catalog}/pom.xml | 141 ++++--- .../apache/flink/table/store/hive/HiveCatalog.java | 356 ++++++++++++++++ .../flink/table/store/hive/HiveCatalogFactory.java | 52 +++ ...e.flink.table.store.file.catalog.CatalogFactory | 16 + .../flink/table/store/hive/HiveCatalogITCase.java | 235 +++++++++++ .../flink-table-store-hive-common/pom.xml | 77 ++++ .../flink/table/store/hive/HiveTypeUtils.java | 55 --- .../{ => flink-table-store-hive-connector}/pom.xml | 18 +- .../apache/flink/table/store/RowDataContainer.java | 0 .../store/SearchArgumentToPredicateConverter.java | 0 .../flink/table/store/TableStoreJobConf.java | 0 .../apache/flink/table/store/hive/HiveSchema.java | 0 .../table/store/hive/TableStoreHiveMetaHook.java | 0 .../store/hive/TableStoreHiveStorageHandler.java | 0 .../flink/table/store/hive/TableStoreSerDe.java | 0 .../TableStoreCharObjectInspector.java | 0 
.../TableStoreDateObjectInspector.java | 0 .../TableStoreDecimalObjectInspector.java | 0 .../TableStoreListObjectInspector.java | 3 +- .../TableStoreMapObjectInspector.java | 5 +- .../TableStoreObjectInspectorFactory.java | 79 ++++ .../TableStoreRowDataObjectInspector.java | 2 +- .../TableStoreStringObjectInspector.java | 0 .../TableStoreTimestampObjectInspector.java | 0 .../TableStoreVarcharObjectInspector.java | 0 .../table/store/mapred/TableStoreInputFormat.java | 0 .../table/store/mapred/TableStoreInputSplit.java | 0 .../table/store/mapred/TableStoreOutputFormat.java | 0 .../table/store/mapred/TableStoreRecordReader.java | 0 .../flink/table/store/FileStoreTestUtils.java | 0 .../SearchArgumentToPredicateConverterTest.java | 0 .../table/store/hive/HiveTableSchemaTest.java | 0 .../store/hive/RandomGenericRowDataGenerator.java | 0 .../hive/TableStoreHiveStorageHandlerITCase.java | 3 +- .../table/store/hive/TableStoreSerDeTest.java | 0 .../TableStoreCharObjectInspectorTest.java | 0 .../TableStoreDateObjectInspectorTest.java | 0 .../TableStoreDecimalObjectInspectorTest.java | 0 .../TableStoreListObjectInspectorTest.java | 0 .../TableStoreMapObjectInspectorTest.java | 0 .../TableStoreRowDataObjectInspectorTest.java | 0 .../TableStoreStringObjectInspectorTest.java | 0 .../TableStoreTimestampObjectInspectorTest.java | 0 .../TableStoreVarcharObjectInspectorTest.java | 0 .../store/mapred/TableStoreInputSplitTest.java | 0 .../store/mapred/TableStoreRecordReaderTest.java | 0 .../src/test/resources/log4j2-test.properties | 0 flink-table-store-hive/pom.xml | 459 +-------------------- pom.xml | 1 - 63 files changed, 1195 insertions(+), 654 deletions(-) diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java index 232015e5..56840aa3 100644 --- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java @@ -220,13 +220,19 @@ public class FlinkCatalog extends AbstractCatalog { return UpdateSchema.fromCatalogTable(table); } - // --------------------- unsupported methods ---------------------------- - @Override public final void open() throws CatalogException {} @Override - public final void close() throws CatalogException {} + public final void close() throws CatalogException { + try { + catalog.close(); + } catch (Exception e) { + throw new CatalogException("Failed to close catalog " + catalog.toString(), e); + } + } + + // --------------------- unsupported methods ---------------------------- @Override public CatalogDatabase getDatabase(String databaseName) throws CatalogException { diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java index 6937e888..6021260f 100644 --- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java +++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java @@ -21,29 +21,34 @@ package org.apache.flink.table.store.connector; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.core.fs.Path; -import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.FactoryUtil; -import org.apache.flink.table.store.file.catalog.Catalog; +import org.apache.flink.table.store.file.catalog.CatalogFactory; +import org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory; +import 
org.apache.flink.util.Preconditions; -import java.util.HashSet; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ServiceLoader; import java.util.Set; - -import static org.apache.flink.table.factories.FactoryUtil.PROPERTY_VERSION; +import java.util.stream.Collectors; /** Factory for {@link FlinkCatalog}. */ -public class FlinkCatalogFactory implements CatalogFactory { +public class FlinkCatalogFactory implements org.apache.flink.table.factories.CatalogFactory { public static final String IDENTIFIER = "table-store"; - public static final ConfigOption<String> DEFAULT_DATABASE = - ConfigOptions.key("default-database").stringType().defaultValue("default"); - - public static final ConfigOption<String> WAREHOUSE = + public static final ConfigOption<String> METASTORE = + ConfigOptions.key("metastore") + .stringType() + .defaultValue(FileSystemCatalogFactory.IDENTIFIER); + private static final ConfigOption<String> WAREHOUSE = ConfigOptions.key("warehouse") .stringType() .noDefaultValue() .withDescription("The warehouse root path of catalog."); + public static final ConfigOption<String> DEFAULT_DATABASE = + ConfigOptions.key("default-database").stringType().defaultValue("default"); @Override public String factoryIdentifier() { @@ -52,35 +57,58 @@ public class FlinkCatalogFactory implements CatalogFactory { @Override public Set<ConfigOption<?>> requiredOptions() { - Set<ConfigOption<?>> options = new HashSet<>(); - options.add(WAREHOUSE); - return options; + return Collections.emptySet(); } @Override public Set<ConfigOption<?>> optionalOptions() { - Set<ConfigOption<?>> options = new HashSet<>(); - options.add(DEFAULT_DATABASE); - options.add(PROPERTY_VERSION); - return options; + return Collections.emptySet(); } @Override public FlinkCatalog createCatalog(Context context) { FactoryUtil.CatalogFactoryHelper helper = FactoryUtil.createCatalogFactoryHelper(this, context); - helper.validate(); ReadableConfig options = 
helper.getOptions(); - return createCatalog( - new Path(options.get(WAREHOUSE)), context.getName(), options.get(DEFAULT_DATABASE)); + return createCatalog(context.getName(), options); } - public static FlinkCatalog createCatalog(Path warehouse, String catalogName) { - return createCatalog(warehouse, catalogName, DEFAULT_DATABASE.defaultValue()); - } + public static FlinkCatalog createCatalog(String catalogName, ReadableConfig options) { + // manual validation + // because different catalog types may have different options + // we can't list them all in the optionalOptions() method + String warehouse = + Preconditions.checkNotNull( + options.get(WAREHOUSE), + "Table store '" + WAREHOUSE.key() + "' path must be set"); + + String metastore = options.get(METASTORE); + List<CatalogFactory> factories = new ArrayList<>(); + ServiceLoader.load(CatalogFactory.class, Thread.currentThread().getContextClassLoader()) + .iterator() + .forEachRemaining( + f -> { + if (f.identifier().equals(metastore)) { + factories.add(f); + } + }); + if (factories.size() != 1) { + throw new RuntimeException( + "Found " + + factories.size() + + " classes implementing " + + CatalogFactory.class.getName() + + " with metastore " + + metastore + + ". 
They are:\n" + + factories.stream() + .map(t -> t.getClass().getName()) + .collect(Collectors.joining("\n"))); + } - public static FlinkCatalog createCatalog( - Path warehouse, String catalogName, String defaultDatabase) { - return new FlinkCatalog(Catalog.create(warehouse), catalogName, defaultDatabase); + return new FlinkCatalog( + factories.get(0).create(warehouse, options), + catalogName, + options.get(DEFAULT_DATABASE)); } } diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java index 79b8153d..8294294e 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.store.connector; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; @@ -71,7 +72,9 @@ public class FlinkCatalogTest { @Before public void beforeEach() throws IOException { String path = TEMPORARY_FOLDER.newFolder().toURI().toString(); - catalog = FlinkCatalogFactory.createCatalog(new Path(path), "test-catalog"); + Configuration conf = new Configuration(); + conf.setString("warehouse", path); + catalog = FlinkCatalogFactory.createCatalog("test-catalog", conf); } private ResolvedSchema createSchema() { diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java new file mode 100644 index 00000000..c3e2dd59 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/AbstractCatalog.java @@ -0,0 +1,39 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.catalog; + +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.catalog.ObjectPath; + +/** Common implementation of {@link Catalog}. 
*/ +public abstract class AbstractCatalog implements Catalog { + + protected static final String DB_SUFFIX = ".db"; + + @Override + public Path getTableLocation(ObjectPath tablePath) { + return new Path(databasePath(tablePath.getDatabaseName()), tablePath.getObjectName()); + } + + protected Path databasePath(String database) { + return new Path(warehouse(), database + DB_SUFFIX); + } + + protected abstract String warehouse(); +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java index 01b23661..65c099f4 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java @@ -241,9 +241,4 @@ public interface Catalog extends AutoCloseable { return tablePath; } } - - /** Create a {@link Catalog} from warehouse path. */ - static Catalog create(Path warehouse) { - return new FileSystemCatalog(warehouse); - } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java new file mode 100644 index 00000000..5e93a3ed --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.catalog; + +import org.apache.flink.configuration.ReadableConfig; + +/** Factory to create {@link Catalog}. Each factory should have a unique identifier. */ +public interface CatalogFactory { + + String identifier(); + + Catalog create(String warehouse, ReadableConfig options); +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java index d4a451ea..451ad8f5 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java @@ -18,7 +18,6 @@ package org.apache.flink.table.store.file.catalog; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -34,9 +33,7 @@ import java.util.concurrent.Callable; import static org.apache.flink.table.store.file.utils.FileUtils.safelyListFileStatus; /** A catalog implementation for {@link FileSystem}. 
*/ -public class FileSystemCatalog implements Catalog { - - public static final String DB_SUFFIX = ".db"; +public class FileSystemCatalog extends AbstractCatalog { private final FileSystem fs; private final Path warehouse; @@ -108,14 +105,9 @@ public class FileSystemCatalog implements Catalog { return tables; } - @Override - public Path getTableLocation(ObjectPath tablePath) { - return tablePath(tablePath); - } - @Override public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException { - Path path = tablePath(tablePath); + Path path = getTableLocation(tablePath); return new SchemaManager(path) .latest() .orElseThrow(() -> new TableNotExistException(tablePath)); @@ -123,7 +115,7 @@ public class FileSystemCatalog implements Catalog { @Override public boolean tableExists(ObjectPath tablePath) { - return tableExists(tablePath(tablePath)); + return tableExists(getTableLocation(tablePath)); } private boolean tableExists(Path tablePath) { @@ -133,7 +125,7 @@ public class FileSystemCatalog implements Catalog { @Override public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException { - Path path = tablePath(tablePath); + Path path = getTableLocation(tablePath); if (!tableExists(path)) { if (ignoreIfNotExists) { return; @@ -152,7 +144,7 @@ public class FileSystemCatalog implements Catalog { throw new DatabaseNotExistException(tablePath.getDatabaseName()); } - Path path = tablePath(tablePath); + Path path = getTableLocation(tablePath); if (tableExists(path)) { if (ignoreIfExists) { return; @@ -167,7 +159,7 @@ public class FileSystemCatalog implements Catalog { @Override public void alterTable(ObjectPath tablePath, UpdateSchema newTable, boolean ignoreIfNotExists) throws TableNotExistException { - Path path = tablePath(tablePath); + Path path = getTableLocation(tablePath); if (!tableExists(path)) { if (ignoreIfNotExists) { return; @@ -196,19 +188,15 @@ public class FileSystemCatalog implements Catalog { return 
name.substring(0, name.length() - DB_SUFFIX.length()); } - private Path databasePath(String database) { - return new Path(warehouse, database + DB_SUFFIX); - } - - @VisibleForTesting - Path tablePath(ObjectPath objectPath) { - return new Path(databasePath(objectPath.getDatabaseName()), objectPath.getObjectName()); - } - private void commitTableChange(Path tablePath, UpdateSchema table) { uncheck(() -> new SchemaManager(tablePath).commitNewVersion(table)); } @Override public void close() throws Exception {} + + @Override + protected String warehouse() { + return warehouse.toString(); + } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java new file mode 100644 index 00000000..c705872e --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalogFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.store.file.catalog; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.fs.Path; + +/** Factory to create {@link FileSystemCatalog}. */ +public class FileSystemCatalogFactory implements CatalogFactory { + + public static final String IDENTIFIER = "filesystem"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Catalog create(String warehouse, ReadableConfig options) { + return new FileSystemCatalog(new Path(warehouse)); + } +} diff --git a/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory new file mode 100644 index 00000000..3edb81e1 --- /dev/null +++ b/flink-table-store-core/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.flink.table.store.file.catalog.FileSystemCatalogFactory diff --git a/flink-table-store-dist/pom.xml b/flink-table-store-dist/pom.xml index f58e91f5..8da30d51 100644 --- a/flink-table-store-dist/pom.xml +++ b/flink-table-store-dist/pom.xml @@ -62,6 +62,12 @@ under the License. <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-hive-catalog</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-store-kafka</artifactId> @@ -97,6 +103,7 @@ under the License. <include>org.apache.flink:flink-table-store-connector</include> <include>org.apache.flink:flink-table-store-core</include> <include>org.apache.flink:flink-table-store-format</include> + <include>org.apache.flink:flink-table-store-hive-catalog</include> <include>org.apache.flink:flink-table-store-kafka</include> <include>org.apache.flink:flink-sql-connector-kafka</include> </includes> diff --git a/flink-table-store-e2e-tests/pom.xml b/flink-table-store-e2e-tests/pom.xml index aefec3cb..85a62b5f 100644 --- a/flink-table-store-e2e-tests/pom.xml +++ b/flink-table-store-e2e-tests/pom.xml @@ -40,7 +40,7 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-store-hive</artifactId> + <artifactId>flink-table-store-hive-connector</artifactId> <version>${project.version}</version> </dependency> @@ -105,9 +105,9 @@ under the License. 
</artifactItem> <artifactItem> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-store-hive</artifactId> + <artifactId>flink-table-store-hive-connector</artifactId> <version>${project.version}</version> - <destFileName>flink-table-store-hive.jar</destFileName> + <destFileName>flink-table-store-hive-connector.jar</destFileName> <type>jar</type> <overWrite>true</overWrite> <outputDirectory>${project.build.directory}/dependencies diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java index 563c121b..be375f42 100644 --- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java +++ b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/E2eTestBase.java @@ -65,7 +65,8 @@ public abstract class E2eTestBase { } private static final String TABLE_STORE_JAR_NAME = "flink-table-store.jar"; - protected static final String TABLE_STORE_HIVE_JAR_NAME = "flink-table-store-hive.jar"; + protected static final String TABLE_STORE_HIVE_CONNECTOR_JAR_NAME = + "flink-table-store-hive-connector.jar"; private static final String BUNDLED_HADOOP_JAR_NAME = "bundled-hadoop.jar"; protected static final String TEST_DATA_DIR = "/test-data"; @@ -122,7 +123,7 @@ public abstract class E2eTestBase { jobManager.execInContainer("chown", "-R", "flink:flink", TEST_DATA_DIR); copyResource(TABLE_STORE_JAR_NAME); - copyResource(TABLE_STORE_HIVE_JAR_NAME); + copyResource(TABLE_STORE_HIVE_CONNECTOR_JAR_NAME); copyResource(BUNDLED_HADOOP_JAR_NAME); } diff --git a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java index 98ce0893..9a4f7833 100644 --- a/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java +++ 
b/flink-table-store-e2e-tests/src/test/java/org/apache/flink/table/store/tests/HiveE2eTest.java @@ -35,7 +35,7 @@ import static org.assertj.core.api.Assertions.assertThat; public class HiveE2eTest extends E2eTestBase { private static final String ADD_JAR_HQL = - "ADD JAR " + TEST_DATA_DIR + "/" + TABLE_STORE_HIVE_JAR_NAME + ";"; + "ADD JAR " + TEST_DATA_DIR + "/" + TABLE_STORE_HIVE_CONNECTOR_JAR_NAME + ";"; public HiveE2eTest() { super(false, true); @@ -43,7 +43,6 @@ public class HiveE2eTest extends E2eTestBase { @Test public void testReadExternalTable() throws Exception { - // TODO write data directly to HDFS after FLINK-27562 is solved String tableStorePkDdl = "CREATE TABLE IF NOT EXISTS table_store_pk (\n" + " a int,\n" @@ -54,7 +53,7 @@ public class HiveE2eTest extends E2eTestBase { + " 'bucket' = '2',\n" + " 'root-path' = '%s'\n" + ");"; - String tableStorePkPath = TEST_DATA_DIR + "/" + UUID.randomUUID().toString() + ".store"; + String tableStorePkPath = HDFS_ROOT + "/" + UUID.randomUUID().toString() + ".store"; tableStorePkDdl = String.format(tableStorePkDdl, tableStorePkPath); runSql( "INSERT INTO table_store_pk VALUES " @@ -68,8 +67,6 @@ public class HiveE2eTest extends E2eTestBase { "CREATE EXTERNAL TABLE IF NOT EXISTS table_store_pk\n" + "STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'\n" + "LOCATION '" - // hive cannot read from local path - + HDFS_ROOT + tableStorePkPath + "/default_catalog.catalog/default_database.db/table_store_pk';"; writeSharedFile( @@ -78,13 +75,9 @@ public class HiveE2eTest extends E2eTestBase { ADD_JAR_HQL + "\n" + externalTablePkDdl - + "\n" - + "SELECT b, a, c FROM table_store_pk ORDER BY b;"); + + "\nSELECT b, a, c FROM table_store_pk ORDER BY b;"); ContainerState hive = getHive(); - hive.execInContainer("hdfs", "dfs", "-mkdir", "-p", HDFS_ROOT + TEST_DATA_DIR); - hive.execInContainer( - "hdfs", "dfs", "-copyFromLocal", tableStorePkPath, HDFS_ROOT + tableStorePkPath); Container.ExecResult execResult 
= hive.execInContainer( "/opt/hive/bin/hive", @@ -99,6 +92,54 @@ public class HiveE2eTest extends E2eTestBase { } } + @Test + public void testFlinkWriteAndHiveRead() throws Exception { + String sql = + String.join( + "\n", + "CREATE CATALOG my_hive WITH (", + " 'type' = 'table-store',", + " 'metastore' = 'hive',", + " 'uri' = 'thrift://hive-metastore:9083',", + " 'warehouse' = '" + + HDFS_ROOT + + "/" + + UUID.randomUUID().toString() + + ".warehouse'", + ");", + "", + "USE CATALOG my_hive;", + "", + "CREATE TABLE T (", + " a int,", + " b bigint,", + " c string", + ") WITH (", + " 'bucket' = '2'", + ");", + "", + "INSERT INTO T VALUES (1, 10, 'Hi'), (2, 20, 'Hello');"); + runSql(sql); + + writeSharedFile( + "query.hql", + // same default database name as Flink + ADD_JAR_HQL + "\nSELECT b, a, c FROM t ORDER BY b;"); + + ContainerState hive = getHive(); + Container.ExecResult execResult = + hive.execInContainer( + "/opt/hive/bin/hive", + "--hiveconf", + "hive.root.logger=INFO,console", + "-f", + TEST_DATA_DIR + "/query.hql"); + assertThat(execResult.getStdout()).isEqualTo("10\t1\tHi\n" + "20\t2\tHello\n"); + if (execResult.getExitCode() != 0) { + throw new AssertionError("Failed when running hive sql."); + } + } + private ContainerState getHive() { return environment.getContainerByServiceName("hive-server_1").get(); } diff --git a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml index 42c89b27..d414d99f 100644 --- a/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml +++ b/flink-table-store-e2e-tests/src/test/resources-filtered/docker-compose.yaml @@ -25,10 +25,10 @@ services: # ---------------------------------------- jobmanager: - image: apache/flink:${flink.version} + image: apache/flink:${flink.version}-java8 volumes: - testdata:/test-data - command: jobmanager + entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh jobmanager" env_file: - ./flink.env networks: @@ -39,10 +39,10 @@ services: - "8081" taskmanager: - image: apache/flink:${flink.version} + image: apache/flink:${flink.version}-java8 volumes: - testdata:/test-data - command: taskmanager + entrypoint: /bin/bash -c "wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && /docker-entrypoint.sh taskmanager" env_file: - ./flink.env networks: diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml similarity index 81% copy from flink-table-store-hive/pom.xml copy to flink-table-store-hive/flink-table-store-hive-catalog/pom.xml index 1d6bb57d..8d5c6c86 100644 --- a/flink-table-store-hive/pom.xml +++ b/flink-table-store-hive/flink-table-store-hive-catalog/pom.xml @@ -23,68 +23,41 @@ under the License. 
<modelVersion>4.0.0</modelVersion> <parent> - <artifactId>flink-table-store-parent</artifactId> + <artifactId>flink-table-store-hive</artifactId> <groupId>org.apache.flink</groupId> <version>0.2-SNAPSHOT</version> </parent> - <artifactId>flink-table-store-hive</artifactId> - <name>Flink Table Store : Hive</name> + <artifactId>flink-table-store-hive-catalog</artifactId> + <name>Flink Table Store : Hive Catalog</name> <packaging>jar</packaging> - <properties> - <hiverunner.version>4.0.0</hiverunner.version> - <reflections.version>0.9.8</reflections.version> - </properties> - <dependencies> - <!-- Flink All dependencies --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-store-shade</artifactId> + <artifactId>flink-table-store-core</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-hive-common</artifactId> + <version>${project.version}</version> </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> + 
<artifactId>hive-metastore</artifactId> <version>${hive.version}</version> - <scope>provided</scope> <exclusions> <exclusion> <groupId>log4j</groupId> @@ -110,11 +83,6 @@ under the License. <groupId>org.apache.orc</groupId> <artifactId>orc-core</artifactId> </exclusion> - <!-- this dependency cannot be fetched from central maven repository anymore --> - <exclusion> - <groupId>org.pentaho</groupId> - <artifactId>*</artifactId> - </exclusion> </exclusions> </dependency> @@ -122,26 +90,31 @@ under the License. <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-store-core</artifactId> + <artifactId>flink-table-store-connector</artifactId> <version>${project.version}</version> <scope>test</scope> - <type>test-jar</type> </dependency> - <!-- - IDEA reads classes from the same project from target/classes of that module, - so even though we've packaged and shaded avro classes into flink-table-store-format.jar - we still have to include this test dependency here. - --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-hive-connector</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-sql-avro</artifactId> + <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <scope>test</scope> </dependency> - <!-- dependencies for IT cases --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> @@ -158,6 +131,34 @@ under the License. 
<type>test-jar</type> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </exclusion> + <exclusion> + <groupId>org.junit.vintage</groupId> + <artifactId>junit-vintage-engine</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.klarna</groupId> <artifactId>hiverunner</artifactId> @@ -475,11 +476,41 @@ under the License. <goal>shade</goal> </goals> <configuration> + <minimizeJar>true</minimizeJar> <artifactSet> - <includes combine.children="append"> - <include>org.apache.flink:flink-table-store-shade</include> + <includes> + <include>org.apache.flink:flink-table-store-hive-common</include> + <include>org.apache.thrift:libthrift</include> + <include>org.apache.thrift:libfb303</include> + <include>com.google.guava:guava</include> + <include>org.apache.hive:hive-common</include> + <include>org.apache.hive.shims:hive-shims-common</include> + <include>org.apache.hive:hive-serde</include> + <include>org.apache.hive:hive-metastore</include> </includes> </artifactSet> + <relocations> + <relocation> + <pattern>com.facebook.fb303</pattern> + <shadedPattern>org.apache.flink.table.store.shaded.com.facebook.fb303</shadedPattern> + </relocation> + <relocation> + <pattern>com.google.common</pattern> + <shadedPattern>org.apache.flink.table.store.shaded.com.google.common</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.hadoop.hive</pattern> + <shadedPattern>org.apache.flink.table.store.shaded.org.apache.hadoop.hive</shadedPattern> + </relocation> + 
<relocation> + <pattern>org.apache.hive</pattern> + <shadedPattern>org.apache.flink.table.store.shaded.org.apache.hive</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.thrift</pattern> + <shadedPattern>org.apache.flink.table.store.shaded.org.apache.thrift</shadedPattern> + </relocation> + </relocations> </configuration> </execution> </executions> diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java new file mode 100644 index 00000000..50a3f635 --- /dev/null +++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.catalog.AbstractCatalog;
+import org.apache.flink.table.store.file.schema.DataField;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.thrift.TException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A catalog implementation for Hive.
+ *
+ * <p>Schema changes are first committed to the table's location through {@link SchemaManager};
+ * afterwards a matching entry (columns, location, input/output format, SerDe and storage handler)
+ * is created or updated in the Hive metastore through an {@link IMetaStoreClient}.
+ */
+public class HiveCatalog extends AbstractCatalog {
+
+    // we don't include flink-table-store-hive-connector as a dependency because it depends on
+    // hive-exec, so its classes are referenced by name only
+    private static final String INPUT_FORMAT_CLASS_NAME =
+            "org.apache.flink.table.store.mapred.TableStoreInputFormat";
+    private static final String OUTPUT_FORMAT_CLASS_NAME =
+            "org.apache.flink.table.store.mapred.TableStoreOutputFormat";
+    private static final String SERDE_CLASS_NAME =
+            "org.apache.flink.table.store.hive.TableStoreSerDe";
+    private static final String STORAGE_HANDLER_CLASS_NAME =
+            "org.apache.flink.table.store.hive.TableStoreHiveStorageHandler";
+
+    private final HiveConf hiveConf;
+    private final IMetaStoreClient client;
+
+    /**
+     * Creates a catalog backed by a Hive metastore.
+     *
+     * @param thriftUri value stored under the metastore URIs key of the Hive configuration; when
+     *     it is null or whitespace-only the retrying client is used without the synchronized
+     *     wrapper
+     * @param warehousePath value stored under the metastore warehouse key; also returned by
+     *     {@link #warehouse()}
+     * @throws RuntimeException if the metastore client cannot be created
+     */
+    public HiveCatalog(String thriftUri, String warehousePath) {
+        Configuration conf = new Configuration();
+        conf.set(HiveConf.ConfVars.METASTOREURIS.varname, thriftUri);
+        conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousePath);
+        this.hiveConf = new HiveConf(conf, HiveConf.class);
+        try {
+            // named differently from the field so the assignment below is unambiguous
+            IMetaStoreClient retryingClient =
+                    RetryingMetaStoreClient.getProxy(
+                            hiveConf, tbl -> null, HiveMetaStoreClient.class.getName());
+            this.client =
+                    StringUtils.isNullOrWhitespaceOnly(thriftUri)
+                            ? retryingClient
+                            : HiveMetaStoreClient.newSynchronizedClient(retryingClient);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create Hive metastore client", e);
+        }
+    }
+
+    /** Returns the names of all databases known to the metastore. */
+    @Override
+    public List<String> listDatabases() {
+        try {
+            return client.getAllDatabases();
+        } catch (TException e) {
+            throw new RuntimeException("Failed to list all databases", e);
+        }
+    }
+
+    /** Returns true if the metastore can fetch the database, false if it reports no such object. */
+    @Override
+    public boolean databaseExists(String databaseName) {
+        try {
+            client.getDatabase(databaseName);
+            return true;
+        } catch (NoSuchObjectException e) {
+            return false;
+        } catch (TException e) {
+            throw new RuntimeException(
+                    "Failed to determine if database " + databaseName + " exists", e);
+        }
+    }
+
+    /**
+     * Creates a database whose location is {@code databasePath(name)}.
+     *
+     * @throws DatabaseAlreadyExistException if the database exists and {@code ignoreIfExists} is
+     *     false
+     */
+    @Override
+    public void createDatabase(String name, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException {
+        try {
+            client.createDatabase(convertToDatabase(name));
+        } catch (AlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new DatabaseAlreadyExistException(name, e);
+            }
+        } catch (TException e) {
+            throw new RuntimeException("Failed to create database " + name, e);
+        }
+    }
+
+    /**
+     * Drops a database.
+     *
+     * @throws DatabaseNotEmptyException if {@code cascade} is false and the database still lists
+     *     tables
+     * @throws DatabaseNotExistException if the metastore reports the database as missing and
+     *     {@code ignoreIfNotExists} is false
+     */
+    @Override
+    public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException {
+        try {
+            if (!cascade && !client.getAllTables(name).isEmpty()) {
+                throw new DatabaseNotEmptyException(name);
+            }
+            // emptiness was checked above, so the metastore call itself may always cascade;
+            // per IMetaStoreClient the flags are (deleteData, ignoreUnknownDb, cascade)
+            client.dropDatabase(name, true, false, true);
+        } catch (NoSuchObjectException | UnknownDBException e) {
+            if (!ignoreIfNotExists) {
+                throw new DatabaseNotExistException(name, e);
+            }
+        } catch (TException e) {
+            throw new RuntimeException("Failed to drop database " + name, e);
+        }
+    }
+
+    /**
+     * Returns the names of all tables the metastore lists for the database.
+     *
+     * @throws DatabaseNotExistException if the metastore reports the database as unknown
+     */
+    @Override
+    public List<String> listTables(String databaseName) throws DatabaseNotExistException {
+        try {
+            return client.getAllTables(databaseName);
+        } catch (UnknownDBException e) {
+            throw new DatabaseNotExistException(databaseName, e);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to list all tables in database " + databaseName, e);
+        }
+    }
+
+    /**
+     * Reads the latest schema stored at the table's location.
+     *
+     * @throws TableNotExistException if the metastore has no such table
+     * @throws IllegalArgumentException if the metastore table is not a table store table
+     */
+    @Override
+    public TableSchema getTable(ObjectPath tablePath) throws TableNotExistException {
+        if (isTableStoreTableNotExisted(tablePath)) {
+            throw new TableNotExistException(tablePath);
+        }
+        Path tableLocation = getTableLocation(tablePath);
+        return new SchemaManager(tableLocation)
+                .latest()
+                .orElseThrow(
+                        () -> new RuntimeException("There is no table stored in " + tableLocation));
+    }
+
+    /**
+     * Returns true if the metastore has a table with this name. Unlike the other table methods
+     * this does not check that it is a table store table.
+     */
+    @Override
+    public boolean tableExists(ObjectPath tablePath) {
+        try {
+            client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+            return true;
+        } catch (NoSuchObjectException e) {
+            return false;
+        } catch (TException e) {
+            throw new RuntimeException(
+                    "Failed to determine if table " + tablePath.getFullName() + " exists", e);
+        }
+    }
+
+    /**
+     * Drops a table store table from the metastore.
+     *
+     * @throws TableNotExistException if the table is missing and {@code ignoreIfNotExists} is
+     *     false
+     * @throws IllegalArgumentException if the metastore table is not a table store table
+     */
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        if (isTableStoreTableNotExisted(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException(tablePath);
+            }
+        }
+
+        try {
+            // per IMetaStoreClient the flags are (deleteData, ignoreUnknownTab, ifPurge)
+            client.dropTable(
+                    tablePath.getDatabaseName(), tablePath.getObjectName(), true, false, true);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to drop table " + tablePath.getFullName(), e);
+        }
+    }
+
+    /**
+     * Creates a table: commits the schema to the table's location, then registers the table in the
+     * metastore.
+     *
+     * @throws DatabaseNotExistException if the database does not exist
+     * @throws TableAlreadyExistException if the table exists and {@code ignoreIfExists} is false
+     */
+    @Override
+    public void createTable(ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        String databaseName = tablePath.getDatabaseName();
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(databaseName);
+        }
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) {
+                return;
+            } else {
+                throw new TableAlreadyExistException(tablePath);
+            }
+        }
+
+        // first commit changes to underlying files
+        // if the change on Hive fails, there is no harm in performing the same changes to the
+        // files again
+        TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
+        Table table = newHmsTable(tablePath);
+        updateHmsTable(table, tablePath, schema);
+        try {
+            client.createTable(table);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to create table " + tablePath.getFullName(), e);
+        }
+    }
+
+    /**
+     * Alters a table: commits the schema change to the table's location, then replaces the storage
+     * descriptor of the metastore table.
+     *
+     * @throws TableNotExistException if the table is missing and {@code ignoreIfNotExists} is
+     *     false
+     * @throws IllegalArgumentException if the metastore table is not a table store table
+     */
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, UpdateSchema updateSchema, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        if (isTableStoreTableNotExisted(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException(tablePath);
+            }
+        }
+
+        // first commit changes to underlying files
+        // if the change on Hive fails, there is no harm in performing the same changes to the
+        // files again
+        TableSchema schema = commitToUnderlyingFiles(tablePath, updateSchema);
+        try {
+            Table table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+            updateHmsTable(table, tablePath, schema);
+            client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to alter table " + tablePath.getFullName(), e);
+        }
+    }
+
+    /** Closes the metastore client. */
+    @Override
+    public void close() throws Exception {
+        client.close();
+    }
+
+    /** Returns the warehouse path stored in the Hive configuration by the constructor. */
+    @Override
+    protected String warehouse() {
+        return hiveConf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
+    }
+
+    /** Builds a metastore {@link Database} with the given name, located at its database path. */
+    private Database convertToDatabase(String name) {
+        Database database = new Database();
+        database.setName(name);
+        database.setLocationUri(databasePath(name).toString());
+        return database;
+    }
+
+    /**
+     * Builds a new managed metastore {@link Table} without a storage descriptor; the table store
+     * storage handler is recorded in the table parameters.
+     */
+    private Table newHmsTable(ObjectPath tablePath) {
+        long currentTimeMillis = System.currentTimeMillis();
+        // the metastore stores timestamps as int seconds
+        int currentTimeSeconds = (int) (currentTimeMillis / 1000);
+        Table table =
+                new Table(
+                        tablePath.getObjectName(),
+                        tablePath.getDatabaseName(),
+                        // current linux user
+                        System.getProperty("user.name"),
+                        currentTimeSeconds,
+                        currentTimeSeconds,
+                        Integer.MAX_VALUE,
+                        null,
+                        Collections.emptyList(),
+                        new HashMap<>(),
+                        null,
+                        null,
+                        TableType.MANAGED_TABLE.toString());
+        table.getParameters()
+                .put(hive_metastoreConstants.META_TABLE_STORAGE, STORAGE_HANDLER_CLASS_NAME);
+        return table;
+    }
+
+    /** Replaces the storage descriptor of {@code table} with one derived from {@code schema}. */
+    private void updateHmsTable(Table table, ObjectPath tablePath, TableSchema schema) {
+        StorageDescriptor sd = convertToStorageDescriptor(tablePath, schema);
+        table.setSd(sd);
+    }
+
+    /**
+     * Builds a storage descriptor carrying the schema's columns, the table location and the table
+     * store input format, output format and SerDe class names.
+     */
+    private StorageDescriptor convertToStorageDescriptor(ObjectPath tablePath, TableSchema schema) {
+        StorageDescriptor sd = new StorageDescriptor();
+
+        sd.setCols(
+                schema.fields().stream()
+                        .map(this::convertToFieldSchema)
+                        .collect(Collectors.toList()));
+        sd.setLocation(getTableLocation(tablePath).toString());
+
+        sd.setInputFormat(INPUT_FORMAT_CLASS_NAME);
+        sd.setOutputFormat(OUTPUT_FORMAT_CLASS_NAME);
+
+        SerDeInfo serDeInfo = new SerDeInfo();
+        serDeInfo.setParameters(new HashMap<>());
+        serDeInfo.setSerializationLib(SERDE_CLASS_NAME);
+        sd.setSerdeInfo(serDeInfo);
+
+        return sd;
+    }
+
+    /** Converts a table store field to a Hive column (name, Hive type name, description). */
+    private FieldSchema convertToFieldSchema(DataField dataField) {
+        return new FieldSchema(
+                dataField.name(),
+                HiveTypeUtils.logicalTypeToTypeInfo(dataField.type().logicalType()).getTypeName(),
+                dataField.description());
+    }
+
+    /**
+     * Checks whether the table is absent from the metastore.
+     *
+     * @return true if the metastore has no such table, false if it has one whose input and output
+     *     formats are the table store formats
+     * @throws IllegalArgumentException if the table exists but has different formats
+     */
+    private boolean isTableStoreTableNotExisted(ObjectPath tablePath) {
+        Table table;
+        try {
+            table = client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
+        } catch (NoSuchObjectException e) {
+            return true;
+        } catch (TException e) {
+            throw new RuntimeException(
+                    "Cannot determine if table "
+                            + tablePath.getFullName()
+                            + " is a table store table.",
+                    e);
+        }
+
+        if (!INPUT_FORMAT_CLASS_NAME.equals(table.getSd().getInputFormat())
+                || !OUTPUT_FORMAT_CLASS_NAME.equals(table.getSd().getOutputFormat())) {
+            throw new IllegalArgumentException(
+                    "Table "
+                            + tablePath.getFullName()
+                            + " is not a table store table. Its input format is "
+                            + table.getSd().getInputFormat()
+                            + " and its output format is "
+                            + table.getSd().getOutputFormat());
+        }
+        return false;
+    }
+
+    /**
+     * Commits the schema change as a new version at the table's location and returns the
+     * resulting schema.
+     */
+    private TableSchema commitToUnderlyingFiles(ObjectPath tablePath, UpdateSchema schema) {
+        Path path = getTableLocation(tablePath);
+        try {
+            return new SchemaManager(path).commitNewVersion(schema);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to commit changes of table "
+                            + tablePath.getFullName()
+                            + " to underlying files",
+                    e);
+        }
+    }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
new file mode 100644
index 00000000..1e2cbabb
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalogFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.store.file.catalog.Catalog;
+import org.apache.flink.table.store.file.catalog.CatalogFactory;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Factory to create {@link HiveCatalog}.
+ *
+ * <p>Registered under the identifier {@code hive}; requires the {@code uri} option.
+ */
+public class HiveCatalogFactory implements CatalogFactory {
+
+    private static final String IDENTIFIER = "hive";
+
+    private static final ConfigOption<String> URI =
+            ConfigOptions.key("uri")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Uri of Hive metastore's thrift server.");
+
+    /** Returns the identifier this factory is selected by. */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Creates a {@link HiveCatalog} from the configured metastore uri and the given warehouse.
+     *
+     * @throws NullPointerException if the {@code uri} option is not set
+     */
+    @Override
+    public Catalog create(String warehouse, ReadableConfig options) {
+        // read the option first, then build the message, then validate
+        String configuredUri = options.get(URI);
+        String missingUriMessage =
+                URI.key() + " must be set for table store " + IDENTIFIER + " catalog";
+        String metastoreUri = Preconditions.checkNotNull(configuredUri, missingUriMessage);
+        return new HiveCatalog(metastoreUri, warehouse);
+    }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory
new file mode 100644
index 00000000..e9404fcd
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/resources/META-INF/services/org.apache.flink.table.store.file.catalog.CatalogFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.table.store.hive.HiveCatalogFactory diff --git a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java new file mode 100644 index 00000000..1b5654b7 --- /dev/null +++ b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.table.store.hive;
+
+import org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * IT cases for {@link HiveCatalog}.
+ *
+ * <p>Each test runs Flink SQL against a table store catalog named {@code my_hive} and uses the
+ * Hive shell to prepare and inspect state on the Hive side.
+ */
+@RunWith(FlinkEmbeddedHiveRunner.class)
+public class HiveCatalogITCase {
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    // URI of a fresh temporary folder, used as the catalog warehouse
+    private String path;
+    private TableEnvironment tEnv;
+
+    @HiveSQL(files = {})
+    private static HiveShell hiveShell;
+
+    /**
+     * Creates database {@code test_db} with a plain Hive table {@code hive_table} holding two
+     * rows, then creates the {@code my_hive} catalog in a batch table environment and switches to
+     * {@code test_db}.
+     */
+    @Before
+    public void before() throws Exception {
+        hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
+        hiveShell.execute("USE test_db");
+        hiveShell.execute("CREATE TABLE hive_table ( a INT, b STRING )");
+        hiveShell.execute("INSERT INTO hive_table VALUES (100, 'Hive'), (200, 'Table')");
+
+        path = folder.newFolder().toURI().toString();
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
+        tEnv = TableEnvironmentImpl.create(settings);
+        // the metastore uri is left empty here
+        tEnv.executeSql(
+                        String.join(
+                                "\n",
+                                "CREATE CATALOG my_hive WITH (",
+                                " 'type' = 'table-store',",
+                                " 'metastore' = 'hive',",
+                                " 'uri' = '',",
+                                " 'warehouse' = '" + path + "'",
+                                ")"))
+                .await();
+        tEnv.executeSql("USE CATALOG my_hive").await();
+        tEnv.executeSql("USE test_db").await();
+    }
+
+    /** Drops both databases the tests may have created, including their tables. */
+    @After
+    public void after() {
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db2 CASCADE");
+    }
+
+    /**
+     * Covers CREATE DATABASE and DROP DATABASE, with and without IF [NOT] EXISTS, and checks that
+     * a non-empty database can only be dropped with CASCADE, which also removes the table's files.
+     */
+    @Test
+    public void testDatabaseOperations() throws Exception {
+        // create database
+        tEnv.executeSql("CREATE DATABASE test_db2").await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of("default"), Row.of("test_db"), Row.of("test_db2")),
+                collect("SHOW DATABASES"));
+        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db2").await();
+        try {
+            tEnv.executeSql("CREATE DATABASE test_db2").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Database test_db2 already exists in Catalog my_hive");
+        }
+
+        // drop database
+        tEnv.executeSql("DROP DATABASE test_db2").await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of("default"), Row.of("test_db")), collect("SHOW DATABASES"));
+        tEnv.executeSql("DROP DATABASE IF EXISTS test_db2").await();
+        try {
+            tEnv.executeSql("DROP DATABASE test_db2").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Database test_db2 does not exist in Catalog my_hive");
+        }
+
+        // drop non-empty database
+        tEnv.executeSql("CREATE DATABASE test_db2").await();
+        tEnv.executeSql("USE test_db2").await();
+        tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                .await();
+        tEnv.executeSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')").await();
+        Path tablePath = new Path(path, "test_db2.db/T");
+        Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
+        try {
+            tEnv.executeSql("DROP DATABASE test_db2").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Database test_db2 in catalog my_hive is not empty");
+        }
+        tEnv.executeSql("DROP DATABASE test_db2 CASCADE").await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of("default"), Row.of("test_db")), collect("SHOW DATABASES"));
+        Assert.assertFalse(tablePath.getFileSystem().exists(tablePath));
+    }
+
+    /**
+     * Covers CREATE TABLE, DROP TABLE and ALTER TABLE on table store tables, and checks that DROP
+     * and ALTER are rejected for the plain Hive table {@code hive_table}.
+     */
+    @Test
+    public void testTableOperations() throws Exception {
+        // create table
+        tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                .await();
+        tEnv.executeSql("CREATE TABLE S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                .await();
+        // SHOW TABLES is expected to report the names in lower case
+        Assert.assertEquals(
+                Arrays.asList(Row.of("hive_table"), Row.of("s"), Row.of("t")),
+                collect("SHOW TABLES"));
+        tEnv.executeSql(
+                        "CREATE TABLE IF NOT EXISTS S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                .await();
+        try {
+            tEnv.executeSql("CREATE TABLE S ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                    .await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table (or view) test_db.S already exists in Catalog my_hive");
+        }
+
+        // drop table
+        tEnv.executeSql("INSERT INTO S VALUES (1, 'Hi'), (2, 'Hello')").await();
+        Path tablePath = new Path(path, "test_db.db/S");
+        Assert.assertTrue(tablePath.getFileSystem().exists(tablePath));
+        tEnv.executeSql("DROP TABLE S").await();
+        Assert.assertEquals(
+                Arrays.asList(Row.of("hive_table"), Row.of("t")), collect("SHOW TABLES"));
+        Assert.assertFalse(tablePath.getFileSystem().exists(tablePath));
+        tEnv.executeSql("DROP TABLE IF EXISTS S").await();
+        try {
+            tEnv.executeSql("DROP TABLE S").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table with identifier 'my_hive.test_db.S' does not exist");
+        }
+        try {
+            tEnv.executeSql("DROP TABLE hive_table").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table test_db.hive_table is not a table store table");
+        }
+
+        // alter table
+        tEnv.executeSql("ALTER TABLE T SET ( 'manifest.target-file-size' = '16MB' )").await();
+        List<Row> actual = collect("SHOW CREATE TABLE T");
+        Assert.assertEquals(1, actual.size());
+        Assert.assertTrue(
+                actual.get(0)
+                        .getField(0)
+                        .toString()
+                        .contains("'manifest.target-file-size' = '16MB'"));
+        try {
+            tEnv.executeSql("ALTER TABLE S SET ( 'manifest.target-file-size' = '16MB' )").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table `my_hive`.`test_db`.`S` doesn't exist or is a temporary table");
+        }
+        try {
+            tEnv.executeSql("ALTER TABLE hive_table SET ( 'manifest.target-file-size' = '16MB' )")
+                    .await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table test_db.hive_table is not a table store table");
+        }
+    }
+
+    /**
+     * Writes rows through Flink and reads them back through the Hive shell, then checks that
+     * inserting into the plain Hive table through this catalog is rejected.
+     */
+    @Test
+    public void testFlinkWriteAndHiveRead() throws Exception {
+        tEnv.executeSql("CREATE TABLE T ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )")
+                .await();
+        tEnv.executeSql("INSERT INTO T VALUES (1, 'Hi'), (2, 'Hello')").await();
+        Assert.assertEquals(
+                Arrays.asList("1\tHi", "2\tHello"),
+                hiveShell.executeQuery("SELECT * FROM t ORDER BY a"));
+
+        try {
+            tEnv.executeSql("INSERT INTO hive_table VALUES (1, 'Hi'), (2, 'Hello')").await();
+            Assert.fail("No exception is thrown");
+        } catch (Throwable t) {
+            ExceptionUtils.assertThrowableWithMessage(
+                    t, "Table test_db.hive_table is not a table store table");
+        }
+    }
+
+    /** Executes the statement and returns all result rows, closing the iterator afterwards. */
+    private List<Row> collect(String sql) throws Exception {
+        List<Row> result = new ArrayList<>();
+        try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+            while (it.hasNext()) {
+                result.add(it.next());
+            }
+        }
+        return result;
+    }
+}
diff --git a/flink-table-store-hive/flink-table-store-hive-common/pom.xml b/flink-table-store-hive/flink-table-store-hive-common/pom.xml
new file mode 100644
index 00000000..a87aa767
--- /dev/null
+++ b/flink-table-store-hive/flink-table-store-hive-common/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache
Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-table-store-hive</artifactId> + <groupId>org.apache.flink</groupId> + <version>0.2-SNAPSHOT</version> + </parent> + + <artifactId>flink-table-store-hive-common</artifactId> + <name>Flink Table Store : Hive Common</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-serde</artifactId> + <version>${hive.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + <exclusion> + 
<groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> +</project> diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java b/flink-table-store-hive/flink-table-store-hive-common/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java similarity index 55% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java rename to flink-table-store-hive/flink-table-store-hive-common/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java index d6061b12..b9af52f6 100644 --- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java +++ b/flink-table-store-hive/flink-table-store-hive-common/src/main/java/org/apache/flink/table/store/hive/HiveTypeUtils.java @@ -18,14 +18,6 @@ package org.apache.flink.table.store.hive; -import org.apache.flink.table.store.hive.objectinspector.TableStoreCharObjectInspector; -import org.apache.flink.table.store.hive.objectinspector.TableStoreDateObjectInspector; -import org.apache.flink.table.store.hive.objectinspector.TableStoreDecimalObjectInspector; -import org.apache.flink.table.store.hive.objectinspector.TableStoreListObjectInspector; -import org.apache.flink.table.store.hive.objectinspector.TableStoreMapObjectInspector; -import org.apache.flink.table.store.hive.objectinspector.TableStoreStringObjectInspector; -import org.apache.flink.table.store.hive.objectinspector.TableStoreTimestampObjectInspector; -import org.apache.flink.table.store.hive.objectinspector.TableStoreVarcharObjectInspector; import org.apache.flink.table.types.logical.ArrayType; import 
org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.DecimalType; @@ -33,9 +25,6 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.VarCharType; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -93,48 +82,4 @@ public class HiveTypeUtils { "Unsupported logical type " + logicalType.asSummaryString()); } } - - public static ObjectInspector getObjectInspector(LogicalType logicalType) { - switch (logicalType.getTypeRoot()) { - case BOOLEAN: - case TINYINT: - case SMALLINT: - case INTEGER: - case BIGINT: - case FLOAT: - case DOUBLE: - case BINARY: - case VARBINARY: - return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector( - (PrimitiveTypeInfo) logicalTypeToTypeInfo(logicalType)); - case DECIMAL: - DecimalType decimalType = (DecimalType) logicalType; - return new TableStoreDecimalObjectInspector( - decimalType.getPrecision(), decimalType.getScale()); - case CHAR: - CharType charType = (CharType) logicalType; - return new TableStoreCharObjectInspector(charType.getLength()); - case VARCHAR: - VarCharType varCharType = (VarCharType) logicalType; - if (varCharType.getLength() == VarCharType.MAX_LENGTH) { - return new TableStoreStringObjectInspector(); - } else { - return new TableStoreVarcharObjectInspector(varCharType.getLength()); - } - case DATE: - return new TableStoreDateObjectInspector(); - case TIMESTAMP_WITHOUT_TIME_ZONE: - return new TableStoreTimestampObjectInspector(); - case ARRAY: - ArrayType arrayType = (ArrayType) logicalType; - return new TableStoreListObjectInspector(arrayType.getElementType()); - 
case MAP: - MapType mapType = (MapType) logicalType; - return new TableStoreMapObjectInspector( - mapType.getKeyType(), mapType.getValueType()); - default: - throw new UnsupportedOperationException( - "Unsupported logical type " + logicalType.asSummaryString()); - } - } } diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/flink-table-store-hive-connector/pom.xml similarity index 97% copy from flink-table-store-hive/pom.xml copy to flink-table-store-hive/flink-table-store-hive-connector/pom.xml index 1d6bb57d..23598562 100644 --- a/flink-table-store-hive/pom.xml +++ b/flink-table-store-hive/flink-table-store-hive-connector/pom.xml @@ -23,22 +23,23 @@ under the License. <modelVersion>4.0.0</modelVersion> <parent> - <artifactId>flink-table-store-parent</artifactId> + <artifactId>flink-table-store-hive</artifactId> <groupId>org.apache.flink</groupId> <version>0.2-SNAPSHOT</version> </parent> - <artifactId>flink-table-store-hive</artifactId> - <name>Flink Table Store : Hive</name> + <artifactId>flink-table-store-hive-connector</artifactId> + <name>Flink Table Store : Hive Connector</name> <packaging>jar</packaging> - <properties> - <hiverunner.version>4.0.0</hiverunner.version> - <reflections.version>0.9.8</reflections.version> - </properties> - <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-store-hive-common</artifactId> + <version>${project.version}</version> + </dependency> + <!-- Flink All dependencies --> <dependency> <groupId>org.apache.flink</groupId> @@ -477,6 +478,7 @@ under the License. 
<configuration> <artifactSet> <includes combine.children="append"> + <include>org.apache.flink:flink-table-store-hive-common</include> <include>org.apache.flink:flink-table-store-shade</include> </includes> </artifactSet> diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/RowDataContainer.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/RowDataContainer.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/RowDataContainer.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/RowDataContainer.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/SearchArgumentToPredicateConverter.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/TableStoreJobConf.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java similarity index 100% rename from 
flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveMetaHook.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandler.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/TableStoreSerDe.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java 
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspector.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspector.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspector.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java 
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java similarity index 95% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java index 02b0e724..225d2ed2 100644 --- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java +++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspector.java @@ -19,7 +19,6 @@ package org.apache.flink.table.store.hive.objectinspector; import org.apache.flink.table.data.ArrayData; -import org.apache.flink.table.store.hive.HiveTypeUtils; import org.apache.flink.table.types.logical.LogicalType; import org.apache.hadoop.hive.serde.serdeConstants; @@ -41,7 +40,7 @@ public class TableStoreListObjectInspector implements ListObjectInspector { private final ArrayData.ElementGetter elementGetter; public TableStoreListObjectInspector(LogicalType elementType) { - this.elementObjectInspector = HiveTypeUtils.getObjectInspector(elementType); + this.elementObjectInspector = TableStoreObjectInspectorFactory.create(elementType); this.elementGetter = ArrayData.createElementGetter(elementType); } diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java similarity index 94% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java rename to 
flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java index 4b4d606f..c37d3f2e 100644 --- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java +++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspector.java @@ -20,7 +20,6 @@ package org.apache.flink.table.store.hive.objectinspector; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.MapData; -import org.apache.flink.table.store.hive.HiveTypeUtils; import org.apache.flink.table.types.logical.LogicalType; import org.apache.hadoop.hive.serde.serdeConstants; @@ -45,8 +44,8 @@ public class TableStoreMapObjectInspector implements MapObjectInspector { private final ArrayData.ElementGetter valueGetter; public TableStoreMapObjectInspector(LogicalType keyType, LogicalType valueType) { - this.keyObjectInspector = HiveTypeUtils.getObjectInspector(keyType); - this.valueObjectInspector = HiveTypeUtils.getObjectInspector(valueType); + this.keyObjectInspector = TableStoreObjectInspectorFactory.create(keyType); + this.valueObjectInspector = TableStoreObjectInspectorFactory.create(valueType); this.keyGetter = ArrayData.createElementGetter(keyType); this.valueGetter = ArrayData.createElementGetter(valueType); } diff --git a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreObjectInspectorFactory.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreObjectInspectorFactory.java new file mode 100644 index 00000000..dca50ca0 --- /dev/null +++ 
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreObjectInspectorFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.hive.objectinspector; + +import org.apache.flink.table.store.hive.HiveTypeUtils; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.CharType; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.VarCharType; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; + +/** Factory to create {@link ObjectInspector}s according to the given {@link LogicalType}. 
*/ +public class TableStoreObjectInspectorFactory { + + public static ObjectInspector create(LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT: + case DOUBLE: + case BINARY: + case VARBINARY: + return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector( + (PrimitiveTypeInfo) HiveTypeUtils.logicalTypeToTypeInfo(logicalType)); + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + return new TableStoreDecimalObjectInspector( + decimalType.getPrecision(), decimalType.getScale()); + case CHAR: + CharType charType = (CharType) logicalType; + return new TableStoreCharObjectInspector(charType.getLength()); + case VARCHAR: + VarCharType varCharType = (VarCharType) logicalType; + if (varCharType.getLength() == VarCharType.MAX_LENGTH) { + return new TableStoreStringObjectInspector(); + } else { + return new TableStoreVarcharObjectInspector(varCharType.getLength()); + } + case DATE: + return new TableStoreDateObjectInspector(); + case TIMESTAMP_WITHOUT_TIME_ZONE: + return new TableStoreTimestampObjectInspector(); + case ARRAY: + ArrayType arrayType = (ArrayType) logicalType; + return new TableStoreListObjectInspector(arrayType.getElementType()); + case MAP: + MapType mapType = (MapType) logicalType; + return new TableStoreMapObjectInspector( + mapType.getKeyType(), mapType.getValueType()); + default: + throw new UnsupportedOperationException( + "Unsupported logical type " + logicalType.asSummaryString()); + } + } +} diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java similarity index 98% rename from 
flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java index 9545ccdc..a783be83 100644 --- a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java +++ b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspector.java @@ -51,7 +51,7 @@ public class TableStoreRowDataObjectInspector extends StructObjectInspector { TableStoreStructField structField = new TableStoreStructField( name, - HiveTypeUtils.getObjectInspector(logicalType), + TableStoreObjectInspectorFactory.create(logicalType), i, RowData.createFieldGetter(logicalType, i), fieldComments.get(i)); diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspector.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java similarity index 100% rename from 
flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspector.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspector.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputFormat.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreInputSplit.java diff --git 
a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreOutputFormat.java diff --git a/flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java similarity index 100% rename from flink-table-store-hive/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/mapred/TableStoreRecordReader.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/FileStoreTestUtils.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java rename to 
flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/SearchArgumentToPredicateConverterTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/HiveTableSchemaTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/RandomGenericRowDataGenerator.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java similarity index 99% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java index 5b518d50..d6e87bf7 100644 --- a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java +++ 
b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreHiveStorageHandlerITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.store.FileStoreTestUtils; import org.apache.flink.table.store.file.FileStoreOptions; +import org.apache.flink.table.store.hive.objectinspector.TableStoreObjectInspectorFactory; import org.apache.flink.table.store.table.FileStoreTable; import org.apache.flink.table.store.table.sink.TableWrite; import org.apache.flink.table.types.logical.LogicalType; @@ -225,7 +226,7 @@ public class TableStoreHiveStorageHandlerITCase { continue; } ObjectInspector oi = - HiveTypeUtils.getObjectInspector( + TableStoreObjectInspectorFactory.create( RandomGenericRowDataGenerator.LOGICAL_TYPES.get(i)); switch (oi.getCategory()) { case PRIMITIVE: diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/TableStoreSerDeTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java rename to 
flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreCharObjectInspectorTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDateObjectInspectorTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreDecimalObjectInspectorTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java rename to 
flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreListObjectInspectorTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreMapObjectInspectorTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreRowDataObjectInspectorTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java rename to 
flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreStringObjectInspectorTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreTimestampObjectInspectorTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/hive/objectinspector/TableStoreVarcharObjectInspectorTest.java diff --git a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreInputSplitTest.java diff --git 
a/flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java b/flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java similarity index 100% rename from flink-table-store-hive/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/java/org/apache/flink/table/store/mapred/TableStoreRecordReaderTest.java diff --git a/flink-table-store-hive/src/test/resources/log4j2-test.properties b/flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties similarity index 100% rename from flink-table-store-hive/src/test/resources/log4j2-test.properties rename to flink-table-store-hive/flink-table-store-hive-connector/src/test/resources/log4j2-test.properties diff --git a/flink-table-store-hive/pom.xml b/flink-table-store-hive/pom.xml index 1d6bb57d..ebf590fb 100644 --- a/flink-table-store-hive/pom.xml +++ b/flink-table-store-hive/pom.xml @@ -31,459 +31,18 @@ under the License. 
<artifactId>flink-table-store-hive</artifactId> <name>Flink Table Store : Hive</name> - <packaging>jar</packaging> + <packaging>pom</packaging> + + <modules> + <module>flink-table-store-hive-catalog</module> + <module>flink-table-store-hive-common</module> + <module>flink-table-store-hive-connector</module> + </modules> <properties> + <hive.version>2.3.4</hive.version> <hiverunner.version>4.0.0</hiverunner.version> <reflections.version>0.9.8</reflections.version> </properties> - <dependencies> - <!-- Flink All dependencies --> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-store-shade</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> - <version>${hive.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - </exclusion> - <exclusion> - 
<groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.orc</groupId> - <artifactId>orc-core</artifactId> - </exclusion> - <!-- this dependency cannot be fetched from central maven repository anymore --> - <exclusion> - <groupId>org.pentaho</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - - <!-- test dependencies --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-store-core</artifactId> - <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <!-- - IDEA reads classes from the same project from target/classes of that module, - so even though we've packaged and shaded avro classes into flink-table-store-format.jar - we still have to include this test dependency here. - --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-sql-avro</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - </dependency> - - <!-- dependencies for IT cases --> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-hive_${scala.binary.version}</artifactId> - <version>${flink.version}</version> - <scope>test</scope> - <type>test-jar</type> - </dependency> - - <dependency> - <groupId>com.klarna</groupId> - <artifactId>hiverunner</artifactId> - <version>${hiverunner.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-serde</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hive</groupId> - 
<artifactId>hive-jdbc</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-service</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-contrib</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-hcatalog-core</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hive.hcatalog</groupId> - <artifactId>hive-webhcat-java-client</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.tez</groupId> - <artifactId>tez-common</artifactId> - </exclusion> - <exclusion> - <!-- This dependency is no longer shipped with the JDK since Java 9.--> - <groupId>jdk.tools</groupId> - <artifactId>jdk.tools</artifactId> - </exclusion> - <exclusion> - <artifactId>hadoop-common</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-auth</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-annotations</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-hdfs</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-api</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-client</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-common</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-server-common</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-server-web-proxy</artifactId> - 
<groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-shim</artifactId> - <groupId>org.apache.tez</groupId> - </exclusion> - <exclusion> - <artifactId>jms</artifactId> - <groupId>javax.jms</groupId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.reflections</groupId> - <artifactId>reflections</artifactId> - <version>${reflections.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-service</artifactId> - <version>${hive.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-metastore</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - <exclusion> - <!-- This dependency is no longer shipped with the JDK since Java 9.--> - <groupId>jdk.tools</groupId> - <artifactId>jdk.tools</artifactId> - </exclusion> - <exclusion> - <artifactId>hadoop-common</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-auth</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-client</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-annotations</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-hdfs</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <groupId>org.apache.hadoop</groupId> - 
</exclusion> - <exclusion> - <artifactId>hadoop-yarn-api</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-common</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-registry</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-server-common</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-server-resourcemanager</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hbase-hadoop-compat</artifactId> - <groupId>org.apache.hbase</groupId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hive.hcatalog</groupId> - <artifactId>hive-hcatalog-core</artifactId> - <version>${hive.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> - </exclusion> - <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </exclusion> - <exclusion> - <artifactId>hadoop-common</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-archives</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-annotations</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-hdfs</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-mapreduce-client-core</artifactId> - 
<groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-resourcemanager</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>apache-log4j-extras</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hive.hcatalog</groupId> - <artifactId>hive-webhcat-java-client</artifactId> - <version>${hive.version}</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>jdk.tools</groupId> - <artifactId>jdk.tools</artifactId> - </exclusion> - <exclusion> - <artifactId>jms</artifactId> - <groupId>javax.jms</groupId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>org.pentaho</groupId> - <artifactId>pentaho-aggdesigner-algorithm</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>${junit4.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.junit.vintage</groupId> - <artifactId>junit-vintage-engine</artifactId> - <version>${junit5.version}</version> - <exclusions> - <exclusion> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </exclusion> - </exclusions> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>shade-flink</id> - 
<phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <artifactSet> - <includes combine.children="append"> - <include>org.apache.flink:flink-table-store-shade</include> - </includes> - </artifactSet> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> -</project> +</project> \ No newline at end of file diff --git a/pom.xml b/pom.xml index d2f88bc4..92d48714 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,6 @@ under the License. <flink.shaded.version>15.0</flink.shaded.version> <flink.shaded.jackson.version>2.12.4</flink.shaded.jackson.version> <hadoop.version>2.8.5</hadoop.version> - <hive.version>2.3.4</hive.version> <scala.version>2.12.7</scala.version> <scala.binary.version>2.12</scala.binary.version> <snappy.version>1.1.8.3</snappy.version>