Author: elecharny
Date: Sat Feb 20 17:04:32 2010
New Revision: 912149

URL: http://svn.apache.org/viewvc?rev=912149&view=rev
Log:
o Added an isEncoded() method to the WriteRequest interface, to avoid calling 
the messageSent method if we are dealing with an encoded message
o The ProtocolCodecFactory created when the filter is initialized from the 
encoder and decoder classes now uses local references, instead of creating new instances
o The initCodec() method has been removed
o The getDecoder0 and getEncoder0 methods have been removed
o The ProtocolEncoderOutputImpl.flushWithoutFuture method has been removed
o Loop on the queue with while (!queue.isEmpty()) instead of using a for(;;) 
o Minor refactoring

Modified:
    
mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java
    
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
    
mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
    mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java
    
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
    
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java
 Sat Feb 20 17:04:32 2010
@@ -448,7 +448,10 @@
         }
 
         Entry head = this.head;
-        callNextMessageSent(head, session, request);
+        
+        if (!request.isEncoded()) {
+            callNextMessageSent(head, session, request);
+        }
     }
 
     private void callNextMessageSent(Entry entry, IoSession session,

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
 Sat Feb 20 17:04:32 2010
@@ -679,7 +679,8 @@
 
     private void read(T session) {
         IoSessionConfig config = session.getConfig();
-        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
+        int bufferSize = config.getReadBufferSize();
+        IoBuffer buf = IoBuffer.allocate(bufferSize);
 
         final boolean hasFragmentation = session.getTransportMetadata()
                 .hasFragmentation();
@@ -690,6 +691,7 @@
 
             try {
                 if (hasFragmentation) {
+                    
                     while ((ret = read(session, buf)) > 0) {
                         readBytes += ret;
                         

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
 Sat Feb 20 17:04:32 2010
@@ -182,4 +182,9 @@
 
         return sb.toString();
     }
+
+    public boolean isEncoded()
+    {
+        return false;
+    }
 }
\ No newline at end of file

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java 
Sat Feb 20 17:04:32 2010
@@ -53,4 +53,11 @@
      * @return <tt>null</tt> for the default destination
      */
     SocketAddress getDestination();
+    
+    /**
+     * Tells if the current message has been encoded
+     *
+     * @return true if the message has already been encoded
+     */
+    boolean isEncoded();
 }
\ No newline at end of file

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
 Sat Feb 20 17:04:32 2010
@@ -69,4 +69,9 @@
     public String toString() {
         return "WR Wrapper" + parentRequest.toString();
     }
+
+    public boolean isEncoded()
+    {
+        return false;
+    }
 }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
 Sat Feb 20 17:04:32 2010
@@ -72,6 +72,7 @@
         if (factory == null) {
             throw new NullPointerException("factory");
         }
+        
         this.factory = factory;
     }
 
@@ -144,15 +145,32 @@
                     "decoderClass doesn't have a public default constructor.");
         }
 
-        // Create the inner factory based on the two parameters. We instantiate
-        // the encoder and decoder locally.
+        final ProtocolEncoder encoder;
+        
+        try {
+            encoder = encoderClass.newInstance();
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                "encoderClass cannot be initialized");
+        }
+
+        final ProtocolDecoder decoder;
+        
+        try {
+            decoder = decoderClass.newInstance();
+        } catch (Exception e) {
+            throw new IllegalArgumentException(
+                "decoderClass cannot be initialized");
+        }
+    
+        // Create the inner factory based on the two parameters.
         this.factory = new ProtocolCodecFactory() {
             public ProtocolEncoder getEncoder(IoSession session) throws 
Exception {
-                return encoderClass.newInstance();
+                return encoder;
             }
 
             public ProtocolDecoder getDecoder(IoSession session) throws 
Exception {
-                return decoderClass.newInstance();
+                return decoder;
             }
         };
     }
@@ -175,9 +193,6 @@
             throw new IllegalArgumentException(
                     "You can't add the same filter instance more than once.  
Create another instance and add it.");
         }
-        
-        // Initialize the encoder and decoder
-        initCodec(parent.getSession());
     }
 
     @Override
@@ -210,7 +225,7 @@
         }
 
         IoBuffer in = (IoBuffer) message;
-        ProtocolDecoder decoder = getDecoder0(session);
+        ProtocolDecoder decoder = factory.getDecoder(session);
         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
         
         // Loop until we don't have anymore byte in the buffer,
