This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 430f031ae8f HIVE-28952: Addendum: Process Table objects in batches using TableIterable (#6039)
430f031ae8f is described below
commit 430f031ae8f133eca20b3b28e3242f5b15a1c5d2
Author: Neeraj Khatri <[email protected]>
AuthorDate: Thu Aug 28 13:58:17 2025 +0530
    HIVE-28952: Addendum: Process Table objects in batches using TableIterable (#6039)
---
.../mr/hive/compaction/IcebergTableOptimizer.java | 81 +++++++++-------------
.../metastore/task/IcebergHouseKeeperService.java | 22 ++----
.../task/TestIcebergHouseKeeperService.java | 10 +--
.../hadoop/hive/metastore/utils/TableFetcher.java | 35 ++++++----
4 files changed, 65 insertions(+), 83 deletions(-)
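
This addendum replaces the eager List<Table> that TableFetcher#getTables used to build with a lazy Iterable<Table> backed by TableIterable, so callers pull Table objects from the metastore in batches of BATCH_RETRIEVE_MAX instead of materializing every candidate up front. A minimal caller-side sketch of the new contract (illustrative only; msc, conf, and process stand in for whatever client, configuration, and per-table work the caller has):

    // Tables are fetched from HMS in batches of maxBatchSize as the loop
    // advances; no full candidate list is held in memory.
    int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
    Iterable<org.apache.hadoop.hive.metastore.api.Table> tables =
        IcebergTableUtil.getTableFetcher(msc, null, "*", null).getTables(maxBatchSize);
    for (org.apache.hadoop.hive.metastore.api.Table table : tables) {
      process(table); // placeholder for per-table work
    }
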
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
index c04e8c013cd..dfc352ee0da 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -30,21 +30,19 @@
import java.util.concurrent.Future;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer;
-import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.Snapshot;
@@ -85,62 +83,47 @@ public IcebergTableOptimizer(HiveConf conf, TxnStore txnHandler, MetadataCache m
*/
@Override
   public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompactResponse currentCompactions,
-      Set<String> skipDBs, Set<String> skipTables) {
+                                                      Set<String> skipDBs, Set<String> skipTables) {
Set<CompactionInfo> compactionTargets = Sets.newHashSet();
- getTableNames().stream()
- .filter(table -> !skipDBs.contains(table.getDb()))
- .filter(table -> !skipTables.contains(table.getNotEmptyDbTable()))
- .map(table -> {
-          org.apache.hadoop.hive.ql.metadata.Table hiveTable = getHiveTable(table.getDb(), table.getTable());
-          org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, hiveTable.getTTable());
- return Pair.of(hiveTable, icebergTable);
- })
- .filter(t -> hasNewCommits(t.getRight(),
- snapshotTimeMilCache.get(t.getLeft().getFullyQualifiedName())))
- .forEach(t -> {
- String qualifiedTableName = t.getLeft().getFullyQualifiedName();
- org.apache.hadoop.hive.ql.metadata.Table hiveTable = t.getLeft();
- org.apache.iceberg.Table icebergTable = t.getRight();
-
- if (icebergTable.spec().isPartitioned()) {
-            List<org.apache.hadoop.hive.ql.metadata.Partition> partitions = findModifiedPartitions(hiveTable,
-                icebergTable, snapshotTimeMilCache.get(qualifiedTableName), true);
-
-            partitions.forEach(partition -> addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable,
-                partition.getName(), compactionTargets, currentCompactions, skipDBs, skipTables));
-
-            if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && !findModifiedPartitions(hiveTable,
-                icebergTable, snapshotTimeMilCache.get(qualifiedTableName), false).isEmpty()) {
-              addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable,
-                  null, compactionTargets, currentCompactions, skipDBs, skipTables);
-            }
-          } else {
-            addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable, null, compactionTargets,
-                currentCompactions, skipDBs, skipTables);
-          }
+    Iterable<Table> tables = getTables(skipDBs, skipTables);
-          snapshotTimeMilCache.put(qualifiedTableName, icebergTable.currentSnapshot().timestampMillis());
-        });
+    for (Table table : tables) {
+      org.apache.hadoop.hive.ql.metadata.Table hiveTable = new org.apache.hadoop.hive.ql.metadata.Table(table);
+      org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, table);
+ String qualifiedTableName = hiveTable.getFullyQualifiedName();
- return compactionTargets;
- }
+      if (hasNewCommits(icebergTable, snapshotTimeMilCache.get(qualifiedTableName))) {
+ if (icebergTable.spec().isPartitioned()) {
+          List<org.apache.hadoop.hive.ql.metadata.Partition> partitions = findModifiedPartitions(hiveTable,
+              icebergTable, snapshotTimeMilCache.get(qualifiedTableName), true);
- private List<org.apache.hadoop.hive.common.TableName> getTableNames() {
- try {
-      return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTableNames();
- } catch (Exception e) {
- throw new RuntimeMetaException(e, "Error getting table names");
+          partitions.forEach(partition -> addCompactionTargetIfEligible(table, icebergTable,
+              partition.getName(), compactionTargets, currentCompactions, skipDBs, skipTables));
+
+          if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && !findModifiedPartitions(hiveTable,
+              icebergTable, snapshotTimeMilCache.get(qualifiedTableName), false).isEmpty()) {
+ addCompactionTargetIfEligible(table, icebergTable,
+                null, compactionTargets, currentCompactions, skipDBs, skipTables);
+ }
+ } else {
+          addCompactionTargetIfEligible(table, icebergTable, null, compactionTargets,
+ currentCompactions, skipDBs, skipTables);
+ }
+
+        snapshotTimeMilCache.put(qualifiedTableName, icebergTable.currentSnapshot().timestampMillis());
+ }
}
+
+ return compactionTargets;
}
-  private org.apache.hadoop.hive.ql.metadata.Table getHiveTable(String dbName, String tableName) {
+  private Iterable<Table> getTables(Set<String> skipDBs, Set<String> skipTables) {
try {
-      Table metastoreTable = metadataCache.computeIfAbsent(TableName.getDbTable(dbName, tableName), () ->
- CompactorUtil.resolveTable(conf, dbName, tableName));
- return new org.apache.hadoop.hive.ql.metadata.Table(metastoreTable);
+      int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
+      return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTables(skipDBs, skipTables, maxBatchSize);
} catch (Exception e) {
-      throw new RuntimeMetaException(e, "Error getting Hive table for %s.%s", dbName, tableName);
+ throw new RuntimeMetaException(e, "Error getting tables");
}
}
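
A note on the snapshot gate in the loop above: the optimizer caches the last snapshot timestamp it handled per table (snapshotTimeMilCache) and skips any table whose current snapshot has not advanced, updating the cache only after processing. A standalone sketch of that pattern, with a plain Map standing in for the cache and the compaction-target logic elided:

    // Only re-examine a table when its current snapshot is newer than the
    // one seen on the previous scan, then record the handled timestamp.
    Map<String, Long> lastSeenMillis = new HashMap<>();

    void scanIfChanged(org.apache.iceberg.Table icebergTable, String qualifiedName) {
      org.apache.iceberg.Snapshot current = icebergTable.currentSnapshot();
      Long prev = lastSeenMillis.get(qualifiedName);
      if (current == null || (prev != null && current.timestampMillis() <= prev)) {
        return; // no new commits since the previous pass
      }
      // ... evaluate partitions and add compaction targets here ...
      lastSeenMillis.put(qualifiedName, current.timestampMillis());
    }
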
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
index aa732e27eaf..13ab0ca1ab2 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
@@ -18,13 +18,9 @@
package org.apache.iceberg.mr.hive.metastore.task;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -47,12 +43,6 @@ public class IcebergHouseKeeperService implements MetastoreTaskThread {
private boolean shouldUseMutex;
private ExecutorService deleteExecutorService = null;
-  // table cache to avoid making repeated requests for the same Iceberg tables more than once per day
- private final Cache<TableName, Table> tableCache = Caffeine.newBuilder()
- .maximumSize(1000)
- .expireAfterWrite(1, TimeUnit.DAYS)
- .build();
-
@Override
public long runFrequency(TimeUnit unit) {
    return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_INTERVAL, unit);
@@ -78,22 +68,18 @@ public void run() {
  private void expireTables(String catalogName, String dbPattern, String tablePattern) {
try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) {
      int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
-      List<org.apache.hadoop.hive.metastore.api.Table> tables =
+      Iterable<org.apache.hadoop.hive.metastore.api.Table> tables =
          IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(maxBatchSize);
-      LOG.debug("{} candidate tables found", tables.size());
+      // TODO : HIVE-29163 - Create client with cache in metastore package and then use it in TableFetcher
+      // and HiveTableOperations to reduce the number of msc calls and fetch it from cache
for (org.apache.hadoop.hive.metastore.api.Table table : tables) {
- expireSnapshotsForTable(getIcebergTable(table));
+ expireSnapshotsForTable(IcebergTableUtil.getTable(conf, table));
}
} catch (Exception e) {
      throw new RuntimeException("Error while getting tables from metastore", e);
}
}
-  private Table getIcebergTable(org.apache.hadoop.hive.metastore.api.Table table) {
-    TableName tableName = TableName.fromString(table.getTableName(), table.getCatName(), table.getDbName());
-    return tableCache.get(tableName, key -> IcebergTableUtil.getTable(conf, table));
- }
-
/**
   * Deletes snapshots of an Iceberg table, using the number of threads defined by the
* Hive config HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
index 1635b352781..4b56edbd07a 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
@@ -21,6 +21,7 @@
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -70,10 +71,11 @@ public void testIcebergTableFetched() throws Exception {
    TableFetcher tableFetcher = IcebergTableUtil.getTableFetcher(db.getMSC(), null, "default", "*");
    int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
-    List<org.apache.hadoop.hive.metastore.api.Table> tables = tableFetcher.getTables(maxBatchSize);
- Assertions.assertEquals("hive", tables.get(0).getCatName());
- Assertions.assertEquals("default", tables.get(0).getDbName());
- Assertions.assertEquals("iceberg_table", tables.get(0).getTableName());
+    Iterator<org.apache.hadoop.hive.metastore.api.Table> tables = tableFetcher.getTables(maxBatchSize).iterator();
+ org.apache.hadoop.hive.metastore.api.Table table = tables.next();
+ Assertions.assertEquals("hive", table.getCatName());
+ Assertions.assertEquals("default", table.getDbName());
+ Assertions.assertEquals("iceberg_table", table.getTableName());
}
@Test
diff --git a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
index 8b3c08aa72b..8381fa03263 100644
--- a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
+++ b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.metastore.utils;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+import java.util.Collections;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableIterable;
@@ -110,24 +112,33 @@ public List<TableName> getTableNames() throws Exception {
return candidates;
}
- public List<Table> getTables(int maxBatchSize) throws Exception {
- List<Table> candidates = new ArrayList<>();
-
+  public Iterable<Table> getTables(Set<String> skipDBs, Set<String> skipTables, int maxBatchSize) throws Exception {
    // if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
if (tableTypes.isEmpty()) {
LOG.info("Table fetcher returns empty list as no table types specified");
- return candidates;
+ return Collections.emptyList();
}
- List<String> databases = client.getDatabases(catalogName, dbPattern);
+    List<String> databases = client.getDatabases(catalogName, dbPattern).stream()
+ .filter(dbName -> skipDBs == null || !skipDBs.contains(dbName))
+ .toList();
+
+ return () -> Iterators.concat(
+ Iterators.transform(databases.iterator(), db -> {
+ try {
+            List<String> tableNames = getTableNamesForDatabase(catalogName, db).stream()
+                .filter(tableName -> skipTables == null || !skipTables.contains(TableName.getDbTable(db, tableName)))
+                .toList();
+            return new TableIterable(client, db, tableNames, maxBatchSize).iterator();
+          } catch (Exception e) {
+            throw new RuntimeException("Failed to fetch tables for db: " + db, e);
+ }
+ })
+ );
+ }
- for (String db : databases) {
- List<String> tablesNames = getTableNamesForDatabase(catalogName, db);
-      for (Table table : new TableIterable(client, db, tablesNames, maxBatchSize)) {
- candidates.add(table);
- }
- }
- return candidates;
+ public Iterable<Table> getTables(int maxBatchSize) throws Exception {
+ return getTables(null, null, maxBatchSize);
}
  private List<String> getTableNamesForDatabase(String catalogName, String dbName) throws Exception {
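
The new getTables builds its Iterable from Guava: Iterators.transform maps each database name to that database's table iterator, and Iterators.concat flattens them, so a database's tables are only requested from the metastore when iteration actually reaches it. A self-contained sketch of that lazy-flattening pattern (listTables is a hypothetical stand-in for the HMS-backed TableIterable):

    import com.google.common.collect.Iterators;
    import java.util.Iterator;
    import java.util.List;

    public class LazyFlattenSketch {
      // Stand-in for TableIterable: yields one database's tables.
      static Iterator<String> listTables(String db) {
        System.out.println("fetching tables for " + db); // runs lazily
        return List.of(db + ".t1", db + ".t2").iterator();
      }

      public static void main(String[] args) {
        List<String> databases = List.of("default", "sales");
        // transform is lazy: listTables(db) executes only when concat
        // advances into that database, mirroring TableFetcher#getTables.
        Iterable<String> tables = () -> Iterators.concat(
            Iterators.transform(databases.iterator(), LazyFlattenSketch::listTables));
        for (String t : tables) {
          System.out.println(t);
        }
      }
    }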