Hi,

I'm using the netty4 component (version 2.17.1) to send messages to a legacy TCP server through a REST server. The TCP server requires the messages on a connection to be sent in a specific order: for example, to send COMMAND1 I first have to send PRE1 and PRE2, and then COMMAND1. The component is configured with reuseChannel=true and sync=true, and I have the following route:
from("direct:command1")
    .setBody(constant("PRE1"))
    .to(nettyEndpoint)
    .setBody(constant("PRE2"))
    .to(nettyEndpoint)
    .setBody(constant("COMMAND1"))
    .to(nettyEndpoint);
The problem is that NettyProducer returns the channel to the pool every time I send a message. So when concurrent requests are sent to the REST server, the three messages of one request are no longer guaranteed to go out back to back on the same connection. I have debugged the NettyProducer code and I can see that there is support for keeping the channel for the whole exchange: if reuseChannel is set to true, the producer adds an onCompletion listener to the exchange that returns the channel to the pool. However, it also sets the callback of the NettyCamelState to a NettyProducerCallback, which runs every time a message is sent by the ClientChannelHandler and also returns the channel to the pool. I have fixed this issue by creating a custom version of NettyProducer:

diff --git a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
index a6bce2d..26d6acb 100644
--- a/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
+++ b/components/camel-netty4/src/main/java/org/apache/camel/component/netty4/NettyProducer.java
@@ -282,7 +282,11 @@ public class NettyProducer extends DefaultAsyncProducer {
 
         // need to declare as final
         final Channel channel = existing;
-        final AsyncCallback producerCallback = new NettyProducerCallback(channel, callback);
+        if (configuration.isReuseChannel()) {
+            producerCallback = callback;
+        } else {
+            producerCallback = new NettyProducerCallback(channel, callback);
+        }
 
         // setup state as attachment on the channel, so we can access the state later when needed
         putState(channel, new NettyCamelState(producerCallback, exchange));
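
To make the intent of the change easier to see, here is a minimal, self-contained sketch in plain Java with no Camel or Netty on the classpath. Everything in it is a stand-in and an assumption on my part: the nested AsyncCallback only copies the shape of Camel's interface, ReleasingCallback plays the role of NettyProducerCallback as described above, and runExchange and the "release" event are hypothetical names used to record when the channel would go back to the pool. It also declares producerCallback before the branch, which the snippet as posted leaves out and which the compiler would need.

```java
import java.util.ArrayList;
import java.util.List;

class ReuseChannelCallbackSketch {

    /** Stand-in with the same shape as Camel's AsyncCallback. */
    interface AsyncCallback {
        void done(boolean doneSync);
    }

    /** Stand-in for NettyProducerCallback: hands the channel back, then delegates. */
    static final class ReleasingCallback implements AsyncCallback {
        private final List<String> events;
        private final AsyncCallback original;

        ReleasingCallback(List<String> events, AsyncCallback original) {
            this.events = events;
            this.original = original;
        }

        @Override
        public void done(boolean doneSync) {
            events.add("release"); // channel would return to the pool here
            original.done(doneSync);
        }
    }

    /** The branch from the patch, with producerCallback declared before it is assigned. */
    static AsyncCallback chooseCallback(boolean reuseChannel, AsyncCallback callback,
                                        List<String> events) {
        final AsyncCallback producerCallback;
        if (reuseChannel) {
            producerCallback = callback; // no per-message release
        } else {
            producerCallback = new ReleasingCallback(events, callback);
        }
        return producerCallback;
    }

    /** Sends PRE1, PRE2 and COMMAND1 in one exchange and records the channel events. */
    static List<String> runExchange(boolean reuseChannel) {
        List<String> events = new ArrayList<>();
        for (String message : new String[] {"PRE1", "PRE2", "COMMAND1"}) {
            events.add("send " + message);
            AsyncCallback routeCallback = doneSync -> { };
            // the reply for this message arrives and the stored callback fires
            chooseCallback(reuseChannel, routeCallback, events).done(false);
        }
        if (reuseChannel) {
            // stands in for the onCompletion listener that releases the channel once
            events.add("release");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runExchange(false)); // release after every message
        System.out.println(runExchange(true));  // single release at the end
    }
}
```

With the wrapping callback the recorded events alternate between a send and a release, which is the window in which another request could pick up the channel. With the patched branch the three sends stay together and the only release is the one at the end of the exchange.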

Thanks

