[ 
https://issues.apache.org/jira/browse/AMQ-5260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14149239#comment-14149239
 ] 

matteo rulli commented on AMQ-5260:
-----------------------------------

Thank you for your prompt reply! Now I understand the role of MutexTransport. 
By the way, I noticed that the deadlock involves two MutexTransport instances 
that wrap *VMTransport(s)*. Apparently if I apply the following hack
{code:java}
--- MutexTransport.java Thu Jun 05 14:48:36 2014
+++ MutexTransport.edited.java  Fri Sep 26 09:11:41 2014
@@ -19,17 +19,31 @@
 import java.io.IOException;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Thread safe Transport Filter that serializes calls to and from the 
Transport Stack.
  */
 public class MutexTransport extends TransportFilter {
 
-    private final ReentrantLock writeLock = new ReentrantLock();
+       private static final Logger LOG = 
LoggerFactory.getLogger(MutexTransport.class);
+    private final static ReentrantLock vmWriteLock = new ReentrantLock();
+    private ReentrantLock writeLock;
     private boolean syncOnCommand;
 
     public MutexTransport(Transport next) {
         super(next);
         this.syncOnCommand = false;
+        
+        writeLock = null;
+        if(next != null && next.toString().startsWith("vm://")){
+               writeLock = vmWriteLock;
+               LOG.error("#@mrul# vm transport with mutex: " + next);
+        }else{ 
+               writeLock = new ReentrantLock();
+               LOG.error("#@mrul# non-vm transport with mutex: " + next);
+        }
     }
 
     public MutexTransport(Transport next, boolean syncOnCommand) {
{code}

the deadlock disappears. I noticed that the network bridge creates many 
VMTransports of the form vm://brokerName#<number> using basically three 
different thread paths, and all these local VMTransports trigger many 
intertwined command exchanges that somehow cause the deadlock:

h1. VMTransports creation PATH 1
{noformat}
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkConnector.createLocalTransport(NetworkConnector.java:154)
org.apache.activemq.network.DiscoveryNetworkConnector.onServiceAdd(DiscoveryNetworkConnector.java:136)
org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent.start(SimpleDiscoveryAgent.java:89)
org.apache.activemq.network.DiscoveryNetworkConnector.handleStart(DiscoveryNetworkConnector.java:205)
org.apache.activemq.network.NetworkConnector$1.doStart(NetworkConnector.java:59)
org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
org.apache.activemq.network.NetworkConnector.start(NetworkConnector.java:159)
org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2501)
org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:693)
org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:659)
org.apache.activemq.broker.BrokerService.start(BrokerService.java:595)
org.apache.activemq.JmsMultipleBrokersTestSupport.startAllBrokers(JmsMultipleBrokersTestSupport.java:277)
{noformat}

h1. VMTransports creation PATH 2
{noformat}
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
org.apache.activemq.network.DemandForwardingBridgeSupport.start(DemandForwardingBridgeSupport.java:184)
org.apache.activemq.network.DiscoveryNetworkConnector.onServiceAdd(DiscoveryNetworkConnector.java:152)
org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent.start(SimpleDiscoveryAgent.java:89)
org.apache.activemq.network.DiscoveryNetworkConnector.handleStart(DiscoveryNetworkConnector.java:205)
org.apache.activemq.network.NetworkConnector$1.doStart(NetworkConnector.java:59)
org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
org.apache.activemq.network.NetworkConnector.start(NetworkConnector.java:159)
org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2501)
org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:693)
org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:659)
org.apache.activemq.broker.BrokerService.start(BrokerService.java:595)
org.apache.activemq.JmsMultipleBrokersTestSupport.startAllBrokers(JmsMultipleBrokersTestSupport.java:277)
{noformat}

h1. VMTransports creation PATH 3
{noformat}
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1321)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)
{noformat}

h1. VMTransports creation PATH 3, take 2 (cyclic???)
{noformat}
org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1338)
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)            
<------------------------------ !!!
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:88)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1321)
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)             
<------------------------------ !!!
org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)
{noformat}

