Mina developers-
   During some profiling of a TCP/UDP NIO layer I've been working on,
it will come as no surprise that ByteBuffer.allocate() was taking more
time than any other method by a large margin (sorry, I don't have
specifics).  I looked at using composition to wrap a ByteBuffer with
functionality such as the release() method of the MINA ByteBuffer pool,
but that would require method signatures to change and would have to be
handled carefully to avoid memory leaks.  I also looked at the technique
of allocating a big (4MB+) ByteBuffer and then issuing slices, but I
didn't see a real performance boost from it, and it didn't reuse the
buffers.

I created this simple class of static methods to pool buffers, and in my
tests it gives me about a 10-15% boost.  On my MacBook Pro 2.33GHz Core 2
Duo I was getting 100MB/s both send and receive (localhost), for a total
of 200MB/s throughput on this machine; with this ByteBufferRecycler
class it rose to 230MB/s total.  This is with a reliable UDP
implementation using a single socket connection - I haven't tested it
yet with TCP.

The trick is to search your code for ByteBuffer.allocate() and replace
it with ByteBufferRecycler.getDirtyBuffer(), then carefully search for
any ByteBuffer.put() calls, or anywhere you know that your ByteBuffers
fall out of scope and/or are copied into another buffer and are no
longer used, and hand them back with putDirtyBuffer() - and be sure they
really are no longer used!  The ByteBufferRecycler class has a few
public static parameters you can tweak if you like, but the defaults
should work pretty well for most MINA use.  I use ConcurrentHashMap and
ConcurrentLinkedQueue, but you don't really have to use any
synchronization at all, because the worst that can happen is that a
ByteBuffer is lost and one will be created from scratch anyway.
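To make the swap concrete, here is a minimal self-contained sketch of the
acquire/release pattern.  It uses a toy single-queue stand-in (not the real
ByteBufferRecycler below, which pools per capacity class and ages out idle
buffers), just so it runs on its own:

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

public class Main {
    // Toy single-queue stand-in for ByteBufferRecycler so this sketch
    // runs on its own; the real class below pools per capacity class.
    static final ArrayDeque<ByteBuffer> pool = new ArrayDeque<ByteBuffer>();

    static ByteBuffer getDirtyBuffer(int size) {
        ByteBuffer b = pool.poll();
        if (b == null || b.capacity() < size) b = ByteBuffer.allocate(size);
        b.clear();
        b.limit(size); // contents are NOT zeroed -- it is a "dirty" buffer
        return b;
    }

    static void putDirtyBuffer(ByteBuffer b) {
        pool.offer(b); // caller must not touch b after this point
    }

    public static void main(String[] args) {
        // Before: ByteBuffer buf = ByteBuffer.allocate(8192);
        ByteBuffer buf = getDirtyBuffer(8192);
        buf.put((byte) 42);
        buf.flip();
        // ... channel.write(buf) would drain it here ...
        putDirtyBuffer(buf); // safe only because nothing else references buf

        ByteBuffer again = getDirtyBuffer(8192);
        System.out.println(buf == again); // the pooled instance is reused
    }
}
```

The danger the author warns about is visible here: if anything still held
`buf` after `putDirtyBuffer(buf)`, its contents would later change
"randomly" when the pool hands the same instance to another caller.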

This was a quick fix to my problem that I found simple to use throughout
my code.  Please let me know if I did anything stupid.  I played with
some SoftReference/PhantomReference techniques to get notified when the
GC flagged a ByteBuffer as disposed, but it didn't work reliably, so
this is what I came up with instead.

Take care!
Lee Carlson

Here's the code:

/*
 * ByteBufferRecycler.java
 */

import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;


public class ByteBufferRecycler
{
    public static int maxReadyBuffersPerIncrementSize = 20;
    public static int maxBufferSize = 1024 * 512;
    public static int minBufferSize = 200;
    public static int incrementSize = 1024 * 4;
    public static int cleanUpIntervalMs = 30000;
    public static int maxUnusedAgeMs = 20000;

    private static volatile long nextScheduledCleanup;
    private static ConcurrentHashMap<Integer, CacheObject> map =
        new ConcurrentHashMap<Integer, CacheObject>();

    /**
     * All methods are static; no instances are needed.
     */
    private ByteBufferRecycler()
    {
    }

    /**
     * Gets a recycled ByteBuffer, or allocates a new heap ByteBuffer if a
     * recycled buffer is not available.  The buffer has been cleared by
     * calling clear(), but the contents are not actually erased, so be
     * careful not to use this in code which expects a buffer full of
     * zeroed bytes.
     */
    public static ByteBuffer getDirtyBuffer(int size)
    {
        if (size < minBufferSize)
        {
            return ByteBuffer.allocate(size);
        }

        // Round up to the next multiple of incrementSize.
        int capacity = ((size + incrementSize - 1) / incrementSize) * incrementSize;

        CacheObject co = map.get(capacity);
        if (co == null)
        {
            CacheObject fresh = new CacheObject();
            co = map.putIfAbsent(capacity, fresh);
            if (co == null)
                co = fresh;
        }

        ByteBuffer buf = co.get(capacity);
        buf.limit(size);

        return buf;
    }

    /**
     * Returns a ByteBuffer to the pool for later consumption.  The buffer
     * is sliced as necessary for optimal use.  Before returning a buffer
     * via this method, be sure it is no longer in use, or you will create
     * difficult bugs where buffer contents change "randomly".
     */
    public static void putDirtyBuffer(ByteBuffer buf)
    {
        cleanUp(false);

        if (buf.capacity() < incrementSize || buf.isReadOnly()
                || buf.capacity() > maxBufferSize)
            return;

        int capacity = (buf.capacity() / incrementSize) * incrementSize;

        buf.clear();

        CacheObject co;
        ByteBuffer tempBuf = buf;
        while (capacity >= incrementSize)
        {
            if (tempBuf.capacity() < capacity)
            {
                capacity = capacity - incrementSize;
                continue;
            }
            co = map.get(capacity);
            if (co == null)
            {
                // Don't slice the buffer for sizes which have never been requested.
                capacity = capacity - incrementSize;
                continue;
            }
            while (true)
            {
                // Slice `capacity` bytes off the tail of the remaining buffer.
                tempBuf.position(tempBuf.capacity() - capacity);
                if (!co.isFull() && co.put(tempBuf.slice()))
                {
                    tempBuf.limit(tempBuf.position());
                    tempBuf.position(0);
                    tempBuf = tempBuf.slice();
                    tempBuf.clear();
                    if (tempBuf.capacity() < capacity)
                        break;
                }
                else
                {
                    capacity = capacity - incrementSize;
                    break;
                }
            }
        }
    }

    public static void cleanUp(boolean forceNow)
    {
        if (!forceNow && nextScheduledCleanup > System.currentTimeMillis())
            return;

        nextScheduledCleanup = System.currentTimeMillis() + cleanUpIntervalMs;

        for (int i = 1; i * incrementSize <= maxBufferSize; i++)
        {
            CacheObject co = map.get(i * incrementSize);
            if (co == null)
                continue;
            if (co.lastReadTime + maxUnusedAgeMs < System.currentTimeMillis())
                map.remove(i * incrementSize);
        }
    }

    static class CacheObject
    {
        private ConcurrentLinkedQueue<ByteBuffer> bufList =
            new ConcurrentLinkedQueue<ByteBuffer>();
        public long lastReadTime = System.currentTimeMillis();
        private int size; // approximate (not synchronized) size of the list

        public ByteBuffer get(int capacity)
        {
            lastReadTime = System.currentTimeMillis();
            ByteBuffer b = bufList.poll();
            if (b == null)
            {
                size = 0;
                //System.out.println("NO CACHED BUFFER:" + capacity);
                b = ByteBuffer.allocate(capacity);
            }
            else
            {
                size--;
            }
            return b;
        }

        public boolean put(ByteBuffer buf)
        {
            if (isFull())
            {
                //System.out.println("FULL BUFFER CACHE:" + buf.capacity());
                return false;
            }
            size++;
            bufList.offer(buf);
            return true;
        }

        public boolean isFull()
        {
            return size >= maxReadyBuffersPerIncrementSize;
        }
    }
}
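For anyone puzzling over the slicing loop in putDirtyBuffer: it carves a
returned buffer into increment-aligned tail slices, stepping down to smaller
size classes when a class has never been requested.  Here is a simplified,
self-contained sketch of just that carving arithmetic (hypothetical
simplification: a single acceptLimit cutoff stands in for the map lookup,
and queues are never full):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class Main {
    static final int INCREMENT = 4096; // same as incrementSize above

    // Simplified putDirtyBuffer carving: repeatedly slice `capacity`
    // bytes off the tail of the remainder.  `acceptLimit` stands in for
    // the map lookup -- only classes at or below it accept slices.
    static List<ByteBuffer> carve(ByteBuffer buf, int acceptLimit) {
        List<ByteBuffer> slices = new ArrayList<ByteBuffer>();
        buf.clear();
        ByteBuffer rest = buf;
        int capacity = (rest.capacity() / INCREMENT) * INCREMENT;
        while (capacity >= INCREMENT) {
            if (rest.capacity() < capacity || capacity > acceptLimit) {
                capacity -= INCREMENT; // class missing or remainder too small
                continue;
            }
            rest.position(rest.capacity() - capacity);
            slices.add(rest.slice());    // tail slice of `capacity` bytes
            rest.limit(rest.position()); // keep only the head as the remainder
            rest.position(0);
            rest = rest.slice();
        }
        return slices;
    }

    public static void main(String[] args) {
        // A (3*4096 + 100)-byte buffer, when only the 4096 class exists,
        // is carved into three 4096-byte slices; the 100-byte head is
        // dropped, matching the real class's behavior for odd remainders.
        for (ByteBuffer s : carve(ByteBuffer.allocate(3 * 4096 + 100), 4096))
            System.out.println(s.capacity());
    }
}
```

This also shows why odd-sized buffers still pay their way: everything above
the last full increment is simply discarded and left to the GC.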
