Hi,
I have a ActiveMQ where I have setup Redelivery on the client side. With a
simple consumer it works as expected with the below configurations:
/import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.boot.jta.atomikos.AtomikosConnectionFactoryBean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
...
@Bean
public ConnectionFactory atomikosConnectionFactoryBean() {
String mqUrl = System.getenv("MQ_URL");
AtomikosConnectionFactoryBean atomikos = new
AtomikosConnectionFactoryBean();
atomikos.setLocalTransactionMode(false);
atomikos.setMaxPoolSize(10);
atomikos.setUniqueResourceName("QUEUE_BROKER");
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(4);
redeliveryPolicy.setBackOffMultiplier(10);
redeliveryPolicy.setRedeliveryDelay(1000L);
redeliveryPolicy.setInitialRedeliveryDelay(1000L);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveryDelay(86400000L);
ActiveMQXAConnectionFactory xaConnectionFactoryBean = new
ActiveMQXAConnectionFactory(System.getenv("MQ_USERNAME"),
System.getenv("MQ_PASSWORD"), mqUrl);
xaConnectionFactoryBean.setRedeliveryPolicy(redeliveryPolicy);
xaConnectionFactoryBean.setNonBlockingRedelivery(true);
atomikos.setXaConnectionFactory(xaConnectionFactoryBean);
return atomikos;
}
@Bean
public JmsListenerContainerFactory<?>
jmsListenerContainerFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new
DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setErrorHandler(new EHealthEventErrorHandler());
factory.setMessageConverter(jacksonJmsMessageConverter());
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
factory.setDestinationResolver(new EHealthDestinationResolver());
factory.setSessionTransacted(true);
return factory;
}
@Bean(autowire = Autowire.BY_TYPE)
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new
JmsTemplate(atomikosConnectionFactoryBean());
jmsTemplate.setDestinationResolver(new
EHealthDestinationResolver());
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
.../
/import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;
@Transactional
@JmsListener(destination = "XXX")
public void onMessageReceived(XXXEvent event) {
throw new Exception();
}
```
So the above works as expected and the message is redelivered with the
ExponentialBackOff strategy.
BUT it goes sideways when the message consumer (onMessageReceived) calls a
method on a class that sends a message to another queue in a new
transaction.
Then the message is not redelivered if the exception is thrown after the new
transaction have been committed, ex:
```
import org.springframework.transaction.annotation.Transactional;
public class FooClass {
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void createInNewTransaction() {
sendMessageToAnotherQueue();
}
}/
/import org.springframework.jms.annotation.JmsListener;
import org.springframework.transaction.annotation.Transactional;
@Transactional
@JmsListener(destination = "Foo")
public void onMessageReceived(FooEvent event) {
fooClass.createInNewTransaction();
throw new Exception();
}/
In the stacktrace below it is seen that the
org.apache.activemq.TransactionContext.synchronizations are nulled when
sending the message in the new transaction. The
TransactionContext.synchronizations contains the ActiveMQMessageConsumer
that is used to receive the message and is needed for the redelivery after
the exception is thrown. When this is cleared the message is not
redelivered:
<http://activemq.2283324.n4.nabble.com/file/t379855/Sync_nulled.png>
/private void afterRollback() throws JMSException {
if (synchronizations == null) {
return;
}
...
}/
It is the method
com.atomikos.datasource.xa.session.BranchEnlistedStateHandler.checkEnlistBeforeUse()
that detects that the transaction context is different and throws an
exception that is catched in SessionHandleState.notifyBeforeUse():
/TransactionContextStateHandler checkEnlistBeforeUse ( CompositeTransaction
currentTx)
throws InvalidSessionHandleStateException,
UnexpectedTransactionContextException
{
if ( currentTx == null || !currentTx.isSameTransaction ( ct ) )
{
//OOPS! we are being used a different tx context than
the one expected...
//TODO check: what if subtransaction? Possible
solution: ignore if
serial_jta mode, error otherwise.
String msg = "The connection/session object is already
enlisted in a
(different) transaction.";
if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( msg );
throw new UnexpectedTransactionContextException();
}
//tx context is still the same -> no change in state required
return null;
}/
Then a new context is created and currentContext.checkEnlistBeforeUse(ct) is
called which ends up clearing the TransactionContext.synchronizations
There is a comment in BranchEnlistedStateHandler.checkEnlistBeforeUse():
"//TODO check: what if subtransaction? Possible solution: ignore if
serial_jta mode, error otherwise."
I have a subtransaction and have
"com.atomikos.icatch.serial_jta_transactions" set to true. Am I just unlucky
to have hit something that is not supported yet?
Versions used: "org.springframework:spring-jms:5.1.10.RELEASE",
"com.atomikos:transactions:5.0.3",
"org.apache.activemq:activemq-client:5.15.10"
Have tried to bump to newest versions, but didn't make a difference.
--
Sent from: http://activemq.2283324.n4.nabble.com/ActiveMQ-Dev-f2368404.html