This is an automated email from the ASF dual-hosted git repository.
difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 392e7324ec1 HIVE-29075: Iceberg: Optimize auto-compaction by evaluating compaction needs only on modified partitions (#5957)
392e7324ec1 is described below
commit 392e7324ec12a096a5f884ee65d9871b432baf16
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Mon Jul 21 09:10:24 2025 -0400
HIVE-29075: Iceberg: Optimize auto-compaction by evaluating compaction needs only on modified partitions (#5957)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 8 +-
.../apache/iceberg/mr/hive/IcebergTableUtil.java | 31 ++++
.../mr/hive/compaction/IcebergTableOptimizer.java | 157 ++++++++++++++++-----
.../hive/ql/txn/compactor/CompactorOnTezTest.java | 15 +-
.../txn/compactor/TestIcebergCompactorOnTez.java | 44 ++++--
.../hadoop/hive/ql/txn/compactor/Initiator.java | 12 +-
6 files changed, 202 insertions(+), 65 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index b4d9f0bea22..87d93133bd3 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -2133,12 +2133,8 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
   public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
       Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
     Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    return IcebergTableUtil.getPartitionNames(table, partitionSpec, latestSpecOnly).stream()
-        .map(partName -> {
-          Map<String, String> partSpecMap = Maps.newLinkedHashMap();
-          Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
-          return new DummyPartition(hmsTable, partName, partSpecMap);
-        }).collect(Collectors.toList());
+    List<String> partNames = IcebergTableUtil.getPartitionNames(table, partitionSpec, latestSpecOnly);
+    return IcebergTableUtil.convertNameToMetastorePartition(hmsTable, partNames);
   }
   public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 7cfb555d5a5..76fdf4cbf5e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -21,12 +21,14 @@
import java.io.IOException;
import java.time.ZoneId;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -49,8 +51,10 @@
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.TransformSpec;
@@ -58,6 +62,8 @@
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
+import org.apache.hadoop.util.Sets;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManageSnapshots;
@@ -578,6 +584,31 @@ public static boolean hasUndergonePartitionEvolution(Table table) {
.anyMatch(id -> id != table.spec().specId());
}
+  public static <T extends ContentFile> Set<String> getPartitionNames(Table icebergTable, Iterable<T> files,
+      Boolean latestSpecOnly) {
+    Set<String> partitions = Sets.newHashSet();
+    int tableSpecId = icebergTable.spec().specId();
+    for (T file : files) {
+      if (latestSpecOnly == null || Boolean.TRUE.equals(latestSpecOnly) && file.specId() == tableSpecId ||
+          Boolean.FALSE.equals(latestSpecOnly) && file.specId() != tableSpecId) {
+        String partName = icebergTable.specs().get(file.specId()).partitionToPath(file.partition());
+        partitions.add(partName);
+      }
+    }
+    return partitions;
+  }
+
+  public static List<Partition> convertNameToMetastorePartition(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
+      Collection<String> partNames) {
+    List<Partition> partitions = Lists.newArrayList();
+    for (String partName : partNames) {
+      Map<String, String> partSpecMap = Maps.newLinkedHashMap();
+      Warehouse.makeSpecFromName(partSpecMap, new Path(partName), null);
+      partitions.add(new DummyPartition(hmsTable, partName, partSpecMap));
+    }
+    return partitions;
+  }
+
   public static TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName, String dbPattern,
       String tablePattern) {
     return new TableFetcher.Builder(msc, catalogName, dbPattern, tablePattern).tableTypes(
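
For reference, the tri-state latestSpecOnly contract of the new getPartitionNames can be modeled in isolation. A minimal sketch, assuming a hypothetical SpecFile record in place of Iceberg's ContentFile (this is illustration only, not part of the commit):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SpecFilterSketch {
  // Hypothetical stand-in for Iceberg's ContentFile: just a spec id and a partition path.
  record SpecFile(int specId, String partitionPath) {}

  // Mirrors the tri-state filter above: null -> any spec,
  // TRUE -> files written under the table's current spec only, FALSE -> older specs only.
  static Set<String> partitionNames(int tableSpecId, Iterable<SpecFile> files, Boolean latestSpecOnly) {
    Set<String> partitions = new HashSet<>();
    for (SpecFile file : files) {
      if (latestSpecOnly == null
          || Boolean.TRUE.equals(latestSpecOnly) && file.specId() == tableSpecId
          || Boolean.FALSE.equals(latestSpecOnly) && file.specId() != tableSpecId) {
        partitions.add(file.partitionPath());
      }
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<SpecFile> files = List.of(new SpecFile(0, "id=1"), new SpecFile(1, "b_trunc_3=aaa"));
    System.out.println(partitionNames(1, files, true));   // [b_trunc_3=aaa]
    System.out.println(partitionNames(1, files, false));  // [id=1]
    System.out.println(partitionNames(1, files, null));   // both
  }
}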
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
index 76badb57204..e7a4afa6bb4 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -19,9 +19,17 @@
package org.apache.iceberg.mr.hive.compaction;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -30,23 +38,27 @@
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer;
+import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.hive.RuntimeMetaException;
+import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
public class IcebergTableOptimizer extends TableOptimizer {
private HiveMetaStoreClient client;
- private Map<String, Long> snapshotIdCache;
+ private Map<String, Long> snapshotTimeMilCache;
   public IcebergTableOptimizer(HiveConf conf, TxnStore txnHandler,
       MetadataCache metadataCache) throws MetaException {
     super(conf, txnHandler, metadataCache);
@@ -79,29 +91,36 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
getTables().stream()
.filter(table -> !skipDBs.contains(table.getDb()))
.filter(table -> !skipTables.contains(table.getNotEmptyDbTable()))
-        .map(table -> Pair.of(table, resolveMetastoreTable(table.getNotEmptyDbTable())))
-        .filter(tablePair -> MetaStoreUtils.isIcebergTable(tablePair.getValue().getParameters()))
-        .filter(tablePair -> {
-          long currentSnapshotId = Long.parseLong(tablePair.getValue().getParameters().get("current-snapshot-id"));
-          Long cachedSnapshotId = snapshotIdCache.get(tablePair.getKey().getNotEmptyDbTable());
-          return cachedSnapshotId == null || cachedSnapshotId != currentSnapshotId;
+        .map(table -> {
+          org.apache.hadoop.hive.ql.metadata.Table hiveTable = getHiveTable(table.getDb(), table.getTable());
+          org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, hiveTable.getTTable());
+          return Pair.of(hiveTable, icebergTable);
         })
-        .forEach(tablePair -> {
-          org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, tablePair.getValue());
+        .filter(t -> hasNewCommits(t.getRight(),
+            snapshotTimeMilCache.get(t.getLeft().getFullyQualifiedName())))
+        .forEach(t -> {
+          String qualifiedTableName = t.getLeft().getFullyQualifiedName();
+          org.apache.hadoop.hive.ql.metadata.Table hiveTable = t.getLeft();
+          org.apache.iceberg.Table icebergTable = t.getRight();
           if (icebergTable.spec().isPartitioned()) {
-            List<String> partitions = getPartitions(icebergTable);
-
-            partitions.forEach(partition -> addCompactionTargetIfEligible(tablePair.getValue(), icebergTable,
-                partition, compactionTargets, currentCompactions, skipDBs, skipTables));
-          }
-
-          if (icebergTable.spec().isUnpartitioned() || IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable)) {
-            addCompactionTargetIfEligible(tablePair.getValue(), icebergTable, null, compactionTargets,
+            List<org.apache.hadoop.hive.ql.metadata.Partition> partitions = findModifiedPartitions(hiveTable,
+                icebergTable, snapshotTimeMilCache.get(qualifiedTableName), true);
+
+            partitions.forEach(partition -> addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable,
+                partition.getName(), compactionTargets, currentCompactions, skipDBs, skipTables));
+
+            if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && !findModifiedPartitions(hiveTable,
+                icebergTable, snapshotTimeMilCache.get(qualifiedTableName), false).isEmpty()) {
+              addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable,
+                  null, compactionTargets, currentCompactions, skipDBs, skipTables);
+            }
+          } else {
+            addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable, null, compactionTargets,
                 currentCompactions, skipDBs, skipTables);
           }
-          snapshotIdCache.put(tablePair.getKey().getNotEmptyDbTable(), icebergTable.currentSnapshot().snapshotId());
+          snapshotTimeMilCache.put(qualifiedTableName, icebergTable.currentSnapshot().timestampMillis());
});
return compactionTargets;
@@ -115,27 +134,19 @@ private List<org.apache.hadoop.hive.common.TableName> getTables() {
}
}
-  private List<String> getPartitions(org.apache.iceberg.Table icebergTable) {
-    try {
-      return IcebergTableUtil.getPartitionNames(icebergTable, Maps.newHashMap(), true);
-    } catch (SemanticException e) {
-      throw new RuntimeMetaException(e, "Error getting partition names for Iceberg table %s", icebergTable.name());
-    }
-  }
-
-  private Table resolveMetastoreTable(String qualifiedTableName) {
-    String[] dbTableName = TxnUtils.getDbTableName(qualifiedTableName);
+  private org.apache.hadoop.hive.ql.metadata.Table getHiveTable(String dbName, String tableName) {
     try {
-      return metadataCache.computeIfAbsent(qualifiedTableName,
-          () -> CompactorUtil.resolveTable(conf, dbTableName[0], dbTableName[1]));
+      Table metastoreTable = metadataCache.computeIfAbsent(TableName.getDbTable(dbName, tableName), () ->
+          CompactorUtil.resolveTable(conf, dbName, tableName));
+      return new org.apache.hadoop.hive.ql.metadata.Table(metastoreTable);
     } catch (Exception e) {
-      throw new RuntimeMetaException(e, "Error resolving table %s", qualifiedTableName);
+      throw new RuntimeMetaException(e, "Error getting Hive table for %s.%s", dbName, tableName);
}
}
public void init() throws MetaException {
client = new HiveMetaStoreClient(new HiveConf());
- snapshotIdCache = Maps.newConcurrentMap();
+ snapshotTimeMilCache = Maps.newConcurrentMap();
}
   private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table icebergTable, String partitionName,
@@ -165,4 +176,80 @@ private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table
ci.type = compactionEvaluator.determineCompactionType();
compactions.add(ci);
}
+
+  /**
+   * Finds all unique partitions modified by non-compaction commits (commits that added or deleted files)
+   * between a given past snapshot timestamp and the table's current (latest) snapshot.
+   * @param hiveTable The {@link org.apache.hadoop.hive.ql.metadata.Table} instance to inspect.
+   * @param icebergTable The corresponding {@link org.apache.iceberg.Table} whose snapshots are scanned.
+   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
+   * @param latestSpecOnly when True, returns partitions with the current spec only;
+   *                       False - older specs only;
+   *                       Null - any spec
+   * @return A List of {@link org.apache.hadoop.hive.ql.metadata.Partition} representing the unique modified
+   * partitions.
+   */
+  private List<Partition> findModifiedPartitions(org.apache.hadoop.hive.ql.metadata.Table hiveTable,
+      org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil, Boolean latestSpecOnly) {
+
+    List<Snapshot> relevantSnapshots = getRelevantSnapshots(icebergTable, pastSnapshotTimeMil).toList();
+    if (relevantSnapshots.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
+      // Submit a task for each snapshot and collect the Futures
+      List<Future<Set<String>>> futures = relevantSnapshots.stream()
+          .map(snapshot -> executor.submit(() -> {
+            FileIO io = icebergTable.io();
+            List<ContentFile<?>> affectedFiles = FluentIterable.<ContentFile<?>>concat(
+                snapshot.addedDataFiles(io),
+                snapshot.removedDataFiles(io),
+                snapshot.addedDeleteFiles(io),
+                snapshot.removedDeleteFiles(io))
+                .toList();
+            return IcebergTableUtil.getPartitionNames(icebergTable, affectedFiles, latestSpecOnly);
+          }))
+          .toList();
+
+      // Collect the results from all completed futures
+      Set<String> modifiedPartitions = Sets.newHashSet();
+      for (Future<Set<String>> future : futures) {
+        modifiedPartitions.addAll(future.get());
+      }
+
+      return IcebergTableUtil.convertNameToMetastorePartition(hiveTable, modifiedPartitions);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeMetaException(e, "Interrupted while finding modified partitions");
+    } catch (ExecutionException e) {
+      // Just wrap this one in a runtime exception
+      throw new RuntimeMetaException(e, "Failed to find modified partitions in parallel");
+    }
+  }
+
+  /**
+   * Checks whether a table has had new commits since a given snapshot timestamp that were not caused by compaction.
+   * @param icebergTable The Iceberg table to check.
+   * @param pastSnapshotTimeMil The timestamp in milliseconds of the snapshot to check from (exclusive).
+   * @return true if at least one non-compaction snapshot exists after pastSnapshotTimeMil, false otherwise.
+   */
+  private boolean hasNewCommits(org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil) {
+    return getRelevantSnapshots(icebergTable, pastSnapshotTimeMil)
+        .findAny().isPresent();
+  }
+
+  private Stream<Snapshot> getRelevantSnapshots(org.apache.iceberg.Table icebergTable, Long pastSnapshotTimeMil) {
+    Snapshot currentSnapshot = icebergTable.currentSnapshot();
+    if (currentSnapshot == null || Objects.equals(currentSnapshot.timestampMillis(), pastSnapshotTimeMil)) {
+      return Stream.empty();
+    }
+
+    return StreamSupport.stream(icebergTable.snapshots().spliterator(), false)
+        .filter(s -> pastSnapshotTimeMil == null || s.timestampMillis() > pastSnapshotTimeMil)
+        .filter(s -> s.timestampMillis() <= currentSnapshot.timestampMillis())
+        .filter(s -> !s.operation().equals(DataOperations.REPLACE));
+  }
}
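
findModifiedPartitions above fans work out to one virtual thread per snapshot and merges the partial results from the returned Futures. A minimal self-contained sketch of that pattern, assuming Java 21+ and hypothetical string stand-ins for the Snapshot objects and per-snapshot scans (illustration only, not part of the commit):

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VirtualThreadFanOutSketch {
  public static void main(String[] args) throws ExecutionException, InterruptedException {
    List<String> snapshots = List.of("s1", "s2", "s3"); // stand-ins for Snapshot objects
    // ExecutorService is AutoCloseable since Java 19; close() waits for submitted tasks,
    // so every Future is complete by the time the try block exits normally.
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      List<Future<Set<String>>> futures = snapshots.stream()
          .map(s -> executor.submit(() -> Set.of(s + "-partition"))) // per-snapshot scan stand-in
          .toList();
      Set<String> merged = new HashSet<>();
      for (Future<Set<String>> future : futures) {
        merged.addAll(future.get()); // rethrows task failures as ExecutionException
      }
      System.out.println(merged);
    }
  }
}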
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index 57a7fc442e5..428aedcd8f4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -544,12 +544,21 @@ protected void dropTable(String tblName) throws Exception {
}
}
- protected void runSingleInitiatorCycle() throws Exception {
+ protected Initiator createInitiator() throws Exception {
TestTxnDbUtil.setConfValues(conf);
- CompactorThread t = new Initiator();
+ Initiator t = new Initiator();
t.setConf(conf);
stop.set(true);
t.init(stop);
- t.run();
+ return t;
+ }
+
+ static Worker createWorker(HiveConf conf) throws Exception {
+ HiveConf hiveConf = new HiveConf(conf);
+ hiveConf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ Worker t = new Worker();
+ t.setConf(hiveConf);
+ t.init(new AtomicBoolean(true));
+ return t;
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
index f6cc76572f1..4ca42733391 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
@@ -83,8 +83,8 @@ public void testIcebergCompactorWithAllPartitionFieldTypes() throws Exception{
@Test
public void testIcebergAutoCompactionPartitionEvolution() throws Exception {
executeStatementOnDriver(String.format("create table %s " +
- "(id int, a string) " +
- "partitioned by spec(id) stored by iceberg stored as orc " +
+ "(a int, b string) " +
+ "partitioned by spec(a) stored by iceberg stored as orc " +
"tblproperties ('compactor.threshold.min.input.files'='1')",
QUALIFIED_TABLE_NAME), driver);
executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'a')",
QUALIFIED_TABLE_NAME), driver);
@@ -94,26 +94,44 @@ public void testIcebergAutoCompactionPartitionEvolution() throws Exception {
executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, 'e')",
QUALIFIED_TABLE_NAME), driver);
executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, 'd')",
QUALIFIED_TABLE_NAME), driver);
- executeStatementOnDriver(String.format("alter table %s set partition
spec(truncate(3, a))", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("alter table %s set partition
spec(truncate(3, b))", QUALIFIED_TABLE_NAME), driver);
executeStatementOnDriver(String.format("INSERT INTO %s VALUES (7,
'aaa111')", QUALIFIED_TABLE_NAME), driver);
executeStatementOnDriver(String.format("INSERT INTO %s VALUES (8,
'aaa111')", QUALIFIED_TABLE_NAME), driver);
- executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9,
'bbb222')", QUALIFIED_TABLE_NAME), driver);
- executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10,
'bbb222')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9,
'bbb111')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10,
'bbb111')", QUALIFIED_TABLE_NAME), driver);
executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)",
QUALIFIED_TABLE_NAME), driver);
executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)",
QUALIFIED_TABLE_NAME), driver);
- runSingleInitiatorCycle();
+ Initiator initiator = createInitiator();
+ initiator.run();
+
+ Worker worker = createWorker(conf);
+ for (int i = 0; i < 4; i++) {
+ worker.run();
+ }
+
ShowCompactResponse rsp = msClient.showCompactions();
Assert.assertEquals(4, rsp.getCompactsSize());
// Compaction should be initiated for each partition from the latest spec
- Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=aaa",
CompactionType.MINOR, CompactionState.INITIATED));
- Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=bbb",
CompactionType.MINOR, CompactionState.INITIATED));
- Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=null",
CompactionType.MINOR, CompactionState.INITIATED));
+ Assert.assertTrue(isCompactExist(rsp, "b_trunc_3=aaa",
CompactionType.MINOR, CompactionState.SUCCEEDED));
+ Assert.assertTrue(isCompactExist(rsp, "b_trunc_3=bbb",
CompactionType.MINOR, CompactionState.SUCCEEDED));
+ Assert.assertTrue(isCompactExist(rsp, "b_trunc_3=null",
CompactionType.MINOR, CompactionState.SUCCEEDED));
// Additional compaction should be initiated for all partitions from past
partition specs
- Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR,
CompactionState.INITIATED));
+ Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR,
CompactionState.SUCCEEDED));
+
+ // Data changes after Iceberg initiator has been run
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (13,
'ccc111')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (14,
'ddd111')", QUALIFIED_TABLE_NAME), driver);
+
+ initiator.run();
+ rsp = msClient.showCompactions();
+ Assert.assertEquals(6, rsp.getCompactsSize());
+
+ Assert.assertTrue(isCompactExist(rsp, "b_trunc_3=ccc",
CompactionType.MINOR, CompactionState.INITIATED));
+ Assert.assertTrue(isCompactExist(rsp, "b_trunc_3=ddd",
CompactionType.MINOR, CompactionState.INITIATED));
}
@Test
@@ -130,7 +148,9 @@ public void testIcebergAutoCompactionUnpartitioned() throws Exception {
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)", QUALIFIED_TABLE_NAME), driver);
     executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)", QUALIFIED_TABLE_NAME), driver);
- runSingleInitiatorCycle();
+ Initiator initiator = createInitiator();
+ initiator.run();
+
ShowCompactResponse rsp = msClient.showCompactions();
Assert.assertEquals(1, rsp.getCompactsSize());
     Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.INITIATED));
@@ -140,7 +160,7 @@ private int getFilesCount() throws Exception {
     driver.run(String.format("select count(*) from %s.files", QUALIFIED_TABLE_NAME));
List<String> res = new ArrayList<>();
driver.getFetchTask().fetch(res);
- return Integer.parseInt(res.get(0));
+ return Integer.parseInt(res.getFirst());
}
private List<String> getAllRecords() throws Exception {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index b72eea8ce43..252defbf744 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -52,8 +52,6 @@ public class Initiator extends MetaStoreCompactorThread {
private static final String CLASS_NAME = Initiator.class.getName();
private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
- private ExecutorService compactionExecutor;
-
private boolean metricsEnabled;
private boolean shouldUseMutex = true;
private List<TableOptimizer> optimizers;
@@ -63,7 +61,9 @@ public void run() {
LOG.info("Starting Initiator thread");
     // Make sure nothing escapes this run method and kills the metastore at large,
// so wrap it in a big catch Throwable statement.
- try {
+    try (ExecutorService compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
+        conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
+        COMPACTOR_INTIATOR_THREAD_NAME_FORMAT)) {
recoverFailedCompactions(false);
       TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
@@ -183,9 +183,6 @@ public void run() {
if (Thread.currentThread().isInterrupted()) {
LOG.info("Interrupt received, Initiator is shutting down.");
}
- if (compactionExecutor != null) {
- compactionExecutor.shutdownNow();
- }
}
}
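
The run() change above relies on ExecutorService being AutoCloseable (Java 19+), which replaces the old field-plus-shutdownNow() cleanup and scopes the executor's lifetime to a single run() invocation. A minimal sketch of the idiom, using a plain fixed-size pool as a stand-in for CompactorUtil.createExecutorWithThreadFactory (illustration only, not part of the commit):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorLifecycleSketch {
  public static void main(String[] args) {
    // close() calls shutdown() and awaits termination, so the explicit
    // shutdownNow() in a cleanup path at the end of run() is no longer needed.
    try (ExecutorService executor = Executors.newFixedThreadPool(2)) {
      executor.submit(() -> System.out.println("compaction check 1"));
      executor.submit(() -> System.out.println("compaction check 2"));
    } // close() waits here for both tasks to finish
  }
}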
@@ -214,9 +211,6 @@ protected String resolveUserToRunAs(Map<String, String> cache, Table t, Partitio
public void init(AtomicBoolean stop) throws Exception {
super.init(stop);
     checkInterval = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
- compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
- conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
- COMPACTOR_INTIATOR_THREAD_NAME_FORMAT);
     metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
         MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
optimizers = Arrays.stream(MetastoreConf.getTrimmedStringsVar(conf,