Greg Garlak created AMQ-5107:
--------------------------------

             Summary: In-flight queue message redelivered to multiple listeners 
upon broker shutdown
                 Key: AMQ-5107
                 URL: https://issues.apache.org/jira/browse/AMQ-5107
             Project: ActiveMQ
          Issue Type: Bug
          Components: Transport
    Affects Versions: 5.9.0
         Environment: Windows 7 64Bit - Java "1.6.0_20"
CentOS 6.0 - Java "1.7.0_09-icedtea" 
            Reporter: Greg Garlak
             Fix For: NEEDS_REVIEW



To reproduce: 

1) Start 3 or more listener processes (see listener code below)

2) Run producer to push one message on queue (see producer code below)

3) One of the listeners will pick-up the message and sleep for one minute 
before auto acknowledging the message

4) Start a shutdown sequence of the broker within the 60 second window (Ctrl-C 
or issue Terminate jvm(int) command from Hawtio console) 

5) All other idle listeners should get the same message redelivered 
simultaneously, each one having deliveryCount incremented 



Listener code:
--------------

package com.test;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class TestListener {
        public static void main(String[] args) {
                try {   
                        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("tcp://localhost:61616");
                        Connection connection = 
connectionFactory.createConnection();
                        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
                        Destination destination = 
session.createQueue("TEST.QUEUE");
                        MessageConsumer consumer = 
session.createConsumer(destination);
                        
                        consumer.setMessageListener(new MessageListener() {
                                public void onMessage(Message message) {
                                        try     {
                                                TextMessage textMessage = 
(TextMessage) message;
                                                System.out.print("\nReceived " 
+ textMessage.getText());
                                                System.out.print(", Redelivery: 
" + message.getJMSRedelivered());
                                                System.out.print(", Count: " + 
message.getLongProperty("JMSXDeliveryCount"));
                                                Thread.sleep(60000);            
        
                                                System.out.print("... finished 
after sleep");
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                        }
                                }
                        });
                        
                        connection.start();
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }

        public TestListener() {
                super();
        }
}


Producer code:
--------------

package com.test;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TestProducer {
        public static void main(String[] args) {
                try {
                        thread(new HelloWorldProducer(), false);
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }
 
        public static class HelloWorldProducer implements Runnable {
                public void run() {
                        try {
                                ActiveMQConnectionFactory connectionFactory = 
new ActiveMQConnectionFactory("tcp://localhost:61616");
                                Connection connection = 
connectionFactory.createConnection();
                                connection.start();
                                Session session = 
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                                Destination destination = 
session.createQueue("TEST.QUEUE");
                                MessageProducer producer = 
session.createProducer(destination);

                                String text = "test message created on " + new 
Date();
                                TextMessage message = 
session.createTextMessage(text);
                                System.out.println("Sent " + text);
                                producer.send(message);

                                session.close();
                                connection.close();
                        }
                        catch (Exception e) {
                                e.printStackTrace();
                        }
                }
                public HelloWorldProducer() {}
        }

        public static void thread(Runnable runnable, boolean daemon) {
                Thread brokerThread = new Thread(runnable);
                brokerThread.setDaemon(daemon);
                brokerThread.start();
        }
    
        public TestProducer() {
                super();
        }
}






--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to