This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch HBASE-21879
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d82f8a1cf122203fb22fe8d5312b6f7d0bfca839
Author: huzheng <open...@gmail.com>
AuthorDate: Mon Apr 1 22:23:24 2019 +0800

    HBASE-22127 Ensure that the block cached in the LRUBlockCache offheap is 
allocated from heap
---
 .../apache/hadoop/hbase/io/ByteBuffAllocator.java  |  20 ++-
 .../apache/hadoop/hbase/io/hfile/CacheConfig.java  |   4 +
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   |  86 +++++++-----
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     |  49 ++++---
 .../hadoop/hbase/io/hfile/LruBlockCache.java       |  32 ++++-
 .../hadoop/hbase/io/hfile/bucket/BucketCache.java  |  53 +++++---
 .../apache/hadoop/hbase/io/hfile/TestChecksum.java |  24 ++--
 .../apache/hadoop/hbase/io/hfile/TestHFile.java    | 124 ++++++++++++++++--
 .../hadoop/hbase/io/hfile/TestHFileBlock.java      | 145 +++++++++++++++------
 .../hadoop/hbase/io/hfile/TestHFileBlockIndex.java |   2 +-
 .../hadoop/hbase/io/hfile/TestHFileEncryption.java |   2 +-
 .../hadoop/hbase/io/hfile/TestHFileWriterV3.java   |   8 +-
 .../io/hfile/TestLazyDataBlockDecompression.java   |   2 +-
 .../io/hfile/bucket/TestBucketWriterThread.java    |   6 +-
 .../hadoop/hbase/master/AbstractTestDLS.java       |  29 ++---
 15 files changed, 426 insertions(+), 160 deletions(-)

diff --git 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
index 0020e23..984d46d 100644
--- 
a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
+++ 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import sun.nio.ch.DirectBuffer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
@@ -34,7 +35,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -191,7 +191,7 @@ public class ByteBuffAllocator {
     }
     // If disabled the reservoir, just allocate it from on-heap.
     if (!isReservoirEnabled() || size == 0) {
-      return new SingleByteBuff(NONE, ByteBuffer.allocate(size));
+      return allocateOnHeap(size);
     }
     int reminder = size % bufSize;
     int len = size / bufSize + (reminder > 0 ? 1 : 0);
@@ -222,6 +222,22 @@ public class ByteBuffAllocator {
     return bb;
   }
 
+  /**
+   * Free all direct buffers if allocated, mainly used for testing.
+   */
+  @VisibleForTesting
+  public void clean() {
+    while (!buffers.isEmpty()) {
+      ByteBuffer b = buffers.poll();
+      if (b instanceof DirectBuffer) {
+        DirectBuffer db = (DirectBuffer) b;
+        if (db.cleaner() != null) {
+          db.cleaner().clean();
+        }
+      }
+    }
+  }
+
   public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) {
     if (buffers == null || buffers.length == 0) {
       throw new IllegalArgumentException("buffers shouldn't be null or empty");
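
The new clean() above rounds out the reservoir lifecycle: buffers handed out by
allocateOneBuffer() go back to the pool on release(), and tests can now free the pooled direct
memory eagerly instead of waiting for GC. A minimal, hypothetical JUnit-style sketch (all calls
shown exist in ByteBuffAllocator or in this patch; the test itself is illustrative and not part
of the commit):

    @Test
    public void testReservoirLifecycle() {
      Configuration conf = HBaseConfiguration.create();
      conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, 16);
      conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
      ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
      ByteBuff bb = alloc.allocateOneBuffer();   // off-heap, reservoir-backed
      Assert.assertFalse(bb.hasArray());
      bb.release();                              // hand the buffer back to the reservoir
      Assert.assertEquals(1, alloc.getQueueSize());
      alloc.clean();                             // free the pooled DirectByteBuffers right away
    }
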
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 53c216f..bb57fbe 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -367,6 +367,10 @@ public class CacheConfig {
     return Optional.ofNullable(this.blockCache);
   }
 
+  public boolean isCombinedBlockCache() {
+    return blockCache instanceof CombinedBlockCache;
+  }
+
   public ByteBuffAllocator getByteBuffAllocator() {
     return this.byteBuffAllocator;
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 22a8295..2fe9255 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -763,6 +763,13 @@ public class HFileBlock implements Cacheable {
   }
 
   /**
+   * @return true if the block is allocated from the JVM heap, otherwise it is off-heap.
+   */
+  boolean isOnHeap() {
+    return buf.hasArray();
+  }
+
+  /**
    * Unified version 2 {@link HFile} block writer. The intended usage pattern
    * is as follows:
    * <ol>
@@ -1300,16 +1307,29 @@ public class HFileBlock implements Cacheable {
   /** An HFile block reader with iteration ability. */
   interface FSReader {
     /**
-     * Reads the block at the given offset in the file with the given on-disk
-     * size and uncompressed size.
-     *
-     * @param offset
-     * @param onDiskSize the on-disk size of the entire block, including all
-     *          applicable headers, or -1 if unknown
+     * Reads the block at the given offset in the file with the given on-disk 
size and uncompressed
+     * size.
+     * @param offset the offset in the file at which to start reading
+     * @param onDiskSize the on-disk size of the entire block, including all applicable headers,
+     *          or -1 if unknown
+     * @param pread true to use a positional read, otherwise use a streaming read.
+     * @param updateMetrics whether to update read metrics.
+     * @param intoHeap allocate the block's ByteBuff from the JVM heap instead of the
+     *          {@link ByteBuffAllocator}. For LRUBlockCache we must ensure the cached block is a
+     *          heap one, because its memory accounting is based on the heap; likewise, a
+     *          {@link CombinedBlockCache} uses the on-heap LRUBlockCache as its L1 cache for small
+     *          blocks such as index or meta blocks, for faster access. This flag lets callers
+     *          allocate directly from the JVM heap and so avoid an extra off-heap to heap copy
+     *          when using LRUBlockCache. In most cases we know the expected block type before
+     *          reading; in the few cases where we do not (for example
+     *          HFileReaderImpl#readNextDataBlock()), we can only allocate the block's ByteBuff
+     *          from the {@link ByteBuffAllocator} first, and when caching it in
+     *          {@link LruBlockCache} we check whether the ByteBuff is on heap, cloning it to a
+     *          heap block if it is not.
      * @return the newly read block
      */
-    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, 
boolean updateMetrics)
-        throws IOException;
+    HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, 
boolean updateMetrics,
+        boolean intoHeap) throws IOException;
 
     /**
      * Creates a block iterator over the given portion of the {@link HFile}.
@@ -1444,7 +1464,7 @@ public class HFileBlock implements Cacheable {
           if (offset >= endOffset) {
             return null;
           }
-          HFileBlock b = readBlockData(offset, length, false, false);
+          HFileBlock b = readBlockData(offset, length, false, false, true);
           offset += b.getOnDiskSizeWithHeader();
           length = b.getNextBlockOnDiskSize();
           HFileBlock uncompressed = b.unpack(fileContext, owner);
@@ -1526,16 +1546,18 @@ public class HFileBlock implements Cacheable {
     /**
      * Reads a version 2 block (version 1 blocks not supported and not 
expected). Tries to do as
      * little memory allocation as possible, using the provided on-disk size.
-     *
      * @param offset the offset in the stream to read at
-     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
-     *          the header, or -1 if unknown; i.e. when iterating over blocks 
reading
-     *          in the file metadata info.
+     * @param onDiskSizeWithHeaderL the on-disk size of the block, including 
the header, or -1 if
+     *          unknown; i.e. when iterating over blocks reading in the file 
metadata info.
      * @param pread whether to use a positional read
+     * @param updateMetrics whether to update the metrics
+     * @param intoHeap allocate the block's ByteBuff from the JVM heap or via the allocator.
+     * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about
+     *      the intoHeap flag.
      */
     @Override
     public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, 
boolean pread,
-                                    boolean updateMetrics) throws IOException {
+        boolean updateMetrics, boolean intoHeap) throws IOException {
       // Get a copy of the current state of whether to validate
       // hbase checksums or not for this read call. This is not
       // thread-safe but the one constaint is that if we decide
@@ -1544,9 +1566,8 @@ public class HFileBlock implements Cacheable {
       boolean doVerificationThruHBaseChecksum = 
streamWrapper.shouldUseHBaseChecksum();
       FSDataInputStream is = 
streamWrapper.getStream(doVerificationThruHBaseChecksum);
 
-      HFileBlock blk = readBlockDataInternal(is, offset,
-                         onDiskSizeWithHeaderL, pread,
-                         doVerificationThruHBaseChecksum, updateMetrics);
+      HFileBlock blk = readBlockDataInternal(is, offset, 
onDiskSizeWithHeaderL, pread,
+        doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
       if (blk == null) {
         HFile.LOG.warn("HBase checksum verification failed for file " +
                        pathName + " at offset " +
@@ -1573,7 +1594,7 @@ public class HFileBlock implements Cacheable {
         is = 
this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD);
         doVerificationThruHBaseChecksum = false;
         blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread,
-                                    doVerificationThruHBaseChecksum, 
updateMetrics);
+          doVerificationThruHBaseChecksum, updateMetrics, intoHeap);
         if (blk != null) {
           HFile.LOG.warn("HDFS checksum verification succeeded for file " +
                          pathName + " at offset " +
@@ -1669,24 +1690,29 @@ public class HFileBlock implements Cacheable {
       return nextBlockOnDiskSize;
     }
 
+    private ByteBuff allocate(int size, boolean intoHeap) {
+      return intoHeap ? ByteBuffAllocator.HEAP.allocate(size) : 
allocator.allocate(size);
+    }
+
     /**
      * Reads a version 2 block.
-     *
      * @param offset the offset in the stream to read at.
-     * @param onDiskSizeWithHeaderL the on-disk size of the block, including
-     *          the header and checksums if present or -1 if unknown (as a 
long). Can be -1
-     *          if we are doing raw iteration of blocks as when loading up 
file metadata; i.e.
-     *          the first read of a new file. Usually non-null gotten from the 
file index.
+     * @param onDiskSizeWithHeaderL the on-disk size of the block, including 
the header and
+     *          checksums if present or -1 if unknown (as a long). Can be -1 
if we are doing raw
+     *          iteration of blocks as when loading up file metadata; i.e. the 
first read of a new
+     *          file. Usually non-null gotten from the file index.
      * @param pread whether to use a positional read
-     * @param verifyChecksum Whether to use HBase checksums.
-     *        If HBase checksum is switched off, then use HDFS checksum. Can 
also flip on/off
-     *        reading same file if we hit a troublesome patch in an hfile.
+     * @param verifyChecksum Whether to use HBase checksums. If HBase checksum 
is switched off, then
+     *          use HDFS checksum. Can also flip on/off reading same file if 
we hit a troublesome
+     *          patch in an hfile.
+     * @param updateMetrics whether to update the metrics.
+     * @param intoHeap allocate the block's ByteBuff from the JVM heap or from off-heap.
      * @return the HFileBlock or null if there is a HBase checksum mismatch
      */
     @VisibleForTesting
     protected HFileBlock readBlockDataInternal(FSDataInputStream is, long 
offset,
-        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, 
boolean updateMetrics)
-     throws IOException {
+        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, 
boolean updateMetrics,
+        boolean intoHeap) throws IOException {
       if (offset < 0) {
         throw new IOException("Invalid offset=" + offset + " trying to read "
             + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")");
@@ -1728,7 +1754,7 @@ public class HFileBlock implements Cacheable {
       // says where to start reading. If we have the header cached, then we 
don't need to read
       // it again and we can likely read from last place we left off w/o need 
to backup and reread
       // the header we read last time through here.
-      ByteBuff onDiskBlock = allocator.allocate(onDiskSizeWithHeader + 
hdrSize);
+      ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, 
intoHeap);
       boolean initHFileBlockSuccess = false;
       try {
         if (headerBuf != null) {
@@ -2072,7 +2098,7 @@ public class HFileBlock implements Cacheable {
                    " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader;
   }
 
-  public HFileBlock deepClone() {
+  public HFileBlock deepCloneOnHeap() {
     return new HFileBlock(this, true);
   }
 }
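
The long javadoc on FSReader#readBlockData above is the heart of this change: intoHeap=true
forces the block's ByteBuff onto the JVM heap (for blocks headed to LRUBlockCache), while
intoHeap=false goes through the ByteBuffAllocator, usually off-heap, and the caller must
release() the block when done. A hedged, condensed caller-side sketch; the helper name and its
parameters are invented for illustration, while readBlockData, isOnHeap and release are from
this patch:

    // Illustrative only; not code from this patch.
    static HFileBlock readForCache(HFileBlock.FSReader reader, long offset, long onDiskSize,
        boolean destinedForLruCache) throws IOException {
      HFileBlock block = reader.readBlockData(offset, onDiskSize, /* pread= */ true,
        /* updateMetrics= */ false, /* intoHeap= */ destinedForLruCache);
      // With intoHeap=true the block is heap backed even when the reservoir is enabled,
      // so LruBlockCache can cache it without an extra off-heap to heap copy.
      assert !destinedForLruCache || block.isOnHeap();
      // With intoHeap=false the buffers may come from the off-heap pool; the caller must
      // eventually call block.release() so they go back to the ByteBuffAllocator.
      return block;
    }
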
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index c878773..d005f2a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -272,8 +272,8 @@ public class HFileReaderImpl implements HFile.Reader, 
Configurable {
             if (LOG.isTraceEnabled()) {
               LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, 
end));
             }
-            // TODO: Could we use block iterator in here? Would that get stuff 
into the cache?
-            HFileBlock prevBlock = null;
+            // Don't use BlockIterator here, because it's designed to read the load-on-open
+            // section.
+            long onDiskSizeOfNextBlock = -1;
             while (offset < end) {
               if (Thread.interrupted()) {
                 break;
@@ -282,16 +282,17 @@ public class HFileReaderImpl implements HFile.Reader, 
Configurable {
               // the internal-to-hfileblock thread local which holds the 
overread that gets the
               // next header, will not have happened...so, pass in the 
onDiskSize gotten from the
               // cached block. This 'optimization' triggers extremely rarely 
I'd say.
-              long onDiskSize = prevBlock != null? 
prevBlock.getNextBlockOnDiskSize(): -1;
-              HFileBlock block = readBlock(offset, onDiskSize, 
/*cacheBlock=*/true,
-                  /*pread=*/true, false, false, null, null);
-              // Need not update the current block. Ideally here the readBlock 
won't find the
-              // block in cache. We call this readBlock so that block data is 
read from FS and
-              // cached in BC. So there is no reference count increment that 
happens here.
-              // The return will ideally be a noop because the block is not of 
MemoryType SHARED.
-              returnBlock(block);
-              prevBlock = block;
-              offset += block.getOnDiskSizeWithHeader();
+              HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* 
cacheBlock= */true,
+                /* pread= */true, false, false, null, null);
+              try {
+                onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize();
+                offset += block.getOnDiskSizeWithHeader();
+              } finally {
+                // Ideally readBlock won't find the block in the cache here. We call readBlock
+                // so that the block data is read from the FS and cached in the BlockCache, and
+                // we must call returnBlock to decrease the block's reference count.
+                returnBlock(block);
+              }
             }
           } catch (IOException e) {
             // IOExceptions are probably due to region closes (relocation, 
etc.)
@@ -1412,7 +1413,7 @@ public class HFileReaderImpl implements HFile.Reader, 
Configurable {
       // Cache Miss, please load.
 
       HFileBlock compressedBlock =
-          fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false);
+          fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false, 
true);
       HFileBlock uncompressedBlock = compressedBlock.unpack(hfileContext, 
fsBlockReader);
       if (compressedBlock != uncompressedBlock) {
         compressedBlock.release();
@@ -1427,6 +1428,24 @@ public class HFileReaderImpl implements HFile.Reader, 
Configurable {
     }
   }
 
+  /**
+   * If the expected block is a data block, its ByteBuff will be allocated from
+   * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator}, which is usually off-heap; otherwise
+   * it will be allocated from the JVM heap.
+   * @see 
org.apache.hadoop.hbase.io.hfile.HFileBlock.FSReader#readBlockData(long, long, 
boolean,
+   *      boolean, boolean)
+   */
+  private boolean shouldUseHeap(BlockType expectedBlockType) {
+    if (!cacheConf.getBlockCache().isPresent()) {
+      return false;
+    } else if (!cacheConf.isCombinedBlockCache()) {
+      // A block cached in LruBlockCache must be a heap block, so allocate the block's memory
+      // from the heap up front to save an extra off-heap to heap copy.
+      return true;
+    }
+    return expectedBlockType != null && !expectedBlockType.isData();
+  }
+
   @Override
   public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize,
       final boolean cacheBlock, boolean pread, final boolean isCompaction,
@@ -1496,8 +1515,8 @@ public class HFileReaderImpl implements HFile.Reader, 
Configurable {
 
         TraceUtil.addTimelineAnnotation("blockCacheMiss");
         // Load block from filesystem.
-        HFileBlock hfileBlock =
-            fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, 
pread, !isCompaction);
+        HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, 
onDiskBlockSize, pread,
+          !isCompaction, shouldUseHeap(expectedBlockType));
         validateBlockType(hfileBlock, expectedBlockType);
         HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader);
         BlockType.BlockCategory category = 
hfileBlock.getBlockType().getCategory();
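
shouldUseHeap() above is where the read path picks the allocation target for readBlock(). A
hedged restatement of that policy, spelling out the four cases (the method below is illustrative,
not patch code; getBlockCache(), isCombinedBlockCache() and BlockType#isData() are real):

    // Illustrative paraphrase of shouldUseHeap(), with the four cases spelled out.
    static boolean allocateOnHeap(CacheConfig cacheConf, BlockType expected) {
      if (!cacheConf.getBlockCache().isPresent()) {
        return false;  // no block cache: read through the ByteBuffAllocator
      }
      if (!cacheConf.isCombinedBlockCache()) {
        return true;   // plain LruBlockCache: heap now, so no off-heap to heap copy later
      }
      // CombinedBlockCache: index/meta blocks live in the on-heap L1 LruBlockCache, so read
      // them onto the heap; data blocks (or blocks of unknown type) stay with the allocator.
      return expected != null && !expected.isData();
    }
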
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index dd7c695..2aabb61 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -346,6 +346,32 @@ public class LruBlockCache implements ResizableBlockCache, 
HeapSize {
     }
   }
 
+  /**
+   * A block cached in LRUBlockCache will always be a heap block: on the one hand, heap access is
+   * faster than off-heap access, which benefits the small index and meta blocks cached in a
+   * CombinedBlockCache; on the other hand, the LRUBlockCache size is always calculated against
+   * the total heap size, so caching an off-heap block in LRUBlockCache would corrupt that
+   * accounting. Here we clone the block into a heap block if it's an off-heap one, otherwise we
+   * just use the original block. The key point is to maintain the block's refCnt correctly
+   * (HBASE-22127): <br>
+   * 1. if we cache the cloned heap block, it has a brand-new refCnt of its own, which is easy to
+   * handle; <br>
+   * 2. if we cache the original heap block, we know it isn't tracked in the ByteBuffAllocator's
+   * reservoir, so once both the RPC path and the LRUBlockCache have released it, it can be
+   * garbage collected by the JVM; hence the retain() here.
+   * @param buf the original block
+   * @return a block backed by heap memory.
+   */
+  private Cacheable asReferencedHeapBlock(Cacheable buf) {
+    if (buf instanceof HFileBlock) {
+      HFileBlock blk = ((HFileBlock) buf);
+      if (!blk.isOnHeap()) {
+        return blk.deepCloneOnHeap();
+      }
+    }
+    // The block will be referenced by this LRUBlockCache, so we should increase its refCnt here.
+    return buf.retain();
+  }
+
   // BlockCache implementation
 
   /**
@@ -394,8 +420,8 @@ public class LruBlockCache implements ResizableBlockCache, 
HeapSize {
       }
       return;
     }
-    // The block will be referenced by the LRUBlockCache, so should increase 
the refCnt here.
-    buf.retain();
+    // Ensure that the block is a heap one.
+    buf = asReferencedHeapBlock(buf);
     cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
     long newSize = updateSizeMetrics(cb, false);
     map.put(cacheKey, cb);
@@ -495,7 +521,7 @@ public class LruBlockCache implements ResizableBlockCache, 
HeapSize {
           if (caching) {
             if (result instanceof HFileBlock && ((HFileBlock) 
result).usesSharedMemory()) {
               Cacheable original = result;
-              result = ((HFileBlock) original).deepClone();
+              result = ((HFileBlock) original).deepCloneOnHeap();
               // deepClone an new one, so need to put the original one back to 
free it.
               victimHandler.returnBlock(cacheKey, original);
             }
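
asReferencedHeapBlock() above carries the reference-count contract for HBASE-22127. Restated as
a hedged sketch (isOnHeap(), deepCloneOnHeap(), retain() and release() are from this patch; the
walk-through and helper name are illustrative):

    // Illustrative only: what happens to a block the read path hands to cacheBlock().
    static Cacheable intoLruCache(HFileBlock block) {
      if (!block.isOnHeap()) {
        // Off-heap block: the cache keeps a fresh heap copy with its own refCnt; the read
        // path still owns the original and must release() it back to the allocator.
        return block.deepCloneOnHeap();
      }
      // Heap block: share it and bump refCnt to 2 (read path + cache); once both sides
      // have released it, the JVM garbage collects it.
      return block.retain();
    }
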
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 1db1013..7c709c1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -50,6 +50,8 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -564,10 +566,10 @@ public class BucketCache implements BlockCache, HeapSize {
     if (!cacheEnabled) {
       return false;
     }
-    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
+    boolean existed = removeFromRamCache(cacheKey);
     BucketEntry bucketEntry = backingMap.get(cacheKey);
     if (bucketEntry == null) {
-      if (removedBlock != null) {
+      if (existed) {
         cacheStats.evicted(0, cacheKey.isPrimary());
         return true;
       } else {
@@ -578,7 +580,7 @@ public class BucketCache implements BlockCache, HeapSize {
     try {
       lock.writeLock().lock();
       if (backingMap.remove(cacheKey, bucketEntry)) {
-        blockEvicted(cacheKey, bucketEntry, removedBlock == null);
+        blockEvicted(cacheKey, bucketEntry, !existed);
       } else {
         return false;
       }
@@ -589,23 +591,23 @@ public class BucketCache implements BlockCache, HeapSize {
     return true;
   }
 
-  private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
-    RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
-    if (removedBlock != null) {
-      this.blockNumber.decrement();
-      this.heapSize.add(-1 * removedBlock.getData().heapSize());
-    }
-    return removedBlock;
+  private boolean removeFromRamCache(BlockCacheKey cacheKey) {
+    return ramCache.remove(cacheKey, re -> {
+      if (re != null) {
+        this.blockNumber.decrement();
+        this.heapSize.add(-1 * re.getData().heapSize());
+      }
+    });
   }
 
   public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) {
     if (!cacheEnabled) {
       return false;
     }
-    RAMQueueEntry removedBlock = checkRamCache(cacheKey);
+    boolean existed = removeFromRamCache(cacheKey);
     BucketEntry bucketEntry = backingMap.get(cacheKey);
     if (bucketEntry == null) {
-      if (removedBlock != null) {
+      if (existed) {
         cacheStats.evicted(0, cacheKey.isPrimary());
         return true;
       } else {
@@ -618,7 +620,7 @@ public class BucketCache implements BlockCache, HeapSize {
       int refCount = bucketEntry.getRefCount();
       if (refCount == 0) {
         if (backingMap.remove(cacheKey, bucketEntry)) {
-          blockEvicted(cacheKey, bucketEntry, removedBlock == null);
+          blockEvicted(cacheKey, bucketEntry, !existed);
         } else {
           return false;
         }
@@ -1041,10 +1043,12 @@ public class BucketCache implements BlockCache, 
HeapSize {
           putIntoBackingMap(key, bucketEntries[i]);
         }
         // Always remove from ramCache even if we failed adding it to the 
block cache above.
-        RAMQueueEntry ramCacheEntry = ramCache.remove(key);
-        if (ramCacheEntry != null) {
-          heapSize.add(-1 * entries.get(i).getData().heapSize());
-        } else if (bucketEntries[i] != null){
+        boolean existed = ramCache.remove(key, re -> {
+          if (re != null) {
+            heapSize.add(-1 * re.getData().heapSize());
+          }
+        });
+        if (!existed && bucketEntries[i] != null) {
           // Block should have already been evicted. Remove it and free space.
           ReentrantReadWriteLock lock = 
offsetLock.getLock(bucketEntries[i].offset());
           try {
@@ -1753,12 +1757,23 @@ public class BucketCache implements BlockCache, 
HeapSize {
       return previous;
     }
 
-    public RAMQueueEntry remove(BlockCacheKey key) {
+    public boolean remove(BlockCacheKey key) {
+      return remove(key, re->{});
+    }
+
+    /**
+     * A {@link Consumer} is passed in here because, once the removed entry releases its reference
+     * count, its ByteBuffers may be recycled, and accessing them outside this method would throw
+     * an exception. The consumer is applied to the entry being removed before its reference count
+     * is released. Note: do not change the entry's reference count inside the {@link Consumer}.
+     */
+    public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) {
       RAMQueueEntry previous = delegate.remove(key);
+      action.accept(previous);
       if (previous != null) {
         previous.getData().release();
       }
-      return previous;
+      return previous != null;
     }
 
     public boolean isEmpty() {
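
The Consumer-based remove() above exists because the entry's buffers are released, and possibly
recycled by the ByteBuffAllocator, before remove() returns. A hedged usage sketch that mirrors
removeFromRamCache() earlier in this patch (the field names are the ones used there):

    // Illustrative only: the bookkeeping must run inside the callback, while the
    // entry's ByteBuffers are still guaranteed to be valid.
    boolean existed = ramCache.remove(cacheKey, re -> {
      if (re != null) {
        blockNumber.decrement();
        heapSize.add(-1 * re.getData().heapSize());  // safe: runs before getData().release()
      }
    });
    // Keeping the RAMQueueEntry around and reading it after remove() returns would risk
    // touching buffers that have already gone back to the allocator pool.
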
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
index c432fa9..2aebc8c 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java
@@ -100,7 +100,8 @@ public class TestChecksum {
     meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
     HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, 
(HFileSystem) fs, path,
         meta, ByteBuffAllocator.HEAP);
-    HFileBlock b = hbr.readBlockData(0, -1, false, false);
+    HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
+    assertTrue(b.isOnHeap());
     assertEquals(b.getChecksumType(), 
ChecksumType.getDefaultChecksumType().getCode());
   }
 
@@ -146,7 +147,8 @@ public class TestChecksum {
       meta = new HFileContextBuilder().withHBaseCheckSum(true).build();
       HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, 
(HFileSystem) fs, path,
           meta, ByteBuffAllocator.HEAP);
-      HFileBlock b = hbr.readBlockData(0, -1, false, false);
+      HFileBlock b = hbr.readBlockData(0, -1, false, false, true);
+      assertTrue(b.isOnHeap());
 
       // verify SingleByteBuff checksum.
       verifySBBCheckSum(b.getBufferReadOnly());
@@ -215,7 +217,7 @@ public class TestChecksum {
               .withHBaseCheckSum(true)
               .build();
         HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, 
path, meta);
-        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
         b.sanityCheck();
         assertEquals(4936, b.getUncompressedSizeWithoutHeader());
         assertEquals(algo == GZ ? 2173 : 4936,
@@ -236,19 +238,19 @@ public class TestChecksum {
         // requests. Verify that this is correct.
         for (int i = 0; i <
              HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) {
-          b = hbr.readBlockData(0, -1, pread, false);
+          b = hbr.readBlockData(0, -1, pread, false, true);
           assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
           assertEquals(0, HFile.getAndResetChecksumFailuresCount());
         }
         // The next read should have hbase checksum verification reanabled,
         // we verify this by assertng that there was a hbase-checksum failure.
-        b = hbr.readBlockData(0, -1, pread, false);
+        b = hbr.readBlockData(0, -1, pread, false, true);
         assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
         assertEquals(1, HFile.getAndResetChecksumFailuresCount());
 
         // Since the above encountered a checksum failure, we switch
         // back to not checking hbase checksums.
-        b = hbr.readBlockData(0, -1, pread, false);
+        b = hbr.readBlockData(0, -1, pread, false, true);
         assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
         assertEquals(0, HFile.getAndResetChecksumFailuresCount());
         is.close();
@@ -260,7 +262,7 @@ public class TestChecksum {
         assertEquals(false, newfs.useHBaseChecksum());
         is = new FSDataInputStreamWrapper(newfs, path);
         hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta);
-        b = hbr.readBlockData(0, -1, pread, false);
+        b = hbr.readBlockData(0, -1, pread, false, true);
         is.close();
         b.sanityCheck();
         b = b.unpack(meta, hbr);
@@ -343,7 +345,7 @@ public class TestChecksum {
         HFileBlock.FSReader hbr =
             new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is, 
nochecksum), totalSize,
                 hfs, path, meta, ByteBuffAllocator.HEAP);
-        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
         assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff);
         is.close();
         b.sanityCheck();
@@ -389,13 +391,13 @@ public class TestChecksum {
 
     @Override
     protected HFileBlock readBlockDataInternal(FSDataInputStream is, long 
offset,
-        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, 
boolean updateMetrics)
-        throws IOException {
+        long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, 
boolean updateMetrics,
+        boolean useHeap) throws IOException {
       if (verifyChecksum) {
         corruptDataStream = true;
       }
       HFileBlock b = super.readBlockDataInternal(is, offset, 
onDiskSizeWithHeaderL, pread,
-          verifyChecksum, updateMetrics);
+        verifyChecksum, updateMetrics, useHeap);
       corruptDataStream = false;
       return b;
     }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index f58fe3e..0ed933b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -17,8 +17,12 @@
  */
 package org.apache.hadoop.hbase.io.hfile;
 
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY;
+import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY;
+import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY;
 import static 
org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY;
 import static 
org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY;
+import static 
org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -104,33 +108,129 @@ public class TestHFile  {
     fs = TEST_UTIL.getTestFileSystem();
   }
 
-  @Test
-  public void testReaderWithoutBlockCache() throws Exception {
-    int bufCount = 32;
+  private ByteBuffAllocator initAllocator(boolean reservoirEnabled, int 
bufSize, int bufCount,
+      int minAllocSize) {
     Configuration that = HBaseConfiguration.create(conf);
+    that.setInt(BUFFER_SIZE_KEY, bufSize);
     that.setInt(MAX_BUFFER_COUNT_KEY, bufCount);
-    // AllByteBuffers will be allocated from the buffers.
-    that.setInt(MIN_ALLOCATE_SIZE_KEY, 0);
-    ByteBuffAllocator alloc = ByteBuffAllocator.create(that, true);
-    List<ByteBuff> buffs = new ArrayList<>();
+    // All ByteBuffers will be allocated from the buffers.
+    that.setInt(MIN_ALLOCATE_SIZE_KEY, minAllocSize);
+    return ByteBuffAllocator.create(that, reservoirEnabled);
+  }
+
+  private void fillByteBuffAllocator(ByteBuffAllocator alloc, int bufCount) {
     // Fill the allocator with bufCount ByteBuffer
+    List<ByteBuff> buffs = new ArrayList<>();
     for (int i = 0; i < bufCount; i++) {
       buffs.add(alloc.allocateOneBuffer());
+      Assert.assertEquals(alloc.getQueueSize(), 0);
     }
-    Assert.assertEquals(alloc.getQueueSize(), 0);
-    for (ByteBuff buf : buffs) {
-      buf.release();
-    }
+    buffs.forEach(ByteBuff::release);
     Assert.assertEquals(alloc.getQueueSize(), bufCount);
+  }
+
+  @Test
+  public void testReaderWithoutBlockCache() throws Exception {
+    int bufCount = 32;
+    // All ByteBuffers will be allocated from the buffers.
+    ByteBuffAllocator alloc = initAllocator(true, 64 * 1024, bufCount, 0);
+    fillByteBuffAllocator(alloc, bufCount);
     // start write to store file.
     Path path = writeStoreFile();
     try {
-      readStoreFile(path, that, alloc);
+      readStoreFile(path, conf, alloc);
     } catch (Exception e) {
       // fail test
       assertTrue(false);
     }
     Assert.assertEquals(bufCount, alloc.getQueueSize());
+    alloc.clean();
+  }
+
+  /**
+   * Test case for HBASE-22127 in LruBlockCache.
+   */
+  @Test
+  public void testReaderWithLRUBlockCache() throws Exception {
+    int bufCount = 1024, blockSize = 64 * 1024;
+    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
+    fillByteBuffAllocator(alloc, bufCount);
+    Path storeFilePath = writeStoreFile();
+    // Open the file reader with LRUBlockCache
+    BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, 
conf);
+    CacheConfig cacheConfig = new CacheConfig(conf, null, lru, alloc);
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, 
true, conf);
+    long offset = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
+      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, 
null, null);
+      offset += block.getOnDiskSizeWithHeader();
+      // Ensure the block is a heap one.
+      Cacheable cachedBlock = lru.getBlock(key, false, false, true);
+      Assert.assertNotNull(cachedBlock);
+      Assert.assertTrue(cachedBlock instanceof HFileBlock);
+      Assert.assertTrue(((HFileBlock) cachedBlock).isOnHeap());
+      // Should never allocate an off-heap block from the allocator here, because blocks for the
+      // pure LruBlockCache are always read onto the JVM heap.
+      Assert.assertEquals(bufCount, alloc.getQueueSize());
+      block.release(); // return the ByteBuff back to the allocator.
+    }
+    reader.close();
+    Assert.assertEquals(bufCount, alloc.getQueueSize());
+    alloc.clean();
+    lru.shutdown();
+  }
+
+  private BlockCache initCombinedBlockCache() {
+    Configuration that = HBaseConfiguration.create(conf);
+    that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache.
+    that.set(BUCKET_CACHE_IOENGINE_KEY, "offheap");
+    BlockCache bc = BlockCacheFactory.createBlockCache(that);
+    Assert.assertNotNull(bc);
+    Assert.assertTrue(bc instanceof CombinedBlockCache);
+    return bc;
+  }
+
+  /**
+   * Test case for HBASE-22127 in CombinedBlockCache
+   */
+  @Test
+  public void testReaderWithCombinedBlockCache() throws Exception {
+    int bufCount = 1024, blockSize = 64 * 1024;
+    ByteBuffAllocator alloc = initAllocator(true, blockSize, bufCount, 0);
+    fillByteBuffAllocator(alloc, bufCount);
+    Path storeFilePath = writeStoreFile();
+    // Open the file reader with CombinedBlockCache
+    BlockCache combined = initCombinedBlockCache();
+    conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true);
+    CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc);
+    HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, 
true, conf);
+    long offset = 0;
+    while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) {
+      BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset);
+      HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, 
null, null);
+      offset += block.getOnDiskSizeWithHeader();
+      // Read the cached block.
+      Cacheable cachedBlock = combined.getBlock(key, false, false, true);
+      try {
+        Assert.assertNotNull(cachedBlock);
+        Assert.assertTrue(cachedBlock instanceof HFileBlock);
+        HFileBlock hfb = (HFileBlock) cachedBlock;
+        // Data block will be cached in BucketCache, so it should be an 
off-heap block.
+        if (hfb.getBlockType().isData()) {
+          Assert.assertFalse(hfb.isOnHeap());
+        } else {
+          // Non-data block will be cached in LRUBlockCache, so it must be an 
on-heap block.
+          Assert.assertTrue(hfb.isOnHeap());
+        }
+      } finally {
+        combined.returnBlock(key, cachedBlock);
+      }
+      block.release(); // return the ByteBuff back to the allocator.
+    }
+    reader.close();
+    combined.shutdown();
+    Assert.assertEquals(bufCount, alloc.getQueueSize());
+    alloc.clean();
   }
 
   private void readStoreFile(Path storeFilePath, Configuration conf, 
ByteBuffAllocator alloc)
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
index efdae16..2733ca2 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java
@@ -40,6 +40,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.ArrayBackedTag;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
@@ -68,6 +71,7 @@ import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.compress.Compressor;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -93,10 +97,12 @@ public class TestHFileBlock {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestHFileBlock.class);
 
-  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ };
+  // TODO: re-enable GZ in HBASE-21937; unpacking into a ByteBuff isn't supported yet.
+  static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, /* GZ 
*/ };
 
   private static final int NUM_TEST_BLOCKS = 1000;
   private static final int NUM_READER_THREADS = 26;
+  private static final int MAX_BUFFER_COUNT = 2048;
 
   // Used to generate KeyValues
   private static int NUM_KEYVALUES = 50;
@@ -108,14 +114,51 @@ public class TestHFileBlock {
 
   private final boolean includesMemstoreTS;
   private final boolean includesTag;
-  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) {
+  private final boolean useHeapAllocator;
+  private final ByteBuffAllocator alloc;
+
+  public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, 
boolean useHeapAllocator) {
     this.includesMemstoreTS = includesMemstoreTS;
     this.includesTag = includesTag;
+    this.useHeapAllocator = useHeapAllocator;
+    this.alloc = useHeapAllocator ? ByteBuffAllocator.HEAP : 
createOffHeapAlloc();
+    assertAllocator();
   }
 
   @Parameters
   public static Collection<Object[]> parameters() {
-    return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED;
+    List<Object[]> params = new ArrayList<>();
+    // Generate boolean triples from 000 to 111
+    for (int i = 0; i < (1 << 3); i++) {
+      Object[] flags = new Boolean[3];
+      for (int k = 0; k < 3; k++) {
+        flags[k] = (i & (1 << k)) != 0;
+      }
+      params.add(flags);
+    }
+    return params;
+  }
+
+  private ByteBuffAllocator createOffHeapAlloc() {
+    Configuration conf = 
HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+    conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT);
+    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
+    ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true);
+    // Fill the allocator
+    List<ByteBuff> bufs = new ArrayList<>();
+    for (int i = 0; i < MAX_BUFFER_COUNT; i++) {
+      ByteBuff bb = alloc.allocateOneBuffer();
+      assertTrue(!bb.hasArray());
+      bufs.add(bb);
+    }
+    bufs.forEach(ByteBuff::release);
+    return alloc;
+  }
+
+  private void assertAllocator() {
+    if (!useHeapAllocator) {
+      assertEquals(MAX_BUFFER_COUNT, alloc.getQueueSize());
+    }
   }
 
   @Before
@@ -123,6 +166,12 @@ public class TestHFileBlock {
     fs = HFileSystem.get(TEST_UTIL.getConfiguration());
   }
 
+  @After
+  public void tearDown() throws IOException {
+    assertAllocator();
+    alloc.clean();
+  }
+
   static void writeTestBlockContents(DataOutputStream dos) throws IOException {
     // This compresses really well.
     for (int i = 0; i < 1000; ++i)
@@ -327,9 +376,8 @@ public class TestHFileBlock {
         .withIncludesMvcc(includesMemstoreTS)
         .withIncludesTags(includesTag)
         .withCompression(algo).build();
-        HFileBlock.FSReader hbr =
-            new HFileBlock.FSReaderImpl(is, totalSize, meta, 
ByteBuffAllocator.HEAP);
-        HFileBlock b = hbr.readBlockData(0, -1, pread, false);
+        HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, 
meta, alloc);
+        HFileBlock b = hbr.readBlockData(0, -1, pread, false, true);
         is.close();
         assertEquals(0, HFile.getAndResetChecksumFailuresCount());
 
@@ -341,14 +389,14 @@ public class TestHFileBlock {
 
         if (algo == GZ) {
           is = fs.open(path);
-          hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, 
ByteBuffAllocator.HEAP);
-          b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE +
-                                b.totalChecksumBytes(), pread, false);
+          hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
+          b = hbr.readBlockData(0,
+            2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), 
pread, false, true);
           assertEquals(expected, b);
           int wrongCompressedSize = 2172;
           try {
-            b = hbr.readBlockData(0, wrongCompressedSize
-                + HConstants.HFILEBLOCK_HEADER_SIZE, pread, false);
+            hbr.readBlockData(0, wrongCompressedSize + 
HConstants.HFILEBLOCK_HEADER_SIZE, pread,
+              false, true);
             fail("Exception expected");
           } catch (IOException ex) {
             String expectedPrefix = "Passed in onDiskSizeWithHeader=";
@@ -356,8 +404,10 @@ public class TestHFileBlock {
                 + "'.\nMessage is expected to start with: '" + expectedPrefix
                 + "'", ex.getMessage().startsWith(expectedPrefix));
           }
+          assertTrue(b.release());
           is.close();
         }
+        assertTrue(expected.release());
       }
     }
   }
@@ -428,13 +478,13 @@ public class TestHFileBlock {
                 .withIncludesTags(includesTag)
                 .build();
           HFileBlock.FSReaderImpl hbr =
-              new HFileBlock.FSReaderImpl(is, totalSize, meta, 
ByteBuffAllocator.HEAP);
+              new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
           hbr.setDataBlockEncoder(dataBlockEncoder);
           hbr.setIncludesMemStoreTS(includesMemstoreTS);
           HFileBlock blockFromHFile, blockUnpacked;
           int pos = 0;
           for (int blockId = 0; blockId < numBlocks; ++blockId) {
-            blockFromHFile = hbr.readBlockData(pos, -1, pread, false);
+            blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true);
             assertEquals(0, HFile.getAndResetChecksumFailuresCount());
             blockFromHFile.sanityCheck();
             pos += blockFromHFile.getOnDiskSizeWithHeader();
@@ -487,6 +537,10 @@ public class TestHFileBlock {
                   blockUnpacked, deserialized.unpack(meta, hbr));
               }
             }
+            assertTrue(blockUnpacked.release());
+            if (blockFromHFile != blockUnpacked) {
+              blockFromHFile.release();
+            }
           }
           is.close();
         }
@@ -557,7 +611,7 @@ public class TestHFileBlock {
                               .withIncludesTags(includesTag)
                               .withCompression(algo).build();
           HFileBlock.FSReader hbr =
-              new HFileBlock.FSReaderImpl(is, totalSize, meta, 
ByteBuffAllocator.HEAP);
+              new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc);
           long curOffset = 0;
           for (int i = 0; i < NUM_TEST_BLOCKS; ++i) {
             if (!pread) {
@@ -569,7 +623,7 @@ public class TestHFileBlock {
             if (detailedLogging) {
               LOG.info("Reading block #" + i + " at offset " + curOffset);
             }
-            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false);
+            HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, 
true);
             if (detailedLogging) {
               LOG.info("Block #" + i + ": " + b);
             }
@@ -583,7 +637,8 @@ public class TestHFileBlock {
 
             // Now re-load this block knowing the on-disk size. This tests a
             // different branch in the loader.
-            HFileBlock b2 = hbr.readBlockData(curOffset, 
b.getOnDiskSizeWithHeader(), pread, false);
+            HFileBlock b2 =
+                hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), 
pread, false, true);
             b2.sanityCheck();
 
             assertEquals(b.getBlockType(), b2.getBlockType());
@@ -599,6 +654,7 @@ public class TestHFileBlock {
             assertEquals(b.getOnDiskDataSizeWithHeader(),
                          b2.getOnDiskDataSizeWithHeader());
             assertEquals(0, HFile.getAndResetChecksumFailuresCount());
+            assertTrue(b2.release());
 
             curOffset += b.getOnDiskSizeWithHeader();
 
@@ -606,14 +662,14 @@ public class TestHFileBlock {
               // NOTE: cache-on-write testing doesn't actually involve a 
BlockCache. It simply
               // verifies that the unpacked value read back off disk matches 
the unpacked value
               // generated before writing to disk.
-              b = b.unpack(meta, hbr);
+              HFileBlock newBlock = b.unpack(meta, hbr);
               // b's buffer has header + data + checksum while
               // expectedContents have header + data only
-              ByteBuff bufRead = b.getBufferReadOnly();
+              ByteBuff bufRead = newBlock.getBufferReadOnly();
               ByteBuffer bufExpected = expectedContents.get(i);
               boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(),
                   bufRead.arrayOffset(),
-                  bufRead.limit() - b.totalChecksumBytes(),
+                  bufRead.limit() - newBlock.totalChecksumBytes(),
                   bufExpected.array(), bufExpected.arrayOffset(),
                   bufExpected.limit()) == 0;
               String wrongBytesMsg = "";
@@ -642,9 +698,12 @@ public class TestHFileBlock {
                 }
               }
               assertTrue(wrongBytesMsg, bytesAreCorrect);
+              assertTrue(newBlock.release());
+              if (newBlock != b) {
+                assertTrue(b.release());
+              }
             }
           }
-
           assertEquals(curOffset, fs.getFileStatus(path).getLen());
           is.close();
         }
@@ -687,29 +746,37 @@ public class TestHFileBlock {
         boolean pread = true;
         boolean withOnDiskSize = rand.nextBoolean();
         long expectedSize =
-          (blockId == NUM_TEST_BLOCKS - 1 ? fileSize
-              : offsets.get(blockId + 1)) - offset;
-
-        HFileBlock b;
+            (blockId == NUM_TEST_BLOCKS - 1 ? fileSize : offsets.get(blockId + 
1)) - offset;
+        HFileBlock b = null;
         try {
           long onDiskSizeArg = withOnDiskSize ? expectedSize : -1;
-          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false);
+          b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false);
+          if (useHeapAllocator) {
+            assertTrue(b.isOnHeap());
+          } else {
+            assertTrue(!b.getBlockType().isData() || !b.isOnHeap());
+          }
+          assertEquals(types.get(blockId), b.getBlockType());
+          assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
+          assertEquals(offset, b.getOffset());
         } catch (IOException ex) {
-          LOG.error("Error in client " + clientId + " trying to read block at "
-              + offset + ", pread=" + pread + ", withOnDiskSize=" +
-              withOnDiskSize, ex);
+          LOG.error("Error in client " + clientId + " trying to read block at 
" + offset
+              + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize,
+            ex);
           return false;
+        } finally {
+          if (b != null) {
+            b.release();
+          }
         }
-
-        assertEquals(types.get(blockId), b.getBlockType());
-        assertEquals(expectedSize, b.getOnDiskSizeWithHeader());
-        assertEquals(offset, b.getOffset());
-
         ++numBlocksRead;
-        if (pread)
+        if (pread) {
           ++numPositionalRead;
-        if (withOnDiskSize)
+        }
+
+        if (withOnDiskSize) {
           ++numWithOnDiskSize;
+        }
       }
       LOG.info("Client " + clientId + " successfully read " + numBlocksRead +
         " blocks (with pread: " + numPositionalRead + ", with onDiskSize " +
@@ -717,7 +784,6 @@ public class TestHFileBlock {
 
       return true;
     }
-
   }
 
   @Test
@@ -742,7 +808,7 @@ public class TestHFileBlock {
                           .withCompression(compressAlgo)
                           .build();
       HFileBlock.FSReader hbr =
-          new HFileBlock.FSReaderImpl(is, fileSize, meta, 
ByteBuffAllocator.HEAP);
+          new HFileBlock.FSReaderImpl(is, fileSize, meta, alloc);
 
       Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS);
       ExecutorCompletionService<Boolean> ecs = new 
ExecutorCompletionService<>(exec);
@@ -761,7 +827,6 @@ public class TestHFileBlock {
             + ")");
         }
       }
-
       is.close();
     }
   }
@@ -874,9 +939,9 @@ public class TestHFileBlock {
     ByteBuffer buf = ByteBuffer.wrap(byteArr, 0, size);
     HFileContext meta = new HFileContextBuilder().build();
     HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, 
size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP);
+        HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc);
     HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, 
size, size, -1, buf,
-        HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP);
+        HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc);
     ByteBuffer buff1 = ByteBuffer.allocate(length);
     ByteBuffer buff2 = ByteBuffer.allocate(length);
     blockWithNextBlockMetadata.serialize(buff1, true);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
index 73f1c24..6f8d0b0 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java
@@ -192,7 +192,7 @@ public class TestHFileBlockIndex {
       }
 
       missCount += 1;
-      prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false);
+      prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false, 
true);
       prevOffset = offset;
       prevOnDiskSize = onDiskSize;
       prevPread = pread;
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
index 1222d07..508b1fe 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java
@@ -109,7 +109,7 @@ public class TestHFileEncryption {
 
   private long readAndVerifyBlock(long pos, HFileContext ctx, 
HFileBlock.FSReaderImpl hbr, int size)
       throws IOException {
-    HFileBlock b = hbr.readBlockData(pos, -1, false, false);
+    HFileBlock b = hbr.readBlockData(pos, -1, false, false, true);
     assertEquals(0, HFile.getAndResetChecksumFailuresCount());
     b.sanityCheck();
     assertFalse(b.isUnpacked());
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
index b92f7c6..f8da706 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java
@@ -224,8 +224,8 @@ public class TestHFileWriterV3 {
     fsdis.seek(0);
     long curBlockPos = 0;
     while (curBlockPos <= trailer.getLastDataBlockOffset()) {
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, 
false)
-        .unpack(context, blockReader);
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, 
false, true)
+          .unpack(context, blockReader);
       assertEquals(BlockType.DATA, block.getBlockType());
       ByteBuff buf = block.getBufferWithoutHeader();
       int keyLen = -1;
@@ -285,8 +285,8 @@ public class TestHFileWriterV3 {
     while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) {
       LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " +
           trailer.getLoadOnOpenDataOffset());
-      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, 
false)
-        .unpack(context, blockReader);
+      HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, 
false, true)
+          .unpack(context, blockReader);
       assertEquals(BlockType.META, block.getBlockType());
       Text t = new Text();
       ByteBuff buf = block.getBufferWithoutHeader();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
index 5935f91..f1a12a2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java
@@ -160,7 +160,7 @@ public class TestLazyDataBlockDecompression {
     CacheConfig cc = new CacheConfig(lazyCompressDisabled,
         new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled));
     assertFalse(cc.shouldCacheDataCompressed());
-    assertTrue(cc.getBlockCache().get() instanceof LruBlockCache);
+    assertFalse(cc.isCombinedBlockCache());
     LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get();
     LOG.info("disabledBlockCache=" + disabledBlockCache);
     assertEquals("test inconsistency detected.", maxSize, 
disabledBlockCache.getMaxSize());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
index 4e7291d..746cf8d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java
@@ -59,10 +59,10 @@ public class TestBucketWriterThread {
   private static class MockBucketCache extends BucketCache {
 
     public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes,
-      int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
-      throws FileNotFoundException, IOException {
+        int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration)
+        throws IOException {
       super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen,
-        persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
+          persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create());
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
index 3aef976..af1f151 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java
@@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -192,9 +191,7 @@ public abstract class AbstractTestDLS {
     Path rootdir = FSUtils.getRootDir(conf);
 
     int numRegions = 50;
-    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
-        Table t = installTable(zkw, numRegions)) {
-      TableName table = t.getName();
+    try (Table t = installTable(numRegions)) {
       List<RegionInfo> regions = null;
       HRegionServer hrs = null;
       for (int i = 0; i < NUM_RS; i++) {
@@ -224,7 +221,6 @@ public abstract class AbstractTestDLS {
 
       int count = 0;
       for (RegionInfo hri : regions) {
-        Path tdir = FSUtils.getWALTableDir(conf, table);
         @SuppressWarnings("deprecation")
         Path editsdir = WALSplitter
             .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf,
@@ -266,8 +262,7 @@ public abstract class AbstractTestDLS {
     // they will consume recovered.edits
     master.balanceSwitch(false);
 
-    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
-        Table ht = installTable(zkw, numRegionsToCreate)) {
+    try (Table ht = installTable(numRegionsToCreate)) {
       HRegionServer hrs = findRSToKill(false);
       List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices());
       makeWAL(hrs, regions, numLogLines, 100);
@@ -329,8 +324,7 @@ public abstract class AbstractTestDLS {
     final Path logDir = new Path(rootdir,
         AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString()));
 
-    try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null);
-        Table t = installTable(zkw, 40)) {
+    try (Table t = installTable(40)) {
       makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100);
 
       new Thread() {
@@ -380,8 +374,7 @@ public abstract class AbstractTestDLS {
 
     startCluster(NUM_RS); // NUM_RS=6.
 
-    try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null);
-        Table table = installTable(zkw, numRegionsToCreate)) {
+    try (Table table = installTable(numRegionsToCreate)) {
       populateDataInTable(numRowsPerRegion);
 
       List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads();
@@ -482,11 +475,11 @@ public abstract class AbstractTestDLS {
     }
   }
 
-  private Table installTable(ZKWatcher zkw, int nrs) throws Exception {
-    return installTable(zkw, nrs, 0);
+  private Table installTable(int nrs) throws Exception {
+    return installTable(nrs, 0);
   }
 
-  private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception {
+  private Table installTable(int nrs, int existingRegions) throws Exception {
     // Create a table with regions
     byte[] family = Bytes.toBytes("family");
     LOG.info("Creating table with " + nrs + " regions");
@@ -497,14 +490,14 @@ public abstract class AbstractTestDLS {
     }
     assertEquals(nrs, numRegions);
     LOG.info("Waiting for no more RIT\n");
-    blockUntilNoRIT(zkw, master);
+    blockUntilNoRIT();
     // disable-enable cycle to get rid of table's dead regions left behind
     // by createMultiRegions
     assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName));
     LOG.debug("Disabling table\n");
     TEST_UTIL.getAdmin().disableTable(tableName);
     LOG.debug("Waiting for no more RIT\n");
-    blockUntilNoRIT(zkw, master);
+    blockUntilNoRIT();
     NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
     LOG.debug("Verifying only catalog region is assigned\n");
     if (regions.size() != 1) {
@@ -515,7 +508,7 @@ public abstract class AbstractTestDLS {
     LOG.debug("Enabling table\n");
     TEST_UTIL.getAdmin().enableTable(tableName);
     LOG.debug("Waiting for no more RIT\n");
-    blockUntilNoRIT(zkw, master);
+    blockUntilNoRIT();
     LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n");
     regions = HBaseTestingUtility.getAllOnlineRegions(cluster);
     assertEquals(numRegions + 1 + existingRegions, regions.size());
@@ -651,7 +644,7 @@ public abstract class AbstractTestDLS {
     return count;
   }
 
-  private void blockUntilNoRIT(ZKWatcher zkw, HMaster master) throws Exception {
+  private void blockUntilNoRIT() throws Exception {
     TEST_UTIL.waitUntilNoRegionsInTransition(60000);
   }
 
