As far as I know, retroactive consumers only applies to topics. For queues, you should get all messages whether the consumer is online or offline. The example in the documentation could be wrong.

Andrew M wrote:
Aaron,
My original producer and consumer code are at the bottom of the msg.  Note
the subscribe method appends retroactive=true. Thanks for any suggestions you may have..
Andrew


-----Original Message-----
From: [EMAIL PROTECTED] [mailto:[EMAIL PROTECTED] On Behalf Of Aaron
Mulder
Sent: Tuesday, March 18, 2008 10:07 AM
To: users@activemq.apache.org
Subject: Re: Retroactive consumer...yes, no, maybe so?

Do you want to post your example that's *not* working?  I last used
retroactive consumers probably 18 months ago, and they worked fine at
that time.  I was doing a network of brokers with fail-over, and if I
took one broker down and caused a consumer to fail over, it missed
messages during the fail-over operation.  With retroactive consumer
enabled, it didn't miss any messages (but got some duplicates) once it
failed over.  I don't have that code/configuration at hand, though --
just this from my notes:

topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");

And I used this to set the retroactive consumers to receive the last
30s worth of messages, instead of the default (which I think at the
time was last 100):

<broker>
  <destinationPolicy>
    <policyMap>
      <defaultEntry>
        <policyEntry topic="*">
          <subscriptionRecoveryPolicy>
            <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
          </subscriptionRecoveryPolicy>
        </policyEntry>
      </defaultEntry>
    </policyMap>
  </destinationPolicy>
</broker>

http://www.activemq.org/site/retroactive-consumer.html
http://www.activemq.org/site/subscription-recovery-policy.html

Thanks,
       Aaron

On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <[EMAIL PROTECTED]> wrote:
 -----Original Message-----
 From: Andrew [mailto:[EMAIL PROTECTED]
 Sent: Wednesday, March 05, 2008 2:40 PM
 To: users@activemq.apache.org
 Subject: Retroactive consumer not working...

 My broker is not feeding my consumer the messages from the retroactive
queue
 when the consumer connects.  The producer puts in a 10 min (600000ms) TTL
so
 I would think when my consumer reconnects it should receive the last 10
mins
 of msgs.  Otherwise things appear fine, new msgs are received, etc... any
 ideas?


 On the producer.......

    private Session getActiveMqSession() throws JMSException {
        String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" +
 ACTIVE_MQ_PORT +
 "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
        ActiveMQConnectionFactory connectionFactory = new
 ActiveMQConnectionFactory(url);
        connection = connectionFactory.createConnection();
        ((ActiveMQConnection)connection).setUseAsyncSend(false);
        connection.start();
        return connection.createSession(false,
Session.SESSION_TRANSACTED);
    }

    Session session = getActiveMqSession();

    void send(Object a) throws blah blah blah {
        Destination destination = session.createQueue(consumerName);
        producer = session.createProducer(destination);
        ObjectMessage m = session.createObjectMessage();
        m.setObject(a);
        //10 min TTL
        ((ActiveMQMessageProducer)producer).send(m,
DeliveryMode.PERSISTENT,
 Message.DEFAULT_PRIORITY, 600000L);
    }


 ...and on the Consumer...

    Session session;

    public void run() {
        String url =

"failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconn
 ectAttempts=0";
        ActiveMQConnectionFactory connectionFactory = new
 ActiveMQConnectionFactory(url);
      Connection connection = connectionFactory.createConnection();
      connection.start();
      connection.setExceptionListener(this);
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   }

   public void subscribe(String destName, MessageListener l) throws
 JMSException {
      destName = destName + "?consumer.retroactive=true";
      MessageConsumer mc =
 session.createConsumer(session.createQueue(destName));
      mc.setMessageListener(l);
    }




Reply via email to