This is an automated email from the ASF dual-hosted git repository.
okumin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ac3ea055053 HIVE-28952: TableFetcher to return Table objects instead of names (#6020)
ac3ea055053 is described below
commit ac3ea0550537510cf72ca796b5d8e5cb30494b45
Author: Neeraj Khatri <[email protected]>
AuthorDate: Fri Aug 15 11:52:41 2025 +0530
HIVE-28952: TableFetcher to return Table objects instead of names (#6020)
---
.../mr/hive/compaction/IcebergTableOptimizer.java | 6 +--
.../metastore/task/IcebergHouseKeeperService.java | 32 ++++----------
.../task/TestIcebergHouseKeeperService.java | 9 ++--
.../hadoop/hive/metastore/utils/TableFetcher.java | 50 +++++++++++++++++-----
.../hive/metastore/PartitionManagementTask.java | 2 +-
5 files changed, 57 insertions(+), 42 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
index e7a4afa6bb4..c04e8c013cd 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -88,7 +88,7 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
Set<String> skipDBs, Set<String> skipTables) {
Set<CompactionInfo> compactionTargets = Sets.newHashSet();
- getTables().stream()
+ getTableNames().stream()
.filter(table -> !skipDBs.contains(table.getDb()))
.filter(table -> !skipTables.contains(table.getNotEmptyDbTable()))
.map(table -> {
@@ -126,9 +126,9 @@ public Set<CompactionInfo> findPotentialCompactions(long lastChecked, ShowCompac
return compactionTargets;
}
- private List<org.apache.hadoop.hive.common.TableName> getTables() {
+ private List<org.apache.hadoop.hive.common.TableName> getTableNames() {
try {
- return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTables();
+ return IcebergTableUtil.getTableFetcher(client, null, "*", null).getTableNames();
} catch (Exception e) {
throw new RuntimeMetaException(e, "Error getting table names");
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
index 0478d9c5c0f..aa732e27eaf 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
@@ -29,7 +29,6 @@
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
-import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -37,7 +36,6 @@
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,35 +77,21 @@ public void run() {
private void expireTables(String catalogName, String dbPattern, String tablePattern) {
try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) {
- // TODO: HIVE-28952 – modify TableFetcher to return HMS Table API objects directly,
- // avoiding the need for subsequent msc.getTable calls to fetch each matched table individually
- List<TableName> tables = IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables();
-
+ int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
+ List<org.apache.hadoop.hive.metastore.api.Table> tables =
+ IcebergTableUtil.getTableFetcher(msc, catalogName, dbPattern, tablePattern).getTables(maxBatchSize);
LOG.debug("{} candidate tables found", tables.size());
-
- for (TableName table : tables) {
- try {
- expireSnapshotsForTable(getIcebergTable(table, msc));
- } catch (Exception e) {
- LOG.error("Exception while running iceberg expiry service on catalog/db/table: {}/{}/{}",
- catalogName, dbPattern, tablePattern, e);
- }
+ for (org.apache.hadoop.hive.metastore.api.Table table : tables) {
+ expireSnapshotsForTable(getIcebergTable(table));
}
} catch (Exception e) {
throw new RuntimeException("Error while getting tables from metastore", e);
}
}
- private Table getIcebergTable(TableName tableName, IMetaStoreClient msc) {
- return tableCache.get(tableName, key -> {
- LOG.debug("Getting iceberg table from metastore as it's not present in table cache: {}", tableName);
- GetTableRequest request = new GetTableRequest(tableName.getDb(), tableName.getTable());
- try {
- return IcebergTableUtil.getTable(conf, msc.getTable(request));
- } catch (TException e) {
- throw new RuntimeException(e);
- }
- });
+ private Table getIcebergTable(org.apache.hadoop.hive.metastore.api.Table table) {
+ TableName tableName = TableName.fromString(table.getTableName(), table.getCatName(), table.getDbName());
+ return tableCache.get(tableName, key -> IcebergTableUtil.getTable(conf, table));
}
/**
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
index ca248e9b1a0..879d59b8de1 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
@@ -23,11 +23,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.utils.TableFetcher;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -69,8 +69,11 @@ public void testIcebergTableFetched() throws Exception {
TableFetcher tableFetcher = IcebergTableUtil.getTableFetcher(db.getMSC(), null, "default", "*");
- List<TableName> tables = tableFetcher.getTables();
- Assert.assertEquals(new TableName("hive", "default", "iceberg_table"), tables.get(0));
+ int maxBatchSize = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
+ List<org.apache.hadoop.hive.metastore.api.Table> tables = tableFetcher.getTables(maxBatchSize);
+ Assert.assertEquals("hive", tables.get(0).getCatName());
+ Assert.assertEquals("default", tables.get(0).getDbName());
+ Assert.assertEquals("iceberg_table", tables.get(0).getTableName());
}
@Test
diff --git a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
index 0e10635bc74..8b3c08aa72b 100644
--- a/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
+++ b/standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/utils/TableFetcher.java
@@ -20,9 +20,11 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.TableIterable;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +92,7 @@ private void buildTableFilter(String tablePattern, List<String> conditions) {
this.tableFilter = String.join(" and ", conditions);
}
- public List<TableName> getTables() throws Exception {
+ public List<TableName> getTableNames() throws Exception {
List<TableName> candidates = new ArrayList<>();
// if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
@@ -102,21 +104,47 @@ public List<TableName> getTables() throws Exception {
List<String> databases = client.getDatabases(catalogName, dbPattern);
for (String db : databases) {
- Database database = client.getDatabase(catalogName, db);
- if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
- LOG.debug("Skipping table under database: {}", db);
- continue;
- }
- if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
- LOG.info("Skipping table that belongs to database {} being failed over.", db);
- continue;
- }
- List<String> tablesNames = client.listTableNamesByFilter(catalogName, db, tableFilter, -1);
+ List<String> tablesNames = getTableNamesForDatabase(catalogName, db);
tablesNames.forEach(tablesName -> candidates.add(TableName.fromString(tablesName, catalogName, db)));
}
return candidates;
}
+ public List<Table> getTables(int maxBatchSize) throws Exception {
+ List<Table> candidates = new ArrayList<>();
+
+ // if tableTypes is empty, then a list with single empty string has to specified to scan no tables.
+ if (tableTypes.isEmpty()) {
+ LOG.info("Table fetcher returns empty list as no table types specified");
+ return candidates;
+ }
+
+ List<String> databases = client.getDatabases(catalogName, dbPattern);
+
+ for (String db : databases) {
+ List<String> tablesNames = getTableNamesForDatabase(catalogName, db);
+ for (Table table : new TableIterable(client, db, tablesNames, maxBatchSize)) {
+ candidates.add(table);
+ }
+ }
+ return candidates;
+ }
+
+ private List<String> getTableNamesForDatabase(String catalogName, String dbName) throws Exception {
+ List<String> tableNames = new ArrayList<>();
+ Database database = client.getDatabase(catalogName, dbName);
+ if (MetaStoreUtils.checkIfDbNeedsToBeSkipped(database)) {
+ LOG.debug("Skipping table under database: {}", dbName);
+ return tableNames;
+ }
+ if (MetaStoreUtils.isDbBeingPlannedFailedOver(database)) {
+ LOG.info("Skipping table that belongs to database {} being failed over.", dbName);
+ return tableNames;
+ }
+ tableNames = client.listTableNamesByFilter(catalogName, dbName, tableFilter, -1);
+ return tableNames;
+ }
+
public static class Builder {
private final IMetaStoreClient client;
private final String catalogName;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
index 0749985392f..fa9d5e2e9dd 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
@@ -101,7 +101,7 @@ public void run() {
.tableCondition(
hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "discover__partitions like \"true\" ")
.build()
- .getTables();
+ .getTableNames();
if (candidates.isEmpty()) {
LOG.info("Got empty table list in catalog: {}, dbPattern: {}", catalogName, dbPattern);