Deadlock in Queue.java
----------------------
Key: AMQ-1070
URL: https://issues.apache.org/activemq/browse/AMQ-1070
Project: ActiveMQ
Issue Type: Bug
Components: Broker
Affects Versions: 4.0.2
Reporter: Tom Kaitchuck
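A deadlock is possible because two transport threads acquire the same two locks (a QueueSubscription monitor and a LinkedList monitor locked in Queue) in opposite order, as the following thread dump shows: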
"ActiveMQ Transport: tcp:///127.0.0.1:53335":
at
org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:66)
- waiting to lock <0x90786240> (a
org.apache.activemq.broker.region.QueueSubscription)
at
org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:192)
- locked <0x908fa480> (a java.util.LinkedList)
at
org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:93)
- locked <0x903b9b40> (a java.lang.Object)
at
org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:221)
at
org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:130)
at
org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:142)
at
org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:130)
at
org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:143)
at
org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:182)
- locked <0x908e6cb8> (a java.lang.Object)
at
org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:297)
at
org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:74)
at
org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:78)
at
org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:74)
at
org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:87)
at
org.apache.activemq.broker.AbstractConnection.processAddConsumer(AbstractConnection.java:538)
at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:296)
at
org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:237)
at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:63)
at
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:92)
at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:124)
at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:123)
at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:88)
at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:138)
at java.lang.Thread.run(Thread.java:595)
"ActiveMQ Transport: tcp:///127.0.0.1:53315":
at org.apache.activemq.broker.region.Queue.dropEvent(Queue.java:321)
- waiting to lock <0x908fa480> (a java.util.LinkedList)
at org.apache.activemq.broker.region.Queue.dropEvent(Queue.java:315)
at
org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:54)
at
org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:125)
- locked <0x90786240> (a
org.apache.activemq.broker.region.QueueSubscription)
at
org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:265)
at
org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:366)
at
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:177)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:66)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:66)
at
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:79)
at
org.apache.activemq.broker.AbstractConnection.processMessageAck(AbstractConnection.java:445)
at org.apache.activemq.command.MessageAck.visit(MessageAck.java:179)
at
org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:237)
at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:63)
at
org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:92)
at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:124)
at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:123)
at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:88)
at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:138)
at java.lang.Thread.run(Thread.java:595)
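A simple solution is to synchronize on the subscription in AbstractRegion.java before calling into the destination, so that the subscription lock is always taken first: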
@@ -89,10 +89,12 @@
// Add all consumers that are interested in the destination.
for (Iterator iter = subscriptions.values().iterator();
iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
+ synchronized (sub) {
if( sub.matches(destination) ) {
dest.addSubscription(context, sub);
}
}
+ }
return dest;
}
}
@@ -104,11 +106,13 @@
if( timeout == 0 ) {
for(Iterator
iter=subscriptions.values().iterator();iter.hasNext();){
Subscription sub=(Subscription) iter.next();
+ synchronized (sub) {
if(sub.matches(destination)){
throw new JMSException("Destination still has an active
subscription: "+destination);
}
}
}
+ }
if( timeout > 0 ) {
// TODO: implement a way to notify the subscribers that we want to
take the down
@@ -125,10 +129,12 @@
// timeout<0 or we timed out, we now force any remaining
subscriptions to un-subscribe.
for(Iterator
iter=subscriptions.values().iterator();iter.hasNext();){
Subscription sub=(Subscription) iter.next();
+ synchronized (sub) {
if(sub.matches(destination)){
dest.removeSubscription(context, sub);
}
}
+ }
destinationMap.removeAll(destination);
dest.dispose(context);
@@ -173,7 +179,8 @@
// broker will not see a destination that exists in persistent
store. We may want to
// eagerly load all destinations into the broker but have an
inactive state for the
// destination which has reduced memory usage.
- //
+ synchronized (sub)
+ {
if( persistenceAdapter!=null ) {
Set inactiveDests = getInactiveDestinations();
for (Iterator iter = inactiveDests.iterator();
iter.hasNext();) {
@@ -183,6 +190,7 @@
}
}
}
+ }
subscriptions.put(info.getConsumerId(), sub);
@@ -193,16 +201,16 @@
// no mutex held. Remove is only essentially run once
// so everything after this point would be leaked.
+ synchronized (sub) {
// Add the subscription to all the matching queues.
for (Iterator iter =
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
Destination dest = (Destination) iter.next();
dest.addSubscription(context, sub);
}
-
if( info.isBrowser() ) {
((QueueBrowserSubscription)sub).browseDone();
}
-
+ }
return sub;
}
}
--
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators:
https://issues.apache.org/activemq/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira