Hi,

I'm trying to setup a config where message redelivery is important. Every message read from the queue is processed sending an http request to another host, so it can fail due to a lot of causes, and I want the system to retry the post later. I have made some tests and this is what is happening: There are, in this test, 100 messages in the queue. 2 messages are going to make the http request to a non existent host and exit with RuntimeException to rollback, ("bad messages"). 98 messages are going to succeed, ("good messages"). This is configurable because the host is extracted from a bdd with a message attribute as the key. What I expected to happen at the end of the test is 98 good messages processed and the 2 bad messages remaining in the queue waiting for a retry until the max redeliveries in RedeliveryPolicy is reached and then sent to DLQ. This is going to happen after some minutes because the redelivery policy is initialRedeliveryDelay 10000, maximumRedeliveries 6, useExponentialBackOff true, backOffMultiplier 3. What is happening instead is that after a while with no activity, I can see 10 messages still in the original queue, that means that 8 good messages are stucked in the queue. Then the 2 bad messages are retried until maximumRedeliveries is reached, some minutes later, sent to DLQ, and then the 8 good messages are processed succesfully. This means there is something that is retaining the rollbacked messages, not letting other messages in the queue to be processed First I thougth of prefetchPolicy, as the messages could be retained by this internal queue. Setting queuePrefetch to 1, didn't made any change. Then I thougth of DefaultMessageListenerContainer, because its processing threads could be blocked by this rollback. Also, when there are some messages waiting to be redelivered, if I desinstall the DefaultMessageListenerContainer consumers, I get an exception in the ActiveMQ broker, probably related with the connection that is retaining the rollbacked messages: 30 jul 2009 14:17:45,140 ERROR [ActiveMQ Transport: tcp:///127.0.0.1:3656] org.apache.activemq.broker.TransportConnection.Service - sync error occurred: javax.jms.JMSException: Transaction 'TX:ID:mymachine-4579-1248953104078-0:5207:52' has not been started. javax.jms.JMSException: Transaction 'TX:ID:mymachine-4579-1248953104078-0:5207:52' has not been started. at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:270) at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:190) at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74) at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74) at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74) at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85) at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
       at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305) at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179) at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68) at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143) at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206) at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84) at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203) at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185) at java.lang.Thread.run(Thread.java:595) I don't know how to test who's causing this problem, DefaultMessageListenerContainer or ActiveMQ. Maybe is DefaultMessageListenerContainer consumer threads, but the exception in the broker means ActiveMQ could be also the problem. Anyone can help me to narrow the problem? I am using ActiveMQ 5.2 in a window box, and Spring DefaultMessageListenerContainer for the consumers with this config. In you need more config data, please ask <bean id="jmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" lazy-init="true"> <property name="brokerURL" value="failover:(tcp://localhost:6275?soTimeout=60000&amp;connectionTimeout=30000)?initialReconnectDelay=20000&amp;randomize=false&amp;maxReconnectAttempts=-1&amp;useExponentialBackOff=false&amp;reconnectDelayExponent=2"/>
       <property name="redeliveryPolicy">
           <bean class="org.apache.activemq.RedeliveryPolicy">
               <property name="initialRedeliveryDelay" value="10000" />
               <property name="maximumRedeliveries" value="6" />
               <property name="useExponentialBackOff" value="true" />
               <property name="backOffMultiplier" value="3" />
           </bean>
       </property>
       <property name="prefetchPolicy">
           <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
               <property name="queuePrefetch" value="1" />
           </bean>
       </property>
<property name="copyMessageOnSend" value="false"/> <property name="userName" value="amquser"/>
       <property name="password" value="amquserpassword"/>
   </bean>
<bean id="inputQueueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" destroy-method="shutdown" lazy-init="true">
       <property name="connectionFactory">
           <ref bean="jmsConnectionFactory" />
       </property>
       <property name="destination">
           <ref bean="inputQueue" />
       </property>
       <property name="messageListener">
           <ref bean="inputQueueMessageListener" />
       </property>
       <property name="sessionTransacted" value="true" />
       <property name="concurrentConsumers" value="2" />
       <property name="maxConcurrentConsumers" value="4" />
   </bean>

--
-------------------------------------------------------------
http://www.programacionenjava.com
-------------------------------------------------------------

Reply via email to