okumin commented on code in PR #6441: URL: https://github.com/apache/hive/pull/6441#discussion_r3226292817
########## standalone-metastore/metastore-rest-catalog/src/test/java/org/apache/iceberg/rest/TestHMSCachingCatalogStats.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.rest; + +import java.lang.management.ManagementFactory; +import java.util.Set; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.hadoop.hive.metastore.ServletSecurity.AuthType; +import org.apache.hadoop.hive.metastore.annotation.MetastoreCheckinTest; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.rest.extension.HiveRESTCatalogServerExtension; +import org.junit.experimental.categories.Category; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** + * Integration tests that verify the {@link HMSCachingCatalog} cache-statistics counters + * (hit, miss, load, invalidate, l1-hit, l1-miss, and their rates) are updated correctly + * and exposed accurately via the JMX MBean registered under + * {@code org.apache.iceberg.rest:type=HMSCachingCatalog,name=*}. + * + * <p>The server is started with {@link AuthType#NONE} so the tests focus purely on + * caching behaviour without any authentication noise. + */ +@Category(MetastoreCheckinTest.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class TestHMSCachingCatalogStats { + + /** 5 minutes expressed in milliseconds – the value injected into {@code ICEBERG_CATALOG_CACHE_EXPIRY}. */ + private static final long CACHE_EXPIRY_MS = 5 * 60 * 1_000L; + + @RegisterExtension + private static final HiveRESTCatalogServerExtension REST_CATALOG_EXTENSION = HiveRESTCatalogServerExtension.builder(AuthType.NONE) + // Without a positive expiry the HMSCatalogFactory skips HMSCachingCatalog entirely. + .configure(MetastoreConf.ConfVars.ICEBERG_CATALOG_CACHE_EXPIRY.getVarname(), String.valueOf(CACHE_EXPIRY_MS)) + .configure("hive.in.test", "true").build(); + + private RESTCatalog catalog; + private HiveCatalog serverCatalog; + /** The server-side {@link HMSCachingCatalog} instance; used to invalidate entries directly. */ + private HMSCachingCatalog serverCachingCatalog; + /** The platform {@link MBeanServer} used for all JMX-based assertions. */ + private MBeanServer mbs; + /** Resolved once in {@link #setupAll()} and reused across every test. 
*/ + private ObjectName jmxObjectName; + + @BeforeAll + void setupAll() throws Exception { + catalog = RCKUtils.initCatalogClient(java.util.Map.of("uri", REST_CATALOG_EXTENSION.getRestEndpoint())); + serverCachingCatalog = HMSCachingCatalog.getLatestCache(null); + Assertions.assertNotNull(serverCachingCatalog, "Expected HMSCachingCatalog to be initialized"); + serverCatalog = serverCachingCatalog.getCatalog(); + + // Resolve the JMX ObjectName registered by HMSCachingCatalog. We use a wildcard + // so the test is independent of the exact catalog name. + mbs = ManagementFactory.getPlatformMBeanServer(); + Set<ObjectName> names = mbs.queryNames( + new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,*"), null); + Assertions.assertFalse(names.isEmpty(), + "HMSCachingCatalog MBean must be registered in the platform MBeanServer"); + jmxObjectName = names.iterator().next(); + } + + /** Remove any namespace/table created by the test so each run starts clean. */ + @AfterEach + void cleanup() { + RCKUtils.purgeCatalogTestEntries(catalog); + } + + // --------------------------------------------------------------------------- + // helpers + // --------------------------------------------------------------------------- + + /** + * Reads a single JMX attribute from the {@link HMSCachingCatalogMXBean}. + * + * @param attribute the attribute name as declared in {@link HMSCachingCatalogMXBean} + * (e.g. {@code "CacheHitCount"}) + * @return the attribute value + */ + private Object getJmxAttribute(String attribute) throws Exception { + return mbs.getAttribute(jmxObjectName, attribute); + } + + /** + * Convenience wrapper that reads a {@code long} JMX attribute. + */ + private long jmxLong(String attribute) throws Exception { + return (long) getJmxAttribute(attribute); + } + + /** + * Convenience wrapper that reads a {@code double} JMX attribute. 
+ */ + private double jmxDouble(String attribute) throws Exception { + return (double) getJmxAttribute(attribute); + } + + /** + * Invokes a void JMX operation on the {@link HMSCachingCatalogMXBean}. + * + * @param operationName the operation name (e.g. {@code "resetCacheStats"}) + */ + private void invokeJmxOperation(String operationName) throws Exception { + mbs.invoke(jmxObjectName, operationName, new Object[0], new String[0]); + } + + // --------------------------------------------------------------------------- + // tests + // --------------------------------------------------------------------------- + + /** + * Verifies that the {@link HMSCachingCatalog} correctly tracks cache hits, misses, + * loads, invalidations, L1 hits, and L1 misses via JMX. + * + * <p>Strategy: + * <ol> + * <li>Snapshot JMX baseline counters before any operations so the test is isolated + * from cumulative state left by previous tests.</li> + * <li>Create a namespace and a table.</li> + * <li>First {@code loadTable} call → cache miss + actual load.</li> + * <li>Second and third rapid {@code loadTable} calls → L1 cache hits (TTL still valid).</li> + * <li>Mutate the table to advance its metadata location in HMS.</li> + * <li>Wait for the L1 TTL to expire, then reload → L1 miss + invalidation + reload.</li> + * <li>Assert JMX counter deltas match expectations.</li> + * </ol> + */ + @Test + void testCacheCountersAreUpdated() throws Exception { + // -- JMX baseline ----------------------------------------------------------- + long baseHit = jmxLong("CacheHitCount"); + long baseMiss = jmxLong("CacheMissCount"); + long baseLoad = jmxLong("CacheLoadCount"); + long baseL1Hit = jmxLong("L1CacheHitCount"); + + // -- exercise the cache ----------------------------------------------------- + var db = Namespace.of("caching_stats_test_db"); + var tableId = TableIdentifier.of(db, "caching_stats_test_table"); + + catalog.createNamespace(db); + catalog.createTable(tableId, new Schema()); + + // First 
load → cache miss + load + catalog.loadTable(tableId); + // Second load → L1 hit (within TTL, HMS location check skipped) + catalog.loadTable(tableId); + // Third load → L1 hit + catalog.loadTable(tableId); + + // Mutate the table by appending a data file – this creates a new snapshot + // which advances METADATA_LOCATION in HMS, so the next loadTable call through + // the caching catalog will detect the stale cached location and invalidate it. + Table table = serverCatalog.loadTable(tableId); + DataFile dataFile = DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(table.location() + "/data/fake-0.parquet") + .withFileSizeInBytes(1024).withRecordCount(1).build(); + table.newAppend().appendFile(dataFile).commit(); + + long baseInvalidate = jmxLong("CacheInvalidateCount"); + // The L1 cache has a 3-second default TTL; wait for entries to expire. + Thread.sleep(3_000); + // Fourth load → L1 miss + cache invalidation + reload + catalog.loadTable(tableId); + + // -- JMX assertions --------------------------------------------------------- + long deltaHit = jmxLong("CacheHitCount") - baseHit; + long deltaMiss = jmxLong("CacheMissCount") - baseMiss; + long deltaLoad = jmxLong("CacheLoadCount") - baseLoad; + long deltaInvalidate = jmxLong("CacheInvalidateCount") - baseInvalidate; + long deltaL1Hit = jmxLong("L1CacheHitCount") - baseL1Hit; + long deltaL1Miss = jmxLong("L1CacheMissCount"); // absolute value is fine for L1 miss + + Assertions.assertTrue(deltaMiss >= 1, + "Expected at least 1 cache miss (first loadTable), but delta was: " + deltaMiss); + Assertions.assertTrue(deltaLoad >= 2, + "Expected at least 2 cache loads (initial + post-invalidation reload), but delta was: " + deltaLoad); + Assertions.assertTrue(deltaHit >= 2, + "Expected at least 2 cache hits (second + third loadTable), but delta was: " + deltaHit); + Assertions.assertTrue(deltaInvalidate >= 1, + "Expected at least 1 cache invalidation (metadata location changed), but delta was: " + 
deltaInvalidate); + + // L1 hits: the 2nd and 3rd loadTable calls should have been served by L1. + Assertions.assertTrue(deltaL1Hit >= 2, + "Expected at least 2 L1 cache hits (rapid successive loads within TTL), but delta was: " + deltaL1Hit); + // L1 miss: at least the fourth load (after TTL expiry) must have missed L1. + Assertions.assertTrue(deltaL1Miss >= 1, + "Expected at least 1 L1 cache miss (after TTL expiry), but was: " + deltaL1Miss); + + // Rate attributes must be valid ratios in [0.0, 1.0]. + double hitRate = jmxDouble("CacheHitRate"); + Assertions.assertTrue(hitRate > 0.0 && hitRate <= 1.0, + "CacheHitRate must be in (0.0, 1.0] but was: " + hitRate); + + double l1HitRate = jmxDouble("L1CacheHitRate"); + Assertions.assertTrue(l1HitRate > 0.0 && l1HitRate <= 1.0, + "L1CacheHitRate must be in (0.0, 1.0] but was: " + l1HitRate); + } + + /** + * Verifies that the {@code resetCacheStats} JMX operation zeroes all counters. + * + * <p>Strategy: + * <ol> + * <li>Perform some cache operations to ensure all counters are non-zero.</li> + * <li>Invoke {@code resetCacheStats()} via JMX.</li> + * <li>Assert that every JMX counter attribute reads {@code 0} / {@code 0.0}.</li> + * </ol> + */ + @Test + void testJmxResetCacheStats() throws Exception { + // -- warm up counters ------------------------------------------------------- + var db = Namespace.of("jmx_reset_test_db"); + var tableId = TableIdentifier.of(db, "jmx_reset_test_table"); + catalog.createNamespace(db); + catalog.createTable(tableId, new Schema()); + catalog.loadTable(tableId); // miss + load + catalog.loadTable(tableId); // hit (L1 hit on the fast path) + + // Sanity: at least one counter must be non-zero before the reset. 
+ Assertions.assertTrue(jmxLong("CacheHitCount") + jmxLong("CacheMissCount") > 0, + "At least one counter must be non-zero before reset"); + + // -- invoke the reset operation via JMX ------------------------------------- + invokeJmxOperation("resetCacheStats"); + + // -- assertions post-reset -------------------------------------------------- + Assertions.assertEquals(0L, jmxLong("CacheHitCount"), "CacheHitCount must be 0 after reset"); + Assertions.assertEquals(0L, jmxLong("CacheMissCount"), "CacheMissCount must be 0 after reset"); + Assertions.assertEquals(0L, jmxLong("CacheLoadCount"), "CacheLoadCount must be 0 after reset"); + Assertions.assertEquals(0L, jmxLong("CacheInvalidateCount"), "CacheInvalidateCount must be 0 after reset"); + Assertions.assertEquals(0L, jmxLong("CacheMetaLoadCount"), "CacheMetaLoadCount must be 0 after reset"); + Assertions.assertEquals(0L, jmxLong("L1CacheHitCount"), "L1CacheHitCount must be 0 after reset"); + Assertions.assertEquals(0L, jmxLong("L1CacheMissCount"), "L1CacheMissCount must be 0 after reset"); + Assertions.assertEquals(0.0, jmxDouble("CacheHitRate"), 1e-9, "CacheHitRate must be 0.0 after reset"); + Assertions.assertEquals(0.0, jmxDouble("L1CacheHitRate"), 1e-9, "L1CacheHitRate must be 0.0 after reset"); + + // -- verify rate calculation still works correctly after reset -------------- + // resetCacheStats() zeroes counters but does NOT evict the L2/L1 cache, so the + // table is still cached. Invalidate it directly on the server-side HMSCachingCatalog + // so the first post-reset load is a genuine cold miss rather than an L1/L2 hit. + // NOTE: catalog.invalidateTable() only clears the REST *client* state and does not + // reach the server-side cache. + serverCachingCatalog.invalidateTable(tableId); Review Comment: I feel we should write this not as an integration test but as a unit test. 
If we do so, we can remove an escape hatch, i.e., `getLatestCache` ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. */ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. 
+ private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. + private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). + private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. 
+ cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. + * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? "default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); Review Comment: Should we use `CATALOG_DEFAULT` in MetastoreConf? ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. 
*/ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. 
+ private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). + private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. + cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. 
+ * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? "default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); + ObjectName name = new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized); + if (mbs.isRegistered(name)) { + mbs.unregisterMBean(name); + } + mbs.registerMBean(this, name); + this.jmxObjectName = name; + LOG.info("Registered JMX MBean: {}", name); + } catch (JMException e) { + LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e); + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { Review Comment: ```suggestion private void onCacheInvalidate(TableIdentifier tid) { ``` ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. 
*/ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. 
+ private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). + private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. + cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. 
+ * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? "default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); + ObjectName name = new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized); + if (mbs.isRegistered(name)) { + mbs.unregisterMBean(name); + } + mbs.registerMBean(this, name); + this.jmxObjectName = name; + LOG.info("Registered JMX MBean: {}", name); + } catch (JMException e) { + LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e); + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { + long count = cacheInvalidateCount.incrementAndGet(); + LOG.debug("Cache invalidate {}: {}", tid, count); + } + + /** + * Callback when cache loads a table for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheLoad(TableIdentifier tid) { Review Comment: ```suggestion private void onCacheLoad(TableIdentifier tid) { ``` ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. 
*/ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. 
+ private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). + private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. + cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. 
+ * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? "default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); + ObjectName name = new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized); + if (mbs.isRegistered(name)) { + mbs.unregisterMBean(name); + } + mbs.registerMBean(this, name); + this.jmxObjectName = name; + LOG.info("Registered JMX MBean: {}", name); + } catch (JMException e) { + LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e); + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { + long count = cacheInvalidateCount.incrementAndGet(); + LOG.debug("Cache invalidate {}: {}", tid, count); + } + + /** + * Callback when cache loads a table for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheLoad(TableIdentifier tid) { + long count = cacheLoadCount.incrementAndGet(); + LOG.debug("Cache load {}: {}", tid, count); + } + + /** + * Callback when cache hit for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheHit(TableIdentifier tid) { + long count = cacheHitCount.incrementAndGet(); + LOG.debug("Cache hit {} : {}", tid, count); + } + + /** + * Callback when cache miss occurs for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMiss(TableIdentifier tid) { + long count = cacheMissCount.incrementAndGet(); + LOG.debug("Cache miss {}: {}", tid, count); + } + + /** + * Callback when cache loads a metadata table for a given table identifier. 
+ * + * @param tid the table identifier + */ + protected void onCacheMetaLoad(TableIdentifier tid) { + long count = cacheMetaLoadCount.incrementAndGet(); + LOG.debug("Cache meta-load {}: {}", tid, count); + } + + /** + * Callback when an L1 cache hit occurs for a given table identifier. + * Only fired when the L2 cache also has the entry. + * + * @param tid the table identifier + */ + protected void onL1CacheHit(TableIdentifier tid) { + long count = l1CacheHitCount.incrementAndGet(); + LOG.debug("L1 cache hit {}: {}", tid, count); } + /** + * Callback when an L1 cache miss occurs for a given table identifier. + * Only fired when the L2 cache has the entry but L1 is absent or expired. + * + * @param tid the table identifier + */ + protected void onL1CacheMiss(TableIdentifier tid) { + long count = l1CacheMissCount.incrementAndGet(); + LOG.debug("L1 cache miss {}: {}", tid, count); + } + + // Getter methods for accessing metrics @Override - public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) { - return hiveCatalog.buildTable(identifier, schema); + public long getCacheHitCount() { + return cacheHitCount.get(); } @Override - public void createNamespace(Namespace nmspc, Map<String, String> map) { - hiveCatalog.createNamespace(nmspc, map); + public long getCacheMissCount() { + return cacheMissCount.get(); } @Override - public List<Namespace> listNamespaces(Namespace nmspc) throws NoSuchNamespaceException { - return hiveCatalog.listNamespaces(nmspc); + public long getCacheLoadCount() { + return cacheLoadCount.get(); } @Override - public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws NoSuchNamespaceException { - return hiveCatalog.loadNamespaceMetadata(nmspc); + public long getCacheInvalidateCount() { + return cacheInvalidateCount.get(); } @Override - public boolean dropNamespace(Namespace nmspc) throws NamespaceNotEmptyException { - List<TableIdentifier> tables = listTables(nmspc); + public long getCacheMetaLoadCount() { + 
return cacheMetaLoadCount.get(); + } + + @Override + public double getCacheHitRate() { + long hits = cacheHitCount.get(); + long total = hits + cacheMissCount.get(); + return total == 0 ? 0.0 : (double) hits / total; + } + + @Override + public long getL1CacheHitCount() { + return l1CacheHitCount.get(); + } + + @Override + public long getL1CacheMissCount() { + return l1CacheMissCount.get(); + } + + @Override + public double getL1CacheHitRate() { + long hits = l1CacheHitCount.get(); + long total = hits + l1CacheMissCount.get(); + return total == 0 ? 0.0 : (double) hits / total; + } + + @Override + public void resetCacheStats() { + cacheHitCount.set(0); + cacheMissCount.set(0); + cacheLoadCount.set(0); + cacheInvalidateCount.set(0); + cacheMetaLoadCount.set(0); + l1CacheHitCount.set(0); + l1CacheMissCount.set(0); + LOG.debug("Cache stats reset"); + } + + @Override + public void close() { + unregisterJmx(); + } + + /** + * Unregisters this instance from the platform MBeanServer. + */ + private void unregisterJmx() { + if (jmxObjectName != null) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + if (mbs.isRegistered(jmxObjectName)) { + mbs.unregisterMBean(jmxObjectName); + LOG.info("Unregistered JMX MBean: {}", jmxObjectName); + } + } catch (JMException e) { + LOG.warn("Failed to unregister JMX MBean: {}", jmxObjectName, e); + } finally { + jmxObjectName = null; + } + } + } + + @Override + public void createNamespace(Namespace namespace, Map<String, String> map) { + hiveCatalog.createNamespace(namespace, map); + } + + @Override + public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + return hiveCatalog.listNamespaces(namespace); + } + + /** + * Canonicalizes the given table identifier based on the case sensitivity of the underlying catalog. + * Copied from CachingCatalog that exposes it as private. 
+ * @param tableIdentifier the table identifier to canonicalize + * @return the canonicalized table identifier + */ + private TableIdentifier canonicalizeIdentifier(TableIdentifier tableIdentifier) { + return this.caseSensitive ? tableIdentifier : tableIdentifier.toLowerCase(); + } + + @Override + public void invalidateTable(TableIdentifier ident) { + super.invalidateTable(ident); + l1Cache.remove(ident); + } + + @Override + public Table loadTable(final TableIdentifier identifier) { + final TableIdentifier canonicalized = canonicalizeIdentifier(identifier); + final Table cachedTable = tableCache.getIfPresent(canonicalized); + long now = System.currentTimeMillis(); + if (cachedTable != null) { + // Determine if L1 cache is valid based on the last cached time and the TTL. + // If the table is in L1 cache, we can skip the location check and return the cached table directly, + // which can significantly reduce the latency for repeated access to the same table. + Long lastCached = l1Cache.get(canonicalized); + if (lastCached != null) { + if (now - lastCached < l1Ttl) { + LOG.debug("Table {} is in L1 cache, returning cached table", canonicalized); + onL1CacheHit(canonicalized); + onCacheHit(canonicalized); + return cachedTable; Review Comment: Can we move the L1 cache out of the scope of this PR? Since CachingCatalog is designed for client-side caching, there is no authorization check when a cached object is accessed. This means that if Alice successfully loads table `abc` into the cache and Bob, who has no access to `abc`, then loads it, the security domain would be breached. This is acceptable for a client-side CachingCatalog, e.g., SparkCatalog, because its user is always Alice. But HMS is designed to serve multiple users, so this requires additional tests and careful review.
########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. */ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. 
+ // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. + private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). + private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. 
+ cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. + * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? "default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); + ObjectName name = new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized); + if (mbs.isRegistered(name)) { + mbs.unregisterMBean(name); + } + mbs.registerMBean(this, name); + this.jmxObjectName = name; + LOG.info("Registered JMX MBean: {}", name); + } catch (JMException e) { + LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e); + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { + long count = cacheInvalidateCount.incrementAndGet(); + LOG.debug("Cache invalidate {}: {}", tid, count); + } + + /** + * Callback when cache loads a table for a given table identifier. 
+ * + * @param tid the table identifier + */ + protected void onCacheLoad(TableIdentifier tid) { + long count = cacheLoadCount.incrementAndGet(); + LOG.debug("Cache load {}: {}", tid, count); + } + + /** + * Callback when cache hit for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheHit(TableIdentifier tid) { + long count = cacheHitCount.incrementAndGet(); + LOG.debug("Cache hit {} : {}", tid, count); + } + + /** + * Callback when cache miss occurs for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMiss(TableIdentifier tid) { + long count = cacheMissCount.incrementAndGet(); + LOG.debug("Cache miss {}: {}", tid, count); + } + + /** + * Callback when cache loads a metadata table for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMetaLoad(TableIdentifier tid) { Review Comment: ```suggestion private void onCacheMetaLoad(TableIdentifier tid) { ``` ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. 
*/ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. 
+ private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). + private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. + cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. 
+ * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? "default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); + ObjectName name = new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized); + if (mbs.isRegistered(name)) { + mbs.unregisterMBean(name); + } + mbs.registerMBean(this, name); + this.jmxObjectName = name; + LOG.info("Registered JMX MBean: {}", name); + } catch (JMException e) { + LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e); + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { + long count = cacheInvalidateCount.incrementAndGet(); + LOG.debug("Cache invalidate {}: {}", tid, count); + } + + /** + * Callback when cache loads a table for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheLoad(TableIdentifier tid) { + long count = cacheLoadCount.incrementAndGet(); + LOG.debug("Cache load {}: {}", tid, count); + } + + /** + * Callback when cache hit for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheHit(TableIdentifier tid) { + long count = cacheHitCount.incrementAndGet(); + LOG.debug("Cache hit {} : {}", tid, count); + } + + /** + * Callback when cache miss occurs for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMiss(TableIdentifier tid) { + long count = cacheMissCount.incrementAndGet(); + LOG.debug("Cache miss {}: {}", tid, count); + } + + /** + * Callback when cache loads a metadata table for a given table identifier. 
+ * + * @param tid the table identifier + */ + protected void onCacheMetaLoad(TableIdentifier tid) { + long count = cacheMetaLoadCount.incrementAndGet(); + LOG.debug("Cache meta-load {}: {}", tid, count); + } + + /** + * Callback when an L1 cache hit occurs for a given table identifier. + * Only fired when the L2 cache also has the entry. + * + * @param tid the table identifier + */ + protected void onL1CacheHit(TableIdentifier tid) { + long count = l1CacheHitCount.incrementAndGet(); + LOG.debug("L1 cache hit {}: {}", tid, count); } + /** + * Callback when an L1 cache miss occurs for a given table identifier. + * Only fired when the L2 cache has the entry but L1 is absent or expired. + * + * @param tid the table identifier + */ + protected void onL1CacheMiss(TableIdentifier tid) { + long count = l1CacheMissCount.incrementAndGet(); + LOG.debug("L1 cache miss {}: {}", tid, count); + } + + // Getter methods for accessing metrics @Override - public Catalog.TableBuilder buildTable(TableIdentifier identifier, Schema schema) { - return hiveCatalog.buildTable(identifier, schema); + public long getCacheHitCount() { + return cacheHitCount.get(); } @Override - public void createNamespace(Namespace nmspc, Map<String, String> map) { - hiveCatalog.createNamespace(nmspc, map); + public long getCacheMissCount() { + return cacheMissCount.get(); } @Override - public List<Namespace> listNamespaces(Namespace nmspc) throws NoSuchNamespaceException { - return hiveCatalog.listNamespaces(nmspc); + public long getCacheLoadCount() { + return cacheLoadCount.get(); } @Override - public Map<String, String> loadNamespaceMetadata(Namespace nmspc) throws NoSuchNamespaceException { - return hiveCatalog.loadNamespaceMetadata(nmspc); + public long getCacheInvalidateCount() { + return cacheInvalidateCount.get(); } @Override - public boolean dropNamespace(Namespace nmspc) throws NamespaceNotEmptyException { - List<TableIdentifier> tables = listTables(nmspc); + public long getCacheMetaLoadCount() { + 
return cacheMetaLoadCount.get(); + } + + @Override + public double getCacheHitRate() { + long hits = cacheHitCount.get(); + long total = hits + cacheMissCount.get(); + return total == 0 ? 0.0 : (double) hits / total; + } + + @Override + public long getL1CacheHitCount() { + return l1CacheHitCount.get(); + } + + @Override + public long getL1CacheMissCount() { + return l1CacheMissCount.get(); + } + + @Override + public double getL1CacheHitRate() { + long hits = l1CacheHitCount.get(); + long total = hits + l1CacheMissCount.get(); + return total == 0 ? 0.0 : (double) hits / total; + } + + @Override + public void resetCacheStats() { + cacheHitCount.set(0); + cacheMissCount.set(0); + cacheLoadCount.set(0); + cacheInvalidateCount.set(0); + cacheMetaLoadCount.set(0); + l1CacheHitCount.set(0); + l1CacheMissCount.set(0); + LOG.debug("Cache stats reset"); + } + + @Override + public void close() { + unregisterJmx(); + } + + /** + * Unregisters this instance from the platform MBeanServer. + */ + private void unregisterJmx() { + if (jmxObjectName != null) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + if (mbs.isRegistered(jmxObjectName)) { + mbs.unregisterMBean(jmxObjectName); + LOG.info("Unregistered JMX MBean: {}", jmxObjectName); + } + } catch (JMException e) { + LOG.warn("Failed to unregister JMX MBean: {}", jmxObjectName, e); + } finally { + jmxObjectName = null; + } + } + } + + @Override + public void createNamespace(Namespace namespace, Map<String, String> map) { + hiveCatalog.createNamespace(namespace, map); + } + + @Override + public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException { + return hiveCatalog.listNamespaces(namespace); + } + + /** + * Canonicalizes the given table identifier based on the case sensitivity of the underlying catalog. + * Copied from CachingCatalog that exposes it as private. 
+ * @param tableIdentifier the table identifier to canonicalize + * @return the canonicalized table identifier + */ + private TableIdentifier canonicalizeIdentifier(TableIdentifier tableIdentifier) { + return this.caseSensitive ? tableIdentifier : tableIdentifier.toLowerCase(); + } + + @Override + public void invalidateTable(TableIdentifier ident) { + super.invalidateTable(ident); + l1Cache.remove(ident); + } + + @Override + public Table loadTable(final TableIdentifier identifier) { + final TableIdentifier canonicalized = canonicalizeIdentifier(identifier); + final Table cachedTable = tableCache.getIfPresent(canonicalized); + long now = System.currentTimeMillis(); + if (cachedTable != null) { + // Determine if L1 cache is valid based on the last cached time and the TTL. + // If the table is in L1 cache, we can skip the location check and return the cached table directly, + // which can significantly reduce the latency for repeated access to the same table. + Long lastCached = l1Cache.get(canonicalized); + if (lastCached != null) { + if (now - lastCached < l1Ttl) { + LOG.debug("Table {} is in L1 cache, returning cached table", canonicalized); + onL1CacheHit(canonicalized); + onCacheHit(canonicalized); + return cachedTable; + } else { + l1Cache.remove(canonicalized); + onL1CacheMiss(canonicalized); + } + } else { + onL1CacheMiss(canonicalized); + } + // If the table is no longer in L1 cache, we need to check the location. 
+ final String location = metadataLocator.getLocation(canonicalized); + if (location == null) { + LOG.debug("Table {} has no location, returning cached table without location", canonicalized); Review Comment: I think this case needs to throw NoSuchTableException: a missing location means the table metadata has been deleted, so its manifest lists are very likely deleted or already invalid. ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. */ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance.
private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. + private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). 
+ private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. + cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. + * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? 
"default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); + ObjectName name = new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized); + if (mbs.isRegistered(name)) { + mbs.unregisterMBean(name); + } + mbs.registerMBean(this, name); + this.jmxObjectName = name; + LOG.info("Registered JMX MBean: {}", name); + } catch (JMException e) { + LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e); Review Comment: This is probably eligible for `error` ```suggestion LOG.error("Failed to register JMX MBean for HMSCachingCatalog", e); ``` ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. */ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? 
(C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. + private final AtomicLong cacheHitCount = new AtomicLong(0); Review Comment: LongAdder could be an option ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. 
*/ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. 
+ private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). + private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. + cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. 
+ * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? "default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); + ObjectName name = new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized); + if (mbs.isRegistered(name)) { + mbs.unregisterMBean(name); + } + mbs.registerMBean(this, name); + this.jmxObjectName = name; + LOG.info("Registered JMX MBean: {}", name); + } catch (JMException e) { + LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e); + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { + long count = cacheInvalidateCount.incrementAndGet(); + LOG.debug("Cache invalidate {}: {}", tid, count); + } + + /** + * Callback when cache loads a table for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheLoad(TableIdentifier tid) { + long count = cacheLoadCount.incrementAndGet(); + LOG.debug("Cache load {}: {}", tid, count); + } + + /** + * Callback when cache hit for a given table identifier. 
+ * + * @param tid the table identifier + */ + protected void onCacheHit(TableIdentifier tid) { Review Comment: ```suggestion private void onCacheHit(TableIdentifier tid) { ``` ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. */ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. 
private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. + private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). 
+ private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. + cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. + * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? 
"default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); + ObjectName name = new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized); + if (mbs.isRegistered(name)) { + mbs.unregisterMBean(name); + } + mbs.registerMBean(this, name); + this.jmxObjectName = name; + LOG.info("Registered JMX MBean: {}", name); + } catch (JMException e) { + LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e); + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { + long count = cacheInvalidateCount.incrementAndGet(); + LOG.debug("Cache invalidate {}: {}", tid, count); + } + + /** + * Callback when cache loads a table for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheLoad(TableIdentifier tid) { + long count = cacheLoadCount.incrementAndGet(); + LOG.debug("Cache load {}: {}", tid, count); + } + + /** + * Callback when cache hit for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheHit(TableIdentifier tid) { + long count = cacheHitCount.incrementAndGet(); + LOG.debug("Cache hit {} : {}", tid, count); + } + + /** + * Callback when cache miss occurs for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMiss(TableIdentifier tid) { + long count = cacheMissCount.incrementAndGet(); + LOG.debug("Cache miss {}: {}", tid, count); + } + + /** + * Callback when cache loads a metadata table for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMetaLoad(TableIdentifier tid) { + long count = cacheMetaLoadCount.incrementAndGet(); + LOG.debug("Cache meta-load {}: {}", tid, count); + } + + /** + * Callback when an L1 cache hit occurs for a given table identifier. + * Only fired when the L2 cache also has the entry. 
+ * + * @param tid the table identifier + */ + protected void onL1CacheHit(TableIdentifier tid) { Review Comment: ```suggestion private void onL1CacheHit(TableIdentifier tid) { ``` ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. */ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. 
private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. + private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). + private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { Review Comment: Can we use this constructor in the near future? If not, we can simply always use the default case sensitivity.
########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. */ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. 
+ // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. + private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). + private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. 
+ cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. + * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? "default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); + ObjectName name = new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized); + if (mbs.isRegistered(name)) { + mbs.unregisterMBean(name); + } + mbs.registerMBean(this, name); + this.jmxObjectName = name; + LOG.info("Registered JMX MBean: {}", name); + } catch (JMException e) { + LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e); + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { + long count = cacheInvalidateCount.incrementAndGet(); + LOG.debug("Cache invalidate {}: {}", tid, count); + } + + /** + * Callback when cache loads a table for a given table identifier. 
+ * + * @param tid the table identifier + */ + protected void onCacheLoad(TableIdentifier tid) { + long count = cacheLoadCount.incrementAndGet(); + LOG.debug("Cache load {}: {}", tid, count); + } + + /** + * Callback when cache hit for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheHit(TableIdentifier tid) { + long count = cacheHitCount.incrementAndGet(); + LOG.debug("Cache hit {} : {}", tid, count); + } + + /** + * Callback when cache miss occurs for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMiss(TableIdentifier tid) { Review Comment: ```suggestion private void onCacheMiss(TableIdentifier tid) { ``` ########## standalone-metastore/metastore-rest-catalog/src/main/java/org/apache/iceberg/rest/HMSCachingCatalog.java: ########## @@ -33,65 +51,423 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.MetadataLocator; import org.apache.iceberg.view.View; import org.apache.iceberg.view.ViewBuilder; +import org.jetbrains.annotations.TestOnly; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.github.benmanes.caffeine.cache.Ticker; /** * Class that wraps an Iceberg Catalog to cache tables. 
*/ -public class HMSCachingCatalog extends CachingCatalog implements SupportsNamespaces, ViewCatalog { +public class HMSCachingCatalog extends CachingCatalog + implements SupportsNamespaces, ViewCatalog, HMSCachingCatalogMXBean, Closeable { + protected static final Logger LOG = LoggerFactory.getLogger(HMSCachingCatalog.class); + + @TestOnly + private static SoftReference<HMSCachingCatalog> cacheRef = new SoftReference<>(null); + + @TestOnly + @SuppressWarnings("unchecked") + public static <C extends Catalog> C getLatestCache(Function<HMSCachingCatalog, C> extractor) { + HMSCachingCatalog cache = cacheRef.get(); + if (cache == null) { + return null; + } + return extractor == null ? (C) cache : extractor.apply(cache); + } + + @TestOnly + public HiveCatalog getCatalog() { + return hiveCatalog; + } + + // The underlying HiveCatalog instance. private final HiveCatalog hiveCatalog; - - public HMSCachingCatalog(HiveCatalog catalog, long expiration) { - super(catalog, true, expiration, Ticker.systemTicker()); + // Duplicate because CachingCatalog doesn't expose the case sensitivity of the underlying catalog, + // which is needed for canonicalizing identifiers before caching. + private final boolean caseSensitive; + // The locator. + private final MetadataLocator metadataLocator; + // An L1 small latency cache. + // This is used to cache the last cached time for each table identifier, + // so that we can skip location check for repeated access to the same table within a short period of time, + // which can significantly reduce the latency for repeated access to the same table. + private final Map<TableIdentifier, Long> l1Cache; + // The TTL for L1 cache (3s). + private final int l1Ttl; + // The L1 cache size. + private final int l1CacheSize; + + // Metrics counters. 
+ private final AtomicLong cacheHitCount = new AtomicLong(0); + private final AtomicLong cacheMissCount = new AtomicLong(0); + private final AtomicLong cacheLoadCount = new AtomicLong(0); + private final AtomicLong cacheInvalidateCount = new AtomicLong(0); + private final AtomicLong cacheMetaLoadCount = new AtomicLong(0); + // L1 cache metrics: counted only when the L2 (Caffeine) cache already has the entry. + private final AtomicLong l1CacheHitCount = new AtomicLong(0); + private final AtomicLong l1CacheMissCount = new AtomicLong(0); + + // JMX ObjectName under which this instance is registered (may be null if registration failed). + private ObjectName jmxObjectName; + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs) { + this(catalog, expirationMs, /*caseSensitive*/ true); + } + + public HMSCachingCatalog(HiveCatalog catalog, long expirationMs, boolean caseSensitive) { + super(catalog, caseSensitive, expirationMs, Ticker.systemTicker()); this.hiveCatalog = catalog; + this.caseSensitive = caseSensitive; + this.metadataLocator = new MetadataLocator(catalog); + Configuration conf = catalog.getConf(); + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_IN_TEST)) { + // Only keep a reference to the latest cache for testing purpose, so that tests can manipulate the catalog. + cacheRef = new SoftReference<>(this); + } + int l1size = conf.getInt("hms.caching.catalog.l1.cache.size", 32); + int l1ttl = conf.getInt("hms.caching.catalog.l1.cache.ttl", 3_000); + if (l1size > 0 && l1ttl > 0) { + l1Cache = Collections.synchronizedMap(new LinkedHashMap<TableIdentifier, Long>() { + @Override + protected boolean removeEldestEntry(Map.Entry<TableIdentifier, Long> eldest) { + return size() > l1CacheSize; + } + }); + l1Ttl = l1ttl; + l1CacheSize = l1size; + } else { + l1Cache = Collections.emptyMap(); + l1Ttl = 0; + l1CacheSize = 0; + } + registerJmx(catalog.name()); + } + + /** + * Registers this instance as a JMX MBean. 
+ * + * @param catalogName the catalog name, used to build the {@link ObjectName} + */ + private void registerJmx(String catalogName) { + try { + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + String sanitized = catalogName == null || catalogName.isEmpty() + ? "default" + : catalogName.replaceAll("[^a-zA-Z0-9.\\\\-]", "_"); + ObjectName name = new ObjectName("org.apache.iceberg.rest:type=HMSCachingCatalog,name=" + sanitized); + if (mbs.isRegistered(name)) { + mbs.unregisterMBean(name); + } + mbs.registerMBean(this, name); + this.jmxObjectName = name; + LOG.info("Registered JMX MBean: {}", name); + } catch (JMException e) { + LOG.warn("Failed to register JMX MBean for HMSCachingCatalog", e); + } + } + + /** + * Callback when cache invalidates the entry for a given table identifier. + * + * @param tid the table identifier to invalidate + */ + protected void onCacheInvalidate(TableIdentifier tid) { + long count = cacheInvalidateCount.incrementAndGet(); + LOG.debug("Cache invalidate {}: {}", tid, count); + } + + /** + * Callback when cache loads a table for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheLoad(TableIdentifier tid) { + long count = cacheLoadCount.incrementAndGet(); + LOG.debug("Cache load {}: {}", tid, count); + } + + /** + * Callback when cache hit for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheHit(TableIdentifier tid) { + long count = cacheHitCount.incrementAndGet(); + LOG.debug("Cache hit {} : {}", tid, count); + } + + /** + * Callback when cache miss occurs for a given table identifier. + * + * @param tid the table identifier + */ + protected void onCacheMiss(TableIdentifier tid) { + long count = cacheMissCount.incrementAndGet(); + LOG.debug("Cache miss {}: {}", tid, count); + } + + /** + * Callback when cache loads a metadata table for a given table identifier. 
+ * + * @param tid the table identifier + */ + protected void onCacheMetaLoad(TableIdentifier tid) { + long count = cacheMetaLoadCount.incrementAndGet(); + LOG.debug("Cache meta-load {}: {}", tid, count); + } + + /** + * Callback when an L1 cache hit occurs for a given table identifier. + * Only fired when the L2 cache also has the entry. + * + * @param tid the table identifier + */ + protected void onL1CacheHit(TableIdentifier tid) { + long count = l1CacheHitCount.incrementAndGet(); + LOG.debug("L1 cache hit {}: {}", tid, count); } + /** + * Callback when an L1 cache miss occurs for a given table identifier. + * Only fired when the L2 cache has the entry but L1 is absent or expired. + * + * @param tid the table identifier + */ + protected void onL1CacheMiss(TableIdentifier tid) { Review Comment: ```suggestion private void onL1CacheMiss(TableIdentifier tid) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
