Author: norman
Date: Thu Sep 15 10:26:18 2011
New Revision: 1171031
URL: http://svn.apache.org/viewvc?rev=1171031&view=rev
Log:
Get rid of ChannelLocal. See PROTOCOL-29
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractChannelUpstreamHandler.java
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractSession.java
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/ChannelAttributeSupport.java
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/LineHandlerUpstreamHandler.java
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractChannelUpstreamHandler.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractChannelUpstreamHandler.java?rev=1171031&r1=1171030&r2=1171031&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractChannelUpstreamHandler.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractChannelUpstreamHandler.java
Thu Sep 15 10:26:18 2011
@@ -30,7 +30,6 @@ import org.apache.james.protocols.api.Li
import org.apache.james.protocols.api.ProtocolHandlerChain;
import org.apache.james.protocols.api.ProtocolSession;
import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
@@ -45,7 +44,7 @@ import org.jboss.netty.handler.codec.fra
*
*
*/
-public abstract class AbstractChannelUpstreamHandler extends
SimpleChannelUpstreamHandler implements ChannelAttributeSupport{
+public abstract class AbstractChannelUpstreamHandler extends
SimpleChannelUpstreamHandler {
private ProtocolHandlerChain chain;
@@ -56,7 +55,7 @@ public abstract class AbstractChannelUps
@Override
public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
- attributes.set(ctx.getChannel(),createSession(ctx));
+ ctx.setAttachment(createSession(ctx));
super.channelBound(ctx, e);
}
@@ -70,7 +69,7 @@ public abstract class AbstractChannelUps
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent
e) throws Exception {
List<ConnectHandler> connectHandlers =
chain.getHandlers(ConnectHandler.class);
List<ConnectHandlerResultHandler> resultHandlers =
chain.getHandlers(ConnectHandlerResultHandler.class);
- ProtocolSession session = (ProtocolSession)
attributes.get(ctx.getChannel());
+ ProtocolSession session = (ProtocolSession) ctx.getAttachment();
session.getLogger().info("Connection established from " +
session.getRemoteHost() + " (" + session.getRemoteIPAddress()+ ")");
if (connectHandlers != null) {
for (int i = 0; i < connectHandlers.size(); i++) {
@@ -97,7 +96,7 @@ public abstract class AbstractChannelUps
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
List<DisconnectHandler> connectHandlers =
chain.getHandlers(DisconnectHandler.class);
- ProtocolSession session = (ProtocolSession)
attributes.get(ctx.getChannel());
+ ProtocolSession session = (ProtocolSession) ctx.getAttachment();
if (connectHandlers != null) {
for (int i = 0; i < connectHandlers.size(); i++) {
connectHandlers.get(i).onDisconnect(session);
@@ -113,7 +112,7 @@ public abstract class AbstractChannelUps
@SuppressWarnings("unchecked")
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
- ProtocolSession pSession = (ProtocolSession)
attributes.get(ctx.getChannel());
+ ProtocolSession pSession = (ProtocolSession) ctx.getAttachment();
LinkedList<LineHandler> lineHandlers =
chain.getHandlers(LineHandler.class);
LinkedList<LineHandlerResultHandler> resultHandlers =
chain.getHandlers(LineHandlerResultHandler.class);
@@ -149,11 +148,11 @@ public abstract class AbstractChannelUps
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
- ProtocolSession session = (ProtocolSession)
attributes.get(ctx.getChannel());
+ ProtocolSession session = (ProtocolSession) ctx.getAttachment();
if (session != null) {
session.getLogger().info("Connection closed for " +
session.getRemoteHost() + " (" + session.getRemoteIPAddress()+ ")");
}
- cleanup(ctx.getChannel());
+ cleanup(ctx);
super.channelClosed(ctx, e);
}
@@ -162,7 +161,7 @@ public abstract class AbstractChannelUps
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
if ((e.getCause() instanceof TooLongFrameException) == false) {
- cleanup(ctx.getChannel());
+ cleanup(ctx);
}
}
@@ -171,8 +170,8 @@ public abstract class AbstractChannelUps
*
* @param channel
*/
- protected void cleanup(Channel channel) {
- ProtocolSession session = (ProtocolSession) attributes.remove(channel);
+ protected void cleanup(ChannelHandlerContext ctx) {
+ ProtocolSession session = (ProtocolSession) ctx.getAttachment();
if (session != null) {
session.resetState();
session = null;
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractSession.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractSession.java?rev=1171031&r1=1171030&r2=1171031&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractSession.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractSession.java
Thu Sep 15 10:26:18 2011
@@ -19,11 +19,8 @@
package org.apache.james.protocols.impl;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.net.InetSocketAddress;
-import java.nio.channels.FileChannel;
import javax.net.ssl.SSLEngine;
@@ -32,12 +29,7 @@ import org.apache.james.protocols.api.TL
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.DefaultFileRegion;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedNioFile;
-import org.jboss.netty.handler.stream.ChunkedStream;
import org.slf4j.Logger;
/**
@@ -46,7 +38,7 @@ import org.slf4j.Logger;
*
*/
public abstract class AbstractSession implements TLSSupportedSession {
- protected ChannelHandlerContext handlerContext;
+ protected Channel channel;
protected InetSocketAddress socketAddress;
private Logger logger;
private SessionLog pLog = null;
@@ -55,24 +47,18 @@ public abstract class AbstractSession im
protected String user;
private String id;
- private boolean zeroCopy;
- public AbstractSession(Logger logger, ChannelHandlerContext
handlerContext, SSLEngine engine) {
- this(logger, handlerContext, engine, true);
- }
- public AbstractSession(Logger logger, ChannelHandlerContext
handlerContext, SSLEngine engine, boolean zeroCopy) {
- this.handlerContext = handlerContext;
- this.socketAddress = (InetSocketAddress)
handlerContext.getChannel().getRemoteAddress();
+ public AbstractSession(Logger logger, Channel channel, SSLEngine engine) {
+ this.channel = channel;
+ this.socketAddress = (InetSocketAddress) channel.getRemoteAddress();
this.logger = logger;
this.engine = engine;
- this.id = handlerContext.getChannel().getId() + "";
- this.zeroCopy = zeroCopy;
-
+ this.id = channel.getId() + "";
}
- public AbstractSession(Logger logger, ChannelHandlerContext
handlerContext) {
- this(logger, handlerContext, null);
+ public AbstractSession(Logger logger, Channel channel) {
+ this(logger, channel, null);
}
/**
@@ -104,12 +90,12 @@ public abstract class AbstractSession im
}
/**
- * Return underlying IoSession
+ * Return underlying {@link Channel}
*
* @return session
*/
- public ChannelHandlerContext getChannelHandlerContext() {
- return handlerContext;
+ public Channel getChannel() {
+ return channel;
}
/**
@@ -125,7 +111,7 @@ public abstract class AbstractSession im
public boolean isTLSStarted() {
if (isStartTLSSupported()) {
- return getChannelHandlerContext().getPipeline().get("sslHandler")
!= null;
+ return channel.getPipeline().get("sslHandler") != null;
}
return false;
@@ -136,12 +122,12 @@ public abstract class AbstractSession im
*/
public void startTLS() throws IOException {
if (isStartTLSSupported() && isTLSStarted() == false) {
- getChannelHandlerContext().getChannel().setReadable(false);
+ channel.setReadable(false);
SslHandler filter = new SslHandler(engine);
filter.getEngine().setUseClientMode(false);
resetState();
- getChannelHandlerContext().getPipeline().addFirst("sslHandler",
filter);
- getChannelHandlerContext().getChannel().setReadable(true);
+ channel.getPipeline().addFirst("sslHandler", filter);
+ channel.setReadable(true);
}
}
@@ -162,7 +148,6 @@ public abstract class AbstractSession im
* @see
org.apache.james.api.protocol.ProtocolSession#writeResponse(org.apache.james.api.protocol.Response)
*/
public void writeResponse(final Response response) {
- Channel channel = getChannelHandlerContext().getChannel();
if (response != null && channel.isConnected()) {
ChannelFuture cf = channel.write(response);
if (response.isEndSession()) {
@@ -174,32 +159,6 @@ public abstract class AbstractSession im
/*
* (non-Javadoc)
- * @see
org.apache.james.protocols.api.ProtocolSession#writeStream(java.io.InputStream)
- */
- public void writeStream(InputStream stream) {
- Channel channel = getChannelHandlerContext().getChannel();
- if (stream != null && channel.isConnected()) {
-
- if (stream instanceof FileInputStream && channel.getFactory()
instanceof NioServerSocketChannelFactory) {
- FileChannel fc = ((FileInputStream) stream).getChannel();
- try {
- if (zeroCopy) {
- channel.write(new DefaultFileRegion(fc, fc.position(),
fc.size()));
- } else {
- channel.write(new ChunkedNioFile(fc, 8192));
- }
- } catch (IOException e) {
- // Catch the exception and just pass it so we get the
exception later
- channel.write(new ChunkedStream(stream));
- }
- } else {
- channel.write(new ChunkedStream(stream));
- }
- }
- }
-
- /*
- * (non-Javadoc)
* @see org.apache.james.protocols.api.ProtocolSession#getSessionID()
*/
public String getSessionID() {
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/LineHandlerUpstreamHandler.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/LineHandlerUpstreamHandler.java?rev=1171031&r1=1171030&r2=1171031&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/LineHandlerUpstreamHandler.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/LineHandlerUpstreamHandler.java
Thu Sep 15 10:26:18 2011
@@ -31,19 +31,18 @@ import org.jboss.netty.channel.SimpleCha
*
* @param <Session>
*/
-public class LineHandlerUpstreamHandler<Session extends ProtocolSession>
extends SimpleChannelUpstreamHandler implements ChannelAttributeSupport{
+public class LineHandlerUpstreamHandler<Session extends ProtocolSession>
extends SimpleChannelUpstreamHandler {
- private LineHandler<Session> handler;
+ private final LineHandler<Session> handler;
+ private final Session session;
- public LineHandlerUpstreamHandler(LineHandler<Session> handler) {
+ public LineHandlerUpstreamHandler(Session session, LineHandler<Session>
handler) {
this.handler = handler;
+ this.session = session;
}
- @SuppressWarnings("unchecked")
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
- Session pSession = (Session) attributes.get(ctx.getChannel());
-
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
ChannelBuffer buf = (ChannelBuffer) e.getMessage();
byte[] line;
if (buf.hasArray()) {
@@ -54,7 +53,7 @@ public class LineHandlerUpstreamHandler<
buf.getBytes(0, line);
}
- boolean disconnect = handler.onLine(pSession, line);
+ boolean disconnect = handler.onLine(session, line);
if (disconnect) ctx.getChannel().disconnect();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org