Author: trustin
Date: Mon Nov 5 05:48:47 2007
New Revision: 591994
URL: http://svn.apache.org/viewvc?rev=591994&view=rev
Log:
Improved CachedBufferAllocator
* Direct buffer is supported
* free() is supported
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java?rev=591994&r1=591993&r2=591994&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
Mon Nov 5 05:48:47 2007
@@ -53,53 +53,70 @@
* environment.
* <p>
* {@link CachedBufferAllocator} uses {@link
ThreadLocal} to store the cached
- * buffer, allocates buffers whose capacity is power of 2 only, and doesn't
- * provide any caching for direct buffers.
+ * buffer, allocates buffers whose capacity is power of 2 only and provides
+ * performance advantage if {@link IoBuffer#free()} is called
properly.
*
* @author The Apache MINA Project ([EMAIL PROTECTED])
* @version $Rev$, $Date$
*/
public class CachedBufferAllocator implements IoBufferAllocator {
+
private static final int MAX_POOL_SIZE = 8;
- private final ThreadLocal<Map<Integer, Queue<ByteBuffer>>>
localRecyclables =
- new ThreadLocal<Map<Integer, Queue<ByteBuffer>>>() {
+ private static Map<Integer, Queue<CachedBuffer>> newPoolMap() {
+ Map<Integer, Queue<CachedBuffer>> poolMap =
+ new HashMap<Integer, Queue<CachedBuffer>>();
+ for (int i = 0; i < 31; i ++) {
+ poolMap.put(1 << i, new
CircularQueue<CachedBuffer>(MAX_POOL_SIZE));
+ }
+ poolMap.put(0, new CircularQueue<CachedBuffer>(MAX_POOL_SIZE));
+ poolMap.put(Integer.MAX_VALUE, new
CircularQueue<CachedBuffer>(MAX_POOL_SIZE));
+ return poolMap;
+ }
+
+ private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> heapBuffers =
+ new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
@Override
- protected Map<Integer, Queue<ByteBuffer>> initialValue() {
- Map<Integer, Queue<ByteBuffer>> queues =
- new HashMap<Integer, Queue<ByteBuffer>>();
- for (int i = 0; i < 31; i ++) {
- queues.put(1 << i, new
CircularQueue<ByteBuffer>(MAX_POOL_SIZE));
- }
- queues.put(Integer.MAX_VALUE, new
CircularQueue<ByteBuffer>(MAX_POOL_SIZE));
- return queues;
+ protected Map<Integer, Queue<CachedBuffer>> initialValue() {
+ return newPoolMap();
}
- };
-
+ };
+
+ private final ThreadLocal<Map<Integer, Queue<CachedBuffer>>> directBuffers
=
+ new ThreadLocal<Map<Integer, Queue<CachedBuffer>>>() {
+ @Override
+ protected Map<Integer, Queue<CachedBuffer>> initialValue() {
+ return newPoolMap();
+ }
+ };
+
public IoBuffer allocate(int capacity, boolean direct) {
- return wrap(allocate0(capacity, direct));
- }
-
- private ByteBuffer allocate0(int capacity, boolean direct) {
capacity = normalizeCapacity(capacity);
- ByteBuffer buf;
+ Queue<CachedBuffer> pool;
if (direct) {
- buf = ByteBuffer.allocateDirect(capacity);
+ pool = directBuffers.get().get(capacity);
} else {
- // Recycle if possible.
- Queue<ByteBuffer> pool = localRecyclables.get().get(capacity);
- buf = pool.poll();
- if (buf != null) {
- buf.clear();
+ pool = heapBuffers.get().get(capacity);
+ }
+
+ // Recycle if possible.
+ IoBuffer buf = pool.poll();
+ if (buf != null) {
+ buf.clear();
+ buf.setAutoExpand(false);
+ buf.order(ByteOrder.BIG_ENDIAN);
+ } else {
+ if (direct) {
+ buf = wrap(ByteBuffer.allocateDirect(capacity));
} else {
- buf = ByteBuffer.allocate(capacity);
+ buf = wrap(ByteBuffer.allocate(capacity));
}
}
return buf;
}
-
+
public IoBuffer wrap(ByteBuffer nioBuffer) {
- return new DirtyBuffer(nioBuffer, true);
+ return new CachedBuffer(nioBuffer, true);
}
public void dispose() {
@@ -107,6 +124,7 @@
private static int normalizeCapacity(int requestedCapacity) {
switch (requestedCapacity) {
+ case 0:
case 1 << 0: case 1 << 1: case 1 << 2: case 1 << 3: case 1 << 4:
case 1 << 5: case 1 << 6: case 1 << 7: case 1 << 8: case 1 << 9:
case 1 << 10: case 1 << 11: case 1 << 12: case 1 << 13: case 1 << 14:
@@ -127,17 +145,22 @@
return newCapacity;
}
- private class DirtyBuffer extends AbstractIoBuffer {
+ private class CachedBuffer extends AbstractIoBuffer {
+ private final Thread ownerThread;
private ByteBuffer buf;
- protected DirtyBuffer(ByteBuffer buf, boolean autoExpandAllowed) {
+ protected CachedBuffer(ByteBuffer buf, boolean autoExpandAllowed) {
super(autoExpandAllowed);
+ this.ownerThread = Thread.currentThread();
this.buf = buf;
buf.order(ByteOrder.BIG_ENDIAN);
}
@Override
public ByteBuffer buf() {
+ if (buf == null) {
+ throw new IllegalStateException("Buffer has been freed
already.");
+ }
return buf;
}
@@ -145,59 +168,75 @@
protected void capacity0(int requestedCapacity) {
int newCapacity = normalizeCapacity(requestedCapacity);
- ByteBuffer oldBuf = this.buf;
- ByteBuffer newBuf = allocate0(newCapacity, isDirect());
+ ByteBuffer oldBuf = buf();
+ ByteBuffer newBuf = allocate(newCapacity, isDirect()).buf();
oldBuf.clear();
newBuf.put(oldBuf);
this.buf = newBuf;
free(oldBuf);
- free(buf);
}
@Override
protected IoBuffer duplicate0() {
- return new DirtyBuffer(this.buf.duplicate(), false);
+ return new CachedBuffer(buf().duplicate(), false);
}
@Override
protected IoBuffer slice0() {
- return new DirtyBuffer(this.buf.slice(), false);
+ return new CachedBuffer(buf().slice(), false);
}
@Override
protected IoBuffer asReadOnlyBuffer0() {
- return new DirtyBuffer(this.buf.asReadOnlyBuffer(), false);
+ return new CachedBuffer(buf().asReadOnlyBuffer(), false);
}
@Override
public byte[] array() {
- return buf.array();
+ return buf().array();
}
@Override
public int arrayOffset() {
- return buf.arrayOffset();
+ return buf().arrayOffset();
}
@Override
public boolean hasArray() {
- return buf.hasArray();
+ return buf().hasArray();
}
@Override
public void free() {
free(buf);
- buf = null; // FIXME better sanity check scheme.
+ buf = null;
}
- private void free(ByteBuffer buf) {
+ private void free(ByteBuffer oldBuf) {
+ if (oldBuf == null) {
+ return;
+ }
+ if (Thread.currentThread() != ownerThread) {
+ return;
+ }
+
// Add to the cache.
- if (!buf.isDirect() && !buf.isReadOnly() && !isDerived()) {
- Queue<ByteBuffer> pool =
localRecyclables.get().get(buf.capacity());
+ if (!oldBuf.isReadOnly() && !isDerived()) {
+ Queue<CachedBuffer> pool;
+ if (oldBuf.isDirect()) {
+ pool = directBuffers.get().get(oldBuf.capacity());
+ } else {
+ pool = heapBuffers.get().get(oldBuf.capacity());
+ }
+
+ if (pool == null) {
+ return;
+ }
+
// Restrict the size of the pool to prevent OOM.
if (pool.size() < MAX_POOL_SIZE) {
- pool.offer(buf);
+ pool.offer(new CachedBuffer(oldBuf, true));
}
}
}