http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/src/java/org/apache/cassandra/utils/memory/BufferPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java b/src/java/org/apache/cassandra/utils/memory/BufferPool.java new file mode 100644 index 0000000..0731301 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java @@ -0,0 +1,858 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.memory; + +import java.lang.ref.PhantomReference; +import java.lang.ref.ReferenceQueue; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.NoSpamLogger; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.metrics.BufferPoolMetrics; +import org.apache.cassandra.utils.concurrent.Ref; + +/** + * A pool of ByteBuffers that can be recycled. + */ +public class BufferPool +{ + /** The size of a page aligned buffer, 64kbit */ + static final int CHUNK_SIZE = 64 << 10; + + @VisibleForTesting + public static long MEMORY_USAGE_THRESHOLD = DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L; + + @VisibleForTesting + public static boolean ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = DatabaseDescriptor.getBufferPoolUseHeapIfExhausted(); + + @VisibleForTesting + public static boolean DISABLED = Boolean.parseBoolean(System.getProperty("cassandra.test.disable_buffer_pool", "false")); + + @VisibleForTesting + public static boolean DEBUG = false; + + private static final Logger logger = LoggerFactory.getLogger(BufferPool.class); + private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES); + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0); + + /** A global pool of chunks (page aligned buffers) */ + private static final GlobalPool globalPool = new GlobalPool(); + + /** A thread local pool of chunks, where chunks come from the global pool */ + private static final ThreadLocal<LocalPool> localPool = new ThreadLocal<LocalPool>() { + @Override + protected LocalPool initialValue() + { + return new LocalPool(); + } + }; + + public static ByteBuffer get(int size) + { + if (DISABLED) + return allocate(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED); + else + return takeFromPool(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED); + } + + public static ByteBuffer get(int size, BufferType bufferType) + { + boolean direct = bufferType == BufferType.OFF_HEAP; + if (DISABLED | !direct) + return allocate(size, !direct); + else + return takeFromPool(size, !direct); + } + + /** Unlike the get methods, this will return null if the pool is exhausted */ + public static ByteBuffer tryGet(int size) + { + if (DISABLED) + return allocate(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED); + else + return maybeTakeFromPool(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED); + } + + private static ByteBuffer allocate(int size, boolean onHeap) + { + return onHeap + ? ByteBuffer.allocate(size) + : ByteBuffer.allocateDirect(size); + } + + private static ByteBuffer takeFromPool(int size, boolean allocateOnHeapWhenExhausted) + { + ByteBuffer ret = maybeTakeFromPool(size, allocateOnHeapWhenExhausted); + if (ret != null) + return ret; + + if (logger.isTraceEnabled()) + logger.trace("Requested buffer size {} has been allocated directly due to lack of capacity", size); + + return localPool.get().allocate(size, allocateOnHeapWhenExhausted); + } + + private static ByteBuffer maybeTakeFromPool(int size, boolean allocateOnHeapWhenExhausted) + { + if (size < 0) + throw new IllegalArgumentException("Size must be positive (" + size + ")"); + + if (size == 0) + return EMPTY_BUFFER; + + if (size > CHUNK_SIZE) + { + if (logger.isTraceEnabled()) + logger.trace("Requested buffer size {} is bigger than {}, allocating directly", size, CHUNK_SIZE); + + return localPool.get().allocate(size, allocateOnHeapWhenExhausted); + } + + return localPool.get().get(size); + } + + public static void put(ByteBuffer buffer) + { + if (!(DISABLED | buffer.hasArray())) + localPool.get().put(buffer); + } + + /** This is not thread safe and should only be used for unit testing. */ + @VisibleForTesting + static void reset() + { + localPool.get().reset(); + globalPool.reset(); + } + + @VisibleForTesting + static Chunk currentChunk() + { + return localPool.get().chunks[0]; + } + + @VisibleForTesting + static int numChunks() + { + int ret = 0; + for (Chunk chunk : localPool.get().chunks) + { + if (chunk != null) + ret++; + } + return ret; + } + + @VisibleForTesting + static void assertAllRecycled() + { + globalPool.debug.check(); + } + + public static long sizeInBytes() + { + return globalPool.sizeInBytes(); + } + + static final class Debug + { + long recycleRound = 1; + final Queue<Chunk> allChunks = new ConcurrentLinkedQueue<>(); + void register(Chunk chunk) + { + allChunks.add(chunk); + } + void recycle(Chunk chunk) + { + chunk.lastRecycled = recycleRound; + } + void check() + { + for (Chunk chunk : allChunks) + assert chunk.lastRecycled == recycleRound; + recycleRound++; + } + } + + /** + * A queue of page aligned buffers, the chunks, which have been sliced from bigger chunks, + * the macro-chunks, also page aligned. Macro-chunks are allocated as long as we have not exceeded the + * memory maximum threshold, MEMORY_USAGE_THRESHOLD and are never released. + * + * This class is shared by multiple thread local pools and must be thread-safe. + */ + static final class GlobalPool + { + /** The size of a bigger chunk, 1-mbit, must be a multiple of CHUNK_SIZE */ + static final int MACRO_CHUNK_SIZE = 1 << 20; + + static + { + assert Integer.bitCount(CHUNK_SIZE) == 1; // must be a power of 2 + assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1; // must be a power of 2 + assert MACRO_CHUNK_SIZE % CHUNK_SIZE == 0; // must be a multiple + + if (DISABLED) + logger.info("Global buffer pool is disabled, allocating {}", ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap"); + else + logger.info("Global buffer pool is enabled, when pool is exahusted (max is {} mb) it will allocate {}", + MEMORY_USAGE_THRESHOLD / (1024L * 1024L), + ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap"); + } + + private final Debug debug = new Debug(); + private final Queue<Chunk> macroChunks = new ConcurrentLinkedQueue<>(); + // TODO (future): it would be preferable to use a CLStack to improve cache occupancy; it would also be preferable to use "CoreLocal" storage + private final Queue<Chunk> chunks = new ConcurrentLinkedQueue<>(); + private final AtomicLong memoryUsage = new AtomicLong(); + + /** Return a chunk, the caller will take owership of the parent chunk. */ + public Chunk get() + { + while (true) + { + Chunk chunk = chunks.poll(); + if (chunk != null) + return chunk; + + if (!allocateMoreChunks()) + // give it one last attempt, in case someone else allocated before us + return chunks.poll(); + } + } + + /** + * This method might be called by multiple threads and that's fine if we add more + * than one chunk at the same time as long as we don't exceed the MEMORY_USAGE_THRESHOLD. + */ + private boolean allocateMoreChunks() + { + while (true) + { + long cur = memoryUsage.get(); + if (cur + MACRO_CHUNK_SIZE > MEMORY_USAGE_THRESHOLD) + { + noSpamLogger.info("Maximum memory usage reached ({} bytes), cannot allocate chunk of {} bytes", + MEMORY_USAGE_THRESHOLD, MACRO_CHUNK_SIZE); + return false; + } + if (memoryUsage.compareAndSet(cur, cur + MACRO_CHUNK_SIZE)) + break; + } + + // allocate a large chunk + Chunk chunk = new Chunk(allocateDirectAligned(MACRO_CHUNK_SIZE)); + chunk.acquire(null); + macroChunks.add(chunk); + for (int i = 0 ; i < MACRO_CHUNK_SIZE ; i += CHUNK_SIZE) + { + Chunk add = new Chunk(chunk.get(CHUNK_SIZE)); + chunks.add(add); + if (DEBUG) + debug.register(add); + } + + return true; + } + + public void recycle(Chunk chunk) + { + chunks.add(chunk); + } + + public long sizeInBytes() + { + return memoryUsage.get(); + } + + /** This is not thread safe and should only be used for unit testing. */ + @VisibleForTesting + void reset() + { + while (!chunks.isEmpty()) + chunks.poll().reset(); + + while (!macroChunks.isEmpty()) + macroChunks.poll().reset(); + + memoryUsage.set(0); + } + } + + /** + * A thread local class that grabs chunks from the global pool for this thread allocations. + * Only one thread can do the allocations but multiple threads can release the allocations. + */ + static final class LocalPool + { + private final static BufferPoolMetrics metrics = new BufferPoolMetrics(); + // a microqueue of Chunks: + // * if any are null, they are at the end; + // * new Chunks are added to the last null index + // * if no null indexes available, the smallest is swapped with the last index, and this replaced + // * this results in a queue that will typically be visited in ascending order of available space, so that + // small allocations preferentially slice from the Chunks with the smallest space available to furnish them + // WARNING: if we ever change the size of this, we must update removeFromLocalQueue, and addChunk + private final Chunk[] chunks = new Chunk[3]; + private byte chunkCount = 0; + + public LocalPool() + { + localPoolReferences.add(new LocalPoolRef(this, localPoolRefQueue)); + } + + private Chunk addChunkFromGlobalPool() + { + Chunk chunk = globalPool.get(); + if (chunk == null) + return null; + + addChunk(chunk); + return chunk; + } + + private void addChunk(Chunk chunk) + { + chunk.acquire(this); + + if (chunkCount < 3) + { + chunks[chunkCount++] = chunk; + return; + } + + int smallestChunkIdx = 0; + if (chunks[1].free() < chunks[0].free()) + smallestChunkIdx = 1; + if (chunks[2].free() < chunks[smallestChunkIdx].free()) + smallestChunkIdx = 2; + + chunks[smallestChunkIdx].release(); + if (smallestChunkIdx != 2) + chunks[smallestChunkIdx] = chunks[2]; + chunks[2] = chunk; + } + + public ByteBuffer get(int size) + { + for (Chunk chunk : chunks) + { // first see if our own chunks can serve this buffer + if (chunk == null) + break; + + ByteBuffer buffer = chunk.get(size); + if (buffer != null) + return buffer; + } + + // else ask the global pool + Chunk chunk = addChunkFromGlobalPool(); + if (chunk != null) + return chunk.get(size); + + return null; + } + + private ByteBuffer allocate(int size, boolean onHeap) + { + metrics.misses.mark(); + return BufferPool.allocate(size, onHeap); + } + + public void put(ByteBuffer buffer) + { + Chunk chunk = Chunk.getParentChunk(buffer); + if (chunk == null) + { + FileUtils.clean(buffer); + return; + } + + LocalPool owner = chunk.owner; + // ask the free method to take exclusive ownership of the act of recycling + // if we are either: already not owned by anyone, or owned by ourselves + long free = chunk.free(buffer, owner == null | owner == this); + if (free == 0L) + { + // 0L => we own recycling responsibility, so must recycle; + chunk.recycle(); + // if we are also the owner, we must remove the Chunk from our local queue + if (owner == this) + removeFromLocalQueue(chunk); + } + else if (((free == -1L) & owner != this) && chunk.owner == null) + { + // although we try to take recycle ownership cheaply, it is not always possible to do so if the owner is racing to unset. + // we must also check after completely freeing if the owner has since been unset, and try to recycle + chunk.tryRecycle(); + } + } + + private void removeFromLocalQueue(Chunk chunk) + { + // since we only have three elements in the queue, it is clearer, easier and faster to just hard code the options + if (chunks[0] == chunk) + { // remove first by shifting back second two + chunks[0] = chunks[1]; + chunks[1] = chunks[2]; + } + else if (chunks[1] == chunk) + { // remove second by shifting back last + chunks[1] = chunks[2]; + } + else assert chunks[2] == chunk; + // whatever we do, the last element myst be null + chunks[2] = null; + chunkCount--; + } + + @VisibleForTesting + void reset() + { + chunkCount = 0; + for (int i = 0; i < chunks.length; i++) + { + if (chunks[i] != null) + { + chunks[i].owner = null; + chunks[i].freeSlots = 0L; + chunks[i].recycle(); + chunks[i] = null; + } + } + } + } + + private static final class LocalPoolRef extends PhantomReference<LocalPool> + { + private final Chunk[] chunks; + public LocalPoolRef(LocalPool localPool, ReferenceQueue<? super LocalPool> q) + { + super(localPool, q); + chunks = localPool.chunks; + } + + public void release() + { + for (int i = 0 ; i < chunks.length ; i++) + { + if (chunks[i] != null) + { + chunks[i].release(); + chunks[i] = null; + } + } + } + } + + private static final ConcurrentLinkedQueue<LocalPoolRef> localPoolReferences = new ConcurrentLinkedQueue<>(); + + private static final ReferenceQueue<Object> localPoolRefQueue = new ReferenceQueue<>(); + private static final ExecutorService EXEC = Executors.newFixedThreadPool(1, new NamedThreadFactory("LocalPool-Cleaner")); + static + { + EXEC.execute(new Runnable() + { + public void run() + { + try + { + while (true) + { + Object obj = localPoolRefQueue.remove(); + if (obj instanceof LocalPoolRef) + { + ((LocalPoolRef) obj).release(); + localPoolReferences.remove(obj); + } + } + } + catch (InterruptedException e) + { + } + finally + { + EXEC.execute(this); + } + } + }); + } + + private static ByteBuffer allocateDirectAligned(int capacity) + { + int align = MemoryUtil.pageSize(); + if (Integer.bitCount(align) != 1) + throw new IllegalArgumentException("Alignment must be a power of 2"); + + ByteBuffer buffer = ByteBuffer.allocateDirect(capacity + align); + long address = MemoryUtil.getAddress(buffer); + long offset = address & (align -1); // (address % align) + + if (offset == 0) + { // already aligned + buffer.limit(capacity); + } + else + { // shift by offset + int pos = (int)(align - offset); + buffer.position(pos); + buffer.limit(pos + capacity); + } + + return buffer.slice(); + } + + /** + * A memory chunk: it takes a buffer (the slab) and slices it + * into smaller buffers when requested. + * + * It divides the slab into 64 units and keeps a long mask, freeSlots, + * indicating if a unit is in use or not. Each bit in freeSlots corresponds + * to a unit, if the bit is set then the unit is free (available for allocation) + * whilst if it is not set then the unit is in use. + * + * When we receive a request of a given size we round up the size to the nearest + * multiple of allocation units required. Then we search for n consecutive free units, + * where n is the number of units required. We also align to page boundaries. + * + * When we reiceve a release request we work out the position by comparing the buffer + * address to our base address and we simply release the units. + */ + final static class Chunk + { + private final ByteBuffer slab; + private final long baseAddress; + private final int shift; + + private volatile long freeSlots; + private static final AtomicLongFieldUpdater<Chunk> freeSlotsUpdater = AtomicLongFieldUpdater.newUpdater(Chunk.class, "freeSlots"); + + // the pool that is _currently allocating_ from this Chunk + // if this is set, it means the chunk may not be recycled because we may still allocate from it; + // if it has been unset the local pool has finished with it, and it may be recycled + private volatile LocalPool owner; + private long lastRecycled; + private final Chunk original; + + Chunk(Chunk recycle) + { + assert recycle.freeSlots == 0L; + this.slab = recycle.slab; + this.baseAddress = recycle.baseAddress; + this.shift = recycle.shift; + this.freeSlots = -1L; + this.original = recycle.original; + if (DEBUG) + globalPool.debug.recycle(original); + } + + Chunk(ByteBuffer slab) + { + assert !slab.hasArray(); + this.slab = slab; + this.baseAddress = MemoryUtil.getAddress(slab); + + // The number of bits by which we need to shift to obtain a unit + // "31 &" is because numberOfTrailingZeros returns 32 when the capacity is zero + this.shift = 31 & (Integer.numberOfTrailingZeros(slab.capacity() / 64)); + // -1 means all free whilst 0 means all in use + this.freeSlots = slab.capacity() == 0 ? 0L : -1L; + this.original = DEBUG ? this : null; + } + + /** + * Acquire the chunk for future allocations: set the owner and prep + * the free slots mask. + */ + void acquire(LocalPool owner) + { + assert this.owner == null; + this.owner = owner; + } + + /** + * Set the owner to null and return the chunk to the global pool if the chunk is fully free. + * This method must be called by the LocalPool when it is certain that + * the local pool shall never try to allocate any more buffers from this chunk. + */ + void release() + { + this.owner = null; + tryRecycle(); + } + + void tryRecycle() + { + assert owner == null; + if (isFree() && freeSlotsUpdater.compareAndSet(this, -1L, 0L)) + recycle(); + } + + void recycle() + { + assert freeSlots == 0L; + globalPool.recycle(new Chunk(this)); + } + + /** + * We stash the chunk in the attachment of a buffer + * that was returned by get(), this method simply + * retrives the chunk that sliced a buffer, if any. + */ + static Chunk getParentChunk(ByteBuffer buffer) + { + Object attachment = MemoryUtil.getAttachment(buffer); + + if (attachment instanceof Chunk) + return (Chunk) attachment; + + if (attachment instanceof Ref) + return ((Ref<Chunk>) attachment).get(); + + return null; + } + + ByteBuffer setAttachment(ByteBuffer buffer) + { + if (Ref.DEBUG_ENABLED) + MemoryUtil.setAttachment(buffer, new Ref<>(this, null)); + else + MemoryUtil.setAttachment(buffer, this); + + return buffer; + } + + boolean releaseAttachment(ByteBuffer buffer) + { + Object attachment = MemoryUtil.getAttachment(buffer); + if (attachment == null) + return false; + + if (attachment instanceof Ref) + ((Ref<Chunk>) attachment).release(); + + return true; + } + + @VisibleForTesting + void reset() + { + Chunk parent = getParentChunk(slab); + if (parent != null) + parent.free(slab, false); + else + FileUtils.clean(slab); + } + + @VisibleForTesting + long setFreeSlots(long val) + { + long ret = freeSlots; + freeSlots = val; + return ret; + } + + int capacity() + { + return 64 << shift; + } + + final int unit() + { + return 1 << shift; + } + + final boolean isFree() + { + return freeSlots == -1L; + } + + /** The total free size */ + int free() + { + return Long.bitCount(freeSlots) * unit(); + } + + /** + * Return the next available slice of this size. If + * we have exceeded the capacity we return null. + */ + ByteBuffer get(int size) + { + // how many multiples of our units is the size? + // we add (unit - 1), so that when we divide by unit (>>> shift), we effectively round up + int slotCount = (size - 1 + unit()) >>> shift; + + // if we require more than 64 slots, we cannot possibly accommodate the allocation + if (slotCount > 64) + return null; + + // convert the slotCount into the bits needed in the bitmap, but at the bottom of the register + long slotBits = -1L >>> (64 - slotCount); + + // in order that we always allocate page aligned results, we require that any allocation is "somewhat" aligned + // i.e. any single unit allocation can go anywhere; any 2 unit allocation must begin in one of the first 3 slots + // of a page; a 3 unit must go in the first two slots; and any four unit allocation must be fully page-aligned + + // to achieve this, we construct a searchMask that constrains the bits we find to those we permit starting + // a match from. as we find bits, we remove them from the mask to continue our search. + // this has an odd property when it comes to concurrent alloc/free, as we can safely skip backwards if + // a new slot is freed up, but we always make forward progress (i.e. never check the same bits twice), + // so running time is bounded + long searchMask = 0x1111111111111111L; + searchMask *= 15L >>> ((slotCount - 1) & 3); + // i.e. switch (slotCount & 3) + // case 1: searchMask = 0xFFFFFFFFFFFFFFFFL + // case 2: searchMask = 0x7777777777777777L + // case 3: searchMask = 0x3333333333333333L + // case 0: searchMask = 0x1111111111111111L + + // truncate the mask, removing bits that have too few slots proceeding them + searchMask &= -1L >>> (slotCount - 1); + + // this loop is very unroll friendly, and would achieve high ILP, but not clear if the compiler will exploit this. + // right now, not worth manually exploiting, but worth noting for future + while (true) + { + long cur = freeSlots; + // find the index of the lowest set bit that also occurs in our mask (i.e. is permitted alignment, and not yet searched) + // we take the index, rather than finding the lowest bit, since we must obtain it anyway, and shifting is more efficient + // than multiplication + int index = Long.numberOfTrailingZeros(cur & searchMask); + + // if no bit was actually found, we cannot serve this request, so return null. + // due to truncating the searchMask this immediately terminates any search when we run out of indexes + // that could accommodate the allocation, i.e. is equivalent to checking (64 - index) < slotCount + if (index == 64) + return null; + + // remove this bit from our searchMask, so we don't return here next round + searchMask ^= 1L << index; + // if our bits occur starting at the index, remove ourselves from the bitmask and return + long candidate = slotBits << index; + if ((candidate & cur) == candidate) + { + // here we are sure we will manage to CAS successfully without changing candidate because + // there is only one thread allocating at the moment, the concurrency is with the release + // operations only + while (true) + { + // clear the candidate bits (freeSlots &= ~candidate) + if (freeSlotsUpdater.compareAndSet(this, cur, cur & ~candidate)) + break; + + cur = freeSlots; + // make sure no other thread has cleared the candidate bits + assert ((candidate & cur) == candidate); + } + return get(index << shift, size); + } + } + } + + private ByteBuffer get(int offset, int size) + { + slab.limit(offset + size); + slab.position(offset); + + return setAttachment(slab.slice()); + } + + /** + * Round the size to the next unit multiple. + */ + int roundUp(int v) + { + return BufferPool.roundUp(v, unit()); + } + + /** + * Release a buffer. Return: + * 0L if the buffer must be recycled after the call; + * -1L if it is free (and so we should tryRecycle if owner is now null) + * some other value otherwise + **/ + long free(ByteBuffer buffer, boolean tryRelease) + { + if (!releaseAttachment(buffer)) + return 1L; + + long address = MemoryUtil.getAddress(buffer); + assert (address >= baseAddress) & (address <= baseAddress + capacity()); + + int position = (int)(address - baseAddress); + int size = roundUp(buffer.capacity()); + + position >>= shift; + int slotCount = size >> shift; + + long slotBits = (1L << slotCount) - 1; + long shiftedSlotBits = (slotBits << position); + + if (slotCount == 64) + { + assert size == capacity(); + assert position == 0; + shiftedSlotBits = -1L; + } + + long next; + while (true) + { + long cur = freeSlots; + next = cur | shiftedSlotBits; + assert next == (cur ^ shiftedSlotBits); // ensure no double free + if (tryRelease & (next == -1L)) + next = 0L; + if (freeSlotsUpdater.compareAndSet(this, cur, next)) + return next; + } + } + + @Override + public String toString() + { + return String.format("[slab %s, slots bitmap %s, capacity %d, free %d]", slab, Long.toBinaryString(freeSlots), capacity(), free()); + } + } + + @VisibleForTesting + public static int roundUpNormal(int size) + { + return roundUp(size, CHUNK_SIZE / 64); + } + + private static int roundUp(int size, int unit) + { + int mask = unit - 1; + return (size + mask) & ~mask; + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java index d2b2879..5671fa9 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java +++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java @@ -36,6 +36,7 @@ public abstract class MemoryUtil private static final long DIRECT_BYTE_BUFFER_CAPACITY_OFFSET; private static final long DIRECT_BYTE_BUFFER_LIMIT_OFFSET; private static final long DIRECT_BYTE_BUFFER_POSITION_OFFSET; + private static final long DIRECT_BYTE_BUFFER_ATTACHMENT_OFFSET; private static final Class<?> BYTE_BUFFER_CLASS; private static final long BYTE_BUFFER_OFFSET_OFFSET; private static final long BYTE_BUFFER_HB_OFFSET; @@ -62,6 +63,7 @@ public abstract class MemoryUtil DIRECT_BYTE_BUFFER_CAPACITY_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("capacity")); DIRECT_BYTE_BUFFER_LIMIT_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("limit")); DIRECT_BYTE_BUFFER_POSITION_OFFSET = unsafe.objectFieldOffset(Buffer.class.getDeclaredField("position")); + DIRECT_BYTE_BUFFER_ATTACHMENT_OFFSET = unsafe.objectFieldOffset(clazz.getDeclaredField("att")); DIRECT_BYTE_BUFFER_CLASS = clazz; clazz = ByteBuffer.allocate(0).getClass(); @@ -77,6 +79,17 @@ public abstract class MemoryUtil } } + public static int pageSize() + { + return unsafe.pageSize(); + } + + public static long getAddress(ByteBuffer buffer) + { + assert buffer.getClass() == DIRECT_BYTE_BUFFER_CLASS; + return unsafe.getLong(buffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET); + } + public static long allocate(long size) { return Native.malloc(size); @@ -177,24 +190,25 @@ public abstract class MemoryUtil unsafe.putInt(instance, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, length); } - public static ByteBuffer duplicateDirectByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer) + public static Object getAttachment(ByteBuffer instance) { - assert(source.isDirect()); - unsafe.putLong(hollowBuffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, unsafe.getLong(source, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET)); - unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET)); - unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET)); - unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET)); - return hollowBuffer; + assert instance.getClass() == DIRECT_BYTE_BUFFER_CLASS; + return unsafe.getObject(instance, DIRECT_BYTE_BUFFER_ATTACHMENT_OFFSET); + } + + public static void setAttachment(ByteBuffer instance, Object next) + { + assert instance.getClass() == DIRECT_BYTE_BUFFER_CLASS; + unsafe.putObject(instance, DIRECT_BYTE_BUFFER_ATTACHMENT_OFFSET, next); } - public static ByteBuffer duplicateByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer) + public static ByteBuffer duplicateDirectByteBuffer(ByteBuffer source, ByteBuffer hollowBuffer) { - assert(!source.isDirect()); + assert source.getClass() == DIRECT_BYTE_BUFFER_CLASS; + unsafe.putLong(hollowBuffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, unsafe.getLong(source, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET)); unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET)); unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET)); unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET)); - unsafe.putInt(hollowBuffer, BYTE_BUFFER_OFFSET_OFFSET, unsafe.getInt(source, BYTE_BUFFER_OFFSET_OFFSET)); - unsafe.putObject(hollowBuffer, BYTE_BUFFER_HB_OFFSET, unsafe.getObject(source, BYTE_BUFFER_HB_OFFSET)); return hollowBuffer; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java ---------------------------------------------------------------------- diff --git a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java new file mode 100644 index 0000000..17ac569 --- /dev/null +++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java @@ -0,0 +1,454 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils.memory; + +import java.nio.ByteBuffer; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.utils.DynamicList; + +import static org.junit.Assert.*; + +public class LongBufferPoolTest +{ + private static final Logger logger = LoggerFactory.getLogger(LongBufferPoolTest.class); + + @Test + public void testAllocate() throws InterruptedException, ExecutionException + { + testAllocate(Runtime.getRuntime().availableProcessors() * 2, TimeUnit.MINUTES.toNanos(2L), 16 << 20); + } + + private static final class BufferCheck + { + final ByteBuffer buffer; + final long val; + DynamicList.Node<BufferCheck> listnode; + + private BufferCheck(ByteBuffer buffer, long val) + { + this.buffer = buffer; + this.val = val; + } + + void validate() + { + ByteBuffer read = buffer.duplicate(); + while (read.remaining() > 8) + assert read.getLong() == val; + } + + void init() + { + ByteBuffer write = buffer.duplicate(); + while (write.remaining() > 8) + write.putLong(val); + } + } + + public void testAllocate(int threadCount, long duration, int poolSize) throws InterruptedException, ExecutionException + { + final int avgBufferSize = 16 << 10; + final int stdevBufferSize = 10 << 10; // picked to ensure exceeding buffer size is rare, but occurs + final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); + + System.out.println(String.format("%s - testing %d threads for %dm", + dateFormat.format(new Date()), + threadCount, + TimeUnit.NANOSECONDS.toMinutes(duration))); + + final long until = System.nanoTime() + duration; + final CountDownLatch latch = new CountDownLatch(threadCount); + final SPSCQueue<BufferCheck>[] sharedRecycle = new SPSCQueue[threadCount]; + final AtomicBoolean[] makingProgress = new AtomicBoolean[threadCount]; + for (int i = 0 ; i < sharedRecycle.length ; i++) + { + sharedRecycle[i] = new SPSCQueue<>(); + makingProgress[i] = new AtomicBoolean(true); + } + + ExecutorService executorService = Executors.newFixedThreadPool(threadCount + 2); + List<Future<Boolean>> ret = new ArrayList<>(threadCount); + long prevPoolSize = BufferPool.MEMORY_USAGE_THRESHOLD; + BufferPool.MEMORY_USAGE_THRESHOLD = poolSize; + BufferPool.DEBUG = true; + // sum(1..n) = n/2 * (n + 1); we set zero to CHUNK_SIZE, so have n=threadCount-1 + int targetSizeQuanta = ((threadCount) * (threadCount - 1)) / 2; + // fix targetSizeQuanta at 1/64th our poolSize, so that we only consciously exceed our pool size limit + targetSizeQuanta = (targetSizeQuanta * poolSize) / 64; + + { + // setup some high churn allocate/deallocate, without any checking + final SPSCQueue<ByteBuffer> burn = new SPSCQueue<>(); + final CountDownLatch doneAdd = new CountDownLatch(1); + executorService.submit(new TestUntil(until) + { + int count = 0; + void testOne() throws Exception + { + if (count * BufferPool.CHUNK_SIZE >= poolSize / 10) + { + if (burn.exhausted) + count = 0; + else + Thread.yield(); + return; + } + + ByteBuffer buffer = BufferPool.tryGet(BufferPool.CHUNK_SIZE); + if (buffer == null) + { + Thread.yield(); + return; + } + + BufferPool.put(buffer); + burn.add(buffer); + count++; + } + void cleanup() + { + doneAdd.countDown(); + } + }); + executorService.submit(new TestUntil(until) + { + void testOne() throws Exception + { + ByteBuffer buffer = burn.poll(); + if (buffer == null) + { + Thread.yield(); + return; + } + BufferPool.put(buffer); + } + void cleanup() + { + Uninterruptibles.awaitUninterruptibly(doneAdd); + } + }); + } + + for (int t = 0; t < threadCount; t++) + { + final int threadIdx = t; + final int targetSize = t == 0 ? BufferPool.CHUNK_SIZE : targetSizeQuanta * t; + + ret.add(executorService.submit(new TestUntil(until) + { + final SPSCQueue<BufferCheck> shareFrom = sharedRecycle[threadIdx]; + final DynamicList<BufferCheck> checks = new DynamicList<>((int) Math.max(1, targetSize / (1 << 10))); + final SPSCQueue<BufferCheck> shareTo = sharedRecycle[(threadIdx + 1) % threadCount]; + final ThreadLocalRandom rand = ThreadLocalRandom.current(); + int totalSize = 0; + int freeingSize = 0; + int size = 0; + + void checkpoint() + { + if (!makingProgress[threadIdx].get()) + makingProgress[threadIdx].set(true); + } + + void testOne() throws Exception + { + + long currentTargetSize = rand.nextInt(poolSize / 1024) == 0 ? 0 : targetSize; + int spinCount = 0; + while (totalSize > currentTargetSize - freeingSize) + { + // free buffers until we're below our target size + if (checks.size() == 0) + { + // if we're out of buffers to free, we're waiting on our neighbour to free them; + // first check if the consuming neighbour has caught up, and if so mark that free + if (shareTo.exhausted) + { + totalSize -= freeingSize; + freeingSize = 0; + } + else if (!recycleFromNeighbour()) + { + if (++spinCount > 1000 && System.nanoTime() > until) + return; + // otherwise, free one of our other neighbour's buffers if can; and otherwise yield + Thread.yield(); + } + continue; + } + + // pick a random buffer, with preference going to earlier ones + BufferCheck check = sample(); + checks.remove(check.listnode); + check.validate(); + + size = BufferPool.roundUpNormal(check.buffer.capacity()); + if (size > BufferPool.CHUNK_SIZE) + size = 0; + + // either share to free, or free immediately + if (rand.nextBoolean()) + { + shareTo.add(check); + freeingSize += size; + // interleave this with potentially messing with the other neighbour's stuff + recycleFromNeighbour(); + } + else + { + check.validate(); + BufferPool.put(check.buffer); + totalSize -= size; + } + } + + // allocate a new buffer + size = (int) Math.max(1, avgBufferSize + (stdevBufferSize * rand.nextGaussian())); + if (size <= BufferPool.CHUNK_SIZE) + { + totalSize += BufferPool.roundUpNormal(size); + allocate(size); + } + else if (rand.nextBoolean()) + { + allocate(size); + } + else + { + // perform a burst allocation to exhaust all available memory + while (totalSize < poolSize) + { + size = (int) Math.max(1, avgBufferSize + (stdevBufferSize * rand.nextGaussian())); + if (size <= BufferPool.CHUNK_SIZE) + { + allocate(size); + totalSize += BufferPool.roundUpNormal(size); + } + } + } + + // validate a random buffer we have stashed + checks.get(rand.nextInt(checks.size())).validate(); + + // free all of our neighbour's remaining shared buffers + while (recycleFromNeighbour()); + } + + void cleanup() + { + while (checks.size() > 0) + { + BufferCheck check = checks.get(0); + BufferPool.put(check.buffer); + checks.remove(check.listnode); + } + latch.countDown(); + } + + boolean recycleFromNeighbour() + { + BufferCheck check = shareFrom.poll(); + if (check == null) + return false; + check.validate(); + BufferPool.put(check.buffer); + return true; + } + + BufferCheck allocate(int size) + { + ByteBuffer buffer = BufferPool.get(size); + assertNotNull(buffer); + BufferCheck check = new BufferCheck(buffer, rand.nextLong()); + assertEquals(size, buffer.capacity()); + assertEquals(0, buffer.position()); + check.init(); + check.listnode = checks.append(check); + return check; + } + + BufferCheck sample() + { + // sample with preference to first elements: + // element at index n will be selected with likelihood (size - n) / sum1ToN(size) + int size = checks.size(); + + // pick a random number between 1 and sum1toN(size) + int sampleRange = sum1toN(size); + int sampleIndex = rand.nextInt(sampleRange); + + // then binary search for the N, such that [sum1ToN(N), sum1ToN(N+1)) contains this random number + int moveBy = Math.max(size / 4, 1); + int index = size / 2; + while (true) + { + int baseSampleIndex = sum1toN(index); + int endOfSampleIndex = sum1toN(index + 1); + if (sampleIndex >= baseSampleIndex) + { + if (sampleIndex < endOfSampleIndex) + break; + index += moveBy; + } + else index -= moveBy; + moveBy = Math.max(moveBy / 2, 1); + } + + // this gives us the inverse of our desired value, so just subtract it from the last index + index = size - (index + 1); + + return checks.get(index); + } + + private int sum1toN(int n) + { + return (n * (n + 1)) / 2; + } + })); + } + + boolean first = true; + while (!latch.await(10L, TimeUnit.SECONDS)) + { + if (!first) + BufferPool.assertAllRecycled(); + first = false; + for (AtomicBoolean progress : makingProgress) + { + assert progress.get(); + progress.set(false); + } + } + + for (SPSCQueue<BufferCheck> queue : sharedRecycle) + { + BufferCheck check; + while ( null != (check = queue.poll()) ) + { + check.validate(); + BufferPool.put(check.buffer); + } + } + + assertEquals(0, executorService.shutdownNow().size()); + + BufferPool.MEMORY_USAGE_THRESHOLD = prevPoolSize; + for (Future<Boolean> r : ret) + assertTrue(r.get()); + + System.out.println(String.format("%s - finished.", + dateFormat.format(new Date()))); + } + + static abstract class TestUntil implements Callable<Boolean> + { + final long until; + protected TestUntil(long until) + { + this.until = until; + } + + abstract void testOne() throws Exception; + void checkpoint() {} + void cleanup() {} + + public Boolean call() throws Exception + { + try + { + while (System.nanoTime() < until) + { + checkpoint(); + for (int i = 0 ; i < 100 ; i++) + testOne(); + } + } + catch (Exception ex) + { + logger.error("Got exception {}, current chunk {}", + ex.getMessage(), + BufferPool.currentChunk()); + ex.printStackTrace(); + return false; + } + finally + { + cleanup(); + } + return true; + } + } + + public static void main(String[] args) throws InterruptedException, ExecutionException + { + new LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(), TimeUnit.HOURS.toNanos(2L), 16 << 20); + } + + /** + * A single producer, single consumer queue. + */ + private static final class SPSCQueue<V> + { + static final class Node<V> + { + volatile Node<V> next; + final V value; + Node(V value) + { + this.value = value; + } + } + + private volatile boolean exhausted = true; + Node<V> head = new Node<>(null); + Node<V> tail = head; + + void add(V value) + { + exhausted = false; + tail = tail.next = new Node<>(value); + } + + V poll() + { + Node<V> next = head.next; + if (next == null) + { + // this is racey, but good enough for our purposes + exhausted = true; + return null; + } + head = next; + return next.value; + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/test/unit/org/apache/cassandra/io/compress/CompressorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java index 6018cc7..8ff0074 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java @@ -33,6 +33,7 @@ import org.junit.Test; import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.memory.BufferPool; public class CompressorTest { http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java index 0c1583d..3c9aa0e 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java @@ -18,8 +18,6 @@ * */ package org.apache.cassandra.io.util; - -import org.apache.cassandra.service.FileCacheService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.SyncUtil; @@ -30,10 +28,6 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.util.Arrays; import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.cassandra.Util.expectEOF; import static org.apache.cassandra.Util.expectException; @@ -48,6 +42,7 @@ public class BufferedRandomAccessFileTest public void testReadAndWrite() throws Exception { SequentialWriter w = createTempFile("braf"); + ChannelProxy channel = new ChannelProxy(w.getPath()); // writting string of data to the file byte[] data = "Hello".getBytes(); @@ -58,7 +53,7 @@ public class BufferedRandomAccessFileTest w.sync(); // reading small amount of data from file, this is handled by initial buffer - RandomAccessReader r = RandomAccessReader.open(w); + RandomAccessReader r = RandomAccessReader.open(channel); byte[] buffer = new byte[data.length]; assertEquals(data.length, r.read(buffer)); @@ -81,7 +76,7 @@ public class BufferedRandomAccessFileTest w.sync(); - r = RandomAccessReader.open(w); // re-opening file in read-only mode + r = RandomAccessReader.open(channel); // re-opening file in read-only mode // reading written buffer r.seek(initialPosition); // back to initial (before write) position @@ -130,6 +125,7 @@ public class BufferedRandomAccessFileTest w.finish(); r.close(); + channel.close(); } @Test @@ -142,7 +138,8 @@ public class BufferedRandomAccessFileTest byte[] in = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE); w.write(in); - RandomAccessReader r = RandomAccessReader.open(w); + ChannelProxy channel = new ChannelProxy(w.getPath()); + RandomAccessReader r = RandomAccessReader.open(channel); // Read it into a same size array. byte[] out = new byte[RandomAccessReader.DEFAULT_BUFFER_SIZE]; @@ -154,6 +151,7 @@ public class BufferedRandomAccessFileTest r.close(); w.finish(); + channel.close(); } @Test @@ -181,9 +179,11 @@ public class BufferedRandomAccessFileTest w.finish(); // will use cachedlength - RandomAccessReader r = RandomAccessReader.open(tmpFile); - assertEquals(lessThenBuffer.length + biggerThenBuffer.length, r.length()); - r.close(); + try (ChannelProxy channel = new ChannelProxy(tmpFile); + RandomAccessReader r = RandomAccessReader.open(channel)) + { + assertEquals(lessThenBuffer.length + biggerThenBuffer.length, r.length()); + } } @Test @@ -201,7 +201,8 @@ public class BufferedRandomAccessFileTest w.write(data); w.sync(); - final RandomAccessReader r = RandomAccessReader.open(w); + final ChannelProxy channel = new ChannelProxy(w.getPath()); + final RandomAccessReader r = RandomAccessReader.open(channel); ByteBuffer content = r.readBytes((int) r.length()); @@ -225,6 +226,7 @@ public class BufferedRandomAccessFileTest w.finish(); r.close(); + channel.close(); } @Test @@ -235,7 +237,8 @@ public class BufferedRandomAccessFileTest w.write(data); w.finish(); - final RandomAccessReader file = RandomAccessReader.open(w); + final ChannelProxy channel = new ChannelProxy(w.getPath()); + final RandomAccessReader file = RandomAccessReader.open(channel); file.seek(0); assertEquals(file.getFilePointer(), 0); @@ -265,6 +268,7 @@ public class BufferedRandomAccessFileTest }, IllegalArgumentException.class); // throws IllegalArgumentException file.close(); + channel.close(); } @Test @@ -274,7 +278,8 @@ public class BufferedRandomAccessFileTest w.write(generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE * 2)); w.finish(); - RandomAccessReader file = RandomAccessReader.open(w); + ChannelProxy channel = new ChannelProxy(w.getPath()); + RandomAccessReader file = RandomAccessReader.open(channel); file.seek(0); // back to the beginning of the file assertEquals(file.skipBytes(10), 10); @@ -294,6 +299,7 @@ public class BufferedRandomAccessFileTest assertEquals(file.bytesRemaining(), file.length()); file.close(); + channel.close(); } @Test @@ -308,7 +314,8 @@ public class BufferedRandomAccessFileTest w.sync(); - RandomAccessReader r = RandomAccessReader.open(w); + ChannelProxy channel = new ChannelProxy(w.getPath()); + RandomAccessReader r = RandomAccessReader.open(channel); // position should change after skip bytes r.seek(0); @@ -322,6 +329,7 @@ public class BufferedRandomAccessFileTest w.finish(); r.close(); + channel.close(); } @Test @@ -344,7 +352,7 @@ public class BufferedRandomAccessFileTest { File file1 = writeTemporaryFile(new byte[16]); try (final ChannelProxy channel = new ChannelProxy(file1); - final RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, null)) + final RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L)) { expectEOF(new Callable<Object>() { @@ -362,7 +370,7 @@ public class BufferedRandomAccessFileTest { File file1 = writeTemporaryFile(new byte[16]); try (final ChannelProxy channel = new ChannelProxy(file1); - final RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, null)) + final RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L)) { expectEOF(new Callable<Object>() { @@ -397,7 +405,8 @@ public class BufferedRandomAccessFileTest w.sync(); - RandomAccessReader r = RandomAccessReader.open(w); + ChannelProxy channel = new ChannelProxy(w.getPath()); + RandomAccessReader r = RandomAccessReader.open(channel); assertEquals(r.bytesRemaining(), toWrite); @@ -413,6 +422,7 @@ public class BufferedRandomAccessFileTest w.finish(); r.close(); + channel.close(); } @Test @@ -483,7 +493,8 @@ public class BufferedRandomAccessFileTest w.finish(); - RandomAccessReader file = RandomAccessReader.open(w); + ChannelProxy channel = new ChannelProxy(w.getPath()); + RandomAccessReader file = RandomAccessReader.open(channel); file.seek(10); FileMark mark = file.mark(); @@ -508,6 +519,7 @@ public class BufferedRandomAccessFileTest assertEquals(file.bytesPastMark(), 0); file.close(); + channel.close(); } @Test (expected = AssertionError.class) @@ -518,7 +530,8 @@ public class BufferedRandomAccessFileTest w.write(new byte[30]); w.flush(); - try (RandomAccessReader r = RandomAccessReader.open(w)) + try (ChannelProxy channel = new ChannelProxy(w.getPath()); + RandomAccessReader r = RandomAccessReader.open(channel)) { r.seek(10); r.mark(); @@ -530,71 +543,6 @@ public class BufferedRandomAccessFileTest } @Test - public void testFileCacheService() throws IOException, InterruptedException - { - //see https://issues.apache.org/jira/browse/CASSANDRA-7756 - - final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey(); - final int THREAD_COUNT = 40; - ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT); - - SequentialWriter w1 = createTempFile("fscache1"); - SequentialWriter w2 = createTempFile("fscache2"); - - w1.write(new byte[30]); - w1.finish(); - - w2.write(new byte[30]); - w2.finish(); - - for (int i = 0; i < 20; i++) - { - - - RandomAccessReader r1 = RandomAccessReader.open(w1); - RandomAccessReader r2 = RandomAccessReader.open(w2); - - - FileCacheService.instance.put(cacheKey, r1); - FileCacheService.instance.put(cacheKey, r2); - - final CountDownLatch finished = new CountDownLatch(THREAD_COUNT); - final AtomicBoolean hadError = new AtomicBoolean(false); - - for (int k = 0; k < THREAD_COUNT; k++) - { - executorService.execute( new Runnable() - { - @Override - public void run() - { - try - { - long size = FileCacheService.instance.sizeInBytes(); - - while (size > 0) - size = FileCacheService.instance.sizeInBytes(); - } - catch (Throwable t) - { - t.printStackTrace(); - hadError.set(true); - } - finally - { - finished.countDown(); - } - } - }); - - } - - finished.await(); - assert !hadError.get(); - } - } - - @Test public void testReadOnly() throws IOException { SequentialWriter file = createTempFile("brafReadOnlyTest");