It seems you're expecting the broker to expire the messages? By default, the expiration task for a queue runs every 30s. If your test takes less time than that, you'll want to tune the broker to check for expiration a little sooner:
<policyEntry .... expireMessagesPeriod="1s" />

On Mon, May 20, 2013 at 5:04 AM, al94781 <and...@harmel-law.com> wrote:

> Hi there,
>
> I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading
> ExchangePattern.InOnly messages from a JMS queue, and want to expire those
> which are not processed within a given time explicitly to a named dead
> letter queue. The problem is I can't get things to expire.
>
> I have the following route:
>
> public class FulfillmentRequestRoute extends RouteBuilder {
>
>     @Override
>     public void configure() throws Exception {
>
>         errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());
>
>         from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=1&transacted=true&preserveMessageQos=true")
>             .transacted()
>             .to("mock:initialProcessor");
>     }
> }
>
> And the following ActiveMQ config:
>
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xmlns:broker="http://activemq.apache.org/schema/core"
>        xsi:schemaLocation="http://www.springframework.org/schema/beans
>            http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
>            http://activemq.apache.org/schema/core
>            http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
>
>     <broker:broker useJmx="true" persistent="true" brokerName="myBroker">
>         <broker:transportConnectors>
>             <broker:transportConnector name="vm" uri="vm://myBroker" />
>             <broker:transportConnector name="tcp" uri="tcp://localhost:${tcp.port}" />
>         </broker:transportConnectors>
>         <broker:persistenceAdapter>
>             <broker:kahaPersistenceAdapter directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
>         </broker:persistenceAdapter>
>         <broker:destinationPolicy>
>             <broker:policyMap>
>                 <broker:policyEntries>
>                     <broker:policyEntry queue=">">
>                         <broker:deadLetterStrategy>
>                             <broker:sharedDeadLetterStrategy processExpired="true" processNonPersistent="true" />
>                         </broker:deadLetterStrategy>
>                     </broker:policyEntry>
>                 </broker:policyEntries>
>             </broker:policyMap>
>         </broker:destinationPolicy>
>     </broker:broker>
>
>     <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
>         <property name="brokerURL" value="vm://myBroker" />
>         <property name="transacted" value="true"/>
>         <property name="transactionManager" ref="jmsTransactionManager"/>
>         <property name="acceptMessagesWhileStopping" value="false"/>
>     </bean>
>     <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
>         <property name="connectionFactory" ref="jmsConnectionFactory"/>
>     </bean>
>     <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
>         <property name="brokerURL" value="vm://myBroker" />
>     </bean>
>
> </beans>
>
> Finally I have a Unit Test which creates two messages,one which will be
> processed, and the other which should time-out.
>
> @RunWith(CamelSpringJUnit4ClassRunner.class)
> @ContextConfiguration(locations = {"classpath:/META-INF/spring/camel-server.xml"})
> public class FulfillmentRequestTimeoutTest {
>
>     @EndpointInject(uri = "mock:initialProcessor")
>     protected MockEndpoint mockEndpoint;
>
>     @Produce
>     protected ProducerTemplate template;
>
>     protected ConsumerTemplate consumer;
>
>     @Autowired
>     @Qualifier("camel-server")
>     protected CamelContext context;
>
>     @DirtiesContext
>     @Test
>     public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws Exception {
>
>         // Given
>         consumer = context.createConsumerTemplate();
>
>         int expectedValidMessageCount = 3;
>         mockEndpoint.expectedMessageCount(expectedValidMessageCount);
>
>         // When
>         String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT TIMEOUT</body>";
>         template.sendBody("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody1);
>
>         long ttl = System.currentTimeMillis() - 12000000;
>         String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED OUT!!!!!</body>";
>         template.sendBodyAndHeader("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);
>
>         // Then
>         // The second message is not processed
>         mockEndpoint.assertIsSatisfied(); // This fails, but it sees two messages rather than just one
>
>         List<Exchange> list = mockEndpoint.getReceivedExchanges();
>         String notTimedOutMessageBody = (String) list.get(0).getIn().getBody(String.class);
>
>         assertEquals(xmlBody1, notTimedOutMessageBody);
>
>         Thread.sleep(5000);
>
>         // And is instead routed to the timedOut JMS queue
>         Object dlqBody = consumer.receiveBodyNoWait("jms:queue:dead");
>         assertNotNull("Should not lose the message", dlqBody); // This also fails if I comment out the assert above
>         assertEquals(xmlBody2, dlqBody);
>     }
>
>     @Configuration
>     public static class ContextConfig extends SingleRouteCamelConfiguration {
>
>         @Bean
>         public RouteBuilder route() {
>             return new FulfillmentRequestRoute();
>         }
>     }
> }
>
> I've been staring at this for a while, and while I think I've only been
> changing one thing at a time, I may have made an error or left behind some
> config which is shooting me in the foot.
>
> One final thing to note, I have this pattern working elsewhere in tests
> which explicitly throw exceptions from with transactions in Camel, but I'd
> prefer not to have to manually start looking into headers myself when this
> all seems to be handled already.
>
> I hope you can help.
>
> TIA
>
> Cheers, Andrew
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841.html
> Sent from the Camel - Users mailing list archive at Nabble.com.

--
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
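
P.S. For reference, here is a rough sketch of how the abbreviated policyEntry above might look once merged into the catch-all entry from the quoted config. It reuses the `broker:` prefix and the dead letter strategy from that config unchanged. The one assumption is the value: the ActiveMQ per-destination policy docs describe expireMessagesPeriod as a period in milliseconds (default 30000, which matches the 30s mentioned above), so this sketch writes one second as 1000 rather than relying on a suffixed value. Treat it as a starting point, not a verified fix for the test.

```xml
<!-- Sketch only: the existing catch-all policy entry with a shorter expiry check. -->
<!-- Assumption: expireMessagesPeriod is given in milliseconds (1000 = 1 second). -->
<broker:policyEntry queue=">" expireMessagesPeriod="1000">
    <broker:deadLetterStrategy>
        <!-- Unchanged from the quoted config: expired messages go to the shared DLQ. -->
        <broker:sharedDeadLetterStrategy processExpired="true" processNonPersistent="true" />
    </broker:deadLetterStrategy>
</broker:policyEntry>
```

With a one-second check, the existing Thread.sleep(5000) in the test should leave the broker enough time to run the expiration task at least once before the dead letter queue is read.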