So, as a tentative fix, I tried to globally lock VMTransport usage under the 
assumption that VM transports should be negligible in number compared to 
other transports (tcp, nio, etc.), and thus the static lock in the VM transport 
should not constitute a big performance bottleneck. I added a new constructor to 
_*MutexTransport*_ in this way:
{code}
public class MutexTransport extends TransportFilter {
        // ...
        private final static ReentrantLock staticWriteLock = new 
ReentrantLock();
        private ReentrantLock writeLock = new ReentrantLock();
        // ...
        public MutexTransport(Transport next, boolean syncOnCommand, boolean 
useStaticLock) {
                this(next, syncOnCommand);
                if(useStaticLock)
                        writeLock = staticWriteLock;
        }
{code}
and I modified the 
_*org.apache.activemq.transport.vm.VMTransportServer.configure(Transport)*_ 
implementation in this way:
{code}
public static Transport configure(Transport transport) {
    transport = new MutexTransport(transport, false, true); // use static locks 
for VMTransport
    transport = new ResponseCorrelator(transport);
    return transport;
}
{code}

Apparently this solves the problem (anyway, I'm going to perform additional tests 
to double-check this). Is it, in your opinion, a reasonable approach, or is it 
just a _quick and dirty_ solution?

> Cross talk over duplex network connection can lead to blocking (alternative 
> take)
> ---------------------------------------------------------------------------------
>
>                 Key: AMQ-5260
>                 URL: https://issues.apache.org/jira/browse/AMQ-5260
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.9.0, 5.10.0
>            Reporter: matteo rulli
>         Attachments: AMQ5260AdvancedTest.java, AMQ_5260.patch, 
> AMQ_5260_2.patch, AMQ_5260_3.patch, deadlock.jpg, debug.jpg
>
>
> Pretty much the same description as AMQ-4328. 
> ----
> !deadlock.jpg!
> h2. Stacktraces:
> Stacktrace no.1:
> {noformat}
> Name: ActiveMQ NIO Worker 12
> State: BLOCKED on java.net.URI@1bae2b28 owned by: ActiveMQ Transport: 
> tcp:///10.0.1.219:61616@57789
> Total blocked: 2  Total waited: 67
> Stack trace: 
>  
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:714)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:581)
> org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:191)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
> org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
> org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
> org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> java.lang.Thread.run(Unknown Source)
> {noformat}
> ----
> stack trace no.2
> {noformat}
> Name: ActiveMQ Transport: tcp:///10.0.1.219:61616@57789
> State: WAITING on 
> java.util.concurrent.locks.ReentrantLock$NonfairSync@3cdbfa3e owned by: 
> ActiveMQ BrokerService[master2] Task-4
> Total blocked: 19  Total waited: 3
> Stack trace: 
>  sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown
>  Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown 
> Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(Unknown Source)
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(Unknown Source)
> java.util.concurrent.locks.ReentrantLock.lock(Unknown Source)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1339)
> org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:858)
> org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:818)
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:151)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:138)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:127)
>    - locked java.util.concurrent.atomic.AtomicBoolean@689389da
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:104)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
> org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
> org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:856)
> org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1128)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:714)
>    - locked java.net.URI@1bae2b28
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:581)
> org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:191)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.failover.FailoverTransport$3.onCommand(FailoverTransport.java:196)
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
> java.lang.Thread.run(Unknown Source)
> {noformat}
> ----
> stack trace no.3
> {noformat}
> Name: ActiveMQ BrokerService[master2] Task-4
> State: WAITING on 
> java.util.concurrent.locks.ReentrantLock$NonfairSync@717bb9c owned by: 
> ActiveMQ Transport: tcp:///10.0.1.219:61616@57789
> Total blocked: 19  Total waited: 117
> Stack trace: 
>  sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown
>  Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown 
> Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(Unknown Source)
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(Unknown Source)
> java.util.concurrent.locks.ReentrantLock.lock(Unknown Source)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:921)
> org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:173)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:138)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:127)
>    - locked java.util.concurrent.atomic.AtomicBoolean@453bb109
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:104)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1339)
> org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:858)
> org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:904)
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:129)
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:47)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to