Repository: cassandra Updated Branches: refs/heads/trunk e182217b2 -> 17dd4ccc7
http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java new file mode 100644 index 0000000..208cd32 --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/memory/BufferPoolTest.java @@ -0,0 +1,852 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.cassandra.utils.memory; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.*; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.util.RandomAccessReader; + +import static org.junit.Assert.*; + +public class BufferPoolTest +{ + @Before + public void setUp() + { + BufferPool.MEMORY_USAGE_THRESHOLD = 8 * 1024L * 1024L; + BufferPool.DISABLED = false; + } + + @After + public void cleanUp() + { + BufferPool.reset(); + } + + @Test + public void testGetPut() throws InterruptedException + { + final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE; + + ByteBuffer buffer = BufferPool.get(size); + assertNotNull(buffer); + assertEquals(size, buffer.capacity()); + assertEquals(true, buffer.isDirect()); + + BufferPool.Chunk chunk = BufferPool.currentChunk(); + assertNotNull(chunk); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + + BufferPool.put(buffer); + assertEquals(null, BufferPool.currentChunk()); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + } + + + @Test + public void testPageAligned() + { + final int size = 1024; + for (int i = size; + i <= BufferPool.CHUNK_SIZE; + i += size) + { + checkPageAligned(i); + } + } + + private void checkPageAligned(int size) + { + ByteBuffer buffer = BufferPool.get(size); + assertNotNull(buffer); + assertEquals(size, buffer.capacity()); + assertTrue(buffer.isDirect()); + + long address = MemoryUtil.getAddress(buffer); + assertTrue((address % MemoryUtil.pageSize()) == 0); + + BufferPool.put(buffer); + } + + @Test + public void testDifferentSizes() throws InterruptedException + { + final int size1 = 1024; + final int size2 = 2048; + + ByteBuffer buffer1 = BufferPool.get(size1); + assertNotNull(buffer1); + assertEquals(size1, buffer1.capacity()); + + ByteBuffer buffer2 = BufferPool.get(size2); + assertNotNull(buffer2); + assertEquals(size2, buffer2.capacity()); + + BufferPool.Chunk chunk = BufferPool.currentChunk(); + assertNotNull(chunk); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + + BufferPool.put(buffer1); + BufferPool.put(buffer2); + + assertEquals(null, BufferPool.currentChunk()); + assertEquals(BufferPool.GlobalPool.MACRO_CHUNK_SIZE, BufferPool.sizeInBytes()); + } + + @Test + public void testMaxMemoryExceededDirect() + { + boolean cur = BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED; + BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = false; + + requestDoubleMaxMemory(); + + BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = cur; + } + + @Test + public void testMaxMemoryExceededHeap() + { + boolean cur = BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED; + BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = true; + + requestDoubleMaxMemory(); + + BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = cur; + } + + @Test + public void testMaxMemoryExceeded_SameAsChunkSize() + { + BufferPool.MEMORY_USAGE_THRESHOLD = BufferPool.GlobalPool.MACRO_CHUNK_SIZE; + requestDoubleMaxMemory(); + } + + @Test + public void testMaxMemoryExceeded_SmallerThanChunkSize() + { + BufferPool.MEMORY_USAGE_THRESHOLD = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / 2; + requestDoubleMaxMemory(); + } + + @Test + public void testRecycle() + { + requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, 3 * BufferPool.CHUNK_SIZE); + } + + private void requestDoubleMaxMemory() + { + requestUpToSize(RandomAccessReader.DEFAULT_BUFFER_SIZE, (int)(2 * BufferPool.MEMORY_USAGE_THRESHOLD)); + } + + private void requestUpToSize(int bufferSize, int totalSize) + { + final int numBuffers = totalSize / bufferSize; + + List<ByteBuffer> buffers = new ArrayList<>(numBuffers); + for (int i = 0; i < numBuffers; i++) + { + ByteBuffer buffer = BufferPool.get(bufferSize); + assertNotNull(buffer); + assertEquals(bufferSize, buffer.capacity()); + + if (BufferPool.sizeInBytes() > BufferPool.MEMORY_USAGE_THRESHOLD) + assertEquals(BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED, !buffer.isDirect()); + + buffers.add(buffer); + } + + for (ByteBuffer buffer : buffers) + BufferPool.put(buffer); + + } + + @Test + public void testBigRequest() + { + final int size = BufferPool.CHUNK_SIZE + 1; + + ByteBuffer buffer = BufferPool.get(size); + assertNotNull(buffer); + assertEquals(size, buffer.capacity()); + BufferPool.put(buffer); + } + + @Test + public void testFillUpChunks() + { + final int size = RandomAccessReader.DEFAULT_BUFFER_SIZE; + final int numBuffers = BufferPool.CHUNK_SIZE / size; + + List<ByteBuffer> buffers1 = new ArrayList<>(numBuffers); + List<ByteBuffer> buffers2 = new ArrayList<>(numBuffers); + for (int i = 0; i < numBuffers; i++) + buffers1.add(BufferPool.get(size)); + + BufferPool.Chunk chunk1 = BufferPool.currentChunk(); + assertNotNull(chunk1); + + for (int i = 0; i < numBuffers; i++) + buffers2.add(BufferPool.get(size)); + + assertEquals(2, BufferPool.numChunks()); + + for (ByteBuffer buffer : buffers1) + BufferPool.put(buffer); + + assertEquals(1, BufferPool.numChunks()); + + for (ByteBuffer buffer : buffers2) + BufferPool.put(buffer); + + assertEquals(0, BufferPool.numChunks()); + + buffers2.clear(); + } + + @Test + public void testOutOfOrderFrees() + { + final int size = 4096; + final int maxFreeSlots = BufferPool.CHUNK_SIZE / size; + + final int[] idxs = new int[maxFreeSlots]; + for (int i = 0; i < maxFreeSlots; i++) + idxs[i] = i; + + doTestFrees(size, maxFreeSlots, idxs); + } + + @Test + public void testInOrderFrees() + { + final int size = 4096; + final int maxFreeSlots = BufferPool.CHUNK_SIZE / size; + + final int[] idxs = new int[maxFreeSlots]; + for (int i = 0; i < maxFreeSlots; i++) + idxs[i] = maxFreeSlots - 1 - i; + + doTestFrees(size, maxFreeSlots, idxs); + } + + @Test + public void testRandomFrees() + { + doTestRandomFrees(12345567878L); + + BufferPool.reset(); + doTestRandomFrees(20452249587L); + + BufferPool.reset(); + doTestRandomFrees(82457252948L); + + BufferPool.reset(); + doTestRandomFrees(98759284579L); + + BufferPool.reset(); + doTestRandomFrees(19475257244L); + } + + private void doTestRandomFrees(long seed) + { + final int size = 4096; + final int maxFreeSlots = BufferPool.CHUNK_SIZE / size; + + final int[] idxs = new int[maxFreeSlots]; + for (int i = 0; i < maxFreeSlots; i++) + idxs[i] = maxFreeSlots - 1 - i; + + Random rnd = new Random(); + rnd.setSeed(seed); + for (int i = idxs.length - 1; i > 0; i--) + { + int idx = rnd.nextInt(i+1); + int v = idxs[idx]; + idxs[idx] = idxs[i]; + idxs[i] = v; + } + + doTestFrees(size, maxFreeSlots, idxs); + } + + private void doTestFrees(final int size, final int maxFreeSlots, final int[] toReleaseIdxs) + { + List<ByteBuffer> buffers = new ArrayList<>(maxFreeSlots); + for (int i = 0; i < maxFreeSlots; i++) + { + buffers.add(BufferPool.get(size)); + } + + BufferPool.Chunk chunk = BufferPool.currentChunk(); + assertFalse(chunk.isFree()); + + int freeSize = BufferPool.CHUNK_SIZE - maxFreeSlots * size; + assertEquals(freeSize, chunk.free()); + + for (int i : toReleaseIdxs) + { + ByteBuffer buffer = buffers.get(i); + assertNotNull(buffer); + assertEquals(size, buffer.capacity()); + + BufferPool.put(buffer); + + freeSize += size; + if (freeSize == chunk.capacity()) + assertEquals(0, chunk.free()); + else + assertEquals(freeSize, chunk.free()); + } + + assertFalse(chunk.isFree()); + } + + @Test + public void testDifferentSizeBuffersOnOneChunk() + { + int[] sizes = new int[] { + 5, 1024, 4096, 8, 16000, 78, 512, 256, 63, 55, 89, 90, 255, 32, 2048, 128 + }; + + int sum = 0; + List<ByteBuffer> buffers = new ArrayList<>(sizes.length); + for (int i = 0; i < sizes.length; i++) + { + ByteBuffer buffer = BufferPool.get(sizes[i]); + assertNotNull(buffer); + assertTrue(buffer.capacity() >= sizes[i]); + buffers.add(buffer); + + sum += BufferPool.currentChunk().roundUp(buffer.capacity()); + } + + // else the test will fail, adjust sizes as required + assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE); + + BufferPool.Chunk chunk = BufferPool.currentChunk(); + assertNotNull(chunk); + + Random rnd = new Random(); + rnd.setSeed(298347529L); + while (!buffers.isEmpty()) + { + int index = rnd.nextInt(buffers.size()); + ByteBuffer buffer = buffers.remove(index); + + BufferPool.put(buffer); + } + + assertEquals(null, BufferPool.currentChunk()); + assertEquals(0, chunk.free()); + } + + @Test + public void testChunkExhausted() + { + final int size = BufferPool.CHUNK_SIZE / 64; // 1kbit + int[] sizes = new int[128]; + Arrays.fill(sizes, size); + + int sum = 0; + List<ByteBuffer> buffers = new ArrayList<>(sizes.length); + for (int i = 0; i < sizes.length; i++) + { + ByteBuffer buffer = BufferPool.get(sizes[i]); + assertNotNull(buffer); + assertTrue(buffer.capacity() >= sizes[i]); + buffers.add(buffer); + + sum += buffer.capacity(); + } + + // else the test will fail, adjust sizes as required + assertTrue(sum <= BufferPool.GlobalPool.MACRO_CHUNK_SIZE); + + BufferPool.Chunk chunk = BufferPool.currentChunk(); + assertNotNull(chunk); + + for (int i = 0; i < sizes.length; i++) + { + BufferPool.put(buffers.get(i)); + } + + assertEquals(null, BufferPool.currentChunk()); + assertEquals(0, chunk.free()); + } + + @Test + public void testCompactIfOutOfCapacity() + { + final int size = 4096; + final int numBuffersInChunk = BufferPool.GlobalPool.MACRO_CHUNK_SIZE / size; + + List<ByteBuffer> buffers = new ArrayList<>(numBuffersInChunk); + Set<Long> addresses = new HashSet<>(numBuffersInChunk); + + for (int i = 0; i < numBuffersInChunk; i++) + { + ByteBuffer buffer = BufferPool.get(size); + buffers.add(buffer); + addresses.add(MemoryUtil.getAddress(buffer)); + } + + for (int i = numBuffersInChunk - 1; i >= 0; i--) + BufferPool.put(buffers.get(i)); + + buffers.clear(); + + for (int i = 0; i < numBuffersInChunk; i++) + { + ByteBuffer buffer = BufferPool.get(size); + assertNotNull(buffer); + assertEquals(size, buffer.capacity()); + addresses.remove(MemoryUtil.getAddress(buffer)); + + buffers.add(buffer); + } + + assertTrue(addresses.isEmpty()); // all 5 released buffers were used + + for (ByteBuffer buffer : buffers) + BufferPool.put(buffer); + } + + @Test + public void testHeapBuffer() + { + ByteBuffer buffer = BufferPool.get(1024, BufferType.ON_HEAP); + assertNotNull(buffer); + assertEquals(1024, buffer.capacity()); + assertFalse(buffer.isDirect()); + assertNotNull(buffer.array()); + BufferPool.put(buffer); + } + + @Test + public void testSingleBufferOneChunk() + { + checkBuffer(0); + + checkBuffer(1); + checkBuffer(2); + checkBuffer(4); + checkBuffer(5); + checkBuffer(8); + checkBuffer(16); + checkBuffer(32); + checkBuffer(64); + + checkBuffer(65); + checkBuffer(127); + checkBuffer(128); + + checkBuffer(129); + checkBuffer(255); + checkBuffer(256); + + checkBuffer(512); + checkBuffer(1024); + checkBuffer(2048); + checkBuffer(4096); + checkBuffer(8192); + checkBuffer(16384); + + checkBuffer(16385); + checkBuffer(32767); + checkBuffer(32768); + + checkBuffer(32769); + checkBuffer(33172); + checkBuffer(33553); + checkBuffer(36000); + checkBuffer(65535); + checkBuffer(65536); + + checkBuffer(65537); + } + + private void checkBuffer(int size) + { + ByteBuffer buffer = BufferPool.get(size); + assertEquals(size, buffer.capacity()); + + if (size > 0 && size < BufferPool.CHUNK_SIZE) + { + BufferPool.Chunk chunk = BufferPool.currentChunk(); + assertNotNull(chunk); + assertEquals(chunk.capacity(), chunk.free() + chunk.roundUp(size)); + } + + BufferPool.put(buffer); + } + + @Test + public void testMultipleBuffersOneChunk() + { + checkBuffers(32768, 33553); + checkBuffers(32768, 32768); + checkBuffers(48450, 33172); + checkBuffers(32768, 15682, 33172); + } + + private void checkBuffers(int ... sizes) + { + List<ByteBuffer> buffers = new ArrayList<>(sizes.length); + + for (int size : sizes) + { + ByteBuffer buffer = BufferPool.get(size); + assertEquals(size, buffer.capacity()); + + buffers.add(buffer); + } + + for (ByteBuffer buffer : buffers) + BufferPool.put(buffer); + } + + @Test + public void testBuffersWithGivenSlots() + { + checkBufferWithGivenSlots(21241, (-1L << 27) ^ (1L << 40)); + } + + private void checkBufferWithGivenSlots(int size, long freeSlots) + { + //first allocate to make sure there is a chunk + ByteBuffer buffer = BufferPool.get(size); + + // now get the current chunk and override the free slots mask + BufferPool.Chunk chunk = BufferPool.currentChunk(); + assertNotNull(chunk); + long oldFreeSlots = chunk.setFreeSlots(freeSlots); + + // now check we can still get the buffer with the free slots mask changed + ByteBuffer buffer2 = BufferPool.get(size); + assertEquals(size, buffer.capacity()); + BufferPool.put(buffer2); + + // reset the free slots + chunk.setFreeSlots(oldFreeSlots); + BufferPool.put(buffer); + } + + @Test + public void testZeroSizeRequest() + { + ByteBuffer buffer = BufferPool.get(0); + assertNotNull(buffer); + assertEquals(0, buffer.capacity()); + BufferPool.put(buffer); + } + + @Test(expected = IllegalArgumentException.class) + public void testNegativeSizeRequest() + { + BufferPool.get(-1); + } + + @Test + public void testBufferPoolDisabled() + { + BufferPool.DISABLED = true; + BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = true; + ByteBuffer buffer = BufferPool.get(1024); + assertEquals(0, BufferPool.numChunks()); + assertNotNull(buffer); + assertEquals(1024, buffer.capacity()); + assertFalse(buffer.isDirect()); + assertNotNull(buffer.array()); + BufferPool.put(buffer); + assertEquals(0, BufferPool.numChunks()); + + BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = false; + buffer = BufferPool.get(1024); + assertEquals(0, BufferPool.numChunks()); + assertNotNull(buffer); + assertEquals(1024, buffer.capacity()); + assertTrue(buffer.isDirect()); + BufferPool.put(buffer); + assertEquals(0, BufferPool.numChunks()); + + // clean-up + BufferPool.DISABLED = false; + BufferPool.ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = true; + } + + @Test + public void testMT_SameSizeImmediateReturn() throws InterruptedException + { + checkMultipleThreads(40, 1, true, RandomAccessReader.DEFAULT_BUFFER_SIZE); + } + + @Test + public void testMT_SameSizePostponedReturn() throws InterruptedException + { + checkMultipleThreads(40, 1, false, RandomAccessReader.DEFAULT_BUFFER_SIZE); + } + + @Test + public void testMT_TwoSizesOneBufferImmediateReturn() throws InterruptedException + { + checkMultipleThreads(40, 1, true, 1024, 2048); + } + + @Test + public void testMT_TwoSizesOneBufferPostponedReturn() throws InterruptedException + { + checkMultipleThreads(40, 1, false, 1024, 2048); + } + + @Test + public void testMT_TwoSizesTwoBuffersImmediateReturn() throws InterruptedException + { + checkMultipleThreads(40, 2, true, 1024, 2048); + } + + @Test + public void testMT_TwoSizesTwoBuffersPostponedReturn() throws InterruptedException + { + checkMultipleThreads(40, 2, false, 1024, 2048); + } + + @Test + public void testMT_MultipleSizesOneBufferImmediateReturn() throws InterruptedException + { + checkMultipleThreads(40, + 1, + true, + 1024, + 2048, + 3072, + 4096, + 5120); + } + + @Test + public void testMT_MultipleSizesOneBufferPostponedReturn() throws InterruptedException + { + checkMultipleThreads(40, + 1, + false, + 1024, + 2048, + 3072, + 4096, + 5120); + } + + @Test + public void testMT_MultipleSizesMultipleBuffersImmediateReturn() throws InterruptedException + { + checkMultipleThreads(40, + 4, + true, + 1024, + 2048, + 3072, + 4096, + 5120); + } + + @Test + public void testMT_MultipleSizesMultipleBuffersPostponedReturn() throws InterruptedException + { + checkMultipleThreads(40, + 3, + false, + 1024, + 2048, + 3072, + 4096, + 5120); + } + + private void checkMultipleThreads(int threadCount, int numBuffersPerThread, final boolean returnImmediately, final int ... sizes) throws InterruptedException + { + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + final CountDownLatch finished = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) + { + final int[] threadSizes = new int[numBuffersPerThread]; + for (int j = 0; j < threadSizes.length; j++) + threadSizes[j] = sizes[(i * numBuffersPerThread + j) % sizes.length]; + + final Random rand = new Random(); + executorService.submit(new Runnable() + { + @Override + public void run() + { + try + { + Thread.sleep(rand.nextInt(3)); + + List<ByteBuffer> toBeReturned = new ArrayList<ByteBuffer>(threadSizes.length); + + for (int j = 0; j < threadSizes.length; j++) + { + ByteBuffer buffer = BufferPool.get(threadSizes[j]); + assertNotNull(buffer); + assertEquals(threadSizes[j], buffer.capacity()); + + for (int i = 0; i < 10; i++) + buffer.putInt(i); + + buffer.rewind(); + + Thread.sleep(rand.nextInt(3)); + + for (int i = 0; i < 10; i++) + assertEquals(i, buffer.getInt()); + + if (returnImmediately) + BufferPool.put(buffer); + else + toBeReturned.add(buffer); + + assertTrue(BufferPool.sizeInBytes() > 0); + } + + Thread.sleep(rand.nextInt(3)); + + for (ByteBuffer buffer : toBeReturned) + BufferPool.put(buffer); + } + catch (Exception ex) + { + ex.printStackTrace(); + fail(ex.getMessage()); + } + finally + { + finished.countDown(); + } + } + }); + } + + finished.await(); + assertEquals(0, executorService.shutdownNow().size()); + + // Make sure thread local storage gets GC-ed + for (int i = 0; i < 5; i++) + { + System.gc(); + Thread.sleep(100); + } + } + + @Ignore + public void testMultipleThreadsReleaseSameBuffer() throws InterruptedException + { + doMultipleThreadsReleaseBuffers(45, 4096); + } + + @Ignore + public void testMultipleThreadsReleaseDifferentBuffer() throws InterruptedException + { + doMultipleThreadsReleaseBuffers(45, 4096, 8192); + } + + private void doMultipleThreadsReleaseBuffers(final int threadCount, final int ... sizes) throws InterruptedException + { + final ByteBuffer[] buffers = new ByteBuffer[sizes.length]; + int sum = 0; + for (int i = 0; i < sizes.length; i++) + { + buffers[i] = BufferPool.get(sizes[i]); + assertNotNull(buffers[i]); + assertEquals(sizes[i], buffers[i].capacity()); + sum += BufferPool.currentChunk().roundUp(buffers[i].capacity()); + } + + final BufferPool.Chunk chunk = BufferPool.currentChunk(); + assertNotNull(chunk); + assertFalse(chunk.isFree()); + + // if we use multiple chunks the test will fail, adjust sizes accordingly + assertTrue(sum < BufferPool.GlobalPool.MACRO_CHUNK_SIZE); + + ExecutorService executorService = Executors.newFixedThreadPool(threadCount); + final CountDownLatch finished = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) + { + final int idx = i % sizes.length; + final ByteBuffer buffer = buffers[idx]; + + executorService.submit(new Runnable() + { + @Override + public void run() + { + try + { + assertNotSame(chunk, BufferPool.currentChunk()); + BufferPool.put(buffer); + } + catch (AssertionError ex) + { //this is expected if we release a buffer more than once + ex.printStackTrace(); + } + catch (Throwable t) + { + t.printStackTrace(); + fail(t.getMessage()); + } + finally + { + finished.countDown(); + } + } + }); + } + + finished.await(); + assertEquals(0, executorService.shutdownNow().size()); + + executorService = null; + + // Make sure thread local storage gets GC-ed + System.gc(); + System.gc(); + System.gc(); + + assertTrue(BufferPool.currentChunk().isFree()); + + //make sure the main thread can still allocate buffers + ByteBuffer buffer = BufferPool.get(sizes[0]); + assertNotNull(buffer); + assertEquals(sizes[0], buffer.capacity()); + BufferPool.put(buffer); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java b/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java deleted file mode 100644 index 455fec4..0000000 --- a/tools/stress/src/org/apache/cassandra/stress/generate/FasterRandom.java +++ /dev/null @@ -1,116 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ -package org.apache.cassandra.stress.generate; - -import java.util.Random; - -import org.apache.commons.math3.random.RandomGenerator; - -// based on http://en.wikipedia.org/wiki/Xorshift, but periodically we reseed with our stronger random generator -// note it is also non-atomically updated, so expects to be used by a single thread -public class FasterRandom implements RandomGenerator -{ - final Random random = new Random(); - - private long seed; - private int reseed; - - public void setSeed(int seed) - { - setSeed((long) seed); - } - - public void setSeed(int[] ints) - { - if (ints.length > 1) - setSeed (((long) ints[0] << 32) | ints[1]); - else - setSeed(ints[0]); - } - - public void setSeed(long seed) - { - this.seed = seed; - rollover(); - } - - private void rollover() - { - this.reseed = 0; - random.setSeed(seed); - seed = random.nextLong(); - } - - public void nextBytes(byte[] bytes) - { - int i = 0; - while (i < bytes.length) - { - long next = nextLong(); - while (i < bytes.length) - { - bytes[i++] = (byte) (next & 0xFF); - next >>>= 8; - } - } - } - - public int nextInt() - { - return (int) nextLong(); - } - - public int nextInt(int i) - { - return Math.abs((int) nextLong() % i); - } - - public long nextLong() - { - if (++this.reseed == 32) - rollover(); - - long seed = this.seed; - seed ^= seed >> 12; - seed ^= seed << 25; - seed ^= seed >> 27; - this.seed = seed; - return seed * 2685821657736338717L; - } - - public boolean nextBoolean() - { - return ((int) nextLong() & 1) == 1; - } - - public float nextFloat() - { - return Float.intBitsToFloat((int) nextLong()); - } - - public double nextDouble() - { - return Double.longBitsToDouble(nextLong()); - } - - public double nextGaussian() - { - return random.nextGaussian(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java index 9e2e65b..243ac30 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/Seed.java @@ -20,7 +20,7 @@ package org.apache.cassandra.stress.generate; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import org.apache.cassandra.stress.util.DynamicList; +import org.apache.cassandra.utils.DynamicList; public class Seed implements Comparable<Seed> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java index 071d888..5020b45 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/SeedManager.java @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.cassandra.stress.Operation; import org.apache.cassandra.stress.settings.StressSettings; -import org.apache.cassandra.stress.util.DynamicList; +import org.apache.cassandra.utils.LockedDynamicList; public class SeedManager { @@ -34,7 +34,7 @@ public class SeedManager final Generator writes; final Generator reads; final ConcurrentHashMap<Long, Seed> managing = new ConcurrentHashMap<>(); - final DynamicList<Seed> sampleFrom; + final LockedDynamicList<Seed> sampleFrom; final Distribution sample; final long sampleOffset; final int sampleSize; @@ -69,7 +69,7 @@ public class SeedManager long sampleSize = 1 + Math.max(sample.minValue(), sample.maxValue()) - sampleOffset; if (sampleOffset < 0 || sampleSize > Integer.MAX_VALUE) throw new IllegalArgumentException("sample range is invalid"); - this.sampleFrom = new DynamicList<>((int) sampleSize); + this.sampleFrom = new LockedDynamicList<>((int) sampleSize); this.sample = DistributionInverted.invert(sample); this.sampleSize = (int) sampleSize; this.updateSampleImmediately = visits.average() > 1; http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java index 3c15c87..c247e48 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Bytes.java @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.stress.generate.FasterRandom; +import org.apache.cassandra.utils.FasterRandom; public class Bytes extends Generator<ByteBuffer> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java index b58fee2..db78eb7 100644 --- a/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java +++ b/tools/stress/src/org/apache/cassandra/stress/generate/values/Strings.java @@ -21,7 +21,7 @@ package org.apache.cassandra.stress.generate.values; import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.stress.generate.FasterRandom; +import org.apache.cassandra.utils.FasterRandom; public class Strings extends Generator<String> { http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java b/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java deleted file mode 100644 index ee04063..0000000 --- a/tools/stress/src/org/apache/cassandra/stress/util/DynamicList.java +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.stress.util; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.TreeSet; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.cassandra.stress.generate.FasterRandom; - -// simple thread-unsafe skiplist that permits indexing/removal by position, insertion at the end -// (though easily extended to insertion at any position, not necessary here) -// we use it for sampling items by position for visiting writes in the pool of pending writes -public class DynamicList<E> -{ - - // represents a value and an index simultaneously; each node maintains a list - // of next pointers for each height in the skip-list this node participates in - // (a contiguous range from [0..height)) - public static class Node<E> - { - // stores the size of each descendant - private final int[] size; - // TODO: alternate links to save space - private final Node<E>[] links; - private E value; - - private Node(int height, E value) - { - this.value = value; - links = new Node[height * 2]; - size = new int[height]; - Arrays.fill(size, 1); - } - - private int height() - { - return size.length; - } - - private Node<E> next(int i) - { - return links[i * 2]; - } - - private Node<E> prev(int i) - { - return links[1 + i * 2]; - } - - private void setNext(int i, Node<E> next) - { - links[i * 2] = next; - } - - private void setPrev(int i, Node<E> prev) - { - links[1 + i * 2] = prev; - } - - private Node parent(int parentHeight) - { - Node prev = this; - while (true) - { - int height = prev.height(); - if (parentHeight < height) - return prev; - prev = prev.prev(height - 1); - } - } - } - - private final ReadWriteLock lock = new ReentrantReadWriteLock(); - private final int maxHeight; - private final Node<E> head; - private int size; - - public DynamicList(int maxExpectedSize) - { - this.maxHeight = 3 + Math.max(0, (int) Math.ceil(Math.log(maxExpectedSize) / Math.log(2))); - head = new Node<>(maxHeight, null); - } - - private int randomLevel() - { - return 1 + Integer.bitCount(ThreadLocalRandom.current().nextInt() & ((1 << (maxHeight - 1)) - 1)); - } - - public Node<E> append(E value) - { - return append(value, Integer.MAX_VALUE); - } - - // add the value to the end of the list, and return the associated Node that permits efficient removal - // regardless of its future position in the list from other modifications - public Node<E> append(E value, int maxSize) - { - Node<E> newTail = new Node<>(randomLevel(), value); - - lock.writeLock().lock(); - try - { - if (size >= maxSize) - return null; - size++; - - Node<E> tail = head; - for (int i = maxHeight - 1 ; i >= newTail.height() ; i--) - { - Node<E> next; - while ((next = tail.next(i)) != null) - tail = next; - tail.size[i]++; - } - - for (int i = newTail.height() - 1 ; i >= 0 ; i--) - { - Node<E> next; - while ((next = tail.next(i)) != null) - tail = next; - tail.setNext(i, newTail); - newTail.setPrev(i, tail); - } - - return newTail; - } - finally - { - lock.writeLock().unlock(); - } - } - - // remove the provided node and its associated value from the list - public void remove(Node<E> node) - { - lock.writeLock().lock(); - try - { - assert node.value != null; - node.value = null; - - size--; - - // go up through each level in the skip list, unlinking this node; this entails - // simply linking each neighbour to each other, and appending the size of the - // current level owned by this node's index to the preceding neighbour (since - // ownership is defined as any node that you must visit through the index, - // removal of ourselves from a level means the preceding index entry is the - // entry point to all of the removed node's descendants) - for (int i = 0 ; i < node.height() ; i++) - { - Node<E> prev = node.prev(i); - Node<E> next = node.next(i); - assert prev != null; - prev.setNext(i, next); - if (next != null) - next.setPrev(i, prev); - prev.size[i] += node.size[i] - 1; - } - - // then go up the levels, removing 1 from the size at each height above ours - for (int i = node.height() ; i < maxHeight ; i++) - { - // if we're at our height limit, we backtrack at our top level until we - // hit a neighbour with a greater height - while (i == node.height()) - node = node.prev(i - 1); - node.size[i]--; - } - } - finally - { - lock.writeLock().unlock(); - } - } - - // retrieve the item at the provided index, or return null if the index is past the end of the list - public E get(int index) - { - lock.readLock().lock(); - try - { - if (index >= size) - return null; - - index++; - int c = 0; - Node<E> finger = head; - for (int i = maxHeight - 1 ; i >= 0 ; i--) - { - while (c + finger.size[i] <= index) - { - c += finger.size[i]; - finger = finger.next(i); - } - } - - assert c == index; - return finger.value; - } - finally - { - lock.readLock().unlock(); - } - } - - // some quick and dirty tests to confirm the skiplist works as intended - // don't create a separate unit test - tools tree doesn't currently warrant them - - private boolean isWellFormed() - { - for (int i = 0 ; i < maxHeight ; i++) - { - int c = 0; - for (Node node = head ; node != null ; node = node.next(i)) - { - if (node.prev(i) != null && node.prev(i).next(i) != node) - return false; - if (node.next(i) != null && node.next(i).prev(i) != node) - return false; - c += node.size[i]; - if (i + 1 < maxHeight && node.parent(i + 1).next(i + 1) == node.next(i)) - { - if (node.parent(i + 1).size[i + 1] != c) - return false; - c = 0; - } - } - if (i == maxHeight - 1 && c != size + 1) - return false; - } - return true; - } - - public static void main(String[] args) - { - DynamicList<Integer> list = new DynamicList<>(20); - TreeSet<Integer> canon = new TreeSet<>(); - HashMap<Integer, Node> nodes = new HashMap<>(); - int c = 0; - for (int i = 0 ; i < 100000 ; i++) - { - nodes.put(c, list.append(c)); - canon.add(c); - c++; - } - FasterRandom rand = new FasterRandom(); - assert list.isWellFormed(); - for (int loop = 0 ; loop < 100 ; loop++) - { - System.out.println(loop); - for (int i = 0 ; i < 100000 ; i++) - { - int index = rand.nextInt(100000); - Integer seed = list.get(index); -// assert canon.headSet(seed, false).size() == index; - list.remove(nodes.remove(seed)); - canon.remove(seed); - nodes.put(c, list.append(c)); - canon.add(c); - c++; - } - assert list.isWellFormed(); - } - } - -}