This is an automated email from the ASF dual-hosted git repository.

taklwu pushed a commit to branch HBASE-30018
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 863c2107d801dcfb151bf46f427c14aa5e0e33f2
Author: Vladimir Rodionov <[email protected]>
AuthorDate: Thu May 14 15:27:25 2026 -0700

    HBASE-30021 Introduce cache access service API (#8231)
    
    Signed-off-by: Tak Lon (Stephen) Wu <[email protected]>
    Reviewed by: Kota-SH <[email protected]>
---
 .../cache/BlockCacheBackedCacheAccessService.java  | 300 +++++++++++++
 .../hbase/io/hfile/cache/CacheAccessService.java   | 423 ++++++++++++++++++
 .../hbase/io/hfile/cache/CacheAccessServices.java  |  89 ++++
 .../hbase/io/hfile/cache/CacheRequestContext.java  |   7 +-
 .../io/hfile/cache/NoOpCacheAccessService.java     | 286 ++++++++++++
 .../cache/TopologyBackedCacheAccessService.java    | 477 +++++++++++++++++++++
 .../TestBlockCacheBackedCacheAccessService.java    | 221 ++++++++++
 .../TestTopologyBackedCacheAccessService.java      | 474 ++++++++++++++++++++
 8 files changed, 2274 insertions(+), 3 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/BlockCacheBackedCacheAccessService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/BlockCacheBackedCacheAccessService.java
new file mode 100644
index 00000000000..4b3d0ef6566
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/BlockCacheBackedCacheAccessService.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.cache;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * {@link CacheAccessService} implementation backed by an existing {@link 
BlockCache} instance.
+ * <p>
+ * This adapter is the compatibility bridge for the first migration step. It 
allows new callers to
+ * depend on {@link CacheAccessService} while the runtime implementation still 
uses the current
+ * {@link BlockCache} hierarchy, including LruBlockCache, BucketCache, 
CombinedBlockCache, and other
+ * existing implementations.
+ * </p>
+ * <p>
+ * The adapter should not introduce new policy, placement, admission, 
representation, promotion, or
+ * topology behavior. Its purpose is to translate the new context-based 
service API into the current
+ * {@code BlockCache} API with no intentional behavior change.
+ * </p>
+ * <p>
+ * A future topology-backed service can replace this adapter after call sites 
have migrated to
+ * {@link CacheAccessService}. That future implementation may use {@link 
CacheTopology},
+ * {@link CachePlacementAdmissionPolicy}, and {@link CacheEngine}; this 
adapter deliberately does
+ * not.
+ * </p>
+ */
[email protected]
+public class BlockCacheBackedCacheAccessService implements CacheAccessService {
+
+  private final BlockCache blockCache;
+
+  /**
+   * Creates a cache access service backed by the supplied legacy block cache.
+   * @param blockCache block cache to wrap
+   */
+  public BlockCacheBackedCacheAccessService(BlockCache blockCache) {
+    this.blockCache = Objects.requireNonNull(blockCache, "blockCache must not 
be null");
+  }
+
+  /**
+   * Returns the wrapped {@link BlockCache} instance.
+   * <p>
+   * This accessor is intended for tests and transitional wiring only. New 
read/write path code
+   * should use {@link CacheAccessService} methods instead of unwrapping the 
legacy cache.
+   * </p>
+   * @return wrapped block cache
+   */
+  public BlockCache getBlockCache() {
+    return blockCache;
+  }
+
+  /**
+   * Returns a human-readable service name.
+   * @return service name
+   */
+  @Override
+  public String getName() {
+    return blockCache.getClass().getSimpleName();
+  }
+
+  /**
+   * Fetches a block by delegating to the wrapped {@link BlockCache}.
+   * @param cacheKey block to fetch
+   * @param context  cache request context
+   * @return cached block, or {@code null} if not present
+   */
+  @Override
+  public Cacheable getBlock(BlockCacheKey cacheKey, CacheRequestContext 
context) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+    Objects.requireNonNull(context, "context must not be null");
+
+    Optional<BlockType> blockType = context.getBlockType();
+    if (blockType.isPresent()) {
+      return blockCache.getBlock(cacheKey, context.isCaching(), 
context.isRepeat(),
+        context.isUpdateCacheMetrics(), blockType.get());
+    }
+    return blockCache.getBlock(cacheKey, context.isCaching(), 
context.isRepeat(),
+      context.isUpdateCacheMetrics());
+  }
+
+  /**
+   * Caches a block by delegating to the wrapped {@link BlockCache}.
+   * @param cacheKey block cache key
+   * @param block    block contents
+   * @param context  cache write context
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable block, 
CacheWriteContext context) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+    Objects.requireNonNull(block, "block must not be null");
+    Objects.requireNonNull(context, "context must not be null");
+
+    blockCache.cacheBlock(cacheKey, block, context.isInMemory(), 
context.isWaitWhenCache());
+  }
+
+  /**
+   * Evicts a single block by delegating to the wrapped {@link BlockCache}.
+   * @param cacheKey block to remove
+   * @return {@code true} if the block existed and was removed, {@code false} 
otherwise
+   */
+  @Override
+  public boolean evictBlock(BlockCacheKey cacheKey) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+    return blockCache.evictBlock(cacheKey);
+  }
+
+  /**
+   * Evicts all cached blocks for the given HFile by delegating to the wrapped 
{@link BlockCache}.
+   * @param hfileName HFile name
+   * @return number of blocks removed
+   */
+  @Override
+  public int evictBlocksByHfileName(String hfileName) {
+    Objects.requireNonNull(hfileName, "hfileName must not be null");
+    return blockCache.evictBlocksByHfileName(hfileName);
+  }
+
+  /**
+   * Evicts cached blocks for an HFile range if the wrapped cache supports it.
+   * @param hfileName  HFile name
+   * @param initOffset inclusive start offset
+   * @param endOffset  inclusive end offset
+   * @return number of blocks removed
+   */
+  @Override
+  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, 
long endOffset) {
+    Objects.requireNonNull(hfileName, "hfileName must not be null");
+    return blockCache.evictBlocksRangeByHfileName(hfileName, initOffset, 
endOffset);
+  }
+
+  /**
+   * Evicts cached blocks for a region if the wrapped cache supports it.
+   * @param regionName region name
+   * @return number of blocks removed
+   */
+  @Override
+  public int evictBlocksByRegionName(String regionName) {
+    Objects.requireNonNull(regionName, "regionName must not be null");
+    // BlockCache does not support region-based eviction, so we return 0 to 
indicate no
+    // blocks removed
+    return 0;
+  }
+
+  /**
+   * Returns statistics from the wrapped {@link BlockCache}.
+   * @return cache statistics
+   */
+  @Override
+  public CacheStats getStats() {
+    return blockCache.getStats();
+  }
+
+  /**
+   * Shuts down the wrapped {@link BlockCache}.
+   */
+  @Override
+  public void shutdown() {
+    blockCache.shutdown();
+  }
+
+  /**
+   * Returns maximum configured cache size from the wrapped {@link BlockCache}.
+   * @return maximum cache size
+   */
+  @Override
+  public long getMaxSize() {
+    return blockCache.getMaxSize();
+  }
+
+  /**
+   * Returns free cache size from the wrapped {@link BlockCache}.
+   * @return free size
+   */
+  @Override
+  public long getFreeSize() {
+    return blockCache.getFreeSize();
+  }
+
+  /**
+   * Returns occupied cache size from the wrapped {@link BlockCache}.
+   * @return occupied cache size
+   */
+  @Override
+  public long size() {
+    return blockCache.size();
+  }
+
+  /**
+   * Returns occupied data-block size from the wrapped {@link BlockCache}.
+   * @return occupied data-block size
+   */
+  @Override
+  public long getCurrentDataSize() {
+    return blockCache.getCurrentDataSize();
+  }
+
+  /**
+   * Returns cached block count from the wrapped {@link BlockCache}.
+   * @return total block count
+   */
+  @Override
+  public long getBlockCount() {
+    return blockCache.getBlockCount();
+  }
+
+  /**
+   * Returns cached data block count from the wrapped {@link BlockCache}.
+   * @return data block count
+   */
+  @Override
+  public long getDataBlockCount() {
+    return blockCache.getDataBlockCount();
+  }
+
+  /**
+   * Delegates block fit check to the wrapped {@link BlockCache}.
+   * @param block block to check
+   * @return empty if unsupported; otherwise whether the block fits
+   */
+  @Override
+  public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
+    Objects.requireNonNull(block, "block must not be null");
+    return blockCache.blockFitsIntoTheCache(block);
+  }
+
+  /**
+   * Delegates already-cached check to the wrapped {@link BlockCache}.
+   * @param key block cache key
+   * @return empty if unsupported; otherwise whether the block is cached
+   */
+  @Override
+  public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
+    Objects.requireNonNull(key, "key must not be null");
+    return blockCache.isAlreadyCached(key);
+  }
+
+  /**
+   * Delegates per-block size lookup to the wrapped {@link BlockCache}.
+   * @param key block cache key
+   * @return empty if unsupported or not present; otherwise cached block size
+   */
+  @Override
+  public Optional<Integer> getBlockSize(BlockCacheKey key) {
+    Objects.requireNonNull(key, "key must not be null");
+    return blockCache.getBlockSize(key);
+  }
+
+  /**
+   * Returns whether the wrapped {@link BlockCache} is enabled.
+   * @return {@code true} if enabled, {@code false} otherwise
+   */
+  @Override
+  public boolean isCacheEnabled() {
+    return blockCache.isCacheEnabled();
+  }
+
+  /**
+   * Waits for wrapped cache initialization if supported.
+   * @param timeout maximum time to wait
+   * @return {@code true} if the cache is enabled and ready, {@code false} 
otherwise
+   */
+  @Override
+  public boolean waitForCacheInitialization(long timeout) {
+    return blockCache.waitForCacheInitialization(timeout);
+  }
+
+  /**
+   * Propagates configuration changes to the wrapped {@link BlockCache}.
+   * @param config new configuration
+   */
+  @Override
+  public void onConfigurationChange(Configuration config) {
+    Objects.requireNonNull(config, "config must not be null");
+    blockCache.onConfigurationChange(config);
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/CacheAccessService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/CacheAccessService.java
new file mode 100644
index 00000000000..ab258eabc14
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/CacheAccessService.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.cache;
+
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * HBase-facing access layer for block cache operations.
+ * <p>
+ * {@code CacheAccessService} is the cache abstraction intended for HBase read 
and write path
+ * callers such as HFile readers, HFile writers, prefetch code, and file 
lifecycle code. It
+ * preserves the core operational shape of the existing {@code BlockCache} API 
so callers can be
+ * migrated incrementally, but it removes methods that are specific to a 
concrete cache engine,
+ * tiered topology implementation, or diagnostics-only use case.
+ * </p>
+ * <p>
+ * This interface is deliberately close to the hot-path subset of {@code 
BlockCache}:
+ * </p>
+ * <ul>
+ * <li>lookup a block</li>
+ * <li>cache a block</li>
+ * <li>explicitly evict or invalidate blocks</li>
+ * <li>report aggregate statistics and sizing information</li>
+ * <li>manage cache lifecycle</li>
+ * </ul>
+ * <p>
+ * The important difference from {@link CacheEngine} is the abstraction level. 
A {@code CacheEngine}
+ * is a storage back-end implemented by a concrete cache such as 
LruBlockCache, BucketCache, or a
+ * future CarrotCache engine. {@code CacheAccessService} is the HBase-facing 
facade above topology
+ * and engines. It receives request intent through {@link CacheRequestContext} 
and insertion intent
+ * through {@link CacheWriteContext}. A service implementation may initially 
delegate to an existing
+ * {@code BlockCache}; a later implementation may delegate to {@link 
CacheTopology},
+ * {@link CachePlacementAdmissionPolicy}, and one or more {@link CacheEngine} 
instances.
+ * </p>
+ * <p>
+ * Normal HBase read and write path code should depend on this interface 
rather than accessing
+ * {@link CacheTopology} or {@link CacheEngine} directly. Direct access to 
topology or engines is
+ * reserved for cache assembly, topology execution, policy code, diagnostics, 
and tests. This keeps
+ * placement, admission, representation, promotion, and engine-specific 
behavior outside of HFile
+ * read/write logic.
+ * </p>
+ * <p>
+ * This interface intentionally does not expose methods such as victim-cache 
wiring, backing-map
+ * inspection, per-engine iterators, file fully-cached diagnostics, or 
BucketCache-specific helper
+ * methods. Those concerns belong in lower-level engine APIs, topology views, 
metrics, or admin
+ * tooling rather than the main read/write path abstraction.
+ * </p>
+ */
[email protected]
+public interface CacheAccessService {
+
+  /**
+   * Returns a human-readable name for this cache access service instance.
+   * <p>
+   * The name is intended for logging, metrics, diagnostics, and configuration 
reporting. For a
+   * legacy adapter this may be derived from the wrapped cache. For a 
topology-backed service it may
+   * identify the configured topology or service implementation.
+   * </p>
+   * @return service name
+   */
+  String getName();
+
+  /**
+   * Fetches a block from the cache.
+   * <p>
+   * This is the main read-path lookup method. The supplied {@link 
CacheRequestContext} carries
+   * caller intent that used to be passed as individual boolean arguments to 
{@code BlockCache},
+   * such as whether caching is enabled for this request, whether this lookup 
is a repeated lookup
+   * for the same logical request, and whether cache metrics should be updated.
+   * </p>
+   * <p>
+   * The service is responsible for preserving the current HBase lookup 
semantics. A legacy
+   * implementation may translate this call directly to {@code 
BlockCache#getBlock}. A future
+   * topology-backed implementation may route the lookup through {@link 
CacheTopology}, apply
+   * hit-driven promotion decisions, and aggregate metrics across engines.
+   * </p>
+   * @param cacheKey block to fetch
+   * @param context  cache request context
+   * @return cached block, or {@code null} if not present
+   */
+  Cacheable getBlock(BlockCacheKey cacheKey, CacheRequestContext context);
+
+  /**
+   * Fetches a block from the cache using explicit lookup flags.
+   * <p>
+   * This compatibility helper mirrors the current {@code BlockCache#getBlock} 
call shape and builds
+   * a {@link CacheRequestContext} before delegating to
+   * {@link #getBlock(BlockCacheKey, CacheRequestContext)}. New call sites 
should prefer the context
+   * based method directly.
+   * </p>
+   * @param cacheKey           block to fetch
+   * @param caching            whether caching is enabled for the request; 
used for metrics
+   * @param repeat             whether this is a repeated lookup for the same 
logical request
+   * @param updateCacheMetrics whether cache metrics should be updated
+   * @return cached block, or {@code null} if not present
+   */
+  default Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean 
repeat,
+    boolean updateCacheMetrics) {
+    CacheRequestContext context = 
CacheRequestContext.newBuilder().setCaching(caching)
+      .setRepeat(repeat).setUpdateCacheMetrics(updateCacheMetrics).build();
+    return getBlock(cacheKey, context);
+  }
+
+  /**
+   * Fetches a block from the cache using explicit lookup flags and a block 
type hint.
+   * <p>
+   * This compatibility helper mirrors the current {@code BlockCache#getBlock} 
overload that accepts
+   * a {@link BlockType}. The block type is a lookup hint and must not be 
treated as part of the
+   * cache key.
+   * </p>
+   * @param cacheKey           block to fetch
+   * @param caching            whether caching is enabled for the request; 
used for metrics
+   * @param repeat             whether this is a repeated lookup for the same 
logical request
+   * @param updateCacheMetrics whether cache metrics should be updated
+   * @param blockType          optional block type hint
+   * @return cached block, or {@code null} if not present
+   */
+  default Cacheable getBlock(BlockCacheKey cacheKey, boolean caching, boolean 
repeat,
+    boolean updateCacheMetrics, BlockType blockType) {
+    CacheRequestContext context = 
CacheRequestContext.newBuilder().setCaching(caching)
+      
.setRepeat(repeat).setUpdateCacheMetrics(updateCacheMetrics).setBlockType(blockType).build();
+    return getBlock(cacheKey, context);
+  }
+
+  /**
+   * Adds a block to the cache.
+   * <p>
+   * This is the main write-side cache population method. The supplied {@link 
CacheWriteContext}
+   * identifies the source and intent of the insertion, for example read-miss 
population, flush
+   * output, compaction output, prefetch, or promotion. Existing {@code 
CacheConfig} logic should
+   * decide whether cache population should be attempted before this method is 
called.
+   * </p>
+   * <p>
+   * A legacy implementation may translate this call directly to {@code 
BlockCache#cacheBlock}. A
+   * future topology-backed implementation may first ask {@link 
CachePlacementAdmissionPolicy} for
+   * admission, tier placement, and representation decisions, then execute the 
selected operation
+   * through {@link CacheTopology} and its engines.
+   * </p>
+   * @param cacheKey block cache key
+   * @param block    block contents
+   * @param context  cache write context
+   */
+  void cacheBlock(BlockCacheKey cacheKey, Cacheable block, CacheWriteContext 
context);
+
+  /**
+   * Adds a block to the cache using explicit insertion flags.
+   * <p>
+   * This compatibility helper mirrors the current {@code 
BlockCache#cacheBlock} call shape and
+   * builds a {@link CacheWriteContext} before delegating to
+   * {@link #cacheBlock(BlockCacheKey, Cacheable, CacheWriteContext)}. New 
call sites should prefer
+   * the context based method directly.
+   * </p>
+   * @param cacheKey block cache key
+   * @param block    block contents
+   * @param inMemory whether the block should be treated as in-memory
+   */
+  default void cacheBlock(BlockCacheKey cacheKey, Cacheable block, boolean 
inMemory) {
+    CacheWriteContext context = 
CacheWriteContext.newBuilder().setInMemory(inMemory).build();
+    cacheBlock(cacheKey, block, context);
+  }
+
+  /**
+   * Adds a block to the cache using explicit insertion flags.
+   * <p>
+   * This compatibility helper mirrors the current {@code 
BlockCache#cacheBlock} overload that
+   * carries {@code waitWhenCache}. The flag is useful for asynchronous cache 
backends such as
+   * BucketCache. New call sites should prefer the context based method 
directly.
+   * </p>
+   * @param cacheKey      block cache key
+   * @param block         block contents
+   * @param inMemory      whether the block should be treated as in-memory
+   * @param waitWhenCache whether to wait for the cache operation to be 
accepted/flushed
+   */
+  default void cacheBlock(BlockCacheKey cacheKey, Cacheable block, boolean 
inMemory,
+    boolean waitWhenCache) {
+    CacheWriteContext context =
+      
CacheWriteContext.newBuilder().setInMemory(inMemory).setWaitWhenCache(waitWhenCache).build();
+    cacheBlock(cacheKey, block, context);
+  }
+
+  /**
+   * Adds a block to the cache, defaulting to non in-memory treatment.
+   * <p>
+   * This compatibility helper mirrors {@code 
BlockCache#cacheBlock(BlockCacheKey, Cacheable)}.
+   * </p>
+   * @param cacheKey block cache key
+   * @param block    block contents
+   */
+  default void cacheBlock(BlockCacheKey cacheKey, Cacheable block) {
+    cacheBlock(cacheKey, block, CacheWriteContext.newBuilder().build());
+  }
+
+  /**
+   * Explicitly removes a single block from the cache.
+   * <p>
+   * This method represents caller-requested invalidation/removal. It is not 
the normal replacement
+   * path used by a cache implementation under capacity pressure. A service 
implementation should
+   * propagate this operation to the appropriate topology or underlying cache 
implementation.
+   * </p>
+   * @param cacheKey block to remove
+   * @return {@code true} if the block existed and was removed, {@code false} 
otherwise
+   */
+  boolean evictBlock(BlockCacheKey cacheKey);
+
+  /**
+   * Explicitly removes all cached blocks for the given HFile.
+   * <p>
+   * This method is used for file-scoped invalidation during HFile lifecycle 
events. The method name
+   * intentionally follows the existing {@code BlockCache} API for migration 
compatibility, although
+   * the operation is better understood as explicit invalidation rather than 
replacement eviction.
+   * </p>
+   * @param hfileName HFile name
+   * @return number of blocks removed
+   */
+  int evictBlocksByHfileName(String hfileName);
+
+  /**
+   * Explicitly removes cached blocks for the given HFile within the specified 
offset range.
+   * <p>
+   * This optional method mirrors the storage-oriented range invalidation 
support in
+   * {@link CacheEngine}. Implementations that cannot perform efficient range 
invalidation may
+   * return {@code 0}. Callers must not rely on this method for correctness 
unless the specific
+   * service implementation documents support for it.
+   * </p>
+   * @param hfileName  HFile name
+   * @param initOffset inclusive start offset
+   * @param endOffset  inclusive end offset
+   * @return number of blocks removed
+   */
+  default int evictBlocksRangeByHfileName(String hfileName, long initOffset, 
long endOffset) {
+    return 0;
+  }
+
+  /**
+   * Explicitly removes cached blocks associated with the specified region.
+   * <p>
+   * This optional method supports region-scoped invalidation without 
requiring callers to enumerate
+   * files first. Implementations that cannot perform efficient region 
invalidation may return
+   * {@code 0}. Callers must not rely on this method for correctness unless 
the specific service
+   * implementation documents support for it.
+   * </p>
+   * @param regionName region name
+   * @return number of blocks removed
+   */
+  default int evictBlocksByRegionName(String regionName) {
+    return 0;
+  }
+
+  /**
+   * Returns aggregate cache statistics visible at the service level.
+   * <p>
+   * For a legacy adapter this may be the statistics object from the wrapped 
cache. For a
+   * topology-backed implementation this should represent aggregate statistics 
across all engines in
+   * the topology, preserving the semantics expected by existing HBase metrics 
and callers.
+   * </p>
+   * @return cache statistics
+   */
+  CacheStats getStats();
+
+  /**
+   * Shuts down the cache access service and releases owned resources.
+   * <p>
+   * A service implementation should also shut down any underlying topology, 
engines, or wrapped
+   * legacy cache it owns. If the service does not own the underlying cache 
lifecycle, the
+   * implementation should document that behavior.
+   * </p>
+   */
+  void shutdown();
+
+  /**
+   * Returns the maximum configured cache size visible through this service, 
in bytes.
+   * <p>
+   * For a topology-backed service this should normally be the aggregate 
configured capacity of the
+   * participating engines, unless the topology has a different externally 
visible capacity model.
+   * </p>
+   * @return maximum cache size
+   */
+  long getMaxSize();
+
+  /**
+   * Returns the amount of free cache space visible through this service, in 
bytes.
+   * <p>
+   * Free space may not always be exactly {@code getMaxSize() - size()} 
because engines may have
+   * fragmentation, reserved capacity, asynchronous insertion queues, or 
topology-specific capacity
+   * rules.
+   * </p>
+   * @return free size
+   */
+  long getFreeSize();
+
+  /**
+   * Returns the currently occupied cache size visible through this service, 
in bytes.
+   * <p>
+   * The method name follows the existing {@code BlockCache} and {@link 
CacheEngine} shape for
+   * migration compatibility.
+   * </p>
+   * @return occupied cache size
+   */
+  long size();
+
+  /**
+   * Returns the currently occupied size of cached data blocks, in bytes.
+   * <p>
+   * This is an aggregate service-level value. Implementations that cannot 
distinguish data-block
+   * occupancy should return the best value available from the underlying 
cache implementation.
+   * </p>
+   * @return occupied data-block size
+   */
+  long getCurrentDataSize();
+
+  /**
+   * Returns the total number of cached blocks visible through this service.
+   * @return total block count
+   */
+  long getBlockCount();
+
+  /**
+   * Returns the number of cached data blocks visible through this service.
+   * @return data block count
+   */
+  long getDataBlockCount();
+
+  /**
+   * Checks whether the given block can fit into the cache represented by this 
service.
+   * <p>
+   * This method is optional and exists primarily for migration compatibility 
with existing cache
+   * call sites. It should not be used as a general admission policy API. 
Admission and placement
+   * decisions belong to {@link CachePlacementAdmissionPolicy} in the 
topology-backed architecture.
+   * </p>
+   * @param block block to check
+   * @return empty if unsupported; otherwise whether the block fits
+   */
+  default Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
+    return Optional.empty();
+  }
+
+  /**
+   * Checks whether the block represented by the given key is already cached.
+   * <p>
+   * This method is optional and exists primarily for migration compatibility 
and diagnostics. It is
+   * not required for the normal read/write path because callers should use
+   * {@link #getBlock(BlockCacheKey, CacheRequestContext)} for lookup.
+   * </p>
+   * @param key block cache key
+   * @return empty if unsupported; otherwise whether the block is cached
+   */
+  default Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
+    return Optional.empty();
+  }
+
+  /**
+   * Returns the size of the cached block represented by the given key.
+   * <p>
+   * This method is optional because not all implementations expose per-block 
size cheaply. It is
+   * not part of the normal hot-path lookup contract.
+   * </p>
+   * @param key block cache key
+   * @return empty if unsupported or not present; otherwise cached block size
+   */
+  default Optional<Integer> getBlockSize(BlockCacheKey key) {
+    return Optional.empty();
+  }
+
+  /**
+   * Returns whether cache access is enabled.
+   * <p>
+   * Implementations may override this when cache availability is controlled 
by initialization
+   * state, configuration, or a wrapped legacy cache. The default assumes the 
service is enabled.
+   * </p>
+   * @return {@code true} if enabled, {@code false} otherwise
+   */
+  default boolean isCacheEnabled() {
+    return true;
+  }
+
+  /**
+   * Waits for asynchronous cache initialization to complete.
+   * <p>
+   * Some cache implementations, especially those backed by persistent or 
asynchronous engines, may
+   * require initialization before they should be used. Services that do not 
require asynchronous
+   * initialization may return {@code true} immediately.
+   * </p>
+   * @param timeout maximum time to wait
+   * @return {@code true} if the cache is enabled and ready, {@code false} 
otherwise
+   */
+  default boolean waitForCacheInitialization(long timeout) {
+    return true;
+  }
+
+  /**
+   * Refreshes this service's configuration.
+   * <p>
+   * A service implementation may propagate this notification to the wrapped 
legacy cache, topology,
+   * policy, or engines. The default implementation is a no-op.
+   * </p>
+   * @param config new configuration
+   */
+  default void onConfigurationChange(Configuration config) {
+    // noop
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/CacheAccessServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/CacheAccessServices.java
new file mode 100644
index 00000000000..c98dd2275d8
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/CacheAccessServices.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.cache;
+
+import java.util.Objects;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Utility methods for creating {@link CacheAccessService} instances.
+ * <p>
+ * This class keeps service construction centralized without introducing a 
full factory or plugin
+ * loader in the initial {@code CacheAccessService} ticket. The first 
supported construction modes
+ * are a legacy {@link BlockCache}-backed service, a topology-backed service, 
and a disabled no-op
+ * service.
+ * </p>
+ * <p>
+ * A later integration step can move construction into {@code 
BlockCacheFactory} once HBase runtime
+ * wiring starts returning {@link CacheAccessService} instead of, or 
alongside, raw
+ * {@link BlockCache}.
+ * </p>
+ */
[email protected]
+public final class CacheAccessServices {
+
+  private CacheAccessServices() {
+  }
+
+  /**
+   * Creates a {@link CacheAccessService} backed by an existing {@link 
BlockCache}.
+   * <p>
+   * This is the default compatibility path during migration from {@code 
BlockCache} to
+   * {@code CacheAccessService}. The returned service delegates to the 
supplied block cache and
+   * should preserve existing behavior.
+   * </p>
+   * @param blockCache block cache to wrap
+   * @return cache access service backed by {@code blockCache}
+   */
+  public static CacheAccessService fromBlockCache(BlockCache blockCache) {
+    return new BlockCacheBackedCacheAccessService(
+      Objects.requireNonNull(blockCache, "blockCache must not be null"));
+  }
+
+  /**
+   * Creates a {@link CacheAccessService} backed by a {@link CacheTopology} 
and placement/admission
+   * policy.
+   * <p>
+   * The returned service uses the topology to resolve participating tiers and 
engines, uses the
+   * policy to make admission, placement, representation, and promotion 
decisions, and executes
+   * storage operations through {@link CacheEngine}.
+   * </p>
+   * @param topology cache topology
+   * @param policy   placement and admission policy
+   * @return topology-backed cache access service
+   */
+  public static CacheAccessService fromTopology(CacheTopology topology,
+    CachePlacementAdmissionPolicy policy) {
+    return new TopologyBackedCacheAccessService(
+      Objects.requireNonNull(topology, "topology must not be null"),
+      Objects.requireNonNull(policy, "policy must not be null"));
+  }
+
+  /**
+   * Creates a disabled no-op {@link CacheAccessService}.
+   * <p>
+   * The returned service never stores blocks and always reports zero capacity 
and occupancy. It is
+   * useful for callers that prefer a non-null service object even when block 
cache is disabled.
+   * </p>
+   * @return disabled no-op cache access service
+   */
+  public static CacheAccessService disabled() {
+    return new NoOpCacheAccessService();
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/CacheRequestContext.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/CacheRequestContext.java
index 17aee5265fd..4b04d9bf11b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/CacheRequestContext.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/CacheRequestContext.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.hfile.cache;
 
+import java.util.Optional;
 import org.apache.hadoop.hbase.io.hfile.BlockType;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -79,10 +80,10 @@ public final class CacheRequestContext {
 
   /**
    * Returns the expected block type, if known.
-   * @return block type, or null if unknown
+   * @return an {@link Optional} containing the block type, or empty if unknown
    */
-  public BlockType getBlockType() {
-    return blockType;
+  public Optional<BlockType> getBlockType() {
+    return Optional.ofNullable(blockType);
   }
 
   /**
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/NoOpCacheAccessService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/NoOpCacheAccessService.java
new file mode 100644
index 00000000000..570ab93e8b2
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/NoOpCacheAccessService.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.cache;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Disabled-cache implementation of {@link CacheAccessService}.
+ * <p>
+ * {@code NoOpCacheAccessService} is useful when block cache access is 
disabled but callers still
+ * want to depend on a non-null {@link CacheAccessService}. It never stores 
blocks, never returns
+ * cached blocks, reports zero capacity and occupancy, and treats all 
invalidation requests as
+ * no-ops.
+ * </p>
+ * <p>
+ * Even though this implementation does not perform cache operations, it still 
validates caller
+ * inputs consistently with other {@link CacheAccessService} implementations. 
This prevents the
+ * disabled-cache path from masking caller bugs that would fail when a real 
cache is configured.
+ * </p>
+ * <p>
+ * This implementation should not be used to hide configuration mistakes. It 
represents an explicit
+ * disabled-cache state and should be selected only when the caller has 
determined that no block
+ * cache is available or desired.
+ * </p>
+ */
[email protected]
+public final class NoOpCacheAccessService implements CacheAccessService {
+
+  private static final String NAME = "NoOpCacheAccessService";
+
+  private final CacheStats stats;
+
+  /**
+   * Creates a disabled cache access service with its own {@link CacheStats} 
instance.
+   */
+  public NoOpCacheAccessService() {
+    this(new CacheStats(NAME));
+  }
+
+  /**
+   * Creates a disabled cache access service with the supplied statistics 
object.
+   * <p>
+   * Allowing the statistics object to be supplied makes tests easier and 
allows callers to preserve
+   * existing metrics conventions if needed.
+   * </p>
+   * @param stats cache statistics object
+   */
+  public NoOpCacheAccessService(CacheStats stats) {
+    this.stats = Objects.requireNonNull(stats, "stats must not be null");
+  }
+
+  /**
+   * Returns the service name.
+   * @return service name
+   */
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  /**
+   * Always returns {@code null} because this service does not store blocks.
+   * @param cacheKey block to fetch
+   * @param context  cache request context
+   * @return always {@code null}
+   */
+  @Override
+  public Cacheable getBlock(BlockCacheKey cacheKey, CacheRequestContext 
context) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+    Objects.requireNonNull(context, "context must not be null");
+    return null;
+  }
+
+  /**
+   * Validates the request and ignores the cache insertion.
+   * @param cacheKey block cache key
+   * @param block    block contents
+   * @param context  cache write context
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable block, 
CacheWriteContext context) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+    Objects.requireNonNull(block, "block must not be null");
+    Objects.requireNonNull(context, "context must not be null");
+  }
+
+  /**
+   * Always returns {@code false} because this service does not store blocks.
+   * @param cacheKey block to remove
+   * @return always {@code false}
+   */
+  @Override
+  public boolean evictBlock(BlockCacheKey cacheKey) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+    return false;
+  }
+
+  /**
+   * Always returns {@code 0} because this service does not store blocks.
+   * @param hfileName HFile name
+   * @return always {@code 0}
+   */
+  @Override
+  public int evictBlocksByHfileName(String hfileName) {
+    Objects.requireNonNull(hfileName, "hfileName must not be null");
+    return 0;
+  }
+
+  /**
+   * Always returns {@code 0} because this service does not store blocks.
+   * @param hfileName  HFile name
+   * @param initOffset inclusive start offset
+   * @param endOffset  inclusive end offset
+   * @return always {@code 0}
+   */
+  @Override
+  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, 
long endOffset) {
+    Objects.requireNonNull(hfileName, "hfileName must not be null");
+    return 0;
+  }
+
+  /**
+   * Always returns {@code 0} because this service does not store blocks.
+   * @param regionName region name
+   * @return always {@code 0}
+   */
+  @Override
+  public int evictBlocksByRegionName(String regionName) {
+    Objects.requireNonNull(regionName, "regionName must not be null");
+    return 0;
+  }
+
+  /**
+   * Returns this service's statistics object.
+   * @return cache statistics
+   */
+  @Override
+  public CacheStats getStats() {
+    return stats;
+  }
+
+  /**
+   * Does nothing because this service owns no resources.
+   */
+  @Override
+  public void shutdown() {
+    // noop
+  }
+
+  /**
+   * Always returns {@code 0} because this service has no capacity.
+   * @return always {@code 0}
+   */
+  @Override
+  public long getMaxSize() {
+    return 0L;
+  }
+
+  /**
+   * Always returns {@code 0} because this service has no capacity.
+   * @return always {@code 0}
+   */
+  @Override
+  public long getFreeSize() {
+    return 0L;
+  }
+
+  /**
+   * Always returns {@code 0} because this service stores no blocks.
+   * @return always {@code 0}
+   */
+  @Override
+  public long size() {
+    return 0L;
+  }
+
+  /**
+   * Always returns {@code 0} because this service stores no data blocks.
+   * @return always {@code 0}
+   */
+  @Override
+  public long getCurrentDataSize() {
+    return 0L;
+  }
+
+  /**
+   * Always returns {@code 0} because this service stores no blocks.
+   * @return always {@code 0}
+   */
+  @Override
+  public long getBlockCount() {
+    return 0L;
+  }
+
+  /**
+   * Always returns {@code 0} because this service stores no data blocks.
+   * @return always {@code 0}
+   */
+  @Override
+  public long getDataBlockCount() {
+    return 0L;
+  }
+
+  /**
+   * Always returns {@link Optional#empty()} because this service has no 
capacity.
+   * @param block block to check
+   * @return always {@link Optional#empty()}
+   */
+  @Override
+  public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
+    Objects.requireNonNull(block, "block must not be null");
+    return Optional.empty();
+  }
+
+  /**
+   * Always returns {@link Optional#empty()} because this service does not 
store blocks.
+   * @param key block cache key
+   * @return always {@link Optional#empty()}
+   */
+  @Override
+  public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
+    Objects.requireNonNull(key, "key must not be null");
+    return Optional.empty();
+  }
+
+  /**
+   * Always returns {@link Optional#empty()} because this service does not 
store blocks.
+   * @param key block cache key
+   * @return always {@link Optional#empty()}
+   */
+  @Override
+  public Optional<Integer> getBlockSize(BlockCacheKey key) {
+    Objects.requireNonNull(key, "key must not be null");
+    return Optional.empty();
+  }
+
+  /**
+   * Always returns {@code false} because cache access is disabled.
+   * @return always {@code false}
+   */
+  @Override
+  public boolean isCacheEnabled() {
+    return false;
+  }
+
+  /**
+   * Always returns {@code false} because cache access is disabled.
+   * @param timeout maximum time to wait
+   * @return always {@code false}
+   */
+  @Override
+  public boolean waitForCacheInitialization(long timeout) {
+    return false;
+  }
+
+  /**
+   * Validates the request and ignores the configuration change.
+   * @param config new configuration
+   */
+  @Override
+  public void onConfigurationChange(Configuration config) {
+    Objects.requireNonNull(config, "config must not be null");
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/TopologyBackedCacheAccessService.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/TopologyBackedCacheAccessService.java
new file mode 100644
index 00000000000..70f575c1136
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/cache/TopologyBackedCacheAccessService.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.cache;
+
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * {@link CacheAccessService} implementation backed by {@link CacheTopology} 
and {@link CacheEngine}
+ * instances.
+ * <p>
+ * This implementation connects the cache access layer to the topology, 
policy, and engine
+ * abstractions. {@link CachePlacementAdmissionPolicy} decides whether a block 
should be admitted,
+ * where it should be placed, which representation should be used, and whether 
a hit should trigger
+ * promotion. {@link CacheTopology} provides tier structure, engine lookup, 
aggregate statistics,
+ * and topology-specific promotion mechanics. {@link CacheEngine} performs the 
actual storage
+ * operations.
+ * </p>
+ * <p>
+ * This class is the topology-backed counterpart to {@link 
BlockCacheBackedCacheAccessService}. The
+ * block-cache-backed implementation is useful for incremental migration with 
no behavior change.
+ * This implementation is useful once callers are ready to exercise the new 
topology and engine
+ * abstractions directly through {@link CacheAccessService}.
+ * </p>
+ * <p>
+ * Representation selection is intentionally not invoked by this initial 
implementation. Until the
+ * service can actually apply representation decisions safely, especially 
around HFileBlock
+ * lifecycle and packed/unpacked storage, representation policy is left to a 
later integration step.
+ * </p>
+ */
[email protected]
+public class TopologyBackedCacheAccessService implements CacheAccessService {
+
+  private final CacheTopology topology;
+  private final CachePlacementAdmissionPolicy policy;
+  private final CacheTopologyView topologyView;
+
+  /**
+   * Creates a topology-backed cache access service.
+   * @param topology cache topology used for tier structure, engine lookup, 
and promotion mechanics
+   * @param policy   placement and admission policy used for cache access 
decisions
+   */
+  public TopologyBackedCacheAccessService(CacheTopology topology,
+    CachePlacementAdmissionPolicy policy) {
+    this.topology = Objects.requireNonNull(topology, "topology must not be 
null");
+    this.policy = Objects.requireNonNull(policy, "policy must not be null");
+    this.topologyView =
+      Objects.requireNonNull(topology.getView(), "topology view must not be 
null");
+  }
+
+  /**
+   * Returns the topology used by this service.
+   * <p>
+   * This accessor is intended for tests, diagnostics, and transitional 
wiring. HBase read/write
+   * path callers should use {@link CacheAccessService} methods instead of 
accessing topology
+   * directly.
+   * </p>
+   * @return cache topology
+   */
+  public CacheTopology getTopology() {
+    return topology;
+  }
+
+  /**
+   * Returns the placement/admission policy used by this service.
+   * @return placement/admission policy
+   */
+  public CachePlacementAdmissionPolicy getPolicy() {
+    return policy;
+  }
+
+  /**
+   * Returns the read-only topology view used by policy calls.
+   * @return read-only topology view
+   */
+  public CacheTopologyView getTopologyView() {
+    return topologyView;
+  }
+
+  /**
+   * Returns a human-readable service name.
+   * @return service name
+   */
+  @Override
+  public String getName() {
+    return topology.getName();
+  }
+
+  /**
+   * Fetches a block by checking topology tiers in lookup order.
+   * <p>
+   * The lookup order is defined by {@link CacheTopology#getTiers()}. On a 
cache hit, this method
+   * asks the configured {@link CachePlacementAdmissionPolicy} whether the 
block should be promoted.
+   * If promotion is requested and the target tier exists, promotion mechanics 
are delegated to
+   * {@link CacheTopology#promote(BlockCacheKey, Cacheable, CacheEngine, 
CacheEngine)}.
+   * </p>
+   * @param cacheKey block to fetch
+   * @param context  cache request context
+   * @return cached block, or {@code null} if not present in any tier
+   */
+  @Override
+  public Cacheable getBlock(BlockCacheKey cacheKey, CacheRequestContext 
context) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+    Objects.requireNonNull(context, "context must not be null");
+
+    for (CacheTier tier : topology.getTiers()) {
+      Optional<CacheEngine> engine = topology.getEngine(tier);
+      if (engine.isEmpty()) {
+        continue;
+      }
+
+      Cacheable block = getBlockFromEngine(engine.get(), cacheKey, context);
+      if (block != null) {
+        maybePromote(cacheKey, block, tier, engine.get(), context);
+        return block;
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Adds a block to the cache using policy-selected target tiers.
+   * <p>
+   * This method first asks the configured policy whether the block should be 
admitted. If admitted,
+   * the policy selects the target tier or tiers. The block is then inserted 
into each selected
+   * engine using {@link CacheEngine#cacheBlock(BlockCacheKey, Cacheable, 
boolean, boolean)}.
+   * </p>
+   * <p>
+   * The policy's representation decision is intentionally not applied in this 
initial
+   * implementation. The current block object is passed through unchanged.
+   * </p>
+   * @param cacheKey block cache key
+   * @param block    block contents
+   * @param context  cache write context
+   */
+  @Override
+  public void cacheBlock(BlockCacheKey cacheKey, Cacheable block, 
CacheWriteContext context) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+    Objects.requireNonNull(block, "block must not be null");
+    Objects.requireNonNull(context, "context must not be null");
+
+    AdmissionDecision admission =
+      policy.shouldAdmit(cacheKey, block, context, AdmissionPriority.NORMAL, 
topologyView);
+    if (!admission.isAdmitted()) {
+      return;
+    }
+
+    TierDecision tierDecision = policy.selectTier(cacheKey, block, context, 
topologyView);
+    for (CacheTier tier : tierDecision.getTiers()) {
+      Optional<CacheEngine> engine = topology.getEngine(tier);
+      if (engine.isPresent()) {
+        engine.get().cacheBlock(cacheKey, block, context.isInMemory(), 
context.isWaitWhenCache());
+      }
+    }
+  }
+
+  /**
+   * Evicts a single block from all engines participating in the topology.
+   * @param cacheKey block to remove
+   * @return {@code true} if at least one engine removed the block, {@code 
false} otherwise
+   */
+  @Override
+  public boolean evictBlock(BlockCacheKey cacheKey) {
+    Objects.requireNonNull(cacheKey, "cacheKey must not be null");
+
+    boolean evicted = false;
+    for (CacheEngine engine : topology.getEngines()) {
+      evicted |= engine.evictBlock(cacheKey);
+    }
+    return evicted;
+  }
+
+  /**
+   * Evicts all cached blocks for the given HFile from all engines 
participating in the topology.
+   * @param hfileName HFile name
+   * @return total number of blocks removed across all engines
+   */
+  @Override
+  public int evictBlocksByHfileName(String hfileName) {
+    Objects.requireNonNull(hfileName, "hfileName must not be null");
+
+    int evicted = 0;
+    for (CacheEngine engine : topology.getEngines()) {
+      evicted += engine.evictBlocksByHfileName(hfileName);
+    }
+    return evicted;
+  }
+
+  /**
+   * Evicts cached blocks for the given HFile range from all engines 
participating in the topology.
+   * @param hfileName  HFile name
+   * @param initOffset inclusive start offset
+   * @param endOffset  inclusive end offset
+   * @return total number of blocks removed across all engines
+   */
+  @Override
+  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, 
long endOffset) {
+    Objects.requireNonNull(hfileName, "hfileName must not be null");
+
+    int evicted = 0;
+    for (CacheEngine engine : topology.getEngines()) {
+      evicted += engine.evictBlocksRangeByHfileName(hfileName, initOffset, 
endOffset);
+    }
+    return evicted;
+  }
+
+  /**
+   * Evicts cached blocks for the given region from all engines participating 
in the topology.
+   * @param regionName region name
+   * @return total number of blocks removed across all engines
+   */
+  @Override
+  public int evictBlocksByRegionName(String regionName) {
+    Objects.requireNonNull(regionName, "regionName must not be null");
+
+    int evicted = 0;
+    for (CacheEngine engine : topology.getEngines()) {
+      evicted += engine.evictBlocksByRegionName(regionName);
+    }
+    return evicted;
+  }
+
+  /**
+   * Returns aggregate topology statistics.
+   * @return aggregate cache statistics
+   */
+  @Override
+  public CacheStats getStats() {
+    return topology.getStats();
+  }
+
+  /**
+   * Shuts down the topology.
+   */
+  @Override
+  public void shutdown() {
+    topology.shutdown();
+  }
+
+  /**
+   * Returns aggregate maximum configured cache size across participating 
engines.
+   * @return aggregate maximum cache size
+   */
+  @Override
+  public long getMaxSize() {
+    long size = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      size += engine.getMaxSize();
+    }
+    return size;
+  }
+
+  /**
+   * Returns aggregate free cache size across participating engines.
+   * @return aggregate free size
+   */
+  @Override
+  public long getFreeSize() {
+    long size = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      size += engine.getFreeSize();
+    }
+    return size;
+  }
+
+  /**
+   * Returns aggregate occupied cache size across participating engines.
+   * @return aggregate occupied cache size
+   */
+  @Override
+  public long size() {
+    long size = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      size += engine.size();
+    }
+    return size;
+  }
+
+  /**
+   * Returns aggregate occupied data-block size across participating engines.
+   * @return aggregate occupied data-block size
+   */
+  @Override
+  public long getCurrentDataSize() {
+    long size = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      size += engine.getCurrentDataSize();
+    }
+    return size;
+  }
+
+  /**
+   * Returns aggregate cached block count across participating engines.
+   * @return aggregate cached block count
+   */
+  @Override
+  public long getBlockCount() {
+    long count = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      count += engine.getBlockCount();
+    }
+    return count;
+  }
+
+  /**
+   * Returns aggregate cached data block count across participating engines.
+   * @return aggregate cached data block count
+   */
+  @Override
+  public long getDataBlockCount() {
+    long count = 0L;
+    for (CacheEngine engine : topology.getEngines()) {
+      count += engine.getDataBlockCount();
+    }
+    return count;
+  }
+
+  /**
+   * Checks whether the given block fits into at least one participating 
engine that supports this
+   * check.
+   * @param block block to check
+   * @return empty if no engine supports this check; otherwise whether at 
least one engine can fit
+   *         the block
+   */
+  @Override
+  public Optional<Boolean> blockFitsIntoTheCache(HFileBlock block) {
+    Objects.requireNonNull(block, "block must not be null");
+
+    boolean unsupported = true;
+    for (CacheEngine engine : topology.getEngines()) {
+      Optional<Boolean> result = engine.blockFitsIntoTheCache(block);
+      if (result.isPresent()) {
+        unsupported = false;
+        if (result.get()) {
+          return Optional.of(true);
+        }
+      }
+    }
+    return unsupported ? Optional.empty() : Optional.of(false);
+  }
+
+  /**
+   * Checks whether the block represented by the given key is present in any 
participating engine
+   * that supports this check.
+   * @param key block cache key
+   * @return empty if no engine supports this check; otherwise whether any 
engine has the block
+   */
+  @Override
+  public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
+    Objects.requireNonNull(key, "key must not be null");
+
+    boolean unsupported = true;
+    for (CacheEngine engine : topology.getEngines()) {
+      Optional<Boolean> result = engine.isAlreadyCached(key);
+      if (result.isPresent()) {
+        unsupported = false;
+        if (result.get()) {
+          return Optional.of(true);
+        }
+      }
+    }
+    return unsupported ? Optional.empty() : Optional.of(false);
+  }
+
+  /**
+   * Returns the first available cached block size reported by participating 
engines.
+   * <p>
+   * If the same block exists in multiple engines, this method returns the 
first present size in
+   * topology engine order. It does not sum duplicate copies.
+   * </p>
+   * @param key block cache key
+   * @return empty if unsupported or not present; otherwise cached block size
+   */
+  @Override
+  public Optional<Integer> getBlockSize(BlockCacheKey key) {
+    Objects.requireNonNull(key, "key must not be null");
+
+    for (CacheEngine engine : topology.getEngines()) {
+      Optional<Integer> size = engine.getBlockSize(key);
+      if (size.isPresent()) {
+        return size;
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Returns whether at least one participating engine is enabled.
+   * @return {@code true} if at least one engine is enabled, {@code false} 
otherwise
+   */
+  @Override
+  public boolean isCacheEnabled() {
+    for (CacheEngine engine : topology.getEngines()) {
+      if (engine.isCacheEnabled()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Waits for all participating engines to complete initialization.
+   * @param timeout maximum time to wait per engine
+   * @return {@code true} if all engines report ready/enabled, {@code false} 
otherwise
+   */
+  @Override
+  public boolean waitForCacheInitialization(long timeout) {
+    boolean enabled = true;
+    for (CacheEngine engine : topology.getEngines()) {
+      enabled &= engine.waitForCacheInitialization(timeout);
+    }
+    return enabled;
+  }
+
+  /**
+   * Propagates a configuration change to all participating engines.
+   * @param config new configuration
+   */
+  @Override
+  public void onConfigurationChange(Configuration config) {
+    Objects.requireNonNull(config, "config must not be null");
+    for (CacheEngine engine : topology.getEngines()) {
+      engine.onConfigurationChange(config);
+    }
+  }
+
+  private Cacheable getBlockFromEngine(CacheEngine engine, BlockCacheKey 
cacheKey,
+    CacheRequestContext context) {
+    Optional<BlockType> blockType = context.getBlockType();
+    if (blockType.isPresent()) {
+      return engine.getBlock(cacheKey, context.isCaching(), context.isRepeat(),
+        context.isUpdateCacheMetrics(), blockType.get());
+    }
+    return engine.getBlock(cacheKey, context.isCaching(), context.isRepeat(),
+      context.isUpdateCacheMetrics());
+  }
+
+  private void maybePromote(BlockCacheKey cacheKey, Cacheable block, CacheTier 
sourceTier,
+    CacheEngine sourceEngine, CacheRequestContext context) {
+    PromotionDecision decision = Objects.requireNonNull(
+      policy.shouldPromote(cacheKey, block, sourceTier, context, topologyView),
+      "policy should not return null from shouldPromote()");
+    if (!decision.shouldPromote()) {
+      return;
+    }
+
+    Optional<CacheEngine> targetEngine = 
topology.getEngine(decision.getTargetTier());
+    if (targetEngine.isEmpty()) {
+      return;
+    }
+
+    topology.promote(cacheKey, block, sourceEngine, targetEngine.get());
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/cache/TestBlockCacheBackedCacheAccessService.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/cache/TestBlockCacheBackedCacheAccessService.java
new file mode 100644
index 00000000000..bce743aa1b5
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/cache/TestBlockCacheBackedCacheAccessService.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.cache;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link BlockCacheBackedCacheAccessService} and related service 
helpers.
+ */
+@Tag(IOTests.TAG)
+@Tag(SmallTests.TAG)
+public class TestBlockCacheBackedCacheAccessService {
+
+  private static final String HFILE_NAME = "file";
+
+  private static final long BLOCK_OFFSET = 1L;
+
+  private static final long RANGE_START_OFFSET = 1L;
+
+  private static final long RANGE_END_OFFSET = 10L;
+
+  /**
+   * Verifies that context-based lookup delegates to the block-type aware 
legacy lookup method.
+   */
+  @Test
+  void testGetBlockWithBlockTypeDelegatesToBlockCache() {
+    BlockCache blockCache = mock(BlockCache.class);
+    CacheAccessService service = new 
BlockCacheBackedCacheAccessService(blockCache);
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+    Cacheable block = mock(Cacheable.class);
+
+    when(blockCache.getBlock(key, true, true, false, 
BlockType.DATA)).thenReturn(block);
+
+    CacheRequestContext context = 
CacheRequestContext.newBuilder().setCaching(true).setRepeat(true)
+      .setUpdateCacheMetrics(false).setBlockType(BlockType.DATA).build();
+
+    assertSame(block, service.getBlock(key, context));
+    verify(blockCache).getBlock(key, true, true, false, BlockType.DATA);
+  }
+
+  /**
+   * Verifies that context-based lookup delegates to the legacy lookup method 
without block type.
+   */
+  @Test
+  void testGetBlockWithoutBlockTypeDelegatesToBlockCache() {
+    BlockCache blockCache = mock(BlockCache.class);
+    CacheAccessService service = new 
BlockCacheBackedCacheAccessService(blockCache);
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+    Cacheable block = mock(Cacheable.class);
+
+    when(blockCache.getBlock(key, true, false, true)).thenReturn(block);
+
+    CacheRequestContext context = 
CacheRequestContext.newBuilder().setCaching(true).setRepeat(false)
+      .setUpdateCacheMetrics(true).build();
+
+    assertSame(block, service.getBlock(key, context));
+    verify(blockCache).getBlock(key, true, false, true);
+  }
+
+  /**
+   * Verifies that context-based insertion delegates in-memory and wait flags 
correctly.
+   */
+  @Test
+  void testCacheBlockDelegatesToBlockCache() {
+    BlockCache blockCache = mock(BlockCache.class);
+    CacheAccessService service = new 
BlockCacheBackedCacheAccessService(blockCache);
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+    Cacheable block = mock(Cacheable.class);
+
+    CacheWriteContext context = 
CacheWriteContext.newBuilder().setInMemory(true)
+      .setWaitWhenCache(true).setSource(CacheWriteSource.READ_MISS).build();
+
+    service.cacheBlock(key, block, context);
+
+    verify(blockCache).cacheBlock(key, block, true, true);
+  }
+
+  /**
+   * Verifies that invalidation methods delegate to the wrapped block cache.
+   */
+  @Test
+  void testEvictionDelegatesToBlockCache() {
+    BlockCache blockCache = mock(BlockCache.class);
+    CacheAccessService service = new 
BlockCacheBackedCacheAccessService(blockCache);
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+
+    when(blockCache.evictBlock(key)).thenReturn(true);
+    when(blockCache.evictBlocksByHfileName(HFILE_NAME)).thenReturn(3);
+    when(blockCache.evictBlocksRangeByHfileName(HFILE_NAME, 
RANGE_START_OFFSET, RANGE_END_OFFSET))
+      .thenReturn(2);
+
+    assertTrue(service.evictBlock(key));
+    assertEquals(3, service.evictBlocksByHfileName(HFILE_NAME));
+    assertEquals(2,
+      service.evictBlocksRangeByHfileName(HFILE_NAME, RANGE_START_OFFSET, 
RANGE_END_OFFSET));
+
+    verify(blockCache).evictBlock(key);
+    verify(blockCache).evictBlocksByHfileName(HFILE_NAME);
+    verify(blockCache).evictBlocksRangeByHfileName(HFILE_NAME, 
RANGE_START_OFFSET,
+      RANGE_END_OFFSET);
+  }
+
+  /**
+   * Verifies that stats, sizing, lifecycle, and optional helpers delegate to 
the wrapped cache.
+   */
+  @Test
+  void testStatsSizingLifecycleAndHelpersDelegateToBlockCache() {
+    BlockCache blockCache = mock(BlockCache.class);
+    CacheAccessService service = new 
BlockCacheBackedCacheAccessService(blockCache);
+    CacheStats stats = new CacheStats("test");
+    HFileBlock hfileBlock = mock(HFileBlock.class);
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+    Configuration conf = new Configuration(false);
+
+    when(blockCache.getStats()).thenReturn(stats);
+    when(blockCache.getMaxSize()).thenReturn(100L);
+    when(blockCache.getFreeSize()).thenReturn(40L);
+    when(blockCache.size()).thenReturn(60L);
+    when(blockCache.getCurrentDataSize()).thenReturn(50L);
+    when(blockCache.getBlockCount()).thenReturn(10L);
+    when(blockCache.getDataBlockCount()).thenReturn(8L);
+    
when(blockCache.blockFitsIntoTheCache(hfileBlock)).thenReturn(Optional.of(true));
+    when(blockCache.isAlreadyCached(key)).thenReturn(Optional.of(false));
+    when(blockCache.getBlockSize(key)).thenReturn(Optional.of(123));
+    when(blockCache.isCacheEnabled()).thenReturn(true);
+    when(blockCache.waitForCacheInitialization(500L)).thenReturn(true);
+
+    assertSame(stats, service.getStats());
+    assertEquals(100L, service.getMaxSize());
+    assertEquals(40L, service.getFreeSize());
+    assertEquals(60L, service.size());
+    assertEquals(50L, service.getCurrentDataSize());
+    assertEquals(10L, service.getBlockCount());
+    assertEquals(8L, service.getDataBlockCount());
+    assertEquals(Optional.of(true), service.blockFitsIntoTheCache(hfileBlock));
+    assertEquals(Optional.of(false), service.isAlreadyCached(key));
+    assertEquals(Optional.of(123), service.getBlockSize(key));
+    assertTrue(service.isCacheEnabled());
+    assertTrue(service.waitForCacheInitialization(500L));
+
+    service.onConfigurationChange(conf);
+    service.shutdown();
+
+    verify(blockCache).onConfigurationChange(conf);
+    verify(blockCache).shutdown();
+  }
+
+  /**
+   * Verifies factory helper methods.
+   */
+  @Test
+  void testCacheAccessServicesFactoryMethods() {
+    BlockCache blockCache = mock(BlockCache.class);
+    CacheAccessService service = 
CacheAccessServices.fromBlockCache(blockCache);
+
+    assertInstanceOf(BlockCacheBackedCacheAccessService.class, service);
+    assertSame(blockCache, ((BlockCacheBackedCacheAccessService) 
service).getBlockCache());
+    assertInstanceOf(NoOpCacheAccessService.class, 
CacheAccessServices.disabled());
+  }
+
+  /**
+   * Verifies disabled-cache behavior.
+   */
+  @Test
+  void testNoOpCacheAccessService() {
+    CacheAccessService service = new NoOpCacheAccessService(new 
CacheStats("noop"));
+
+    assertEquals("NoOpCacheAccessService", service.getName());
+    assertNull(service.getBlock(mock(BlockCacheKey.class), 
mock(CacheRequestContext.class)));
+    service.cacheBlock(mock(BlockCacheKey.class), mock(Cacheable.class),
+      mock(CacheWriteContext.class));
+    assertFalse(service.evictBlock(mock(BlockCacheKey.class)));
+    assertEquals(0, service.evictBlocksByHfileName(HFILE_NAME));
+    assertEquals(0L, service.getMaxSize());
+    assertEquals(0L, service.getFreeSize());
+    assertEquals(0L, service.size());
+    assertEquals(0L, service.getCurrentDataSize());
+    assertEquals(0L, service.getBlockCount());
+    assertEquals(0L, service.getDataBlockCount());
+    assertFalse(service.isCacheEnabled());
+    assertFalse(service.waitForCacheInitialization(1L));
+    service.onConfigurationChange(new Configuration(false));
+    service.shutdown();
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/cache/TestTopologyBackedCacheAccessService.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/cache/TestTopologyBackedCacheAccessService.java
new file mode 100644
index 00000000000..7d37a4dd110
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/cache/TestTopologyBackedCacheAccessService.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.cache;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CacheStats;
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@link TopologyBackedCacheAccessService}.
+ */
+@Tag(IOTests.TAG)
+@Tag(SmallTests.TAG)
+public class TestTopologyBackedCacheAccessService {
+  private static final String HFILE_NAME = "file";
+
+  private static final long BLOCK_OFFSET = 1L;
+
+  private static final long RANGE_START_OFFSET = 10L;
+
+  private static final long RANGE_END_OFFSET = 100L;
+
+  /**
+   * Verifies that lookup checks topology tiers in order and returns the first 
cached block found.
+   */
+  @Test
+  void testGetBlockChecksTiersInOrder() {
+    BlockCacheKey key = new BlockCacheKey("file", BLOCK_OFFSET);
+    Cacheable block = mock(Cacheable.class);
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+    CacheRequestContext context = requestContext();
+
+    when(topology.getName()).thenReturn("tiered");
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getTiers()).thenReturn(List.of(CacheTier.L1, CacheTier.L2));
+    when(topology.getEngine(CacheTier.L1)).thenReturn(Optional.of(l1));
+    when(topology.getEngine(CacheTier.L2)).thenReturn(Optional.of(l2));
+    when(l1.getBlock(key, true, false, true, BlockType.DATA)).thenReturn(null);
+    when(l2.getBlock(key, true, false, true, 
BlockType.DATA)).thenReturn(block);
+    when(policy.shouldPromote(key, block, CacheTier.L2, context, topologyView))
+      .thenReturn(PromotionDecision.none());
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    assertSame(block, service.getBlock(key, context));
+
+    verify(l1).getBlock(key, true, false, true, BlockType.DATA);
+    verify(l2).getBlock(key, true, false, true, BlockType.DATA);
+    verify(policy).shouldPromote(key, block, CacheTier.L2, context, 
topologyView);
+  }
+
+  /**
+   * Verifies that a hit can trigger topology-level promotion when requested 
by policy.
+   */
+  @Test
+  void testGetBlockPromotesOnPolicyDecision() {
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+    Cacheable block = mock(Cacheable.class);
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+    CacheRequestContext context = requestContext();
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getTiers()).thenReturn(List.of(CacheTier.L1, CacheTier.L2));
+    when(topology.getEngine(CacheTier.L1)).thenReturn(Optional.of(l1));
+    when(topology.getEngine(CacheTier.L2)).thenReturn(Optional.of(l2));
+    when(l1.getBlock(key, true, false, true, BlockType.DATA)).thenReturn(null);
+    when(l2.getBlock(key, true, false, true, 
BlockType.DATA)).thenReturn(block);
+    when(policy.shouldPromote(key, block, CacheTier.L2, context, topologyView))
+      .thenReturn(PromotionDecision.promoteTo(CacheTier.L1, false));
+    when(topology.promote(key, block, l2, l1)).thenReturn(true);
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    assertSame(block, service.getBlock(key, context));
+
+    verify(policy).shouldPromote(key, block, CacheTier.L2, context, 
topologyView);
+    verify(topology).promote(key, block, l2, l1);
+  }
+
+  /**
+   * Verifies that cache insertion is skipped when admission policy rejects 
the block.
+   */
+  @Test
+  void testCacheBlockSkipsInsertionWhenRejected() {
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+    Cacheable block = mock(Cacheable.class);
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine engine = mock(CacheEngine.class);
+    CacheWriteContext context = writeContext();
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngine(CacheTier.SINGLE)).thenReturn(Optional.of(engine));
+    when(policy.shouldAdmit(key, block, context, AdmissionPriority.NORMAL, 
topologyView))
+      .thenReturn(AdmissionDecision.reject("rejected"));
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    service.cacheBlock(key, block, context);
+
+    verify(policy).shouldAdmit(key, block, context, AdmissionPriority.NORMAL, 
topologyView);
+    verify(policy, never()).selectTier(key, block, context, topologyView);
+    verify(engine, never()).cacheBlock(key, block, true, true);
+  }
+
+  /**
+   * Verifies that admitted blocks are inserted into policy-selected tiers.
+   */
+  @Test
+  void testCacheBlockInsertsIntoSelectedTier() {
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+    Cacheable block = mock(Cacheable.class);
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine engine = mock(CacheEngine.class);
+    CacheWriteContext context = writeContext();
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngine(CacheTier.SINGLE)).thenReturn(Optional.of(engine));
+    when(policy.shouldAdmit(key, block, context, AdmissionPriority.NORMAL, 
topologyView))
+      .thenReturn(AdmissionDecision.admit());
+    when(policy.selectRepresentation(key, block, context, topologyView))
+      .thenReturn(RepresentationDecision.CURRENT_HBASE_DEFAULT);
+    when(policy.selectTier(key, block, context, topologyView))
+      .thenReturn(TierDecision.single(CacheTier.SINGLE));
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    service.cacheBlock(key, block, context);
+
+    verify(policy).shouldAdmit(key, block, context, AdmissionPriority.NORMAL, 
topologyView);
+    verify(policy, never()).selectRepresentation(key, block, context, 
topologyView);
+    verify(policy).selectTier(key, block, context, topologyView);
+    verify(engine).cacheBlock(key, block, true, true);
+  }
+
+  /**
+   * Verifies that insertion can target multiple tiers.
+   */
+  @Test
+  void testCacheBlockInsertsIntoMultipleSelectedTiers() {
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+    Cacheable block = mock(Cacheable.class);
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+    CacheWriteContext context = writeContext();
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngine(CacheTier.L1)).thenReturn(Optional.of(l1));
+    when(topology.getEngine(CacheTier.L2)).thenReturn(Optional.of(l2));
+    when(policy.shouldAdmit(key, block, context, AdmissionPriority.NORMAL, 
topologyView))
+      .thenReturn(AdmissionDecision.admit());
+    when(policy.selectRepresentation(key, block, context, topologyView))
+      .thenReturn(RepresentationDecision.CURRENT_HBASE_DEFAULT);
+    when(policy.selectTier(key, block, context, topologyView))
+      .thenReturn(TierDecision.multiple(List.of(CacheTier.L1, CacheTier.L2)));
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    service.cacheBlock(key, block, context);
+
+    verify(l1).cacheBlock(key, block, true, true);
+    verify(l2).cacheBlock(key, block, true, true);
+  }
+
+  /**
+   * Verifies that eviction is propagated to all participating engines.
+   */
+  @Test
+  void testEvictBlockEvictsFromAllEngines() {
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngines()).thenReturn(List.of(l1, l2));
+    when(l1.evictBlock(key)).thenReturn(false);
+    when(l2.evictBlock(key)).thenReturn(true);
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    assertTrue(service.evictBlock(key));
+
+    verify(l1).evictBlock(key);
+    verify(l2).evictBlock(key);
+  }
+
+  /**
+   * Verifies that file-level eviction sums results from all engines.
+   */
+  @Test
+  void testEvictBlocksByHfileNameSumsEngineResults() {
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngines()).thenReturn(List.of(l1, l2));
+    when(l1.evictBlocksByHfileName(HFILE_NAME)).thenReturn(2);
+    when(l2.evictBlocksByHfileName(HFILE_NAME)).thenReturn(3);
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    assertEquals(5, service.evictBlocksByHfileName(HFILE_NAME));
+
+    verify(l1).evictBlocksByHfileName(HFILE_NAME);
+    verify(l2).evictBlocksByHfileName(HFILE_NAME);
+  }
+
+  /**
+   * Verifies that aggregate sizing methods sum engine values.
+   */
+  @Test
+  void testSizingMethodsAggregateEngines() {
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngines()).thenReturn(List.of(l1, l2));
+    when(l1.getMaxSize()).thenReturn(100L);
+    when(l2.getMaxSize()).thenReturn(200L);
+    when(l1.getFreeSize()).thenReturn(10L);
+    when(l2.getFreeSize()).thenReturn(20L);
+    when(l1.size()).thenReturn(90L);
+    when(l2.size()).thenReturn(180L);
+    when(l1.getCurrentDataSize()).thenReturn(80L);
+    when(l2.getCurrentDataSize()).thenReturn(160L);
+    when(l1.getBlockCount()).thenReturn(8L);
+    when(l2.getBlockCount()).thenReturn(16L);
+    when(l1.getDataBlockCount()).thenReturn(6L);
+    when(l2.getDataBlockCount()).thenReturn(12L);
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    assertEquals(300L, service.getMaxSize());
+    assertEquals(30L, service.getFreeSize());
+    assertEquals(270L, service.size());
+    assertEquals(240L, service.getCurrentDataSize());
+    assertEquals(24L, service.getBlockCount());
+    assertEquals(18L, service.getDataBlockCount());
+  }
+
+  /**
+   * Verifies that service-level stats are returned from the topology.
+   */
+  @Test
+  void testGetStatsDelegatesToTopology() {
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheStats stats = new CacheStats("topology");
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getStats()).thenReturn(stats);
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    assertSame(stats, service.getStats());
+  }
+
+  /**
+   * Verifies optional helper methods aggregate or search across engines.
+   */
+  @Test
+  void testOptionalHelpersUseParticipatingEngines() {
+    BlockCacheKey key = new BlockCacheKey(HFILE_NAME, BLOCK_OFFSET);
+    HFileBlock block = mock(HFileBlock.class);
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngines()).thenReturn(List.of(l1, l2));
+    when(l1.blockFitsIntoTheCache(block)).thenReturn(Optional.of(false));
+    when(l2.blockFitsIntoTheCache(block)).thenReturn(Optional.of(true));
+    when(l1.isAlreadyCached(key)).thenReturn(Optional.of(false));
+    when(l2.isAlreadyCached(key)).thenReturn(Optional.of(true));
+    when(l1.getBlockSize(key)).thenReturn(Optional.empty());
+    when(l2.getBlockSize(key)).thenReturn(Optional.of(123));
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    assertEquals(Optional.of(true), service.blockFitsIntoTheCache(block));
+    assertEquals(Optional.of(true), service.isAlreadyCached(key));
+    assertEquals(Optional.of(123), service.getBlockSize(key));
+  }
+
+  /**
+   * Verifies cache-enabled and initialization behavior across participating 
engines.
+   */
+  @Test
+  void testEnablementAndInitializationUseEngines() {
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngines()).thenReturn(List.of(l1, l2));
+    when(l1.isCacheEnabled()).thenReturn(false);
+    when(l2.isCacheEnabled()).thenReturn(true);
+    when(l1.waitForCacheInitialization(500L)).thenReturn(true);
+    when(l2.waitForCacheInitialization(500L)).thenReturn(false);
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    assertTrue(service.isCacheEnabled());
+    assertFalse(service.waitForCacheInitialization(500L));
+  }
+
+  /**
+   * Verifies that configuration changes are propagated to all engines and 
shutdown uses topology.
+   */
+  @Test
+  void testConfigurationChangeAndShutdown() {
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+    Configuration conf = new Configuration(false);
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngines()).thenReturn(List.of(l1, l2));
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    service.onConfigurationChange(conf);
+    service.shutdown();
+
+    verify(l1).onConfigurationChange(conf);
+    verify(l2).onConfigurationChange(conf);
+    verify(topology).shutdown();
+  }
+
+  /**
+   * Verifies that range-based HFile eviction is propagated to all 
participating engines and sums
+   * their results.
+   */
+  @Test
+  void testEvictBlocksRangeByHfileNameSumsEngineResults() {
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngines()).thenReturn(List.of(l1, l2));
+    when(l1.evictBlocksRangeByHfileName(HFILE_NAME, RANGE_START_OFFSET, 
RANGE_END_OFFSET))
+      .thenReturn(2);
+    when(l2.evictBlocksRangeByHfileName(HFILE_NAME, RANGE_START_OFFSET, 
RANGE_END_OFFSET))
+      .thenReturn(3);
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    assertEquals(5,
+      service.evictBlocksRangeByHfileName(HFILE_NAME, RANGE_START_OFFSET, 
RANGE_END_OFFSET));
+
+    verify(l1).evictBlocksRangeByHfileName(HFILE_NAME, RANGE_START_OFFSET, 
RANGE_END_OFFSET);
+    verify(l2).evictBlocksRangeByHfileName(HFILE_NAME, RANGE_START_OFFSET, 
RANGE_END_OFFSET);
+  }
+
+  /**
+   * Verifies that region-level eviction is propagated to all participating 
engines and sums their
+   * results.
+   */
+  @Test
+  void testEvictBlocksByRegionNameSumsEngineResults() {
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+    CacheEngine l1 = mock(CacheEngine.class);
+    CacheEngine l2 = mock(CacheEngine.class);
+
+    when(topology.getView()).thenReturn(topologyView);
+    when(topology.getEngines()).thenReturn(List.of(l1, l2));
+    when(l1.evictBlocksByRegionName("region")).thenReturn(4);
+    when(l2.evictBlocksByRegionName("region")).thenReturn(5);
+
+    CacheAccessService service = new 
TopologyBackedCacheAccessService(topology, policy);
+
+    assertEquals(9, service.evictBlocksByRegionName("region"));
+
+    verify(l1).evictBlocksByRegionName("region");
+    verify(l2).evictBlocksByRegionName("region");
+  }
+
+  /**
+   * Verifies that factory helper creates a topology-backed service.
+   */
+  @Test
+  void testFactoryCreatesTopologyBackedService() {
+    CacheTopology topology = mock(CacheTopology.class);
+    CacheTopologyView topologyView = mock(CacheTopologyView.class);
+    CachePlacementAdmissionPolicy policy = 
mock(CachePlacementAdmissionPolicy.class);
+
+    when(topology.getView()).thenReturn(topologyView);
+
+    CacheAccessService service = CacheAccessServices.fromTopology(topology, 
policy);
+
+    assertInstanceOf(TopologyBackedCacheAccessService.class, service);
+    assertSame(topology, ((TopologyBackedCacheAccessService) 
service).getTopology());
+    assertSame(policy, ((TopologyBackedCacheAccessService) 
service).getPolicy());
+  }
+
+  private static CacheRequestContext requestContext() {
+    return CacheRequestContext.newBuilder().setCaching(true).setRepeat(false)
+      .setUpdateCacheMetrics(true).setBlockType(BlockType.DATA).build();
+  }
+
+  private static CacheWriteContext writeContext() {
+    return 
CacheWriteContext.newBuilder().setInMemory(true).setWaitWhenCache(true)
+      .setSource(CacheWriteSource.READ_MISS).build();
+  }
+}


Reply via email to