This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 430f031ae8f HIVE-28952: Addendum: Process Table objects in batches 
using TableIterable (#6039)
430f031ae8f is described below

commit 430f031ae8f133eca20b3b28e3242f5b15a1c5d2
Author: Neeraj Khatri <[email protected]>
AuthorDate: Thu Aug 28 13:58:17 2025 +0530

    HIVE-28952: Addendum: Process Table objects in batches using TableIterable 
(#6039)
---
 .../mr/hive/compaction/IcebergTableOptimizer.java  | 81 +++++++++-------------
 .../metastore/task/IcebergHouseKeeperService.java  | 22 ++----
 .../task/TestIcebergHouseKeeperService.java        | 10 +--
 .../hadoop/hive/metastore/utils/TableFetcher.java  | 35 ++++++----
 4 files changed, 65 insertions(+), 83 deletions(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
index c04e8c013cd..dfc352ee0da 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -30,21 +30,19 @@
 import java.util.concurrent.Future;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
 import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
 import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer;
-import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.Snapshot;
@@ -85,62 +83,47 @@ public IcebergTableOptimizer(HiveConf conf, TxnStore 
txnHandler, MetadataCache m
    */
   @Override
   public Set<CompactionInfo> findPotentialCompactions(long lastChecked, 
ShowCompactResponse currentCompactions,
-      Set<String> skipDBs, Set<String> skipTables) {
+                                                      Set<String> skipDBs, 
Set<String> skipTables) {
     Set<CompactionInfo> compactionTargets = Sets.newHashSet();
 
-    getTableNames().stream()
-        .filter(table -> !skipDBs.contains(table.getDb()))
-        .filter(table -> !skipTables.contains(table.getNotEmptyDbTable()))
-        .map(table -> {
-          org.apache.hadoop.hive.ql.metadata.Table hiveTable = 
getHiveTable(table.getDb(), table.getTable());
-          org.apache.iceberg.Table icebergTable = 
IcebergTableUtil.getTable(conf, hiveTable.getTTable());
-          return Pair.of(hiveTable, icebergTable);
-        })
-        .filter(t -> hasNewCommits(t.getRight(),
-            snapshotTimeMilCache.get(t.getLeft().getFullyQualifiedName())))
-        .forEach(t -> {
-          String qualifiedTableName = t.getLeft().getFullyQualifiedName();
-          org.apache.hadoop.hive.ql.metadata.Table hiveTable = t.getLeft();
-          org.apache.iceberg.Table icebergTable = t.getRight();
-
-          if (icebergTable.spec().isPartitioned()) {
-            List<org.apache.hadoop.hive.ql.metadata.Partition> partitions = 
findModifiedPartitions(hiveTable,
-                icebergTable, snapshotTimeMilCache.get(qualifiedTableName), 
true);
-
-            partitions.forEach(partition -> 
addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable,
-                partition.getName(), compactionTargets, currentCompactions, 
skipDBs, skipTables));
-
-            if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) 
&& !findModifiedPartitions(hiveTable,
-                icebergTable, snapshotTimeMilCache.get(qualifiedTableName), 
false).isEmpty()) {
-              addCompactionTargetIfEligible(hiveTable.getTTable(), 
icebergTable,
-                  null, compactionTargets, currentCompactions, skipDBs, 
skipTables);
-            }
-          } else {
-            addCompactionTargetIfEligible(hiveTable.getTTable(), icebergTable, 
null, compactionTargets,
-                currentCompactions, skipDBs, skipTables);
-          }
+    Iterable<Table> tables = getTables(skipDBs, skipTables);
 
-          snapshotTimeMilCache.put(qualifiedTableName, 
icebergTable.currentSnapshot().timestampMillis());
-        });
+    for (Table table : tables) {
+      org.apache.hadoop.hive.ql.metadata.Table hiveTable = new 
org.apache.hadoop.hive.ql.metadata.Table(table);
+      org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, 
table);
+      String qualifiedTableName = hiveTable.getFullyQualifiedName();
 
-    return compactionTargets;
-  }
+      if (hasNewCommits(icebergTable, 
snapshotTimeMilCache.get(qualifiedTableName))) {
+        if (icebergTable.spec().isPartitioned()) {
+          List<org.apache.hadoop.hive.ql.metadata.Partition> partitions = 
findModifiedPartitions(hiveTable,
+              icebergTable, snapshotTimeMilCache.get(qualifiedTableName), 
true);
 
-  private List<org.apache.hadoop.hive.common.TableName> getTableNames() {
-    try {
-      return IcebergTableUtil.getTableFetcher(client, null, "*", 
null).getTableNames();
-    } catch (Exception e) {
-      throw new RuntimeMetaException(e, "Error getting table names");
+          partitions.forEach(partition -> addCompactionTargetIfEligible(table, 
icebergTable,
+              partition.getName(), compactionTargets, currentCompactions, 
skipDBs, skipTables));
+
+          if (IcebergTableUtil.hasUndergonePartitionEvolution(icebergTable) && 
!findModifiedPartitions(hiveTable,
+              icebergTable, snapshotTimeMilCache.get(qualifiedTableName), 
false).isEmpty()) {
+            addCompactionTargetIfEligible(table, icebergTable,
+                null, compactionTargets, currentCompactions, skipDBs, 
skipTables);
+          }
+        } else {
+          addCompactionTargetIfEligible(table, icebergTable, null, 
compactionTargets,
+              currentCompactions, skipDBs, skipTables);
+        }
+
+        snapshotTimeMilCache.put(qualifiedTableName, 
icebergTable.currentSnapshot().timestampMillis());
+      }
     }
+
+    return compactionTargets;
   }
 
-  private org.apache.hadoop.hive.ql.metadata.Table getHiveTable(String dbName, 
String tableName) {
+  private Iterable<Table> getTables(Set<String> skipDBs, Set<String> 
skipTables) {
     try {
-      Table metastoreTable = 
metadataCache.computeIfAbsent(TableName.getDbTable(dbName, tableName), () ->
-          CompactorUtil.resolveTable(conf, dbName, tableName));
-      return new org.apache.hadoop.hive.ql.metadata.Table(metastoreTable);
+      int maxBatchSize = MetastoreConf.getIntVar(conf, 
MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
+      return IcebergTableUtil.getTableFetcher(client, null, "*", 
null).getTables(skipDBs, skipTables, maxBatchSize);
     } catch (Exception e) {
-      throw new RuntimeMetaException(e, "Error getting Hive table for %s.%s", 
dbName, tableName);
+      throw new RuntimeMetaException(e, "Error getting tables");
     }
   }
 
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
index aa732e27eaf..13ab0ca1ab2 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
@@ -18,13 +18,9 @@
 
 package org.apache.iceberg.mr.hive.metastore.task;
 
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -47,12 +43,6 @@ public class IcebergHouseKeeperService implements 
MetastoreTaskThread {
   private boolean shouldUseMutex;
   private ExecutorService deleteExecutorService = null;
 
-  // table cache to avoid making repeated requests for the same Iceberg tables 
more than once per day
-  private final Cache<TableName, Table> tableCache = Caffeine.newBuilder()
-      .maximumSize(1000)
-      .expireAfterWrite(1, TimeUnit.DAYS)
-      .build();
-
   @Override
   public long runFrequency(TimeUnit unit) {
     return MetastoreConf.getTimeVar(conf, 
MetastoreConf.ConfVars.ICEBERG_TABLE_EXPIRY_INTERVAL, unit);
@@ -78,22 +68,18 @@ public void run() {
   private void expireTables(String catalogName, String dbPattern, String 
tablePattern) {
     try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) {
       int maxBatchSize = MetastoreConf.getIntVar(conf, 
MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
-      List<org.apache.hadoop.hive.metastore.api.Table> tables =
+      Iterable<org.apache.hadoop.hive.metastore.api.Table> tables =
           IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, 
tablePattern).getTables(maxBatchSize);
-      LOG.debug("{} candidate tables found", tables.size());
+      // TODO : HIVE-29163 - Create client with cache in metastore package and 
then use it in TableFetcher
+      //  and HiveTableOperations to reduce the number of msc calls and fetch 
it from cache
       for (org.apache.hadoop.hive.metastore.api.Table table : tables) {
-        expireSnapshotsForTable(getIcebergTable(table));
+        expireSnapshotsForTable(IcebergTableUtil.getTable(conf, table));
       }
     } catch (Exception e) {
       throw new RuntimeException("Error while getting tables from metastore", 
e);
     }
   }
 
-  private Table getIcebergTable(org.apache.hadoop.hive.metastore.api.Table 
table) {
-    TableName tableName = TableName.fromString(table.getTableName(), 
table.getCatName(), table.getDbName());
-    return tableCache.get(tableName, key -> IcebergTableUtil.getTable(conf, 
table));
-  }
-
   /**
    * Deletes snapshots of an Iceberg table, using the number of threads 
defined by the
    * Hive config HIVE_ICEBERG_EXPIRE_SNAPSHOT_NUMTHREADS.
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
index 1635b352781..4b56edbd07a 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -70,10 +71,11 @@ public void testIcebergTableFetched() throws Exception {
     TableFetcher tableFetcher = IcebergTableUtil.getTableFetcher(db.getMSC(), 
null, "default", "*");
 
     int maxBatchSize = MetastoreConf.getIntVar(conf, 
MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
-    List<org.apache.hadoop.hive.metastore.api.Table> tables = 
tableFetcher.getTables(maxBatchSize);
-    Assertions.assertEquals("hive", tables.get(0).getCatName());
-    Assertions.assertEquals("default", tables.get(0).getDbName());
-    Assertions.assertEquals("iceberg_table", tables.get(0).getTableName());
+    Iterator<org.apache.hadoop.hive.metastore.api.Table> tables = 
tableFetcher.getTables(maxBatchSize).iterator();
+    org.apache.hadoop.hive.metastore.api.Table table = tables.next();
+    Assertions.assertEquals("hive", table.getCatName());
+    Assertions.assertEquals("default", table.getDbName());
+    Assertions.assertEquals("iceberg_table", table.getTableName());
   }
 
   @Test
diff --git 
a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
 
b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
index 8b3c08aa72b..8381fa03263 100644
--- 
a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
+++ 
b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hive.metastore.utils;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterators;
+import java.util.Collections;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableIterable;
@@ -110,24 +112,33 @@ public List<TableName> getTableNames() throws Exception {
     return candidates;
   }
 
-  public List<Table> getTables(int maxBatchSize) throws Exception {
-    List<Table> candidates = new ArrayList<>();
-
+  public Iterable<Table> getTables(Set<String> skipDBs, Set<String> 
skipTables, int maxBatchSize) throws Exception {
     // if tableTypes is empty, then a list with single empty string has to 
specified to scan no tables.
     if (tableTypes.isEmpty()) {
       LOG.info("Table fetcher returns empty list as no table types specified");
-      return candidates;
+      return Collections.emptyList();
     }
 
-    List<String> databases = client.getDatabases(catalogName, dbPattern);
+    List<String> databases = client.getDatabases(catalogName, 
dbPattern).stream()
+        .filter(dbName -> skipDBs == null || !skipDBs.contains(dbName))
+        .toList();
+
+    return () -> Iterators.concat(
+        Iterators.transform(databases.iterator(), db -> {
+          try {
+            List<String> tableNames = getTableNamesForDatabase(catalogName, 
db).stream()
+                .filter(tableName -> skipTables == null || 
!skipTables.contains(TableName.getDbTable(db, tableName)))
+                .toList();
+            return new TableIterable(client, db, tableNames, 
maxBatchSize).iterator();
+          } catch (Exception e) {
+            throw new RuntimeException("Failed to fetch tables for db: " + db, 
e);
+          }
+        })
+    );
+  }
 
-    for (String db : databases) {
-      List<String> tablesNames = getTableNamesForDatabase(catalogName, db);
-      for (Table table : new TableIterable(client, db, tablesNames, 
maxBatchSize)) {
-        candidates.add(table);
-      }
-    }
-    return candidates;
+  public Iterable<Table> getTables(int maxBatchSize) throws Exception {
+    return getTables(null, null, maxBatchSize);
   }
 
   private List<String> getTableNamesForDatabase(String catalogName, String 
dbName) throws Exception {

Reply via email to