This is an automated email from the ASF dual-hosted git repository.
difin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0f6868b55db HIVE-29028: Iceberg: Addendum: Retrieve tables without session (#5944)
0f6868b55db is described below
commit 0f6868b55dba877c0cca897fed1c47deb7bab4c3
Author: Dmitriy Fingerman <[email protected]>
AuthorDate: Thu Jul 10 23:22:32 2025 -0400
HIVE-29028: Iceberg: Addendum: Retrieve tables without session (#5944)
---
.../apache/iceberg/mr/hive/IcebergTableUtil.java | 11 +++++
.../mr/hive/compaction/IcebergTableOptimizer.java | 47 ++++++----------------
.../metastore/task/IcebergHouseKeeperService.java | 14 +------
.../task/TestIcebergHouseKeeperService.java | 3 +-
4 files changed, 25 insertions(+), 50 deletions(-)
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 2e4e5f0a094..d52775ead8f 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -39,11 +39,13 @@
import org.apache.hadoop.hive.common.type.TimestampTZ;
import org.apache.hadoop.hive.common.type.TimestampTZUtil;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.utils.TableFetcher;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -572,4 +574,13 @@ public static boolean hasUndergonePartitionEvolution(Table
table) {
.map(ManifestFile::partitionSpecId)
.anyMatch(id -> id != table.spec().specId());
}
+
+ public static TableFetcher getTableFetcher(IMetaStoreClient msc, String
catalogName, String dbPattern,
+ String tablePattern) {
+ return new TableFetcher.Builder(msc, catalogName, dbPattern,
tablePattern).tableTypes(
+ "EXTERNAL_TABLE")
+ .tableCondition(
+ hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "table_type
like \"ICEBERG\" ")
+ .build();
+ }
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
index ab785844461..76badb57204 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergTableOptimizer.java
@@ -38,13 +38,11 @@
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
import org.apache.hadoop.hive.ql.txn.compactor.TableOptimizer;
-import org.apache.hive.iceberg.org.apache.orc.storage.common.TableName;
import org.apache.iceberg.hive.RuntimeMetaException;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
import org.apache.iceberg.mr.hive.compaction.evaluator.CompactionEvaluator;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.thrift.TException;
public class IcebergTableOptimizer extends TableOptimizer {
private HiveMetaStoreClient client;
@@ -71,34 +69,21 @@ public IcebergTableOptimizer(HiveConf conf, TxnStore
txnHandler, MetadataCache m
* @param skipTables A {@link Set} of fully qualified table names to
explicitly skip during the scan.
* @return A {@link Set} of {@link CompactionInfo} objects representing
tables and/or partitions
* identified as eligible for compaction.
- * @throws MetaException If an unrecoverable error occurs during Metastore
communication or
* during {@link SessionState} initialization.
*/
@Override
public Set<CompactionInfo> findPotentialCompactions(long lastChecked,
ShowCompactResponse currentCompactions,
- Set<String> skipDBs, Set<String> skipTables) throws MetaException {
+ Set<String> skipDBs, Set<String> skipTables) {
Set<CompactionInfo> compactionTargets = Sets.newHashSet();
- try {
- SessionState sessionState = SessionState.get();
- if (sessionState == null) {
- sessionState = new SessionState(conf);
- SessionState.start(sessionState);
- }
- } catch (Exception e) {
- throw new MetaException(String.format("Error while finding compaction
targets for Iceberg tables: %s",
- e.getMessage()));
- }
- getDatabases().stream()
- .filter(dbName -> !skipDBs.contains(dbName))
- .flatMap(dbName -> getTables(dbName).stream()
- .map(tableName -> TableName.getDbTable(dbName, tableName)))
- .filter(qualifiedTableName -> !skipTables.contains(qualifiedTableName))
- .map(qualifiedTableName -> Pair.of(qualifiedTableName,
resolveMetastoreTable(qualifiedTableName)))
+ getTables().stream()
+ .filter(table -> !skipDBs.contains(table.getDb()))
+ .filter(table -> !skipTables.contains(table.getNotEmptyDbTable()))
+ .map(table -> Pair.of(table,
resolveMetastoreTable(table.getNotEmptyDbTable())))
.filter(tablePair ->
MetaStoreUtils.isIcebergTable(tablePair.getValue().getParameters()))
.filter(tablePair -> {
long currentSnapshotId =
Long.parseLong(tablePair.getValue().getParameters().get("current-snapshot-id"));
- Long cachedSnapshotId = snapshotIdCache.get(tablePair.getKey());
+ Long cachedSnapshotId =
snapshotIdCache.get(tablePair.getKey().getNotEmptyDbTable());
return cachedSnapshotId == null || cachedSnapshotId !=
currentSnapshotId;
})
.forEach(tablePair -> {
@@ -116,25 +101,17 @@ public Set<CompactionInfo> findPotentialCompactions(long
lastChecked, ShowCompac
currentCompactions, skipDBs, skipTables);
}
- snapshotIdCache.put(tablePair.getKey(),
icebergTable.currentSnapshot().snapshotId());
+ snapshotIdCache.put(tablePair.getKey().getNotEmptyDbTable(),
icebergTable.currentSnapshot().snapshotId());
});
return compactionTargets;
}
- private List<String> getDatabases() {
- try {
- return client.getAllDatabases();
- } catch (TException e) {
- throw new RuntimeMetaException(e, "Error getting database names");
- }
- }
-
- private List<String> getTables(String dbName) {
+ private List<org.apache.hadoop.hive.common.TableName> getTables() {
try {
- return client.getAllTables(dbName);
- } catch (TException e) {
- throw new RuntimeMetaException(e, "Error getting table names of %s
database", dbName);
+ return IcebergTableUtil.getTableFetcher(client, null, "*",
null).getTables();
+ } catch (Exception e) {
+ throw new RuntimeMetaException(e, "Error getting table names");
}
}
@@ -157,7 +134,7 @@ private Table resolveMetastoreTable(String
qualifiedTableName) {
}
public void init() throws MetaException {
- client = new HiveMetaStoreClient(conf);
+ client = new HiveMetaStoreClient(new HiveConf());
snapshotIdCache = Maps.newConcurrentMap();
}
diff --git
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
index 8c2e010c238..0478d9c5c0f 100644
---
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
+++
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/metastore/task/IcebergHouseKeeperService.java
@@ -30,16 +30,13 @@
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.NoMutex;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.metastore.utils.TableFetcher;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
-import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +81,7 @@ private void expireTables(String catalogName, String
dbPattern, String tablePatt
try (IMetaStoreClient msc = new HiveMetaStoreClient(conf)) {
// TODO: HIVE-28952 – modify TableFetcher to return HMS Table API
objects directly,
// avoiding the need for subsequent msc.getTable calls to fetch each
matched table individually
- List<TableName> tables = getTableFetcher(msc, catalogName, dbPattern,
tablePattern).getTables();
+ List<TableName> tables = IcebergTableUtil.getTableFetcher(msc,
catalogName, dbPattern, tablePattern).getTables();
LOG.debug("{} candidate tables found", tables.size());
@@ -101,15 +98,6 @@ private void expireTables(String catalogName, String
dbPattern, String tablePatt
}
}
- @VisibleForTesting
- TableFetcher getTableFetcher(IMetaStoreClient msc, String catalogName,
String dbPattern, String tablePattern) {
- return new TableFetcher.Builder(msc, catalogName, dbPattern,
tablePattern).tableTypes(
- "EXTERNAL_TABLE")
- .tableCondition(
- hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS + "table_type
like \"ICEBERG\" ")
- .build();
- }
-
private Table getIcebergTable(TableName tableName, IMetaStoreClient msc) {
return tableCache.get(tableName, key -> {
LOG.debug("Getting iceberg table from metastore as it's not present in
table cache: {}", tableName);
diff --git
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
index cd77beac66f..ca248e9b1a0 100644
---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
+++
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/metastore/task/TestIcebergHouseKeeperService.java
@@ -67,8 +67,7 @@ public static void afterClass() {
public void testIcebergTableFetched() throws Exception {
createIcebergTable("iceberg_table");
- IcebergHouseKeeperService service = new IcebergHouseKeeperService();
- TableFetcher tableFetcher = service.getTableFetcher(db.getMSC(), null,
"default", "*");
+ TableFetcher tableFetcher = IcebergTableUtil.getTableFetcher(db.getMSC(),
null, "default", "*");
List<TableName> tables = tableFetcher.getTables();
Assert.assertEquals(new TableName("hive", "default", "iceberg_table"),
tables.get(0));