This is an automated email from the ASF dual-hosted git repository.

difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 392e7324ec1 HIVE-29075: Iceberg: Optimize auto-compaction by evaluating compaction needs only on modified partitions (#5957)
392e7324ec1 is described below

commit 392e7324ec12a096a5f884ee65d9871b432baf16
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Mon Jul 21 09:10:24 2025 -0400

    HIVE-29075: Iceberg: Optimize auto-compaction by evaluating compaction needs only on modified partitions (#5957)
---
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |   8 +-
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   |  31 ++++
 .../mr/hive/compaction/IcebergTableOptimizer.java  | 157 ++++++++++++++++-----
 .../hive/ql/txn/compactor/CompactorOnTezTest.java  |  15 +-
 .../txn/compactor/TestIcebergCompactorOnTez.java   |  44 ++++--
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |  12 +-
 6 files changed, 202 insertions(+), 65 deletions(-)
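
Summary of the approach, for readers skimming the diff: previously the optimizer cached the last-seen snapshot ID per table (read from the "current-snapshot-id" table parameter) and, whenever it changed, re-evaluated every partition of the table. It now caches the timestamp of the last snapshot it examined and, on each Initiator cycle, walks only the snapshots committed after that timestamp, evaluating compaction eligibility just for the partitions those snapshots actually touched. A minimal sketch of the caching idea; the class and member names (SnapshotTimeCacheSketch, lastSeenMillis) are invented for illustration and do not appear in the patch:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class SnapshotTimeCacheSketch {
      // table name -> timestampMillis of the last snapshot the Initiator examined
      private final Map<String, Long> lastSeenMillis = new ConcurrentHashMap<>();

      // A table is worth scanning only if it committed something after the cached timestamp.
      boolean hasNewCommits(String tableKey, long currentSnapshotMillis) {
        Long cached = lastSeenMillis.get(tableKey);
        return cached == null || currentSnapshotMillis > cached;
      }

      // Record the current snapshot timestamp once the table has been evaluated.
      void markChecked(String tableKey, long currentSnapshotMillis) {
        lastSeenMillis.put(tableKey, currentSnapshotMillis);
      }
    }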

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index b4d9f0bea22..87d93133bd3 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -2133,12 +2133,8 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
   public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    return IcebergTableUtil.getPartitionNames(table, partitionSpec, latestSpecOnly).stream()
-        .map(partName -> {
-          Map<String, String> partSpecMap = Maps.newLinkedHashMap();
-          Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
-          return new DummyPartition(hmsTable, partName, partSpecMap);
-        }).collect(Collectors.toList());
+    List<String> partNames = IcebergTableUtil.getPartitionNames(table, partitionSpec, latestSpecOnly);
+    return IcebergTableUtil.convertNameToMetastorePartition(hmsTable, partNames);
   }
 
   public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 7cfb555d5a5..76fdf4cbf5e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -21,12 +21,14 @@
 
 import java.io.IOException;
 import java.time.ZoneId;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -49,8 +51,10 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.TransformSpec;
@@ -58,6 +62,8 @@
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
+import org.apache.hadoop.util.Sets;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManageSnapshots;
@@ -578,6 +584,31 @@ public static boolean hasUndergonePartitionEvolution(Table table) {
             .anyMatch(id -> id != table.spec().specId());
   }
 
+  public static <T extends ContentFile> Set<String> getPartitionNames(Table icebergTable, Iterable<T> files,
+      Boolean latestSpecOnly) {
+    Set<String> partitions = Sets.newHashSet();
+    int tableSpecId = icebergTable.spec().specId();
+    for (T file : files) {
+      if (latestSpecOnly == null || Boolean.TRUE.equals(latestSpecOnly) && file.specId() == tableSpecId ||
+          Boolean.FALSE.equals(latestSpecOnly) && file.specId() != tableSpecId) {
+        String partName = icebergTable.specs().get(file.specId()).partitionToPath(file.partition());
+        partitions.add(partName);
+      }
+    }
+    return partitions;
+  }
+
+  public static List<Partition> convertNameToMetastorePartition(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
+      Collection<String> partNames) {
+    List<Partition> partitions = Lists.newArrayList();
+    for (String partName : partNames) {
+      Map<String, String> partSpecMap = Maps.newLinkedHashMap();
+      Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
+      partitions.add(new DummyPartition(hmsTable, partName, partSpecMap));
+    }
+    return partitions;
+  }
+
   public static TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName, String dbPattern,
       String tablePattern) {
     return new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes(
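
The two helpers added above are the building blocks for the optimizer changes below: getPartitionNames maps a batch of content files to partition path names, filtered by whether each file's spec is the table's current one (True), an older one (False), or any spec (null), and convertNameToMetastorePartition wraps those names into metastore DummyPartition objects. A hedged usage sketch, not part of the patch, assuming an already-loaded Iceberg table and its Hive counterpart:

    import java.util.Collections;
    import java.util.List;
    import java.util.Set;
    import org.apache.hadoop.hive.ql.metadata.Partition;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.mr.hive.IcebergTableUtil;

    class ModifiedPartitionsSketch {
      // Illustrative only: partitions touched by the files added in the current snapshot.
      static List<Partition> partitionsTouchedByCurrentSnapshot(
          org.apache.hadoop.hive.ql.metadata.Table hiveTable, org.apache.iceberg.Table icebergTable) {
        Snapshot snapshot = icebergTable.currentSnapshot();
        if (snapshot == null) {
          return Collections.emptyList();
        }
        // true = keep only partitions written under the table's current partition spec
        Set<String> names = IcebergTableUtil.getPartitionNames(
            icebergTable, snapshot.addedDataFiles(icebergTable.io()), true);
        return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, names);
      }
    }
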
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
index 76badb57204..e7a4afa6bb4 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -19,9 +19,17 @@
 package org.apache.iceberg.mr.hive.compaction;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -30,23 +38,27 @@
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
 import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
 import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer;
+import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.hive.RuntimeMetaException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.mr.hive.IcebergTableUtil;
 import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 public class IcebergTableOptimizer extends TableOptimizer {
   private HiveMetaStoreClient client;
-  private Map<String, Long> snapshotIdCache;
+  private Map<String, Long> snapshotTimeMilCache;
 
   public IcebergTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) throws MetaException {
     super(conf, txnHandler, metadataCache);
@@ -79,29 +91,36 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
     getTables().stream()
         .filter(table -> !skipDBs.contains(table.getDb()))
         .filter(table -> !skipTables.contains(table.getNotEmptyDbTable()))
-        .map(table -> Pair.of(table, resolveMetastoreTable(table.getNotEmptyDbTable())))
-        .filter(tablePair -> MetaStoreUtils.isIcebergTable(tablePair.getValue().getParameters()))
-        .filter(tablePair -> {
-          long currentSnapshotId = Long.parseLong(tablePair.getValue().getParameters().get("current-snapshot-id"));
-          Long cachedSnapshotId = snapshotIdCache.get(tablePair.getKey().getNotEmptyDbTable());
-          return cachedSnapshotId == null || cachedSnapshotId != currentSnapshotId;
+        .map(table -> {
+          org.apache.hadoop.hive.ql.metadata.Table hiveTable = getHiveTable(table.getDb(), table.getTable());
+          org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, hiveTable.getTTable());
+          return Pair.of(hiveTable, icebergTable);
         })
-        .forEach(tablePair -> {
-          org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, tablePair.getValue());
+        .filter(t -> hasNewCommits(t.getRight(),
+            snapshotTimeMilCache.get(t.getLeft().getFullyQualifiedName())))
+        .forEach(t -> {
+          String qualifiedTableName = t.getLeft().getFullyQualifiedName();
+          org.apache.hadoop.hive.ql.metadata.Table hiveTable = t.getLeft();
+          org.apache.iceberg.Table icebergTable = t.getRight();
 
           if (icebergTable.spec().isPartitioned()) {
-            List<String> partitions = getPartitions(icebergTable);
-
-            partitions.forEach(partition -> addCompactionTargetIfEligible(tablePair.getValue(), icebergTable,
-                partition, compactionTargets, currentCompactions, skipDBs, skipTables));
-          }
-
-          if (icebergTable.spec().isUnpartitioned() || IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable)) {
-            addCompactionTargetIfEligible(tablePair.getValue(), icebergTable, null, compactionTargets,
+            List<org.apache.hadoop.hive.ql.metadata.Partition> partitions = findModifiedPartitions(hiveTable,
+                icebergTable, snapshotTimeMilCache.get(qualifiedTableName), true);
+
+            partitions.forEach(partition -> addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable,
+                partition.getName(), compactionTargets, currentCompactions, skipDBs, skipTables));
+
+            if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && !findModifiedPartitions(hiveTable,
+                icebergTable, snapshotTimeMilCache.get(qualifiedTableName), false).isEmpty()) {
+              addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable,
+                  null, compactionTargets, currentCompactions, skipDBs, skipTables);
+            }
+          } else {
+            addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable, null, compactionTargets,
                 currentCompactions, skipDBs, skipTables);
           }
 
-          snapshotIdCache.put(tablePair.getKey().getNotEmptyDbTable(), icebergTable.currentSnapshot().snapshotId());
+          snapshotTimeMilCache.put(qualifiedTableName, icebergTable.currentSnapshot().timestampMillis());
         });
 
     return compactionTargets;
@@ -115,27 +134,19 @@ private List<org.apache.hadoop.hive.common.TableName> getTables() {
     }
   }
 
-  private List<String> getPartitions(org.apache.iceberg.Table icebergTable) {
-    try {
-      return IcebergTableUtil.getPartitionNames(icebergTable, Maps.newHashMap(), true);
-    } catch (SemanticException e) {
-      throw new RuntimeMetaException(e, "Error getting partition names for Iceberg table %s", icebergTable.name());
-    }
-  }
-
-  private Table resolveMetastoreTable(String qualifiedTableName) {
-    String[] dbTableName = TxnUtils.getDbTableName(qualifiedTableName);
+  private org.apache.hadoop.hive.ql.metadata.Table getHiveTable(String dbName, String tableName) {
     try {
-      return metadataCache.computeIfAbsent(qualifiedTableName,
-          () -> CompactorUtil.resolveTable(conf, dbTableName[0], dbTableName[1]));
+      Table metastoreTable = metadataCache.computeIfAbsent(TableName.getDbTable(dbName, tableName), () ->
+          CompactorUtil.resolveTable(conf, dbName, tableName));
+      return new org.apache.hadoop.hive.ql.metadata.Table(metastoreTable);
     } catch (Exception e) {
-      throw new RuntimeMetaException(e, "Error resolving table %s", qualifiedTableName);
+      throw new RuntimeMetaException(e, "Error getting Hive table for %s.%s", dbName, tableName);
     }
   }
 
   public void init() throws MetaException {
     client = new HiveMetaStoreClient(new HiveConf());
-    snapshotIdCache = Maps.newConcurrentMap();
+    snapshotTimeMilCache = Maps.newConcurrentMap();
   }
 
   private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table icebergTable, String partitionName,
@@ -165,4 +176,80 @@ private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table
     ci.type = compactionEvaluator.determineCompactionType();
     compactions.add(ci);
   }
+
+  /**
+   * Finds all unique partitions modified by non-compaction commits (with added or deleted files) between a
+   * given past snapshot timestamp and the table's current (latest) snapshot.
+   * @param hiveTable The {@link org.apache.hadoop.hive.ql.metadata.Table} instance to inspect.
+   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
+   * @param latestSpecOnly when True, returns partitions with the current spec only;
+   *                       False - older specs only;
+   *                       Null - any spec
+   * @return A List of {@link org.apache.hadoop.hive.ql.metadata.Partition} representing the unique modified
+   *                       partitions.
+   * @throws IllegalArgumentException if snapshot timestamps are invalid or out of order, or if the table has
+   *                       no current snapshot.
+   */
+  private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadata.Table hiveTable,
+      org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil, Boolean latestSpecOnly) {
+
+    List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
+    if (relevantSnapshots.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+      // Submit a task for each snapshot and collect the Futures
+      List<Future<Set<String>>> futures = relevantSnapshots.stream()
+          .map(snapshot -> executor.submit(() -> {
+            FileIO io = icebergTable.io();
+            List<ContentFile<?>> affectedFiles = FluentIterable.<ContentFile<?>>concat(
+                    snapshot.addedDataFiles(io),
+                    snapshot.removedDataFiles(io),
+                    snapshot.addedDeleteFiles(io),
+                    snapshot.removedDeleteFiles(io))
+                .toList();
+            return IcebergTableUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
+          }))
+          .toList();
+
+      // Collect the results from all completed futures
+      Set<String> modifiedPartitions = Sets.newHashSet();
+      for (Future<Set<String>> future : futures) {
+        modifiedPartitions.addAll(future.get());
+      }
+
+      return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, modifiedPartitions);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeMetaException(e, "Interrupted while finding modified partitions");
+    } catch (ExecutionException e) {
+      // Just wrap this one in a runtime exception
+      throw new RuntimeMetaException(e, "Failed to find modified partitions in parallel");
+    }
+  }
+
+  /**
+   * Checks if a table has had new commits since a given snapshot timestamp that were not caused by compaction.
+   * @param icebergTable The Iceberg table to check.
+   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
+   * @return true if at least one snapshot whose source is not compaction exists after pastSnapshotTimeMil,
+   * false otherwise.
+   */
+  private boolean hasNewCommits(org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil) {
+    return getRelevantSnapshots(icebergTable, pastSnapshotTimeMil)
+        .findAny().isPresent();
+  }
+
+  private Stream<Snapshot> getRelevantSnapshots(org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil) {
+    Snapshot currentSnapshot = icebergTable.currentSnapshot();
+    if (currentSnapshot == null || Objects.equals(currentSnapshot.timestampMillis(), pastSnapshotTimeMil)) {
+      return Stream.empty();
+    }
+
+    return StreamSupport.stream(icebergTable.snapshots().spliterator(), false)
+        .filter(s -> pastSnapshotTimeMil == null || s.timestampMillis() > pastSnapshotTimeMil)
+        .filter(s -> s.timestampMillis() <= currentSnapshot.timestampMillis())
+        .filter(s -> !s.operation().equals(DataOperations.REPLACE));
+  }
 }
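
Two implementation details in findModifiedPartitions above are worth calling out. First, the incremental scan leans on Iceberg's per-snapshot file listings (addedDataFiles, removedDataFiles, addedDeleteFiles, removedDeleteFiles), so only the manifests of new snapshots are read instead of the whole table. Second, each snapshot is processed on its own virtual thread: Executors.newVirtualThreadPerTaskExecutor() (Java 21) returns an ExecutorService that is AutoCloseable, and its close() waits for submitted tasks, which is what makes the try-with-resources form above safe. A self-contained sketch of the same fan-out/fan-in shape, with plain strings standing in for snapshots:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    class FanOutSketch {
      static Set<String> union(List<String> snapshots) throws InterruptedException, ExecutionException {
        // close() is invoked implicitly on scope exit and waits for all submitted tasks.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
          List<Future<Set<String>>> futures = snapshots.stream()
              .map(s -> executor.submit(() -> Set.of("partition-of-" + s)))  // one task per snapshot
              .toList();
          Set<String> merged = new HashSet<>();
          for (Future<Set<String>> f : futures) {
            merged.addAll(f.get());  // fan-in: block on each result
          }
          return merged;
        }
      }
    }
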
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index 57a7fc442e5..428aedcd8f4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -544,12 +544,21 @@ protected void dropTable(String tblName) throws Exception {
     }
   }
 
-  protected void runSingleInitiatorCycle() throws Exception {
+  protected Initiator createInitiator() throws Exception {
     TestTxnDbUtil.setConfValues(conf);
-    CompactorThread t = new Initiator();
+    Initiator t = new Initiator();
     t.setConf(conf);
     stop.set(true);
     t.init(stop);
-    t.run();
+    return t;
+  }
+
+  static Worker createWorker(HiveConf conf) throws Exception {
+    HiveConf hiveConf = new HiveConf(conf);
+    hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    Worker t = new Worker();
+    t.setConf(hiveConf);
+    t.init(new AtomicBoolean(true));
+    return t;
   }
 }
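
For reference, these helpers split the old runSingleInitiatorCycle() into separately reusable pieces, so a test can queue compaction requests once and then drive one or more query-based Worker cycles (note that createWorker enables COMPACTOR_CRUD_QUERY_BASED on a copied HiveConf). The usage pattern, as exercised by the Iceberg test below:

    // Sketch mirroring the test below: queue requests, then execute them one per run().
    Initiator initiator = createInitiator();
    initiator.run();

    Worker worker = createWorker(conf);
    for (int i = 0; i < 4; i++) {
      worker.run();
    }
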
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
index f6cc76572f1..4ca42733391 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
@@ -83,8 +83,8 @@ public void testIcebergCompactorWithAllPartitionFieldTypes() throws Exception{
   @Test
   public void testIcebergAutoCompactionPartitionEvolution() throws Exception {
     executeStatementOnDriver(String.format("create table %s " +
-        "(id int, a string) " +
-        "partitioned by spec(id) stored by iceberg stored as orc " +
+        "(a int, b string) " +
+        "partitioned by spec(a) stored by iceberg stored as orc " +
         "tblproperties ('compactor.threshold.min.input.files'='1')", QUALIFIED_TABLE_NAME), driver);
 
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'a')", QUALIFIED_TABLE_NAME), driver);
@@ -94,26 +94,44 @@ public void testIcebergAutoCompactionPartitionEvolution() throws Exception {
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, 'e')", QUALIFIED_TABLE_NAME), driver);
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, 'd')", QUALIFIED_TABLE_NAME), driver);
 
-    executeStatementOnDriver(String.format("alter table %s set partition spec(truncate(3, a))", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("alter table %s set partition spec(truncate(3, b))", QUALIFIED_TABLE_NAME), driver);
 
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (7, 'aaa111')", QUALIFIED_TABLE_NAME), driver);
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (8, 'aaa111')", QUALIFIED_TABLE_NAME), driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9, 'bbb222')", QUALIFIED_TABLE_NAME), driver);
-    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10, 'bbb222')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9, 'bbb111')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10, 'bbb111')", QUALIFIED_TABLE_NAME), driver);
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)", QUALIFIED_TABLE_NAME), driver);
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)", QUALIFIED_TABLE_NAME), driver);
 
-    runSingleInitiatorCycle();
+    Initiator initiator = createInitiator();
+    initiator.run();
+
+    Worker worker = createWorker(conf);
+    for (int i = 0; i < 4; i++) {
+      worker.run();
+    }
+    
     ShowCompactResponse rsp = msClient.showCompactions();
     Assert.assertEquals(4, rsp.getCompactsSize());
 
     // Compaction should be initiated for each partition from the latest spec
-    Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=aaa", CompactionType.MINOR, CompactionState.INITIATED));
-    Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=bbb", CompactionType.MINOR, CompactionState.INITIATED));
-    Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=null", CompactionType.MINOR, CompactionState.INITIATED));
+    Assert.assertTrue(isCompactExist(rsp, "b_trunc_3=aaa", CompactionType.MINOR, CompactionState.SUCCEEDED));
+    Assert.assertTrue(isCompactExist(rsp, "b_trunc_3=bbb", CompactionType.MINOR, CompactionState.SUCCEEDED));
+    Assert.assertTrue(isCompactExist(rsp, "b_trunc_3=null", CompactionType.MINOR, CompactionState.SUCCEEDED));
 
     // Additional compaction should be initiated for all partitions from past partition specs
-    Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.INITIATED));
+    Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.SUCCEEDED));
+
+    // Data changes after Iceberg initiator has been run
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (13, 'ccc111')", QUALIFIED_TABLE_NAME), driver);
+    executeStatementOnDriver(String.format("INSERT INTO %s VALUES (14, 'ddd111')", QUALIFIED_TABLE_NAME), driver);
+
+    initiator.run();
+    rsp = msClient.showCompactions();
+    Assert.assertEquals(6, rsp.getCompactsSize());
+    
+    Assert.assertTrue(isCompactExist(rsp, "b_trunc_3=ccc", CompactionType.MINOR, CompactionState.INITIATED));
+    Assert.assertTrue(isCompactExist(rsp, "b_trunc_3=ddd", CompactionType.MINOR, CompactionState.INITIATED));
   }
 
   @Test
@@ -130,7 +148,9 @@ public void testIcebergAutoCompactionUnpartitioned() throws Exception {
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)", QUALIFIED_TABLE_NAME), driver);
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)", QUALIFIED_TABLE_NAME), driver);
 
-    runSingleInitiatorCycle();
+    Initiator initiator = createInitiator();
+    initiator.run();
+
     ShowCompactResponse rsp = msClient.showCompactions();
     Assert.assertEquals(1, rsp.getCompactsSize());
     Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.INITIATED));
@@ -140,7 +160,7 @@ private int getFilesCount() throws Exception {
     driver.run(String.format("select count(*) from %s.files", QUALIFIED_TABLE_NAME));
     List<String> res = new ArrayList<>();
     driver.getFetchTask().fetch(res);
-    return Integer.parseInt(res.get(0));
+    return Integer.parseInt(res.getFirst());
   }
 
   private List<String> getAllRecords() throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index b72eea8ce43..252defbf744 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -52,8 +52,6 @@ public class Initiator extends MetaStoreCompactorThread {
   private static final String CLASS_NAME = Initiator.class.getName();
   private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
-  private ExecutorService compactionExecutor;
-
   private boolean metricsEnabled;
   private boolean shouldUseMutex = true;
   private List<TableOptimizer> optimizers;
@@ -63,7 +61,9 @@ public void run() {
     LOG.info("Starting Initiator thread");
     // Make sure nothing escapes this run method and kills the metastore at large,
     // so wrap it in a big catch Throwable statement.
-    try {
+    try (ExecutorService compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
+        conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
+        COMPACTOR_INTIATOR_THREAD_NAME_FORMAT)) {
       recoverFailedCompactions(false);
       TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
 
@@ -183,9 +183,6 @@ public void run() {
       if (Thread.currentThread().isInterrupted()) {
         LOG.info("Interrupt received, Initiator is shutting down.");
       }
-      if (compactionExecutor != null) {
-        compactionExecutor.shutdownNow();
-      }
     }
   }
 
@@ -214,9 +211,6 @@ protected String resolveUserToRunAs(Map<String, String> cache, Table t, Partitio
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
-    compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
-            conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
-            COMPACTOR_INTIATOR_THREAD_NAME_FORMAT);
     metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
         MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
     optimizers = Arrays.stream(MetastoreConf.getTrimmedStringsVar(conf,
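
A closing note on the Initiator change: since Java 19, ExecutorService implements AutoCloseable, and close() shuts the pool down and waits for in-flight tasks. Moving the executor from a field initialized in init() into the try-with-resources header of run() therefore makes the manual shutdownNow() in the cleanup path redundant, which is exactly what this hunk removes. A minimal, hypothetical illustration (not Hive code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class ExecutorCloseSketch {
      public static void main(String[] args) {
        // close() is called implicitly when the block exits and awaits task completion,
        // so no explicit shutdown()/shutdownNow() call is needed.
        try (ExecutorService pool = Executors.newFixedThreadPool(2)) {
          pool.submit(() -> System.out.println("compaction check"));
        }
      }
    }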
