okumin commented on code in PR #6441:
URL: https://github.com/apache/hive/pull/6441#discussion_r3226292817


##########
standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestHMSCachingCatalogStats.java:
##########
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.rest;
+
+import java.lang.management.ManagementFactory;
+import java.util.Set;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
+import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType;
+import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension;
+import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Integration tests that verify the {@link HMSCachingCatalog} 
cache-statistics counters
+ * (hit, miss, load, invalidate, l1-hit, l1-miss, and their rates) are updated 
correctly
+ * and exposed accurately via the JMX MBean registered under
+ * {@code org.apache.iceberg.rest:type=HMSCachingCatalog,name=*}.
+ *
+ * <p>The server is started with {@link AuthType#NONE} so the tests focus 
purely on
+ * caching behaviour without any authentication noise.
+ */
+@Category(MetastoreCheckinTest.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+class TestHMSCachingCatalogStats {
+
+  /** 5 minutes expressed in milliseconds – the value injected into {@code 
ICEBERG_CATALOG_CACHE_EXPIRY}. */
+  private static final long CACHE_EXPIRY_MS = 5 * 60 * 1_000L;
+
+  @RegisterExtension
+  private static final HiveRESTCatalogServerExtension REST_CATALOG_EXTENSION = 
HiveRESTCatalogServerExtension.builder(AuthType.NONE)
+      // Without a positive expiry the HMSCatalogFactory skips 
HMSCachingCatalog entirely.
+      
.configure(MetastoreConf.ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY.getVarname(), 
String.valueOf(CACHE_EXPIRY_MS))
+      .configure("hive.in.test", "true").build();
+
+  private RESTCatalog catalog;
+  private HiveCatalog serverCatalog;
+  /** The server-side {@link HMSCachingCatalog} instance; used to invalidate 
entries directly. */
+  private HMSCachingCatalog serverCachingCatalog;
+  /** The platform {@link MBeanServer} used for all JMX-based assertions. */
+  private MBeanServer mbs;
+  /** Resolved once in {@link #setupAll()} and reused across every test. */
+  private ObjectName jmxObjectName;
+
+  @BeforeAll
+  void setupAll() throws Exception {
+    catalog = RCKUtils.initCatalogClient(java.util.Map.of("uri", 
REST_CATALOG_EXTENSION.getRestEndpoint()));
+    serverCachingCatalog = HMSCachingCatalog.getLatestCache(null);
+    Assertions.assertNotNull(serverCachingCatalog, "Expected HMSCachingCatalog 
to be initialized");
+    serverCatalog = serverCachingCatalog.getCatalog();
+
+    // Resolve the JMX ObjectName registered by HMSCachingCatalog. We use a 
wildcard
+    // so the test is independent of the exact catalog name.
+    mbs = ManagementFactory.getPlatformMBeanServer();
+    Set<ObjectName> names = mbs.queryNames(
+        new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,*"), 
null);
+    Assertions.assertFalse(names.isEmpty(),
+        "HMSCachingCatalog MBean must be registered in the platform 
MBeanServer");
+    jmxObjectName = names.iterator().next();
+  }
+
+  /** Remove any namespace/table created by the test so each run starts clean. 
*/
+  @AfterEach
+  void cleanup() {
+    RCKUtils.purgeCatalogTestEntries(catalog);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // helpers
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * Reads a single JMX attribute from the {@link HMSCachingCatalogMXBean}.
+   *
+   * @param attribute the attribute name as declared in {@link 
HMSCachingCatalogMXBean}
+   *                  (e.g. {@code "CacheHitCount"})
+   * @return the attribute value
+   */
+  private Object getJmxAttribute(String attribute) throws Exception {
+    return mbs.getAttribute(jmxObjectName, attribute);
+  }
+
+  /**
+   * Convenience wrapper that reads a {@code long} JMX attribute.
+   */
+  private long jmxLong(String attribute) throws Exception {
+    return (long) getJmxAttribute(attribute);
+  }
+
+  /**
+   * Convenience wrapper that reads a {@code double} JMX attribute.
+   */
+  private double jmxDouble(String attribute) throws Exception {
+    return (double) getJmxAttribute(attribute);
+  }
+
+  /**
+   * Invokes a void JMX operation on the {@link HMSCachingCatalogMXBean}.
+   *
+   * @param operationName the operation name (e.g. {@code "resetCacheStats"})
+   */
+  private void invokeJmxOperation(String operationName) throws Exception {
+    mbs.invoke(jmxObjectName, operationName, new Object[0], new String[0]);
+  }
+
+  // 
---------------------------------------------------------------------------
+  // tests
+  // 
---------------------------------------------------------------------------
+
+  /**
+   * Verifies that the {@link HMSCachingCatalog} correctly tracks cache hits, 
misses,
+   * loads, invalidations, L1 hits, and L1 misses via JMX.
+   *
+   * <p>Strategy:
+   * <ol>
+   *   <li>Snapshot JMX baseline counters before any operations so the test is 
isolated
+   *       from cumulative state left by previous tests.</li>
+   *   <li>Create a namespace and a table.</li>
+   *   <li>First {@code loadTable} call → cache miss + actual load.</li>
+   *   <li>Second and third rapid {@code loadTable} calls → L1 cache hits (TTL 
still valid).</li>
+   *   <li>Mutate the table to advance its metadata location in HMS.</li>
+   *   <li>Wait for the L1 TTL to expire, then reload → L1 miss + invalidation 
+ reload.</li>
+   *   <li>Assert JMX counter deltas match expectations.</li>
+   * </ol>
+   */
+  @Test
+  void testCacheCountersAreUpdated() throws Exception {
+    // -- JMX baseline 
-----------------------------------------------------------
+    long baseHit = jmxLong("CacheHitCount");
+    long baseMiss = jmxLong("CacheMissCount");
+    long baseLoad = jmxLong("CacheLoadCount");
+    long baseL1Hit = jmxLong("L1CacheHitCount");
+
+    // -- exercise the cache 
-----------------------------------------------------
+    var db = Namespace.of("caching_stats_test_db");
+    var tableId = TableIdentifier.of(db, "caching_stats_test_table");
+
+    catalog.createNamespace(db);
+    catalog.createTable(tableId, new Schema());
+
+    // First load  → cache miss + load
+    catalog.loadTable(tableId);
+    // Second load → L1 hit  (within TTL, HMS location check skipped)
+    catalog.loadTable(tableId);
+    // Third load  → L1 hit
+    catalog.loadTable(tableId);
+
+    // Mutate the table by appending a data file – this creates a new snapshot
+    // which advances METADATA_LOCATION in HMS, so the next loadTable call 
through
+    // the caching catalog will detect the stale cached location and 
invalidate it.
+    Table table = serverCatalog.loadTable(tableId);
+    DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath(table.location() + "/data/fake-0.parquet")
+        .withFileSizeInBytes(1024).withRecordCount(1).build();
+    table.newAppend().appendFile(dataFile).commit();
+
+    long baseInvalidate = jmxLong("CacheInvalidateCount");
+    // The L1 cache has a 3-second default TTL; wait for entries to expire.
+    Thread.sleep(3_000);
+    // Fourth load → L1 miss + cache invalidation + reload
+    catalog.loadTable(tableId);
+
+    // -- JMX assertions 
---------------------------------------------------------
+    long deltaHit = jmxLong("CacheHitCount") - baseHit;
+    long deltaMiss = jmxLong("CacheMissCount") - baseMiss;
+    long deltaLoad = jmxLong("CacheLoadCount") - baseLoad;
+    long deltaInvalidate = jmxLong("CacheInvalidateCount") - baseInvalidate;
+    long deltaL1Hit = jmxLong("L1CacheHitCount") - baseL1Hit;
+    long deltaL1Miss = jmxLong("L1CacheMissCount");   // absolute value is 
fine for L1 miss
+
+    Assertions.assertTrue(deltaMiss >= 1,
+        "Expected at least 1 cache miss (first loadTable), but delta was: " + 
deltaMiss);
+    Assertions.assertTrue(deltaLoad >= 2,
+        "Expected at least 2 cache loads (initial + post-invalidation reload), 
but delta was: " + deltaLoad);
+    Assertions.assertTrue(deltaHit >= 2,
+        "Expected at least 2 cache hits (second + third loadTable), but delta 
was: " + deltaHit);
+    Assertions.assertTrue(deltaInvalidate >= 1,
+        "Expected at least 1 cache invalidation (metadata location changed), 
but delta was: " + deltaInvalidate);
+
+    // L1 hits: the 2nd and 3rd loadTable calls should have been served by L1.
+    Assertions.assertTrue(deltaL1Hit >= 2,
+        "Expected at least 2 L1 cache hits (rapid successive loads within 
TTL), but delta was: " + deltaL1Hit);
+    // L1 miss: at least the fourth load (after TTL expiry) must have missed 
L1.
+    Assertions.assertTrue(deltaL1Miss >= 1,
+        "Expected at least 1 L1 cache miss (after TTL expiry), but was: " + 
deltaL1Miss);
+
+    // Rate attributes must be valid ratios in [0.0, 1.0].
+    double hitRate = jmxDouble("CacheHitRate");
+    Assertions.assertTrue(hitRate > 0.0 && hitRate <= 1.0,
+        "CacheHitRate must be in (0.0, 1.0] but was: " + hitRate);
+
+    double l1HitRate = jmxDouble("L1CacheHitRate");
+    Assertions.assertTrue(l1HitRate > 0.0 && l1HitRate <= 1.0,
+        "L1CacheHitRate must be in (0.0, 1.0] but was: " + l1HitRate);
+  }
+
+  /**
+   * Verifies that the {@code resetCacheStats} JMX operation zeroes all 
counters.
+   *
+   * <p>Strategy:
+   * <ol>
+   *   <li>Perform some cache operations to ensure all counters are 
non-zero.</li>
+   *   <li>Invoke {@code resetCacheStats()} via JMX.</li>
+   *   <li>Assert that every JMX counter attribute reads {@code 0} / {@code 
0.0}.</li>
+   * </ol>
+   */
+  @Test
+  void testJmxResetCacheStats() throws Exception {
+    // -- warm up counters 
-------------------------------------------------------
+    var db = Namespace.of("jmx_reset_test_db");
+    var tableId = TableIdentifier.of(db, "jmx_reset_test_table");
+    catalog.createNamespace(db);
+    catalog.createTable(tableId, new Schema());
+    catalog.loadTable(tableId);  // miss + load
+    catalog.loadTable(tableId);  // hit (L1 hit on the fast path)
+
+    // Sanity: at least one counter must be non-zero before the reset.
+    Assertions.assertTrue(jmxLong("CacheHitCount") + jmxLong("CacheMissCount") 
> 0,
+        "At least one counter must be non-zero before reset");
+
+    // -- invoke the reset operation via JMX 
-------------------------------------
+    invokeJmxOperation("resetCacheStats");
+
+    // -- assertions post-reset 
--------------------------------------------------
+    Assertions.assertEquals(0L, jmxLong("CacheHitCount"),        
"CacheHitCount must be 0 after reset");
+    Assertions.assertEquals(0L, jmxLong("CacheMissCount"),       
"CacheMissCount must be 0 after reset");
+    Assertions.assertEquals(0L, jmxLong("CacheLoadCount"),       
"CacheLoadCount must be 0 after reset");
+    Assertions.assertEquals(0L, jmxLong("CacheInvalidateCount"), 
"CacheInvalidateCount must be 0 after reset");
+    Assertions.assertEquals(0L, jmxLong("CacheMetaLoadCount"),   
"CacheMetaLoadCount must be 0 after reset");
+    Assertions.assertEquals(0L, jmxLong("L1CacheHitCount"),      
"L1CacheHitCount must be 0 after reset");
+    Assertions.assertEquals(0L, jmxLong("L1CacheMissCount"),     
"L1CacheMissCount must be 0 after reset");
+    Assertions.assertEquals(0.0, jmxDouble("CacheHitRate"),  1e-9, 
"CacheHitRate must be 0.0 after reset");
+    Assertions.assertEquals(0.0, jmxDouble("L1CacheHitRate"), 1e-9, 
"L1CacheHitRate must be 0.0 after reset");
+
+    // -- verify rate calculation still works correctly after reset 
--------------
+    // resetCacheStats() zeroes counters but does NOT evict the L2/L1 cache, 
so the
+    // table is still cached. Invalidate it directly on the server-side 
HMSCachingCatalog
+    // so the first post-reset load is a genuine cold miss rather than an 
L1/L2 hit.
+    // NOTE: catalog.invalidateTable() only clears the REST *client* state and 
does not
+    // reach the server-side cache.
+    serverCachingCatalog.invalidateTable(tableId);

Review Comment:
   I feel we should write this not as an integration test but as a unit test. 
If we do so, we can remove an escape hatch, i.e., `getLatestCache`



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");

Review Comment:
   Should we use `CATALOG_DEFAULT` in MetastoreConf?



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {

Review Comment:
   ```suggestion
     private void onCacheInvalidate(TableIdentifier tid) {
   ```



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    long count = cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {

Review Comment:
   ```suggestion
     private void onCacheLoad(TableIdentifier tid) {
   ```



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    long count = cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    long count = cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    long count = cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, count);
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    long count = cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    long count = cacheMetaLoadCount.incrementAndGet();
+    LOG.debug("Cache meta-load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when an L1 cache hit occurs for a given table identifier.
+   * Only fired when the L2 cache also has the entry.
+   *
+   * @param tid the table identifier
+   */
+  protected void onL1CacheHit(TableIdentifier tid) {
+    long count = l1CacheHitCount.incrementAndGet();
+    LOG.debug("L1 cache hit {}: {}", tid, count);
   }
 
+  /**
+   * Callback when an L1 cache miss occurs for a given table identifier.
+   * Only fired when the L2 cache has the entry but L1 is absent or expired.
+   *
+   * @param tid the table identifier
+   */
+  protected void onL1CacheMiss(TableIdentifier tid) {
+    long count = l1CacheMissCount.incrementAndGet();
+    LOG.debug("L1 cache miss {}: {}", tid, count);
+  }
+
+  // Getter methods for accessing metrics
   @Override
-  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema 
schema) {
-    return hiveCatalog.buildTable(identifier, schema);
+  public long getCacheHitCount() {
+    return cacheHitCount.get();
   }
 
   @Override
-  public void createNamespace(Namespace nmspc, Map<String, String> map) {
-    hiveCatalog.createNamespace(nmspc, map);
+  public long getCacheMissCount() {
+    return cacheMissCount.get();
   }
 
   @Override
-  public List<Namespace> listNamespaces(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.listNamespaces(nmspc);
+  public long getCacheLoadCount() {
+    return cacheLoadCount.get();
   }
 
   @Override
-  public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.loadNamespaceMetadata(nmspc);
+  public long getCacheInvalidateCount() {
+    return cacheInvalidateCount.get();
   }
 
   @Override
-  public boolean dropNamespace(Namespace nmspc) throws 
NamespaceNotEmptyException {
-    List<TableIdentifier> tables = listTables(nmspc);
+  public long getCacheMetaLoadCount() {
+    return cacheMetaLoadCount.get();
+  }
+
+  @Override
+  public double getCacheHitRate() {
+    long hits = cacheHitCount.get();
+    long total = hits + cacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  @Override
+  public long getL1CacheHitCount() {
+    return l1CacheHitCount.get();
+  }
+
+  @Override
+  public long getL1CacheMissCount() {
+    return l1CacheMissCount.get();
+  }
+
+  @Override
+  public double getL1CacheHitRate() {
+    long hits = l1CacheHitCount.get();
+    long total = hits + l1CacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  @Override
+  public void resetCacheStats() {
+    cacheHitCount.set(0);
+    cacheMissCount.set(0);
+    cacheLoadCount.set(0);
+    cacheInvalidateCount.set(0);
+    cacheMetaLoadCount.set(0);
+    l1CacheHitCount.set(0);
+    l1CacheMissCount.set(0);
+    LOG.debug("Cache stats reset");
+  }
+
+  @Override
+  public void close() {
+    unregisterJmx();
+  }
+
+  /**
+   * Unregisters this instance from the platform MBeanServer.
+   */
+  private void unregisterJmx() {
+    if (jmxObjectName != null) {
+      try {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        if (mbs.isRegistered(jmxObjectName)) {
+          mbs.unregisterMBean(jmxObjectName);
+          LOG.info("Unregistered JMX MBean: {}", jmxObjectName);
+        }
+      } catch (JMException e) {
+        LOG.warn("Failed to unregister JMX MBean: {}", jmxObjectName, e);
+      } finally {
+        jmxObjectName = null;
+      }
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> map) {
+    hiveCatalog.createNamespace(namespace, map);
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+    return hiveCatalog.listNamespaces(namespace);
+  }
+
+  /**
+   * Canonicalizes the given table identifier based on the case sensitivity of 
the underlying catalog.
+   * Copied from CachingCatalog that exposes it as private.
+   * @param tableIdentifier the table identifier to canonicalize
+   * @return the canonicalized table identifier
+   */
+  private TableIdentifier canonicalizeIdentifier(TableIdentifier 
tableIdentifier) {
+    return this.caseSensitive ? tableIdentifier : 
tableIdentifier.toLowerCase();
+  }
+
+  @Override
+  public void invalidateTable(TableIdentifier ident) {
+    super.invalidateTable(ident);
+    l1Cache.remove(ident);
+  }
+
+  @Override
+  public Table loadTable(final TableIdentifier identifier) {
+    final TableIdentifier canonicalized = canonicalizeIdentifier(identifier);
+    final Table cachedTable = tableCache.getIfPresent(canonicalized);
+    long now = System.currentTimeMillis();
+    if (cachedTable != null) {
+      // Determine if L1 cache is valid based on the last cached time and the 
TTL.
+      // If the table is in L1 cache, we can skip the location check and 
return the cached table directly,
+      // which can significantly reduce the latency for repeated access to the 
same table.
+      Long lastCached = l1Cache.get(canonicalized);
+      if (lastCached != null) {
+        if (now - lastCached < l1Ttl) {
+          LOG.debug("Table {} is in L1 cache, returning cached table", 
canonicalized);
+          onL1CacheHit(canonicalized);
+          onCacheHit(canonicalized);
+          return cachedTable;

Review Comment:
   Can we make L1 stuff out of scope from this PR? As CachingCatalog is 
designed for the client-side caching, there is no authorization when accessing 
a cached object. It means if Alice successfully loads `abc` table into the 
cache and then Bob, without access to `abc`, the security domain would be 
breached. It is OK because a user of a client-side CachingCatalog, e.g., 
SparkCatalog, is always Alice. But HMS is designed to accept multiple users' 
access. It requires additional tests and careful reviews.



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    long count = cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    long count = cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    long count = cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, count);
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    long count = cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {

Review Comment:
   ```suggestion
     private void onCacheMetaLoad(TableIdentifier tid) {
   ```



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    long count = cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    long count = cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    long count = cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, count);
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    long count = cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    long count = cacheMetaLoadCount.incrementAndGet();
+    LOG.debug("Cache meta-load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when an L1 cache hit occurs for a given table identifier.
+   * Only fired when the L2 cache also has the entry.
+   *
+   * @param tid the table identifier
+   */
+  protected void onL1CacheHit(TableIdentifier tid) {
+    long count = l1CacheHitCount.incrementAndGet();
+    LOG.debug("L1 cache hit {}: {}", tid, count);
   }
 
+  /**
+   * Callback when an L1 cache miss occurs for a given table identifier.
+   * Only fired when the L2 cache has the entry but L1 is absent or expired.
+   *
+   * @param tid the table identifier
+   */
+  protected void onL1CacheMiss(TableIdentifier tid) {
+    long count = l1CacheMissCount.incrementAndGet();
+    LOG.debug("L1 cache miss {}: {}", tid, count);
+  }
+
+  // Getter methods for accessing metrics
   @Override
-  public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema 
schema) {
-    return hiveCatalog.buildTable(identifier, schema);
+  public long getCacheHitCount() {
+    return cacheHitCount.get();
   }
 
   @Override
-  public void createNamespace(Namespace nmspc, Map<String, String> map) {
-    hiveCatalog.createNamespace(nmspc, map);
+  public long getCacheMissCount() {
+    return cacheMissCount.get();
   }
 
   @Override
-  public List<Namespace> listNamespaces(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.listNamespaces(nmspc);
+  public long getCacheLoadCount() {
+    return cacheLoadCount.get();
   }
 
   @Override
-  public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws 
NoSuchNamespaceException {
-    return hiveCatalog.loadNamespaceMetadata(nmspc);
+  public long getCacheInvalidateCount() {
+    return cacheInvalidateCount.get();
   }
 
   @Override
-  public boolean dropNamespace(Namespace nmspc) throws 
NamespaceNotEmptyException {
-    List<TableIdentifier> tables = listTables(nmspc);
+  public long getCacheMetaLoadCount() {
+    return cacheMetaLoadCount.get();
+  }
+
+  @Override
+  public double getCacheHitRate() {
+    long hits = cacheHitCount.get();
+    long total = hits + cacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  @Override
+  public long getL1CacheHitCount() {
+    return l1CacheHitCount.get();
+  }
+
+  @Override
+  public long getL1CacheMissCount() {
+    return l1CacheMissCount.get();
+  }
+
+  @Override
+  public double getL1CacheHitRate() {
+    long hits = l1CacheHitCount.get();
+    long total = hits + l1CacheMissCount.get();
+    return total == 0 ? 0.0 : (double) hits / total;
+  }
+
+  @Override
+  public void resetCacheStats() {
+    cacheHitCount.set(0);
+    cacheMissCount.set(0);
+    cacheLoadCount.set(0);
+    cacheInvalidateCount.set(0);
+    cacheMetaLoadCount.set(0);
+    l1CacheHitCount.set(0);
+    l1CacheMissCount.set(0);
+    LOG.debug("Cache stats reset");
+  }
+
+  @Override
+  public void close() {
+    unregisterJmx();
+  }
+
+  /**
+   * Unregisters this instance from the platform MBeanServer.
+   */
+  private void unregisterJmx() {
+    if (jmxObjectName != null) {
+      try {
+        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+        if (mbs.isRegistered(jmxObjectName)) {
+          mbs.unregisterMBean(jmxObjectName);
+          LOG.info("Unregistered JMX MBean: {}", jmxObjectName);
+        }
+      } catch (JMException e) {
+        LOG.warn("Failed to unregister JMX MBean: {}", jmxObjectName, e);
+      } finally {
+        jmxObjectName = null;
+      }
+    }
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> map) {
+    hiveCatalog.createNamespace(namespace, map);
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+    return hiveCatalog.listNamespaces(namespace);
+  }
+
+  /**
+   * Canonicalizes the given table identifier based on the case sensitivity of 
the underlying catalog.
+   * Copied from CachingCatalog that exposes it as private.
+   * @param tableIdentifier the table identifier to canonicalize
+   * @return the canonicalized table identifier
+   */
+  private TableIdentifier canonicalizeIdentifier(TableIdentifier 
tableIdentifier) {
+    return this.caseSensitive ? tableIdentifier : 
tableIdentifier.toLowerCase();
+  }
+
+  @Override
+  public void invalidateTable(TableIdentifier ident) {
+    super.invalidateTable(ident);
+    l1Cache.remove(ident);
+  }
+
+  @Override
+  public Table loadTable(final TableIdentifier identifier) {
+    final TableIdentifier canonicalized = canonicalizeIdentifier(identifier);
+    final Table cachedTable = tableCache.getIfPresent(canonicalized);
+    long now = System.currentTimeMillis();
+    if (cachedTable != null) {
+      // Determine if L1 cache is valid based on the last cached time and the 
TTL.
+      // If the table is in L1 cache, we can skip the location check and 
return the cached table directly,
+      // which can significantly reduce the latency for repeated access to the 
same table.
+      Long lastCached = l1Cache.get(canonicalized);
+      if (lastCached != null) {
+        if (now - lastCached < l1Ttl) {
+          LOG.debug("Table {} is in L1 cache, returning cached table", 
canonicalized);
+          onL1CacheHit(canonicalized);
+          onCacheHit(canonicalized);
+          return cachedTable;
+        } else {
+          l1Cache.remove(canonicalized);
+          onL1CacheMiss(canonicalized);
+        }
+      } else {
+        onL1CacheMiss(canonicalized);
+      }
+      // If the table is no longer in L1 cache, we need to check the location.
+      final String location = metadataLocator.getLocation(canonicalized);
+      if (location == null) {
+        LOG.debug("Table {} has no location, returning cached table without 
location", canonicalized);

Review Comment:
   I think this case needs to throw NoSuchTableException because it does mean 
the table metadata is deleted, which means manifest lists are highly likely to 
be deleted or already invalid



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);

Review Comment:
   This is probably eligible for `error`
   
   ```suggestion
         LOG.error("Failed to register JMX MBean for HMSCachingCatalog", e);
   ```



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);

Review Comment:
   LongAdder could be an option



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    long count = cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    long count = cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {

Review Comment:
   ```suggestion
     private void onCacheHit(TableIdentifier tid) {
   ```



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    long count = cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    long count = cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    long count = cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, count);
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    long count = cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    long count = cacheMetaLoadCount.incrementAndGet();
+    LOG.debug("Cache meta-load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when an L1 cache hit occurs for a given table identifier.
+   * Only fired when the L2 cache also has the entry.
+   *
+   * @param tid the table identifier
+   */
+  protected void onL1CacheHit(TableIdentifier tid) {

Review Comment:
   ```suggestion
     private void onL1CacheHit(TableIdentifier tid) {
   ```



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {

Review Comment:
   Can we use this constructor in the near future? If no, we may always use the 
default sensitivity.



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    long count = cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    long count = cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    long count = cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, count);
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {

Review Comment:
   ```suggestion
     private void onCacheMiss(TableIdentifier tid) {
   ```



##########
standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java:
##########
@@ -33,65 +51,423 @@
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.MetadataLocator;
 import org.apache.iceberg.view.View;
 import org.apache.iceberg.view.ViewBuilder;
+import org.jetbrains.annotations.TestOnly;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.Ticker;
 
 /**
  * Class that wraps an Iceberg Catalog to cache tables.
  */
-public class HMSCachingCatalog extends CachingCatalog implements 
SupportsNamespaces, ViewCatalog {
+public class HMSCachingCatalog extends CachingCatalog
+    implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, 
Closeable {
+  protected static final Logger LOG = 
LoggerFactory.getLogger(HMSCachingCatalog.class);
+
+  @TestOnly
+  private static SoftReference<HMSCachingCatalog> cacheRef = new 
SoftReference<>(null);
+
+  @TestOnly
+  @SuppressWarnings("unchecked")
+  public static <C extends Catalog> C 
getLatestCache(Function<HMSCachingCatalog, C> extractor) {
+    HMSCachingCatalog cache = cacheRef.get();
+    if (cache == null) {
+      return null;
+    }
+    return extractor == null ? (C) cache : extractor.apply(cache);
+  }
+
+  @TestOnly
+  public HiveCatalog getCatalog() {
+    return hiveCatalog;
+  }
+
+  // The underlying HiveCatalog instance.
   private final HiveCatalog hiveCatalog;
-  
-  public HMSCachingCatalog(HiveCatalog catalog, long expiration) {
-    super(catalog, true, expiration, Ticker.systemTicker());
+  // Duplicate because CachingCatalog doesn't expose the case sensitivity of 
the underlying catalog,
+  // which is needed for canonicalizing identifiers before caching.
+  private final boolean caseSensitive;
+  // The locator.
+  private final MetadataLocator metadataLocator;
+  // An L1 small latency cache.
+  // This is used to cache the last cached time for each table identifier,
+  // so that we can skip location check for repeated access to the same table 
within a short period of time,
+  // which can significantly reduce the latency for repeated access to the 
same table.
+  private final Map<TableIdentifier, Long> l1Cache;
+  // The TTL for L1 cache (3s).
+  private final int l1Ttl;
+  // The L1 cache size.
+  private final int l1CacheSize;
+
+  // Metrics counters.
+  private final AtomicLong cacheHitCount = new AtomicLong(0);
+  private final AtomicLong cacheMissCount = new AtomicLong(0);
+  private final AtomicLong cacheLoadCount = new AtomicLong(0);
+  private final AtomicLong cacheInvalidateCount = new AtomicLong(0);
+  private final AtomicLong cacheMetaLoadCount = new AtomicLong(0);
+  // L1 cache metrics: counted only when the L2 (Caffeine) cache already has 
the entry.
+  private final AtomicLong l1CacheHitCount = new AtomicLong(0);
+  private final AtomicLong l1CacheMissCount = new AtomicLong(0);
+
+  // JMX ObjectName under which this instance is registered (may be null if 
registration failed).
+  private ObjectName jmxObjectName;
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) {
+    this(catalog, expirationMs, /*caseSensitive*/ true);
+  }
+
+  public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean 
caseSensitive) {
+    super(catalog, caseSensitive, expirationMs, Ticker.systemTicker());
     this.hiveCatalog = catalog;
+    this.caseSensitive = caseSensitive;
+    this.metadataLocator = new MetadataLocator(catalog);
+    Configuration conf = catalog.getConf();
+    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) {
+      // Only keep a reference to the latest cache for testing purpose, so 
that tests can manipulate the catalog.
+      cacheRef = new SoftReference<>(this);
+    }
+    int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32);
+    int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000);
+    if (l1size > 0 && l1ttl > 0) {
+      l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, 
Long>() {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> 
eldest) {
+          return size() > l1CacheSize;
+        }
+      });
+      l1Ttl = l1ttl;
+      l1CacheSize = l1size;
+    } else {
+      l1Cache = Collections.emptyMap();
+      l1Ttl = 0;
+      l1CacheSize = 0;
+    }
+    registerJmx(catalog.name());
+  }
+
+  /**
+   * Registers this instance as a JMX MBean.
+   *
+   * @param catalogName the catalog name, used to build the {@link ObjectName}
+   */
+  private void registerJmx(String catalogName) {
+    try {
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      String sanitized = catalogName == null || catalogName.isEmpty()
+        ? "default"
+        : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_");
+      ObjectName name = new 
ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized);
+      if (mbs.isRegistered(name)) {
+        mbs.unregisterMBean(name);
+      }
+      mbs.registerMBean(this, name);
+      this.jmxObjectName = name;
+      LOG.info("Registered JMX MBean: {}", name);
+    } catch (JMException e) {
+      LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e);
+    }
+  }
+
+  /**
+   * Callback when cache invalidates the entry for a given table identifier.
+   *
+   * @param tid the table identifier to invalidate
+   */
+  protected void onCacheInvalidate(TableIdentifier tid) {
+    long count = cacheInvalidateCount.incrementAndGet();
+    LOG.debug("Cache invalidate {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheLoad(TableIdentifier tid) {
+    long count = cacheLoadCount.incrementAndGet();
+    LOG.debug("Cache load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache hit for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheHit(TableIdentifier tid) {
+    long count = cacheHitCount.incrementAndGet();
+    LOG.debug("Cache hit {} : {}", tid, count);
+  }
+
+  /**
+   * Callback when cache miss occurs for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMiss(TableIdentifier tid) {
+    long count = cacheMissCount.incrementAndGet();
+    LOG.debug("Cache miss {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when cache loads a metadata table for a given table identifier.
+   *
+   * @param tid the table identifier
+   */
+  protected void onCacheMetaLoad(TableIdentifier tid) {
+    long count = cacheMetaLoadCount.incrementAndGet();
+    LOG.debug("Cache meta-load {}: {}", tid, count);
+  }
+
+  /**
+   * Callback when an L1 cache hit occurs for a given table identifier.
+   * Only fired when the L2 cache also has the entry.
+   *
+   * @param tid the table identifier
+   */
+  protected void onL1CacheHit(TableIdentifier tid) {
+    long count = l1CacheHitCount.incrementAndGet();
+    LOG.debug("L1 cache hit {}: {}", tid, count);
   }
 
+  /**
+   * Callback when an L1 cache miss occurs for a given table identifier.
+   * Only fired when the L2 cache has the entry but L1 is absent or expired.
+   *
+   * @param tid the table identifier
+   */
+  protected void onL1CacheMiss(TableIdentifier tid) {

Review Comment:
   ```suggestion
     private void onL1CacheMiss(TableIdentifier tid) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to