Hi there,

I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading ExchangePattern.InOnly messages from a JMS queue, and I want any message that is not processed within a given time to expire and be routed explicitly to a named dead letter queue. The problem is that I can't get anything to expire.
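To make the expected behaviour concrete, here is a minimal sketch of the expiry rule as I understand it from the JMS specification: JMSExpiration is an absolute timestamp in epoch milliseconds (send time plus time-to-live), and 0 means the message never expires. The class and method names (ExpirySketch, expirationFor, isExpired) are hypothetical and only for illustration. This is not ActiveMQ's actual code, and the exact boundary comparison the broker uses is an assumption.

```java
public class ExpirySketch {

    /**
     * Expiration a JMS provider would stamp on a message at send time:
     * send time plus time-to-live, or 0 when no time-to-live is set.
     */
    public static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    /**
     * True when the message should be treated as expired at nowMillis.
     * A JMSExpiration of 0 means "never expires".
     * Assumption: a message is expired once now has reached the expiration.
     */
    public static boolean isExpired(long jmsExpiration, long nowMillis) {
        return jmsExpiration != 0 && nowMillis >= jmsExpiration;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();

        // A header value far in the past, like the one set in the test below.
        long stale = now - 12000000L;
        System.out.println(isExpired(stale, now));                        // true

        // No expiration set at all.
        System.out.println(isExpired(0L, now));                           // false

        // Sent just now with a one-minute time-to-live.
        System.out.println(isExpired(expirationFor(now, 60000L), now));   // false
    }
}
```

By this rule a message whose JMSExpiration is already in the past should never reach the consumer, which is the behaviour I am expecting from the broker.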
I have the following route:

public class FulfillmentRequestRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());

        from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=1&transacted=true&preserveMessageQos=true")
            .transacted()
            .to("mock:initialProcessor");
    }
}

And the following ActiveMQ config:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:broker="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                           http://activemq.apache.org/schema/core
                           http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <broker:broker useJmx="true" persistent="true" brokerName="myBroker">
        <broker:transportConnectors>
            <broker:transportConnector name="vm" uri="vm://myBroker" />
            <broker:transportConnector name="tcp" uri="tcp://localhost:${tcp.port}" />
        </broker:transportConnectors>
        <broker:persistenceAdapter>
            <broker:kahaPersistenceAdapter directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
        </broker:persistenceAdapter>
        <broker:destinationPolicy>
            <broker:policyMap>
                <broker:policyEntries>
                    <broker:policyEntry queue=">">
                        <broker:deadLetterStrategy>
                            <broker:sharedDeadLetterStrategy processExpired="true" processNonPersistent="true" />
                        </broker:deadLetterStrategy>
                    </broker:policyEntry>
                </broker:policyEntries>
            </broker:policyMap>
        </broker:destinationPolicy>
    </broker:broker>

    <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="brokerURL" value="vm://myBroker" />
        <property name="transacted" value="true"/>
        <property name="transactionManager" ref="jmsTransactionManager"/>
        <property name="acceptMessagesWhileStopping" value="false"/>
    </bean>

    <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
    </bean>

    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://myBroker" />
    </bean>
</beans>

Finally, I have a unit test which creates two messages: one which will be processed, and the other which should time out.

@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/META-INF/spring/camel-server.xml"})
public class FulfillmentRequestTimeoutTest {

    @EndpointInject(uri = "mock:initialProcessor")
    protected MockEndpoint mockEndpoint;

    @Produce
    protected ProducerTemplate template;

    protected ConsumerTemplate consumer;

    @Autowired
    @Qualifier("camel-server")
    protected CamelContext context;

    @DirtiesContext
    @Test
    public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws Exception {
        // Given
        consumer = context.createConsumerTemplate();
        int expectedValidMessageCount = 3;
        mockEndpoint.expectedMessageCount(expectedValidMessageCount);

        // When
        String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT TIMEOUT</body>";
        template.sendBody("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody1);

        long ttl = System.currentTimeMillis() - 12000000;
        String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED OUT!!!!!</body>";
        template.sendBodyAndHeader("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);

        // Then
        // The second message is not processed
        mockEndpoint.assertIsSatisfied(); // This fails, but it sees two messages rather than just one
        List<Exchange> list = mockEndpoint.getReceivedExchanges();
        String notTimedOutMessageBody = (String) list.get(0).getIn().getBody(String.class);
        assertEquals(xmlBody1, notTimedOutMessageBody);

        Thread.sleep(5000);

        // And is instead routed to the timedOut JMS queue
        Object dlqBody = consumer.receiveBodyNoWait("jms:queue:dead");
        assertNotNull("Should not lose the message", dlqBody); // This also fails if I comment out the assert above
        assertEquals(xmlBody2, dlqBody);
    }

    @Configuration
    public static class ContextConfig extends SingleRouteCamelConfiguration {
        @Bean
        public RouteBuilder route() {
            return new FulfillmentRequestRoute();
        }
    }
}

I've been staring at this for a while. Although I think I've only been changing one thing at a time, I may have made an error or left behind some config which is shooting me in the foot.

One final thing to note: I have this pattern working elsewhere, in tests which explicitly throw exceptions from within transactions in Camel. I'd prefer not to have to start inspecting headers manually myself when this all seems to be handled already.

I hope you can help.

TIA

Cheers,
Andrew

--
View this message in context: http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841.html
Sent from the Camel - Users mailing list archive at Nabble.com.