It seems you're expecting the broker to expire the messages?
By default the expiration task for a queue runs every 30 seconds. If your
test takes less time than that, you'll want to tune the broker to check
for expired messages sooner:

<policyEntry .... expireMessagesPeriod="1000" />
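
For context, here is a rough sketch of how that attribute might sit in the destination policy from the config quoted below. expireMessagesPeriod is given in milliseconds (the default is 30000). The 1000 ms value is only an assumption that suits a short-running test, not a recommended production setting; the rest of the fragment is copied from the quoted config.

```xml
<broker:destinationPolicy>
  <broker:policyMap>
    <broker:policyEntries>
      <!-- Check every queue for expired messages once a second.
           1000 ms is an illustrative value for a short test run. -->
      <broker:policyEntry queue=">" expireMessagesPeriod="1000">
        <broker:deadLetterStrategy>
          <!-- Unchanged from the quoted config: expired and
               non-persistent messages are sent to the shared DLQ. -->
          <broker:sharedDeadLetterStrategy processExpired="true"
                                           processNonPersistent="true" />
        </broker:deadLetterStrategy>
      </broker:policyEntry>
    </broker:policyEntries>
  </broker:policyMap>
</broker:destinationPolicy>
```

With processExpired="true" on the shared strategy, expired messages should end up on the broker's shared dead letter queue (ActiveMQ.DLQ unless configured otherwise), not on the "dead" queue named in the Camel error handler.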


On Mon, May 20, 2013 at 5:04 AM, al94781 <and...@harmel-law.com> wrote:

> Hi there,
>
> I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading
> ExchangePattern.InOnly messages from a JMS queue, and want to expire those
> which are not processed within a given time explicitly to a named dead
> letter queue.  The problem is I can't get things to expire.
>
> I have the following route:
>
> public class FulfillmentRequestRoute extends RouteBuilder {
>
>     @Override
>     public void configure() throws Exception {
>
>
> errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());
>
>
> from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=1&transacted=true&preserveMessageQos=true")
>             .transacted()
>             .to("mock:initialProcessor");
>     }
> }
>
> And the following ActiveMQ config:
>
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
>        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>        xmlns:broker="http://activemq.apache.org/schema/core"
>        xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
>                            http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
>
>
>     <broker:broker useJmx="true" persistent="true" brokerName="myBroker">
>         <broker:transportConnectors>
>
>             <broker:transportConnector name="vm" uri="vm://myBroker" />
>
>             <broker:transportConnector name="tcp"
> uri="tcp://localhost:${tcp.port}" />
>         </broker:transportConnectors>
>         <broker:persistenceAdapter>
>             <broker:kahaPersistenceAdapter
> directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
>         </broker:persistenceAdapter>
>         <broker:destinationPolicy>
>             <broker:policyMap>
>               <broker:policyEntries>
>
>                 <broker:policyEntry queue=">">
>                   <broker:deadLetterStrategy>
>                     <broker:sharedDeadLetterStrategy processExpired="true"
> processNonPersistent="true" />
>                   </broker:deadLetterStrategy>
>                 </broker:policyEntry>
>               </broker:policyEntries>
>             </broker:policyMap>
>         </broker:destinationPolicy>
>     </broker:broker>
>
>
>
>     <bean id="jms"
> class="org.apache.activemq.camel.component.ActiveMQComponent">
>         <property name="brokerURL" value="vm://myBroker" />
>         <property name="transacted" value="true"/>
>         <property name="transactionManager" ref="jmsTransactionManager"/>
>         <property name="acceptMessagesWhileStopping" value="false"/>
>     </bean>
>     <bean id="jmsTransactionManager"
> class="org.springframework.jms.connection.JmsTransactionManager">
>         <property name="connectionFactory" ref="jmsConnectionFactory"/>
>     </bean>
>     <bean id="jmsConnectionFactory"
> class="org.apache.activemq.ActiveMQConnectionFactory">
>         <property name="brokerURL" value="vm://myBroker" />
>     </bean>
>
> </beans>
>
> Finally I have a Unit Test which creates two messages, one which will be
> processed, and the other which should time-out.
>
> @RunWith(CamelSpringJUnit4ClassRunner.class)
> @ContextConfiguration(locations =
> {"classpath:/META-INF/spring/camel-server.xml"})
> public class FulfillmentRequestTimeoutTest {
>
>     @EndpointInject(uri = "mock:initialProcessor")
>     protected MockEndpoint mockEndpoint;
>
>     @Produce
>     protected ProducerTemplate template;
>
>     protected ConsumerTemplate consumer;
>
>     @Autowired
>     @Qualifier("camel-server")
>     protected CamelContext context;
>
>     @DirtiesContext
>     @Test
>     public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws
> Exception {
>
>         // Given
>         consumer = context.createConsumerTemplate();
>
>         int expectedValidMessageCount = 3;
>         mockEndpoint.expectedMessageCount(expectedValidMessageCount);
>
>         // When
>         String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT TIMEOUT</body>";
>         template.sendBody("jms:queue:fulfillmentRequest",
> ExchangePattern.InOnly, xmlBody1);
>
>         long ttl = System.currentTimeMillis() - 12000000;
>         String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED OUT!!!!!</body>";
>         template.sendBodyAndHeader("jms:queue:fulfillmentRequest",
> ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);
>
>         // Then
>         // The second message is not processed
>         mockEndpoint.assertIsSatisfied(); // This fails, but it sees two messages rather than just one
>
>         List<Exchange> list = mockEndpoint.getReceivedExchanges();
>         String notTimedOutMessageBody = (String)
> list.get(0).getIn().getBody(String.class);
>
>         assertEquals(xmlBody1, notTimedOutMessageBody);
>
>         Thread.sleep(5000);
>
>         // And is instead routed to the timedOut JMS queue
>         Object dlqBody  = consumer.receiveBodyNoWait("jms:queue:dead");
>         assertNotNull("Should not lose the message", dlqBody); // This also fails if I comment out the assert above
>         assertEquals(xmlBody2, dlqBody);
>     }
>
>     @Configuration
>     public static class ContextConfig extends SingleRouteCamelConfiguration
> {
>
>         @Bean
>         public RouteBuilder route() {
>             return new FulfillmentRequestRoute();
>         }
>     }
> }
>
> I've been staring at this for a while, and while I think I've only been
> changing one thing at a time, I may have made an error or left behind some
> config which is shooting me in the foot.
>
> One final thing to note, I have this pattern working elsewhere in tests
> which explicitly throw exceptions from within transactions in Camel, but I'd
> prefer not to have to manually start looking into headers myself when this
> all seems to be handled already.
>
> I hope you can help.
>
> TIA
>
> Cheers, Andrew
>
>
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
