Author: norman
Date: Wed Sep 28 07:02:53 2011
New Revision: 1176754

URL: http://svn.apache.org/viewvc?rev=1176754&view=rev
Log:
Better use synchronized so we are 100 % sure the response order is not mixed in 
all cases. See PROTOCOLS-36

Modified:
    
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java

Modified: 
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
URL: 
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java?rev=1176754&r1=1176753&r2=1176754&view=diff
==============================================================================
--- 
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
 (original)
+++ 
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
 Wed Sep 28 07:02:53 2011
@@ -20,7 +20,7 @@
 package org.apache.james.protocols.impl;
 
 import java.net.InetSocketAddress;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.LinkedList;
 
 import javax.net.ssl.SSLEngine;
 
@@ -46,7 +46,7 @@ public class NettyProtocolTransport impl
     private int lineHandlerCount = 0;
     
     // TODO: Should we limit the size ?
-    private final ConcurrentLinkedQueue<Response> responses = new 
ConcurrentLinkedQueue<Response>();
+    private final LinkedList<Response> responses = new LinkedList<Response>();
     
     public NettyProtocolTransport(Channel channel, SSLEngine engine) {
         this.channel = channel;
@@ -93,11 +93,14 @@ public class NettyProtocolTransport impl
      * @see 
org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
 org.apache.james.protocols.api.ProtocolSession)
      */
     public void writeResponse(Response response, final ProtocolSession 
session) {
-        // just add the response to the queue. We will trigger the write 
operation later
-        responses.add(response);
-         
-        // trigger the write
-        writeQueuedResponses(session);
+        synchronized (responses) {
+            // just add the response to the queue. We will trigger the write 
operation later
+            responses.add(response);
+             
+            // trigger the write
+            writeQueuedResponses(session);
+        }
+
     }
     
     /**
@@ -108,38 +111,41 @@ public class NettyProtocolTransport impl
      * 
      * @param session
      */
-    private void writeQueuedResponses(final ProtocolSession session) {
-        Response queuedResponse = null;
-        
-        // dequeue Responses until non is left
-        while ((queuedResponse = responses.poll()) != null) {
+    private  void writeQueuedResponses(final ProtocolSession session) {
+        synchronized (responses) {
+            Response queuedResponse = null;
             
-            // check if we need to take special care of FutureResponses
-            if (queuedResponse instanceof FutureResponse) {
-                FutureResponse futureResponse =(FutureResponse) queuedResponse;
-                if (futureResponse.isReady()) {
-                    // future is ready so we can write it without blocking the 
IO-Thread
-                    writeResponseToChannel(queuedResponse, session);
-                } else {
-                    
-                    // future is not ready so we need to write it via a 
ResponseListener otherwise we MAY block the IO-Thread
-                    futureResponse.addListener(new ResponseListener() {
+            // dequeue Responses until non is left
+            while ((queuedResponse = responses.poll()) != null) {
+                
+                // check if we need to take special care of FutureResponses
+                if (queuedResponse instanceof FutureResponse) {
+                    FutureResponse futureResponse =(FutureResponse) 
queuedResponse;
+                    if (futureResponse.isReady()) {
+                        // future is ready so we can write it without blocking 
the IO-Thread
+                        writeResponseToChannel(queuedResponse, session);
+                    } else {
+                        
+                        // future is not ready so we need to write it via a 
ResponseListener otherwise we MAY block the IO-Thread
+                        futureResponse.addListener(new ResponseListener() {
+                            
+                            public void onResponse(FutureResponse response) {
+                                writeResponseToChannel(response, session);
+                                writeQueuedResponses(session);
+                            }
+                        });
                         
-                        public void onResponse(FutureResponse response) {
-                            writeResponseToChannel(response, session);
-                            writeQueuedResponses(session);
-                        }
-                    });
+                        // just break here as we will trigger the dequeue later
+                        break;
+                    }
                     
-                    // just break here as we will trigger the dequeue later
-                    break;
+                } else {
+                    // the Response is not a FutureResponse, so just write it 
back the the remote peer
+                    writeResponseToChannel(queuedResponse, session);
                 }
-                
-            } else {
-                // the Response is not a FutureResponse, so just write it back 
the the remote peer
-                writeResponseToChannel(queuedResponse, session);
             }
         }
+        
     }
     
     private void writeResponseToChannel(Response response, ProtocolSession 
session) {



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to