Author: norman
Date: Wed Sep 28 06:57:35 2011
New Revision: 1176753

URL: http://svn.apache.org/viewvc?rev=1176753&view=rev
Log:
Make sure the Response's are written in the right order even if we mix 
FutureResponse and Response implementations. See PROTOCOLS-36

Modified:
    
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java

Modified: 
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
URL: 
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java?rev=1176753&r1=1176752&r2=1176753&view=diff
==============================================================================
--- 
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
 (original)
+++ 
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
 Wed Sep 28 06:57:35 2011
@@ -20,6 +20,7 @@
 package org.apache.james.protocols.impl;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.net.ssl.SSLEngine;
 
@@ -40,10 +41,13 @@ import org.jboss.netty.handler.ssl.SslHa
  */
 public class NettyProtocolTransport implements ProtocolTransport {
     
-    private Channel channel;
-    private SSLEngine engine;
+    private final Channel channel;
+    private final SSLEngine engine;
     private int lineHandlerCount = 0;
-
+    
+    // TODO: Should we limit the size ?
+    private final ConcurrentLinkedQueue<Response> responses = new 
ConcurrentLinkedQueue<Response>();
+    
     public NettyProtocolTransport(Channel channel, SSLEngine engine) {
         this.channel = channel;
         this.engine = engine;
@@ -89,15 +93,52 @@ public class NettyProtocolTransport impl
      * @see 
org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
 org.apache.james.protocols.api.ProtocolSession)
      */
     public void writeResponse(Response response, final ProtocolSession 
session) {
-        if (response instanceof FutureResponse) {
-            ((FutureResponse) response).addListener(new ResponseListener() {
-                
-                public void onResponse(FutureResponse response) {
-                    writeResponseToChannel(response, session);                 
   
+        // just add the response to the queue. We will trigger the write 
operation later
+        responses.add(response);
+         
+        // trigger the write
+        writeQueuedResponses(session);
+    }
+    
+    /**
+     * Helper method which tries to write all queued {@link Response}'s to the 
remote client. This method is aware of {@link FutureResponse} and makes sure 
the {@link Response}'s are written
+     * in the correct order
+     * 
+     * This is related to PROTOCOLS-36
+     * 
+     * @param session
+     */
+    private void writeQueuedResponses(final ProtocolSession session) {
+        Response queuedResponse = null;
+        
+        // dequeue Responses until non is left
+        while ((queuedResponse = responses.poll()) != null) {
+            
+            // check if we need to take special care of FutureResponses
+            if (queuedResponse instanceof FutureResponse) {
+                FutureResponse futureResponse =(FutureResponse) queuedResponse;
+                if (futureResponse.isReady()) {
+                    // future is ready so we can write it without blocking the 
IO-Thread
+                    writeResponseToChannel(queuedResponse, session);
+                } else {
+                    
+                    // future is not ready so we need to write it via a 
ResponseListener otherwise we MAY block the IO-Thread
+                    futureResponse.addListener(new ResponseListener() {
+                        
+                        public void onResponse(FutureResponse response) {
+                            writeResponseToChannel(response, session);
+                            writeQueuedResponses(session);
+                        }
+                    });
+                    
+                    // just break here as we will trigger the dequeue later
+                    break;
                 }
-            });
-        } else {
-            writeResponseToChannel(response, session);
+                
+            } else {
+                // the Response is not a FutureResponse, so just write it back 
the the remote peer
+                writeResponseToChannel(queuedResponse, session);
+            }
         }
     }
     



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to