Hi,
I have some problems with JMS message redeliveries when using cache levels
CACHE_CONSUMER or CACHE_SESSION on JmsComponent. I use transacted endpoints
and local transactions (JmsTransactionManager). I am using Camel 1.3-rc2,
Spring 2.5.1 and ActiveMQ 5.0.0. (However, JMS message redeliveries do work
when using CACHE_CONNECTION or CACHE_NONE)
Here's the Spring application context:
<camelContext id="camel"
xmlns="http://activemq.apache.org/camel/schema/spring" />
<bean id="activemqConfig"
class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="transactionManager" ref="jmsTransactionManager"/>
<property name="cacheLevelName" value="CACHE_CONSUMER"/>
<property name="concurrentConsumers" value="5"/>
<property name="maxConcurrentConsumers" value="10"/>
<property name="transacted" value="true"/>
</bean>
<bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="activemqConfig"/>
</bean>
<bean id="PROPAGATION_REQUIRED"
class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
<bean id="jmsTransactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory"
depends-on="broker">
<property name="brokerURL" value="tcp://localhost:60001"/>
</bean>
<bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
<property name="config" value="/activemq.xml"/>
</bean>
</beans>
And here's the route definition in SpringRouteBuilder's configure() method:
Policy failure = new ForceFailure(); // see below
Policy required = new SpringTransactionPolicy(bean(
TransactionTemplate.class, "PROPAGATION_REQUIRED"));
from("activemq:queue.a").errorHandler(noErrorHandler())
.to("direct:y");
from("direct:y").errorHandler(noErrorHandler())
.policy(required)
.policy(failure)
.to("activemq:queue.c");
from("activemq:queue.c").trace();
Here's the ForceFailure policy I used for testing purposes. It simply
delegates processing to the next processor and then throws a
RuntimeException, causing the transaction interceptor (.policy(required)) to
initiate a rollback:
public class ForceFailure implements Policy {
public Processor wrap(Processor processor) {
return new DelegateProcessor(processor) {
@Override
public void process(Exchange exchange) throws Exception {
processNext(exchange);
System.out.println("throwing exception ...");
throw new RuntimeException("rollback");
}
@Override
public String toString() {
return "rollback(" + getProcessor() + ")";
}
};
}
}
With any cache level, no messages are consumed from queue.c, as expected.
When I omit .policy(failure) in the route definition then the message is
consumed from queue.c.
Any ideas why CACHE_CONSUMER or CACHE_SESSION don't cause message
redeliveries?
A related problem I have is that ActiveMQ redelivery policies are ignored
when using cache levels CACHE_CONNECTION or CACHE_NONE (for the others I
couldn't test). ActiveMQ is doing endless redeliveries. I configured this
redelivery policy with the connection factory:
<bean id="jmsConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory"
depends-on="broker">
<property name="brokerURL" value="tcp://localhost:60001"/>
<property name="redeliveryPolicy">
<bean class="org.apache.activemq.RedeliveryPolicy">
<property name="initialRedeliveryDelay" value="1000" />
<property name="maximumRedeliveries" value="2"/>
<property name="useExponentialBackOff" value="false"/>
<property name="backOffMultiplier" value="1"/>
</bean>
</property>
</bean>
Any ideas why this redelivery policy is ignored?
Thanks in advance for your help.
Cheers,
Martin