[ https://issues.apache.org/jira/browse/AMQ-6640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945216#comment-15945216 ]
ASF subversion and git services commented on AMQ-6640: ------------------------------------------------------ Commit 8e00c6c2bc30e38cee585d9de97b511ed664951b in activemq's branch refs/heads/master from [~gtully] [ https://git-wip-us.apache.org/repos/asf?p=activemq.git;h=8e00c6c ] [AMQ-6640] fix duplicate suppression sync request on responder end of duplex network connector only b/c that has the async local transport. Additional test. Ensure broker sync is conditional on the need for duplicate suppression which should only be necessary in ring topologies when properly configured > network of brokers, duplex connector, network consumers not registered on > destination - hung bridge > --------------------------------------------------------------------------------------------------- > > Key: AMQ-6640 > URL: https://issues.apache.org/jira/browse/AMQ-6640 > Project: ActiveMQ > Issue Type: Bug > Components: networkbridge > Affects Versions: 5.14.0 > Reporter: Gary Tully > Assignee: Gary Tully > Fix For: 5.15.0 > > > Occasional hang in a network bridge under heavy load with large pending > backlog across multiple destinations. > Hung network bridge threads blocked through crosstalk between consumer demand > and message forwarding traffic leading to blocked writes. > Stack traces of the form, Broker1 > {code} > ~~~ > "ActiveMQ Transport: tcp:///XXXX:51460@61616" daemon prio=10 > tid=0x00007efbbc011000 nid=0x42be waiting on condition [0x00007efbb1a58000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000000c61a2118> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) > at > java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374) > at > org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:48) > at > org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87) > at > org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:914) > at > org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1187) > at > org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:772) > - locked <0x00000000c25ed058> (a java.net.URI) > at > org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:623) > at > org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:225) > at > org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) > at > org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) > at > org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125) > at > org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300) > at > org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) > at > org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214) > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) > at java.lang.Thread.run(Thread.java:724) > ~~~ > ~~~ > Dump 1 : "ActiveMQ VMTransport: vm://broker1#8-2" daemon prio=10 > tid=0x00007efbbc11b800 nid=0x42c4 runnable [0x00007efbb1554000] > java.lang.Thread.State: RUNNABLE > at java.net.SocketOutputStream.socketWrite0(Native Method) > at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) > at java.net.SocketOutputStream.write(SocketOutputStream.java:153) > at > org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115) > at java.io.DataOutputStream.flush(DataOutputStream.java:123) > at > org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:176) > at > org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:334) > at > org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:316) > at > org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85) > at > org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116) > at > org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) > at > org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) > at > org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:994) > at > org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:207) > at > org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) > at > org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) > at > org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:271) > at > org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133) > at > org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:724) > ~~~ > {code} > While the broker2 on the other side of the duplex network has threads in the > following state: > {code} > ~~~~ > Thread 1: > "ActiveMQ Transport: tcp:XXXX:61616@51460" prio=10 tid=0x00007f8a6055b000 > nid=0x42bd waiting on condition [0x00007f8a40f8a000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x00000000c8d5e000> (a > java.util.concurrent.locks.ReentrantLock$NonfairSync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) > at > java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214) > at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290) > at > org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66) > at > org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) > at > org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1457) > at > org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:971) > at > org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:927) > at > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:192) > at > org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) > at > org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) > at > org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:162) > at > org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:154) > - locked <0x00000000c8db5498> (a > java.util.concurrent.atomic.AtomicBoolean) > at > org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:131) > at > org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) > at > org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81) > at > org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86) > at > org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:914) > at > org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1187) > at > org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:772) > - locked <0x00000000c0715280> (a java.net.URI) > at > org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:623) > at > org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:225) > at > org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) > at > org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) > at > org.apache.activemq.transport.failover.FailoverTransport.processCommand(FailoverTransport.java:207) > at > org.apache.activemq.transport.failover.FailoverTransport.access$1100(FailoverTransport.java:72) > at > org.apache.activemq.transport.failover.FailoverTransport$3.onCommand(FailoverTransport.java:216) > at > org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125) > at > org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300) > at > org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) > at > org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214) > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) > at java.lang.Thread.run(Thread.java:724) > ~~~~ > Thread 2: > ~~~~ > "ActiveMQ BrokerService[broker_2] Task-32137" daemon prio=10 > tid=0x00007f8a6067d800 nid=0x244f runnable [0x00007f8a4bbb9000] > java.lang.Thread.State: RUNNABLE > at java.net.SocketOutputStream.socketWrite0(Native Method) > at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) > at java.net.SocketOutputStream.write(SocketOutputStream.java:153) > at > org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115) > at java.io.DataOutputStream.flush(DataOutputStream.java:123) > at > org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:176) > at > org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:334) > at > org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:316) > at > org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85) > at > org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116) > at > org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:667) > - locked <0x00000000c8d777e8> (a java.lang.Object) > at > org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) > at > org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) > at > org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:994) > at > org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:207) > at > org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) > at > org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) > at > org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:162) > at > org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:154) > - locked <0x00000000c8db54b8> (a > java.util.concurrent.atomic.AtomicBoolean) > at > org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:131) > at > org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) > at > org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) > at > org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1457) > at > org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:971) > at > org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:1017) > at > org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133) > at > org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:724) > ~~~~ > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)