[ https://issues.apache.org/jira/browse/AMQ-4138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13485231#comment-13485231 ]
Stirling Chow commented on AMQ-4138:
------------------------------------
This is the output I get when I run the test case with the delayed
TopicSubscription:
2012-10-26 14:53:24,868 [main ] - INFO AutoFailTestSupport
- Starting auto fail thread...
2012-10-26 14:53:24,869 [main ] - INFO AutoFailTestSupport
- Starting auto fail thread...
2012-10-26 14:53:25,264 [main ] - INFO BrokerService
- Using Persistence Adapter: MemoryPersistenceAdapter
2012-10-26 14:53:25,285 [JMX connector ] - INFO ManagementContext
- JMX consoles can connect to
service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
2012-10-26 14:53:25,332 [main ] - INFO BrokerService
- Apache ActiveMQ ${project.version} (broker2,
ID:schow-PC-55149-1351288404982-2:1) is starting
2012-10-26 14:53:25,341 [main ] - INFO TransportConnector
- Connector vm://broker2 Started
2012-10-26 14:53:25,343 [main ] - INFO BrokerService
- Apache ActiveMQ ${project.version} (broker2,
ID:schow-PC-55149-1351288404982-2:1) started
2012-10-26 14:53:25,343 [main ] - INFO BrokerService
- For help or more information please see: http://activemq.apache.org
2012-10-26 14:53:25,361 [main ] - INFO BrokerService
- Using Persistence Adapter: MemoryPersistenceAdapter
2012-10-26 14:53:25,362 [main ] - INFO BrokerService
- Apache ActiveMQ ${project.version} (broker1,
ID:schow-PC-55149-1351288404982-2:2) is starting
2012-10-26 14:53:25,363 [main ] - INFO TransportConnector
- Connector vm://broker1 Started
2012-10-26 14:53:25,363 [main ] - INFO BrokerService
- Apache ActiveMQ ${project.version} (broker1,
ID:schow-PC-55149-1351288404982-2:2) started
2012-10-26 14:53:25,363 [main ] - INFO BrokerService
- For help or more information please see: http://activemq.apache.org
2012-10-26 14:53:25,365 [JMX connector ] - WARN ManagementContext
- Failed to start jmx connector: Cannot bind to URL
[rmi://localhost:1099/jmxrmi]: javax.naming.NameAlreadyBoundException: jmxrmi
[Root exception is java.rmi.AlreadyBoundException: jmxrmi]. Will restart
management to re-create jmx connector, trying to remedy this issue.
2012-10-26 14:53:30,381 [main ] - INFO NetworkConnector
- Network Connector
DiscoveryNetworkConnector:to-broker2:BrokerService[broker1] Started
2012-10-26 14:53:30,382 [main ] - INFO NetworkConnector
- Network Connector
DiscoveryNetworkConnector:to-broker1:BrokerService[broker2] Started
2012-10-26 14:53:30,382 [ActiveMQ Task-1] - INFO DiscoveryNetworkConnector
- Establishing network connection from vm://broker1?network=true to
vm://broker2
2012-10-26 14:53:30,382 [ActiveMQ Task-1] - INFO DiscoveryNetworkConnector
- Establishing network connection from vm://broker2?network=true to
vm://broker1
2012-10-26 14:53:30,463 [=vm://broker2#4] - INFO DemandForwardingBridgeSupport
- Network connection between vm://broker2#4 and vm://broker1#1(broker1) has
been established.
2012-10-26 14:53:30,464 [=vm://broker1#6] - INFO DemandForwardingBridgeSupport
- Network connection between vm://broker1#6 and vm://broker2#0(broker2) has
been established.
2012-10-26 14:53:31,384 [main ] - INFO JmsMultipleBrokersTestSupport
- found bridge[org.apache.activemq.network.DemandForwardingBridge@4155b] to
broker1 on broker :broker2
2012-10-26 14:53:31,384 [main ] - INFO JmsMultipleBrokersTestSupport
- found bridge[org.apache.activemq.network.DemandForwardingBridge@1615ae] to
broker2 on broker :broker1
2012-10-26 14:53:32,017 [m://broker2#2-2] - INFO TopicSubscription
- Acknowledge subscription to ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:32,024 [m://broker1#3-2] - INFO TopicSubscription
- Acknowledge subscription to ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,018 [m://broker2#2-2] - INFO TopicSubscription
- Acknowledge subscription to ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,020 [m://broker1#7-1] - INFO TopicSubscription
- TopicSubscription: consumer=ID:schow-PC-55149-1351288404982-5:1:1:1,
destinations=1, dispatched=1, delivered=0, matched=0, discarded=0: Pending
message cursor
[org.apache.activemq.broker.region.cursors.VMPendingMessageCursor@1071445] is
full, temp usage (0%) or memory usage (111588%) limit reached, blocking message
add() pending the release of resources.
2012-10-26 14:53:33,020 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,040 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,060 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,080 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,100 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,120 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,140 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,160 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,180 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,200 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,220 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,240 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,260 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,280 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,300 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,320 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,340 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,360 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,380 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,400 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,420 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,440 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,460 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,480 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,500 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,520 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,540 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,560 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,580 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,600 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
2012-10-26 14:53:33,620 [m://broker1#7-1] - WARN TopicSubscription
- Waiting for space to add to subscription
ActiveMQ.Advisory.Consumer.Queue.test.queue
The "Waiting for space..." messages, logged roughly every 20 ms, come from the thread that is deadlocking the bridge.
> Network bridges can deadlock when memory limit exceeded
> -------------------------------------------------------
>
> Key: AMQ-4138
> URL: https://issues.apache.org/jira/browse/AMQ-4138
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.7.0
> Reporter: Stirling Chow
> Priority: Critical
> Attachments: BridgeMemoryLimitDeadlockTest.txt, TopicSubscription.java
>
>
> Symptom
> =======
> We have a network of 4 brokers that share messages using distributed queues
> via demand forwarding bridges. We were validating the behaviour of the
> system when memory usage approached and exceeded the out-of-box memory limit
> (64MB).
> We discovered that the bridges would frequently appear to stop
> functioning: no messages were being produced or consumed. We've
> experienced similar behaviour when producer flow control is activated, but in
> our tests, we'd turned producer flow control off (both to avoid bridges
> stalling due to producer flow control and so that we could produce enough
> messages to exceed the memory limit).
> The system would never recover from this deadlock.
> Cause
> =====
> We found a number of threads looping indefinitely with the following stack:
> {code}
> Daemon Thread [ActiveMQ VMTransport: vm://broker1#7-1] (Suspended)
>     owns: Topic (id=109)
>     waiting for: Object (id=110)
>     Object.wait(long) line: not available [native method]
>     TopicSubscription.add(MessageReference) line: 135
>     SimpleDispatchPolicy.dispatch(MessageReference, MessageEvaluationContext, List<Subscription>) line: 48
>     Topic.dispatch(ConnectionContext, Message) line: 680
>     Topic.doMessageSend(ProducerBrokerExchange, Message) line: 491
>     Topic.send(ProducerBrokerExchange, Message) line: 427
>     ManagedTopicRegion(AbstractRegion).send(ProducerBrokerExchange, Message) line: 407
>     ManagedRegionBroker(RegionBroker).send(ProducerBrokerExchange, Message) line: 503
>     ManagedRegionBroker.send(ProducerBrokerExchange, Message) line: 311
>     AdvisoryBroker.fireAdvisory(ConnectionContext, ActiveMQTopic, Command, ConsumerId, ActiveMQMessage) line: 551
>     AdvisoryBroker.fireConsumerAdvisory(ConnectionContext, ActiveMQDestination, ActiveMQTopic, Command, ConsumerId) line: 500
>     AdvisoryBroker.fireConsumerAdvisory(ConnectionContext, ActiveMQDestination, ActiveMQTopic, Command) line: 486
>     AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 98
>     CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
>     TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89
>     BrokerService$5(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95
>     ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 562
>     ConsumerInfo.visit(CommandVisitor) line: 332
>     ManagedTransportConnection(TransportConnection).service(Command) line: 294
>     TransportConnection$1.onCommand(Object) line: 152
>     ResponseCorrelator.onCommand(Object) line: 116
>     MutexTransport.onCommand(Object) line: 50
>     VMTransport.iterate() line: 241
>     PooledTaskRunner.runTask() line: 129
>     PooledTaskRunner$1.run() line: 47
>     ThreadPoolExecutor$Worker.runTask(Runnable) line: 886
>     ThreadPoolExecutor$Worker.run() line: 908
>     Thread.run() line: 662
> {code}
> The spinning threads were associated with the VMTransport TaskRunner from
> {{DemandForwardingBridgeSupport.localBroker}}. Since the TaskRunner was
> essentially blocked processing one message, all other messages being
> forwarded from the remote end of the bridge (e.g., ACKs) were getting queued,
> but not processed, which made the bridge appear to be stalled.
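The stall can be reproduced in miniature with a single-threaded executor standing in for the VMTransport TaskRunner. This is a toy model and not ActiveMQ code. `SerialRunnerStall` and `ackProcessedWithin` are hypothetical names, and a plain `Thread.sleep` stands in for the memory wait.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model, not ActiveMQ code: a single thread drains the command queue in
// order, like the VMTransport TaskRunner, so one blocked command holds up
// every command queued behind it.
final class SerialRunnerStall {

    // Queues a command that blocks for blockMillis, then an "ACK" command, and
    // reports whether the ACK ran within waitMillis.
    static boolean ackProcessedWithin(long blockMillis, long waitMillis) {
        ExecutorService runner = Executors.newSingleThreadExecutor();
        CountDownLatch ackDone = new CountDownLatch(1);
        try {
            // Stand-in for the ConsumerInfo whose advisory send is waiting for memory.
            runner.submit(() -> {
                try {
                    Thread.sleep(blockMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Stand-in for an ACK forwarded from the remote end of the bridge.
            runner.submit(ackDone::countDown);
            return ackDone.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            runner.shutdownNow(); // interrupts the still-blocked command, if any
        }
    }
}
```

`ackProcessedWithin(0, 2000)` returns true. `ackProcessedWithin(5000, 200)` returns false because the ACK is still queued behind the blocked command.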
> The message being processed by the spinning thread was a ConsumerInfo
> representing a demand subscription from the remote broker, and was being
> forwarded to a subscription on the associated consumer advisory topic. The
> subscription was waiting for memory to become available in the
> {{matchedListMutex}} loop:
> {code:title=TopicSubscription.java}
>     public void add(MessageReference node) throws Exception {
>         if (isDuplicate(node)) {
>             return;
>         }
>         enqueueCounter.incrementAndGet();
>         if (!isFull() && matched.isEmpty() && !isSlave()) {
>             // if maximumPendingMessages is set we will only discard messages which
>             // have not been dispatched (i.e. we allow the prefetch buffer to be filled)
>             dispatch(node);
>             setSlowConsumer(false);
>         } else {
>             if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) {
>                 // Slow consumers should log and set their state as such.
>                 if (!isSlowConsumer()) {
>                     LOG.warn(toString() + ": has twice its prefetch limit pending, without an ack; it appears to be slow");
>                     setSlowConsumer(true);
>                     for (Destination dest: destinations) {
>                         dest.slowConsumer(getContext(), this);
>                     }
>                 }
>             }
>             if (maximumPendingMessages != 0) {
>                 boolean warnedAboutWait = false;
>                 while (active) {
>                     synchronized (matchedListMutex) {
>                         while (matched.isFull()) {
>                             if (getContext().getStopping().get()) {
>                                 LOG.warn(toString() + ": stopped waiting for space in pendingMessage cursor for: "
>                                         + node.getMessageId());
>                                 enqueueCounter.decrementAndGet();
>                                 return;
>                             }
>                             if (!warnedAboutWait) {
>                                 LOG.info(toString() + ": Pending message cursor [" + matched
>                                         + "] is full, temp usage ("
>                                         + matched.getSystemUsage().getTempUsage().getPercentUsage()
>                                         + "%) or memory usage ("
>                                         + matched.getSystemUsage().getMemoryUsage().getPercentUsage()
>                                         + "%) limit reached, blocking message add() pending the release of resources.");
>                                 warnedAboutWait = true;
>                             }
>                             matchedListMutex.wait(20);
>                         }
>                         // Temporary storage could be full - so just try to add the message
>                         // see https://issues.apache.org/activemq/browse/AMQ-2475
>                         if (matched.tryAddMessageLast(node, 10)) {
>                             break;
>                         }
>                     }
>                 }
>                 // ...
> {code}
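Without the logging and bookkeeping, the loop above reduces to the pattern below. This is an illustrative reduction under stated assumptions and not the ActiveMQ source. `MatchedListLoop`, `memoryFull` and `releaseMemory()` are hypothetical names, and `memoryFull` plays the role of `matched.isFull()`. The sketch shows that the waiting thread leaves the loop only when memory is released or the connection is stopping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative reduction of the matchedListMutex loop, not the ActiveMQ source.
// "memoryFull" stands in for matched.isFull(); "stopping" stands in for
// getContext().getStopping(). These are the only two ways out of the loop.
final class MatchedListLoop {
    private final Object matchedListMutex = new Object();
    private final List<String> matched = new ArrayList<>();
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private boolean memoryFull; // guarded by matchedListMutex
    private int polls;          // number of 20 ms waits, guarded by matchedListMutex

    MatchedListLoop(boolean memoryFull) {
        this.memoryFull = memoryFull;
    }

    void releaseMemory() {
        synchronized (matchedListMutex) {
            memoryFull = false; // the waiter notices on its next 20 ms poll
        }
    }

    void stop() {
        stopping.set(true);
    }

    // Returns true if the node was added, false if the wait was abandoned.
    boolean add(String node) {
        synchronized (matchedListMutex) {
            while (memoryFull) {
                if (stopping.get()) {
                    return false; // connection stopping: give up on the add
                }
                polls++;
                try {
                    matchedListMutex.wait(20); // same 20 ms poll as the excerpt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            matched.add(node);
            return true;
        }
    }

    int polls() {
        synchronized (matchedListMutex) {
            return polls;
        }
    }
}
```

In the deadlock described here, neither exit is taken. Memory is not released because the ACKs that would release it are queued behind this call, and the connection is not stopping.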
> The {{matched.isFull()}} check that was keeping the thread in the loop is
> only cleared once the memory usage is reduced:
> {code:title=AbstractPendingMessageCursor.java}
>     public boolean isFull() {
>         return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
>     }
> {code}
> Since the looping thread is essentially stalling the VMTransport on the local
> side of the bridge, no dispatch ACKs can be processed for messages sent from
> the local bridge to the remote broker. If all consumers are on the remote
> broker and ACKs are not being processed, then memory usage on the local
> broker is never reduced, thus creating a deadlock.
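The cycle can be modelled with two executors. A hypothetical `bridgeRunner` plays the VMTransport TaskRunner, and an `otherRunner` is used for comparison. Memory starts at an arbitrary limit of 64 units, and only an ACK lowers it. This is a sketch, not ActiveMQ code. It adds a give-up deadline so that it terminates, whereas the real loop has no deadline.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the cycle (not ActiveMQ code). Memory starts at its limit, an
// advisory send waits for usage to drop, and only an ACK lowers usage. The
// limit of 64 units is an arbitrary stand-in for the 64MB default.
final class AckStarvation {

    // Returns true if the advisory send finished within giveUpMillis.
    static boolean advisoryCompletes(boolean ackOnSameRunner, long giveUpMillis) {
        final long limit = 64;
        AtomicLong memoryUsed = new AtomicLong(limit);
        ExecutorService bridgeRunner = Executors.newSingleThreadExecutor();
        ExecutorService otherRunner = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> advisory = bridgeRunner.submit(() -> {
                long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(giveUpMillis);
                while (memoryUsed.get() >= limit) { // plays the role of matched.isFull()
                    if (System.nanoTime() > deadline) {
                        return false; // the real loop has no deadline and keeps spinning
                    }
                    Thread.sleep(20);
                }
                return true;
            });
            Runnable ack = () -> memoryUsed.decrementAndGet(); // a processed ACK frees memory
            (ackOnSameRunner ? bridgeRunner : otherRunner).submit(ack);
            return advisory.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        } finally {
            bridgeRunner.shutdownNow();
            otherRunner.shutdownNow();
        }
    }
}
```

`advisoryCompletes(false, 2000)` returns true because the ACK runs on another thread and frees memory. `advisoryCompletes(true, 200)` returns false because the ACK is queued behind the waiting advisory on the same runner.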
> In order for {{TopicSubscription.add(...)}} to enter the (essentially
> infinite) loop, the node must not be a duplicate and the fast-path check
> must fail, i.e., at least one of its three conditions must not hold:
> {code:title=TopicSubscription.java}
>     public void add(MessageReference node) throws Exception {
>         if (isDuplicate(node)) {
>             return;
>         }
>         enqueueCounter.incrementAndGet();
>         if (!isFull() && matched.isEmpty() && !isSlave()) {
> {code}
> {{isFull()}} is true once the number of unacknowledged messages dispatched to
> the subscription reaches the subscription's prefetch size. So for
> the deadlock to occur, two things must happen:
> # There must be multiple consumers being added to the queue, thus generating
> multiple adds to the consumer advisory topic subscription
> # There must be a delay in processing ACKs to the topic subscription so that
> it becomes full
> For reasons to do with distribution management, our network connectors have a
> prefetch size of 1, so under load, the deadlock occurs easily.
> I've attached a test case that clearly demonstrates the deadlock. The test
> case is simple:
> # Two brokers (broker1 and broker2) are bidirectionally bridged with a
> network prefetch of 1
> # broker1 (with producer flow control disabled), produces enough messages to
> a test queue so that the memory limit is exceeded
> # broker2 starts two consumers of the test queue, and the broker1->broker2
> bridge forwards two demand subscriptions to broker1
> # broker1 processes the demand subscriptions and starts dispatching messages
> to broker2
> # Since broker2 has a bridge back to broker1, broker1's processing of the
> demand subscriptions generates two consumer advisory messages that are sent
> over the consumer advisory topic to broker2 (of course, broker2 ignores them
> since they represent its own consumers)
> # As messages are dispatched to broker2's instance of the test queue, ACKs
> are forwarded by the broker1->broker2 bridge and processed by broker1,
> reducing the memory usage
> # Eventually all messages are consumed by broker2 and broker1's memory usage
> is 0.
> This test case generally passes since the deadlock requires a specific race
> condition: namely, the first consumer advisory message needs to be "in
> flight" when the second consumer advisory message is sent. Since the network
> prefetch is 1, when the second advisory message is processed, the topic
> subscription is "full", and the thread sending the advisory will wait for
> {{matched.isFull()}} to be false.
> In order to increase the chance that the first consumer advisory message is "in
> flight", simply add a small sleep to TopicSubscription's acknowledge method:
> {code:title=TopicSubscription.java}
>     public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
>         LOG.info("Acknowledge subscription to " + ack.getDestination().getPhysicalName());
>         Thread.sleep(1000);
>         // ...
> {code}
> The sleep increases the window and pretty much guarantees that the test case
> will fail (i.e., messages remain in broker1's test queue since the bridge is
> stalled).
> Even with the sleep in place, if the number of consumers on broker2 is
> reduced to 1, the test case will pass. Again, this is because at least two
> consumer advisory messages are needed to fill the subscription prefetch.
> The use of prefetch=1 for the network connector is simply so that the unit
> test can demonstrate the deadlock with 2 consumers. The deadlock can occur
> with any prefetch if the number of consumers is at least prefetch + 1.
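A small model can check the prefetch arithmetic. It makes three assumptions, all taken from the prefetch=1 example:

* The subscription counts as full once its unacknowledged dispatches reach the prefetch size.
* No ACKs return while the advisories are in flight.
* Memory is already over its limit, so any advisory that misses the fast path blocks.

`PrefetchWindow` and its methods are hypothetical names.

```java
// Hypothetical arithmetic model, not ActiveMQ code.
final class PrefetchWindow {

    // Assumed fullness test: unacknowledged dispatches have reached the prefetch size.
    static boolean isFull(int unackedDispatches, int prefetch) {
        return unackedDispatches >= prefetch;
    }

    // 1-based index of the first consumer advisory that misses the fast path
    // (and therefore waits on memory), or -1 if every advisory is dispatched.
    static int firstBlockingAdvisory(int prefetch, int consumers) {
        int unacked = 0;
        for (int advisory = 1; advisory <= consumers; advisory++) {
            if (isFull(unacked, prefetch)) {
                return advisory;
            }
            unacked++; // dispatched, but its ACK is delayed
        }
        return -1;
    }
}
```

`firstBlockingAdvisory(1, 2)` is 2 and `firstBlockingAdvisory(1, 1)` is -1, which matches the observation that the test passes with a single consumer. With prefetch 3, the fourth consumer's advisory is the first to block.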
> This is a serious issue for us as our deployment involves many networked
> brokers and a lot of dynamic consumer creation, which produces frequent
> consumer advisory messages. When a network bridge locks up, it cripples our
> system.
> Solution
> ========
> The essential problem is that sending a consumer advisory message to a topic
> can take an indefinite amount of time (i.e., waits indefinitely until memory
> is available), and during this time, no other messages sent to the
> VMTransport are processed.
> The principal tenet of the TaskRunners used in AMQ is that they implement
> "cooperative multi-tasking": task execution has to be reasonably quick so
> that other tasks can run. A task that runs indefinitely defeats the model.
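A toy round-robin scheduler can illustrate the cooperative principle. A task that cannot make progress yields by returning "more work pending" and is re-queued, instead of blocking the thread. This is a general illustration only. It is not ActiveMQ's `TaskRunner` and not a proposed fix for this issue. `CooperativeRunner` and `NamedTask` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

// Toy cooperative scheduler, not ActiveMQ's TaskRunner. Each task performs one
// short step and reports whether it still has work; unfinished tasks go to the
// back of the queue so the other tasks get a turn.
final class CooperativeRunner {

    static final class NamedTask {
        final String name;
        final BooleanSupplier step; // returns true while the task has more work

        NamedTask(String name, BooleanSupplier step) {
            this.name = name;
            this.step = step;
        }
    }

    // Runs at most maxSteps steps and returns task names in completion order.
    static List<String> run(Deque<NamedTask> queue, int maxSteps) {
        List<String> completed = new ArrayList<>();
        for (int steps = 0; steps < maxSteps && !queue.isEmpty(); steps++) {
            NamedTask task = queue.pollFirst();
            if (task.step.getAsBoolean()) {
                queue.addLast(task); // yield: let queued tasks (e.g. ACKs) run first
            } else {
                completed.add(task.name);
            }
        }
        return completed;
    }

    // An advisory that needs memory is queued ahead of the ACK that frees it.
    static List<String> advisoryThenAck() {
        final int limit = 64;       // arbitrary stand-in for the memory limit
        int[] memoryUsed = {limit}; // memory starts full
        Deque<NamedTask> queue = new ArrayDeque<>();
        // Instead of waiting, the advisory reports "not done" while memory is full.
        queue.addLast(new NamedTask("advisory", () -> memoryUsed[0] >= limit));
        // The ACK frees one unit of memory and finishes in a single step.
        queue.addLast(new NamedTask("ack", () -> {
            memoryUsed[0]--;
            return false;
        }));
        return run(queue, 100);
    }
}
```

The advisory is queued ahead of the ACK, yet `advisoryThenAck()` returns `[ack, advisory]`. The advisory yields while memory is full, the ACK runs and frees a unit, and the advisory then completes.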
> While it would be possible to use
> {{TopicSubscription.maximumPendingMessages}} to prevent the
> {{matchedListMutex}} loop from being entered, this would result in the
> consumer advisory message being discarded, and thus in the loss of any demand
> subscriptions it would have created, so it's not an option.
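The trade-off can be sketched by adding a `maximumPendingMessages` setting to a small prefetch model. The model follows the excerpt's `if (maximumPendingMessages != 0)` test:

* A value of 0 skips the wait and drops the advisory.
* Any other value (for example -1) takes the waiting path.

It also assumes that memory is over its limit and that no ACKs return. `PendingPolicy` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not ActiveMQ code. Assumes memory is over its limit and no
// ACKs return, so every advisory that misses the prefetch window either waits
// or, when maximumPendingMessages == 0, is dropped.
final class PendingPolicy {

    enum Outcome { DISPATCHED, DISCARDED, WAITS_FOR_MEMORY }

    static List<Outcome> sendAdvisories(int prefetch, int consumers, int maximumPendingMessages) {
        List<Outcome> outcomes = new ArrayList<>();
        int unacked = 0;
        for (int i = 0; i < consumers; i++) {
            if (unacked < prefetch) {
                unacked++; // fits in the prefetch window
                outcomes.add(Outcome.DISPATCHED);
            } else if (maximumPendingMessages == 0) {
                // No wait, but the advisory (and the demand it carries) is lost.
                outcomes.add(Outcome.DISCARDED);
            } else {
                // The matchedListMutex loop: the runner never gets past this advisory.
                outcomes.add(Outcome.WAITS_FOR_MEMORY);
                break;
            }
        }
        return outcomes;
    }
}
```

With prefetch 1 and three consumers, a setting of 0 yields `[DISPATCHED, DISCARDED, DISCARDED]`, so the remote broker learns of only one of the three consumers. Any other setting yields `[DISPATCHED, WAITS_FOR_MEMORY]`, which is the stalled state described above.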
> Unfortunately, without understanding the system further, I can't offer a
> specific solution.