This is an automated email from the ASF dual-hosted git repository.
difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new aeda0627030 HIVE-28935: Iceberg: fix partition filtering condition in compaction query (#5792)
aeda0627030 is described below
commit aeda0627030ec0bb7d299e38a08eb29860b51173
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Thu May 22 15:37:40 2025 -0400
HIVE-28935: Iceberg: fix partition filtering condition in compaction query (#5792)

* HIVE-28935: Iceberg: fix partition filtering condition in compaction query.
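
The compaction query no longer filters on the PARTITION__HASH virtual column; instead it selects rows whose FILE__PATH appears in the table's FILES metadata table, restricted to the target partition values and partition spec id, so only files written under the matching spec are rewritten. A rough sketch of the shape of the generated query, assuming a hypothetical table default.tbl partitioned by bucket(8, key) and a compaction request for partition key_bucket_8=3 under spec id 1:

    INSERT OVERWRITE TABLE default.tbl
    SELECT * FROM default.tbl
    WHERE FILE__PATH IN
      (SELECT FILE_PATH FROM default.tbl.FILES
       WHERE `partition`.`key_bucket_8` = 3 AND SPEC_ID = 1);

Null partition values are matched with IS NULL instead of the equality operator.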
---------
---
.../org/apache/iceberg/hive/HiveSchemaUtil.java | 2 +-
.../iceberg/mr/hive/HiveIcebergFilterFactory.java | 26 ++
.../apache/iceberg/mr/hive/IcebergTableUtil.java | 38 +--
.../mr/hive/compaction/IcebergQueryCompactor.java | 97 ++++++--
.../positive/iceberg_minor_compaction_bucket.q | 64 +++++
.../llap/iceberg_minor_compaction_bucket.q.out | 265 +++++++++++++++++++++
.../txn/compactor/TestIcebergCompactorOnTez.java | 86 +++++++
.../test/resources/testconfiguration.properties | 1 +
8 files changed, 537 insertions(+), 42 deletions(-)
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index 8ab320a4cdb..32eb2519ba7 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -281,7 +281,7 @@ void addCommentChanged(FieldSchema field) {
}
- private static String convertToTypeString(Type type) {
+ public static String convertToTypeString(Type type) {
switch (type.typeId()) {
case BOOLEAN:
return "boolean";
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
index 9e64c785d22..5e78400d3fa 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
@@ -228,6 +228,32 @@ private static Object convertLiteral(Object literal, TransformSpec transform) {
}
}
+ public static Object convertPartitionLiteral(Object literal, TransformSpec transform) {
+ if (transform == null) {
+ return literal;
+ }
+ try {
+ switch (transform.getTransformType()) {
+ case YEAR:
+ return parseYearToTransformYear(literal.toString());
+ case MONTH:
+ return parseMonthToTransformMonth(literal.toString());
+ case HOUR:
+ return parseHourToTransformHour(literal.toString());
+ case DAY:
+ case TRUNCATE:
+ case BUCKET:
+ case IDENTITY:
+ return literal;
+ default:
+ throw new UnsupportedOperationException("Unknown transform: " + transform.getTransformType());
+ }
+ } catch (NumberFormatException | DateTimeException | IllegalStateException e) {
+ throw new RuntimeException(
+ String.format("Unable to parse value '%s' as '%s' transform value", literal.toString(), transform));
+ }
+ }
+
private static final int ICEBERG_EPOCH_YEAR = 1970;
private static final int ICEBERG_EPOCH_MONTH = 1;
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index c2beca41ede..79829c177f0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -36,12 +36,15 @@
import org.apache.hadoop.hive.common.type.TimestampTZ;
import org.apache.hadoop.hive.common.type.TimestampTZUtil;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -78,6 +81,7 @@
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
@@ -520,25 +524,25 @@ public static List<String> getPartitionNames(Table table, Map<String, String> pa
}
}
- public static long getPartitionHash(Table icebergTable, String partitionPath) throws IOException {
- PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
- .createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS);
- try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
- return FluentIterable.from(fileScanTasks)
- .transformAndConcat(task -> task.asDataTask().rows())
- .transform(row -> {
- StructProjection data = row.get(IcebergTableUtil.PART_IDX, StructProjection.class);
- PartitionSpec spec = icebergTable.specs().get(row.get(IcebergTableUtil.SPEC_IDX, Integer.class));
- PartitionData partitionData = IcebergTableUtil.toPartitionData(data,
- Partitioning.partitionType(icebergTable), spec.partitionType());
- String path = spec.partitionToPath(partitionData);
- return Maps.immutableEntry(path, data);
- })
- .filter(e -> e.getKey().equals(partitionPath))
- .transform(e -> IcebergAcidUtil.computeHash(e.getValue()))
- .get(0);
+ public static PartitionSpec getPartitionSpec(Table icebergTable, String partitionPath)
+ throws MetaException, HiveException {
+ if (icebergTable == null || partitionPath == null || partitionPath.isEmpty()) {
+ throw new HiveException("Table and partitionPath must not be null or empty.");
}
+
+ // Extract field names from the path: "field1=val1/field2=val2" → [field1, field2]
+ List<String> fieldNames = Lists.newArrayList(Warehouse.makeSpecFromName(partitionPath).keySet());
+
+ return icebergTable.specs().values().stream()
+ .filter(spec -> {
+ List<String> specFieldNames = spec.fields().stream()
+ .map(PartitionField::name)
+ .collect(Collectors.toList());
+ return specFieldNames.equals(fieldNames);
+ })
+ .findFirst() // Supposed to be only one matching spec
+ .orElseThrow(() -> new HiveException("No matching partition spec found for partition path: " + partitionPath));
}
public static TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java
index 2eff042614e..9c4a73cc7d0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java
@@ -20,13 +20,16 @@
package org.apache.iceberg.mr.hive.compaction;
import java.io.IOException;
-import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.ql.Context.RewritePolicy;
import org.apache.hadoop.hive.ql.DriverUtils;
@@ -35,13 +38,21 @@
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.mr.hive.HiveIcebergFilterFactory;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator;
+import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,13 +69,33 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
HiveConf conf = new HiveConf(context.getConf());
CompactionInfo ci = context.getCompactionInfo();
- String partSpec = ci.partName;
+ String compactionQuery = buildCompactionQuery(context, compactTableName, conf);
+
+ SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
+ String compactionTarget = "table " + HiveUtils.unparseIdentifier(compactTableName) +
+ (ci.partName != null ? ", partition " + HiveUtils.unparseIdentifier(ci.partName) : "");
+
+ try {
+ DriverUtils.runOnDriver(conf, sessionState, compactionQuery);
+ LOG.info("Completed compaction for {}", compactionTarget);
+ return true;
+ } catch (HiveException e) {
+ LOG.error("Failed compacting {}", compactionTarget, e);
+ throw e;
+ } finally {
+ sessionState.setCompaction(false);
+ }
+ }
+
+ private String buildCompactionQuery(CompactorContext context, String compactTableName, HiveConf conf)
+ throws HiveException {
+ CompactionInfo ci = context.getCompactionInfo();
org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(), context.getTable().getTableName());
Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
- String compactionQuery;
String orderBy = ci.orderByClause == null ? "" : ci.orderByClause;
String fileSizePredicate = null;
+ String compactionQuery;
if (ci.type == CompactionType.MINOR) {
long fileSizeInBytesThreshold = CompactionEvaluator.getFragmentSizeBytes(table.getParameters());
@@ -76,7 +107,7 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
}
- if (partSpec == null) {
+ if (ci.partName == null) {
if (!icebergTable.spec().isPartitioned()) {
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s %3$s", compactTableName,
@@ -96,31 +127,49 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION);
}
} else {
- long partitionHash = IcebergTableUtil.getPartitionHash(icebergTable, partSpec);
+ HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false);
+ conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
- conf.set(IcebergCompactionService.PARTITION_PATH, new Path(partSpec).toString());
+ conf.set(IcebergCompactionService.PARTITION_PATH, new Path(ci.partName).toString());
- Map<String, String> partSpecMap = new LinkedHashMap<>();
- Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);
+ PartitionSpec spec;
+ String partitionPredicate;
+ try {
+ spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
+ partitionPredicate = buildPartitionPredicate(ci, spec);
+ } catch (MetaException e) {
+ throw new HiveException(e);
+ }
- compactionQuery = String.format("insert overwrite table %1$s select * from %1$s where %2$s=%3$d " +
- "and %4$s is not null %5$s %6$s", compactTableName, VirtualColumn.PARTITION_HASH.getName(), partitionHash,
- VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : "and " + fileSizePredicate, orderBy);
+ compactionQuery = String.format("INSERT OVERWRITE TABLE %1$s SELECT * FROM %1$s WHERE %2$s IN " +
+ "(SELECT FILE_PATH FROM %1$s.FILES WHERE %3$s AND SPEC_ID = %4$d) %5$s %6$s",
+ compactTableName, VirtualColumn.FILE_PATH.getName(), partitionPredicate, spec.specId(),
+ fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy);
}
+ return compactionQuery;
+ }
- SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
- String compactionTarget = "table " + HiveUtils.unparseIdentifier(compactTableName) +
- (partSpec != null ? ", partition " + HiveUtils.unparseIdentifier(partSpec) : "");
+ private String buildPartitionPredicate(CompactionInfo ci, PartitionSpec spec) throws MetaException {
+ Map<String, String> partSpecMap = Warehouse.makeSpecFromName(ci.partName);
+ Map<String, PartitionField> partitionFieldMap = spec.fields().stream()
+ .collect(Collectors.toMap(PartitionField::name, Function.identity()));
- try {
- DriverUtils.runOnDriver(conf, sessionState, compactionQuery);
- LOG.info("Completed compaction for {}", compactionTarget);
- return true;
- } catch (HiveException e) {
- LOG.error("Failed compacting {}", compactionTarget, e);
- throw e;
- } finally {
- sessionState.setCompaction(false);
- }
+ Types.StructType partitionType = spec.partitionType();
+ return partitionType.fields().stream().map(field -> {
+ String value = partSpecMap.get(field.name());
+ String literal = "NULL";
+
+ if (value != null && !value.equals("null")) {
+ String type = HiveSchemaUtil.convertToTypeString(field.type());
+ PartitionField partitionField = partitionFieldMap.get(field.name());
+ TransformSpec transformSpec = TransformSpec.fromString(partitionField.transform().toString(), field.name());
+ literal = TypeInfoUtils.convertStringToLiteralForSQL(
+ HiveIcebergFilterFactory.convertPartitionLiteral(value, transformSpec).toString(),
+ ((PrimitiveTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(type)).getPrimitiveCategory());
+ }
+
+ return String.format("`partition`.%s %s %s", HiveUtils.unparseIdentifier(field.name()),
+ Objects.equals(literal, "NULL") ? "IS" : "=", literal);
+ }).collect(Collectors.joining(" AND "));
}
}
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_minor_compaction_bucket.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_minor_compaction_bucket.q
new file mode 100644
index 00000000000..20c5320a2e7
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_minor_compaction_bucket.q
@@ -0,0 +1,64 @@
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask random uuid
+--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! qt:replace:/(MINOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+--! qt:replace:/(MINOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask iceberg version
+--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
+
+set hive.explain.user=true;
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=false;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+CREATE TABLE srcbucket_big(id string, key int, value string)
+PARTITIONED BY SPEC(bucket(4, key)) STORED BY ICEBERG
+TBLPROPERTIES ('compactor.threshold.min.input.files'='1');
+
+INSERT INTO srcbucket_big VALUES
+('a', 101, 'val_101'),
+('b', null, 'val_102'),
+('c', 103, 'val_103'),
+('d', 104, null),
+('e', 105, 'val_105'),
+('f', null, null);
+
+ALTER TABLE srcbucket_big SET PARTITION SPEC (bucket(8, key));
+
+INSERT INTO srcbucket_big VALUES
+('g', 101, 'val_101'),
+('h', null, 'val_102'),
+('i', 103, 'val_103'),
+('j', 104, null),
+('k', 105, 'val_105'),
+('l', null, null);
+
+desc formatted default.srcbucket_big;
+SELECT * FROM default.srcbucket_big ORDER BY id;
+
+select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count;
+
+alter table srcbucket_big compact 'minor' and wait;
+show compactions order by 'partition';
+
+desc formatted default.srcbucket_big;
+SELECT * FROM default.srcbucket_big ORDER BY id;
+
+select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out
new file mode 100644
index 00000000000..137d435d86b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out
@@ -0,0 +1,265 @@
+PREHOOK: query: CREATE TABLE srcbucket_big(id string, key int, value string)
+PARTITIONED BY SPEC(bucket(4, key)) STORED BY ICEBERG
+TBLPROPERTIES ('compactor.threshold.min.input.files'='1')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcbucket_big
+POSTHOOK: query: CREATE TABLE srcbucket_big(id string, key int, value string)
+PARTITIONED BY SPEC(bucket(4, key)) STORED BY ICEBERG
+TBLPROPERTIES ('compactor.threshold.min.input.files'='1')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcbucket_big
+PREHOOK: query: INSERT INTO srcbucket_big VALUES
+('a', 101, 'val_101'),
+('b', null, 'val_102'),
+('c', 103, 'val_103'),
+('d', 104, null),
+('e', 105, 'val_105'),
+('f', null, null)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@srcbucket_big
+POSTHOOK: query: INSERT INTO srcbucket_big VALUES
+('a', 101, 'val_101'),
+('b', null, 'val_102'),
+('c', 103, 'val_103'),
+('d', 104, null),
+('e', 105, 'val_105'),
+('f', null, null)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@srcbucket_big
+PREHOOK: query: ALTER TABLE srcbucket_big SET PARTITION SPEC (bucket(8, key))
+PREHOOK: type: ALTERTABLE_SETPARTSPEC
+PREHOOK: Input: default@srcbucket_big
+POSTHOOK: query: ALTER TABLE srcbucket_big SET PARTITION SPEC (bucket(8, key))
+POSTHOOK: type: ALTERTABLE_SETPARTSPEC
+POSTHOOK: Input: default@srcbucket_big
+POSTHOOK: Output: default@srcbucket_big
+PREHOOK: query: INSERT INTO srcbucket_big VALUES
+('g', 101, 'val_101'),
+('h', null, 'val_102'),
+('i', 103, 'val_103'),
+('j', 104, null),
+('k', 105, 'val_105'),
+('l', null, null)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@srcbucket_big
+POSTHOOK: query: INSERT INTO srcbucket_big VALUES
+('g', 101, 'val_101'),
+('h', null, 'val_102'),
+('i', 103, 'val_103'),
+('j', 104, null),
+('k', 105, 'val_105'),
+('l', null, null)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@srcbucket_big
+PREHOOK: query: desc formatted default.srcbucket_big
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@srcbucket_big
+POSTHOOK: query: desc formatted default.srcbucket_big
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@srcbucket_big
+# col_name data_type comment
+id string
+key int
+value string
+
+# Partition Transform Information
+# col_name transform_type
+key BUCKET[8]
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Retention: 0
+#### A masked pattern was here ####
+Table Type: EXTERNAL_TABLE
+Table Parameters:
+ COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"key\":\"true\",\"value\":\"true\"}}
+ EXTERNAL TRUE
+ bucketing_version 2
+ compactor.threshold.min.input.files 1
+ current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"key\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"value\",\"required\":false,\"type\":\"string\"}]}
+ current-snapshot-id #Masked#
+ current-snapshot-summary {\"added-data-files\":\"4\",\"added-records\":\"6\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"4\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"7\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
+ current-snapshot-timestamp-ms #Masked#
+ default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"key_bucket_8\",\"transform\":\"bucket[8]\",\"source-id\":2,\"field-id\":1001}]}
+ format-version 2
+ iceberg.orc.files.only false
+#### A masked pattern was here ####
+ numFiles 7
+ numPartitions 6
+ numRows 12
+ parquet.compression zstd
+#### A masked pattern was here ####
+ rawDataSize 0
+ serialization.format 1
+ snapshot-count 2
+ storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+ table_type ICEBERG
+ totalSize #Masked#
+#### A masked pattern was here ####
+ uuid #Masked#
+ write.delete.mode merge-on-read
+ write.merge.mode merge-on-read
+ write.update.mode merge-on-read
+
+# Storage Information
+SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+Compressed: No
+Sort Columns: []
+PREHOOK: query: SELECT * FROM default.srcbucket_big ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM default.srcbucket_big ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+a 101 val_101
+b NULL val_102
+c 103 val_103
+d 104 NULL
+e 105 val_105
+f NULL NULL
+g 101 val_101
+h NULL val_102
+i 103 val_103
+j 104 NULL
+k 105 val_105
+l NULL NULL
+PREHOOK: query: select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+POSTHOOK: query: select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+{"key_bucket":0,"key_bucket_8":null} 0 3
+{"key_bucket":3,"key_bucket_8":null} 0 1
+{"key_bucket":null,"key_bucket_8":0} 1 1
+{"key_bucket":null,"key_bucket_8":3} 1 1
+{"key_bucket":null,"key_bucket_8":4} 1 2
+{"key_bucket":null,"key_bucket_8":null} 1 4
+PREHOOK: query: alter table srcbucket_big compact 'minor' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@srcbucket_big
+PREHOOK: Output: default@srcbucket_big
+POSTHOOK: query: alter table srcbucket_big compact 'minor' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@srcbucket_big
+POSTHOOK: Output: default@srcbucket_big
+PREHOOK: query: show compactions order by 'partition'
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: show compactions order by 'partition'
+POSTHOOK: type: SHOW COMPACTIONS
+CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
+#Masked# default srcbucket_big key_bucket_8=0 MINOR succeeded #Masked# manual default 0 0 0 ---
+#Masked# default srcbucket_big key_bucket_8=3 MINOR succeeded #Masked# manual default 0 0 0 ---
+#Masked# default srcbucket_big key_bucket_8=4 MINOR succeeded #Masked# manual default 0 0 0 ---
+#Masked# default srcbucket_big key_bucket_8=null MINOR succeeded #Masked# manual default 0 0 0 ---
+#Masked# default srcbucket_big --- MINOR succeeded #Masked# manual default 0 0 0 ---
+PREHOOK: query: desc formatted default.srcbucket_big
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@srcbucket_big
+POSTHOOK: query: desc formatted default.srcbucket_big
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@srcbucket_big
+# col_name data_type comment
+id string
+key int
+value string
+
+# Partition Transform Information
+# col_name transform_type
+key BUCKET[8]
+
+# Detailed Table Information
+Database: default
+#### A masked pattern was here ####
+Retention: 0
+#### A masked pattern was here ####
+Table Type: EXTERNAL_TABLE
+Table Parameters:
+ COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"key\":\"true\",\"value\":\"true\"}}
+ EXTERNAL TRUE
+ bucketing_version 2
+ compactor.threshold.min.input.files 1
+ current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"key\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"value\",\"required\":false,\"type\":\"string\"}]}
+ current-snapshot-id #Masked#
+ current-snapshot-summary {\"added-data-files\":\"4\",\"deleted-data-files\":\"3\",\"added-records\":\"6\",\"deleted-records\":\"6\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"7\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
+ current-snapshot-timestamp-ms #Masked#
+ default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"key_bucket_8\",\"transform\":\"bucket[8]\",\"source-id\":2,\"field-id\":1001}]}
+ format-version 2
+ iceberg.orc.files.only false
+#### A masked pattern was here ####
+ numFiles 8
+ numPartitions 4
+ numRows 12
+ parquet.compression zstd
+#### A masked pattern was here ####
+ rawDataSize 0
+ serialization.format 1
+ snapshot-count 7
+ storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+ table_type ICEBERG
+ totalSize #Masked#
+#### A masked pattern was here ####
+ uuid #Masked#
+ write.delete.mode merge-on-read
+ write.merge.mode merge-on-read
+ write.update.mode merge-on-read
+
+# Storage Information
+SerDe Library: org.apache.iceberg.mr.hive.HiveIcebergSerDe
+InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+Compressed: No
+Sort Columns: []
+PREHOOK: query: SELECT * FROM default.srcbucket_big ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM default.srcbucket_big ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+a 101 val_101
+b NULL val_102
+c 103 val_103
+d 104 NULL
+e 105 val_105
+f NULL NULL
+g 101 val_101
+h NULL val_102
+i 103 val_103
+j 104 NULL
+k 105 val_105
+l NULL NULL
+PREHOOK: query: select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+POSTHOOK: query: select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+{"key_bucket":null,"key_bucket_8":0} 1 2
+{"key_bucket":null,"key_bucket_8":3} 1 2
+{"key_bucket":null,"key_bucket_8":4} 1 4
+{"key_bucket":null,"key_bucket_8":null} 1 4
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
new file mode 100644
index 00000000000..c885423688e
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
+
+public class TestIcebergCompactorOnTez extends CompactorOnTezTest {
+
+ @Test
+ public void testIcebergCompactorWithAllPartitionFieldTypes() throws Exception{
+ conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, CUSTOM_COMPACTION_QUEUE);
+ msClient = new HiveMetaStoreClient(conf);
+
+ String dbName = "default";
+ String tableName = "ice_orc";
+ String qualifiedTableName = dbName + "." + tableName;
+
+ executeStatementOnDriver("drop table if exists " + qualifiedTableName, driver);
+ executeStatementOnDriver(String.format("create table %s " +
+ "(id int, a string, b int, c bigint, d float, e double, f decimal(4, 2), g boolean, h date, i date, j date, k timestamp) " +
+ "partitioned by spec(a, truncate(3, a), bucket(4, a), b, c, d, e, f, g, h, year(h), month(i), day(j), k, hour(k)) stored by iceberg stored as orc " +
+ "tblproperties ('compactor.threshold.min.input.files'='1')", qualifiedTableName), driver);
+
+ // 6 records, one record per file --> 3 partitions, 2 files per partition
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01', DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01', DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, null, null, null, null, null, null, null, null, null, null, null)", qualifiedTableName), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, null, null, null, null, null, null, null, null, null, null, null)", qualifiedTableName), driver);
+
+ Assert.assertEquals(6, getFilesCount(qualifiedTableName));
+ List<String> recordsBefore = getAllRecords(qualifiedTableName);
+
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, false,
+ "a=aaa111/a_trunc=aaa/a_bucket=0/b=1/c=100/d=1.0/e=2.0/f=4.00/g=true/h=2024-05-01/h_year=2024/i_month=2024-05/j_day=2024-05-01/k=2024-05-02T10%3A00/k_hour=2024-05-02-10",
+ "a=bbb222/a_trunc=bbb/a_bucket=3/b=2/c=200/d=2.0/e=3.0/f=8.00/g=false/h=2024-05-03/h_year=2024/i_month=2024-05/j_day=2024-05-03/k=2024-05-04T13%3A00/k_hour=2024-05-04-13",
+ "a=null/a_trunc=null/a_bucket=null/b=null/c=null/d=null/e=null/f=null/g=null/h=null/h_year=null/i_month=null/j_day=null/k=null/k_hour=null"
+ );
+
+ Assert.assertEquals(3, getFilesCount(qualifiedTableName));
+ verifySuccessfulCompaction(3);
+ List<String> recordsAfter = getAllRecords(qualifiedTableName);
+
+ Assert.assertEquals(recordsBefore, recordsAfter);
+ }
+
+ private int getFilesCount(String qualifiedTableName) throws Exception {
+ driver.run(String.format("select count(*) from %s.files", qualifiedTableName));
+ List<String> res = new ArrayList<>();
+ driver.getFetchTask().fetch(res);
+ return Integer.parseInt(res.get(0));
+ }
+
+ private List<String> getAllRecords(String qualifiedTableName) throws Exception {
+ driver.run(String.format("select * from %s order by id", qualifiedTableName));
+ List<String> res = new ArrayList<>();
+ driver.getFetchTask().fetch(res);
+ return res;
+ }
+}
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 9a92b096c6c..f08d0721895 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -445,6 +445,7 @@ iceberg.llap.query.compactor.files=\
iceberg_major_compaction_unpartitioned.q,\
iceberg_major_compaction_unpartitioned_ordered.q,\
iceberg_major_compaction_unpartitioned_w_filter.q,\
+ iceberg_minor_compaction_bucket.q,\
iceberg_minor_compaction_partition_evolution.q,\
iceberg_minor_compaction_unpartitioned.q