[ 
https://issues.apache.org/jira/browse/CAMEL-15350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brad Harvey reopened CAMEL-15350:
---------------------------------

Hi [~davsclaus] ,

You were too fast for me.  I have tested out the simple fix but unfortunately 
it is too simple.

At first I saw no change in behaviour.  I had to set "keepAliveDelay" on the 
endpoint, otherwise it just rethrows the exception anyway, which is a bit of a 
trap for new players.  After setting keepAliveDelay to a positive value it did 
retry, but only once.  The retry fails on the createSession call, which is 
outside the try/catch block that was changed, so again it just logs a warning 
and stops consuming.
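
To illustrate the point about createSession, here is a minimal sketch in plain 
Java. It is not the actual SjmsBatchConsumer code: ConsumeAttempt and 
consumeWithRecovery are hypothetical names, and the sketch assumes that session 
creation and batch consumption are treated as one attempt inside a single try 
block, so a failure in either one waits for keepAliveDelay and retries instead 
of ending the loop. The maxAttempts bound exists only so the demo terminates.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RecoveryLoopSketch {

    /** Hypothetical stand-in for "create a session, then consume batches on it". */
    @FunctionalInterface
    public interface ConsumeAttempt {
        void run() throws Exception;
    }

    /**
     * Runs the attempt until it completes normally or maxAttempts is reached.
     * Returns the number of failed attempts before success, or -1 if all failed.
     */
    public static int consumeWithRecovery(ConsumeAttempt attempt,
                                          long keepAliveDelayMillis,
                                          int maxAttempts) {
        for (int failures = 0; failures < maxAttempts; failures++) {
            try {
                // Session creation AND consumption both happen inside the try,
                // so a failure in either one is handled by the catch below.
                attempt.run();
                return failures;
            } catch (Exception e) {
                if (keepAliveDelayMillis < 0) {
                    // Assumption: a negative delay means "do not retry".
                    throw new IllegalStateException("no keepAliveDelay set, giving up", e);
                }
                try {
                    Thread.sleep(keepAliveDelayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulate a broker that refuses the first three attempts.
        int failures = consumeWithRecovery(() -> {
            if (calls.incrementAndGet() <= 3) {
                throw new IllegalStateException("simulated: could not connect to broker");
            }
        }, 10L, 10);
        System.out.println("recovered after " + failures + " failed attempts");
    }
}
```

With the simulated broker above, the loop keeps retrying past the first failed 
reconnect, which is the behaviour I was hoping for from the fix.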

My test case is to restart the message broker.

It does work OK if the message broker is restarted and ready in time for the 
first retry (that is, before keepAliveDelay elapses).  It is possible Spring's 
SingleConnectionFactory is helping out here, allowing createSession to recover 
the connection under the covers.

Regards, Brad

 

Interesting bits of the log below:

First error:
{code:java}
WARN ||| 62704 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Exception caught consuming from myQueue. Caused by: [javax.jms.IllegalStateException - The Consumer is closed]
    at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.consumeBatchesOnLoop(SjmsBatchConsumer.java:426)
{code}
Second error (first and only retry).  After this there is nothing.
{code:java}
WARN ||| 62704 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Exception caught consuming from myQueue. Caused by: [javax.jms.JMSException - Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect]
javax.jms.JMSException: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect
    at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop.run(SjmsBatchConsumer.java:318)
{code}
 

This is the full log from just prior to stopping activemq.
{code:java}
2020-08-04 22:36:50.857 TRACE||| 62704 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : No message received
2020-08-04 22:36:50.857 TRACE||| 62704 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : No message received
2020-08-04 22:36:50.857 TRACE||| 62704 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : BatchConsumptionTask running
2020-08-04 22:36:51.364 INFO ||| 62704 --- [0.1:61616@64353] o.s.j.c.CachingConnectionFactory         : Encountered a JMSException - resetting the underlying JMS Connection
javax.jms.JMSException: java.io.EOFException
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1960)
    at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1979)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:126)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:173)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:345)
    at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:219)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    ... 1 common frames omitted
2020-08-04 22:36:51.366 TRACE||| 62704 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : No message received
2020-08-04 22:36:51.366 TRACE||| 62704 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : BatchConsumptionTask running
2020-08-04 22:36:51.368 WARN ||| 62704 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Exception caught consuming from myQueue. Caused by: [javax.jms.IllegalStateException - The Consumer is closed]
javax.jms.IllegalStateException: The Consumer is closed
    at org.apache.activemq.ActiveMQMessageConsumer.checkClosed(ActiveMQMessageConsumer.java:879)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:640)
    at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.consumeBatchesOnLoop(SjmsBatchConsumer.java:426)
    at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.access$1300(SjmsBatchConsumer.java:380)
    at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop.run(SjmsBatchConsumer.java:325)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-08-04 22:36:51.369 WARN ||| 62704 --- [0.1:61616@64353] o.s.j.c.CachingConnectionFactory         : Could not close shared JMS Connection
javax.jms.JMSException: Disposed due to prior exception
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:72)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1421)
    at org.apache.activemq.ActiveMQConnection.close(ActiveMQConnection.java:688)
    at org.springframework.jms.connection.SingleConnectionFactory.closeConnection(SingleConnectionFactory.java:501)
    at org.springframework.jms.connection.SingleConnectionFactory.resetConnection(SingleConnectionFactory.java:389)
    at org.springframework.jms.connection.CachingConnectionFactory.resetConnection(CachingConnectionFactory.java:205)
    at org.springframework.jms.connection.SingleConnectionFactory.onException(SingleConnectionFactory.java:367)
    at org.springframework.jms.connection.SingleConnectionFactory$AggregatedExceptionListener.onException(SingleConnectionFactory.java:721)
    at org.apache.activemq.ActiveMQConnection$5.run(ActiveMQConnection.java:1967)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.activemq.transport.TransportDisposedIOException: Disposed due to prior exception
    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:125)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
    at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:173)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onException(AbstractInactivityMonitor.java:345)
    at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:219)
    ... 1 common frames omitted
Caused by: java.io.EOFException: null
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    ... 1 common frames omitted
2020-08-04 22:36:53.944 WARN ||| 62704 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Exception caught consuming from myQueue. Caused by: [javax.jms.JMSException - Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect]
javax.jms.JMSException: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)
    at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:374)
    at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:304)
    at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:244)
    at org.springframework.jms.connection.SingleConnectionFactory.doCreateConnection(SingleConnectionFactory.java:409)
    at org.springframework.jms.connection.SingleConnectionFactory.initConnection(SingleConnectionFactory.java:349)
    at org.springframework.jms.connection.SingleConnectionFactory.getConnection(SingleConnectionFactory.java:327)
    at org.springframework.jms.connection.SingleConnectionFactory$SharedConnectionInvocationHandler.invoke(SingleConnectionFactory.java:649)
    at com.sun.proxy.$Proxy131.createSession(Unknown Source)
    at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop.run(SjmsBatchConsumer.java:318)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused: connect
    at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
    at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at org.apache.activemq.transport.tcp.TcpTransport.connect(TcpTransport.java:525)
    at org.apache.activemq.transport.tcp.TcpTransport.doStart(TcpTransport.java:488)
    at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
    at org.apache.activemq.transport.AbstractInactivityMonitor.start(AbstractInactivityMonitor.java:169)
    at org.apache.activemq.transport.InactivityMonitor.start(InactivityMonitor.java:52)
    at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:64)
    at org.apache.activemq.transport.WireFormatNegotiator.start(WireFormatNegotiator.java:72)
    at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:64)
    at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:64)
    at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:354)
    ... 13 common frames omitted
{code}
 

 

 

> SJMS Batch Consumer error recovery
> ----------------------------------
>
>                 Key: CAMEL-15350
>                 URL: https://issues.apache.org/jira/browse/CAMEL-15350
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-sjms
>    Affects Versions: 3.4.1
>            Reporter: Brad Harvey
>            Assignee: Claus Ibsen
>            Priority: Major
>             Fix For: 3.5.0, 3.4.3
>
>
> If the SJMS-Batch consumption loop encounters anything other than a 
> javax.jms.IllegalStateException 
> ([https://github.com/apache/camel/blob/master/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java#L330])
>  then it logs a warning and stops consuming messages 
> ([https://github.com/apache/camel/blob/master/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java#L348]).
> Example log after message broker is shut down:
> {code:java}
> 2020-07-29 10:33:32.060 WARN ||| 34468 --- [msBatchConsumer] o.a.c.c.sjms.batch.SjmsBatchConsumer     : Exception caught consuming from myQueue. Caused by: [javax.jms.JMSException - Connection refused: no further information: localhost/127.0.0.1:5672]
> javax.jms.JMSException: Connection refused: no further information: localhost/127.0.0.1:5672
>     at org.apache.qpid.jms.provider.ProviderException.toJMSException(ProviderException.java:34)
>     at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:80)
>     at org.apache.qpid.jms.exceptions.JmsExceptionSupport.create(JmsExceptionSupport.java:112)
>     at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:915)
>     at org.apache.qpid.jms.JmsConnection.pull(JmsConnection.java:899)
>     at org.apache.qpid.jms.JmsMessageConsumer.performPullIfRequired(JmsMessageConsumer.java:732)
>     at org.apache.qpid.jms.JmsMessageConsumer.dequeue(JmsMessageConsumer.java:332)
>     at org.apache.qpid.jms.JmsMessageConsumer.receive(JmsMessageConsumer.java:213)
>     at org.springframework.jms.connection.CachedMessageConsumer.receive(CachedMessageConsumer.java:86)
>     at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.consumeBatchesOnLoop(SjmsBatchConsumer.java:427)
>     at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop$BatchConsumptionTask.access$1300(SjmsBatchConsumer.java:381)
>     at org.apache.camel.component.sjms.batch.SjmsBatchConsumer$BatchConsumptionLoop.run(SjmsBatchConsumer.java:326)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.qpid.jms.provider.exceptions.ProviderIOException: Connection refused: no further information: localhost/127.0.0.1:5672
> {code}
> It is not obvious from this log message that the route is now broken and will 
> not receive any more messages. The route state is still "Started".
>  
>  
> Expected Behaviour: The batch consumer can recover after errors (including a 
> broker restart or failover) and continue to consume messages once the broker 
> is available again.
>  
> A simple change is to catch any exception instead of only 
> javax.jms.IllegalStateException.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
