Author: norman
Date: Thu May 6 18:11:29 2010
New Revision: 941836
URL: http://svn.apache.org/viewvc?rev=941836&view=rev
Log:
Add reusable generic implementation code. This is based on NETTY (PROTOCOLS-3)
Added:
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractSession.java
- copied, changed from r941826,
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractNettySession.java
Removed:
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractNettySession.java
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/ProtocolHandlerChainImpl.java
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractAsyncServer.java
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelPipelineFactory.java
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelUpstreamHandler.java
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractAsyncServer.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractAsyncServer.java?rev=941836&r1=941835&r2=941836&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractAsyncServer.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractAsyncServer.java
Thu May 6 18:11:29 2010
@@ -18,28 +18,9 @@
****************************************************************/
package org.apache.james.socket.netty;
-import java.io.FileInputStream;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.security.KeyStore;
import java.util.concurrent.Executors;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.annotation.Resource;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.HierarchicalConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.james.api.dnsservice.DNSService;
-import org.apache.james.lifecycle.Configurable;
-import org.apache.james.lifecycle.LogEnabled;
-import org.apache.james.services.FileSystem;
-import org.apache.james.services.MailServer;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
@@ -48,327 +29,61 @@ import org.jboss.netty.channel.socket.ni
* Abstract base class for Servers which want to use async io
*
*/
-public abstract class AbstractAsyncServer implements LogEnabled, Configurable{
- /**
- * The default value for the connection backlog.
- */
- private static final int DEFAULT_BACKLOG = 200;
-
- /**
- * The default value for the connection timeout.
- */
- private static final int DEFAULT_TIMEOUT = 5* 60;
+public abstract class AbstractAsyncServer {
- /**
- * The name of the parameter defining the connection timeout.
- */
- private static final String TIMEOUT_NAME = "connectiontimeout";
- /**
- * The name of the parameter defining the connection backlog.
- */
- private static final String BACKLOG_NAME = "connectionBacklog";
+ protected int connPerIP = 0;
- /**
- * The name of the parameter defining the service hello name.
- */
- public static final String HELLO_NAME = "helloName";
-
- private FileSystem fileSystem;
-
- /**
- * The internal mail server service.
- */
- private MailServer mailServer;
-
- private Log logger;
-
- private DNSService dns;
-
- private boolean enabled;
-
- protected int connPerIP;
+ protected int connectionLimit = 0;
- private boolean useStartTLS;
- private boolean useSSL;
-
-
- protected int connectionLimit;
-
- private String helloName;
-
- private String keystore;
-
- private String secret;
-
- private int backlog;
+ private int backlog = 250;
- private InetAddress bindTo;
-
private int port;
- private int timeout;
+ private int timeout = 120;
- private SSLContext context;
+ private ServerBootstrap bootstrap;
- private ServerBootstrap bootstrap;
+ private boolean started;
- @Resource(name="dnsserver")
- public final void setDNSService(DNSService dns) {
- this.dns = dns;
- }
-
- @Resource(name="filesystem")
- public final void setFileSystem(FileSystem filesystem) {
- this.fileSystem = filesystem;
- }
-
- @Resource(name="James")
- public final void setMailServer(MailServer mailServer) {
- this.mailServer = mailServer;
+ private String ip;
+
+ public AbstractAsyncServer(String ip, int port) {
+ this.ip = ip;
+ this.port = port;
}
-
- /*
- * (non-Javadoc)
- * @see
org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log)
+
+ /**
+ * Start the server
+ *
*/
- public final void setLog(Log logger) {
- this.logger = logger;
- }
+ public synchronized final void start() {
+ if (started)
+ throw new IllegalStateException("Server running allready");
- /*
- * (non-Javadoc)
- * @see
org.apache.james.lifecycle.Configurable#configure(org.apache.commons.configuration.HierarchicalConfiguration)
- */
- public final void configure(HierarchicalConfiguration config) throws
ConfigurationException{
-
- Configuration handlerConfiguration =
((HierarchicalConfiguration)config).configurationAt("handler");
-
- enabled = config.getBoolean("[...@enabled]", true);
-
- final Log logger = getLogger();
- if (!enabled) {
- logger.info(getServiceType() + " disabled by configuration");
- return;
- }
-
-
- /*
- boolean
streamdump=handlerConfiguration.getChild("streamdump").getAttributeAsBoolean("enabled",
false);
- String streamdumpDir=streamdump ?
handlerConfiguration.getChild("streamdump").getAttribute("directory", null) :
null;
- setStreamDumpDir(streamdumpDir);
- */
-
- port = config.getInt("port",getDefaultPort());
-
-
-
- StringBuilder infoBuffer;
-
-
- try {
- final String bindAddress = config.getString("bind",null);
- if( null != bindAddress ) {
- bindTo = InetAddress.getByName(bindAddress);
- infoBuffer =
- new StringBuilder(64)
- .append(getServiceType())
- .append(" bound to: ")
- .append(bindTo);
- logger.info(infoBuffer.toString());
- }
- }
- catch( final UnknownHostException unhe ) {
- throw new ConfigurationException( "Malformed bind parameter in
configuration of service " + getServiceType(), unhe );
- }
-
- configureHelloName(handlerConfiguration);
-
- timeout = handlerConfiguration.getInt(TIMEOUT_NAME,DEFAULT_TIMEOUT);
-
- infoBuffer =
- new StringBuilder(64)
- .append(getServiceType())
- .append(" handler connection timeout is: ")
- .append(timeout);
- logger.info(infoBuffer.toString());
-
- backlog = config.getInt(BACKLOG_NAME,DEFAULT_BACKLOG);
-
- infoBuffer =
- new StringBuilder(64)
- .append(getServiceType())
- .append(" connection backlog is: ")
- .append(backlog);
- logger.info(infoBuffer.toString());
-
-
- String connectionLimitString =
config.getString("connectionLimit",null);
- if (connectionLimitString != null) {
- try {
- connectionLimit = new Integer(connectionLimitString);
- } catch (NumberFormatException nfe) {
- logger.error("Connection limit value is not properly
formatted.", nfe);
- }
- if (connectionLimit < 0) {
- logger.error("Connection limit value cannot be less than
zero.");
- throw new ConfigurationException("Connection limit value
cannot be less than zero.");
- } else if (connectionLimit > 0){
- infoBuffer = new StringBuilder(128)
- .append(getServiceType())
- .append(" will allow a maximum of ")
- .append(connectionLimitString)
- .append(" connections.");
- logger.info(infoBuffer.toString());
- }
- }
-
- String connectionLimitPerIP =
handlerConfiguration.getString("connectionLimitPerIP",null);
- if (connectionLimitPerIP != null) {
- try {
- connPerIP = new Integer(connectionLimitPerIP).intValue();
- } catch (NumberFormatException nfe) {
- logger.error("Connection limit per IP value is not properly
formatted.", nfe);
- }
- if (connPerIP < 0) {
- logger.error("Connection limit per IP value cannot be less
than zero.");
- throw new ConfigurationException("Connection limit value
cannot be less than zero.");
- } else if (connPerIP > 0){
- infoBuffer = new StringBuilder(128)
- .append(getServiceType())
- .append(" will allow a maximum of ")
- .append(connPerIP)
- .append(" per IP connections for " +getServiceType());
- logger.info(infoBuffer.toString());
- }
- }
-
-
- useStartTLS = config.getBoolean("tl...@starttls]", false);
- useSSL = config.getBoolean("tl...@sockettls]", false);
-
- if (useSSL && useStartTLS) throw new ConfigurationException("startTLS
is only supported when using plain sockets");
-
- if (useStartTLS || useSSL) {
- keystore = config.getString("tls.keystore", null);
- if (keystore == null) {
- throw new ConfigurationException("keystore needs to get
configured");
- }
- secret = config.getString("tls.secret","");
- }
-
- doConfigure(config);
+ bootstrap = new ServerBootstrap(new
NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
+ // Configure the pipeline factory.
+ bootstrap.setPipelineFactory(createPipelineFactory());
- }
-
-
- @PostConstruct
- public final void init() throws Exception {
- if (isEnabled()) {
- preInit();
- buildSSLContext();
-
- bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
-
Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
- // Configure the pipeline factory.
- bootstrap.setPipelineFactory(createPipelineFactory());
-
- // Bind and start to accept incoming connections.
- bootstrap.setOption("backlog",backlog);
- bootstrap.setOption("reuseAddress",true);
-
- //acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE,
timeout );
- //acceptor.setHandler(createIoHandler());
- bootstrap.bind(new InetSocketAddress(bindTo,port));
- }
- }
+ // Bind and start to accept incoming connections.
+ bootstrap.setOption("backlog", backlog);
+ bootstrap.setOption("reuseAddress", true);
+
+ bootstrap.bind(new InetSocketAddress(ip, port));
+ started = true;
- @PreDestroy
- public final void destroy() {
- getLogger().info("Dispose " + getServiceType());
-
- bootstrap.releaseExternalResources();
- }
-
-
- /**
- * This method is called on init of the Server. Subclasses should override
this method to init stuff
- *
- * @throws Exception
- */
- protected void preInit() throws Exception {
- // override me
- }
-
- protected void doConfigure(HierarchicalConfiguration config) throws
ConfigurationException {
- // override me
}
/**
- * Return the DNSService
- *
- * @return dns
+ * Stop the server
*/
- protected DNSService getDNSService() {
- return dns;
+ public synchronized final void stop() {
+ bootstrap.releaseExternalResources();
+ started = false;
}
- /**
- * Return the MailServer
- *
- * @return mailServer
- */
- protected MailServer getMailServer() {
- return mailServer;
- }
- /**
- * Return the FileSystem
- *
- * @return fileSystem
- */
- protected FileSystem getFileSystem() {
- return fileSystem;
- }
-
- /**
- * Configure the helloName for the given Configuration
- *
- * @param handlerConfiguration
- */
- private void configureHelloName(Configuration handlerConfiguration) {
- StringBuilder infoBuffer;
- String hostName = null;
- try {
- hostName = dns.getHostName(dns.getLocalHost());
- } catch (UnknownHostException ue) {
- hostName = "localhost";
- }
-
- infoBuffer =
- new StringBuilder(64)
- .append(getServiceType())
- .append(" is running on: ")
- .append(hostName);
- getLogger().info(infoBuffer.toString());
-
- boolean autodetect = handlerConfiguration.getBoolean(HELLO_NAME +
"/[...@autodetect]", true);
- if (autodetect) {
- helloName = hostName;
- } else {
- // Should we use the defaultdomain here ?
- helloName = handlerConfiguration.getString(HELLO_NAME +
"/localhost");
- }
-
- infoBuffer =
- new StringBuilder(64)
- .append(getServiceType())
- .append(" handler hello name is: ")
- .append(helloName);
- getLogger().info(infoBuffer.toString());
- }
/**
* Return the port this server will listen on
@@ -379,110 +94,50 @@ public abstract class AbstractAsyncServe
return port;
}
- /**
- * Return the logger
- *
- * @return logger
- */
- protected Log getLogger() {
- return logger;
- }
- /**
- * Return if the server is enabled by the configuration
- *
- * @return enabled
- */
- public boolean isEnabled() {
- return enabled;
- }
/**
- * Return helloName for this server
+ * Create ChannelPipelineFactory to use by this Server implementation
*
- * @return helloName
+ * @return factory
*/
- public String getHelloName() {
- return helloName;
- }
-
-
- /**
- * Return if startTLS is supported by this server
- *
- * @return startTlsSupported
- */
- protected boolean isStartTLSSupported() {
- return useStartTLS;
- }
+ protected abstract ChannelPipelineFactory createPipelineFactory();
/**
- * Return if the socket is using SSL
- *
- * @return useSSL
- */
- protected boolean isSSLSocket() {
- return useSSL;
- }
-
- /**
- * Build the SSLEngine
+ * Set the read/write timeout for the server. This will throw a {@link IllegalStateException} if the
+ * server is running.
*
- * @throws Exception
+ * @param timeout
*/
-
- private void buildSSLContext() throws Exception {
- if (useStartTLS) {
- String algorithm = "SunX509";
- KeyStore ks = KeyStore.getInstance("JKS");
- ks.load(new FileInputStream(fileSystem.getFile(keystore)),
secret.toCharArray());
-
- // Set up key manager factory to use our key store
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm);
- kmf.init(ks, secret.toCharArray());
-
- // Initialize the SSLContext to work with our key managers.
- context = SSLContext.getInstance("TLS");
- context.init(kmf.getKeyManagers(), null, null);
-
-
- }
+ public synchronized void setTimeout(int timeout) {
+ if (started) throw new IllegalStateException("Can only be set when the
server is not running");
+ this.timeout = timeout;
}
-
-
- /**
- * Createh IoHandler to use by this Server implementation
- *
- * @return ioHandler
- */
- protected abstract ChannelPipelineFactory createPipelineFactory();
/**
- * Return the SslContextFactory which was created for this service.
- *
- * @return contextFactory
+ * Set the Backlog for the socket. This will throw a {@link IllegalStateException} if the server is running.
+ * @param backlog
*/
+ public synchronized void setBacklog(int backlog) {
+ if (started) throw new IllegalStateException("Can only be set when the
server is not running");
+ this.backlog = backlog;
+ }
/**
- * Return the default port which will get used for this server if non is
specify in the configuration
+ * Return the backlog for the socket
*
- * @return port
+ * @return backlog
*/
- protected abstract int getDefaultPort();
+ public int getBacklog() {
+ return backlog;
+ }
/**
- * Return textual representation of the service this server provide
- *
- * @return serviceType
+ * Return the read/write timeout for the socket.
+ * @return
*/
- protected abstract String getServiceType();
-
- protected int getTimeout() {
+ public int getTimeout() {
return timeout;
}
-
- protected SSLContext getSSLContext() {
- return context;
- }
}
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelPipelineFactory.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelPipelineFactory.java?rev=941836&r1=941835&r2=941836&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelPipelineFactory.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelPipelineFactory.java
Thu May 6 18:11:29 2010
@@ -32,7 +32,6 @@ import org.jboss.netty.handler.connectio
import org.jboss.netty.handler.connection.ConnectionPerIpLimitUpstreamHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timer;
/**
* Abstract base class for {@link ChannelPipelineFactory} implementations
@@ -42,13 +41,19 @@ import org.jboss.netty.util.Timer;
public abstract class AbstractChannelPipelineFactory implements
ChannelPipelineFactory{
public final static int MAX_LINE_LENGTH = 8192;
- private final Timer timer = new HashedWheelTimer();
private final ConnectionLimitUpstreamHandler connectionLimitHandler;
private final ConnectionPerIpLimitUpstreamHandler
connectionPerIpLimitHandler;
+ private TimeoutHandler timeoutHandler;
+
+ public AbstractChannelPipelineFactory(int timeout, int maxConnections, int
maxConnectsPerIp) {
+ timeoutHandler = new TimeoutHandler(new HashedWheelTimer(), timeout,
timeout, 0);
+ connectionLimitHandler = new
ConnectionLimitUpstreamHandler(maxConnections);
+ connectionPerIpLimitHandler = new
ConnectionPerIpLimitUpstreamHandler(maxConnectsPerIp);
+ }
+
public AbstractChannelPipelineFactory() {
- connectionLimitHandler = new
ConnectionLimitUpstreamHandler(getMaxConnections());
- connectionPerIpLimitHandler = new
ConnectionPerIpLimitUpstreamHandler(getMaxConnectionsPerIP());
+ this(120, 0, 0);
}
/*
* (non-Javadoc)
@@ -70,7 +75,7 @@ public abstract class AbstractChannelPip
pipeline.addLast("encoderResponse", createEncoder());
pipeline.addLast("streamer", new ChunkedWriteHandler());
- pipeline.addLast("timeoutHandler", new TimeoutHandler(timer, 120, 120,
0));
+ pipeline.addLast("timeoutHandler", timeoutHandler);
pipeline.addLast("coreHandler", createHandler());
@@ -91,27 +96,6 @@ public abstract class AbstractChannelPip
*/
protected abstract OneToOneEncoder createEncoder();
- /**
- * Return the timeout in seconds
- *
- * @return timeout
- */
- protected abstract int getTimeout();
-
-
- /**
- * Return the max connections
- *
- * @return max connections
- */
- protected abstract int getMaxConnections();
-
- /**
- * Return the max connections per ip
- *
- * @return max connections per ip
- */
- protected abstract int getMaxConnectionsPerIP();
}
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelUpstreamHandler.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelUpstreamHandler.java?rev=941836&r1=941835&r2=941836&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelUpstreamHandler.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelUpstreamHandler.java
Thu May 6 18:11:29 2010
@@ -61,6 +61,9 @@ public abstract class AbstractChannelUps
+ /**
+ * Call the {@link ConnectHandler} instances which are stored in the {@link ProtocolHandlerChain}
+ */
@SuppressWarnings("unchecked")
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent
e) throws Exception {
@@ -76,6 +79,9 @@ public abstract class AbstractChannelUps
+ /**
+ * Call the {@link LineHandler}
+ */
@SuppressWarnings("unchecked")
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
@@ -111,7 +117,12 @@ public abstract class AbstractChannelUps
}
}
- private void cleanup(Channel channel) {
+ /**
+ * Cleanup the channel
+ *
+ * @param channel
+ */
+ protected void cleanup(Channel channel) {
ProtocolSession session = (ProtocolSession) attributes.get(channel);
if (session != null) {
session.resetState();
Copied:
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractSession.java
(from r941826,
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractNettySession.java)
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractSession.java?p2=james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractSession.java&p1=james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractNettySession.java&r1=941826&r2=941836&rev=941836&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractNettySession.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractSession.java
Thu May 6 18:11:29 2010
@@ -38,7 +38,7 @@ import org.jboss.netty.handler.stream.Ch
*
*
*/
-public abstract class AbstractNettySession implements TLSSupportedSession {
+public abstract class AbstractSession implements TLSSupportedSession {
protected ChannelHandlerContext handlerContext;
protected InetSocketAddress socketAddress;
@@ -46,14 +46,14 @@ public abstract class AbstractNettySessi
protected SSLEngine engine;
protected String user;
- public AbstractNettySession(Log logger, ChannelHandlerContext
handlerContext, SSLEngine engine) {
+ public AbstractSession(Log logger, ChannelHandlerContext handlerContext,
SSLEngine engine) {
this.handlerContext = handlerContext;
this.socketAddress = (InetSocketAddress)
handlerContext.getChannel().getRemoteAddress();
this.logger = logger;
this.engine = engine;
}
- public AbstractNettySession(Log logger, ChannelHandlerContext
handlerContext) {
+ public AbstractSession(Log logger, ChannelHandlerContext handlerContext) {
this(logger, handlerContext, null);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]