Can you verify whether this behavior exists on trunk by using a 5.3-SNAPSHOT?
If it does, then please open a JIRA issue and attach your test case. Thanks.

2009/7/30 Diego Rodríguez Martín <drodrig...@altiria.com>

> Hi,
>
>   I'm trying to set up a configuration where message redelivery is
> important. Every message read from the queue is processed by sending an HTTP
> request to another host, so it can fail for many reasons, and I want the
> system to retry the POST later.
>     I have run some tests, and this is what happens:
>     In this test there are 100 messages in the queue. 2 messages make the
> HTTP request to a nonexistent host and throw a RuntimeException to force a
> rollback ("bad messages"). 98 messages succeed ("good messages"). This is
> configurable because the host is looked up in a database, with a message
> attribute as the key. What I expected at the end of the test is 98 good
> messages processed and the 2 bad messages remaining in the queue, waiting
> for a retry until the maximum number of redeliveries in RedeliveryPolicy is
> reached, and then sent to the DLQ. This takes some minutes because the
> redelivery policy is initialRedeliveryDelay 10000, maximumRedeliveries 6,
> useExponentialBackOff true, backOffMultiplier 3.
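
The redelivery settings quoted above imply a retry schedule that can be estimated with a short calculation. The Java sketch below is not ActiveMQ code: RedeliverySchedule, delays and total are hypothetical names. It assumes that the first redelivery waits initialRedeliveryDelay and that each later one waits backOffMultiplier times the previous delay, with no upper cap and no collision avoidance. The actual timing in 5.2 may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ActiveMQ: estimates a redelivery schedule
// under the simplifying assumptions stated in the text above.
final class RedeliverySchedule {

    // Returns the assumed wait in milliseconds before each redelivery attempt:
    // the first wait is initialDelayMs, each later wait is multiplier times
    // the previous one.
    static List<Long> delays(long initialDelayMs, int multiplier, int maxRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMs;
        for (int i = 0; i < maxRedeliveries; i++) {
            result.add(delay);
            delay *= multiplier;
        }
        return result;
    }

    // Sums the individual waits to get the total time spent waiting.
    static long total(List<Long> delays) {
        long sum = 0;
        for (long d : delays) {
            sum += d;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Values taken from the quoted RedeliveryPolicy settings.
        List<Long> schedule = delays(10000, 3, 6);
        System.out.println("Delays (ms): " + schedule);
        System.out.println("Total wait (ms): " + total(schedule));
    }
}
```

Under those assumptions the six waits add up to 3,640,000 ms, roughly an hour, so the bad messages could stay pending considerably longer than a few minutes before they reach the DLQ.
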
>     What happens instead is that, after a while with no activity, I can
> see 10 messages still in the original queue, which means that 8 good
> messages are stuck in the queue. Then the 2 bad messages are retried until
> maximumRedeliveries is reached, some minutes later, and sent to the DLQ, and
> only then are the 8 good messages processed successfully.
>     This means that something is retaining the rolled-back messages and
> preventing other messages in the queue from being processed.
>     First I thought of prefetchPolicy, as the messages could be retained by
> this internal queue. Setting queuePrefetch to 1 didn't make any difference.
> Then I thought of DefaultMessageListenerContainer, because its processing
> threads could be blocked by this rollback.
>     Also, when there are some messages waiting to be redelivered, if I
> uninstall the DefaultMessageListenerContainer consumers, I get an exception
> in the ActiveMQ broker, probably related to the connection that is
> retaining the rolled-back messages:
>  30 jul 2009 14:17:45,140 ERROR [ActiveMQ Transport: tcp:///127.0.0.1:3656]
> org.apache.activemq.broker.TransportConnection.Service  - sync error
> occurred:
> javax.jms.JMSException: Transaction
> 'TX:ID:mymachine-4579-1248953104078-0:5207:52' has not been started.
> javax.jms.JMSException: Transaction
> 'TX:ID:mymachine-4579-1248953104078-0:5207:52' has not been started.
>       at
> org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:270)
>       at
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:190)
>       at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>       at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>       at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
>       at
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
>       at
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:456)
>       at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
>       at
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:305)
>       at
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>       at
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
>       at
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
>       at
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
>       at
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
>       at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
>       at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
>       at java.lang.Thread.run(Thread.java:595)
>
>     I don't know how to determine what is causing this problem,
> DefaultMessageListenerContainer or ActiveMQ. Maybe it is the
> DefaultMessageListenerContainer consumer threads, but the exception in the
> broker means ActiveMQ could also be the problem.
>     Can anyone help me narrow down the problem?
>     I am using ActiveMQ 5.2 on a Windows box, and Spring
> DefaultMessageListenerContainer for the consumers with this config. If you
> need more config data, please ask.
>       <bean id="jmsConnectionFactory"
> class="org.apache.activemq.spring.ActiveMQConnectionFactory"
> lazy-init="true">
>       <property name="brokerURL"
> value="failover:(tcp://localhost:6275?soTimeout=60000&amp;connectionTimeout=30000)?initialReconnectDelay=20000&amp;randomize=false&amp;maxReconnectAttempts=-1&amp;useExponentialBackOff=false&amp;reconnectDelayExponent=2"/>
>       <property name="redeliveryPolicy">
>           <bean class="org.apache.activemq.RedeliveryPolicy">
>               <property name="initialRedeliveryDelay" value="10000" />
>               <property name="maximumRedeliveries" value="6" />
>               <property name="useExponentialBackOff" value="true" />
>               <property name="backOffMultiplier" value="3" />
>           </bean>
>       </property>
>       <property name="prefetchPolicy">
>           <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
>               <property name="queuePrefetch" value="1" />
>           </bean>
>       </property>
>       <property name="copyMessageOnSend" value="false"/>
>       <property name="userName" value="amquser"/>
>       <property name="password" value="amquserpassword"/>
>   </bean>
>     <bean id="inputQueueListenerContainer"
> class="org.springframework.jms.listener.DefaultMessageListenerContainer"
> destroy-method="shutdown"  lazy-init="true">
>       <property name="connectionFactory">
>           <ref bean="jmsConnectionFactory" />
>       </property>
>       <property name="destination">
>           <ref bean="inputQueue" />
>       </property>
>       <property name="messageListener">
>           <ref bean="inputQueueMessageListener" />
>       </property>
>       <property name="sessionTransacted" value="true" />
>       <property name="concurrentConsumers" value="2" />
>       <property name="maxConcurrentConsumers" value="4" />
>   </bean>
>
> --
> -------------------------------------------------------------
> http://www.programacionenjava.com
> -------------------------------------------------------------
>
>


-- 
http://blog.garytully.com

Open Source Integration
http://fusesource.com
