Hi,
I'm trying to setup a config where message redelivery is important.
Every message read from the queue is processed sending an http request
to another host, so it can fail due to a lot of causes, and I want the
system to retry the post later.
I have made some tests and this is what is happening:
There are, in this test, 100 messages in the queue. 2 messages are
going to make the http request to a non existent host and exit with
RuntimeException to rollback, ("bad messages"). 98 messages are going to
succeed, ("good messages"). This is configurable because the host is
extracted from a bdd with a message attribute as the key. What I
expected to happen at the end of the test is 98 good messages processed
and the 2 bad messages remaining in the queue waiting for a retry until
the max redeliveries in RedeliveryPolicy is reached and then sent to
DLQ. This is going to happen after some minutes because the redelivery
policy is initialRedeliveryDelay 10000, maximumRedeliveries 6,
useExponentialBackOff true, backOffMultiplier 3.
What is happening instead is that after a while with no activity, I
can see 10 messages still in the original queue, that means that 8 good
messages are stucked in the queue. Then the 2 bad messages are retried
until maximumRedeliveries is reached, some minutes later, sent to DLQ,
and then the 8 good messages are processed succesfully.
This means there is something that is retaining the rollbacked
messages, not letting other messages in the queue to be processed
First I thougth of prefetchPolicy, as the messages could be retained
by this internal queue. Setting queuePrefetch to 1, didn't made any
change. Then I thougth of DefaultMessageListenerContainer, because its
processing threads could be blocked by this rollback.
Also, when there are some messages waiting to be redelivered, if I
desinstall the DefaultMessageListenerContainer consumers, I get an
exception in the ActiveMQ broker, probably related with the connection
that is retaining the rollbacked messages:
30 jul 2009 14:17:45,140 ERROR [ActiveMQ Transport:
tcp:///127.0.0.1:3656]
org.apache.activemq.broker.TransportConnection.Service - sync error
occurred:
javax.jms.JMSException: Transaction
'TX:ID:mymachine-4579-1248953104078-0:5207:52' has not been started.
javax.jms.JMSException: Transaction
'TX:ID:mymachine-4579-1248953104078-0:5207:52' has not been started.
at
org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:270)
at
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:190)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
at
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
at
org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
at java.lang.Thread.run(Thread.java:595)
I don't know how to test who's causing this problem,
DefaultMessageListenerContainer or ActiveMQ. Maybe is
DefaultMessageListenerContainer consumer threads, but the exception in
the broker means ActiveMQ could be also the problem.
Anyone can help me to narrow the problem?
I am using ActiveMQ 5.2 in a window box, and Spring
DefaultMessageListenerContainer for the consumers with this config. In
you need more config data, please ask
<bean id="jmsConnectionFactory"
class="org.apache.activemq.spring.ActiveMQConnectionFactory"
lazy-init="true">
<property name="brokerURL"
value="failover:(tcp://localhost:6275?soTimeout=60000&connectionTimeout=30000)?initialReconnectDelay=20000&randomize=false&maxReconnectAttempts=-1&useExponentialBackOff=false&reconnectDelayExponent=2"/>
<property name="redeliveryPolicy">
<bean class="org.apache.activemq.RedeliveryPolicy">
<property name="initialRedeliveryDelay" value="10000" />
<property name="maximumRedeliveries" value="6" />
<property name="useExponentialBackOff" value="true" />
<property name="backOffMultiplier" value="3" />
</bean>
</property>
<property name="prefetchPolicy">
<bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="queuePrefetch" value="1" />
</bean>
</property>
<property name="copyMessageOnSend" value="false"/>
<property name="userName" value="amquser"/>
<property name="password" value="amquserpassword"/>
</bean>
<bean id="inputQueueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer"
destroy-method="shutdown" lazy-init="true">
<property name="connectionFactory">
<ref bean="jmsConnectionFactory" />
</property>
<property name="destination">
<ref bean="inputQueue" />
</property>
<property name="messageListener">
<ref bean="inputQueueMessageListener" />
</property>
<property name="sessionTransacted" value="true" />
<property name="concurrentConsumers" value="2" />
<property name="maxConcurrentConsumers" value="4" />
</bean>
--
-------------------------------------------------------------
http://www.programacionenjava.com
-------------------------------------------------------------