This is an automated email from the ASF dual-hosted git repository.
difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 279996b0858 HIVE-29028: Iceberg: Implement auto compaction (#5886)
279996b0858 is described below
commit 279996b0858896a5b953745ec793c89d82378ea6
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Wed Jul 2 23:12:10 2025 -0400
HIVE-29028: Iceberg: Implement auto compaction (#5886)
---
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 22 +-
.../apache/iceberg/mr/hive/IcebergTableUtil.java | 10 +
.../mr/hive/compaction/IcebergTableOptimizer.java | 191 +++++++++++++++++
.../hive/ql/txn/compactor/CompactorOnTezTest.java | 11 +
.../txn/compactor/TestIcebergCompactorOnTez.java | 117 +++++++++--
.../hive/ql/txn/compactor/AcidTableOptimizer.java | 111 ++++++++++
.../hive/ql/txn/compactor/CompactionException.java | 6 +
.../hive/ql/txn/compactor/CompactorThread.java | 32 ---
.../hadoop/hive/ql/txn/compactor/Initiator.java | 226 +++++----------------
.../ql/txn/compactor/MetaStoreCompactorThread.java | 31 ---
.../ql/txn/compactor/RemoteCompactorThread.java | 28 ---
.../hive/ql/txn/compactor/TableOptimizer.java | 219 ++++++++++++++++++++
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 2 +
.../hive/ql/txn/compactor/CompactorTest.java | 2 +
.../hive/ql/txn/compactor/TestInitiator.java | 4 +-
.../hadoop/hive/metastore/conf/MetastoreConf.java | 5 +
16 files changed, 712 insertions(+), 305 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index dc5ac290462..b4d9f0bea22 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -140,7 +140,6 @@
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
-import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionField;
@@ -459,7 +458,7 @@ public boolean supportsAppendData(org.apache.hadoop.hive.metastore.api.Table tab
return true;
}
// If it is a table which has undergone partition evolution, return false;
- if (hasUndergonePartitionEvolution(icebergTbl)) {
+ if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTbl)) {
      if (withPartClause) {
        throw new SemanticException("Can not Load into an iceberg table, which has undergone partition evolution " +
            "using the PARTITION clause");
@@ -1405,7 +1404,7 @@ public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
    if (IcebergTableUtil.isBucketed(table)) {
      throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
    }
-    if (hasUndergonePartitionEvolution(table)) {
+    if (IcebergTableUtil.hasUndergonePartitionEvolution(table)) {
      throw new SemanticException(
          "Cannot perform insert overwrite query on Iceberg table where partition evolution happened. In order " +
              "to successfully carry out any insert overwrite operation on this table, the data has to be rewritten " +
@@ -2061,7 +2060,7 @@ public void validatePartSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
  private void validatePartSpecImpl(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
      Map<String, String> partitionSpec, List<PartitionField> partitionFields) throws SemanticException {
    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    if (hmsTable.getSnapshotRef() != null && hasUndergonePartitionEvolution(table)) {
+    if (hmsTable.getSnapshotRef() != null && IcebergTableUtil.hasUndergonePartitionEvolution(table)) {
      // for this case we rewrite the query as delete query, so validations would be done as part of delete.
      return;
    }
@@ -2108,7 +2107,7 @@ private void validatePartSpecImpl(org.apache.hadoop.hive.ql.metadata.Table hmsTa
  public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable, Map<String, String> partitionSpec)
      throws SemanticException {
    Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-    if (MapUtils.isEmpty(partitionSpec) || !hasUndergonePartitionEvolution(table)) {
+    if (MapUtils.isEmpty(partitionSpec) || !IcebergTableUtil.hasUndergonePartitionEvolution(table)) {
      return true;
    } else if (hmsTable.getSnapshotRef() != null) {
      return false;
@@ -2130,15 +2129,6 @@ public boolean canUseTruncate(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
return result;
}
-  private boolean hasUndergonePartitionEvolution(Table table) {
-    // The current spec is not necessary the latest which can happen when partition spec was changed to one of
-    // table's past specs.
-    return table.currentSnapshot() != null &&
-        table.currentSnapshot().allManifests(table.io()).parallelStream()
-            .map(ManifestFile::partitionSpecId)
-            .anyMatch(id -> id != table.spec().specId());
-  }
-
@Override
  public List<Partition> getPartitions(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
      Map<String, String> partitionSpec, boolean latestSpecOnly) throws SemanticException {
@@ -2159,7 +2149,7 @@ public boolean isPartitioned(org.apache.hadoop.hive.ql.metadata.Table hmsTable)
    Snapshot snapshot = IcebergTableUtil.getTableSnapshot(table, hmsTable);
    boolean readsNonCurrentSnapshot = snapshot != null && !snapshot.equals(table.currentSnapshot());
-    if (readsNonCurrentSnapshot && hasUndergonePartitionEvolution(table)) {
+    if (readsNonCurrentSnapshot && IcebergTableUtil.hasUndergonePartitionEvolution(table)) {
return false;
}
return table.spec().isPartitioned();
@@ -2368,7 +2358,7 @@ public void setMergeTaskDeleteProperties(TableDesc tableDesc) {
  @Override
  public boolean hasUndergonePartitionEvolution(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
- return hasUndergonePartitionEvolution(table);
+ return IcebergTableUtil.hasUndergonePartitionEvolution(table);
}
private static List<FieldSchema> schema(List<VirtualColumn> exprs) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 54b283527a1..2e4e5f0a094 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -59,6 +59,7 @@
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManageSnapshots;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionData;
@@ -562,4 +563,13 @@ public static ExecutorService newDeleteThreadPool(String completeName, int numTh
return thread;
});
}
+
+  public static boolean hasUndergonePartitionEvolution(Table table) {
+    // The current spec is not necessarily the latest, which can happen when the partition spec was changed to one of
+    // the table's past specs.
+    return table.currentSnapshot() != null &&
+        table.currentSnapshot().allManifests(table.io()).parallelStream()
+            .map(ManifestFile::partitionSpecId)
+            .anyMatch(id -> id != table.spec().specId());
+  }
}
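Note for reviewers: the helper is now shared through IcebergTableUtil so the storage handler and the new optimizer can both call it. A minimal sketch of an external caller, assuming it already holds a resolved org.apache.iceberg.Table (the wrapper class below is illustrative, not part of this commit):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.mr.hive.IcebergTableUtil;

    // Hypothetical caller: true when any manifest of the current snapshot was
    // written under a partition spec id other than the table's current spec id.
    final class PartitionEvolutionCheck {
      static boolean hasOlderSpecLayouts(Table icebergTable) {
        return IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable);
      }
    }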
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
new file mode 100644
index 00000000000..ab785844461
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive.compaction;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer;
+import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
+import org.apache.iceberg.hive.RuntimeMetaException;
+import org.apache.iceberg.mr.hive.IcebergTableUtil;
+import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.thrift.TException;
+
+public class IcebergTableOptimizer extends TableOptimizer {
+ private HiveMetaStoreClient client;
+ private Map<String, Long> snapshotIdCache;
+
+  public IcebergTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) throws MetaException {
+ super(conf, txnHandler, metadataCache);
+ init();
+ }
+
+  /**
+   * Scans all databases and tables in the Hive Metastore to identify Iceberg tables
+   * that are potential candidates for compaction.
+   * <p>
+   * The method filters tables based on the provided database and table skip lists and on a snapshot ID cache,
+   * to avoid re-processing tables that haven't changed. For eligible Iceberg tables, it determines
+   * the appropriate compaction targets (table or specific partitions) and adds them to
+   * the {@link CompactionInfo} set.
+   * </p>
+   * @param lastChecked Timestamp of the previous auto-compaction invocation.
+   * @param currentCompactions A {@link ShowCompactResponse} containing currently active or pending
+   *                           compaction requests, used to avoid duplicates.
+   * @param skipDBs A {@link Set} of database names to explicitly skip during the scan.
+   * @param skipTables A {@link Set} of fully qualified table names to explicitly skip during the scan.
+   * @return A {@link Set} of {@link CompactionInfo} objects representing tables and/or partitions
+   *         identified as eligible for compaction.
+   * @throws MetaException If an unrecoverable error occurs during Metastore communication or
+   *                       during {@link SessionState} initialization.
+   */
+ @Override
+  public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
+      Set<String> skipDBs, Set<String> skipTables) throws MetaException {
+ Set<CompactionInfo> compactionTargets = Sets.newHashSet();
+ try {
+ SessionState sessionState = SessionState.get();
+ if (sessionState == null) {
+ sessionState = new SessionState(conf);
+ SessionState.start(sessionState);
+ }
+ } catch (Exception e) {
+ throw new MetaException(String.format("Error while finding compaction
targets for Iceberg tables: %s",
+ e.getMessage()));
+ }
+
+ getDatabases().stream()
+ .filter(dbName -> !skipDBs.contains(dbName))
+ .flatMap(dbName -> getTables(dbName).stream()
+ .map(tableName -> TableName.getDbTable(dbName, tableName)))
+ .filter(qualifiedTableName -> !skipTables.contains(qualifiedTableName))
+        .map(qualifiedTableName -> Pair.of(qualifiedTableName, resolveMetastoreTable(qualifiedTableName)))
+        .filter(tablePair -> MetaStoreUtils.isIcebergTable(tablePair.getValue().getParameters()))
+        .filter(tablePair -> {
+          long currentSnapshotId = Long.parseLong(tablePair.getValue().getParameters().get("current-snapshot-id"));
+          Long cachedSnapshotId = snapshotIdCache.get(tablePair.getKey());
+          return cachedSnapshotId == null || cachedSnapshotId != currentSnapshotId;
+        })
+        .forEach(tablePair -> {
+          org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, tablePair.getValue());
+
+          if (icebergTable.spec().isPartitioned()) {
+            List<String> partitions = getPartitions(icebergTable);
+
+            partitions.forEach(partition -> addCompactionTargetIfEligible(tablePair.getValue(), icebergTable,
+                partition, compactionTargets, currentCompactions, skipDBs, skipTables));
+          }
+
+          if (icebergTable.spec().isUnpartitioned() || IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable)) {
+            addCompactionTargetIfEligible(tablePair.getValue(), icebergTable, null, compactionTargets,
+                currentCompactions, skipDBs, skipTables);
+          }
+
+          snapshotIdCache.put(tablePair.getKey(), icebergTable.currentSnapshot().snapshotId());
+        });
+
+ return compactionTargets;
+ }
+
+ private List<String> getDatabases() {
+ try {
+ return client.getAllDatabases();
+ } catch (TException e) {
+ throw new RuntimeMetaException(e, "Error getting database names");
+ }
+ }
+
+ private List<String> getTables(String dbName) {
+ try {
+ return client.getAllTables(dbName);
+ } catch (TException e) {
+ throw new RuntimeMetaException(e, "Error getting table names of %s
database", dbName);
+ }
+ }
+
+ private List<String> getPartitions(org.apache.iceberg.Table icebergTable) {
+ try {
+      return IcebergTableUtil.getPartitionNames(icebergTable, Maps.newHashMap(), true);
+    } catch (SemanticException e) {
+      throw new RuntimeMetaException(e, "Error getting partition names for Iceberg table %s", icebergTable.name());
+ }
+ }
+
+ private Table resolveMetastoreTable(String qualifiedTableName) {
+ String[] dbTableName = TxnUtils.getDbTableName(qualifiedTableName);
+ try {
+      return metadataCache.computeIfAbsent(qualifiedTableName,
+          () -> CompactorUtil.resolveTable(conf, dbTableName[0], dbTableName[1]));
+    } catch (Exception e) {
+      throw new RuntimeMetaException(e, "Error resolving table %s", qualifiedTableName);
+ }
+ }
+
+ public void init() throws MetaException {
+ client = new HiveMetaStoreClient(conf);
+ snapshotIdCache = Maps.newConcurrentMap();
+ }
+
+  private void addCompactionTargetIfEligible(Table table, org.apache.iceberg.Table icebergTable, String partitionName,
+      Set<CompactionInfo> compactions, ShowCompactResponse currentCompactions, Set<String> skipDBs,
+      Set<String> skipTables) {
+
+    CompactionInfo ci = new CompactionInfo(table.getDbName(), table.getTableName(), partitionName,
+        CompactionType.SMART_OPTIMIZE);
+
+ // Common Hive compaction eligibility checks
+    if (!isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables)) {
+ return;
+ }
+
+    // Iceberg-specific compaction checks: determine whether compaction is needed and, if so, which type
+    CompactionEvaluator compactionEvaluator;
+    try {
+      compactionEvaluator = new CompactionEvaluator(icebergTable, ci, table.getParameters());
+    } catch (IOException e) {
+      throw new RuntimeMetaException(e, "Error constructing compaction evaluator for table %s", table.getTableName());
+ }
+
+ if (!compactionEvaluator.isEligibleForCompaction()) {
+ return;
+ }
+
+ ci.type = compactionEvaluator.determineCompactionType();
+ compactions.add(ci);
+ }
+}
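Worth noting for reviewers: the optimizer re-examines a table only when its current-snapshot-id table property differs from the value cached during a previous Initiator cycle. A condensed, hypothetical sketch of that pattern (names are illustrative, not from this commit):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustration of the snapshot-ID cache used above: a table is scanned
    // again only if its snapshot has moved since the last cycle.
    final class SnapshotChangeTracker {
      private final Map<String, Long> lastSeenSnapshotId = new ConcurrentHashMap<>();

      boolean shouldRescan(String qualifiedTableName, long currentSnapshotId) {
        Long cached = lastSeenSnapshotId.get(qualifiedTableName);
        return cached == null || cached != currentSnapshotId;
      }

      void markScanned(String qualifiedTableName, long currentSnapshotId) {
        // The real code updates the cache only after compaction targets were collected.
        lastSeenSnapshotId.put(qualifiedTableName, currentSnapshotId);
      }
    }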
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index fdbb454de41..57a7fc442e5 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -52,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -69,6 +70,7 @@ public abstract class CompactorOnTezTest {
protected IMetaStoreClient msClient;
protected IDriver driver;
protected boolean mmCompaction = false;
+ private final AtomicBoolean stop = new AtomicBoolean();
@ClassRule
public static TemporaryFolder folder = new TemporaryFolder();
@@ -541,4 +543,13 @@ protected void dropTable(String tblName) throws Exception {
executeStatementOnDriver("drop table " + tblName, driver);
}
}
+
+ protected void runSingleInitiatorCycle() throws Exception {
+ TestTxnDbUtil.setConfValues(conf);
+ CompactorThread t = new Initiator();
+ t.setConf(conf);
+ stop.set(true);
+ t.init(stop);
+ t.run();
+ }
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
index 9ca0bd9bbe4..f6cc76572f1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestIcebergCompactorOnTez.java
@@ -21,66 +21,139 @@
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionState;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
public class TestIcebergCompactorOnTez extends CompactorOnTezTest {
+
+ private static final String DB_NAME = "default";
+ private static final String TABLE_NAME = "ice_orc";
+  private static final String QUALIFIED_TABLE_NAME = TxnUtils.getFullTableName(DB_NAME, TABLE_NAME);
+
+ @Override
+ @Before
+ public void setup() throws Exception {
+ super.setup();
+ executeStatementOnDriver("drop table if exists " + QUALIFIED_TABLE_NAME,
driver);
+ }
@Test
  public void testIcebergCompactorWithAllPartitionFieldTypes() throws Exception{
    conf.setVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE, CUSTOM_COMPACTION_QUEUE);
msClient = new HiveMetaStoreClient(conf);
- String dbName = "default";
- String tableName = "ice_orc";
- String qualifiedTableName = dbName + "." + tableName;
-
- executeStatementOnDriver("drop table if exists " + qualifiedTableName,
driver);
executeStatementOnDriver(String.format("create table %s " +
"(id int, a string, b int, c bigint, d float, e double, f decimal(4,
2), g boolean, h date, i date, j date, k timestamp) " +
"partitioned by spec(a, truncate(3, a), bucket(4, a), b, c, d, e, f,
g, h, year(h), month(i), day(j), k, hour(k)) stored by iceberg stored as orc " +
- "tblproperties ('compactor.threshold.min.input.files'='1')",
qualifiedTableName), driver);
+ "tblproperties ('compactor.threshold.min.input.files'='1')",
QUALIFIED_TABLE_NAME), driver);
  // 6 records, one record per file --> 3 partitions, 2 files per partition
- executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1,
'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01',
DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName),
driver);
- executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2,
'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01',
DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", qualifiedTableName),
driver);
- executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3,
'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03',
DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName),
driver);
- executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4,
'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03',
DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", qualifiedTableName),
driver);
- executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, null,
null, null, null, null, null, null, null, null, null, null)",
qualifiedTableName), driver);
- executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, null,
null, null, null, null, null, null, null, null, null, null)",
qualifiedTableName), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1,
'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01',
DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", QUALIFIED_TABLE_NAME),
driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2,
'aaa111', 1, 100, 1.0, 2.0, 4.00, true, DATE '2024-05-01', DATE '2024-05-01',
DATE '2024-05-01', TIMESTAMP '2024-05-02 10:00:00')", QUALIFIED_TABLE_NAME),
driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3,
'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03',
DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", QUALIFIED_TABLE_NAME),
driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4,
'bbb222', 2, 200, 2.0, 3.0, 8.00, false, DATE '2024-05-03', DATE '2024-05-03',
DATE '2024-05-03', TIMESTAMP '2024-05-04 13:00:00')", QUALIFIED_TABLE_NAME),
driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, null,
null, null, null, null, null, null, null, null, null, null)",
QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, null,
null, null, null, null, null, null, null, null, null, null)",
QUALIFIED_TABLE_NAME), driver);
- Assert.assertEquals(6, getFilesCount(qualifiedTableName));
- List<String> recordsBefore = getAllRecords(qualifiedTableName);
+ Assert.assertEquals(6, getFilesCount());
+ List<String> recordsBefore = getAllRecords();
-    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, false,
+    CompactorTestUtil.runCompaction(conf, DB_NAME, TABLE_NAME, CompactionType.MINOR, false,
"a=aaa111/a_trunc=aaa/a_bucket=0/b=1/c=100/d=1.0/e=2.0/f=4.00/g=true/h=2024-05-01/h_year=2024/i_month=2024-05/j_day=2024-05-01/k=2024-05-02T10%3A00%3A00/k_hour=2024-05-02-10",
"a=bbb222/a_trunc=bbb/a_bucket=3/b=2/c=200/d=2.0/e=3.0/f=8.00/g=false/h=2024-05-03/h_year=2024/i_month=2024-05/j_day=2024-05-03/k=2024-05-04T13%3A00%3A00/k_hour=2024-05-04-13",
"a=null/a_trunc=null/a_bucket=null/b=null/c=null/d=null/e=null/f=null/g=null/h=null/h_year=null/i_month=null/j_day=null/k=null/k_hour=null"
);
- Assert.assertEquals(3, getFilesCount(qualifiedTableName));
+ Assert.assertEquals(3, getFilesCount());
verifySuccessfulCompaction(3);
- List<String> recordsAfter = getAllRecords(qualifiedTableName);
+ List<String> recordsAfter = getAllRecords();
Assert.assertEquals(recordsBefore, recordsAfter);
}
-
-  private int getFilesCount(String qualifiedTableName) throws Exception {
-    driver.run(String.format("select count(*) from %s.files", qualifiedTableName));
+
+ @Test
+ public void testIcebergAutoCompactionPartitionEvolution() throws Exception {
+ executeStatementOnDriver(String.format("create table %s " +
+ "(id int, a string) " +
+ "partitioned by spec(id) stored by iceberg stored as orc " +
+ "tblproperties ('compactor.threshold.min.input.files'='1')",
QUALIFIED_TABLE_NAME), driver);
+
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (1, 'a')",
QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (2, 'b')",
QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (3, 'c')",
QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (4, 'd')",
QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (5, 'e')",
QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (6, 'd')",
QUALIFIED_TABLE_NAME), driver);
+
+ executeStatementOnDriver(String.format("alter table %s set partition
spec(truncate(3, a))", QUALIFIED_TABLE_NAME), driver);
+
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (7,
'aaa111')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (8,
'aaa111')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9,
'bbb222')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10,
'bbb222')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)",
QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)",
QUALIFIED_TABLE_NAME), driver);
+
+ runSingleInitiatorCycle();
+ ShowCompactResponse rsp = msClient.showCompactions();
+ Assert.assertEquals(4, rsp.getCompactsSize());
+
+ // Compaction should be initiated for each partition from the latest spec
+ Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=aaa",
CompactionType.MINOR, CompactionState.INITIATED));
+ Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=bbb",
CompactionType.MINOR, CompactionState.INITIATED));
+ Assert.assertTrue(isCompactExist(rsp, "a_trunc_3=null",
CompactionType.MINOR, CompactionState.INITIATED));
+
+    // Additional compaction should be initiated for all partitions from past partition specs
+    Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.INITIATED));
+ }
+
+ @Test
+ public void testIcebergAutoCompactionUnpartitioned() throws Exception {
+ executeStatementOnDriver(String.format("create table %s " +
+ "(id int, a string) " +
+ "stored by iceberg stored as orc " +
+ "tblproperties ('compactor.threshold.min.input.files'='1')",
QUALIFIED_TABLE_NAME), driver);
+
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (7,
'aaa111')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (8,
'aaa111')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (9,
'bbb222')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (10,
'bbb222')", QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (11, null)",
QUALIFIED_TABLE_NAME), driver);
+ executeStatementOnDriver(String.format("INSERT INTO %s VALUES (12, null)",
QUALIFIED_TABLE_NAME), driver);
+
+ runSingleInitiatorCycle();
+ ShowCompactResponse rsp = msClient.showCompactions();
+ Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertTrue(isCompactExist(rsp, null, CompactionType.MINOR, CompactionState.INITIATED));
+ }
+
+ private int getFilesCount() throws Exception {
+ driver.run(String.format("select count(*) from %s.files",
QUALIFIED_TABLE_NAME));
List<String> res = new ArrayList<>();
driver.getFetchTask().fetch(res);
return Integer.parseInt(res.get(0));
}
-  private List<String> getAllRecords(String qualifiedTableName) throws Exception {
-    driver.run(String.format("select * from %s order by id", qualifiedTableName));
+  private List<String> getAllRecords() throws Exception {
+    driver.run(String.format("select * from %s order by id", QUALIFIED_TABLE_NAME));
List<String> res = new ArrayList<>();
driver.getFetchTask().fetch(res);
return res;
}
+
+  private boolean isCompactExist(ShowCompactResponse rsp, String partName, CompactionType type, CompactionState state) {
+    return rsp.getCompacts().stream().anyMatch(c ->
+        c.getDbname().equals(DB_NAME) && c.getTablename().equals(TABLE_NAME) &&
+            Objects.equals(c.getPartitionname(), partName) && c.getType().equals(type) &&
+            c.getState().equals(state.name().toLowerCase()));
+  }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/AcidTableOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/AcidTableOptimizer.java
new file mode 100644
index 00000000000..424f50a2763
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/AcidTableOptimizer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class AcidTableOptimizer extends TableOptimizer {
+ private static final String CLASS_NAME = AcidTableOptimizer.class.getName();
+ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  public AcidTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) {
+ super(conf, txnHandler, metadataCache);
+ }
+
+ @Override
+  public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
+      Set<String> skipDBs, Set<String> skipTables) throws MetaException {
+
+ int abortedThreshold = HiveConf.getIntVar(conf,
+ HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+ long abortedTimeThreshold = HiveConf
+        .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, TimeUnit.MILLISECONDS);
+
+    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold, abortedTimeThreshold, lastChecked)
+        .parallelStream()
+        .filter(ci -> isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables))
+        .collect(Collectors.toSet());
+
+ if (!potentials.isEmpty()) {
+ ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(
+ txnHandler.getOpenTxns(), 0);
+ conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+ }
+
+ return potentials;
+ }
+
+ @Override
+  protected boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse currentCompactions,
+      Set<String> skipDBs, Set<String> skipTables) {
+ try {
+      if (!super.isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables)) {
+ return false;
+ }
+ String qualifiedTableName = ci.getFullTableName();
+ Table t = metadataCache.computeIfAbsent(qualifiedTableName, () ->
+ CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName));
+ if (AcidUtils.isInsertOnlyTable(t.getParameters()) && !HiveConf
+ .getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
+ skipTables.add(ci.getFullTableName());
+ LOG.info("Table {} is insert only and {}=false so we will not compact
it.", qualifiedTableName,
+ HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM.varname);
+ return false;
+ }
+ if (isDynPartIngest(t, ci)) {
+ return false;
+ }
+
+ } catch (Exception e) {
+ LOG.error("Caught exception while checking compaction eligibility.", e);
+ try {
+ ci.errorMessage = e.getMessage();
+ txnHandler.markFailed(ci);
+ } catch (MetaException ex) {
+ LOG.error("Caught exception while marking compaction as failed.", e);
+ }
+ return false;
+ }
+ return true;
+ }
+
+  // Check if it's a dynamic partitioning case. If so, do not initiate compaction for streaming ingest, only for aborts.
+ protected static boolean isDynPartIngest(Table t, CompactionInfo ci) {
+ if (t.getPartitionKeys() != null && !t.getPartitionKeys().isEmpty() &&
+ ci.partName == null && !ci.hasOldAbort) {
+ LOG.info("Skipping entry for {} as it is from dynamic partitioning",
ci.getFullTableName());
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java
index 302345f0284..498b3e55819 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionException.java
@@ -91,4 +91,10 @@ public ErrorMsg getCanonicalErrorMsg() {
}
public String getRemoteErrorMsg() { return remoteErrorMsg; }
+
+  public CompactionException(Throwable throwable, String message, Object... args) {
+ super(String.format(message, args), throwable);
+ canonicalErrorMsg = ErrorMsg.GENERIC_ERROR;
+ remoteErrorMsg = null;
+ }
}
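The new varargs constructor String.format()s the message and preserves the cause. A minimal usage sketch mirroring the call site added to Initiator below (class and helper names are illustrative, not from this commit):

    import org.apache.hadoop.hive.ql.txn.compactor.CompactionException;

    final class OptimizerLoader {
      static Object load(String className) {
        try {
          return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (Exception e) {
          // Format args are applied to the message; the original exception stays attached.
          throw new CompactionException(e, "Failed instantiating and calling table optimizer %s", className);
        }
      }
    }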
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index ec0619f7997..b3f5d2a7eeb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -25,17 +25,13 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -89,25 +85,6 @@ public void init(AtomicBoolean stop) throws Exception {
*/
abstract Table resolveTable(CompactionInfo ci) throws MetaException;
-  abstract boolean replIsCompactionDisabledForDatabase(String dbName) throws TException;
-
- /**
- * Get list of partitions by name.
- * @param ci compaction info.
- * @return list of partitions
- * @throws MetaException if an error occurs.
- */
-  abstract List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException;
-
- /**
- * Get the partition being compacted.
- * @param ci compaction info returned from the compaction queue
-   * @return metastore partition, or null if there is not partition in this compaction info
-   * @throws MetaException if underlying calls throw, or if the partition name resolves to more than
-   *                       one partition.
-   */
-  abstract protected Partition resolvePartition(CompactionInfo ci) throws MetaException;
-
protected String tableName(Table t) {
return Warehouse.getQualifiedName(t);
}
@@ -123,15 +100,6 @@ public static void initializeAndStartThread(CompactorThread thread, Configuratio
thread.start();
}
- protected boolean replIsCompactionDisabledForTable(Table tbl) {
-    // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-    boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters());
- if (isCompactDisabled) {
- LOG.info("Compaction is disabled for table " + tbl.getTableName());
- }
- return isCompactDisabled;
- }
-
@VisibleForTesting
protected String getRuntimeVersion() {
return this.getClass().getPackage().getImplementationVersion();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index f166d677e40..b72eea8ce43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -20,7 +20,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.hadoop.hive.common.ServerUtils;
-import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -28,21 +27,20 @@
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_INTIATOR_THREAD_NAME_FORMAT;
@@ -51,13 +49,14 @@
 * It's critical that there is exactly 1 of these in a given warehouse.
*/
public class Initiator extends MetaStoreCompactorThread {
- static final private String CLASS_NAME = Initiator.class.getName();
- static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+ private static final String CLASS_NAME = Initiator.class.getName();
+ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
private ExecutorService compactionExecutor;
private boolean metricsEnabled;
private boolean shouldUseMutex = true;
+ private List<TableOptimizer> optimizers;
@Override
public void run() {
@@ -66,12 +65,6 @@ public void run() {
// so wrap it in a big catch Throwable statement.
try {
recoverFailedCompactions(false);
-
- int abortedThreshold = HiveConf.getIntVar(conf,
- HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
- long abortedTimeThreshold = HiveConf
- .getTimeVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
- TimeUnit.MILLISECONDS);
      TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
      // Make sure we run through the loop once before checking to stop as this makes testing
@@ -113,11 +106,11 @@ public void run() {
Set<String> skipDBs = Sets.newConcurrentHashSet();
Set<String> skipTables = Sets.newConcurrentHashSet();
-        Set<CompactionInfo> potentials = compactionExecutor.submit(() ->
-            txnHandler.findPotentialCompactions(abortedThreshold, abortedTimeThreshold, prevStart)
-                .parallelStream()
-                .filter(ci -> isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables))
-                .collect(Collectors.toSet())).get();
+        Set<CompactionInfo> potentials = Sets.newHashSet();
+        for (TableOptimizer optimizer : optimizers) {
+          potentials.addAll(compactionExecutor.submit(() ->
+              optimizer.findPotentialCompactions(prevStart, currentCompactions, skipDBs, skipTables)).get());
+        }
        LOG.debug("Found {} potential compactions, checking to see if we should compact any of them", potentials.size());
CompactorUtil.checkInterrupt(CLASS_NAME);
@@ -125,12 +118,6 @@ public void run() {
Map<String, String> tblNameOwners = new HashMap<>();
List<CompletableFuture<Void>> compactionList = new ArrayList<>();
- if (!potentials.isEmpty()) {
- ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(
- txnHandler.getOpenTxns(), 0);
-          conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
- }
-
for (CompactionInfo ci : potentials) {
try {
            //Check for interruption before scheduling each compactionInfo and return if necessary
@@ -142,8 +129,8 @@ public void run() {
ci.poolName = CompactorUtil.getPoolName(conf, t, metadataCache);
Partition p = resolvePartition(ci);
if (p == null && ci.partName != null) {
- LOG.info("Can't find partition " + ci.getFullPartitionName() +
- ", assuming it has been dropped and moving on.");
+ LOG.info("Can't find partition {}, assuming it has been
dropped and moving on.",
+ ci.getFullPartitionName());
continue;
}
String runAs = resolveUserToRunAs(tblNameOwners, t, p);
@@ -208,10 +195,6 @@ protected boolean isCacheEnabled() {
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
}
-  private Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuchObjectException {
- return CompactorUtil.resolveDatabase(conf, ci.dbname);
- }
-
@VisibleForTesting
  protected String resolveUserToRunAs(Map<String, String> cache, Table t, Partition p)
throws IOException, InterruptedException {
@@ -236,158 +219,32 @@ public void init(AtomicBoolean stop) throws Exception {
COMPACTOR_INTIATOR_THREAD_NAME_FORMAT);
    metricsEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED) &&
        MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
+ optimizers = Arrays.stream(MetastoreConf.getTrimmedStringsVar(conf,
+ MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS))
+ .map(this::instantiateTableOptimizer).toList();
}
+
+ private TableOptimizer instantiateTableOptimizer(String className) {
+ try {
+      Class<? extends TableOptimizer> icebergInitiatorClazz = (Class<? extends TableOptimizer>)
+          Class.forName(className, true, Utilities.getSessionSpecifiedClassLoader());
-  private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
-    if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname());
-    txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf,
-        HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS));
-  }
-
-  private boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions, CompactionInfo ci) throws MetaException {
-    if (compactions.getCompacts() == null) {
-      return false;
-    }
-
-    //In case of an aborted Dynamic partition insert, the created entry in the compaction queue does not contain
-    //a partition name even for partitioned tables. As a result it can happen that the ShowCompactResponse contains
-    //an element without partition name for partitioned tables. Therefore, it is necessary to null check the partition
-    //name of the ShowCompactResponseElement even if the CompactionInfo.partName is not null. These special compaction
-    //requests are skipped by the worker, and only cleaner will pick them up, so we should allow to schedule a 'normal'
-    //compaction for partitions of those tables which has special (DP abort) entry with undefined partition name.
-    List<ShowCompactResponseElement> filteredElements = compactions.getCompacts().stream()
-        .filter(e -> e.getDbname().equals(ci.dbname)
-            && e.getTablename().equals(ci.tableName)
-            && (e.getPartitionname() == null && ci.partName == null ||
-                (Objects.equals(e.getPartitionname(),ci.partName))))
-        .collect(Collectors.toList());
-
-    // Figure out if there are any currently running compactions on the same table or partition.
-    if (filteredElements.stream().anyMatch(
-        e -> TxnStore.WORKING_RESPONSE.equals(e.getState()) || TxnStore.INITIATED_RESPONSE.equals(e.getState()))) {
-
-      LOG.info("Found currently initiated or working compaction for " +
-          ci.getFullPartitionName() + " so we will not initiate another compaction");
-      return true;
-    }
-
-    // Check if there is already sufficient number of consecutive failures for this table/partition
-    // so that no new automatic compactions needs to be scheduled.
-    int failedThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
-
-    LongSummaryStatistics failedStats = filteredElements.stream()
-        .filter(e -> TxnStore.SUCCEEDED_RESPONSE.equals(e.getState()) || TxnStore.FAILED_RESPONSE.equals(e.getState()))
-        .sorted(Comparator.comparingLong(ShowCompactResponseElement::getId).reversed())
-        .limit(failedThreshold)
-        .filter(e -> TxnStore.FAILED_RESPONSE.equals(e.getState()))
-        .collect(Collectors.summarizingLong(ShowCompactResponseElement::getEnqueueTime));
-
-    // If the last attempt was too long ago, ignore the failed threshold and try compaction again
-    long retryTime = MetastoreConf.getTimeVar(conf,
-        MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME, TimeUnit.MILLISECONDS);
-
-    boolean needsRetry = (retryTime > 0) && (failedStats.getMax() + retryTime < System.currentTimeMillis());
-    if (failedStats.getCount() == failedThreshold && !needsRetry) {
-      LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last " +
-          MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed.");
-
-      ci.errorMessage = "Compaction is not initiated since last " +
-          MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " consecutive compaction attempts failed)";
+      Class<?>[] constructorParameterTypes = {HiveConf.class, TxnStore.class, MetadataCache.class};
+      Constructor<?> constructor = icebergInitiatorClazz.getConstructor(constructorParameterTypes);
-      txnHandler.markFailed(ci);
-      return true;
+      Object[] constructorArgs = new Object[] {conf, txnHandler, metadataCache};
+      return (TableOptimizer) constructor.newInstance(constructorArgs);
     }
-    return false;
-  }
-
-  // Check if it's a dynamic partitioning case. If so, do not initiate compaction for streaming ingest, only for aborts.
-  private static boolean isDynPartIngest(Table t, CompactionInfo ci){
-    if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
-        ci.partName == null && !ci.hasOldAbort) {
-      LOG.info("Skipping entry for " + ci.getFullTableName() + " as it is from dynamic" +
-          " partitioning");
-      return true;
+    catch (Exception e) {
+      throw new CompactionException(e, "Failed instantiating and calling table optimizer %s", className);
     }
-    return false;
   }
-  private boolean isEligibleForCompaction(CompactionInfo ci,
-      ShowCompactResponse currentCompactions, Set<String> skipDBs, Set<String> skipTables) {
-    try {
-      if (skipDBs.contains(ci.dbname)) {
-        LOG.info("Skipping {}::{}, skipDBs::size:{}", ci.dbname, ci.tableName, skipDBs.size());
-        return false;
-      } else {
-        if (replIsCompactionDisabledForDatabase(ci.dbname)) {
-          skipDBs.add(ci.dbname);
-          LOG.info("Skipping {} as compaction is disabled due to repl; skipDBs::size:{}",
-              ci.dbname, skipDBs.size());
-          return false;
-        }
-      }
-
-      if (skipTables.contains(ci.getFullTableName())) {
-        return false;
-      }
-
-      LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
-
-      // Check if we have already initiated or are working on a compaction for this table/partition.
-      // Also make sure we haven't exceeded configured number of consecutive failures.
-      // If any of the above applies, skip it.
-      // Note: if we are just waiting on cleaning we can still check, as it may be time to compact again even though we haven't cleaned.
-      if (foundCurrentOrFailedCompactions(currentCompactions, ci)) {
-        return false;
-      }
-
-      Table t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
-      if (t == null) {
-        LOG.info("Can't find table " + ci.getFullTableName() + ", assuming it's a temp " +
-            "table or has been dropped and moving on.");
-        return false;
-      }
-
-      if (replIsCompactionDisabledForTable(t)) {
-        skipTables.add(ci.getFullTableName());
-        return false;
-      }
-
-      Map<String, String> dbParams = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
-      if (MetaStoreUtils.isNoAutoCompactSet(dbParams, t.getParameters())) {
-        if (Boolean.parseBoolean(MetaStoreUtils.getNoAutoCompact(dbParams))) {
-          skipDBs.add(ci.dbname);
-          LOG.info("DB " + ci.dbname + " marked " + hive_metastoreConstants.NO_AUTO_COMPACT +
-              "=true so we will not compact it.");
-        } else {
-          skipTables.add(ci.getFullTableName());
-          LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.NO_AUTO_COMPACT +
-              "=true so we will not compact it.");
-        }
-        return false;
-      }
-      if (AcidUtils.isInsertOnlyTable(t.getParameters()) && !HiveConf
-          .getBoolVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
-        skipTables.add(ci.getFullTableName());
-        LOG.info("Table " + tableName(t) + " is insert only and " + HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM.varname
-            + "=false so we will not compact it.");
-        return false;
-      }
-      if (isDynPartIngest(t, ci)) {
-        return false;
-      }
-
-    } catch (Throwable e) {
-      LOG.error("Caught exception while checking compaction eligibility.", e);
-      try {
-        ci.errorMessage = e.getMessage();
-        txnHandler.markFailed(ci);
-      } catch (MetaException ex) {
-        LOG.error("Caught exception while marking compaction as failed.", e);
-      }
-      return false;
-    }
-    return true;
+  private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
+    if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname());
+    txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf,
+        HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS));
}
private static class InitiatorCycleUpdater implements Runnable {
@@ -428,4 +285,25 @@ public void run() {
public void enforceMutex(boolean enableMutex) {
this.shouldUseMutex = enableMutex;
}
+
+ @VisibleForTesting
+ protected Partition resolvePartition(CompactionInfo ci) throws Exception {
+ Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () ->
+ CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName));
+
+ if (!MetaStoreUtils.isIcebergTable(table.getParameters())) {
+      return CompactorUtil.resolvePartition(conf, null, ci.dbname, ci.tableName, ci.partName,
+          CompactorUtil.METADATA_FETCH_MODE.LOCAL);
+ } else {
+ if (ci.partName == null) {
+ return null;
+ }
+
+      org.apache.hadoop.hive.metastore.api.Partition partition = new org.apache.hadoop.hive.metastore.api.Partition();
+ partition.setSd(table.getSd().deepCopy());
+ partition.setParameters(com.google.common.collect.Maps.newHashMap());
+
+ return partition;
+ }
+ }
}
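Since the Initiator now loads optimizers reflectively from MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS and resolves the (HiveConf, TxnStore, MetadataCache) constructor, a third-party optimizer only has to extend TableOptimizer. A hedged sketch (the class below is hypothetical, not shipped with this commit):

    package org.apache.hadoop.hive.ql.txn.compactor;

    import java.util.Collections;
    import java.util.Set;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;

    // Hypothetical no-op optimizer: demonstrates the exact constructor
    // signature that Initiator.instantiateTableOptimizer() looks up.
    public class NoOpTableOptimizer extends TableOptimizer {
      public NoOpTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) {
        super(conf, txnHandler, metadataCache);
      }

      @Override
      public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
          Set<String> skipDBs, Set<String> skipTables) throws MetaException {
        return Collections.emptySet(); // never schedules compactions
      }
    }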
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 0878330cc3e..12662e3c6e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -20,19 +20,13 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.thrift.TException;
-import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -40,7 +34,6 @@
import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.COMPACTOR_USE_CUSTOM_POOL;
-import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
/**
* Compactor threads that runs in the metastore. It uses a {@link TxnStore}
@@ -69,30 +62,6 @@ public void init(AtomicBoolean stop) throws Exception {
return CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName);
}
-  @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException {
-    try {
-      Database database = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), dbName);
-      // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
-      boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters());
- if (isReplCompactDisabled) {
- LOG.info("Compaction is disabled for database " + dbName);
- }
- return isReplCompactDisabled;
- } catch (NoSuchObjectException e) {
- LOG.info("Unable to find database " + dbName);
- return true;
- }
- }
-
-  @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
-    return CompactorUtil.getPartitionsByNames(conf, ci.dbname, ci.tableName, ci.partName);
-  }
-
-  protected Partition resolvePartition(CompactionInfo ci) throws MetaException {
-    return CompactorUtil.resolvePartition(conf, null, ci.dbname, ci.tableName, ci.partName,
-        CompactorUtil.METADATA_FETCH_MODE.LOCAL);
-  }
-
protected abstract boolean isCacheEnabled();
protected void startCycleUpdater(long updateInterval, Runnable taskToRun) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
index f95834ac23d..38493939b03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RemoteCompactorThread.java
@@ -19,20 +19,12 @@
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.thrift.TException;
-import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
-
/**
* Compactor thread that can run outside the metastore. It will
* use the metastore thrift API which will default to a remote connection
@@ -60,24 +52,4 @@ public void init(AtomicBoolean stop) throws Exception {
@Override Table resolveTable(CompactionInfo ci) throws MetaException {
return RemoteCompactorUtil.resolveTable(conf, msc, ci);
}
-
-  @Override boolean replIsCompactionDisabledForDatabase(String dbName) throws TException {
-    try {
-      Database database = msc.getDatabase(getDefaultCatalog(conf), dbName);
-      // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
- return ReplUtils.isFirstIncPending(database.getParameters());
- } catch (NoSuchObjectException e) {
- LOG.info("Unable to find database " + dbName);
- return true;
- }
- }
-
-  @Override List<Partition> getPartitionsByNames(CompactionInfo ci) throws MetaException {
-    return RemoteCompactorUtil.getPartitionsByNames(msc, ci.dbname, ci.tableName, ci.tableName);
-  }
-
-  protected Partition resolvePartition(CompactionInfo ci) throws MetaException {
-    return CompactorUtil.resolvePartition(conf, msc, ci.dbname, ci.tableName, ci.partName,
-        CompactorUtil.METADATA_FETCH_MODE.REMOTE);
- }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/TableOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/TableOptimizer.java
new file mode 100644
index 00000000000..3211939ea4d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/TableOptimizer.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
+
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.LongSummaryStatistics;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+
+public abstract class TableOptimizer {
+ private static final String CLASS_NAME = TableOptimizer.class.getName();
+ private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  public abstract Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
+      Set<String> skipDBs, Set<String> skipTables) throws MetaException;
+
+ protected final HiveConf conf;
+ protected final TxnStore txnHandler;
+ protected final MetadataCache metadataCache;
+
+  protected TableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) {
+ this.conf = conf;
+ this.txnHandler = txnHandler;
+ this.metadataCache = metadataCache;
+ }
+
+  protected boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse currentCompactions,
+      Set<String> skipDBs, Set<String> skipTables) {
+ try {
+ if (skipDBs.contains(ci.dbname)) {
+        LOG.info("Skipping {}::{}, skipDBs::size:{}", ci.dbname, ci.tableName, skipDBs.size());
+ return false;
+ } else {
+ if (replIsCompactionDisabledForDatabase(ci.dbname)) {
+ skipDBs.add(ci.dbname);
+          LOG.info("Skipping {} as compaction is disabled due to repl; skipDBs::size:{}",
+              ci.dbname, skipDBs.size());
+ return false;
+ }
+ }
+
+ String qualifiedTableName = ci.getFullTableName();
+ if (skipTables.contains(qualifiedTableName)) {
+ return false;
+ }
+
+      LOG.info("Checking to see if we should compact {}", ci.getFullPartitionName());
+
+      // Check if we have already initiated or are working on a compaction for this table/partition.
+      // Also make sure we haven't exceeded the configured number of consecutive failures.
+      // If any of the above applies, skip it.
+      // Note: if we are just waiting on cleaning, we can still check, as it may be time to compact again
+      // even though we haven't cleaned.
+ if (foundCurrentOrFailedCompactions(currentCompactions, ci)) {
+ return false;
+ }
+
+ Table t = metadataCache.computeIfAbsent(qualifiedTableName, () ->
+ CompactorUtil.resolveTable(conf, ci.dbname, ci.tableName));
+ if (t == null) {
+        LOG.info("Can't find table {}, assuming it's a temp table or has been dropped and moving on.",
+            qualifiedTableName);
+ return false;
+ }
+
+ if (replIsCompactionDisabledForTable(t)) {
+ skipTables.add(qualifiedTableName);
+ return false;
+ }
+
+      Map<String, String> dbParams = metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
+ if (MetaStoreUtils.isNoAutoCompactSet(dbParams, t.getParameters())) {
+ if (Boolean.parseBoolean(MetaStoreUtils.getNoAutoCompact(dbParams))) {
+ skipDBs.add(ci.dbname);
+          LOG.info("DB {} marked {}=true so we will not compact it.", ci.dbname, hive_metastoreConstants.NO_AUTO_COMPACT);
+ } else {
+ skipTables.add(qualifiedTableName);
+          LOG.info("Table {} marked {}=true so we will not compact it.", qualifiedTableName,
+              hive_metastoreConstants.NO_AUTO_COMPACT);
+ }
+ return false;
+ }
+ } catch (Exception e) {
+ LOG.error("Caught exception while checking compaction eligibility.", e);
+ try {
+ ci.errorMessage = e.getMessage();
+ txnHandler.markFailed(ci);
+ } catch (MetaException ex) {
+          LOG.error("Caught exception while marking compaction as failed.", ex);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ private boolean replIsCompactionDisabledForTable(Table tbl) {
+    // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
+    boolean isCompactDisabled = ReplUtils.isFirstIncPending(tbl.getParameters());
+ if (isCompactDisabled) {
+ LOG.info("Compaction is disabled for table {}", tbl.getTableName());
+ }
+ return isCompactDisabled;
+ }
+
+  private boolean replIsCompactionDisabledForDatabase(String dbName) throws TException {
+    try {
+      Database database = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), dbName);
+      // Compaction is disabled until after first successful incremental load. Check HIVE-21197 for more detail.
+      boolean isReplCompactDisabled = ReplUtils.isFirstIncPending(database.getParameters());
+ if (isReplCompactDisabled) {
+ LOG.info("Compaction is disabled for database {}", dbName);
+ }
+ return isReplCompactDisabled;
+ } catch (NoSuchObjectException e) {
+ LOG.info("Unable to find database {}", dbName);
+ return true;
+ }
+ }
+
+  protected boolean foundCurrentOrFailedCompactions(ShowCompactResponse compactions, CompactionInfo ci) throws MetaException {
+ if (compactions.getCompacts() == null) {
+ return false;
+ }
+
+    //In case of an aborted dynamic partition insert, the entry created in the compaction queue does not contain
+    //a partition name, even for partitioned tables. As a result, the ShowCompactResponse can contain an element
+    //without a partition name for a partitioned table. Therefore, the partition name of the
+    //ShowCompactResponseElement must be null-checked even if CompactionInfo.partName is not null. These special
+    //compaction requests are skipped by the worker and picked up only by the cleaner, so we should still allow a
+    //'normal' compaction to be scheduled for partitions of tables that have a special (DP abort) entry with an
+    //undefined partition name.
+    List<ShowCompactResponseElement> filteredElements = compactions.getCompacts().stream()
+        .filter(e -> e.getDbname().equals(ci.dbname)
+            && e.getTablename().equals(ci.tableName)
+            && (e.getPartitionname() == null && ci.partName == null
+                || Objects.equals(e.getPartitionname(), ci.partName)))
+        .toList();
+
+    // Figure out if there are any currently running compactions on the same table or partition.
+    if (filteredElements.stream().anyMatch(
+        e -> TxnStore.WORKING_RESPONSE.equals(e.getState()) || TxnStore.INITIATED_RESPONSE.equals(e.getState()))) {
+
+      LOG.info("Found currently initiated or working compaction for {} so we will not initiate another compaction",
+          ci.getFullPartitionName());
+ return true;
+ }
+
+    // Check if there is already a sufficient number of consecutive failures for this table/partition,
+    // so that no new automatic compaction needs to be scheduled.
+    int failedThreshold = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+
+    LongSummaryStatistics failedStats = filteredElements.stream()
+        .filter(e -> TxnStore.SUCCEEDED_RESPONSE.equals(e.getState()) || TxnStore.FAILED_RESPONSE.equals(e.getState()))
+        .sorted(Comparator.comparingLong(ShowCompactResponseElement::getId).reversed())
+        .limit(failedThreshold)
+        .filter(e -> TxnStore.FAILED_RESPONSE.equals(e.getState()))
+        .collect(Collectors.summarizingLong(ShowCompactResponseElement::getEnqueueTime));
+
+    // If the last attempt was too long ago, ignore the failed threshold and try compaction again.
+    long retryTime = MetastoreConf.getTimeVar(conf,
+        MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME, TimeUnit.MILLISECONDS);
+
+    boolean needsRetry = (retryTime > 0) && (failedStats.getMax() + retryTime < System.currentTimeMillis());
+ if (failedStats.getCount() == failedThreshold && !needsRetry) {
+      LOG.warn("Will not initiate compaction for {} since the last {} attempts to compact it failed.",
+          ci.getFullPartitionName(), MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
+
+      ci.errorMessage = "Compaction is not initiated since the last " +
+          MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " consecutive compaction attempts failed";
+
+ txnHandler.markFailed(ci);
+ return true;
+ }
+ return false;
+ }
+
+  protected Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuchObjectException {
+ return CompactorUtil.resolveDatabase(conf, ci.dbname);
+ }
+}
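The TableOptimizer class above is the new pluggable entry point for the Initiator: each class listed in the new table-optimizers property is instantiated per cycle and asked for compaction candidates via findPotentialCompactions(), while isEligibleForCompaction() supplies the shared skip-list, replication, no_auto_compact and failed-threshold checks. As a rough illustration of the contract, here is a minimal sketch of a third-party optimizer; the class name, package, and hard-coded target table are hypothetical, and it assumes CompactionInfo's four-argument constructor and a (HiveConf, TxnStore, MetadataCache) constructor invoked reflectively, so treat it as a sketch rather than part of this commit:

    package org.example.compactor;

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.api.CompactionType;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;
    import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
    import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
    import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer;

    // Hypothetical optimizer that nominates a single fixed table for MAJOR compaction each cycle.
    public class ExampleTableOptimizer extends TableOptimizer {

      public ExampleTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache) {
        super(conf, txnHandler, metadataCache);
      }

      @Override
      public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
          Set<String> skipDBs, Set<String> skipTables) throws MetaException {
        Set<CompactionInfo> candidates = new HashSet<>();
        // Assumed CompactionInfo(dbname, tableName, partName, type) constructor; a null partName targets the whole table.
        CompactionInfo ci = new CompactionInfo("default", "example_table", null, CompactionType.MAJOR);
        // Reuse the shared checks (skip lists, repl state, no_auto_compact, failed threshold) from the base class.
        if (isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables)) {
          candidates.add(ci);
        }
        return candidates;
      }
    }

Funneling candidates through isEligibleForCompaction() keeps a custom optimizer consistent with the built-in AcidTableOptimizer and IcebergTableOptimizer, which appears to be why these checks were hoisted out of Initiator into this base class.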
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 21f7ddfadf3..fe10a762bf9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -123,6 +123,8 @@ void initHiveConf() {
hiveConf.set("tez.grouping.max-size", "10");
hiveConf.set("tez.grouping.min-size", "1");
     databaseProduct = determineDatabaseProduct(DatabaseProduct.DERBY_NAME, hiveConf);
+    MetastoreConf.setVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS,
+        "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer");
}
void setUpInternal() throws Exception {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 38484534b77..ec2f5dacd75 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -142,6 +142,8 @@ protected final void setup(HiveConf conf) throws Exception {
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, useMinHistoryWriteId());
+    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS,
+        "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer");
     // Set this config to true in the base class, there are extended test classes which set this config to false.
     MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
TestTxnDbUtil.setConfValues(conf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index c98b310ca25..636550aeaca 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -1001,7 +1001,7 @@ public void resolveUserToRunAs() throws Exception {
initiator.setConf(conf);
initiator.init(new AtomicBoolean(true));
     doThrow(new RuntimeException("This was thrown on purpose by testInitiatorFailure"))
- .when(initiator).resolveTable(any());
+ .when(initiator).resolvePartition(any());
initiator.run();
// verify status of table compaction
@@ -1134,7 +1134,7 @@ public void testMetaCache() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(2, compacts.size());
- Mockito.verify(initiator, times(1)).resolveTable(Mockito.any());
+ Mockito.verify(initiator, times(2)).resolvePartition(Mockito.any());
}
   private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) {
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 9cf5361c4f8..3627517b441 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -653,6 +653,11 @@ public enum ConfVars {
   COMPACTOR_INITIATOR_TABLECACHE_ON("metastore.compactor.initiator.tablecache.on",
       "hive.compactor.initiator.tablecache.on", true,
       "Enable table caching in the initiator. Currently the cache is cleaned after each cycle."),
+  COMPACTOR_INITIATOR_TABLE_OPTIMIZERS("compactor.table.optimizers",
+      "hive.compactor.table.optimizers",
+      "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer," +
+          "org.apache.iceberg.mr.hive.compaction.IcebergTableOptimizer",
+      "Comma-separated list of table optimizers executed by the compaction Initiator."),
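With the default value above, both the ACID and the Iceberg optimizers run out of the box. A deployment shipping a custom optimizer would append its class name to this list; the following sketch mirrors the MetastoreConf.setVar() calls added to the tests above, where org.example.compactor.ExampleTableOptimizer is the hypothetical class sketched after TableOptimizer.java:

    // Sketch: keep the built-in optimizers and append a hypothetical custom one.
    // Assumes 'conf' is the metastore Configuration and the custom class is on the metastore classpath.
    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS,
        "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer,"
            + "org.apache.iceberg.mr.hive.compaction.IcebergTableOptimizer,"
            + "org.example.compactor.ExampleTableOptimizer");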
   COMPACTOR_WORKER_THREADS("metastore.compactor.worker.threads",
       "hive.compactor.worker.threads", 0,
       "How many compactor worker threads to run on this metastore instance. Set this to a\n" +