Repository: hbase Updated Branches: refs/heads/branch-2 d7a74a75a -> 68ec2a9da
HBASE-18375: Fix the bug where the pool chunks from ChunkCreator are deallocated and not returned to pool, because there is no reference to them Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/68ec2a9d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/68ec2a9d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/68ec2a9d Branch: refs/heads/branch-2 Commit: 68ec2a9da022f7824e9a45ef89a0c4b8bcb838f3 Parents: d7a74a7 Author: anastas <anas...@yahoo-inc.com> Authored: Wed Sep 6 18:48:53 2017 +0300 Committer: anastas <anas...@yahoo-inc.com> Committed: Wed Sep 6 18:48:53 2017 +0300 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/ChunkCreator.java | 127 ++++++++++++------- .../hbase/regionserver/CompactingMemStore.java | 24 +++- .../hbase/regionserver/CompactionPipeline.java | 4 +- .../hbase/regionserver/MemStoreLABImpl.java | 10 ++ .../hbase/regionserver/TestMemStoreLAB.java | 12 +- .../TestMemstoreLABWithoutPool.java | 3 +- 6 files changed, 124 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java index 38d7136..61cf2b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -50,10 +50,9 @@ public class ChunkCreator { // monotonically increasing chunkid private AtomicInteger chunkID = new AtomicInteger(1); // maps the chunk against the monotonically increasing chunk id. 
We need to preserve the - // natural ordering of the key - // CellChunkMap creation should convert the soft ref to hard reference - private Map<Integer, SoftReference<Chunk>> chunkIdMap = - new ConcurrentHashMap<Integer, SoftReference<Chunk>>(); + // natural ordering of the key. It also helps to protect from GC. + private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>(); + private final int chunkSize; private final boolean offheap; @VisibleForTesting @@ -75,7 +74,7 @@ public class ChunkCreator { } /** - * Initializes the instance of MSLABChunkCreator + * Initializes the instance of ChunkCreator * @param chunkSize the chunkSize * @param offheap indicates if the chunk is to be created offheap or not * @param globalMemStoreSize the global memstore size @@ -100,10 +99,19 @@ public class ChunkCreator { } /** - * Creates and inits a chunk. + * Creates and inits a chunk. The default implementation. * @return the chunk that was initialized */ Chunk getChunk() { + return getChunk(CompactingMemStore.IndexType.ARRAY_MAP); + } + + /** + * Creates and inits a chunk. + * @return the chunk that was initialized + * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index + */ + Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) { Chunk chunk = null; if (pool != null) { // the pool creates the chunk internally. 
The chunk#init() call happens here @@ -117,44 +125,49 @@ public class ChunkCreator { } } if (chunk == null) { - chunk = createChunk(); + // the second boolean parameter means: + // if CellChunkMap index is requested, put allocated on demand chunk mapping into chunkIdMap + chunk = createChunk(false, chunkIndexType); } - // put this chunk into the chunkIdMap - this.chunkIdMap.put(chunk.getId(), new SoftReference<>(chunk)); + // now we need to actually do the expensive memory allocation step in case of a new chunk, // else only the offset is set to the beginning of the chunk to accept allocations chunk.init(); return chunk; } - private Chunk createChunk() { - return createChunk(false); + private Chunk createChunkForPool() { + return createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP); } /** * Creates the chunk either onheap or offheap * @param pool indicates if the chunks have to be created which will be used by the Pool + * @param chunkIndexType * @return the chunk */ - private Chunk createChunk(boolean pool) { + private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType) { + Chunk chunk = null; int id = chunkID.getAndIncrement(); assert id > 0; // do not create offheap chunk on demand if (pool && this.offheap) { - return new OffheapChunk(chunkSize, id, pool); + chunk = new OffheapChunk(chunkSize, id, pool); } else { - return new OnheapChunk(chunkSize, id, pool); + chunk = new OnheapChunk(chunkSize, id, pool); } + if (pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP)) { + // put the pool chunk into the chunkIdMap so it is not GC-ed + this.chunkIdMap.put(chunk.getId(), chunk); + } + return chunk; } @VisibleForTesting - // TODO : To be used by CellChunkMap + // Used to translate the ChunkID into a chunk ref Chunk getChunk(int id) { - SoftReference<Chunk> ref = chunkIdMap.get(id); - if (ref != null) { - return ref.get(); - } - return null; + // can return null if chunk was never mapped + return chunkIdMap.get(id); } int 
getChunkSize() { @@ -170,15 +183,13 @@ public class ChunkCreator { } Chunk removeChunk(int chunkId) { - SoftReference<Chunk> ref = this.chunkIdMap.remove(chunkId); - if (ref != null) { - return ref.get(); - } - return null; + return this.chunkIdMap.remove(chunkId); } @VisibleForTesting - int size() { + // the chunks in the chunkIdMap may already be released so we shouldn't rely + // on this counting for strong correctness. This method is used only in testing. + int numberOfMappedChunks() { return this.chunkIdMap.size(); } @@ -213,7 +224,8 @@ public class ChunkCreator { this.poolSizePercentage = poolSizePercentage; this.reclaimedChunks = new LinkedBlockingQueue<>(); for (int i = 0; i < initialCount; i++) { - Chunk chunk = createChunk(true); + // Chunks from pool are covered with strong references anyway + Chunk chunk = createChunkForPool(); chunk.init(); reclaimedChunks.add(chunk); } @@ -232,7 +244,7 @@ public class ChunkCreator { * then. * Note: Chunks returned by this pool must be put back to the pool after its use. 
* @return a chunk - * @see #putbackChunks(Set) + * @see #putbackChunks(Chunk) */ Chunk getChunk() { Chunk chunk = reclaimedChunks.poll(); @@ -245,7 +257,7 @@ public class ChunkCreator { long created = this.chunkCount.get(); if (created < this.maxCount) { if (this.chunkCount.compareAndSet(created, created + 1)) { - chunk = createChunk(true); + chunk = createChunkForPool(); break; } } else { @@ -259,21 +271,16 @@ public class ChunkCreator { /** * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining * chunks - * @param chunks + * @param c */ - private void putbackChunks(Set<Integer> chunks) { - int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); - Iterator<Integer> iterator = chunks.iterator(); - while (iterator.hasNext()) { - Integer chunkId = iterator.next(); - // remove the chunks every time though they are from the pool or not - Chunk chunk = ChunkCreator.this.removeChunk(chunkId); - if (chunk != null) { - if (chunk.isFromPool() && toAdd > 0) { - reclaimedChunks.add(chunk); - } - toAdd--; - } + private void putbackChunks(Chunk c) { + int toAdd = this.maxCount - reclaimedChunks.size(); + if (c.isFromPool() && toAdd > 0) { + reclaimedChunks.add(c); + } else { + // remove the chunk (that is not going to pool) + // whether it was initially from the pool or not + ChunkCreator.this.removeChunk(c.getId()); } } @@ -384,6 +391,20 @@ public class ChunkCreator { return 0; } + @VisibleForTesting + boolean isChunkInPool(int chunkId) { + if (pool != null) { + // chunks that are from the pool will map to a non-null chunk reference + Chunk c = getChunk(chunkId); + if (c==null) { + return false; + } + return pool.reclaimedChunks.contains(c); + } + + return false; + } + /* * Only used in testing */ @@ -395,10 +416,24 @@ public class ChunkCreator { } synchronized void putbackChunks(Set<Integer> chunks) { - if (pool != null) { - pool.putbackChunks(chunks); - } else { + // if there is no pool just try to clear the chunkIdMap 
in case there is something + if ( pool == null ) { this.removeChunks(chunks); + return; } + + // if there is pool, go over all chunk IDs that came back, the chunks may be from pool or not + for (int chunkID : chunks) { + // translate chunk ID to chunk, if chunk initially wasn't in pool + // this translation will (most likely) return null + Chunk chunk = ChunkCreator.this.getChunk(chunkID); + if (chunk != null) { + pool.putbackChunks(chunk); + } + // if chunk is null, it was never covered by the chunkIdMap (and so wasn't in pool also), + // so we have nothing to do on its release + } + return; } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java index 4de78ca..d554d85 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactingMemStore.java @@ -57,6 +57,12 @@ public class CompactingMemStore extends AbstractMemStore { "hbase.hregion.compacting.memstore.type"; public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT = String.valueOf(MemoryCompactionPolicy.BASIC); + // The external setting of the compacting MemStore behaviour + public static final String COMPACTING_MEMSTORE_INDEX_KEY = + "hbase.hregion.compacting.memstore.index"; + // usage of CellArrayMap is default, later it will be decided how to use CellChunkMap + public static final String COMPACTING_MEMSTORE_INDEX_DEFAULT = + String.valueOf(IndexType.ARRAY_MAP); // Default fraction of in-memory-flush size w.r.t. 
flush-to-disk size public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY = "hbase.memstore.inmemoryflush.threshold.factor"; @@ -78,10 +84,22 @@ public class CompactingMemStore extends AbstractMemStore { private final AtomicBoolean allowCompaction = new AtomicBoolean(true); private boolean compositeSnapshot = true; + /** + * Types of indexes (part of immutable segments) to be used after flattening, + * compaction, or merge are applied. + */ + public enum IndexType { + CSLM_MAP, // ConcurrentSkipListMap + ARRAY_MAP, // CellArrayMap + CHUNK_MAP // CellChunkMap + } + + private IndexType indexType = IndexType.ARRAY_MAP; // default implementation public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD - + 6 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, - // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction + + 7 * ClassSize.REFERENCE // Store, RegionServicesForStores, CompactionPipeline, + // MemStoreCompactor, inMemoryFlushInProgress, allowCompaction, + // indexType + Bytes.SIZEOF_LONG // inmemoryFlushSize + 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay + 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryFlushInProgress and allowCompaction @@ -96,6 +114,8 @@ public class CompactingMemStore extends AbstractMemStore { this.pipeline = new CompactionPipeline(getRegionServices()); this.compactor = createMemStoreCompactor(compactionPolicy); initInmemoryFlushSize(conf); + indexType = IndexType.valueOf(conf.get(CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY, + CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT)); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 06e83a3..97ea568 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -256,6 +256,8 @@ public class CompactionPipeline { private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { + pipeline.removeAll(suffix); + if(segment != null) pipeline.addLast(segment); // During index merge we won't be closing the segments undergoing the merge. Segment#close() // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data @@ -267,8 +269,6 @@ public class CompactionPipeline { itemInSuffix.close(); } } - pipeline.removeAll(suffix); - if(segment != null) pipeline.addLast(segment); } public Segment getTail() { http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index ba53348..112f69e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -269,4 +269,14 @@ public class MemStoreLABImpl implements MemStoreLAB { } return pooledChunks; } + + @VisibleForTesting Integer getNumOfChunksReturnedToPool() { + int i = 0; + for (Integer id : this.chunks) { + if (chunkCreator.isChunkInPool(id)) { + i++; + } + } + return i; + } } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index f171dd0..06b9c40 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -247,14 +247,16 @@ public class TestMemStoreLAB { } } // none of the chunkIds would have been returned back - assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0); + assertTrue("All the chunks must have been cleared", + ChunkCreator.INSTANCE.numberOfMappedChunks() != 0); + int pooledChunksNum = mslab.getPooledChunks().size(); // close the mslab mslab.close(); - // make sure all chunks reclaimed or removed from chunk queue - int queueLength = mslab.getPooledChunks().size(); + // make sure all chunks were reclaimed back to the pool + int queueLength = mslab.getNumOfChunksReturnedToPool(); assertTrue("All chunks in chunk queue should be reclaimed or removed" - + " after mslab closed but actually: " + queueLength, - queueLength == 0); + + " after mslab closed but actually: " + (pooledChunksNum-queueLength), + pooledChunksNum-queueLength == 0); } finally { ChunkCreator.INSTANCE = oldInstance; } http://git-wip-us.apache.org/repos/asf/hbase/blob/68ec2a9d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java index 
96be8ec..d3f9bc1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java @@ -140,7 +140,8 @@ public class TestMemstoreLABWithoutPool { mslab[i].close(); } // all of the chunkIds would have been returned back - assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0); + assertTrue("All the chunks must have been cleared", + ChunkCreator.INSTANCE.numberOfMappedChunks() == 0); } private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,