Repository: hbase Updated Branches: refs/heads/master 092dc6de8 -> 75a6b3684
HBASE-18375: Fix the bug where the pool chunks from ChunkCreator are deallocated and not returned to pool, because there is no reference to them Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/75a6b368 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/75a6b368 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/75a6b368 Branch: refs/heads/master Commit: 75a6b36849c58d6a751f57226ab0c8f7884a9e87 Parents: 092dc6d Author: anastas <anas...@yahoo-inc.com> Authored: Thu Aug 17 18:23:19 2017 +0300 Committer: anastas <anas...@yahoo-inc.com> Committed: Thu Aug 17 18:23:19 2017 +0300 ---------------------------------------------------------------------- .../regionserver/CellChunkImmutableSegment.java | 5 +- .../hadoop/hbase/regionserver/ChunkCreator.java | 171 +++++++++---------- .../hbase/regionserver/CompactionPipeline.java | 19 ++- .../hbase/regionserver/MemStoreLABImpl.java | 27 ++- .../hbase/regionserver/TestMemStoreLAB.java | 12 +- .../TestMemstoreLABWithoutPool.java | 3 +- 6 files changed, 126 insertions(+), 111 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java index cdda279..3653166 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellChunkImmutableSegment.java @@ -176,10 +176,7 @@ public class CellChunkImmutableSegment extends ImmutableSegment { private int createCellReference(ByteBufferKeyValue cell, ByteBuffer idxBuffer, int idxOffset) { int offset = idxOffset; int dataChunkID = cell.getChunkId(); - // ensure strong pointer to data chunk, as index is no longer directly points to it - Chunk c = ChunkCreator.getInstance().saveChunkFromGC(dataChunkID); - // if c is null, it means that this cell chunks was already released shouldn't happen - assert (c!=null); + offset = ByteBufferUtils.putInt(idxBuffer, offset, dataChunkID); // write data chunk id offset = ByteBufferUtils.putInt(idxBuffer, offset, cell.getOffset()); // offset offset = ByteBufferUtils.putInt(idxBuffer, offset, KeyValueUtil.length(cell)); // length http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java index 7e5395c..e818426 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.lang.ref.WeakReference; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -58,21 +57,8 @@ public class ChunkCreator { // the header size need to be changed in case chunk id size is changed public static final int SIZEOF_CHUNK_HEADER = Bytes.SIZEOF_INT; - // An object pointed by a weak reference can be garbage collected, in opposite to an object - // referenced by a strong (regular) reference. Every chunk created via ChunkCreator is referenced - // from either weakChunkIdMap or strongChunkIdMap. - // Upon chunk C creation, C's ID is mapped into weak reference to C, in order not to disturb C's - // GC in case all other reference to C are going to be removed. - // When chunk C is referenced from CellChunkMap (via C's ID) it is possible to GC the chunk C. - // To avoid that upon inserting C into CellChunkMap, C's ID is mapped into strong (regular) - // reference to C. - - // map that doesn't influence GC - private Map<Integer, WeakReference<Chunk>> weakChunkIdMap = - new ConcurrentHashMap<Integer, WeakReference<Chunk>>(); - - // map that keeps chunks from garbage collection - private Map<Integer, Chunk> strongChunkIdMap = new ConcurrentHashMap<Integer, Chunk>(); + // mapping from chunk IDs to chunks + private Map<Integer, Chunk> chunkIdMap = new ConcurrentHashMap<Integer, Chunk>(); private final int chunkSize; private final boolean offheap; @@ -95,7 +81,7 @@ public class ChunkCreator { } /** - * Initializes the instance of MSLABChunkCreator + * Initializes the instance of ChunkCreator * @param chunkSize the chunkSize * @param offheap indicates if the chunk is to be created offheap or not * @param globalMemStoreSize the global memstore size @@ -120,10 +106,19 @@ public class ChunkCreator { } /** - * Creates and inits a chunk. + * Creates and inits a chunk. The default implementation. * @return the chunk that was initialized */ Chunk getChunk() { + return getChunk(CompactingMemStore.IndexType.ARRAY_MAP); + } + + /** + * Creates and inits a chunk. + * @return the chunk that was initialized + * @param chunkIndexType whether the requested chunk is going to be used with CellChunkMap index + */ + Chunk getChunk(CompactingMemStore.IndexType chunkIndexType) { Chunk chunk = null; if (pool != null) { // the pool creates the chunk internally. The chunk#init() call happens here @@ -137,68 +132,45 @@ public class ChunkCreator { } } if (chunk == null) { - chunk = createChunk(); + // the second boolean parameter means: + // if CellChunkMap index is requested, put allocated on demand chunk mapping into chunkIdMap + chunk = createChunk(false, chunkIndexType); } - // put this chunk initially into the weakChunkIdMap - this.weakChunkIdMap.put(chunk.getId(), new WeakReference<>(chunk)); + // now we need to actually do the expensive memory allocation step in case of a new chunk, // else only the offset is set to the beginning of the chunk to accept allocations chunk.init(); return chunk; } - private Chunk createChunk() { - return createChunk(false); - } - /** * Creates the chunk either onheap or offheap * @param pool indicates if the chunks have to be created which will be used by the Pool + * @param chunkIndexType * @return the chunk */ - private Chunk createChunk(boolean pool) { + private Chunk createChunk(boolean pool, CompactingMemStore.IndexType chunkIndexType) { + Chunk chunk = null; int id = chunkID.getAndIncrement(); assert id > 0; // do not create offheap chunk on demand if (pool && this.offheap) { - return new OffheapChunk(chunkSize, id, pool); + chunk = new OffheapChunk(chunkSize, id, pool); } else { - return new OnheapChunk(chunkSize, id, pool); + chunk = new OnheapChunk(chunkSize, id, pool); } + if (pool || (chunkIndexType == CompactingMemStore.IndexType.CHUNK_MAP)) { + // put the pool chunk into the chunkIdMap so it is not GC-ed + this.chunkIdMap.put(chunk.getId(), chunk); + } + return chunk; } @VisibleForTesting // Used to translate the ChunkID into a chunk ref Chunk getChunk(int id) { - WeakReference<Chunk> ref = weakChunkIdMap.get(id); - if (ref != null) { - return ref.get(); - } - // check also the strong mapping - return strongChunkIdMap.get(id); - } - - // transfer the weak pointer to be a strong chunk pointer - Chunk saveChunkFromGC(int chunkID) { - Chunk c = strongChunkIdMap.get(chunkID); // check whether the chunk is already protected - if (c != null) // with strong pointer - return c; - WeakReference<Chunk> ref = weakChunkIdMap.get(chunkID); - if (ref != null) { - c = ref.get(); - } - if (c != null) { - // put this strong reference to chunk into the strongChunkIdMap - // the read of the weakMap is always happening before the read of the strongMap - // so no synchronization issues here - this.strongChunkIdMap.put(chunkID, c); - this.weakChunkIdMap.remove(chunkID); - return c; - } - // we should actually never return null as someone should not ask to save from GC a chunk, - // which is already released. However, we are not asserting it here and we let the caller - // to deal with the return value an assert if needed - return null; + // can return null if chunk was never mapped + return chunkIdMap.get(id); } int getChunkSize() { @@ -210,30 +182,23 @@ public class ChunkCreator { } private void removeChunks(Set<Integer> chunkIDs) { - this.weakChunkIdMap.keySet().removeAll(chunkIDs); - this.strongChunkIdMap.keySet().removeAll(chunkIDs); + this.chunkIdMap.keySet().removeAll(chunkIDs); } Chunk removeChunk(int chunkId) { - WeakReference<Chunk> weak = this.weakChunkIdMap.remove(chunkId); - Chunk strong = this.strongChunkIdMap.remove(chunkId); - if (weak != null) { - return weak.get(); - } - return strong; + return this.chunkIdMap.remove(chunkId); } @VisibleForTesting - // the chunks in the weakChunkIdMap may already be released so we shouldn't relay + // the chunks in the chunkIdMap may already be released so we shouldn't relay // on this counting for strong correctness. This method is used only in testing. - int size() { - return this.weakChunkIdMap.size()+this.strongChunkIdMap.size(); + int numberOfMappedChunks() { + return this.chunkIdMap.size(); } @VisibleForTesting void clearChunkIds() { - this.strongChunkIdMap.clear(); - this.weakChunkIdMap.clear(); + this.chunkIdMap.clear(); } /** @@ -262,7 +227,9 @@ public class ChunkCreator { this.poolSizePercentage = poolSizePercentage; this.reclaimedChunks = new LinkedBlockingQueue<>(); for (int i = 0; i < initialCount; i++) { - Chunk chunk = createChunk(true); + // Chunks from pool are covered with strong references anyway + // TODO: change to CHUNK_MAP if it is generally defined + Chunk chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP); chunk.init(); reclaimedChunks.add(chunk); } @@ -281,7 +248,7 @@ public class ChunkCreator { * then. * Note: Chunks returned by this pool must be put back to the pool after its use. * @return a chunk - * @see #putbackChunks(Set) + * @see #putbackChunks(Chunk) */ Chunk getChunk() { Chunk chunk = reclaimedChunks.poll(); @@ -294,7 +261,8 @@ public class ChunkCreator { long created = this.chunkCount.get(); if (created < this.maxCount) { if (this.chunkCount.compareAndSet(created, created + 1)) { - chunk = createChunk(true); + // TODO: change to CHUNK_MAP if it is generally defined + chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP); break; } } else { @@ -308,21 +276,16 @@ public class ChunkCreator { /** * Add the chunks to the pool, when the pool achieves the max size, it will skip the remaining * chunks - * @param chunks + * @param c */ - private void putbackChunks(Set<Integer> chunks) { - int toAdd = Math.min(chunks.size(), this.maxCount - reclaimedChunks.size()); - Iterator<Integer> iterator = chunks.iterator(); - while (iterator.hasNext()) { - Integer chunkId = iterator.next(); - // remove the chunks every time though they are from the pool or not - Chunk chunk = ChunkCreator.this.removeChunk(chunkId); - if (chunk != null) { - if (chunk.isFromPool() && toAdd > 0) { - reclaimedChunks.add(chunk); - } - toAdd--; - } + private void putbackChunks(Chunk c) { + int toAdd = this.maxCount - reclaimedChunks.size(); + if (c.isFromPool() && toAdd > 0) { + reclaimedChunks.add(c); + } else { + // remove the chunk (that is not going to pool) + // though it is initially from the pool or not + ChunkCreator.this.removeChunk(c.getId()); } } @@ -433,6 +396,20 @@ public class ChunkCreator { return 0; } + @VisibleForTesting + boolean isChunkInPool(int chunkId) { + if (pool != null) { + // chunks that are from pool will return true chunk reference not null + Chunk c = getChunk(chunkId); + if (c==null) { + return false; + } + return pool.reclaimedChunks.contains(c); + } + + return false; + } + /* * Only used in testing */ @@ -444,10 +421,24 @@ public class ChunkCreator { } synchronized void putbackChunks(Set<Integer> chunks) { - if (pool != null) { - pool.putbackChunks(chunks); - } else { + // if there is no pool just try to clear the chunkIdMap in case there is something + if ( pool == null ) { this.removeChunks(chunks); + return; } + + // if there is pool, go over all chunk IDs that came back, the chunks may be from pool or not + for (int chunkID : chunks) { + // translate chunk ID to chunk, if chunk initially wasn't in pool + // this translation will (most likely) return null + Chunk chunk = ChunkCreator.this.getChunk(chunkID); + if (chunk != null) { + pool.putbackChunks(chunk); + } + // if chunk is null, it was never covered by the chunkIdMap (and so wasn't in pool also), + // so we have nothing to do on its release + } + return; } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java index 5136f24..f281392 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionPipeline.java @@ -43,10 +43,10 @@ import org.apache.hadoop.hbase.util.ClassSize; * suffix of the pipeline. * * The synchronization model is copy-on-write. Methods which change the structure of the - * pipeline (pushHead() and swap()) apply their changes in the context of a lock. They also make - * a read-only copy of the pipeline's list. Read methods read from a read-only copy. If a read - * method accesses the read-only copy more than once it makes a local copy of it - * to ensure it accesses the same copy. + * pipeline (pushHead(), flattenOneSegment() and swap()) apply their changes in the context of a + * lock. They also make a read-only copy of the pipeline's list. Read methods read from a + * read-only copy. If a read method accesses the read-only copy more than once it makes a local + * copy of it to ensure it accesses the same copy. * * The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also * protected by a lock since they need to have a consistent (atomic) view of the pipeline list @@ -261,6 +261,8 @@ public class CompactionPipeline { private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment, boolean closeSegmentsInSuffix) { + pipeline.removeAll(suffix); + if(segment != null) pipeline.addLast(segment); // During index merge we won't be closing the segments undergoing the merge. Segment#close() // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data @@ -272,15 +274,20 @@ public class CompactionPipeline { itemInSuffix.close(); } } - pipeline.removeAll(suffix); - if(segment != null) pipeline.addLast(segment); } // replacing one segment in the pipeline with a new one exactly at the same index // need to be called only within synchronized block + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT", + justification="replaceAtIndex is invoked under a synchronize block so safe") private void replaceAtIndex(int idx, ImmutableSegment newSegment) { pipeline.set(idx, newSegment); readOnlyCopy = new LinkedList<>(pipeline); + // the version increment is indeed needed, because the swap uses removeAll() method of the + // linked-list that compares the objects to find what to remove. + // The flattening changes the segment object completely (creation pattern) and so + // swap will not proceed correctly after concurrent flattening. + version++; } public Segment getTail() { http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java index 85e2abe..2ae665e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLABImpl.java @@ -78,6 +78,7 @@ public class MemStoreLABImpl implements MemStoreLAB { private final int chunkSize; private final int maxAlloc; private final ChunkCreator chunkCreator; + private final CompactingMemStore.IndexType idxType; // what index is used for corresponding segment // This flag is for closing this instance, its set when clearing snapshot of // memstore @@ -100,6 +101,9 @@ public class MemStoreLABImpl implements MemStoreLAB { // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one! Preconditions.checkArgument(maxAlloc <= chunkSize, MAX_ALLOC_KEY + " must be less than " + CHUNK_SIZE_KEY); + idxType = CompactingMemStore.IndexType.valueOf(conf.get( + CompactingMemStore.COMPACTING_MEMSTORE_INDEX_KEY, + CompactingMemStore.COMPACTING_MEMSTORE_INDEX_DEFAULT)); } @Override @@ -239,7 +243,7 @@ public class MemStoreLABImpl implements MemStoreLAB { if (c != null) { return c; } - c = this.chunkCreator.getChunk(); + c = this.chunkCreator.getChunk(idxType); if (c != null) { // set the curChunk. No need of CAS as only one thread will be here curChunk.set(c); @@ -253,12 +257,15 @@ public class MemStoreLABImpl implements MemStoreLAB { return null; } - // Returning a new chunk, without replacing current chunk, - // meaning MSLABImpl does not make the returned chunk as CurChunk. - // The space on this chunk will be allocated externally - // The interface is only for external callers + /* Creating chunk to be used as index chunk in CellChunkMap, part of the chunks array. + ** Returning a new chunk, without replacing current chunk, + ** meaning MSLABImpl does not make the returned chunk as CurChunk. + ** The space on this chunk will be allocated externally. + ** The interface is only for external callers + */ @Override public Chunk getNewExternalChunk() { + // the new chunk is going to be part of the chunk array and will always be referenced Chunk c = this.chunkCreator.getChunk(); chunks.add(c.getId()); return c; @@ -280,4 +287,14 @@ public class MemStoreLABImpl implements MemStoreLAB { } return pooledChunks; } + + @VisibleForTesting Integer getNumOfChunksReturnedToPool() { + int i = 0; + for (Integer id : this.chunks) { + if (chunkCreator.isChunkInPool(id)) { + i++; + } + } + return i; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java index f171dd0..06b9c40 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreLAB.java @@ -247,14 +247,16 @@ public class TestMemStoreLAB { } } // none of the chunkIds would have been returned back - assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() != 0); + assertTrue("All the chunks must have been cleared", + ChunkCreator.INSTANCE.numberOfMappedChunks() != 0); + int pooledChunksNum = mslab.getPooledChunks().size(); // close the mslab mslab.close(); - // make sure all chunks reclaimed or removed from chunk queue - int queueLength = mslab.getPooledChunks().size(); + // make sure all chunks where reclaimed back to pool + int queueLength = mslab.getNumOfChunksReturnedToPool(); assertTrue("All chunks in chunk queue should be reclaimed or removed" - + " after mslab closed but actually: " + queueLength, - queueLength == 0); + + " after mslab closed but actually: " + (pooledChunksNum-queueLength), + pooledChunksNum-queueLength == 0); } finally { ChunkCreator.INSTANCE = oldInstance; } http://git-wip-us.apache.org/repos/asf/hbase/blob/75a6b368/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java index 96be8ec..d3f9bc1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemstoreLABWithoutPool.java @@ -140,7 +140,8 @@ public class TestMemstoreLABWithoutPool { mslab[i].close(); } // all of the chunkIds would have been returned back - assertTrue("All the chunks must have been cleared", ChunkCreator.INSTANCE.size() == 0); + assertTrue("All the chunks must have been cleared", + ChunkCreator.INSTANCE.numberOfMappedChunks() == 0); } private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,