This is an automated email from the ASF dual-hosted git repository.

mbod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new e8effb5 HIVE-25849: Disable insert overwrite for bucket partitioned Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
e8effb5 is described below

commit e8effb585db81edd809d1cb783ce22b079caa264
Author: Marton Bod <m...@cloudera.com>
AuthorDate: Fri Jan 7 16:34:04 2022 +0100

    HIVE-25849: Disable insert overwrite for bucket partitioned Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
---
 .../apache/iceberg/mr/hive/HiveIcebergStorageHandler.java | 13 +++++++++++++
 .../java/org/apache/iceberg/mr/hive/IcebergTableUtil.java |  4 ++++
 .../apache/iceberg/mr/hive/TestHiveIcebergInserts.java    | 15 +++++++++++++++
 .../hadoop/hive/ql/metadata/HiveStorageHandler.java       | 11 +++++++++++
 .../org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java | 12 ++++++++++++
 5 files changed, 55 insertions(+)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 60f9d42..676cb6a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -48,11 +48,13 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
 import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -369,6 +371,17 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     return new URI(ICEBERG_URI_PREFIX + table.location());
   }
 
+  @Override
+  public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
+    HiveStorageHandler.super.validateSinkDesc(sinkDesc);
+    if (sinkDesc.getInsertOverwrite()) {
+      Table table = IcebergTableUtil.getTable(conf, sinkDesc.getTableInfo().getProperties());
+      if (IcebergTableUtil.isBucketed(table)) {
+        throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
+      }
+    }
+  }
+
   private void setCommonJobConf(JobConf jobConf) {
     jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
   }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index d201099..9a1f316 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -194,4 +194,8 @@ public class IcebergTableUtil {
 
     updatePartitionSpec.commit();
   }
+
+  public static boolean isBucketed(Table table) {
+    return table.spec().fields().stream().anyMatch(f -> f.transform().toString().startsWith("bucket["));
+  }
 }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 5222361..9b8def1 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.mr.hive;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -169,6 +170,20 @@ public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineB
     HiveIcebergTestUtils.validateData(table, expected, 0);
   }
 
+  @Test
+  public void testInsertOverwriteBucketPartitionedTableThrowsError() {
+    TableIdentifier target = TableIdentifier.of("default", "target");
+    PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .bucket("last_name", 16).identity("customer_id").build();
+    testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        spec, fileFormat, ImmutableList.of());
+
+    AssertHelpers.assertThrows("IOW should not work on bucket partitioned table", IllegalArgumentException.class,
+        "Cannot perform insert overwrite query on bucket partitioned Iceberg table",
+        () -> shell.executeStatement(
+            testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, target, true)));
+  }
+
   /**
    * Testing map-reduce inserts.
    * @throws IOException If there is an underlying IOException
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index b2f5a1a..0e38574 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -352,4 +354,13 @@ public interface HiveStorageHandler extends Configurable {
     return new URI(this.getClass().getSimpleName().toLowerCase() + "://" +
         HiveCustomStorageHandlerUtils.getTablePropsForCustomStorageHandler(tableProperties));
   }
+
+  /**
+   * Validates whether the sink operation is permitted for the specific storage handler, based
+   * on information contained in the sinkDesc.
+   * @param sinkDesc The sink descriptor
+   * @throws SemanticException if the sink operation is not allowed
+   */
+  default void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index bbf1258..d1d5ee3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -12587,6 +12587,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
+    // validate if this sink operation is allowed for non-native tables
+    if (sinkOp instanceof FileSinkOperator) {
+      FileSinkOperator fileSinkOperator = (FileSinkOperator) sinkOp;
+      Optional<HiveStorageHandler> handler = Optional.ofNullable(fileSinkOperator)
+          .map(FileSinkOperator::getConf)
+          .map(FileSinkDesc::getTable)
+          .map(Table::getStorageHandler);
+      if (handler.isPresent()) {
+        handler.get().validateSinkDesc(fileSinkOperator.getConf());
+      }
+    }
+
     // Check query results cache
     // In the case that row or column masking/filtering was required, we do not support caching.
     // TODO: Enable caching for queries with masking/filtering