This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e670b92  [FLINK-13930][hive] Support Hive version 3.1.x
e670b92 is described below

commit e670b929ae124e04c6cdec560d4432ae9c561ff9
Author: Xuefu Zhang <xuef...@alibaba-inc.com>
AuthorDate: Fri Aug 30 11:23:57 2019 -0700

    [FLINK-13930][hive] Support Hive version 3.1.x
    
    Support Hive 3.1.x versions, including 3.1.0, 3.1.1, and 3.1.2. These are
    the latest versions released by the Hive community.
    
    This closes #9580.
---
 flink-connectors/flink-connector-hive/pom.xml      |  13 +-
 .../flink/connectors/hive/HiveTableFactory.java    |  10 +-
 .../connectors/hive/HiveTableOutputFormat.java     |   6 +-
 .../flink/connectors/hive/HiveTableSink.java       |   8 +-
 .../flink/table/catalog/hive/HiveCatalog.java      |  15 ++-
 .../flink/table/catalog/hive/client/HiveShim.java  |  50 ++++++++
 .../table/catalog/hive/client/HiveShimLoader.java  |  12 ++
 .../table/catalog/hive/client/HiveShimV120.java    |  51 ++++++++
 .../table/catalog/hive/client/HiveShimV310.java    | 114 ++++++++++++++++++
 .../table/catalog/hive/client/HiveShimV311.java    |  26 ++++
 .../table/catalog/hive/client/HiveShimV312.java    |  26 ++++
 .../catalog/hive/util/HiveReflectionUtils.java     | 131 +++++++++++++++++++++
 .../table/functions/hive/HiveGenericUDAF.java      |   3 +-
 .../flink/table/functions/hive/HiveGenericUDF.java |  10 +-
 .../table/functions/hive/HiveGenericUDTF.java      |   9 +-
 .../flink/table/functions/hive/HiveSimpleUDF.java  |   7 +-
 .../functions/hive/conversion/HiveInspectors.java  |  22 ++--
 .../connectors/hive/FlinkStandaloneHiveRunner.java |   6 +-
 .../hive/FlinkStandaloneHiveServerContext.java     |   4 +-
 .../connectors/hive/HiveRunnerShimLoader.java      |   3 +
 .../table/functions/hive/HiveGenericUDFTest.java   |  18 +--
 .../table/functions/hive/HiveGenericUDTFTest.java  |   7 +-
 .../table/functions/hive/HiveSimpleUDFTest.java    |  29 ++---
 23 files changed, 507 insertions(+), 73 deletions(-)
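
As a quick orientation to the diff below, here is a minimal, hypothetical sketch of how the new shims are resolved at runtime. The version strings and the HiveShimLoader entry point come from the loader change in this commit (the pom also gains a hive-3.1.1 profile for testing against that version); the standalone class, the main method, and the assumption that a Hive 3.1 dependency is on the classpath are illustrative only.

    import org.apache.flink.table.catalog.hive.client.HiveShim;
    import org.apache.flink.table.catalog.hive.client.HiveShimLoader;

    public class ShimResolutionSketch {
        public static void main(String[] args) {
            // "3.1.0", "3.1.1" and "3.1.2" resolve to HiveShimV310, V311 and V312 respectively.
            HiveShim shim = HiveShimLoader.loadHiveShim("3.1.1");
            // Version-dependent classes are exposed through the shim, e.g. Hive's Date type:
            // java.sql.Date for pre-3.1 shims, org.apache.hadoop.hive.common.type.Date for 3.1.x.
            System.out.println(shim.getDateDataTypeClass().getName());
        }
    }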

diff --git a/flink-connectors/flink-connector-hive/pom.xml 
b/flink-connectors/flink-connector-hive/pom.xml
index 66cbc19..471e045 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -115,10 +115,6 @@ under the License.
                                        <artifactId>commons-lang</artifactId>
                                </exclusion>
                                <exclusion>
-                                       <groupId>com.zaxxer</groupId>
-                                       <artifactId>HikariCP</artifactId>
-                               </exclusion>
-                               <exclusion>
                                        <groupId>co.cask.tephra</groupId>
                                        <artifactId>tephra-api</artifactId>
                                </exclusion>
@@ -616,7 +612,7 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
-        <dependency>
+               <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
             <version>${project.version}</version>
@@ -671,6 +667,13 @@ under the License.
                        </properties>
                </profile>
                <profile>
+                       <id>hive-3.1.1</id>
+                       <properties>
+                               <hive.version>3.1.1</hive.version>
+                       </properties>
+
+               </profile>
+               <profile>
                        <id>skip-hive-tests</id>
                        <build>
                                <plugins>
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
index 73626d0..235919c 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableFactory.java
@@ -25,6 +25,8 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.config.CatalogConfig;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.factories.TableFactoryUtil;
@@ -71,6 +73,7 @@ public class HiveTableFactory
 
        private final HiveConf hiveConf;
        private final String hiveVersion;
+       private final HiveShim hiveShim;
 
        public HiveTableFactory(HiveConf hiveConf) {
                this.hiveConf = checkNotNull(hiveConf, "hiveConf cannot be 
null");
@@ -78,6 +81,7 @@ public class HiveTableFactory
                // this has to come from hiveConf, otherwise we may lose what 
user specifies in the yaml file
                this.hiveVersion = 
checkNotNull(hiveConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
                                "Hive version is not defined");
+               this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
        }
 
        @Override
@@ -166,19 +170,19 @@ public class HiveTableFactory
 
                        return new ScalarFunctionDefinition(
                                name,
-                               new HiveSimpleUDF(new 
HiveFunctionWrapper<>(functionClassName))
+                               new HiveSimpleUDF(new 
HiveFunctionWrapper<>(functionClassName), hiveShim)
                        );
                } else if (GenericUDF.class.isAssignableFrom(clazz)) {
                        LOG.info("Transforming Hive function '{}' into a 
HiveGenericUDF", name);
 
                        return new ScalarFunctionDefinition(
                                name,
-                               new HiveGenericUDF(new 
HiveFunctionWrapper<>(functionClassName))
+                               new HiveGenericUDF(new 
HiveFunctionWrapper<>(functionClassName), hiveShim)
                        );
                } else if (GenericUDTF.class.isAssignableFrom(clazz)) {
                        LOG.info("Transforming Hive function '{}' into a 
HiveGenericUDTF", name);
 
-                       HiveGenericUDTF udtf = new HiveGenericUDTF(new 
HiveFunctionWrapper<>(functionClassName));
+                       HiveGenericUDTF udtf = new HiveGenericUDTF(new 
HiveFunctionWrapper<>(functionClassName), hiveShim);
 
                        return new TableFunctionDefinition(
                                name,
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
index ec0b41d..0760ae5 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableOutputFormat.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -224,12 +223,13 @@ public class HiveTableOutputFormat extends 
HadoopOutputFormatCommonBase<Row> imp
                        }
                        if (isPartitioned) {
                                if (isDynamicPartition) {
-                                       FileStatus[] generatedParts = 
HiveStatsUtils.getFileStatusRecurse(stagingDir,
+                                       HiveShim hiveShim = 
HiveShimLoader.loadHiveShim(hiveVersion);
+                                       FileStatus[] generatedParts = 
hiveShim.getFileStatusRecurse(stagingDir,
                                                partitionColumns.size() - 
hiveTablePartition.getPartitionSpec().size(), fs);
                                        for (FileStatus part : generatedParts) {
                                                
commitJob(part.getPath().toString());
                                                LinkedHashMap<String, String> 
fullPartSpec = new LinkedHashMap<>();
-                                               
Warehouse.makeSpecFromName(fullPartSpec, part.getPath());
+                                               
hiveShim.makeSpecFromName(fullPartSpec, part.getPath());
                                                loadPartition(part.getPath(), 
table, fullPartSpec, client);
                                        }
                                } else {
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
index 2974de0..d23a3ed 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java
@@ -26,7 +26,10 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.table.sinks.OverwritableTableSink;
 import org.apache.flink.table.sinks.PartitionableTableSink;
@@ -40,7 +43,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -66,6 +68,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> 
implements Partiti
        private final ObjectPath tablePath;
        private final TableSchema tableSchema;
        private final String hiveVersion;
+       private final HiveShim hiveShim;
 
        private Map<String, String> staticPartitionSpec = 
Collections.emptyMap();
 
@@ -77,6 +80,7 @@ public class HiveTableSink extends OutputFormatTableSink<Row> 
implements Partiti
                this.catalogTable = table;
                hiveVersion = 
Preconditions.checkNotNull(jobConf.get(HiveCatalogValidator.CATALOG_HIVE_VERSION),
                                "Hive version is not defined");
+               hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
                tableSchema = table.getSchema();
        }
 
@@ -120,7 +124,7 @@ public class HiveTableSink extends 
OutputFormatTableSink<Row> implements Partiti
                                tablePath,
                                catalogTable,
                                hiveTablePartition,
-                               MetaStoreUtils.getTableMetadata(table),
+                               HiveReflectionUtils.getTableMetadata(hiveShim, 
table),
                                overwrite);
                } catch (TException e) {
                        throw new CatalogException("Failed to query Hive 
metaStore", e);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index dd50ce2..7914c89 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -53,7 +53,10 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
 import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.hive.util.HiveStatsUtil;
 import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
@@ -63,7 +66,6 @@ import org.apache.flink.util.StringUtils;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -121,6 +123,7 @@ public class HiveCatalog extends AbstractCatalog {
 
        private final HiveConf hiveConf;
        private final String hiveVersion;
+       private final HiveShim hiveShim;
 
        private HiveMetastoreClientWrapper client;
 
@@ -138,6 +141,7 @@ public class HiveCatalog extends AbstractCatalog {
                this.hiveConf = hiveConf == null ? createHiveConf(null) : 
hiveConf;
                checkArgument(!StringUtils.isNullOrWhitespaceOnly(hiveVersion), 
"hiveVersion cannot be null or empty");
                this.hiveVersion = hiveVersion;
+               hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
                // add this to hiveConf to make sure table factory and 
source/sink see the same Hive version as HiveCatalog
                this.hiveConf.set(HiveCatalogValidator.CATALOG_HIVE_VERSION, 
hiveVersion);
 
@@ -494,7 +498,7 @@ public class HiveCatalog extends AbstractCatalog {
                }
        }
 
-       private static CatalogBaseTable instantiateCatalogTable(Table 
hiveTable, HiveConf hiveConf) {
+       private CatalogBaseTable instantiateCatalogTable(Table hiveTable, 
HiveConf hiveConf) {
                boolean isView = TableType.valueOf(hiveTable.getTableType()) == 
TableType.VIRTUAL_VIEW;
 
                // Table properties
@@ -515,8 +519,8 @@ public class HiveCatalog extends AbstractCatalog {
                } else {
                        // get schema from deserializer
                        try {
-                               fields = 
MetaStoreUtils.getFieldsFromDeserializer(hiveTable.getTableName(),
-                                               
MetaStoreUtils.getDeserializer(hiveConf, hiveTable, true));
+                               fields = 
HiveReflectionUtils.getFieldsFromDeserializer(hiveShim, 
hiveTable.getTableName(),
+                                               
HiveReflectionUtils.getDeserializer(hiveShim, hiveConf, hiveTable, true));
                        } catch (SerDeException | MetaException e) {
                                throw new CatalogException("Failed to get Hive 
table schema from deserializer", e);
                        }
@@ -735,7 +739,8 @@ public class HiveCatalog extends AbstractCatalog {
 
                try {
                        // partition spec can be partial
-                       List<String> partialVals = 
MetaStoreUtils.getPvals(hiveTable.getPartitionKeys(), 
partitionSpec.getPartitionSpec());
+                       List<String> partialVals = 
HiveReflectionUtils.getPvals(hiveShim, hiveTable.getPartitionKeys(),
+                               partitionSpec.getPartitionSpec());
                        return 
client.listPartitionNames(tablePath.getDatabaseName(), 
tablePath.getObjectName(), partialVals,
                                (short) 
-1).stream().map(HiveCatalog::createPartitionSpec).collect(Collectors.toList());
                } catch (TException e) {
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 33e9708..1f39d84 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog.hive.client;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -36,6 +37,7 @@ import org.apache.thrift.TException;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * A shim layer to support different versions of Hive.
@@ -109,4 +111,52 @@ public interface HiveShim {
         */
        SimpleGenericUDAFParameterInfo 
createUDAFParameterInfo(ObjectInspector[] params, boolean isWindowing,
                        boolean distinct, boolean allColumns);
+
+       /**
+        * Get the class of Hive's MetaStoreUtils because its package name was 
changed in Hive 3.1.0.
+        *
+        * @return MetaStoreUtils class
+        */
+       Class<?> getMetaStoreUtilsClass();
+
+       /**
+        * Get the class of Hive's HiveMetaStoreUtils as it was split from 
MetaStoreUtils class in Hive 3.1.0.
+        *
+        * @return HiveMetaStoreUtils class
+        */
+       Class<?> getHiveMetaStoreUtilsClass();
+
+       /**
+        * Hive Date data type class was changed in Hive 3.1.0.
+        *
+        * @return Hive's Date class
+        */
+       Class<?> getDateDataTypeClass();
+
+       /**
+        * Hive Timestamp data type class was changed in Hive 3.1.0.
+        *
+        * @return Hive's Timestamp class
+        */
+       Class<?> getTimestampDataTypeClass();
+
+       /**
+        * The return type of HiveStatsUtils.getFileStatusRecurse was changed 
from array to List in Hive 3.1.0.
+        *
+        * @param path the path of the directory
+        * @param level the level of recursion
+        * @param fs the file system of the directory
+        * @return an array of the entries
+        * @throws IOException in case of any IO error
+        */
+       FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs) 
throws IOException;
+
+       /**
+        * The signature of Warehouse.makeSpecFromName() was changed in Hive 3.1.0.
+        *
+        * @param partSpec partition specs
+        * @param currPath the current path
+        */
+       void makeSpecFromName(Map<String, String> partSpec, Path currPath);
+
 }
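
To illustrate the two new filesystem-related shim methods above, here is a hedged sketch of a version-agnostic caller, modeled on the HiveTableOutputFormat change earlier in this diff; the staging path, recursion level, and local FileSystem are illustrative placeholders, not values used by Flink.

    import org.apache.flink.table.catalog.hive.client.HiveShim;
    import org.apache.flink.table.catalog.hive.client.HiveShimLoader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.util.LinkedHashMap;

    public class DynamicPartitionSketch {
        public static void main(String[] args) throws IOException {
            HiveShim hiveShim = HiveShimLoader.loadHiveShim("3.1.1");
            FileSystem fs = FileSystem.getLocal(new Configuration());
            Path stagingDir = new Path("/tmp/hive-staging"); // illustrative path
            // The shim always returns FileStatus[], whether the underlying Hive API
            // returns an array (pre-3.1.0) or a List (3.1.0 and later).
            FileStatus[] generatedParts = hiveShim.getFileStatusRecurse(stagingDir, 1, fs);
            for (FileStatus part : generatedParts) {
                LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>();
                // Hides the Warehouse.makeSpecFromName() signature change in 3.1.0.
                hiveShim.makeSpecFromName(fullPartSpec, part.getPath());
                System.out.println(fullPartSpec);
            }
        }
    }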
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
index 802f2bd..fec8b7c 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimLoader.java
@@ -44,6 +44,9 @@ public class HiveShimLoader {
        public static final String HIVE_VERSION_V2_3_4 = "2.3.4";
        public static final String HIVE_VERSION_V2_3_5 = "2.3.5";
        public static final String HIVE_VERSION_V2_3_6 = "2.3.6";
+       public static final String HIVE_VERSION_V3_1_0 = "3.1.0";
+       public static final String HIVE_VERSION_V3_1_1 = "3.1.1";
+       public static final String HIVE_VERSION_V3_1_2 = "3.1.2";
 
        private static final Map<String, HiveShim> hiveShims = new 
ConcurrentHashMap<>(2);
 
@@ -90,6 +93,15 @@ public class HiveShimLoader {
                        if (v.startsWith(HIVE_VERSION_V2_3_6)) {
                                return new HiveShimV236();
                        }
+                       if (v.startsWith(HIVE_VERSION_V3_1_0)) {
+                               return new HiveShimV310();
+                       }
+                       if (v.startsWith(HIVE_VERSION_V3_1_1)) {
+                               return new HiveShimV311();
+                       }
+                       if (v.startsWith(HIVE_VERSION_V3_1_2)) {
+                               return new HiveShimV312();
+                       }
                        throw new CatalogException("Unsupported Hive version " 
+ v);
                });
        }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java
index a3f30ff..d1ed444 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV120.java
@@ -21,13 +21,16 @@ package org.apache.flink.table.catalog.hive.client;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -45,6 +48,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Shim for Hive version 1.2.0.
@@ -143,4 +147,51 @@ public class HiveShimV120 implements HiveShim {
                        throw new CatalogException("Failed to create 
SimpleGenericUDAFParameterInfo", e);
                }
        }
+
+       @Override
+       public Class<?> getMetaStoreUtilsClass() {
+               try {
+                       return 
Class.forName("org.apache.hadoop.hive.metastore.MetaStoreUtils");
+               } catch (ClassNotFoundException e) {
+                       throw new CatalogException("Failed to find class 
MetaStoreUtils", e);
+               }
+       }
+
+       @Override
+       public Class<?> getHiveMetaStoreUtilsClass() {
+               return getMetaStoreUtilsClass();
+       }
+
+       @Override
+       public Class<?> getDateDataTypeClass() {
+               return java.sql.Date.class;
+       }
+
+       @Override
+       public Class<?> getTimestampDataTypeClass() {
+               return java.sql.Timestamp.class;
+       }
+
+       @Override
+       public FileStatus[] getFileStatusRecurse(Path path, int level, 
FileSystem fs) throws IOException {
+               try {
+                       Method method = 
HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class, 
Integer.TYPE, FileSystem.class);
+                       // getFileStatusRecurse is a static method
+                       return (FileStatus[]) method.invoke(null, path, level, 
fs);
+               } catch (Exception ex) {
+                       throw new CatalogException("Failed to invoke 
HiveStatsUtils.getFileStatusRecurse()", ex);
+               }
+       }
+
+       @Override
+       public void makeSpecFromName(Map<String, String> partSpec, Path 
currPath) {
+               try {
+                       Method method = 
Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class);
+                       // makeSpecFromName is a static method
+                       method.invoke(null, partSpec, currPath);
+               } catch (Exception ex) {
+                       throw new CatalogException("Failed to invoke 
Warehouse.makeSpecFromName()", ex);
+               }
+       }
+
 }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
new file mode 100644
index 0000000..62438d4
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.client;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Shim for Hive version 3.1.0.
+ */
+public class HiveShimV310 extends HiveShimV235 {
+
+       @Override
+       public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
+               try {
+                       Method method = 
RetryingMetaStoreClient.class.getMethod("getProxy", Configuration.class, 
Boolean.TYPE);
+                       // getProxy is a static method
+                       return (IMetaStoreClient) method.invoke(null, hiveConf, 
true);
+               } catch (Exception ex) {
+                       throw new CatalogException("Failed to create Hive 
Metastore client", ex);
+               }
+       }
+
+       @Override
+       public Class<?> getMetaStoreUtilsClass() {
+               try {
+                       return 
Class.forName("org.apache.hadoop.hive.metastore.utils.MetaStoreUtils");
+               } catch (ClassNotFoundException e) {
+                       throw new CatalogException("Failed to find class 
MetaStoreUtils", e);
+               }
+       }
+
+       @Override
+       public Class<?> getHiveMetaStoreUtilsClass() {
+               try {
+                       return 
Class.forName("org.apache.hadoop.hive.metastore.HiveMetaStoreUtils");
+               } catch (ClassNotFoundException e) {
+                       throw new CatalogException("Failed to find class 
HiveMetaStoreUtils", e);
+               }
+       }
+
+       @Override
+       public Class<?> getDateDataTypeClass() {
+               try {
+                       return 
Class.forName("org.apache.hadoop.hive.common.type.Date");
+               } catch (ClassNotFoundException e) {
+                       throw new CatalogException("Failed to find class 
org.apache.hadoop.hive.common.type.Date", e);
+               }
+       }
+
+       @Override
+       public Class<?> getTimestampDataTypeClass() {
+               try {
+                       return 
Class.forName("org.apache.hadoop.hive.common.type.Timestamp");
+               } catch (ClassNotFoundException e) {
+                       throw new CatalogException("Failed to find class 
org.apache.hadoop.hive.common.type.Timestamp", e);
+               }
+       }
+
+       @Override
+       public FileStatus[] getFileStatusRecurse(Path path, int level, 
FileSystem fs) throws IOException {
+               try {
+                       Method method = 
HiveStatsUtils.class.getMethod("getFileStatusRecurse", Path.class, 
Integer.TYPE, FileSystem.class);
+                       // getFileStatusRecurse is a static method
+                       List<FileStatus> results = (List<FileStatus>) 
method.invoke(null, path, level, fs);
+                       return results.toArray(new FileStatus[0]);
+               } catch (Exception ex) {
+                       throw new CatalogException("Failed to invoke 
HiveStatsUtils.getFileStatusRecurse()", ex);
+               }
+       }
+
+       @Override
+       public void makeSpecFromName(Map<String, String> partSpec, Path 
currPath) {
+               try {
+                       Method method = 
Warehouse.class.getMethod("makeSpecFromName", Map.class, Path.class, Set.class);
+                       // makeSpecFromName is a static method
+                       method.invoke(null, partSpec, currPath, null);
+               } catch (Exception ex) {
+                       throw new CatalogException("Failed to invoke 
Warehouse.makeSpecFromName()", ex);
+               }
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV311.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV311.java
new file mode 100644
index 0000000..fc8a811
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV311.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.client;
+
+/**
+ * Shim for Hive version 3.1.1.
+ */
+public class HiveShimV311 extends HiveShimV310 {
+
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV312.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV312.java
new file mode 100644
index 0000000..bac74f4
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV312.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.client;
+
+/**
+ * Shim for Hive version 3.1.2.
+ */
+public class HiveShimV312 extends HiveShimV311 {
+
+}
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
new file mode 100644
index 0000000..1941096
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.hive.util;
+
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDateObjectInspector;
+import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantTimestampObjectInspector;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Utilities for accessing Hive class or methods via Java reflection.
+ *
+ * <p>They are put here not for code sharing. Rather, this is a boilerplate area for managing similar code that
+ * involves reflection. (In fact, they could just be private methods in their respective calling classes.)
+ *
+ * <p>Relevant Hive methods cannot be called directly because shimming is 
required to support different, possibly
+ * incompatible Hive versions.
+ */
+public class HiveReflectionUtils {
+
+       public static Properties getTableMetadata(HiveShim hiveShim, Table 
table) {
+               try {
+                       Method method = 
hiveShim.getMetaStoreUtilsClass().getMethod("getTableMetadata", Table.class);
+                       return (Properties) method.invoke(null, table);
+               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+                       throw new CatalogException("Failed to invoke 
MetaStoreUtils.getTableMetadata()", e);
+               }
+       }
+
+       public static List<FieldSchema> getFieldsFromDeserializer(HiveShim 
hiveShim, String tableName, Deserializer deserializer)
+                       throws SerDeException, MetaException {
+               try {
+                       Method method = 
hiveShim.getHiveMetaStoreUtilsClass().getMethod("getFieldsFromDeserializer", 
String.class, Deserializer.class);
+                       return (List<FieldSchema>) method.invoke(null, 
tableName, deserializer);
+               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+                       throw new CatalogException("Failed to invoke 
MetaStoreUtils.getFieldsFromDeserializer()", e);
+               }
+       }
+
+       public static Deserializer getDeserializer(HiveShim hiveShim, 
Configuration conf, Table table, boolean skipConfError)
+                       throws MetaException {
+               try {
+                       Method method = 
hiveShim.getHiveMetaStoreUtilsClass().getMethod("getDeserializer", 
Configuration.class,
+                               Table.class, boolean.class);
+                       return (Deserializer) method.invoke(null, conf, table, 
skipConfError);
+               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+                       throw new CatalogException("Failed to invoke 
MetaStoreUtils.getDeserializer()", e);
+               }
+       }
+
+       public static List<String> getPvals(HiveShim hiveShim, 
List<FieldSchema> partCols, Map<String, String> partSpec) {
+               try {
+                       Method method = 
hiveShim.getMetaStoreUtilsClass().getMethod("getPvals", List.class, Map.class);
+                       return (List<String>) method.invoke(null, partCols, 
partSpec);
+               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+                       throw new CatalogException("Failed to invoke MetaStoreUtils.getPvals()", e);
+               }
+       }
+
+       public static JavaConstantDateObjectInspector 
createJavaConstantDateObjectInspector(HiveShim hiveShim, Object value) {
+               Constructor<?> meth = null;
+               try {
+                       meth = 
JavaConstantDateObjectInspector.class.getDeclaredConstructor(hiveShim.getDateDataTypeClass());
+                       meth.setAccessible(true);
+                       return (JavaConstantDateObjectInspector) 
meth.newInstance(value);
+               } catch (NoSuchMethodException | InstantiationException | 
IllegalAccessException | InvocationTargetException e) {
+                       throw new FlinkHiveUDFException("Failed to instantiate JavaConstantDateObjectInspector", e);
+               }
+       }
+
+       public static JavaConstantTimestampObjectInspector 
createJavaConstantTimestampObjectInspector(HiveShim hiveShim, Object value) {
+               Constructor<?> meth = null;
+               try {
+                       meth = JavaConstantTimestampObjectInspector.class.getDeclaredConstructor(hiveShim.getTimestampDataTypeClass());
+                       meth.setAccessible(true);
+                       return (JavaConstantTimestampObjectInspector) 
meth.newInstance(value);
+               } catch (NoSuchMethodException | InstantiationException | 
IllegalAccessException | InvocationTargetException e) {
+                       throw new FlinkHiveUDFException("Failed to instantiate JavaConstantTimestampObjectInspector", e);
+               }
+       }
+
+       public static Object convertToHiveDate(HiveShim hiveShim, String s) 
throws FlinkHiveUDFException {
+               try {
+                       Method method = 
hiveShim.getDateDataTypeClass().getMethod("valueOf", String.class);
+                       return method.invoke(null, s);
+               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+                       throw new FlinkHiveUDFException("Failed to invoke 
Hive's Date.valueOf()", e);
+               }
+       }
+
+       public static Object convertToHiveTimestamp(HiveShim hiveShim, String 
s) throws FlinkHiveUDFException {
+               try {
+                       Method method = 
hiveShim.getTimestampDataTypeClass().getMethod("valueOf", String.class);
+                       return method.invoke(null, s);
+               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+                       throw new FlinkHiveUDFException("Failed to invoke 
Hive's Timestamp.valueOf()", e);
+               }
+       }
+
+}
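
A short, hypothetical usage sketch of the reflective helpers added above; the version string and date literal are examples only. The same call yields a java.sql.Date under a pre-3.1 shim and an org.apache.hadoop.hive.common.type.Date under a 3.1.x shim.

    import org.apache.flink.table.catalog.hive.client.HiveShim;
    import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
    import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;

    public class HiveDateSketch {
        public static void main(String[] args) {
            HiveShim shim = HiveShimLoader.loadHiveShim("2.3.4");
            // Invokes valueOf(String) reflectively on whichever Date class the shim reports.
            Object hiveDate = HiveReflectionUtils.convertToHiveDate(shim, "2019-08-30");
            System.out.println(hiveDate.getClass().getName() + ": " + hiveDate);
        }
    }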
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index b2f2fe5..a6f4e3b 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -84,7 +84,8 @@ public class HiveGenericUDAF
        }
 
        private void init() throws HiveException {
-               ObjectInspector[] inputInspectors = 
HiveInspectors.toInspectors(constantArguments, argTypes);
+               ObjectInspector[] inputInspectors = 
HiveInspectors.toInspectors(HiveShimLoader.loadHiveShim(hiveVersion),
+                       constantArguments, argTypes);
 
                // Flink UDAF only supports Hive UDAF's PARTIAL_1 and FINAL mode
 
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
index 70c875b..e970ace 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.types.DataType;
@@ -40,10 +41,11 @@ public class HiveGenericUDF extends 
HiveScalarFunction<GenericUDF> {
        private static final Logger LOG = 
LoggerFactory.getLogger(HiveGenericUDF.class);
 
        private transient GenericUDF.DeferredObject[] deferredObjects;
+       private transient HiveShim hiveShim;
 
-       public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> 
hiveFunctionWrapper) {
+       public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> 
hiveFunctionWrapper, HiveShim hiveShim) {
                super(hiveFunctionWrapper);
-
+               this.hiveShim = hiveShim;
                LOG.info("Creating HiveGenericUDF from '{}'", 
hiveFunctionWrapper.getClassName());
        }
 
@@ -56,7 +58,7 @@ public class HiveGenericUDF extends 
HiveScalarFunction<GenericUDF> {
 
                try {
                        returnInspector = function.initializeAndFoldConstants(
-                               HiveInspectors.toInspectors(constantArguments, 
argTypes));
+                               HiveInspectors.toInspectors(hiveShim, 
constantArguments, argTypes));
                } catch (UDFArgumentException e) {
                        throw new FlinkHiveUDFException(e);
                }
@@ -91,7 +93,7 @@ public class HiveGenericUDF extends 
HiveScalarFunction<GenericUDF> {
                LOG.info("Getting result type of HiveGenericUDF from {}", 
hiveFunctionWrapper.getClassName());
 
                try {
-                       ObjectInspector[] argumentInspectors = 
HiveInspectors.toInspectors(constantArguments, argTypes);
+                       ObjectInspector[] argumentInspectors = 
HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
 
                        ObjectInspector resultObjectInspector =
                                
hiveFunctionWrapper.createFunction().initializeAndFoldConstants(argumentInspectors);
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
index 8ed619e..a7dbc8b 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.functions.hive;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.TableFunction;
@@ -63,9 +64,11 @@ public class HiveGenericUDTF extends TableFunction<Row> 
implements HiveFunction
 
        private transient boolean allIdentityConverter;
        private transient HiveObjectConversion[] conversions;
+       private transient HiveShim hiveShim;
 
-       public HiveGenericUDTF(HiveFunctionWrapper<GenericUDTF> 
hiveFunctionWrapper) {
+       public HiveGenericUDTF(HiveFunctionWrapper<GenericUDTF> 
hiveFunctionWrapper, HiveShim hiveShim) {
                this.hiveFunctionWrapper = hiveFunctionWrapper;
+               this.hiveShim = hiveShim;
        }
 
        @Override
@@ -77,7 +80,7 @@ public class HiveGenericUDTF extends TableFunction<Row> 
implements HiveFunction
                        HiveGenericUDTF.this.collect(row);
                });
 
-               ObjectInspector[] argumentInspectors = 
HiveInspectors.toInspectors(constantArguments, argTypes);
+               ObjectInspector[] argumentInspectors = 
HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
                returnInspector = function.initialize(argumentInspectors);
 
                isArgsSingleArray = 
HiveFunctionUtil.isSingleBoxedArray(argTypes);
@@ -129,7 +132,7 @@ public class HiveGenericUDTF extends TableFunction<Row> 
implements HiveFunction
                LOG.info("Getting result type of HiveGenericUDTF with {}", 
hiveFunctionWrapper.getClassName());
 
                try {
-                       ObjectInspector[] argumentInspectors = 
HiveInspectors.toInspectors(constantArguments, argTypes);
+                       ObjectInspector[] argumentInspectors = 
HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
                        return HiveTypeUtil.toFlinkType(
                                
hiveFunctionWrapper.createFunction().initialize(argumentInspectors));
                } catch (UDFArgumentException e) {
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
index fb3b6e2..d58cb85 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
@@ -56,9 +57,11 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
        private transient GenericUDFUtils.ConversionHelper conversionHelper;
        private transient HiveObjectConversion[] conversions;
        private transient boolean allIdentityConverter;
+       private transient HiveShim hiveShim;
 
-       public HiveSimpleUDF(HiveFunctionWrapper<UDF> hiveFunctionWrapper) {
+       public HiveSimpleUDF(HiveFunctionWrapper<UDF> hiveFunctionWrapper, 
HiveShim hiveShim) {
                super(hiveFunctionWrapper);
+               this.hiveShim = hiveShim;
                LOG.info("Creating HiveSimpleUDF from '{}'", 
this.hiveFunctionWrapper.getClassName());
        }
 
@@ -127,7 +130,7 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
                                
.getResolver().getEvalMethod(argTypeInfo).getReturnType();
 
                        return HiveInspectors.toFlinkType(
-                               HiveInspectors.getObjectInspector(returnType));
+                               HiveInspectors.getObjectInspector(hiveShim, 
returnType));
                } catch (UDFArgumentException e) {
                        throw new FlinkHiveUDFException(e);
                }
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 3546668..22ce60b 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -21,6 +21,8 @@ package org.apache.flink.table.functions.hive.conversion;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
 import org.apache.flink.table.types.DataType;
@@ -65,7 +67,6 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspecto
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBinaryObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBooleanObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantByteObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDateObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDoubleObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantFloatObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantHiveCharObjectInspector;
@@ -75,7 +76,6 @@ import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantIntOb
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantLongObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantShortObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantStringObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantTimestampObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
@@ -97,8 +97,6 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 
 import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -117,7 +115,7 @@ public class HiveInspectors {
        /**
         * Get an array of ObjectInspector from the give array of args and 
their types.
         */
-       public static ObjectInspector[] toInspectors(Object[] args, DataType[] 
argTypes) {
+       public static ObjectInspector[] toInspectors(HiveShim hiveShim, 
Object[] args, DataType[] argTypes) {
                assert args.length == argTypes.length;
 
                ObjectInspector[] argumentInspectors = new 
ObjectInspector[argTypes.length];
@@ -132,6 +130,7 @@ public class HiveInspectors {
                        } else {
                                argumentInspectors[i] =
                                        
HiveInspectors.getPrimitiveJavaConstantObjectInspector(
+                                               hiveShim,
                                                (PrimitiveTypeInfo) 
HiveTypeUtil.toHiveTypeInfo(argTypes[i]),
                                                constant
                                        );
@@ -141,7 +140,8 @@ public class HiveInspectors {
                return argumentInspectors;
        }
 
-       private static ConstantObjectInspector 
getPrimitiveJavaConstantObjectInspector(PrimitiveTypeInfo typeInfo, Object 
value) {
+       private static ConstantObjectInspector 
getPrimitiveJavaConstantObjectInspector(HiveShim hiveShim,
+                               PrimitiveTypeInfo typeInfo, Object value) {
                switch (typeInfo.getPrimitiveCategory()) {
                        case BOOLEAN:
                                return new 
JavaConstantBooleanObjectInspector((Boolean) value);
@@ -164,9 +164,9 @@ public class HiveInspectors {
                        case VARCHAR:
                                return new 
JavaConstantHiveVarcharObjectInspector((HiveVarchar) value);
                        case DATE:
-                               return new 
JavaConstantDateObjectInspector((Date) value);
+                               return 
HiveReflectionUtils.createJavaConstantDateObjectInspector(hiveShim, value);
                        case TIMESTAMP:
-                               return new 
JavaConstantTimestampObjectInspector((Timestamp) value);
+                               return 
HiveReflectionUtils.createJavaConstantTimestampObjectInspector(hiveShim, value);
                        case DECIMAL:
                                return new 
JavaConstantHiveDecimalObjectInspector((HiveDecimal) value);
                        case BINARY:
@@ -357,7 +357,7 @@ public class HiveInspectors {
                        String.format("Unwrap does not support ObjectInspector 
'%s' yet", inspector));
        }
 
-       public static ObjectInspector getObjectInspector(Class clazz) {
+       public static ObjectInspector getObjectInspector(HiveShim hiveShim, 
Class clazz) {
                TypeInfo typeInfo;
 
                if (clazz.equals(String.class) || clazz.equals(Text.class)) {
@@ -384,10 +384,10 @@ public class HiveInspectors {
                } else if (clazz.equals(Double.class) || 
clazz.equals(DoubleWritable.class)) {
 
                        typeInfo = TypeInfoFactory.doubleTypeInfo;
-               } else if (clazz.equals(Date.class) || 
clazz.equals(DateWritable.class)) {
+               } else if (clazz.equals(hiveShim.getDateDataTypeClass()) || 
clazz.equals(DateWritable.class)) {
 
                        typeInfo = TypeInfoFactory.dateTypeInfo;
-               } else if (clazz.equals(Timestamp.class) || clazz.equals(TimestampWritable.class)) {
+               } else if (clazz.equals(hiveShim.getTimestampDataTypeClass()) || clazz.equals(TimestampWritable.class)) {
 
                        typeInfo = TypeInfoFactory.timestampTypeInfo;
                } else if (clazz.equals(byte[].class) || clazz.equals(BytesWritable.class)) {
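
Note on the HiveInspectors change above: Hive 3.1 replaced java.sql.Date/java.sql.Timestamp with its own org.apache.hadoop.hive.common.type.Date/Timestamp in the UDF and ObjectInspector APIs, so the inspectors now ask the version-specific HiveShim for the right classes instead of hard-coding them. The shim implementations are not part of this excerpt; the self-contained sketch below (class and method names are illustrative, not Flink's) only shows the idea behind hiveShim.getDateDataTypeClass():

    // Illustrative sketch only: resolve the DATE class per Hive version the way the shims
    // are expected to. The 3.x branch needs the matching Hive jars on the classpath.
    public class DateClassResolverSketch {
        static Class<?> dateClassFor(String hiveVersion) throws ClassNotFoundException {
            if (hiveVersion.startsWith("3.")) {
                // Hive 3.x uses its own date type in ObjectInspectors; load it reflectively
                // so code compiled against older Hive versions still links.
                return Class.forName("org.apache.hadoop.hive.common.type.Date");
            }
            return java.sql.Date.class;
        }

        public static void main(String[] args) throws Exception {
            System.out.println(dateClassFor("2.3.4")); // class java.sql.Date
        }
    }
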
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
index 343a299..ac623d7 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.java
@@ -78,7 +78,6 @@ import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEHISTORYFILELOC;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LOCALSCRATCHDIR;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORECONNECTURLKEY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
@@ -359,8 +358,9 @@ public class FlinkStandaloneHiveRunner extends BlockJUnit4ClassRunner {
                args.add(hiveCmdLineConfig(SCRATCHDIR.varname, outsideConf.getVar(SCRATCHDIR)));
                args.add(hiveCmdLineConfig(LOCALSCRATCHDIR.varname, outsideConf.getVar(LOCALSCRATCHDIR)));
                args.add(hiveCmdLineConfig(HIVEHISTORYFILELOC.varname, outsideConf.getVar(HIVEHISTORYFILELOC)));
-               args.add(hiveCmdLineConfig(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS.varname,
-                               String.valueOf(outsideConf.getBoolVar(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS))));
+               // The following config is removed in Hive 3.1.0.
+               args.add(hiveCmdLineConfig("hive.warehouse.subdir.inherit.perms",
+                               String.valueOf(outsideConf.getBoolean("hive.warehouse.subdir.inherit.perms", true))));
                args.add(hiveCmdLineConfig("hadoop.tmp.dir", outsideConf.get("hadoop.tmp.dir")));
                args.add(hiveCmdLineConfig("test.log.dir", outsideConf.get("test.log.dir")));
                String metaStorageUrl = "jdbc:derby:memory:" + UUID.randomUUID().toString();
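
The hunks above drop the HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS enum constant, which no longer exists in Hive 3.1, and address the option by its string name instead, so the same test harness compiles against both old and new Hive versions. A minimal sketch of the string-keyed access (the class name is hypothetical; HiveConf extends Hadoop's Configuration, which provides setBoolean/getBoolean):

    import org.apache.hadoop.conf.Configuration;

    // Minimal sketch: read/write the option by key, with a default when the key is unknown.
    public class InheritPermsConfigSketch {
        private static final String INHERIT_PERMS = "hive.warehouse.subdir.inherit.perms";

        public static void main(String[] args) {
            Configuration conf = new Configuration(false);
            conf.setBoolean(INHERIT_PERMS, true);
            // getBoolean falls back to the supplied default when the key is absent,
            // which is what happens on Hive 3.1 where the option was removed.
            System.out.println(conf.getBoolean(INHERIT_PERMS, true)); // true
        }
    }
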
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java
index 605a341..236f253 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/FlinkStandaloneHiveServerContext.java
@@ -43,7 +43,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_CBO_ENABLED;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.LOCALSCRATCHDIR;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREWAREHOUSE;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_VALIDATE_COLUMNS;
@@ -185,7 +184,8 @@ public class FlinkStandaloneHiveServerContext implements HiveServerContext {
                createAndSetFolderProperty(LOCALSCRATCHDIR, "localscratchdir", conf, basedir);
                createAndSetFolderProperty(HIVEHISTORYFILELOC, "tmp", conf, basedir);
 
-               conf.setBoolVar(HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS, true);
+               // HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS is removed from Hive 3.1.0
+               conf.setBoolean("hive.warehouse.subdir.inherit.perms", true);
 
                createAndSetFolderProperty("hadoop.tmp.dir", "hadooptmp", conf, basedir);
                createAndSetFolderProperty("test.log.dir", "logs", conf, basedir);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
index ab0ee80..9a64238 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveRunnerShimLoader.java
@@ -50,6 +50,9 @@ public class HiveRunnerShimLoader {
                                case HiveShimLoader.HIVE_VERSION_V2_3_4:
                                case HiveShimLoader.HIVE_VERSION_V2_3_5:
                                case HiveShimLoader.HIVE_VERSION_V2_3_6:
+                               case HiveShimLoader.HIVE_VERSION_V3_1_0:
+                               case HiveShimLoader.HIVE_VERSION_V3_1_1:
+                               case HiveShimLoader.HIVE_VERSION_V3_1_2:
                                        return new HiveRunnerShimV4();
                                default:
                                        throw new RuntimeException("Unsupported 
Hive version " + v);
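
With the three new version constants, the test-side loader maps all supported 2.3.x and 3.1.x versions onto the same HiveRunnerShimV4. On the production side, loading is expected to stay a plain version-string dispatch with one shim per version (see HiveShimLoader.loadHiveShim used by the tests further below); the sketch here is illustrative only, the real HiveShimLoader and HiveShim types are not reproduced:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative dispatch-and-cache sketch, not Flink's actual loader.
    public class ShimDispatchSketch {
        interface Shim { String version(); }

        private static final Map<String, Shim> CACHE = new ConcurrentHashMap<>();

        static Shim loadShim(String v) {
            return CACHE.computeIfAbsent(v, key -> {
                switch (key) {
                    case "3.1.0":
                    case "3.1.1":
                    case "3.1.2":
                        return () -> key; // stands in for a Hive-3.1.x shim instance
                    default:
                        throw new RuntimeException("Unsupported Hive version " + key);
                }
            });
        }

        public static void main(String[] args) {
            System.out.println(loadShim("3.1.1").version()); // 3.1.1
        }
    }
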
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
index 058b5c4..938f794 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
@@ -19,6 +19,9 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.functions.hive.util.TestGenericUDFArray;
 import org.apache.flink.table.functions.hive.util.TestGenericUDFStructSize;
 import org.apache.flink.table.types.DataType;
@@ -38,9 +41,8 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStringToMap;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFStruct;
 import org.junit.Test;
 
+import java.lang.reflect.InvocationTargetException;
 import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
 import java.util.HashMap;
 
 import static org.junit.Assert.assertEquals;
@@ -49,6 +51,7 @@ import static org.junit.Assert.assertEquals;
  * Test for {@link HiveGenericUDF}.
  */
 public class HiveGenericUDFTest {
+	private static HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
 
        @Test
        public void testAbs() {
@@ -108,7 +111,7 @@ public class HiveGenericUDFTest {
        }
 
        @Test
-       public void testDateFormat() {
+	public void testDateFormat() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
                String constYear = "y";
                String constMonth = "M";
 
@@ -138,7 +141,7 @@ public class HiveGenericUDFTest {
                        }
                );
 
-               assertEquals("8", udf.eval(Date.valueOf("2019-08-31"), constMonth));
+               assertEquals("8", udf.eval(HiveReflectionUtils.convertToHiveDate(hiveShim, "2019-08-31"), constMonth));
        }
 
        @Test
@@ -235,7 +238,7 @@ public class HiveGenericUDFTest {
        }
 
        @Test
-       public void testDataDiff() {
+	public void testDataDiff() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
 
                String d = "1969-07-20";
                String t1 = "1969-07-20 00:00:00";
@@ -267,7 +270,8 @@ public class HiveGenericUDFTest {
                        }
                );
 
-               assertEquals(-4182, udf.eval(Date.valueOf(d), Timestamp.valueOf(t2)));
+               assertEquals(-4182, udf.eval(HiveReflectionUtils.convertToHiveDate(hiveShim, d),
+                       HiveReflectionUtils.convertToHiveTimestamp(hiveShim, t2)));
 
                // Test invalid char length
                udf = init(
@@ -381,7 +385,7 @@ public class HiveGenericUDFTest {
        }
 
        private static HiveGenericUDF init(Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) {
-               HiveGenericUDF udf = new HiveGenericUDF(new HiveFunctionWrapper(hiveUdfClass.getName()));
+               HiveGenericUDF udf = new HiveGenericUDF(new HiveFunctionWrapper(hiveUdfClass.getName()), hiveShim);
 
                udf.setArgumentTypesAndConstants(constantArgs, argTypes);
                udf.getHiveResultType(constantArgs, argTypes);
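
The date/timestamp literals in these tests now go through HiveReflectionUtils.convertToHiveDate / convertToHiveTimestamp so that each Hive version receives an instance of its own date/timestamp class. The reflective checked exceptions in the new test signatures suggest a Method.invoke-based helper; the snippet below is only a plausible, self-contained shape of that conversion (the class and parameter here are illustrative, the real helper takes the HiveShim and is not shown in this excerpt):

    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;

    // Sketch: both java.sql.Date and Hive 3's org.apache.hadoop.hive.common.type.Date expose
    // a static valueOf(String) factory, so one reflective call can cover both versions.
    public class DateConversionSketch {
        static Object convertToHiveDate(Class<?> hiveDateClass, String s)
                throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
            Method valueOf = hiveDateClass.getMethod("valueOf", String.class);
            return valueOf.invoke(null, s); // static factory, so no receiver instance
        }

        public static void main(String[] args) throws Exception {
            System.out.println(convertToHiveDate(java.sql.Date.class, "2019-08-31")); // 2019-08-31
        }
    }
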
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
index 71712b7..12cb080 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
@@ -47,6 +49,7 @@ import static org.junit.Assert.assertEquals;
  * Test for {@link HiveGenericUDTF}.
  */
 public class HiveGenericUDTFTest {
+	private static HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
 
        private static TestCollector collector;
 
@@ -193,12 +196,12 @@ public class HiveGenericUDTFTest {
        private static HiveGenericUDTF init(Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
                HiveFunctionWrapper<GenericUDTF> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName());
 
-               HiveGenericUDTF udf = new HiveGenericUDTF(wrapper);
+               HiveGenericUDTF udf = new HiveGenericUDTF(wrapper, hiveShim);
 
                udf.setArgumentTypesAndConstants(constantArgs, argTypes);
                udf.getHiveResultType(constantArgs, argTypes);
 
-               ObjectInspector[] argumentInspectors = HiveInspectors.toInspectors(constantArgs, argTypes);
+               ObjectInspector[] argumentInspectors = HiveInspectors.toInspectors(hiveShim, constantArgs, argTypes);
                ObjectInspector returnInspector = wrapper.createFunction().initialize(argumentInspectors);
 
                udf.open(null);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
index 4bbe0d0..7e64bf7 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
@@ -19,6 +19,9 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.functions.hive.util.TestHiveUDFArray;
 import org.apache.flink.table.types.DataType;
 
@@ -26,7 +29,6 @@ import org.apache.hadoop.hive.ql.udf.UDFBase64;
 import org.apache.hadoop.hive.ql.udf.UDFBin;
 import org.apache.hadoop.hive.ql.udf.UDFConv;
 import org.apache.hadoop.hive.ql.udf.UDFJson;
-import org.apache.hadoop.hive.ql.udf.UDFMinute;
 import org.apache.hadoop.hive.ql.udf.UDFRand;
 import org.apache.hadoop.hive.ql.udf.UDFRegExpExtract;
 import org.apache.hadoop.hive.ql.udf.UDFToInteger;
@@ -36,8 +38,6 @@ import org.junit.Test;
 
 import java.io.UnsupportedEncodingException;
 import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -46,6 +46,8 @@ import static org.junit.Assert.assertTrue;
  * Test for {@link HiveSimpleUDF}.
  */
 public class HiveSimpleUDFTest {
+	private static HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
+
        @Test
        public void testUDFRand() {
                HiveSimpleUDF udf = init(UDFRand.class, new DataType[0]);
@@ -122,20 +124,7 @@ public class HiveSimpleUDFTest {
        }
 
        @Test
-       public void testUDFMinute() {
-               HiveSimpleUDF udf = init(
-                       UDFMinute.class,
-                       new DataType[]{
-                               DataTypes.STRING()
-                       });
-
-               assertEquals(17, udf.eval("1969-07-20 20:17:40"));
-               assertEquals(17, udf.eval(Timestamp.valueOf("1969-07-20 20:17:40")));
-               assertEquals(58, udf.eval("12:58:59"));
-       }
-
-       @Test
-       public void testUDFWeekOfYear() {
+       public void testUDFWeekOfYear() throws FlinkHiveUDFException {
                HiveSimpleUDF udf = init(
                        UDFWeekOfYear.class,
                        new DataType[]{
@@ -143,8 +132,8 @@ public class HiveSimpleUDFTest {
                        });
 
                assertEquals(29, udf.eval("1969-07-20"));
-               assertEquals(29, udf.eval(Date.valueOf("1969-07-20")));
-               assertEquals(29, udf.eval(Timestamp.valueOf("1969-07-20 00:00:00")));
+               assertEquals(29, udf.eval(HiveReflectionUtils.convertToHiveDate(hiveShim, "1969-07-20")));
+               assertEquals(29, udf.eval(HiveReflectionUtils.convertToHiveTimestamp(hiveShim, "1969-07-20 00:00:00")));
                assertEquals(1, udf.eval("1980-12-31 12:59:59"));
        }
 
@@ -230,7 +219,7 @@ public class HiveSimpleUDFTest {
        }
 
        protected static HiveSimpleUDF init(Class hiveUdfClass, DataType[] argTypes) {
-               HiveSimpleUDF udf = new HiveSimpleUDF(new HiveFunctionWrapper(hiveUdfClass.getName()));
+               HiveSimpleUDF udf = new HiveSimpleUDF(new HiveFunctionWrapper(hiveUdfClass.getName()), hiveShim);
 
                // Hive UDF won't have literal args
                udf.setArgumentTypesAndConstants(new Object[0], argTypes);
