[ https://issues.apache.org/activemq/browse/AMQ-1070?page=all ]
Rob Davies resolved AMQ-1070. ----------------------------- Fix Version/s: 4.2.0 Resolution: Fixed Locking behaviour has changed for 4.2 - can now longer reproduce this > Deadlock in Queue.java > ---------------------- > > Key: AMQ-1070 > URL: https://issues.apache.org/activemq/browse/AMQ-1070 > Project: ActiveMQ > Issue Type: Bug > Components: Broker > Affects Versions: 4.0.2 > Reporter: Tom Kaitchuck > Assigned To: Rob Davies > Fix For: 4.2.0 > > > It is possible to have a deadlock as follows: > "ActiveMQ Transport: tcp:///127.0.0.1:53335": > at > org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:66) > - waiting to lock <0x90786240> (a > org.apache.activemq.broker.region.QueueSubscription) > at > org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:192) > - locked <0x908fa480> (a java.util.LinkedList) > at > org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:93) > - locked <0x903b9b40> (a java.lang.Object) > at > org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:221) > at > org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:130) > at > org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:142) > at > org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:130) > at > org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:143) > at > org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:182) > - locked <0x908e6cb8> (a java.lang.Object) > at > org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:297) > at > org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:74) > at > org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:78) > at > org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:74) > at > org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:87) > at > org.apache.activemq.broker.AbstractConnection.processAddConsumer(AbstractConnection.java:538) > at > org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:296) > at > org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:237) > at > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:63) > at > org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:92) > at > org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67) > at > org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:124) > at > org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:123) > at > org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:88) > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:138) > at java.lang.Thread.run(Thread.java:595) > "ActiveMQ Transport: tcp:///127.0.0.1:53315": > at org.apache.activemq.broker.region.Queue.dropEvent(Queue.java:321) > - waiting to lock <0x908fa480> (a java.util.LinkedList) > at org.apache.activemq.broker.region.Queue.dropEvent(Queue.java:315) > at > org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:54) > at > org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:125) > - locked <0x90786240> (a > org.apache.activemq.broker.region.QueueSubscription) > at > org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:265) > at > org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:366) > at > org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:177) > at > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:66) > at > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:66) > at > org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:79) > at > org.apache.activemq.broker.AbstractConnection.processMessageAck(AbstractConnection.java:445) > at org.apache.activemq.command.MessageAck.visit(MessageAck.java:179) > at > org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:237) > at > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:63) > at > org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:92) > at > org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67) > at > org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:124) > at > org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:123) > at > org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:88) > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:138) > at java.lang.Thread.run(Thread.java:595) > The simple solution is in AbstractReagion.java: > @@ -89,10 +89,12 @@ > // Add all consumers that are interested in the destination. > for (Iterator iter = subscriptions.values().iterator(); > iter.hasNext();) { > Subscription sub = (Subscription) iter.next(); > + synchronized (sub) { > if( sub.matches(destination) ) { > dest.addSubscription(context, sub); > } > } > + } > return dest; > } > } > @@ -104,11 +106,13 @@ > if( timeout == 0 ) { > for(Iterator > iter=subscriptions.values().iterator();iter.hasNext();){ > Subscription sub=(Subscription) iter.next(); > + synchronized (sub) { > if(sub.matches(destination)){ > throw new JMSException("Destination still has an active > subscription: "+destination); > } > } > } > + } > if( timeout > 0 ) { > // TODO: implement a way to notify the subscribers that we want > to take the down > @@ -125,10 +129,12 @@ > // timeout<0 or we timed out, we now force any remaining > subscriptions to un-subscribe. > for(Iterator > iter=subscriptions.values().iterator();iter.hasNext();){ > Subscription sub=(Subscription) iter.next(); > + synchronized (sub) { > if(sub.matches(destination)){ > dest.removeSubscription(context, sub); > } > } > + } > destinationMap.removeAll(destination); > dest.dispose(context); > @@ -173,7 +179,8 @@ > // broker will not see a destination that exists in persistent > store. We may want to > // eagerly load all destinations into the broker but have an > inactive state for the > // destination which has reduced memory usage. > - // > + synchronized (sub) > + { > if( persistenceAdapter!=null ) { > Set inactiveDests = getInactiveDestinations(); > for (Iterator iter = inactiveDests.iterator(); > iter.hasNext();) { > @@ -183,6 +190,7 @@ > } > } > } > + } > subscriptions.put(info.getConsumerId(), sub); > @@ -193,16 +201,16 @@ > // no mutex held. Remove is only essentially run once > // so everything after this point would be leaked. > + synchronized (sub) { > // Add the subscription to all the matching queues. > for (Iterator iter = > destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) { > Destination dest = (Destination) iter.next(); > dest.addSubscription(context, sub); > } > - > if( info.isBrowser() ) { > ((QueueBrowserSubscription)sub).browseDone(); > } > - > + } > return sub; > } > } -- This message is automatically generated by JIRA. - If you think it was sent incorrectly contact one of the administrators: https://issues.apache.org/activemq/secure/Administrators.jspa - For more information on JIRA, see: http://www.atlassian.com/software/jira