(Sorry for the spam, a mail to add the message to the forum after I subscribed to the mailing list)
________________________________ From: Mario Rassy <mariora...@hotmail.com> Sent: Tuesday, September 26, 2017 7:34:34 PM To: users@camel.apache.org Subject: JMS redelivery count is incremented on stop Hi, - Context (simplified) - We have a route listening from a JMS queue. The route is transacted. For each message received, we try to process it. If the processing fails, we rollback the message, and we get redelivered by the JMS server, and we retry to process it. In parallel, for each processing failure, we increment a global error counter. When the redelivery count of the message (JMSXDeliveryCount) reaches a max retry count, we send the message to a "Dead letter queue". When the global error count reaches a max value, we stop the context and the whole JVM. - Problem - When the global error count reaches its max value, the camel context .stop method is called in a separate thread. The message will be rollbacked, and a redelivery (from the JMS server) will happen before the context stop finishes. Since we are in a stopping mode, the org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(Session, Message) will immediately rollback the redelivered message and throw a MessageRejectedWhileStoppingException, which will lead to a new redelivery of the same message. Once the application is started again, the JMS message will be redelivered one last time, and since the count is greater than the retry count, the message will go to the dead letter queue. I have added in the end of this mail a main method in which I reproduce the issue. Any suggestion on how to avoid having the redeliveries during the stop of the camel context? Thank you, Mario ======== CODE TO REPRODUCE ======== package com.test.camel.jmstest; import javax.jms.ConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.RedeliveryPolicy; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.impl.DefaultCamelContext; public class JmsTest { private static final String JMS_ENDPOINT_URI = "test-jms:queue:test.queue"; private static final int NB_CONCURRENT_CONSUMERS = 10; private CamelContext context; public static void main(String[] args) throws Exception { new JmsTest(); } private JmsTest() throws Exception, InterruptedException { context = new DefaultCamelContext(); context.addComponent("test-jms", createJmsComponent()); context.addRoutes(new JmsTestRouteBuilder()); ProducerTemplate producer = context.createProducerTemplate(); context.start(); producer.sendBody(JMS_ENDPOINT_URI, "This is a test message"); } private JmsComponent createJmsComponent() { ConnectionFactory connectionFactory = getConnectionFactory(); JmsComponent jmsComponent = JmsComponent.jmsComponentTransacted(connectionFactory); jmsComponent.getConfiguration().setConcurrentConsumers(NB_CONCURRENT_CONSUMERS); return jmsComponent; } private ConnectionFactory getConnectionFactory() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "vm://localhost?broker.persistent=false"); RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); redeliveryPolicy.setMaximumRedeliveries(100); connectionFactory.setRedeliveryPolicy(redeliveryPolicy); return connectionFactory; } private final class JmsTestRouteBuilder extends RouteBuilder { public void configure() { // @formatter:off from(JMS_ENDPOINT_URI) .process(exchange -> print("Received JMS Message #" + exchange.getIn().getHeader("JMSXDeliveryCount"))) .choice() .when(exchange -> !((Boolean) exchange.getIn().getHeader("JMSRedelivered"))) .process(exchange -> print("Shutdown will be scheduled")) .process(e -> new Thread(new Shutdown()).start()) .throwException(new RuntimeException()) .endChoice() .otherwise() .process(exchange -> print("Retrying to process message")) .throwException(new RuntimeException()) .endChoice() .end(); // @formatter:on } private final class Shutdown implements Runnable { @Override public void run() { try { print("Stopping context"); context.stop(); } catch (Exception e) { e.printStackTrace(); } } } private void print(String message) { System.out.println(message); } } } ======== CODE TO REPRODUCE ========