Author: davsclaus
Date: Sun Apr 8 12:52:17 2012
New Revision: 1310991
URL: http://svn.apache.org/viewvc?rev=1310991&view=rev
Log:
CAMEL-5150: Some cleanup in camel-netty according to Netty docs.
Added:
camel/branches/camel-2.9.x/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
- copied unchanged from r1310979,
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
Modified:
camel/branches/camel-2.9.x/ (props changed)
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Merged /camel/trunk:r1310979
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1310991&r1=1310990&r2=1310991&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
(original)
+++
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
Sun Apr 8 12:52:17 2012
@@ -68,8 +68,6 @@ public class NettyConfiguration implemen
private long sendBufferSize = 65536;
private long receiveBufferSize = 65536;
private int receiveBufferSizePredictor;
- private int corePoolSize = 10;
- private int maxPoolSize = 100;
private int workerCount;
private String keyStoreFormat;
private String securityProvider;
@@ -385,22 +383,6 @@ public class NettyConfiguration implemen
this.trustStoreFile = trustStoreFile;
}
- public int getCorePoolSize() {
- return corePoolSize;
- }
-
- public void setCorePoolSize(int corePoolSize) {
- this.corePoolSize = corePoolSize;
- }
-
- public int getMaxPoolSize() {
- return maxPoolSize;
- }
-
- public void setMaxPoolSize(int maxPoolSize) {
- this.maxPoolSize = maxPoolSize;
- }
-
public String getKeyStoreFormat() {
return keyStoreFormat;
}
Modified:
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1310991&r1=1310990&r2=1310991&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
(original)
+++
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
Sun Apr 8 12:52:17 2012
@@ -46,12 +46,14 @@ public class NettyConsumer extends Defau
private ServerBootstrap serverBootstrap;
private ConnectionlessBootstrap connectionlessServerBootstrap;
private Channel channel;
+ private ExecutorService bossExecutor;
+ private ExecutorService workerExecutor;
public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor,
NettyConfiguration configuration) {
super(nettyEndpoint, processor);
this.context = this.getEndpoint().getCamelContext();
this.configuration = configuration;
- this.allChannels = new DefaultChannelGroup("NettyProducer-" +
nettyEndpoint.getEndpointUri());
+ this.allChannels = new DefaultChannelGroup("NettyConsumer-" +
nettyEndpoint.getEndpointUri());
}
@Override
@@ -78,14 +80,23 @@ public class NettyConsumer extends Defau
LOG.debug("Netty consumer unbinding from: {}",
configuration.getAddress());
// close all channels
+ LOG.trace("Closing {} channels", allChannels.size());
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
- // and then release other resources
+ // close server external resources
if (channelFactory != null) {
channelFactory.releaseExternalResources();
}
+ // and then shutdown the thread pools
+ if (bossExecutor != null) {
+ context.getExecutorServiceManager().shutdownNow(bossExecutor);
+ }
+ if (workerExecutor != null) {
+ context.getExecutorServiceManager().shutdownNow(workerExecutor);
+ }
+
super.doStop();
LOG.info("Netty consumer unbound from: " + configuration.getAddress());
@@ -144,12 +155,10 @@ public class NettyConsumer extends Defau
}
private void initializeTCPServerSocketCommunicationLayer() throws
Exception {
- ExecutorService bossExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
- ExecutorService workerExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
+ bossExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
+ workerExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
- if (configuration.getWorkerCount() == 0) {
+ if (configuration.getWorkerCount() <= 0) {
channelFactory = new NioServerSocketChannelFactory(bossExecutor,
workerExecutor);
} else {
channelFactory = new NioServerSocketChannelFactory(bossExecutor,
workerExecutor,
@@ -164,6 +173,7 @@ public class NettyConsumer extends Defau
}
serverBootstrap.setOption("child.keepAlive",
configuration.isKeepAlive());
serverBootstrap.setOption("child.tcpNoDelay",
configuration.isTcpNoDelay());
+ serverBootstrap.setOption("reuseAddress",
configuration.isReuseAddress());
serverBootstrap.setOption("child.reuseAddress",
configuration.isReuseAddress());
serverBootstrap.setOption("child.connectTimeoutMillis",
configuration.getConnectTimeout());
@@ -173,10 +183,12 @@ public class NettyConsumer extends Defau
}
private void initializeUDPServerSocketCommunicationLayer() throws
Exception {
- ExecutorService workerExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
-
- datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
+ workerExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
+ if (configuration.getWorkerCount() <= 0) {
+ datagramChannelFactory = new
NioDatagramChannelFactory(workerExecutor);
+ } else {
+ datagramChannelFactory = new
NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount());
+ }
connectionlessServerBootstrap = new
ConnectionlessBootstrap(datagramChannelFactory);
if (configuration.getServerPipelineFactory() != null) {
configuration.getServerPipelineFactory().setConsumer(this);
@@ -186,6 +198,7 @@ public class NettyConsumer extends Defau
}
connectionlessServerBootstrap.setOption("child.keepAlive",
configuration.isKeepAlive());
connectionlessServerBootstrap.setOption("child.tcpNoDelay",
configuration.isTcpNoDelay());
+ connectionlessServerBootstrap.setOption("reuseAddress",
configuration.isReuseAddress());
connectionlessServerBootstrap.setOption("child.reuseAddress",
configuration.isReuseAddress());
connectionlessServerBootstrap.setOption("child.connectTimeoutMillis",
configuration.getConnectTimeout());
connectionlessServerBootstrap.setOption("child.broadcast",
configuration.isBroadcast());
Modified:
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310991&r1=1310990&r2=1310991&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
(original)
+++
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
Sun Apr 8 12:52:17 2012
@@ -55,6 +55,8 @@ public class NettyProducer extends Defau
private ChannelFactory channelFactory;
private DatagramChannelFactory datagramChannelFactory;
private CamelLogger noReplyLogger;
+ private ExecutorService bossExecutor;
+ private ExecutorService workerExecutor;
public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration
configuration) {
super(nettyEndpoint);
@@ -103,6 +105,7 @@ public class NettyProducer extends Defau
protected void doStop() throws Exception {
LOG.debug("Stopping producer at address: {}",
configuration.getAddress());
// close all channels
+ LOG.trace("Closing {} channels", ALL_CHANNELS.size());
ChannelGroupFuture future = ALL_CHANNELS.close();
future.awaitUninterruptibly();
@@ -110,6 +113,15 @@ public class NettyProducer extends Defau
if (channelFactory != null) {
channelFactory.releaseExternalResources();
}
+
+ // and then shutdown the thread pools
+ if (bossExecutor != null) {
+ context.getExecutorServiceManager().shutdownNow(bossExecutor);
+ }
+ if (workerExecutor != null) {
+ context.getExecutorServiceManager().shutdownNow(workerExecutor);
+ }
+
super.doStop();
}
@@ -208,18 +220,15 @@ public class NettyProducer extends Defau
protected void setupTCPCommunication() throws Exception {
if (channelFactory == null) {
- ExecutorService bossExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
- ExecutorService workerExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
+ bossExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
+ workerExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
channelFactory = new NioClientSocketChannelFactory(bossExecutor,
workerExecutor);
}
}
protected void setupUDPCommunication() throws Exception {
if (datagramChannelFactory == null) {
- ExecutorService workerExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
+ workerExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
datagramChannelFactory = new
NioDatagramChannelFactory(workerExecutor);
}
}
@@ -243,16 +252,17 @@ public class NettyProducer extends Defau
if (isTcp()) {
ClientBootstrap clientBootstrap = new
ClientBootstrap(channelFactory);
- clientBootstrap.setOption("child.keepAlive",
configuration.isKeepAlive());
- clientBootstrap.setOption("child.tcpNoDelay",
configuration.isTcpNoDelay());
- clientBootstrap.setOption("child.reuseAddress",
configuration.isReuseAddress());
- clientBootstrap.setOption("child.connectTimeoutMillis",
configuration.getConnectTimeout());
+ clientBootstrap.setOption("keepAlive",
configuration.isKeepAlive());
+ clientBootstrap.setOption("tcpNoDelay",
configuration.isTcpNoDelay());
+ clientBootstrap.setOption("reuseAddress",
configuration.isReuseAddress());
+ clientBootstrap.setOption("connectTimeoutMillis",
configuration.getConnectTimeout());
// set the pipeline on the bootstrap
clientBootstrap.setPipeline(clientPipeline);
answer = clientBootstrap.connect(new
InetSocketAddress(configuration.getHost(), configuration.getPort()));
return answer;
} else {
+ // TODO: Is this correct for a UDP client
ConnectionlessBootstrap connectionlessClientBootstrap = new
ConnectionlessBootstrap(datagramChannelFactory);
connectionlessClientBootstrap.setOption("child.keepAlive",
configuration.isKeepAlive());
connectionlessClientBootstrap.setOption("child.tcpNoDelay",
configuration.isTcpNoDelay());
Modified:
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1310991&r1=1310990&r2=1310991&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
(original)
+++
camel/branches/camel-2.9.x/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
Sun Apr 8 12:52:17 2012
@@ -42,7 +42,7 @@ public class ClientChannelHandler extend
private final Exchange exchange;
private final AsyncCallback callback;
private boolean messageReceived;
- private boolean exceptionHandled;
+ private volatile boolean exceptionHandled;
public ClientChannelHandler(NettyProducer producer, Exchange exchange,
AsyncCallback callback) {
this.producer = producer;
Modified:
camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties?rev=1310991&r1=1310990&r2=1310991&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties
(original)
+++
camel/branches/camel-2.9.x/components/camel-netty/src/test/resources/log4j.properties
Sun Apr 8 12:52:17 2012
@@ -23,7 +23,7 @@ log4j.rootLogger=INFO, file
# uncomment the following to enable camel debugging
#log4j.logger.org.apache.camel.component.netty=TRACE
#log4j.logger.org.apache.camel=DEBUG
-#log4j.logger.org.apache.commons.net=TRACE
+#log4j.logger.org.jboss.netty=TRACE
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender