(Sorry for the spam, a mail to add the message to the forum after I subscribed 
to the mailing list)

________________________________
From: Mario Rassy <mariora...@hotmail.com>
Sent: Tuesday, September 26, 2017 7:34:34 PM
To: users@camel.apache.org
Subject: JMS redelivery count is incremented on stop


Hi,


- Context (simplified) -

We have a route listening from a JMS queue. The route is transacted.

For each message received, we try to process it. If the processing fails, we 
rollback the message, and we get redelivered by the JMS server, and we retry to 
process it.

In parallel, for each processing failure, we increment a global error counter.


When the redelivery count of the message (JMSXDeliveryCount) reaches a max 
retry count, we send the message to a "Dead letter queue".

When the global error count reaches a max value, we stop the context and the 
whole JVM.


- Problem -

When the global error count reaches its max value, the camel context .stop 
method is called in a separate thread.

The message will be rollbacked, and a redelivery (from the JMS server) will 
happen before the context stop finishes.

Since we are in a stopping mode, the 
org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(Session,
 Message) will immediately rollback the redelivered message and throw a 
MessageRejectedWhileStoppingException, which will lead to a new redelivery of 
the same message.

Once the application is started again, the JMS message will be redelivered one 
last time, and since the count is greater than the retry count, the message 
will go to the dead letter queue.


I have added in the end of this mail a main method in which I reproduce the 
issue.

Any suggestion on how to avoid having the redeliveries during the stop of the 
camel context?


Thank you,

Mario




======== CODE TO REPRODUCE ========

package com.test.camel.jmstest;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;

public class JmsTest {

    private static final String JMS_ENDPOINT_URI = "test-jms:queue:test.queue";
    private static final int NB_CONCURRENT_CONSUMERS = 10;

    private CamelContext context;

    public static void main(String[] args) throws Exception {
        new JmsTest();
    }

    private JmsTest() throws Exception, InterruptedException {
        context = new DefaultCamelContext();
        context.addComponent("test-jms", createJmsComponent());
        context.addRoutes(new JmsTestRouteBuilder());
        ProducerTemplate producer = context.createProducerTemplate();

        context.start();
        producer.sendBody(JMS_ENDPOINT_URI, "This is a test message");
    }

    private JmsComponent createJmsComponent() {
        ConnectionFactory connectionFactory = getConnectionFactory();
        JmsComponent jmsComponent = 
JmsComponent.jmsComponentTransacted(connectionFactory);
        
jmsComponent.getConfiguration().setConcurrentConsumers(NB_CONCURRENT_CONSUMERS);
        return jmsComponent;
    }

    private ConnectionFactory getConnectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(
                "vm://localhost?broker.persistent=false");
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(100);
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        return connectionFactory;
    }

    private final class JmsTestRouteBuilder extends RouteBuilder {

        public void configure() {
            // @formatter:off
            from(JMS_ENDPOINT_URI)
                .process(exchange -> print("Received JMS Message #" + 
exchange.getIn().getHeader("JMSXDeliveryCount")))
                .choice()
                    .when(exchange -> !((Boolean) 
exchange.getIn().getHeader("JMSRedelivered")))
                        .process(exchange -> print("Shutdown will be 
scheduled"))
                        .process(e -> new Thread(new Shutdown()).start())
                        .throwException(new RuntimeException())
                    .endChoice()
                    .otherwise()
                        .process(exchange -> print("Retrying to process 
message"))
                        .throwException(new RuntimeException())
                    .endChoice()
                .end();
            // @formatter:on
        }

        private final class Shutdown implements Runnable {
            @Override
            public void run() {
                try {
                    print("Stopping context");
                    context.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        private void print(String message) {
            System.out.println(message);
        }

    }

}

======== CODE TO REPRODUCE ========

Reply via email to