We are facing an issue with a single-threaded route that dequeues
messages from an ActiveMQ queue. This is the route configuration:

<routeContext id="pms2dm-display-routes" xmlns="http://camel.apache.org/schema/spring">
        <!-- Route used to send messages to displays -->

        <camel:route id="pms2dm-diplay_entryP1">
            <camel:from uri="jms:queue:displayqueue_POLMONE1?concurrentConsumers=1&amp;acknowledgementModeName=CLIENT_ACKNOWLEDGE&amp;cacheLevelName=CACHE_NONE&amp;maxConcurrentConsumers=1"/>
            <camel:threads executorServiceRef="displayP1ThreadPoolProfile">
                <camel:to uri="direct:toDisplay"/>
            </camel:threads>
        </camel:route>

while this is the thread pool configuration on the routeContext:

<camel:threadPoolProfile id="displayP1ThreadPoolProfile"
                         defaultProfile="false"
                         poolSize="1"
                         maxPoolSize="1"
                         keepAliveTime="60"
                         maxQueueSize="1000"
                         rejectedPolicy="CallerRuns"/>

After a message is dequeued it goes through some processing, and then we
have to send it over a socket (netty4) to an external system (let's call
it ES1). This is the netty4 parameters configuration:

        <camel:route id="pms2dm-display-common"  >
            <camel:from uri="direct:toDisplay" />
            <camel:log message="--- PMS2DM Route -
pms2dm-display-common: messagge scodato da
displayqueue_${header.place}, JMSMESSAGEID: ${header.JMSMessageID}
Body --> ${body}"/>
                <camel:onException>
                    <exception>java.lang.Exception</exception>
                    <handled>
                        <constant>true</constant>
                    </handled>
                    <setHeader headerName="exceptionMessage">
                        <simple>${exception.message}</simple>
                    </setHeader>
                    <camel:log message="--- PMS2DM Route -
pms2dm-display-common: Exception verificatasi in fase di invio del
JMSMESSAGEID: ${header.JMSMessageID} Body --> ${body};
ExceptionMessage --> ${header.exceptionMessage}" />
                     <camel:delay>
                            <constant>1000</constant>
                     </camel:delay>
                    <camel:rollback markRollbackOnly="true"/>
                </camel:onException>

                <setProperty propertyName="nettyParams">

<simple><![CDATA[?disconnect=false&sync=false&synchronous=false&allowDefaultCodec=false&encoder=#byteArrayEncoder&decoder=#byteArrayDecoder&reuseChannel=true]]></simple>
                </setProperty>
                <camel:to uri="bean:displayMessageProcessor"/>


                <!-- Send command for the start buzzer -->

As you can see from the parameters, we have reuseChannel set to true
because for each message we dequeue from AMQ we need to send a
start_buzzer and a stop_buzzer, before and after sending the actual
message out to ES1. To be clear: a message is dequeued and parsed, then
a start_buzzer is sent to ES1, then the message itself goes out, and
finally the stop_buzzer is sent. One message from AMQ = three
communications over the socket to ES1. All of the requests on the
socket are sent using a recipientList component:

<camel:recipientList>
    <exchangeProperty>displayConnection</exchangeProperty>
</camel:recipientList>


(displayConnection is a property we set in a previous processor on the
same route; it contains the whole netty4 URI.)
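
For context, this is roughly how such a processor could build
displayConnection from the nettyParams property. It is only a sketch,
not our actual displayMessageProcessor; the host, port and header name
below are placeholders:

    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;

    public class DisplayConnectionProcessor implements Processor {

        @Override
        public void process(Exchange exchange) throws Exception {
            // Placeholder values: in reality the host/port come from our own configuration
            String host = exchange.getIn().getHeader("displayHost", String.class);
            int port = 1471;

            // nettyParams was set earlier in the route via <setProperty>
            String nettyParams = exchange.getProperty("nettyParams", String.class);

            // Full netty4 endpoint URI, later resolved by the recipientList
            exchange.setProperty("displayConnection",
                    "netty4:tcp://" + host + ":" + port + nettyParams);
        }
    }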

In the test environment we don't actually have a test ES1, so we are
simulating it with an ncat listener opened on another Linux server, in
order to reproduce the actual socket listener.
The following scenario is giving us some problems:
After dequeueing 1 message from AMQ, the route starts processing it;
The route processes it and sends the start_buzzer to the ncat, and the
ncat receives it;
At this point, before we can see the actual message being received on
the ncat, we shut it down (CTRL+C);
From here on we see in the Apache Camel logs (at TRACE level) that the
route keeps processing the Exchange: it apparently sends the message
out (even though the ncat is closed), and then, before sending the
stop_buzzer, the route itself seems to lock up.

No more messages are dequeued from AMQ from this point on.
What we see in the logs is:

[Camel Thread #18 - NettyClientTCPWorker] o.a.c.c.n.NettyProducer [NettyProducer.java:302] Operation complete DefaultChannelPromise@1f6f5ec3(failure: java.io.IOException: Broken pipe)
[Camel Thread #18 - NettyClientTCPWorker] o.a.c.c.n.NettyProducer [ClientChannelHandler.java:102] Channel closed: [id: 0xb941777d, L:/192.168.181.177:42584 ! R:/192.168.181.178:1471]

(R is ES1 and L is, of course, our Apache Camel host.)

It seems that with reuseChannel set to true the channel gets destroyed
(once the ncat is no longer reachable), but Camel does not recreate it.
This apparently locks the only JMS consumer we have allowed on the
route.
Is there any way we could force the JMS consumer to unlock? Why is this
happening? And if the channel is broken, why isn't that IOException
caught by our onException block?

Info: the single-threaded pool and maxConcurrentConsumers=1 are
mandatory in our configuration; the Camel version is 2.17.

Thanks in advance.
Fennaro
