Author: norman
Date: Wed Sep 28 06:57:35 2011
New Revision: 1176753
URL: http://svn.apache.org/viewvc?rev=1176753&view=rev
Log:
Make sure the Response's are written in the right order even if we mix
FutureResponse and Response implementations. See PROTOCOLS-36
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java?rev=1176753&r1=1176752&r2=1176753&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java
Wed Sep 28 06:57:35 2011
@@ -20,6 +20,7 @@
package org.apache.james.protocols.impl;
import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentLinkedQueue;
import javax.net.ssl.SSLEngine;
@@ -40,10 +41,13 @@ import org.jboss.netty.handler.ssl.SslHa
*/
public class NettyProtocolTransport implements ProtocolTransport {
- private Channel channel;
- private SSLEngine engine;
+ private final Channel channel;
+ private final SSLEngine engine;
private int lineHandlerCount = 0;
-
+
+ // TODO: Should we limit the size ?
+ private final ConcurrentLinkedQueue<Response> responses = new
ConcurrentLinkedQueue<Response>();
+
public NettyProtocolTransport(Channel channel, SSLEngine engine) {
this.channel = channel;
this.engine = engine;
@@ -89,15 +93,52 @@ public class NettyProtocolTransport impl
* @see
org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response,
org.apache.james.protocols.api.ProtocolSession)
*/
public void writeResponse(Response response, final ProtocolSession
session) {
- if (response instanceof FutureResponse) {
- ((FutureResponse) response).addListener(new ResponseListener() {
-
- public void onResponse(FutureResponse response) {
- writeResponseToChannel(response, session);
+ // just add the response to the queue. We will trigger the write
operation later
+ responses.add(response);
+
+ // trigger the write
+ writeQueuedResponses(session);
+ }
+
+ /**
+ * Helper method which tries to write all queued {@link Response}'s to the
remote client. This method is aware of {@link FutureResponse} and makes sure
the {@link Response}'s are written
+ * in the correct order
+ *
+ * This is related to PROTOCOLS-36
+ *
+ * @param session
+ */
+ private void writeQueuedResponses(final ProtocolSession session) {
+ Response queuedResponse = null;
+
+ // dequeue Responses until non is left
+ while ((queuedResponse = responses.poll()) != null) {
+
+ // check if we need to take special care of FutureResponses
+ if (queuedResponse instanceof FutureResponse) {
+ FutureResponse futureResponse =(FutureResponse) queuedResponse;
+ if (futureResponse.isReady()) {
+ // future is ready so we can write it without blocking the
IO-Thread
+ writeResponseToChannel(queuedResponse, session);
+ } else {
+
+ // future is not ready so we need to write it via a
ResponseListener otherwise we MAY block the IO-Thread
+ futureResponse.addListener(new ResponseListener() {
+
+ public void onResponse(FutureResponse response) {
+ writeResponseToChannel(response, session);
+ writeQueuedResponses(session);
+ }
+ });
+
+ // just break here as we will trigger the dequeue later
+ break;
}
- });
- } else {
- writeResponseToChannel(response, session);
+
+ } else {
+ // the Response is not a FutureResponse, so just write it back
the the remote peer
+ writeResponseToChannel(queuedResponse, session);
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]