This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new e670b92  [FLINK-13930][hive] Support Hive version 3.1.x
e670b92 is described below

commit e670b929ae124e04c6cdec560d4432ae9c561ff9
Author: Xuefu Zhang <xuef...@alibaba-inc.com>
AuthorDate: Fri Aug 30 11:23:57 2019 -0700

    [FLINK-13930][hive] Support Hive version 3.1.x

    Support Hive 3.1.x versions, including 3.1.0, 3.1.1, and 3.1.2. These are the latest versions from the Hive community.

    This closes #9580.
---
 flink-connectors/flink-connector-hive/pom.xml | 13 +-
 .../flink/connectors/hive/HiveTableFactory.java | 10 +-
 .../connectors/hive/HiveTableOutputFormat.java | 6 +-
 .../flink/connectors/hive/HiveTableSink.java | 8 +-
 .../flink/table/catalog/hive/HiveCatalog.java | 15 ++-
 .../flink/table/catalog/hive/client/HiveShim.java | 50 ++++++++
 .../table/catalog/hive/client/HiveShimLoader.java | 12 ++
 .../table/catalog/hive/client/HiveShimV120.java | 51 ++++++++
 .../table/catalog/hive/client/HiveShimV310.java | 114 ++++++++++++++++++
 .../table/catalog/hive/client/HiveShimV311.java | 26 ++++
 .../table/catalog/hive/client/HiveShimV312.java | 26 ++++
 .../catalog/hive/util/HiveReflectionUtils.java | 131 +++++++++++++++++++++
 .../table/functions/hive/HiveGenericUDAF.java | 3 +-
 .../flink/table/functions/hive/HiveGenericUDF.java | 10 +-
 .../table/functions/hive/HiveGenericUDTF.java | 9 +-
 .../flink/table/functions/hive/HiveSimpleUDF.java | 7 +-
 .../functions/hive/conversion/HiveInspectors.java | 22 ++--
 .../connectors/hive/FlinkStandaloneHiveRunner.java | 6 +-
 .../hive/FlinkStandaloneHiveServerContext.java | 4 +-
 .../connectors/hive/HiveRunnerShimLoader.java | 3 +
 .../table/functions/hive/HiveGenericUDFTest.java | 18 +--
 .../table/functions/hive/HiveGenericUDTFTest.java | 7 +-
 .../table/functions/hive/HiveSimpleUDFTest.java | 29 ++---
 23 files changed, 507 insertions(+), 73 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index 66cbc19..471e045 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -115,10 +115,6 @@ under the License. <artifactId>commons-lang</artifactId> </exclusion> <exclusion> - <groupId>com.zaxxer</groupId> - <artifactId>HikariCP</artifactId> - </exclusion> - <exclusion> <groupId>co.cask.tephra</groupId> <artifactId>tephra-api</artifactId> </exclusion> @@ -616,7 +612,7 @@ under the License. <scope>test</scope> </dependency> - <dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_${scala.binary.version}</artifactId> <version>${project.version}</version> @@ -671,6 +667,13 @@ under the License.
</properties> </profile> <profile> + <id>hive-3.1.1</id> + <properties> + <hive.version>3.1.1</hive.version> + </properties> + + </profile> + <profile> <id>skip-hive-tests</id> <build> <plugins> diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java index 73626d0..235919c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java @@ -25,6 +25,8 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.config.CatalogConfig; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; import org.apache.flink.table.factories.FunctionDefinitionFactory; import org.apache.flink.table.factories.TableFactoryUtil; @@ -71,6 +73,7 @@ public class HiveTableFactory private final HiveConf hiveConf; private final String hiveVersion; + private final HiveShim hiveShim; public HiveTableFactory(HiveConf hiveConf) { this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be null"); @@ -78,6 +81,7 @@ public class HiveTableFactory // this has to come from hiveConf, otherwise we may lose what user specifies in the yaml file this.hiveVersion = checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION), "Hive version is not defined"); + this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); } @Override @@ -166,19 +170,19 @@ public class HiveTableFactory return new ScalarFunctionDefinition( name, - new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName)) + new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim) ); } else if (GenericUDF.class.isAssignableFrom(clazz)) { LOG.info("Transforming Hive function '{}' into a HiveGenericUDF", name); return new ScalarFunctionDefinition( name, - new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName)) + new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim) ); } else if (GenericUDTF.class.isAssignableFrom(clazz)) { LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name); - HiveGenericUDTF udtf = new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName)); + HiveGenericUDTF udtf = new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName), hiveShim); return new TableFunctionDefinition( name, diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java index ec0b41d..0760ae5 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java @@ -46,7 +46,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; 
import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -224,12 +223,13 @@ public class HiveTableOutputFormat extends HadoopOutputFormatCommonBase<Row> imp } if (isPartitioned) { if (isDynamicPartition) { - FileStatus[] generatedParts = HiveStatsUtils.getFileStatusRecurse(stagingDir, + HiveShim hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); + FileStatus[] generatedParts = hiveShim.getFileStatusRecurse(stagingDir, partitionColumns.size() - hiveTablePartition.getPartitionSpec().size(), fs); for (FileStatus part : generatedParts) { commitJob(part.getPath().toString()); LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>(); - Warehouse.makeSpecFromName(fullPartSpec, part.getPath()); + hiveShim.makeSpecFromName(fullPartSpec, part.getPath()); loadPartition(part.getPath(), table, fullPartSpec, client); } } else { diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java index 2974de0..d23a3ed 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java @@ -26,7 +26,10 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory; import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; +import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.sinks.OutputFormatTableSink; import org.apache.flink.table.sinks.OverwritableTableSink; import org.apache.flink.table.sinks.PartitionableTableSink; @@ -40,7 +43,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; @@ -66,6 +68,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti private final ObjectPath tablePath; private final TableSchema tableSchema; private final String hiveVersion; + private final HiveShim hiveShim; private Map<String, String> staticPartitionSpec = Collections.emptyMap(); @@ -77,6 +80,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti this.catalogTable = table; hiveVersion = Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION), "Hive version is not defined"); + hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); tableSchema = table.getSchema(); } @@ -120,7 +124,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> implements Partiti tablePath, catalogTable, hiveTablePartition, - MetaStoreUtils.getTableMetadata(table), + HiveReflectionUtils.getTableMetadata(hiveShim, table), overwrite); } catch (TException e) { throw new CatalogException("Failed to query Hive metaStore", e); diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index dd50ce2..7914c89 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -53,7 +53,10 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.catalog.exceptions.TablePartitionedException; import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory; import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator; +import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.catalog.hive.util.HiveStatsUtil; import org.apache.flink.table.catalog.hive.util.HiveTableUtil; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; @@ -63,7 +66,6 @@ import org.apache.flink.util.StringUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -121,6 +123,7 @@ public class HiveCatalog extends AbstractCatalog { private final HiveConf hiveConf; private final String hiveVersion; + private final HiveShim hiveShim; private HiveMetastoreClientWrapper client; @@ -138,6 +141,7 @@ public class HiveCatalog extends AbstractCatalog { this.hiveConf = hiveConf == null ? 
createHiveConf(null) : hiveConf; checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion), "hiveVersion cannot be null or empty"); this.hiveVersion = hiveVersion; + hiveShim = HiveShimLoader.loadHiveShim(hiveVersion); // add this to hiveConf to make sure table factory and source/sink see the same Hive version as HiveCatalog this.hiveConf.set(HiveCatalogValidator.CATALOG_HIVE_VERSION, hiveVersion); @@ -494,7 +498,7 @@ public class HiveCatalog extends AbstractCatalog { } } - private static CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveConf) { + private CatalogBaseTable instantiateCatalogTable(Table hiveTable, HiveConf hiveConf) { boolean isView = TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW; // Table properties @@ -515,8 +519,8 @@ public class HiveCatalog extends AbstractCatalog { } else { // get schema from deserializer try { - fields = MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(), - MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true)); + fields = HiveReflectionUtils.getFieldsFromDeserializer(hiveShim, hiveTable.getTableName(), + HiveReflectionUtils.getDeserializer(hiveShim, hiveConf, hiveTable, true)); } catch (SerDeException | MetaException e) { throw new CatalogException("Failed to get Hive table schema from deserializer", e); } @@ -735,7 +739,8 @@ public class HiveCatalog extends AbstractCatalog { try { // partition spec can be partial - List<String> partialVals = MetaStoreUtils.getPvals(hiveTable.getPartitionKeys(), partitionSpec.getPartitionSpec()); + List<String> partialVals = HiveReflectionUtils.getPvals(hiveShim, hiveTable.getPartitionKeys(), + partitionSpec.getPartitionSpec()); return client.listPartitionNames(tablePath.getDatabaseName(), tablePath.getObjectName(), partialVals, (short) -1).stream().map(HiveCatalog::createPartitionSpec).collect(Collectors.toList()); } catch (TException e) { diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java index 33e9708..1f39d84 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog.hive.client; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -36,6 +37,7 @@ import org.apache.thrift.TException; import java.io.IOException; import java.util.List; +import java.util.Map; /** * A shim layer to support different versions of Hive. @@ -109,4 +111,52 @@ public interface HiveShim { */ SimpleGenericUDAFParameterInfo createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing, boolean distinct, boolean allColumns); + + /** + * Get the class of Hive's MetaStoreUtils because its package name was changed in Hive 3.1.0. + * + * @return MetaStoreUtils class + */ + Class<?> getMetaStoreUtilsClass(); + + /** + * Get the class of Hive's HiveMetaStoreUtils as it was split from MetaStoreUtils class in Hive 3.1.0. + * + * @return HiveMetaStoreUtils class + */ + Class<?> getHiveMetaStoreUtilsClass(); + + /** + * Hive Date data type class was changed in Hive 3.1.0. 
+ * + * @return Hive's Date class + */ + Class<?> getDateDataTypeClass(); + + /** + * Hive Timestamp data type class was changed in Hive 3.1.0. + * + * @return Hive's Timestamp class + */ + Class<?> getTimestampDataTypeClass(); + + /** + * The return type of HiveStatsUtils.getFileStatusRecurse was changed from array to List in Hive 3.1.0. + * + * @param path the path of the directory + * @param level the level of recursion + * @param fs the file system of the directory + * @return an array of the entries + * @throws IOException in case of any io error + */ + FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) throws IOException; + + /** + * The signature of HiveStatsUtils.makeSpecFromName() was changed in Hive 3.1.0. + * + * @param partSpec partition specs + * @param currPath the current path + */ + void makeSpecFromName(Map<String, String> partSpec, Path currPath); + } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java index 802f2bd..fec8b7c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java @@ -44,6 +44,9 @@ public class HiveShimLoader { public static final String HIVE_VERSION_V2_3_4 = "2.3.4"; public static final String HIVE_VERSION_V2_3_5 = "2.3.5"; public static final String HIVE_VERSION_V2_3_6 = "2.3.6"; + public static final String HIVE_VERSION_V3_1_0 = "3.1.0"; + public static final String HIVE_VERSION_V3_1_1 = "3.1.1"; + public static final String HIVE_VERSION_V3_1_2 = "3.1.2"; private static final Map<String, HiveShim> hiveShims = new ConcurrentHashMap<>(2); @@ -90,6 +93,15 @@ public class HiveShimLoader { if (v.startsWith(HIVE_VERSION_V2_3_6)) { return new HiveShimV236(); } + if (v.startsWith(HIVE_VERSION_V3_1_0)) { + return new HiveShimV310(); + } + if (v.startsWith(HIVE_VERSION_V3_1_1)) { + return new HiveShimV311(); + } + if (v.startsWith(HIVE_VERSION_V3_1_2)) { + return new HiveShimV312(); + } throw new CatalogException("Unsupported Hive version " + v); }); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java index a3f30ff..d1ed444 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java @@ -21,13 +21,16 @@ package org.apache.flink.table.catalog.hive.client; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.Warehouse; import 
org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; @@ -45,6 +48,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Shim for Hive version 1.2.0. @@ -143,4 +147,51 @@ public class HiveShimV120 implements HiveShim { throw new CatalogException("Failed to create SimpleGenericUDAFParameterInfo", e); } } + + @Override + public Class<?> getMetaStoreUtilsClass() { + try { + return Class.forName("org.apache.hadoop.hive.metastore.MetaStoreUtils"); + } catch (ClassNotFoundException e) { + throw new CatalogException("Failed to find class MetaStoreUtils", e); + } + } + + @Override + public Class<?> getHiveMetaStoreUtilsClass() { + return getMetaStoreUtilsClass(); + } + + @Override + public Class<?> getDateDataTypeClass() { + return java.sql.Date.class; + } + + @Override + public Class<?> getTimestampDataTypeClass() { + return java.sql.Timestamp.class; + } + + @Override + public FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) throws IOException { + try { + Method method = HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class, Integer.TYPE, FileSystem.class); + // getFileStatusRecurse is a static method + return (FileStatus[]) method.invoke(null, path, level, fs); + } catch (Exception ex) { + throw new CatalogException("Failed to invoke HiveStatsUtils.getFileStatusRecurse()", ex); + } + } + + @Override + public void makeSpecFromName(Map<String, String> partSpec, Path currPath) { + try { + Method method = Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class); + // makeSpecFromName is a static method + method.invoke(null, partSpec, currPath); + } catch (Exception ex) { + throw new CatalogException("Failed to invoke Warehouse.makeSpecFromName()", ex); + } + } + } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java new file mode 100644 index 0000000..62438d4 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.catalog.hive.client; + +import org.apache.flink.table.catalog.exceptions.CatalogException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.HiveStatsUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.metastore.Warehouse; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Shim for Hive version 3.1.0. + */ +public class HiveShimV310 extends HiveShimV235 { + + @Override + public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) { + try { + Method method = RetryingMetaStoreClient.class.getMethod("getProxy", Configuration.class, Boolean.TYPE); + // getProxy is a static method + return (IMetaStoreClient) method.invoke(null, hiveConf, true); + } catch (Exception ex) { + throw new CatalogException("Failed to create Hive Metastore client", ex); + } + } + + @Override + public Class<?> getMetaStoreUtilsClass() { + try { + return Class.forName("org.apache.hadoop.hive.metastore.utils.MetaStoreUtils"); + } catch (ClassNotFoundException e) { + throw new CatalogException("Failed to find class MetaStoreUtils", e); + } + } + + @Override + public Class<?> getHiveMetaStoreUtilsClass() { + try { + return Class.forName("org.apache.hadoop.hive.metastore.HiveMetaStoreUtils"); + } catch (ClassNotFoundException e) { + throw new CatalogException("Failed to find class HiveMetaStoreUtils", e); + } + } + + @Override + public Class<?> getDateDataTypeClass() { + try { + return Class.forName("org.apache.hadoop.hive.common.type.Date"); + } catch (ClassNotFoundException e) { + throw new CatalogException("Failed to find class org.apache.hadoop.hive.common.type.Date", e); + } + } + + @Override + public Class<?> getTimestampDataTypeClass() { + try { + return Class.forName("org.apache.hadoop.hive.common.type.Timestamp"); + } catch (ClassNotFoundException e) { + throw new CatalogException("Failed to find class org.apache.hadoop.hive.common.type.Timestamp", e); + } + } + + @Override + public FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) throws IOException { + try { + Method method = HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class, Integer.TYPE, FileSystem.class); + // getFileStatusRecurse is a static method + List<FileStatus> results = (List<FileStatus>) method.invoke(null, path, level, fs); + return results.toArray(new FileStatus[0]); + } catch (Exception ex) { + throw new CatalogException("Failed to invoke HiveStatsUtils.getFileStatusRecurse()", ex); + } + } + + @Override + public void makeSpecFromName(Map<String, String> partSpec, Path currPath) { + try { + Method method = Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class, Set.class); + // makeSpecFromName is a static method + method.invoke(null, partSpec, currPath, null); + } catch (Exception ex) { + throw new CatalogException("Failed to invoke Warehouse.makeSpecFromName()", ex); + } + } + +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV311.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV311.java new file mode 100644 index 
0000000..fc8a811 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV311.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive.client; + +/** + * Shim for Hive version 3.1.1. + */ +public class HiveShimV311 extends HiveShimV310 { + +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV312.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV312.java new file mode 100644 index 0000000..bac74f4 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV312.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive.client; + +/** + * Shim for Hive version 3.1.2. + */ +public class HiveShimV312 extends HiveShimV311 { + +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java new file mode 100644 index 0000000..1941096 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog.hive.util; + +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.functions.hive.FlinkHiveUDFException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde2.Deserializer; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantTimestampObjectInspector; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * Utilities for accessing Hive classes or methods via Java reflection. + * + * <p>They are put here not for code sharing, but as a central place for managing the boilerplate code that involves + * reflection. (In fact, they could just be private methods in their respective calling classes.) + * + * <p>Relevant Hive methods cannot be called directly because shimming is required to support different, possibly + * incompatible Hive versions.
+ */ +public class HiveReflectionUtils { + + public static Properties getTableMetadata(HiveShim hiveShim, Table table) { + try { + Method method = hiveShim.getMetaStoreUtilsClass().getMethod("getTableMetadata", Table.class); + return (Properties) method.invoke(null, table); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new CatalogException("Failed to invoke MetaStoreUtils.getTableMetadata()", e); + } + } + + public static List<FieldSchema> getFieldsFromDeserializer(HiveShim hiveShim, String tableName, Deserializer deserializer) + throws SerDeException, MetaException { + try { + Method method = hiveShim.getHiveMetaStoreUtilsClass().getMethod("getFieldsFromDeserializer", String.class, Deserializer.class); + return (List<FieldSchema>) method.invoke(null, tableName, deserializer); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new CatalogException("Failed to invoke MetaStoreUtils.getFieldsFromDeserializer()", e); + } + } + + public static Deserializer getDeserializer(HiveShim hiveShim, Configuration conf, Table table, boolean skipConfError) + throws MetaException { + try { + Method method = hiveShim.getHiveMetaStoreUtilsClass().getMethod("getDeserializer", Configuration.class, + Table.class, boolean.class); + return (Deserializer) method.invoke(null, conf, table, skipConfError); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new CatalogException("Failed to invoke MetaStoreUtils.getDeserializer()", e); + } + } + + public static List<String> getPvals(HiveShim hiveShim, List<FieldSchema> partCols, Map<String, String> partSpec) { + try { + Method method = hiveShim.getMetaStoreUtilsClass().getMethod("getPvals", List.class, Map.class); + return (List<String>) method.invoke(null, partCols, partSpec); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new CatalogException("Failed to invoke MetaStoreUtils.getPvals()", e); + } + } + + public static JavaConstantDateObjectInspector createJavaConstantDateObjectInspector(HiveShim hiveShim, Object value) { + Constructor<?> meth = null; + try { + meth = JavaConstantDateObjectInspector.class.getDeclaredConstructor(hiveShim.getDateDataTypeClass()); + meth.setAccessible(true); + return (JavaConstantDateObjectInspector) meth.newInstance(value); + } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new FlinkHiveUDFException("Failed to instantiate JavaConstantDateObjectInspector", e); + } + } + + public static JavaConstantTimestampObjectInspector createJavaConstantTimestampObjectInspector(HiveShim hiveShim, Object value) { + Constructor<?> meth = null; + try { + meth = JavaConstantTimestampObjectInspector.class.getDeclaredConstructor(hiveShim.getTimestampDataTypeClass()); + meth.setAccessible(true); + return (JavaConstantTimestampObjectInspector) meth.newInstance(value); + } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new FlinkHiveUDFException("Failed to instantiate JavaConstantTimestampObjectInspector", e); + } + } + + public static Object convertToHiveDate(HiveShim hiveShim, String s) throws FlinkHiveUDFException { + try { + Method method = hiveShim.getDateDataTypeClass().getMethod("valueOf", String.class); + return method.invoke(null, s); + } catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException e) { + throw new FlinkHiveUDFException("Failed to invoke Hive's Date.valueOf()", e); + } + } + + public static Object convertToHiveTimestamp(HiveShim hiveShim, String s) throws FlinkHiveUDFException { + try { + Method method = hiveShim.getTimestampDataTypeClass().getMethod("valueOf", String.class); + return method.invoke(null, s); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new FlinkHiveUDFException("Failed to invoke Hive's Timestamp.valueOf()", e); + } + } + +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java index b2f2fe5..a6f4e3b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java @@ -84,7 +84,8 @@ public class HiveGenericUDAF } private void init() throws HiveException { - ObjectInspector[] inputInspectors = HiveInspectors.toInspectors(constantArguments, argTypes); + ObjectInspector[] inputInspectors = HiveInspectors.toInspectors(HiveShimLoader.loadHiveShim(hiveVersion), + constantArguments, argTypes); // Flink UDAF only supports Hive UDAF's PARTIAL_1 and FINAL mode diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java index 70c875b..e970ace 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java @@ -19,6 +19,7 @@ package org.apache.flink.table.functions.hive; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.hive.client.HiveShim; import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; import org.apache.flink.table.functions.hive.conversion.HiveInspectors; import org.apache.flink.table.types.DataType; @@ -40,10 +41,11 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> { private static final Logger LOG = LoggerFactory.getLogger(HiveGenericUDF.class); private transient GenericUDF.DeferredObject[] deferredObjects; + private transient HiveShim hiveShim; - public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> hiveFunctionWrapper) { + public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> hiveFunctionWrapper, HiveShim hiveShim) { super(hiveFunctionWrapper); - + this.hiveShim = hiveShim; LOG.info("Creating HiveGenericUDF from '{}'", hiveFunctionWrapper.getClassName()); } @@ -56,7 +58,7 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> { try { returnInspector = function.initializeAndFoldConstants( - HiveInspectors.toInspectors(constantArguments, argTypes)); + HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes)); } catch (UDFArgumentException e) { throw new FlinkHiveUDFException(e); } @@ -91,7 +93,7 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> { LOG.info("Getting result type of HiveGenericUDF from {}", hiveFunctionWrapper.getClassName()); try { - ObjectInspector[] argumentInspectors = HiveInspectors.toInspectors(constantArguments, argTypes); + ObjectInspector[] 
argumentInspectors = HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes); ObjectInspector resultObjectInspector = hiveFunctionWrapper.createFunction().initializeAndFoldConstants(argumentInspectors); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java index 8ed619e..a7dbc8b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java @@ -21,6 +21,7 @@ package org.apache.flink.table.functions.hive; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.hive.client.HiveShim; import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; import org.apache.flink.table.functions.FunctionContext; import org.apache.flink.table.functions.TableFunction; @@ -63,9 +64,11 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction private transient boolean allIdentityConverter; private transient HiveObjectConversion[] conversions; + private transient HiveShim hiveShim; - public HiveGenericUDTF(HiveFunctionWrapper<GenericUDTF> hiveFunctionWrapper) { + public HiveGenericUDTF(HiveFunctionWrapper<GenericUDTF> hiveFunctionWrapper, HiveShim hiveShim) { this.hiveFunctionWrapper = hiveFunctionWrapper; + this.hiveShim = hiveShim; } @Override @@ -77,7 +80,7 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction HiveGenericUDTF.this.collect(row); }); - ObjectInspector[] argumentInspectors = HiveInspectors.toInspectors(constantArguments, argTypes); + ObjectInspector[] argumentInspectors = HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes); returnInspector = function.initialize(argumentInspectors); isArgsSingleArray = HiveFunctionUtil.isSingleBoxedArray(argTypes); @@ -129,7 +132,7 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction LOG.info("Getting result type of HiveGenericUDTF with {}", hiveFunctionWrapper.getClassName()); try { - ObjectInspector[] argumentInspectors = HiveInspectors.toInspectors(constantArguments, argTypes); + ObjectInspector[] argumentInspectors = HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes); return HiveTypeUtil.toFlinkType( hiveFunctionWrapper.createFunction().initialize(argumentInspectors)); } catch (UDFArgumentException e) { diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java index fb3b6e2..d58cb85 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java @@ -19,6 +19,7 @@ package org.apache.flink.table.functions.hive; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.catalog.hive.client.HiveShim; import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; import org.apache.flink.table.functions.hive.conversion.HiveInspectors; import 
org.apache.flink.table.functions.hive.conversion.HiveObjectConversion; @@ -56,9 +57,11 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> { private transient GenericUDFUtils.ConversionHelper conversionHelper; private transient HiveObjectConversion[] conversions; private transient boolean allIdentityConverter; + private transient HiveShim hiveShim; - public HiveSimpleUDF(HiveFunctionWrapper<UDF> hiveFunctionWrapper) { + public HiveSimpleUDF(HiveFunctionWrapper<UDF> hiveFunctionWrapper, HiveShim hiveShim) { super(hiveFunctionWrapper); + this.hiveShim = hiveShim; LOG.info("Creating HiveSimpleUDF from '{}'", this.hiveFunctionWrapper.getClassName()); } @@ -127,7 +130,7 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> { .getResolver().getEvalMethod(argTypeInfo).getReturnType(); return HiveInspectors.toFlinkType( - HiveInspectors.getObjectInspector(returnType)); + HiveInspectors.getObjectInspector(hiveShim, returnType)); } catch (UDFArgumentException e) { throw new FlinkHiveUDFException(e); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java index 3546668..22ce60b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java @@ -21,6 +21,8 @@ package org.apache.flink.table.functions.hive.conversion; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; import org.apache.flink.table.functions.hive.FlinkHiveUDFException; import org.apache.flink.table.types.DataType; @@ -65,7 +67,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspecto import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBinaryObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBooleanObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantByteObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDateObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDoubleObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantFloatObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantHiveCharObjectInspector; @@ -75,7 +76,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantIntOb import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantLongObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantShortObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantStringObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantTimestampObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; @@ -97,8 +97,6 @@ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -117,7 +115,7 @@ public class HiveInspectors { /** * Get an array of ObjectInspector from the give array of args and their types. */ - public static ObjectInspector[] toInspectors(Object[] args, DataType[] argTypes) { + public static ObjectInspector[] toInspectors(HiveShim hiveShim, Object[] args, DataType[] argTypes) { assert args.length == argTypes.length; ObjectInspector[] argumentInspectors = new ObjectInspector[argTypes.length]; @@ -132,6 +130,7 @@ public class HiveInspectors { } else { argumentInspectors[i] = HiveInspectors.getPrimitiveJavaConstantObjectInspector( + hiveShim, (PrimitiveTypeInfo) HiveTypeUtil.toHiveTypeInfo(argTypes[i]), constant ); @@ -141,7 +140,8 @@ public class HiveInspectors { return argumentInspectors; } - private static ConstantObjectInspector getPrimitiveJavaConstantObjectInspector(PrimitiveTypeInfo typeInfo, Object value) { + private static ConstantObjectInspector getPrimitiveJavaConstantObjectInspector(HiveShim hiveShim, + PrimitiveTypeInfo typeInfo, Object value) { switch (typeInfo.getPrimitiveCategory()) { case BOOLEAN: return new JavaConstantBooleanObjectInspector((Boolean) value); @@ -164,9 +164,9 @@ public class HiveInspectors { case VARCHAR: return new JavaConstantHiveVarcharObjectInspector((HiveVarchar) value); case DATE: - return new JavaConstantDateObjectInspector((Date) value); + return HiveReflectionUtils.createJavaConstantDateObjectInspector(hiveShim, value); case TIMESTAMP: - return new JavaConstantTimestampObjectInspector((Timestamp) value); + return HiveReflectionUtils.createJavaConstantTimestampObjectInspector(hiveShim, value); case DECIMAL: return new JavaConstantHiveDecimalObjectInspector((HiveDecimal) value); case BINARY: @@ -357,7 +357,7 @@ public class HiveInspectors { String.format("Unwrap does not support ObjectInspector '%s' yet", inspector)); } - public static ObjectInspector getObjectInspector(Class clazz) { + public static ObjectInspector getObjectInspector(HiveShim hiveShim, Class clazz) { TypeInfo typeInfo; if (clazz.equals(String.class) || clazz.equals(Text.class)) { @@ -384,10 +384,10 @@ public class HiveInspectors { } else if (clazz.equals(Double.class) || clazz.equals(DoubleWritable.class)) { typeInfo = TypeInfoFactory.doubleTypeInfo; - } else if (clazz.equals(Date.class) || clazz.equals(DateWritable.class)) { + } else if (clazz.equals(hiveShim.getDateDataTypeClass()) || clazz.equals(DateWritable.class)) { typeInfo = TypeInfoFactory.dateTypeInfo; - } else if (clazz.equals(Timestamp.class) || clazz.equals(TimestampWritable.class)) { + } else if (clazz.equals(hiveShim.getTimestampDataTypeClass()) || clazz.equals(TimestampWritable.class)) { typeInfo = TypeInfoFactory.timestampTypeInfo; } else if (clazz.equals(byte[].class) || clazz.equals(BytesWritable.class)) { diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java index 343a299..ac623d7 100644 --- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java @@ -78,7 +78,6 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEHISTORYFILELOC; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LOCALSCRATCHDIR; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; @@ -359,8 +358,9 @@ public class FlinkStandaloneHiveRunner extends BlockJUnit4ClassRunner { args.add(hiveCmdLineConfig(SCRATCHDIR.varname, outsideConf.getVar(SCRATCHDIR))); args.add(hiveCmdLineConfig(LOCALSCRATCHDIR.varname, outsideConf.getVar(LOCALSCRATCHDIR))); args.add(hiveCmdLineConfig(HIVEHISTORYFILELOC.varname, outsideConf.getVar(HIVEHISTORYFILELOC))); - args.add(hiveCmdLineConfig(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS.varname, - String.valueOf(outsideConf.getBoolVar(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS)))); + // The following config is removed in Hive 3.1.0. + args.add(hiveCmdLineConfig("hive.warehouse.subdir.inherit.perms", + String.valueOf(outsideConf.getBoolean("hive.warehouse.subdir.inherit.perms", true)))); args.add(hiveCmdLineConfig("hadoop.tmp.dir", outsideConf.get("hadoop.tmp.dir"))); args.add(hiveCmdLineConfig("test.log.dir", outsideConf.get("test.log.dir"))); String metaStorageUrl = "jdbc:derby:memory:" + UUID.randomUUID().toString(); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java index 605a341..236f253 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java @@ -43,7 +43,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_CBO_ENABLED; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LOCALSCRATCHDIR; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_VALIDATE_COLUMNS; @@ -185,7 +184,8 @@ public class FlinkStandaloneHiveServerContext implements HiveServerContext { createAndSetFolderProperty(LOCALSCRATCHDIR, "localscratchdir", conf, basedir); createAndSetFolderProperty(HIVEHISTORYFILELOC, "tmp", conf, basedir); - conf.setBoolVar(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS, true); + // HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS is removed from Hive 3.1.0 + conf.setBoolean("hive.warehouse.subdir.inherit.perms", true); createAndSetFolderProperty("hadoop.tmp.dir", "hadooptmp", conf, basedir); createAndSetFolderProperty("test.log.dir", "logs", conf, 
basedir); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java index ab0ee80..9a64238 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java @@ -50,6 +50,9 @@ public class HiveRunnerShimLoader { case HiveShimLoader.HIVE_VERSION_V2_3_4: case HiveShimLoader.HIVE_VERSION_V2_3_5: case HiveShimLoader.HIVE_VERSION_V2_3_6: + case HiveShimLoader.HIVE_VERSION_V3_1_0: + case HiveShimLoader.HIVE_VERSION_V3_1_1: + case HiveShimLoader.HIVE_VERSION_V3_1_2: return new HiveRunnerShimV4(); default: throw new RuntimeException("Unsupported Hive version " + v); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java index 058b5c4..938f794 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java @@ -19,6 +19,9 @@ package org.apache.flink.table.functions.hive; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; +import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.functions.hive.util.TestGenericUDFArray; import org.apache.flink.table.functions.hive.util.TestGenericUDFStructSize; import org.apache.flink.table.types.DataType; @@ -38,9 +41,8 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStringToMap; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct; import org.junit.Test; +import java.lang.reflect.InvocationTargetException; import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Timestamp; import java.util.HashMap; import static org.junit.Assert.assertEquals; @@ -49,6 +51,7 @@ import static org.junit.Assert.assertEquals; * Test for {@link HiveGenericUDF}. 
*/ public class HiveGenericUDFTest { + private static HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()); @Test public void testAbs() { @@ -108,7 +111,7 @@ public class HiveGenericUDFTest { } @Test - public void testDateFormat() { + public void testDateFormat() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { String constYear = "y"; String constMonth = "M"; @@ -138,7 +141,7 @@ public class HiveGenericUDFTest { } ); - assertEquals("8", udf.eval(Date.valueOf("2019-08-31"), constMonth)); + assertEquals("8", udf.eval(HiveReflectionUtils.convertToHiveDate(hiveShim, "2019-08-31"), constMonth)); } @Test @@ -235,7 +238,7 @@ public class HiveGenericUDFTest { } @Test - public void testDataDiff() { + public void testDataDiff() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException { String d = "1969-07-20"; String t1 = "1969-07-20 00:00:00"; @@ -267,7 +270,8 @@ public class HiveGenericUDFTest { } ); - assertEquals(-4182, udf.eval(Date.valueOf(d), Timestamp.valueOf(t2))); + assertEquals(-4182, udf.eval(HiveReflectionUtils.convertToHiveDate(hiveShim, d), + HiveReflectionUtils.convertToHiveTimestamp(hiveShim, t2))); // Test invalid char length udf = init( @@ -381,7 +385,7 @@ public class HiveGenericUDFTest { } private static HiveGenericUDF init(Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) { - HiveGenericUDF udf = new HiveGenericUDF(new HiveFunctionWrapper(hiveUdfClass.getName())); + HiveGenericUDF udf = new HiveGenericUDF(new HiveFunctionWrapper(hiveUdfClass.getName()), hiveShim); udf.setArgumentTypesAndConstants(constantArgs, argTypes); udf.getHiveResultType(constantArgs, argTypes); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java index 71712b7..12cb080 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java @@ -19,6 +19,8 @@ package org.apache.flink.table.functions.hive; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; import org.apache.flink.table.functions.hive.conversion.HiveInspectors; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; @@ -47,6 +49,7 @@ import static org.junit.Assert.assertEquals; * Test for {@link HiveGenericUDTF}. 
*/ public class HiveGenericUDTFTest { + private static HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()); private static TestCollector collector; @@ -193,12 +196,12 @@ public class HiveGenericUDTFTest { private static HiveGenericUDTF init(Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception { HiveFunctionWrapper<GenericUDTF> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName()); - HiveGenericUDTF udf = new HiveGenericUDTF(wrapper); + HiveGenericUDTF udf = new HiveGenericUDTF(wrapper, hiveShim); udf.setArgumentTypesAndConstants(constantArgs, argTypes); udf.getHiveResultType(constantArgs, argTypes); - ObjectInspector[] argumentInspectors = HiveInspectors.toInspectors(constantArgs, argTypes); + ObjectInspector[] argumentInspectors = HiveInspectors.toInspectors(hiveShim, constantArgs, argTypes); ObjectInspector returnInspector = wrapper.createFunction().initialize(argumentInspectors); udf.open(null); diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java index 4bbe0d0..7e64bf7 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java @@ -19,6 +19,9 @@ package org.apache.flink.table.functions.hive; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.hive.client.HiveShim; +import org.apache.flink.table.catalog.hive.client.HiveShimLoader; +import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.functions.hive.util.TestHiveUDFArray; import org.apache.flink.table.types.DataType; @@ -26,7 +29,6 @@ import org.apache.hadoop.hive.ql.udf.UDFBase64; import org.apache.hadoop.hive.ql.udf.UDFBin; import org.apache.hadoop.hive.ql.udf.UDFConv; import org.apache.hadoop.hive.ql.udf.UDFJson; -import org.apache.hadoop.hive.ql.udf.UDFMinute; import org.apache.hadoop.hive.ql.udf.UDFRand; import org.apache.hadoop.hive.ql.udf.UDFRegExpExtract; import org.apache.hadoop.hive.ql.udf.UDFToInteger; @@ -36,8 +38,6 @@ import org.junit.Test; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Timestamp; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -46,6 +46,8 @@ import static org.junit.Assert.assertTrue; * Test for {@link HiveSimpleUDF}. 
*/ public class HiveSimpleUDFTest { + private static HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()); + @Test public void testUDFRand() { HiveSimpleUDF udf = init(UDFRand.class, new DataType[0]); @@ -122,20 +124,7 @@ public class HiveSimpleUDFTest { } @Test - public void testUDFMinute() { - HiveSimpleUDF udf = init( - UDFMinute.class, - new DataType[]{ - DataTypes.STRING() - }); - - assertEquals(17, udf.eval("1969-07-20 20:17:40")); - assertEquals(17, udf.eval(Timestamp.valueOf("1969-07-20 20:17:40"))); - assertEquals(58, udf.eval("12:58:59")); - } - - @Test - public void testUDFWeekOfYear() { + public void testUDFWeekOfYear() throws FlinkHiveUDFException { HiveSimpleUDF udf = init( UDFWeekOfYear.class, new DataType[]{ @@ -143,8 +132,8 @@ public class HiveSimpleUDFTest { }); assertEquals(29, udf.eval("1969-07-20")); - assertEquals(29, udf.eval(Date.valueOf("1969-07-20"))); - assertEquals(29, udf.eval(Timestamp.valueOf("1969-07-20 00:00:00"))); + assertEquals(29, udf.eval(HiveReflectionUtils.convertToHiveDate(hiveShim, "1969-07-20"))); + assertEquals(29, udf.eval(HiveReflectionUtils.convertToHiveTimestamp(hiveShim, "1969-07-20 00:00:00"))); assertEquals(1, udf.eval("1980-12-31 12:59:59")); } @@ -230,7 +219,7 @@ public class HiveSimpleUDFTest { } protected static HiveSimpleUDF init(Class hiveUdfClass, DataType[] argTypes) { - HiveSimpleUDF udf = new HiveSimpleUDF(new HiveFunctionWrapper(hiveUdfClass.getName())); + HiveSimpleUDF udf = new HiveSimpleUDF(new HiveFunctionWrapper(hiveUdfClass.getName()), hiveShim); // Hive UDF won't have literal args udf.setArgumentTypesAndConstants(new Object[0], argTypes);
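
[Editor's note] The commit above routes every version-sensitive call through a HiveShim obtained from HiveShimLoader, so callers never reference a Hive class whose package or signature differs across versions. A minimal usage sketch, not part of the commit: it assumes flink-connector-hive and a Hive 3.1.1 dependency are on the classpath, and the "3.1.1" version string and printed output are illustrative only.

    import org.apache.flink.table.catalog.hive.client.HiveShim;
    import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
    import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;

    public class ShimUsageSketch {
        public static void main(String[] args) throws Exception {
            // Resolves to HiveShimV311 via the version-prefix checks added in
            // HiveShimLoader; an unsupported version raises CatalogException.
            HiveShim shim = HiveShimLoader.loadHiveShim("3.1.1");

            // java.sql.Date before Hive 3.1.0,
            // org.apache.hadoop.hive.common.type.Date from 3.1.0 on.
            Class<?> dateClass = shim.getDateDataTypeClass();
            System.out.println("Hive Date class: " + dateClass.getName());

            // Reflectively calls the version-appropriate Date.valueOf(String).
            Object hiveDate = HiveReflectionUtils.convertToHiveDate(shim, "2019-08-30");
            System.out.println("Converted: " + hiveDate);
        }
    }

Because HiveShimLoader caches shims in a ConcurrentHashMap keyed by version string, repeated lookups like the ones in HiveTableOutputFormat and HiveGenericUDAF are cheap.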
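[Editor's note] The pattern all of the new shim methods share is reflective dispatch to a static method on a class that is only named at runtime, so no moved or renamed Hive class is referenced at compile time. A self-contained stand-in illustration: java.lang.Math substitutes here for Hive's relocated MetaStoreUtils, and nothing below is Flink or Hive API.

    import java.lang.reflect.Method;

    public class ReflectiveDispatchSketch {
        public static void main(String[] args) throws Exception {
            // Stand-in for HiveShim.getMetaStoreUtilsClass(): resolve the class
            // by name at runtime instead of importing it.
            Class<?> utils = Class.forName("java.lang.Math");

            // Stand-in for HiveReflectionUtils.getTableMetadata(): look up a
            // static method and invoke it with a null receiver, as the shims do.
            Method max = utils.getMethod("max", int.class, int.class);
            System.out.println(max.invoke(null, 3, 7)); // prints 7
        }
    }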