Andrew M wrote:
Aaron, My original producer and consumer code are at the bottom of the msg. Notethe subscribe method appends retroactive=true. Thanks for any suggestions you may have..Andrew-----Original Message----- From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED] On Behalf Of Aaron Mulder Sent: Tuesday, March 18, 2008 10:07 AM To: users@activemq.apache.org Subject: Re: Retroactive consumer...yes, no, maybe so? Do you want to post your example that's *not* working? I last used retroactive consumers probably 18 months ago, and they worked fine at that time. I was doing a network of brokers with fail-over, and if I took one broker down and caused a consumer to fail over, it missed messages during the fail-over operation. With retroactive consumer enabled, it didn't miss any messages (but got some duplicates) once it failed over. I don't have that code/configuration at hand, though -- just this from my notes: topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true"); And I used this to set the retroactive consumers to receive the last 30s worth of messages, instead of the default (which I think at the time was last 100): <broker> <destinationPolicy> <policyMap> <defaultEntry> <policyEntry topic="*"> <subscriptionRecoveryPolicy> <timedSubscriptionRecoveryPolicy recoverDuration="30000" /> </subscriptionRecoveryPolicy> </policyEntry> </defaultEntry> </policyMap> </destinationPolicy> </broker> http://www.activemq.org/site/retroactive-consumer.html http://www.activemq.org/site/subscription-recovery-policy.html Thanks, Aaron On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <[EMAIL PROTECTED]> wrote:-----Original Message----- From: Andrew [mailto:[EMAIL PROTECTED] Sent: Wednesday, March 05, 2008 2:40 PM To: users@activemq.apache.org Subject: Retroactive consumer not working... My broker is not feeding my consumer the messages from the retroactivequeuewhen the consumer connects. The producer puts in a 10 min (600000ms) TTLsoI would think when my consumer reconnects it should receive the last 10minsof msgs. Otherwise things appear fine, new msgs are received, etc... any ideas? On the producer....... private Session getActiveMqSession() throws JMSException { String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" + ACTIVE_MQ_PORT + "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0"; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); connection = connectionFactory.createConnection(); ((ActiveMQConnection)connection).setUseAsyncSend(false); connection.start(); return connection.createSession(false,Session.SESSION_TRANSACTED);} Session session = getActiveMqSession(); void send(Object a) throws blah blah blah { Destination destination = session.createQueue(consumerName); producer = session.createProducer(destination); ObjectMessage m = session.createObjectMessage(); m.setObject(a); //10 min TTL ((ActiveMQMessageProducer)producer).send(m,DeliveryMode.PERSISTENT,Message.DEFAULT_PRIORITY, 600000L); } ...and on the Consumer... Session session; public void run() { String url ="failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0"; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); Connection connection = connectionFactory.createConnection(); connection.start(); connection.setExceptionListener(this); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public void subscribe(String destName, MessageListener l) throws JMSException { destName = destName + "?consumer.retroactive=true"; MessageConsumer mc = session.createConsumer(session.createQueue(destName)); mc.setMessageListener(l); }