[ https://issues.apache.org/activemq/browse/AMQ-1070?page=all ]

Rob Davies resolved AMQ-1070.
-----------------------------

    Fix Version/s: 4.2.0
       Resolution: Fixed

Locking behaviour has changed for 4.2; can no longer reproduce this.

> Deadlock in Queue.java
> ----------------------
>
>                 Key: AMQ-1070
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1070
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 4.0.2
>            Reporter: Tom Kaitchuck
>         Assigned To: Rob Davies
>             Fix For: 4.2.0
>
>
> It is possible to have a deadlock as follows:
> "ActiveMQ Transport: tcp:///127.0.0.1:53335":
>         at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:66)
>         - waiting to lock <0x90786240> (a org.apache.activemq.broker.region.QueueSubscription)
>         at org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:192)
>         - locked <0x908fa480> (a java.util.LinkedList)
>         at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:93)
>         - locked <0x903b9b40> (a java.lang.Object)
>         at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:221)
>         at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:130)
>         at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:142)
>         at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:130)
>         at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:143)
>         at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:182)
>         - locked <0x908e6cb8> (a java.lang.Object)
>         at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:297)
>         at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:74)
>         at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:78)
>         at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:74)
>         at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:87)
>         at org.apache.activemq.broker.AbstractConnection.processAddConsumer(AbstractConnection.java:538)
>         at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:296)
>         at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:237)
>         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:63)
>         at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:92)
>         at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
>         at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:124)
>         at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:123)
>         at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:88)
>         at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:138)
>         at java.lang.Thread.run(Thread.java:595)
> "ActiveMQ Transport: tcp:///127.0.0.1:53315":
>         at org.apache.activemq.broker.region.Queue.dropEvent(Queue.java:321)
>         - waiting to lock <0x908fa480> (a java.util.LinkedList)
>         at org.apache.activemq.broker.region.Queue.dropEvent(Queue.java:315)
>         at org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:54)
>         at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:125)
>         - locked <0x90786240> (a org.apache.activemq.broker.region.QueueSubscription)
>         at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:265)
>         at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:366)
>         at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:177)
>         at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:66)
>         at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:66)
>         at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:79)
>         at org.apache.activemq.broker.AbstractConnection.processMessageAck(AbstractConnection.java:445)
>         at org.apache.activemq.command.MessageAck.visit(MessageAck.java:179)
>         at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:237)
>         at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:63)
>         at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:92)
>         at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
>         at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:124)
>         at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:123)
>         at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:88)
>         at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:138)
>         at java.lang.Thread.run(Thread.java:595)
> The simple solution is in AbstractRegion.java:
> @@ -89,10 +89,12 @@
>              // Add all consumers that are interested in the destination.
>              for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
>                  Subscription sub = (Subscription) iter.next();
> +                synchronized (sub) {
>                  if( sub.matches(destination) ) {
>                      dest.addSubscription(context, sub);
>                  }
>              }
> +            }
>              return dest;
>          }
>      }
> @@ -104,11 +106,13 @@
>          if( timeout == 0 ) {
>              for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
>                  Subscription sub=(Subscription) iter.next();
> +                synchronized (sub) {
>                  if(sub.matches(destination)){
>                      throw new JMSException("Destination still has an active subscription: "+destination);
>                  }
>              }
>          }
> +        }
>          if( timeout > 0 ) {
>              // TODO: implement a way to notify the subscribers that we want to take the down
> @@ -125,10 +129,12 @@
>                  // timeout<0 or we timed out, we now force any remaining subscriptions to un-subscribe.
>                  for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
>                      Subscription sub=(Subscription) iter.next();
> +                    synchronized (sub) {
>                      if(sub.matches(destination)){
>                          dest.removeSubscription(context, sub);
>                      }
>                  }
> +                }
>                  destinationMap.removeAll(destination);
>                  dest.dispose(context);
> @@ -173,7 +179,8 @@
>              // broker will not see a destination that exists in persistent store.  We may want to
>              // eagerly load all destinations into the broker but have an inactive state for the
>              // destination which has reduced memory usage.
> -            //
> +            synchronized (sub)
> +            {
>              if( persistenceAdapter!=null ) {
>                  Set inactiveDests = getInactiveDestinations();
>                  for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
> @@ -183,6 +190,7 @@
>                      }
>                  }
>              }
> +            }
>              subscriptions.put(info.getConsumerId(), sub);
> @@ -193,16 +201,16 @@
>              // no mutex held. Remove is only essentially run once
>              // so everything after this point would be leaked.
> +            synchronized (sub) {
>              // Add the subscription to all the matching queues.
>              for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
>                  Destination dest = (Destination) iter.next();
>                  dest.addSubscription(context, sub);
>              }
> -
>              if( info.isBrowser() ) {
>                  ((QueueBrowserSubscription)sub).browseDone();
>              }
> -
> +            }
>              return sub;
>          }
>      }
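
The two traces above show a lock-order inversion: one thread holds the LinkedList monitor and waits for the QueueSubscription, while the other holds the QueueSubscription and waits for the LinkedList. The quoted patch appears to address this by taking the subscription monitor first on the add-consumer path, so both paths acquire the locks in the same order. Below is a minimal standalone sketch of that ordering idea. The class LockOrderSketch and all of its members are hypothetical stand-ins, not ActiveMQ code, and it does not reflect how locking was actually changed in 4.2.

```java
import java.util.LinkedList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in ActiveMQ.
public class LockOrderSketch {
    // Stand-in for the QueueSubscription monitor seen in the traces.
    private final Object subscription = new Object();
    // Stand-in for the java.util.LinkedList monitor seen in the traces.
    private final List<String> consumers = new LinkedList<>();
    private int acked = 0;

    // Models the add-consumer path with the patch applied:
    // subscription monitor first, then the list monitor.
    void addSubscription(String consumerId) {
        synchronized (subscription) {
            synchronized (consumers) {
                consumers.add(consumerId);
            }
        }
    }

    // Models the acknowledge path, which already took the
    // subscription monitor before the list monitor.
    void acknowledge() {
        synchronized (subscription) {
            synchronized (consumers) {
                acked++;
            }
        }
    }

    // Runs both paths concurrently and returns {consumers added, acks processed}.
    static int[] runBoth(int iterations) {
        LockOrderSketch sketch = new LockOrderSketch();
        Thread adder = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                sketch.addSubscription("consumer-" + i);
            }
        });
        Thread acker = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                sketch.acknowledge();
            }
        });
        adder.start();
        acker.start();
        try {
            adder.join();
            acker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { sketch.consumers.size(), sketch.acked };
    }

    public static void main(String[] args) {
        int[] result = runBoth(10000);
        System.out.println(result[0] + " consumers added, " + result[1] + " acks processed");
    }
}
```

Because both methods take the monitors in the same order, neither thread can hold one lock while waiting on a thread that holds the other. Swapping the two synchronized blocks in addSubscription would recreate the inversion shown in the traces and could hang intermittently.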

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: 
https://issues.apache.org/activemq/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
