This is an automated email from the ASF dual-hosted git repository.
okumin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9234ddd400e HIVE-29035: Fixing cache handling for REST catalog (#5882)
9234ddd400e is described below
commit 9234ddd400e709b650023bbf48d2f7b67dffd8f1
Author: Henrib <[email protected]>
AuthorDate: Sat Aug 9 06:07:26 2025 +0200
HIVE-29035: Fixing cache handling for REST catalog (#5882)
* HIVE-29016: rebasing;
* HIVE-29016: fixing cache handling for REST catalog;
- add an event listener to invalidate cached tables regardless of the source of
change (direct HMS or REST);
- added configuration option for event class handler;
- lengthened default cache TTL;
* HIVE-29016: changing default to disable catalog caching;
* HIVE-29016: rebasing;
* HIVE-29016: fixing rebasing;
* HIVE-29016: fixing dependency;
* HIVE-29016: revert to simpler cache but check that the cached table location
is the latest (Hive DB, get location) on loadTable(), ensuring no stale table is
returned;
* HIVE-29016: clean up;
* Improved loadTable cache handling
- Reduced redundant calls (avoid super call);
- Call invalidateTable for thorough eviction;
* Update
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
Co-authored-by: Shohei Okumiya <[email protected]>
* HIVE-29035: removed extensibility from HMSCatalogFactory;
---------
Co-authored-by: Shohei Okumiya <[email protected]>
---
.../java/org/apache/iceberg/hive/HiveCatalog.java | 36 ++++++++++--
.../hadoop/hive/metastore/conf/MetastoreConf.java | 2 +-
.../org/apache/iceberg/rest/HMSCachingCatalog.java | 65 ++++++++++++++++++++--
.../org/apache/iceberg/rest/HMSCatalogFactory.java | 15 +++--
.../hadoop/hive/metastore/HiveMetaStore.java | 1 +
.../hadoop/hive/metastore/ServletSecurity.java | 10 ++++
6 files changed, 111 insertions(+), 18 deletions(-)
diff --git
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
index 72280449ad5..ce30a647c74 100644
---
a/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
+++
b/iceberg/iceberg-catalog/src/main/java/org/apache/iceberg/hive/HiveCatalog.java
@@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configurable;
@@ -31,6 +32,7 @@
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -408,23 +410,46 @@ private void validateTableIsIcebergTableOrView(
*/
@Override
public boolean tableExists(TableIdentifier identifier) {
+ return Objects.nonNull(fetchTable(identifier));
+ }
+
+ /**
+ * Check whether table or metadata table exists and return its location.
+ *
+ * <p>Note: If a hive table with the same identifier exists in catalog, this
method will return
+ * {@code null}.
+ *
+ * @param identifier a table identifier
+ * @return the location of the table if it exists, null otherwise
+ */
+ public String getTableLocation(TableIdentifier identifier) {
+ Table table = fetchTable(identifier);
+ if (table == null) {
+ return null;
+ }
+ return
table.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+ }
+
+ private Table fetchTable(TableIdentifier identifier) {
TableIdentifier baseTableIdentifier = identifier;
if (!isValidIdentifier(identifier)) {
if (!isValidMetadataIdentifier(identifier)) {
- return false;
+ return null;
} else {
baseTableIdentifier =
TableIdentifier.of(identifier.namespace().levels());
}
}
-
String database = baseTableIdentifier.namespace().level(0);
String tableName = baseTableIdentifier.name();
try {
- Table table = clients.run(client -> client.getTable(database,
tableName));
+ GetTableRequest request = new GetTableRequest();
+ request.setDbName(database);
+ request.setTblName(tableName);
+ Table table = clients.run(client -> client.getTable(request));
HiveOperationsBase.validateTableIsIceberg(table, fullTableName(name,
baseTableIdentifier));
- return true;
+ return table;
} catch (NoSuchTableException | NoSuchObjectException e) {
- return false;
+ return null;
} catch (TException e) {
throw new RuntimeException("Failed to check table existence of " +
baseTableIdentifier, e);
} catch (InterruptedException e) {
@@ -434,6 +459,7 @@ public boolean tableExists(TableIdentifier identifier) {
}
}
+
@Override
public boolean viewExists(TableIdentifier viewIdentifier) {
if (!isValidIdentifier(viewIdentifier)) {
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 521cbcc2eac..ae7f37fb500 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1880,7 +1880,7 @@ public enum ConfVars {
"HMS Iceberg Catalog servlet path component of URL endpoint."
),
ICEBERG_CATALOG_CACHE_EXPIRY("metastore.iceberg.catalog.cache.expiry",
- "hive.metastore.iceberg.catalog.cache.expiry", -1,
+ "hive.metastore.iceberg.catalog.cache.expiry", 600_000L,
"HMS Iceberg Catalog cache expiry."
),
HTTPSERVER_THREADPOOL_MIN("hive.metastore.httpserver.threadpool.min",
diff --git
a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java
b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java
index edb5fbd41a9..53d9d60faf4 100644
---
a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java
+++
b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java
@@ -19,12 +19,20 @@
package org.apache.iceberg.rest;
+import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Ticker;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
+import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -35,24 +43,22 @@
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.view.View;
import org.apache.iceberg.view.ViewBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class that wraps an Iceberg Catalog to cache tables.
*/
public class HMSCachingCatalog extends CachingCatalog implements
SupportsNamespaces, ViewCatalog {
+ private static final Logger LOG =
LoggerFactory.getLogger(HMSCachingCatalog.class);
private final HiveCatalog hiveCatalog;
public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
- super(catalog, true, expiration, Ticker.systemTicker());
+ super(catalog, false, expiration, Ticker.systemTicker());
this.hiveCatalog = catalog;
}
- @Override
- public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema
schema) {
- return hiveCatalog.buildTable(identifier, schema);
- }
-
@Override
public void createNamespace(Namespace nmspc, Map<String, String> map) {
hiveCatalog.createNamespace(nmspc, map);
@@ -63,6 +69,48 @@ public List<Namespace> listNamespaces(Namespace nmspc)
throws NoSuchNamespaceExc
return hiveCatalog.listNamespaces(nmspc);
}
+ @Override
+ public Table loadTable(TableIdentifier identifier) {
+ final Cache<TableIdentifier, Table> cache = this.tableCache;
+ final HiveCatalog catalog = this.hiveCatalog;
+ final TableIdentifier canonicalized = identifier.toLowerCase();
+ Table cachedTable = cache.getIfPresent(canonicalized);
+ if (cachedTable != null) {
+ String location = catalog.getTableLocation(canonicalized);
+ if (location == null) {
+ LOG.debug("Table {} has no location, returning cached table without
location", canonicalized);
+ } else if (!location.equals(cachedTable.location())) {
+ LOG.debug("Cached table {} has a different location than the one in
the catalog: {} != {}",
+ canonicalized, cachedTable.location(), location);
+ // Invalidate the cached table if the location is different
+ invalidateTable(canonicalized);
+ } else {
+ LOG.debug("Returning cached table: {}", canonicalized);
+ return cachedTable;
+ }
+ }
+ Table table = cache.get(canonicalized, catalog::loadTable);
+ if (table instanceof BaseMetadataTable) {
+ // Cache underlying table
+ TableIdentifier originTableIdentifier =
+ TableIdentifier.of(canonicalized.namespace().levels());
+ Table originTable = cache.get(originTableIdentifier, catalog::loadTable);
+ // Share TableOperations instance of origin table for all metadata
tables, so that metadata
+ // table instances are refreshed as well when origin table instance is
refreshed.
+ if (originTable instanceof HasTableOperations) {
+ TableOperations ops = ((HasTableOperations) originTable).operations();
+ MetadataTableType type = MetadataTableType.from(canonicalized.name());
+
+ Table metadataTable =
+ MetadataTableUtils.createMetadataTableInstance(
+ ops, catalog.name(), originTableIdentifier,
canonicalized, type);
+ cache.put(canonicalized, metadataTable);
+ return metadataTable;
+ }
+ }
+ return table;
+ }
+
@Override
public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws
NoSuchNamespaceException {
return hiveCatalog.loadNamespaceMetadata(nmspc);
@@ -92,6 +140,11 @@ public boolean namespaceExists(Namespace namespace) {
return hiveCatalog.namespaceExists(namespace);
}
+ @Override
+ public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema
schema) {
+ return hiveCatalog.buildTable(identifier, schema);
+ }
+
@Override
public List<TableIdentifier> listViews(Namespace namespace) {
return hiveCatalog.listViews(namespace);
diff --git
a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
index 682e7c9e264..d30fee989de 100644
---
a/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
+++
b/standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogFactory.java
@@ -33,8 +33,10 @@
/**
* Catalog & servlet factory.
+ * <p>This class is derivable on purpose; the factory class name is a
configuration property, this class
+ * can serve as a base for specialization.</p>
*/
-public class HMSCatalogFactory {
+public final class HMSCatalogFactory {
private static final String SERVLET_ID_KEY =
"metastore.in.test.iceberg.catalog.servlet.id";
private final Configuration configuration;
@@ -52,12 +54,12 @@ private HMSCatalogFactory(Configuration conf) {
path = MetastoreConf.getVar(conf,
MetastoreConf.ConfVars.ICEBERG_CATALOG_SERVLET_PATH);
this.configuration = conf;
}
-
- public int getPort() {
+
+ private int getPort() {
return port;
}
-
- public String getPath() {
+
+ private String getPath() {
return path;
}
@@ -110,7 +112,8 @@ private HttpServlet createServlet(Catalog catalog) {
*/
private HttpServlet createServlet() {
if (port >= 0 && path != null && !path.isEmpty()) {
- return createServlet(createCatalog());
+ Catalog actualCatalog = createCatalog();
+ return createServlet(actualCatalog);
}
return null;
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 180baef67bf..e3d0958c503 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -670,6 +670,7 @@ private static void
constraintHttpMethods(ServletContextHandler ctxHandler, bool
}
ctxHandler.setSecurityHandler(securityHandler);
}
+
/**
* Start Metastore based on a passed {@link HadoopThriftAuthBridge}.
*
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ServletSecurity.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ServletSecurity.java
index 677e814e8c1..8afc689d010 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ServletSecurity.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ServletSecurity.java
@@ -19,6 +19,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.auth.HttpAuthenticationException;
import org.apache.hadoop.hive.metastore.auth.jwt.JWTValidator;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -238,6 +239,15 @@ public void execute(HttpServletRequest request,
HttpServletResponse response, Me
Thread.currentThread().interrupt();
} catch (RuntimeException e) {
throw new IOException("Exception when executing http request as user: "+
clientUgi.getUserName(), e);
+ } finally {
+ try {
+ FileSystem.closeAllForUGI(clientUgi);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Successfully cleaned up FileSystem handles for user: {}",
clientUgi.getUserName());
+ }
+ } catch (IOException cleanupException) {
+ LOG.error("Failed to clean up FileSystem handles for UGI: {}",
clientUgi, cleanupException);
+ }
}
}