Copilot commented on code in PR #6441:
URL: https://github.com/apache/hive/pull/6441#discussion_r3155778698


##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -20,78 +20,276 @@
 package org.apache.iceberg.rest;
 
 import com.github.benmanes.caffeine.cache.Ticker;
+
+import java.lang.ref.SoftReference;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
-
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
 public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+  @TestOnly
+  public static <C extends Catalog> C  
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Metrics counters
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    if (catalog.getConf().getBoolean("metastore.iceberg.catalog.cache.debug", 
false)) {
+      cacheRef = new SoftReference<>(this);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, cacheInvalidateCount.get());
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, cacheLoadCount.get());
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, cacheHitCount.get());
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, cacheMissCount.get());
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    cacheMetaLoadCount.incrementAndGet();
+    LOG.debug("Cache meta-load {}: {}", tid, cacheMetaLoadCount.get());
+  }
+
+  // Getter methods for accessing metrics
+  public long getCacheHitCount() {
+    return cacheHitCount.get();
   }
 
+  public long getCacheMissCount() {
+    return cacheMissCount.get();
+  }
+
+  public long getCacheLoadCount() {
+    return cacheLoadCount.get();
+  }
+
+  public long getCacheInvalidateCount() {
+    return cacheInvalidateCount.get();
+  }
+
+  public long getCacheMetaLoadCount() {
+    return cacheMetaLoadCount.get();
+  }
+
+  public double getCacheHitRate() {
+    long hits = cacheHitCount.get();
+    long total = hits + cacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  /**
+   * Generates a map of this cache's performance metrics, including hit count,
+   * miss count, load count, invalidate count, meta-load count, and hit rate.
+   * This can be used for monitoring and debugging purposes to understand the 
effectiveness of the cache.
+   * @return a map of cache performance metrics
+   */
+  public Map<String, Number> cacheStats() {
+    return Map.of(
+            "hit", getCacheHitCount(),
+            "miss", getCacheMissCount(),
+            "load", getCacheLoadCount(),
+            "invalidate", getCacheInvalidateCount(),
+            "metaload", getCacheMetaLoadCount(),
+            "hit-rate", getCacheHitRate()
+    );
+  }
+
+
   @Override
-  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema 
schema) {
-    return hiveCatalog.buildTable(identifier, schema);
+  public void createNamespace(Namespace namespace, Map<String, String> map) {
+    hiveCatalog.createNamespace(namespace, map);
   }
 
   @Override
-  public void createNamespace(Namespace nmspc, Map<String, String> map) {
-    hiveCatalog.createNamespace(nmspc, map);
+  public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+    return hiveCatalog.listNamespaces(namespace);
   }
 
   @Override
-  public List<Namespace> listNamespaces(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.listNamespaces(nmspc);
+  public Table loadTable(final TableIdentifier identifier) {
+    final TableIdentifier canonicalized = identifier.toLowerCase();
+    final Table cachedTable = tableCache.getIfPresent(canonicalized);
+    if (cachedTable != null) {
+      final String location = new 
MetadataLocator(hiveCatalog).getLocation(canonicalized);
+      if (location == null) {
+        LOG.debug("Table {} has no location, returning cached table without 
location", canonicalized);
+      } else {
+        String cachedLocation = cachedTable instanceof HasTableOperations 
tableOps
+                ? tableOps.operations().current().metadataFileLocation()
+                : null;
+        if (!location.equals(cachedLocation)) {
+          LOG.debug("Invalidate table {}, cached {} != actual {}", 
canonicalized, cachedLocation, location);
+          // Invalidate the cached table if the location is different
+          invalidateTable(canonicalized);
+          onCacheInvalidate(canonicalized);
+        } else {
+          onCacheHit(canonicalized);
+          return cachedTable;
+        }
+      }
+    } else {
+      onCacheMiss(canonicalized);
+    }
+    final Table table = tableCache.get(canonicalized, 
this::loadTableWithoutCache);
+    if (table instanceof BaseMetadataTable) {
+      // Cache underlying table
+      TableIdentifier originTableIdentifier =
+              TableIdentifier.of(canonicalized.namespace().levels());
+      Table originTable = tableCache.get(originTableIdentifier, 
this::loadTableWithoutCache);
+      // Share TableOperations instance of origin table for all metadata 
tables, so that metadata
+      // table instances are refreshed as well when origin table instance is 
refreshed.
+      if (originTable instanceof HasTableOperations tableOps) {
+        TableOperations ops = tableOps.operations();
+        MetadataTableType type = MetadataTableType.from(canonicalized.name());
+        Table metadataTable =
+                MetadataTableUtils.createMetadataTableInstance(
+                        ops, hiveCatalog.name(), originTableIdentifier, 
canonicalized, type);
+        tableCache.put(canonicalized, metadataTable);
+        onCacheMetaLoad(canonicalized);
+        LOG.debug("Loaded metadata table: {} for origin table: {}", 
canonicalized, originTableIdentifier);
+        // Return the metadata table instead of the original table
+        return metadataTable;
+      }
+    }
+    onCacheLoad(canonicalized);
+    return table;
+  }
+
+  private Table loadTableWithoutCache(TableIdentifier identifier) {
+    try {
+      return hiveCatalog.loadTable(identifier);
+    } catch (NoSuchTableException exception) {
+      return null;
+    }

Review Comment:
   This cache loader returns `null` on `NoSuchTableException`. Caffeine's 
`Cache.get(key, mappingFunction)` tolerates a `null` result from the mapping 
function: it caches nothing and returns `null`, so `loadTable` would hand 
`null` to its caller instead of signalling a missing table. Prefer rethrowing 
the exception (or wrapping in a runtime exception) so the REST layer returns a 
proper 404, or use a separate Optional/sentinel strategy outside the Caffeine 
loader.



##########
standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestHMSCachingCatalogStats.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
+import org.apache.iceberg.rest.responses.HMSCacheStatsResponse;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Integration tests that verify the {@link HMSCachingCatalog} 
cache-statistics counters
+ * (hit, miss, load, hit-rate) are updated correctly and exposed accurately 
via the
+ * {@code GET v1/cache/stats} REST endpoint.
+ *
+ * <p>The server is started with {@link AuthType#NONE} so the tests focus 
purely on
+ * caching behaviour without any authentication noise.
+ */
+@Category(MetastoreCheckinTest.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class TestHMSCachingCatalogStats {
+
+  /** 5 minutes expressed in milliseconds – the value injected into {@code 
ICEBERG_CATALOG_CACHE_EXPIRY}. */
+  private static final long CACHE_EXPIRY_MS = 5 * 60 * 1_000L;
+
+  @RegisterExtension
+  private static final HiveRESTCatalogServerExtension REST_CATALOG_EXTENSION =
+      HiveRESTCatalogServerExtension.builder(AuthType.NONE)
+          // Without a positive expiry the HMSCatalogFactory skips 
HMSCachingCatalog entirely.
+          .configure(
+              MetastoreConf.ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY.getVarname(),
+              String.valueOf(CACHE_EXPIRY_MS))
+          .configure("metastore.iceberg.catalog.cache.debug", "true")
+          .build();
+
+  private RESTCatalog catalog;
+  private HiveCatalog serverCatalog;
+
+  @BeforeAll
+  void setupAll() {
+    catalog = RCKUtils.initCatalogClient(clientConfig());
+    serverCatalog = 
HMSCachingCatalog.getLatestCache(HMSCachingCatalog::getCatalog);
+  }
+
+  /** Remove any namespace/table created by the test so each run starts clean. 
*/
+  @AfterEach
+  void cleanup() {
+    RCKUtils.purgeCatalogTestEntries(catalog);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // helpers
+  // 
---------------------------------------------------------------------------
+
+  private java.util.Map<String, String> clientConfig() {
+    return java.util.Map.of("uri", REST_CATALOG_EXTENSION.getRestEndpoint());
+  }
+
+  /**
+   * Calls the {@code GET v1/cache/stats} endpoint directly over HTTP and 
returns
+   * the deserialised {@link HMSCacheStatsResponse}.
+   */
+  private HMSCacheStatsResponse fetchCacheStats() throws Exception {
+    String statsUrl = REST_CATALOG_EXTENSION.getRestEndpoint() + 
"/v1/cache/stats";
+    HttpRequest request = HttpRequest.newBuilder()
+        .uri(URI.create(statsUrl))
+        .GET()
+        .build();
+    HttpResponse<String> response;
+    try (HttpClient client = HttpClient.newHttpClient()) {
+      response = client.send(request, HttpResponse.BodyHandlers.ofString());
+    }

Review Comment:
   `java.net.http.HttpClient` implements `AutoCloseable` only since JDK 21; on 
earlier target JDKs such as 11 or 17, using it in try-with-resources will not 
compile. Instantiate the client without try-with-resources (optionally as a 
static/shared field for the test).



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -20,78 +20,276 @@
 package org.apache.iceberg.rest;
 
 import com.github.benmanes.caffeine.cache.Ticker;
+
+import java.lang.ref.SoftReference;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
-
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
 public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+  @TestOnly
+  public static <C extends Catalog> C  
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Metrics counters
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    if (catalog.getConf().getBoolean("metastore.iceberg.catalog.cache.debug", 
false)) {
+      cacheRef = new SoftReference<>(this);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, cacheInvalidateCount.get());
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, cacheLoadCount.get());
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, cacheHitCount.get());
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, cacheMissCount.get());
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    cacheMetaLoadCount.incrementAndGet();
+    LOG.debug("Cache meta-load {}: {}", tid, cacheMetaLoadCount.get());
+  }
+
+  // Getter methods for accessing metrics
+  public long getCacheHitCount() {
+    return cacheHitCount.get();
   }
 
+  public long getCacheMissCount() {
+    return cacheMissCount.get();
+  }
+
+  public long getCacheLoadCount() {
+    return cacheLoadCount.get();
+  }
+
+  public long getCacheInvalidateCount() {
+    return cacheInvalidateCount.get();
+  }
+
+  public long getCacheMetaLoadCount() {
+    return cacheMetaLoadCount.get();
+  }
+
+  public double getCacheHitRate() {
+    long hits = cacheHitCount.get();
+    long total = hits + cacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  /**
+   * Generates a map of this cache's performance metrics, including hit count,
+   * miss count, load count, invalidate count, meta-load count, and hit rate.
+   * This can be used for monitoring and debugging purposes to understand the 
effectiveness of the cache.
+   * @return a map of cache performance metrics
+   */
+  public Map<String, Number> cacheStats() {
+    return Map.of(
+            "hit", getCacheHitCount(),
+            "miss", getCacheMissCount(),
+            "load", getCacheLoadCount(),
+            "invalidate", getCacheInvalidateCount(),
+            "metaload", getCacheMetaLoadCount(),
+            "hit-rate", getCacheHitRate()
+    );
+  }
+
+
   @Override
-  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema 
schema) {
-    return hiveCatalog.buildTable(identifier, schema);
+  public void createNamespace(Namespace namespace, Map<String, String> map) {
+    hiveCatalog.createNamespace(namespace, map);
   }
 
   @Override
-  public void createNamespace(Namespace nmspc, Map<String, String> map) {
-    hiveCatalog.createNamespace(nmspc, map);
+  public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+    return hiveCatalog.listNamespaces(namespace);
   }
 
   @Override
-  public List<Namespace> listNamespaces(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.listNamespaces(nmspc);
+  public Table loadTable(final TableIdentifier identifier) {
+    final TableIdentifier canonicalized = identifier.toLowerCase();
+    final Table cachedTable = tableCache.getIfPresent(canonicalized);
+    if (cachedTable != null) {
+      final String location = new 
MetadataLocator(hiveCatalog).getLocation(canonicalized);
+      if (location == null) {
+        LOG.debug("Table {} has no location, returning cached table without 
location", canonicalized);
+      } else {
+        String cachedLocation = cachedTable instanceof HasTableOperations 
tableOps
+                ? tableOps.operations().current().metadataFileLocation()
+                : null;
+        if (!location.equals(cachedLocation)) {
+          LOG.debug("Invalidate table {}, cached {} != actual {}", 
canonicalized, cachedLocation, location);
+          // Invalidate the cached table if the location is different
+          invalidateTable(canonicalized);
+          onCacheInvalidate(canonicalized);
+        } else {
+          onCacheHit(canonicalized);
+          return cachedTable;
+        }
+      }
+    } else {
+      onCacheMiss(canonicalized);
+    }
+    final Table table = tableCache.get(canonicalized, 
this::loadTableWithoutCache);
+    if (table instanceof BaseMetadataTable) {
+      // Cache underlying table
+      TableIdentifier originTableIdentifier =
+              TableIdentifier.of(canonicalized.namespace().levels());
+      Table originTable = tableCache.get(originTableIdentifier, 
this::loadTableWithoutCache);

Review Comment:
   Deriving `originTableIdentifier` from `namespace().levels()` works only 
because a metadata-table identifier has the form `db.table.type` (e.g., 
`db.tbl.snapshots`): the last namespace level of `canonicalized` is the base 
table name, and the varargs `TableIdentifier.of(String...)` turns those levels 
back into `db.table`. That dependency is implicit and easy to misread as 
dropping the base table name. Consider making it explicit (e.g., extract the 
base table name from `canonicalized` and build `TableIdentifier.of(namespace, 
baseTableName)`, or add a comment describing the naming scheme) before 
loading/sharing `TableOperations`.



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -20,78 +20,276 @@
 package org.apache.iceberg.rest;
 
 import com.github.benmanes.caffeine.cache.Ticker;
+
+import java.lang.ref.SoftReference;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
-
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
 public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+  @TestOnly
+  public static <C extends Catalog> C  
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Metrics counters
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    if (catalog.getConf().getBoolean("metastore.iceberg.catalog.cache.debug", 
false)) {
+      cacheRef = new SoftReference<>(this);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, cacheInvalidateCount.get());
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, cacheLoadCount.get());
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, cacheHitCount.get());
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, cacheMissCount.get());
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    cacheMetaLoadCount.incrementAndGet();
+    LOG.debug("Cache meta-load {}: {}", tid, cacheMetaLoadCount.get());
+  }
+
+  // Getter methods for accessing metrics
+  public long getCacheHitCount() {
+    return cacheHitCount.get();
   }
 
+  public long getCacheMissCount() {
+    return cacheMissCount.get();
+  }
+
+  public long getCacheLoadCount() {
+    return cacheLoadCount.get();
+  }
+
+  public long getCacheInvalidateCount() {
+    return cacheInvalidateCount.get();
+  }
+
+  public long getCacheMetaLoadCount() {
+    return cacheMetaLoadCount.get();
+  }
+
+  public double getCacheHitRate() {
+    long hits = cacheHitCount.get();
+    long total = hits + cacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  /**
+   * Generates a map of this cache's performance metrics, including hit count,
+   * miss count, load count, invalidate count, meta-load count, and hit rate.
+   * This can be used for monitoring and debugging purposes to understand the 
effectiveness of the cache.
+   * @return a map of cache performance metrics
+   */
+  public Map<String, Number> cacheStats() {
+    return Map.of(
+            "hit", getCacheHitCount(),
+            "miss", getCacheMissCount(),
+            "load", getCacheLoadCount(),
+            "invalidate", getCacheInvalidateCount(),
+            "metaload", getCacheMetaLoadCount(),
+            "hit-rate", getCacheHitRate()
+    );
+  }
+
+
   @Override
-  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema 
schema) {
-    return hiveCatalog.buildTable(identifier, schema);
+  public void createNamespace(Namespace namespace, Map<String, String> map) {
+    hiveCatalog.createNamespace(namespace, map);
   }
 
   @Override
-  public void createNamespace(Namespace nmspc, Map<String, String> map) {
-    hiveCatalog.createNamespace(nmspc, map);
+  public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+    return hiveCatalog.listNamespaces(namespace);
   }
 
   @Override
-  public List<Namespace> listNamespaces(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.listNamespaces(nmspc);
+  public Table loadTable(final TableIdentifier identifier) {
+    final TableIdentifier canonicalized = identifier.toLowerCase();
+    final Table cachedTable = tableCache.getIfPresent(canonicalized);
+    if (cachedTable != null) {
+      final String location = new 
MetadataLocator(hiveCatalog).getLocation(canonicalized);
+      if (location == null) {
+        LOG.debug("Table {} has no location, returning cached table without 
location", canonicalized);
+      } else {

Review Comment:
   This introduces an HMS call (`MetadataLocator#getLocation`) on every cache 
hit to validate `METADATA_LOCATION`, which can largely negate the benefit of 
caching and increase metastore load/latency under read-heavy workloads. 
Consider adding a cheap local guard (e.g., only re-check HMS if the cached 
entry is older than a small validation interval, or only validate on/after 
refresh/expiry), and avoid instantiating a new `MetadataLocator` per request 
(reuse a field).



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -20,78 +20,276 @@
 package org.apache.iceberg.rest;
 
 import com.github.benmanes.caffeine.cache.Ticker;
+
+import java.lang.ref.SoftReference;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
-
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
 public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+  @TestOnly
+  public static <C extends Catalog> C  
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Metrics counters
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    if (catalog.getConf().getBoolean("metastore.iceberg.catalog.cache.debug", 
false)) {
+      cacheRef = new SoftReference<>(this);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, cacheInvalidateCount.get());
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, cacheLoadCount.get());
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, cacheHitCount.get());
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, cacheMissCount.get());
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    cacheMetaLoadCount.incrementAndGet();
+    LOG.debug("Cache meta-load {}: {}", tid, cacheMetaLoadCount.get());
+  }
+
+  // Getter methods for accessing metrics
+  public long getCacheHitCount() {
+    return cacheHitCount.get();
   }
 
+  public long getCacheMissCount() {
+    return cacheMissCount.get();
+  }
+
+  public long getCacheLoadCount() {
+    return cacheLoadCount.get();
+  }
+
+  public long getCacheInvalidateCount() {
+    return cacheInvalidateCount.get();
+  }
+
+  public long getCacheMetaLoadCount() {
+    return cacheMetaLoadCount.get();
+  }
+
+  public double getCacheHitRate() {
+    long hits = cacheHitCount.get();
+    long total = hits + cacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  /**
+   * Generates a map of this cache's performance metrics, including hit count,
+   * miss count, load count, invalidate count, meta-load count, and hit rate.
+   * This can be used for monitoring and debugging purposes to understand the 
effectiveness of the cache.
+   * @return a map of cache performance metrics
+   */
+  public Map<String, Number> cacheStats() {
+    return Map.of(
+            "hit", getCacheHitCount(),
+            "miss", getCacheMissCount(),
+            "load", getCacheLoadCount(),
+            "invalidate", getCacheInvalidateCount(),
+            "metaload", getCacheMetaLoadCount(),
+            "hit-rate", getCacheHitRate()
+    );
+  }
+
+
   @Override
-  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema 
schema) {
-    return hiveCatalog.buildTable(identifier, schema);
+  public void createNamespace(Namespace namespace, Map<String, String> map) {
+    hiveCatalog.createNamespace(namespace, map);
   }
 
   @Override
-  public void createNamespace(Namespace nmspc, Map<String, String> map) {
-    hiveCatalog.createNamespace(nmspc, map);
+  public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+    return hiveCatalog.listNamespaces(namespace);
   }
 
   @Override
-  public List<Namespace> listNamespaces(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.listNamespaces(nmspc);
+  public Table loadTable(final TableIdentifier identifier) {
+    final TableIdentifier canonicalized = identifier.toLowerCase();
+    final Table cachedTable = tableCache.getIfPresent(canonicalized);
+    if (cachedTable != null) {
+      final String location = new 
MetadataLocator(hiveCatalog).getLocation(canonicalized);
+      if (location == null) {
+        LOG.debug("Table {} has no location, returning cached table without 
location", canonicalized);

Review Comment:
   When `location == null` and `cachedTable != null`, the method does not 
return `cachedTable` immediately, and will later call `tableCache.get(...)` and 
then `onCacheLoad(...)`, which can inflate load counters even though the entry 
was already cached. If the intended behavior is 'treat as cache hit when 
location can't be fetched', return the cached table (and count it as a hit) in 
the `location == null` branch.



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import 
org.apache.hadoop.hive.metastore.client.builder.GetTableProjectionsSpecBuilder;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.thrift.TException;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Fetches the location of a given metadata table.
+ * <p>Since the location mutates with each transaction, this allows 
determining if a cached version of the
+ * table is the latest known in the HMS database.</p>
+ */
+public class MetadataLocator {
+  private static final org.slf4j.Logger LOGGER = 
org.slf4j.LoggerFactory.getLogger(MetadataLocator.class);
+  static final GetProjectionsSpec PARAM_SPEC = new 
GetTableProjectionsSpecBuilder()
+      .includeParameters()  // only fetches table.parameters
+      .build();
+  private final HiveCatalog catalog;
+
+  public MetadataLocator(HiveCatalog catalog) {
+    this.catalog = catalog;
+  }
+
+  public HiveCatalog getCatalog() {
+    return catalog;
+  }
+
+  /**
+   * Returns the location of the metadata table identified by the given 
identifier, or null if the table does not exist or is
+   * not a metadata table.
+   * <p>This uses the Thrift API to fetch the table parameters, which is more 
efficient than fetching the entire table object.</p>
+   * @param  identifier the identifier of the metadata table to fetch the 
location for
+   * @return the location of the metadata table, or null if the table does not 
exist or is not a metadata table
+   */
+  public String getLocation(TableIdentifier identifier) {
+    final ClientPool<IMetaStoreClient, TException> clients = 
catalog.clientPool();
+    final String catName = catalog.name();
+    final TableIdentifier baseTableIdentifier;
+    if (!catalog.isValidIdentifier(identifier)) {
+      if (!isValidMetadataIdentifier(identifier)) {
+        return null;
+      } else {
+        baseTableIdentifier = 
TableIdentifier.of(identifier.namespace().levels());
+      }
+    } else {
+      baseTableIdentifier = identifier;
+    }
+    String database = baseTableIdentifier.namespace().level(0);
+    String tableName = baseTableIdentifier.name();
+    try {
+      List<Table> tables = clients.run(
+          client -> client.getTables(catName, database, 
Collections.singletonList(tableName), PARAM_SPEC)
+      );
+      return tables == null || tables.isEmpty()
+          ? null
+          : 
tables.getFirst().getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);

Review Comment:
   `List#getFirst()` is only available on JDK 21+ (it was introduced with 
`SequencedCollection`) and will not compile on common target JDKs like 11/17. 
Use `tables.get(0)` instead.



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCatalogAdapter.java:
##########
@@ -78,6 +80,7 @@
  * Adaptor class to translate REST requests into {@link Catalog} API calls.
  */
 public class HMSCatalogAdapter implements RESTClient {
+  private static final String V1_CACHE_STATS = "v1/cache/stats";

Review Comment:
   Most routes appear to use `ResourcePaths` constants, but this new path is a 
standalone string. To keep routing consistent and reduce risk of drift, 
consider adding a `ResourcePaths` entry for cache stats and referencing it here.



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -20,78 +20,276 @@
 package org.apache.iceberg.rest;
 
 import com.github.benmanes.caffeine.cache.Ticker;
+
+import java.lang.ref.SoftReference;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
-
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
 public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+  @TestOnly
+  public static <C extends Catalog> C  
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Metrics counters
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    if (catalog.getConf().getBoolean("metastore.iceberg.catalog.cache.debug", 
false)) {
+      cacheRef = new SoftReference<>(this);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, cacheInvalidateCount.get());
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, cacheLoadCount.get());
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, cacheHitCount.get());
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, cacheMissCount.get());
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    cacheMetaLoadCount.incrementAndGet();
+    LOG.debug("Cache meta-load {}: {}", tid, cacheMetaLoadCount.get());
+  }
+
+  // Getter methods for accessing metrics
+  public long getCacheHitCount() {
+    return cacheHitCount.get();
   }
 
+  public long getCacheMissCount() {
+    return cacheMissCount.get();
+  }
+
+  public long getCacheLoadCount() {
+    return cacheLoadCount.get();
+  }
+
+  public long getCacheInvalidateCount() {
+    return cacheInvalidateCount.get();
+  }
+
+  public long getCacheMetaLoadCount() {
+    return cacheMetaLoadCount.get();
+  }
+
+  public double getCacheHitRate() {
+    long hits = cacheHitCount.get();
+    long total = hits + cacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  /**
+   * Generates a map of this cache's performance metrics, including hit count,
+   * miss count, load count, invalidate count, meta-load count, and hit rate.
+   * This can be used for monitoring and debugging purposes to understand the 
effectiveness of the cache.
+   * @return a map of cache performance metrics
+   */
+  public Map<String, Number> cacheStats() {
+    return Map.of(
+            "hit", getCacheHitCount(),
+            "miss", getCacheMissCount(),
+            "load", getCacheLoadCount(),
+            "invalidate", getCacheInvalidateCount(),
+            "metaload", getCacheMetaLoadCount(),
+            "hit-rate", getCacheHitRate()
+    );
+  }
+
+
   @Override
-  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema 
schema) {
-    return hiveCatalog.buildTable(identifier, schema);
+  public void createNamespace(Namespace namespace, Map<String, String> map) {
+    hiveCatalog.createNamespace(namespace, map);
   }
 
   @Override
-  public void createNamespace(Namespace nmspc, Map<String, String> map) {
-    hiveCatalog.createNamespace(nmspc, map);
+  public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+    return hiveCatalog.listNamespaces(namespace);
   }
 
   @Override
-  public List<Namespace> listNamespaces(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.listNamespaces(nmspc);
+  public Table loadTable(final TableIdentifier identifier) {
+    final TableIdentifier canonicalized = identifier.toLowerCase();

Review Comment:
   `loadTable` always lower-cases identifiers, which can change behavior for 
case-sensitive catalogs. This also ignores the `caseSensitive` constructor 
parameter passed to `CachingCatalog`. Consider canonicalizing conditionally 
based on the configured case-sensitivity (or delegating to the superclass' 
canonicalization mechanism if available) to avoid incorrect table lookups.



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/hive/MetadataLocator.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hive;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.GetProjectionsSpec;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import 
org.apache.hadoop.hive.metastore.client.builder.GetTableProjectionsSpecBuilder;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.thrift.TException;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Fetches the location of a given metadata table.
+ * <p>Since the location mutates with each transaction, this allows 
determining if a cached version of the
+ * table is the latest known in the HMS database.</p>
+ */
+public class MetadataLocator {
+  private static final org.slf4j.Logger LOGGER = 
org.slf4j.LoggerFactory.getLogger(MetadataLocator.class);
+  static final GetProjectionsSpec PARAM_SPEC = new 
GetTableProjectionsSpecBuilder()
+      .includeParameters()  // only fetches table.parameters
+      .build();
+  private final HiveCatalog catalog;
+
+  public MetadataLocator(HiveCatalog catalog) {
+    this.catalog = catalog;
+  }
+
+  public HiveCatalog getCatalog() {
+    return catalog;
+  }
+
+  /**
+   * Returns the location of the metadata table identified by the given 
identifier, or null if the table does not exist or is
+   * not a metadata table.
+   * <p>This uses the Thrift API to fetch the table parameters, which is more 
efficient than fetching the entire table object.</p>
+   * @param  identifier the identifier of the metadata table to fetch the 
location for
+   * @return the location of the metadata table, or null if the table does not 
exist or is not a metadata table
+   */
+  public String getLocation(TableIdentifier identifier) {
+    final ClientPool<IMetaStoreClient, TException> clients = 
catalog.clientPool();
+    final String catName = catalog.name();
+    final TableIdentifier baseTableIdentifier;
+    if (!catalog.isValidIdentifier(identifier)) {
+      if (!isValidMetadataIdentifier(identifier)) {
+        return null;
+      } else {
+        baseTableIdentifier = 
TableIdentifier.of(identifier.namespace().levels());
+      }
+    } else {
+      baseTableIdentifier = identifier;
+    }
+    String database = baseTableIdentifier.namespace().level(0);
+    String tableName = baseTableIdentifier.name();
+    try {
+      List<Table> tables = clients.run(
+          client -> client.getTables(catName, database, 
Collections.singletonList(tableName), PARAM_SPEC)
+      );
+      return tables == null || tables.isEmpty()
+          ? null
+          : 
tables.getFirst().getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
+    } catch (NoSuchTableException | NoSuchObjectException e) {
+      LOGGER.info("Table not found {}", baseTableIdentifier, e);
+    } catch (TException e) {
+      LOGGER.info("Table parameters fetch failed {}", baseTableIdentifier, e);

Review Comment:
   This method is called from `loadTable` and can run on hot paths; logging 
missing tables / transient Thrift failures at INFO with full exceptions can 
cause log noise and increased IO. Consider logging 'not found' at DEBUG (or 
INFO without stack trace), and treat Thrift exceptions similarly unless they 
indicate a server-side issue requiring operator attention.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to