Author: chirino Date: Thu Dec 8 14:20:31 2011 New Revision: 1211900 URL: http://svn.apache.org/viewvc?rev=1211900&view=rev Log: Improve send performance of TCP transport by allowing writes to batch before they get flushed.
Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1211900&r1=1211899&r2=1211900&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original) +++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Thu Dec 8 14:20:31 2011 @@ -174,6 +174,7 @@ public class TcpTransport extends JavaBa protected DispatchQueue dispatchQueue; private DispatchSource readSource; private DispatchSource writeSource; + private CustomDispatchSource<Integer, Integer> drainOutboundSource; private CustomDispatchSource<Integer, Integer> yieldSource; protected boolean useLocalHost = true; @@ -468,6 +469,13 @@ public class TcpTransport extends JavaBa } }); yieldSource.resume(); + drainOutboundSource = Dispatch.createSource(EventAggregators.INTEGER_ADD, dispatchQueue); + drainOutboundSource.setEventHandler(new Runnable() { + public void run() { + drainOutbound(); + } + }); + drainOutboundSource.resume(); readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue); writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue); @@ -528,6 +536,8 @@ public class TcpTransport extends JavaBa return codec==null || codec.full(); } + boolean rejectingOffers; + public boolean offer(Object command) { assert Dispatch.getCurrentQueue() == dispatchQueue; try { @@ -539,14 +549,12 @@ public class TcpTransport extends JavaBa } ProtocolCodec.BufferState rc = codec.write(command); + rejectingOffers = codec.full(); switch (rc ) { case FULL: return false; default: - if( drained ) { - drained = false; - resumeWrite(); - } + drainOutboundSource.merge(1); return true; } } catch (IOException e) { @@ -556,8 +564,8 @@ public class TcpTransport extends JavaBa } + boolean writeResumedForCodecFlush = false; - boolean drained = true; /** * */ @@ -568,10 +576,17 @@ public class TcpTransport extends JavaBa } try { if( codec.flush() == ProtocolCodec.BufferState.EMPTY && flush() ) { - if( !drained ) { - drained = true; + if( writeResumedForCodecFlush) { + writeResumedForCodecFlush = false; suspendWrite(); - listener.onRefill(); + } + rejectingOffers = false; + listener.onRefill(); + + } else { + if(!writeResumedForCodecFlush) { + writeResumedForCodecFlush = true; + resumeWrite(); } } } catch (IOException e) { @@ -658,6 +673,7 @@ public class TcpTransport extends JavaBa } } } + private void _resumeRead() { readSource.resume(); dispatchQueue.execute(new Runnable(){ @@ -672,14 +688,10 @@ public class TcpTransport extends JavaBa writeSource.suspend(); } } + protected void resumeWrite() { if( isConnected() && writeSource!=null ) { writeSource.resume(); - dispatchQueue.execute(new Runnable(){ - public void run() { - drainOutbound(); - } - }); } } Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala?rev=1211900&r1=1211899&r2=1211900&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala (original) +++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/IntCounter.scala Thu Dec 8 14:20:31 2011 @@ -48,4 +48,5 @@ class IntCounter(private var value:Int = rc } + override def toString() = get().toString } \ No newline at end of file Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala?rev=1211900&r1=1211899&r2=1211900&view=diff ============================================================================== --- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala (original) +++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util/LongCounter.scala Thu Dec 8 14:20:31 2011 @@ -49,4 +49,5 @@ class LongCounter(private var value:Long rc } + override def toString() = get().toString } \ No newline at end of file