Author: zjushch Date: Wed Mar 27 03:17:41 2013 New Revision: 1461398 URL: http://svn.apache.org/r1461398 Log: HBASE-8163 MemStoreChunkPool: An improvement for JAVA GC when using MSLAB
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1461398&r1=1461397&r2=1461398&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Wed Mar 27 03:17:41 2013 @@ -93,7 +93,9 @@ public class MemStore implements HeapSiz TimeRangeTracker timeRangeTracker; TimeRangeTracker snapshotTimeRangeTracker; - MemStoreLAB allocator; + MemStoreChunkPool chunkPool; + volatile MemStoreLAB allocator; + volatile MemStoreLAB snapshotAllocator; @@ -121,9 +123,11 @@ public class MemStore implements HeapSiz snapshotTimeRangeTracker = new TimeRangeTracker(); this.size = new AtomicLong(DEEP_OVERHEAD); if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { - this.allocator = new MemStoreLAB(conf); + this.chunkPool = MemStoreChunkPool.getPool(conf); + this.allocator = new MemStoreLAB(conf, chunkPool); } else { this.allocator = null; + this.chunkPool = null; } } @@ -157,9 +161,10 @@ public class MemStore implements HeapSiz this.timeRangeTracker = new TimeRangeTracker(); // Reset heap to not include any keys this.size.set(DEEP_OVERHEAD); + this.snapshotAllocator = this.allocator; // Reset allocator 
so we get a fresh buffer for the new memstore if (allocator != null) { - this.allocator = new MemStoreLAB(conf); + this.allocator = new MemStoreLAB(conf, chunkPool); } } } @@ -188,6 +193,7 @@ public class MemStore implements HeapSiz */ void clearSnapshot(final SortedSet<KeyValue> ss) throws UnexpectedException { + MemStoreLAB tmpAllocator = null; this.lock.writeLock().lock(); try { if (this.snapshot != ss) { @@ -200,9 +206,16 @@ public class MemStore implements HeapSiz this.snapshot = new KeyValueSkipListSet(this.comparator); this.snapshotTimeRangeTracker = new TimeRangeTracker(); } + if (this.snapshotAllocator != null) { + tmpAllocator = this.snapshotAllocator; + this.snapshotAllocator = null; + } } finally { this.lock.writeLock().unlock(); } + if (tmpAllocator != null) { + tmpAllocator.close(); + } } /** @@ -697,6 +710,10 @@ public class MemStore implements HeapSiz // the pre-calculated KeyValue to be returned by peek() or next() private KeyValue theNext; + // The allocator and snapshot allocator at the time of creating this scanner + volatile MemStoreLAB allocatorAtCreation; + volatile MemStoreLAB snapshotAllocatorAtCreation; + /* Some notes... 
@@ -723,6 +740,14 @@ public class MemStore implements HeapSiz kvsetAtCreation = kvset; snapshotAtCreation = snapshot; + if (allocator != null) { + this.allocatorAtCreation = allocator; + this.allocatorAtCreation.incScannerCount(); + } + if (snapshotAllocator != null) { + this.snapshotAllocatorAtCreation = snapshotAllocator; + this.snapshotAllocatorAtCreation.incScannerCount(); + } } private KeyValue getNext(Iterator<KeyValue> it) { @@ -885,6 +910,15 @@ public class MemStore implements HeapSiz this.kvsetIt = null; this.snapshotIt = null; + + if (allocatorAtCreation != null) { + this.allocatorAtCreation.decScannerCount(); + this.allocatorAtCreation = null; + } + if (snapshotAllocatorAtCreation != null) { + this.snapshotAllocatorAtCreation.decScannerCount(); + this.snapshotAllocatorAtCreation = null; + } this.kvsetItRow = null; this.snapshotItRow = null; @@ -907,7 +941,7 @@ public class MemStore implements HeapSiz } public final static long FIXED_OVERHEAD = ClassSize.align( - ClassSize.OBJECT + (11 * ClassSize.REFERENCE)); + ClassSize.OBJECT + (13 * ClassSize.REFERENCE)); public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java?rev=1461398&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java (added) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreChunkPool.java Wed Mar 27 03:17:41 2013 @@ -0,0 +1,219 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.lang.management.ManagementFactory; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Chunk; +import org.apache.hadoop.util.StringUtils; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * A pool of {@link MemStoreLAB$Chunk} instances. + * + * MemStoreChunkPool caches a number of retired chunks for reusing, it could + * decrease allocating bytes when writing, thereby optimizing the garbage + * collection on JVM. 
+ * + * The pool instance is globally unique and could be obtained through + * {@link MemStoreChunkPool#getPool(Configuration)} + * + * {@link MemStoreChunkPool#getChunk()} is called when MemStoreLAB allocating + * bytes, and {@link MemStoreChunkPool#putbackChunks(BlockingQueue)} is called + * when MemStore clearing snapshot for flush + */ +@InterfaceAudience.Private +public class MemStoreChunkPool { + private static final Log LOG = LogFactory.getLog(MemStoreChunkPool.class); + final static String CHUNK_POOL_MAXSIZE_KEY = "hbase.hregion.memstore.chunkpool.maxsize"; + final static String CHUNK_POOL_INITIALSIZE_KEY = "hbase.hregion.memstore.chunkpool.initialsize"; + final static float POOL_MAX_SIZE_DEFAULT = 0.0f; + final static float POOL_INITIAL_SIZE_DEFAULT = 0.0f; + + // Static reference to the MemStoreChunkPool + private static MemStoreChunkPool globalInstance; + /** Boolean whether we have disabled the memstore chunk pool entirely. */ + static boolean chunkPoolDisabled = false; + + private final int maxCount; + + // A queue of reclaimed chunks + private final BlockingQueue<Chunk> reclaimedChunks; + private final int chunkSize; + + /** Statistics thread schedule pool */ + private final ScheduledExecutorService scheduleThreadPool; + /** Statistics thread */ + private static final int statThreadPeriod = 60 * 5; + private AtomicLong createdChunkCount = new AtomicLong(); + private AtomicLong reusedChunkCount = new AtomicLong(); + + MemStoreChunkPool(Configuration conf, int chunkSize, int maxCount, + int initialCount) { + this.maxCount = maxCount; + this.chunkSize = chunkSize; + this.reclaimedChunks = new LinkedBlockingQueue<Chunk>(); + for (int i = 0; i < initialCount; i++) { + Chunk chunk = new Chunk(chunkSize); + chunk.init(); + reclaimedChunks.add(chunk); + } + final String n = Thread.currentThread().getName(); + scheduleThreadPool = Executors.newScheduledThreadPool(1, + new ThreadFactoryBuilder().setNameFormat(n+"-MemStoreChunkPool Statistics") + 
.setDaemon(true).build()); + this.scheduleThreadPool.scheduleAtFixedRate(new StatisticsThread(this), + statThreadPeriod, statThreadPeriod, TimeUnit.SECONDS); + } + + /** + * Poll a chunk from the pool, reset it if not null, else create a new chunk + * to return + * @return a chunk + */ + Chunk getChunk() { + Chunk chunk = reclaimedChunks.poll(); + if (chunk == null) { + chunk = new Chunk(chunkSize); + createdChunkCount.incrementAndGet(); + } else { + chunk.reset(); + reusedChunkCount.incrementAndGet(); + } + return chunk; + } + + /** + * Add the chunks to the pool, when the pool achieves the max size, it will + * skip the remaining chunks + * @param chunks + */ + void putbackChunks(BlockingQueue<Chunk> chunks) { + int maxNumToPutback = this.maxCount - reclaimedChunks.size(); + if (maxNumToPutback <= 0) { + return; + } + chunks.drainTo(reclaimedChunks, maxNumToPutback); + } + + /** + * Add the chunk to the pool, if the pool has achieved the max size, it will + * skip it + * @param chunk + */ + void putbackChunk(Chunk chunk) { + if (reclaimedChunks.size() >= this.maxCount) { + return; + } + reclaimedChunks.add(chunk); + } + + int getPoolSize() { + return this.reclaimedChunks.size(); + } + + /* + * Only used in testing + */ + void clearChunks() { + this.reclaimedChunks.clear(); + } + + private static class StatisticsThread extends Thread { + MemStoreChunkPool mcp; + + public StatisticsThread(MemStoreChunkPool mcp) { + super("MemStoreChunkPool.StatisticsThread"); + setDaemon(true); + this.mcp = mcp; + } + + @Override + public void run() { + mcp.logStats(); + } + } + + private void logStats() { + if (!LOG.isDebugEnabled()) return; + long created = createdChunkCount.get(); + long reused = reusedChunkCount.get(); + long total = created + reused; + LOG.debug("Stats: current pool size=" + reclaimedChunks.size() + + ",created chunk count=" + created + + ",reused chunk count=" + reused + + ",reuseRatio=" + (total == 0 ? 
"0" : StringUtils.formatPercent( + (float) reused / (float) total, 2))); + } + + /** + * @param conf + * @return the global MemStoreChunkPool instance + */ + static synchronized MemStoreChunkPool getPool(Configuration conf) { + if (globalInstance != null) return globalInstance; + if (chunkPoolDisabled) return null; + + + float poolSizePercentage = conf.getFloat(CHUNK_POOL_MAXSIZE_KEY, + POOL_MAX_SIZE_DEFAULT); + if (poolSizePercentage <= 0) { + chunkPoolDisabled = true; + return null; + } + if (poolSizePercentage > 1.0) { + throw new IllegalArgumentException(CHUNK_POOL_MAXSIZE_KEY + + " must be between 0.0 and 1.0"); + } + long heapMax = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() + .getMax(); + long globalMemStoreLimit = MemStoreFlusher.globalMemStoreLimit(heapMax, + MemStoreFlusher.DEFAULT_UPPER, MemStoreFlusher.UPPER_KEY, conf); + int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, + MemStoreLAB.CHUNK_SIZE_DEFAULT); + int maxCount = (int) (globalMemStoreLimit * poolSizePercentage / chunkSize); + + float initialCountPercentage = conf.getFloat(CHUNK_POOL_INITIALSIZE_KEY, + POOL_INITIAL_SIZE_DEFAULT); + if (initialCountPercentage > 1.0 || initialCountPercentage < 0) { + throw new IllegalArgumentException(CHUNK_POOL_INITIALSIZE_KEY + + " must be between 0.0 and 1.0"); + } + + int initialCount = (int) (initialCountPercentage * maxCount); + LOG.info("Allocating MemStoreChunkPool with chunk size " + + StringUtils.byteDesc(chunkSize) + ", max count " + maxCount + + ", initial count " + initialCount); + globalInstance = new MemStoreChunkPool(conf, chunkSize, maxCount, + initialCount); + return globalInstance; + } + +} \ No newline at end of file Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1461398&r1=1461397&r2=1461398&view=diff 
============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Wed Mar 27 03:17:41 2013 @@ -79,9 +79,9 @@ class MemStoreFlusher implements FlushRe protected final long globalMemStoreLimit; protected final long globalMemStoreLimitLowMark; - private static final float DEFAULT_UPPER = 0.4f; + static final float DEFAULT_UPPER = 0.4f; private static final float DEFAULT_LOWER = 0.35f; - private static final String UPPER_KEY = + static final String UPPER_KEY = "hbase.regionserver.global.memstore.upperLimit"; private static final String LOWER_KEY = "hbase.regionserver.global.memstore.lowerLimit"; Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java?rev=1461398&r1=1461397&r2=1461398&view=diff ============================================================================== --- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java (original) +++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreLAB.java Wed Mar 27 03:17:41 2013 @@ -18,11 +18,15 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; + import com.google.common.base.Preconditions; /** @@ -49,6 +53,8 @@ import com.google.common.base.Preconditi @InterfaceAudience.Private public class MemStoreLAB { private 
AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>(); + // A queue of chunks contained by this memstore + private BlockingQueue<Chunk> chunkQueue = new LinkedBlockingQueue<Chunk>(); final static String CHUNK_SIZE_KEY = "hbase.hregion.memstore.mslab.chunksize"; final static int CHUNK_SIZE_DEFAULT = 2048 * 1024; @@ -58,13 +64,30 @@ public class MemStoreLAB { final static int MAX_ALLOC_DEFAULT = 256 * 1024; // allocs bigger than this don't go through allocator final int maxAlloc; + private final MemStoreChunkPool chunkPool; + + // This flag is for closing this instance, it's set when clearing snapshot of + // memstore + private volatile boolean closed = false; + // This flag is for reclaiming chunks. It's set when putting chunks back to + // pool + private AtomicBoolean reclaimed = new AtomicBoolean(false); + // Current count of open scanners which are reading data from this MemStoreLAB + private final AtomicInteger openScannerCount = new AtomicInteger(); + + // Used in testing public MemStoreLAB() { this(new Configuration()); } - public MemStoreLAB(Configuration conf) { + private MemStoreLAB(Configuration conf) { + this(conf, MemStoreChunkPool.getPool(conf)); + } + + public MemStoreLAB(Configuration conf, MemStoreChunkPool pool) { chunkSize = conf.getInt(CHUNK_SIZE_KEY, CHUNK_SIZE_DEFAULT); maxAlloc = conf.getInt(MAX_ALLOC_KEY, MAX_ALLOC_DEFAULT); + this.chunkPool = pool; // if we don't exclude allocations >CHUNK_SIZE, we'd infiniteloop on one!
Preconditions.checkArgument( @@ -105,6 +128,38 @@ public class MemStoreLAB { } /** + * Close this instance since it won't be used any more, try to put the chunks + * back to pool + */ + void close() { + this.closed = true; + // We could put back the chunks to pool for reusing only when there is no + // opening scanner which will read their data + if (chunkPool != null && openScannerCount.get() == 0 + && reclaimed.compareAndSet(false, true)) { + chunkPool.putbackChunks(this.chunkQueue); + } + } + + /** + * Called when opening a scanner on the data of this MemStoreLAB + */ + void incScannerCount() { + this.openScannerCount.incrementAndGet(); + } + + /** + * Called when closing a scanner on the data of this MemStoreLAB + */ + void decScannerCount() { + int count = this.openScannerCount.decrementAndGet(); + if (chunkPool != null && count == 0 && this.closed + && reclaimed.compareAndSet(false, true)) { + chunkPool.putbackChunks(this.chunkQueue); + } + } + + /** * Try to retire the current chunk if it is still * <code>c</code>. Postcondition is that curChunk.get() * != c @@ -134,12 +189,15 @@ public class MemStoreLAB { // No current chunk, so we want to allocate one. We race // against other allocators to CAS in an uninitialized chunk // (which is cheap to allocate) - c = new Chunk(chunkSize); + c = (chunkPool != null) ? chunkPool.getChunk() : new Chunk(chunkSize); if (curChunk.compareAndSet(null, c)) { // we won race - now we need to actually do the expensive // allocation step c.init(); + this.chunkQueue.add(c); return c; + } else if (chunkPool != null) { + chunkPool.putbackChunk(c); } // someone else won race - that's fine, we'll try to grab theirs // in the next iteration of the loop. @@ -149,7 +207,7 @@ public class MemStoreLAB { /** * A chunk of memory out of which allocations are sliced. */ - private static class Chunk { + static class Chunk { /** Actual underlying data */ private byte[] data; @@ -172,7 +230,7 @@ public class MemStoreLAB { * this is cheap. 
* @param size in bytes */ - private Chunk(int size) { + Chunk(int size) { this.size = size; } @@ -184,7 +242,9 @@ public class MemStoreLAB { public void init() { assert nextFreeOffset.get() == UNINITIALIZED; try { - data = new byte[size]; + if (data == null) { + data = new byte[size]; + } } catch (OutOfMemoryError e) { boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM); assert failInit; // should be true. @@ -200,6 +260,16 @@ public class MemStoreLAB { } /** + * Reset the offset to UNINITIALIZED before reusing an old chunk + */ + void reset() { + if (nextFreeOffset.get() != UNINITIALIZED) { + nextFreeOffset.set(UNINITIALIZED); + allocCount.set(0); + } + } + + /** * Try to allocate <code>size</code> bytes from the chunk. * @return the offset of the successful allocation, or -1 to indicate not-enough-space */ Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java?rev=1461398&view=auto ============================================================================== --- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java (added) +++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java Wed Mar 27 03:17:41 2013 @@ -0,0 +1,201 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.rmi.UnexpectedException; +import java.util.List; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.SmallTests; +import org.apache.hadoop.hbase.regionserver.MemStoreLAB.Allocation; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test the {@link MemStoreChunkPool} class + */ +@Category(SmallTests.class) +public class TestMemStoreChunkPool { + private final static Configuration conf = new Configuration(); + private static MemStoreChunkPool chunkPool; + private static boolean chunkPoolDisabledBeforeTest; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf.setBoolean(MemStore.USEMSLAB_KEY, true); + conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); + chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; + MemStoreChunkPool.chunkPoolDisabled = false; + chunkPool = MemStoreChunkPool.getPool(conf); + assertTrue(chunkPool != null); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + MemStoreChunkPool.chunkPoolDisabled = chunkPoolDisabledBeforeTest; + } + + @Before + public void tearDown() throws Exception { + chunkPool.clearChunks(); + } + + @Test + 
public void testReusingChunks() { + Random rand = new Random(); + MemStoreLAB mslab = new MemStoreLAB(conf, chunkPool); + int expectedOff = 0; + byte[] lastBuffer = null; + // Randomly allocate some bytes + for (int i = 0; i < 100; i++) { + int size = rand.nextInt(1000); + Allocation alloc = mslab.allocateBytes(size); + + if (alloc.getData() != lastBuffer) { + expectedOff = 0; + lastBuffer = alloc.getData(); + } + assertEquals(expectedOff, alloc.getOffset()); + assertTrue("Allocation " + alloc + " overruns buffer", alloc.getOffset() + + size <= alloc.getData().length); + expectedOff += size; + } + // chunks will be put back to pool after close + mslab.close(); + int chunkCount = chunkPool.getPoolSize(); + assertTrue(chunkCount > 0); + // reconstruct mslab + mslab = new MemStoreLAB(conf, chunkPool); + // chunk should be got from the pool, so we can reuse it. + mslab.allocateBytes(1000); + assertEquals(chunkCount - 1, chunkPool.getPoolSize()); + } + + @Test + public void testPuttingBackChunksAfterFlushing() throws UnexpectedException { + byte[] row = Bytes.toBytes("testrow"); + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf1 = Bytes.toBytes("testqualifier1"); + byte[] qf2 = Bytes.toBytes("testqualifier2"); + byte[] qf3 = Bytes.toBytes("testqualifier3"); + byte[] qf4 = Bytes.toBytes("testqualifier4"); + byte[] qf5 = Bytes.toBytes("testqualifier5"); + byte[] val = Bytes.toBytes("testval"); + + MemStore memstore = new MemStore(); + + // Setting up memstore + memstore.add(new KeyValue(row, fam, qf1, val)); + memstore.add(new KeyValue(row, fam, qf2, val)); + memstore.add(new KeyValue(row, fam, qf3, val)); + + // Creating a snapshot + memstore.snapshot(); + KeyValueSkipListSet snapshot = memstore.getSnapshot(); + assertEquals(3, memstore.snapshot.size()); + + // Adding value to "new" memstore + assertEquals(0, memstore.kvset.size()); + memstore.add(new KeyValue(row, fam, qf4, val)); + memstore.add(new KeyValue(row, fam, qf5, val)); + assertEquals(2, 
memstore.kvset.size()); + memstore.clearSnapshot(snapshot); + + int chunkCount = chunkPool.getPoolSize(); + assertTrue(chunkCount > 0); + + } + + @Test + public void testPuttingBackChunksWithOpeningScanner() + throws UnexpectedException { + byte[] row = Bytes.toBytes("testrow"); + byte[] fam = Bytes.toBytes("testfamily"); + byte[] qf1 = Bytes.toBytes("testqualifier1"); + byte[] qf2 = Bytes.toBytes("testqualifier2"); + byte[] qf3 = Bytes.toBytes("testqualifier3"); + byte[] qf4 = Bytes.toBytes("testqualifier4"); + byte[] qf5 = Bytes.toBytes("testqualifier5"); + byte[] qf6 = Bytes.toBytes("testqualifier6"); + byte[] qf7 = Bytes.toBytes("testqualifier7"); + byte[] val = Bytes.toBytes("testval"); + + MemStore memstore = new MemStore(); + + // Setting up memstore + memstore.add(new KeyValue(row, fam, qf1, val)); + memstore.add(new KeyValue(row, fam, qf2, val)); + memstore.add(new KeyValue(row, fam, qf3, val)); + + // Creating a snapshot + memstore.snapshot(); + KeyValueSkipListSet snapshot = memstore.getSnapshot(); + assertEquals(3, memstore.snapshot.size()); + + // Adding value to "new" memstore + assertEquals(0, memstore.kvset.size()); + memstore.add(new KeyValue(row, fam, qf4, val)); + memstore.add(new KeyValue(row, fam, qf5, val)); + assertEquals(2, memstore.kvset.size()); + + // opening scanner before clear the snapshot + List<KeyValueScanner> scanners = memstore.getScanners(); + // Shouldn't putting back the chunks to pool,since some scanners are opening + // based on their data + memstore.clearSnapshot(snapshot); + + assertTrue(chunkPool.getPoolSize() == 0); + + // Chunks will be put back to pool after close scanners; + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + assertTrue(chunkPool.getPoolSize() > 0); + + // clear chunks + chunkPool.clearChunks(); + + // Creating another snapshot + memstore.snapshot(); + snapshot = memstore.getSnapshot(); + // Adding more value + memstore.add(new KeyValue(row, fam, qf6, val)); + memstore.add(new 
KeyValue(row, fam, qf7, val)); + // opening scanners + scanners = memstore.getScanners(); + // close scanners before clear the snapshot + for (KeyValueScanner scanner : scanners) { + scanner.close(); + } + // Since no opening scanner, the chunks of snapshot should be put back to + // pool + memstore.clearSnapshot(snapshot); + assertTrue(chunkPool.getPoolSize() > 0); + } + +} \ No newline at end of file