Hi there,

I am using ActiveMQ 5.8.0 and Camel 2.10.4. I am reading
ExchangePattern.InOnly messages from a JMS queue and want any message that
is not processed within a given time to expire and be sent explicitly to a
named dead letter queue. The problem is that I can't get the messages to
expire.

I have the following route:

public class FulfillmentRequestRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

       
        errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());

        from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=1&transacted=true&preserveMessageQos=true")
            .transacted()
            .to("mock:initialProcessor");
    }
}

And the following ActiveMQ config:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:broker="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
                           http://activemq.apache.org/schema/core
                           http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
             
    
    <broker:broker useJmx="true" persistent="true" brokerName="myBroker">
        <broker:transportConnectors>
            
            <broker:transportConnector name="vm" uri="vm://myBroker" />
            
            <broker:transportConnector name="tcp" uri="tcp://localhost:${tcp.port}" />
        </broker:transportConnectors>
        <broker:persistenceAdapter>
            <broker:kahaPersistenceAdapter directory="target/olp-activemq-data"
                                           maxDataFileLength="33554432"/>
        </broker:persistenceAdapter>
        <broker:destinationPolicy>
            <broker:policyMap>
              <broker:policyEntries>
                
                <broker:policyEntry queue=">">
                  <broker:deadLetterStrategy>
                    <broker:sharedDeadLetterStrategy processExpired="true"
                                                     processNonPersistent="true" />
                  </broker:deadLetterStrategy>
                </broker:policyEntry>
              </broker:policyEntries>
            </broker:policyMap>
        </broker:destinationPolicy>
    </broker:broker>

    
    
    <bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="brokerURL" value="vm://myBroker" />
        <property name="transacted" value="true"/>
        <property name="transactionManager" ref="jmsTransactionManager"/>
        <property name="acceptMessagesWhileStopping" value="false"/>
    </bean>
    <bean id="jmsTransactionManager"
          class="org.springframework.jms.connection.JmsTransactionManager">
        <property name="connectionFactory" ref="jmsConnectionFactory"/>
    </bean>
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="vm://myBroker" />
    </bean>
         
</beans>

Finally, I have a unit test which creates two messages: one which will be
processed, and another which should time out.

@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/META-INF/spring/camel-server.xml"})
public class FulfillmentRequestTimeoutTest {

    @EndpointInject(uri = "mock:initialProcessor")
    protected MockEndpoint mockEndpoint;

    @Produce
    protected ProducerTemplate template;

    protected ConsumerTemplate consumer;

    @Autowired
    @Qualifier("camel-server")
    protected CamelContext context;

    @DirtiesContext
    @Test
    public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws Exception {

        // Given
        consumer = context.createConsumerTemplate();

        int expectedValidMessageCount = 3;
        mockEndpoint.expectedMessageCount(expectedValidMessageCount);        

        // When 
        String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT TIMEOUT</body>";
        template.sendBody("jms:queue:fulfillmentRequest",
                ExchangePattern.InOnly, xmlBody1);

        long ttl = System.currentTimeMillis() - 12000000;
        String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED OUT!!!!!</body>";
        template.sendBodyAndHeader("jms:queue:fulfillmentRequest",
                ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);

        // Then
        // The second message is not processed
        // This fails, but it sees two messages rather than just one
        mockEndpoint.assertIsSatisfied();

        List<Exchange> list = mockEndpoint.getReceivedExchanges();
        String notTimedOutMessageBody = list.get(0).getIn().getBody(String.class);

        assertEquals(xmlBody1, notTimedOutMessageBody);

        Thread.sleep(5000);

        // And is instead routed to the timedOut JMS queue
        Object dlqBody  = consumer.receiveBodyNoWait("jms:queue:dead");
        // This also fails if I comment out the assert above
        assertNotNull("Should not lose the message", dlqBody);
        assertEquals(xmlBody2, dlqBody);
    }

    @Configuration
    public static class ContextConfig extends SingleRouteCamelConfiguration {

        @Bean
        public RouteBuilder route() {
            return new FulfillmentRequestRoute();
        }
    }
}
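
In case it helps, here is the expiry rule the test relies on, written as
plain Java with no Camel or ActiveMQ involved. As I understand the JMS
contract, JMSExpiration is the send time plus the time to live (0 means the
message never expires), and a message counts as expired once the current
time has passed that value. The class and method names are made up for
illustration, and the exact comparison the broker performs is an assumption
on my part.

```java
public class ExpirySketch {

    // Assumed JMS rule: expiration = send time + time to live, in
    // milliseconds since the epoch. A time to live of 0 means "never expires".
    public static long expirationFor(long sendTimeMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sendTimeMillis + timeToLiveMillis;
    }

    // Assumed check: a message is expired when it has a non-zero expiration
    // that lies strictly in the past.
    public static boolean isExpired(long jmsExpiration, long nowMillis) {
        return jmsExpiration != 0 && nowMillis > jmsExpiration;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();

        // No expiration set: never expires.
        System.out.println(isExpired(0, now)); // prints false

        // The value my test puts in the JMSExpiration header.
        System.out.println(isExpired(now - 12000000, now)); // prints true

        // timeToLive=1, checked five seconds after sending.
        System.out.println(isExpired(expirationFor(now, 1), now + 5000)); // prints true
    }
}
```

By that rule the past timestamp I put in the JMSExpiration header should
count as expired, provided the header actually reaches the broker unchanged.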

I've been staring at this for a while, and while I think I've only been
changing one thing at a time, I may have made an error or left behind some
config which is shooting me in the foot.

One final thing to note: I have this pattern working elsewhere in tests
which explicitly throw exceptions from within transactions in Camel, but I'd
prefer not to have to start inspecting headers manually myself when this
all seems to be handled already.

I hope you can help.

TIA

Cheers, Andrew



--
View this message in context: 
http://camel.465427.n5.nabble.com/Not-Expiring-JMS-Messages-with-ActiveMQ-Camel-tp5732841.html
Sent from the Camel - Users mailing list archive at Nabble.com.
