This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c91cc59998 [FLINK-26413][hive] Supports "LOAD DATA INPATH" in Hive dialect
6c91cc59998 is described below

commit 6c91cc5999828f0a61f8a54498bc71a581a05dd8
Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Aug 9 14:46:37 2022 +0800

    [FLINK-26413][hive] Supports "LOAD DATA INPATH" in Hive dialect
    
    This closes #19556
---
 flink-connectors/flink-connector-hive/pom.xml      |   7 +
 .../flink/table/catalog/hive/HiveCatalog.java      |  42 +++
 .../hive/client/HiveMetastoreClientWrapper.java    |  26 ++
 .../flink/table/catalog/hive/client/HiveShim.java  |  13 +
 .../table/catalog/hive/client/HiveShimV100.java    |  75 ++++++
 .../table/catalog/hive/client/HiveShimV200.java    |  62 +++++
 .../table/catalog/hive/client/HiveShimV210.java    |  74 ++++++
 .../table/catalog/hive/client/HiveShimV310.java    | 132 ++++++++-
 .../delegation/hive/HiveOperationExecutor.java     |  93 +++++++
 .../table/planner/delegation/hive/HiveParser.java  | 156 +----------
 .../planner/delegation/hive/HiveSessionState.java  | 171 ++++++++++++
 .../delegation/hive/SqlFunctionConverter.java      |   3 +-
 .../delegation/hive/copy/HiveParserContext.java    |   2 +-
 .../hive/operation/HiveLoadDataOperation.java      |  99 +++++++
 .../hive/parse/HiveParserLoadSemanticAnalyzer.java | 294 +++++++++++++++++++++
 .../connectors/hive/HiveDialectQueryITCase.java    |  99 +++++--
 .../src/test/resources/explain/testLoadData.out    |   8 +
 17 files changed, 1189 insertions(+), 167 deletions(-)
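
For illustration only (not part of this commit): a minimal Table API sketch of how the new
statement could be exercised once the Hive dialect is enabled. The catalog name, hive-conf-dir,
file path, table name and partition value below are made-up placeholders.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.TableEnvironment;

    public class LoadDataExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // register a Hive catalog and make it the current catalog (placeholder conf dir)
            tEnv.executeSql(
                    "CREATE CATALOG myhive WITH ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf')");
            tEnv.executeSql("USE CATALOG myhive");
            // switch the parser to the Hive dialect so HiveParser handles the statement
            tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            tEnv.executeSql(
                    "LOAD DATA LOCAL INPATH '/tmp/orders.csv' OVERWRITE INTO TABLE orders "
                            + "PARTITION (dt='2022-08-09')");
        }
    }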

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index f42554dfae0..6fd4f86c199 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -891,6 +891,13 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.hadoop</groupId>
+                       <artifactId>hadoop-hdfs</artifactId>
+                       <version>${hivemetastore.hadoop.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <!-- ArchUit test dependencies -->
 
                <dependency>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
index c72ebbe8edc..dee28e2a58f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog.hive;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils;
+import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
 import org.apache.flink.connectors.hive.HiveTableFactory;
 import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils;
@@ -103,6 +104,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,6 +120,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -1769,6 +1772,45 @@ public class HiveCatalog extends AbstractCatalog {
         return IDENTIFIER.equalsIgnoreCase(properties.get(CONNECTOR.key()));
     }
 
+    @Internal
+    public void loadTable(
+            Path loadPath, ObjectPath tablePath, boolean isOverwrite, boolean isSrcLocal) {
+        try {
+            client.loadTable(loadPath, tablePath.getFullName(), isOverwrite, isSrcLocal);
+        } catch (HiveException e) {
+            throw new FlinkHiveException("Fail to load table.", e);
+        }
+    }
+
+    @Internal
+    public void loadPartition(
+            Path loadPath,
+            ObjectPath tablePath,
+            Map<String, String> partSpec,
+            boolean isOverwrite,
+            boolean isSrcLocal) {
+        Table hiveTable;
+        Map<String, String> orderedPartitionSpec = new LinkedHashMap<>();
+        try {
+            hiveTable = getHiveTable(tablePath);
+        } catch (TableNotExistException e) {
+            throw new FlinkHiveException("Fail to get Hive table when try to load partition", e);
+        }
+        hiveTable
+                .getPartitionKeys()
+                .forEach(
+                        column ->
+                                orderedPartitionSpec.put(
+                                        column.getName(), partSpec.get(column.getName())));
+        client.loadPartition(
+                loadPath,
+                tablePath.getFullName(),
+                orderedPartitionSpec,
+                hiveTable.getSd().isStoredAsSubDirectories(),
+                isOverwrite,
+                isSrcLocal);
+    }
+
     private static void disallowChangeCatalogTableType(
             Map<String, String> existingTableOptions, Map<String, String> newTableOptions) {
         CatalogTableType existingTableType = getCatalogTableType(existingTableOptions);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
index d7f48e8d129..3acc9860025 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java
@@ -19,11 +19,13 @@
 package org.apache.flink.table.catalog.hive.client;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -47,6 +49,8 @@ import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnOpenException;
 import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,6 +71,7 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreClientWrapper.class);
 
     private final IMetaStoreClient client;
+    private final Hive hive;
     private final HiveConf hiveConf;
     private final HiveShim hiveShim;
 
@@ -82,6 +87,11 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
                 HiveCatalog.isEmbeddedMetastore(hiveConf)
                         ? createMetastoreClient()
                         : HiveMetaStoreClient.newSynchronizedClient(createMetastoreClient());
+        try {
+            this.hive = Hive.get(hiveConf);
+        } catch (HiveException e) {
+            throw new FlinkHiveException(e);
+        }
     }
 
     @Override
@@ -336,4 +346,20 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
     public void unlock(long lockid) throws NoSuchLockException, TxnOpenException, TException {
         client.unlock(lockid);
     }
+
+    public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal)
+            throws HiveException {
+        hiveShim.loadTable(hive, loadPath, tableName, replace, isSrcLocal);
+    }
+
+    public void loadPartition(
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal) {
+        hiveShim.loadPartition(
+                hive, loadPath, tableName, partSpec, isSkewedStoreAsSubdir, replace, isSrcLocal);
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 8e3e3cf2608..58a0e98b75b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FunctionInfo;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
@@ -52,6 +53,7 @@ import java.io.Serializable;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -241,4 +243,15 @@ public interface HiveShim extends Serializable {
     }
 
     void registerTemporaryFunction(String funcName, Class funcClass);
+
+    void loadPartition(
+            Hive hive,
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal);
+
+    void loadTable(Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal);
 }
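
As a side note (not part of the patch): callers would normally obtain the shim through
HiveShimLoader rather than instantiating a version class directly. A rough sketch, assuming a
Hive 2.3.x setup; the version string, HDFS path and table name are placeholders.

    import org.apache.flink.table.catalog.hive.client.HiveShim;
    import org.apache.flink.table.catalog.hive.client.HiveShimLoader;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class ShimLoadSketch {
        static void loadIntoTable(HiveConf hiveConf) throws HiveException {
            // pick the shim implementation matching the Hive version on the classpath
            HiveShim shim = HiveShimLoader.loadHiveShim("2.3.9");
            Hive hive = Hive.get(hiveConf);
            // non-partitioned table: replace existing data, source is not on the local file system
            shim.loadTable(hive, new Path("/staging/orders"), "mydb.orders", true, false);
        }
    }
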
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index 955c0e50fa4..976e0e00a3e 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -86,6 +87,7 @@ import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -94,6 +96,11 @@ import java.util.stream.Collectors;
 /** Shim for Hive version 1.0.0. */
 public class HiveShimV100 implements HiveShim {
 
+    protected final boolean holdDDLTime = false;
+    protected final boolean isAcid = false;
+    protected final boolean inheritTableSpecs = true;
+    protected final boolean isSkewedStoreAsSubdir = false;
+
     private static final Method registerTemporaryFunction =
             HiveReflectionUtils.tryGetMethod(
                     FunctionRegistry.class,
@@ -435,6 +442,74 @@ public class HiveShimV100 implements HiveShim {
         }
     }
 
+    @Override
+    public void loadPartition(
+            Hive hive,
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal) {
+        try {
+            Class hiveClass = Hive.class;
+            Method loadPartitionMethod =
+                    hiveClass.getDeclaredMethod(
+                            "loadPartition",
+                            Path.class,
+                            String.class,
+                            Map.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class);
+            loadPartitionMethod.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    partSpec,
+                    replace,
+                    holdDDLTime,
+                    inheritTableSpecs,
+                    isSkewedStoreAsSubdir,
+                    isSrcLocal,
+                    isAcid);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load partition", e);
+        }
+    }
+
+    @Override
+    public void loadTable(
+            Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) {
+        try {
+            Class hiveClass = Hive.class;
+            Method loadTableMethod =
+                    hiveClass.getDeclaredMethod(
+                            "loadTable",
+                            Path.class,
+                            String.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class);
+            loadTableMethod.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    replace,
+                    holdDDLTime,
+                    isSrcLocal,
+                    isSkewedStoreAsSubdir,
+                    isAcid);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load table", e);
+        }
+    }
+
     boolean isBuiltInFunctionInfo(FunctionInfo info) {
         return info.isNative();
     }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java
index 3fa8d83da65..5ef79d611a7 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.catalog.hive.client;
 
 import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.orc.vector.RowDataVectorizer;
 import org.apache.flink.orc.writer.OrcBulkWriterFactory;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -26,12 +27,15 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 
 import java.lang.reflect.Method;
+import java.util.Map;
 import java.util.Properties;
 
 /** Shim for Hive version 2.0.0. */
@@ -68,4 +72,62 @@ public class HiveShimV200 extends HiveShimV122 {
         return new OrcBulkWriterFactory<>(
                 new RowDataVectorizer(schema, fieldTypes), new Properties(), conf);
     }
+
+    @Override
+    public void loadTable(
+            Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) {
+        try {
+            Class hiveClass = Hive.class;
+            Method loadTableMethod =
+                    hiveClass.getDeclaredMethod(
+                            "loadTable",
+                            Path.class,
+                            String.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class);
+            loadTableMethod.invoke(
+                    hive, loadPath, tableName, replace, isSrcLocal, isSkewedStoreAsSubdir, isAcid);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load table", e);
+        }
+    }
+
+    @Override
+    public void loadPartition(
+            Hive hive,
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal) {
+        try {
+            Class hiveClass = Hive.class;
+            Method loadPartitionMethod =
+                    hiveClass.getDeclaredMethod(
+                            "loadPartition",
+                            Path.class,
+                            String.class,
+                            Map.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class);
+            loadPartitionMethod.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    partSpec,
+                    replace,
+                    inheritTableSpecs,
+                    isSkewedStoreAsSubdir,
+                    isSrcLocal,
+                    isAcid);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load partition", e);
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
index f32371b23e7..1daa75fa362 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog.hive.client;
 
+import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
@@ -25,12 +26,14 @@ import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TException;
 
@@ -40,11 +43,14 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /** Shim for Hive version 2.1.0. */
 public class HiveShimV210 extends HiveShimV201 {
 
+    protected final boolean hasFollowingStatsTask = false;
+
     @Override
     public void alterPartition(
             IMetaStoreClient client, String databaseName, String tableName, Partition partition)
@@ -211,4 +217,72 @@ public class HiveShimV210 extends HiveShimV201 {
         }
         return res;
     }
+
+    @Override
+    public void loadTable(
+            Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) {
+        try {
+            Class hiveClass = Hive.class;
+            Method loadTableMethod =
+                    hiveClass.getDeclaredMethod(
+                            "loadTable",
+                            Path.class,
+                            String.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class);
+            loadTableMethod.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    replace,
+                    isSrcLocal,
+                    isSkewedStoreAsSubdir,
+                    isAcid,
+                    hasFollowingStatsTask);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load table", e);
+        }
+    }
+
+    @Override
+    public void loadPartition(
+            Hive hive,
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal) {
+        try {
+            Class hiveClass = Hive.class;
+            Method loadPartitionMethod =
+                    hiveClass.getDeclaredMethod(
+                            "loadPartition",
+                            Path.class,
+                            String.class,
+                            Map.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class);
+            loadPartitionMethod.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    partSpec,
+                    replace,
+                    inheritTableSpecs,
+                    isSkewedStoreAsSubdir,
+                    isSrcLocal,
+                    isAcid,
+                    hasFollowingStatsTask);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load partition", e);
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
index 0aeccf927bf..f01d5329ed1 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
@@ -26,10 +26,12 @@ import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.io.Writable;
 
 import java.lang.reflect.Constructor;
@@ -41,9 +43,11 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
@@ -62,7 +66,14 @@ public class HiveShimV310 extends HiveShimV239 {
     private static Field hiveDateLocalDate;
     private static Constructor dateWritableConstructor;
 
-    private static boolean hiveClassesInited;
+    private static volatile boolean hiveClassesInited;
+
+    // LoadFileType class
+    private static volatile boolean loadFileClassInited;
+    private static Class clazzLoadFileType;
+
+    protected final long writeIdInLoadTableOrPartition = 0L;
+    protected final int stmtIdInLoadTableOrPartition = 0;
 
     private static void initDateTimeClasses() {
         if (!hiveClassesInited) {
@@ -102,6 +113,23 @@ public class HiveShimV310 extends HiveShimV239 {
         }
     }
 
+    private static void initLoadFileTypeClass() {
+        if (!loadFileClassInited) {
+            synchronized (HiveShimV310.class) {
+                if (!loadFileClassInited) {
+                    try {
+                        clazzLoadFileType =
+                                Class.forName(
+                                        "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType");
+                    } catch (ClassNotFoundException e) {
+                        throw new FlinkHiveException("Failed to get Hive LoadFileType class", e);
+                    }
+                    loadFileClassInited = true;
+                }
+            }
+        }
+    }
+
     @Override
     public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
         try {
@@ -319,6 +347,90 @@ public class HiveShimV310 extends HiveShimV239 {
         }
     }
 
+    @Override
+    public void loadTable(
+            Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) {
+        try {
+            Class hiveClass = Hive.class;
+            initLoadFileTypeClass();
+            Method loadTableMethod =
+                    hiveClass.getDeclaredMethod(
+                            "loadTable",
+                            Path.class,
+                            String.class,
+                            clazzLoadFileType,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            long.class,
+                            int.class,
+                            boolean.class);
+            loadTableMethod.invoke(
+                    hive,
+                    loadPath,
+                    tableName,
+                    getLoadFileType(clazzLoadFileType, replace),
+                    isSrcLocal,
+                    isSkewedStoreAsSubdir,
+                    isAcid,
+                    hasFollowingStatsTask,
+                    writeIdInLoadTableOrPartition,
+                    stmtIdInLoadTableOrPartition,
+                    replace);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load table", e);
+        }
+    }
+
+    @Override
+    public void loadPartition(
+            Hive hive,
+            Path loadPath,
+            String tableName,
+            Map<String, String> partSpec,
+            boolean isSkewedStoreAsSubdir,
+            boolean replace,
+            boolean isSrcLocal) {
+        try {
+            initLoadFileTypeClass();
+            Class hiveClass = Hive.class;
+            Method loadPartitionMethod =
+                    hiveClass.getDeclaredMethod(
+                            "loadPartition",
+                            Path.class,
+                            org.apache.hadoop.hive.ql.metadata.Table.class,
+                            Map.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            boolean.class,
+                            long.class,
+                            int.class,
+                            boolean.class);
+            org.apache.hadoop.hive.ql.metadata.Table table = hive.getTable(tableName);
+            long writeIdInLoadTableOrPartition = 0L;
+            int stmtIdInLoadTableOrPartition = 0;
+            loadPartitionMethod.invoke(
+                    hive,
+                    loadPath,
+                    table,
+                    partSpec,
+                    getLoadFileType(clazzLoadFileType, replace),
+                    inheritTableSpecs,
+                    isSkewedStoreAsSubdir,
+                    isSrcLocal,
+                    isAcid,
+                    hasFollowingStatsTask,
+                    writeIdInLoadTableOrPartition,
+                    stmtIdInLoadTableOrPartition,
+                    replace);
+        } catch (Exception e) {
+            throw new FlinkHiveException("Failed to load partition", e);
+        }
+    }
+
     List<Object> createHiveNNs(
             Table table, Configuration conf, List<String> nnCols, List<Byte> traits)
             throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
@@ -378,4 +490,22 @@ public class HiveShimV310 extends HiveShimV239 {
                         new Class[] {Configuration.class},
                         new Object[] {conf});
     }
+
+    Object getLoadFileType(Class clazzLoadFileType, boolean replace) {
+        Object loadFileType;
+        if (replace) {
+            loadFileType =
+                    Arrays.stream(clazzLoadFileType.getEnumConstants())
+                            .filter(s -> s.toString().equals("REPLACE_ALL"))
+                            .findFirst()
+                            .get();
+        } else {
+            loadFileType =
+                    Arrays.stream(clazzLoadFileType.getEnumConstants())
+                            .filter(s -> s.toString().equals("KEEP_EXISTING"))
+                            .findFirst()
+                            .get();
+        }
+        return loadFileType;
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
index 459a1bbd7dd..2010062fe4c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -31,10 +31,12 @@ import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.delegation.ExtendedOperationExecutor;
+import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.HiveSetOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.planner.delegation.PlannerContext;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveSetProcessor;
+import org.apache.flink.table.planner.delegation.hive.operation.HiveLoadDataOperation;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -65,6 +67,14 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
     public Optional<TableResultInternal> executeOperation(Operation operation) {
         if (operation instanceof HiveSetOperation) {
             return executeHiveSetOperation((HiveSetOperation) operation);
+        } else if (operation instanceof HiveLoadDataOperation) {
+            return executeHiveLoadDataOperation((HiveLoadDataOperation) operation);
+        } else if (operation instanceof ExplainOperation) {
+            ExplainOperation explainOperation = (ExplainOperation) operation;
+            if (explainOperation.getChild() instanceof HiveLoadDataOperation) {
+                return explainHiveLoadDataOperation(
+                        (HiveLoadDataOperation) explainOperation.getChild());
+            }
         }
         return Optional.empty();
     }
@@ -123,4 +133,87 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor {
                 .data(rows)
                 .build();
     }
+
+    private Optional<TableResultInternal> executeHiveLoadDataOperation(
+            HiveLoadDataOperation hiveLoadDataOperation) {
+        Catalog currentCatalog =
+                catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null);
+        if (!(currentCatalog instanceof HiveCatalog)) {
+            throw new FlinkHiveException(
+                    "Only support 'LOAD DATA INPATH' when the current catalog is HiveCatalog in Hive dialect.");
+        }
+        try {
+            // Hive's loadTable/loadPartition will call method
+            // SessionState.get().getCurrentDatabase(), so we have to start a session state
+            HiveSessionState.startSessionState(
+                    ((HiveCatalog) currentCatalog).getHiveConf(), catalogManager);
+            HiveCatalog hiveCatalog = (HiveCatalog) currentCatalog;
+            if (hiveLoadDataOperation.getPartitionSpec().size() > 0) {
+                hiveCatalog.loadPartition(
+                        hiveLoadDataOperation.getPath(),
+                        hiveLoadDataOperation.getTablePath(),
+                        hiveLoadDataOperation.getPartitionSpec(),
+                        hiveLoadDataOperation.isSrcLocal(),
+                        hiveLoadDataOperation.isOverwrite());
+            } else {
+                hiveCatalog.loadTable(
+                        hiveLoadDataOperation.getPath(),
+                        hiveLoadDataOperation.getTablePath(),
+                        hiveLoadDataOperation.isSrcLocal(),
+                        hiveLoadDataOperation.isOverwrite());
+            }
+            return Optional.of(TableResultImpl.TABLE_RESULT_OK);
+        } finally {
+            HiveSessionState.clearSessionState();
+        }
+    }
+
+    private Optional<TableResultInternal> explainHiveLoadDataOperation(
+            HiveLoadDataOperation hiveLoadDataOperation) {
+        // get the plan for the partition part
+        String partitionExplain = "";
+        Map<String, String> partitionSpec = hiveLoadDataOperation.getPartitionSpec();
+        if (!partitionSpec.isEmpty()) {
+            String[] pv = new String[partitionSpec.size()];
+            int i = 0;
+            for (Map.Entry<String, String> partition : partitionSpec.entrySet()) {
+                pv[i++] = String.format("%s=%s", partition.getKey(), partition.getValue());
+            }
+            partitionExplain = String.format(", partition=[%s]", String.join(", ", pv));
+        }
+        // construct the full plan
+        String plan =
+                String.format(
+                        "LoadData(filepath=[%s], "
+                                + "table=[%s],"
+                                + " overwrite=[%s], local=[%s]%s)",
+                        hiveLoadDataOperation.getPath(),
+                        hiveLoadDataOperation.getTablePath(),
+                        hiveLoadDataOperation.isOverwrite(),
+                        hiveLoadDataOperation.isSrcLocal(),
+                        partitionExplain);
+
+        String explanation =
+                "== Abstract Syntax Tree =="
+                        + System.lineSeparator()
+                        + plan
+                        + System.lineSeparator()
+                        + System.lineSeparator()
+                        + "== Optimized Physical Plan =="
+                        + System.lineSeparator()
+                        + plan
+                        + System.lineSeparator()
+                        + System.lineSeparator()
+                        + "== Optimized Execution Plan =="
+                        + System.lineSeparator()
+                        + plan
+                        + System.lineSeparator();
+
+        return Optional.of(
+                TableResultImpl.builder()
+                        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+                        .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
+                        .data(Collections.singletonList(Row.of(explanation)))
+                        .build());
+    }
 }
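
Not part of the commit: based on the explain branch above (and the testLoadData.out resource this
commit adds), explaining the statement presumably routes through the same executor. A hypothetical
one-liner, reusing the tEnv from the earlier sketch, with made-up path and table name:

    // prints the AST / physical / execution plans built from the LoadData(...) plan string above
    tEnv.executeSql("EXPLAIN LOAD DATA INPATH '/tmp/orders.csv' INTO TABLE orders").print();
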
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index 1fb2bf88ce0..4214dbda09c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.planner.delegation.hive;
 
-import org.apache.flink.connectors.hive.FlinkHiveException;
 import org.apache.flink.connectors.hive.HiveInternalOptions;
 import org.apache.flink.table.api.SqlParserException;
 import org.apache.flink.table.api.TableConfig;
@@ -28,7 +27,6 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.module.hive.udf.generic.HiveGenericUDFGrouping;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.HiveSetOperation;
@@ -48,31 +46,22 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveParserQueryState;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer;
+import org.apache.flink.table.planner.delegation.hive.parse.HiveParserLoadSemanticAnalyzer;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
 import org.apache.flink.table.planner.parse.CalciteParser;
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
-import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.tools.FrameworkConfig;
-import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.VariableSubstitution;
-import org.apache.hadoop.hive.ql.lockmgr.LockException;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.processors.HiveCommand;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.sql.Timestamp;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -90,13 +79,6 @@ public class HiveParser extends ParserImpl {
 
     private static final Logger LOG = LoggerFactory.getLogger(HiveParser.class);
 
-    private static final Method setCurrentTSMethod =
-            HiveReflectionUtils.tryGetMethod(
-                    SessionState.class, "setupQueryCurrentTimestamp", new Class[0]);
-    private static final Method getCurrentTSMethod =
-            HiveReflectionUtils.tryGetMethod(
-                    SessionState.class, "getQueryCurrentTimestamp", new Class[0]);
-
     // need to maintain the HiveParserASTNode types for DDLs
     private static final Set<Integer> DDL_NODES;
 
@@ -225,12 +207,12 @@ public class HiveParser extends ParserImpl {
             // substitute variables for the statement
             statement = substituteVariables(hiveConf, statement);
             // creates SessionState
-            startSessionState(hiveConf, catalogManager);
+            HiveSessionState.startSessionState(hiveConf, catalogManager);
             // We override Hive's grouping function. Refer to the implementation for more details.
             hiveShim.registerTemporaryFunction("grouping", HiveGenericUDFGrouping.class);
             return processCmd(statement, hiveConf, hiveShim, (HiveCatalog) currentCatalog);
         } finally {
-            clearSessionState();
+            HiveSessionState.clearSessionState();
         }
     }
 
@@ -389,6 +371,12 @@ public class HiveParser extends ParserImpl {
             HiveShim hiveShim,
             HiveParserASTNode input)
             throws SemanticException {
+        if (isLoadData(input)) {
+            HiveParserLoadSemanticAnalyzer loadSemanticAnalyzer =
+                    new HiveParserLoadSemanticAnalyzer(
+                            hiveConf, frameworkConfig, plannerContext.getCluster());
+            return loadSemanticAnalyzer.convertToOperation(input);
+        }
         if (isMultiDestQuery(input)) {
             return processMultiDestQuery(context, hiveConf, hiveShim, input);
         } else {
@@ -396,6 +384,10 @@ public class HiveParser extends ParserImpl {
         }
     }
 
+    private boolean isLoadData(HiveParserASTNode input) {
+        return input.getType() == HiveASTParser.TOK_LOAD;
+    }
+
     private boolean isMultiDestQuery(HiveParserASTNode astNode) {
         // Hive's multi dest insert will always be [FROM, INSERT+]
         // so, if it's children count is more than 2, it should be a multi-dest query
@@ -481,126 +473,4 @@ public class HiveParser extends ParserImpl {
             return new PlannerQueryOperation(relNode);
         }
     }
-
-    private void startSessionState(HiveConf hiveConf, CatalogManager catalogManager) {
-        final ClassLoader contextCL = Thread.currentThread().getContextClassLoader();
-        try {
-            HiveParserSessionState sessionState = new HiveParserSessionState(hiveConf, contextCL);
-            sessionState.initTxnMgr(hiveConf);
-            sessionState.setCurrentDatabase(catalogManager.getCurrentDatabase());
-            // some Hive functions needs the timestamp
-            setCurrentTimestamp(sessionState);
-            SessionState.setCurrentSessionState(sessionState);
-        } catch (LockException e) {
-            throw new FlinkHiveException("Failed to init SessionState", e);
-        } finally {
-            // don't let SessionState mess up with our context classloader
-            Thread.currentThread().setContextClassLoader(contextCL);
-        }
-    }
-
-    private static void setCurrentTimestamp(HiveParserSessionState sessionState) {
-        if (setCurrentTSMethod != null) {
-            try {
-                setCurrentTSMethod.invoke(sessionState);
-                Object currentTs = getCurrentTSMethod.invoke(sessionState);
-                if (currentTs instanceof Instant) {
-                    sessionState.hiveParserCurrentTS = Timestamp.from((Instant) currentTs);
-                } else {
-                    sessionState.hiveParserCurrentTS = (Timestamp) currentTs;
-                }
-            } catch (IllegalAccessException | InvocationTargetException e) {
-                throw new FlinkHiveException("Failed to set current timestamp for session", e);
-            }
-        } else {
-            sessionState.hiveParserCurrentTS = new Timestamp(System.currentTimeMillis());
-        }
-    }
-
-    private void clearSessionState() {
-        SessionState sessionState = SessionState.get();
-        if (sessionState != null) {
-            try {
-                sessionState.close();
-            } catch (Exception e) {
-                LOG.warn("Error closing SessionState", e);
-            }
-        }
-    }
-
-    /** Sub-class of SessionState to meet our needs. */
-    public static class HiveParserSessionState extends SessionState {
-
-        private static final Class registryClz;
-        private static final Method getRegistry;
-        private static final Method clearRegistry;
-        private static final Method closeRegistryLoaders;
-
-        private Timestamp hiveParserCurrentTS;
-
-        static {
-            registryClz =
-                    HiveReflectionUtils.tryGetClass("org.apache.hadoop.hive.ql.exec.Registry");
-            if (registryClz != null) {
-                getRegistry =
-                        HiveReflectionUtils.tryGetMethod(
-                                SessionState.class, "getRegistry", new Class[0]);
-                clearRegistry =
-                        HiveReflectionUtils.tryGetMethod(registryClz, "clear", new Class[0]);
-                closeRegistryLoaders =
-                        HiveReflectionUtils.tryGetMethod(
-                                registryClz, "closeCUDFLoaders", new Class[0]);
-            } else {
-                getRegistry = null;
-                clearRegistry = null;
-                closeRegistryLoaders = null;
-            }
-        }
-
-        private final ClassLoader originContextLoader;
-        private final ClassLoader hiveLoader;
-
-        public HiveParserSessionState(HiveConf conf, ClassLoader contextLoader) {
-            super(conf);
-            this.originContextLoader = contextLoader;
-            this.hiveLoader = getConf().getClassLoader();
-            // added jars are handled by context class loader, so we always use it as the session
-            // class loader
-            getConf().setClassLoader(contextLoader);
-        }
-
-        @Override
-        public void close() throws IOException {
-            clearSessionRegistry();
-            if (getTxnMgr() != null) {
-                getTxnMgr().closeTxnManager();
-            }
-            // close the classloader created in hive
-            JavaUtils.closeClassLoadersTo(hiveLoader, originContextLoader);
-            File resourceDir =
-                    new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
-            LOG.debug("Removing resource dir " + resourceDir);
-            FileUtils.deleteDirectoryQuietly(resourceDir);
-            Hive.closeCurrent();
-            detachSession();
-        }
-
-        public Timestamp getHiveParserCurrentTS() {
-            return hiveParserCurrentTS;
-        }
-
-        private void clearSessionRegistry() {
-            if (getRegistry != null) {
-                try {
-                    Object registry = getRegistry.invoke(this);
-                    if (registry != null) {
-                        clearRegistry.invoke(registry);
-                        closeRegistryLoaders.invoke(registry);
-                    }
-                } catch (IllegalAccessException | InvocationTargetException e) {
-                    LOG.warn("Failed to clear session registry", e);
-                }
-            }
-        }
-    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveSessionState.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveSessionState.java
new file mode 100644
index 00000000000..5bfa16481d9
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveSessionState.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive;
+
+import org.apache.flink.connectors.hive.FlinkHiveException;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
+import org.apache.flink.util.FileUtils;
+
+import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.Timestamp;
+import java.time.Instant;
+
+/** Sub-class of SessionState to meet our needs. */
+public class HiveSessionState extends SessionState {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HiveSessionState.class);
+
+    private static final Method setCurrentTSMethod =
+            HiveReflectionUtils.tryGetMethod(
+                    SessionState.class, "setupQueryCurrentTimestamp", new Class[0]);
+    private static final Method getCurrentTSMethod =
+            HiveReflectionUtils.tryGetMethod(
+                    SessionState.class, "getQueryCurrentTimestamp", new Class[0]);
+
+    private static final Class registryClz;
+    private static final Method getRegistry;
+    private static final Method clearRegistry;
+    private static final Method closeRegistryLoaders;
+
+    private Timestamp hiveParserCurrentTS;
+
+    static {
+        registryClz = HiveReflectionUtils.tryGetClass("org.apache.hadoop.hive.ql.exec.Registry");
+        if (registryClz != null) {
+            getRegistry =
+                    HiveReflectionUtils.tryGetMethod(
+                            SessionState.class, "getRegistry", new Class[0]);
+            clearRegistry = HiveReflectionUtils.tryGetMethod(registryClz, "clear", new Class[0]);
+            closeRegistryLoaders =
+                    HiveReflectionUtils.tryGetMethod(registryClz, "closeCUDFLoaders", new Class[0]);
+        } else {
+            getRegistry = null;
+            clearRegistry = null;
+            closeRegistryLoaders = null;
+        }
+    }
+
+    private final ClassLoader originContextLoader;
+    private final ClassLoader hiveLoader;
+
+    public HiveSessionState(HiveConf conf, ClassLoader contextLoader) {
+        super(conf);
+        this.originContextLoader = contextLoader;
+        this.hiveLoader = getConf().getClassLoader();
+        // added jars are handled by context class loader, so we always use it as the session
+        // class loader
+        getConf().setClassLoader(contextLoader);
+    }
+
+    @Override
+    public void close() throws IOException {
+        clearSessionRegistry();
+        if (getTxnMgr() != null) {
+            getTxnMgr().closeTxnManager();
+        }
+        // close the classloader created in hive
+        JavaUtils.closeClassLoadersTo(hiveLoader, originContextLoader);
+        File resourceDir = new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
+        LOG.debug("Removing resource dir " + resourceDir);
+        FileUtils.deleteDirectoryQuietly(resourceDir);
+        Hive.closeCurrent();
+        detachSession();
+    }
+
+    public void setHiveParserCurrentTSCurrentTS(Timestamp hiveParserCurrentTS) {
+        this.hiveParserCurrentTS = hiveParserCurrentTS;
+    }
+
+    public Timestamp getHiveParserCurrentTS() {
+        return hiveParserCurrentTS;
+    }
+
+    private void clearSessionRegistry() {
+        if (getRegistry != null) {
+            try {
+                Object registry = getRegistry.invoke(this);
+                if (registry != null) {
+                    clearRegistry.invoke(registry);
+                    closeRegistryLoaders.invoke(registry);
+                }
+            } catch (IllegalAccessException | InvocationTargetException e) {
+                LOG.warn("Failed to clear session registry", e);
+            }
+        }
+    }
+
+    public static void startSessionState(HiveConf hiveConf, CatalogManager catalogManager) {
+        final ClassLoader contextCL = Thread.currentThread().getContextClassLoader();
+        try {
+            HiveSessionState sessionState = new HiveSessionState(hiveConf, contextCL);
+            sessionState.initTxnMgr(hiveConf);
+            sessionState.setCurrentDatabase(catalogManager.getCurrentDatabase());
+            // some Hive functions needs the timestamp
+            setCurrentTimestamp(sessionState);
+            SessionState.setCurrentSessionState(sessionState);
+        } catch (LockException e) {
+            throw new FlinkHiveException("Failed to init SessionState", e);
+        } finally {
+            // don't let SessionState mess up with our context classloader
+            Thread.currentThread().setContextClassLoader(contextCL);
+        }
+    }
+
+    private static void setCurrentTimestamp(HiveSessionState sessionState) {
+        if (setCurrentTSMethod != null) {
+            try {
+                setCurrentTSMethod.invoke(sessionState);
+                Object currentTs = getCurrentTSMethod.invoke(sessionState);
+                if (currentTs instanceof Instant) {
+                    sessionState.setHiveParserCurrentTSCurrentTS(
+                            Timestamp.from((Instant) currentTs));
+                } else {
+                    sessionState.setHiveParserCurrentTSCurrentTS((Timestamp) currentTs);
+                }
+            } catch (IllegalAccessException | InvocationTargetException e) {
+                throw new FlinkHiveException("Failed to set current timestamp for session", e);
+            }
+        } else {
+            sessionState.setHiveParserCurrentTSCurrentTS(new Timestamp(System.currentTimeMillis()));
+        }
+    }
+
+    public static void clearSessionState() {
+        SessionState sessionState = SessionState.get();
+        if (sessionState != null) {
+            try {
+                sessionState.close();
+            } catch (Exception e) {
+                LOG.warn("Error closing SessionState", e);
+            }
+        }
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java
index c9bf54febee..a154cdc6afa 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java
@@ -97,8 +97,7 @@ public class SqlFunctionConverter extends RexShuttle {
             if (convertedOp instanceof FlinkSqlTimestampFunction) {
                 // flink's current_timestamp has different type from hive's, 
convert it to a literal
                 Timestamp currentTS =
-                        ((HiveParser.HiveParserSessionState) 
SessionState.get())
-                                .getHiveParserCurrentTS();
+                        ((HiveSessionState) 
SessionState.get()).getHiveParserCurrentTS();
                 HiveShim hiveShim = HiveParserUtils.getSessionHiveShim();
                 try {
                     return HiveParserRexNodeConverter.convertConstant(
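
Note on the hunk above: the literal comes from the timestamp captured when the session was started, so every reference to current_timestamp inside one statement folds to the same value. A one-line sketch of the lookup (error handling and the literal construction omitted):

    // Assumes a HiveSessionState was installed via startSessionState(...) beforehand.
    Timestamp fixedTs = ((HiveSessionState) SessionState.get()).getHiveParserCurrentTS();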
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java
index f928be68724..33fd94fbe44 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java
@@ -82,7 +82,7 @@ public class HiveParserContext {
      */
     public HiveParserContext(Configuration conf) {
         this.conf = conf;
-        viewsTokenRewriteStreams = new HashMap<>();
+        this.viewsTokenRewriteStreams = new HashMap<>();
     }
 
     // Find whether we should execute the current query due to explain.
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java
new file mode 100644
index 00000000000..339985336b5
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive.operation;
+
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.operations.Operation;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.Map;
+
+/**
+ * An operation that loads data into a Hive table.
+ *
+ * <pre>The syntax is: {@code
+ * LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
+ * [PARTITION (partcol1=val1, partcol2=val2 ...)]
+ * }
+ * </pre>
+ */
+public class HiveLoadDataOperation implements Operation {
+    private final Path path;
+    private final ObjectPath tablePath;
+    private final boolean isOverwrite;
+    private final boolean isSrcLocal;
+    private final Map<String, String> partitionSpec;
+
+    public HiveLoadDataOperation(
+            Path path,
+            ObjectPath tablePath,
+            boolean isOverwrite,
+            boolean isSrcLocal,
+            Map<String, String> partitionSpec) {
+        this.path = path;
+        this.tablePath = tablePath;
+        this.isOverwrite = isOverwrite;
+        this.isSrcLocal = isSrcLocal;
+        this.partitionSpec = partitionSpec;
+    }
+
+    public Path getPath() {
+        return path;
+    }
+
+    public ObjectPath getTablePath() {
+        return tablePath;
+    }
+
+    public boolean isOverwrite() {
+        return isOverwrite;
+    }
+
+    public boolean isSrcLocal() {
+        return isSrcLocal;
+    }
+
+    public Map<String, String> getPartitionSpec() {
+        return partitionSpec;
+    }
+
+    @Override
+    public String asSummaryString() {
+        StringBuilder stringBuilder = new StringBuilder("LOAD DATA");
+        if (isSrcLocal) {
+            stringBuilder.append(" LOCAL");
+        }
+        stringBuilder
+                .append(" INPATH")
+                .append(String.format(" '%s'", path))
+                .append(isOverwrite ? " OVERWRITE" : "")
+                .append(" INTO TABLE ")
+                .append(tablePath.getFullName());
+        if (partitionSpec.size() > 0) {
+            String[] pv = new String[partitionSpec.size()];
+            int i = 0;
+            for (Map.Entry<String, String> partition : 
partitionSpec.entrySet()) {
+                pv[i++] = String.format("%s=%s", partition.getKey(), 
partition.getValue());
+            }
+            stringBuilder.append(" PARTITION (").append(String.join(", ", 
pv)).append(")");
+        }
+        return stringBuilder.toString();
+    }
+}
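
As an illustration (not part of the patch), building the operation for the statement shown in the class Javadoc and printing its summary string; the file path and table name are placeholders:

    // Assumed imports: org.apache.hadoop.fs.Path, org.apache.flink.table.catalog.ObjectPath,
    // java.util.LinkedHashMap.
    LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
    partSpec.put("dateint", "2022");
    HiveLoadDataOperation op =
            new HiveLoadDataOperation(
                    new Path("/tmp/test.csv"),
                    new ObjectPath("default", "p_table"),
                    true, // overwrite
                    true, // local
                    partSpec);
    // prints: LOAD DATA LOCAL INPATH '/tmp/test.csv' OVERWRITE INTO TABLE default.p_table PARTITION (dateint=2022)
    System.out.println(op.asSummaryString());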
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
new file mode 100644
index 00000000000..81047f2e5d6
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive.parse;
+
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode;
+import 
org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.TableSpec;
+import 
org.apache.flink.table.planner.delegation.hive.operation.HiveLoadDataOperation;
+import org.apache.flink.util.StringUtils;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.net.URLCodec;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.mapred.InputFormat;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static 
org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.stripQuotes;
+
+/** Ported hive's org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer. */
+public class HiveParserLoadSemanticAnalyzer {
+
+    private final HiveConf conf;
+    private final Hive db;
+    private final FrameworkConfig frameworkConfig;
+    private final RelOptCluster cluster;
+
+    public HiveParserLoadSemanticAnalyzer(
+            HiveConf conf, FrameworkConfig frameworkConfig, RelOptCluster 
cluster)
+            throws SemanticException {
+        this.conf = conf;
+        try {
+            this.db = Hive.get(conf);
+        } catch (HiveException e) {
+            throw new SemanticException(e);
+        }
+        this.frameworkConfig = frameworkConfig;
+        this.cluster = cluster;
+    }
+
+    public HiveLoadDataOperation convertToOperation(HiveParserASTNode ast)
+            throws SemanticException {
+        boolean isLocal = false;
+        boolean isOverWrite = false;
+        Tree fromTree = ast.getChild(0);
+        HiveParserASTNode tableTree = (HiveParserASTNode) ast.getChild(1);
+
+        if (ast.getChildCount() == 4) {
+            isLocal = true;
+            isOverWrite = true;
+        }
+
+        if (ast.getChildCount() == 3) {
+            if (ast.getChild(2).getText().equalsIgnoreCase("local")) {
+                isLocal = true;
+            } else {
+                isOverWrite = true;
+            }
+        }
+
+        // initialize load path
+        URI fromURI;
+        try {
+            String fromPath = stripQuotes(fromTree.getText());
+            fromURI = initializeFromURI(fromPath, isLocal);
+        } catch (IOException | URISyntaxException e) {
+            throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, 
e.getMessage()), e);
+        }
+
+        // initialize destination table/partition
+        TableSpec ts = new TableSpec(db, conf, tableTree, frameworkConfig, 
cluster);
+
+        if (ts.tableHandle.isView() || ts.tableHandle.isMaterializedView()) {
+            throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg());
+        }
+        if (ts.tableHandle.isNonNative()) {
+            throw new 
SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
+        }
+
+        if (ts.tableHandle.isStoredAsSubDirectories()) {
+            throw new 
SemanticException(ErrorMsg.LOAD_INTO_STORED_AS_DIR.getMsg());
+        }
+
+        List<FieldSchema> parts = ts.tableHandle.getPartitionKeys();
+        if ((parts != null && parts.size() > 0)
+                && (ts.partSpec == null || ts.partSpec.size() == 0)) {
+            throw new 
SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg());
+        }
+
+        List<String> bucketCols = ts.tableHandle.getBucketCols();
+        if (bucketCols != null && !bucketCols.isEmpty()) {
+            String error = HiveConf.StrictChecks.checkBucketing(conf);
+            if (error != null) {
+                throw new SemanticException(
+                        "Please load into an intermediate table"
+                                + " and use 'insert... select' to allow Hive 
to enforce bucketing. "
+                                + error);
+            }
+        }
+
+        // make sure the arguments make sense; may need to rewrite "LOAD DATA" as
+        // "INSERT AS SELECT" when there's any directory in the fromURI
+        List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, 
fromTree, isLocal);
+
+        // for managed tables, make sure the file formats match
+        if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType())
+                && conf.getBoolVar(HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
+            ensureFileFormatsMatch(ts, files, fromURI);
+        }
+
+        return new HiveLoadDataOperation(
+                new Path(fromURI),
+                new ObjectPath(ts.tableHandle.getDbName(), 
ts.tableHandle.getTableName()),
+                isOverWrite,
+                isLocal,
+                ts.partSpec == null ? new LinkedHashMap<>() : ts.partSpec);
+    }
+
+    private List<FileStatus> applyConstraintsAndGetFiles(URI fromURI, Tree 
ast, boolean isLocal)
+            throws SemanticException {
+
+        FileStatus[] srcs;
+
+        // local mode implies that scheme should be "file"
+        // we can change this going forward
+        if (isLocal && !fromURI.getScheme().equals("file")) {
+            throw new SemanticException(
+                    ErrorMsg.ILLEGAL_PATH.getMsg(
+                            ast,
+                            "Source file system should be \"file\" if 
\"local\" is specified"));
+        }
+
+        try {
+            srcs = matchFilesOrDir(FileSystem.get(fromURI, conf), new 
Path(fromURI));
+            if (srcs == null || srcs.length == 0) {
+                throw new SemanticException(
+                        HiveParserErrorMsg.getMsg(
+                                ErrorMsg.INVALID_PATH, ast, "No files matching 
path " + fromURI));
+            }
+
+            for (FileStatus oneSrc : srcs) {
+                if (oneSrc.isDir()) {
+                    throw new SemanticException(
+                            ErrorMsg.INVALID_PATH.getMsg(
+                                    ast,
+                                    "source contains directory: " + 
oneSrc.getPath().toString()));
+                }
+            }
+        } catch (IOException e) {
+            throw new 
SemanticException(HiveParserErrorMsg.getMsg(ErrorMsg.INVALID_PATH, ast), e);
+        }
+
+        return Arrays.asList(srcs);
+    }
+
+    public static FileStatus[] matchFilesOrDir(FileSystem fs, Path path) 
throws IOException {
+        FileStatus[] srcs =
+                fs.globStatus(
+                        path,
+                        p -> {
+                            String name = p.getName();
+                            return name.equals(EximUtil.METADATA_NAME)
+                                    || !name.startsWith("_") && 
!name.startsWith(".");
+                        });
+        if ((srcs != null) && srcs.length == 1) {
+            if (srcs[0].isDir()) {
+                srcs =
+                        fs.listStatus(
+                                srcs[0].getPath(),
+                                p -> {
+                                    String name = p.getName();
+                                    return !name.startsWith("_") && 
!name.startsWith(".");
+                                });
+            }
+        }
+        return (srcs);
+    }
+
+    private URI initializeFromURI(String fromPath, boolean isLocal)
+            throws IOException, URISyntaxException, SemanticException {
+        URI fromURI = new Path(fromPath).toUri();
+
+        String fromScheme = fromURI.getScheme();
+        String fromAuthority = fromURI.getAuthority();
+        String path = fromURI.getPath();
+
+        // generate absolute path relative to current directory or hdfs home
+        // directory
+        if (!path.startsWith("/")) {
+            if (isLocal) {
+                try {
+                    path =
+                            new String(
+                                    URLCodec.decodeUrl(
+                                            new 
Path(System.getProperty("user.dir"), fromPath)
+                                                    .toUri()
+                                                    .toString()
+                                                    
.getBytes(StandardCharsets.US_ASCII)),
+                                    StandardCharsets.US_ASCII);
+                } catch (DecoderException de) {
+                    throw new SemanticException("URL Decode failed", de);
+                }
+            } else {
+                path =
+                        new Path(new Path("/user/" + 
System.getProperty("user.name")), path)
+                                .toString();
+            }
+        }
+
+        // set correct scheme and authority
+        if (StringUtils.isNullOrWhitespaceOnly(fromScheme)) {
+            if (isLocal) {
+                // file for local
+                fromScheme = "file";
+            } else {
+                // use default values from fs.default.name
+                URI defaultURI = FileSystem.get(conf).getUri();
+                fromScheme = defaultURI.getScheme();
+                fromAuthority = defaultURI.getAuthority();
+            }
+        }
+
+        // if scheme is specified but not authority then use the default 
authority
+        if ((!fromScheme.equals("file")) && 
StringUtils.isNullOrWhitespaceOnly(fromAuthority)) {
+            URI defaultURI = FileSystem.get(conf).getUri();
+            fromAuthority = defaultURI.getAuthority();
+        }
+
+        return new URI(fromScheme, fromAuthority, path, null, null);
+    }
+
+    private void ensureFileFormatsMatch(
+            TableSpec ts, List<FileStatus> fileStatuses, final URI fromURI)
+            throws SemanticException {
+        final Class<? extends InputFormat> destInputFormat;
+        try {
+            if (ts.getPartSpec() == null || ts.getPartSpec().isEmpty()) {
+                destInputFormat = ts.tableHandle.getInputFormatClass();
+            } else {
+                destInputFormat = ts.partHandle.getInputFormatClass();
+            }
+        } catch (HiveException e) {
+            throw new SemanticException(e);
+        }
+
+        try {
+            FileSystem fs = FileSystem.get(fromURI, conf);
+            boolean validFormat =
+                    HiveFileFormatUtils.checkInputFormat(fs, conf, 
destInputFormat, fileStatuses);
+            if (!validFormat) {
+                throw new 
SemanticException(ErrorMsg.INVALID_FILE_FORMAT_IN_LOAD.getMsg());
+            }
+        } catch (Exception e) {
+            throw new SemanticException(
+                    "Unable to load data to destination table." + " Error: " + 
e.getMessage());
+        }
+    }
+}
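
To make the path-resolution rules in initializeFromURI concrete, a few illustrative inputs and the URIs they resolve to (actual values depend on user.dir, user.name and fs.default.name; shown only as a sketch):

    // isLocal = true,  fromPath = "data/test.csv"
    //     -> file:<user.dir>/data/test.csv                      (relative local paths get scheme "file")
    // isLocal = false, fromPath = "data/test.csv"
    //     -> <defaultFS scheme>://<authority>/user/<user.name>/data/test.csv
    // isLocal = false, fromPath = "hdfs:/warehouse/tab1"
    //     -> hdfs://<default authority>/warehouse/tab1          (missing authority filled from fs.default.name)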
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 2a8fe3c6868..0817af4ff4e 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
@@ -66,6 +67,7 @@ public class HiveDialectQueryITCase {
 
     private static HiveCatalog hiveCatalog;
     private static TableEnvironment tableEnv;
+    private static String warehouse;
 
     @BeforeClass
     public static void setup() throws Exception {
@@ -74,6 +76,7 @@ public class HiveDialectQueryITCase {
         
hiveCatalog.getHiveConf().setVar(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT, 
"none");
         hiveCatalog.open();
         tableEnv = getTableEnvWithHiveCatalog();
+        warehouse = 
hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
 
         // create tables
         tableEnv.executeSql("create table foo (x int, y int)");
@@ -505,19 +508,14 @@ public class HiveDialectQueryITCase {
         try {
             // test explain transform
             String actualPlan =
-                    (String)
-                            CollectionUtil.iteratorToList(
-                                            tableEnv.executeSql(
-                                                            "explain select 
transform(key, value)"
-                                                                    + " ROW 
FORMAT SERDE 'MySerDe'"
-                                                                    + " WITH 
SERDEPROPERTIES ('p1'='v1','p2'='v2')"
-                                                                    + " 
RECORDWRITER 'MyRecordWriter' "
-                                                                    + " using 
'cat' as (cola int, value string)"
-                                                                    + " ROW 
FORMAT DELIMITED FIELDS TERMINATED BY ','"
-                                                                    + " 
RECORDREADER 'MyRecordReader' from src")
-                                                    .collect())
-                                    .get(0)
-                                    .getField(0);
+                    explainSql(
+                            "select transform(key, value)"
+                                    + " ROW FORMAT SERDE 'MySerDe'"
+                                    + " WITH SERDEPROPERTIES 
('p1'='v1','p2'='v2')"
+                                    + " RECORDWRITER 'MyRecordWriter' "
+                                    + " using 'cat' as (cola int, value 
string)"
+                                    + " ROW FORMAT DELIMITED FIELDS TERMINATED 
BY ','"
+                                    + " RECORDREADER 'MyRecordReader' from 
src");
             
assertThat(actualPlan).isEqualTo(readFromResource("/explain/testScriptTransform.out"));
 
             // transform using + specified schema
@@ -587,13 +585,7 @@ public class HiveDialectQueryITCase {
                             + " insert overwrite table t1 select id, name 
where age < 20"
                             + "  insert overwrite table t2 select id, name 
where age > 20";
             // test explain
-            String actualPlan =
-                    (String)
-                            CollectionUtil.iteratorToList(
-                                            tableEnv.executeSql("explain " + 
multiInsertSql)
-                                                    .collect())
-                                    .get(0)
-                                    .getField(0);
+            String actualPlan = explainSql(multiInsertSql);
             
assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMultiInsert.out"));
             // test execution
             tableEnv.executeSql("insert into table t3 values (1, 'test1', 18 
), (2, 'test2', 28 )")
@@ -673,6 +665,66 @@ public class HiveDialectQueryITCase {
         }
     }
 
+    @Test
+    public void testLoadData() throws Exception {
+        tableEnv.executeSql("create table tab1 (col1 int, col2 int) stored as 
orc");
+        tableEnv.executeSql("create table tab2 (col1 int, col2 int) STORED AS 
ORC");
+        tableEnv.executeSql(
+                "create table p_table(col1 int, col2 int) partitioned by 
(dateint int) row format delimited fields terminated by ','");
+        try {
+            String testLoadCsvFilePath =
+                    
Objects.requireNonNull(getClass().getResource("/csv/test.csv")).toString();
+            // test explain
+            String actualPlan =
+                    explainSql(
+                            String.format(
+                                    "load data local inpath '%s' overwrite 
into table p_table partition (dateint=2022) ",
+                                    testLoadCsvFilePath));
+            assertThat(actualPlan)
+                    .isEqualTo(
+                            readFromResource("/explain/testLoadData.out")
+                                    .replace("$filepath", 
testLoadCsvFilePath));
+
+            // test load data into table
+            tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2), (2, 
1), (2, 2)").await();
+            tableEnv.executeSql(
+                    String.format(
+                            "load data local inpath '%s' INTO TABLE tab2", 
warehouse + "/tab1"));
+            List<Row> result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from 
tab2").collect());
+            assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2], 
+I[2, 1], +I[2, 2]]");
+
+            // test load data overwrite
+            tableEnv.executeSql("insert into tab1 values (2, 1), (2, 
2)").await();
+            tableEnv.executeSql(
+                    String.format(
+                            "load data local inpath '%s' overwrite into table 
tab2",
+                            warehouse + "/tab1"));
+            result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from 
tab2").collect());
+            assertThat(result.toString()).isEqualTo("[+I[2, 1], +I[2, 2]]");
+
+            // test load data into partition
+            tableEnv.executeSql(
+                            String.format(
+                                    "load data inpath '%s' into table p_table 
partition (dateint=2022) ",
+                                    testLoadCsvFilePath))
+                    .await();
+            result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from p_table where 
dateint=2022")
+                                    .collect());
+            assertThat(result.toString())
+                    .isEqualTo("[+I[1, 1, 2022], +I[2, 2, 2022], +I[3, 3, 
2022]]");
+        } finally {
+            tableEnv.executeSql("drop table tab1");
+            tableEnv.executeSql("drop table tab2");
+            tableEnv.executeSql("drop table p_table");
+        }
+    }
+
     private void runQFile(File qfile) throws Exception {
         QTest qTest = extractQTest(qfile);
         for (int i = 0; i < qTest.statements.size(); i++) {
@@ -771,6 +823,13 @@ public class HiveDialectQueryITCase {
         }
     }
 
+    private String explainSql(String sql) {
+        return (String)
+                CollectionUtil.iteratorToList(tableEnv.executeSql("explain " + 
sql).collect())
+                        .get(0)
+                        .getField(0);
+    }
+
     private static TableEnvironment getTableEnvWithHiveCatalog() {
         TableEnvironment tableEnv = 
HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
         tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
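
End-to-end, the same feature is driven with plain SQL once the dialect is switched to Hive. A minimal sketch outside the test harness (the catalog instance, table names and file paths are placeholders and assume the target tables already exist):

    // Assumes an already-constructed HiveCatalog instance referenced by `hiveCatalog`.
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    tEnv.registerCatalog("hive", hiveCatalog);
    tEnv.useCatalog("hive");
    tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tEnv.executeSql("LOAD DATA LOCAL INPATH '/tmp/tab1_data' OVERWRITE INTO TABLE tab2");
    tEnv.executeSql(
            "LOAD DATA INPATH '/tmp/test.csv' INTO TABLE p_table PARTITION (dateint=2022)");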
diff --git 
a/flink-connectors/flink-connector-hive/src/test/resources/explain/testLoadData.out
 
b/flink-connectors/flink-connector-hive/src/test/resources/explain/testLoadData.out
new file mode 100644
index 00000000000..3f750bc9800
--- /dev/null
+++ 
b/flink-connectors/flink-connector-hive/src/test/resources/explain/testLoadData.out
@@ -0,0 +1,8 @@
+== Abstract Syntax Tree ==
+LoadData(filepath=[$filepath], table=[default.p_table], overwrite=[true], 
local=[true], partition=[dateint=2022])
+
+== Optimized Physical Plan ==
+LoadData(filepath=[$filepath], table=[default.p_table], overwrite=[true], 
local=[true], partition=[dateint=2022])
+
+== Optimized Execution Plan ==
+LoadData(filepath=[$filepath], table=[default.p_table], overwrite=[true], 
local=[true], partition=[dateint=2022])
