Repository: cassandra
Updated Branches:
  refs/heads/trunk e182217b2 -> 17dd4ccc7


http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java 
b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
new file mode 100644
index 0000000..208cd32
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java
@@ -0,0 +1,852 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.cassandra.utils.memory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.RandomAccessReader;
+
+import static org.junit.Assert.*;
+
+public class BufferPoolTest
+{
+    @Before
+    public void setUp()
+    {
+        BufferPool.MEMORY_USAGE_THRESHOLD = 8 * 1024L * 1024L;
+        BufferPool.DISABLED = false;
+    }
+
+    @After
+    public void cleanUp()
+    {
+        BufferPool.reset();
+    }
+
+    /**
+     * Obtains a single default-sized buffer and checks that it is a direct buffer of
+     * the requested capacity and that a current chunk exists. After returning the
+     * buffer, checks that there is no current chunk while the reported pool size is
+     * still one macro chunk.
+     */
+    @Test
+    public void testGetPut() throws InterruptedException
+    {
+        final int requested = RandomAccessReader.DEFAULT_BUFFER_SIZE;
+
+        ByteBuffer pooled = BufferPool.get(requested);
+        assertNotNull(pooled);
+        assertEquals(requested, pooled.capacity());
+        assertTrue(pooled.isDirect());
+
+        // while the buffer is held, a current chunk exists
+        assertNotNull(BufferPool.currentChunk());
+        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes());
+
+        // once it is returned, no current chunk is reported but the pool size is unchanged
+        BufferPool.put(pooled);
+        assertNull(BufferPool.currentChunk());
+        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes());
+    }
+
+
+    @Test
+    public void testPageAligned()
+    {
+        final int size = 1024;
+        for (int i = size;
+                 i <= BufferPool.CHUNK_SIZE;
+                 i += size)
+        {
+            checkPageAligned(i);
+        }
+    }
+
+    private void checkPageAligned(int size)
+    {
+        ByteBuffer buffer = BufferPool.get(size);
+        assertNotNull(buffer);
+        assertEquals(size, buffer.capacity());
+        assertTrue(buffer.isDirect());
+
+        long address = MemoryUtil.getAddress(buffer);
+        assertTrue((address % MemoryUtil.pageSize()) == 0);
+
+        BufferPool.put(buffer);
+    }
+
+    @Test
+    public void testDifferentSizes() throws InterruptedException
+    {
+        final int size1 = 1024;
+        final int size2 = 2048;
+
+        ByteBuffer buffer1 = BufferPool.get(size1);
+        assertNotNull(buffer1);
+        assertEquals(size1, buffer1.capacity());
+
+        ByteBuffer buffer2 = BufferPool.get(size2);
+        assertNotNull(buffer2);
+        assertEquals(size2, buffer2.capacity());
+
+        BufferPool.Chunk chunk = BufferPool.currentChunk();
+        assertNotNull(chunk);
+        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, 
BufferPool.sizeInBytes());
+
+        BufferPool.put(buffer1);
+        BufferPool.put(buffer2);
+
+        assertEquals(null, BufferPool.currentChunk());
+        assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, 
BufferPool.sizeInBytes());
+    }
+
+    /**
+     * Requests twice the configured memory threshold with
+     * {@code ALLOCATE_ON_HEAP_WHEN_EXAHUSTED} switched off; the helper asserts that
+     * buffers handed out once the pool size exceeds the threshold are direct.
+     */
+    @Test
+    public void testMaxMemoryExceededDirect()
+    {
+        boolean cur = BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED;
+        BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = false;
+        try
+        {
+            requestDoubleMaxMemory();
+        }
+        finally
+        {
+            // restore the static flag even if an assertion failed, so later tests are not affected
+            BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = cur;
+        }
+    }
+
+    /**
+     * Requests twice the configured memory threshold with
+     * {@code ALLOCATE_ON_HEAP_WHEN_EXAHUSTED} switched on; the helper asserts that
+     * buffers handed out once the pool size exceeds the threshold are not direct.
+     */
+    @Test
+    public void testMaxMemoryExceededHeap()
+    {
+        boolean cur = BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED;
+        BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = true;
+        try
+        {
+            requestDoubleMaxMemory();
+        }
+        finally
+        {
+            // restore the static flag even if an assertion failed, so later tests are not affected
+            BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = cur;
+        }
+    }
+
+    @Test
+    public void testMaxMemoryExceeded_SameAsChunkSize()
+    {
+        BufferPool.MEMORY_USAGE_THRESHOLD = 
BufferPool.GlobalPool.MACRO_CHUNK_SIZE;
+        requestDoubleMaxMemory();
+    }
+
+    @Test
+    public void testMaxMemoryExceeded_SmallerThanChunkSize()
+    {
+        BufferPool.MEMORY_USAGE_THRESHOLD = 
BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2;
+        requestDoubleMaxMemory();
+    }
+
+    @Test
+    public void testRecycle()
+    {
+        requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, 3 * 
BufferPool.CHUNK_SIZE);
+    }
+
+    private void requestDoubleMaxMemory()
+    {
+        requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * 
BufferPool.MEMORY_USAGE_THRESHOLD));
+    }
+
+    private void requestUpToSize(int bufferSize, int totalSize)
+    {
+        final int numBuffers = totalSize / bufferSize;
+
+        List<ByteBuffer> buffers = new ArrayList<>(numBuffers);
+        for (int i = 0; i < numBuffers; i++)
+        {
+            ByteBuffer buffer = BufferPool.get(bufferSize);
+            assertNotNull(buffer);
+            assertEquals(bufferSize, buffer.capacity());
+
+            if (BufferPool.sizeInBytes() > BufferPool.MEMORY_USAGE_THRESHOLD)
+                assertEquals(BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED, 
!buffer.isDirect());
+
+            buffers.add(buffer);
+        }
+
+        for (ByteBuffer buffer : buffers)
+            BufferPool.put(buffer);
+
+    }
+
+    @Test
+    public void testBigRequest()
+    {
+        final int size = BufferPool.CHUNK_SIZE + 1;
+
+        ByteBuffer buffer = BufferPool.get(size);
+        assertNotNull(buffer);
+        assertEquals(size, buffer.capacity());
+        BufferPool.put(buffer);
+    }
+
+    @Test
+    public void testFillUpChunks()
+    {
+        final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE;
+        final int numBuffers = BufferPool.CHUNK_SIZE / size;
+
+        List<ByteBuffer> buffers1 = new ArrayList<>(numBuffers);
+        List<ByteBuffer> buffers2 = new ArrayList<>(numBuffers);
+        for (int i = 0; i < numBuffers; i++)
+            buffers1.add(BufferPool.get(size));
+
+        BufferPool.Chunk chunk1 = BufferPool.currentChunk();
+        assertNotNull(chunk1);
+
+        for (int i = 0; i < numBuffers; i++)
+            buffers2.add(BufferPool.get(size));
+
+        assertEquals(2, BufferPool.numChunks());
+
+        for (ByteBuffer buffer : buffers1)
+            BufferPool.put(buffer);
+
+        assertEquals(1, BufferPool.numChunks());
+
+        for (ByteBuffer buffer : buffers2)
+            BufferPool.put(buffer);
+
+        assertEquals(0, BufferPool.numChunks());
+
+        buffers2.clear();
+    }
+
+    @Test
+    public void testOutOfOrderFrees()
+    {
+        final int size = 4096;
+        final int maxFreeSlots = BufferPool.CHUNK_SIZE / size;
+
+        final int[] idxs = new int[maxFreeSlots];
+        for (int i = 0; i < maxFreeSlots; i++)
+            idxs[i] = i;
+
+        doTestFrees(size, maxFreeSlots, idxs);
+    }
+
+    @Test
+    public void testInOrderFrees()
+    {
+        final int size = 4096;
+        final int maxFreeSlots = BufferPool.CHUNK_SIZE / size;
+
+        final int[] idxs = new int[maxFreeSlots];
+        for (int i = 0; i < maxFreeSlots; i++)
+            idxs[i] = maxFreeSlots - 1 - i;
+
+        doTestFrees(size, maxFreeSlots, idxs);
+    }
+
+    @Test
+    public void testRandomFrees()
+    {
+        doTestRandomFrees(12345567878L);
+
+        BufferPool.reset();
+        doTestRandomFrees(20452249587L);
+
+        BufferPool.reset();
+        doTestRandomFrees(82457252948L);
+
+        BufferPool.reset();
+        doTestRandomFrees(98759284579L);
+
+        BufferPool.reset();
+        doTestRandomFrees(19475257244L);
+    }
+
+    private void doTestRandomFrees(long seed)
+    {
+        final int size = 4096;
+        final int maxFreeSlots = BufferPool.CHUNK_SIZE / size;
+
+        final int[] idxs = new int[maxFreeSlots];
+        for (int i = 0; i < maxFreeSlots; i++)
+            idxs[i] = maxFreeSlots - 1 - i;
+
+        Random rnd = new Random();
+        rnd.setSeed(seed);
+        for (int i = idxs.length - 1; i > 0; i--)
+        {
+            int idx = rnd.nextInt(i+1);
+            int v = idxs[idx];
+            idxs[idx] = idxs[i];
+            idxs[i] = v;
+        }
+
+        doTestFrees(size, maxFreeSlots, idxs);
+    }
+
+    private void doTestFrees(final int size, final int maxFreeSlots, final 
int[] toReleaseIdxs)
+    {
+        List<ByteBuffer> buffers = new ArrayList<>(maxFreeSlots);
+        for (int i = 0; i < maxFreeSlots; i++)
+        {
+            buffers.add(BufferPool.get(size));
+        }
+
+        BufferPool.Chunk chunk = BufferPool.currentChunk();
+        assertFalse(chunk.isFree());
+
+        int freeSize = BufferPool.CHUNK_SIZE - maxFreeSlots * size;
+        assertEquals(freeSize, chunk.free());
+
+        for (int i : toReleaseIdxs)
+        {
+            ByteBuffer buffer = buffers.get(i);
+            assertNotNull(buffer);
+            assertEquals(size, buffer.capacity());
+
+            BufferPool.put(buffer);
+
+            freeSize += size;
+            if (freeSize == chunk.capacity())
+                assertEquals(0, chunk.free());
+            else
+                assertEquals(freeSize, chunk.free());
+        }
+
+        assertFalse(chunk.isFree());
+    }
+
+    @Test
+    public void testDifferentSizeBuffersOnOneChunk()
+    {
+        int[] sizes = new int[] {
+            5, 1024, 4096, 8, 16000, 78, 512, 256, 63, 55, 89, 90, 255, 32, 
2048, 128
+        };
+
+        int sum = 0;
+        List<ByteBuffer> buffers = new ArrayList<>(sizes.length);
+        for (int i = 0; i < sizes.length; i++)
+        {
+            ByteBuffer buffer = BufferPool.get(sizes[i]);
+            assertNotNull(buffer);
+            assertTrue(buffer.capacity() >= sizes[i]);
+            buffers.add(buffer);
+
+            sum += BufferPool.currentChunk().roundUp(buffer.capacity());
+        }
+
+        // else the test will fail, adjust sizes as required
+        assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
+
+        BufferPool.Chunk chunk = BufferPool.currentChunk();
+        assertNotNull(chunk);
+
+        Random rnd = new Random();
+        rnd.setSeed(298347529L);
+        while (!buffers.isEmpty())
+        {
+            int index = rnd.nextInt(buffers.size());
+            ByteBuffer buffer = buffers.remove(index);
+
+            BufferPool.put(buffer);
+        }
+
+        assertEquals(null, BufferPool.currentChunk());
+        assertEquals(0, chunk.free());
+    }
+
+    /**
+     * Requests 128 buffers of {@code CHUNK_SIZE / 64} bytes each, i.e. two chunks' worth
+     * in total, then returns them in allocation order and checks that no current chunk
+     * remains.
+     */
+    @Test
+    public void testChunkExhausted()
+    {
+        final int size = BufferPool.CHUNK_SIZE / 64; // one 64th of a chunk, in bytes
+        int[] sizes = new int[128];
+        Arrays.fill(sizes, size);
+
+        int sum = 0;
+        List<ByteBuffer> buffers = new ArrayList<>(sizes.length);
+        for (int i = 0; i < sizes.length; i++)
+        {
+            ByteBuffer buffer = BufferPool.get(sizes[i]);
+            assertNotNull(buffer);
+            assertTrue(buffer.capacity() >= sizes[i]);
+            buffers.add(buffer);
+
+            sum += buffer.capacity();
+        }
+
+        // else the test will fail, adjust sizes as required
+        assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
+
+        // remember the chunk that is current before anything is released
+        BufferPool.Chunk chunk = BufferPool.currentChunk();
+        assertNotNull(chunk);
+
+        for (int i = 0; i < sizes.length; i++)
+        {
+            BufferPool.put(buffers.get(i));
+        }
+
+        // after everything was returned there is no current chunk, and the chunk
+        // captured above reports zero free bytes
+        assertEquals(null, BufferPool.currentChunk());
+        assertEquals(0, chunk.free());
+    }
+
+    /**
+     * Allocates {@code MACRO_CHUNK_SIZE / 4096} buffers of 4096 bytes while recording
+     * their addresses, returns them in reverse order, allocates the same number again
+     * and checks that every recorded address was handed out a second time.
+     */
+    @Test
+    public void testCompactIfOutOfCapacity()
+    {
+        final int size = 4096;
+        final int numBuffersInChunk = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 
size;
+
+        List<ByteBuffer> buffers = new ArrayList<>(numBuffersInChunk);
+        Set<Long> addresses = new HashSet<>(numBuffersInChunk);
+
+        // first round: remember the address of every buffer handed out
+        for (int i = 0; i < numBuffersInChunk; i++)
+        {
+            ByteBuffer buffer = BufferPool.get(size);
+            buffers.add(buffer);
+            addresses.add(MemoryUtil.getAddress(buffer));
+        }
+
+        // return the buffers last-allocated first
+        for (int i = numBuffersInChunk - 1; i >= 0; i--)
+            BufferPool.put(buffers.get(i));
+
+        buffers.clear();
+
+        // second round: tick off every address that is handed out again
+        for (int i = 0; i < numBuffersInChunk; i++)
+        {
+            ByteBuffer buffer = BufferPool.get(size);
+            assertNotNull(buffer);
+            assertEquals(size, buffer.capacity());
+            addresses.remove(MemoryUtil.getAddress(buffer));
+
+            buffers.add(buffer);
+        }
+
+        assertTrue(addresses.isEmpty()); // every address from the first round was reused
+
+        for (ByteBuffer buffer : buffers)
+            BufferPool.put(buffer);
+    }
+
+    @Test
+    public void testHeapBuffer()
+    {
+        ByteBuffer buffer = BufferPool.get(1024, BufferType.ON_HEAP);
+        assertNotNull(buffer);
+        assertEquals(1024, buffer.capacity());
+        assertFalse(buffer.isDirect());
+        assertNotNull(buffer.array());
+        BufferPool.put(buffer);
+    }
+
+    @Test
+    public void testSingleBufferOneChunk()
+    {
+        checkBuffer(0);
+
+        checkBuffer(1);
+        checkBuffer(2);
+        checkBuffer(4);
+        checkBuffer(5);
+        checkBuffer(8);
+        checkBuffer(16);
+        checkBuffer(32);
+        checkBuffer(64);
+
+        checkBuffer(65);
+        checkBuffer(127);
+        checkBuffer(128);
+
+        checkBuffer(129);
+        checkBuffer(255);
+        checkBuffer(256);
+
+        checkBuffer(512);
+        checkBuffer(1024);
+        checkBuffer(2048);
+        checkBuffer(4096);
+        checkBuffer(8192);
+        checkBuffer(16384);
+
+        checkBuffer(16385);
+        checkBuffer(32767);
+        checkBuffer(32768);
+
+        checkBuffer(32769);
+        checkBuffer(33172);
+        checkBuffer(33553);
+        checkBuffer(36000);
+        checkBuffer(65535);
+        checkBuffer(65536);
+
+        checkBuffer(65537);
+    }
+
+    private void checkBuffer(int size)
+    {
+        ByteBuffer buffer = BufferPool.get(size);
+        assertEquals(size, buffer.capacity());
+
+        if (size > 0 && size < BufferPool.CHUNK_SIZE)
+        {
+            BufferPool.Chunk chunk = BufferPool.currentChunk();
+            assertNotNull(chunk);
+            assertEquals(chunk.capacity(), chunk.free() + chunk.roundUp(size));
+        }
+
+        BufferPool.put(buffer);
+    }
+
+    @Test
+    public void testMultipleBuffersOneChunk()
+    {
+        checkBuffers(32768, 33553);
+        checkBuffers(32768, 32768);
+        checkBuffers(48450, 33172);
+        checkBuffers(32768, 15682, 33172);
+    }
+
+    private void checkBuffers(int ... sizes)
+    {
+        List<ByteBuffer> buffers = new ArrayList<>(sizes.length);
+
+        for (int size : sizes)
+        {
+            ByteBuffer buffer = BufferPool.get(size);
+            assertEquals(size, buffer.capacity());
+
+            buffers.add(buffer);
+        }
+
+        for (ByteBuffer buffer : buffers)
+            BufferPool.put(buffer);
+    }
+
+    @Test
+    public void testBuffersWithGivenSlots()
+    {
+        checkBufferWithGivenSlots(21241, (-1L << 27) ^ (1L << 40));
+    }
+
+    /**
+     * Allocates one buffer so that a current chunk exists, overrides that chunk's
+     * free-slots mask and checks that a second buffer of the same size can still be
+     * obtained. The value returned by {@code setFreeSlots} is written back afterwards.
+     *
+     * @param size      the size in bytes of both buffers requested
+     * @param freeSlots the free-slots mask to install on the current chunk
+     */
+    private void checkBufferWithGivenSlots(int size, long freeSlots)
+    {
+        // first allocate to make sure there is a chunk
+        ByteBuffer buffer = BufferPool.get(size);
+
+        // now get the current chunk and override the free slots mask
+        BufferPool.Chunk chunk = BufferPool.currentChunk();
+        assertNotNull(chunk);
+        long oldFreeSlots = chunk.setFreeSlots(freeSlots);
+
+        // now check we can still get the buffer with the free slots mask changed;
+        // the check has to look at the buffer obtained here, not at the first one
+        ByteBuffer buffer2 = BufferPool.get(size);
+        assertNotNull(buffer2);
+        assertEquals(size, buffer2.capacity());
+        BufferPool.put(buffer2);
+
+        // reset the free slots
+        chunk.setFreeSlots(oldFreeSlots);
+        BufferPool.put(buffer);
+    }
+
+    @Test
+    public void testZeroSizeRequest()
+    {
+        ByteBuffer buffer = BufferPool.get(0);
+        assertNotNull(buffer);
+        assertEquals(0, buffer.capacity());
+        BufferPool.put(buffer);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeSizeRequest()
+    {
+        BufferPool.get(-1);
+    }
+
+    /**
+     * With the pool disabled, checks that a 1024-byte request creates no chunk and
+     * yields a heap buffer when {@code ALLOCATE_ON_HEAP_WHEN_EXAHUSTED} is set and a
+     * direct buffer when it is not.
+     */
+    @Test
+    public void testBufferPoolDisabled()
+    {
+        BufferPool.DISABLED = true;
+        try
+        {
+            BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = true;
+            ByteBuffer buffer = BufferPool.get(1024);
+            assertEquals(0, BufferPool.numChunks());
+            assertNotNull(buffer);
+            assertEquals(1024, buffer.capacity());
+            assertFalse(buffer.isDirect());
+            assertNotNull(buffer.array());
+            BufferPool.put(buffer);
+            assertEquals(0, BufferPool.numChunks());
+
+            BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = false;
+            buffer = BufferPool.get(1024);
+            assertEquals(0, BufferPool.numChunks());
+            assertNotNull(buffer);
+            assertEquals(1024, buffer.capacity());
+            assertTrue(buffer.isDirect());
+            BufferPool.put(buffer);
+            assertEquals(0, BufferPool.numChunks());
+        }
+        finally
+        {
+            // clean-up: runs even when an assertion above fails, so the static
+            // switches do not leak into other tests
+            BufferPool.DISABLED = false;
+            BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = true;
+        }
+    }
+
+    @Test
+    public void testMT_SameSizeImmediateReturn() throws InterruptedException
+    {
+        checkMultipleThreads(40, 1, true, 
RandomAccessReader.DEFAULT_BUFFER_SIZE);
+    }
+
+    @Test
+    public void testMT_SameSizePostponedReturn() throws InterruptedException
+    {
+        checkMultipleThreads(40, 1, false, 
RandomAccessReader.DEFAULT_BUFFER_SIZE);
+    }
+
+    @Test
+    public void testMT_TwoSizesOneBufferImmediateReturn() throws 
InterruptedException
+    {
+        checkMultipleThreads(40, 1, true, 1024, 2048);
+    }
+
+    @Test
+    public void testMT_TwoSizesOneBufferPostponedReturn() throws 
InterruptedException
+    {
+        checkMultipleThreads(40, 1, false, 1024, 2048);
+    }
+
+    @Test
+    public void testMT_TwoSizesTwoBuffersImmediateReturn() throws 
InterruptedException
+    {
+        checkMultipleThreads(40, 2, true, 1024, 2048);
+    }
+
+    @Test
+    public void testMT_TwoSizesTwoBuffersPostponedReturn() throws 
InterruptedException
+    {
+        checkMultipleThreads(40, 2, false, 1024, 2048);
+    }
+
+    @Test
+    public void testMT_MultipleSizesOneBufferImmediateReturn() throws 
InterruptedException
+    {
+        checkMultipleThreads(40,
+                             1,
+                             true,
+                             1024,
+                             2048,
+                             3072,
+                             4096,
+                             5120);
+    }
+
+    @Test
+    public void testMT_MultipleSizesOneBufferPostponedReturn() throws 
InterruptedException
+    {
+        checkMultipleThreads(40,
+                             1,
+                             false,
+                             1024,
+                             2048,
+                             3072,
+                             4096,
+                             5120);
+    }
+
+    @Test
+    public void testMT_MultipleSizesMultipleBuffersImmediateReturn() throws 
InterruptedException
+    {
+        checkMultipleThreads(40,
+                             4,
+                             true,
+                             1024,
+                             2048,
+                             3072,
+                             4096,
+                             5120);
+    }
+
+    @Test
+    public void testMT_MultipleSizesMultipleBuffersPostponedReturn() throws 
InterruptedException
+    {
+        checkMultipleThreads(40,
+                             3,
+                             false,
+                             1024,
+                             2048,
+                             3072,
+                             4096,
+                             5120);
+    }
+
+    /**
+     * Runs {@code threadCount} tasks on a fixed thread pool. Each task obtains
+     * {@code numBuffersPerThread} buffers from the pool, writes ten ints into each,
+     * reads them back after a short random sleep and returns the buffer to the pool.
+     * <p>
+     * Anything thrown inside a task is captured by the {@link Future} returned from
+     * {@code submit}; those failures are rethrown here so that a failing worker fails
+     * the calling test instead of being silently dropped.
+     *
+     * @param threadCount         number of tasks, also the size of the thread pool
+     * @param numBuffersPerThread number of buffers each task requests
+     * @param returnImmediately   if true a buffer is returned right after it was verified,
+     *                            otherwise a task returns all its buffers at its end
+     * @param sizes               buffer sizes, handed to the tasks in round-robin order
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    private void checkMultipleThreads(int threadCount, int numBuffersPerThread, final boolean returnImmediately, final int ... sizes) throws InterruptedException
+    {
+        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
+        final CountDownLatch finished = new CountDownLatch(threadCount);
+        List<Future<?>> futures = new ArrayList<>(threadCount);
+
+        try
+        {
+            for (int i = 0; i < threadCount; i++)
+            {
+                final int[] threadSizes = new int[numBuffersPerThread];
+                for (int j = 0; j < threadSizes.length; j++)
+                    threadSizes[j] = sizes[(i * numBuffersPerThread + j) % sizes.length];
+
+                final Random rand = new Random();
+                futures.add(executorService.submit(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        try
+                        {
+                            Thread.sleep(rand.nextInt(3));
+
+                            List<ByteBuffer> toBeReturned = new ArrayList<ByteBuffer>(threadSizes.length);
+
+                            for (int j = 0; j < threadSizes.length; j++)
+                            {
+                                ByteBuffer buffer = BufferPool.get(threadSizes[j]);
+                                assertNotNull(buffer);
+                                assertEquals(threadSizes[j], buffer.capacity());
+
+                                for (int k = 0; k < 10; k++)
+                                    buffer.putInt(k);
+
+                                buffer.rewind();
+
+                                Thread.sleep(rand.nextInt(3));
+
+                                for (int k = 0; k < 10; k++)
+                                    assertEquals(k, buffer.getInt());
+
+                                if (returnImmediately)
+                                    BufferPool.put(buffer);
+                                else
+                                    toBeReturned.add(buffer);
+
+                                assertTrue(BufferPool.sizeInBytes() > 0);
+                            }
+
+                            Thread.sleep(rand.nextInt(3));
+
+                            for (ByteBuffer buffer : toBeReturned)
+                                BufferPool.put(buffer);
+                        }
+                        catch (Exception ex)
+                        {
+                            ex.printStackTrace();
+                            fail(ex.getMessage());
+                        }
+                        finally
+                        {
+                            finished.countDown();
+                        }
+                    }
+                }));
+            }
+
+            finished.await();
+
+            // submit() stores whatever run() throws (including AssertionError) in the
+            // Future; without get() a failed assertion on a worker would go unnoticed
+            for (Future<?> future : futures)
+            {
+                try
+                {
+                    future.get();
+                }
+                catch (ExecutionException e)
+                {
+                    throw new AssertionError("Worker thread failed: " + e.getCause(), e.getCause());
+                }
+            }
+
+            assertEquals(0, executorService.shutdownNow().size());
+        }
+        finally
+        {
+            // harmless if already shut down; stops the pool threads when we bail out early
+            executorService.shutdownNow();
+        }
+
+        // Make sure thread local storage gets GC-ed
+        for (int i = 0; i < 5; i++)
+        {
+            System.gc();
+            Thread.sleep(100);
+        }
+    }
+
+    @Ignore
+    public void testMultipleThreadsReleaseSameBuffer() throws 
InterruptedException
+    {
+        doMultipleThreadsReleaseBuffers(45, 4096);
+    }
+
+    @Ignore
+    public void testMultipleThreadsReleaseDifferentBuffer() throws 
InterruptedException
+    {
+        doMultipleThreadsReleaseBuffers(45, 4096, 8192);
+    }
+
+    private void doMultipleThreadsReleaseBuffers(final int threadCount, final 
int ... sizes) throws InterruptedException
+    {
+        final ByteBuffer[] buffers = new ByteBuffer[sizes.length];
+        int sum = 0;
+        for (int i = 0; i < sizes.length; i++)
+        {
+            buffers[i] = BufferPool.get(sizes[i]);
+            assertNotNull(buffers[i]);
+            assertEquals(sizes[i], buffers[i].capacity());
+            sum += BufferPool.currentChunk().roundUp(buffers[i].capacity());
+        }
+
+        final BufferPool.Chunk chunk = BufferPool.currentChunk();
+        assertNotNull(chunk);
+        assertFalse(chunk.isFree());
+
+        // if we use multiple chunks the test will fail, adjust sizes 
accordingly
+        assertTrue(sum < BufferPool.GlobalPool.MACRO_CHUNK_SIZE);
+
+        ExecutorService executorService = 
Executors.newFixedThreadPool(threadCount);
+        final CountDownLatch finished = new CountDownLatch(threadCount);
+
+        for (int i = 0; i < threadCount; i++)
+        {
+            final int idx = i % sizes.length;
+            final ByteBuffer buffer = buffers[idx];
+
+            executorService.submit(new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                    try
+                    {
+                        assertNotSame(chunk, BufferPool.currentChunk());
+                        BufferPool.put(buffer);
+                    }
+                    catch (AssertionError ex)
+                    { //this is expected if we release a buffer more than once
+                        ex.printStackTrace();
+                    }
+                    catch (Throwable t)
+                    {
+                        t.printStackTrace();
+                        fail(t.getMessage());
+                    }
+                    finally
+                    {
+                        finished.countDown();
+                    }
+                }
+            });
+        }
+
+        finished.await();
+        assertEquals(0, executorService.shutdownNow().size());
+
+        executorService = null;
+
+        // Make sure thread local storage gets GC-ed
+        System.gc();
+        System.gc();
+        System.gc();
+
+        assertTrue(BufferPool.currentChunk().isFree());
+
+        //make sure the main thread can still allocate buffers
+        ByteBuffer buffer = BufferPool.get(sizes[0]);
+        assertNotNull(buffer);
+        assertEquals(sizes[0], buffer.capacity());
+        BufferPool.put(buffer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java
deleted file mode 100644
index 455fec4..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied.  See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-package org.apache.cassandra.stress.generate;
-
-import java.util.Random;
-
-import org.apache.commons.math3.random.RandomGenerator;
-
-// based on http://en.wikipedia.org/wiki/Xorshift, but periodically we reseed 
with our stronger random generator
-// note it is also non-atomically updated, so expects to be used by a single 
thread
-public class FasterRandom implements RandomGenerator
-{
-    final Random random = new Random();
-
-    private long seed;
-    private int reseed;
-
-    public void setSeed(int seed)
-    {
-        setSeed((long) seed);
-    }
-
-    public void setSeed(int[] ints)
-    {
-        if (ints.length > 1)
-            setSeed (((long) ints[0] << 32) | ints[1]);
-        else
-            setSeed(ints[0]);
-    }
-
-    public void setSeed(long seed)
-    {
-        this.seed = seed;
-        rollover();
-    }
-
-    private void rollover()
-    {
-        this.reseed = 0;
-        random.setSeed(seed);
-        seed = random.nextLong();
-    }
-
-    public void nextBytes(byte[] bytes)
-    {
-        int i = 0;
-        while (i < bytes.length)
-        {
-            long next = nextLong();
-            while (i < bytes.length)
-            {
-                bytes[i++] = (byte) (next & 0xFF);
-                next >>>= 8;
-            }
-        }
-    }
-
-    public int nextInt()
-    {
-        return (int) nextLong();
-    }
-
-    public int nextInt(int i)
-    {
-        return Math.abs((int) nextLong() % i);
-    }
-
-    public long nextLong()
-    {
-        if (++this.reseed == 32)
-            rollover();
-
-        long seed = this.seed;
-        seed ^= seed >> 12;
-        seed ^= seed << 25;
-        seed ^= seed >> 27;
-        this.seed = seed;
-        return seed * 2685821657736338717L;
-    }
-
-    public boolean nextBoolean()
-    {
-        return ((int) nextLong() & 1) == 1;
-    }
-
-    public float nextFloat()
-    {
-        return Float.intBitsToFloat((int) nextLong());
-    }
-
-    public double nextDouble()
-    {
-        return Double.longBitsToDouble(nextLong());
-    }
-
-    public double nextGaussian()
-    {
-        return random.nextGaussian();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
index 9e2e65b..243ac30 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.stress.generate;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import org.apache.cassandra.stress.util.DynamicList;
+import org.apache.cassandra.utils.DynamicList;
 
 public class Seed implements Comparable<Seed>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
----------------------------------------------------------------------
diff --git 
a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java 
b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
index 071d888..5020b45 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.settings.StressSettings;
-import org.apache.cassandra.stress.util.DynamicList;
+import org.apache.cassandra.utils.LockedDynamicList;
 
 public class SeedManager
 {
@@ -34,7 +34,7 @@ public class SeedManager
     final Generator writes;
     final Generator reads;
     final ConcurrentHashMap<Long, Seed> managing = new ConcurrentHashMap<>();
-    final DynamicList<Seed> sampleFrom;
+    final LockedDynamicList<Seed> sampleFrom;
     final Distribution sample;
     final long sampleOffset;
     final int sampleSize;
@@ -69,7 +69,7 @@ public class SeedManager
         long sampleSize = 1 + Math.max(sample.minValue(), sample.maxValue()) - sampleOffset;
         if (sampleOffset < 0 || sampleSize > Integer.MAX_VALUE)
             throw new IllegalArgumentException("sample range is invalid");
-        this.sampleFrom = new DynamicList<>((int) sampleSize);
+        this.sampleFrom = new LockedDynamicList<>((int) sampleSize);
         this.sample = DistributionInverted.invert(sample);
         this.sampleSize = (int) sampleSize;
         this.updateSampleImmediately = visits.average() > 1;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
index 3c15c87..c247e48 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.stress.generate.FasterRandom;
+import org.apache.cassandra.utils.FasterRandom;
 
 public class Bytes extends Generator<ByteBuffer>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
index b58fee2..db78eb7 100644
--- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
+++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java
@@ -21,7 +21,7 @@
 package org.apache.cassandra.stress.generate.values;
 
 import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.stress.generate.FasterRandom;
+import org.apache.cassandra.utils.FasterRandom;
 
 public class Strings extends Generator<String>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java
deleted file mode 100644
index ee04063..0000000
--- a/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.stress.util;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.TreeSet;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.cassandra.stress.generate.FasterRandom;
-
-// simple thread-unsafe skiplist that permits indexing/removal by position, insertion at the end
-// (though easily extended to insertion at any position, not necessary here)
-// we use it for sampling items by position for visiting writes in the pool of pending writes
-public class DynamicList<E>
-{
-
-    // represents a value and an index simultaneously; each node maintains a list
-    // of next pointers for each height in the skip-list this node participates in
-    // (a contiguous range from [0..height))
-    public static class Node<E>
-    {
-        // stores the size of each descendant
-        private final int[] size;
-        // TODO: alternate links to save space
-        private final Node<E>[] links;
-        private E value;
-
-        private Node(int height, E value)
-        {
-            this.value = value;
-            links = new Node[height * 2];
-            size = new int[height];
-            Arrays.fill(size, 1);
-        }
-
-        private int height()
-        {
-            return size.length;
-        }
-
-        private Node<E> next(int i)
-        {
-            return links[i * 2];
-        }
-
-        private Node<E> prev(int i)
-        {
-            return links[1 + i * 2];
-        }
-
-        private void setNext(int i, Node<E> next)
-        {
-            links[i * 2] = next;
-        }
-
-        private void setPrev(int i, Node<E> prev)
-        {
-            links[1 + i * 2] = prev;
-        }
-
-        private Node parent(int parentHeight)
-        {
-            Node prev = this;
-            while (true)
-            {
-                int height = prev.height();
-                if (parentHeight < height)
-                    return prev;
-                prev = prev.prev(height - 1);
-            }
-        }
-    }
-
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-    private final int maxHeight;
-    private final Node<E> head;
-    private int size;
-
-    public DynamicList(int maxExpectedSize)
-    {
-        this.maxHeight = 3 + Math.max(0, (int) Math.ceil(Math.log(maxExpectedSize) / Math.log(2)));
-        head = new Node<>(maxHeight, null);
-    }
-
-    private int randomLevel()
-    {
-        return 1 + Integer.bitCount(ThreadLocalRandom.current().nextInt() & ((1 << (maxHeight - 1)) - 1));
-    }
-
-    public Node<E> append(E value)
-    {
-        return append(value, Integer.MAX_VALUE);
-    }
-
-    // add the value to the end of the list, and return the associated Node that permits efficient removal
-    // regardless of its future position in the list from other modifications
-    public Node<E> append(E value, int maxSize)
-    {
-        Node<E> newTail = new Node<>(randomLevel(), value);
-
-        lock.writeLock().lock();
-        try
-        {
-            if (size >= maxSize)
-                return null;
-            size++;
-
-            Node<E> tail = head;
-            for (int i = maxHeight - 1 ; i >= newTail.height() ; i--)
-            {
-                Node<E> next;
-                while ((next = tail.next(i)) != null)
-                    tail = next;
-                tail.size[i]++;
-            }
-
-            for (int i = newTail.height() - 1 ; i >= 0 ; i--)
-            {
-                Node<E> next;
-                while ((next = tail.next(i)) != null)
-                    tail = next;
-                tail.setNext(i, newTail);
-                newTail.setPrev(i, tail);
-            }
-
-            return newTail;
-        }
-        finally
-        {
-            lock.writeLock().unlock();
-        }
-    }
-
-    // remove the provided node and its associated value from the list
-    public void remove(Node<E> node)
-    {
-        lock.writeLock().lock();
-        try
-        {
-            assert node.value != null;
-            node.value = null;
-
-            size--;
-
-            // go up through each level in the skip list, unlinking this node; this entails
-            // simply linking each neighbour to each other, and appending the size of the
-            // current level owned by this node's index to the preceding neighbour (since
-            // ownership is defined as any node that you must visit through the index,
-            // removal of ourselves from a level means the preceding index entry is the
-            // entry point to all of the removed node's descendants)
-            for (int i = 0 ; i < node.height() ; i++)
-            {
-                Node<E> prev = node.prev(i);
-                Node<E> next = node.next(i);
-                assert prev != null;
-                prev.setNext(i, next);
-                if (next != null)
-                    next.setPrev(i, prev);
-                prev.size[i] += node.size[i] - 1;
-            }
-
-            // then go up the levels, removing 1 from the size at each height above ours
-            for (int i = node.height() ; i < maxHeight ; i++)
-            {
-                // if we're at our height limit, we backtrack at our top level until we
-                // hit a neighbour with a greater height
-                while (i == node.height())
-                    node = node.prev(i - 1);
-                node.size[i]--;
-            }
-        }
-        finally
-        {
-            lock.writeLock().unlock();
-        }
-    }
-
-    // retrieve the item at the provided index, or return null if the index is past the end of the list
-    public E get(int index)
-    {
-        lock.readLock().lock();
-        try
-        {
-            if (index >= size)
-                return null;
-
-            index++;
-            int c = 0;
-            Node<E> finger = head;
-            for (int i = maxHeight - 1 ; i >= 0 ; i--)
-            {
-                while (c + finger.size[i] <= index)
-                {
-                    c += finger.size[i];
-                    finger = finger.next(i);
-                }
-            }
-
-            assert c == index;
-            return finger.value;
-        }
-        finally
-        {
-            lock.readLock().unlock();
-        }
-    }
-
-    // some quick and dirty tests to confirm the skiplist works as intended
-    // don't create a separate unit test - tools tree doesn't currently warrant them
-
-    private boolean isWellFormed()
-    {
-        for (int i = 0 ; i < maxHeight ; i++)
-        {
-            int c = 0;
-            for (Node node = head ; node != null ; node = node.next(i))
-            {
-                if (node.prev(i) != null && node.prev(i).next(i) != node)
-                    return false;
-                if (node.next(i) != null && node.next(i).prev(i) != node)
-                    return false;
-                c += node.size[i];
-                if (i + 1 < maxHeight && node.parent(i + 1).next(i + 1) == node.next(i))
-                {
-                    if (node.parent(i + 1).size[i + 1] != c)
-                        return false;
-                    c = 0;
-                }
-            }
-            if (i == maxHeight - 1 && c != size + 1)
-                return false;
-        }
-        return true;
-    }
-
-    public static void main(String[] args)
-    {
-        DynamicList<Integer> list = new DynamicList<>(20);
-        TreeSet<Integer> canon = new TreeSet<>();
-        HashMap<Integer, Node> nodes = new HashMap<>();
-        int c = 0;
-        for (int i = 0 ; i < 100000 ; i++)
-        {
-            nodes.put(c, list.append(c));
-            canon.add(c);
-            c++;
-        }
-        FasterRandom rand = new FasterRandom();
-        assert list.isWellFormed();
-        for (int loop = 0 ; loop < 100 ; loop++)
-        {
-            System.out.println(loop);
-            for (int i = 0 ; i < 100000 ; i++)
-            {
-                int index = rand.nextInt(100000);
-                Integer seed = list.get(index);
-//                assert canon.headSet(seed, false).size() == index;
-                list.remove(nodes.remove(seed));
-                canon.remove(seed);
-                nodes.put(c, list.append(c));
-                canon.add(c);
-                c++;
-            }
-            assert list.isWellFormed();
-        }
-    }
-
-}

Reply via email to