Author: davsclaus
Date: Sun Apr 8 12:46:56 2012
New Revision: 1310989
URL: http://svn.apache.org/viewvc?rev=1310989&view=rev
Log:
CAMEL-4556: Netty producer now reuses connection.
Added:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/ClientPipelineFactory.java
Sun Apr 8 12:46:56 2012
@@ -16,24 +16,18 @@
*/
package org.apache.camel.component.netty;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
public abstract class ClientPipelineFactory implements ChannelPipelineFactory {
protected NettyProducer producer;
- protected Exchange exchange;
- protected AsyncCallback callback;
public ClientPipelineFactory() {
}
-
- public ClientPipelineFactory(NettyProducer producer, Exchange exchange,
AsyncCallback callback) {
+
+ public ClientPipelineFactory(NettyProducer producer) {
this.producer = producer;
- this.exchange = exchange;
- this.callback = callback;
}
public ChannelPipeline getPipeline() throws Exception {
@@ -48,21 +42,4 @@ public abstract class ClientPipelineFact
public void setProducer(NettyProducer producer) {
this.producer = producer;
}
-
- public Exchange getExchange() {
- return exchange;
- }
-
- public void setExchange(Exchange exchange) {
- this.exchange = exchange;
- }
-
- public AsyncCallback getCallback() {
- return callback;
- }
-
- public void setCallback(AsyncCallback callback) {
- this.callback = callback;
- }
-
}
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/DefaultClientPipelineFactory.java
Sun Apr 8 12:46:56 2012
@@ -17,12 +17,9 @@
package org.apache.camel.component.netty;
import java.util.List;
-
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
import org.apache.camel.component.netty.handlers.ClientChannelHandler;
import org.apache.camel.component.netty.ssl.SSLEngineFactory;
import org.jboss.netty.channel.ChannelDownstreamHandler;
@@ -34,10 +31,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultClientPipelineFactory extends ClientPipelineFactory {
- private static final transient Logger LOG =
LoggerFactory.getLogger(ClientPipelineFactory.class);
+ private static final transient Logger LOG =
LoggerFactory.getLogger(DefaultClientPipelineFactory.class);
- public DefaultClientPipelineFactory(NettyProducer producer, Exchange
exchange, AsyncCallback callback) {
- super(producer, exchange, callback);
+ public DefaultClientPipelineFactory(NettyProducer producer) {
+ super(producer);
}
public ChannelPipeline getPipeline() throws Exception {
@@ -61,7 +58,7 @@ public class DefaultClientPipelineFactor
}
// our handler must be added last
- channelPipeline.addLast("handler", new ClientChannelHandler(producer,
exchange, callback));
+ channelPipeline.addLast("handler", new ClientChannelHandler(producer));
return channelPipeline;
}
Added:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java?rev=1310989&view=auto
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
(added)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
Sun Apr 8 12:46:56 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ *
+ */
+public final class NettyCamelState {
+
+ private final Exchange exchange;
+ private final AsyncCallback callback;
+
+ public NettyCamelState(AsyncCallback callback, Exchange exchange) {
+ this.callback = callback;
+ this.exchange = exchange;
+ }
+
+ public AsyncCallback getCallback() {
+ return callback;
+ }
+
+ public Exchange getExchange() {
+ return exchange;
+ }
+}
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
Sun Apr 8 12:46:56 2012
@@ -27,7 +27,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
@@ -37,6 +36,7 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelLocal;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
@@ -57,6 +57,9 @@ public class NettyProducer extends Defau
private CamelLogger noReplyLogger;
private ExecutorService bossExecutor;
private ExecutorService workerExecutor;
+ private final ChannelLocal<NettyCamelState> state = new
ChannelLocal<NettyCamelState>();
+ private ChannelFuture channelFuture;
+ private Channel channel;
public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration
configuration) {
super(nettyEndpoint);
@@ -157,11 +160,16 @@ public class NettyProducer extends Defau
exchange.setProperty(Exchange.CHARSET_NAME,
IOHelper.normalizeCharset(getConfiguration().getCharsetName()));
}
- ChannelFuture channelFuture;
- final Channel channel;
try {
- channelFuture = openConnection(exchange, callback);
- channel = openChannel(channelFuture);
+ // allow to reuse channel, on this producer, to avoid creating a
new connection
+ // for each message being sent
+ if (channelFuture == null || channel == null || !channel.isOpen())
{
+ channelFuture = openConnection();
+ channel = openChannel(channelFuture);
+ }
+ // setup state now we have the channel we can do this because
+ // this producer is not thread safe, but pooled using
ServicePoolAware
+ state.set(channel, new NettyCamelState(callback, exchange));
} catch (Exception e) {
exchange.setException(e);
callback.done(true);
@@ -218,6 +226,21 @@ public class NettyProducer extends Defau
return false;
}
+ /**
+ * To get the {@link NettyCamelState} from this producer.
+ */
+ public NettyCamelState getState(Channel channel) {
+ return state.get(channel);
+ }
+
+ /**
+ * To remove the {@link NettyCamelState} stored on this producer,
+ * when no longer needed
+ */
+ public void removeState(Channel channel) {
+ state.remove(channel);
+ }
+
protected void setupTCPCommunication() throws Exception {
if (channelFactory == null) {
bossExecutor =
context.getExecutorServiceManager().newCachedThreadPool(this, "NettyTCPBoss");
@@ -233,19 +256,17 @@ public class NettyProducer extends Defau
}
}
- private ChannelFuture openConnection(Exchange exchange, AsyncCallback
callback) throws Exception {
+ private ChannelFuture openConnection() throws Exception {
ChannelFuture answer;
ChannelPipeline clientPipeline;
if (configuration.getClientPipelineFactory() != null) {
// initialize user defined client pipeline factory
configuration.getClientPipelineFactory().setProducer(this);
- configuration.getClientPipelineFactory().setExchange(exchange);
- configuration.getClientPipelineFactory().setCallback(callback);
clientPipeline =
configuration.getClientPipelineFactory().getPipeline();
} else {
// initialize client pipeline factory
- ClientPipelineFactory clientPipelineFactory = new
DefaultClientPipelineFactory(this, exchange, callback);
+ ClientPipelineFactory clientPipelineFactory = new
DefaultClientPipelineFactory(this);
// must get the pipeline from the factory when opening a new
connection
clientPipeline = clientPipelineFactory.getPipeline();
}
@@ -295,13 +316,10 @@ public class NettyProducer extends Defau
}
private void openAndCloseConnection() throws Exception {
- ChannelFuture future = openConnection(new DefaultExchange(context),
new AsyncCallback() {
- public void done(boolean doneSync) {
- // noop
- }
- });
+ ChannelFuture future = openConnection();
Channel channel = openChannel(future);
NettyHelper.close(channel);
+ ALL_CHANNELS.remove(channel);
}
public NettyConfiguration getConfiguration() {
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
Sun Apr 8 12:46:56 2012
@@ -20,6 +20,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.NoTypeConversionAvailableException;
+import org.apache.camel.component.netty.NettyCamelState;
import org.apache.camel.component.netty.NettyConstants;
import org.apache.camel.component.netty.NettyHelper;
import org.apache.camel.component.netty.NettyPayloadHelper;
@@ -39,15 +40,11 @@ import org.slf4j.LoggerFactory;
public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
private static final transient Logger LOG =
LoggerFactory.getLogger(ClientChannelHandler.class);
private final NettyProducer producer;
- private final Exchange exchange;
- private final AsyncCallback callback;
private boolean messageReceived;
private volatile boolean exceptionHandled;
- public ClientChannelHandler(NettyProducer producer, Exchange exchange,
AsyncCallback callback) {
+ public ClientChannelHandler(NettyProducer producer) {
this.producer = producer;
- this.exchange = exchange;
- this.callback = callback;
}
@Override
@@ -73,20 +70,33 @@ public class ClientChannelHandler extend
if (LOG.isDebugEnabled()) {
LOG.debug("Closing channel as an exception was thrown from Netty",
cause);
}
- // set the cause on the exchange
- exchange.setException(cause);
- // close channel in case an exception was thrown
- NettyHelper.close(exceptionEvent.getChannel());
+ Exchange exchange = getExchange(ctx);
+ AsyncCallback callback = getAsyncCallback(ctx);
- // signal callback
- callback.done(false);
+ // the state may not be set
+ if (exchange != null && callback != null) {
+ // set the cause on the exchange
+ exchange.setException(cause);
+
+ // close channel in case an exception was thrown
+ NettyHelper.close(exceptionEvent.getChannel());
+
+ // signal callback
+ callback.done(false);
+ }
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
LOG.trace("Channel closed: {}", ctx.getChannel());
+ Exchange exchange = getExchange(ctx);
+ AsyncCallback callback = getAsyncCallback(ctx);
+
+ // remove state
+ producer.removeState(ctx.getChannel());
+
if (producer.getConfiguration().isSync() && !messageReceived &&
!exceptionHandled) {
// session was closed but no message received. This could be
because the remote server had an internal error
// and could not return a response. We should count down to stop
waiting for a response
@@ -103,6 +113,9 @@ public class ClientChannelHandler extend
public void messageReceived(ChannelHandlerContext ctx, MessageEvent
messageEvent) throws Exception {
messageReceived = true;
+ Exchange exchange = getExchange(ctx);
+ AsyncCallback callback = getAsyncCallback(ctx);
+
Object body = messageEvent.getMessage();
LOG.debug("Message received: {}", body);
@@ -150,4 +163,14 @@ public class ClientChannelHandler extend
}
}
+ private Exchange getExchange(ChannelHandlerContext ctx) {
+ NettyCamelState state = producer.getState(ctx.getChannel());
+ return state != null ? state.getExchange() : null;
+ }
+
+ private AsyncCallback getAsyncCallback(ChannelHandlerContext ctx) {
+ NettyCamelState state = producer.getState(ctx.getChannel());
+ return state != null ? state.getCallback() : null;
+ }
+
}
Modified:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
(original)
+++
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
Sun Apr 8 12:46:56 2012
@@ -25,7 +25,6 @@ import org.apache.camel.component.netty.
import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
-import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
@@ -35,9 +34,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Server handler which is shared
+ * Client handler which cannot be shared
*/
[email protected]
public class ServerChannelHandler extends SimpleChannelUpstreamHandler {
private static final transient Logger LOG =
LoggerFactory.getLogger(ServerChannelHandler.class);
private NettyConsumer consumer;
Modified:
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
(original)
+++
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactoryAsynchTest.java
Sun Apr 8 12:46:56 2012
@@ -97,10 +97,9 @@ public class NettyCustomPipelineFactoryA
channelPipeline.addLast("decoder-DELIM", new
DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
channelPipeline.addLast("decoder-SD", new
StringDecoder(CharsetUtil.UTF_8));
channelPipeline.addLast("encoder-SD", new
StringEncoder(CharsetUtil.UTF_8));
- channelPipeline.addLast("handler", new
ClientChannelHandler(producer, exchange, callback));
+ channelPipeline.addLast("handler", new
ClientChannelHandler(producer));
return channelPipeline;
-
}
public boolean isfactoryInvoked() {
Modified:
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java?rev=1310989&r1=1310988&r2=1310989&view=diff
==============================================================================
---
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
(original)
+++
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyCustomPipelineFactorySynchTest.java
Sun Apr 8 12:46:56 2012
@@ -97,7 +97,7 @@ public class NettyCustomPipelineFactoryS
channelPipeline.addLast("decoder-DELIM", new
DelimiterBasedFrameDecoder(maxLineSize, true, Delimiters.lineDelimiter()));
channelPipeline.addLast("decoder-SD", new
StringDecoder(CharsetUtil.UTF_8));
channelPipeline.addLast("encoder-SD", new
StringEncoder(CharsetUtil.UTF_8));
- channelPipeline.addLast("handler", new
ClientChannelHandler(producer, exchange, callback));
+ channelPipeline.addLast("handler", new
ClientChannelHandler(producer));
return channelPipeline;
}