Author: davsclaus
Date: Sun Apr 8 11:28:44 2012
New Revision: 1310979
URL: http://svn.apache.org/viewvc?rev=1310979&view=rev
Log:
CAMEL-5150: Some cleanup in camel-netty according to Netty docs.
Added:
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
camel/trunk/components/camel-netty/src/test/resources/log4j.properties
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=1310979&r1=1310978&r2=1310979&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
Sun Apr 8 11:28:44 2012
@@ -68,8 +68,6 @@ public class NettyConfiguration implemen
private long sendBufferSize = 65536;
private long receiveBufferSize = 65536;
private int receiveBufferSizePredictor;
- private int corePoolSize = 10;
- private int maxPoolSize = 100;
private int workerCount;
private String keyStoreFormat;
private String securityProvider;
@@ -385,22 +383,6 @@ public class NettyConfiguration implemen
this.trustStoreFile = trustStoreFile;
}
- public int getCorePoolSize() {
- return corePoolSize;
- }
-
- public void setCorePoolSize(int corePoolSize) {
- this.corePoolSize = corePoolSize;
- }
-
- public int getMaxPoolSize() {
- return maxPoolSize;
- }
-
- public void setMaxPoolSize(int maxPoolSize) {
- this.maxPoolSize = maxPoolSize;
- }
-
public String getKeyStoreFormat() {
return keyStoreFormat;
}
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java?rev=1310979&r1=1310978&r2=1310979&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConsumer.java
Sun Apr 8 11:28:44 2012
@@ -46,12 +46,14 @@ public class NettyConsumer extends Defau
private ServerBootstrap serverBootstrap;
private ConnectionlessBootstrap connectionlessServerBootstrap;
private Channel channel;
+ private ExecutorService bossExecutor;
+ private ExecutorService workerExecutor;
public NettyConsumer(NettyEndpoint nettyEndpoint, Processor processor,
NettyConfiguration configuration) {
super(nettyEndpoint, processor);
this.context = this.getEndpoint().getCamelContext();
this.configuration = configuration;
- this.allChannels = new DefaultChannelGroup("NettyProducer-" +
nettyEndpoint.getEndpointUri());
+ this.allChannels = new DefaultChannelGroup("NettyConsumer-" +
nettyEndpoint.getEndpointUri());
}
@Override
@@ -78,14 +80,23 @@ public class NettyConsumer extends Defau
LOG.debug("Netty consumer unbinding from: {}",
configuration.getAddress());
// close all channels
+ LOG.trace("Closing {} channels", allChannels.size());
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
- // and then release other resources
+ // close server external resources
if (channelFactory != null) {
channelFactory.releaseExternalResources();
}
+ // and then shutdown the thread pools
+ if (bossExecutor != null) {
+ context.getExecutorServiceManager().shutdownNow(bossExecutor);
+ }
+ if (workerExecutor != null) {
+ context.getExecutorServiceManager().shutdownNow(workerExecutor);
+ }
+
super.doStop();
LOG.info("Netty consumer unbound from: " + configuration.getAddress());
@@ -144,12 +155,10 @@ public class NettyConsumer extends Defau
}
private void initializeTCPServerSocketCommunicationLayer() throws
Exception {
- ExecutorService bossExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
- ExecutorService workerExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
+ bossExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
+ workerExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
- if (configuration.getWorkerCount() == 0) {
+ if (configuration.getWorkerCount() <= 0) {
channelFactory = new NioServerSocketChannelFactory(bossExecutor,
workerExecutor);
} else {
channelFactory = new NioServerSocketChannelFactory(bossExecutor,
workerExecutor,
@@ -164,6 +173,7 @@ public class NettyConsumer extends Defau
}
serverBootstrap.setOption("child.keepAlive",
configuration.isKeepAlive());
serverBootstrap.setOption("child.tcpNoDelay",
configuration.isTcpNoDelay());
+ serverBootstrap.setOption("reuseAddress",
configuration.isReuseAddress());
serverBootstrap.setOption("child.reuseAddress",
configuration.isReuseAddress());
serverBootstrap.setOption("child.connectTimeoutMillis",
configuration.getConnectTimeout());
@@ -173,10 +183,12 @@ public class NettyConsumer extends Defau
}
private void initializeUDPServerSocketCommunicationLayer() throws
Exception {
- ExecutorService workerExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
-
- datagramChannelFactory = new NioDatagramChannelFactory(workerExecutor);
+ workerExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
+ if (configuration.getWorkerCount() <= 0) {
+ datagramChannelFactory = new
NioDatagramChannelFactory(workerExecutor);
+ } else {
+ datagramChannelFactory = new
NioDatagramChannelFactory(workerExecutor, configuration.getWorkerCount());
+ }
connectionlessServerBootstrap = new
ConnectionlessBootstrap(datagramChannelFactory);
if (configuration.getServerPipelineFactory() != null) {
configuration.getServerPipelineFactory().setConsumer(this);
@@ -186,6 +198,7 @@ public class NettyConsumer extends Defau
}
connectionlessServerBootstrap.setOption("child.keepAlive",
configuration.isKeepAlive());
connectionlessServerBootstrap.setOption("child.tcpNoDelay",
configuration.isTcpNoDelay());
+ connectionlessServerBootstrap.setOption("reuseAddress",
configuration.isReuseAddress());
connectionlessServerBootstrap.setOption("child.reuseAddress",
configuration.isReuseAddress());
connectionlessServerBootstrap.setOption("child.connectTimeoutMillis",
configuration.getConnectTimeout());
connectionlessServerBootstrap.setOption("child.broadcast",
configuration.isBroadcast());
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310979&r1=1310978&r2=1310979&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
Sun Apr 8 11:28:44 2012
@@ -55,6 +55,8 @@ public class NettyProducer extends Defau
private ChannelFactory channelFactory;
private DatagramChannelFactory datagramChannelFactory;
private CamelLogger noReplyLogger;
+ private ExecutorService bossExecutor;
+ private ExecutorService workerExecutor;
public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration
configuration) {
super(nettyEndpoint);
@@ -103,6 +105,7 @@ public class NettyProducer extends Defau
protected void doStop() throws Exception {
LOG.debug("Stopping producer at address: {}",
configuration.getAddress());
// close all channels
+ LOG.trace("Closing {} channels", ALL_CHANNELS.size());
ChannelGroupFuture future = ALL_CHANNELS.close();
future.awaitUninterruptibly();
@@ -110,6 +113,15 @@ public class NettyProducer extends Defau
if (channelFactory != null) {
channelFactory.releaseExternalResources();
}
+
+ // and then shutdown the thread pools
+ if (bossExecutor != null) {
+ context.getExecutorServiceManager().shutdownNow(bossExecutor);
+ }
+ if (workerExecutor != null) {
+ context.getExecutorServiceManager().shutdownNow(workerExecutor);
+ }
+
super.doStop();
}
@@ -208,18 +220,15 @@ public class NettyProducer extends Defau
protected void setupTCPCommunication() throws Exception {
if (channelFactory == null) {
- ExecutorService bossExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPBoss",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
- ExecutorService workerExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyTCPWorker",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
+ bossExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
+ workerExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPWorker");
channelFactory = new NioClientSocketChannelFactory(bossExecutor,
workerExecutor);
}
}
protected void setupUDPCommunication() throws Exception {
if (datagramChannelFactory == null) {
- ExecutorService workerExecutor =
context.getExecutorServiceManager().newThreadPool(this, "NettyUDPWorker",
- configuration.getCorePoolSize(),
configuration.getMaxPoolSize());
+ workerExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyUDPWorker");
datagramChannelFactory = new
NioDatagramChannelFactory(workerExecutor);
}
}
@@ -243,16 +252,17 @@ public class NettyProducer extends Defau
if (isTcp()) {
ClientBootstrap clientBootstrap = new
ClientBootstrap(channelFactory);
- clientBootstrap.setOption("child.keepAlive",
configuration.isKeepAlive());
- clientBootstrap.setOption("child.tcpNoDelay",
configuration.isTcpNoDelay());
- clientBootstrap.setOption("child.reuseAddress",
configuration.isReuseAddress());
- clientBootstrap.setOption("child.connectTimeoutMillis",
configuration.getConnectTimeout());
+ clientBootstrap.setOption("keepAlive",
configuration.isKeepAlive());
+ clientBootstrap.setOption("tcpNoDelay",
configuration.isTcpNoDelay());
+ clientBootstrap.setOption("reuseAddress",
configuration.isReuseAddress());
+ clientBootstrap.setOption("connectTimeoutMillis",
configuration.getConnectTimeout());
// set the pipeline on the bootstrap
clientBootstrap.setPipeline(clientPipeline);
answer = clientBootstrap.connect(new
InetSocketAddress(configuration.getHost(), configuration.getPort()));
return answer;
} else {
+ // TODO: Is this correct for a UDP client
ConnectionlessBootstrap connectionlessClientBootstrap = new
ConnectionlessBootstrap(datagramChannelFactory);
connectionlessClientBootstrap.setOption("child.keepAlive",
configuration.isKeepAlive());
connectionlessClientBootstrap.setOption("child.tcpNoDelay",
configuration.isTcpNoDelay());
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1310979&r1=1310978&r2=1310979&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
Sun Apr 8 11:28:44 2012
@@ -42,7 +42,7 @@ public class ClientChannelHandler extend
private final Exchange exchange;
private final AsyncCallback callback;
private boolean messageReceived;
- private boolean exceptionHandled;
+ private volatile boolean exceptionHandled;
public ClientChannelHandler(NettyProducer producer, Exchange exchange,
AsyncCallback callback) {
this.producer = producer;
Added:
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java?rev=1310979&view=auto
==============================================================================
---
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
(added)
+++
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseConnectionTest.java
Sun Apr 8 11:28:44 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class NettyReuseConnectionTest extends BaseNettyTest {
+
+ private String uri =
"netty:tcp://localhost:{{port}}?sync=true&disconnect=false";
+
+ @Test
+ public void testReuseConnection() throws Exception {
+ for (int i = 0; i < 20; i++) {
+ String out = template.requestBody(uri, "" + i, String.class);
+ assertEquals("Reply " + i, out);
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from(uri).transform().simple("Reply ${body}");
+ }
+ };
+ }
+}
Modified: camel/trunk/components/camel-netty/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/resources/log4j.properties?rev=1310979&r1=1310978&r2=1310979&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/test/resources/log4j.properties
(original)
+++ camel/trunk/components/camel-netty/src/test/resources/log4j.properties Sun
Apr 8 11:28:44 2012
@@ -23,7 +23,7 @@ log4j.rootLogger=INFO, file
# uncomment the following to enable camel debugging
#log4j.logger.org.apache.camel.component.netty=TRACE
#log4j.logger.org.apache.camel=DEBUG
-#log4j.logger.org.apache.commons.net=TRACE
+#log4j.logger.org.jboss.netty=TRACE
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender