This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 05e09b9aa44 HBASE-28596 Optimise BucketCache usage upon regions 
splits/merges. (#5906)
05e09b9aa44 is described below

commit 05e09b9aa445a1d56725b4208ada2779fffb6680
Author: Wellington Ramos Chevreuil <wchevre...@apache.org>
AuthorDate: Wed Jun 19 14:38:18 2024 +0100

    HBASE-28596 Optimise BucketCache usage upon regions splits/merges. (#5906)
    
    Signed-off-by: Tak Lon (Stephen) Wu <tak...@apache.org>
    Reviewed-by: Duo Zhang <zhang...@apache.org>
---
 .../hadoop/hbase/io/HalfStoreFileReader.java       |  42 +++++++
 .../apache/hadoop/hbase/io/hfile/BlockCache.java   |  11 ++
 .../hadoop/hbase/io/hfile/BlockCacheUtil.java      |  42 +++++++
 .../apache/hadoop/hbase/io/hfile/CacheConfig.java  |   3 +
 .../hadoop/hbase/io/hfile/CombinedBlockCache.java  |   7 +-
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   |  13 +--
 .../hadoop/hbase/io/hfile/HFilePreadReader.java    |   2 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     |  43 ++++---
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  | 117 +++++++++++++------
 .../assignment/TransitRegionStateProcedure.java    |  21 ++--
 .../hadoop/hbase/regionserver/StoreFileReader.java |   2 +-
 .../handler/UnassignRegionHandler.java             |   8 +-
 .../apache/hadoop/hbase/TestSplitWithCache.java    | 130 +++++++++++++++++++++
 .../hadoop/hbase/io/TestHalfStoreFileReader.java   |  37 ++++--
 .../apache/hadoop/hbase/io/hfile/TestPrefetch.java |   8 --
 .../io/hfile/TestPrefetchWithBucketCache.java      |  70 ++++++++++-
 .../io/hfile/bucket/TestBucketCachePersister.java  |   6 +
 17 files changed, 457 insertions(+), 105 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index 0989f73df0a..862fbc69809 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.IntConsumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
+import org.apache.hadoop.hbase.io.hfile.HFileReaderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.ReaderContext;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@@ -64,6 +66,8 @@ public class HalfStoreFileReader extends StoreFileReader {
 
   private boolean firstKeySeeked = false;
 
+  private AtomicBoolean closed = new AtomicBoolean(false);
+
   /**
    * Creates a half file reader for a hfile referred to by an hfilelink.
    * @param context   Reader context info
@@ -335,4 +339,42 @@ public class HalfStoreFileReader extends StoreFileReader {
     // Estimate the number of entries as half the original file; this may be 
wildly inaccurate.
     return super.getFilterEntries() / 2;
   }
+
+  /**
+   * Overrides close method to handle cache evictions for the referred file. 
If evictionOnClose is
+   * true, we will seek to the block containing the splitCell and evict all 
blocks from offset 0 up
+   * to that block offset if this is a bottom half reader, or the from the 
split block offset up to
+   * the end of the file if this is a top half reader.
+   * @param evictOnClose true if it should evict the file blocks from the 
cache.
+   */
+  @Override
+  public void close(boolean evictOnClose) throws IOException {
+    if (closed.compareAndSet(false, true)) {
+      if (evictOnClose) {
+        final HFileReaderImpl.HFileScannerImpl s =
+          (HFileReaderImpl.HFileScannerImpl) super.getScanner(false, true, 
false);
+        final String reference = 
this.reader.getHFileInfo().getHFileContext().getHFileName();
+        final String referred = 
StoreFileInfo.getReferredToRegionAndFile(reference).getSecond();
+        s.seekTo(splitCell);
+        if (s.getCurBlock() != null) {
+          long offset = s.getCurBlock().getOffset();
+          LOG.trace("Seeking to split cell in reader: {} for file: {} top: {}, 
split offset: {}",
+            this, reference, top, offset);
+          ((HFileReaderImpl) 
reader).getCacheConf().getBlockCache().ifPresent(cache -> {
+            int numEvictedReferred = top
+              ? cache.evictBlocksRangeByHfileName(referred, offset, 
Long.MAX_VALUE)
+              : cache.evictBlocksRangeByHfileName(referred, 0, offset);
+            int numEvictedReference = cache.evictBlocksByHfileName(reference);
+            LOG.trace(
+              "Closing reference: {}; referred file: {}; was top? {}; evicted 
for referred: {};"
+                + "evicted for reference: {}",
+              reference, referred, top, numEvictedReferred, 
numEvictedReference);
+          });
+        }
+        reader.close(false);
+      } else {
+        reader.close(evictOnClose);
+      }
+    }
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
index bed0194b1fa..028a80075b5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCache.java
@@ -250,4 +250,15 @@ public interface BlockCache extends Iterable<CachedBlock> {
   default Optional<Map<String, Long>> getRegionCachedInfo() {
     return Optional.empty();
   }
+
+  /**
+   * Evict all blocks for the given file name between the passed offset values.
+   * @param hfileName  The file for which blocks should be evicted.
+   * @param initOffset the initial offset for the range of blocks to be 
evicted.
+   * @param endOffset  the end offset for the range of blocks to be evicted.
+   * @return number of blocks evicted.
+   */
+  default int evictBlocksRangeByHfileName(String hfileName, long initOffset, 
long endOffset) {
+    return 0;
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index e6a4b609bc7..3d4698b0047 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.io.hfile.HFileBlock.FILL_HEADER;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.NavigableMap;
@@ -25,7 +27,9 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.metrics.impl.FastLongHistogram;
+import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.GsonUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -244,6 +248,44 @@ public class BlockCacheUtil {
     return conf == null ? DEFAULT_MAX : 
conf.getInt("hbase.ui.blockcache.by.file.max", DEFAULT_MAX);
   }
 
+  /**
+   * Similarly to HFileBlock.Writer.getBlockForCaching(), creates a HFileBlock 
instance without
+   * checksum for caching. This is needed for when we cache blocks via readers 
(either prefetch or
+   * client read), otherwise we may fail equality comparison when checking 
against same block that
+   * may already have been cached at write time.
+   * @param cacheConf the related CacheConfig object.
+   * @param block     the HFileBlock instance to be converted.
+   * @return the resulting HFileBlock instance without checksum.
+   */
+  public static HFileBlock getBlockForCaching(CacheConfig cacheConf, 
HFileBlock block) {
+    // Calculate how many bytes we need for checksum on the tail of the block.
+    int numBytes = 
cacheConf.shouldCacheCompressed(block.getBlockType().getCategory())
+      ? 0
+      : (int) ChecksumUtil.numBytes(block.getOnDiskDataSizeWithHeader(),
+        block.getHFileContext().getBytesPerChecksum());
+    ByteBuff buff = block.getBufferReadOnly();
+    HFileBlockBuilder builder = new HFileBlockBuilder();
+    return builder.withBlockType(block.getBlockType())
+      .withOnDiskSizeWithoutHeader(block.getOnDiskSizeWithoutHeader())
+      
.withUncompressedSizeWithoutHeader(block.getUncompressedSizeWithoutHeader())
+      .withPrevBlockOffset(block.getPrevBlockOffset()).withByteBuff(buff)
+      
.withFillHeader(FILL_HEADER).withOffset(block.getOffset()).withNextBlockOnDiskSize(-1)
+      .withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + 
numBytes)
+      .withHFileContext(cloneContext(block.getHFileContext()))
+      
.withByteBuffAllocator(cacheConf.getByteBuffAllocator()).withShared(!buff.hasArray()).build();
+  }
+
+  public static HFileContext cloneContext(HFileContext context) {
+    HFileContext newContext = new 
HFileContextBuilder().withBlockSize(context.getBlocksize())
+      .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no 
checksums in cached data
+      .withCompression(context.getCompression())
+      .withDataBlockEncoding(context.getDataBlockEncoding())
+      
.withHBaseCheckSum(context.isUseHBaseChecksum()).withCompressTags(context.isCompressTags())
+      
.withIncludesMvcc(context.isIncludesMvcc()).withIncludesTags(context.isIncludesTags())
+      
.withColumnFamily(context.getColumnFamily()).withTableName(context.getTableName()).build();
+    return newContext;
+  }
+
   /**
    * Use one of these to keep a running account of cached blocks by file. 
Throw it away when done.
    * This is different than metrics in that it is stats on current state of a 
cache. See
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 7fb1f1ec85b..78f62bfc77f 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -72,6 +72,8 @@ public class CacheConfig implements ConfigurationObserver {
    */
   public static final String EVICT_BLOCKS_ON_CLOSE_KEY = 
"hbase.rs.evictblocksonclose";
 
+  public static final String EVICT_BLOCKS_ON_SPLIT_KEY = 
"hbase.rs.evictblocksonsplit";
+
   /**
    * Configuration key to prefetch all blocks of a given file into the block 
cache when the file is
    * opened.
@@ -113,6 +115,7 @@ public class CacheConfig implements ConfigurationObserver {
   public static final boolean DEFAULT_CACHE_INDEXES_ON_WRITE = false;
   public static final boolean DEFAULT_CACHE_BLOOMS_ON_WRITE = false;
   public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
+  public static final boolean DEFAULT_EVICT_ON_SPLIT = true;
   public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
   public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
   public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
index d6692d2e2bf..06bf2a76f75 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CombinedBlockCache.java
@@ -109,8 +109,6 @@ public class CombinedBlockCache implements 
ResizableBlockCache, HeapSize {
         }
       } else {
         if (existInL1) {
-          LOG.warn("Cache key {} had block type {}, but was found in L1 
cache.", cacheKey,
-            cacheKey.getBlockType());
           updateBlockMetrics(block, cacheKey, l1Cache, caching);
         } else {
           updateBlockMetrics(block, cacheKey, l2Cache, caching);
@@ -504,4 +502,9 @@ public class CombinedBlockCache implements 
ResizableBlockCache, HeapSize {
     return l1Result.isPresent() ? l1Result : l2Cache.getBlockSize(key);
   }
 
+  @Override
+  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, 
long endOffset) {
+    return l1Cache.evictBlocksRangeByHfileName(hfileName, initOffset, 
endOffset)
+      + l2Cache.evictBlocksRangeByHfileName(hfileName, initOffset, endOffset);
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 47c20b691b4..b24976707c3 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -697,7 +697,7 @@ public class HFileBlock implements Cacheable {
    * when block is returned to the cache.
    * @return the offset of this block in the file it was read from
    */
-  long getOffset() {
+  public long getOffset() {
     if (offset < 0) {
       throw new IllegalStateException("HFile block offset not initialized 
properly");
     }
@@ -1205,16 +1205,7 @@ public class HFileBlock implements Cacheable {
      * being wholesome (ECC memory or if file-backed, it does checksumming).
      */
     HFileBlock getBlockForCaching(CacheConfig cacheConf) {
-      HFileContext newContext = new 
HFileContextBuilder().withBlockSize(fileContext.getBlocksize())
-        .withBytesPerCheckSum(0).withChecksumType(ChecksumType.NULL) // no 
checksums in cached data
-        .withCompression(fileContext.getCompression())
-        .withDataBlockEncoding(fileContext.getDataBlockEncoding())
-        .withHBaseCheckSum(fileContext.isUseHBaseChecksum())
-        .withCompressTags(fileContext.isCompressTags())
-        .withIncludesMvcc(fileContext.isIncludesMvcc())
-        .withIncludesTags(fileContext.isIncludesTags())
-        
.withColumnFamily(fileContext.getColumnFamily()).withTableName(fileContext.getTableName())
-        .build();
+      HFileContext newContext = BlockCacheUtil.cloneContext(fileContext);
       // Build the HFileBlock.
       HFileBlockBuilder builder = new HFileBlockBuilder();
       ByteBuff buff;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
index 6063ffe6889..e6b79cc55cc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFilePreadReader.java
@@ -45,7 +45,7 @@ public class HFilePreadReader extends HFileReaderImpl {
     });
 
     // Prefetch file blocks upon open if requested
-    if (cacheConf.shouldPrefetchOnOpen() && cacheIfCompactionsOff() && 
shouldCache.booleanValue()) {
+    if (cacheConf.shouldPrefetchOnOpen() && shouldCache.booleanValue()) {
       PrefetchExecutor.request(path, new Runnable() {
         @Override
         public void run() {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index e0585c6edaa..c66a709fe49 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
-import static 
org.apache.hadoop.hbase.regionserver.CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION;
 import static 
org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.BLOCK_CACHE_KEY_KEY;
 
 import io.opentelemetry.api.common.Attributes;
@@ -42,14 +41,12 @@ import org.apache.hadoop.hbase.SizeCachedByteBufferKeyValue;
 import org.apache.hadoop.hbase.SizeCachedKeyValue;
 import org.apache.hadoop.hbase.SizeCachedNoTagsByteBufferKeyValue;
 import org.apache.hadoop.hbase.SizeCachedNoTagsKeyValue;
-import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.IdLock;
@@ -159,6 +156,10 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
     }
   }
 
+  public CacheConfig getCacheConf() {
+    return cacheConf;
+  }
+
   private Optional<String> toStringFirstKey() {
     return getFirstKey().map(CellUtil::getCellKeyAsString);
   }
@@ -307,7 +308,7 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
     }
   }
 
-  protected static class HFileScannerImpl implements HFileScanner {
+  public static class HFileScannerImpl implements HFileScanner {
     private ByteBuff blockBuffer;
     protected final boolean cacheBlocks;
     protected final boolean pread;
@@ -331,6 +332,7 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
      * loaded yet.
      */
     protected Cell nextIndexedKey;
+
     // Current block being used. NOTICE: DON't release curBlock separately 
except in shipped() or
     // close() methods. Because the shipped() or close() will do the release 
finally, even if any
     // exception occur the curBlock will be released by the close() method (see
@@ -340,6 +342,11 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
     // Whether we returned a result for curBlock's size in recordBlockSize().
     // gets reset whenever curBlock is changed.
     private boolean providedCurrentBlockSize = false;
+
+    public HFileBlock getCurBlock() {
+      return curBlock;
+    }
+
     // Previous blocks that were used in the course of the read
     protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();
 
@@ -1283,8 +1290,6 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
       new BlockCacheKey(path, dataBlockOffset, this.isPrimaryReplicaReader(), 
expectedBlockType);
     Attributes attributes = Attributes.of(BLOCK_CACHE_KEY_KEY, 
cacheKey.toString());
 
-    boolean cacheable = cacheBlock && cacheIfCompactionsOff();
-
     boolean useLock = false;
     IdLock.Entry lockEntry = null;
     final Span span = Span.current();
@@ -1326,7 +1331,7 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
             return cachedBlock;
           }
 
-          if (!useLock && cacheable && 
cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
+          if (!useLock && cacheBlock && 
cacheConf.shouldLockOnCacheMiss(expectedBlockType)) {
             // check cache again with lock
             useLock = true;
             continue;
@@ -1337,7 +1342,7 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
         span.addEvent("block cache miss", attributes);
         // Load block from filesystem.
         HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, 
onDiskBlockSize, pread,
-          !isCompaction, shouldUseHeap(expectedBlockType, cacheable));
+          !isCompaction, shouldUseHeap(expectedBlockType, cacheBlock));
         try {
           validateBlockType(hfileBlock, expectedBlockType);
         } catch (IOException e) {
@@ -1350,25 +1355,30 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
 
         // Don't need the unpacked block back and we're storing the block in 
the cache compressed
         if (cacheOnly && cacheCompressed && cacheOnRead) {
+          HFileBlock blockNoChecksum = 
BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock);
           cacheConf.getBlockCache().ifPresent(cache -> {
             LOG.debug("Skipping decompression of block {} in prefetch", 
cacheKey);
             // Cache the block if necessary
-            if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
-              cache.cacheBlock(cacheKey, hfileBlock, cacheConf.isInMemory(), 
cacheOnly);
+            if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
+              cache.cacheBlock(cacheKey, blockNoChecksum, 
cacheConf.isInMemory(), cacheOnly);
             }
           });
 
           if (updateCacheMetrics && hfileBlock.getBlockType().isData()) {
             HFile.DATABLOCK_READ_COUNT.increment();
           }
-          return hfileBlock;
+          return blockNoChecksum;
         }
         HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
+        HFileBlock unpackedNoChecksum = 
BlockCacheUtil.getBlockForCaching(cacheConf, unpacked);
         // Cache the block if necessary
         cacheConf.getBlockCache().ifPresent(cache -> {
-          if (cacheable && cacheConf.shouldCacheBlockOnRead(category)) {
+          if (cacheBlock && cacheConf.shouldCacheBlockOnRead(category)) {
             // Using the wait on cache during compaction and prefetching.
-            cache.cacheBlock(cacheKey, cacheCompressed ? hfileBlock : unpacked,
+            cache.cacheBlock(cacheKey,
+              cacheCompressed
+                ? BlockCacheUtil.getBlockForCaching(cacheConf, hfileBlock)
+                : unpackedNoChecksum,
               cacheConf.isInMemory(), cacheOnly);
           }
         });
@@ -1380,7 +1390,7 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
           HFile.DATABLOCK_READ_COUNT.increment();
         }
 
-        return unpacked;
+        return unpackedNoChecksum;
       }
     } finally {
       if (lockEntry != null) {
@@ -1691,9 +1701,4 @@ public abstract class HFileReaderImpl implements 
HFile.Reader, Configurable {
   public void unbufferStream() {
     fsBlockReader.unbufferStream();
   }
-
-  protected boolean cacheIfCompactionsOff() {
-    return (!StoreFileInfo.isReference(name) && !HFileLink.isHFileLink(name))
-      || !conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
-  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 71bfc757e51..7ee7a03ba64 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.RefCnt;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -215,6 +216,8 @@ public class BucketCache implements BlockCache, HeapSize {
   // reset after a successful read/write.
   private volatile long ioErrorStartTime = -1;
 
+  private Configuration conf;
+
   /**
    * A ReentrantReadWriteLock to lock on a particular block identified by 
offset. The purpose of
    * this is to avoid freeing the block which is being read.
@@ -291,6 +294,7 @@ public class BucketCache implements BlockCache, HeapSize {
     } else {
       this.offsetLock = new 
IdReadWriteLockWithObjectPool<>(ReferenceType.SOFT);
     }
+    this.conf = conf;
     this.algorithm = conf.get(FILE_VERIFY_ALGORITHM, 
DEFAULT_FILE_VERIFY_ALGORITHM);
     this.ioEngine = getIOEngineFromName(ioEngineName, capacity, 
persistencePath);
     this.writerThreads = new WriterThread[writerThreadNum];
@@ -560,6 +564,30 @@ public class BucketCache implements BlockCache, HeapSize {
     }
   }
 
+  /**
+   * If the passed cache key relates to a reference 
(<hfile>.<parentEncRegion>), this method looks
+   * for the block from the referred file, in the cache. If present in the 
cache, the block for the
+   * referred file is returned, otherwise, this method returns null. It will 
also return null if the
+   * passed cache key doesn't relate to a reference.
+   * @param key the BlockCacheKey instance to look for in the cache.
+   * @return the cached block from the referred file, null if there's no such 
block in the cache or
+   *         the passed key doesn't relate to a reference.
+   */
+  public BucketEntry getBlockForReference(BlockCacheKey key) {
+    BucketEntry foundEntry = null;
+    String referredFileName = null;
+    if (StoreFileInfo.isReference(key.getHfileName())) {
+      referredFileName = 
StoreFileInfo.getReferredToRegionAndFile(key.getHfileName()).getSecond();
+    }
+    if (referredFileName != null) {
+      BlockCacheKey convertedCacheKey = new BlockCacheKey(referredFileName, 
key.getOffset());
+      foundEntry = backingMap.get(convertedCacheKey);
+      LOG.debug("Got a link/ref: {}. Related cacheKey: {}. Found entry: {}", 
key.getHfileName(),
+        convertedCacheKey, foundEntry);
+    }
+    return foundEntry;
+  }
+
   /**
    * Get the buffer of the block with the specified key.
    * @param key                block's cache key
@@ -583,6 +611,9 @@ public class BucketCache implements BlockCache, HeapSize {
       return re.getData();
     }
     BucketEntry bucketEntry = backingMap.get(key);
+    if (bucketEntry == null) {
+      bucketEntry = getBlockForReference(key);
+    }
     if (bucketEntry != null) {
       long start = System.nanoTime();
       ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntry.offset());
@@ -591,7 +622,9 @@ public class BucketCache implements BlockCache, HeapSize {
         // We can not read here even if backingMap does contain the given key 
because its offset
         // maybe changed. If we lock BlockCacheKey instead of offset, then we 
can only check
         // existence here.
-        if (bucketEntry.equals(backingMap.get(key))) {
+        if (
+          bucketEntry.equals(backingMap.get(key)) || 
bucketEntry.equals(getBlockForReference(key))
+        ) {
           // Read the block from IOEngine based on the bucketEntry's offset 
and length, NOTICE: the
           // block will use the refCnt of bucketEntry, which means if two 
HFileBlock mapping to
           // the same BucketEntry, then all of the three will share the same 
refCnt.
@@ -1658,8 +1691,15 @@ public class BucketCache implements BlockCache, HeapSize 
{
    */
   @Override
   public int evictBlocksByHfileName(String hfileName) {
+    return evictBlocksRangeByHfileName(hfileName, 0, Long.MAX_VALUE);
+  }
+
+  @Override
+  public int evictBlocksRangeByHfileName(String hfileName, long initOffset, 
long endOffset) {
     fileNotFullyCached(hfileName);
-    Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName);
+    Set<BlockCacheKey> keySet = getAllCacheKeysForFile(hfileName, initOffset, 
endOffset);
+    LOG.debug("found {} blocks for file {}, starting offset: {}, end offset: 
{}", keySet.size(),
+      hfileName, initOffset, endOffset);
     int numEvicted = 0;
     for (BlockCacheKey key : keySet) {
       if (evictBlock(key)) {
@@ -1669,9 +1709,9 @@ public class BucketCache implements BlockCache, HeapSize {
     return numEvicted;
   }
 
-  private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName) {
-    return blocksByHFile.subSet(new BlockCacheKey(hfileName, Long.MIN_VALUE), 
true,
-      new BlockCacheKey(hfileName, Long.MAX_VALUE), true);
+  private Set<BlockCacheKey> getAllCacheKeysForFile(String hfileName, long 
init, long end) {
+    return blocksByHFile.subSet(new BlockCacheKey(hfileName, init), true,
+      new BlockCacheKey(hfileName, end), true);
   }
 
   /**
@@ -2081,25 +2121,20 @@ public class BucketCache implements BlockCache, 
HeapSize {
     try {
       final MutableInt count = new MutableInt();
       LOG.debug("iterating over {} entries in the backing map", 
backingMap.size());
-      backingMap.entrySet().stream().forEach(entry -> {
-        if (
-          entry.getKey().getHfileName().equals(fileName.getName())
-            && entry.getKey().getBlockType().equals(BlockType.DATA)
-        ) {
-          long offsetToLock = entry.getValue().offset();
-          LOG.debug("found block {} in the backing map. Acquiring read lock 
for offset {}",
-            entry.getKey(), offsetToLock);
-          ReentrantReadWriteLock lock = offsetLock.getLock(offsetToLock);
-          lock.readLock().lock();
-          locks.add(lock);
-          // rechecks the given key is still there (no eviction happened 
before the lock acquired)
-          if (backingMap.containsKey(entry.getKey())) {
-            count.increment();
-          } else {
-            lock.readLock().unlock();
-            locks.remove(lock);
-            LOG.debug("found block {}, but when locked and tried to count, it 
was gone.");
-          }
+      Set<BlockCacheKey> result = getAllCacheKeysForFile(fileName.getName(), 
0, Long.MAX_VALUE);
+      if (result.isEmpty() && StoreFileInfo.isReference(fileName)) {
+        result = getAllCacheKeysForFile(
+          
StoreFileInfo.getReferredToRegionAndFile(fileName.getName()).getSecond(), 0,
+          Long.MAX_VALUE);
+      }
+      result.stream().forEach(entry -> {
+        LOG.debug("found block for file {} in the backing map. Acquiring read 
lock for offset {}",
+          fileName.getName(), entry.getOffset());
+        ReentrantReadWriteLock lock = offsetLock.getLock(entry.getOffset());
+        lock.readLock().lock();
+        locks.add(lock);
+        if (backingMap.containsKey(entry) && entry.getBlockType() == 
BlockType.DATA) {
+          count.increment();
         }
       });
       int metaCount = totalBlockCount - dataBlockCount;
@@ -2121,17 +2156,19 @@ public class BucketCache implements BlockCache, 
HeapSize {
             + "and try the verification again.", fileName);
           Thread.sleep(100);
           notifyFileCachingCompleted(fileName, totalBlockCount, 
dataBlockCount, size);
-        } else
-          if ((getAllCacheKeysForFile(fileName.getName()).size() - metaCount) 
== dataBlockCount) {
-            LOG.debug("We counted {} data blocks, expected was {}, there was 
no more pending in "
-              + "the cache write queue but we now found that total cached 
blocks for file {} "
-              + "is equal to data block count.", count, dataBlockCount, 
fileName.getName());
-            fileCacheCompleted(fileName, size);
-          } else {
-            LOG.info("We found only {} data blocks cached from a total of {} 
for file {}, "
-              + "but no blocks pending caching. Maybe cache is full or 
evictions "
-              + "happened concurrently to cache prefetch.", count, 
dataBlockCount, fileName);
-          }
+        } else if (
+          (getAllCacheKeysForFile(fileName.getName(), 0, 
Long.MAX_VALUE).size() - metaCount)
+              == dataBlockCount
+        ) {
+          LOG.debug("We counted {} data blocks, expected was {}, there was no 
more pending in "
+            + "the cache write queue but we now found that total cached blocks 
for file {} "
+            + "is equal to data block count.", count, dataBlockCount, 
fileName.getName());
+          fileCacheCompleted(fileName, size);
+        } else {
+          LOG.info("We found only {} data blocks cached from a total of {} for 
file {}, "
+            + "but no blocks pending caching. Maybe cache is full or evictions 
"
+            + "happened concurrently to cache prefetch.", count, 
dataBlockCount, fileName);
+        }
       }
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
@@ -2157,14 +2194,20 @@ public class BucketCache implements BlockCache, 
HeapSize {
 
   @Override
   public Optional<Boolean> isAlreadyCached(BlockCacheKey key) {
-    return Optional.of(getBackingMap().containsKey(key));
+    boolean foundKey = backingMap.containsKey(key);
+    // if there's no entry for the key itself, we need to check if this key is 
for a reference,
+    // and if so, look for a block from the referenced file using this 
getBlockForReference method.
+    return Optional.of(foundKey ? true : getBlockForReference(key) != null);
   }
 
   @Override
   public Optional<Integer> getBlockSize(BlockCacheKey key) {
     BucketEntry entry = backingMap.get(key);
     if (entry == null) {
-      return Optional.empty();
+      // the key might be for a reference tha we had found the block from the 
referenced file in
+      // the cache when we first tried to cache it.
+      entry = getBlockForReference(key);
+      return entry == null ? Optional.empty() : 
Optional.of(entry.getOnDiskSizeWithHeader());
     } else {
       return Optional.of(entry.getOnDiskSizeWithHeader());
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
index 8dfc08a5de8..0ed740c7853 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/TransitRegionStateProcedure.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hbase.master.assignment;
 
 import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_CLOSE;
+import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.DEFAULT_EVICT_ON_SPLIT;
 import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
+import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_SPLIT_KEY;
 import static org.apache.hadoop.hbase.master.LoadBalancer.BOGUS_SERVER_NAME;
 import static 
org.apache.hadoop.hbase.master.assignment.AssignmentManager.FORCE_REGION_RETAINMENT;
 
@@ -369,12 +371,15 @@ public class TransitRegionStateProcedure
     }
   }
 
-  private void closeRegionAfterUpdatingMeta(RegionStateNode regionNode) {
-    CloseRegionProcedure closeProc = isSplit
-      ? new CloseRegionProcedure(this, getRegion(), 
regionNode.getRegionLocation(), assignCandidate,
-        true)
-      : new CloseRegionProcedure(this, getRegion(), 
regionNode.getRegionLocation(), assignCandidate,
-        evictCache);
+  private void closeRegionAfterUpdatingMeta(MasterProcedureEnv env, 
RegionStateNode regionNode) {
+    CloseRegionProcedure closeProc =
+      isSplit
+        ? new CloseRegionProcedure(this, getRegion(), 
regionNode.getRegionLocation(),
+          assignCandidate,
+          env.getMasterConfiguration().getBoolean(EVICT_BLOCKS_ON_SPLIT_KEY,
+            DEFAULT_EVICT_ON_SPLIT))
+        : new CloseRegionProcedure(this, getRegion(), 
regionNode.getRegionLocation(),
+          assignCandidate, evictCache);
     addChildProcedure(closeProc);
     
setNextState(RegionStateTransitionState.REGION_STATE_TRANSITION_CONFIRM_CLOSED);
   }
@@ -383,7 +388,7 @@ public class TransitRegionStateProcedure
     throws IOException, ProcedureSuspendedException {
     if (
       ProcedureFutureUtil.checkFuture(this, this::getFuture, this::setFuture,
-        () -> closeRegionAfterUpdatingMeta(regionNode))
+        () -> closeRegionAfterUpdatingMeta(env, regionNode))
     ) {
       return;
     }
@@ -391,7 +396,7 @@ public class TransitRegionStateProcedure
       // this is the normal case
       ProcedureFutureUtil.suspendIfNecessary(this, this::setFuture,
         env.getAssignmentManager().regionClosing(regionNode), env,
-        () -> closeRegionAfterUpdatingMeta(regionNode));
+        () -> closeRegionAfterUpdatingMeta(env, regionNode));
     } else {
       forceNewPlan = true;
       regionNode.setRegionLocation(null);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index 4f872d7084e..7751df300e1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -68,7 +68,7 @@ public class StoreFileReader {
   protected BloomFilter deleteFamilyBloomFilter = null;
   private BloomFilterMetrics bloomFilterMetrics = null;
   protected BloomType bloomFilterType;
-  private final HFile.Reader reader;
+  protected final HFile.Reader reader;
   protected long sequenceID = -1;
   protected TimeRange timeRange = null;
   private byte[] lastBloomKey;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index 217f2ebbd45..a360759aea1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -124,11 +124,9 @@ public class UnassignRegionHandler extends EventHandler {
       region.getCoprocessorHost().preClose(abort);
     }
     // This should be true only in the case of splits/merges closing the 
parent regions, as
-    // there's no point on keep blocks for those region files. As 
hbase.rs.evictblocksonclose is
-    // false by default we don't bother overriding it if evictCache is false.
-    if (evictCache) {
-      region.getStores().forEach(s -> 
s.getCacheConfig().setEvictOnClose(true));
-    }
+    // there's no point on keep blocks for those region files.
+    region.getStores().forEach(s -> 
s.getCacheConfig().setEvictOnClose(evictCache));
+
     if (region.close(abort) == null) {
       // XXX: Is this still possible? The old comment says about split, but 
now split is done at
       // master side, so...
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java
new file mode 100644
index 00000000000..c308d5f6d83
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestSplitWithCache.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.CACHE_BLOCKS_ON_WRITE_KEY;
+import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_SPLIT_KEY;
+import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.PREFETCH_BLOCKS_ON_OPEN_KEY;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MiscTests.class, MediumTests.class })
+public class TestSplitWithCache {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSplitWithCache.class);
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestSplitWithCache.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 
1000);
+    UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    UTIL.getConfiguration().setBoolean(CACHE_BLOCKS_ON_WRITE_KEY, true);
+    UTIL.getConfiguration().setBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, true);
+    UTIL.getConfiguration().set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
+    UTIL.getConfiguration().setInt(BUCKET_CACHE_SIZE_KEY, 200);
+  }
+
+  @Test
+  public void testEvictOnSplit() throws Exception {
+    doTest("testEvictOnSplit", true,
+      (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f) 
!= null),
+      (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f) 
== null));
+  }
+
+  @Test
+  public void testDoesntEvictOnSplit() throws Exception {
+    doTest("testDoesntEvictOnSplit", false,
+      (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f) 
!= null),
+      (f, m) -> Waiter.waitFor(UTIL.getConfiguration(), 1000, () -> m.get(f) 
!= null));
+  }
+
+  private void doTest(String table, boolean evictOnSplit,
+    BiConsumer<String, Map<String, Pair<String, Long>>> predicateBeforeSplit,
+    BiConsumer<String, Map<String, Pair<String, Long>>> predicateAfterSplit) 
throws Exception {
+    UTIL.getConfiguration().setBoolean(EVICT_BLOCKS_ON_SPLIT_KEY, 
evictOnSplit);
+    UTIL.startMiniCluster(1);
+    try {
+      TableName tableName = TableName.valueOf(table);
+      byte[] family = Bytes.toBytes("CF");
+      TableDescriptor td = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build();
+      UTIL.getAdmin().createTable(td);
+      UTIL.waitTableAvailable(tableName);
+      Table tbl = UTIL.getConnection().getTable(tableName);
+      List<Put> puts = new ArrayList<>();
+      for (int i = 0; i < 1000; i++) {
+        Put p = new Put(Bytes.toBytes("row-" + i));
+        p.addColumn(family, Bytes.toBytes(1), Bytes.toBytes("val-" + i));
+        puts.add(p);
+      }
+      tbl.put(puts);
+      UTIL.getAdmin().flush(tableName);
+      Collection<HStoreFile> files =
+        
UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getStores().get(0).getStorefiles();
+      checkCacheForBlocks(tableName, files, predicateBeforeSplit);
+      UTIL.getAdmin().split(tableName, Bytes.toBytes("row-500"));
+      Waiter.waitFor(UTIL.getConfiguration(), 30000,
+        () -> UTIL.getMiniHBaseCluster().getRegions(tableName).size() == 2);
+      UTIL.waitUntilNoRegionsInTransition();
+      checkCacheForBlocks(tableName, files, predicateAfterSplit);
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+
+  }
+
+  private void checkCacheForBlocks(TableName tableName, Collection<HStoreFile> 
files,
+    BiConsumer<String, Map<String, Pair<String, Long>>> checker) {
+    files.forEach(f -> {
+      
UTIL.getMiniHBaseCluster().getRegionServer(0).getBlockCache().ifPresent(cache 
-> {
+        cache.getFullyCachedFiles().ifPresent(m -> {
+          checker.accept(f.getPath().getName(), m);
+        });
+        assertTrue(cache.getFullyCachedFiles().isPresent());
+      });
+    });
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
index 0a41159e3aa..3a285a21f40 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHalfStoreFileReader.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.ReaderContext;
 import org.apache.hadoop.hbase.io.hfile.ReaderContextBuilder;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -82,15 +84,19 @@ public class TestHalfStoreFileReader {
    */
   @Test
   public void testHalfScanAndReseek() throws IOException {
-    String root_dir = TEST_UTIL.getDataTestDir().toString();
-    Path p = new Path(root_dir, "test");
-
     Configuration conf = TEST_UTIL.getConfiguration();
     FileSystem fs = FileSystem.get(conf);
+    String root_dir = TEST_UTIL.getDataTestDir().toString();
+    Path parentPath = new Path(new Path(root_dir, "parent"), "CF");
+    fs.mkdirs(parentPath);
+    Path splitAPath = new Path(new Path(root_dir, "splita"), "CF");
+    Path splitBPath = new Path(new Path(root_dir, "splitb"), "CF");
+    Path filePath = StoreFileWriter.getUniqueFile(fs, parentPath);
+
     CacheConfig cacheConf = new CacheConfig(conf);
     HFileContext meta = new HFileContextBuilder().withBlockSize(1024).build();
     HFile.Writer w =
-      HFile.getWriterFactory(conf, cacheConf).withPath(fs, 
p).withFileContext(meta).create();
+      HFile.getWriterFactory(conf, cacheConf).withPath(fs, 
filePath).withFileContext(meta).create();
 
     // write some things.
     List<KeyValue> items = genSomeKeys();
@@ -99,26 +105,35 @@ public class TestHalfStoreFileReader {
     }
     w.close();
 
-    HFile.Reader r = HFile.createReader(fs, p, cacheConf, true, conf);
+    HFile.Reader r = HFile.createReader(fs, filePath, cacheConf, true, conf);
     Cell midKV = r.midKey().get();
     byte[] midkey = CellUtil.cloneRow(midKV);
 
-    // System.out.println("midkey: " + midKV + " or: " + 
Bytes.toStringBinary(midkey));
+    Path splitFileA = new Path(splitAPath, filePath.getName() + ".parent");
+    Path splitFileB = new Path(splitBPath, filePath.getName() + ".parent");
 
     Reference bottom = new Reference(midkey, Reference.Range.bottom);
-    doTestOfScanAndReseek(p, fs, bottom, cacheConf);
+    bottom.write(fs, splitFileA);
+    doTestOfScanAndReseek(splitFileA, fs, bottom, cacheConf);
 
     Reference top = new Reference(midkey, Reference.Range.top);
-    doTestOfScanAndReseek(p, fs, top, cacheConf);
+    top.write(fs, splitFileB);
+    doTestOfScanAndReseek(splitFileB, fs, top, cacheConf);
 
     r.close();
   }
 
   private void doTestOfScanAndReseek(Path p, FileSystem fs, Reference bottom, 
CacheConfig cacheConf)
     throws IOException {
-    ReaderContext context = new 
ReaderContextBuilder().withFileSystemAndPath(fs, p).build();
-    StoreFileInfo storeFileInfo =
-      new StoreFileInfo(TEST_UTIL.getConfiguration(), fs, fs.getFileStatus(p), 
bottom);
+    Path referencePath = StoreFileInfo.getReferredToFile(p);
+    FSDataInputStreamWrapper in = new FSDataInputStreamWrapper(fs, 
referencePath, false, 0);
+    FileStatus status = fs.getFileStatus(referencePath);
+    long length = status.getLen();
+    ReaderContextBuilder contextBuilder =
+      new 
ReaderContextBuilder().withInputStreamWrapper(in).withFileSize(length)
+        
.withReaderType(ReaderContext.ReaderType.PREAD).withFileSystem(fs).withFilePath(p);
+    ReaderContext context = contextBuilder.build();
+    StoreFileInfo storeFileInfo = new 
StoreFileInfo(TEST_UTIL.getConfiguration(), fs, p, true);
     storeFileInfo.initHFileInfo(context);
     final HalfStoreFileReader halfreader =
       (HalfStoreFileReader) storeFileInfo.createReader(context, cacheConf);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
index 6083d872c82..b172202c8d4 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java
@@ -323,14 +323,6 @@ public class TestPrefetch {
     conf.setBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, false);
   }
 
-  @Test
-  public void testPrefetchSkipsRefs() throws Exception {
-    testPrefetchWhenRefs(true, c -> {
-      boolean isCached = c != null;
-      assertFalse(isCached);
-    });
-  }
-
   @Test
   public void testPrefetchDoesntSkipRefs() throws Exception {
     testPrefetchWhenRefs(false, c -> {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
index 2d0a85962ef..581d1893c17 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetchWithBucketCache.java
@@ -22,6 +22,7 @@ import static 
org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
 import static 
org.apache.hadoop.hbase.io.hfile.BlockCacheFactory.BUCKET_CACHE_BUCKETS_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -39,13 +40,20 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache;
 import org.apache.hadoop.hbase.io.hfile.bucket.BucketEntry;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -135,6 +143,55 @@ public class TestPrefetchWithBucketCache {
     assertTrue(snapshot.get(key).getCachedTime() < 
bc.getBackingMap().get(key).getCachedTime());
   }
 
+  @Test
+  public void testPrefetchRefsAfterSplit() throws Exception {
+    conf.setLong(BUCKET_CACHE_SIZE_KEY, 200);
+    blockCache = BlockCacheFactory.createBlockCache(conf);
+    cacheConf = new CacheConfig(conf, blockCache);
+
+    Path tableDir = new Path(TEST_UTIL.getDataTestDir(), 
"testPrefetchRefsAfterSplit");
+    RegionInfo region = 
RegionInfoBuilder.newBuilder(TableName.valueOf(tableDir.getName())).build();
+    Path regionDir = new Path(tableDir, region.getEncodedName());
+    Path cfDir = new Path(regionDir, "cf");
+    HRegionFileSystem regionFS =
+      HRegionFileSystem.createRegionOnFileSystem(conf, fs, tableDir, region);
+    Path storeFile = writeStoreFile(100, cfDir);
+
+    // Prefetches the file blocks
+    LOG.debug("First read should prefetch the blocks.");
+    readStoreFile(storeFile);
+    BucketCache bc = 
BucketCache.getBucketCacheFromCacheConfig(cacheConf).get();
+    // Our file should have 6 DATA blocks. We should wait for all of them to 
be cached
+    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
+
+    // split the file and return references to the original file
+    Random rand = ThreadLocalRandom.current();
+    byte[] splitPoint = RandomKeyValueUtil.randomOrderedKey(rand, 50);
+    HStoreFile file = new HStoreFile(fs, storeFile, conf, cacheConf, 
BloomType.NONE, true);
+    Path ref = regionFS.splitStoreFile(region, "cf", file, splitPoint, false,
+      new ConstantSizeRegionSplitPolicy());
+    HStoreFile refHsf = new HStoreFile(this.fs, ref, conf, cacheConf, 
BloomType.NONE, true);
+    // starts reader for the ref. The ref should resolve to the original file 
blocks
+    // and not duplicate blocks in the cache.
+    refHsf.initReader();
+    HFile.Reader reader = refHsf.getReader().getHFileReader();
+    while (!reader.prefetchComplete()) {
+      // Sleep for a bit
+      Thread.sleep(1000);
+    }
+    // the ref file blocks keys should actually resolve to the referred file 
blocks,
+    // so we should not see additional blocks in the cache.
+    Waiter.waitFor(conf, 300, () -> bc.getBackingMap().size() == 6);
+
+    BlockCacheKey refCacheKey = new BlockCacheKey(ref.getName(), 0);
+    Cacheable result = bc.getBlock(refCacheKey, true, false, true);
+    assertNotNull(result);
+    BlockCacheKey fileCacheKey = new BlockCacheKey(file.getPath().getName(), 
0);
+    assertEquals(result, bc.getBlock(fileCacheKey, true, false, true));
+    assertNull(bc.getBackingMap().get(refCacheKey));
+    assertNotNull(bc.getBlockForReference(refCacheKey));
+  }
+
   @Test
   public void testPrefetchInterruptOnCapacity() throws Exception {
     conf.setLong(BUCKET_CACHE_SIZE_KEY, 1);
@@ -270,10 +327,19 @@ public class TestPrefetchWithBucketCache {
     return writeStoreFile(fname, meta, numKVs);
   }
 
+  private Path writeStoreFile(int numKVs, Path regionCFDir) throws IOException 
{
+    HFileContext meta = new 
HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE).build();
+    return writeStoreFile(meta, numKVs, regionCFDir);
+  }
+
   private Path writeStoreFile(String fname, HFileContext context, int numKVs) 
throws IOException {
-    Path storeFileParentDir = new Path(TEST_UTIL.getDataTestDir(), fname);
+    return writeStoreFile(context, numKVs, new 
Path(TEST_UTIL.getDataTestDir(), fname));
+  }
+
+  private Path writeStoreFile(HFileContext context, int numKVs, Path 
regionCFDir)
+    throws IOException {
     StoreFileWriter sfw = new StoreFileWriter.Builder(conf, cacheConf, fs)
-      .withOutputDir(storeFileParentDir).withFileContext(context).build();
+      .withOutputDir(regionCFDir).withFileContext(context).build();
     Random rand = ThreadLocalRandom.current();
     final int rowLen = 32;
     for (int i = 0; i < numKVs; ++i) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
index d60d2c53ef6..b3ac553582b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCachePersister.java
@@ -49,6 +49,8 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Category({ IOTests.class, MediumTests.class })
 public class TestBucketCachePersister {
@@ -61,6 +63,8 @@ public class TestBucketCachePersister {
 
   public int constructedBlockSize = 16 * 1024;
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestBucketCachePersister.class);
+
   public int[] constructedBlockSizes =
     new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 
1024,
       28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, 
128 * 1024 + 1024 };
@@ -166,6 +170,7 @@ public class TestBucketCachePersister {
     HFile.createReader(fs, storeFile, cacheConf, true, conf);
     boolean evicted = false;
     while (!PrefetchExecutor.isCompleted(storeFile)) {
+      LOG.debug("Entered loop as prefetch for {} is still running.", 
storeFile);
       if (bucketCache.backingMap.size() > 0 && !evicted) {
         Iterator<Map.Entry<BlockCacheKey, BucketEntry>> it =
           bucketCache.backingMap.entrySet().iterator();
@@ -174,6 +179,7 @@ public class TestBucketCachePersister {
         while (it.hasNext() && !evicted) {
           if (entry.getKey().getBlockType().equals(BlockType.DATA)) {
             evicted = bucketCache.evictBlock(it.next().getKey());
+            LOG.debug("Attempted eviction for {}. Succeeded? {}", storeFile, 
evicted);
           }
         }
       }

Reply via email to