@@ -219,6 +234,7 @@
         // data in the buffer
         while (in.hasRemaining()) {
             int oldPos = in.position();
+            
             try {
                 synchronized (decoderOut) {
                     // Call the decoder with the read bytes
@@ -282,13 +298,13 @@
         
         // Bypass the encoding if the message is contained in a IoBuffer,
         // as it has already been encoded before
-        if (message instanceof IoBuffer || message instanceof FileRegion) {
+        if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
             nextFilter.filterWrite(session, writeRequest);
             return;
         }
 
         // Get the encoder in the session
-        ProtocolEncoder encoder = getEncoder0(session);
+        ProtocolEncoder encoder = factory.getEncoder(session);
 
         ProtocolEncoderOutput encoderOut = getEncoderOut(session,
                 nextFilter, writeRequest);
@@ -306,7 +322,21 @@
             encoder.encode(session, message, encoderOut);
             
             // Send it directly
-            ((ProtocolEncoderOutputImpl)encoderOut).flushWithoutFuture();
+            Queue<Object> bufferQueue = 
((AbstractProtocolEncoderOutput)encoderOut).getMessageQueue();
+            
+            // Write all the encoded messages now
+            while (!bufferQueue.isEmpty()) {
+                Object encodedMessage = bufferQueue.poll();
+                
+                // Flush only when the buffer has remaining.
+                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) 
encodedMessage).hasRemaining()) {
+                    SocketAddress destination = writeRequest.getDestination();
+                    WriteRequest encodedWriteRequest = new 
EncodedWriteRequest(encodedMessage, null, destination); 
+
+                    nextFilter.filterWrite(session, encodedWriteRequest);
+                }
+            }
+
             
             // Call the next filter
             nextFilter.filterWrite(session, new MessageWriteRequest(
@@ -330,7 +360,7 @@
     public void sessionClosed(NextFilter nextFilter, IoSession session)
             throws Exception {
         // Call finishDecode() first when a connection is closed.
-        ProtocolDecoder decoder = getDecoder0(session);
+        ProtocolDecoder decoder = factory.getDecoder(session);
         ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
         
         try {
@@ -358,6 +388,10 @@
                 WriteFuture future, SocketAddress destination) {
             super(encodedMessage, future, destination);
         }
+        
+        public boolean isEncoded() {
+            return true;
+        }
     }
 
     private static class MessageWriteRequest extends WriteRequestWrapper {
@@ -384,6 +418,7 @@
 
         public void flush(NextFilter nextFilter, IoSession session) {
             Queue<Object> messageQueue = getMessageQueue();
+            
             while (!messageQueue.isEmpty()) {
                 nextFilter.messageReceived(session, messageQueue.poll());
             }
@@ -408,15 +443,12 @@
         public WriteFuture flush() {
             Queue<Object> bufferQueue = getMessageQueue();
             WriteFuture future = null;
-            for (;;) {
+            
+            while (!bufferQueue.isEmpty()) {
                 Object encodedMessage = bufferQueue.poll();
-                if (encodedMessage == null) {
-                    break;
-                }
 
                 // Flush only when the buffer has remaining.
-                if (!(encodedMessage instanceof IoBuffer) ||
-                        ((IoBuffer) encodedMessage).hasRemaining()) {
+                if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) 
encodedMessage).hasRemaining()) {
                     future = new DefaultWriteFuture(session);
                     nextFilter.filterWrite(session, new 
EncodedWriteRequest(encodedMessage,
                             future, writeRequest.getDestination()));
@@ -430,43 +462,10 @@
 
             return future;
         }
-        
-        public void flushWithoutFuture() {
-            Queue<Object> bufferQueue = getMessageQueue();
-            for (;;) {
-                Object encodedMessage = bufferQueue.poll();
-                if (encodedMessage == null) {
-                    break;
-                }
-
-                // Flush only when the buffer has remaining.
-                if (!(encodedMessage instanceof IoBuffer) ||
-                        ((IoBuffer) encodedMessage).hasRemaining()) {
-                    SocketAddress destination = writeRequest.getDestination();
-                    WriteRequest writeRequest = new EncodedWriteRequest(
-                        encodedMessage, null, destination); 
-                    nextFilter.filterWrite(session, writeRequest);
-                }
-            }
-        }
     }
     
     //----------- Helper methods ---------------------------------------------
     /**
-     * Initialize the encoder and the decoder, storing them in the 
-     * session attributes.
-     */
-    private void initCodec(IoSession session) throws Exception {
-        // Creates the decoder and stores it into the newly created session 
-        ProtocolDecoder decoder = factory.getDecoder(session);
-        session.setAttribute(DECODER, decoder);
-
-        // Creates the encoder and stores it into the newly created session 
-        ProtocolEncoder encoder = factory.getEncoder(session);
-        session.setAttribute(ENCODER, encoder);
-    }
-    
-    /**
      * Dispose the encoder, decoder, and the callback for the decoded
      * messages.
      */
@@ -537,28 +536,6 @@
         return out;
     }
 
-    private ProtocolEncoder getEncoder0(IoSession session) throws Exception {
-        ProtocolEncoder encoder = (ProtocolEncoder) session
-                .getAttribute(ENCODER);
-        if (encoder == null) {
-            encoder = factory.getEncoder(session);
-            session.setAttribute(ENCODER, encoder);
-        }
-        return encoder;
-    }
-
-    private ProtocolDecoder getDecoder0(IoSession session) throws Exception {
-        ProtocolDecoder decoder = (ProtocolDecoder) session
-                .getAttribute(DECODER);
-        
-        if (decoder == null) {
-            decoder = factory.getDecoder(session);
-            session.setAttribute(DECODER, decoder);
-        }
-        
-        return decoder;
-    }
-
     private ProtocolEncoderOutput getEncoderOut(IoSession session,
         NextFilter nextFilter, WriteRequest writeRequest) {
         ProtocolEncoderOutput out = (ProtocolEncoderOutput) 
session.getAttribute(ENCODER_OUT);


Reply via email to