This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6a3aa789d8e94a0304966e72d25ab89598d14eda Author: huzheng <[email protected]> AuthorDate: Mon Jun 24 12:09:47 2019 +0800 HBASE-22612 Address the final overview reviewing comments of HBASE-21879 --- .../apache/hadoop/hbase/io/ByteBuffAllocator.java | 4 +-- .../hbase/io/ByteBufferListOutputStream.java | 2 +- .../apache/hadoop/hbase/io/hfile/HFileContext.java | 4 +-- .../java/org/apache/hadoop/hbase/nio/RefCnt.java | 6 ++--- .../hadoop/hbase/io/hfile/HFileReaderImpl.java | 15 ++++++----- .../hadoop/hbase/io/hfile/LruBlockCache.java | 10 ++++++-- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 13 ++++++++-- .../hadoop/hbase/io/hfile/CacheTestUtils.java | 30 +++++++++++----------- .../hbase/io/hfile/bucket/TestBucketCache.java | 2 +- .../hbase/mob/TestMobWithByteBuffAllocator.java | 4 ++- 10 files changed, 55 insertions(+), 35 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java index 9991e79..7400b04 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBuffAllocator.java @@ -26,8 +26,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; -import sun.nio.ch.DirectBuffer; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.nio.ByteBuff; @@ -38,6 +36,8 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import sun.nio.ch.DirectBuffer; + /** * ByteBuffAllocator is used for allocating/freeing the ByteBuffers from/to NIO ByteBuffer pool, and * it provide high-level interfaces for upstream. 
when allocating desired memory size, it will diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java index e8bd322..200c9b3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/ByteBufferListOutputStream.java @@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory; public class ByteBufferListOutputStream extends ByteBufferOutputStream { private static final Logger LOG = LoggerFactory.getLogger(ByteBufferListOutputStream.class); - private ByteBuffAllocator allocator; + private final ByteBuffAllocator allocator; // Keep track of the BBs where bytes written to. We will first try to get a BB from the pool. If // it is not available will make a new one our own and keep writing to that. We keep track of all // the BBs that we got from pool, separately so that on closeAndPutbackBuffers, we can make sure diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java index 6074f10..65649f4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileContext.java @@ -193,8 +193,8 @@ public class HFileContext implements HeapSize, Cloneable { } /** - * HeapSize implementation. NOTE : The heapsize should be altered as and when new state variable - * are added + * HeapSize implementation. NOTE : The heap size should be altered when new state variable are + * added. 
* @return heap size of the HFileContext */ @Override diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java index 91c6ee7..018c8b4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/nio/RefCnt.java @@ -26,7 +26,7 @@ import org.apache.hbase.thirdparty.io.netty.util.ReferenceCounted; /** * Maintain an reference count integer inside to track life cycle of {@link ByteBuff}, if the - * reference count become 0, it'll call {@link Recycler#free()} once. + * reference count become 0, it'll call {@link Recycler#free()} exactly once. */ @InterfaceAudience.Private public class RefCnt extends AbstractReferenceCounted { @@ -36,8 +36,8 @@ public class RefCnt extends AbstractReferenceCounted { /** * Create an {@link RefCnt} with an initial reference count = 1. If the reference count become * zero, the recycler will do nothing. Usually, an Heap {@link ByteBuff} will use this kind of - * refCnt to track its life cycle, it help to abstract the code path although it's meaningless to - * use an refCnt for heap ByteBuff. + * refCnt to track its life cycle, it help to abstract the code path although it's not really + * needed to track on heap ByteBuff. 
*/ public static RefCnt create() { return new RefCnt(ByteBuffAllocator.NONE); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index 9cef9c0..c626426 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -920,7 +920,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { } if (block.getOffset() < 0) { releaseIfNotCurBlock(block); - throw new IOException("Invalid block file offset: " + block + ", path=" + reader.getPath()); + throw new IOException( + "Invalid block file offset: " + block + ", path=" + reader.getPath()); } // We are reading the next block without block type validation, because // it might turn out to be a non-data block. @@ -1131,7 +1132,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { isCompaction, true, BlockType.DATA, getEffectiveDataBlockEncoding()); if (newBlock.getOffset() < 0) { releaseIfNotCurBlock(newBlock); - throw new IOException("Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath()); + throw new IOException( + "Invalid block offset: " + newBlock.getOffset() + ", path=" + reader.getPath()); } updateCurrentBlock(newBlock); } @@ -1339,7 +1341,8 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { // schema definition change. LOG.info("Evicting cached block with key " + cacheKey + " because of a data block encoding mismatch" + "; expected: " - + expectedDataBlockEncoding + ", actual: " + actualDataBlockEncoding + ", path=" + path); + + expectedDataBlockEncoding + ", actual: " + actualDataBlockEncoding + ", path=" + + path); // This is an error scenario. so here we need to release the block. 
cachedBlock.release(); cache.evictBlock(cacheKey); @@ -1662,9 +1665,9 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { short dataBlockEncoderId = newBlock.getDataBlockEncodingId(); if (!DataBlockEncoding.isCorrectEncoder(dataBlockEncoder, dataBlockEncoderId)) { String encoderCls = dataBlockEncoder.getClass().getName(); - throw new CorruptHFileException( - "Encoder " + encoderCls + " doesn't support data block encoding " - + DataBlockEncoding.getNameFromId(dataBlockEncoderId) + ",path=" + reader.getPath()); + throw new CorruptHFileException("Encoder " + encoderCls + + " doesn't support data block encoding " + + DataBlockEncoding.getNameFromId(dataBlockEncoderId) + ",path=" + reader.getPath()); } updateCurrBlockRef(newBlock); ByteBuff encodedBuffer = getEncodedBuffer(newBlock); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java index 7740460..79ec0a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/LruBlockCache.java @@ -153,8 +153,14 @@ public class LruBlockCache implements FirstLevelBlockCache { private static final String LRU_MAX_BLOCK_SIZE = "hbase.lru.max.block.size"; private static final long DEFAULT_MAX_BLOCK_SIZE = 16L * 1024L * 1024L; - /** Concurrent map (the cache) */ - private transient final Map<BlockCacheKey, LruCachedBlock> map; + /** + * Defined the cache map as {@link ConcurrentHashMap} here, because in + * {@link LruBlockCache#getBlock}, we need to guarantee the atomicity of map#computeIfPresent + * (key, func). Besides, the func method must execute exactly once only when the key is present + * and under the lock context, otherwise the reference count will be messed up. Notice that the + * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. 
+ */ + private transient final ConcurrentHashMap<BlockCacheKey, LruCachedBlock> map; /** Eviction lock (locked when eviction in process) */ private transient final ReentrantLock evictionLock = new ReentrantLock(true); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 799f23c..e6fd742 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -140,7 +140,7 @@ public class BucketCache implements BlockCache, HeapSize { transient final RAMCache ramCache; // In this map, store the block's meta data like offset, length @VisibleForTesting - transient ConcurrentMap<BlockCacheKey, BucketEntry> backingMap; + transient ConcurrentHashMap<BlockCacheKey, BucketEntry> backingMap; /** * Flag if the cache is enabled or not... We shut it off if there are IO @@ -1524,7 +1524,16 @@ public class BucketCache implements BlockCache, HeapSize { * Wrapped the delegate ConcurrentMap with maintaining its block's reference count. */ static class RAMCache { - final ConcurrentMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); + /** + * Defined the map as {@link ConcurrentHashMap} explicitly here, because in + * {@link RAMCache#get(BlockCacheKey)} and + * {@link RAMCache#putIfAbsent(BlockCacheKey, RAMQueueEntry)} , we need to guarantee the + * atomicity of map#computeIfPresent(key, func) and map#putIfAbsent(key, func). Besides, the + * func method can execute exactly once only when the key is present(or absent) and under the + * lock context. Otherwise, the reference count of block will be messed up. Notice that the + * {@link java.util.concurrent.ConcurrentSkipListMap} can not guarantee that. 
+ */ + final ConcurrentHashMap<BlockCacheKey, RAMQueueEntry> delegate = new ConcurrentHashMap<>(); public boolean containsKey(BlockCacheKey key) { return delegate.containsKey(key); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index a7bb8e6..f69de00 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -223,22 +223,22 @@ public class CacheTestUtils { public static class ByteArrayCacheable implements Cacheable { - static final CacheableDeserializer<Cacheable> blockDeserializer = - new CacheableDeserializer<Cacheable>() { - @Override - public int getDeserializerIdentifier() { - return deserializerIdentifier; - } + private static final CacheableDeserializer<Cacheable> blockDeserializer = + new CacheableDeserializer<Cacheable>() { + @Override + public int getDeserializerIdentifier() { + return deserializerIdentifier; + } - @Override - public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException { - int len = b.getInt(); - Thread.yield(); - byte buf[] = new byte[len]; - b.get(buf); - return new ByteArrayCacheable(buf); - } - }; + @Override + public Cacheable deserialize(ByteBuff b, ByteBuffAllocator alloc) throws IOException { + int len = b.getInt(); + Thread.yield(); + byte[] buf = new byte[len]; + b.get(buf); + return new ByteArrayCacheable(buf); + } + }; final byte[] buf; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 4ac7907..bbc2e53 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -51,9 +51,9 @@ 
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.BucketSizeInfo; import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; -import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMCache; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache.RAMQueueEntry; +import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.After; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java index e84af12..83d7d09 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/TestMobWithByteBuffAllocator.java @@ -101,11 +101,13 @@ public class TestMobWithByteBuffAllocator { int rows = 0; try (Table table = UTIL.getConnection().getTable(tableName)) { try (ResultScanner scanner = table.getScanner(new Scan().setReversed(true))) { - for (Result res; (res = scanner.next()) != null;) { + Result res = scanner.next(); + while (res != null) { rows++; for (Cell cell : res.listCells()) { Assert.assertTrue(CellUtil.cloneValue(cell).length > 0); } + res = scanner.next(); } } }
