This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

The following commit(s) were added to refs/heads/master by this push:
     new e6c6029439f HIVE-27245: Iceberg: Implement LOAD data for partitioned tables via Append API. (#4453). (Ayush Saxena, reviewed by Denys Kuzmenko)
e6c6029439f is described below

commit e6c6029439f831545fe5fa267452b662c3a71403
Author: Ayush Saxena <ayushsax...@apache.org>
AuthorDate: Fri Jun 30 08:52:27 2023 +0530

    HIVE-27245: Iceberg: Implement LOAD data for partitioned tables via Append API. (#4453). (Ayush Saxena, reviewed by Denys Kuzmenko)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 25 +++++-
 .../org/apache/iceberg/mr/hive/HiveTableUtil.java  | 37 ++++++---
 .../src/test/queries/positive/iceberg_load_data.q  | 28 ++++++-
 .../test/results/positive/iceberg_load_data.q.out  | 94 ++++++++++++++++++++++
 .../org/apache/hadoop/hive/ql/exec/MoveTask.java   |  2 +-
 .../hive/ql/metadata/HiveStorageHandler.java       | 14 +++-
 .../hadoop/hive/ql/parse/BaseSemanticAnalyzer.java | 15 ++++
 .../hadoop/hive/ql/parse/LoadSemanticAnalyzer.java |  6 +-
 .../apache/hadoop/hive/ql/plan/LoadTableDesc.java  |  4 +-
 9 files changed, 202 insertions(+), 23 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index dfb4500f811..761482a342d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -362,16 +362,33 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     return null;
   }
 
-  public boolean supportsAppendData(org.apache.hadoop.hive.metastore.api.Table table) throws SemanticException {
+  public boolean supportsAppendData(org.apache.hadoop.hive.metastore.api.Table table, boolean withPartClause)
+      throws SemanticException {
     Table icebergTbl = IcebergTableUtil.getTable(conf, table);
-    return icebergTbl.spec().isUnpartitioned();
+    if (icebergTbl.spec().isUnpartitioned()) {
+      return true;
+    }
+    // If it is a table which has undergone partition evolution, return false;
+    if (icebergTbl.currentSnapshot() != null) {
+      if (icebergTbl.currentSnapshot().allManifests(icebergTbl.io()).parallelStream()
+          .map(ManifestFile::partitionSpecId)
+          .anyMatch(id -> id < icebergTbl.spec().specId())) {
+        if (withPartClause) {
+          throw new SemanticException("Can not Load into an iceberg table, which has undergone partition evolution " +
+              "using the PARTITION clause");
+        }
+        return false;
+      }
+    }
+    return withPartClause;
   }
 
-  public void appendFiles(org.apache.hadoop.hive.metastore.api.Table table, URI fromURI, boolean isOverwrite)
+  public void appendFiles(org.apache.hadoop.hive.metastore.api.Table table, URI fromURI, boolean isOverwrite,
+      Map<String, String> partitionSpec)
       throws SemanticException {
     Table icebergTbl = IcebergTableUtil.getTable(conf, table);
     String format = table.getParameters().get(TableProperties.DEFAULT_FILE_FORMAT);
-    HiveTableUtil.appendFiles(fromURI, format, icebergTbl, isOverwrite, conf);
+    HiveTableUtil.appendFiles(fromURI, format, icebergTbl, isOverwrite, partitionSpec, conf);
   }
 
   @Override
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
index 6bc39e8f40e..a453e5ea723 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveTableUtil.java
@@ -29,6 +29,7 @@ import java.net.URI;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -78,6 +79,8 @@ import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.types.Conversions;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.util.SerializationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -182,23 +185,37 @@ public class HiveTableUtil {
     return dataFiles;
   }
 
-  public static void appendFiles(URI fromURI, String format, Table icebergTbl, boolean isOverwrite, Configuration conf)
-      throws SemanticException {
+  public static void appendFiles(URI fromURI, String format, Table icebergTbl, boolean isOverwrite,
+      Map<String, String> partitionSpec, Configuration conf) throws SemanticException {
     try {
       Transaction transaction = icebergTbl.newTransaction();
       if (isOverwrite) {
         DeleteFiles delete = transaction.newDelete();
-        delete.deleteFromRowFilter(Expressions.alwaysTrue());
+        if (partitionSpec != null) {
+          for (Map.Entry<String, String> part : partitionSpec.entrySet()) {
+            final Type partKeyType = icebergTbl.schema().findType(part.getKey());
+            final Object partKeyVal = Conversions.fromPartitionString(partKeyType, part.getValue());
+            delete.deleteFromRowFilter(Expressions.equal(part.getKey(), partKeyVal));
+          }
+        } else {
+          delete.deleteFromRowFilter(Expressions.alwaysTrue());
+        }
         delete.commit();
       }
-      AppendFiles append = transaction.newAppend();
-      PartitionSpec spec = icebergTbl.spec();
+
       MetricsConfig metricsConfig = MetricsConfig.fromProperties(icebergTbl.properties());
-      String nameMappingString = icebergTbl.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
-      NameMapping nameMapping = nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
-      RemoteIterator<LocatedFileStatus> filesIterator = HiveTableUtil.getFilesIterator(new Path(fromURI), conf);
-      List<DataFile> dataFiles = HiveTableUtil.getDataFiles(filesIterator, Collections.emptyMap(),
-          format == null ? IOConstants.PARQUET : format, spec, metricsConfig, nameMapping, conf);
+      PartitionSpec spec = icebergTbl.spec();
+      String nameMappingStr = icebergTbl.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+      NameMapping nameMapping = null;
+      if (nameMappingStr != null) {
+        nameMapping = NameMappingParser.fromJson(nameMappingStr);
+      }
+      AppendFiles append = transaction.newAppend();
+      Map<String, String> actualPartitionSpec = Optional.ofNullable(partitionSpec).orElse(Collections.emptyMap());
+      String actualFormat = Optional.ofNullable(format).orElse(IOConstants.PARQUET).toLowerCase();
+      RemoteIterator<LocatedFileStatus> iterator = getFilesIterator(new Path(fromURI), conf);
+      List<DataFile> dataFiles =
+          getDataFiles(iterator, actualPartitionSpec, actualFormat, spec, metricsConfig, nameMapping, conf);
       dataFiles.forEach(append::appendFile);
       append.commit();
       transaction.commitTransaction();
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_load_data.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_load_data.q
index 63e0f55d684..23c32bd2490 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_load_data.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_load_data.q
@@ -57,4 +57,30 @@ LOAD DATA LOCAL INPATH '../../data/files/part.orc' INTO TABLE ice_orc;
 
 select * from ice_orc order by p_partkey;
 
-select count(*) from ice_orc;
\ No newline at end of file
+select count(*) from ice_orc;
+
+create external table ice_parquet_partitioned (
+    strcol string,
+    intcol integer
+) partitioned by (pcol int)
+stored by iceberg;
+
+insert into ice_parquet_partitioned values ('AA', 10, 100), ('BB', 20, 200), ('CC', 30, 300);
+
+select * from ice_parquet_partitioned order by intcol;
+
+explain LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=100' INTO TABLE ice_parquet_partitioned
+PARTITION (pcol='300');
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=100' INTO TABLE
+ice_parquet_partitioned PARTITION (pcol='100');
+
+select * from ice_parquet_partitioned order by intcol;
+
+explain LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=200' OVERWRITE INTO TABLE
+    ice_parquet_partitioned PARTITION (pcol='200');
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=200' OVERWRITE INTO TABLE
+ice_parquet_partitioned PARTITION (pcol='200');
+
+select * from ice_parquet_partitioned order by intcol;
\ No newline at end of file
diff --git a/iceberg/iceberg-handler/src/test/results/positive/iceberg_load_data.q.out b/iceberg/iceberg-handler/src/test/results/positive/iceberg_load_data.q.out
index 00ba9725619..95c429a944d 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/iceberg_load_data.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/iceberg_load_data.q.out
@@ -406,3 +406,97 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@ice_orc
 POSTHOOK: Output: hdfs://### HDFS PATH ###
 52
+PREHOOK: query: create external table ice_parquet_partitioned (
+    strcol string,
+    intcol integer
+) partitioned by (pcol int)
+stored by iceberg
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice_parquet_partitioned
+POSTHOOK: query: create external table ice_parquet_partitioned (
+    strcol string,
+    intcol integer
+) partitioned by (pcol int)
+stored by iceberg
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice_parquet_partitioned
+PREHOOK: query: insert into ice_parquet_partitioned values ('AA', 10, 100), ('BB', 20, 200), ('CC', 30, 300)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice_parquet_partitioned
+POSTHOOK: query: insert into ice_parquet_partitioned values ('AA', 10, 100), ('BB', 20, 200), ('CC', 30, 300)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice_parquet_partitioned
+PREHOOK: query: select * from ice_parquet_partitioned order by intcol
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_parquet_partitioned
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice_parquet_partitioned order by intcol
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_parquet_partitioned
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+AA	10	100
+BB	20	200
+CC	30	300
+PREHOOK: query: explain LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=100' INTO TABLE ice_parquet_partitioned
+PARTITION (pcol='300')
+PREHOOK: type: LOAD
+POSTHOOK: query: explain LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=100' INTO TABLE ice_parquet_partitioned
+PARTITION (pcol='300')
+POSTHOOK: type: LOAD
+Stage-0
+  Move Operator
+    table:{"name:":"default.ice_parquet_partitioned"}
+
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=100' INTO TABLE
+ice_parquet_partitioned PARTITION (pcol='100')
+PREHOOK: type: LOAD
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=100' INTO TABLE
+ice_parquet_partitioned PARTITION (pcol='100')
+POSTHOOK: type: LOAD
+PREHOOK: query: select * from ice_parquet_partitioned order by intcol
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_parquet_partitioned
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice_parquet_partitioned order by intcol
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_parquet_partitioned
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+a	1	100
+b	2	100
+AA	10	100
+BB	20	200
+CC	30	300
+PREHOOK: query: explain LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=200' OVERWRITE INTO TABLE
+    ice_parquet_partitioned PARTITION (pcol='200')
+PREHOOK: type: LOAD
+POSTHOOK: query: explain LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=200' OVERWRITE INTO TABLE
+    ice_parquet_partitioned PARTITION (pcol='200')
+POSTHOOK: type: LOAD
+Stage-0
+  Move Operator
+    table:{"name:":"default.ice_parquet_partitioned"}
+
+PREHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=200' OVERWRITE INTO TABLE
+ice_parquet_partitioned PARTITION (pcol='200')
+PREHOOK: type: LOAD
+POSTHOOK: query: LOAD DATA LOCAL INPATH '../../data/files/parquet_partition/pcol=200' OVERWRITE INTO TABLE
+ice_parquet_partitioned PARTITION (pcol='200')
+POSTHOOK: type: LOAD
+PREHOOK: query: select * from ice_parquet_partitioned order by intcol
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice_parquet_partitioned
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from ice_parquet_partitioned order by intcol
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice_parquet_partitioned
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+a	1	100
+b	2	100
+c	3	200
+d	4	200
+AA	10	100
+CC	30	300
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 6aadb955332..5d167695f92 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -1069,7 +1069,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       if (loadTableWork.isUseAppendForLoad()) {
         loadTableWork.getMdTable().getStorageHandler()
             .appendFiles(loadTableWork.getMdTable().getTTable(), loadTableWork.getSourcePath().toUri(),
-                loadTableWork.getLoadFileType() == LoadFileType.REPLACE_ALL);
+                loadTableWork.getLoadFileType() == LoadFileType.REPLACE_ALL, loadTableWork.getPartitionSpec());
         return true;
       }
       // Get the info from the table data
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 863b597fa4f..9ff2d3645c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -24,7 +24,6 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.common.type.SnapshotContext;
@@ -316,14 +315,23 @@ public interface HiveStorageHandler extends Configurable {
   /**
    * Checks whether the table supports appending data files to the table.
    * @param table the table
+   * @param withPartClause whether a partition is specified
    * @return true if the table can append files directly to the table
    * @throws SemanticException in case of any error.
    */
-  default boolean supportsAppendData(Table table) throws SemanticException {
+  default boolean supportsAppendData(Table table, boolean withPartClause) throws SemanticException {
     return false;
   }
 
-  default void appendFiles(Table tbl, URI fromURI, boolean isOverwrite)
+  /**
+   * Appends files to the table
+   * @param tbl the table object.
+   * @param fromURI the source of files.
+   * @param isOverwrite whether to overwrite the existing table data.
+   * @param partitionSpec the partition spec.
+   * @throws SemanticException in case of any error
+   */
+  default void appendFiles(Table tbl, URI fromURI, boolean isOverwrite, Map<String, String> partitionSpec)
       throws SemanticException {
     throw new SemanticException(ErrorMsg.LOAD_INTO_NON_NATIVE.getMsg());
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index a8d271f2b0d..65a81becdeb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -116,6 +116,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_LOAD_DATA_USE_NATIVE_API;
 import static org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.NullOrder.NULLS_FIRST;
 import static org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.NullOrder.NULLS_LAST;
 import static org.apache.hadoop.hive.ql.parse.PTFInvocationSpec.Order.ASC;
@@ -1215,6 +1216,9 @@ public abstract class BaseSemanticAnalyzer {
           ast.getChild(childIndex), e.getMessage()), e);
     }
 
+    boolean isUseNativeLoadApi = checkUseNativeApi(conf, ast);
+    allowDynamicPartitionsSpec &= !isUseNativeLoadApi;
+
     // get partition metadata if partition specified
     if (ast.getChildCount() == 2 && ast.getToken().getType() != HiveParser.TOK_CREATETABLE &&
         ast.getToken().getType() != HiveParser.TOK_CREATE_MATERIALIZED_VIEW &&
@@ -1241,6 +1245,12 @@ public abstract class BaseSemanticAnalyzer {
         tmpPartSpec.put(colName, val);
       }
 
+      // Return here in case of native load api, as for now the iceberg tables are considered unpartitioned in HMS
+      if (isUseNativeLoadApi) {
+        partSpec = tmpPartSpec;
+        return;
+      }
+
       // check if the columns, as well as value types in the partition() clause are valid
       validatePartSpec(tableHandle, tmpPartSpec, ast, conf, false);
 
@@ -1321,6 +1331,11 @@ public abstract class BaseSemanticAnalyzer {
       }
     }
 
+    private boolean checkUseNativeApi(HiveConf conf, ASTNode ast) {
+      boolean isLoad = ast.getParent() != null && ast.getParent().getType() == HiveParser.TOK_LOAD;
+      return isLoad && tableHandle.isNonNative() && conf.getBoolVar(HIVE_LOAD_DATA_USE_NATIVE_API);
+    }
+
    public TableName getTableName() {
      return tableName;
    }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index a172813a291..3a18b7af729 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -300,9 +300,11 @@ public class LoadSemanticAnalyzer extends SemanticAnalyzer {
     if (ts.tableHandle.isNonNative()) {
       HiveStorageHandler storageHandler = ts.tableHandle.getStorageHandler();
       boolean isUseNativeApi = conf.getBoolVar(HIVE_LOAD_DATA_USE_NATIVE_API);
-      if (isUseNativeApi && storageHandler.supportsAppendData(ts.tableHandle.getTTable())) {
+      boolean supportAppend = isUseNativeApi && storageHandler.supportsAppendData(ts.tableHandle.getTTable(),
+          ts.getPartSpec() != null && !ts.getPartSpec().isEmpty());
+      if (supportAppend) {
         LoadTableDesc loadTableWork =
-            new LoadTableDesc(new Path(fromURI), ts.tableHandle, isOverWrite, true, isOverWrite);
+            new LoadTableDesc(new Path(fromURI), ts.tableHandle, isOverWrite, true, ts.getPartSpec());
         Task<?> childTask = TaskFactory.get(new MoveWork(getInputs(), getOutputs(), loadTableWork,
             null, true, isLocal));
         rootTasks.add(childTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index ca45779c389..2a9772d029c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -160,13 +160,13 @@ public class LoadTableDesc extends LoadDesc implements Serializable {
   }
 
   public LoadTableDesc(Path path, Table tableHandle, boolean isOverWrite, boolean useAppendForLoad,
-      boolean isInsertOverwrite) {
+      Map<String, String> partitionSpec) {
     super(path, AcidUtils.Operation.NOT_ACID);
     this.mdTable = tableHandle;
     this.useAppendForLoad = useAppendForLoad;
     this.loadFileType = isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
     this.table = Utilities.getTableDesc(tableHandle);
-    this.isInsertOverwrite = isInsertOverwrite;
+    this.partitionSpec = partitionSpec;
   }
 
   public boolean isUseAppendForLoad() {
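
For reference, a minimal usage sketch of what this change enables, modelled on the new iceberg_load_data.q cases above (not part of the commit). The table name, file paths and pcol values are illustrative only, and it assumes ConfVars.HIVE_LOAD_DATA_USE_NATIVE_API resolves to hive.load.data.use.native.api and is enabled, so that LoadSemanticAnalyzer takes the append path instead of rewriting the LOAD into an INSERT.

    -- identity-partitioned Iceberg table; LOAD now appends the files via the Iceberg Append API
    SET hive.load.data.use.native.api=true;

    CREATE EXTERNAL TABLE ice_demo (strcol string, intcol int)
    PARTITIONED BY (pcol int) STORED BY ICEBERG;

    -- appends the Parquet files under the given directory to partition pcol=100
    LOAD DATA LOCAL INPATH '/tmp/parquet_files/pcol=100'
    INTO TABLE ice_demo PARTITION (pcol='100');

    -- with OVERWRITE, only the matching partition is deleted before the new files are appended
    LOAD DATA LOCAL INPATH '/tmp/parquet_files/pcol=200'
    OVERWRITE INTO TABLE ice_demo PARTITION (pcol='200');

Per the supportsAppendData() change above, a partitioned table that has undergone partition evolution rejects LOAD ... PARTITION with a SemanticException, and a LOAD without a PARTITION clause on a partitioned table falls back to the non-native load path.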