I had encountered the same problem. I had a sender and a receiver which were
running on Tomcat server. Whenever I restarted the receiver I used to get
all the messages that I had earlier to the message broker and which had been
processed.

The only difference my code has than your code was that I am using a
MessageListener.onMessage(Message) rather than Consumer.receive().

After some investigation I found out that  my code had caused the problem.
When I had called a method in my code it encountered some exception and the
control never returned to the MessageListener (the receiver) and hence the
ActiveMQ session never got committed. And hence the message was never dequed
from the queue.

I am not sure if your problem is the same, but you can try the following:
  * Commit the session just after you receive the message
  * Acknowledge the message: message.acknowledge()

Let me know how it goes.


spiiff wrote:
> 
> Hello,
> we are facing a strange problem.
> I am running an embedded broker in my unit test:
> 
>               URI activemqConfigurationUri = new 
> URI("xbean:conf/activemq.xml");        
>               brokerService =                                  
>                       BrokerFactory.createBroker(activemqConfigurationUri);   
>         
>               brokerService.start();
> 
> 
> I have 2 connections/sessions, from every session I create a customer to a
> topic:
> 
>               connection1 = connectionFactory.createConnection();
>               connection1.setClientID("1stID");               
>               connection1.start();            
>               session1 = 
> connection1.createSession(false,Session.AUTO_ACKNOWLEDGE);         
>               MessageConsumer consumer1 = ((TopicSession)
> session1).createDurableSubscriber((Topic) destination, subscriberName);
> 
>                       
>                    connection2 = connectionFactory.createConnection();
>               connection2.setClientID("2ndID");               
>               connection2.start();
>               session2 = 
> connection2.createSession(false,Session.AUTO_ACKNOWLEDGE);
>               MessageConsumer consumer2 = ((TopicSession)
> session2).createDurableSubscriber(
>                 (Topic) destination, subscriberName);
> 
> 
> From a third session I create a producer and send 2 messages to the topic:
> 
>               producerConnection = connectionFactory.createConnection();
>                    producerConnection.setClientID("producer");
>                    producerConnection.start();                
>               producerSession =
> producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
> 
>               producer = producerSession.createProducer(destination);
>                          
> producer.setDeliveryMode(DeliveryMode.PERSISTENT);
>                           producer.setTimeToLive(300000);   
> 
> 
>               javax.jms.Message msg1 = 
> producerSession.createObjectMessage(message1);
>               javax.jms.Message msg2 = 
> producerSession.createObjectMessage(message2);                 
>               producer.send(msg1,DeliveryMode.PERSISTENT , 
> Message.DEFAULT_PRIORITY,
>                 Message.DEFAULT_TIME_TO_LIVE);
>               producer.send(msg2,DeliveryMode.PERSISTENT , 
> Message.DEFAULT_PRIORITY,
>                     Message.DEFAULT_TIME_TO_LIVE);
> 
> 
> Then I receive the 2 messages with my consumers (every consumer is
> receiving 2 messages):
> 
>               Message msg1 = consumer1.receive(3000);
>                    while(msg1!=null){
>                       Object object = ((ObjectMessage)msg1).getObject();      
>                 
>                       msg1 = consumer1.receive(3000);
>               }
> 
>               Message msg2 = consumer2.receive(3000);
>                    while(msg2!=null){
>                       Object object = ((ObjectMessage)msg2).getObject();
>                       msg2 = consumer2.receive(3000);
>                    }
> 
> 
> 
> I close the consumers:
> 
>               consumer1.close();
>               consumer2.close();
> 
> And again I send 2 messages.
> I close all connections and the embedded broker.
> The next time I run the unittest the first consumer gets 4 messages, as
> expected.
> The second consumer gets 6 messages! It gets all messages that were
> produced, all the time.
> We're using activemq 4.1.1 
> 
> When we are not running an embedded broker but an standalone broker, a
> separate process, everything 
> is Ok as long we don't stop and restart the broker. After restarting the
> second consumer again receives all the messages 
> the first time it calls consumer.receive(). Then the second time it
> receives 4 messages.
> I guess there is something wrong with the persistence to the database.
> When we're using our oracle I can query ACTIVMQ_ACKS.
> There my 2 CLIENT_IDs are listed. And the LAST_ACKED_ID only changes for
> one of the CLIENT_IDs.
> 
> Can anybody help me?
> 
> I didn't find the same problem in the forum, only alike problems:
> http://www.nabble.com/-activemq-user--Re%3A-receiving-old-messages-again-after-restart-tf95738s2354.html#a265691
> 
> Regards,
> Matthias
> 
> 

-- 
View this message in context: 
http://www.nabble.com/receiving-old-messages-when-restartin-embedded-broker---problem-with-persistence-auto_acknowledge--tf4634882s2354.html#a13289113
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to