On 6/1/06, lathan <[EMAIL PROTECTED]> wrote:
In testing a network of brokers (description below) I'm having problems
getting a durable consumer to reconnect to a topic if that consumer process
is killed (kill -9) and not given a chance to clean up. In looking at the
code in TopicRegion.addConsumer it appears to be checking to make sure an
active subscription does not already exist for the given clientID.
We have to enforce this to be JMS compliant.
While
this does enforce the rule that you can only have one durable subscription
for a given client, it seems to preclude the ability to recover when a
durable consumer crashes. Am I missing something config-wise or is this a
bug?
The killed client should be detected and closed down at some point,
either by the socket timing out or by the inactivity monitor detecting it
(if it's enabled on the transport). Then the client will be removed,
allowing the restarted client to reconnect.
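As a rough illustration of the inactivity monitor mentioned above, a transport URI along these lines asks for dead peers to be detected after 30 seconds of silence. The `wireFormat.maxInactivityDuration` option (in milliseconds) is documented for the TCP transport in later ActiveMQ releases; whether 4.0 honors it in exactly this form is an assumption to verify, and the host, port, and value are placeholders.

```
tcp://localhost:61616?wireFormat.maxInactivityDuration=30000
```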
Maybe the reconnection logic could be a little smarter: if a
duplicate client connects, we proactively try to ping the old client
to see if it's still alive (up to some timeout). If the old client
responds, we fail the duplicate; otherwise we kill the old client
connection.
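The idea above could be sketched roughly as follows. This is only an illustration of the proposed decision, not existing ActiveMQ code: `ClientConnection`, `Decision`, and `resolve` are hypothetical names made up for the example, and the ping is modeled as a future that completes when the old client answers.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DuplicateClientCheck {

    /** Hypothetical handle on the existing connection; not an ActiveMQ type. */
    interface ClientConnection {
        CompletableFuture<Void> ping(); // completes when the old client answers
        void close();                   // drops the connection, freeing the clientID
    }

    enum Decision { REJECT_DUPLICATE, REPLACE_OLD }

    /** Called when a second client shows up with a clientID that is already in use. */
    static Decision resolve(ClientConnection old, long timeoutMillis) {
        try {
            // Wait up to the timeout for the old client to answer the ping.
            old.ping().get(timeoutMillis, TimeUnit.MILLISECONDS);
            return Decision.REJECT_DUPLICATE; // old client is alive: fail the newcomer
        } catch (TimeoutException | ExecutionException e) {
            old.close();                      // no answer: treat the old client as dead
            return Decision.REPLACE_OLD;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Decision.REJECT_DUPLICATE; // conservative choice: keep the old client
        }
    }

    public static void main(String[] args) {
        // Simulates a client that was killed with kill -9 and never answers.
        ClientConnection killed = new ClientConnection() {
            public CompletableFuture<Void> ping() { return new CompletableFuture<>(); }
            public void close() { System.out.println("closing stale connection"); }
        };
        System.out.println(resolve(killed, 200));
    }
}
```

The timeout bounds how long a restarted consumer has to wait before it can take over the subscription, so it trades reconnect latency against the risk of evicting a client that is merely slow.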
James
Exception:
2006-05-31 16:41:53,689 [INFO,DemandForwardingBridge] --> Network
connection between vm://lab00260102#6 and
tcp://localhost/127.0.0.1:60101(lab00260101) has been established.
2006-05-31 16:41:54,450 [INFO,Service] --> Async error occurred:
javax.jms.JMSException: Durable consumer is in use for client:
NC_lab00260101_inboundlab00260102 and subscriptionName: lab00260102_prov
javax.jms.JMSException: Durable consumer is in use for client:
NC_lab00260101_inboundlab00260102 and subscriptionName: lab00260102_prov
at org.apache.activemq.broker.region.TopicRegion.addConsumer(TopicRegion.java:80)
at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:296)
at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:73)
at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:77)
at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:73)
at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:86)
at org.apache.activemq.broker.AbstractConnection.processAddConsumer(AbstractConnection.java:427)
at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:295)
at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:201)
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:62)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:97)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
at org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:405)
at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:357)
at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:307)
at org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:124)
at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:97)
at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:114)
at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:122)
at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:87)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:143)
at java.lang.Thread.run(Thread.java:595)
2006-05-31 16:41:54,453 [WARN,DemandForwardingBridge] --> Unexpected local
command: ConnectionError {commandId = 2, responseRequired = false,
connectionId = null, exception = javax.jms.JMSException: Durable consumer is
in use for client: NC_lab00260101_inboundlab00260102 and subscriptionName:
lab00260102_prov}
My setup:
ActiveMQ 4.0
RedHat Linux 4.0
Java 1.5.0_06
Network of Brokers
- Two processes on the same machine
- Each process has a broker.
- One process represents the producer (persistent), the other the
consumer (durable consumer)
- Connection via peer transport
lathan lewis
--
View this message in context:
http://www.nabble.com/Durable+consumer+reconnect+problem-t1716817.html#a4662218
Sent from the ActiveMQ - User forum at Nabble.com.
--
James
-------
http://radio.weblogs.com/0112098/