This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 927f077abaa HBASE-29398: Server side scan metrics for bytes read from FS vs Block cache vs memstore (#7162) (#7136)
927f077abaa is described below
commit 927f077abaa7c03b46d9c3f0ca5420a1e9b27da7
Author: sanjeet006py <[email protected]>
AuthorDate: Sat Jul 19 00:48:59 2025 +0530
HBASE-29398: Server side scan metrics for bytes read from FS vs Block cache vs memstore (#7162) (#7136)
Signed-off-by: Viraj Jasani <[email protected]>
Signed-off-by: Hari Krishna Dara <[email protected]>
---
.../client/metrics/ServerSideScanMetrics.java | 19 +
.../hadoop/hbase/io/hfile/BlockCacheUtil.java | 1 +
.../hadoop/hbase/io/hfile/CompoundBloomFilter.java | 18 +-
.../hadoop/hbase/io/hfile/FixedFileTrailer.java | 10 +-
.../apache/hadoop/hbase/io/hfile/HFileBlock.java | 24 +-
.../hadoop/hbase/io/hfile/HFileBlockIndex.java | 7 +-
.../hadoop/hbase/io/hfile/HFileReaderImpl.java | 130 +--
.../hadoop/hbase/io/hfile/LruBlockCache.java | 5 +-
.../hbase/io/hfile/NoOpIndexBlockEncoder.java | 4 +-
.../ThreadLocalServerSideScanMetrics.java | 160 ++++
.../hbase/regionserver/RegionScannerImpl.java | 27 +-
.../hadoop/hbase/regionserver/SegmentScanner.java | 13 +
.../hadoop/hbase/regionserver/StoreFileReader.java | 3 +-
.../hadoop/hbase/regionserver/StoreFileWriter.java | 4 +-
.../hadoop/hbase/regionserver/StoreScanner.java | 38 +
.../regionserver/handler/ParallelSeekHandler.java | 36 +
.../hadoop/hbase/io/hfile/TestBytesReadFromFs.java | 412 ++++++++++
.../apache/hadoop/hbase/io/hfile/TestHFile.java | 85 ++
.../TestBytesReadServerSideScanMetrics.java | 894 +++++++++++++++++++++
.../hbase/regionserver/TestDefaultMemStore.java | 44 +
20 files changed, 1866 insertions(+), 68 deletions(-)
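For reviewers who want to try the change out, the new counters surface through the existing client scan-metrics API. Below is a minimal, illustrative sketch (not part of this commit; the table name and connection setup are placeholders) that enables scan metrics and prints the four new values by their metric names:

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;

public class ScanBytesReadExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TableName.valueOf("test_table"))) { // placeholder table name
      Scan scan = new Scan();
      scan.setScanMetricsEnabled(true); // the server only tracks the counters when this is set
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          // drain the scanner; metrics accumulate as results are fetched
        }
        Map<String, Long> metrics = scanner.getScanMetrics().getMetricsMap();
        System.out.println("bytes read from FS: "
          + metrics.get(ServerSideScanMetrics.BYTES_READ_FROM_FS_METRIC_NAME));
        System.out.println("bytes read from block cache: "
          + metrics.get(ServerSideScanMetrics.BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME));
        System.out.println("bytes read from memstore: "
          + metrics.get(ServerSideScanMetrics.BYTES_READ_FROM_MEMSTORE_METRIC_NAME));
        System.out.println("block read ops: "
          + metrics.get(ServerSideScanMetrics.BLOCK_READ_OPS_COUNT_METRIC_NAME));
      }
    }
  }
}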
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index 916451df75d..6b3a4f5675a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -52,6 +52,10 @@ public class ServerSideScanMetrics {
currentRegionScanMetricsData.createCounter(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME);
currentRegionScanMetricsData.createCounter(BLOCK_BYTES_SCANNED_KEY_METRIC_NAME);
currentRegionScanMetricsData.createCounter(FS_READ_TIME_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_FS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_MEMSTORE_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
}
/**
@@ -68,6 +72,11 @@ public class ServerSideScanMetrics {
public static final String BLOCK_BYTES_SCANNED_KEY_METRIC_NAME = "BLOCK_BYTES_SCANNED";
public static final String FS_READ_TIME_METRIC_NAME = "FS_READ_TIME";
+ public static final String BYTES_READ_FROM_FS_METRIC_NAME = "BYTES_READ_FROM_FS";
+ public static final String BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME =
+ "BYTES_READ_FROM_BLOCK_CACHE";
+ public static final String BYTES_READ_FROM_MEMSTORE_METRIC_NAME = "BYTES_READ_FROM_MEMSTORE";
+ public static final String BLOCK_READ_OPS_COUNT_METRIC_NAME = "BLOCK_READ_OPS_COUNT";
/**
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
@@ -102,6 +111,16 @@ public class ServerSideScanMetrics {
public final AtomicLong fsReadTime = createCounter(FS_READ_TIME_METRIC_NAME);
+ public final AtomicLong bytesReadFromFs = createCounter(BYTES_READ_FROM_FS_METRIC_NAME);
+
+ public final AtomicLong bytesReadFromBlockCache =
+ createCounter(BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME);
+
+ public final AtomicLong bytesReadFromMemstore =
+ createCounter(BYTES_READ_FROM_MEMSTORE_METRIC_NAME);
+
+ public final AtomicLong blockReadOpsCount = createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
+
/**
* Sets counter with counterName to passed in value, does nothing if counter does not exist. If
* region level scan metrics are enabled then sets the value of counter for the current region
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
index b130d06ca43..c963fc2617f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/BlockCacheUtil.java
@@ -288,6 +288,7 @@ public class BlockCacheUtil {
.withFillHeader(FILL_HEADER).withOffset(block.getOffset()).withNextBlockOnDiskSize(-1)
.withOnDiskDataSizeWithHeader(block.getOnDiskDataSizeWithHeader() + numBytes)
.withHFileContext(cloneContext(block.getHFileContext()))
+ .withNextBlockOnDiskSize(block.getNextBlockOnDiskSize())
.withByteBuffAllocator(cacheConf.getByteBuffAllocator()).withShared(!buff.hasArray()).build();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
index 95bc1c7b83d..b1cb519fb58 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CompoundBloomFilter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.io.hfile;
import java.io.DataInput;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilter;
@@ -120,7 +121,8 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase implements Bloo
return result;
}
- private HFileBlock getBloomBlock(int block) {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public HFileBlock getBloomBlock(int block) {
HFileBlock bloomBlock;
try {
// We cache the block and use a positional read.
@@ -218,4 +220,18 @@ public class CompoundBloomFilter extends CompoundBloomFilterBase implements Bloo
return sb.toString();
}
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public HFileBlockIndex.BlockIndexReader getBloomIndex() {
+ return index;
+ }
+
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public int getHashCount() {
+ return hashCount;
+ }
+
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public Hash getHash() {
+ return hash;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
index f2071d21abf..fe0b3124cb5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
@@ -27,10 +27,12 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.InnerStoreCellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -410,6 +412,11 @@ public class FixedFileTrailer {
FixedFileTrailer fft = new FixedFileTrailer(majorVersion, minorVersion);
fft.deserialize(new DataInputStream(new ByteArrayInputStream(buf.array(),
buf.arrayOffset() + bufferSize - trailerSize, trailerSize)));
+ boolean isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
+ if (isScanMetricsEnabled) {
+ ThreadLocalServerSideScanMetrics.addBytesReadFromFs(trailerSize);
+ ThreadLocalServerSideScanMetrics.addBlockReadOpsCount(1);
+ }
return fft;
}
@@ -648,7 +655,8 @@ public class FixedFileTrailer {
}
}
- CellComparator createComparator() throws IOException {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public CellComparator createComparator() throws IOException {
expectAtLeastMajorVersion(2);
return createComparator(comparatorClassName);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index 036270824f7..122380c5649 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
import org.apache.hadoop.hbase.io.hfile.trace.HFileContextAttributesBuilderConsumer;
import org.apache.hadoop.hbase.io.util.BlockIOUtils;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
@@ -405,7 +407,8 @@ public class HFileBlock implements Cacheable {
* present) read by peeking into the next block's header; use as a hint when doing a read
* of the next block when scanning or running over a file.
*/
- int getNextBlockOnDiskSize() {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public int getNextBlockOnDiskSize() {
return nextBlockOnDiskSize;
}
@@ -626,7 +629,8 @@ public class HFileBlock implements Cacheable {
* Retrieves the decompressed/decrypted view of this block. An encoded block remains in its
* encoded structure. Internal structures are shared between instances where applicable.
*/
- HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public HFileBlock unpack(HFileContext fileContext, FSReader reader) throws IOException {
if (!fileContext.isCompressedOrEncrypted()) {
// TODO: cannot use our own fileContext here because HFileBlock(ByteBuffer, boolean),
// which is used for block serialization to L2 cache, does not preserve encoding and
@@ -1241,7 +1245,8 @@ public class HFileBlock implements Cacheable {
* Iterator for reading {@link HFileBlock}s in load-on-open-section, such as root data index
* block, meta index block, file info block etc.
*/
- interface BlockIterator {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public interface BlockIterator {
/**
* Get the next block, or null if there are no more blocks to iterate.
*/
@@ -1265,7 +1270,8 @@ public class HFileBlock implements Cacheable {
}
/** An HFile block reader with iteration ability. */
- interface FSReader {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public interface FSReader {
/**
* Reads the block at the given offset in the file with the given on-disk size and uncompressed
* size.
@@ -1738,6 +1744,7 @@ public class HFileBlock implements Cacheable {
// checksums. Can change with circumstances. The below flag is whether the
// file has support for checksums (version 2+).
boolean checksumSupport = this.fileContext.isUseHBaseChecksum();
+ boolean isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
long startTime = EnvironmentEdgeManager.currentTime();
if (onDiskSizeWithHeader == -1) {
// The caller does not know the block size. Need to get it from the header. If header was
@@ -1754,6 +1761,9 @@
headerBuf = HEAP.allocate(hdrSize);
readAtOffset(is, headerBuf, hdrSize, false, offset, pread);
headerBuf.rewind();
+ if (isScanMetricsEnabled) {
+ ThreadLocalServerSideScanMetrics.addBytesReadFromFs(hdrSize);
+ }
}
onDiskSizeWithHeader = getOnDiskSizeWithHeader(headerBuf, checksumSupport);
}
@@ -1801,6 +1811,12 @@
boolean readNextHeader = readAtOffset(is, onDiskBlock,
onDiskSizeWithHeader - preReadHeaderSize, true, offset + preReadHeaderSize, pread);
onDiskBlock.rewind(); // in case of moving position when copying a cached header
+ if (isScanMetricsEnabled) {
+ long bytesRead =
+ (onDiskSizeWithHeader - preReadHeaderSize) + (readNextHeader ? hdrSize : 0);
+ ThreadLocalServerSideScanMetrics.addBytesReadFromFs(bytesRead);
+ ThreadLocalServerSideScanMetrics.addBlockReadOpsCount(1);
+ }
// the call to validateChecksum for this block excludes the next block header over-read, so
// no reason to delay extracting this value.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
index 8989ef2b281..2858800e932 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlockIndex.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KeyOnlyKeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -561,7 +562,8 @@ public class HFileBlockIndex {
* array of offsets to the entries within the block. This allows us to do binary search for the
* entry corresponding to the given key without having to deserialize the block.
*/
- static abstract class BlockIndexReader implements HeapSize {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public static abstract class BlockIndexReader implements HeapSize {
protected long[] blockOffsets;
protected int[] blockDataSizes;
@@ -808,7 +810,8 @@ public class HFileBlockIndex {
* @return the index position where the given key was found, otherwise return -1 in the case the
* given key is before the first key.
*/
- static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key, CellComparator comparator) {
+ public static int locateNonRootIndexEntry(ByteBuff nonRootBlock, Cell key,
+ CellComparator comparator) {
int entryIndex = binarySearchNonRootIndex(key, nonRootBlock, comparator);
if (entryIndex != -1) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index db2383db399..e1e9eaf8a53 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ByteBufferKeyOnlyKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -45,6 +46,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
@@ -1105,71 +1107,91 @@ public abstract class HFileReaderImpl implements HFile.Reader, Configurable {
* Retrieve block from cache. Validates the retrieved block's type vs {@code expectedBlockType}
* and its encoding vs. {@code expectedDataBlockEncoding}. Unpacks the block as necessary.
*/
- private HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public HFileBlock getCachedBlock(BlockCacheKey cacheKey, boolean cacheBlock, boolean useLock,
boolean updateCacheMetrics, BlockType expectedBlockType,
DataBlockEncoding expectedDataBlockEncoding) throws IOException {
// Check cache for block. If found return.
BlockCache cache = cacheConf.getBlockCache().orElse(null);
+ long cachedBlockBytesRead = 0;
if (cache != null) {
- HFileBlock cachedBlock =
- (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
updateCacheMetrics);
- if (cachedBlock != null) {
- if
(cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
- HFileBlock compressedBlock = cachedBlock;
- cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
- // In case of compressed block after unpacking we can release the
compressed block
- if (compressedBlock != cachedBlock) {
- compressedBlock.release();
+ HFileBlock cachedBlock = null;
+ boolean isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
+ try {
+ cachedBlock =
+ (HFileBlock) cache.getBlock(cacheKey, cacheBlock, useLock,
updateCacheMetrics);
+ if (cachedBlock != null) {
+ if
(cacheConf.shouldCacheCompressed(cachedBlock.getBlockType().getCategory())) {
+ HFileBlock compressedBlock = cachedBlock;
+ cachedBlock = compressedBlock.unpack(hfileContext, fsBlockReader);
+ // In case of compressed block after unpacking we can release the
compressed block
+ if (compressedBlock != cachedBlock) {
+ compressedBlock.release();
+ }
+ }
+ try {
+ validateBlockType(cachedBlock, expectedBlockType);
+ } catch (IOException e) {
+ returnAndEvictBlock(cache, cacheKey, cachedBlock);
+ cachedBlock = null;
+ throw e;
}
- }
- try {
- validateBlockType(cachedBlock, expectedBlockType);
- } catch (IOException e) {
- returnAndEvictBlock(cache, cacheKey, cachedBlock);
- throw e;
- }
- if (expectedDataBlockEncoding == null) {
- return cachedBlock;
- }
- DataBlockEncoding actualDataBlockEncoding =
cachedBlock.getDataBlockEncoding();
- // Block types other than data blocks always have
- // DataBlockEncoding.NONE. To avoid false negative cache misses, only
- // perform this check if cached block is a data block.
- if (
- cachedBlock.getBlockType().isData()
- && !actualDataBlockEncoding.equals(expectedDataBlockEncoding)
- ) {
- // This mismatch may happen if a Scanner, which is used for say a
- // compaction, tries to read an encoded block from the block cache.
- // The reverse might happen when an EncodedScanner tries to read
- // un-encoded blocks which were cached earlier.
- //
- // Because returning a data block with an implicit BlockType mismatch
- // will cause the requesting scanner to throw a disk read should be
- // forced here. This will potentially cause a significant number of
- // cache misses, so update so we should keep track of this as it
might
- // justify the work on a CompoundScanner.
+ if (expectedDataBlockEncoding == null) {
+ return cachedBlock;
+ }
+ DataBlockEncoding actualDataBlockEncoding =
cachedBlock.getDataBlockEncoding();
+ // Block types other than data blocks always have
+ // DataBlockEncoding.NONE. To avoid false negative cache misses, only
+ // perform this check if cached block is a data block.
if (
- !expectedDataBlockEncoding.equals(DataBlockEncoding.NONE)
- && !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)
+ cachedBlock.getBlockType().isData()
+ && !actualDataBlockEncoding.equals(expectedDataBlockEncoding)
) {
- // If the block is encoded but the encoding does not match the
- // expected encoding it is likely the encoding was changed but the
- // block was not yet evicted. Evictions on file close happen async
- // so blocks with the old encoding still linger in cache for some
- // period of time. This event should be rare as it only happens on
- // schema definition change.
- LOG.info(
- "Evicting cached block with key {} because data block encoding
mismatch; "
- + "expected {}, actual {}, path={}",
- cacheKey, actualDataBlockEncoding, expectedDataBlockEncoding,
path);
- // This is an error scenario. so here we need to release the block.
- returnAndEvictBlock(cache, cacheKey, cachedBlock);
+ // This mismatch may happen if a Scanner, which is used for say a
+ // compaction, tries to read an encoded block from the block cache.
+ // The reverse might happen when an EncodedScanner tries to read
+ // un-encoded blocks which were cached earlier.
+ //
+ // Because returning a data block with an implicit BlockType
mismatch
+ // will cause the requesting scanner to throw a disk read should be
+ // forced here. This will potentially cause a significant number of
+ // cache misses, so update so we should keep track of this as it
might
+ // justify the work on a CompoundScanner.
+ if (
+ !expectedDataBlockEncoding.equals(DataBlockEncoding.NONE)
+ && !actualDataBlockEncoding.equals(DataBlockEncoding.NONE)
+ ) {
+ // If the block is encoded but the encoding does not match the
+ // expected encoding it is likely the encoding was changed but
the
+ // block was not yet evicted. Evictions on file close happen
async
+ // so blocks with the old encoding still linger in cache for some
+ // period of time. This event should be rare as it only happens
on
+ // schema definition change.
+ LOG.info(
+ "Evicting cached block with key {} because data block encoding
mismatch; "
+ + "expected {}, actual {}, path={}",
+ cacheKey, actualDataBlockEncoding, expectedDataBlockEncoding,
path);
+ // This is an error scenario. so here we need to release the
block.
+ returnAndEvictBlock(cache, cacheKey, cachedBlock);
+ }
+ cachedBlock = null;
+ return null;
}
- return null;
+ return cachedBlock;
+ }
+ } finally {
+ // Count bytes read as cached block is being returned
+ if (isScanMetricsEnabled && cachedBlock != null) {
+ cachedBlockBytesRead = cachedBlock.getOnDiskSizeWithHeader();
+ // Account for the header size of the next block if it exists
+ if (cachedBlock.getNextBlockOnDiskSize() > 0) {
+ cachedBlockBytesRead += cachedBlock.headerSize();
+ }
+ }
+ if (cachedBlockBytesRead > 0) {
+ ThreadLocalServerSideScanMetrics.addBytesReadFromBlockCache(cachedBlockBytesRead);
}
- return cachedBlock;
}
}
return null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
index b61084f7883..efc533cb149 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -1151,8 +1152,8 @@ public class LruBlockCache implements FirstLevelBlockCache {
}
// Simple calculators of sizes given factors and maxSize
-
- long acceptableSize() {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public long acceptableSize() {
return (long) Math.floor(this.maxSize * this.acceptableFactor);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java
index 4162fca6afe..fd4bcc12c50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/NoOpIndexBlockEncoder.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
@@ -127,7 +128,8 @@ public class NoOpIndexBlockEncoder implements HFileIndexBlockEncoder {
return getClass().getSimpleName();
}
- protected static class NoOpEncodedSeeker implements EncodedSeeker {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public static class NoOpEncodedSeeker implements EncodedSeeker {
protected long[] blockOffsets;
protected int[] blockDataSizes;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadLocalServerSideScanMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadLocalServerSideScanMetrics.java
new file mode 100644
index 00000000000..8c9ec24e866
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/ThreadLocalServerSideScanMetrics.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.monitoring;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Thread-local storage for server-side scan metrics that captures performance data separately for
+ * each scan thread. This class works in conjunction with {@link ServerSideScanMetrics} to provide
+ * comprehensive scan performance monitoring.
+ * <h3>Purpose and Design</h3> {@link ServerSideScanMetrics} captures scan metrics on the server
+ * side and passes them back to the client in protocol buffer responses. However, the
+ * {@link ServerSideScanMetrics} instance is not readily available at deeper layers in HBase where
+ * HFiles are read and individual HFile blocks are accessed. To avoid the complexity of passing
+ * {@link ServerSideScanMetrics} instances through method calls across multiple layers, this class
+ * provides thread-local storage for metrics collection.
+ * <h3>Thread Safety and HBase Architecture</h3> This class leverages a critical aspect of HBase
+ * server design: on the server side, the thread that opens a {@link RegionScanner} and calls
+ * {@link RegionScanner#nextRaw(java.util.List, ScannerContext)} is the same thread that reads HFile
+ * blocks. This design allows thread-local storage to effectively capture metrics without
+ * cross-thread synchronization.
+ * <h3>Special Handling for Parallel Operations</h3> The only deviation from the single-thread model
+ * occurs when {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} is used for
+ * parallel store file seeking. In this case, special handling ensures that metrics are captured
+ * correctly across multiple threads. The
+ * {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} captures metrics from
+ * worker threads and aggregates them back to the main scan thread. Please refer to the javadoc of
+ * {@link org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler} for detailed information
+ * about this parallel processing mechanism.
+ * <h3>Usage Pattern</h3>
+ * <ol>
+ * <li>Enable metrics collection: {@link #setScanMetricsEnabled(boolean)}</li>
+ * <li>Reset counters at scan start: {@link #reset()}</li>
+ * <li>Increment counters during I/O operations using the various {@code add*} methods</li>
+ * <li>Populate the main metrics object:
+ * {@link #populateServerSideScanMetrics(ServerSideScanMetrics)}</li>
+ * </ol>
+ * <h3>Thread Safety</h3> This class is thread-safe. Each thread maintains its own set of counters
+ * through {@link ThreadLocal} storage, ensuring that metrics from different scan operations do not
+ * interfere with each other.
+ * @see ServerSideScanMetrics
+ * @see RegionScanner
+ * @see org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler
+ */
[email protected]
+public final class ThreadLocalServerSideScanMetrics {
+ private ThreadLocalServerSideScanMetrics() {
+ }
+
+ private static final ThreadLocal<Boolean> IS_SCAN_METRICS_ENABLED =
+ ThreadLocal.withInitial(() -> false);
+
+ private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_FS =
+ ThreadLocal.withInitial(() -> new AtomicLong(0));
+
+ private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_BLOCK_CACHE =
+ ThreadLocal.withInitial(() -> new AtomicLong(0));
+
+ private static final ThreadLocal<AtomicLong> BYTES_READ_FROM_MEMSTORE =
+ ThreadLocal.withInitial(() -> new AtomicLong(0));
+
+ private static final ThreadLocal<AtomicLong> BLOCK_READ_OPS_COUNT =
+ ThreadLocal.withInitial(() -> new AtomicLong(0));
+
+ public static void setScanMetricsEnabled(boolean enable) {
+ IS_SCAN_METRICS_ENABLED.set(enable);
+ }
+
+ public static long addBytesReadFromFs(long bytes) {
+ return BYTES_READ_FROM_FS.get().addAndGet(bytes);
+ }
+
+ public static long addBytesReadFromBlockCache(long bytes) {
+ return BYTES_READ_FROM_BLOCK_CACHE.get().addAndGet(bytes);
+ }
+
+ public static long addBytesReadFromMemstore(long bytes) {
+ return BYTES_READ_FROM_MEMSTORE.get().addAndGet(bytes);
+ }
+
+ public static long addBlockReadOpsCount(long count) {
+ return BLOCK_READ_OPS_COUNT.get().addAndGet(count);
+ }
+
+ public static boolean isScanMetricsEnabled() {
+ return IS_SCAN_METRICS_ENABLED.get();
+ }
+
+ public static AtomicLong getBytesReadFromFsCounter() {
+ return BYTES_READ_FROM_FS.get();
+ }
+
+ public static AtomicLong getBytesReadFromBlockCacheCounter() {
+ return BYTES_READ_FROM_BLOCK_CACHE.get();
+ }
+
+ public static AtomicLong getBytesReadFromMemstoreCounter() {
+ return BYTES_READ_FROM_MEMSTORE.get();
+ }
+
+ public static AtomicLong getBlockReadOpsCountCounter() {
+ return BLOCK_READ_OPS_COUNT.get();
+ }
+
+ public static long getBytesReadFromFsAndReset() {
+ return getBytesReadFromFsCounter().getAndSet(0);
+ }
+
+ public static long getBytesReadFromBlockCacheAndReset() {
+ return getBytesReadFromBlockCacheCounter().getAndSet(0);
+ }
+
+ public static long getBytesReadFromMemstoreAndReset() {
+ return getBytesReadFromMemstoreCounter().getAndSet(0);
+ }
+
+ public static long getBlockReadOpsCountAndReset() {
+ return getBlockReadOpsCountCounter().getAndSet(0);
+ }
+
+ public static void reset() {
+ getBytesReadFromFsAndReset();
+ getBytesReadFromBlockCacheAndReset();
+ getBytesReadFromMemstoreAndReset();
+ getBlockReadOpsCountAndReset();
+ }
+
+ public static void populateServerSideScanMetrics(ServerSideScanMetrics metrics) {
+ if (metrics == null) {
+ return;
+ }
+ metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_FS_METRIC_NAME,
+ getBytesReadFromFsCounter().get());
+ metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME,
+ getBytesReadFromBlockCacheCounter().get());
+ metrics.addToCounter(ServerSideScanMetrics.BYTES_READ_FROM_MEMSTORE_METRIC_NAME,
+ getBytesReadFromMemstoreCounter().get());
+ metrics.addToCounter(ServerSideScanMetrics.BLOCK_READ_OPS_COUNT_METRIC_NAME,
+ getBlockReadOpsCountCounter().get());
+ }
+}
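The class javadoc above describes the intended lifecycle; spelled out as a short illustrative sketch (the real call sites are RegionScannerImpl and ParallelSeekHandler in this patch, and the literal byte value below is made up):

// 1. On the scan thread: enable collection and clear any leftover counters.
ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
ThreadLocalServerSideScanMetrics.reset();

// 2. During the scan, low-level readers bump the thread-local counters, e.g.
//    after reading one 4 KB block from the filesystem:
ThreadLocalServerSideScanMetrics.addBytesReadFromFs(4096);
ThreadLocalServerSideScanMetrics.addBlockReadOpsCount(1);

// 3. When the batch finishes, fold the thread-local values into the metrics
//    object that travels back to the client.
ServerSideScanMetrics scanMetrics = new ServerSideScanMetrics();
ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);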
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index 67cc9b7eba4..153900544d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
@@ -94,6 +95,8 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
private RegionServerServices rsServices;
+ private ServerSideScanMetrics scannerInitMetrics = null;
+
@Override
public RegionInfo getRegionInfo() {
return region.getRegionInfo();
@@ -144,7 +147,16 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
} finally {
region.smallestReadPointCalcLock.unlock(ReadPointCalculationLock.LockType.RECORDING_LOCK);
}
+ boolean isScanMetricsEnabled = scan.isScanMetricsEnabled();
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
+ if (isScanMetricsEnabled) {
+ this.scannerInitMetrics = new ServerSideScanMetrics();
+ ThreadLocalServerSideScanMetrics.reset();
+ }
initializeScanners(scan, additionalScanners);
+ if (isScanMetricsEnabled) {
+ ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scannerInitMetrics);
+ }
}
public ScannerContext getContext() {
@@ -277,6 +289,16 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
throw new UnknownScannerException("Scanner was closed");
}
boolean moreValues = false;
+ boolean isScanMetricsEnabled = scannerContext.isTrackingMetrics();
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
+ if (isScanMetricsEnabled) {
+ ThreadLocalServerSideScanMetrics.reset();
+ ServerSideScanMetrics scanMetrics = scannerContext.getMetrics();
+ if (scannerInitMetrics != null) {
+ scannerInitMetrics.getMetricsMap().forEach(scanMetrics::addToCounter);
+ scannerInitMetrics = null;
+ }
+ }
if (outResults.isEmpty()) {
// Usually outResults is empty. This is true when next is called
// to handle scan or get operation.
@@ -286,7 +308,10 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
-
+ if (isScanMetricsEnabled) {
+ ServerSideScanMetrics scanMetrics = scannerContext.getMetrics();
+ ThreadLocalServerSideScanMetrics.populateServerSideScanMetrics(scanMetrics);
+ }
region.addReadRequestsCount(1);
if (region.getMetrics() != null) {
region.getMetrics().updateReadRequestCount();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
index 1d28c55570e..27df80fa8ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -54,6 +55,7 @@ public class SegmentScanner implements KeyValueScanner {
// flag to indicate if this scanner is closed
protected boolean closed = false;
+ private boolean isScanMetricsEnabled = false;
/**
* Scanners are ordered from 0 (oldest) to newest in increasing order.
@@ -66,6 +68,8 @@ public class SegmentScanner implements KeyValueScanner {
iter = segment.iterator();
// the initialization of the current is required for working with heap of SegmentScanners
updateCurrent();
+ // Enable scan metrics for tracking bytes read after initialization of current
+ this.isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
if (current == null) {
// nothing to fetch from this scanner
close();
@@ -335,10 +339,15 @@ public class SegmentScanner implements KeyValueScanner {
*/
protected void updateCurrent() {
Cell next = null;
+ long totalBytesRead = 0;
try {
while (iter.hasNext()) {
next = iter.next();
+ if (isScanMetricsEnabled) {
+ // Batch collect bytes to reduce method call overhead
+ totalBytesRead += Segment.getCellLength(next);
+ }
if (next.getSequenceId() <= this.readPoint) {
current = next;
return;// skip irrelevant versions
@@ -352,6 +361,10 @@ public class SegmentScanner implements KeyValueScanner {
current = null; // nothing found
} finally {
+ // Add accumulated bytes before returning
+ if (totalBytesRead > 0) {
+ ThreadLocalServerSideScanMetrics.addBytesReadFromMemstore(totalBytesRead);
+ }
if (next != null) {
// in all cases, remember the last KV we iterated to, needed for reseek()
last = next;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
index e241bf0a5d3..fac344c3387 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java
@@ -635,7 +635,8 @@ public class StoreFileReader {
return this.bulkLoadResult;
}
- BloomFilter getGeneralBloomFilter() {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public BloomFilter getGeneralBloomFilter() {
return generalBloomFilter;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
index 61d5b91b35b..a2a641df6d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileWriter.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -284,7 +285,8 @@ public class StoreFileWriter implements CellSink, ShipperListener {
* For unit testing only.
* @return the Bloom filter used by this writer.
*/
- BloomFilterWriter getGeneralBloomWriter() {
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ public BloomFilterWriter getGeneralBloomWriter() {
return liveFileWriter.generalBloomFilterWriter;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index cbc617d2a0b..506af8c404b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -24,12 +24,14 @@ import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntConsumer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
@@ -165,6 +167,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final long readPt;
private boolean topChanged = false;
+ // These are used to verify the state of the scanner during testing.
+ private static AtomicBoolean hasUpdatedReaders;
+ private static AtomicBoolean hasSwitchedToStreamRead;
+
/** An internal constructor. */
private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt,
boolean cacheBlocks, ScanType scanType) {
@@ -1016,6 +1022,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (updateReaders) {
closeLock.unlock();
}
+ if (hasUpdatedReaders != null) {
+ hasUpdatedReaders.set(true);
+ }
}
// Let the next() call handle re-creating and seeking
}
@@ -1166,6 +1175,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.heap = newHeap;
resetQueryMatcher(lastTop);
scannersToClose.forEach(KeyValueScanner::close);
+ if (hasSwitchedToStreamRead != null) {
+ hasSwitchedToStreamRead.set(true);
+ }
}
protected final boolean checkFlushed() {
@@ -1273,4 +1285,30 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
trySwitchToStreamRead();
}
}
+
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ static final void instrument() {
+ hasUpdatedReaders = new AtomicBoolean(false);
+ hasSwitchedToStreamRead = new AtomicBoolean(false);
+ }
+
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ static final boolean hasUpdatedReaders() {
+ return hasUpdatedReaders.get();
+ }
+
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ static final boolean hasSwitchedToStreamRead() {
+ return hasSwitchedToStreamRead.get();
+ }
+
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ static final void resetHasUpdatedReaders() {
+ hasUpdatedReaders.set(false);
+ }
+
+ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.UNITTEST)
+ static final void resetHasSwitchedToStreamRead() {
+ hasSwitchedToStreamRead.set(false);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java
index 41fb3e7bf12..9a9af221b49 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/ParallelSeekHandler.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.regionserver.handler;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -39,6 +41,17 @@ public class ParallelSeekHandler extends EventHandler {
private CountDownLatch latch;
private Throwable err = null;
+ // Flag to enable/disable scan metrics collection and thread-local counters for capturing scan
+ // performance during parallel store file seeking.
+ // These aggregate metrics from worker threads back to the main scan thread.
+ private final boolean isScanMetricsEnabled;
+ // Thread-local counter for bytes read from FS.
+ private final AtomicLong bytesReadFromFs;
+ // Thread-local counter for bytes read from BlockCache.
+ private final AtomicLong bytesReadFromBlockCache;
+ // Thread-local counter for block read operations count.
+ private final AtomicLong blockReadOpsCount;
+
public ParallelSeekHandler(KeyValueScanner scanner, Cell keyValue, long readPoint,
CountDownLatch latch) {
super(null, EventType.RS_PARALLEL_SEEK);
@@ -46,12 +59,35 @@ public class ParallelSeekHandler extends EventHandler {
this.keyValue = keyValue;
this.readPoint = readPoint;
this.latch = latch;
+ this.isScanMetricsEnabled = ThreadLocalServerSideScanMetrics.isScanMetricsEnabled();
+ this.bytesReadFromFs = ThreadLocalServerSideScanMetrics.getBytesReadFromFsCounter();
+ this.bytesReadFromBlockCache =
+ ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheCounter();
+ this.blockReadOpsCount = ThreadLocalServerSideScanMetrics.getBlockReadOpsCountCounter();
}
@Override
public void process() {
try {
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(isScanMetricsEnabled);
+ if (isScanMetricsEnabled) {
+ ThreadLocalServerSideScanMetrics.reset();
+ }
scanner.seek(keyValue);
+ if (isScanMetricsEnabled) {
+ long metricValue = ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+ if (metricValue > 0) {
+ bytesReadFromFs.addAndGet(metricValue);
+ }
+ metricValue = ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset();
+ if (metricValue > 0) {
+ bytesReadFromBlockCache.addAndGet(metricValue);
+ }
+ metricValue = ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
+ if (metricValue > 0) {
+ blockReadOpsCount.addAndGet(metricValue);
+ }
+ }
} catch (IOException e) {
LOG.error("", e);
setErr(e);
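The hand-off above is easy to miss: the constructor runs on the scan thread, so the AtomicLong fields capture that thread's counters, while process() runs on a worker thread, resets that worker's thread-local values around the seek, and then adds the deltas into the captured counters. A hypothetical caller-side sketch (not StoreScanner's actual code; the plain ExecutorService stands in for HBase's executor framework, and scanners/seekKey/readPoint are assumed inputs):

// Assumes EventHandler is Runnable, so the handler can be submitted directly.
void parallelSeek(List<KeyValueScanner> scanners, Cell seekKey, long readPoint,
  ExecutorService pool) throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(scanners.size());
  for (KeyValueScanner scanner : scanners) {
    // Constructed on the scan thread, so each handler captures this thread's counters.
    pool.execute(new ParallelSeekHandler(scanner, seekKey, readPoint, latch));
  }
  // Once every handler has counted the latch down, the workers' byte/op deltas
  // have already been merged into this thread's thread-local counters.
  latch.await();
}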
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java
new file mode 100644
index 00000000000..c85a162ad96
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestBytesReadFromFs.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileReader;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.BloomFilter;
+import org.apache.hadoop.hbase.util.BloomFilterFactory;
+import org.apache.hadoop.hbase.util.BloomFilterUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ IOTests.class, SmallTests.class })
+public class TestBytesReadFromFs {
+ private static final int NUM_KEYS = 100000;
+ private static final int BLOOM_BLOCK_SIZE = 512;
+ private static final int INDEX_CHUNK_SIZE = 512;
+ private static final int DATA_BLOCK_SIZE = 4096;
+ private static final int ROW_PREFIX_LENGTH_IN_BLOOM_FILTER = 42;
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBytesReadFromFs.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestBytesReadFromFs.class);
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final Random RNG = new Random(9713312); // Just a fixed seed.
+
+ private Configuration conf;
+ private FileSystem fs;
+ private List<KeyValue> keyValues = new ArrayList<>();
+ private List<byte[]> keyList = new ArrayList<>();
+ private Path path;
+
+ @Before
+ public void setUp() throws IOException {
+ conf = TEST_UTIL.getConfiguration();
+ conf.setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, ROW_PREFIX_LENGTH_IN_BLOOM_FILTER);
+ fs = FileSystem.get(conf);
+ String hfileName = UUID.randomUUID().toString().replaceAll("-", "");
+ path = new Path(TEST_UTIL.getDataTestDir(), hfileName);
+ conf.setInt(HFileBlockIndex.MAX_CHUNK_SIZE_KEY, INDEX_CHUNK_SIZE);
+ }
+
+ @Test
+ public void testBytesReadFromFsWithScanMetricsDisabled() throws IOException {
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
+ writeData(path);
+ KeyValue keyValue = keyValues.get(0);
+ readDataAndIndexBlocks(path, keyValue, false);
+ }
+
+ @Test
+ public void testBytesReadFromFsToReadDataUsingIndexBlocks() throws
IOException {
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+ writeData(path);
+ KeyValue keyValue = keyValues.get(0);
+ readDataAndIndexBlocks(path, keyValue, true);
+ }
+
+ @Test
+ public void testBytesReadFromFsToReadLoadOnOpenDataSection() throws
IOException {
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+ writeData(path);
+ readLoadOnOpenDataSection(path, false);
+ }
+
+ @Test
+ public void testBytesReadFromFsToReadBloomFilterIndexesAndBloomBlocks()
throws IOException {
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+ BloomType[] bloomTypes = { BloomType.ROW, BloomType.ROWCOL,
BloomType.ROWPREFIX_FIXED_LENGTH };
+ for (BloomType bloomType : bloomTypes) {
+ LOG.info("Testing bloom type: {}", bloomType);
+ ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+ ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
+ keyList.clear();
+ keyValues.clear();
+ writeBloomFilters(path, bloomType, BLOOM_BLOCK_SIZE);
+ if (bloomType == BloomType.ROWCOL) {
+ KeyValue keyValue = keyValues.get(0);
+ readBloomFilters(path, bloomType, null, keyValue);
+ } else {
+ Assert.assertEquals(ROW_PREFIX_LENGTH_IN_BLOOM_FILTER,
keyList.get(0).length);
+ byte[] key = keyList.get(0);
+ readBloomFilters(path, bloomType, key, null);
+ }
+ }
+ }
+
+ private void writeData(Path path) throws IOException {
+ HFileContext context = new
HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
+ .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
+ .withCompression(Compression.Algorithm.NONE).build();
+ CacheConfig cacheConfig = new CacheConfig(conf);
+ HFile.Writer writer = new HFile.WriterFactory(conf,
cacheConfig).withPath(fs, path)
+ .withFileContext(context).create();
+
+ byte[] cf = Bytes.toBytes("cf");
+ byte[] cq = Bytes.toBytes("cq");
+
+ for (int i = 0; i < NUM_KEYS; i++) {
+ byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i,
10);
+ // A random-length random value.
+ byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
+ KeyValue keyValue =
+ new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(),
valueBytes);
+ writer.append(keyValue);
+ keyValues.add(keyValue);
+ }
+
+ writer.close();
+ }
+
+ private void readDataAndIndexBlocks(Path path, KeyValue keyValue, boolean
isScanMetricsEnabled)
+ throws IOException {
+ long fileSize = fs.getFileStatus(path).getLen();
+
+ ReaderContext readerContext =
+ new ReaderContextBuilder().withInputStreamWrapper(new
FSDataInputStreamWrapper(fs, path))
+ .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();
+
+ // Read HFile trailer and create HFileContext
+ HFileInfo hfile = new HFileInfo(readerContext, conf);
+ FixedFileTrailer trailer = hfile.getTrailer();
+
+ // Read HFile info and load-on-open data section (we will read root again
explicitly later)
+ CacheConfig cacheConfig = new CacheConfig(conf);
+ HFile.Reader reader = new HFilePreadReader(readerContext, hfile,
cacheConfig, conf);
+ hfile.initMetaAndIndex(reader);
+ HFileContext meta = hfile.getHFileContext();
+
+ // Get access to the block reader
+ HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();
+
+ // Create iterator for reading load-on-open data section
+ HFileBlock.BlockIterator blockIter =
blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
+ fileSize - trailer.getTrailerSize());
+
+ // Indexes use NoOpEncodedSeeker
+ MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
+ ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+ ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
+
+ int bytesRead = 0;
+ int blockLevelsRead = 0;
+
+ // Read the root index block
+ HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
+ bytesRead += block.getOnDiskSizeWithHeader();
+ if (block.getNextBlockOnDiskSize() > 0) {
+ bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
+ }
+ blockLevelsRead++;
+
+ // Comparator class name is stored in the trailer in version 3.
+ CellComparator comparator = trailer.createComparator();
+ // Initialize the seeker
+ seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
+ trailer.getNumDataIndexLevels());
+
+ int rootLevIndex = seeker.rootBlockContainingKey(keyValue);
+ long currentOffset = seeker.getBlockOffset(rootLevIndex);
+ int currentDataSize = seeker.getBlockDataSize(rootLevIndex);
+
+ HFileBlock prevBlock = null;
+ do {
+ prevBlock = block;
+ block = blockReader.readBlockData(currentOffset, currentDataSize, true,
true, true);
+ HFileBlock unpacked = block.unpack(meta, blockReader);
+ if (unpacked != block) {
+ block.release();
+ block = unpacked;
+ }
+ bytesRead += block.getOnDiskSizeWithHeader();
+ if (block.getNextBlockOnDiskSize() > 0) {
+ bytesRead += HFileBlock.headerSize(meta.isUseHBaseChecksum());
+ }
+ if (!block.getBlockType().isData()) {
+ ByteBuff buffer = block.getBufferWithoutHeader();
+ // Place the buffer at the correct position
+ HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer,
keyValue, comparator);
+ currentOffset = buffer.getLong();
+ currentDataSize = buffer.getInt();
+ }
+ prevBlock.release();
+ blockLevelsRead++;
+ } while (!block.getBlockType().isData());
+ block.release();
+
+ reader.close();
+
+ Assert.assertEquals(isScanMetricsEnabled,
+ ThreadLocalServerSideScanMetrics.isScanMetricsEnabled());
+ bytesRead = isScanMetricsEnabled ? bytesRead : 0;
+ Assert.assertEquals(bytesRead,
ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+ Assert.assertEquals(blockLevelsRead, trailer.getNumDataIndexLevels() + 1);
+ Assert.assertEquals(0,
ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
+ // At every index level we read one index block and finally read data block
+ long blockReadOpsCount = isScanMetricsEnabled ? blockLevelsRead : 0;
+ Assert.assertEquals(blockReadOpsCount,
+ ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
+ }
+
+ private void readLoadOnOpenDataSection(Path path, boolean hasBloomFilters)
throws IOException {
+ long fileSize = fs.getFileStatus(path).getLen();
+
+ ReaderContext readerContext =
+ new ReaderContextBuilder().withInputStreamWrapper(new
FSDataInputStreamWrapper(fs, path))
+ .withFilePath(path).withFileSystem(fs).withFileSize(fileSize).build();
+
+ ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+ ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
+ // Read HFile trailer
+ HFileInfo hfile = new HFileInfo(readerContext, conf);
+ FixedFileTrailer trailer = hfile.getTrailer();
+ Assert.assertEquals(trailer.getTrailerSize(),
+ ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+ Assert.assertEquals(1,
ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
+
+ CacheConfig cacheConfig = new CacheConfig(conf);
+ HFile.Reader reader = new HFilePreadReader(readerContext, hfile,
cacheConfig, conf);
+ HFileBlock.FSReader blockReader = reader.getUncachedBlockReader();
+
+ // Create iterator for reading the load-on-open data section
+ HFileBlock.BlockIterator blockIter =
blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
+ fileSize - trailer.getTrailerSize());
+ boolean readNextHeader = false;
+
+ // Read the root index block
+ readNextHeader = readEachBlockInLoadOnOpenDataSection(
+ blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);
+
+ // Read the meta index block (it is stored with the ROOT_INDEX block type as well)
+ readNextHeader = readEachBlockInLoadOnOpenDataSection(
+ blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX), readNextHeader);
+
+ // Read File info block
+ readNextHeader = readEachBlockInLoadOnOpenDataSection(
+ blockIter.nextBlockWithBlockType(BlockType.FILE_INFO), readNextHeader);
+
+ // Read bloom filter indexes
+ boolean bloomFilterIndexesRead = false;
+ HFileBlock block;
+ while ((block = blockIter.nextBlock()) != null) {
+ bloomFilterIndexesRead = true;
+ readNextHeader = readEachBlockInLoadOnOpenDataSection(block,
readNextHeader);
+ }
+
+ reader.close();
+
+ Assert.assertEquals(hasBloomFilters, bloomFilterIndexesRead);
+ Assert.assertEquals(0,
ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
+ }
+
+ private boolean readEachBlockInLoadOnOpenDataSection(HFileBlock block,
boolean readNextHeader)
+ throws IOException {
+ long bytesRead = block.getOnDiskSizeWithHeader();
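+ // If the previous block's read prefetched this block's header, those header bytes were
+ // already read from FS (and asserted) with the previous block, so subtract them here.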
+ if (readNextHeader) {
+ bytesRead -= HFileBlock.headerSize(true);
+ readNextHeader = false;
+ }
+ if (block.getNextBlockOnDiskSize() > 0) {
+ bytesRead += HFileBlock.headerSize(true);
+ readNextHeader = true;
+ }
+ block.release();
+ Assert.assertEquals(bytesRead,
ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+ Assert.assertEquals(1,
ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
+ return readNextHeader;
+ }
+
+ private void readBloomFilters(Path path, BloomType bt, byte[] key, KeyValue
keyValue)
+ throws IOException {
+ Assert.assertTrue(keyValue == null || key == null);
+
+ // Assert that the bloom filter index was read and its size is accounted for in the bytes
+ // read from FS.
+ readLoadOnOpenDataSection(path, true);
+
+ CacheConfig cacheConf = new CacheConfig(conf);
+ StoreFileInfo storeFileInfo = new StoreFileInfo(conf, fs, path, true);
+ HStoreFile sf = new HStoreFile(storeFileInfo, bt, cacheConf);
+
+ // Read HFile trailer and load-on-open data section
+ sf.initReader();
+
+ // Reset bytes read from fs to 0
+ ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+ // Reset read ops count to 0
+ ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset();
+
+ StoreFileReader reader = sf.getReader();
+ BloomFilter bloomFilter = reader.getGeneralBloomFilter();
+ Assert.assertTrue(bloomFilter instanceof CompoundBloomFilter);
+ CompoundBloomFilter cbf = (CompoundBloomFilter) bloomFilter;
+
+ // Get the bloom filter index reader
+ HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
+ int block;
+
+ // Search for the key in the bloom filter index
+ if (keyValue != null) {
+ block = index.rootBlockContainingKey(keyValue);
+ } else {
+ byte[] row = key;
+ block = index.rootBlockContainingKey(row, 0, row.length);
+ }
+
+ // Read the bloom block from FS
+ HFileBlock bloomBlock = cbf.getBloomBlock(block);
+ long bytesRead = bloomBlock.getOnDiskSizeWithHeader();
+ if (bloomBlock.getNextBlockOnDiskSize() > 0) {
+ bytesRead += HFileBlock.headerSize(true);
+ }
+ // Assert that the block read is a bloom block
+ Assert.assertEquals(bloomBlock.getBlockType(), BlockType.BLOOM_CHUNK);
+ bloomBlock.release();
+
+ // Close the reader
+ reader.close(true);
+
+ Assert.assertEquals(bytesRead,
ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+ Assert.assertEquals(1,
ThreadLocalServerSideScanMetrics.getBlockReadOpsCountAndReset());
+ }
+
+ private void writeBloomFilters(Path path, BloomType bt, int
bloomBlockByteSize)
+ throws IOException {
+ conf.setInt(BloomFilterFactory.IO_STOREFILE_BLOOM_BLOCK_SIZE,
bloomBlockByteSize);
+ CacheConfig cacheConf = new CacheConfig(conf);
+ HFileContext meta = new
HFileContextBuilder().withBlockSize(DATA_BLOCK_SIZE)
+ .withIncludesTags(false).withDataBlockEncoding(DataBlockEncoding.NONE)
+ .withCompression(Compression.Algorithm.NONE).build();
+ StoreFileWriter w = new StoreFileWriter.Builder(conf, cacheConf,
fs).withFileContext(meta)
+ .withBloomType(bt).withFilePath(path).build();
+ Assert.assertTrue(w.hasGeneralBloom());
+ Assert.assertTrue(w.getGeneralBloomWriter() instanceof
CompoundBloomFilterWriter);
+ CompoundBloomFilterWriter cbbf = (CompoundBloomFilterWriter)
w.getGeneralBloomWriter();
+ byte[] cf = Bytes.toBytes("cf");
+ byte[] cq = Bytes.toBytes("cq");
+ for (int i = 0; i < NUM_KEYS; i++) {
+ byte[] keyBytes = RandomKeyValueUtil.randomOrderedFixedLengthKey(RNG, i,
10);
+ // A random-length random value.
+ byte[] valueBytes = RandomKeyValueUtil.randomFixedLengthValue(RNG, 10);
+ KeyValue keyValue =
+ new KeyValue(keyBytes, cf, cq, EnvironmentEdgeManager.currentTime(),
valueBytes);
+ w.append(keyValue);
+ keyList.add(keyBytes);
+ keyValues.add(keyValue);
+ }
+ Assert.assertEquals(keyList.size(), cbbf.getKeyCount());
+ w.close();
+ }
+
+ private static class MyNoOpEncodedSeeker extends
NoOpIndexBlockEncoder.NoOpEncodedSeeker {
+ public long getBlockOffset(int i) {
+ return blockOffsets[i];
+ }
+
+ public int getBlockDataSize(int i) {
+ return blockDataSizes[i];
+ }
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
index 189af113b33..66e2db5e67c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.io.encoding.IndexBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
import org.apache.hadoop.hbase.io.hfile.ReaderContext.ReaderType;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.testclassification.IOTests;
@@ -201,6 +202,90 @@ public class TestHFile {
lru.shutdown();
}
+ private void assertBytesReadFromCache(boolean isScanMetricsEnabled) throws
Exception {
+ assertBytesReadFromCache(isScanMetricsEnabled, DataBlockEncoding.NONE);
+ }
+
+ private void assertBytesReadFromCache(boolean isScanMetricsEnabled,
DataBlockEncoding encoding)
+ throws Exception {
+ // Write a store file
+ Path storeFilePath = writeStoreFile();
+
+ // Initialize the block cache and HFile reader
+ BlockCache lru = BlockCacheFactory.createBlockCache(conf);
+ Assert.assertTrue(lru instanceof LruBlockCache);
+ CacheConfig cacheConfig = new CacheConfig(conf, null, lru,
ByteBuffAllocator.HEAP);
+ HFileReaderImpl reader =
+ (HFileReaderImpl) HFile.createReader(fs, storeFilePath, cacheConfig,
true, conf);
+
+ // Try to read the first block of the HFile from the block cache.
+ final int offset = 0;
+ BlockCacheKey cacheKey = new BlockCacheKey(storeFilePath.getName(),
offset);
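+ // The store file has not been read yet, so this cache lookup is expected to miss.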
+ HFileBlock block = (HFileBlock) lru.getBlock(cacheKey, false, false, true);
+ Assert.assertNull(block);
+
+ // Assert that the first block has not been cached in the block cache and that no disk I/O
+ // happened while checking that.
+ ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset();
+ ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset();
+ block = reader.getCachedBlock(cacheKey, false, false, true,
BlockType.DATA, null);
+ Assert.assertEquals(0,
ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
+ Assert.assertEquals(0,
ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+
+ // Read the first block from the HFile.
+ block = reader.readBlock(offset, -1, true, true, false, true,
BlockType.DATA, null);
+ Assert.assertNotNull(block);
+ int bytesReadFromFs = block.getOnDiskSizeWithHeader();
+ if (block.getNextBlockOnDiskSize() > 0) {
+ bytesReadFromFs += block.headerSize();
+ }
+ block.release();
+ // Assert that disk I/O happened to read the first block.
+ Assert.assertEquals(isScanMetricsEnabled ? bytesReadFromFs : 0,
+ ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+ Assert.assertEquals(0,
ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
+
+ // Read the first block again from the block cache; with a matching encoding it is expected
+ // to have been cached by the earlier read.
+ block = reader.getCachedBlock(cacheKey, false, false, true,
BlockType.DATA, encoding);
+ long bytesReadFromCache = 0;
+ if (encoding == DataBlockEncoding.NONE) {
+ Assert.assertNotNull(block);
+ bytesReadFromCache = block.getOnDiskSizeWithHeader();
+ if (block.getNextBlockOnDiskSize() > 0) {
+ bytesReadFromCache += block.headerSize();
+ }
+ block.release();
+ // Assert that the bytes read from the block cache account for the same number of bytes
+ // that would have been read from FS if the block cache were not there.
+ Assert.assertEquals(bytesReadFromFs, bytesReadFromCache);
+ } else {
+ Assert.assertNull(block);
+ }
+ Assert.assertEquals(isScanMetricsEnabled ? bytesReadFromCache : 0,
+ ThreadLocalServerSideScanMetrics.getBytesReadFromBlockCacheAndReset());
+ Assert.assertEquals(0,
ThreadLocalServerSideScanMetrics.getBytesReadFromFsAndReset());
+
+ reader.close();
+ }
+
+ @Test
+ public void testBytesReadFromCache() throws Exception {
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+ assertBytesReadFromCache(true);
+ }
+
+ @Test
+ public void testBytesReadFromCacheWithScanMetricsDisabled() throws Exception
{
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
+ assertBytesReadFromCache(false);
+ }
+
+ @Test
+ public void testBytesReadFromCacheWithInvalidDataEncoding() throws Exception
{
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+ assertBytesReadFromCache(true, DataBlockEncoding.FAST_DIFF);
+ }
+
private BlockCache initCombinedBlockCache(final String l1CachePolicy) {
Configuration that = HBaseConfiguration.create(conf);
that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache.
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBytesReadServerSideScanMetrics.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBytesReadServerSideScanMetrics.java
new file mode 100644
index 00000000000..ff4b63e399c
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBytesReadServerSideScanMetrics.java
@@ -0,0 +1,894 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.Consumer;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.io.hfile.BlockCache;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
+import org.apache.hadoop.hbase.io.hfile.BlockType;
+import org.apache.hadoop.hbase.io.hfile.CompoundBloomFilter;
+import org.apache.hadoop.hbase.io.hfile.FixedFileTrailer;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileBlock;
+import org.apache.hadoop.hbase.io.hfile.HFileBlockIndex;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
+import org.apache.hadoop.hbase.io.hfile.NoOpIndexBlockEncoder;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.testclassification.IOTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.BloomFilter;
+import org.apache.hadoop.hbase.util.BloomFilterUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ IOTests.class, LargeTests.class })
+public class TestBytesReadServerSideScanMetrics {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBytesReadServerSideScanMetrics.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestBytesReadServerSideScanMetrics.class);
+
+ private HBaseTestingUtility UTIL;
+
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private static final byte[] CQ = Bytes.toBytes("cq");
+
+ private static final byte[] VALUE = Bytes.toBytes("value");
+
+ private static final byte[] ROW2 = Bytes.toBytes("row2");
+ private static final byte[] ROW3 = Bytes.toBytes("row3");
+ private static final byte[] ROW4 = Bytes.toBytes("row4");
+
+ private Configuration conf;
+
+ @Before
+ public void setUp() throws Exception {
+ UTIL = new HBaseTestingUtility();
+ conf = UTIL.getConfiguration();
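+ // Disable periodic memstore flushes and compactions so flushes happen only when the tests
+ // trigger them and store file counts stay predictable.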
+ conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 0);
+ conf.setBoolean(CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION, false);
+ }
+
+ @Test
+ public void testScanMetricsDisabled() throws Exception {
+ conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
+ UTIL.startMiniCluster();
+ try {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTable(tableName, false, BloomType.NONE);
+ writeData(tableName, true);
+ Scan scan = new Scan();
+ scan.withStartRow(ROW2, true);
+ scan.withStopRow(ROW4, true);
+ scan.setCaching(1);
+ try (Table table = UTIL.getConnection().getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ int rowCount = 0;
+ for (Result r : scanner) {
+ rowCount++;
+ }
+ Assert.assertEquals(2, rowCount);
+ Assert.assertNull(scanner.getScanMetrics());
+ }
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testBytesReadFromFsForSerialSeeks() throws Exception {
+ conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
+ UTIL.startMiniCluster();
+ try {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTable(tableName, false, BloomType.ROW);
+ writeData(tableName, true);
+ ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
+
+ // Use oldest timestamp to make sure the fake key is not less than the
first key in
+ // the file containing key: row2
+ KeyValue keyValue = new KeyValue(ROW2, CF, CQ,
HConstants.OLDEST_TIMESTAMP, VALUE);
+ assertBytesReadFromFs(tableName, scanMetrics.bytesReadFromFs.get(),
keyValue,
+ scanMetrics.blockReadOpsCount.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testBytesReadFromFsForParallelSeeks() throws Exception {
+ conf.setInt(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
+ // This property doesn't work correctly if only applied at column family
level.
+ conf.setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, true);
+ UTIL.startMiniCluster();
+ try {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTable(tableName, false, BloomType.NONE);
+ writeData(tableName, true);
+ HRegionServer server = UTIL.getMiniHBaseCluster().getRegionServer(0);
+ ThreadPoolExecutor executor =
+
server.getExecutorService().getExecutorThreadPool(ExecutorType.RS_PARALLEL_SEEK);
+ long tasksCompletedBeforeRead = executor.getCompletedTaskCount();
+ ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
+ long tasksCompletedAfterRead = executor.getCompletedTaskCount();
+ // Assert both of the HFiles were read using parallel seek executor
+ Assert.assertEquals(2, tasksCompletedAfterRead -
tasksCompletedBeforeRead);
+
+ // Use oldest timestamp to make sure the fake key is not less than the
first key in
+ // the file containing key: row2
+ KeyValue keyValue = new KeyValue(ROW2, CF, CQ,
HConstants.OLDEST_TIMESTAMP, VALUE);
+ assertBytesReadFromFs(tableName, scanMetrics.bytesReadFromFs.get(),
keyValue,
+ scanMetrics.blockReadOpsCount.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testBytesReadFromBlockCache() throws Exception {
+ UTIL.startMiniCluster();
+ try {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTable(tableName, true, BloomType.NONE);
+ HRegionServer server = UTIL.getMiniHBaseCluster().getRegionServer(0);
+ LruBlockCache blockCache = (LruBlockCache) server.getBlockCache().get();
+
+ // Assert that acceptable size of LRU block cache is greater than 1MB
+ Assert.assertTrue(blockCache.acceptableSize() > 1024 * 1024);
+ writeData(tableName, true);
+ readDataAndGetScanMetrics(tableName, false);
+ KeyValue keyValue = new KeyValue(ROW2, CF, CQ,
HConstants.OLDEST_TIMESTAMP, VALUE);
+ assertBlockCacheWarmUp(tableName, keyValue);
+ ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
+ Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
+ assertBytesReadFromBlockCache(tableName,
scanMetrics.bytesReadFromBlockCache.get(), keyValue);
+ Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testBytesReadFromMemstore() throws Exception {
+ UTIL.startMiniCluster();
+ try {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTable(tableName, false, BloomType.NONE);
+ writeData(tableName, false);
+ ScanMetrics scanMetrics = readDataAndGetScanMetrics(tableName, true);
+
+ // Assert no flush has happened for the table
+ List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+ for (HRegion region : regions) {
+ HStore store = region.getStore(CF);
+ // Assert no HFile is there
+ Assert.assertEquals(0, store.getStorefiles().size());
+ }
+
+ KeyValue keyValue = new KeyValue(ROW2, CF, CQ,
HConstants.LATEST_TIMESTAMP, VALUE);
+ int singleKeyValueSize = Segment.getCellLength(keyValue);
+ // The first key value will be read when seeking and the second one when calling next() to
+ // determine that there are no more cells in the row. Key values read during SegmentScanner
+ // instance creation are not counted.
+ int totalKeyValueSize = 2 * singleKeyValueSize;
+ Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+ Assert.assertEquals(totalKeyValueSize,
scanMetrics.bytesReadFromMemstore.get());
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testBytesReadWithSwitchFromPReadToStream() throws Exception {
+ // Set pread max bytes to 3 to make sure that the first row is read using
pread and the second
+ // one using stream read
+ Map<String, String> configuration = new HashMap<>();
+ configuration.put(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, "3");
+ UTIL.startMiniCluster();
+ try {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTable(tableName, true, BloomType.ROW, configuration);
+ writeData(tableName, true);
+ Scan scan = new Scan();
+ scan.withStartRow(ROW2, true);
+ scan.withStopRow(ROW4, true);
+ scan.setScanMetricsEnabled(true);
+ // Set caching to 1 so that one row is read via PREAD and the other via STREAM
+ scan.setCaching(1);
+ ScanMetrics scanMetrics = null;
+ StoreScanner.instrument();
+ try (Table table = UTIL.getConnection().getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ int rowCount = 0;
+ Assert.assertFalse(StoreScanner.hasSwitchedToStreamRead());
+ for (Result r : scanner) {
+ rowCount++;
+ }
+ Assert.assertTrue(StoreScanner.hasSwitchedToStreamRead());
+ Assert.assertEquals(2, rowCount);
+ scanMetrics = scanner.getScanMetrics();
+ }
+ int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName,
scanMetrics, 2);
+ Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+ // There are 2 HFiles, so 1 read op per HFile was done by the actual scan to read the data
+ // block. No bloom blocks will be read as this is not a Get scan and the bloom filter type
+ // is only ROW.
+ Assert.assertEquals(2, scanMetrics.blockReadOpsCount.get());
+ // With scan caching set to 1 and 2 rows being scanned, 2 RPC calls will
be needed.
+ Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testBytesReadWhenFlushHappenedInTheMiddleOfScan() throws
Exception {
+ UTIL.startMiniCluster();
+ try {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTable(tableName, true, BloomType.ROW);
+ writeData(tableName, false);
+ Scan scan = new Scan();
+ scan.withStartRow(ROW2, true);
+ scan.withStopRow(ROW4, true);
+ scan.setScanMetricsEnabled(true);
+ // Set caching to 1 so that one row is read per RPC call
+ scan.setCaching(1);
+ ScanMetrics scanMetrics = null;
+ try (Table table = UTIL.getConnection().getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ int rowCount = 0;
+ for (Result r : scanner) {
+ rowCount++;
+ if (rowCount == 1) {
+ flushAndWaitUntilFlushed(tableName, true);
+ }
+ }
+ Assert.assertEquals(2, rowCount);
+ scanMetrics = scanner.getScanMetrics();
+ }
+
+ // Only 1 HFile will be created and it will have only one data block.
+ int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName,
scanMetrics, 1);
+ Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());
+
+ Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+
+ // The flush happens after the first row is returned from the server but before the second
+ // row is returned. So, 2 cells will be read from the memstore, i.e. the cell for the first
+ // row and the next cell at which scanning stops. There is 1 cell per row.
+ int bytesReadFromMemstore =
+ Segment.getCellLength(new KeyValue(ROW2, CF, CQ,
HConstants.LATEST_TIMESTAMP, VALUE));
+ Assert.assertEquals(2 * bytesReadFromMemstore,
scanMetrics.bytesReadFromMemstore.get());
+
+ // There will be 1 read op to read the only data block present in the
HFile.
+ Assert.assertEquals(1, scanMetrics.blockReadOpsCount.get());
+
+ // With caching set to 1 and 2 rows returned, 2 RPC calls are expected
+ Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testBytesReadInReverseScan() throws Exception {
+ UTIL.startMiniCluster();
+ try {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTable(tableName, true, BloomType.ROW);
+ writeData(tableName, true);
+ Scan scan = new Scan();
+ scan.withStartRow(ROW4, true);
+ scan.withStopRow(ROW2, true);
+ scan.setScanMetricsEnabled(true);
+ scan.setReversed(true);
+ // Set caching to 1 so that one row is read per RPC call
+ scan.setCaching(1);
+ ScanMetrics scanMetrics = null;
+ try (Table table = UTIL.getConnection().getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ int rowCount = 0;
+ for (Result r : scanner) {
+ rowCount++;
+ }
+ Assert.assertEquals(2, rowCount);
+ scanMetrics = scanner.getScanMetrics();
+ System.out.println("Scan metrics: " + scanMetrics.toString());
+ }
+
+ // 1 data block per HFile was read.
+ int bytesReadFromFs = getBytesReadFromFsForNonGetScan(tableName,
scanMetrics, 2);
+ Assert.assertEquals(bytesReadFromFs, scanMetrics.bytesReadFromFs.get());
+
+ // For the HFile containing both rows, the data block will be read from the block cache when
+ // KeyValueHeap.next() is called to read the second row. KeyValueHeap.next() calls
+ // StoreFileScanner.next() while positioned on ROW4, the last row of the file, which sets
+ // curBlock to null in the underlying HFileScanner. As curBlock is null, kvNext will be null
+ // and StoreFileScanner.seekToPreviousRow() will be called; since curBlock is null, it loads
+ // the data block from the block cache. So, 1 data block will be read from the block cache.
+ Assert.assertEquals(bytesReadFromFs / 2,
scanMetrics.bytesReadFromBlockCache.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+
+ // 1 read op per HFile was done by the actual scan to read the data block.
+ Assert.assertEquals(2, scanMetrics.blockReadOpsCount.get());
+
+ // 2 RPC calls will be made
+ Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+ @Test
+ public void testBytesReadWithLazySeek() throws Exception {
+ UTIL.startMiniCluster();
+ try {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTable(tableName, true, BloomType.NONE);
+ writeData(tableName, true);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ byte[] newValue = Bytes.toBytes("new value");
+ // Update the value of ROW2 and let it stay in the memstore. Assert that the lazy seek
+ // doesn't lead to a real seek on the HFile.
+ table.put(new Put(ROW2).addColumn(CF, CQ, newValue));
+ Scan scan = new Scan();
+ scan.withStartRow(ROW2, true);
+ scan.withStopRow(ROW2, true);
+ scan.setScanMetricsEnabled(true);
+ Map<byte[], NavigableSet<byte[]>> familyMap = new HashMap<>();
+ familyMap.put(CF, new TreeSet<>(Bytes.BYTES_COMPARATOR));
+ familyMap.get(CF).add(CQ);
+ scan.setFamilyMap(familyMap);
+ ScanMetrics scanMetrics = null;
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ int rowCount = 0;
+ for (Result r : scanner) {
+ rowCount++;
+ Assert.assertArrayEquals(newValue, r.getValue(CF, CQ));
+ }
+ Assert.assertEquals(1, rowCount);
+ scanMetrics = scanner.getScanMetrics();
+ }
+ // No real seek should be done on the HFile.
+ Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+ Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());
+
+ // The cell should be coming purely from memstore.
+ int cellSize =
+ Segment.getCellLength(new KeyValue(ROW2, CF, CQ,
HConstants.LATEST_TIMESTAMP, newValue));
+ Assert.assertEquals(cellSize, scanMetrics.bytesReadFromMemstore.get());
+ Assert.assertEquals(1, scanMetrics.countOfRPCcalls.get());
+ }
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+ /**
+ * Test consecutive calls to RegionScannerImpl.next() to make sure
populating scan metrics from
+ * ThreadLocalServerSideScanMetrics is done correctly.
+ */
+ @Test
+ public void testConsecutiveRegionScannerNextCalls() throws Exception {
+ // We will be setting a very small block size, so make sure to set a big enough pread max
+ // bytes value
+ Map<String, String> configuration = new HashMap<>();
+ configuration.put(StoreScanner.STORESCANNER_PREAD_MAX_BYTES,
Integer.toString(64 * 1024));
+ UTIL.startMiniCluster();
+ try {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ // Set the block size to 4 bytes to get 1 row per data block in HFile.
+ createTable(tableName, true, BloomType.NONE, 4, configuration);
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ // Add 3 rows to the table.
+ table.put(new Put(ROW2).addColumn(CF, CQ, VALUE));
+ table.put(new Put(ROW3).addColumn(CF, CQ, VALUE));
+ table.put(new Put(ROW4).addColumn(CF, CQ, VALUE));
+
+ ScanMetrics scanMetrics = null;
+
+ // Scan the added rows. The rows should be read from memstore.
+ Scan scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ int rowCount = 0;
+ for (Result r : scanner) {
+ rowCount++;
+ }
+ Assert.assertEquals(2, rowCount);
+ scanMetrics = scanner.getScanMetrics();
+ }
+
+ // Assert that the rows were read only from the memstore and that 2 RPC calls were involved.
+ int cellSize =
+ Segment.getCellLength(new KeyValue(ROW2, CF, CQ,
HConstants.LATEST_TIMESTAMP, VALUE));
+ Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+ Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());
+ Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+ Assert.assertEquals(3 * cellSize,
scanMetrics.bytesReadFromMemstore.get());
+
+ // Flush the table and make sure that the rows are read from HFiles.
+ flushAndWaitUntilFlushed(tableName, false);
+ scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
+ scanMetrics = null;
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ int rowCount = 0;
+ for (Result r : scanner) {
+ rowCount++;
+ }
+ Assert.assertEquals(2, rowCount);
+ scanMetrics = scanner.getScanMetrics();
+ }
+
+ // Assert that rows were read from HFiles and involved 2 RPC calls.
+ int bytesReadFromFs =
getBytesReadToReadConsecutiveDataBlocks(tableName, 1, 3, true);
+ Assert.assertEquals(bytesReadFromFs,
scanMetrics.bytesReadFromFs.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromBlockCache.get());
+ Assert.assertEquals(3, scanMetrics.blockReadOpsCount.get());
+ Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+
+ // Make sure that rows are read from Blockcache now.
+ scan = createScanToReadOneRowAtATimeFromServer(ROW2, ROW3);
+ scanMetrics = null;
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ int rowCount = 0;
+ for (Result r : scanner) {
+ rowCount++;
+ }
+ Assert.assertEquals(2, rowCount);
+ scanMetrics = scanner.getScanMetrics();
+ }
+
+ // Assert that rows were read from Blockcache and involved 2 RPC calls.
+ int bytesReadFromBlockCache =
+ getBytesReadToReadConsecutiveDataBlocks(tableName, 1, 3, false);
+ Assert.assertEquals(bytesReadFromBlockCache,
scanMetrics.bytesReadFromBlockCache.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromFs.get());
+ Assert.assertEquals(0, scanMetrics.blockReadOpsCount.get());
+ Assert.assertEquals(2, scanMetrics.countOfRPCcalls.get());
+ Assert.assertEquals(0, scanMetrics.bytesReadFromMemstore.get());
+ }
+ } finally {
+ UTIL.shutdownMiniCluster();
+ }
+ }
+
+ private Scan createScanToReadOneRowAtATimeFromServer(byte[] startRow, byte[]
stopRow) {
+ Scan scan = new Scan();
+ scan.withStartRow(startRow, true);
+ scan.withStopRow(stopRow, true);
+ scan.setScanMetricsEnabled(true);
+ scan.setCaching(1);
+ return scan;
+ }
+
+ private void flushAndWaitUntilFlushed(TableName tableName, boolean
waitForUpdatedReaders)
+ throws Exception {
+ if (waitForUpdatedReaders) {
+ StoreScanner.instrument();
+ }
+ UTIL.flush(tableName);
+ List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+ Assert.assertEquals(1, regions.size());
+ HRegion region = regions.get(0);
+ HStore store = region.getStore(CF);
+ // In milliseconds
+ int maxWaitTime = 100000;
+ int totalWaitTime = 0;
+ int sleepTime = 10000;
+ while (
+ store.getStorefiles().size() == 0
+ || (waitForUpdatedReaders && !StoreScanner.hasUpdatedReaders())
+ ) {
+ Thread.sleep(sleepTime);
+ totalWaitTime += sleepTime;
+ if (totalWaitTime >= maxWaitTime) {
+ throw new Exception("Store files not flushed after " + maxWaitTime +
"ms");
+ }
+ }
+ Assert.assertEquals(1, store.getStorefiles().size());
+ }
+
+ private int getBytesReadToReadConsecutiveDataBlocks(TableName tableName,
+ int expectedStoreFileCount, int expectedDataBlockCount, boolean
isReadFromFs) throws Exception {
+ List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+ Assert.assertEquals(1, regions.size());
+ HRegion region = regions.get(0);
+ HStore store = region.getStore(CF);
+ Collection<HStoreFile> storeFiles = store.getStorefiles();
+ Assert.assertEquals(expectedStoreFileCount, storeFiles.size());
+ int bytesReadFromFs = 0;
+ for (HStoreFile storeFile : storeFiles) {
+ StoreFileReader reader = storeFile.getReader();
+ HFile.Reader hfileReader = reader.getHFileReader();
+ HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
+ FixedFileTrailer trailer = hfileReader.getTrailer();
+ int dataIndexLevels = trailer.getNumDataIndexLevels();
+ long loadOnOpenDataOffset = trailer.getLoadOnOpenDataOffset();
+ HFileBlock.BlockIterator blockIterator = blockReader.blockRange(0,
loadOnOpenDataOffset);
+ HFileBlock block;
+ boolean readNextBlock = false;
+ int blockCount = 0;
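+ // All blocks before the load-on-open section are expected to be data blocks; the
+ // prefetched-header savings below apply only when the blocks are read from FS.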
+ while ((block = blockIterator.nextBlock()) != null) {
+ blockCount++;
+ bytesReadFromFs += block.getOnDiskSizeWithHeader();
+ if (isReadFromFs && readNextBlock) {
+ // This accounts for the savings we get from the prefetched header, but these savings only
+ // apply when reading from FS and not from the block cache.
+ bytesReadFromFs -= block.headerSize();
+ readNextBlock = false;
+ }
+ if (block.getNextBlockOnDiskSize() > 0) {
+ bytesReadFromFs += block.headerSize();
+ readNextBlock = true;
+ }
+ Assert.assertTrue(block.getBlockType().isData());
+ }
+ blockIterator.freeBlocks();
+ // No intermediate or leaf index blocks are expected.
+ Assert.assertEquals(1, dataIndexLevels);
+ Assert.assertEquals(expectedDataBlockCount, blockCount);
+ }
+ return bytesReadFromFs;
+ }
+
+ private int getBytesReadFromFsForNonGetScan(TableName tableName, ScanMetrics
scanMetrics,
+ int expectedStoreFileCount) throws Exception {
+ List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+ Assert.assertEquals(1, regions.size());
+ HRegion region = regions.get(0);
+ HStore store = region.getStore(CF);
+ Collection<HStoreFile> storeFiles = store.getStorefiles();
+ Assert.assertEquals(expectedStoreFileCount, storeFiles.size());
+ int bytesReadFromFs = 0;
+ for (HStoreFile storeFile : storeFiles) {
+ StoreFileReader reader = storeFile.getReader();
+ HFile.Reader hfileReader = reader.getHFileReader();
+ HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
+ FixedFileTrailer trailer = hfileReader.getTrailer();
+ int dataIndexLevels = trailer.getNumDataIndexLevels();
+ // Read the first block of the HFile. First block is always expected to
be a DATA block and
+ // the HFile is expected to have only one DATA block.
+ HFileBlock block = blockReader.readBlockData(0, -1, true, true, true);
+ Assert.assertTrue(block.getBlockType().isData());
+ bytesReadFromFs += block.getOnDiskSizeWithHeader();
+ if (block.getNextBlockOnDiskSize() > 0) {
+ bytesReadFromFs += block.headerSize();
+ }
+ block.release();
+ // Each of the HFiles is expected to have only a root index and no intermediate or leaf
+ // index blocks.
+ Assert.assertEquals(1, dataIndexLevels);
+ }
+ return bytesReadFromFs;
+ }
+
+ private ScanMetrics readDataAndGetScanMetrics(TableName tableName, boolean
isScanMetricsEnabled)
+ throws Exception {
+ Scan scan = new Scan();
+ scan.withStartRow(ROW2, true);
+ scan.withStopRow(ROW2, true);
+ scan.setScanMetricsEnabled(isScanMetricsEnabled);
+ ScanMetrics scanMetrics;
+ try (Table table = UTIL.getConnection().getTable(tableName);
+ ResultScanner scanner = table.getScanner(scan)) {
+ int rowCount = 0;
+ StoreFileScanner.instrument();
+ for (Result r : scanner) {
+ rowCount++;
+ }
+ Assert.assertEquals(1, rowCount);
+ scanMetrics = scanner.getScanMetrics();
+ }
+ if (isScanMetricsEnabled) {
+ LOG.info("Bytes read from fs: " + scanMetrics.bytesReadFromFs.get());
+ LOG.info("Bytes read from block cache: " +
scanMetrics.bytesReadFromBlockCache.get());
+ LOG.info("Bytes read from memstore: " +
scanMetrics.bytesReadFromMemstore.get());
+ LOG.info("Count of bytes scanned: " +
scanMetrics.countOfBlockBytesScanned.get());
+ LOG.info("StoreFileScanners seek count: " +
StoreFileScanner.getSeekCount());
+ }
+ return scanMetrics;
+ }
+
+ private void writeData(TableName tableName, boolean shouldFlush) throws
Exception {
+ try (Table table = UTIL.getConnection().getTable(tableName)) {
+ table.put(new Put(ROW2).addColumn(CF, CQ, VALUE));
+ table.put(new Put(ROW4).addColumn(CF, CQ, VALUE));
+ if (shouldFlush) {
+ // Create a HFile
+ UTIL.flush(tableName);
+ }
+
+ table.put(new Put(Bytes.toBytes("row1")).addColumn(CF, CQ, VALUE));
+ table.put(new Put(Bytes.toBytes("row5")).addColumn(CF, CQ, VALUE));
+ if (shouldFlush) {
+ // Create a HFile
+ UTIL.flush(tableName);
+ }
+ }
+ }
+
+ private void createTable(TableName tableName, boolean blockCacheEnabled,
BloomType bloomType)
+ throws Exception {
+ createTable(tableName, blockCacheEnabled, bloomType,
HConstants.DEFAULT_BLOCKSIZE,
+ new HashMap<>());
+ }
+
+ private void createTable(TableName tableName, boolean blockCacheEnabled,
BloomType bloomType,
+ Map<String, String> configuration) throws Exception {
+ createTable(tableName, blockCacheEnabled, bloomType,
HConstants.DEFAULT_BLOCKSIZE,
+ configuration);
+ }
+
+ private void createTable(TableName tableName, boolean blockCacheEnabled,
BloomType bloomType,
+ int blocksize, Map<String, String> configuration) throws Exception {
+ Admin admin = UTIL.getAdmin();
+ TableDescriptorBuilder tableDescriptorBuilder =
TableDescriptorBuilder.newBuilder(tableName);
+ ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
+ ColumnFamilyDescriptorBuilder.newBuilder(CF);
+ columnFamilyDescriptorBuilder.setBloomFilterType(bloomType);
+ columnFamilyDescriptorBuilder.setBlockCacheEnabled(blockCacheEnabled);
+ columnFamilyDescriptorBuilder.setBlocksize(blocksize);
+ for (Map.Entry<String, String> entry : configuration.entrySet()) {
+ columnFamilyDescriptorBuilder.setConfiguration(entry.getKey(),
entry.getValue());
+ }
+
tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
+ admin.createTable(tableDescriptorBuilder.build());
+ UTIL.waitUntilAllRegionsAssigned(tableName);
+ }
+
+ private void assertBytesReadFromFs(TableName tableName, long
actualBytesReadFromFs,
+ KeyValue keyValue, long actualReadOps) throws Exception {
+ List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+ Assert.assertEquals(1, regions.size());
+ MutableInt totalExpectedBytesReadFromFs = new MutableInt(0);
+ MutableInt totalExpectedReadOps = new MutableInt(0);
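+ // Recompute the expected bytes and read ops by re-reading each store file through the
+ // uncached block reader and compare them with the metrics reported by the scan.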
+ for (HRegion region : regions) {
+ Assert.assertNull(region.getBlockCache());
+ HStore store = region.getStore(CF);
+ Collection<HStoreFile> storeFiles = store.getStorefiles();
+ Assert.assertEquals(2, storeFiles.size());
+ for (HStoreFile storeFile : storeFiles) {
+ StoreFileReader reader = storeFile.getReader();
+ HFile.Reader hfileReader = reader.getHFileReader();
+ BloomFilter bloomFilter = reader.getGeneralBloomFilter();
+ Assert.assertTrue(bloomFilter == null || bloomFilter instanceof
CompoundBloomFilter);
+ CompoundBloomFilter cbf = bloomFilter == null ? null :
(CompoundBloomFilter) bloomFilter;
+ Consumer<HFileBlock> bytesReadFunction = new Consumer<HFileBlock>() {
+ @Override
+ public void accept(HFileBlock block) {
+ totalExpectedBytesReadFromFs.add(block.getOnDiskSizeWithHeader());
+ if (block.getNextBlockOnDiskSize() > 0) {
+ totalExpectedBytesReadFromFs.add(block.headerSize());
+ }
+ totalExpectedReadOps.add(1);
+ }
+ };
+ readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
+ }
+ }
+ Assert.assertEquals(totalExpectedBytesReadFromFs.longValue(),
actualBytesReadFromFs);
+ Assert.assertEquals(totalExpectedReadOps.longValue(), actualReadOps);
+ }
+
+ private void readHFile(HFile.Reader hfileReader, CompoundBloomFilter cbf,
KeyValue keyValue,
+ Consumer<HFileBlock> bytesReadFunction) throws Exception {
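+ // Mirror the scan's read path: probe the bloom filter when present, then walk the block
+ // index from the root down to the data block, reporting each block read to bytesReadFunction.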
+ HFileBlock.FSReader blockReader = hfileReader.getUncachedBlockReader();
+ FixedFileTrailer trailer = hfileReader.getTrailer();
+ HFileContext meta = hfileReader.getFileContext();
+ long fileSize = hfileReader.length();
+
+ // Read the bloom block from FS
+ if (cbf != null) {
+ // Read a block in the load-on-open section to make sure the prefetched header is not the
+ // bloom block's header.
+ blockReader.readBlockData(trailer.getLoadOnOpenDataOffset(), -1, true,
true, true).release();
+
+ HFileBlockIndex.BlockIndexReader index = cbf.getBloomIndex();
+ byte[] row = ROW2;
+ int blockIndex = index.rootBlockContainingKey(row, 0, row.length);
+ HFileBlock bloomBlock = cbf.getBloomBlock(blockIndex);
+ boolean fileContainsKey = BloomFilterUtil.contains(row, 0, row.length,
+ bloomBlock.getBufferReadOnly(), bloomBlock.headerSize(),
+ bloomBlock.getUncompressedSizeWithoutHeader(), cbf.getHash(),
cbf.getHashCount());
+ bytesReadFunction.accept(bloomBlock);
+ // Assert that the block read is a bloom block
+ Assert.assertEquals(bloomBlock.getBlockType(), BlockType.BLOOM_CHUNK);
+ bloomBlock.release();
+ if (!fileContainsKey) {
+ // The key is not in the file, so we don't need to read the data block
+ return;
+ }
+ }
+
+ // Indexes use NoOpEncodedSeeker
+ MyNoOpEncodedSeeker seeker = new MyNoOpEncodedSeeker();
+ HFileBlock.BlockIterator blockIter =
blockReader.blockRange(trailer.getLoadOnOpenDataOffset(),
+ fileSize - trailer.getTrailerSize());
+ HFileBlock block = blockIter.nextBlockWithBlockType(BlockType.ROOT_INDEX);
+
+ // Comparator class name is stored in the trailer in version 3.
+ CellComparator comparator = trailer.createComparator();
+ // Initialize the seeker
+ seeker.initRootIndex(block, trailer.getDataIndexCount(), comparator,
+ trailer.getNumDataIndexLevels());
+
+ int blockLevelsRead = 1; // Root index is the first level
+
+ int rootLevIndex = seeker.rootBlockContainingKey(keyValue);
+ long currentOffset = seeker.getBlockOffset(rootLevIndex);
+ int currentDataSize = seeker.getBlockDataSize(rootLevIndex);
+
+ HFileBlock prevBlock = null;
+ do {
+ prevBlock = block;
+ block = blockReader.readBlockData(currentOffset, currentDataSize, true,
true, true);
+ HFileBlock unpacked = block.unpack(meta, blockReader);
+ if (unpacked != block) {
+ block.release();
+ block = unpacked;
+ }
+ bytesReadFunction.accept(block);
+ if (!block.getBlockType().isData()) {
+ ByteBuff buffer = block.getBufferWithoutHeader();
+ // Place the buffer at the correct position
+ HFileBlockIndex.BlockIndexReader.locateNonRootIndexEntry(buffer,
keyValue, comparator);
+ currentOffset = buffer.getLong();
+ currentDataSize = buffer.getInt();
+ }
+ prevBlock.release();
+ blockLevelsRead++;
+ } while (!block.getBlockType().isData());
+ block.release();
+ blockIter.freeBlocks();
+
+ Assert.assertEquals(blockLevelsRead, trailer.getNumDataIndexLevels() + 1);
+ }
+
+ private void assertBytesReadFromBlockCache(TableName tableName,
+ long actualBytesReadFromBlockCache, KeyValue keyValue) throws Exception {
+ List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+ Assert.assertEquals(1, regions.size());
+ MutableInt totalExpectedBytesReadFromBlockCache = new MutableInt(0);
+ for (HRegion region : regions) {
+ Assert.assertNotNull(region.getBlockCache());
+ HStore store = region.getStore(CF);
+ Collection<HStoreFile> storeFiles = store.getStorefiles();
+ Assert.assertEquals(2, storeFiles.size());
+ for (HStoreFile storeFile : storeFiles) {
+ StoreFileReader reader = storeFile.getReader();
+ HFile.Reader hfileReader = reader.getHFileReader();
+ BloomFilter bloomFilter = reader.getGeneralBloomFilter();
+ Assert.assertTrue(bloomFilter == null || bloomFilter instanceof
CompoundBloomFilter);
+ CompoundBloomFilter cbf = bloomFilter == null ? null :
(CompoundBloomFilter) bloomFilter;
+ Consumer<HFileBlock> bytesReadFunction = new Consumer<HFileBlock>() {
+ @Override
+ public void accept(HFileBlock block) {
+
totalExpectedBytesReadFromBlockCache.add(block.getOnDiskSizeWithHeader());
+ if (block.getNextBlockOnDiskSize() > 0) {
+ totalExpectedBytesReadFromBlockCache.add(block.headerSize());
+ }
+ }
+ };
+ readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
+ }
+ }
+ Assert.assertEquals(totalExpectedBytesReadFromBlockCache.longValue(),
+ actualBytesReadFromBlockCache);
+ }
+
+ private void assertBlockCacheWarmUp(TableName tableName, KeyValue keyValue)
throws Exception {
+ List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(tableName);
+ Assert.assertEquals(1, regions.size());
+ for (HRegion region : regions) {
+ Assert.assertNotNull(region.getBlockCache());
+ HStore store = region.getStore(CF);
+ Collection<HStoreFile> storeFiles = store.getStorefiles();
+ Assert.assertEquals(2, storeFiles.size());
+ for (HStoreFile storeFile : storeFiles) {
+ StoreFileReader reader = storeFile.getReader();
+ HFile.Reader hfileReader = reader.getHFileReader();
+ BloomFilter bloomFilter = reader.getGeneralBloomFilter();
+ Assert.assertTrue(bloomFilter == null || bloomFilter instanceof
CompoundBloomFilter);
+ CompoundBloomFilter cbf = bloomFilter == null ? null :
(CompoundBloomFilter) bloomFilter;
+ Consumer<HFileBlock> bytesReadFunction = new Consumer<HFileBlock>() {
+ @Override
+ public void accept(HFileBlock block) {
+ assertBlockIsCached(hfileReader, block, region.getBlockCache());
+ }
+ };
+ readHFile(hfileReader, cbf, keyValue, bytesReadFunction);
+ }
+ }
+ }
+
+ private void assertBlockIsCached(HFile.Reader hfileReader, HFileBlock block,
+ BlockCache blockCache) {
+ if (blockCache == null) {
+ return;
+ }
+ Path path = hfileReader.getPath();
+ BlockCacheKey key = new BlockCacheKey(path, block.getOffset(), true,
block.getBlockType());
+ HFileBlock cachedBlock = (HFileBlock) blockCache.getBlock(key, true,
false, true);
+ Assert.assertNotNull(cachedBlock);
+ Assert.assertEquals(block.getOnDiskSizeWithHeader(),
cachedBlock.getOnDiskSizeWithHeader());
+ Assert.assertEquals(block.getNextBlockOnDiskSize(),
cachedBlock.getNextBlockOnDiskSize());
+ cachedBlock.release();
+ }
+
+ private static class MyNoOpEncodedSeeker extends
NoOpIndexBlockEncoder.NoOpEncodedSeeker {
+ public long getBlockOffset(int i) {
+ return blockOffsets[i];
+ }
+
+ public int getBlockDataSize(int i) {
+ return blockDataSizes[i];
+ }
+ }
+}
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
index 60fdf235775..7ac45775009 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -62,6 +63,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -354,6 +356,48 @@ public class TestDefaultMemStore {
assertScannerResults(s, new KeyValue[] { kv1, kv2 });
}
+ private long getBytesReadFromMemstore() throws IOException {
+ final byte[] f = Bytes.toBytes("family");
+ final byte[] q1 = Bytes.toBytes("q1");
+ final byte[] v = Bytes.toBytes("value");
+ int numKvs = 10;
+
+ MultiVersionConcurrencyControl.WriteEntry w = mvcc.begin();
+
+ KeyValue kv;
+ KeyValue[] kvs = new KeyValue[numKvs];
+ long totalCellSize = 0;
+ for (int i = 0; i < numKvs; i++) {
+ byte[] row = Bytes.toBytes(i);
+ kv = new KeyValue(row, f, q1, v);
+ kv.setSequenceId(w.getWriteNumber());
+ memstore.add(kv, null);
+ kvs[i] = kv;
+ totalCellSize += Segment.getCellLength(kv);
+ }
+ mvcc.completeAndWait(w);
+
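+ // Reset the thread-local counter so only the scan below contributes to the metric.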
+ ThreadLocalServerSideScanMetrics.getBytesReadFromMemstoreAndReset();
+ KeyValueScanner s = this.memstore.getScanners(mvcc.getReadPoint()).get(0);
+ assertScannerResults(s, kvs);
+ return totalCellSize;
+ }
+
+ @Test
+ public void testBytesReadFromMemstore() throws IOException {
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(true);
+ long totalCellSize = getBytesReadFromMemstore();
+ Assert.assertEquals(totalCellSize,
+ ThreadLocalServerSideScanMetrics.getBytesReadFromMemstoreAndReset());
+ }
+
+ @Test
+ public void testBytesReadFromMemstoreWithScanMetricsDisabled() throws
IOException {
+ ThreadLocalServerSideScanMetrics.setScanMetricsEnabled(false);
+ getBytesReadFromMemstore();
+ Assert.assertEquals(0,
ThreadLocalServerSideScanMetrics.getBytesReadFromMemstoreAndReset());
+ }
+
/**
* Regression test for HBASE-2616, HBASE-2670. When we insert a
higher-memstoreTS version of a
* cell but with the same timestamp, we still need to provide consistent
reads for the same