Author: norman Date: Wed Sep 28 12:25:59 2011 New Revision: 1176843 URL: http://svn.apache.org/viewvc?rev=1176843&view=rev Log: Factor out non netty specific stuff to AbstractProtocolTransport
Added: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (with props) Modified: james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java Added: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java URL: http://svn.apache.org/viewvc/james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java?rev=1176843&view=auto ============================================================================== --- james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java (added) +++ james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java Wed Sep 28 12:25:59 2011 @@ -0,0 +1,100 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.protocols.api; + +import java.util.LinkedList; + +import org.apache.james.protocols.api.FutureResponse.ResponseListener; + + +public abstract class AbstractProtocolTransport implements ProtocolTransport{ + + // TODO: Should we limit the size ? + private final LinkedList<Response> responses = new LinkedList<Response>(); + /* + * (non-Javadoc) + * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession) + */ + public final void writeResponse(Response response, final ProtocolSession session) { + synchronized (responses) { + // just add the response to the queue. We will trigger the write operation later + responses.add(response); + + // trigger the write + writeQueuedResponses(session); + } + + } + + /** + * Helper method which tries to write all queued {@link Response}'s to the remote client. This method is aware of {@link FutureResponse} and makes sure the {@link Response}'s are written + * in the correct order + * + * This is related to PROTOCOLS-36 + * + * @param session + */ + private void writeQueuedResponses(final ProtocolSession session) { + synchronized (responses) { + Response queuedResponse = null; + + // dequeue Responses until non is left + while ((queuedResponse = responses.poll()) != null) { + + // check if we need to take special care of FutureResponses + if (queuedResponse instanceof FutureResponse) { + FutureResponse futureResponse =(FutureResponse) queuedResponse; + if (futureResponse.isReady()) { + // future is ready so we can write it without blocking the IO-Thread + writeResponseToClient(queuedResponse, session); + } else { + + // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread + futureResponse.addListener(new ResponseListener() { + + public void onResponse(FutureResponse response) { + writeResponseToClient(response, session); + writeQueuedResponses(session); + } + }); + + // just break here as we will trigger the dequeue later + break; + } + + } else { + // the Response is not a FutureResponse, so just write it back the the remote peer + writeResponseToClient(queuedResponse, session); + } + } + } + + } + + /** + * Write the {@link Response} to the client + * + * @param response + * @param session + */ + protected abstract void writeResponseToClient(Response response, ProtocolSession session); + +} + Propchange: james/protocols/trunk/api/src/main/java/org/apache/james/protocols/api/AbstractProtocolTransport.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java URL: http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java?rev=1176843&r1=1176842&r2=1176843&view=diff ============================================================================== --- james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java (original) +++ james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/NettyProtocolTransport.java Wed Sep 28 12:25:59 2011 @@ -20,14 +20,11 @@ package org.apache.james.protocols.impl; import java.net.InetSocketAddress; -import java.util.LinkedList; import javax.net.ssl.SSLEngine; -import org.apache.james.protocols.api.FutureResponse; -import org.apache.james.protocols.api.FutureResponse.ResponseListener; +import org.apache.james.protocols.api.AbstractProtocolTransport; import org.apache.james.protocols.api.ProtocolSession; -import org.apache.james.protocols.api.ProtocolTransport; import org.apache.james.protocols.api.Response; import org.apache.james.protocols.api.StartTlsResponse; import org.apache.james.protocols.api.handler.LineHandler; @@ -39,14 +36,12 @@ import org.jboss.netty.handler.ssl.SslHa /** * A Netty implementation of a ProtocolTransport */ -public class NettyProtocolTransport implements ProtocolTransport { +public class NettyProtocolTransport extends AbstractProtocolTransport { private final Channel channel; private final SSLEngine engine; private int lineHandlerCount = 0; - - // TODO: Should we limit the size ? - private final LinkedList<Response> responses = new LinkedList<Response>(); + public NettyProtocolTransport(Channel channel, SSLEngine engine) { this.channel = channel; @@ -88,85 +83,6 @@ public class NettyProtocolTransport impl return engine != null; } - /* - * (non-Javadoc) - * @see org.apache.james.protocols.api.ProtocolTransport#writeResponse(org.apache.james.protocols.api.Response, org.apache.james.protocols.api.ProtocolSession) - */ - public void writeResponse(Response response, final ProtocolSession session) { - synchronized (responses) { - // just add the response to the queue. We will trigger the write operation later - responses.add(response); - - // trigger the write - writeQueuedResponses(session); - } - - } - - /** - * Helper method which tries to write all queued {@link Response}'s to the remote client. This method is aware of {@link FutureResponse} and makes sure the {@link Response}'s are written - * in the correct order - * - * This is related to PROTOCOLS-36 - * - * @param session - */ - private void writeQueuedResponses(final ProtocolSession session) { - synchronized (responses) { - Response queuedResponse = null; - - // dequeue Responses until non is left - while ((queuedResponse = responses.poll()) != null) { - - // check if we need to take special care of FutureResponses - if (queuedResponse instanceof FutureResponse) { - FutureResponse futureResponse =(FutureResponse) queuedResponse; - if (futureResponse.isReady()) { - // future is ready so we can write it without blocking the IO-Thread - writeResponseToChannel(queuedResponse, session); - } else { - - // future is not ready so we need to write it via a ResponseListener otherwise we MAY block the IO-Thread - futureResponse.addListener(new ResponseListener() { - - public void onResponse(FutureResponse response) { - writeResponseToChannel(response, session); - writeQueuedResponses(session); - } - }); - - // just break here as we will trigger the dequeue later - break; - } - - } else { - // the Response is not a FutureResponse, so just write it back the the remote peer - writeResponseToChannel(queuedResponse, session); - } - } - } - - } - - private void writeResponseToChannel(Response response, ProtocolSession session) { - if (response != null && channel.isConnected()) { - ChannelFuture cf = channel.write(response); - if (response.isEndSession()) { - // close the channel if needed after the message was written out - cf.addListener(ChannelFutureListener.CLOSE); - } - if (response instanceof StartTlsResponse) { - if (isStartTLSSupported()) { - channel.setReadable(false); - SslHandler filter = new SslHandler(engine); - filter.getEngine().setUseClientMode(false); - session.resetState(); - channel.getPipeline().addFirst("sslHandler", filter); - channel.setReadable(true); - } - } - } - } /* * (non-Javadoc) @@ -200,5 +116,28 @@ public class NettyProtocolTransport impl public int getPushedLineHandlerCount() { return lineHandlerCount; } + + @Override + protected void writeResponseToClient(Response response, ProtocolSession session) { + if (response != null && channel.isConnected()) { + ChannelFuture cf = channel.write(response); + if (response.isEndSession()) { + // close the channel if needed after the message was written out + cf.addListener(ChannelFutureListener.CLOSE); + } + if (response instanceof StartTlsResponse) { + if (isStartTLSSupported()) { + channel.setReadable(false); + SslHandler filter = new SslHandler(engine); + filter.getEngine().setUseClientMode(false); + session.resetState(); + channel.getPipeline().addFirst("sslHandler", filter); + channel.setReadable(true); + } + } + } + } + + } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org