This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 927f077abaa HBASE-29398: Server side scan metrics for bytes read from FS vs Block cache vs memstore (#7162) (#7136)
927f077abaa is described below

commit 927f077abaa7c03b46d9c3f0ca5420a1e9b27da7
Author: sanjeet006py <[email protected]>
AuthorDate: Sat Jul 19 00:48:59 2025 +0530

    HBASE-29398: Server side scan metrics for bytes read from FS vs Block cache vs memstore (#7162) (#7136)
    
    Signed-off-by: Viraj Jasani <[email protected]>
    Signed-off-by: Hari Krishna Dara <[email protected]>
---
 .../client/metrics/ServerSideScanMetrics.java      |  19 +
 .../hadoop/hbase/io/hfile/BlockCacheUtil.java      |   1 +
 .../hadoop/hbase/io/hfile/CompoundBloomFilter.java |  18 +-
 .../hadoop/hbase/io/hfile/FixedFileTrailer.java    |  10 +-
 .../apache/hadoop/hbase/io/hfile/HFileBlock.java   |  24 +-
 .../hadoop/hbase/io/hfile/HFileBlockIndex.java     |   7 +-
 .../hadoop/hbase/io/hfile/HFileReaderImpl.java     | 130 +--
 .../hadoop/hbase/io/hfile/LruBlockCache.java       |   5 +-
 .../hbase/io/hfile/NoOpIndexBlockEncoder.java      |   4 +-
 .../ThreadLocalServerSideScanMetrics.java          | 160 ++++
 .../hbase/regionserver/RegionScannerImpl.java      |  27 +-
 .../hadoop/hbase/regionserver/SegmentScanner.java  |  13 +
 .../hadoop/hbase/regionserver/StoreFileReader.java |   3 +-
 .../hadoop/hbase/regionserver/StoreFileWriter.java |   4 +-
 .../hadoop/hbase/regionserver/StoreScanner.java    |  38 +
 .../regionserver/handler/ParallelSeekHandler.java  |  36 +
 .../hadoop/hbase/io/hfile/TestBytesReadFromFs.java | 412 ++++++++++
 .../apache/hadoop/hbase/io/hfile/TestHFile.java    |  85 ++
 .../TestBytesReadServerSideScanMetrics.java        | 894 +++++++++++++++++++++
 .../hbase/regionserver/TestDefaultMemStore.java    |  44 +
 20 files changed, 1866 insertions(+), 68 deletions(-)
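
For orientation, a minimal client-side sketch (illustrative only, not part of this patch; it assumes
the existing Scan.setScanMetricsEnabled and ResultScanner.getScanMetrics client APIs and a
hypothetical table "t1") showing where the new counters surface once a scan completes:

    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;

    public class ScanBytesReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
          Table table = conn.getTable(TableName.valueOf("t1"))) {
          Scan scan = new Scan();
          scan.setScanMetricsEnabled(true); // ask the server to track scan metrics
          try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result r : scanner) {
              // consume rows
            }
            // The new server-side counters travel back with the existing scan metrics map.
            Map<String, Long> metrics = scanner.getScanMetrics().getMetricsMap();
            System.out.println("FS bytes: " + metrics.get("BYTES_READ_FROM_FS"));
            System.out.println("Block cache bytes: " + metrics.get("BYTES_READ_FROM_BLOCK_CACHE"));
            System.out.println("Memstore bytes: " + metrics.get("BYTES_READ_FROM_MEMSTORE"));
            System.out.println("Block read ops: " + metrics.get("BLOCK_READ_OPS_COUNT"));
          }
        }
      }
    }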

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index 916451df75d..6b3a4f5675a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -52,6 +52,10 @@ public class ServerSideScanMetrics {
     currentRegionScanMetricsData.createCounter(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME);
     currentRegionScanMetricsData.createCounter(BLOCK_BYTES_SCANNED_KEY_METRIC_NAME);
     currentRegionScanMetricsData.createCounter(FS_READ_TIME_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_FS_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_MEMSTORE_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
   }
 
   /**
@@ -68,6 +72,11 @@ public class ServerSideScanMetrics {
   public static final String BLOCK_BYTES_SCANNED_KEY_METRIC_NAME = "BLOCK_BYTES_SCANNED";
 
   public static final String FS_READ_TIME_METRIC_NAME = "FS_READ_TIME";
+  public static final String BYTES_READ_FROM_FS_METRIC_NAME = "BYTES_READ_FROM_FS";
+  public static final String BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME =
+    "BYTES_READ_FROM_BLOCK_CACHE";
+  public static final String BYTES_READ_FROM_MEMSTORE_METRIC_NAME = "BYTES_READ_FROM_MEMSTORE";
+  public static final String BLOCK_READ_OPS_COUNT_METRIC_NAME = "BLOCK_READ_OPS_COUNT";
 
   /**
    * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
@@ -102,6 +111,16 @@ public class ServerSideScanMetrics {
 
   public final AtomicLong fsReadTime = createCounter(FS_READ_TIME_METRIC_NAME);
 
+  public final AtomicLong bytesReadFromFs = createCounter(BYTES_READ_FROM_FS_METRIC_NAME);
+
+  public final AtomicLong bytesReadFromBlockCache =
+    createCounter(BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME);
+
+  public final AtomicLong bytesReadFromMemstore =
+    createCounter(BYTES_READ_FROM_MEMSTORE_METRIC_NAME);
+
+  public final AtomicLong blockReadOpsCount = createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
+
   /**
    * Sets counter with counterName to passed in value, does nothing if counter does not exist. If
    * region level scan metrics are enabled then sets the value of counter for the current region
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index b130d06ca43..c963fc2617f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -288,6 +288,7 @@ public class BlockCacheUtil {
       .withFillHeader(FILL_HEADER).withOffset(block.getOffset()).withNextBlockOnDiskSize(-1)
       .withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + numBytes)
       .withHFileContext(cloneContext(block.getHFileContext()))
+      .withNextBlockOnDiskSize(block.getNextBlockOnDiskSize())
       .withByteBuffAllocator(cacheConf.getByteBuffAllocator()).withShared(!buff.hasArray()).build();
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
index 95bc1c7b83d..b1cb519fb58 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
 import java.io.DataInput;
 import java.io.IOException;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.util.BloomFilter;
@@ -120,7 +121,8 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase implements Bloo
     return result;
   }
 
-  private HFileBlock getBloomBlock(int block) {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public HFileBlock getBloomBlock(int block) {
     HFileBlock bloomBlock;
     try {
       // We cache the block and use a positional read.
@@ -218,4 +220,18 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase implements Bloo
     return sb.toString();
   }
 
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public HFileBlockIndex.BlockIndexReader getBloomIndex() {
+    return index;
+  }
+
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public int getHashCount() {
+    return hashCount;
+  }
+
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public Hash getHash() {
+    return hash;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
index f2071d21abf..fe0b3124cb5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
@@ -27,10 +27,12 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.InnerStoreCellComparator;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MetaCellComparator;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -410,6 +412,11 @@ public class FixedFileTrailer {
     FixedFileTrailer fft = new FixedFileTrailer(majorVersion, minorVersion);
     fft.deserialize(new DataInputStream(new ByteArrayInputStream(buf.array(),
       buf.arrayOffset() + bufferSize - trailerSize, trailerSize)));
+    boolean isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
+    if (isScanMetricsEnabled) {
+      ThreadLocalServerSideScanMetrics.addBytesReadFromFs(trailerSize);
+      ThreadLocalServerSideScanMetrics.addBlockReadOpsCount(1);
+    }
     return fft;
   }
 
@@ -648,7 +655,8 @@ public class FixedFileTrailer {
     }
   }
 
-  CellComparator createComparator() throws IOException {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public CellComparator createComparator() throws IOException {
     expectAtLeastMajorVersion(2);
     return createComparator(comparatorClassName);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 036270824f7..122380c5649 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
 import org.apache.hadoop.hbase.io.util.BlockIOUtils;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
@@ -405,7 +407,8 @@ public class HFileBlock implements Cacheable {
   *         present) read by peeking into the next block's header; use as a hint when doing a read
    *         of the next block when scanning or running over a file.
    */
-  int getNextBlockOnDiskSize() {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public int getNextBlockOnDiskSize() {
     return nextBlockOnDiskSize;
   }
 
@@ -626,7 +629,8 @@ public class HFileBlock implements Cacheable {
   * Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
   * encoded structure. Internal structures are shared between instances where applicable.
    */
-  HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
     if (!fileContext.isCompressedOrEncrypted()) {
      // TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
      // which is used for block serialization to L2 cache, does not preserve encoding and
@@ -1241,7 +1245,8 @@ public class HFileBlock implements Cacheable {
   * Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index
    * block, meta index block, file info block etc.
    */
-  interface BlockIterator {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public interface BlockIterator {
     /**
      * Get the next block, or null if there are no more blocks to iterate.
      */
@@ -1265,7 +1270,8 @@ public class HFileBlock implements Cacheable {
   }
 
   /** An HFile block reader with iteration ability. */
-  interface FSReader {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public interface FSReader {
     /**
     * Reads the block at the given offset in the file with the given on-disk size and uncompressed
      * size.
@@ -1738,6 +1744,7 @@ public class HFileBlock implements Cacheable {
      // checksums. Can change with circumstances. The below flag is whether the
       // file has support for checksums (version 2+).
       boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
+      boolean isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
       long startTime = EnvironmentEdgeManager.currentTime();
       if (onDiskSizeWithHeader == -1) {
        // The caller does not know the block size. Need to get it from the header. If header was
@@ -1754,6 +1761,9 @@ public class HFileBlock implements Cacheable {
           headerBuf = HEAP.allocate(hdrSize);
           readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
           headerBuf.rewind();
+          if (isScanMetricsEnabled) {
+            ThreadLocalServerSideScanMetrics.addBytesReadFromFs(hdrSize);
+          }
         }
        onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
       }
@@ -1801,6 +1811,12 @@ public class HFileBlock implements Cacheable {
         boolean readNextHeader = readAtOffset(is, onDiskBlock,
          onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
        onDiskBlock.rewind(); // in case of moving position when copying a cached header
+        if (isScanMetricsEnabled) {
+          long bytesRead =
+            (onDiskSizeWithHeader - preReadHeaderSize) + (readNextHeader ? hdrSize : 0);
+          ThreadLocalServerSideScanMetrics.addBytesReadFromFs(bytesRead);
+          ThreadLocalServerSideScanMetrics.addBlockReadOpsCount(1);
+        }
 
        // the call to validateChecksum for this block excludes the next block header over-read, so
         // no reason to delay extracting this value.
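
As a reading aid for the accounting added above (the numbers below are hypothetical, not taken from
this patch): BYTES_READ_FROM_FS is charged with the on-disk block bytes actually fetched by this
call, plus the next block's header when it was over-read, and each such data read counts as one
BLOCK_READ_OPS_COUNT increment.

    // Hypothetical values mirroring the formula above (HBase checksums enabled => 33-byte header).
    int hdrSize = 33;
    long onDiskSizeWithHeader = 4096; // block payload plus its own header
    int preReadHeaderSize = 0;        // header was not already available, so nothing to subtract
    boolean readNextHeader = true;    // the following block's header came back with the same read
    long bytesRead = (onDiskSizeWithHeader - preReadHeaderSize) + (readNextHeader ? hdrSize : 0);
    // bytesRead == 4129; one block read op is recorded for this disk access
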
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 8989ef2b281..2858800e932 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -561,7 +562,8 @@ public class HFileBlockIndex {
   * array of offsets to the entries within the block. This allows us to do binary search for the
   * entry corresponding to the given key without having to deserialize the block.
    */
-  static abstract class BlockIndexReader implements HeapSize {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public static abstract class BlockIndexReader implements HeapSize {
 
     protected long[] blockOffsets;
     protected int[] blockDataSizes;
@@ -808,7 +810,8 @@ public class HFileBlockIndex {
     * @return the index position where the given key was found, otherwise return -1 in the case the
      *         given key is before the first key.
      */
-    static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key, CellComparator comparator) {
+    public static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
+      CellComparator comparator) {
       int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
 
       if (entryIndex != -1) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index db2383db399..e1e9eaf8a53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -1105,71 +1107,91 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
   * Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType}
   * and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
    */
-  private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
     boolean updateCacheMetrics, BlockType expectedBlockType,
     DataBlockEncoding expectedDataBlockEncoding) throws IOException {
     // Check cache for block. If found return.
     BlockCache cache = cacheConf.getBlockCache().orElse(null);
+    long cachedBlockBytesRead = 0;
     if (cache != null) {
-      HFileBlock cachedBlock =
-        (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics);
-      if (cachedBlock != null) {
-        if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
-          HFileBlock compressedBlock = cachedBlock;
-          cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
-          // In case of compressed block after unpacking we can release the compressed block
-          if (compressedBlock != cachedBlock) {
-            compressedBlock.release();
+      HFileBlock cachedBlock = null;
+      boolean isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
+      try {
+        cachedBlock =
+          (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock, updateCacheMetrics);
+        if (cachedBlock != null) {
+          if (cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
+            HFileBlock compressedBlock = cachedBlock;
+            cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
+            // In case of compressed block after unpacking we can release the compressed block
+            if (compressedBlock != cachedBlock) {
+              compressedBlock.release();
+            }
+          }
+          try {
+            validateBlockType(cachedBlock, expectedBlockType);
+          } catch (IOException e) {
+            returnAndEvictBlock(cache, cacheKey, cachedBlock);
+            cachedBlock = null;
+            throw e;
           }
-        }
-        try {
-          validateBlockType(cachedBlock, expectedBlockType);
-        } catch (IOException e) {
-          returnAndEvictBlock(cache, cacheKey, cachedBlock);
-          throw e;
-        }
 
-        if (expectedDataBlockEncoding == null) {
-          return cachedBlock;
-        }
-        DataBlockEncoding actualDataBlockEncoding = cachedBlock.getDataBlockEncoding();
-        // Block types other than data blocks always have
-        // DataBlockEncoding.NONE. To avoid false negative cache misses, only
-        // perform this check if cached block is a data block.
-        if (
-          cachedBlock.getBlockType().isData()
-            && !actualDataBlockEncoding.equals(expectedDataBlockEncoding)
-        ) {
-          // This mismatch may happen if a Scanner, which is used for say a
-          // compaction, tries to read an encoded block from the block cache.
-          // The reverse might happen when an EncodedScanner tries to read
-          // un-encoded blocks which were cached earlier.
-          //
-          // Because returning a data block with an implicit BlockType mismatch
-          // will cause the requesting scanner to throw a disk read should be
-          // forced here. This will potentially cause a significant number of
-          // cache misses, so update so we should keep track of this as it might
-          // justify the work on a CompoundScanner.
+          if (expectedDataBlockEncoding == null) {
+            return cachedBlock;
+          }
+          DataBlockEncoding actualDataBlockEncoding = cachedBlock.getDataBlockEncoding();
+          // Block types other than data blocks always have
+          // DataBlockEncoding.NONE. To avoid false negative cache misses, only
+          // perform this check if cached block is a data block.
           if (
-            !expectedDataBlockEncoding.equals(DataBlockEncoding.NONE)
-              && !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)
+            cachedBlock.getBlockType().isData()
+              && !actualDataBlockEncoding.equals(expectedDataBlockEncoding)
           ) {
-            // If the block is encoded but the encoding does not match the
-            // expected encoding it is likely the encoding was changed but the
-            // block was not yet evicted. Evictions on file close happen async
-            // so blocks with the old encoding still linger in cache for some
-            // period of time. This event should be rare as it only happens on
-            // schema definition change.
-            LOG.info(
-              "Evicting cached block with key {} because data block encoding mismatch; "
-                + "expected {}, actual {}, path={}",
-              cacheKey, actualDataBlockEncoding, expectedDataBlockEncoding, path);
-            // This is an error scenario. so here we need to release the block.
-            returnAndEvictBlock(cache, cacheKey, cachedBlock);
+            // This mismatch may happen if a Scanner, which is used for say a
+            // compaction, tries to read an encoded block from the block cache.
+            // The reverse might happen when an EncodedScanner tries to read
+            // un-encoded blocks which were cached earlier.
+            //
+            // Because returning a data block with an implicit BlockType mismatch
+            // will cause the requesting scanner to throw a disk read should be
+            // forced here. This will potentially cause a significant number of
+            // cache misses, so update so we should keep track of this as it might
+            // justify the work on a CompoundScanner.
+            if (
+              !expectedDataBlockEncoding.equals(DataBlockEncoding.NONE)
+                && !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)
+            ) {
+              // If the block is encoded but the encoding does not match the
+              // expected encoding it is likely the encoding was changed but the
+              // block was not yet evicted. Evictions on file close happen async
+              // so blocks with the old encoding still linger in cache for some
+              // period of time. This event should be rare as it only happens on
+              // schema definition change.
+              LOG.info(
+                "Evicting cached block with key {} because data block encoding mismatch; "
+                  + "expected {}, actual {}, path={}",
+                cacheKey, actualDataBlockEncoding, expectedDataBlockEncoding, path);
+              // This is an error scenario. so here we need to release the block.
+              returnAndEvictBlock(cache, cacheKey, cachedBlock);
+            }
+            cachedBlock = null;
+            return null;
           }
-          return null;
+          return cachedBlock;
+        }
+      } finally {
+        // Count bytes read as cached block is being returned
+        if (isScanMetricsEnabled && cachedBlock != null) {
+          cachedBlockBytesRead = cachedBlock.getOnDiskSizeWithHeader();
+          // Account for the header size of the next block if it exists
+          if (cachedBlock.getNextBlockOnDiskSize() > 0) {
+            cachedBlockBytesRead += cachedBlock.headerSize();
+          }
+        }
+        if (cachedBlockBytesRead > 0) {
+          ThreadLocalServerSideScanMetrics.addBytesReadFromBlockCache(cachedBlockBytesRead);
         }
-        return cachedBlock;
       }
     }
     return null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index b61084f7883..efc533cb149 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -1151,8 +1152,8 @@ public class LruBlockCache implements FirstLevelBlockCache {
   }
 
   // Simple calculators of sizes given factors and maxSize
-
-  long acceptableSize() {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public long acceptableSize() {
     return (long) Math.floor(this.maxSize * this.acceptableFactor);
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java
index 4162fca6afe..fd4bcc12c50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
@@ -127,7 +128,8 @@ public class NoOpIndexBlockEncoder implements HFileIndexBlockEncoder {
     return getClass().getSimpleName();
   }
 
-  protected static class NoOpEncodedSeeker implements EncodedSeeker {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public static class NoOpEncodedSeeker implements EncodedSeeker {
 
     protected long[] blockOffsets;
     protected int[] blockDataSizes;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadLocalServerSideScanMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadLocalServerSideScanMetrics.java
new file mode 100644
index 00000000000..8c9ec24e866
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadLocalServerSideScanMetrics.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Thread-local storage for server-side scan metrics that captures performance data separately for
+ * each scan thread. This class works in conjunction with {@link ServerSideScanMetrics} to provide
+ * comprehensive scan performance monitoring.
+ * <h3>Purpose and Design</h3> {@link ServerSideScanMetrics} captures scan metrics on the server
+ * side and passes them back to the client in protocol buffer responses. However, the
+ * {@link ServerSideScanMetrics} instance is not readily available at deeper layers in HBase where
+ * HFiles are read and individual HFile blocks are accessed. To avoid the complexity of passing
+ * {@link ServerSideScanMetrics} instances through method calls across multiple layers, this class
+ * provides thread-local storage for metrics collection.
+ * <h3>Thread Safety and HBase Architecture</h3> This class leverages a critical aspect of HBase
+ * server design: on the server side, the thread that opens a {@link RegionScanner} and calls
+ * {@link RegionScanner#nextRaw(java.util.List, ScannerContext)} is the same thread that reads HFile
+ * blocks. This design allows thread-local storage to effectively capture metrics without
+ * cross-thread synchronization.
+ * <h3>Special Handling for Parallel Operations</h3> The only deviation from the single-thread model
+ * occurs when {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} is used for
+ * parallel store file seeking. In this case, special handling ensures that metrics are captured
+ * correctly across multiple threads. The
+ * {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} captures metrics from
+ * worker threads and aggregates them back to the main scan thread. Please refer to the javadoc of
+ * {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} for detailed information
+ * about this parallel processing mechanism.
+ * <h3>Usage Pattern</h3>
+ * <ol>
+ * <li>Enable metrics collection: {@link #setScanMetricsEnabled(boolean)}</li>
+ * <li>Reset counters at scan start: {@link #reset()}</li>
+ * <li>Increment counters during I/O operations using the various {@code add*} methods</li>
+ * <li>Populate the main metrics object:
+ * {@link #populateServerSideScanMetrics(ServerSideScanMetrics)}</li>
+ * </ol>
+ * <h3>Thread Safety</h3> This class is thread-safe. Each thread maintains its own set of counters
+ * through {@link ThreadLocal} storage, ensuring that metrics from different scan operations do not
+ * interfere with each other.
+ * @see ServerSideScanMetrics
+ * @see RegionScanner
+ * @see org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler
+ */
+@InterfaceAudience.Private
+public final class ThreadLocalServerSideScanMetrics {
+  private ThreadLocalServerSideScanMetrics() {
+  }
+
+  private static final ThreadLocal<Boolean> IS_SCAN_METRICS_ENABLED =
+    ThreadLocal.withInitial(() -> false);
+
+  private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_FS =
+    ThreadLocal.withInitial(() -> new AtomicLong(0));
+
+  private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_BLOCK_CACHE =
+    ThreadLocal.withInitial(() -> new AtomicLong(0));
+
+  private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_MEMSTORE =
+    ThreadLocal.withInitial(() -> new AtomicLong(0));
+
+  private static final ThreadLocal<AtomicLong> BLOCK_READ_OPS_COUNT =
+    ThreadLocal.withInitial(() -> new AtomicLong(0));
+
+  public static void setScanMetricsEnabled(boolean enable) {
+    IS_SCAN_METRICS_ENABLED.set(enable);
+  }
+
+  public static long addBytesReadFromFs(long bytes) {
+    return BYTES_READ_FROM_FS.get().addAndGet(bytes);
+  }
+
+  public static long addBytesReadFromBlockCache(long bytes) {
+    return BYTES_READ_FROM_BLOCK_CACHE.get().addAndGet(bytes);
+  }
+
+  public static long addBytesReadFromMemstore(long bytes) {
+    return BYTES_READ_FROM_MEMSTORE.get().addAndGet(bytes);
+  }
+
+  public static long addBlockReadOpsCount(long count) {
+    return BLOCK_READ_OPS_COUNT.get().addAndGet(count);
+  }
+
+  public static boolean isScanMetricsEnabled() {
+    return IS_SCAN_METRICS_ENABLED.get();
+  }
+
+  public static AtomicLong getBytesReadFromFsCounter() {
+    return BYTES_READ_FROM_FS.get();
+  }
+
+  public static AtomicLong getBytesReadFromBlockCacheCounter() {
+    return BYTES_READ_FROM_BLOCK_CACHE.get();
+  }
+
+  public static AtomicLong getBytesReadFromMemstoreCounter() {
+    return BYTES_READ_FROM_MEMSTORE.get();
+  }
+
+  public static AtomicLong getBlockReadOpsCountCounter() {
+    return BLOCK_READ_OPS_COUNT.get();
+  }
+
+  public static long getBytesReadFromFsAndReset() {
+    return getBytesReadFromFsCounter().getAndSet(0);
+  }
+
+  public static long getBytesReadFromBlockCacheAndReset() {
+    return getBytesReadFromBlockCacheCounter().getAndSet(0);
+  }
+
+  public static long getBytesReadFromMemstoreAndReset() {
+    return getBytesReadFromMemstoreCounter().getAndSet(0);
+  }
+
+  public static long getBlockReadOpsCountAndReset() {
+    return getBlockReadOpsCountCounter().getAndSet(0);
+  }
+
+  public static void reset() {
+    getBytesReadFromFsAndReset();
+    getBytesReadFromBlockCacheAndReset();
+    getBytesReadFromMemstoreAndReset();
+    getBlockReadOpsCountAndReset();
+  }
+
+  public static void populateServerSideScanMetrics(ServerSideScanMetrics metrics) {
+    if (metrics == null) {
+      return;
+    }
+    metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_FS_METRIC_NAME,
+      getBytesReadFromFsCounter().get());
+    metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME,
+      getBytesReadFromBlockCacheCounter().get());
+    metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_MEMSTORE_METRIC_NAME,
+      getBytesReadFromMemstoreCounter().get());
+    metrics.addToCounter(ServerSideScanMetrics.BLOCK_READ_OPS_COUNT_METRIC_NAME,
+      getBlockReadOpsCountCounter().get());
+  }
+}
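
As a reading aid, a minimal sketch of the usage pattern the class javadoc above describes
(illustrative only; it uses just the methods added in this file, with made-up byte counts):

    // Illustrative server-side flow; RegionScannerImpl below wires this up for real scans.
    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true); // 1. enable for this scan thread
    ThreadLocalServerSideScanMetrics.reset();                     // 2. clear counters at scan start

    // 3. deeper layers increment counters while blocks and cells are read, e.g.:
    ThreadLocalServerSideScanMetrics.addBytesReadFromFs(4129);
    ThreadLocalServerSideScanMetrics.addBlockReadOpsCount(1);
    ThreadLocalServerSideScanMetrics.addBytesReadFromBlockCache(4129);
    ThreadLocalServerSideScanMetrics.addBytesReadFromMemstore(128);

    // 4. at the end of the call, copy the totals into the metrics object returned to the client
    ServerSideScanMetrics scanMetrics = new ServerSideScanMetrics();
    ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);
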
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index 67cc9b7eba4..153900544d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.ipc.RpcCallback;
 import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
@@ -94,6 +95,8 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
 
   private RegionServerServices rsServices;
 
+  private ServerSideScanMetrics scannerInitMetrics = null;
+
   @Override
   public RegionInfo getRegionInfo() {
     return region.getRegionInfo();
@@ -144,7 +147,16 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
     } finally {
       region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
     }
+    boolean isScanMetricsEnabled = scan.isScanMetricsEnabled();
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
+    if (isScanMetricsEnabled) {
+      this.scannerInitMetrics = new ServerSideScanMetrics();
+      ThreadLocalServerSideScanMetrics.reset();
+    }
     initializeScanners(scan, additionalScanners);
+    if (isScanMetricsEnabled) {
+      ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scannerInitMetrics);
+    }
   }
 
   public ScannerContext getContext() {
@@ -277,6 +289,16 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
       throw new UnknownScannerException("Scanner was closed");
     }
     boolean moreValues = false;
+    boolean isScanMetricsEnabled = scannerContext.isTrackingMetrics();
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
+    if (isScanMetricsEnabled) {
+      ThreadLocalServerSideScanMetrics.reset();
+      ServerSideScanMetrics scanMetrics = scannerContext.getMetrics();
+      if (scannerInitMetrics != null) {
+        scannerInitMetrics.getMetricsMap().forEach(scanMetrics::addToCounter);
+        scannerInitMetrics = null;
+      }
+    }
     if (outResults.isEmpty()) {
       // Usually outResults is empty. This is true when next is called
       // to handle scan or get operation.
@@ -286,7 +308,10 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
       moreValues = nextInternal(tmpList, scannerContext);
       outResults.addAll(tmpList);
     }
-
+    if (isScanMetricsEnabled) {
+      ServerSideScanMetrics scanMetrics = scannerContext.getMetrics();
+      ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);
+    }
     region.addReadRequestsCount(1);
     if (region.getMetrics() != null) {
       region.getMetrics().updateReadRequestCount();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 1d28c55570e..27df80fa8ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -54,6 +55,7 @@ public class SegmentScanner implements KeyValueScanner {
 
   // flag to indicate if this scanner is closed
   protected boolean closed = false;
+  private boolean isScanMetricsEnabled = false;
 
   /**
    * Scanners are ordered from 0 (oldest) to newest in increasing order.
@@ -66,6 +68,8 @@ public class SegmentScanner implements KeyValueScanner {
     iter = segment.iterator();
     // the initialization of the current is required for working with heap of SegmentScanners
     updateCurrent();
+    // Enable scan metrics for tracking bytes read after initialization of current
+    this.isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
     if (current == null) {
       // nothing to fetch from this scanner
       close();
@@ -335,10 +339,15 @@ public class SegmentScanner implements KeyValueScanner {
    */
   protected void updateCurrent() {
     Cell next = null;
+    long totalBytesRead = 0;
 
     try {
       while (iter.hasNext()) {
         next = iter.next();
+        if (isScanMetricsEnabled) {
+          // Batch collect bytes to reduce method call overhead
+          totalBytesRead += Segment.getCellLength(next);
+        }
         if (next.getSequenceId() <= this.readPoint) {
           current = next;
           return;// skip irrelevant versions
@@ -352,6 +361,10 @@ public class SegmentScanner implements KeyValueScanner {
 
       current = null; // nothing found
     } finally {
+      // Add accumulated bytes before returning
+      if (totalBytesRead > 0) {
+        ThreadLocalServerSideScanMetrics.addBytesReadFromMemstore(totalBytesRead);
+      }
       if (next != null) {
        // in all cases, remember the last KV we iterated to, needed for reseek()
         last = next;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index e241bf0a5d3..fac344c3387 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -635,7 +635,8 @@ public class StoreFileReader {
     return this.bulkLoadResult;
   }
 
-  BloomFilter getGeneralBloomFilter() {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public BloomFilter getGeneralBloomFilter() {
     return generalBloomFilter;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index 61d5b91b35b..a2a641df6d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -284,7 +285,8 @@ public class StoreFileWriter implements CellSink, ShipperListener {
    * For unit testing only.
    * @return the Bloom filter used by this writer.
    */
-  BloomFilterWriter getGeneralBloomWriter() {
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  public BloomFilterWriter getGeneralBloomWriter() {
     return liveFileWriter.generalBloomFilterWriter;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index cbc617d2a0b..506af8c404b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -24,12 +24,14 @@ import java.util.List;
 import java.util.NavigableSet;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.IntConsumer;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
@@ -165,6 +167,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected final long readPt;
   private boolean topChanged = false;
 
+  // These are used to verify the state of the scanner during testing.
+  private static AtomicBoolean hasUpdatedReaders;
+  private static AtomicBoolean hasSwitchedToStreamRead;
+
   /** An internal constructor. */
   private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
     boolean cacheBlocks, ScanType scanType) {
@@ -1016,6 +1022,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       if (updateReaders) {
         closeLock.unlock();
       }
+      if (hasUpdatedReaders != null) {
+        hasUpdatedReaders.set(true);
+      }
     }
     // Let the next() call handle re-creating and seeking
   }
@@ -1166,6 +1175,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     this.heap = newHeap;
     resetQueryMatcher(lastTop);
     scannersToClose.forEach(KeyValueScanner::close);
+    if (hasSwitchedToStreamRead != null) {
+      hasSwitchedToStreamRead.set(true);
+    }
   }
 
   protected final boolean checkFlushed() {
@@ -1273,4 +1285,30 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       trySwitchToStreamRead();
     }
   }
+
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  static final void instrument() {
+    hasUpdatedReaders = new AtomicBoolean(false);
+    hasSwitchedToStreamRead = new AtomicBoolean(false);
+  }
+
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  static final boolean hasUpdatedReaders() {
+    return hasUpdatedReaders.get();
+  }
+
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  static final boolean hasSwitchedToStreamRead() {
+    return hasSwitchedToStreamRead.get();
+  }
+
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  static final void resetHasUpdatedReaders() {
+    hasUpdatedReaders.set(false);
+  }
+
+  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+  static final void resetHasSwitchedToStreamRead() {
+    hasSwitchedToStreamRead.set(false);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java
index 41fb3e7bf12..9a9af221b49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver.handler;
 
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -39,6 +41,17 @@ public class ParallelSeekHandler extends EventHandler {
   private CountDownLatch latch;
   private Throwable err = null;
 
+  // Flag to enable/disable scan metrics collection and thread-local counters for capturing scan
+  // performance during parallel store file seeking.
+  // These aggregate metrics from worker threads back to the main scan thread.
+  private final boolean isScanMetricsEnabled;
+  // Thread-local counter for bytes read from FS.
+  private final AtomicLong bytesReadFromFs;
+  // Thread-local counter for bytes read from BlockCache.
+  private final AtomicLong bytesReadFromBlockCache;
+  // Thread-local counter for block read operations count.
+  private final AtomicLong blockReadOpsCount;
+
   public ParallelSeekHandler(KeyValueScanner scanner, Cell keyValue, long readPoint,
     CountDownLatch latch) {
     super(null, EventType.RS_PARALLEL_SEEK);
@@ -46,12 +59,35 @@ public class ParallelSeekHandler extends EventHandler {
     this.keyValue = keyValue;
     this.readPoint = readPoint;
     this.latch = latch;
+    this.isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
+    this.bytesReadFromFs = ThreadLocalServerSideScanMetrics.getBytesReadFromFsCounter();
+    this.bytesReadFromBlockCache =
+      ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheCounter();
+    this.blockReadOpsCount = ThreadLocalServerSideScanMetrics.getBlockReadOpsCountCounter();
   }
 
   @Override
   public void process() {
     try {
+      ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
+      if (isScanMetricsEnabled) {
+        ThreadLocalServerSideScanMetrics.reset();
+      }
       scanner.seek(keyValue);
+      if (isScanMetricsEnabled) {
+        long metricValue = ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+        if (metricValue > 0) {
+          bytesReadFromFs.addAndGet(metricValue);
+        }
+        metricValue = ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset();
+        if (metricValue > 0) {
+          bytesReadFromBlockCache.addAndGet(metricValue);
+        }
+        metricValue = ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
+        if (metricValue > 0) {
+          blockReadOpsCount.addAndGet(metricValue);
+        }
+      }
     } catch (IOException e) {
       LOG.error("", e);
       setErr(e);
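
To illustrate the hand-off the comments above describe (a simplified, hypothetical sketch, not the
committed implementation): the constructor runs on the scan thread and captures that thread's
counters, while process() runs on a worker thread, resets the worker's own thread-local counters,
performs the seek, and folds the worker's deltas back into the captured scan-thread counters.

    // Captured on the scan thread (constructor time); the worker adds its deltas into it later.
    final AtomicLong scanThreadFsBytes = ThreadLocalServerSideScanMetrics.getBytesReadFromFsCounter();
    Runnable workerSeek = () -> {
      ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
      ThreadLocalServerSideScanMetrics.reset();      // worker starts from zero
      // ... scanner.seek(keyValue) would happen here ...
      long delta = ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
      if (delta > 0) {
        scanThreadFsBytes.addAndGet(delta);          // fold back into the scan thread's counter
      }
    };
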
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java
new file mode 100644
index 00000000000..c85a162ad96
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileReader;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.BloomFilter;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.BloomFilterUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestBytesReadFromFs {
+  private static final int NUM_KEYS = 100000;
+  private static final int BLOOM_BLOCK_SIZE = 512;
+  private static final int INDEX_CHUNK_SIZE = 512;
+  private static final int DATA_BLOCK_SIZE = 4096;
+  private static final int ROW_PREFIX_LENGTH_IN_BLOOM_FILTER = 42;
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBytesReadFromFs.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestBytesReadFromFs.class);
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final Random RNG = new Random(9713312); // Just a fixed seed.
+
+  private Configuration conf;
+  private FileSystem fs;
+  private List<KeyValue> keyValues = new ArrayList<>();
+  private List<byte[]> keyList = new ArrayList<>();
+  private Path path;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = TEST_UTIL.getConfiguration();
+    conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, ROW_PREFIX_LENGTH_IN_BLOOM_FILTER);
+    fs = FileSystem.get(conf);
+    String hfileName = UUID.randomUUID().toString().replaceAll("-", "");
+    path = new Path(TEST_UTIL.getDataTestDir(), hfileName);
+    conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_CHUNK_SIZE);
+  }
+
+  @Test
+  public void testBytesReadFromFsWithScanMetricsDisabled() throws IOException {
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
+    writeData(path);
+    KeyValue keyValue = keyValues.get(0);
+    readDataAndIndexBlocks(path, keyValue, false);
+  }
+
+  @Test
+  public void testBytesReadFromFsToReadDataUsingIndexBlocks() throws IOException {
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+    writeData(path);
+    KeyValue keyValue = keyValues.get(0);
+    readDataAndIndexBlocks(path, keyValue, true);
+  }
+
+  @Test
+  public void testBytesReadFromFsToReadLoadOnOpenDataSection() throws IOException {
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+    writeData(path);
+    readLoadOnOpenDataSection(path, false);
+  }
+
+  @Test
+  public void testBytesReadFromFsToReadBloomFilterIndexesAndBloomBlocks() throws IOException {
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+    BloomType[] bloomTypes = { BloomType.ROW, BloomType.ROWCOL, BloomType.ROWPREFIX_FIXED_LENGTH };
+    for (BloomType bloomType : bloomTypes) {
+      LOG.info("Testing bloom type: {}", bloomType);
+      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
+      keyList.clear();
+      keyValues.clear();
+      writeBloomFilters(path, bloomType, BLOOM_BLOCK_SIZE);
+      if (bloomType == BloomType.ROWCOL) {
+        KeyValue keyValue = keyValues.get(0);
+        readBloomFilters(path, bloomType, null, keyValue);
+      } else {
+        Assert.assertEquals(ROW_PREFIX_LENGTH_IN_BLOOM_FILTER, keyList.get(0).length);
+        byte[] key = keyList.get(0);
+        readBloomFilters(path, bloomType, key, null);
+      }
+    }
+  }
+
+  private void writeData(Path path) throws IOException {
+    HFileContext context = new HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
+      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
+      .withCompression(Compression.Algorithm.NONE).build();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    HFile.Writer writer = new HFile.WriterFactory(conf, cacheConfig).withPath(fs, path)
+      .withFileContext(context).create();
+
+    byte[] cf = Bytes.toBytes("cf");
+    byte[] cq = Bytes.toBytes("cq");
+
+    for (int i = 0; i < NUM_KEYS; i++) {
+      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 10);
+      // A random-length random value.
+      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
+      KeyValue keyValue =
+        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), valueBytes);
+      writer.append(keyValue);
+      keyValues.add(keyValue);
+    }
+
+    writer.close();
+  }
+
+  private void readDataAndIndexBlocks(Path path, KeyValue keyValue, boolean 
isScanMetricsEnabled)
+    throws IOException {
+    long fileSize = fs.getFileStatus(path).getLen();
+
+    ReaderContext readerContext =
+      new ReaderContextBuilder().withInputStreamWrapper(new 
FSDataInputStreamWrapper(fs, path))
+        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();
+
+    // Read HFile trailer and create HFileContext
+    HFileInfo hfile = new HFileInfo(readerContext, conf);
+    FixedFileTrailer trailer = hfile.getTrailer();
+
+    // Read HFile info and load-on-open data section (we will read root again 
explicitly later)
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, 
cacheConfig, conf);
+    hfile.initMetaAndIndex(reader);
+    HFileContext meta = hfile.getHFileContext();
+
+    // Get access to the block reader
+    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();
+
+    // Create iterator for reading load-on-open data section
+    HFileBlock.BlockIterator blockIter = 
blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
+      fileSize - trailer.getTrailerSize());
+
+    // Indexes use NoOpEncodedSeeker
+    MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
+    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
+
+    int bytesRead = 0;
+    int blockLevelsRead = 0;
+
+    // Read the root index block
+    HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
+    bytesRead += block.getOnDiskSizeWithHeader();
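+    // The FS reader prefetches the next block's header together with this block, so those
+    // header bytes are also part of the expected bytes-read metric.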
+    if (block.getNextBlockOnDiskSize() > 0) {
+      bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
+    }
+    blockLevelsRead++;
+
+    // Comparator class name is stored in the trailer in version 3.
+    CellComparator comparator = trailer.createComparator();
+    // Initialize the seeker
+    seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
+      trailer.getNumDataIndexLevels());
+
+    int rootLevIndex = seeker.rootBlockContainingKey(keyValue);
+    long currentOffset = seeker.getBlockOffset(rootLevIndex);
+    int currentDataSize = seeker.getBlockDataSize(rootLevIndex);
+
+    HFileBlock prevBlock = null;
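+    // Walk down the index: read one index block per level until the data block containing the
+    // key is reached.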
+    do {
+      prevBlock = block;
+      block = blockReader.readBlockData(currentOffset, currentDataSize, true, 
true, true);
+      HFileBlock unpacked = block.unpack(meta, blockReader);
+      if (unpacked != block) {
+        block.release();
+        block = unpacked;
+      }
+      bytesRead += block.getOnDiskSizeWithHeader();
+      if (block.getNextBlockOnDiskSize() > 0) {
+        bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
+      }
+      if (!block.getBlockType().isData()) {
+        ByteBuff buffer = block.getBufferWithoutHeader();
+        // Place the buffer at the correct position
+        HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer, 
keyValue, comparator);
+        currentOffset = buffer.getLong();
+        currentDataSize = buffer.getInt();
+      }
+      prevBlock.release();
+      blockLevelsRead++;
+    } while (!block.getBlockType().isData());
+    block.release();
+
+    reader.close();
+
+    Assert.assertEquals(isScanMetricsEnabled,
+      ThreadLocalServerSideScanMetrics.isScanMetricsEnabled());
+    bytesRead = isScanMetricsEnabled ? bytesRead : 0;
+    Assert.assertEquals(bytesRead, 
ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+    Assert.assertEquals(blockLevelsRead, trailer.getNumDataIndexLevels() + 1);
+    Assert.assertEquals(0, 
ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
+    // At every index level we read one index block, and finally we read the data block
+    long blockReadOpsCount = isScanMetricsEnabled ? blockLevelsRead : 0;
+    Assert.assertEquals(blockReadOpsCount,
+      ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
+  }
+
+  private void readLoadOnOpenDataSection(Path path, boolean hasBloomFilters) 
throws IOException {
+    long fileSize = fs.getFileStatus(path).getLen();
+
+    ReaderContext readerContext =
+      new ReaderContextBuilder().withInputStreamWrapper(new 
FSDataInputStreamWrapper(fs, path))
+        .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();
+
+    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
+    // Read HFile trailer
+    HFileInfo hfile = new HFileInfo(readerContext, conf);
+    FixedFileTrailer trailer = hfile.getTrailer();
+    Assert.assertEquals(trailer.getTrailerSize(),
+      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+    Assert.assertEquals(1, 
ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
+
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    HFile.Reader reader = new HFilePreadReader(readerContext, hfile, 
cacheConfig, conf);
+    HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();
+
+    // Create iterator for reading root index block
+    HFileBlock.BlockIterator blockIter = 
blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
+      fileSize - trailer.getTrailerSize());
+    boolean readNextHeader = false;
+
+    // Read the root index block
+    readNextHeader = readEachBlockInLoadOnOpenDataSection(
+      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);
+
+    // Read meta index block
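+    // (the meta block index root is also written with the ROOT_INDEX block type)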
+    readNextHeader = readEachBlockInLoadOnOpenDataSection(
+      blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);
+
+    // Read File info block
+    readNextHeader = readEachBlockInLoadOnOpenDataSection(
+      blockIter.nextBlockWithBlockType(BlockType.FILE_INFO), readNextHeader);
+
+    // Read bloom filter indexes
+    boolean bloomFilterIndexesRead = false;
+    HFileBlock block;
+    while ((block = blockIter.nextBlock()) != null) {
+      bloomFilterIndexesRead = true;
+      readNextHeader = readEachBlockInLoadOnOpenDataSection(block, 
readNextHeader);
+    }
+
+    reader.close();
+
+    Assert.assertEquals(hasBloomFilters, bloomFilterIndexesRead);
+    Assert.assertEquals(0, 
ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
+  }
+
+  private boolean readEachBlockInLoadOnOpenDataSection(HFileBlock block, 
boolean readNextHeader)
+    throws IOException {
+    long bytesRead = block.getOnDiskSizeWithHeader();
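+    // If the previous read already prefetched this block's header, those header bytes were
+    // counted with the previous block, so subtract them here to avoid double counting.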
+    if (readNextHeader) {
+      bytesRead -= HFileBlock.headerSize(true);
+      readNextHeader = false;
+    }
+    if (block.getNextBlockOnDiskSize() > 0) {
+      bytesRead += HFileBlock.headerSize(true);
+      readNextHeader = true;
+    }
+    block.release();
+    Assert.assertEquals(bytesRead, 
ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+    Assert.assertEquals(1, 
ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
+    return readNextHeader;
+  }
+
+  private void readBloomFilters(Path path, BloomType bt, byte[] key, KeyValue 
keyValue)
+    throws IOException {
+    Assert.assertTrue(keyValue == null || key == null);
+
+    // Assert that the bloom filter index was read and its size is accounted for in
+    // the bytes read from FS.
+    readLoadOnOpenDataSection(path, true);
+
+    CacheConfig cacheConf = new CacheConfig(conf);
+    StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, path, true);
+    HStoreFile sf = new HStoreFile(storeFileInfo, bt, cacheConf);
+
+    // Read HFile trailer and load-on-open data section
+    sf.initReader();
+
+    // Reset bytes read from fs to 0
+    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+    // Reset read ops count to 0
+    ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
+
+    StoreFileReader reader = sf.getReader();
+    BloomFilter bloomFilter = reader.getGeneralBloomFilter();
+    Assert.assertTrue(bloomFilter instanceof CompoundBloomFilter);
+    CompoundBloomFilter cbf = (CompoundBloomFilter) bloomFilter;
+
+    // Get the bloom filter index reader
+    HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
+    int block;
+
+    // Search for the key in the bloom filter index
+    if (keyValue != null) {
+      block = index.rootBlockContainingKey(keyValue);
+    } else {
+      byte[] row = key;
+      block = index.rootBlockContainingKey(row, 0, row.length);
+    }
+
+    // Read the bloom block from FS
+    HFileBlock bloomBlock = cbf.getBloomBlock(block);
+    long bytesRead = bloomBlock.getOnDiskSizeWithHeader();
+    if (bloomBlock.getNextBlockOnDiskSize() > 0) {
+      bytesRead += HFileBlock.headerSize(true);
+    }
+    // Assert that the block read is a bloom block
+    Assert.assertEquals(bloomBlock.getBlockType(), BlockType.BLOOM_CHUNK);
+    bloomBlock.release();
+
+    // Close the reader
+    reader.close(true);
+
+    Assert.assertEquals(bytesRead, 
ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+    Assert.assertEquals(1, 
ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
+  }
+
+  private void writeBloomFilters(Path path, BloomType bt, int 
bloomBlockByteSize)
+    throws IOException {
+    conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE, 
bloomBlockByteSize);
+    CacheConfig cacheConf = new CacheConfig(conf);
+    HFileContext meta = new 
HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
+      .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
+      .withCompression(Compression.Algorithm.NONE).build();
+    StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConf, 
fs).withFileContext(meta)
+      .withBloomType(bt).withFilePath(path).build();
+    Assert.assertTrue(w.hasGeneralBloom());
+    Assert.assertTrue(w.getGeneralBloomWriter() instanceof 
CompoundBloomFilterWriter);
+    CompoundBloomFilterWriter cbbf = (CompoundBloomFilterWriter) 
w.getGeneralBloomWriter();
+    byte[] cf = Bytes.toBytes("cf");
+    byte[] cq = Bytes.toBytes("cq");
+    for (int i = 0; i < NUM_KEYS; i++) {
+      byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i, 
10);
+      // A fixed-length random value.
+      byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
+      KeyValue keyValue =
+        new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(), 
valueBytes);
+      w.append(keyValue);
+      keyList.add(keyBytes);
+      keyValues.add(keyValue);
+    }
+    Assert.assertEquals(keyList.size(), cbbf.getKeyCount());
+    w.close();
+  }
+
+  private static class MyNoOpEncodedSeeker extends 
NoOpIndexBlockEncoder.NoOpEncodedSeeker {
+    public long getBlockOffset(int i) {
+      return blockOffsets[i];
+    }
+
+    public int getBlockDataSize(int i) {
+      return blockDataSizes[i];
+    }
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index 189af113b33..66e2db5e67c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -201,6 +202,90 @@ public class TestHFile {
     lru.shutdown();
   }
 
+  private void assertBytesReadFromCache(boolean isScanMetricsEnabled) throws 
Exception {
+    assertBytesReadFromCache(isScanMetricsEnabled, DataBlockEncoding.NONE);
+  }
+
+  private void assertBytesReadFromCache(boolean isScanMetricsEnabled, 
DataBlockEncoding encoding)
+    throws Exception {
+    // Write a store file
+    Path storeFilePath = writeStoreFile();
+
+    // Initialize the block cache and HFile reader
+    BlockCache lru = BlockCacheFactory.createBlockCache(conf);
+    Assert.assertTrue(lru instanceof LruBlockCache);
+    CacheConfig cacheConfig = new CacheConfig(conf, null, lru, 
ByteBuffAllocator.HEAP);
+    HFileReaderImpl reader =
+      (HFileReaderImpl) HFile.createReader(fs, storeFilePath, cacheConfig, 
true, conf);
+
+    // Read the first block in HFile from the block cache.
+    final int offset = 0;
+    BlockCacheKey cacheKey = new BlockCacheKey(storeFilePath.getName(), 
offset);
+    HFileBlock block = (HFileBlock) lru.getBlock(cacheKey, false, false, true);
+    Assert.assertNull(block);
+
+    // Assert that the first block has not been cached in the block cache and that no disk I/O
+    // happened while checking that.
+    ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset();
+    ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+    block = reader.getCachedBlock(cacheKey, false, false, true, 
BlockType.DATA, null);
+    Assert.assertEquals(0, 
ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
+    Assert.assertEquals(0, 
ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+
+    // Read the first block from the HFile.
+    block = reader.readBlock(offset, -1, true, true, false, true, 
BlockType.DATA, null);
+    Assert.assertNotNull(block);
+    int bytesReadFromFs = block.getOnDiskSizeWithHeader();
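+    // readBlock also prefetches the next block's header from disk, so include those bytes in
+    // the expected FS read size.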
+    if (block.getNextBlockOnDiskSize() > 0) {
+      bytesReadFromFs += block.headerSize();
+    }
+    block.release();
+    // Assert that disk I/O happened to read the first block.
+    Assert.assertEquals(isScanMetricsEnabled ? bytesReadFromFs : 0,
+      ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+    Assert.assertEquals(0, 
ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
+
+    // Read the first block again and assert that it has been cached in the 
block cache.
+    block = reader.getCachedBlock(cacheKey, false, false, true, 
BlockType.DATA, encoding);
+    long bytesReadFromCache = 0;
+    if (encoding == DataBlockEncoding.NONE) {
+      Assert.assertNotNull(block);
+      bytesReadFromCache = block.getOnDiskSizeWithHeader();
+      if (block.getNextBlockOnDiskSize() > 0) {
+        bytesReadFromCache += block.headerSize();
+      }
+      block.release();
+      // Assert that the bytes read from the block cache account for the same number of bytes
+      // that would have been read from FS if the block cache wasn't there.
+      Assert.assertEquals(bytesReadFromFs, bytesReadFromCache);
+    } else {
+      Assert.assertNull(block);
+    }
+    Assert.assertEquals(isScanMetricsEnabled ? bytesReadFromCache : 0,
+      ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
+    Assert.assertEquals(0, 
ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+
+    reader.close();
+  }
+
+  @Test
+  public void testBytesReadFromCache() throws Exception {
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+    assertBytesReadFromCache(true);
+  }
+
+  @Test
+  public void testBytesReadFromCacheWithScanMetricsDisabled() throws Exception 
{
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
+    assertBytesReadFromCache(false);
+  }
+
+  @Test
+  public void testBytesReadFromCacheWithInvalidDataEncoding() throws Exception 
{
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+    assertBytesReadFromCache(true, DataBlockEncoding.FAST_DIFF);
+  }
+
   private BlockCache initCombinedBlockCache(final String l1CachePolicy) {
     Configuration that = HBaseConfiguration.create(conf);
     that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache.
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBytesReadServerSideScanMetrics.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBytesReadServerSideScanMetrics.java
new file mode 100644
index 00000000000..ff4b63e399c
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBytesReadServerSideScanMetrics.java
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CompoundBloomFilter;
+import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.io.hfile.NoOpIndexBlockEncoder;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.BloomFilter;
+import org.apache.hadoop.hbase.util.BloomFilterUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ IOTests.class, LargeTests.class })
+public class TestBytesReadServerSideScanMetrics {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBytesReadServerSideScanMetrics.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestBytesReadServerSideScanMetrics.class);
+
+  private HBaseTestingUtility UTIL;
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] ROW3 = Bytes.toBytes("row3");
+  private static final byte[] ROW4 = Bytes.toBytes("row4");
+
+  private Configuration conf;
+
+  @Before
+  public void setUp() throws Exception {
+    UTIL = new HBaseTestingUtility();
+    conf = UTIL.getConfiguration();
+    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0);
+    conf.setBoolean(CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION, false);
+  }
+
+  @Test
+  public void testScanMetricsDisabled() throws Exception {
+    conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
+    UTIL.startMiniCluster();
+    try {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+      createTable(tableName, false, BloomType.NONE);
+      writeData(tableName, true);
+      Scan scan = new Scan();
+      scan.withStartRow(ROW2, true);
+      scan.withStopRow(ROW4, true);
+      scan.setCaching(1);
+      try (Table table = UTIL.getConnection().getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+        int rowCount = 0;
+        for (Result r : scanner) {
+          rowCount++;
+        }
+        Assert.assertEquals(2, rowCount);
+        Assert.assertNull(scanner.getScanMetrics());
+      }
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testBytesReadFromFsForSerialSeeks() throws Exception {
+    conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
+    UTIL.startMiniCluster();
+    try {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+      createTable(tableName, false, BloomType.ROW);
+      writeData(tableName, true);
+      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
+
+      // Use oldest timestamp to make sure the fake key is not less than the 
first key in
+      // the file containing key: row2
+      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, 
HConstants.OLDEST_TIMESTAMP, VALUE);
+      assertBytesReadFromFs(tableName, scanMetrics.bytesReadFromFs.get(), 
keyValue,
+        scanMetrics.blockReadOpsCount.get());
+      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testBytesReadFromFsForParallelSeeks() throws Exception {
+    conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
+    // This property doesn't work correctly if only applied at column family 
level.
+    conf.setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, true);
+    UTIL.startMiniCluster();
+    try {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+      createTable(tableName, false, BloomType.NONE);
+      writeData(tableName, true);
+      HRegionServer server = UTIL.getMiniHBaseCluster().getRegionServer(0);
+      ThreadPoolExecutor executor =
+        
server.getExecutorService().getExecutorThreadPool(ExecutorType.RS_PARALLEL_SEEK);
+      long tasksCompletedBeforeRead = executor.getCompletedTaskCount();
+      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
+      long tasksCompletedAfterRead = executor.getCompletedTaskCount();
+      // Assert both of the HFiles were read using parallel seek executor
+      Assert.assertEquals(2, tasksCompletedAfterRead - 
tasksCompletedBeforeRead);
+
+      // Use oldest timestamp to make sure the fake key is not less than the 
first key in
+      // the file containing key: row2
+      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, 
HConstants.OLDEST_TIMESTAMP, VALUE);
+      assertBytesReadFromFs(tableName, scanMetrics.bytesReadFromFs.get(), 
keyValue,
+        scanMetrics.blockReadOpsCount.get());
+      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testBytesReadFromBlockCache() throws Exception {
+    UTIL.startMiniCluster();
+    try {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+      createTable(tableName, true, BloomType.NONE);
+      HRegionServer server = UTIL.getMiniHBaseCluster().getRegionServer(0);
+      LruBlockCache blockCache = (LruBlockCache) server.getBlockCache().get();
+
+      // Assert that acceptable size of LRU block cache is greater than 1MB
+      Assert.assertTrue(blockCache.acceptableSize() > 1024 * 1024);
+      writeData(tableName, true);
+      readDataAndGetScanMetrics(tableName, false);
+      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, 
HConstants.OLDEST_TIMESTAMP, VALUE);
+      assertBlockCacheWarmUp(tableName, keyValue);
+      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
+      Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
+      assertBytesReadFromBlockCache(tableName, 
scanMetrics.bytesReadFromBlockCache.get(), keyValue);
+      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testBytesReadFromMemstore() throws Exception {
+    UTIL.startMiniCluster();
+    try {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+      createTable(tableName, false, BloomType.NONE);
+      writeData(tableName, false);
+      ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
+
+      // Assert no flush has happened for the table
+      List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+      for (HRegion region : regions) {
+        HStore store = region.getStore(CF);
+        // Assert no HFile is there
+        Assert.assertEquals(0, store.getStorefiles().size());
+      }
+
+      KeyValue keyValue = new KeyValue(ROW2, CF, CQ, 
HConstants.LATEST_TIMESTAMP, VALUE);
+      int singleKeyValueSize = Segment.getCellLength(keyValue);
+      // The first key value is read on seek and the second on next(), which determines that
+      // there are no more cells in the row. Key values read during SegmentScanner instance
+      // creation are not counted.
+      int totalKeyValueSize = 2 * singleKeyValueSize;
+      Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
+      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+      Assert.assertEquals(totalKeyValueSize, 
scanMetrics.bytesReadFromMemstore.get());
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testBytesReadWithSwitchFromPReadToStream() throws Exception {
+    // Set pread max bytes to 3 to make sure that the first row is read using 
pread and the second
+    // one using stream read
+    Map<String, String> configuration = new HashMap<>();
+    configuration.put(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, "3");
+    UTIL.startMiniCluster();
+    try {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+      createTable(tableName, true, BloomType.ROW, configuration);
+      writeData(tableName, true);
+      Scan scan = new Scan();
+      scan.withStartRow(ROW2, true);
+      scan.withStopRow(ROW4, true);
+      scan.setScanMetricsEnabled(true);
+      // Set caching to 1 so that one row is read via PREAD and the other via STREAM
+      scan.setCaching(1);
+      ScanMetrics scanMetrics = null;
+      StoreScanner.instrument();
+      try (Table table = UTIL.getConnection().getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+        int rowCount = 0;
+        Assert.assertFalse(StoreScanner.hasSwitchedToStreamRead());
+        for (Result r : scanner) {
+          rowCount++;
+        }
+        Assert.assertTrue(StoreScanner.hasSwitchedToStreamRead());
+        Assert.assertEquals(2, rowCount);
+        scanMetrics = scanner.getScanMetrics();
+      }
+      int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName, 
scanMetrics, 2);
+      Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());
+      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+      // There are 2 HFiles, so the actual scan did 1 read op per HFile to read its data block.
+      // No bloom blocks are read because this is not a Get scan and the bloom filter type is ROW.
+      Assert.assertEquals(2, scanMetrics.blockReadOpsCount.get());
+      // With scan caching set to 1 and 2 rows being scanned, 2 RPC calls will 
be needed.
+      Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testBytesReadWhenFlushHappenedInTheMiddleOfScan() throws 
Exception {
+    UTIL.startMiniCluster();
+    try {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+      createTable(tableName, true, BloomType.ROW);
+      writeData(tableName, false);
+      Scan scan = new Scan();
+      scan.withStartRow(ROW2, true);
+      scan.withStopRow(ROW4, true);
+      scan.setScanMetricsEnabled(true);
+      // Set caching to 1 so that one row is read per RPC call
+      scan.setCaching(1);
+      ScanMetrics scanMetrics = null;
+      try (Table table = UTIL.getConnection().getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+        int rowCount = 0;
+        for (Result r : scanner) {
+          rowCount++;
+          if (rowCount == 1) {
+            flushAndWaitUntilFlushed(tableName, true);
+          }
+        }
+        Assert.assertEquals(2, rowCount);
+        scanMetrics = scanner.getScanMetrics();
+      }
+
+      // Only 1 HFile will be created and it will have only one data block.
+      int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName, 
scanMetrics, 1);
+      Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());
+
+      Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+
+      // The flush happens after the first row is returned from the server but before the second
+      // row is returned. So, 2 cells will be read from the memstore: the cell for the first row
+      // and the next cell at which scanning stops. There is 1 cell per row.
+      int bytesReadFromMemstore =
+        Segment.getCellLength(new KeyValue(ROW2, CF, CQ, 
HConstants.LATEST_TIMESTAMP, VALUE));
+      Assert.assertEquals(2 * bytesReadFromMemstore, 
scanMetrics.bytesReadFromMemstore.get());
+
+      // There will be 1 read op to read the only data block present in the 
HFile.
+      Assert.assertEquals(1, scanMetrics.blockReadOpsCount.get());
+
+      // With caching set to 1 and 2 rows scanned, 2 RPC calls are expected
+      Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testBytesReadInReverseScan() throws Exception {
+    UTIL.startMiniCluster();
+    try {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+      createTable(tableName, true, BloomType.ROW);
+      writeData(tableName, true);
+      Scan scan = new Scan();
+      scan.withStartRow(ROW4, true);
+      scan.withStopRow(ROW2, true);
+      scan.setScanMetricsEnabled(true);
+      scan.setReversed(true);
+      // Set caching to 1 so that one row is read per RPC call
+      scan.setCaching(1);
+      ScanMetrics scanMetrics = null;
+      try (Table table = UTIL.getConnection().getTable(tableName);
+        ResultScanner scanner = table.getScanner(scan)) {
+        int rowCount = 0;
+        for (Result r : scanner) {
+          rowCount++;
+        }
+        Assert.assertEquals(2, rowCount);
+        scanMetrics = scanner.getScanMetrics();
+        LOG.info("Scan metrics: {}", scanMetrics);
+      }
+
+      // 1 data block per HFile was read.
+      int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName, 
scanMetrics, 2);
+      Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());
+
+      // For the HFile containing both rows, the data block will be read from the block cache
+      // when KeyValueHeap.next() is called to read the second row. On ROW4, the last row of the
+      // file, KeyValueHeap.next() calls StoreFileScanner.next(), which sets curBlock to null in
+      // the underlying HFileScanner. With curBlock null, kvNext is null, so
+      // StoreFileScanner.seekToPreviousRow() is called and loads the data block from the
+      // BlockCache. So, 1 data block will be read from the block cache.
+      Assert.assertEquals(bytesReadFromFs / 2, 
scanMetrics.bytesReadFromBlockCache.get());
+      Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+
+      // 1 read op per HFile was done by actual scan to read data block.
+      Assert.assertEquals(2, scanMetrics.blockReadOpsCount.get());
+
+      // 2 RPC calls will be there
+      Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  @Test
+  public void testBytesReadWithLazySeek() throws Exception {
+    UTIL.startMiniCluster();
+    try {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+      createTable(tableName, true, BloomType.NONE);
+      writeData(tableName, true);
+      try (Table table = UTIL.getConnection().getTable(tableName)) {
+        byte[] newValue = Bytes.toBytes("new value");
+        // Update the value of ROW2 and let it stay in memstore. Will assert 
that lazy seek doesn't
+        // lead to seek on the HFile.
+        table.put(new Put(ROW2).addColumn(CF, CQ, newValue));
+        Scan scan = new Scan();
+        scan.withStartRow(ROW2, true);
+        scan.withStopRow(ROW2, true);
+        scan.setScanMetricsEnabled(true);
+        Map<byte[], NavigableSet<byte[]>> familyMap = new HashMap<>();
+        familyMap.put(CF, new TreeSet<>(Bytes.BYTES_COMPARATOR));
+        familyMap.get(CF).add(CQ);
+        scan.setFamilyMap(familyMap);
+        ScanMetrics scanMetrics = null;
+        try (ResultScanner scanner = table.getScanner(scan)) {
+          int rowCount = 0;
+          for (Result r : scanner) {
+            rowCount++;
+            Assert.assertArrayEquals(newValue, r.getValue(CF, CQ));
+          }
+          Assert.assertEquals(1, rowCount);
+          scanMetrics = scanner.getScanMetrics();
+        }
+        // No real seek should be done on the HFile.
+        Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
+        Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+        Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());
+
+        // The cell should be coming purely from memstore.
+        int cellSize =
+          Segment.getCellLength(new KeyValue(ROW2, CF, CQ, 
HConstants.LATEST_TIMESTAMP, newValue));
+        Assert.assertEquals(cellSize, scanMetrics.bytesReadFromMemstore.get());
+        Assert.assertEquals(1, scanMetrics.countOfRPCcalls.get());
+      }
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  /**
+   * Test consecutive calls to RegionScannerImpl.next() to make sure 
populating scan metrics from
+   * ThreadLocalServerSideScanMetrics is done correctly.
+   */
+  @Test
+  public void testConsecutiveRegionScannerNextCalls() throws Exception {
+    // We will be setting a very small block size, so make sure to set a big enough pread max bytes
+    Map<String, String> configuration = new HashMap<>();
+    configuration.put(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 
Integer.toString(64 * 1024));
+    UTIL.startMiniCluster();
+    try {
+      TableName tableName = TableName.valueOf(name.getMethodName());
+      // Set the block size to 4 bytes to get 1 row per data block in HFile.
+      createTable(tableName, true, BloomType.NONE, 4, configuration);
+      try (Table table = UTIL.getConnection().getTable(tableName)) {
+        // Add 3 rows to the table.
+        table.put(new Put(ROW2).addColumn(CF, CQ, VALUE));
+        table.put(new Put(ROW3).addColumn(CF, CQ, VALUE));
+        table.put(new Put(ROW4).addColumn(CF, CQ, VALUE));
+
+        ScanMetrics scanMetrics = null;
+
+        // Scan the added rows. The rows should be read from memstore.
+        Scan scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
+        try (ResultScanner scanner = table.getScanner(scan)) {
+          int rowCount = 0;
+          for (Result r : scanner) {
+            rowCount++;
+          }
+          Assert.assertEquals(2, rowCount);
+          scanMetrics = scanner.getScanMetrics();
+        }
+
+        // Assert that rows were read from only memstore and involved 2 RPC 
calls.
+        int cellSize =
+          Segment.getCellLength(new KeyValue(ROW2, CF, CQ, 
HConstants.LATEST_TIMESTAMP, VALUE));
+        Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
+        Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+        Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());
+        Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+        Assert.assertEquals(3 * cellSize, 
scanMetrics.bytesReadFromMemstore.get());
+
+        // Flush the table and make sure that the rows are read from HFiles.
+        flushAndWaitUntilFlushed(tableName, false);
+        scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
+        scanMetrics = null;
+        try (ResultScanner scanner = table.getScanner(scan)) {
+          int rowCount = 0;
+          for (Result r : scanner) {
+            rowCount++;
+          }
+          Assert.assertEquals(2, rowCount);
+          scanMetrics = scanner.getScanMetrics();
+        }
+
+        // Assert that rows were read from HFiles and involved 2 RPC calls.
+        int bytesReadFromFs = 
getBytesReadToReadConsecutiveDataBlocks(tableName, 1, 3, true);
+        Assert.assertEquals(bytesReadFromFs, 
scanMetrics.bytesReadFromFs.get());
+        Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
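+        // Each of the 3 rows lands in its own data block (4 byte block size) and the scan ends
+        // up touching all 3 blocks, so 3 read ops are expected.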
+        Assert.assertEquals(3, scanMetrics.blockReadOpsCount.get());
+        Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+        Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+
+        // Make sure that rows are read from Blockcache now.
+        scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
+        scanMetrics = null;
+        try (ResultScanner scanner = table.getScanner(scan)) {
+          int rowCount = 0;
+          for (Result r : scanner) {
+            rowCount++;
+          }
+          Assert.assertEquals(2, rowCount);
+          scanMetrics = scanner.getScanMetrics();
+        }
+
+        // Assert that rows were read from Blockcache and involved 2 RPC calls.
+        int bytesReadFromBlockCache =
+          getBytesReadToReadConsecutiveDataBlocks(tableName, 1, 3, false);
+        Assert.assertEquals(bytesReadFromBlockCache, 
scanMetrics.bytesReadFromBlockCache.get());
+        Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
+        Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());
+        Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+        Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+      }
+    } finally {
+      UTIL.shutdownMiniCluster();
+    }
+  }
+
+  private Scan createScanToReadOneRowAtATimeFromServer(byte[] startRow, byte[] 
stopRow) {
+    Scan scan = new Scan();
+    scan.withStartRow(startRow, true);
+    scan.withStopRow(stopRow, true);
+    scan.setScanMetricsEnabled(true);
+    scan.setCaching(1);
+    return scan;
+  }
+
+  private void flushAndWaitUntilFlushed(TableName tableName, boolean 
waitForUpdatedReaders)
+    throws Exception {
+    if (waitForUpdatedReaders) {
+      StoreScanner.instrument();
+    }
+    UTIL.flush(tableName);
+    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+    Assert.assertEquals(1, regions.size());
+    HRegion region = regions.get(0);
+    HStore store = region.getStore(CF);
+    // In milliseconds
+    int maxWaitTime = 100000;
+    int totalWaitTime = 0;
+    int sleepTime = 10000;
+    while (
+      store.getStorefiles().size() == 0
+        || (waitForUpdatedReaders && !StoreScanner.hasUpdatedReaders())
+    ) {
+      Thread.sleep(sleepTime);
+      totalWaitTime += sleepTime;
+      if (totalWaitTime >= maxWaitTime) {
+        throw new Exception("Store files not flushed after " + maxWaitTime + 
"ms");
+      }
+    }
+    Assert.assertEquals(1, store.getStorefiles().size());
+  }
+
+  private int getBytesReadToReadConsecutiveDataBlocks(TableName tableName,
+    int expectedStoreFileCount, int expectedDataBlockCount, boolean 
isReadFromFs) throws Exception {
+    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+    Assert.assertEquals(1, regions.size());
+    HRegion region = regions.get(0);
+    HStore store = region.getStore(CF);
+    Collection<HStoreFile> storeFiles = store.getStorefiles();
+    Assert.assertEquals(expectedStoreFileCount, storeFiles.size());
+    int bytesReadFromFs = 0;
+    for (HStoreFile storeFile : storeFiles) {
+      StoreFileReader reader = storeFile.getReader();
+      HFile.Reader hfileReader = reader.getHFileReader();
+      HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
+      FixedFileTrailer trailer = hfileReader.getTrailer();
+      int dataIndexLevels = trailer.getNumDataIndexLevels();
+      long loadOnOpenDataOffset = trailer.getLoadOnOpenDataOffset();
+      HFileBlock.BlockIterator blockIterator = blockReader.blockRange(0, 
loadOnOpenDataOffset);
+      HFileBlock block;
+      boolean readNextBlock = false;
+      int blockCount = 0;
+      while ((block = blockIterator.nextBlock()) != null) {
+        blockCount++;
+        bytesReadFromFs += block.getOnDiskSizeWithHeader();
+        if (isReadFromFs && readNextBlock) {
+          // This accounts for the savings we get from the prefetched header, but these savings
+          // only apply when reading from FS, not from the BlockCache.
+          bytesReadFromFs -= block.headerSize();
+          readNextBlock = false;
+        }
+        if (block.getNextBlockOnDiskSize() > 0) {
+          bytesReadFromFs += block.headerSize();
+          readNextBlock = true;
+        }
+        Assert.assertTrue(block.getBlockType().isData());
+      }
+      blockIterator.freeBlocks();
+      // No intermediate or leaf index blocks are expected.
+      Assert.assertEquals(1, dataIndexLevels);
+      Assert.assertEquals(expectedDataBlockCount, blockCount);
+    }
+    return bytesReadFromFs;
+  }
+
+  private int getBytesReadFromFsForNonGetScan(TableName tableName, ScanMetrics 
scanMetrics,
+    int expectedStoreFileCount) throws Exception {
+    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+    Assert.assertEquals(1, regions.size());
+    HRegion region = regions.get(0);
+    HStore store = region.getStore(CF);
+    Collection<HStoreFile> storeFiles = store.getStorefiles();
+    Assert.assertEquals(expectedStoreFileCount, storeFiles.size());
+    int bytesReadFromFs = 0;
+    for (HStoreFile storeFile : storeFiles) {
+      StoreFileReader reader = storeFile.getReader();
+      HFile.Reader hfileReader = reader.getHFileReader();
+      HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
+      FixedFileTrailer trailer = hfileReader.getTrailer();
+      int dataIndexLevels = trailer.getNumDataIndexLevels();
+      // Read the first block of the HFile. First block is always expected to 
be a DATA block and
+      // the HFile is expected to have only one DATA block.
+      HFileBlock block = blockReader.readBlockData(0, -1, true, true, true);
+      Assert.assertTrue(block.getBlockType().isData());
+      bytesReadFromFs += block.getOnDiskSizeWithHeader();
+      if (block.getNextBlockOnDiskSize() > 0) {
+        bytesReadFromFs += block.headerSize();
+      }
+      block.release();
+      // Each of the HFiles is expected to have only root index but no 
intermediate or leaf index
+      // blocks.
+      Assert.assertEquals(1, dataIndexLevels);
+    }
+    return bytesReadFromFs;
+  }
+
+  private ScanMetrics readDataAndGetScanMetrics(TableName tableName, boolean 
isScanMetricsEnabled)
+    throws Exception {
+    Scan scan = new Scan();
+    scan.withStartRow(ROW2, true);
+    scan.withStopRow(ROW2, true);
+    scan.setScanMetricsEnabled(isScanMetricsEnabled);
+    ScanMetrics scanMetrics;
+    try (Table table = UTIL.getConnection().getTable(tableName);
+      ResultScanner scanner = table.getScanner(scan)) {
+      int rowCount = 0;
+      StoreFileScanner.instrument();
+      for (Result r : scanner) {
+        rowCount++;
+      }
+      Assert.assertEquals(1, rowCount);
+      scanMetrics = scanner.getScanMetrics();
+    }
+    if (isScanMetricsEnabled) {
+      LOG.info("Bytes read from fs: " + scanMetrics.bytesReadFromFs.get());
+      LOG.info("Bytes read from block cache: " + 
scanMetrics.bytesReadFromBlockCache.get());
+      LOG.info("Bytes read from memstore: " + 
scanMetrics.bytesReadFromMemstore.get());
+      LOG.info("Count of bytes scanned: " + 
scanMetrics.countOfBlockBytesScanned.get());
+      LOG.info("StoreFileScanners seek count: " + 
StoreFileScanner.getSeekCount());
+    }
+    return scanMetrics;
+  }
+
+  private void writeData(TableName tableName, boolean shouldFlush) throws 
Exception {
+    try (Table table = UTIL.getConnection().getTable(tableName)) {
+      table.put(new Put(ROW2).addColumn(CF, CQ, VALUE));
+      table.put(new Put(ROW4).addColumn(CF, CQ, VALUE));
+      if (shouldFlush) {
+        // Create an HFile
+        UTIL.flush(tableName);
+      }
+
+      table.put(new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, VALUE));
+      table.put(new Put(Bytes.toBytes("row5")).addColumn(CF, CQ, VALUE));
+      if (shouldFlush) {
+        // Create an HFile
+        UTIL.flush(tableName);
+      }
+    }
+  }
+
+  private void createTable(TableName tableName, boolean blockCacheEnabled, 
BloomType bloomType)
+    throws Exception {
+    createTable(tableName, blockCacheEnabled, bloomType, 
HConstants.DEFAULT_BLOCKSIZE,
+      new HashMap<>());
+  }
+
+  private void createTable(TableName tableName, boolean blockCacheEnabled, 
BloomType bloomType,
+    Map<String, String> configuration) throws Exception {
+    createTable(tableName, blockCacheEnabled, bloomType, 
HConstants.DEFAULT_BLOCKSIZE,
+      configuration);
+  }
+
+  private void createTable(TableName tableName, boolean blockCacheEnabled, 
BloomType bloomType,
+    int blocksize, Map<String, String> configuration) throws Exception {
+    Admin admin = UTIL.getAdmin();
+    TableDescriptorBuilder tableDescriptorBuilder = 
TableDescriptorBuilder.newBuilder(tableName);
+    ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
+      ColumnFamilyDescriptorBuilder.newBuilder(CF);
+    columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
+    columnFamilyDescriptorBuilder.setBlockCacheEnabled(blockCacheEnabled);
+    columnFamilyDescriptorBuilder.setBlocksize(blocksize);
+    for (Map.Entry<String, String> entry : configuration.entrySet()) {
+      columnFamilyDescriptorBuilder.setConfiguration(entry.getKey(), 
entry.getValue());
+    }
+    
tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
+    admin.createTable(tableDescriptorBuilder.build());
+    UTIL.waitUntilAllRegionsAssigned(tableName);
+  }
+
+  private void assertBytesReadFromFs(TableName tableName, long 
actualBytesReadFromFs,
+    KeyValue keyValue, long actualReadOps) throws Exception {
+    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+    Assert.assertEquals(1, regions.size());
+    MutableInt totalExpectedBytesReadFromFs = new MutableInt(0);
+    MutableInt totalExpectedReadOps = new MutableInt(0);
+    for (HRegion region : regions) {
+      Assert.assertNull(region.getBlockCache());
+      HStore store = region.getStore(CF);
+      Collection<HStoreFile> storeFiles = store.getStorefiles();
+      Assert.assertEquals(2, storeFiles.size());
+      for (HStoreFile storeFile : storeFiles) {
+        StoreFileReader reader = storeFile.getReader();
+        HFile.Reader hfileReader = reader.getHFileReader();
+        BloomFilter bloomFilter = reader.getGeneralBloomFilter();
+        Assert.assertTrue(bloomFilter == null || bloomFilter instanceof 
CompoundBloomFilter);
+        CompoundBloomFilter cbf = bloomFilter == null ? null : 
(CompoundBloomFilter) bloomFilter;
+        Consumer<HFileBlock> bytesReadFunction = new Consumer<HFileBlock>() {
+          @Override
+          public void accept(HFileBlock block) {
+            totalExpectedBytesReadFromFs.add(block.getOnDiskSizeWithHeader());
+            if (block.getNextBlockOnDiskSize() > 0) {
+              totalExpectedBytesReadFromFs.add(block.headerSize());
+            }
+            totalExpectedReadOps.add(1);
+          }
+        };
+        readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
+      }
+    }
+    Assert.assertEquals(totalExpectedBytesReadFromFs.longValue(), 
actualBytesReadFromFs);
+    Assert.assertEquals(totalExpectedReadOps.longValue(), actualReadOps);
+  }
+
+  private void readHFile(HFile.Reader hfileReader, CompoundBloomFilter cbf, 
KeyValue keyValue,
+    Consumer<HFileBlock> bytesReadFunction) throws Exception {
+    HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
+    FixedFileTrailer trailer = hfileReader.getTrailer();
+    HFileContext meta = hfileReader.getFileContext();
+    long fileSize = hfileReader.length();
+
+    // Read the bloom block from FS
+    if (cbf != null) {
+      // Read a block in the load-on-open section to make sure the prefetched header is not the
+      // bloom block's header
+      blockReader.readBlockData(trailer.getLoadOnOpenDataOffset(), -1, true, 
true, true).release();
+
+      HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
+      byte[] row = ROW2;
+      int blockIndex = index.rootBlockContainingKey(row, 0, row.length);
+      HFileBlock bloomBlock = cbf.getBloomBlock(blockIndex);
+      boolean fileContainsKey = BloomFilterUtil.contains(row, 0, row.length,
+        bloomBlock.getBufferReadOnly(), bloomBlock.headerSize(),
+        bloomBlock.getUncompressedSizeWithoutHeader(), cbf.getHash(), 
cbf.getHashCount());
+      bytesReadFunction.accept(bloomBlock);
+      // Assert that the block read is a bloom block
+      Assert.assertEquals(bloomBlock.getBlockType(), BlockType.BLOOM_CHUNK);
+      bloomBlock.release();
+      if (!fileContainsKey) {
+        // Key is not in the file, so we don't need to read the data block
+        return;
+      }
+    }
+
+    // Indexes use NoOpEncodedSeeker
+    MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
+    HFileBlock.BlockIterator blockIter = 
blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
+      fileSize - trailer.getTrailerSize());
+    HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
+
+    // Comparator class name is stored in the trailer in version 3.
+    CellComparator comparator = trailer.createComparator();
+    // Initialize the seeker
+    seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
+      trailer.getNumDataIndexLevels());
+
+    int blockLevelsRead = 1; // Root index is the first level
+
+    int rootLevIndex = seeker.rootBlockContainingKey(keyValue);
+    long currentOffset = seeker.getBlockOffset(rootLevIndex);
+    int currentDataSize = seeker.getBlockDataSize(rootLevIndex);
+
+    HFileBlock prevBlock = null;
+    do {
+      prevBlock = block;
+      block = blockReader.readBlockData(currentOffset, currentDataSize, true, 
true, true);
+      HFileBlock unpacked = block.unpack(meta, blockReader);
+      if (unpacked != block) {
+        block.release();
+        block = unpacked;
+      }
+      bytesReadFunction.accept(block);
+      if (!block.getBlockType().isData()) {
+        ByteBuff buffer = block.getBufferWithoutHeader();
+        // Place the buffer at the correct position
+        HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer, 
keyValue, comparator);
+        currentOffset = buffer.getLong();
+        currentDataSize = buffer.getInt();
+      }
+      prevBlock.release();
+      blockLevelsRead++;
+    } while (!block.getBlockType().isData());
+    block.release();
+    blockIter.freeBlocks();
+
+    Assert.assertEquals(blockLevelsRead, trailer.getNumDataIndexLevels() + 1);
+  }
+
+  private void assertBytesReadFromBlockCache(TableName tableName,
+    long actualBytesReadFromBlockCache, KeyValue keyValue) throws Exception {
+    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+    Assert.assertEquals(1, regions.size());
+    MutableInt totalExpectedBytesReadFromBlockCache = new MutableInt(0);
+    for (HRegion region : regions) {
+      Assert.assertNotNull(region.getBlockCache());
+      HStore store = region.getStore(CF);
+      Collection<HStoreFile> storeFiles = store.getStorefiles();
+      Assert.assertEquals(2, storeFiles.size());
+      for (HStoreFile storeFile : storeFiles) {
+        StoreFileReader reader = storeFile.getReader();
+        HFile.Reader hfileReader = reader.getHFileReader();
+        BloomFilter bloomFilter = reader.getGeneralBloomFilter();
+        Assert.assertTrue(bloomFilter == null || bloomFilter instanceof 
CompoundBloomFilter);
+        CompoundBloomFilter cbf = bloomFilter == null ? null : 
(CompoundBloomFilter) bloomFilter;
+        Consumer<HFileBlock> bytesReadFunction = new Consumer<HFileBlock>() {
+          @Override
+          public void accept(HFileBlock block) {
+            
totalExpectedBytesReadFromBlockCache.add(block.getOnDiskSizeWithHeader());
+            if (block.getNextBlockOnDiskSize() > 0) {
+              totalExpectedBytesReadFromBlockCache.add(block.headerSize());
+            }
+          }
+        };
+        readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
+      }
+    }
+    Assert.assertEquals(totalExpectedBytesReadFromBlockCache.longValue(),
+      actualBytesReadFromBlockCache);
+  }
+
+  private void assertBlockCacheWarmUp(TableName tableName, KeyValue keyValue) 
throws Exception {
+    List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+    Assert.assertEquals(1, regions.size());
+    for (HRegion region : regions) {
+      Assert.assertNotNull(region.getBlockCache());
+      HStore store = region.getStore(CF);
+      Collection<HStoreFile> storeFiles = store.getStorefiles();
+      Assert.assertEquals(2, storeFiles.size());
+      for (HStoreFile storeFile : storeFiles) {
+        StoreFileReader reader = storeFile.getReader();
+        HFile.Reader hfileReader = reader.getHFileReader();
+        BloomFilter bloomFilter = reader.getGeneralBloomFilter();
+        Assert.assertTrue(bloomFilter == null || bloomFilter instanceof 
CompoundBloomFilter);
+        CompoundBloomFilter cbf = bloomFilter == null ? null : 
(CompoundBloomFilter) bloomFilter;
+        Consumer<HFileBlock> bytesReadFunction = new Consumer<HFileBlock>() {
+          @Override
+          public void accept(HFileBlock block) {
+            assertBlockIsCached(hfileReader, block, region.getBlockCache());
+          }
+        };
+        readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
+      }
+    }
+  }
+
+  private void assertBlockIsCached(HFile.Reader hfileReader, HFileBlock block,
+    BlockCache blockCache) {
+    if (blockCache == null) {
+      return;
+    }
+    Path path = hfileReader.getPath();
+    BlockCacheKey key = new BlockCacheKey(path, block.getOffset(), true, 
block.getBlockType());
+    HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(key, true, 
false, true);
+    Assert.assertNotNull(cachedBlock);
+    Assert.assertEquals(block.getOnDiskSizeWithHeader(), 
cachedBlock.getOnDiskSizeWithHeader());
+    Assert.assertEquals(block.getNextBlockOnDiskSize(), 
cachedBlock.getNextBlockOnDiskSize());
+    cachedBlock.release();
+  }
+
+  private static class MyNoOpEncodedSeeker extends 
NoOpIndexBlockEncoder.NoOpEncodedSeeker {
+    public long getBlockOffset(int i) {
+      return blockOffsets[i];
+    }
+
+    public int getBlockDataSize(int i) {
+      return blockDataSizes[i];
+    }
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 60fdf235775..7ac45775009 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -62,6 +63,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
 import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -354,6 +356,48 @@ public class TestDefaultMemStore {
     assertScannerResults(s, new KeyValue[] { kv1, kv2 });
   }
 
+  private long getBytesReadFromMemstore() throws IOException {
+    final byte[] f = Bytes.toBytes("family");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] v = Bytes.toBytes("value");
+    int numKvs = 10;
+
+    MultiVersionConcurrencyControl.WriteEntry w = mvcc.begin();
+
+    KeyValue kv;
+    KeyValue[] kvs = new KeyValue[numKvs];
+    long totalCellSize = 0;
+    for (int i = 0; i < numKvs; i++) {
+      byte[] row = Bytes.toBytes(i);
+      kv = new KeyValue(row, f, q1, v);
+      kv.setSequenceId(w.getWriteNumber());
+      memstore.add(kv, null);
+      kvs[i] = kv;
+      totalCellSize += Segment.getCellLength(kv);
+    }
+    mvcc.completeAndWait(w);
+
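+    // Reset the thread-local counter so that only the cells read by the scan below are measured.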
+    ThreadLocalServerSideScanMetrics.getBytesReadFromMemstoreAndReset();
+    KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
+    assertScannerResults(s, kvs);
+    return totalCellSize;
+  }
+
+  @Test
+  public void testBytesReadFromMemstore() throws IOException {
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+    long totalCellSize = getBytesReadFromMemstore();
+    Assert.assertEquals(totalCellSize,
+      ThreadLocalServerSideScanMetrics.getBytesReadFromMemstoreAndReset());
+  }
+
+  @Test
+  public void testBytesReadFromMemstoreWithScanMetricsDisabled() throws 
IOException {
+    ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
+    getBytesReadFromMemstore();
+    Assert.assertEquals(0, 
ThreadLocalServerSideScanMetrics.getBytesReadFromMemstoreAndReset());
+  }
+
   /**
    * Regression test for HBASE-2616, HBASE-2670. When we insert a 
higher-memstoreTS version of a
    * cell but with the same timestamp, we still need to provide consistent 
reads for the same
