Gary Tully created AMQ-6640: ------------------------------- Summary: network of brokers, duplex connector, network consumers not registered on destination - hung bridge Key: AMQ-6640 URL: https://issues.apache.org/jira/browse/AMQ-6640 Project: ActiveMQ Issue Type: Bug Components: networkbridge Affects Versions: 5.14.0 Reporter: Gary Tully Assignee: Gary Tully Fix For: 5.15.0
Occasional hang in a network bridge under heavy load with large pending backlog across multiple destinations. Hung network bridge threads blocked through crosstalk between consumer demand and message forwarding traffic leading to blocked writes. Stack traces of the form, Broker1 {code} ~~~ "ActiveMQ Transport: tcp:///XXXX:51460@61616" daemon prio=10 tid=0x00007efbbc011000 nid=0x42be waiting on condition [0x00007efbb1a58000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000c61a2118> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374) at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:48) at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:87) at org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:914) at org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1187) at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:772) - locked <0x00000000c25ed058> (a java.net.URI) at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:623) at org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:225) at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125) at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300) at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214) at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) at java.lang.Thread.run(Thread.java:724) ~~~ ~~~ Dump 1 : "ActiveMQ VMTransport: vm://broker1#8-2" daemon prio=10 tid=0x00007efbbc11b800 nid=0x42c4 runnable [0x00007efbb1554000] java.lang.Thread.State: RUNNABLE at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) at java.net.SocketOutputStream.write(SocketOutputStream.java:153) at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115) at java.io.DataOutputStream.flush(DataOutputStream.java:123) at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:176) at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:334) at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:316) at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85) at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116) at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:994) at org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:207) at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:271) at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133) at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) ~~~ {code} While the broker2 on the other side of the duplex network has threads in the following state: {code} ~~~~ Thread 1: "ActiveMQ Transport: tcp:XXXX:61616@51460" prio=10 tid=0x00007f8a6055b000 nid=0x42bd waiting on condition [0x00007f8a40f8a000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000c8d5e000> (a java.util.concurrent.locks.ReentrantLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:214) at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:290) at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66) at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1457) at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:971) at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:927) at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:192) at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) at org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:162) at org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:154) - locked <0x00000000c8db5498> (a java.util.concurrent.atomic.AtomicBoolean) at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:131) at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81) at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86) at org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:914) at org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1187) at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:772) - locked <0x00000000c0715280> (a java.net.URI) at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:623) at org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:225) at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) at org.apache.activemq.transport.failover.FailoverTransport.processCommand(FailoverTransport.java:207) at org.apache.activemq.transport.failover.FailoverTransport.access$1100(FailoverTransport.java:72) at org.apache.activemq.transport.failover.FailoverTransport$3.onCommand(FailoverTransport.java:216) at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:125) at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300) at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214) at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196) at java.lang.Thread.run(Thread.java:724) ~~~~ Thread 2: ~~~~ "ActiveMQ BrokerService[broker_2] Task-32137" daemon prio=10 tid=0x00007f8a6067d800 nid=0x244f runnable [0x00007f8a4bbb9000] java.lang.Thread.State: RUNNABLE at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) at java.net.SocketOutputStream.write(SocketOutputStream.java:153) at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115) at java.io.DataOutputStream.flush(DataOutputStream.java:123) at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:176) at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:334) at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:316) at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:85) at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:116) at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:667) - locked <0x00000000c8d777e8> (a java.lang.Object) at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) at org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:994) at org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:207) at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) at org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:162) at org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:154) - locked <0x00000000c8db54b8> (a java.util.concurrent.atomic.AtomicBoolean) at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:131) at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68) at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1457) at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:971) at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:1017) at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133) at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:724) ~~~~ {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)