This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 6c91cc59998 [FLINK-26413][hive] Supports "LOAD DATA INPATH" in Hive dialect 6c91cc59998 is described below commit 6c91cc5999828f0a61f8a54498bc71a581a05dd8 Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn> AuthorDate: Tue Aug 9 14:46:37 2022 +0800 [FLINK-26413][hive] Supports "LOAD DATA INPATH" in Hive dialect This closes #19556 --- flink-connectors/flink-connector-hive/pom.xml | 7 + .../flink/table/catalog/hive/HiveCatalog.java | 42 +++ .../hive/client/HiveMetastoreClientWrapper.java | 26 ++ .../flink/table/catalog/hive/client/HiveShim.java | 13 + .../table/catalog/hive/client/HiveShimV100.java | 75 ++++++ .../table/catalog/hive/client/HiveShimV200.java | 62 +++++ .../table/catalog/hive/client/HiveShimV210.java | 74 ++++++ .../table/catalog/hive/client/HiveShimV310.java | 132 ++++++++- .../delegation/hive/HiveOperationExecutor.java | 93 +++++++ .../table/planner/delegation/hive/HiveParser.java | 156 +---------- .../planner/delegation/hive/HiveSessionState.java | 171 ++++++++++++ .../delegation/hive/SqlFunctionConverter.java | 3 +- .../delegation/hive/copy/HiveParserContext.java | 2 +- .../hive/operation/HiveLoadDataOperation.java | 99 +++++++ .../hive/parse/HiveParserLoadSemanticAnalyzer.java | 294 +++++++++++++++++++++ .../connectors/hive/HiveDialectQueryITCase.java | 99 +++++-- .../src/test/resources/explain/testLoadData.out | 8 + 17 files changed, 1189 insertions(+), 167 deletions(-) diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml index f42554dfae0..6fd4f86c199 100644 --- a/flink-connectors/flink-connector-hive/pom.xml +++ b/flink-connectors/flink-connector-hive/pom.xml @@ -891,6 +891,13 @@ under the License. <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hivemetastore.hadoop.version}</version> + <scope>test</scope> + </dependency> + <!-- ArchUit test dependencies --> <dependency> diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java index c72ebbe8edc..dee28e2a58f 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalog.java @@ -21,6 +21,7 @@ package org.apache.flink.table.catalog.hive; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; +import org.apache.flink.connectors.hive.FlinkHiveException; import org.apache.flink.connectors.hive.HiveDynamicTableFactory; import org.apache.flink.connectors.hive.HiveTableFactory; import org.apache.flink.sql.parser.hive.ddl.HiveDDLUtils; @@ -103,6 +104,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -118,6 +120,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; 
+import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -1769,6 +1772,45 @@ public class HiveCatalog extends AbstractCatalog { return IDENTIFIER.equalsIgnoreCase(properties.get(CONNECTOR.key())); } + @Internal + public void loadTable( + Path loadPath, ObjectPath tablePath, boolean isOverwrite, boolean isSrcLocal) { + try { + client.loadTable(loadPath, tablePath.getFullName(), isOverwrite, isSrcLocal); + } catch (HiveException e) { + throw new FlinkHiveException("Failed to load table.", e); + } + } + + @Internal + public void loadPartition( + Path loadPath, + ObjectPath tablePath, + Map<String, String> partSpec, + boolean isOverwrite, + boolean isSrcLocal) { + Table hiveTable; + Map<String, String> orderedPartitionSpec = new LinkedHashMap<>(); + try { + hiveTable = getHiveTable(tablePath); + } catch (TableNotExistException e) { + throw new FlinkHiveException("Failed to get Hive table when trying to load partition", e); + } + hiveTable + .getPartitionKeys() + .forEach( + column -> + orderedPartitionSpec.put( + column.getName(), partSpec.get(column.getName()))); + client.loadPartition( + loadPath, + tablePath.getFullName(), + orderedPartitionSpec, + hiveTable.getSd().isStoredAsSubDirectories(), + isOverwrite, + isSrcLocal); + } + private static void disallowChangeCatalogTableType( Map<String, String> existingTableOptions, Map<String, String> newTableOptions) { CatalogTableType existingTableType = getCatalogTableType(existingTableOptions); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java index d7f48e8d129..3acc9860025 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveMetastoreClientWrapper.java @@ -19,11 +19,13 @@ package org.apache.flink.table.catalog.hive.client; import org.apache.flink.annotation.Internal; +import org.apache.flink.connectors.hive.FlinkHiveException; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.util.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -47,6 +49,8 @@ import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnOpenException; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +71,7 @@ public class HiveMetastoreClientWrapper implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreClientWrapper.class); private final IMetaStoreClient client; + private final Hive hive; private final HiveConf hiveConf; private final HiveShim hiveShim; @@ -82,6 +87,11 @@ public class HiveMetastoreClientWrapper implements AutoCloseable {
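+ // Note: besides the thrift IMetaStoreClient, the wrapper now also holds a ql-layer Hive + // client (obtained via Hive.get in the constructor below), because loadTable/loadPartition + // are implemented on org.apache.hadoop.hive.ql.metadata.Hive rather than on the bare + // metastore client.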
HiveCatalog.isEmbeddedMetastore(hiveConf) ? createMetastoreClient() : HiveMetaStoreClient.newSynchronizedClient(createMetastoreClient()); + try { + this.hive = Hive.get(hiveConf); + } catch (HiveException e) { + throw new FlinkHiveException(e); + } } @Override @@ -336,4 +346,20 @@ public class HiveMetastoreClientWrapper implements AutoCloseable { public void unlock(long lockid) throws NoSuchLockException, TxnOpenException, TException { client.unlock(lockid); } + + public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal) + throws HiveException { + hiveShim.loadTable(hive, loadPath, tableName, replace, isSrcLocal); + } + + public void loadPartition( + Path loadPath, + String tableName, + Map<String, String> partSpec, + boolean isSkewedStoreAsSubdir, + boolean replace, + boolean isSrcLocal) { + hiveShim.loadPartition( + hive, loadPath, tableName, partSpec, isSkewedStoreAsSubdir, replace, isSrcLocal); + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java index 8e3e3cf2608..58a0e98b75b 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.FunctionInfo; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; @@ -52,6 +53,7 @@ import java.io.Serializable; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -241,4 +243,15 @@ public interface HiveShim extends Serializable { } void registerTemporaryFunction(String funcName, Class funcClass); + + void loadPartition( + Hive hive, + Path loadPath, + String tableName, + Map<String, String> partSpec, + boolean isSkewedStoreAsSubdir, + boolean replace, + boolean isSrcLocal); + + void loadTable(Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java index 955c0e50fa4..976e0e00a3e 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.FunctionUtils; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.SemanticException; import 
org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; import org.apache.hadoop.hive.serde2.Deserializer; @@ -86,6 +87,7 @@ import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; @@ -94,6 +96,11 @@ import java.util.stream.Collectors; /** Shim for Hive version 1.0.0. */ public class HiveShimV100 implements HiveShim { + protected final boolean holdDDLTime = false; + protected final boolean isAcid = false; + protected final boolean inheritTableSpecs = true; + protected final boolean isSkewedStoreAsSubdir = false; + private static final Method registerTemporaryFunction = HiveReflectionUtils.tryGetMethod( FunctionRegistry.class, @@ -435,6 +442,74 @@ public class HiveShimV100 implements HiveShim { } } + @Override + public void loadPartition( + Hive hive, + Path loadPath, + String tableName, + Map<String, String> partSpec, + boolean isSkewedStoreAsSubdir, + boolean replace, + boolean isSrcLocal) { + try { + Class hiveClass = Hive.class; + Method loadPartitionMethod = + hiveClass.getDeclaredMethod( + "loadPartition", + Path.class, + String.class, + Map.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class); + loadPartitionMethod.invoke( + hive, + loadPath, + tableName, + partSpec, + replace, + holdDDLTime, + inheritTableSpecs, + isSkewedStoreAsSubdir, + isSrcLocal, + isAcid); + } catch (Exception e) { + throw new FlinkHiveException("Failed to load partition", e); + } + } + + @Override + public void loadTable( + Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) { + try { + Class hiveClass = Hive.class; + Method loadTableMethod = + hiveClass.getDeclaredMethod( + "loadTable", + Path.class, + String.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class); + loadTableMethod.invoke( + hive, + loadPath, + tableName, + replace, + holdDDLTime, + isSrcLocal, + isSkewedStoreAsSubdir, + isAcid); + } catch (Exception e) { + throw new FlinkHiveException("Failed to load table", e); + } + } + boolean isBuiltInFunctionInfo(FunctionInfo info) { return info.isNative(); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java index 3fa8d83da65..5ef79d611a7 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV200.java @@ -19,6 +19,7 @@ package org.apache.flink.table.catalog.hive.client; import org.apache.flink.api.common.serialization.BulkWriter; +import org.apache.flink.connectors.hive.FlinkHiveException; import org.apache.flink.orc.vector.RowDataVectorizer; import org.apache.flink.orc.writer.OrcBulkWriterFactory; import org.apache.flink.table.catalog.exceptions.CatalogException; @@ -26,12 +27,15 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import 
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; +import org.apache.hadoop.hive.ql.metadata.Hive; import java.lang.reflect.Method; +import java.util.Map; import java.util.Properties; /** Shim for Hive version 2.0.0. */ @@ -68,4 +72,62 @@ public class HiveShimV200 extends HiveShimV122 { return new OrcBulkWriterFactory<>( new RowDataVectorizer(schema, fieldTypes), new Properties(), conf); } + + @Override + public void loadTable( + Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) { + try { + Class hiveClass = Hive.class; + Method loadTableMethod = + hiveClass.getDeclaredMethod( + "loadTable", + Path.class, + String.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class); + loadTableMethod.invoke( + hive, loadPath, tableName, replace, isSrcLocal, isSkewedStoreAsSubdir, isAcid); + } catch (Exception e) { + throw new FlinkHiveException("Failed to load table", e); + } + } + + @Override + public void loadPartition( + Hive hive, + Path loadPath, + String tableName, + Map<String, String> partSpec, + boolean isSkewedStoreAsSubdir, + boolean replace, + boolean isSrcLocal) { + try { + Class hiveClass = Hive.class; + Method loadPartitionMethod = + hiveClass.getDeclaredMethod( + "loadPartition", + Path.class, + String.class, + Map.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class); + loadPartitionMethod.invoke( + hive, + loadPath, + tableName, + partSpec, + replace, + inheritTableSpecs, + isSkewedStoreAsSubdir, + isSrcLocal, + isAcid); + } catch (Exception e) { + throw new FlinkHiveException("Failed to load partition", e); + } + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java index f32371b23e7..1daa75fa362 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV210.java @@ -18,6 +18,7 @@ package org.apache.flink.table.catalog.hive.client; +import org.apache.flink.connectors.hive.FlinkHiveException; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; @@ -25,12 +26,14 @@ import org.apache.flink.table.catalog.hive.util.HiveTableUtil; import org.apache.flink.util.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.EnvironmentContext; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; @@ -40,11 +43,14 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; /** Shim for Hive version 2.1.0. 
*/ public class HiveShimV210 extends HiveShimV201 { + protected final boolean hasFollowingStatsTask = false; + @Override public void alterPartition( IMetaStoreClient client, String databaseName, String tableName, Partition partition) @@ -211,4 +217,72 @@ public class HiveShimV210 extends HiveShimV201 { } return res; } + + @Override + public void loadTable( + Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) { + try { + Class hiveClass = Hive.class; + Method loadTableMethod = + hiveClass.getDeclaredMethod( + "loadTable", + Path.class, + String.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class); + loadTableMethod.invoke( + hive, + loadPath, + tableName, + replace, + isSrcLocal, + isSkewedStoreAsSubdir, + isAcid, + hasFollowingStatsTask); + } catch (Exception e) { + throw new FlinkHiveException("Failed to load table", e); + } + } + + @Override + public void loadPartition( + Hive hive, + Path loadPath, + String tableName, + Map<String, String> partSpec, + boolean isSkewedStoreAsSubdir, + boolean replace, + boolean isSrcLocal) { + try { + Class hiveClass = Hive.class; + Method loadPartitionMethod = + hiveClass.getDeclaredMethod( + "loadPartition", + Path.class, + String.class, + Map.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class); + loadPartitionMethod.invoke( + hive, + loadPath, + tableName, + partSpec, + replace, + inheritTableSpecs, + isSkewedStoreAsSubdir, + isSrcLocal, + isAcid, + hasFollowingStatsTask); + } catch (Exception e) { + throw new FlinkHiveException("Failed to load partition", e); + } + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java index 0aeccf927bf..f01d5329ed1 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java @@ -26,10 +26,12 @@ import org.apache.flink.table.catalog.hive.util.HiveTableUtil; import org.apache.flink.util.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.io.Writable; import java.lang.reflect.Constructor; @@ -41,9 +43,11 @@ import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -62,7 +66,14 @@ public class HiveShimV310 extends HiveShimV239 { private static Field hiveDateLocalDate; private static Constructor dateWritableConstructor; - private static boolean hiveClassesInited; + private static volatile boolean hiveClassesInited; + + // LoadFileType class + private static volatile boolean loadFileClassInited; + private static Class clazzLoadFileType; + + protected final long writeIdInLoadTableOrPartition = 0L; + protected final int stmtIdInLoadTableOrPartition = 0; private static 
void initDateTimeClasses() { if (!hiveClassesInited) { @@ -102,6 +113,23 @@ public class HiveShimV310 extends HiveShimV239 { } } + private static void initLoadFileTypeClass() { + if (!loadFileClassInited) { + synchronized (HiveShimV310.class) { + if (!loadFileClassInited) { + try { + clazzLoadFileType = + Class.forName( + "org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType"); + } catch (ClassNotFoundException e) { + throw new FlinkHiveException("Failed to get Hive LoadFileType class", e); + } + loadFileClassInited = true; + } + } + } + } + @Override public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) { try { @@ -319,6 +347,90 @@ public class HiveShimV310 extends HiveShimV239 { } } + @Override + public void loadTable( + Hive hive, Path loadPath, String tableName, boolean replace, boolean isSrcLocal) { + try { + Class hiveClass = Hive.class; + initLoadFileTypeClass(); + Method loadTableMethod = + hiveClass.getDeclaredMethod( + "loadTable", + Path.class, + String.class, + clazzLoadFileType, + boolean.class, + boolean.class, + boolean.class, + boolean.class, + long.class, + int.class, + boolean.class); + loadTableMethod.invoke( + hive, + loadPath, + tableName, + getLoadFileType(clazzLoadFileType, replace), + isSrcLocal, + isSkewedStoreAsSubdir, + isAcid, + hasFollowingStatsTask, + writeIdInLoadTableOrPartition, + stmtIdInLoadTableOrPartition, + replace); + } catch (Exception e) { + throw new FlinkHiveException("Failed to load table", e); + } + } + + @Override + public void loadPartition( + Hive hive, + Path loadPath, + String tableName, + Map<String, String> partSpec, + boolean isSkewedStoreAsSubdir, + boolean replace, + boolean isSrcLocal) { + try { + initLoadFileTypeClass(); + Class hiveClass = Hive.class; + Method loadPartitionMethod = + hiveClass.getDeclaredMethod( + "loadPartition", + Path.class, + org.apache.hadoop.hive.ql.metadata.Table.class, + Map.class, + clazzLoadFileType, + boolean.class, + boolean.class, + boolean.class, + boolean.class, + boolean.class, + long.class, + int.class, + boolean.class); + org.apache.hadoop.hive.ql.metadata.Table table = hive.getTable(tableName); + loadPartitionMethod.invoke( + hive, + loadPath, + table, + partSpec, + getLoadFileType(clazzLoadFileType, replace), + inheritTableSpecs, + isSkewedStoreAsSubdir, + isSrcLocal, + isAcid, + hasFollowingStatsTask, + writeIdInLoadTableOrPartition, + stmtIdInLoadTableOrPartition, + replace); + } catch (Exception e) { + throw new FlinkHiveException("Failed to load partition", e); + } + } + List<Object> createHiveNNs( Table table, Configuration conf, List<String> nnCols, List<Byte> traits) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, @@ -378,4 +490,22 @@ new Class[] {Configuration.class}, new Object[] {conf}); } + + Object getLoadFileType(Class clazzLoadFileType, boolean replace) { + Object loadFileType; + if (replace) { + loadFileType = + Arrays.stream(clazzLoadFileType.getEnumConstants()) + .filter(s -> s.toString().equals("REPLACE_ALL")) + .findFirst() + .get(); + } else { + loadFileType = + Arrays.stream(clazzLoadFileType.getEnumConstants()) + .filter(s -> s.toString().equals("KEEP_EXISTING")) + .findFirst() + .get(); + } + return loadFileType; + } }
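A note on the pattern above: HiveShimV310 cannot reference LoadTableDesc.LoadFileType at compile time, since that enum exists only on Hive 3.x classpaths, so getLoadFileType resolves the constant reflectively. A minimal, self-contained sketch of the same lookup; the helper class below is illustrative, not Flink API:

    import java.util.Arrays;

    final class EnumByName {
        // Resolve an enum constant by name from a class that may be absent at compile time;
        // assumes className really names an enum (getEnumConstants returns null otherwise).
        static Object constantOf(String className, String constantName) throws ClassNotFoundException {
            Class<?> clazz = Class.forName(className);
            return Arrays.stream(clazz.getEnumConstants())
                    .filter(c -> c.toString().equals(constantName))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException(constantName + " not found"));
        }

        public static void main(String[] args) throws Exception {
            // On a Hive 3.x classpath this yields LoadFileType.REPLACE_ALL, the overwrite case
            // in getLoadFileType; KEEP_EXISTING is the non-overwrite counterpart.
            System.out.println(
                    constantOf("org.apache.hadoop.hive.ql.plan.LoadTableDesc$LoadFileType", "REPLACE_ALL"));
        }
    }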
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java index 459a1bbd7dd..2010062fe4c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java @@ -31,10 +31,12 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.delegation.ExtendedOperationExecutor; +import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.HiveSetOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.planner.delegation.PlannerContext; import org.apache.flink.table.planner.delegation.hive.copy.HiveSetProcessor; +import org.apache.flink.table.planner.delegation.hive.operation.HiveLoadDataOperation; import org.apache.flink.types.Row; import org.apache.hadoop.hive.conf.HiveConf; @@ -65,6 +67,14 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor { public Optional<TableResultInternal> executeOperation(Operation operation) { if (operation instanceof HiveSetOperation) { return executeHiveSetOperation((HiveSetOperation) operation); + } else if (operation instanceof HiveLoadDataOperation) { + return executeHiveLoadDataOperation((HiveLoadDataOperation) operation); + } else if (operation instanceof ExplainOperation) { + ExplainOperation explainOperation = (ExplainOperation) operation; + if (explainOperation.getChild() instanceof HiveLoadDataOperation) { + return explainHiveLoadDataOperation( + (HiveLoadDataOperation) explainOperation.getChild()); + } } return Optional.empty(); } @@ -123,4 +133,87 @@ public class HiveOperationExecutor implements ExtendedOperationExecutor { .data(rows) .build(); } + + private Optional<TableResultInternal> executeHiveLoadDataOperation( + HiveLoadDataOperation hiveLoadDataOperation) { + Catalog currentCatalog = + catalogManager.getCatalog(catalogManager.getCurrentCatalog()).orElse(null); + if (!(currentCatalog instanceof HiveCatalog)) { + throw new FlinkHiveException( + "'LOAD DATA INPATH' in Hive dialect is only supported when the current catalog is a HiveCatalog."); + } + try { + // Hive's loadTable/loadPartition will call SessionState.get().getCurrentDatabase(), + // so we have to start a session state first + HiveSessionState.startSessionState( + ((HiveCatalog) currentCatalog).getHiveConf(), catalogManager); + HiveCatalog hiveCatalog = (HiveCatalog) currentCatalog; + if (!hiveLoadDataOperation.getPartitionSpec().isEmpty()) { + hiveCatalog.loadPartition( + hiveLoadDataOperation.getPath(), + hiveLoadDataOperation.getTablePath(), + hiveLoadDataOperation.getPartitionSpec(), + hiveLoadDataOperation.isOverwrite(), + hiveLoadDataOperation.isSrcLocal()); + } else { + hiveCatalog.loadTable( + hiveLoadDataOperation.getPath(), + hiveLoadDataOperation.getTablePath(), + hiveLoadDataOperation.isOverwrite(), + hiveLoadDataOperation.isSrcLocal()); + } + return Optional.of(TableResultImpl.TABLE_RESULT_OK); + } finally { + HiveSessionState.clearSessionState(); + } + } + + private Optional<TableResultInternal> explainHiveLoadDataOperation( + HiveLoadDataOperation hiveLoadDataOperation) { + // get the plan for the partition part + String partitionExplain = ""; + Map<String, String> partitionSpec =
hiveLoadDataOperation.getPartitionSpec(); + if (!partitionSpec.isEmpty()) { + String[] pv = new String[partitionSpec.size()]; + int i = 0; + for (Map.Entry<String, String> partition : partitionSpec.entrySet()) { + pv[i++] = String.format("%s=%s", partition.getKey(), partition.getValue()); + } + partitionExplain = String.format(", partition=[%s]", String.join(", ", pv)); + } + // construct the full plan + String plan = + String.format( + "LoadData(filepath=[%s], " + + "table=[%s]," + + " overwrite=[%s], local=[%s]%s)", + hiveLoadDataOperation.getPath(), + hiveLoadDataOperation.getTablePath(), + hiveLoadDataOperation.isOverwrite(), + hiveLoadDataOperation.isSrcLocal(), + partitionExplain); + + String explanation = + "== Abstract Syntax Tree ==" + + System.lineSeparator() + + plan + + System.lineSeparator() + + System.lineSeparator() + + "== Optimized Physical Plan ==" + + System.lineSeparator() + + plan + + System.lineSeparator() + + System.lineSeparator() + + "== Optimized Execution Plan ==" + + System.lineSeparator() + + plan + + System.lineSeparator(); + + return Optional.of( + TableResultImpl.builder() + .resultKind(ResultKind.SUCCESS_WITH_CONTENT) + .schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))) + .data(Collections.singletonList(Row.of(explanation))) + .build()); + } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java index 1fb2bf88ce0..4214dbda09c 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java @@ -18,7 +18,6 @@ package org.apache.flink.table.planner.delegation.hive; -import org.apache.flink.connectors.hive.FlinkHiveException; import org.apache.flink.connectors.hive.HiveInternalOptions; import org.apache.flink.table.api.SqlParserException; import org.apache.flink.table.api.TableConfig; @@ -28,7 +27,6 @@ import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.hive.HiveCatalog; import org.apache.flink.table.catalog.hive.client.HiveShim; import org.apache.flink.table.catalog.hive.client.HiveShimLoader; -import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; import org.apache.flink.table.module.hive.udf.generic.HiveGenericUDFGrouping; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.HiveSetOperation; @@ -48,31 +46,22 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveParserQueryState; import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo; import org.apache.flink.table.planner.delegation.hive.parse.HiveParserDDLSemanticAnalyzer; +import org.apache.flink.table.planner.delegation.hive.parse.HiveParserLoadSemanticAnalyzer; import org.apache.flink.table.planner.operations.PlannerQueryOperation; import org.apache.flink.table.planner.parse.CalciteParser; import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader; -import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; import org.apache.calcite.rel.RelNode; import org.apache.calcite.tools.FrameworkConfig; -import org.apache.hadoop.hive.common.JavaUtils; import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.VariableSubstitution; -import org.apache.hadoop.hive.ql.lockmgr.LockException; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.processors.HiveCommand; import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.sql.Timestamp; -import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -90,13 +79,6 @@ public class HiveParser extends ParserImpl { private static final Logger LOG = LoggerFactory.getLogger(HiveParser.class); - private static final Method setCurrentTSMethod = - HiveReflectionUtils.tryGetMethod( - SessionState.class, "setupQueryCurrentTimestamp", new Class[0]); - private static final Method getCurrentTSMethod = - HiveReflectionUtils.tryGetMethod( - SessionState.class, "getQueryCurrentTimestamp", new Class[0]); - // need to maintain the HiveParserASTNode types for DDLs private static final Set<Integer> DDL_NODES; @@ -225,12 +207,12 @@ public class HiveParser extends ParserImpl { // substitute variables for the statement statement = substituteVariables(hiveConf, statement); // creates SessionState - startSessionState(hiveConf, catalogManager); + HiveSessionState.startSessionState(hiveConf, catalogManager); // We override Hive's grouping function. Refer to the implementation for more details. hiveShim.registerTemporaryFunction("grouping", HiveGenericUDFGrouping.class); return processCmd(statement, hiveConf, hiveShim, (HiveCatalog) currentCatalog); } finally { - clearSessionState(); + HiveSessionState.clearSessionState(); } } @@ -389,6 +371,12 @@ public class HiveParser extends ParserImpl { HiveShim hiveShim, HiveParserASTNode input) throws SemanticException { + if (isLoadData(input)) { + HiveParserLoadSemanticAnalyzer loadSemanticAnalyzer = + new HiveParserLoadSemanticAnalyzer( + hiveConf, frameworkConfig, plannerContext.getCluster()); + return loadSemanticAnalyzer.convertToOperation(input); + } if (isMultiDestQuery(input)) { return processMultiDestQuery(context, hiveConf, hiveShim, input); } else { @@ -396,6 +384,10 @@ public class HiveParser extends ParserImpl { } } + private boolean isLoadData(HiveParserASTNode input) { + return input.getType() == HiveASTParser.TOK_LOAD; + } + private boolean isMultiDestQuery(HiveParserASTNode astNode) { // Hive's multi dest insert will always be [FROM, INSERT+] // so, if it's children count is more than 2, it should be a multi-dest query @@ -481,126 +473,4 @@ public class HiveParser extends ParserImpl { return new PlannerQueryOperation(relNode); } } - - private void startSessionState(HiveConf hiveConf, CatalogManager catalogManager) { - final ClassLoader contextCL = Thread.currentThread().getContextClassLoader(); - try { - HiveParserSessionState sessionState = new HiveParserSessionState(hiveConf, contextCL); - sessionState.initTxnMgr(hiveConf); - sessionState.setCurrentDatabase(catalogManager.getCurrentDatabase()); - // some Hive functions needs the timestamp - setCurrentTimestamp(sessionState); - SessionState.setCurrentSessionState(sessionState); - } catch (LockException e) { - throw new FlinkHiveException("Failed to init SessionState", e); - } finally { - // don't let SessionState mess up with our context classloader - 
Thread.currentThread().setContextClassLoader(contextCL); - } - } - - private static void setCurrentTimestamp(HiveParserSessionState sessionState) { - if (setCurrentTSMethod != null) { - try { - setCurrentTSMethod.invoke(sessionState); - Object currentTs = getCurrentTSMethod.invoke(sessionState); - if (currentTs instanceof Instant) { - sessionState.hiveParserCurrentTS = Timestamp.from((Instant) currentTs); - } else { - sessionState.hiveParserCurrentTS = (Timestamp) currentTs; - } - } catch (IllegalAccessException | InvocationTargetException e) { - throw new FlinkHiveException("Failed to set current timestamp for session", e); - } - } else { - sessionState.hiveParserCurrentTS = new Timestamp(System.currentTimeMillis()); - } - } - - private void clearSessionState() { - SessionState sessionState = SessionState.get(); - if (sessionState != null) { - try { - sessionState.close(); - } catch (Exception e) { - LOG.warn("Error closing SessionState", e); - } - } - } - - /** Sub-class of SessionState to meet our needs. */ - public static class HiveParserSessionState extends SessionState { - - private static final Class registryClz; - private static final Method getRegistry; - private static final Method clearRegistry; - private static final Method closeRegistryLoaders; - - private Timestamp hiveParserCurrentTS; - - static { - registryClz = - HiveReflectionUtils.tryGetClass("org.apache.hadoop.hive.ql.exec.Registry"); - if (registryClz != null) { - getRegistry = - HiveReflectionUtils.tryGetMethod( - SessionState.class, "getRegistry", new Class[0]); - clearRegistry = - HiveReflectionUtils.tryGetMethod(registryClz, "clear", new Class[0]); - closeRegistryLoaders = - HiveReflectionUtils.tryGetMethod( - registryClz, "closeCUDFLoaders", new Class[0]); - } else { - getRegistry = null; - clearRegistry = null; - closeRegistryLoaders = null; - } - } - - private final ClassLoader originContextLoader; - private final ClassLoader hiveLoader; - - public HiveParserSessionState(HiveConf conf, ClassLoader contextLoader) { - super(conf); - this.originContextLoader = contextLoader; - this.hiveLoader = getConf().getClassLoader(); - // added jars are handled by context class loader, so we always use it as the session - // class loader - getConf().setClassLoader(contextLoader); - } - - @Override - public void close() throws IOException { - clearSessionRegistry(); - if (getTxnMgr() != null) { - getTxnMgr().closeTxnManager(); - } - // close the classloader created in hive - JavaUtils.closeClassLoadersTo(hiveLoader, originContextLoader); - File resourceDir = - new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR)); - LOG.debug("Removing resource dir " + resourceDir); - FileUtils.deleteDirectoryQuietly(resourceDir); - Hive.closeCurrent(); - detachSession(); - } - - public Timestamp getHiveParserCurrentTS() { - return hiveParserCurrentTS; - } - - private void clearSessionRegistry() { - if (getRegistry != null) { - try { - Object registry = getRegistry.invoke(this); - if (registry != null) { - clearRegistry.invoke(registry); - closeRegistryLoaders.invoke(registry); - } - } catch (IllegalAccessException | InvocationTargetException e) { - LOG.warn("Failed to clear session registry", e); - } - } - } - } } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveSessionState.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveSessionState.java new file mode 100644 index 00000000000..5bfa16481d9 --- 
/dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveSessionState.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.delegation.hive; + +import org.apache.flink.connectors.hive.FlinkHiveException; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils; +import org.apache.flink.util.FileUtils; + +import org.apache.hadoop.hive.common.JavaUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.lockmgr.LockException; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.sql.Timestamp; +import java.time.Instant; + +/** Sub-class of SessionState to meet our needs. 
*/ +public class HiveSessionState extends SessionState { + + private static final Logger LOG = LoggerFactory.getLogger(HiveSessionState.class); + + private static final Method setCurrentTSMethod = + HiveReflectionUtils.tryGetMethod( + SessionState.class, "setupQueryCurrentTimestamp", new Class[0]); + private static final Method getCurrentTSMethod = + HiveReflectionUtils.tryGetMethod( + SessionState.class, "getQueryCurrentTimestamp", new Class[0]); + + private static final Class registryClz; + private static final Method getRegistry; + private static final Method clearRegistry; + private static final Method closeRegistryLoaders; + + private Timestamp hiveParserCurrentTS; + + static { + registryClz = HiveReflectionUtils.tryGetClass("org.apache.hadoop.hive.ql.exec.Registry"); + if (registryClz != null) { + getRegistry = + HiveReflectionUtils.tryGetMethod( + SessionState.class, "getRegistry", new Class[0]); + clearRegistry = HiveReflectionUtils.tryGetMethod(registryClz, "clear", new Class[0]); + closeRegistryLoaders = + HiveReflectionUtils.tryGetMethod(registryClz, "closeCUDFLoaders", new Class[0]); + } else { + getRegistry = null; + clearRegistry = null; + closeRegistryLoaders = null; + } + } + + private final ClassLoader originContextLoader; + private final ClassLoader hiveLoader; + + public HiveSessionState(HiveConf conf, ClassLoader contextLoader) { + super(conf); + this.originContextLoader = contextLoader; + this.hiveLoader = getConf().getClassLoader(); + // added jars are handled by context class loader, so we always use it as the session + // class loader + getConf().setClassLoader(contextLoader); + } + + @Override + public void close() throws IOException { + clearSessionRegistry(); + if (getTxnMgr() != null) { + getTxnMgr().closeTxnManager(); + } + // close the classloader created in hive + JavaUtils.closeClassLoadersTo(hiveLoader, originContextLoader); + File resourceDir = new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR)); + LOG.debug("Removing resource dir " + resourceDir); + FileUtils.deleteDirectoryQuietly(resourceDir); + Hive.closeCurrent(); + detachSession(); + } + + public void setHiveParserCurrentTS(Timestamp hiveParserCurrentTS) { + this.hiveParserCurrentTS = hiveParserCurrentTS; + } + + public Timestamp getHiveParserCurrentTS() { + return hiveParserCurrentTS; + } + + private void clearSessionRegistry() { + if (getRegistry != null) { + try { + Object registry = getRegistry.invoke(this); + if (registry != null) { + clearRegistry.invoke(registry); + closeRegistryLoaders.invoke(registry); + } + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.warn("Failed to clear session registry", e); + } + } + } + + public static void startSessionState(HiveConf hiveConf, CatalogManager catalogManager) { + final ClassLoader contextCL = Thread.currentThread().getContextClassLoader(); + try { + HiveSessionState sessionState = new HiveSessionState(hiveConf, contextCL); + sessionState.initTxnMgr(hiveConf); + sessionState.setCurrentDatabase(catalogManager.getCurrentDatabase()); + // some Hive functions need the timestamp + setCurrentTimestamp(sessionState); + SessionState.setCurrentSessionState(sessionState); + } catch (LockException e) { + throw new FlinkHiveException("Failed to init SessionState", e); + } finally { + // don't let SessionState mess with our context classloader + Thread.currentThread().setContextClassLoader(contextCL); + } + }
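+ + // Note: Hive keeps SessionState in a thread-local, so startSessionState (above) and + // clearSessionState (below) must run on the same thread; callers such as HiveParser and + // HiveOperationExecutor pair them in try/finally so a failing statement cannot leak a session.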
+ + private static void setCurrentTimestamp(HiveSessionState sessionState) { + if (setCurrentTSMethod != null) { + try { + setCurrentTSMethod.invoke(sessionState); + Object currentTs = getCurrentTSMethod.invoke(sessionState); + if (currentTs instanceof Instant) { + sessionState.setHiveParserCurrentTS( + Timestamp.from((Instant) currentTs)); + } else { + sessionState.setHiveParserCurrentTS((Timestamp) currentTs); + } + } catch (IllegalAccessException | InvocationTargetException e) { + throw new FlinkHiveException("Failed to set current timestamp for session", e); + } + } else { + sessionState.setHiveParserCurrentTS(new Timestamp(System.currentTimeMillis())); + } + } + + public static void clearSessionState() { + SessionState sessionState = SessionState.get(); + if (sessionState != null) { + try { + sessionState.close(); + } catch (Exception e) { + LOG.warn("Error closing SessionState", e); + } + } + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java index c9bf54febee..a154cdc6afa 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/SqlFunctionConverter.java @@ -97,8 +97,7 @@ public class SqlFunctionConverter extends RexShuttle { if (convertedOp instanceof FlinkSqlTimestampFunction) { // flink's current_timestamp has different type from hive's, convert it to a literal Timestamp currentTS = - ((HiveParser.HiveParserSessionState) SessionState.get()) - .getHiveParserCurrentTS(); + ((HiveSessionState) SessionState.get()).getHiveParserCurrentTS(); HiveShim hiveShim = HiveParserUtils.getSessionHiveShim(); try { return HiveParserRexNodeConverter.convertConstant( diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java index f928be68724..33fd94fbe44 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserContext.java @@ -82,7 +82,7 @@ public class HiveParserContext { */ public HiveParserContext(Configuration conf) { this.conf = conf; - viewsTokenRewriteStreams = new HashMap<>(); + this.viewsTokenRewriteStreams = new HashMap<>(); } // Find whether we should execute the current query due to explain. diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java new file mode 100644 index 00000000000..339985336b5 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/operation/HiveLoadDataOperation.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.delegation.hive.operation; + +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.operations.Operation; + +import org.apache.hadoop.fs.Path; + +import java.util.Map; + +/** + * An operation that loads data into a Hive table. + * + * <pre>The syntax is: {@code + * LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename + * [PARTITION (partcol1=val1, partcol2=val2 ...)] + * } + * </pre> + */ +public class HiveLoadDataOperation implements Operation { + private final Path path; + private final ObjectPath tablePath; + private final boolean isOverwrite; + private final boolean isSrcLocal; + private final Map<String, String> partitionSpec; + + public HiveLoadDataOperation( + Path path, + ObjectPath tablePath, + boolean isOverwrite, + boolean isSrcLocal, + Map<String, String> partitionSpec) { + this.path = path; + this.tablePath = tablePath; + this.isOverwrite = isOverwrite; + this.isSrcLocal = isSrcLocal; + this.partitionSpec = partitionSpec; + } + + public Path getPath() { + return path; + } + + public ObjectPath getTablePath() { + return tablePath; + } + + public boolean isOverwrite() { + return isOverwrite; + } + + public boolean isSrcLocal() { + return isSrcLocal; + } + + public Map<String, String> getPartitionSpec() { + return partitionSpec; + } + + @Override + public String asSummaryString() { + StringBuilder stringBuilder = new StringBuilder("LOAD DATA"); + if (isSrcLocal) { + stringBuilder.append(" LOCAL"); + } + stringBuilder + .append(" INPATH") + .append(String.format(" '%s'", path)) + .append(isOverwrite ? " OVERWRITE" : "") + .append(" INTO TABLE ") + .append(tablePath.getFullName()); + if (partitionSpec.size() > 0) { + String[] pv = new String[partitionSpec.size()]; + int i = 0; + for (Map.Entry<String, String> partition : partitionSpec.entrySet()) { + pv[i++] = String.format("%s=%s", partition.getKey(), partition.getValue()); + } + stringBuilder.append(" PARTITION (").append(String.join(", ", pv)).append(")"); + } + return stringBuilder.toString(); + } +} diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java new file mode 100644 index 00000000000..81047f2e5d6 --- /dev/null +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserLoadSemanticAnalyzer.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.delegation.hive.parse; + +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.planner.delegation.hive.copy.HiveParserASTNode; +import org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.TableSpec; +import org.apache.flink.table.planner.delegation.hive.operation.HiveLoadDataOperation; +import org.apache.flink.util.StringUtils; + +import org.antlr.runtime.tree.Tree; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.tools.FrameworkConfig; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.net.URLCodec; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.parse.EximUtil; +import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.mapred.InputFormat; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; + +import static org.apache.flink.table.planner.delegation.hive.copy.HiveParserBaseSemanticAnalyzer.stripQuotes; + +/** Ported hive's org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer. 
*/ +public class HiveParserLoadSemanticAnalyzer { + + private final HiveConf conf; + private final Hive db; + private final FrameworkConfig frameworkConfig; + private final RelOptCluster cluster; + + public HiveParserLoadSemanticAnalyzer( + HiveConf conf, FrameworkConfig frameworkConfig, RelOptCluster cluster) + throws SemanticException { + this.conf = conf; + try { + this.db = Hive.get(conf); + } catch (HiveException e) { + throw new SemanticException(e); + } + this.frameworkConfig = frameworkConfig; + this.cluster = cluster; + } + + public HiveLoadDataOperation convertToOperation(HiveParserASTNode ast) + throws SemanticException { + boolean isLocal = false; + boolean isOverWrite = false; + Tree fromTree = ast.getChild(0); + HiveParserASTNode tableTree = (HiveParserASTNode) ast.getChild(1); + + if (ast.getChildCount() == 4) { + isLocal = true; + isOverWrite = true; + } + + if (ast.getChildCount() == 3) { + if (ast.getChild(2).getText().equalsIgnoreCase("local")) { + isLocal = true; + } else { + isOverWrite = true; + } + } + + // initialize load path + URI fromURI; + try { + String fromPath = stripQuotes(fromTree.getText()); + fromURI = initializeFromURI(fromPath, isLocal); + } catch (IOException | URISyntaxException e) { + throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(fromTree, e.getMessage()), e); + } + + // initialize destination table/partition + TableSpec ts = new TableSpec(db, conf, tableTree, frameworkConfig, cluster); + + if (ts.tableHandle.isView() || ts.tableHandle.isMaterializedView()) { + throw new SemanticException(ErrorMsg.DML_AGAINST_VIEW.getMsg()); + } + if (ts.tableHandle.isNonNative()) { + throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg()); + } + + if (ts.tableHandle.isStoredAsSubDirectories()) { + throw new SemanticException(ErrorMsg.LOAD_INTO_STORED_AS_DIR.getMsg()); + } + + List<FieldSchema> parts = ts.tableHandle.getPartitionKeys(); + if ((parts != null && parts.size() > 0) + && (ts.partSpec == null || ts.partSpec.size() == 0)) { + throw new SemanticException(ErrorMsg.NEED_PARTITION_ERROR.getMsg()); + } + + List<String> bucketCols = ts.tableHandle.getBucketCols(); + if (bucketCols != null && !bucketCols.isEmpty()) { + String error = HiveConf.StrictChecks.checkBucketing(conf); + if (error != null) { + throw new SemanticException( + "Please load into an intermediate table" + + " and use 'insert... select' to allow Hive to enforce bucketing. " + + error); + } + } + + // make sure the arguments make sense; may need to rewrite "LOAD DATA" as "INSERT AS SELECT" + // when there is any directory in the fromURI + List<FileStatus> files = applyConstraintsAndGetFiles(fromURI, fromTree, isLocal); + + // for managed tables, make sure the file formats match + if (TableType.MANAGED_TABLE.equals(ts.tableHandle.getTableType()) + && conf.getBoolVar(HiveConf.ConfVars.HIVECHECKFILEFORMAT)) { + ensureFileFormatsMatch(ts, files, fromURI); + } + + return new HiveLoadDataOperation( + new Path(fromURI), + new ObjectPath(ts.tableHandle.getDbName(), ts.tableHandle.getTableName()), + isOverWrite, + isLocal, + ts.partSpec == null ?
new LinkedHashMap<>() : ts.partSpec); + } + + private List<FileStatus> applyConstraintsAndGetFiles(URI fromURI, Tree ast, boolean isLocal) + throws SemanticException { + + FileStatus[] srcs; + + // local mode implies that scheme should be "file" + // we can change this going forward + if (isLocal && !fromURI.getScheme().equals("file")) { + throw new SemanticException( + ErrorMsg.ILLEGAL_PATH.getMsg( + ast, + "Source file system should be \"file\" if \"local\" is specified")); + } + + try { + srcs = matchFilesOrDir(FileSystem.get(fromURI, conf), new Path(fromURI)); + if (srcs == null || srcs.length == 0) { + throw new SemanticException( + HiveParserErrorMsg.getMsg( + ErrorMsg.INVALID_PATH, ast, "No files matching path " + fromURI)); + } + + for (FileStatus oneSrc : srcs) { + if (oneSrc.isDir()) { + throw new SemanticException( + ErrorMsg.INVALID_PATH.getMsg( + ast, + "source contains directory: " + oneSrc.getPath().toString())); + } + } + } catch (IOException e) { + throw new SemanticException(HiveParserErrorMsg.getMsg(ErrorMsg.INVALID_PATH, ast), e); + } + + return Arrays.asList(srcs); + } + + public static FileStatus[] matchFilesOrDir(FileSystem fs, Path path) throws IOException { + FileStatus[] srcs = + fs.globStatus( + path, + p -> { + String name = p.getName(); + return name.equals(EximUtil.METADATA_NAME) + || !name.startsWith("_") && !name.startsWith("."); + }); + if ((srcs != null) && srcs.length == 1) { + if (srcs[0].isDir()) { + srcs = + fs.listStatus( + srcs[0].getPath(), + p -> { + String name = p.getName(); + return !name.startsWith("_") && !name.startsWith("."); + }); + } + } + return (srcs); + } + + private URI initializeFromURI(String fromPath, boolean isLocal) + throws IOException, URISyntaxException, SemanticException { + URI fromURI = new Path(fromPath).toUri(); + + String fromScheme = fromURI.getScheme(); + String fromAuthority = fromURI.getAuthority(); + String path = fromURI.getPath(); + + // generate absolute path relative to current directory or hdfs home + // directory + if (!path.startsWith("/")) { + if (isLocal) { + try { + path = + new String( + URLCodec.decodeUrl( + new Path(System.getProperty("user.dir"), fromPath) + .toUri() + .toString() + .getBytes(StandardCharsets.US_ASCII)), + StandardCharsets.US_ASCII); + } catch (DecoderException de) { + throw new SemanticException("URL Decode failed", de); + } + } else { + path = + new Path(new Path("/user/" + System.getProperty("user.name")), path) + .toString(); + } + } + + // set correct scheme and authority + if (StringUtils.isNullOrWhitespaceOnly(fromScheme)) { + if (isLocal) { + // file for local + fromScheme = "file"; + } else { + // use default values from fs.default.name + URI defaultURI = FileSystem.get(conf).getUri(); + fromScheme = defaultURI.getScheme(); + fromAuthority = defaultURI.getAuthority(); + } + } + + // if scheme is specified but not authority then use the default authority + if ((!fromScheme.equals("file")) && StringUtils.isNullOrWhitespaceOnly(fromAuthority)) { + URI defaultURI = FileSystem.get(conf).getUri(); + fromAuthority = defaultURI.getAuthority(); + } + + return new URI(fromScheme, fromAuthority, path, null, null); + } + + private void ensureFileFormatsMatch( + TableSpec ts, List<FileStatus> fileStatuses, final URI fromURI) + throws SemanticException { + final Class<? 
+
+    private void ensureFileFormatsMatch(
+            TableSpec ts, List<FileStatus> fileStatuses, final URI fromURI)
+            throws SemanticException {
+        final Class<? extends InputFormat> destInputFormat;
+        try {
+            if (ts.getPartSpec() == null || ts.getPartSpec().isEmpty()) {
+                destInputFormat = ts.tableHandle.getInputFormatClass();
+            } else {
+                destInputFormat = ts.partHandle.getInputFormatClass();
+            }
+        } catch (HiveException e) {
+            throw new SemanticException(e);
+        }
+
+        try {
+            FileSystem fs = FileSystem.get(fromURI, conf);
+            boolean validFormat =
+                    HiveFileFormatUtils.checkInputFormat(fs, conf, destInputFormat, fileStatuses);
+            if (!validFormat) {
+                throw new SemanticException(ErrorMsg.INVALID_FILE_FORMAT_IN_LOAD.getMsg());
+            }
+        } catch (Exception e) {
+            throw new SemanticException(
+                    "Unable to load data to destination table. Error: " + e.getMessage());
+        }
+    }
+}
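To see the format check in action, a hedged end-to-end scenario (the table and file names are invented, /tmp/data.csv is assumed to exist, and it assumes hive.fileformat.check, i.e. HiveConf.ConfVars.HIVECHECKFILEFORMAT, keeps its default of true):

    // The destination is stored as ORC, so loading a plain-text file should be
    // rejected at planning time via ensureFileFormatsMatch() with
    // ErrorMsg.INVALID_FILE_FORMAT_IN_LOAD.
    tableEnv.executeSql("create table orc_target (x int) stored as orc");
    assertThatThrownBy(
                    () ->
                            tableEnv.executeSql(
                                    "load data local inpath '/tmp/data.csv' into table orc_target"))
            .isInstanceOf(Exception.class);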
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 2a8fe3c6868..0817af4ff4e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.planner.utils.TableTestUtil.readFromResource;
@@ -66,6 +67,7 @@ public class HiveDialectQueryITCase {
 
     private static HiveCatalog hiveCatalog;
     private static TableEnvironment tableEnv;
+    private static String warehouse;
 
     @BeforeClass
     public static void setup() throws Exception {
@@ -74,6 +76,7 @@ public class HiveDialectQueryITCase {
         hiveCatalog.getHiveConf().setVar(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT, "none");
         hiveCatalog.open();
         tableEnv = getTableEnvWithHiveCatalog();
+        warehouse = hiveCatalog.getHiveConf().getVar(HiveConf.ConfVars.METASTOREWAREHOUSE);
 
         // create tables
         tableEnv.executeSql("create table foo (x int, y int)");
@@ -505,19 +508,14 @@ public class HiveDialectQueryITCase {
         try {
             // test explain transform
             String actualPlan =
-                    (String)
-                            CollectionUtil.iteratorToList(
-                                            tableEnv.executeSql(
-                                                            "explain select transform(key, value)"
-                                                                    + " ROW FORMAT SERDE 'MySerDe'"
-                                                                    + " WITH SERDEPROPERTIES ('p1'='v1','p2'='v2')"
-                                                                    + " RECORDWRITER 'MyRecordWriter' "
-                                                                    + " using 'cat' as (cola int, value string)"
-                                                                    + " ROW FORMAT DELIMITED FIELDS TERMINATED BY ','"
-                                                                    + " RECORDREADER 'MyRecordReader' from src")
-                                                    .collect())
-                                    .get(0)
-                                    .getField(0);
+                    explainSql(
+                            "select transform(key, value)"
+                                    + " ROW FORMAT SERDE 'MySerDe'"
+                                    + " WITH SERDEPROPERTIES ('p1'='v1','p2'='v2')"
+                                    + " RECORDWRITER 'MyRecordWriter' "
+                                    + " using 'cat' as (cola int, value string)"
+                                    + " ROW FORMAT DELIMITED FIELDS TERMINATED BY ','"
+                                    + " RECORDREADER 'MyRecordReader' from src");
             assertThat(actualPlan).isEqualTo(readFromResource("/explain/testScriptTransform.out"));
 
             // transform using + specified schema
@@ -587,13 +585,7 @@ public class HiveDialectQueryITCase {
                         + " insert overwrite table t1 select id, name where age < 20"
                         + " insert overwrite table t2 select id, name where age > 20";
         // test explain
-        String actualPlan =
-                (String)
-                        CollectionUtil.iteratorToList(
-                                        tableEnv.executeSql("explain " + multiInsertSql)
-                                                .collect())
-                                .get(0)
-                                .getField(0);
+        String actualPlan = explainSql(multiInsertSql);
         assertThat(actualPlan).isEqualTo(readFromResource("/explain/testMultiInsert.out"));
 
         // test execution
         tableEnv.executeSql("insert into table t3 values (1, 'test1', 18 ), (2, 'test2', 28 )")
                 .await();
@@ -673,6 +665,66 @@ public class HiveDialectQueryITCase {
         }
     }
 
+    @Test
+    public void testLoadData() throws Exception {
+        tableEnv.executeSql("create table tab1 (col1 int, col2 int) stored as orc");
+        tableEnv.executeSql("create table tab2 (col1 int, col2 int) STORED AS ORC");
+        tableEnv.executeSql(
+                "create table p_table(col1 int, col2 int) partitioned by (dateint int) row format delimited fields terminated by ','");
+        try {
+            String testLoadCsvFilePath =
+                    Objects.requireNonNull(getClass().getResource("/csv/test.csv")).toString();
+            // test explain
+            String actualPlan =
+                    explainSql(
+                            String.format(
+                                    "load data local inpath '%s' overwrite into table p_table partition (dateint=2022) ",
+                                    testLoadCsvFilePath));
+            assertThat(actualPlan)
+                    .isEqualTo(
+                            readFromResource("/explain/testLoadData.out")
+                                    .replace("$filepath", testLoadCsvFilePath));
+
+            // test load data into table
+            tableEnv.executeSql("insert into tab1 values (1, 1), (1, 2), (2, 1), (2, 2)").await();
+            tableEnv.executeSql(
+                    String.format(
+                            "load data local inpath '%s' INTO TABLE tab2", warehouse + "/tab1"));
+            List<Row> result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from tab2").collect());
+            assertThat(result.toString()).isEqualTo("[+I[1, 1], +I[1, 2], +I[2, 1], +I[2, 2]]");
+
+            // test load data overwrite
+            tableEnv.executeSql("insert into tab1 values (2, 1), (2, 2)").await();
+            tableEnv.executeSql(
+                    String.format(
+                            "load data local inpath '%s' overwrite into table tab2",
+                            warehouse + "/tab1"));
+            result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from tab2").collect());
+            assertThat(result.toString()).isEqualTo("[+I[2, 1], +I[2, 2]]");
+
+            // test load data into partition
+            tableEnv.executeSql(
+                            String.format(
+                                    "load data inpath '%s' into table p_table partition (dateint=2022) ",
+                                    testLoadCsvFilePath))
+                    .await();
+            result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from p_table where dateint=2022")
+                                    .collect());
+            assertThat(result.toString())
+                    .isEqualTo("[+I[1, 1, 2022], +I[2, 2, 2022], +I[3, 3, 2022]]");
+        } finally {
+            tableEnv.executeSql("drop table tab1");
+            tableEnv.executeSql("drop table tab2");
+            tableEnv.executeSql("drop table p_table");
+        }
+    }
+
     private void runQFile(File qfile) throws Exception {
         QTest qTest = extractQTest(qfile);
         for (int i = 0; i < qTest.statements.size(); i++) {
@@ -771,6 +823,13 @@ public class HiveDialectQueryITCase {
         }
     }
 
+    private String explainSql(String sql) {
+        return (String)
+                CollectionUtil.iteratorToList(tableEnv.executeSql("explain " + sql).collect())
+                        .get(0)
+                        .getField(0);
+    }
+
     private static TableEnvironment getTableEnvWithHiveCatalog() {
         TableEnvironment tableEnv = HiveTestUtils.createTableEnvInBatchMode(SqlDialect.HIVE);
         tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
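Outside the ITCase harness, the same feature can be exercised from any batch TableEnvironment. A minimal standalone sketch (the catalog name "myhive", the hiveCatalog instance, and the paths are placeholders; the catalog is assumed to be configured elsewhere):

    // LOAD DATA is only parsed when the SQL dialect is switched to Hive.
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    env.registerCatalog("myhive", hiveCatalog); // a HiveCatalog configured elsewhere
    env.useCatalog("myhive");
    env.getConfig().setSqlDialect(SqlDialect.HIVE);
    env.executeSql("create table dst (col1 int, col2 int) row format delimited fields terminated by ','");
    env.executeSql("load data local inpath '/tmp/part-0' into table dst");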
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/explain/testLoadData.out b/flink-connectors/flink-connector-hive/src/test/resources/explain/testLoadData.out
new file mode 100644
index 00000000000..3f750bc9800
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/resources/explain/testLoadData.out
@@ -0,0 +1,8 @@
+== Abstract Syntax Tree ==
+LoadData(filepath=[$filepath], table=[default.p_table], overwrite=[true], local=[true], partition=[dateint=2022])
+
+== Optimized Physical Plan ==
+LoadData(filepath=[$filepath], table=[default.p_table], overwrite=[true], local=[true], partition=[dateint=2022])
+
+== Optimized Execution Plan ==
+LoadData(filepath=[$filepath], table=[default.p_table], overwrite=[true], local=[true], partition=[dateint=2022])