This is an automated email from the ASF dual-hosted git repository. openinx pushed a commit to branch HBASE-21879 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit d82f8a1cf122203fb22fe8d5312b6f7d0bfca839 Author: huzheng <open...@gmail.com> AuthorDate: Mon Apr 1 22:23:24 2019 +0800 HBASE-22127 Ensure that the block cached in the LRUBlockCache offheap is allocated from heap --- .../apache/hadoop/hbase/io/ByteBuffAllocator.java | 20 ++- .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 4 + .../apache/hadoop/hbase/io/hfile/HFileBlock.java | 86 +++++++----- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 49 ++++--- .../hadoop/hbase/io/hfile/LruBlockCache.java | 32 ++++- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 53 +++++--- .../apache/hadoop/hbase/io/hfile/TestChecksum.java | 24 ++-- .../apache/hadoop/hbase/io/hfile/TestHFile.java | 124 ++++++++++++++++-- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 145 +++++++++++++++------ .../hadoop/hbase/io/hfile/TestHFileBlockIndex.java | 2 +- .../hadoop/hbase/io/hfile/TestHFileEncryption.java | 2 +- .../hadoop/hbase/io/hfile/TestHFileWriterV3.java | 8 +- .../io/hfile/TestLazyDataBlockDecompression.java | 2 +- .../io/hfile/bucket/TestBucketWriterThread.java | 6 +- .../hadoop/hbase/master/AbstractTestDLS.java | 29 ++--- 15 files changed, 426 insertions(+), 160 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java index 0020e23..984d46d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import sun.nio.ch.DirectBuffer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -34,7 +35,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -191,7 +191,7 @@ public class ByteBuffAllocator { } // If disabled the reservoir, just allocate it from on-heap. if (!isReservoirEnabled() || size == 0) { - return new SingleByteBuff(NONE, ByteBuffer.allocate(size)); + return allocateOnHeap(size); } int reminder = size % bufSize; int len = size / bufSize + (reminder > 0 ? 1 : 0); @@ -222,6 +222,22 @@ public class ByteBuffAllocator { return bb; } + /** + * Free all direct buffers if allocated, mainly used for testing. + */ + @VisibleForTesting + public void clean() { + while (!buffers.isEmpty()) { + ByteBuffer b = buffers.poll(); + if (b instanceof DirectBuffer) { + DirectBuffer db = (DirectBuffer) b; + if (db.cleaner() != null) { + db.cleaner().clean(); + } + } + } + } + public static ByteBuff wrap(ByteBuffer[] buffers, Recycler recycler) { if (buffers == null || buffers.length == 0) { throw new IllegalArgumentException("buffers shouldn't be null or empty"); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 53c216f..bb57fbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -367,6 +367,10 @@ public class CacheConfig { return Optional.ofNullable(this.blockCache); } + public boolean isCombinedBlockCache() { + return blockCache instanceof CombinedBlockCache; + } + public ByteBuffAllocator getByteBuffAllocator() { return this.byteBuffAllocator; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 22a8295..2fe9255 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -763,6 +763,13 @@ public class HFileBlock implements Cacheable { } /** + * @return true to indicate the block is allocated from JVM heap, otherwise from off-heap. + */ + boolean isOnHeap() { + return buf.hasArray(); + } + + /** * Unified version 2 {@link HFile} block writer. The intended usage pattern * is as follows: * <ol> @@ -1300,16 +1307,29 @@ public class HFileBlock implements Cacheable { /** An HFile block reader with iteration ability. */ interface FSReader { /** - * Reads the block at the given offset in the file with the given on-disk - * size and uncompressed size. - * - * @param offset - * @param onDiskSize the on-disk size of the entire block, including all - * applicable headers, or -1 if unknown + * Reads the block at the given offset in the file with the given on-disk size and uncompressed + * size. + * @param offset of the file to read + * @param onDiskSize the on-disk size of the entire block, including all applicable headers, or + * -1 if unknown + * @param pread true to use pread, otherwise use the stream read. + * @param updateMetrics update the metrics or not. + * @param intoHeap allocate the block's ByteBuff by {@link ByteBuffAllocator} or JVM heap. For + * LRUBlockCache, we must ensure that the block to cache is an heap one, because the + * memory occupation is based on heap now, also for {@link CombinedBlockCache}, we use + * the heap LRUBlockCache as L1 cache to cache small blocks such as IndexBlock or + * MetaBlock for faster access. So introduce an flag here to decide whether allocate + * from JVM heap or not so that we can avoid an extra off-heap to heap memory copy when + * using LRUBlockCache. 
For most cases, we know what's the expected block type we'll + * read, while for some special case (Example: HFileReaderImpl#readNextDataBlock()), we + * cannot pre-decide what's the expected block type, then we can only allocate block's + * ByteBuff from {@link ByteBuffAllocator} firstly, and then when caching it in + * {@link LruBlockCache} we'll check whether the ByteBuff is from heap or not, if not + * then we'll clone it to a heap one and cache it. * @return the newly read block */ - HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics) - throws IOException; + HFileBlock readBlockData(long offset, long onDiskSize, boolean pread, boolean updateMetrics, + boolean intoHeap) throws IOException; /** * Creates a block iterator over the given portion of the {@link HFile}. @@ -1444,7 +1464,7 @@ public class HFileBlock implements Cacheable { if (offset >= endOffset) { return null; } - HFileBlock b = readBlockData(offset, length, false, false); + HFileBlock b = readBlockData(offset, length, false, false, true); offset += b.getOnDiskSizeWithHeader(); length = b.getNextBlockOnDiskSize(); HFileBlock uncompressed = b.unpack(fileContext, owner); @@ -1526,16 +1546,18 @@ public class HFileBlock implements Cacheable { /** * Reads a version 2 block (version 1 blocks not supported and not expected). Tries to do as * little memory allocation as possible, using the provided on-disk size. - * * @param offset the offset in the stream to read at - * @param onDiskSizeWithHeaderL the on-disk size of the block, including - * the header, or -1 if unknown; i.e. when iterating over blocks reading - * in the file metadata info. + * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header, or -1 if + * unknown; i.e. when iterating over blocks reading in the file metadata info. 
* @param pread whether to use a positional read + * @param updateMetrics whether to update the metrics + * @param intoHeap allocate ByteBuff of block from heap or off-heap. + * @see FSReader#readBlockData(long, long, boolean, boolean, boolean) for more details about the + * intoHeap. */ @Override public HFileBlock readBlockData(long offset, long onDiskSizeWithHeaderL, boolean pread, - boolean updateMetrics) throws IOException { + boolean updateMetrics, boolean intoHeap) throws IOException { // Get a copy of the current state of whether to validate // hbase checksums or not for this read call. This is not // thread-safe but the one constaint is that if we decide @@ -1544,9 +1566,8 @@ public class HFileBlock implements Cacheable { boolean doVerificationThruHBaseChecksum = streamWrapper.shouldUseHBaseChecksum(); FSDataInputStream is = streamWrapper.getStream(doVerificationThruHBaseChecksum); - HFileBlock blk = readBlockDataInternal(is, offset, - onDiskSizeWithHeaderL, pread, - doVerificationThruHBaseChecksum, updateMetrics); + HFileBlock blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, + doVerificationThruHBaseChecksum, updateMetrics, intoHeap); if (blk == null) { HFile.LOG.warn("HBase checksum verification failed for file " + pathName + " at offset " + @@ -1573,7 +1594,7 @@ public class HFileBlock implements Cacheable { is = this.streamWrapper.fallbackToFsChecksum(CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD); doVerificationThruHBaseChecksum = false; blk = readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, - doVerificationThruHBaseChecksum, updateMetrics); + doVerificationThruHBaseChecksum, updateMetrics, intoHeap); if (blk != null) { HFile.LOG.warn("HDFS checksum verification succeeded for file " + pathName + " at offset " + @@ -1669,24 +1690,29 @@ public class HFileBlock implements Cacheable { return nextBlockOnDiskSize; } + private ByteBuff allocate(int size, boolean intoHeap) { + return intoHeap ? 
ByteBuffAllocator.HEAP.allocate(size) : allocator.allocate(size); + } + /** * Reads a version 2 block. - * * @param offset the offset in the stream to read at. - * @param onDiskSizeWithHeaderL the on-disk size of the block, including - * the header and checksums if present or -1 if unknown (as a long). Can be -1 - * if we are doing raw iteration of blocks as when loading up file metadata; i.e. - * the first read of a new file. Usually non-null gotten from the file index. + * @param onDiskSizeWithHeaderL the on-disk size of the block, including the header and + * checksums if present or -1 if unknown (as a long). Can be -1 if we are doing raw + * iteration of blocks as when loading up file metadata; i.e. the first read of a new + * file. Usually non-null gotten from the file index. * @param pread whether to use a positional read - * @param verifyChecksum Whether to use HBase checksums. - * If HBase checksum is switched off, then use HDFS checksum. Can also flip on/off - * reading same file if we hit a troublesome patch in an hfile. + * @param verifyChecksum Whether to use HBase checksums. If HBase checksum is switched off, then + * use HDFS checksum. Can also flip on/off reading same file if we hit a troublesome + * patch in an hfile. + * @param updateMetrics whether need to update the metrics. + * @param intoHeap allocate the ByteBuff of block from heap or off-heap. 
* @return the HFileBlock or null if there is a HBase checksum mismatch */ @VisibleForTesting protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, - long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics) - throws IOException { + long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics, + boolean intoHeap) throws IOException { if (offset < 0) { throw new IOException("Invalid offset=" + offset + " trying to read " + "block (onDiskSize=" + onDiskSizeWithHeaderL + ")"); @@ -1728,7 +1754,7 @@ public class HFileBlock implements Cacheable { // says where to start reading. If we have the header cached, then we don't need to read // it again and we can likely read from last place we left off w/o need to backup and reread // the header we read last time through here. - ByteBuff onDiskBlock = allocator.allocate(onDiskSizeWithHeader + hdrSize); + ByteBuff onDiskBlock = this.allocate(onDiskSizeWithHeader + hdrSize, intoHeap); boolean initHFileBlockSuccess = false; try { if (headerBuf != null) { @@ -2072,7 +2098,7 @@ public class HFileBlock implements Cacheable { " onDiskDataSizeWithHeader " + onDiskDataSizeWithHeader; } - public HFileBlock deepClone() { + public HFileBlock deepCloneOnHeap() { return new HFileBlock(this, true); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index c878773..d005f2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -272,8 +272,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { if (LOG.isTraceEnabled()) { LOG.trace("Prefetch start " + getPathOffsetEndStr(path, offset, end)); } - // TODO: Could we use block iterator in here? Would that get stuff into the cache? 
- HFileBlock prevBlock = null; + // Don't use BlockIterator here, because it's designed to read load-on-open section. + long onDiskSizeOfNextBlock = -1; while (offset < end) { if (Thread.interrupted()) { break; @@ -282,16 +282,17 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // the internal-to-hfileblock thread local which holds the overread that gets the // next header, will not have happened...so, pass in the onDiskSize gotten from the // cached block. This 'optimization' triggers extremely rarely I'd say. - long onDiskSize = prevBlock != null? prevBlock.getNextBlockOnDiskSize(): -1; - HFileBlock block = readBlock(offset, onDiskSize, /*cacheBlock=*/true, - /*pread=*/true, false, false, null, null); - // Need not update the current block. Ideally here the readBlock won't find the - // block in cache. We call this readBlock so that block data is read from FS and - // cached in BC. So there is no reference count increment that happens here. - // The return will ideally be a noop because the block is not of MemoryType SHARED. - returnBlock(block); - prevBlock = block; - offset += block.getOnDiskSizeWithHeader(); + HFileBlock block = readBlock(offset, onDiskSizeOfNextBlock, /* cacheBlock= */true, + /* pread= */true, false, false, null, null); + try { + onDiskSizeOfNextBlock = block.getNextBlockOnDiskSize(); + offset += block.getOnDiskSizeWithHeader(); + } finally { + // Ideally here the readBlock won't find the block in cache. We call this + // readBlock so that block data is read from FS and cached in BC. we must call + // returnBlock here to decrease the reference count of block. + returnBlock(block); + } } } catch (IOException e) { // IOExceptions are probably due to region closes (relocation, etc.) @@ -1412,7 +1413,7 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // Cache Miss, please load. 
HFileBlock compressedBlock = - fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false); + fsBlockReader.readBlockData(metaBlockOffset, blockSize, true, false, true); HFileBlock uncompressedBlock = compressedBlock.unpack(hfileContext, fsBlockReader); if (compressedBlock != uncompressedBlock) { compressedBlock.release(); @@ -1427,6 +1428,24 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } } + /** + * If expected block is data block, we'll allocate the ByteBuff of block from + * {@link org.apache.hadoop.hbase.io.ByteBuffAllocator} and it's usually an off-heap one, + * otherwise it will allocate from heap. + * @see org.apache.hadoop.hbase.io.hfile.HFileBlock.FSReader#readBlockData(long, long, boolean, + * boolean, boolean) + */ + private boolean shouldUseHeap(BlockType expectedBlockType) { + if (cacheConf.getBlockCache() == null) { + return false; + } else if (!cacheConf.isCombinedBlockCache()) { + // Block to cache in LruBlockCache must be an heap one. So just allocate block memory from + // heap for saving an extra off-heap to heap copying. + return true; + } + return expectedBlockType != null && !expectedBlockType.isData(); + } + @Override public HFileBlock readBlock(long dataBlockOffset, long onDiskBlockSize, final boolean cacheBlock, boolean pread, final boolean isCompaction, @@ -1496,8 +1515,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { TraceUtil.addTimelineAnnotation("blockCacheMiss"); // Load block from filesystem. 
- HFileBlock hfileBlock = - fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, !isCompaction); + HFileBlock hfileBlock = fsBlockReader.readBlockData(dataBlockOffset, onDiskBlockSize, pread, + !isCompaction, shouldUseHeap(expectedBlockType)); validateBlockType(hfileBlock, expectedBlockType); HFileBlock unpacked = hfileBlock.unpack(hfileContext, fsBlockReader); BlockType.BlockCategory category = hfileBlock.getBlockType().getCategory(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index dd7c695..2aabb61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -346,6 +346,32 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } } + /** + * The block cached in LRUBlockCache will always be a heap block: on the one side, the heap + * access will be faster than off-heap, the small index block or meta block cached in + * CombinedBlockCache will benefit a lot. on the other side, the LRUBlockCache size is always + * calculated based on the total heap size, if caching an off-heap block in LRUBlockCache, the + * heap size will be messed up. Here we will clone the block into a heap block if it's an + * off-heap block, otherwise just use the original block. The key point is to maintain the refCnt of + * the block (HBASE-22127): <br> + * 1. if cache the cloned heap block, its refCnt is a totally new one, it's easy to handle; <br> + * 2. if cache the original heap block, we're sure that it won't be tracked in ByteBuffAllocator's + * reservoir, if both RPC and LRUBlockCache release the block, then it can be garbage collected by + * JVM, so need a retain here. + * @param buf the original block + * @return a block with a heap memory backend. 
+ */ + private Cacheable asReferencedHeapBlock(Cacheable buf) { + if (buf instanceof HFileBlock) { + HFileBlock blk = ((HFileBlock) buf); + if (!blk.isOnHeap()) { + return blk.deepCloneOnHeap(); + } + } + // The block will be referenced by this LRUBlockCache, so should increase its refCnt here. + return buf.retain(); + } + // BlockCache implementation /** @@ -394,8 +420,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { } return; } - // The block will be referenced by the LRUBlockCache, so should increase the refCnt here. - buf.retain(); + // Ensure that the block is an heap one. + buf = asReferencedHeapBlock(buf); cb = new LruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory); long newSize = updateSizeMetrics(cb, false); map.put(cacheKey, cb); @@ -495,7 +521,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize { if (caching) { if (result instanceof HFileBlock && ((HFileBlock) result).usesSharedMemory()) { Cacheable original = result; - result = ((HFileBlock) original).deepClone(); + result = ((HFileBlock) original).deepCloneOnHeap(); // deepClone an new one, so need to put the original one back to free it. 
victimHandler.returnBlock(cacheKey, original); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 1db1013..7c709c1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -50,6 +50,8 @@ import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.io.HeapSize; @@ -564,10 +566,10 @@ public class BucketCache implements BlockCache, HeapSize { if (!cacheEnabled) { return false; } - RAMQueueEntry removedBlock = checkRamCache(cacheKey); + boolean existed = removeFromRamCache(cacheKey); BucketEntry bucketEntry = backingMap.get(cacheKey); if (bucketEntry == null) { - if (removedBlock != null) { + if (existed) { cacheStats.evicted(0, cacheKey.isPrimary()); return true; } else { @@ -578,7 +580,7 @@ public class BucketCache implements BlockCache, HeapSize { try { lock.writeLock().lock(); if (backingMap.remove(cacheKey, bucketEntry)) { - blockEvicted(cacheKey, bucketEntry, removedBlock == null); + blockEvicted(cacheKey, bucketEntry, !existed); } else { return false; } @@ -589,23 +591,23 @@ public class BucketCache implements BlockCache, HeapSize { return true; } - private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) { - RAMQueueEntry removedBlock = ramCache.remove(cacheKey); - if (removedBlock != null) { - this.blockNumber.decrement(); - this.heapSize.add(-1 * removedBlock.getData().heapSize()); - } - return removedBlock; + private boolean removeFromRamCache(BlockCacheKey cacheKey) { + return 
ramCache.remove(cacheKey, re -> { + if (re != null) { + this.blockNumber.decrement(); + this.heapSize.add(-1 * re.getData().heapSize()); + } + }); } public boolean evictBlock(BlockCacheKey cacheKey, boolean deletedBlock) { if (!cacheEnabled) { return false; } - RAMQueueEntry removedBlock = checkRamCache(cacheKey); + boolean existed = removeFromRamCache(cacheKey); BucketEntry bucketEntry = backingMap.get(cacheKey); if (bucketEntry == null) { - if (removedBlock != null) { + if (existed) { cacheStats.evicted(0, cacheKey.isPrimary()); return true; } else { @@ -618,7 +620,7 @@ public class BucketCache implements BlockCache, HeapSize { int refCount = bucketEntry.getRefCount(); if (refCount == 0) { if (backingMap.remove(cacheKey, bucketEntry)) { - blockEvicted(cacheKey, bucketEntry, removedBlock == null); + blockEvicted(cacheKey, bucketEntry, !existed); } else { return false; } @@ -1041,10 +1043,12 @@ public class BucketCache implements BlockCache, HeapSize { putIntoBackingMap(key, bucketEntries[i]); } // Always remove from ramCache even if we failed adding it to the block cache above. - RAMQueueEntry ramCacheEntry = ramCache.remove(key); - if (ramCacheEntry != null) { - heapSize.add(-1 * entries.get(i).getData().heapSize()); - } else if (bucketEntries[i] != null){ + boolean existed = ramCache.remove(key, re -> { + if (re != null) { + heapSize.add(-1 * re.getData().heapSize()); + } + }); + if (!existed && bucketEntries[i] != null) { // Block should have already been evicted. Remove it and free space. 
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset()); try { @@ -1753,12 +1757,23 @@ public class BucketCache implements BlockCache, HeapSize { return previous; } - public RAMQueueEntry remove(BlockCacheKey key) { + public boolean remove(BlockCacheKey key) { + return remove(key, re->{}); + } + + /** + * Defined an {@link Consumer} here, because once the removed entry release its reference count, + * then it's ByteBuffers may be recycled and accessing it outside this method will be thrown an + * exception. the consumer will access entry to remove before release its reference count. + * Notice, don't change its reference count in the {@link Consumer} + */ + public boolean remove(BlockCacheKey key, Consumer<RAMQueueEntry> action) { RAMQueueEntry previous = delegate.remove(key); + action.accept(previous); if (previous != null) { previous.getData().release(); } - return previous; + return previous != null; } public boolean isEmpty() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java index c432fa9..2aebc8c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestChecksum.java @@ -100,7 +100,8 @@ public class TestChecksum { meta = new HFileContextBuilder().withHBaseCheckSum(true).build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, (HFileSystem) fs, path, meta, ByteBuffAllocator.HEAP); - HFileBlock b = hbr.readBlockData(0, -1, false, false); + HFileBlock b = hbr.readBlockData(0, -1, false, false, true); + assertTrue(b.isOnHeap()); assertEquals(b.getChecksumType(), ChecksumType.getDefaultChecksumType().getCode()); } @@ -146,7 +147,8 @@ public class TestChecksum { meta = new HFileContextBuilder().withHBaseCheckSum(true).build(); HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, 
(HFileSystem) fs, path, meta, ByteBuffAllocator.HEAP); - HFileBlock b = hbr.readBlockData(0, -1, false, false); + HFileBlock b = hbr.readBlockData(0, -1, false, false, true); + assertTrue(b.isOnHeap()); // verify SingleByteBuff checksum. verifySBBCheckSum(b.getBufferReadOnly()); @@ -215,7 +217,7 @@ public class TestChecksum { .withHBaseCheckSum(true) .build(); HFileBlock.FSReader hbr = new CorruptedFSReaderImpl(is, totalSize, fs, path, meta); - HFileBlock b = hbr.readBlockData(0, -1, pread, false); + HFileBlock b = hbr.readBlockData(0, -1, pread, false, true); b.sanityCheck(); assertEquals(4936, b.getUncompressedSizeWithoutHeader()); assertEquals(algo == GZ ? 2173 : 4936, @@ -236,19 +238,19 @@ public class TestChecksum { // requests. Verify that this is correct. for (int i = 0; i < HFileBlock.CHECKSUM_VERIFICATION_NUM_IO_THRESHOLD + 1; i++) { - b = hbr.readBlockData(0, -1, pread, false); + b = hbr.readBlockData(0, -1, pread, false, true); assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff); assertEquals(0, HFile.getAndResetChecksumFailuresCount()); } // The next read should have hbase checksum verification reanabled, // we verify this by assertng that there was a hbase-checksum failure. - b = hbr.readBlockData(0, -1, pread, false); + b = hbr.readBlockData(0, -1, pread, false, true); assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff); assertEquals(1, HFile.getAndResetChecksumFailuresCount()); // Since the above encountered a checksum failure, we switch // back to not checking hbase checksums. 
- b = hbr.readBlockData(0, -1, pread, false); + b = hbr.readBlockData(0, -1, pread, false, true); assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff); assertEquals(0, HFile.getAndResetChecksumFailuresCount()); is.close(); @@ -260,7 +262,7 @@ public class TestChecksum { assertEquals(false, newfs.useHBaseChecksum()); is = new FSDataInputStreamWrapper(newfs, path); hbr = new CorruptedFSReaderImpl(is, totalSize, newfs, path, meta); - b = hbr.readBlockData(0, -1, pread, false); + b = hbr.readBlockData(0, -1, pread, false, true); is.close(); b.sanityCheck(); b = b.unpack(meta, hbr); @@ -343,7 +345,7 @@ public class TestChecksum { HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(new FSDataInputStreamWrapper(is, nochecksum), totalSize, hfs, path, meta, ByteBuffAllocator.HEAP); - HFileBlock b = hbr.readBlockData(0, -1, pread, false); + HFileBlock b = hbr.readBlockData(0, -1, pread, false, true); assertTrue(b.getBufferReadOnly() instanceof SingleByteBuff); is.close(); b.sanityCheck(); @@ -389,13 +391,13 @@ public class TestChecksum { @Override protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, - long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics) - throws IOException { + long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum, boolean updateMetrics, + boolean useHeap) throws IOException { if (verifyChecksum) { corruptDataStream = true; } HFileBlock b = super.readBlockDataInternal(is, offset, onDiskSizeWithHeaderL, pread, - verifyChecksum, updateMetrics); + verifyChecksum, updateMetrics, useHeap); corruptDataStream = false; return b; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java index f58fe3e..0ed933b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFile.java @@ 
-17,8 +17,12 @@ */ package org.apache.hadoop.hbase.io.hfile; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_IOENGINE_KEY; +import static org.apache.hadoop.hbase.HConstants.BUCKET_CACHE_SIZE_KEY; +import static org.apache.hadoop.hbase.io.ByteBuffAllocator.BUFFER_SIZE_KEY; import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MAX_BUFFER_COUNT_KEY; import static org.apache.hadoop.hbase.io.ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY; +import static org.apache.hadoop.hbase.io.hfile.CacheConfig.EVICT_BLOCKS_ON_CLOSE_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -104,33 +108,129 @@ public class TestHFile { fs = TEST_UTIL.getTestFileSystem(); } - @Test - public void testReaderWithoutBlockCache() throws Exception { - int bufCount = 32; + private ByteBuffAllocator initAllocator(boolean reservoirEnabled, int bufSize, int bufCount, + int minAllocSize) { Configuration that = HBaseConfiguration.create(conf); + that.setInt(BUFFER_SIZE_KEY, bufSize); that.setInt(MAX_BUFFER_COUNT_KEY, bufCount); - // AllByteBuffers will be allocated from the buffers. - that.setInt(MIN_ALLOCATE_SIZE_KEY, 0); - ByteBuffAllocator alloc = ByteBuffAllocator.create(that, true); - List<ByteBuff> buffs = new ArrayList<>(); + // All ByteBuffers will be allocated from the buffers. 
+ that.setInt(MIN_ALLOCATE_SIZE_KEY, minAllocSize); + return ByteBuffAllocator.create(that, reservoirEnabled); + } + + private void fillByteBuffAllocator(ByteBuffAllocator alloc, int bufCount) { // Fill the allocator with bufCount ByteBuffer + List<ByteBuff> buffs = new ArrayList<>(); for (int i = 0; i < bufCount; i++) { buffs.add(alloc.allocateOneBuffer()); + Assert.assertEquals(alloc.getQueueSize(), 0); } - Assert.assertEquals(alloc.getQueueSize(), 0); - for (ByteBuff buf : buffs) { - buf.release(); - } + buffs.forEach(ByteBuff::release); Assert.assertEquals(alloc.getQueueSize(), bufCount); + } + + @Test + public void testReaderWithoutBlockCache() throws Exception { + int bufCount = 32; + // AllByteBuffers will be allocated from the buffers. + ByteBuffAllocator alloc = initAllocator(true, 64 * 1024, bufCount, 0); + fillByteBuffAllocator(alloc, bufCount); // start write to store file. Path path = writeStoreFile(); try { - readStoreFile(path, that, alloc); + readStoreFile(path, conf, alloc); } catch (Exception e) { // fail test assertTrue(false); } Assert.assertEquals(bufCount, alloc.getQueueSize()); + alloc.clean(); + } + + /** + * Test case for HBASE-22127 in LruBlockCache. 
+ */ + @Test + public void testReaderWithLRUBlockCache() throws Exception { + int bufCount = 1024, blockSize = 64 * 1024; + ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0); + fillByteBuffAllocator(alloc, bufCount); + Path storeFilePath = writeStoreFile(); + // Open the file reader with LRUBlockCache + BlockCache lru = new LruBlockCache(1024 * 1024 * 32, blockSize, true, conf); + CacheConfig cacheConfig = new CacheConfig(conf, null, lru, alloc); + HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf); + long offset = 0; + while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset); + HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null); + offset += block.getOnDiskSizeWithHeader(); + // Ensure the block is an heap one. + Cacheable cachedBlock = lru.getBlock(key, false, false, true); + Assert.assertNotNull(cachedBlock); + Assert.assertTrue(cachedBlock instanceof HFileBlock); + Assert.assertTrue(((HFileBlock) cachedBlock).isOnHeap()); + // Should never allocate off-heap block from allocator because ensure that it's LRU. + Assert.assertEquals(bufCount, alloc.getQueueSize()); + block.release(); // return back the ByteBuffer back to allocator. + } + reader.close(); + Assert.assertEquals(bufCount, alloc.getQueueSize()); + alloc.clean(); + lru.shutdown(); + } + + private BlockCache initCombinedBlockCache() { + Configuration that = HBaseConfiguration.create(conf); + that.setFloat(BUCKET_CACHE_SIZE_KEY, 32); // 32MB for bucket cache. 
+ that.set(BUCKET_CACHE_IOENGINE_KEY, "offheap"); + BlockCache bc = BlockCacheFactory.createBlockCache(that); + Assert.assertNotNull(bc); + Assert.assertTrue(bc instanceof CombinedBlockCache); + return bc; + } + + /** + * Test case for HBASE-22127 in CombinedBlockCache + */ + @Test + public void testReaderWithCombinedBlockCache() throws Exception { + int bufCount = 1024, blockSize = 64 * 1024; + ByteBuffAllocator alloc = initAllocator(true, bufCount, blockSize, 0); + fillByteBuffAllocator(alloc, bufCount); + Path storeFilePath = writeStoreFile(); + // Open the file reader with CombinedBlockCache + BlockCache combined = initCombinedBlockCache(); + conf.setBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, true); + CacheConfig cacheConfig = new CacheConfig(conf, null, combined, alloc); + HFile.Reader reader = HFile.createReader(fs, storeFilePath, cacheConfig, true, conf); + long offset = 0; + while (offset < reader.getTrailer().getLoadOnOpenDataOffset()) { + BlockCacheKey key = new BlockCacheKey(storeFilePath.getName(), offset); + HFileBlock block = reader.readBlock(offset, -1, true, true, false, true, null, null); + offset += block.getOnDiskSizeWithHeader(); + // Read the cached block. + Cacheable cachedBlock = combined.getBlock(key, false, false, true); + try { + Assert.assertNotNull(cachedBlock); + Assert.assertTrue(cachedBlock instanceof HFileBlock); + HFileBlock hfb = (HFileBlock) cachedBlock; + // Data block will be cached in BucketCache, so it should be an off-heap block. + if (hfb.getBlockType().isData()) { + Assert.assertFalse(hfb.isOnHeap()); + } else { + // Non-data block will be cached in LRUBlockCache, so it must be an on-heap block. + Assert.assertTrue(hfb.isOnHeap()); + } + } finally { + combined.returnBlock(key, cachedBlock); + } + block.release(); // return back the ByteBuffer back to allocator. 
+ } + reader.close(); + combined.shutdown(); + Assert.assertEquals(bufCount, alloc.getQueueSize()); + alloc.clean(); } private void readStoreFile(Path storeFilePath, Configuration conf, ByteBuffAllocator alloc) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index efdae16..2733ca2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -40,6 +40,8 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Executors; import java.util.concurrent.Future; + +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -48,6 +50,7 @@ import org.apache.hadoop.hbase.ArrayBackedTag; import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; @@ -68,6 +71,7 @@ import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.compress.Compressor; +import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -93,10 +97,12 @@ public class TestHFileBlock { private static final Logger LOG = LoggerFactory.getLogger(TestHFileBlock.class); - static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, GZ }; + // TODO let uncomment the GZ algorithm in HBASE-21937, because no support BB unpack yet. 
+ static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = { NONE, /* GZ */ }; private static final int NUM_TEST_BLOCKS = 1000; private static final int NUM_READER_THREADS = 26; + private static final int MAX_BUFFER_COUNT = 2048; // Used to generate KeyValues private static int NUM_KEYVALUES = 50; @@ -108,14 +114,51 @@ public class TestHFileBlock { private final boolean includesMemstoreTS; private final boolean includesTag; - public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag) { + private final boolean useHeapAllocator; + private final ByteBuffAllocator alloc; + + public TestHFileBlock(boolean includesMemstoreTS, boolean includesTag, boolean useHeapAllocator) { this.includesMemstoreTS = includesMemstoreTS; this.includesTag = includesTag; + this.useHeapAllocator = useHeapAllocator; + this.alloc = useHeapAllocator ? ByteBuffAllocator.HEAP : createOffHeapAlloc(); + assertAllocator(); } @Parameters public static Collection<Object[]> parameters() { - return HBaseTestingUtility.MEMSTORETS_TAGS_PARAMETRIZED; + List<Object[]> params = new ArrayList<>(); + // Generate boolean triples from 000 to 111 + for (int i = 0; i < (1 << 3); i++) { + Object[] flags = new Boolean[3]; + for (int k = 0; k < 3; k++) { + flags[k] = (i & (1 << k)) != 0; + } + params.add(flags); + } + return params; + } + + private ByteBuffAllocator createOffHeapAlloc() { + Configuration conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + conf.setInt(ByteBuffAllocator.MAX_BUFFER_COUNT_KEY, MAX_BUFFER_COUNT); + conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0); + ByteBuffAllocator alloc = ByteBuffAllocator.create(conf, true); + // Fill the allocator + List<ByteBuff> bufs = new ArrayList<>(); + for (int i = 0; i < MAX_BUFFER_COUNT; i++) { + ByteBuff bb = alloc.allocateOneBuffer(); + assertTrue(!bb.hasArray()); + bufs.add(bb); + } + bufs.forEach(ByteBuff::release); + return alloc; + } + + private void assertAllocator() { + if (!useHeapAllocator) { + 
assertEquals(MAX_BUFFER_COUNT, alloc.getQueueSize()); + } } @Before @@ -123,6 +166,12 @@ public class TestHFileBlock { fs = HFileSystem.get(TEST_UTIL.getConfiguration()); } + @After + public void tearDown() throws IOException { + assertAllocator(); + alloc.clean(); + } + static void writeTestBlockContents(DataOutputStream dos) throws IOException { // This compresses really well. for (int i = 0; i < 1000; ++i) @@ -327,9 +376,8 @@ public class TestHFileBlock { .withIncludesMvcc(includesMemstoreTS) .withIncludesTags(includesTag) .withCompression(algo).build(); - HFileBlock.FSReader hbr = - new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP); - HFileBlock b = hbr.readBlockData(0, -1, pread, false); + HFileBlock.FSReader hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc); + HFileBlock b = hbr.readBlockData(0, -1, pread, false, true); is.close(); assertEquals(0, HFile.getAndResetChecksumFailuresCount()); @@ -341,14 +389,14 @@ public class TestHFileBlock { if (algo == GZ) { is = fs.open(path); - hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP); - b = hbr.readBlockData(0, 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + - b.totalChecksumBytes(), pread, false); + hbr = new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc); + b = hbr.readBlockData(0, + 2173 + HConstants.HFILEBLOCK_HEADER_SIZE + b.totalChecksumBytes(), pread, false, true); assertEquals(expected, b); int wrongCompressedSize = 2172; try { - b = hbr.readBlockData(0, wrongCompressedSize - + HConstants.HFILEBLOCK_HEADER_SIZE, pread, false); + hbr.readBlockData(0, wrongCompressedSize + HConstants.HFILEBLOCK_HEADER_SIZE, pread, + false, true); fail("Exception expected"); } catch (IOException ex) { String expectedPrefix = "Passed in onDiskSizeWithHeader="; @@ -356,8 +404,10 @@ public class TestHFileBlock { + "'.\nMessage is expected to start with: '" + expectedPrefix + "'", ex.getMessage().startsWith(expectedPrefix)); } + assertTrue(b.release()); is.close(); } 
+ assertTrue(expected.release()); } } } @@ -428,13 +478,13 @@ public class TestHFileBlock { .withIncludesTags(includesTag) .build(); HFileBlock.FSReaderImpl hbr = - new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP); + new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc); hbr.setDataBlockEncoder(dataBlockEncoder); hbr.setIncludesMemStoreTS(includesMemstoreTS); HFileBlock blockFromHFile, blockUnpacked; int pos = 0; for (int blockId = 0; blockId < numBlocks; ++blockId) { - blockFromHFile = hbr.readBlockData(pos, -1, pread, false); + blockFromHFile = hbr.readBlockData(pos, -1, pread, false, true); assertEquals(0, HFile.getAndResetChecksumFailuresCount()); blockFromHFile.sanityCheck(); pos += blockFromHFile.getOnDiskSizeWithHeader(); @@ -487,6 +537,10 @@ public class TestHFileBlock { blockUnpacked, deserialized.unpack(meta, hbr)); } } + assertTrue(blockUnpacked.release()); + if (blockFromHFile != blockUnpacked) { + blockFromHFile.release(); + } } is.close(); } @@ -557,7 +611,7 @@ public class TestHFileBlock { .withIncludesTags(includesTag) .withCompression(algo).build(); HFileBlock.FSReader hbr = - new HFileBlock.FSReaderImpl(is, totalSize, meta, ByteBuffAllocator.HEAP); + new HFileBlock.FSReaderImpl(is, totalSize, meta, alloc); long curOffset = 0; for (int i = 0; i < NUM_TEST_BLOCKS; ++i) { if (!pread) { @@ -569,7 +623,7 @@ public class TestHFileBlock { if (detailedLogging) { LOG.info("Reading block #" + i + " at offset " + curOffset); } - HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false); + HFileBlock b = hbr.readBlockData(curOffset, -1, pread, false, true); if (detailedLogging) { LOG.info("Block #" + i + ": " + b); } @@ -583,7 +637,8 @@ public class TestHFileBlock { // Now re-load this block knowing the on-disk size. This tests a // different branch in the loader. 
- HFileBlock b2 = hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false); + HFileBlock b2 = + hbr.readBlockData(curOffset, b.getOnDiskSizeWithHeader(), pread, false, true); b2.sanityCheck(); assertEquals(b.getBlockType(), b2.getBlockType()); @@ -599,6 +654,7 @@ public class TestHFileBlock { assertEquals(b.getOnDiskDataSizeWithHeader(), b2.getOnDiskDataSizeWithHeader()); assertEquals(0, HFile.getAndResetChecksumFailuresCount()); + assertTrue(b2.release()); curOffset += b.getOnDiskSizeWithHeader(); @@ -606,14 +662,14 @@ public class TestHFileBlock { // NOTE: cache-on-write testing doesn't actually involve a BlockCache. It simply // verifies that the unpacked value read back off disk matches the unpacked value // generated before writing to disk. - b = b.unpack(meta, hbr); + HFileBlock newBlock = b.unpack(meta, hbr); // b's buffer has header + data + checksum while // expectedContents have header + data only - ByteBuff bufRead = b.getBufferReadOnly(); + ByteBuff bufRead = newBlock.getBufferReadOnly(); ByteBuffer bufExpected = expectedContents.get(i); boolean bytesAreCorrect = Bytes.compareTo(bufRead.array(), bufRead.arrayOffset(), - bufRead.limit() - b.totalChecksumBytes(), + bufRead.limit() - newBlock.totalChecksumBytes(), bufExpected.array(), bufExpected.arrayOffset(), bufExpected.limit()) == 0; String wrongBytesMsg = ""; @@ -642,9 +698,12 @@ public class TestHFileBlock { } } assertTrue(wrongBytesMsg, bytesAreCorrect); + assertTrue(newBlock.release()); + if (newBlock != b) { + assertTrue(b.release()); + } } } - assertEquals(curOffset, fs.getFileStatus(path).getLen()); is.close(); } @@ -687,29 +746,37 @@ public class TestHFileBlock { boolean pread = true; boolean withOnDiskSize = rand.nextBoolean(); long expectedSize = - (blockId == NUM_TEST_BLOCKS - 1 ? fileSize - : offsets.get(blockId + 1)) - offset; - - HFileBlock b; + (blockId == NUM_TEST_BLOCKS - 1 ? 
fileSize : offsets.get(blockId + 1)) - offset; + HFileBlock b = null; try { long onDiskSizeArg = withOnDiskSize ? expectedSize : -1; - b = hbr.readBlockData(offset, onDiskSizeArg, pread, false); + b = hbr.readBlockData(offset, onDiskSizeArg, pread, false, false); + if (useHeapAllocator) { + assertTrue(b.isOnHeap()); + } else { + assertTrue(!b.getBlockType().isData() || !b.isOnHeap()); + } + assertEquals(types.get(blockId), b.getBlockType()); + assertEquals(expectedSize, b.getOnDiskSizeWithHeader()); + assertEquals(offset, b.getOffset()); } catch (IOException ex) { - LOG.error("Error in client " + clientId + " trying to read block at " - + offset + ", pread=" + pread + ", withOnDiskSize=" + - withOnDiskSize, ex); + LOG.error("Error in client " + clientId + " trying to read block at " + offset + + ", pread=" + pread + ", withOnDiskSize=" + withOnDiskSize, + ex); return false; + } finally { + if (b != null) { + b.release(); + } } - - assertEquals(types.get(blockId), b.getBlockType()); - assertEquals(expectedSize, b.getOnDiskSizeWithHeader()); - assertEquals(offset, b.getOffset()); - ++numBlocksRead; - if (pread) + if (pread) { ++numPositionalRead; - if (withOnDiskSize) + } + + if (withOnDiskSize) { ++numWithOnDiskSize; + } } LOG.info("Client " + clientId + " successfully read " + numBlocksRead + " blocks (with pread: " + numPositionalRead + ", with onDiskSize " + @@ -717,7 +784,6 @@ public class TestHFileBlock { return true; } - } @Test @@ -742,7 +808,7 @@ public class TestHFileBlock { .withCompression(compressAlgo) .build(); HFileBlock.FSReader hbr = - new HFileBlock.FSReaderImpl(is, fileSize, meta, ByteBuffAllocator.HEAP); + new HFileBlock.FSReaderImpl(is, fileSize, meta, alloc); Executor exec = Executors.newFixedThreadPool(NUM_READER_THREADS); ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<>(exec); @@ -761,7 +827,6 @@ public class TestHFileBlock { + ")"); } } - is.close(); } } @@ -874,9 +939,9 @@ public class TestHFileBlock { ByteBuffer buf 
= ByteBuffer.wrap(byteArr, 0, size); HFileContext meta = new HFileContextBuilder().build(); HFileBlock blockWithNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf, - HFileBlock.FILL_HEADER, -1, 52, -1, meta, ByteBuffAllocator.HEAP); + HFileBlock.FILL_HEADER, -1, 52, -1, meta, alloc); HFileBlock blockWithoutNextBlockMetadata = new HFileBlock(BlockType.DATA, size, size, -1, buf, - HFileBlock.FILL_HEADER, -1, -1, -1, meta, ByteBuffAllocator.HEAP); + HFileBlock.FILL_HEADER, -1, -1, -1, meta, alloc); ByteBuffer buff1 = ByteBuffer.allocate(length); ByteBuffer buff2 = ByteBuffer.allocate(length); blockWithNextBlockMetadata.serialize(buff1, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java index 73f1c24..6f8d0b0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlockIndex.java @@ -192,7 +192,7 @@ public class TestHFileBlockIndex { } missCount += 1; - prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false); + prevBlock = realReader.readBlockData(offset, onDiskSize, pread, false, true); prevOffset = offset; prevOnDiskSize = onDiskSize; prevPread = pread; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java index 1222d07..508b1fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileEncryption.java @@ -109,7 +109,7 @@ public class TestHFileEncryption { private long readAndVerifyBlock(long pos, HFileContext ctx, HFileBlock.FSReaderImpl hbr, int size) throws IOException { - HFileBlock b = hbr.readBlockData(pos, -1, false, false); + 
HFileBlock b = hbr.readBlockData(pos, -1, false, false, true); assertEquals(0, HFile.getAndResetChecksumFailuresCount()); b.sanityCheck(); assertFalse(b.isUnpacked()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java index b92f7c6..f8da706 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileWriterV3.java @@ -224,8 +224,8 @@ public class TestHFileWriterV3 { fsdis.seek(0); long curBlockPos = 0; while (curBlockPos <= trailer.getLastDataBlockOffset()) { - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false) - .unpack(context, blockReader); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true) + .unpack(context, blockReader); assertEquals(BlockType.DATA, block.getBlockType()); ByteBuff buf = block.getBufferWithoutHeader(); int keyLen = -1; @@ -285,8 +285,8 @@ public class TestHFileWriterV3 { while (fsdis.getPos() < trailer.getLoadOnOpenDataOffset()) { LOG.info("Current offset: " + fsdis.getPos() + ", scanning until " + trailer.getLoadOnOpenDataOffset()); - HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false) - .unpack(context, blockReader); + HFileBlock block = blockReader.readBlockData(curBlockPos, -1, false, false, true) + .unpack(context, blockReader); assertEquals(BlockType.META, block.getBlockType()); Text t = new Text(); ByteBuff buf = block.getBufferWithoutHeader(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java index 5935f91..f1a12a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestLazyDataBlockDecompression.java @@ -160,7 +160,7 @@ public class TestLazyDataBlockDecompression { CacheConfig cc = new CacheConfig(lazyCompressDisabled, new LruBlockCache(maxSize, HConstants.DEFAULT_BLOCKSIZE, false, lazyCompressDisabled)); assertFalse(cc.shouldCacheDataCompressed()); - assertTrue(cc.getBlockCache().get() instanceof LruBlockCache); + assertFalse(cc.isCombinedBlockCache()); LruBlockCache disabledBlockCache = (LruBlockCache) cc.getBlockCache().get(); LOG.info("disabledBlockCache=" + disabledBlockCache); assertEquals("test inconsistency detected.", maxSize, disabledBlockCache.getMaxSize()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java index 4e7291d..746cf8d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketWriterThread.java @@ -59,10 +59,10 @@ public class TestBucketWriterThread { private static class MockBucketCache extends BucketCache { public MockBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, - int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) - throws FileNotFoundException, IOException { + int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration) + throws IOException { super(ioEngineName, capacity, blockSize, bucketSizes, writerThreadNum, writerQLen, - persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create()); + persistencePath, ioErrorsTolerationDuration, HBaseConfiguration.create()); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index 3aef976..af1f151 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -192,9 +191,7 @@ public abstract class AbstractTestDLS { Path rootdir = FSUtils.getRootDir(conf); int numRegions = 50; - try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); - Table t = installTable(zkw, numRegions)) { - TableName table = t.getName(); + try (Table t = installTable(numRegions)) { List<RegionInfo> regions = null; HRegionServer hrs = null; for (int i = 0; i < NUM_RS; i++) { @@ -224,7 +221,6 @@ public abstract class AbstractTestDLS { int count = 0; for (RegionInfo hri : regions) { - Path tdir = FSUtils.getWALTableDir(conf, table); @SuppressWarnings("deprecation") Path editsdir = WALSplitter .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf, @@ -266,8 +262,7 @@ public abstract class AbstractTestDLS { // they will consume recovered.edits master.balanceSwitch(false); - try (ZKWatcher zkw = new ZKWatcher(conf, "table-creation", null); - Table ht = installTable(zkw, numRegionsToCreate)) { + try (Table ht = installTable(numRegionsToCreate)) { HRegionServer hrs = findRSToKill(false); List<RegionInfo> regions = ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()); makeWAL(hrs, regions, numLogLines, 100); @@ -329,8 +324,7 @@ public abstract class AbstractTestDLS { final Path logDir = new Path(rootdir, AbstractFSWALProvider.getWALDirectoryName(hrs.getServerName().toString())); - try (ZKWatcher zkw = new ZKWatcher(conf, 
"table-creation", null); - Table t = installTable(zkw, 40)) { + try (Table t = installTable(40)) { makeWAL(hrs, ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices()), numLogLines, 100); new Thread() { @@ -380,8 +374,7 @@ public abstract class AbstractTestDLS { startCluster(NUM_RS); // NUM_RS=6. - try (ZKWatcher zkw = new ZKWatcher(conf, "distributed log splitting test", null); - Table table = installTable(zkw, numRegionsToCreate)) { + try (Table table = installTable(numRegionsToCreate)) { populateDataInTable(numRowsPerRegion); List<RegionServerThread> rsts = cluster.getLiveRegionServerThreads(); @@ -482,11 +475,11 @@ public abstract class AbstractTestDLS { } } - private Table installTable(ZKWatcher zkw, int nrs) throws Exception { - return installTable(zkw, nrs, 0); + private Table installTable(int nrs) throws Exception { + return installTable(nrs, 0); } - private Table installTable(ZKWatcher zkw, int nrs, int existingRegions) throws Exception { + private Table installTable(int nrs, int existingRegions) throws Exception { // Create a table with regions byte[] family = Bytes.toBytes("family"); LOG.info("Creating table with " + nrs + " regions"); @@ -497,14 +490,14 @@ public abstract class AbstractTestDLS { } assertEquals(nrs, numRegions); LOG.info("Waiting for no more RIT\n"); - blockUntilNoRIT(zkw, master); + blockUntilNoRIT(); // disable-enable cycle to get rid of table's dead regions left behind // by createMultiRegions assertTrue(TEST_UTIL.getAdmin().isTableEnabled(tableName)); LOG.debug("Disabling table\n"); TEST_UTIL.getAdmin().disableTable(tableName); LOG.debug("Waiting for no more RIT\n"); - blockUntilNoRIT(zkw, master); + blockUntilNoRIT(); NavigableSet<String> regions = HBaseTestingUtility.getAllOnlineRegions(cluster); LOG.debug("Verifying only catalog region is assigned\n"); if (regions.size() != 1) { @@ -515,7 +508,7 @@ public abstract class AbstractTestDLS { LOG.debug("Enabling table\n"); TEST_UTIL.getAdmin().enableTable(tableName); LOG.debug("Waiting 
for no more RIT\n"); - blockUntilNoRIT(zkw, master); + blockUntilNoRIT(); LOG.debug("Verifying there are " + numRegions + " assigned on cluster\n"); regions = HBaseTestingUtility.getAllOnlineRegions(cluster); assertEquals(numRegions + 1 + existingRegions, regions.size()); @@ -651,7 +644,7 @@ public abstract class AbstractTestDLS { return count; } - private void blockUntilNoRIT(ZKWatcher zkw, HMaster master) throws Exception { + private void blockUntilNoRIT() throws Exception { TEST_UTIL.waitUntilNoRegionsInTransition(60000); }