Author: elecharny Date: Sat Feb 20 17:04:32 2010 New Revision: 912149 URL: http://svn.apache.org/viewvc?rev=912149&view=rev Log: o added a isEncoded() method in the WriteRequest interface, to avoid calling the messageSent method if we are dealing with an encoded message o The created ProtocolCodecFactory when using the classes to initialize the filter now use local references, instead of creating new instances o The initCodec() method has been removed o The getDecoder0 and getEncoder0 methods have been removed o The ProtocolEncoderOutputImpl.flushWithoutFuture method has been removed o Loop on the queue instead of using a for(;;) o Minor refactoring
Modified: mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Modified: mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java?rev=912149&r1=912148&r2=912149&view=diff ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java (original) +++ mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java Sat Feb 20 17:04:32 2010 @@ -448,7 +448,10 @@ } Entry head = this.head; - callNextMessageSent(head, session, request); + + if (!request.isEncoded()) { + callNextMessageSent(head, session, request); + } } private void callNextMessageSent(Entry entry, IoSession session, Modified: mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=912149&r1=912148&r2=912149&view=diff ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java (original) +++ mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java Sat Feb 20 17:04:32 2010 @@ -679,7 +679,8 @@ private void read(T session) { IoSessionConfig config = session.getConfig(); - IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize()); + int bufferSize = config.getReadBufferSize(); + IoBuffer buf = IoBuffer.allocate(bufferSize); final boolean hasFragmentation = session.getTransportMetadata() .hasFragmentation(); @@ -690,6 +691,7 @@ try { if (hasFragmentation) { + while ((ret = read(session, buf)) > 0) { readBytes += ret; Modified: mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java?rev=912149&r1=912148&r2=912149&view=diff ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java (original) +++ mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java Sat Feb 20 17:04:32 2010 @@ -182,4 +182,9 @@ return sb.toString(); } + + public boolean isEncoded() + { + return false; + } } \ No newline at end of file Modified: mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java?rev=912149&r1=912148&r2=912149&view=diff ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java (original) +++ mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java Sat Feb 20 17:04:32 2010 @@ -53,4 +53,11 @@ * @return <tt>null</tt> for the default destination */ SocketAddress getDestination(); + + /** + * Tells if the current message has been encoded + * + * @return true if the message has already been encoded + */ + boolean isEncoded(); } \ No newline at end of file Modified: mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java?rev=912149&r1=912148&r2=912149&view=diff ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java (original) +++ mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java Sat Feb 20 17:04:32 2010 @@ -69,4 +69,9 @@ public String toString() { return "WR Wrapper" + parentRequest.toString(); } + + public boolean isEncoded() + { + return false; + } } Modified: mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=912149&r1=912148&r2=912149&view=diff ============================================================================== --- mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java (original) +++ mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java Sat Feb 20 17:04:32 2010 @@ -72,6 +72,7 @@ if (factory == null) { throw new NullPointerException("factory"); } + this.factory = factory; } @@ -144,15 +145,32 @@ "decoderClass doesn't have a public default constructor."); } - // Create the inner factory based on the two parameters. We instantiate - // the encoder and decoder locally. + final ProtocolEncoder encoder; + + try { + encoder = encoderClass.newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException( + "encoderClass cannot be initialized"); + } + + final ProtocolDecoder decoder; + + try { + decoder = decoderClass.newInstance(); + } catch (Exception e) { + throw new IllegalArgumentException( + "decoderClass cannot be initialized"); + } + + // Create the inner factory based on the two parameters. this.factory = new ProtocolCodecFactory() { public ProtocolEncoder getEncoder(IoSession session) throws Exception { - return encoderClass.newInstance(); + return encoder; } public ProtocolDecoder getDecoder(IoSession session) throws Exception { - return decoderClass.newInstance(); + return decoder; } }; } @@ -175,9 +193,6 @@ throw new IllegalArgumentException( "You can't add the same filter instance more than once. Create another instance and add it."); } - - // Initialize the encoder and decoder - initCodec(parent.getSession()); } @Override @@ -210,7 +225,7 @@ } IoBuffer in = (IoBuffer) message; - ProtocolDecoder decoder = getDecoder0(session); + ProtocolDecoder decoder = factory.getDecoder(session); ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); // Loop until we don't have anymore byte in the buffer, @@ -219,6 +234,7 @@ // data in the buffer while (in.hasRemaining()) { int oldPos = in.position(); + try { synchronized (decoderOut) { // Call the decoder with the read bytes @@ -282,13 +298,13 @@ // Bypass the encoding if the message is contained in a IoBuffer, // as it has already been encoded before - if (message instanceof IoBuffer || message instanceof FileRegion) { + if ((message instanceof IoBuffer) || (message instanceof FileRegion)) { nextFilter.filterWrite(session, writeRequest); return; } // Get the encoder in the session - ProtocolEncoder encoder = getEncoder0(session); + ProtocolEncoder encoder = factory.getEncoder(session); ProtocolEncoderOutput encoderOut = getEncoderOut(session, nextFilter, writeRequest); @@ -306,7 +322,21 @@ encoder.encode(session, message, encoderOut); // Send it directly - ((ProtocolEncoderOutputImpl)encoderOut).flushWithoutFuture(); + Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput)encoderOut).getMessageQueue(); + + // Write all the encoded messages now + while (!bufferQueue.isEmpty()) { + Object encodedMessage = bufferQueue.poll(); + + // Flush only when the buffer has remaining. + if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { + SocketAddress destination = writeRequest.getDestination(); + WriteRequest encodedWriteRequest = new EncodedWriteRequest(encodedMessage, null, destination); + + nextFilter.filterWrite(session, encodedWriteRequest); + } + } + // Call the next filter nextFilter.filterWrite(session, new MessageWriteRequest( @@ -330,7 +360,7 @@ public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { // Call finishDecode() first when a connection is closed. - ProtocolDecoder decoder = getDecoder0(session); + ProtocolDecoder decoder = factory.getDecoder(session); ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); try { @@ -358,6 +388,10 @@ WriteFuture future, SocketAddress destination) { super(encodedMessage, future, destination); } + + public boolean isEncoded() { + return true; + } } private static class MessageWriteRequest extends WriteRequestWrapper { @@ -384,6 +418,7 @@ public void flush(NextFilter nextFilter, IoSession session) { Queue<Object> messageQueue = getMessageQueue(); + while (!messageQueue.isEmpty()) { nextFilter.messageReceived(session, messageQueue.poll()); } @@ -408,15 +443,12 @@ public WriteFuture flush() { Queue<Object> bufferQueue = getMessageQueue(); WriteFuture future = null; - for (;;) { + + while (!bufferQueue.isEmpty()) { Object encodedMessage = bufferQueue.poll(); - if (encodedMessage == null) { - break; - } // Flush only when the buffer has remaining. - if (!(encodedMessage instanceof IoBuffer) || - ((IoBuffer) encodedMessage).hasRemaining()) { + if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) encodedMessage).hasRemaining()) { future = new DefaultWriteFuture(session); nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage, future, writeRequest.getDestination())); @@ -430,43 +462,10 @@ return future; } - - public void flushWithoutFuture() { - Queue<Object> bufferQueue = getMessageQueue(); - for (;;) { - Object encodedMessage = bufferQueue.poll(); - if (encodedMessage == null) { - break; - } - - // Flush only when the buffer has remaining. - if (!(encodedMessage instanceof IoBuffer) || - ((IoBuffer) encodedMessage).hasRemaining()) { - SocketAddress destination = writeRequest.getDestination(); - WriteRequest writeRequest = new EncodedWriteRequest( - encodedMessage, null, destination); - nextFilter.filterWrite(session, writeRequest); - } - } - } } //----------- Helper methods --------------------------------------------- /** - * Initialize the encoder and the decoder, storing them in the - * session attributes. - */ - private void initCodec(IoSession session) throws Exception { - // Creates the decoder and stores it into the newly created session - ProtocolDecoder decoder = factory.getDecoder(session); - session.setAttribute(DECODER, decoder); - - // Creates the encoder and stores it into the newly created session - ProtocolEncoder encoder = factory.getEncoder(session); - session.setAttribute(ENCODER, encoder); - } - - /** * Dispose the encoder, decoder, and the callback for the decoded * messages. */ @@ -537,28 +536,6 @@ return out; } - private ProtocolEncoder getEncoder0(IoSession session) throws Exception { - ProtocolEncoder encoder = (ProtocolEncoder) session - .getAttribute(ENCODER); - if (encoder == null) { - encoder = factory.getEncoder(session); - session.setAttribute(ENCODER, encoder); - } - return encoder; - } - - private ProtocolDecoder getDecoder0(IoSession session) throws Exception { - ProtocolDecoder decoder = (ProtocolDecoder) session - .getAttribute(DECODER); - - if (decoder == null) { - decoder = factory.getDecoder(session); - session.setAttribute(DECODER, decoder); - } - - return decoder; - } - private ProtocolEncoderOutput getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { ProtocolEncoderOutput out = (ProtocolEncoderOutput) session.getAttribute(ENCODER_OUT);