http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/src/java/org/apache/cassandra/utils/memory/BufferPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/BufferPool.java 
b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
new file mode 100644
index 0000000..0731301
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/BufferPool.java
@@ -0,0 +1,858 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.memory;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.BufferPoolMetrics;
+import org.apache.cassandra.utils.concurrent.Ref;
+
+/**
+ * A pool of ByteBuffers that can be recycled.
+ */
+public class BufferPool
+{
+    /** The size of a page aligned buffer, 64 KiB (65536 bytes) */
+    static final int CHUNK_SIZE = 64 << 10;
+
+    @VisibleForTesting
+    public static long MEMORY_USAGE_THRESHOLD = 
DatabaseDescriptor.getFileCacheSizeInMB() * 1024L * 1024L;
+
+    @VisibleForTesting
+    public static boolean ALLOCATE_ON_HEAP_WHEN_EXAHUSTED = 
DatabaseDescriptor.getBufferPoolUseHeapIfExhausted();
+
+    @VisibleForTesting
+    public static boolean DISABLED = 
Boolean.parseBoolean(System.getProperty("cassandra.test.disable_buffer_pool", 
"false"));
+
+    @VisibleForTesting
+    public static boolean DEBUG = false;
+
+    private static final Logger logger = 
LoggerFactory.getLogger(BufferPool.class);
+    private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 15L, TimeUnit.MINUTES);
+    private static final ByteBuffer EMPTY_BUFFER = 
ByteBuffer.allocateDirect(0);
+
+    /** A global pool of chunks (page aligned buffers) */
+    private static final GlobalPool globalPool = new GlobalPool();
+
+    /** A thread local pool of chunks, where chunks come from the global pool 
*/
+    private static final ThreadLocal<LocalPool> localPool = new 
ThreadLocal<LocalPool>() {
+        @Override
+        protected LocalPool initialValue()
+        {
+            return new LocalPool();
+        }
+    };
+
+    /**
+     * Return a buffer of the requested size.
+     *
+     * When the pool is disabled the buffer is allocated directly; otherwise it is taken from
+     * the pool, falling back to a fresh allocation if the pool cannot serve the request.
+     * In both cases ALLOCATE_ON_HEAP_WHEN_EXAHUSTED decides whether a non-pooled buffer is on heap.
+     *
+     * @param size the number of bytes required
+     * @return a buffer of the requested size
+     */
+    public static ByteBuffer get(int size)
+    {
+        return DISABLED
+               ? allocate(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED)
+               : takeFromPool(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED);
+    }
+
+    /**
+     * Return a buffer of the requested size and type.
+     *
+     * Only off-heap requests are served from the pool; on-heap requests, and every request made
+     * while the pool is disabled, are allocated directly.
+     *
+     * @param size the number of bytes required
+     * @param bufferType whether the buffer must be on or off heap
+     */
+    public static ByteBuffer get(int size, BufferType bufferType)
+    {
+        boolean onHeap = bufferType != BufferType.OFF_HEAP;
+        if (DISABLED || onHeap)
+            return allocate(size, onHeap);
+
+        // onHeap is necessarily false here, so an exhausted pool falls back to a direct buffer
+        return takeFromPool(size, onHeap);
+    }
+
+    /**
+     * Like {@link #get(int)}, except that when the pool is enabled and cannot serve a request
+     * that fits in a chunk, null is returned instead of falling back to a fresh allocation.
+     *
+     * @param size the number of bytes required
+     * @return a buffer of the requested size, or null if the pool is exhausted
+     */
+    public static ByteBuffer tryGet(int size)
+    {
+        return DISABLED
+               ? allocate(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED)
+               : maybeTakeFromPool(size, ALLOCATE_ON_HEAP_WHEN_EXAHUSTED);
+    }
+
+    /** Allocate a brand new, non-pooled buffer, either on heap or as a direct buffer. */
+    private static ByteBuffer allocate(int size, boolean onHeap)
+    {
+        if (onHeap)
+            return ByteBuffer.allocate(size);
+
+        return ByteBuffer.allocateDirect(size);
+    }
+
+    /**
+     * Take a buffer from the pool, allocating a fresh non-pooled buffer (and recording a miss)
+     * when the pool cannot serve the request.
+     */
+    private static ByteBuffer takeFromPool(int size, boolean allocateOnHeapWhenExhausted)
+    {
+        ByteBuffer pooled = maybeTakeFromPool(size, allocateOnHeapWhenExhausted);
+        if (pooled == null)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Requested buffer size {} has been allocated directly due to lack of capacity", size);
+
+            pooled = localPool.get().allocate(size, allocateOnHeapWhenExhausted);
+        }
+        return pooled;
+    }
+
+    /**
+     * Try to serve a request from the calling thread's local pool.
+     *
+     * @param size the number of bytes required, must not be negative
+     * @param allocateOnHeapWhenExhausted whether a request too large for the pool is allocated on heap
+     * @return the shared empty buffer when size is zero; a freshly allocated buffer when size exceeds
+     *         CHUNK_SIZE; otherwise a pooled buffer, or null if the pool cannot currently serve the request
+     * @throws IllegalArgumentException if size is negative
+     */
+    private static ByteBuffer maybeTakeFromPool(int size, boolean allocateOnHeapWhenExhausted)
+    {
+        // zero is a legal request (served by EMPTY_BUFFER below), so only negative sizes are rejected
+        if (size < 0)
+            throw new IllegalArgumentException("Size must be non-negative (" + size + ")");
+
+        if (size == 0)
+            return EMPTY_BUFFER;
+
+        // a chunk can never hold more than CHUNK_SIZE bytes, so larger requests bypass the pool
+        if (size > CHUNK_SIZE)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Requested buffer size {} is bigger than {}, allocating directly", size, CHUNK_SIZE);
+
+            return localPool.get().allocate(size, allocateOnHeapWhenExhausted);
+        }
+
+        return localPool.get().get(size);
+    }
+
+    /**
+     * Hand a buffer back to the pool. Nothing happens when the pool is disabled or when the
+     * buffer is array backed.
+     */
+    public static void put(ByteBuffer buffer)
+    {
+        // evaluated unconditionally, so a null buffer is rejected even when the pool is disabled
+        boolean arrayBacked = buffer.hasArray();
+        if (DISABLED || arrayBacked)
+            return;
+
+        localPool.get().put(buffer);
+    }
+
+    /**
+     * Reset the calling thread's local pool and then the global pool.
+     * This is not thread safe and should only be used for unit testing.
+     */
+    @VisibleForTesting
+    static void reset()
+    {
+        LocalPool pool = localPool.get();
+        pool.reset();
+        globalPool.reset();
+    }
+
+    /** Return the first chunk of the calling thread's local pool, or null if it has none. */
+    @VisibleForTesting
+    static Chunk currentChunk()
+    {
+        Chunk[] chunks = localPool.get().chunks;
+        return chunks[0];
+    }
+
+    /** Count the chunks currently held by the calling thread's local pool. */
+    @VisibleForTesting
+    static int numChunks()
+    {
+        Chunk[] chunks = localPool.get().chunks;
+        int count = 0;
+        for (int i = 0; i < chunks.length; i++)
+            count += chunks[i] == null ? 0 : 1;
+        return count;
+    }
+
+    /** Run the global pool's debug check over all registered chunks. */
+    @VisibleForTesting
+    static void assertAllRecycled()
+    {
+        Debug debug = globalPool.debug;
+        debug.check();
+    }
+
+    /** Return the number of bytes the global pool has reserved for macro chunks. */
+    public static long sizeInBytes()
+    {
+        long reserved = globalPool.sizeInBytes();
+        return reserved;
+    }
+
+    /**
+     * Bookkeeping used to verify that every registered chunk has been
+     * recycled since the previous check.
+     */
+    static final class Debug
+    {
+        long recycleRound = 1;
+        final Queue<Chunk> allChunks = new ConcurrentLinkedQueue<>();
+
+        /** Remember a chunk so that later checks cover it. */
+        void register(Chunk chunk)
+        {
+            allChunks.add(chunk);
+        }
+
+        /** Stamp the chunk with the current round. */
+        void recycle(Chunk chunk)
+        {
+            chunk.lastRecycled = recycleRound;
+        }
+
+        /** Assert that every registered chunk carries the current round's stamp, then start a new round. */
+        void check()
+        {
+            long expected = recycleRound;
+            for (Iterator<Chunk> it = allChunks.iterator(); it.hasNext(); )
+            {
+                // advance outside the assert so the loop still terminates when assertions are disabled
+                Chunk chunk = it.next();
+                assert chunk.lastRecycled == expected;
+            }
+            recycleRound = expected + 1;
+        }
+    }
+
+    /**
+     * A queue of page aligned buffers, the chunks, which have been sliced 
from bigger chunks,
+     * the macro-chunks, also page aligned. Macro-chunks are allocated as long 
as we have not exceeded the
+     * memory maximum threshold, MEMORY_USAGE_THRESHOLD and are never released.
+     *
+     * This class is shared by multiple thread local pools and must be 
thread-safe.
+     */
+    static final class GlobalPool
+    {
+        /** The size of a bigger chunk, 1 MiB, must be a multiple of CHUNK_SIZE */
+        static final int MACRO_CHUNK_SIZE = 1 << 20;
+
+        static
+        {
+            // both sizes must be powers of two, and a macro chunk must hold a whole number of chunks
+            assert Integer.bitCount(CHUNK_SIZE) == 1;
+            assert Integer.bitCount(MACRO_CHUNK_SIZE) == 1;
+            assert MACRO_CHUNK_SIZE % CHUNK_SIZE == 0;
+
+            // report the configuration once, when the class is initialised
+            String fallback = ALLOCATE_ON_HEAP_WHEN_EXAHUSTED ? "on heap" : "off heap";
+            if (!DISABLED)
+                logger.info("Global buffer pool is enabled, when pool is exahusted (max is {} mb) it will allocate {}",
+                            MEMORY_USAGE_THRESHOLD / (1024L * 1024L),
+                            fallback);
+            else
+                logger.info("Global buffer pool is disabled, allocating {}", fallback);
+        }
+
+        private final Debug debug = new Debug();
+        private final Queue<Chunk> macroChunks = new ConcurrentLinkedQueue<>();
+        // TODO (future): it would be preferable to use a CLStack to improve 
cache occupancy; it would also be preferable to use "CoreLocal" storage
+        private final Queue<Chunk> chunks = new ConcurrentLinkedQueue<>();
+        private final AtomicLong memoryUsage = new AtomicLong();
+
+        /**
+         * Return a chunk, the caller will take ownership of it. More chunks are allocated on demand;
+         * null is returned when the queue is empty and no more memory may be allocated.
+         */
+        public Chunk get()
+        {
+            for (;;)
+            {
+                Chunk chunk = chunks.poll();
+                if (chunk != null)
+                    return chunk;
+
+                if (allocateMoreChunks())
+                    continue;
+
+                // give it one last attempt, in case someone else allocated before us
+                return chunks.poll();
+            }
+        }
+
+        /**
+         * Reserve and allocate one more macro chunk, slicing it into chunks that are added to the queue.
+         *
+         * This method might be called by multiple threads and that's fine if we add more
+         * than one chunk at the same time as long as we don't exceed the MEMORY_USAGE_THRESHOLD.
+         *
+         * @return true if a macro chunk was allocated, false if doing so would exceed MEMORY_USAGE_THRESHOLD
+         */
+        private boolean allocateMoreChunks()
+        {
+            // reserve the memory first, so that concurrent callers can never overshoot the threshold
+            while (true)
+            {
+                long cur = memoryUsage.get();
+                if (cur + MACRO_CHUNK_SIZE > MEMORY_USAGE_THRESHOLD)
+                {
+                    noSpamLogger.info("Maximum memory usage reached ({} bytes), cannot allocate chunk of {} bytes",
+                                      MEMORY_USAGE_THRESHOLD, MACRO_CHUNK_SIZE);
+                    return false;
+                }
+                if (memoryUsage.compareAndSet(cur, cur + MACRO_CHUNK_SIZE))
+                    break;
+            }
+
+            // allocate a large chunk
+            Chunk chunk;
+            try
+            {
+                chunk = new Chunk(allocateDirectAligned(MACRO_CHUNK_SIZE));
+            }
+            catch (RuntimeException | Error e)
+            {
+                // the reservation was never backed by memory (e.g. direct memory is exhausted):
+                // give it back so the accounting stays accurate and a later attempt may succeed
+                memoryUsage.addAndGet(-MACRO_CHUNK_SIZE);
+                throw e;
+            }
+
+            // the macro chunk is never owned by a local pool
+            chunk.acquire(null);
+            macroChunks.add(chunk);
+            for (int i = 0 ; i < MACRO_CHUNK_SIZE ; i += CHUNK_SIZE)
+            {
+                Chunk add = new Chunk(chunk.get(CHUNK_SIZE));
+                chunks.add(add);
+                if (DEBUG)
+                    debug.register(add);
+            }
+
+            return true;
+        }
+
+        /** Make a chunk available again to every local pool. */
+        public void recycle(Chunk chunk)
+        {
+            // for a ConcurrentLinkedQueue, offer is what add delegates to
+            chunks.offer(chunk);
+        }
+
+        /** Return the number of bytes reserved for macro chunks so far. */
+        public long sizeInBytes()
+        {
+            return memoryUsage.longValue();
+        }
+
+        /**
+         * Drain and reset every pooled chunk and every macro chunk, then zero the memory usage.
+         * This is not thread safe and should only be used for unit testing.
+         */
+        @VisibleForTesting
+        void reset()
+        {
+            for (Chunk chunk = chunks.poll(); chunk != null; chunk = chunks.poll())
+                chunk.reset();
+
+            for (Chunk macro = macroChunks.poll(); macro != null; macro = macroChunks.poll())
+                macro.reset();
+
+            memoryUsage.set(0);
+        }
+    }
+
+    /**
+     * A thread local class that grabs chunks from the global pool for this 
thread allocations.
+     * Only one thread can do the allocations but multiple threads can release 
the allocations.
+     */
+    static final class LocalPool
+    {
+        private final static BufferPoolMetrics metrics = new 
BufferPoolMetrics();
+        // a microqueue of Chunks:
+        //  * if any are null, they are at the end;
+        //  * new Chunks are added to the last null index
+        //  * if no null indexes available, the smallest is swapped with the 
last index, and this replaced
+        //  * this results in a queue that will typically be visited in 
ascending order of available space, so that
+        //    small allocations preferentially slice from the Chunks with the 
smallest space available to furnish them
+        // WARNING: if we ever change the size of this, we must update 
removeFromLocalQueue, and addChunk
+        private final Chunk[] chunks = new Chunk[3];
+        private byte chunkCount = 0;
+
+        /** Create an empty local pool and register a phantom reference to it in localPoolReferences. */
+        public LocalPool()
+        {
+            LocalPoolRef ref = new LocalPoolRef(this, localPoolRefQueue);
+            localPoolReferences.add(ref);
+        }
+
+        /** Fetch a chunk from the global pool and add it to this pool; returns null if none is available. */
+        private Chunk addChunkFromGlobalPool()
+        {
+            Chunk chunk = globalPool.get();
+            if (chunk != null)
+                addChunk(chunk);
+
+            return chunk;
+        }
+
+        /**
+         * Take ownership of a chunk and add it to the micro-queue. When the queue is already full,
+         * the chunk with the least free space is released and the new chunk takes the last slot.
+         */
+        private void addChunk(Chunk chunk)
+        {
+            chunk.acquire(this);
+
+            if (chunkCount < 3)
+            {
+                chunks[chunkCount] = chunk;
+                chunkCount++;
+                return;
+            }
+
+            // the queue is full: find the chunk with the least free space
+            int evict = 0;
+            for (int i = 1; i <= 2; i++)
+            {
+                if (chunks[i].free() < chunks[evict].free())
+                    evict = i;
+            }
+
+            chunks[evict].release();
+            // close the gap with the last chunk, then append the new one
+            if (evict != 2)
+                chunks[evict] = chunks[2];
+            chunks[2] = chunk;
+        }
+
+        /**
+         * Return a buffer of the given size sliced from one of this pool's chunks, fetching a
+         * new chunk from the global pool if necessary; null if no chunk can serve the request.
+         */
+        public ByteBuffer get(int size)
+        {
+            // first see if one of our own chunks can serve this buffer; nulls only occur at the end
+            for (int i = 0; i < chunks.length; i++)
+            {
+                Chunk local = chunks[i];
+                if (local == null)
+                    break;
+
+                ByteBuffer buffer = local.get(size);
+                if (buffer != null)
+                    return buffer;
+            }
+
+            // otherwise ask the global pool for another chunk
+            Chunk fresh = addChunkFromGlobalPool();
+            return fresh == null ? null : fresh.get(size);
+        }
+
+        /** Record a pool miss and allocate a non-pooled buffer. */
+        private ByteBuffer allocate(int size, boolean onHeap)
+        {
+            metrics.misses.mark();
+            ByteBuffer fresh = BufferPool.allocate(size, onHeap);
+            return fresh;
+        }
+
+        /**
+         * Return a buffer to the chunk it was sliced from, recycling the chunk when this call
+         * turns out to be responsible for doing so.
+         *
+         * A buffer without a parent chunk is simply cleaned.
+         *
+         * @param buffer the buffer to release
+         */
+        public void put(ByteBuffer buffer)
+        {
+            Chunk chunk = Chunk.getParentChunk(buffer);
+            // no parent chunk: the buffer was not sliced from a chunk, so just clean it
+            if (chunk == null)
+            {
+                FileUtils.clean(buffer);
+                return;
+            }
+
+            // read the (volatile) owner once; it is re-read below to detect a racing release
+            LocalPool owner = chunk.owner;
+            // ask the free method to take exclusive ownership of the act of 
recycling
+            // if we are either: already not owned by anyone, or owned by 
ourselves
+            long free = chunk.free(buffer, owner == null | owner == this);
+            if (free == 0L)
+            {
+                // 0L => we own recycling responsibility, so must recycle;
+                chunk.recycle();
+                // if we are also the owner, we must remove the Chunk from our 
local queue
+                if (owner == this)
+                    removeFromLocalQueue(chunk);
+            }
+            else if (((free == -1L) & owner != this) && chunk.owner == null)
+            {
+                // although we try to take recycle ownership cheaply, it is 
not always possible to do so if the owner is racing to unset.
+                // we must also check after completely freeing if the owner 
has since been unset, and try to recycle
+                chunk.tryRecycle();
+            }
+        }
+
+        /**
+         * Remove a chunk from the micro-queue, shifting later entries back so that
+         * any null slot stays at the end.
+         */
+        private void removeFromLocalQueue(Chunk chunk)
+        {
+            // the queue only ever has three slots, so locate the chunk directly
+            int idx = chunks[0] == chunk ? 0
+                    : chunks[1] == chunk ? 1
+                    : 2;
+            assert idx < 2 || chunks[2] == chunk;
+
+            // shift the following entries back by one
+            for (int i = idx; i < 2; i++)
+                chunks[i] = chunks[i + 1];
+
+            // whatever we do, the last element must be null
+            chunks[2] = null;
+            chunkCount--;
+        }
+
+        /** Forcibly recycle every chunk held by this pool and empty the micro-queue. */
+        @VisibleForTesting
+        void reset()
+        {
+            chunkCount = 0;
+            for (int i = 0; i < chunks.length; i++)
+            {
+                Chunk chunk = chunks[i];
+                if (chunk == null)
+                    continue;
+
+                // recycle() asserts that the mask is zero, so drop ownership and zero the mask first
+                chunk.owner = null;
+                chunk.freeSlots = 0L;
+                chunk.recycle();
+                chunks[i] = null;
+            }
+        }
+    }
+
+    /**
+     * A phantom reference to a LocalPool that also holds the pool's chunk array,
+     * so that the chunks can be released through the reference.
+     */
+    private static final class LocalPoolRef extends PhantomReference<LocalPool>
+    {
+        private final Chunk[] chunks;
+
+        public LocalPoolRef(LocalPool localPool, ReferenceQueue<? super LocalPool> q)
+        {
+            super(localPool, q);
+            this.chunks = localPool.chunks;
+        }
+
+        /** Release every remaining chunk and clear its slot. */
+        public void release()
+        {
+            for (int i = 0; i < chunks.length; i++)
+            {
+                Chunk chunk = chunks[i];
+                if (chunk == null)
+                    continue;
+
+                chunk.release();
+                chunks[i] = null;
+            }
+        }
+    }
+
+    private static final ConcurrentLinkedQueue<LocalPoolRef> 
localPoolReferences = new ConcurrentLinkedQueue<>();
+
+    private static final ReferenceQueue<Object> localPoolRefQueue = new 
ReferenceQueue<>();
+    private static final ExecutorService EXEC = 
Executors.newFixedThreadPool(1, new NamedThreadFactory("LocalPool-Cleaner"));
+    // Start the cleaner task: it takes LocalPoolRef instances off localPoolRefQueue
+    // and releases the chunks each one still holds.
+    static
+    {
+        EXEC.execute(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    while (true)
+                    {
+                        // blocks until a reference has been enqueued
+                        Object obj = localPoolRefQueue.remove();
+                        if (obj instanceof LocalPoolRef)
+                        {
+                            ((LocalPoolRef) obj).release();
+                            localPoolReferences.remove(obj);
+                        }
+                    }
+                }
+                catch (InterruptedException e)
+                {
+                    // nothing to do here: the finally block below resubmits this task
+                }
+                finally
+                {
+                    // runs however the loop was left, so the task is submitted again each time it exits
+                    EXEC.execute(this);
+                }
+            }
+        });
+    }
+
+    /**
+     * Allocate a direct buffer of the given capacity whose start address is aligned to the page size.
+     *
+     * @param capacity the capacity of the returned buffer in bytes
+     * @return a slice of a larger direct allocation, starting on a page boundary
+     * @throws IllegalArgumentException if the page size is not a power of two
+     */
+    private static ByteBuffer allocateDirectAligned(int capacity)
+    {
+        final int pageSize = MemoryUtil.pageSize();
+        if (Integer.bitCount(pageSize) != 1)
+            throw new IllegalArgumentException("Alignment must be a power of 2");
+
+        // over-allocate by one page so that an aligned window of the requested capacity always fits
+        ByteBuffer raw = ByteBuffer.allocateDirect(capacity + pageSize);
+        long misalignment = MemoryUtil.getAddress(raw) & (pageSize - 1); // i.e. address % pageSize
+
+        // distance from the start of the allocation to the first page boundary (zero if already aligned)
+        int start = misalignment == 0 ? 0 : (int) (pageSize - misalignment);
+        raw.position(start);
+        raw.limit(start + capacity);
+
+        return raw.slice();
+    }
+
+    /**
+     * A memory chunk: it takes a buffer (the slab) and slices it
+     * into smaller buffers when requested.
+     *
+     * It divides the slab into 64 units and keeps a long mask, freeSlots,
+     * indicating if a unit is in use or not. Each bit in freeSlots corresponds
+     * to a unit, if the bit is set then the unit is free (available for 
allocation)
+     * whilst if it is not set then the unit is in use.
+     *
+     * When we receive a request of a given size we round up the size to the 
nearest
+     * multiple of allocation units required. Then we search for n consecutive 
free units,
+     * where n is the number of units required. We also align to page 
boundaries.
+     *
+     * When we receive a release request we work out the position by comparing the buffer
+     * address to our base address and we simply release the units.
+     */
+    final static class Chunk
+    {
+        private final ByteBuffer slab;
+        private final long baseAddress;
+        private final int shift;
+
+        private volatile long freeSlots;
+        private static final AtomicLongFieldUpdater<Chunk> freeSlotsUpdater = 
AtomicLongFieldUpdater.newUpdater(Chunk.class, "freeSlots");
+
+        // the pool that is _currently allocating_ from this Chunk
+        // if this is set, it means the chunk may not be recycled because we 
may still allocate from it;
+        // if it has been unset the local pool has finished with it, and it 
may be recycled
+        private volatile LocalPool owner;
+        private long lastRecycled;
+        private final Chunk original;
+
+        /**
+         * Create a fresh, fully free and unowned chunk over the slab of a chunk that is being recycled.
+         * The chunk being recycled must have its free slots mask set to zero.
+         */
+        Chunk(Chunk recycle)
+        {
+            assert recycle.freeSlots == 0L;
+
+            // share the slab and its geometry with the chunk being recycled
+            this.original = recycle.original;
+            this.shift = recycle.shift;
+            this.baseAddress = recycle.baseAddress;
+            this.slab = recycle.slab;
+            // -1 means every unit is free
+            this.freeSlots = -1L;
+
+            if (DEBUG)
+                globalPool.debug.recycle(original);
+        }
+
+        /**
+         * Create a chunk over the given slab, dividing it into 64 units.
+         *
+         * @param slab the buffer to slice from; it must not be array backed
+         */
+        Chunk(ByteBuffer slab)
+        {
+            assert !slab.hasArray();
+            int capacity = slab.capacity();
+
+            this.slab = slab;
+            this.baseAddress = MemoryUtil.getAddress(slab);
+
+            // The number of bits by which we need to shift to obtain a unit
+            // "31 &" is because numberOfTrailingZeros returns 32 when the capacity is zero
+            this.shift = 31 & Integer.numberOfTrailingZeros(capacity / 64);
+
+            // -1 means all free whilst 0 means all in use
+            if (capacity == 0)
+                this.freeSlots = 0L;
+            else
+                this.freeSlots = -1L;
+
+            this.original = DEBUG ? this : null;
+        }
+
+        /**
+         * Acquire the chunk for future allocations by recording its owner.
+         * The chunk must not currently have an owner.
+         *
+         * @param owner the local pool that will allocate from this chunk, may be null
+         */
+        void acquire(LocalPool owner)
+        {
+            assert this.owner == null;
+            this.owner = owner;
+        }
+
+        /**
+         * Set the owner to null and return the chunk to the global pool if 
the chunk is fully free.
+         * This method must be called by the LocalPool when it is certain that
+         * the local pool shall never try to allocate any more buffers from 
this chunk.
+         */
+        void release()
+        {
+            // the owner must be cleared first: tryRecycle() asserts that the chunk has no owner
+            this.owner = null;
+            tryRecycle();
+        }
+
+        /**
+         * Recycle the chunk if it is entirely free. The mask is swapped from -1 to 0 atomically,
+         * so only the caller whose swap succeeds performs the recycling.
+         */
+        void tryRecycle()
+        {
+            assert owner == null;
+            if (!isFree())
+                return;
+
+            if (freeSlotsUpdater.compareAndSet(this, -1L, 0L))
+                recycle();
+        }
+
+        /** Hand a fresh chunk over this chunk's slab to the global pool; the mask must already be zero. */
+        void recycle()
+        {
+            assert freeSlots == 0L;
+            Chunk replacement = new Chunk(this);
+            globalPool.recycle(replacement);
+        }
+
+        /**
+         * We stash the chunk in the attachment of a buffer
+         * that was returned by get(), this method simply
+         * retrieves the chunk that sliced a buffer, if any.
+         *
+         * @return the chunk found in the buffer's attachment (directly or wrapped in a Ref), else null
+         */
+        static Chunk getParentChunk(ByteBuffer buffer)
+        {
+            Object attachment = MemoryUtil.getAttachment(buffer);
+
+            Chunk parent = null;
+            if (attachment instanceof Chunk)
+                parent = (Chunk) attachment;
+            else if (attachment instanceof Ref)
+                parent = ((Ref<Chunk>) attachment).get();
+
+            return parent;
+        }
+
+        /**
+         * Attach this chunk to the buffer, wrapped in a Ref when Ref.DEBUG_ENABLED is set.
+         *
+         * @return the same buffer, for chaining
+         */
+        ByteBuffer setAttachment(ByteBuffer buffer)
+        {
+            if (!Ref.DEBUG_ENABLED)
+                MemoryUtil.setAttachment(buffer, this);
+            else
+                MemoryUtil.setAttachment(buffer, new Ref<>(this, null));
+
+            return buffer;
+        }
+
+        /**
+         * Release the buffer's attachment if it is a Ref.
+         *
+         * @return false if the buffer has no attachment at all, true otherwise
+         */
+        boolean releaseAttachment(ByteBuffer buffer)
+        {
+            Object attachment = MemoryUtil.getAttachment(buffer);
+
+            // instanceof is false for null, so a missing attachment falls straight through
+            if (attachment instanceof Ref)
+                ((Ref<Chunk>) attachment).release();
+
+            return attachment != null;
+        }
+
+        /** Give the slab back to its parent chunk if it has one, otherwise clean it. */
+        @VisibleForTesting
+        void reset()
+        {
+            Chunk parent = getParentChunk(slab);
+            if (parent == null)
+            {
+                FileUtils.clean(slab);
+                return;
+            }
+
+            parent.free(slab, false);
+        }
+
+        /** Overwrite the free slots mask and return its previous value; the two steps are not atomic. */
+        @VisibleForTesting
+        long setFreeSlots(long val)
+        {
+            long previous = freeSlots;
+            freeSlots = val;
+            return previous;
+        }
+
+        /** The number of bytes covered by the 64 units of this chunk. */
+        int capacity()
+        {
+            // 64 units, i.e. the unit size shifted left by log2(64)
+            return unit() << 6;
+        }
+
+        /** The size in bytes of one allocation unit, i.e. of one bit in the free slots mask. */
+        final int unit()
+        {
+            return 1 << shift;
+        }
+
+        /** True if every unit is free, i.e. every bit of the free slots mask is set. */
+        final boolean isFree()
+        {
+            return ~freeSlots == 0L;
+        }
+
+        /** The total free size in bytes: the number of free units times the unit size. */
+        int free()
+        {
+            // multiplying by the unit size is a left shift, since the unit is 1 << shift
+            return Long.bitCount(freeSlots) << shift;
+        }
+
+        /**
+         * Return the next available slice of this size. If
+         * we have exceeded the capacity we return null.
+         *
+         * @param size the number of bytes requested
+         * @return a slice of the slab with this chunk attached, or null if the request needs more
+         *         than 64 units or no permitted run of free units was found
+         */
+        ByteBuffer get(int size)
+        {
+            // how many multiples of our units is the size?
+            // we add (unit - 1), so that when we divide by unit (>>> shift), 
we effectively round up
+            int slotCount = (size - 1 + unit()) >>> shift;
+
+            // if we require more than 64 slots, we cannot possibly 
accommodate the allocation
+            if (slotCount > 64)
+                return null;
+
+            // convert the slotCount into the bits needed in the bitmap, but 
at the bottom of the register
+            long slotBits = -1L >>> (64 - slotCount);
+
+            // in order that we always allocate page aligned results, we 
require that any allocation is "somewhat" aligned
+            // i.e. any single unit allocation can go anywhere; any 2 unit 
allocation must begin in one of the first 3 slots
+            // of a page; a 3 unit must go in the first two slots; and any 
four unit allocation must be fully page-aligned
+
+            // to achieve this, we construct a searchMask that constrains the 
bits we find to those we permit starting
+            // a match from. as we find bits, we remove them from the mask to 
continue our search.
+            // this has an odd property when it comes to concurrent 
alloc/free, as we can safely skip backwards if
+            // a new slot is freed up, but we always make forward progress 
(i.e. never check the same bits twice),
+            // so running time is bounded
+            long searchMask = 0x1111111111111111L;
+            searchMask *= 15L >>> ((slotCount - 1) & 3);
+            // i.e. switch (slotCount & 3)
+            // case 1: searchMask = 0xFFFFFFFFFFFFFFFFL
+            // case 2: searchMask = 0x7777777777777777L
+            // case 3: searchMask = 0x3333333333333333L
+            // case 0: searchMask = 0x1111111111111111L
+
+            // truncate the mask, removing bits that have too few slots 
proceeding them
+            searchMask &= -1L >>> (slotCount - 1);
+
+            // this loop is very unroll friendly, and would achieve high ILP, 
but not clear if the compiler will exploit this.
+            // right now, not worth manually exploiting, but worth noting for 
future
+            while (true)
+            {
+                long cur = freeSlots;
+                // find the index of the lowest set bit that also occurs in 
our mask (i.e. is permitted alignment, and not yet searched)
+                // we take the index, rather than finding the lowest bit, 
since we must obtain it anyway, and shifting is more efficient
+                // than multiplication
+                int index = Long.numberOfTrailingZeros(cur & searchMask);
+
+                // if no bit was actually found, we cannot serve this request, 
so return null.
+                // due to truncating the searchMask this immediately 
terminates any search when we run out of indexes
+                // that could accommodate the allocation, i.e. is equivalent 
to checking (64 - index) < slotCount
+                if (index == 64)
+                    return null;
+
+                // remove this bit from our searchMask, so we don't return 
here next round
+                searchMask ^= 1L << index;
+                // if our bits occur starting at the index, remove ourselves 
from the bitmask and return
+                long candidate = slotBits << index;
+                if ((candidate & cur) == candidate)
+                {
+                    // here we are sure we will manage to CAS successfully 
without changing candidate because
+                    // there is only one thread allocating at the moment, the 
concurrency is with the release
+                    // operations only
+                    while (true)
+                    {
+                        // clear the candidate bits (freeSlots &= ~candidate)
+                        if (freeSlotsUpdater.compareAndSet(this, cur, cur & 
~candidate))
+                            break;
+
+                        cur = freeSlots;
+                        // make sure no other thread has cleared the candidate 
bits
+                        assert ((candidate & cur) == candidate);
+                    }
+                    // index counts units, so shifting it gives the byte offset into the slab
+                    return get(index << shift, size);
+                }
+            }
+        }
+
+        /** Slice the region [offset, offset + size) out of the slab and attach this chunk to the slice. */
+        private ByteBuffer get(int offset, int size)
+        {
+            int end = offset + size;
+            slab.limit(end);
+            slab.position(offset);
+
+            ByteBuffer slice = slab.slice();
+            return setAttachment(slice);
+        }
+
+        /**
+         * Round the size up to the next multiple of this chunk's unit.
+         */
+        int roundUp(int v)
+        {
+            int unitSize = unit();
+            return BufferPool.roundUp(v, unitSize);
+        }
+
+        /**
+         * Release a buffer. Return:
+         *    0L if the buffer must be recycled after the call;
+         *   -1L if it is free (and so we should tryRecycle if owner is now 
null)
+         *    some other value otherwise
+         *
+         * @param buffer the buffer to release; 1L is returned without touching the mask if it has no attachment
+         * @param tryRelease if true and this release leaves every unit free, the mask is set to 0L instead of -1L
+         **/
+        long free(ByteBuffer buffer, boolean tryRelease)
+        {
+            if (!releaseAttachment(buffer))
+                return 1L;
+
+            long address = MemoryUtil.getAddress(buffer);
+            assert (address >= baseAddress) & (address <= baseAddress + 
capacity());
+
+            // byte offset of the buffer within the slab, and its size rounded up to whole units
+            int position = (int)(address - baseAddress);
+            int size = roundUp(buffer.capacity());
+
+            // convert both from bytes to units
+            position >>= shift;
+            int slotCount = size >> shift;
+
+            // a run of slotCount set bits, moved to the buffer's position in the mask
+            long slotBits = (1L << slotCount) - 1;
+            long shiftedSlotBits = (slotBits << position);
+
+            // (1L << 64) wraps around to 1L, so a buffer spanning the whole chunk is handled explicitly
+            if (slotCount == 64)
+            {
+                assert size == capacity();
+                assert position == 0;
+                shiftedSlotBits = -1L;
+            }
+
+            // retry until the mask is updated without interference from another thread
+            long next;
+            while (true)
+            {
+                long cur = freeSlots;
+                next = cur | shiftedSlotBits;
+                assert next == (cur ^ shiftedSlotBits); // ensure no double 
free
+                if (tryRelease & (next == -1L))
+                    next = 0L;
+                if (freeSlotsUpdater.compareAndSet(this, cur, next))
+                    return next;
+            }
+        }
+
+        /** Describe the slab, the free slots bitmap in binary, the capacity and the free space. */
+        @Override
+        public String toString()
+        {
+            String bitmap = Long.toBinaryString(freeSlots);
+            return String.format("[slab %s, slots bitmap %s, capacity %d, free %d]", slab, bitmap, capacity(), free());
+        }
+    }
+
+    /** Round the size up to the next multiple of CHUNK_SIZE / 64. */
+    @VisibleForTesting
+    public static int roundUpNormal(int size)
+    {
+        int unit = CHUNK_SIZE / 64;
+        return roundUp(size, unit);
+    }
+
+    /** Round size up to the next multiple of unit; the bit trick assumes unit is a power of two. */
+    private static int roundUp(int size, int unit)
+    {
+        // -unit equals ~(unit - 1): for a power of two it clears exactly the low log2(unit) bits
+        return (size + unit - 1) & -unit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java 
b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
index d2b2879..5671fa9 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
@@ -36,6 +36,7 @@ public abstract class MemoryUtil
     private static final long DIRECT_BYTE_BUFFER_CAPACITY_OFFSET;
     private static final long DIRECT_BYTE_BUFFER_LIMIT_OFFSET;
     private static final long DIRECT_BYTE_BUFFER_POSITION_OFFSET;
+    private static final long DIRECT_BYTE_BUFFER_ATTACHMENT_OFFSET;
     private static final Class<?> BYTE_BUFFER_CLASS;
     private static final long BYTE_BUFFER_OFFSET_OFFSET;
     private static final long BYTE_BUFFER_HB_OFFSET;
@@ -62,6 +63,7 @@ public abstract class MemoryUtil
             DIRECT_BYTE_BUFFER_CAPACITY_OFFSET = 
unsafe.objectFieldOffset(Buffer.class.getDeclaredField("capacity"));
             DIRECT_BYTE_BUFFER_LIMIT_OFFSET = 
unsafe.objectFieldOffset(Buffer.class.getDeclaredField("limit"));
             DIRECT_BYTE_BUFFER_POSITION_OFFSET = 
unsafe.objectFieldOffset(Buffer.class.getDeclaredField("position"));
+            DIRECT_BYTE_BUFFER_ATTACHMENT_OFFSET = 
unsafe.objectFieldOffset(clazz.getDeclaredField("att"));
             DIRECT_BYTE_BUFFER_CLASS = clazz;
 
             clazz = ByteBuffer.allocate(0).getClass();
@@ -77,6 +79,17 @@ public abstract class MemoryUtil
         }
     }
 
+    /**
+     * Returns the value reported by {@code unsafe.pageSize()}.
+     */
+    public static int pageSize()
+    {
+        return unsafe.pageSize();
+    }
+
+    /**
+     * Reads the {@code long} stored at {@code DIRECT_BYTE_BUFFER_ADDRESS_OFFSET} inside {@code buffer}.
+     * An assertion requires the buffer's class to be exactly {@code DIRECT_BYTE_BUFFER_CLASS}.
+     */
+    public static long getAddress(ByteBuffer buffer)
+    {
+        assert buffer.getClass() == DIRECT_BYTE_BUFFER_CLASS;
+        return unsafe.getLong(buffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET);
+    }
+
     public static long allocate(long size)
     {
         return Native.malloc(size);
@@ -177,24 +190,25 @@ public abstract class MemoryUtil
         unsafe.putInt(instance, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, length);
     }
 
-    public static ByteBuffer duplicateDirectByteBuffer(ByteBuffer source, 
ByteBuffer hollowBuffer)
+    public static Object getAttachment(ByteBuffer instance)
     {
-        assert(source.isDirect());
-        unsafe.putLong(hollowBuffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, 
unsafe.getLong(source, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET));
-        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, 
unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET));
-        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, 
unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET));
-        unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, 
unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET));
-        return hollowBuffer;
+        assert instance.getClass() == DIRECT_BYTE_BUFFER_CLASS;
+        return unsafe.getObject(instance, 
DIRECT_BYTE_BUFFER_ATTACHMENT_OFFSET);
+    }
+
+    /**
+     * Stores {@code next} into the field of {@code instance} located at
+     * {@code DIRECT_BYTE_BUFFER_ATTACHMENT_OFFSET} (resolved in the static initializer from the
+     * field named "att"). An assertion requires the instance's class to be exactly
+     * {@code DIRECT_BYTE_BUFFER_CLASS}.
+     */
+    public static void setAttachment(ByteBuffer instance, Object next)
+    {
+        assert instance.getClass() == DIRECT_BYTE_BUFFER_CLASS;
+        unsafe.putObject(instance, DIRECT_BYTE_BUFFER_ATTACHMENT_OFFSET, next);
     }
 
-    public static ByteBuffer duplicateByteBuffer(ByteBuffer source, ByteBuffer 
hollowBuffer)
+    public static ByteBuffer duplicateDirectByteBuffer(ByteBuffer source, 
ByteBuffer hollowBuffer)
     {
-        assert(!source.isDirect());
+        assert source.getClass() == DIRECT_BYTE_BUFFER_CLASS;
+        unsafe.putLong(hollowBuffer, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, 
unsafe.getLong(source, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET));
         unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_POSITION_OFFSET, 
unsafe.getInt(source, DIRECT_BYTE_BUFFER_POSITION_OFFSET));
         unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, 
unsafe.getInt(source, DIRECT_BYTE_BUFFER_LIMIT_OFFSET));
         unsafe.putInt(hollowBuffer, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, 
unsafe.getInt(source, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET));
-        unsafe.putInt(hollowBuffer, BYTE_BUFFER_OFFSET_OFFSET, 
unsafe.getInt(source, BYTE_BUFFER_OFFSET_OFFSET));
-        unsafe.putObject(hollowBuffer, BYTE_BUFFER_HB_OFFSET, 
unsafe.getObject(source, BYTE_BUFFER_HB_OFFSET));
         return hollowBuffer;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
----------------------------------------------------------------------
diff --git 
a/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java 
b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
new file mode 100644
index 0000000..17ac569
--- /dev/null
+++ b/test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import java.nio.ByteBuffer;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.DynamicList;
+
+import static org.junit.Assert.*;
+
+public class LongBufferPoolTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(LongBufferPoolTest.class);
+
+    /**
+     * Runs {@code testAllocate(int, long, int)} with twice as many threads as there are
+     * available processors, a duration of two minutes and a pool size of {@code 16 << 20}.
+     */
+    @Test
+    public void testAllocate() throws InterruptedException, ExecutionException
+    {
+        testAllocate(Runtime.getRuntime().availableProcessors() * 2, 
TimeUnit.MINUTES.toNanos(2L), 16 << 20);
+    }
+
+    /**
+     * Pairs a buffer with the 64-bit pattern it was filled with, so that the contents
+     * can later be checked for unexpected modification.
+     */
+    private static final class BufferCheck
+    {
+        final ByteBuffer buffer;
+        final long val;
+        DynamicList.Node<BufferCheck> listnode;
+
+        private BufferCheck(ByteBuffer buffer, long val)
+        {
+            this.buffer = buffer;
+            this.val = val;
+        }
+
+        /**
+         * Checks that every complete 8-byte word of the buffer still holds {@code val}.
+         * The buffer's own position and limit are left untouched (a duplicate is read).
+         *
+         * @throws AssertionError if any word differs from {@code val}
+         */
+        void validate()
+        {
+            ByteBuffer read = buffer.duplicate();
+            // >= so that the last complete word is verified as well
+            while (read.remaining() >= 8)
+            {
+                // the read advances the position, so it must not live inside an assert statement:
+                // with JVM assertions disabled it would never execute and the loop would not terminate
+                long actual = read.getLong();
+                assertEquals(val, actual);
+            }
+        }
+
+        /**
+         * Fills every complete 8-byte word of the buffer with {@code val}.
+         * The buffer's own position and limit are left untouched (a duplicate is written).
+         */
+        void init()
+        {
+            ByteBuffer write = buffer.duplicate();
+            // must cover exactly the words that validate() checks
+            while (write.remaining() >= 8)
+                write.putLong(val);
+        }
+    }
+
+    public void testAllocate(int threadCount, long duration, int poolSize) 
throws InterruptedException, ExecutionException
+    {
+        final int avgBufferSize = 16 << 10;
+        final int stdevBufferSize = 10 << 10; // picked to ensure exceeding 
buffer size is rare, but occurs
+        final DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd 
HH:mm:ss");
+
+        System.out.println(String.format("%s - testing %d threads for %dm",
+                                         dateFormat.format(new Date()),
+                                         threadCount,
+                                         
TimeUnit.NANOSECONDS.toMinutes(duration)));
+
+        final long until = System.nanoTime() + duration;
+        final CountDownLatch latch = new CountDownLatch(threadCount);
+        final SPSCQueue<BufferCheck>[] sharedRecycle = new 
SPSCQueue[threadCount];
+        final AtomicBoolean[] makingProgress = new AtomicBoolean[threadCount];
+        for (int i = 0 ; i < sharedRecycle.length ; i++)
+        {
+            sharedRecycle[i] = new SPSCQueue<>();
+            makingProgress[i] = new AtomicBoolean(true);
+        }
+
+        ExecutorService executorService = 
Executors.newFixedThreadPool(threadCount + 2);
+        List<Future<Boolean>> ret = new ArrayList<>(threadCount);
+        long prevPoolSize = BufferPool.MEMORY_USAGE_THRESHOLD;
+        BufferPool.MEMORY_USAGE_THRESHOLD = poolSize;
+        BufferPool.DEBUG = true;
+        // sum(1..n) = n/2 * (n + 1); we set zero to CHUNK_SIZE, so have 
n=threadCount-1
+        int targetSizeQuanta = ((threadCount) * (threadCount - 1)) / 2;
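+        // fix targetSizeQuanta at 1/64th our poolSize, so that we only consciously exceed our pool size limit
+        // NOTE(review): the multiplication below is performed in int arithmetic; with poolSize = 16 << 20
+        // the product overflows once targetSizeQuanta reaches 128 (i.e. threadCount >= 17) - confirm intended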
+        targetSizeQuanta = (targetSizeQuanta * poolSize) / 64;
+
+        {
+            // setup some high churn allocate/deallocate, without any checking
+            final SPSCQueue<ByteBuffer> burn = new SPSCQueue<>();
+            final CountDownLatch doneAdd = new CountDownLatch(1);
+            // producer half of the churn pair: takes CHUNK_SIZE buffers from the pool and
+            // passes them to the consumer task through the burn queue
+            executorService.submit(new TestUntil(until)
+            {
+                int count = 0;
+                void testOne() throws Exception
+                {
+                    // once count * CHUNK_SIZE reaches a tenth of poolSize, stop allocating; the counter
+                    // is only reset after the consumer has observed an empty queue (burn.exhausted)
+                    if (count * BufferPool.CHUNK_SIZE >= poolSize / 10)
+                    {
+                        if (burn.exhausted)
+                            count = 0;
+                        else
+                            Thread.yield();
+                        return;
+                    }
+
+                    ByteBuffer buffer = 
BufferPool.tryGet(BufferPool.CHUNK_SIZE);
+                    // no buffer was handed out this time; let other threads run and retry later
+                    if (buffer == null)
+                    {
+                        Thread.yield();
+                        return;
+                    }
+
+                    // NOTE(review): the buffer is put back here and is put a second time by the consumer
+                    // task once it polls it from the burn queue - confirm that the double put is intended
+                    BufferPool.put(buffer);
+                    burn.add(buffer);
+                    count++;
+                }
+                // releases the consumer task, whose cleanup() waits on doneAdd
+                void cleanup()
+                {
+                    doneAdd.countDown();
+                }
+            });
+            // consumer half of the churn pair: polls buffers from the burn queue and puts them
+            // back into the pool
+            executorService.submit(new TestUntil(until)
+            {
+                void testOne() throws Exception
+                {
+                    ByteBuffer buffer = burn.poll();
+                    // nothing queued right now; let other threads run
+                    if (buffer == null)
+                    {
+                        Thread.yield();
+                        return;
+                    }
+                    BufferPool.put(buffer);
+                }
+                // does not finish until the producer's cleanup() has counted doneAdd down
+                void cleanup()
+                {
+                    Uninterruptibles.awaitUninterruptibly(doneAdd);
+                }
+            });
+        }
+
+        for (int t = 0; t < threadCount; t++)
+        {
+            final int threadIdx = t;
+            final int targetSize = t == 0 ? BufferPool.CHUNK_SIZE : 
targetSizeQuanta * t;
+
+            ret.add(executorService.submit(new TestUntil(until)
+            {
+                final SPSCQueue<BufferCheck> shareFrom = 
sharedRecycle[threadIdx];
+                final DynamicList<BufferCheck> checks = new 
DynamicList<>((int) Math.max(1, targetSize / (1 << 10)));
+                final SPSCQueue<BufferCheck> shareTo = 
sharedRecycle[(threadIdx + 1) % threadCount];
+                final ThreadLocalRandom rand = ThreadLocalRandom.current();
+                int totalSize = 0;
+                int freeingSize = 0;
+                int size = 0;
+
+                // Re-raises this thread's progress flag if it has been cleared. The monitoring loop
+                // in testAllocate asserts that each flag is set and then clears it on every pass.
+                void checkpoint()
+                {
+                    if (!makingProgress[threadIdx].get())
+                        makingProgress[threadIdx].set(true);
+                }
+
+                // One iteration of the worker: release buffers until the tracked total is within the
+                // current target, allocate one or more new buffers, validate a random stashed buffer,
+                // then drain everything the neighbour has shared with us.
+                void testOne() throws Exception
+                {
+
+                    // roughly once in every (poolSize / 1024) iterations the target is zero,
+                    // which makes the loop below release everything this thread tracks
+                    long currentTargetSize = rand.nextInt(poolSize / 1024) == 
0 ? 0 : targetSize;
+                    int spinCount = 0;
+                    while (totalSize > currentTargetSize - freeingSize)
+                    {
+                        // free buffers until we're below our target size
+                        if (checks.size() == 0)
+                        {
+                            // if we're out of buffers to free, we're waiting 
on our neighbour to free them;
+                            // first check if the consuming neighbour has 
caught up, and if so mark that free
+                            if (shareTo.exhausted)
+                            {
+                                totalSize -= freeingSize;
+                                freeingSize = 0;
+                            }
+                            else if (!recycleFromNeighbour())
+                            {
+                                // only give up after more than 1000 fruitless spins AND once the deadline has passed
+                                if (++spinCount > 1000 && System.nanoTime() > 
until)
+                                    return;
+                                // otherwise, free one of our other 
neighbour's buffers if can; and otherwise yield
+                                Thread.yield();
+                            }
+                            continue;
+                        }
+
+                        // pick a random buffer, with preference going to 
earlier ones
+                        BufferCheck check = sample();
+                        checks.remove(check.listnode);
+                        check.validate();
+
+                        // buffers larger than CHUNK_SIZE were never added to totalSize when allocated
+                        // (see the allocation branches below), so they are accounted as zero here
+                        size = 
BufferPool.roundUpNormal(check.buffer.capacity());
+                        if (size > BufferPool.CHUNK_SIZE)
+                            size = 0;
+
+                        // either share to free, or free immediately
+                        if (rand.nextBoolean())
+                        {
+                            shareTo.add(check);
+                            // stays counted in totalSize until shareTo is seen exhausted (see above)
+                            freeingSize += size;
+                            // interleave this with potentially messing with 
the other neighbour's stuff
+                            recycleFromNeighbour();
+                        }
+                        else
+                        {
+                            check.validate();
+                            BufferPool.put(check.buffer);
+                            totalSize -= size;
+                        }
+                    }
+
+                    // allocate a new buffer
+                    size = (int) Math.max(1, avgBufferSize + (stdevBufferSize 
* rand.nextGaussian()));
+                    if (size <= BufferPool.CHUNK_SIZE)
+                    {
+                        totalSize += BufferPool.roundUpNormal(size);
+                        allocate(size);
+                    }
+                    else if (rand.nextBoolean())
+                    {
+                        // oversized request: allocated but deliberately not added to totalSize
+                        allocate(size);
+                    }
+                    else
+                    {
+                        // perform a burst allocation to exhaust all available 
memory
+                        while (totalSize < poolSize)
+                        {
+                            size = (int) Math.max(1, avgBufferSize + 
(stdevBufferSize * rand.nextGaussian()));
+                            // sizes above CHUNK_SIZE are simply skipped during the burst
+                            if (size <= BufferPool.CHUNK_SIZE)
+                            {
+                                allocate(size);
+                                totalSize += BufferPool.roundUpNormal(size);
+                            }
+                        }
+                    }
+
+                    // validate a random buffer we have stashed
+                    // NOTE(review): rand.nextInt(0) throws IllegalArgumentException; the burst branch can
+                    // allocate nothing, so confirm that checks can never be empty at this point
+                    checks.get(rand.nextInt(checks.size())).validate();
+
+                    // free all of our neighbour's remaining shared buffers
+                    while (recycleFromNeighbour());
+                }
+
+                // Returns every buffer still tracked by this thread to the pool, always taking the
+                // first list entry, and then signals completion through the latch.
+                void cleanup()
+                {
+                    for (int remaining = checks.size(); remaining > 0; remaining = checks.size())
+                    {
+                        BufferCheck first = checks.get(0);
+                        BufferPool.put(first.buffer);
+                        checks.remove(first.listnode);
+                    }
+                    latch.countDown();
+                }
+
+                // Takes at most one entry from the queue our neighbour shares with us, validates it
+                // and puts its buffer back into the pool. Returns false when the queue had nothing.
+                boolean recycleFromNeighbour()
+                {
+                    BufferCheck shared = shareFrom.poll();
+                    boolean received = shared != null;
+                    if (received)
+                    {
+                        shared.validate();
+                        BufferPool.put(shared.buffer);
+                    }
+                    return received;
+                }
+
+                // Obtains a buffer of the requested size from the pool, asserts that it is non-null, has
+                // exactly that capacity and starts at position zero, fills it with a random pattern and
+                // appends the resulting check to this thread's list.
+                BufferCheck allocate(int size)
+                {
+                    ByteBuffer pooled = BufferPool.get(size);
+                    assertNotNull(pooled);
+                    long pattern = rand.nextLong();
+                    BufferCheck created = new BufferCheck(pooled, pattern);
+                    assertEquals(size, pooled.capacity());
+                    assertEquals(0, pooled.position());
+                    created.init();
+                    created.listnode = checks.append(created);
+                    return created;
+                }
+
+                // Picks one entry of checks at random, weighted towards low indices.
+                // Callers must ensure checks is non-empty: with size 0 the range passed to
+                // rand.nextInt would be 0, which nextInt rejects.
+                BufferCheck sample()
+                {
+                    // sample with preference to first elements:
+                    // element at index n will be selected with likelihood 
(size - n) / sum1ToN(size)
+                    int size = checks.size();
+
+                    // pick a random number between 1 and sum1toN(size)
+                    // (nextInt is zero-based: the value drawn lies in [0, sum1toN(size)) )
+                    int sampleRange = sum1toN(size);
+                    int sampleIndex = rand.nextInt(sampleRange);
+
+                    // then binary search for the N, such that [sum1ToN(N), 
sum1ToN(N+1)) contains this random number
+                    // the step halves each round but never drops below 1, so the search keeps moving
+                    int moveBy = Math.max(size / 4, 1);
+                    int index = size / 2;
+                    while (true)
+                    {
+                        int baseSampleIndex = sum1toN(index);
+                        int endOfSampleIndex = sum1toN(index + 1);
+                        if (sampleIndex >= baseSampleIndex)
+                        {
+                            if (sampleIndex < endOfSampleIndex)
+                                break;
+                            index += moveBy;
+                        }
+                        else index -= moveBy;
+                        moveBy = Math.max(moveBy / 2, 1);
+                    }
+
+                    // this gives us the inverse of our desired value, so just 
subtract it from the last index
+                    // (larger N own wider ranges above, so after inversion small indices are favoured)
+                    index = size - (index + 1);
+
+                    return checks.get(index);
+                }
+
+                // Returns 1 + 2 + ... + n, computed as n * (n + 1) / 2 in int arithmetic
+                // (the intermediate product overflows for n above 46340).
+                private int sum1toN(int n)
+                {
+                    return (n * (n + 1)) / 2;
+                }
+            }));
+        }
+
+        boolean first = true;
+        while (!latch.await(10L, TimeUnit.SECONDS))
+        {
+            if (!first)
+                BufferPool.assertAllRecycled();
+            first = false;
+            for (AtomicBoolean progress : makingProgress)
+            {
+                assert progress.get();
+                progress.set(false);
+            }
+        }
+
+        for (SPSCQueue<BufferCheck> queue : sharedRecycle)
+        {
+            BufferCheck check;
+            while ( null != (check = queue.poll()) )
+            {
+                check.validate();
+                BufferPool.put(check.buffer);
+            }
+        }
+
+        assertEquals(0, executorService.shutdownNow().size());
+
+        BufferPool.MEMORY_USAGE_THRESHOLD = prevPoolSize;
+        for (Future<Boolean> r : ret)
+            assertTrue(r.get());
+
+        System.out.println(String.format("%s - finished.",
+                                         dateFormat.format(new Date())));
+    }
+
+    /**
+     * A task that keeps invoking {@link #testOne()} until {@code System.nanoTime()} reaches the
+     * {@code until} deadline, reporting whether it got there without {@code testOne()} throwing
+     * an {@code Exception}.
+     */
+    static abstract class TestUntil implements Callable<Boolean>
+    {
+        /** Deadline, expressed on the {@code System.nanoTime()} time line. */
+        final long until;
+        protected TestUntil(long until)
+        {
+            this.until = until;
+        }
+
+        /** A single unit of test work; invoked in batches of 100 between checkpoints. */
+        abstract void testOne() throws Exception;
+        /** Hook invoked before every batch of {@code testOne()} calls; does nothing by default. */
+        void checkpoint() {}
+        /** Hook invoked exactly once when {@code call()} finishes, however it finishes; does nothing by default. */
+        void cleanup() {}
+
+        /**
+         * Runs batches of {@code testOne()} until the deadline has passed.
+         *
+         * @return {@code true} if the deadline was reached normally, {@code false} if an
+         *         {@code Exception} was thrown (it is logged, not rethrown)
+         */
+        @Override
+        public Boolean call() throws Exception
+        {
+            try
+            {
+                // nanoTime values must be compared by difference, never directly, to be
+                // robust against numerical overflow of the underlying counter
+                while (System.nanoTime() - until < 0)
+                {
+                    checkpoint();
+                    for (int i = 0 ; i < 100 ; i++)
+                        testOne();
+                }
+            }
+            catch (Exception ex)
+            {
+                // the trailing throwable is logged together with its stack trace,
+                // so nothing needs to be printed to stderr separately
+                logger.error("Got exception {}, current chunk {}",
+                             ex.getMessage(),
+                             BufferPool.currentChunk(),
+                             ex);
+                return false;
+            }
+            finally
+            {
+                cleanup();
+            }
+            return true;
+        }
+    }
+
+    /**
+     * Stand-alone entry point: runs {@code testAllocate(int, long, int)} with one thread per
+     * available processor, a duration of two hours and a pool size of {@code 16 << 20}.
+     * The {@code args} parameter is not used.
+     */
+    public static void main(String[] args) throws InterruptedException, 
ExecutionException
+    {
+        new 
LongBufferPoolTest().testAllocate(Runtime.getRuntime().availableProcessors(), 
TimeUnit.HOURS.toNanos(2L), 16 << 20);
+    }
+
+    /**
+     * A single producer, single consumer queue.
+     * <p>
+     * Implemented as a linked list that starts with a value-less placeholder node.
+     * {@code add} is the only method that writes {@code tail} and {@code poll} is the only
+     * method that writes {@code head}; neither field is volatile, and the only volatile state
+     * is {@code Node.next} and {@code exhausted}. Presumably each method is meant to be called
+     * from one dedicated thread, as the name suggests - verify against callers.
+     */
+    private static final class SPSCQueue<V>
+    {
+        static final class Node<V>
+        {
+            // volatile: written by add(), read by poll()
+            volatile Node<V> next;
+            final V value;
+            Node(V value)
+            {
+                this.value = value;
+            }
+        }
+
+        // set to true by poll() when it finds nothing to return, and to false by every add()
+        private volatile boolean exhausted = true;
+        // head always points at the node most recently consumed (initially the placeholder)
+        Node<V> head = new Node<>(null);
+        Node<V> tail = head;
+
+        // Appends a value. exhausted is cleared BEFORE the new node is linked in.
+        void add(V value)
+        {
+            exhausted = false;
+            tail = tail.next = new Node<>(value);
+        }
+
+        // Removes and returns the oldest value, or returns null (and marks the queue
+        // exhausted) when there is nothing after the current head.
+        V poll()
+        {
+            Node<V> next = head.next;
+            if (next == null)
+            {
+                // this is racey, but good enough for our purposes
+                exhausted = true;
+                return null;
+            }
+            head = next;
+            return next.value;
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java 
b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
index 6018cc7..8ff0074 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressorTest.java
@@ -33,6 +33,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.BufferPool;
 
 public class CompressorTest
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/17dd4ccc/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java 
b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index 0c1583d..3c9aa0e 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -18,8 +18,6 @@
  *
  */
 package org.apache.cassandra.io.util;
-
-import org.apache.cassandra.service.FileCacheService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.SyncUtil;
 
@@ -30,10 +28,6 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.util.Arrays;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.cassandra.Util.expectEOF;
 import static org.apache.cassandra.Util.expectException;
@@ -48,6 +42,7 @@ public class BufferedRandomAccessFileTest
     public void testReadAndWrite() throws Exception
     {
         SequentialWriter w = createTempFile("braf");
+        ChannelProxy channel = new ChannelProxy(w.getPath());
 
         // writting string of data to the file
         byte[] data = "Hello".getBytes();
@@ -58,7 +53,7 @@ public class BufferedRandomAccessFileTest
         w.sync();
 
         // reading small amount of data from file, this is handled by initial 
buffer
-        RandomAccessReader r = RandomAccessReader.open(w);
+        RandomAccessReader r = RandomAccessReader.open(channel);
 
         byte[] buffer = new byte[data.length];
         assertEquals(data.length, r.read(buffer));
@@ -81,7 +76,7 @@ public class BufferedRandomAccessFileTest
 
         w.sync();
 
-        r = RandomAccessReader.open(w); // re-opening file in read-only mode
+        r = RandomAccessReader.open(channel); // re-opening file in read-only 
mode
 
         // reading written buffer
         r.seek(initialPosition); // back to initial (before write) position
@@ -130,6 +125,7 @@ public class BufferedRandomAccessFileTest
 
         w.finish();
         r.close();
+        channel.close();
     }
 
     @Test
@@ -142,7 +138,8 @@ public class BufferedRandomAccessFileTest
         byte[] in = generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE);
         w.write(in);
 
-        RandomAccessReader r = RandomAccessReader.open(w);
+        ChannelProxy channel = new ChannelProxy(w.getPath());
+        RandomAccessReader r = RandomAccessReader.open(channel);
 
         // Read it into a same size array.
         byte[] out = new byte[RandomAccessReader.DEFAULT_BUFFER_SIZE];
@@ -154,6 +151,7 @@ public class BufferedRandomAccessFileTest
 
         r.close();
         w.finish();
+        channel.close();
     }
 
     @Test
@@ -181,9 +179,11 @@ public class BufferedRandomAccessFileTest
         w.finish();
 
         // will use cachedlength
-        RandomAccessReader r = RandomAccessReader.open(tmpFile);
-        assertEquals(lessThenBuffer.length + biggerThenBuffer.length, 
r.length());
-        r.close();
+        try (ChannelProxy channel = new ChannelProxy(tmpFile);
+            RandomAccessReader r = RandomAccessReader.open(channel))
+        {
+            assertEquals(lessThenBuffer.length + biggerThenBuffer.length, 
r.length());
+        }
     }
 
     @Test
@@ -201,7 +201,8 @@ public class BufferedRandomAccessFileTest
         w.write(data);
         w.sync();
 
-        final RandomAccessReader r = RandomAccessReader.open(w);
+        final ChannelProxy channel = new ChannelProxy(w.getPath());
+        final RandomAccessReader r = RandomAccessReader.open(channel);
 
         ByteBuffer content = r.readBytes((int) r.length());
 
@@ -225,6 +226,7 @@ public class BufferedRandomAccessFileTest
 
         w.finish();
         r.close();
+        channel.close();
     }
 
     @Test
@@ -235,7 +237,8 @@ public class BufferedRandomAccessFileTest
         w.write(data);
         w.finish();
 
-        final RandomAccessReader file = RandomAccessReader.open(w);
+        final ChannelProxy channel = new ChannelProxy(w.getPath());
+        final RandomAccessReader file = RandomAccessReader.open(channel);
 
         file.seek(0);
         assertEquals(file.getFilePointer(), 0);
@@ -265,6 +268,7 @@ public class BufferedRandomAccessFileTest
         }, IllegalArgumentException.class); // throws IllegalArgumentException
 
         file.close();
+        channel.close();
     }
 
     @Test
@@ -274,7 +278,8 @@ public class BufferedRandomAccessFileTest
         w.write(generateByteArray(RandomAccessReader.DEFAULT_BUFFER_SIZE * 2));
         w.finish();
 
-        RandomAccessReader file = RandomAccessReader.open(w);
+        ChannelProxy channel = new ChannelProxy(w.getPath());
+        RandomAccessReader file = RandomAccessReader.open(channel);
 
         file.seek(0); // back to the beginning of the file
         assertEquals(file.skipBytes(10), 10);
@@ -294,6 +299,7 @@ public class BufferedRandomAccessFileTest
         assertEquals(file.bytesRemaining(), file.length());
 
         file.close();
+        channel.close();
     }
 
     @Test
@@ -308,7 +314,8 @@ public class BufferedRandomAccessFileTest
 
         w.sync();
 
-        RandomAccessReader r = RandomAccessReader.open(w);
+        ChannelProxy channel = new ChannelProxy(w.getPath());
+        RandomAccessReader r = RandomAccessReader.open(channel);
 
         // position should change after skip bytes
         r.seek(0);
@@ -322,6 +329,7 @@ public class BufferedRandomAccessFileTest
 
         w.finish();
         r.close();
+        channel.close();
     }
 
     @Test
@@ -344,7 +352,7 @@ public class BufferedRandomAccessFileTest
             {
                 File file1 = writeTemporaryFile(new byte[16]);
                 try (final ChannelProxy channel = new ChannelProxy(file1);
-                     final RandomAccessReader file = 
RandomAccessReader.open(channel, bufferSize, null))
+                     final RandomAccessReader file = 
RandomAccessReader.open(channel, bufferSize, -1L))
                 {
                     expectEOF(new Callable<Object>()
                     {
@@ -362,7 +370,7 @@ public class BufferedRandomAccessFileTest
             {
                 File file1 = writeTemporaryFile(new byte[16]);
                 try (final ChannelProxy channel = new ChannelProxy(file1);
-                     final RandomAccessReader file = 
RandomAccessReader.open(channel, bufferSize, null))
+                     final RandomAccessReader file = 
RandomAccessReader.open(channel, bufferSize, -1L))
                 {
                     expectEOF(new Callable<Object>()
                     {
@@ -397,7 +405,8 @@ public class BufferedRandomAccessFileTest
 
         w.sync();
 
-        RandomAccessReader r = RandomAccessReader.open(w);
+        ChannelProxy channel = new ChannelProxy(w.getPath());
+        RandomAccessReader r = RandomAccessReader.open(channel);
 
         assertEquals(r.bytesRemaining(), toWrite);
 
@@ -413,6 +422,7 @@ public class BufferedRandomAccessFileTest
 
         w.finish();
         r.close();
+        channel.close();
     }
 
     @Test
@@ -483,7 +493,8 @@ public class BufferedRandomAccessFileTest
 
         w.finish();
 
-        RandomAccessReader file = RandomAccessReader.open(w);
+        ChannelProxy channel = new ChannelProxy(w.getPath());
+        RandomAccessReader file = RandomAccessReader.open(channel);
 
         file.seek(10);
         FileMark mark = file.mark();
@@ -508,6 +519,7 @@ public class BufferedRandomAccessFileTest
         assertEquals(file.bytesPastMark(), 0);
 
         file.close();
+        channel.close();
     }
 
     @Test (expected = AssertionError.class)
@@ -518,7 +530,8 @@ public class BufferedRandomAccessFileTest
             w.write(new byte[30]);
             w.flush();
 
-            try (RandomAccessReader r = RandomAccessReader.open(w))
+            try (ChannelProxy channel = new ChannelProxy(w.getPath());
+                 RandomAccessReader r = RandomAccessReader.open(channel))
             {
                 r.seek(10);
                 r.mark();
@@ -530,71 +543,6 @@ public class BufferedRandomAccessFileTest
     }
 
     @Test
-    public void testFileCacheService() throws IOException, InterruptedException
-    {
-        //see https://issues.apache.org/jira/browse/CASSANDRA-7756
-
-        final FileCacheService.CacheKey cacheKey = new FileCacheService.CacheKey();
-        final int THREAD_COUNT = 40;
-        ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
-
-        SequentialWriter w1 = createTempFile("fscache1");
-        SequentialWriter w2 = createTempFile("fscache2");
-
-        w1.write(new byte[30]);
-        w1.finish();
-
-        w2.write(new byte[30]);
-        w2.finish();
-
-        for (int i = 0; i < 20; i++)
-        {
-
-
-            RandomAccessReader r1 = RandomAccessReader.open(w1);
-            RandomAccessReader r2 = RandomAccessReader.open(w2);
-
-
-            FileCacheService.instance.put(cacheKey, r1);
-            FileCacheService.instance.put(cacheKey, r2);
-
-            final CountDownLatch finished = new CountDownLatch(THREAD_COUNT);
-            final AtomicBoolean hadError = new AtomicBoolean(false);
-
-            for (int k = 0; k < THREAD_COUNT; k++)
-            {
-                executorService.execute( new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        try
-                        {
-                            long size = FileCacheService.instance.sizeInBytes();
-
-                            while (size > 0)
-                                size = FileCacheService.instance.sizeInBytes();
-                        }
-                        catch (Throwable t)
-                        {
-                            t.printStackTrace();
-                            hadError.set(true);
-                        }
-                        finally
-                        {
-                            finished.countDown();
-                        }
-                    }
-                });
-
-            }
-
-            finished.await();
-            assert !hadError.get();
-        }
-    }
-
-    @Test
     public void testReadOnly() throws IOException
     {
         SequentialWriter file = createTempFile("brafReadOnlyTest");

Reply via email to