Author: trustin
Date: Sun Nov 4 21:48:42 2007
New Revision: 591883
URL: http://svn.apache.org/viewvc?rev=591883&view=rev
Log:
* Fixed a problem that scheduledWrite* per service doesn't decrease.
* Added IoBuffer.free() which is completely optional. (I might remove it; still
testing it..)
* Updated EchoServer example
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoBuffer.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoBuffer.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoBufferWrapper.java
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleBufferAllocator.java
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/Main.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoBuffer.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoBuffer.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoBuffer.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoBuffer.java
Sun Nov 4 21:48:42 2007
@@ -105,6 +105,11 @@
public boolean isAutoExpand() {
return autoExpand && autoExpandAllowed;
}
+
+ @Override
+ public boolean isDerived() {
+ return !autoExpandAllowed;
+ }
@Override
public IoBuffer setAutoExpand(boolean autoExpand) {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
Sun Nov 4 21:48:42 2007
@@ -414,6 +414,7 @@
if (!failedRequests.isEmpty()) {
WriteToClosedSessionException cause = new
WriteToClosedSessionException(failedRequests);
for (WriteRequest r: failedRequests) {
+ session.decreaseScheduledBytesAndMessages(r);
r.getFuture().setException(cause);
}
session.getFilterChain().fireExceptionCaught(cause);
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoService.java
Sun Nov 4 21:48:42 2007
@@ -189,6 +189,10 @@
scheduledWriteMessages.incrementAndGet();
}
+ protected void decreaseScheduledWriteMessages() {
+ scheduledWriteMessages.decrementAndGet();
+ }
+
public long getActivationTime() {
return activationTime;
}
@@ -203,7 +207,6 @@
protected void increaseWrittenBytes(long increment) {
writtenBytes.addAndGet(increment);
- scheduledWriteBytes.addAndGet(-increment);
}
public long getWrittenMessages() {
@@ -212,7 +215,6 @@
protected void increaseWrittenMessages() {
writtenMessages.incrementAndGet();
- scheduledWriteMessages.decrementAndGet();
}
public Set<WriteFuture> broadcast(Object message) {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
Sun Nov 4 21:48:42 2007
@@ -381,7 +381,21 @@
}
}
}
-
+
+ protected void increaseWrittenBytesAndMessages(WriteRequest request) {
+ Object message = request.getMessage();
+ if (message instanceof IoBuffer) {
+ IoBuffer b = (IoBuffer) message;
+ if (b.hasRemaining()) {
+ increaseWrittenBytes(((IoBuffer) message).remaining());
+ } else {
+ increaseWrittenMessages();
+ }
+ } else {
+ increaseWrittenMessages();
+ }
+ }
+
protected void increaseWrittenBytes(long increment) {
if (increment > 0) {
writtenBytes += increment;
@@ -389,11 +403,11 @@
idleCountForBoth = 0;
idleCountForWrite = 0;
- scheduledWriteBytes.addAndGet(-increment);
-
if (getService() instanceof AbstractIoService) {
((AbstractIoService)
getService()).increaseWrittenBytes(increment);
}
+
+ increaseScheduledWriteBytes(-increment);
}
}
@@ -406,10 +420,11 @@
protected void increaseWrittenMessages() {
writtenMessages++;
- scheduledWriteMessages.decrementAndGet();
if (getService() instanceof AbstractIoService) {
((AbstractIoService) getService()).increaseWrittenMessages();
}
+
+ decreaseScheduledWriteMessages();
}
protected void increaseScheduledWriteBytes(long increment) {
@@ -423,6 +438,27 @@
scheduledWriteMessages.incrementAndGet();
if (getService() instanceof AbstractIoService) {
((AbstractIoService)
getService()).increaseScheduledWriteMessages();
+ }
+ }
+
+ protected void decreaseScheduledWriteMessages() {
+ scheduledWriteMessages.decrementAndGet();
+ if (getService() instanceof AbstractIoService) {
+ ((AbstractIoService)
getService()).decreaseScheduledWriteMessages();
+ }
+ }
+
+ protected void decreaseScheduledBytesAndMessages(WriteRequest request) {
+ Object message = request.getMessage();
+ if (message instanceof IoBuffer) {
+ IoBuffer b = (IoBuffer) message;
+ if (b.hasRemaining()) {
+ increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
+ } else {
+ decreaseScheduledWriteMessages();
+ }
+ } else {
+ decreaseScheduledWriteMessages();
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/CachedBufferAllocator.java
Sun Nov 4 21:48:42 2007
@@ -59,7 +59,7 @@
* @version $Rev$, $Date$
*/
public class CachedBufferAllocator implements IoBufferAllocator {
- private static final int MAX_POOL_SIZE = 4;
+ private static final int MAX_POOL_SIZE = 8;
private final ThreadLocal<Map<Integer, Queue<ByteBuffer>>>
localRecyclables =
new ThreadLocal<Map<Integer, Queue<ByteBuffer>>>() {
@@ -150,14 +150,7 @@
newBuf.put(oldBuf);
this.buf = newBuf;
- // Add to the cache.
- if (!buf.isDirect() && !buf.isReadOnly()) {
- Queue<ByteBuffer> pool =
localRecyclables.get().get(buf.capacity());
- // Restrict the size of the pool to prevent OOM.
- if (pool.size() < MAX_POOL_SIZE) {
- pool.offer(buf);
- }
- }
+ free(oldBuf);
}
@Override
@@ -189,6 +182,22 @@
public boolean hasArray() {
return buf.hasArray();
}
- }
+ @Override
+ public void free() {
+ free(buf);
+ buf = null; // FIXME better sanity check scheme.
+ }
+
+ private void free(ByteBuffer buf) {
+ // Add to the cache.
+ if (!buf.isDirect() && !buf.isReadOnly() && !isDerived()) {
+ Queue<ByteBuffer> pool =
localRecyclables.get().get(buf.capacity());
+ // Restrict the size of the pool to prevent OOM.
+ if (pool.size() < MAX_POOL_SIZE) {
+ pool.offer(buf);
+ }
+ }
+ }
+ }
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
Sun Nov 4 21:48:42 2007
@@ -404,17 +404,7 @@
}
public void fireMessageSent(WriteRequest request) {
- Object message = request.getMessage();
- if (message instanceof IoBuffer) {
- IoBuffer b = (IoBuffer) message;
- if (b.hasRemaining()) {
- session.increaseWrittenBytes(((IoBuffer) message).remaining());
- } else {
- session.increaseWrittenMessages();
- }
- } else {
- session.increaseWrittenMessages();
- }
+ session.increaseWrittenBytesAndMessages(request);
try {
request.getFuture().setWritten();
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoBuffer.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoBuffer.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoBuffer.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoBuffer.java Sun Nov
4 21:48:42 2007
@@ -245,6 +245,14 @@
*/
protected IoBuffer() {
}
+
+ /**
+ * Declares this buffer and all its derived buffers are not used anymore
+ * so that it can be reused by some [EMAIL PROTECTED] IoBufferAllocator}
implementations.
+ * It is not mandatory to call this method, but you might want to invoke
this
+ * method for maximum performance.
+ */
+ public abstract void free();
/**
* Returns the underlying NIO buffer instance.
@@ -255,6 +263,12 @@
* @see ByteBuffer#isDirect()
*/
public abstract boolean isDirect();
+
+ /**
+ * returns <tt>true</tt> if and only if this buffer is derived from other
buffer
+ * via [EMAIL PROTECTED] #duplicate()}, [EMAIL PROTECTED] #slice()} or
[EMAIL PROTECTED] #asReadOnlyBuffer()}.
+ */
+ public abstract boolean isDerived();
/**
* @see ByteBuffer#isReadOnly()
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/IoBufferWrapper.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoBufferWrapper.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoBufferWrapper.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoBufferWrapper.java
Sun Nov 4 21:48:42 2007
@@ -636,4 +636,14 @@
public boolean hasArray() {
return buf.hasArray();
}
+
+ @Override
+ public void free() {
+ buf.free();
+ }
+
+ @Override
+ public boolean isDerived() {
+ return buf.isDerived();
+ }
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleBufferAllocator.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/SimpleBufferAllocator.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleBufferAllocator.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/SimpleBufferAllocator.java
Sun Nov 4 21:48:42 2007
@@ -133,5 +133,9 @@
public boolean hasArray() {
return buf.hasArray();
}
+
+ @Override
+ public void free() {
+ }
}
}
Modified:
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
URL:
http://svn.apache.org/viewvc/mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
---
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
(original)
+++
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/EchoProtocolHandler.java
Sun Nov 4 21:48:42 2007
@@ -21,10 +21,12 @@
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoBuffer;
+import org.apache.mina.common.IoFutureListener;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionLogger;
+import org.apache.mina.common.WriteFuture;
import org.apache.mina.filter.ssl.SslFilter;
/**
@@ -43,6 +45,16 @@
}
@Override
+ public void sessionClosed(IoSession session) throws Exception {
+ IoSessionLogger.getLogger(session).info("CLOSED");
+ }
+
+ @Override
+ public void sessionOpened(IoSession session) throws Exception {
+ IoSessionLogger.getLogger(session).info("OPENED");
+ }
+
+ @Override
public void sessionIdle(IoSession session, IdleStatus status) {
IoSessionLogger.getLogger(session).info(
"*** IDLE #" + session.getIdleCount(IdleStatus.BOTH_IDLE) + "
***");
@@ -57,6 +69,11 @@
public void messageReceived(IoSession session, Object message)
throws Exception {
// Write the received data back to remote peer
- session.write(((IoBuffer) message).duplicate());
+ final IoBuffer src = (IoBuffer) message;
+ session.write(src.duplicate()).addListener(new
IoFutureListener<WriteFuture>() {
+ public void operationComplete(WriteFuture future) {
+ src.free();
+ }
+ });
}
}
Modified:
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/Main.java
URL:
http://svn.apache.org/viewvc/mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/Main.java?rev=591883&r1=591882&r2=591883&view=diff
==============================================================================
---
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/Main.java
(original)
+++
mina/trunk/example/src/main/java/org/apache/mina/example/echoserver/Main.java
Sun Nov 4 21:48:42 2007
@@ -22,7 +22,9 @@
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
+import org.apache.mina.common.CachedBufferAllocator;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
+import org.apache.mina.common.IoBuffer;
import org.apache.mina.example.echoserver.ssl.BogusSslContextFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.filter.ssl.SslFilter;
@@ -45,6 +47,8 @@
public static void main(String[] args) throws Exception {
SocketAcceptor acceptor = new
NioSocketAcceptor(Executors.newCachedThreadPool());
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
+
+ IoBuffer.setAllocator(new CachedBufferAllocator());
// Add SSL filter if SSL is enabled.
if (USE_SSL) {
@@ -56,6 +60,7 @@
// Bind
acceptor.setLocalAddress(new InetSocketAddress(PORT));
acceptor.setHandler(new EchoProtocolHandler());
+ // acceptor.getFilterChain().addLast("x", new
WriteThrottleFilter(WriteThrottlePolicy.LOG, 0, 1048576, 0, 0, 0, 0));
acceptor.bind();
System.out.println("Listening on port " + PORT);