This is an automated email from the ASF dual-hosted git repository.

difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new aeda0627030 HIVE-28935: Iceberg: fix partition filtering condition in compaction query (#5792)
aeda0627030 is described below

commit aeda0627030ec0bb7d299e38a08eb29860b51173
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Thu May 22 15:37:40 2025 -0400

    HIVE-28935: Iceberg: fix partition filtering condition in compaction query (#5792)
    
    * HIVE-28935: Iceberg: fix partition filtering condition in compaction query.
    
    ---------
---
 .../org/apache/iceberg/hive/HiveSchemaUtil.java    |   2 +-
 .../iceberg/mr/hive/HiveIcebergFilterFactory.java  |  26 ++
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   |  38 +--
 .../mr/hive/compaction/IcebergQueryCompactor.java  |  97 ++++++--
 .../positive/iceberg_minor_compaction_bucket.q     |  64 +++++
 .../llap/iceberg_minor_compaction_bucket.q.out     | 265 +++++++++++++++++++++
 .../txn/compactor/TestIcebergCompactorOnTez.java   |  86 +++++++
 .../test/resources/testconfiguration.properties    |   1 +
 8 files changed, 537 insertions(+), 42 deletions(-)

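The gist of the change: a partition-targeted compaction used to filter rows on the PARTITION__HASH virtual column; it now selects the file paths belonging to the target partition from the table's FILES metadata table. A rough sketch of the two query shapes, with identifiers simplified (the exact strings are produced by the String.format calls in IcebergQueryCompactor below):

    -- before: match rows by partition hash
    INSERT OVERWRITE TABLE db.tbl SELECT * FROM db.tbl
    WHERE PARTITION__HASH = <hash> AND FILE__PATH IS NOT NULL ...

    -- after: match rows by the files recorded for the target partition and spec
    INSERT OVERWRITE TABLE db.tbl SELECT * FROM db.tbl
    WHERE FILE__PATH IN (SELECT FILE_PATH FROM db.tbl.FILES
                         WHERE <partition predicate> AND SPEC_ID = <spec id>) ...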
diff --git a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
index 8ab320a4cdb..32eb2519ba7 100644
--- a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
+++ b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveSchemaUtil.java
@@ -281,7 +281,7 @@ void addCommentChanged(FieldSchema field) {
   }
 
 
-  private static String convertToTypeString(Type type) {
+  public static String convertToTypeString(Type type) {
     switch (type.typeId()) {
       case BOOLEAN:
         return "boolean";
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
index 9e64c785d22..5e78400d3fa 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
@@ -228,6 +228,32 @@ private static Object convertLiteral(Object literal, TransformSpec transform) {
     }
   }
 
+  public static Object convertPartitionLiteral(Object literal, TransformSpec transform) {
+    if (transform == null) {
+      return literal;
+    }
+    try {
+      switch (transform.getTransformType()) {
+        case YEAR:
+          return parseYearToTransformYear(literal.toString());
+        case MONTH:
+          return parseMonthToTransformMonth(literal.toString());
+        case HOUR:
+          return parseHourToTransformHour(literal.toString());
+        case DAY:
+        case TRUNCATE:
+        case BUCKET:
+        case IDENTITY:
+          return literal;
+        default:
+          throw new UnsupportedOperationException("Unknown transform: " + transform.getTransformType());
+      }
+    } catch (NumberFormatException | DateTimeException | IllegalStateException e) {
+      throw new RuntimeException(
+          String.format("Unable to parse value '%s' as '%s' transform value", literal.toString(), transform));
+    }
+  }
+
   private static final int ICEBERG_EPOCH_YEAR = 1970;
   private static final int ICEBERG_EPOCH_MONTH = 1;
 
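For context, an illustration (not part of the patch) of what the new convertPartitionLiteral does: it maps the human-readable value from a partition path to the value Iceberg stores for that transform, so the generated predicate can be compared against the FILES metadata table. Assuming a hypothetical table partitioned by year(h), with year values encoded as offsets from the 1970 epoch per the constants above:

    -- partition path "h_year=2024": convertPartitionLiteral("2024", year) yields 54,
    -- so the compactor's subquery filters on the transformed value, roughly:
    SELECT FILE_PATH FROM db.tbl.FILES
    WHERE `partition`.`h_year` = 54 AND SPEC_ID = 0;
    -- DAY, TRUNCATE, BUCKET, and IDENTITY values pass through unchanged.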
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index c2beca41ede..79829c177f0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -36,12 +36,15 @@
 import org.apache.hadoop.hive.common.type.TimestampTZ;
 import org.apache.hadoop.hive.common.type.TimestampTZUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -78,6 +81,7 @@
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
@@ -520,25 +524,25 @@ public static List<String> getPartitionNames(Table table, Map<String, String> pa
     }
   }
 
-  public static long getPartitionHash(Table icebergTable, String partitionPath) throws IOException {
-    PartitionsTable partitionsTable = (PartitionsTable) MetadataTableUtils
-        .createMetadataTableInstance(icebergTable, MetadataTableType.PARTITIONS);
 
-    try (CloseableIterable<FileScanTask> fileScanTasks = partitionsTable.newScan().planFiles()) {
-      return FluentIterable.from(fileScanTasks)
-          .transformAndConcat(task -> task.asDataTask().rows())
-          .transform(row -> {
-            StructProjection data = row.get(IcebergTableUtil.PART_IDX, StructProjection.class);
-            PartitionSpec spec = icebergTable.specs().get(row.get(IcebergTableUtil.SPEC_IDX, Integer.class));
-            PartitionData partitionData = IcebergTableUtil.toPartitionData(data,
-                Partitioning.partitionType(icebergTable), spec.partitionType());
-            String path = spec.partitionToPath(partitionData);
-            return Maps.immutableEntry(path, data);
-          })
-          .filter(e -> e.getKey().equals(partitionPath))
-          .transform(e -> IcebergAcidUtil.computeHash(e.getValue()))
-          .get(0);
+  public static PartitionSpec getPartitionSpec(Table icebergTable, String partitionPath)
+      throws MetaException, HiveException {
+    if (icebergTable == null || partitionPath == null || partitionPath.isEmpty()) {
+      throw new HiveException("Table and partitionPath must not be null or empty.");
     }
+
+    // Extract field names from the path: "field1=val1/field2=val2" → [field1, field2]
+    List<String> fieldNames = Lists.newArrayList(Warehouse.makeSpecFromName(partitionPath).keySet());
+
+    return icebergTable.specs().values().stream()
+        .filter(spec -> {
+          List<String> specFieldNames = spec.fields().stream()
+              .map(PartitionField::name)
+              .collect(Collectors.toList());
+          return specFieldNames.equals(fieldNames);
+        })
+        .findFirst() // Supposed to be only one matching spec
+        .orElseThrow(() -> new HiveException("No matching partition spec found for partition path: " + partitionPath));
   }
 
  public static TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
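For orientation (not part of the commit): getPartitionSpec resolves which historical spec a partition path belongs to purely by partition field names, which remain distinct across spec evolution. The bucket-evolved table in the new q test below makes the mapping visible through the partitions metadata table:

    -- each partition row records its spec_id; the field names identify the spec
    SELECT `partition`, spec_id FROM default.srcbucket_big.partitions;
    -- {"key_bucket":0,   "key_bucket_8":null} -> spec 0, from bucket(4, key)
    -- {"key_bucket":null, "key_bucket_8":3}   -> spec 1, from bucket(8, key)
    -- hence the path "key_bucket_8=3" matches only spec 1's field names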
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java
index 2eff042614e..9c4a73cc7d0 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergQueryCompactor.java
@@ -20,13 +20,16 @@
 package org.apache.iceberg.mr.hive.compaction;
 
 import java.io.IOException;
-import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.ql.Context.RewritePolicy;
 import org.apache.hadoop.hive.ql.DriverUtils;
@@ -35,13 +38,21 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorContext;
 import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.hive.HiveSchemaUtil;
+import org.apache.iceberg.mr.hive.HiveIcebergFilterFactory;
 import org.apache.iceberg.mr.hive.IcebergTableUtil;
 import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator;
+import org.apache.iceberg.types.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,13 +69,33 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
 
     HiveConf conf = new HiveConf(context.getConf());
     CompactionInfo ci = context.getCompactionInfo();
-    String partSpec = ci.partName;
+    String compactionQuery = buildCompactionQuery(context, compactTableName, conf);
+
+    SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
+    String compactionTarget = "table " + HiveUtils.unparseIdentifier(compactTableName) +
+        (ci.partName != null ? ", partition " + HiveUtils.unparseIdentifier(ci.partName) : "");
+
+    try {
+      DriverUtils.runOnDriver(conf, sessionState, compactionQuery);
+      LOG.info("Completed compaction for {}", compactionTarget);
+      return true;
+    } catch (HiveException e) {
+      LOG.error("Failed compacting {}", compactionTarget, e);
+      throw e;
+    } finally {
+      sessionState.setCompaction(false);
+    }
+  }
+
+  private String buildCompactionQuery(CompactorContext context, String compactTableName, HiveConf conf)
+      throws HiveException {
+    CompactionInfo ci = context.getCompactionInfo();
     org.apache.hadoop.hive.ql.metadata.Table table = Hive.get(conf).getTable(context.getTable().getDbName(),
         context.getTable().getTableName());
     Table icebergTable = IcebergTableUtil.getTable(conf, table.getTTable());
-    String compactionQuery;
     String orderBy = ci.orderByClause == null ? "" : ci.orderByClause;
     String fileSizePredicate = null;
+    String compactionQuery;
 
     if (ci.type == CompactionType.MINOR) {
      long fileSizeInBytesThreshold = CompactionEvaluator.getFragmentSizeBytes(table.getParameters());
@@ -76,7 +107,7 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
       conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
     }
 
-    if (partSpec == null) {
+    if (ci.partName == null) {
       if (!icebergTable.spec().isPartitioned()) {
        HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.FULL_TABLE.name());
        compactionQuery = String.format("insert overwrite table %s select * from %<s %2$s %3$s", compactTableName,
@@ -96,31 +127,49 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
         throw new HiveException(ErrorMsg.COMPACTION_NO_PARTITION);
       }
     } else {
-      long partitionHash = IcebergTableUtil.getPartitionHash(icebergTable, partSpec);
+      HiveConf.setBoolVar(conf, ConfVars.HIVE_CONVERT_JOIN, false);
+      conf.setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
       HiveConf.setVar(conf, ConfVars.REWRITE_POLICY, RewritePolicy.PARTITION.name());
-      conf.set(IcebergCompactionService.PARTITION_PATH, new Path(partSpec).toString());
+      conf.set(IcebergCompactionService.PARTITION_PATH, new Path(ci.partName).toString());
 
-      Map<String, String> partSpecMap = new LinkedHashMap<>();
-      Warehouse.makeSpecFromName(partSpecMap, new Path(partSpec), null);
+      PartitionSpec spec;
+      String partitionPredicate;
+      try {
+        spec = IcebergTableUtil.getPartitionSpec(icebergTable, ci.partName);
+        partitionPredicate = buildPartitionPredicate(ci, spec);
+      } catch (MetaException e) {
+        throw new HiveException(e);
+      }
 
-      compactionQuery = String.format("insert overwrite table %1$s select * 
from %1$s where %2$s=%3$d " +
-              "and %4$s is not null %5$s %6$s", compactTableName, 
VirtualColumn.PARTITION_HASH.getName(), partitionHash,
-          VirtualColumn.FILE_PATH.getName(), fileSizePredicate == null ? "" : 
"and " + fileSizePredicate, orderBy);
+      compactionQuery = String.format("INSERT OVERWRITE TABLE %1$s SELECT * 
FROM %1$s WHERE %2$s IN " +
+          "(SELECT FILE_PATH FROM %1$s.FILES WHERE %3$s AND SPEC_ID = %4$d) 
%5$s %6$s",
+      compactTableName, VirtualColumn.FILE_PATH.getName(), partitionPredicate, 
spec.specId(),
+      fileSizePredicate == null ? "" : "AND " + fileSizePredicate, orderBy);
     }
+    return compactionQuery;
+  }
 
-    SessionState sessionState = setupQueryCompactionSession(conf, ci, tblProperties);
-    String compactionTarget = "table " + HiveUtils.unparseIdentifier(compactTableName) +
-        (partSpec != null ? ", partition " + HiveUtils.unparseIdentifier(partSpec) : "");
+  private String buildPartitionPredicate(CompactionInfo ci, PartitionSpec spec) throws MetaException {
+    Map<String, String> partSpecMap = Warehouse.makeSpecFromName(ci.partName);
+    Map<String, PartitionField> partitionFieldMap = spec.fields().stream()
+        .collect(Collectors.toMap(PartitionField::name, Function.identity()));
 
-    try {
-      DriverUtils.runOnDriver(conf, sessionState, compactionQuery);
-      LOG.info("Completed compaction for {}", compactionTarget);
-      return true;
-    } catch (HiveException e) {
-      LOG.error("Failed compacting {}", compactionTarget, e);
-      throw e;
-    } finally {
-      sessionState.setCompaction(false);
-    }
+    Types.StructType partitionType = spec.partitionType();
+    return  partitionType.fields().stream().map(field -> {
+      String value = partSpecMap.get(field.name());
+      String literal = "NULL";
+
+      if (value != null && !value.equals("null")) {
+        String type = HiveSchemaUtil.convertToTypeString(field.type());
+        PartitionField partitionField = partitionFieldMap.get(field.name());
+        TransformSpec transformSpec = TransformSpec.fromString(partitionField.transform().toString(), field.name());
+        literal = TypeInfoUtils.convertStringToLiteralForSQL(
+            HiveIcebergFilterFactory.convertPartitionLiteral(value, transformSpec).toString(),
+            ((PrimitiveTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(type)).getPrimitiveCategory());
+      }
+
+      return String.format("`partition`.%s %s %s", 
HiveUtils.unparseIdentifier(field.name()),
+          Objects.equals(literal, "NULL") ? "IS" : "=", literal);
+    }).collect(Collectors.joining(" AND "));
   }
 }
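Putting the pieces together, a worked example (illustrative, not captured from a run): for the q test's partition path "key_bucket_8=3", buildPartitionPredicate looks up the bucket transform, keeps the literal as-is, and emits a comparison against the `partition` struct of the FILES metadata table; null partition values become IS NULL checks. The resulting minor-compaction query reads roughly:

    INSERT OVERWRITE TABLE default.srcbucket_big SELECT * FROM default.srcbucket_big
    WHERE FILE__PATH IN (SELECT FILE_PATH FROM default.srcbucket_big.FILES
                         WHERE `partition`.`key_bucket_8` = 3 AND SPEC_ID = 1)
    AND <file-size predicate for minor compaction> <optional order by>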
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_minor_compaction_bucket.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_minor_compaction_bucket.q
new file mode 100644
index 00000000000..20c5320a2e7
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_minor_compaction_bucket.q
@@ -0,0 +1,64 @@
+-- Mask neededVirtualColumns due to non-strict order
+--! qt:replace:/(\s+neededVirtualColumns:\s)(.*)/$1#Masked#/
+-- Mask random uuid
+--! qt:replace:/(\s+uuid\s+)\S+(\s*)/$1#Masked#$2/
+-- Mask a random snapshot id
+--! qt:replace:/(\s+current-snapshot-id\s+)\S+(\s*)/$1#Masked#/
+-- Mask added file size
+--! qt:replace:/(\S\"added-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask total file size
+--! qt:replace:/(\S\"total-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask current-snapshot-timestamp-ms
+--! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/
+--! qt:replace:/(MINOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+--! qt:replace:/(MINOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+-- Mask removed file size
+--! qt:replace:/(\S\"removed-files-size\\\":\\\")(\d+)(\\\")/$1#Masked#$3/
+-- Mask iceberg version
+--! qt:replace:/(\S\"iceberg-version\\\":\\\")(\w+\s\w+\s\d+\.\d+\.\d+\s\(\w+\s\w+\))(\\\")/$1#Masked#$3/
+
+set hive.explain.user=true;
+set hive.auto.convert.join=true;
+set hive.optimize.dynamic.partition.hashjoin=false;
+set hive.convert.join.bucket.mapjoin.tez=true;
+
+CREATE TABLE srcbucket_big(id string, key int, value string)
+PARTITIONED BY SPEC(bucket(4, key)) STORED BY ICEBERG
+TBLPROPERTIES ('compactor.threshold.min.input.files'='1');
+
+INSERT INTO srcbucket_big VALUES
+('a', 101, 'val_101'),
+('b', null, 'val_102'),
+('c', 103, 'val_103'),
+('d', 104, null),
+('e', 105, 'val_105'),
+('f', null, null);
+
+ALTER TABLE srcbucket_big SET PARTITION SPEC (bucket(8, key));
+
+INSERT INTO srcbucket_big VALUES
+('g', 101, 'val_101'),
+('h', null, 'val_102'),
+('i', 103, 'val_103'),
+('j', 104, null),
+('k', 105, 'val_105'),
+('l', null, null);
+
+desc formatted default.srcbucket_big;
+SELECT * FROM default.srcbucket_big ORDER BY id;
+
+select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count;
+
+alter table srcbucket_big compact 'minor' and wait;
+show compactions order by 'partition';
+
+desc formatted default.srcbucket_big;
+SELECT * FROM default.srcbucket_big ORDER BY id;
+
+select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out
new file mode 100644
index 00000000000..137d435d86b
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_minor_compaction_bucket.q.out
@@ -0,0 +1,265 @@
+PREHOOK: query: CREATE TABLE srcbucket_big(id string, key int, value string)
+PARTITIONED BY SPEC(bucket(4, key)) STORED BY ICEBERG
+TBLPROPERTIES ('compactor.threshold.min.input.files'='1')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@srcbucket_big
+POSTHOOK: query: CREATE TABLE srcbucket_big(id string, key int, value string)
+PARTITIONED BY SPEC(bucket(4, key)) STORED BY ICEBERG
+TBLPROPERTIES ('compactor.threshold.min.input.files'='1')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@srcbucket_big
+PREHOOK: query: INSERT INTO srcbucket_big VALUES
+('a', 101, 'val_101'),
+('b', null, 'val_102'),
+('c', 103, 'val_103'),
+('d', 104, null),
+('e', 105, 'val_105'),
+('f', null, null)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@srcbucket_big
+POSTHOOK: query: INSERT INTO srcbucket_big VALUES
+('a', 101, 'val_101'),
+('b', null, 'val_102'),
+('c', 103, 'val_103'),
+('d', 104, null),
+('e', 105, 'val_105'),
+('f', null, null)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@srcbucket_big
+PREHOOK: query: ALTER TABLE srcbucket_big SET PARTITION SPEC (bucket(8, key))
+PREHOOK: type: ALTERTABLE_SETPARTSPEC
+PREHOOK: Input: default@srcbucket_big
+POSTHOOK: query: ALTER TABLE srcbucket_big SET PARTITION SPEC (bucket(8, key))
+POSTHOOK: type: ALTERTABLE_SETPARTSPEC
+POSTHOOK: Input: default@srcbucket_big
+POSTHOOK: Output: default@srcbucket_big
+PREHOOK: query: INSERT INTO srcbucket_big VALUES
+('g', 101, 'val_101'),
+('h', null, 'val_102'),
+('i', 103, 'val_103'),
+('j', 104, null),
+('k', 105, 'val_105'),
+('l', null, null)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@srcbucket_big
+POSTHOOK: query: INSERT INTO srcbucket_big VALUES
+('g', 101, 'val_101'),
+('h', null, 'val_102'),
+('i', 103, 'val_103'),
+('j', 104, null),
+('k', 105, 'val_105'),
+('l', null, null)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@srcbucket_big
+PREHOOK: query: desc formatted default.srcbucket_big
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@srcbucket_big
+POSTHOOK: query: desc formatted default.srcbucket_big
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@srcbucket_big
+# col_name             data_type               comment             
+id                     string                                      
+key                    int                                         
+value                  string                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+key                    BUCKET[8]                
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"key\":\"true\",\"value\":\"true\"}}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       compactor.threshold.min.input.files     1                   
+       current-schema          {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"key\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"value\",\"required\":false,\"type\":\"string\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        {\"added-data-files\":\"4\",\"added-records\":\"6\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"4\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"7\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  {\"spec-id\":1,\"fields\":[{\"name\":\"key_bucket_8\",\"transform\":\"bucket[8]\",\"source-id\":2,\"field-id\":1001}]}
+       format-version          2                   
+       iceberg.orc.files.only  false               
+#### A masked pattern was here ####
+       numFiles                7                   
+       numPartitions           6                   
+       numRows                 12                  
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          2                   
+       storage_handler         org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: SELECT * FROM default.srcbucket_big ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM default.srcbucket_big ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+a      101     val_101
+b      NULL    val_102
+c      103     val_103
+d      104     NULL
+e      105     val_105
+f      NULL    NULL
+g      101     val_101
+h      NULL    val_102
+i      103     val_103
+j      104     NULL
+k      105     val_105
+l      NULL    NULL
+PREHOOK: query: select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+POSTHOOK: query: select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+{"key_bucket":0,"key_bucket_8":null}   0       3
+{"key_bucket":3,"key_bucket_8":null}   0       1
+{"key_bucket":null,"key_bucket_8":0}   1       1
+{"key_bucket":null,"key_bucket_8":3}   1       1
+{"key_bucket":null,"key_bucket_8":4}   1       2
+{"key_bucket":null,"key_bucket_8":null}        1       4
+PREHOOK: query: alter table srcbucket_big compact 'minor' and wait
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@srcbucket_big
+PREHOOK: Output: default@srcbucket_big
+POSTHOOK: query: alter table srcbucket_big compact 'minor' and wait
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@srcbucket_big
+POSTHOOK: Output: default@srcbucket_big
+PREHOOK: query: show compactions order by 'partition'
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: show compactions order by 'partition'
+POSTHOOK: type: SHOW COMPACTIONS
+CompactionId   Database        Table   Partition       Type    State   Worker host     Worker  Enqueue Time    Start Time      Duration(ms)    HadoopJobId     Error message   Initiator host  Initiator       Pool name       TxnId   Next TxnId      Commit Time     Highest WriteId
+#Masked#       default srcbucket_big   key_bucket_8=0  MINOR   succeeded       #Masked#        manual  default 0       0       0        --- 
+#Masked#       default srcbucket_big   key_bucket_8=3  MINOR   succeeded       #Masked#        manual  default 0       0       0        --- 
+#Masked#       default srcbucket_big   key_bucket_8=4  MINOR   succeeded       #Masked#        manual  default 0       0       0        --- 
+#Masked#       default srcbucket_big   key_bucket_8=null       MINOR   succeeded       #Masked#        manual  default 0       0       0        --- 
+#Masked#       default srcbucket_big    ---    MINOR   succeeded       #Masked#        manual  default 0       0       0        --- 
+PREHOOK: query: desc formatted default.srcbucket_big
+PREHOOK: type: DESCTABLE
+PREHOOK: Input: default@srcbucket_big
+POSTHOOK: query: desc formatted default.srcbucket_big
+POSTHOOK: type: DESCTABLE
+POSTHOOK: Input: default@srcbucket_big
+# col_name             data_type               comment             
+id                     string                                      
+key                    int                                         
+value                  string                                      
+                
+# Partition Transform Information               
+# col_name             transform_type           
+key                    BUCKET[8]                
+                
+# Detailed Table Information            
+Database:              default                  
+#### A masked pattern was here ####
+Retention:             0                        
+#### A masked pattern was here ####
+Table Type:            EXTERNAL_TABLE           
+Table Parameters:               
+       COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"key\":\"true\",\"value\":\"true\"}}
+       EXTERNAL                TRUE                
+       bucketing_version       2                   
+       compactor.threshold.min.input.files     1                   
+       current-schema          {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"key\",\"required\":false,\"type\":\"int\"},{\"id\":3,\"name\":\"value\",\"required\":false,\"type\":\"string\"}]}
+       current-snapshot-id     #Masked#
+       current-snapshot-summary        {\"added-data-files\":\"4\",\"deleted-data-files\":\"3\",\"added-records\":\"6\",\"deleted-records\":\"6\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"7\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
+       current-snapshot-timestamp-ms   #Masked#       
+       default-partition-spec  {\"spec-id\":1,\"fields\":[{\"name\":\"key_bucket_8\",\"transform\":\"bucket[8]\",\"source-id\":2,\"field-id\":1001}]}
+       format-version          2                   
+       iceberg.orc.files.only  false               
+#### A masked pattern was here ####
+       numFiles                8                   
+       numPartitions           4                   
+       numRows                 12                  
+       parquet.compression     zstd                
+#### A masked pattern was here ####
+       rawDataSize             0                   
+       serialization.format    1                   
+       snapshot-count          7                   
+       storage_handler         org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
+       table_type              ICEBERG             
+       totalSize               #Masked#
+#### A masked pattern was here ####
+       uuid                    #Masked#
+       write.delete.mode       merge-on-read       
+       write.merge.mode        merge-on-read       
+       write.update.mode       merge-on-read       
+                
+# Storage Information           
+SerDe Library:         org.apache.iceberg.mr.hive.HiveIcebergSerDe      
+InputFormat:           org.apache.iceberg.mr.hive.HiveIcebergInputFormat
+OutputFormat:          org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
+Compressed:            No                       
+Sort Columns:          []                       
+PREHOOK: query: SELECT * FROM default.srcbucket_big ORDER BY id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM default.srcbucket_big ORDER BY id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+a      101     val_101
+b      NULL    val_102
+c      103     val_103
+d      104     NULL
+e      105     val_105
+f      NULL    NULL
+g      101     val_101
+h      NULL    val_102
+i      103     val_103
+j      104     NULL
+k      105     val_105
+l      NULL    NULL
+PREHOOK: query: select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count
+PREHOOK: type: QUERY
+PREHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+POSTHOOK: query: select `partition`, spec_id, record_count
+from default.srcbucket_big.partitions
+order by `partition`, spec_id, record_count
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@srcbucket_big
+#### A masked pattern was here ####
+{"key_bucket":null,"key_bucket_8":0}   1       2
+{"key_bucket":null,"key_bucket_8":3}   1       2
+{"key_bucket":null,"key_bucket_8":4}   1       4
+{"key_bucket":null,"key_bucket_8":null}        1       4
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
new file mode 100644
index 00000000000..c885423688e
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
+
+public class TestIcebergCompactorOnTez extends CompactorOnTezTest {
+
+  @Test
+  public void testIcebergCompactorWithAllPartitionFieldTypes() throws Exception{
+    conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, CUSTOM_COMPACTION_QUEUE);
+    msClient = new HiveMetaStoreClient(conf);
+
+    String dbName = "default";
+    String tableName = "ice_orc";
+    String qualifiedTableName = dbName + "." + tableName;
+
+    executeStatementOnDriver("drop table if exists " + qualifiedTableName, 
driver);
+    executeStatementOnDriver(String.format("create table %s " +
+        "(id int, a string, b int, c bigint, d float, e double, f decimal(4, 
2), g boolean, h date, i date, j date, k timestamp) " +
+        "partitioned by spec(a, truncate(3, a), bucket(4, a), b, c, d, e, f, 
g, h, year(h), month(i), day(j), k, hour(k)) stored by iceberg stored as orc " +
+        "tblproperties ('compactor.threshold.min.input.files'='1')", 
qualifiedTableName), driver);
+
+    // 6 records, one record per file --> 3 partitions, 2 files per partition
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 
'aaa111', 1, 100, 1.0, 2.0, 4.00, true,  DATE '2024-05-01', DATE '2024-05-01', 
DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName), 
driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 
'aaa111', 1, 100, 1.0, 2.0, 4.00, true,  DATE '2024-05-01', DATE '2024-05-01', 
DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName), 
driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 
'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', 
DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName), 
driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 
'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03', 
DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName), 
driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, null, 
null, null, null, null, null, null, null, null, null, null)", 
qualifiedTableName), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, null, 
null, null, null, null, null, null, null, null, null, null)", 
qualifiedTableName), driver);
+
+    Assert.assertEquals(6, getFilesCount(qualifiedTableName));
+    List<String> recordsBefore = getAllRecords(qualifiedTableName);
+
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, false,
+        "a=aaa111/a_trunc=aaa/a_bucket=0/b=1/c=100/d=1.0/e=2.0/f=4.00/g=true/h=2024-05-01/h_year=2024/i_month=2024-05/j_day=2024-05-01/k=2024-05-02T10%3A00/k_hour=2024-05-02-10",
+        "a=bbb222/a_trunc=bbb/a_bucket=3/b=2/c=200/d=2.0/e=3.0/f=8.00/g=false/h=2024-05-03/h_year=2024/i_month=2024-05/j_day=2024-05-03/k=2024-05-04T13%3A00/k_hour=2024-05-04-13",
+        "a=null/a_trunc=null/a_bucket=null/b=null/c=null/d=null/e=null/f=null/g=null/h=null/h_year=null/i_month=null/j_day=null/k=null/k_hour=null"
+    );
+    
+    Assert.assertEquals(3, getFilesCount(qualifiedTableName));
+    verifySuccessfulCompaction(3);
+    List<String> recordsAfter = getAllRecords(qualifiedTableName);
+    
+    Assert.assertEquals(recordsBefore, recordsAfter);
+  }
+  
+  private int getFilesCount(String qualifiedTableName) throws Exception {
+    driver.run(String.format("select count(*) from %s.files", 
qualifiedTableName));
+    List<String> res = new ArrayList<>();
+    driver.getFetchTask().fetch(res);
+    return Integer.parseInt(res.get(0));
+  }
+
+  private List<String> getAllRecords(String qualifiedTableName) throws 
Exception {
+    driver.run(String.format("select * from %s order by id", 
qualifiedTableName));
+    List<String> res = new ArrayList<>();
+    driver.getFetchTask().fetch(res);
+    return res;
+  }
+}
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 9a92b096c6c..f08d0721895 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -445,6 +445,7 @@ iceberg.llap.query.compactor.files=\
   iceberg_major_compaction_unpartitioned.q,\
   iceberg_major_compaction_unpartitioned_ordered.q,\
   iceberg_major_compaction_unpartitioned_w_filter.q,\
+  iceberg_minor_compaction_bucket.q,\
   iceberg_minor_compaction_partition_evolution.q,\
   iceberg_minor_compaction_unpartitioned.q
 

