[
https://issues.apache.org/jira/browse/AMQ-3185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Timothy Bish reassigned AMQ-3185:
---------------------------------
Assignee: Timothy Bish
> Closing a VMTransport can cause all other VMTransports to be prematurely
> closed
> -------------------------------------------------------------------------------
>
> Key: AMQ-3185
> URL: https://issues.apache.org/jira/browse/AMQ-3185
> Project: ActiveMQ
> Issue Type: Bug
> Components: Transport
> Affects Versions: 5.4.2
> Reporter: Stirling Chow
> Assignee: Timothy Bish
> Priority: Critical
> Attachments: VMTransportClosureTest.java, patch.diff
>
>
> Symptom
> =======
> We have eight servers running AMQ 5.3.1 connected in a network-of-brokers
> over HTTP. Each broker maintains local connections to internal consumers
> using the VM transport. We noticed that about once a day, all the
> local VM connections on a broker and the outbound network bridge connections
> would fail with the following error:
> 2010-12-10 04:29:11,663 [processBroker-process-pool-thread-4] ERROR - The
> worker encountered an exception and will pause for 5 seconds before
> continuing.
> javax.jms.JMSException: Peer (vm://broker-mbus-200005#1052452) disposed.
> at
> org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
> at
> org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:453)
> at
> org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:570)
> at
> com.invoqsystems.foundation.component.communication.jms.source.JMSMessageSource.getMessage(JMSMessageSource.java:33)
> at
> com.invoqsystems.foundation.component.communication.jms.source.JMSMessageSource.getMessage(JMSMessageSource.java:95)
> at
> com.invoqsystems.foundation.component.communication.jms.worker.MessageProcessingWorker.getTask(MessageProcessingWorker.java:9)
> at
> com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.iterate(AbstractWorker.java:14)
> at
> com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.runUntilStop(AbstractWorker.java:17)
> at
> com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.run(AbstractWorker.java:41)
> at java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.activemq.transport.TransportDisposedIOException: Peer
> (vm://broker-mbus-200005#1052452) disposed.
> at
> org.apache.activemq.transport.vm.VMTransport.stop(VMTransport.java:70)
> at
> org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
> at
> org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
> at
> org.apache.activemq.transport.ResponseCorrelator.stop(ResponseCorrelator.java:132)
> at
> org.apache.activemq.broker.TransportConnection.doStop(TransportConnection.java:956)
> at
> org.apache.activemq.broker.TransportConnection$3.run(TransportConnection.java:918)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown
> Source)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> ... 1 more
> This was quite unexpected since we create the VM transport connection at
> broker startup and maintain it (the single connection) throughout the life of
> the application. The connection is only closed when the process terminates
> and the broker is stopped.
> Since we wrote our consumers against the JMS spec, we handle periodic
> connection failures by creating a new connection. This works fine sometimes;
> however, because of AMQ-3127, the sudden re-registration of our consumers
> coinciding with bridge re-creation frequently causes a deadlock that can
> only be resolved by restarting the systems.
> Cause
> =====
> We were unable to reliably recreate the failure, so it became clear that a
> timing issue was involved. Eventually, we determined that the VM transport
> failure was caused by the following code in VMTransportServer:
> {code:title=VMTransportServer.java}
> public VMTransport connect() throws IOException {
>     TransportAcceptListener al;
>     synchronized (this) {
>         if (disposed) {
>             throw new IOException("Server has been disposed.");
>         }
>         al = acceptListener;
>     }
>     if (al == null) {
>         throw new IOException("Server TransportAcceptListener is null.");
>     }
>     connectionCount.incrementAndGet();
>     VMTransport client = new VMTransport(location) {
>         public void stop() throws Exception {
>             if (disposed) {
>                 return;
>             }
>             super.stop();
>             if (connectionCount.decrementAndGet() == 0 &&
>                     disposeOnDisconnect) {
>                 VMTransportServer.this.stop();
>             }
>         };
>     };
>     VMTransport server = new VMTransport(location);
>     client.setPeer(server);
>     server.setPeer(client);
>     al.onAccept(configure(server));
>     return client;
> }
> {code}
> At issue is the overridden VMTransport.stop() method:
> {code:title=VMTransportServer.java}
> public void stop() throws Exception {
>     if (disposed) {
>         return;
>     }
>     super.stop();
>     if (connectionCount.decrementAndGet() == 0 &&
>             disposeOnDisconnect) {
>         VMTransportServer.this.stop();
>     }
> };
> {code}
> Note that VMTransport.disposed is used to protect against multiple calls and
> subsequently multiple decrements of connectionCount. However, in the
> implementation of super.stop(), the disposed flag is only set after the peer
> transport is informed of the stop:
> {code:title=VMTransport.java}
> public void stop() throws Exception {
>     stopping.set(true);
>
>     // If stop() is called while being start()ed.. then we can't stop
>     // until we return to the start() method.
>     if( enqueueValve.isOn() ) {
>
>         // let the peer know that we are disconnecting..
>         try {
> >>>         peer.transportListener.onCommand(new ShutdownInfo());
>         } catch (Exception ignore) {
>         }
>
>         TaskRunner tr = null;
>         try {
>             enqueueValve.turnOff();
>             if (!disposed) {
>                 started = false;
> >>>             disposed = true;
>                 if (taskRunner != null) {
>                     tr = taskRunner;
>                     taskRunner = null;
>                 }
>             }
>         } finally {
>             stopping.set(false);
>             enqueueValve.turnOn();
>         }
>         if (tr != null) {
>             tr.shutdown(1000);
>         }
>
>     }
>
> }
> {code}
> TransportConnection handles peer.transportListener.onCommand(new
> ShutdownInfo()) by launching an asynchronous task that eventually calls back
> to the same transport that initiated the closure. If the timing is right,
> VMTransportServer's VMTransport.stop() method is called a second time before
> the disposed flag is set to true. As a result, the connectionCount is
> decremented *TWICE* instead of just once.
> In other words, the disposed check and decrement as implemented by
> VMTransportServer's anonymous VMTransport subclass are not thread-safe. If
> VMTransportServer miscounts the connections, it can end up stopping itself
> while there are still live connections. The result is that the live
> connections see their peer (the server part of the VMTransport) unexpectedly
> closed.
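
The miscount described above can be illustrated with a minimal, single-threaded sketch. It does not use ActiveMQ classes: BaseTransport, CountingTransport and peerShutdownCallback are hypothetical stand-ins for VMTransport, the anonymous subclass and the peer's ShutdownInfo handling. The asynchronous callback is modelled as a synchronous re-entrant call, which corresponds to the unlucky timing where the second stop() arrives before disposed is set.

```java
import java.util.concurrent.atomic.AtomicInteger;

class DoubleDecrementSketch {
    // Stand-in for VMTransportServer.connectionCount.
    static final AtomicInteger connectionCount = new AtomicInteger();

    /** Hypothetical stand-in for VMTransport: disposed is set only AFTER the peer is notified. */
    static class BaseTransport {
        protected volatile boolean disposed;
        Runnable peerShutdownCallback = () -> {};

        public void stop() {
            peerShutdownCallback.run(); // peer is told first ...
            disposed = true;            // ... the flag is set afterwards
        }
    }

    /** Hypothetical stand-in for the anonymous subclass created in connect(). */
    static class CountingTransport extends BaseTransport {
        @Override
        public void stop() {
            if (disposed) {
                return;                 // guard is ineffective while super.stop() is still running
            }
            super.stop();
            connectionCount.decrementAndGet();
        }
    }

    /** Closes ONE of two connections and returns the resulting count. */
    static int run() {
        connectionCount.set(2);
        CountingTransport t = new CountingTransport();
        final boolean[] reentered = {false};
        // Models the peer calling back into the same transport exactly once.
        t.peerShutdownCallback = () -> {
            if (!reentered[0]) {
                reentered[0] = true;
                t.stop();
            }
        };
        t.stop();
        return connectionCount.get();
    }

    public static void main(String[] args) {
        System.out.println("connectionCount after closing one transport: " + run());
    }
}
```

Closing one of two connections leaves the count at 0 rather than 1, which is the condition under which the server would stop itself while a connection is still live.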
> Solution
> ========
> The attached patch prevents multiple decrements of the connectionCount by
> preventing reentrant calls to VMTransportServer's VMTransport stop() method.
> A test (VMTransportClosureTest.java) is also attached, which demonstrates
> the problem with the existing AMQ trunk code.
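
One possible way to make the stop() override idempotent is sketched below. This is not the attached patch.diff, whose contents are not shown here; it only assumes that an AtomicBoolean guard is an acceptable way to reject re-entrant or concurrent calls. BaseTransport, GuardedTransport and stopCalled are hypothetical names, not ActiveMQ API.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class StopOnceSketch {
    // Stand-in for VMTransportServer.connectionCount.
    static final AtomicInteger connectionCount = new AtomicInteger();

    /** Hypothetical stand-in for VMTransport: disposed is set only after the peer is notified. */
    static class BaseTransport {
        protected volatile boolean disposed;
        Runnable peerShutdownCallback = () -> {};

        public void stop() {
            peerShutdownCallback.run();
            disposed = true;
        }
    }

    /** The guard is claimed atomically BEFORE super.stop(), so only the first caller proceeds. */
    static class GuardedTransport extends BaseTransport {
        private final AtomicBoolean stopCalled = new AtomicBoolean(false);

        @Override
        public void stop() {
            if (!stopCalled.compareAndSet(false, true)) {
                return; // a re-entrant or concurrent call: nothing left to do
            }
            super.stop();
            connectionCount.decrementAndGet();
        }
    }

    /** Closes ONE of two connections, with the peer calling back into stop(). */
    static int run() {
        connectionCount.set(2);
        GuardedTransport t = new GuardedTransport();
        t.peerShutdownCallback = t::stop; // re-entrant call, as in the failure scenario
        t.stop();
        return connectionCount.get();
    }

    public static void main(String[] args) {
        System.out.println("connectionCount after closing one transport: " + run());
    }
}
```

Because compareAndSet performs the check and the update as a single atomic step, the count drops from 2 to 1 even when stop() is re-entered, and the same guard holds if the second call comes from another thread.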