All VM transports on a broker can be prematurely closed.
--------------------------------------------------------

                 Key: AMQ-3185
                 URL: https://issues.apache.org/jira/browse/AMQ-3185
             Project: ActiveMQ
          Issue Type: Bug
          Components: Transport
    Affects Versions: 5.4.2
            Reporter: Stirling Chow
            Priority: Critical
         Attachments: patch.diff

Symptom
=======
We have eight servers running AMQ 5.3.1 connected in a network-of-brokers over 
HTTP.  Each broker maintains local connections to internal consumers using the 
VM transport.  We were noticing that about once every day, all the local VM 
connections on a broker would fail with the following error:

2010-12-10 04:29:11,663 [processBroker-process-pool-thread-4] ERROR - The worker encountered an exception and will pause for 5 seconds before continuing.
javax.jms.JMSException: Peer (vm://broker-mbus-200005#1052452) disposed.
        at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
        at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:453)
        at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:570)
        at com.invoqsystems.foundation.component.communication.jms.source.JMSMessageSource.getMessage(JMSMessageSource.java:33)
        at com.invoqsystems.foundation.component.communication.jms.source.JMSMessageSource.getMessage(JMSMessageSource.java:95)
        at com.invoqsystems.foundation.component.communication.jms.worker.MessageProcessingWorker.getTask(MessageProcessingWorker.java:9)
        at com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.iterate(AbstractWorker.java:14)
        at com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.runUntilStop(AbstractWorker.java:17)
        at com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.run(AbstractWorker.java:41)
        at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.activemq.transport.TransportDisposedIOException: Peer (vm://broker-mbus-200005#1052452) disposed.
        at org.apache.activemq.transport.vm.VMTransport.stop(VMTransport.java:70)
        at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
        at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
        at org.apache.activemq.transport.ResponseCorrelator.stop(ResponseCorrelator.java:132)
        at org.apache.activemq.broker.TransportConnection.doStop(TransportConnection.java:956)
        at org.apache.activemq.broker.TransportConnection$3.run(TransportConnection.java:918)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        ... 1 more

This was quite unexpected since we create the VM transport connection at broker 
startup and maintain it (the single connection) throughout the life of the 
application.  The connection is only closed when the process terminates and the 
broker is stopped.

Since we wrote our consumers against the JMS spec, we handle periodic 
connection failures by creating a new connection.  This works fine 
sometimes; however, because of AMQ-3127, the sudden reregistration of our 
consumers at the same time as bridge creation frequently causes a deadlock 
that can only be resolved by restarting the systems.

Cause
=====
We were unable to reliably recreate the failure, so it became clear that a 
timing issue was involved.  Eventually, we determined that the VM transport 
failure was caused by the following code in VMTransportServer:

    public VMTransport connect() throws IOException {
        TransportAcceptListener al;
        synchronized (this) {
            if (disposed) {
                throw new IOException("Server has been disposed.");
            }
            al = acceptListener;
        }
        if (al == null) {
            throw new IOException("Server TransportAcceptListener is null.");
        }

        connectionCount.incrementAndGet();
        VMTransport client = new VMTransport(location) {
            public void stop() throws Exception {
                if (disposed) {
                    return;
                }
                super.stop();
                if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
                    VMTransportServer.this.stop();
                }
            };
        };

        VMTransport server = new VMTransport(location);
        client.setPeer(server);
        server.setPeer(client);
        al.onAccept(configure(server));
        return client;
    }

At issue is the overridden VMTransport.stop() method:

            public void stop() throws Exception {
                if (disposed) {
                    return;
                }
                super.stop();
                if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
                    VMTransportServer.this.stop();
                }
            };

Note that VMTransport.disposed is used to protect against multiple calls and, 
consequently, multiple decrements of connectionCount.  However, in the 
implementation of super.stop(), the disposed flag is only set after the peer 
transport is informed of the stop:

    public void stop() throws Exception {
        stopping.set(true);
        
        // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
        if( enqueueValve.isOn() ) {
                
            // let the peer know that we are disconnecting..
            try {
>>>                peer.transportListener.onCommand(new ShutdownInfo());
            } catch (Exception ignore) {
            }
                
                
            TaskRunner tr = null;
            try {
                enqueueValve.turnOff();
                if (!disposed) {
                    started = false;
>>>                    disposed = true;
                    if (taskRunner != null) {
                        tr = taskRunner;
                        taskRunner = null;
                    }
                }
            } finally {
                stopping.set(false);
                enqueueValve.turnOn();
            }
            if (tr != null) {
                tr.shutdown(1000);
            }
            

        }
        
    }

TransportConnection implements peer.transportListener.onCommand(new 
ShutdownInfo()) by launching an asynchronous task that eventually calls back 
to the same transport that initiated the closure.  If the timing is right, 
the stop() method of VMTransportServer's anonymous VMTransport subclass is 
called a second time before the disposed flag is set to true.  As a result, 
the connectionCount is decremented *TWICE* instead of just once.

In other words, the disposed check and decrement as implemented by 
VMTransportServer's anonymous VMTransport subclass are not thread-safe.  If 
VMTransportServer miscounts the connections, it can end up stopping itself 
while there are still live connections.  The result is that the live 
connections see their peer (the server part of the VMTransport) unexpectedly 
closed.
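
The double decrement described above can be illustrated with a small, 
self-contained sketch.  It is not ActiveMQ code: SketchTransport, 
peerShutdownCallback and runScenario are hypothetical names, and the callback 
is invoked synchronously so that the interleaving is deterministic, whereas 
in the broker it arrives from an asynchronous task.  The only property carried 
over from the real code is that the disposed flag is set after the peer has 
been notified.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DoubleDecrementSketch {

    /** Hypothetical stand-in for VMTransport: disposed is set only after the peer is told. */
    static class SketchTransport {
        volatile boolean disposed;
        Runnable peerShutdownCallback = () -> {};

        void stop() {
            // The peer hears about the shutdown first ...
            peerShutdownCallback.run();
            // ... and only afterwards is this transport marked as disposed.
            disposed = true;
        }
    }

    /** Returns the connection count left after stopping one of two connections. */
    static int runScenario() {
        AtomicInteger connectionCount = new AtomicInteger();
        connectionCount.incrementAndGet(); // connection A, about to be stopped
        connectionCount.incrementAndGet(); // connection B, still live

        // Same check-then-act shape as the anonymous subclass in connect().
        SketchTransport client = new SketchTransport() {
            @Override
            void stop() {
                if (disposed) {
                    return;
                }
                super.stop();
                connectionCount.decrementAndGet();
            }
        };

        // Assumption: the peer calls stop() on the same transport exactly once,
        // and that call lands before the disposed flag has been set.
        boolean[] calledBack = {false};
        client.peerShutdownCallback = () -> {
            if (!calledBack[0]) {
                calledBack[0] = true;
                client.stop();
            }
        };

        client.stop();
        return connectionCount.get();
    }

    public static void main(String[] args) {
        System.out.println("connections counted after one stop: " + runScenario());
    }
}
```

With this interleaving the count falls from 2 to 0 although connection B was 
never stopped, which is the condition under which the server would stop 
itself when disposeOnDisconnect is set.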

Solution
========
The attached patch prevents multiple decrements of the connectionCount by 
preventing reentrant calls to VMTransportServer's VMTransport stop() method.

A patch is included which demonstrates the problem with the existing AMQ trunk 
code.
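
The contents of patch.diff are not reproduced in this message, so the 
following is only a sketch of one way to make the decrement happen at most 
once, and it should not be read as the attached patch.  It assumes a 
dedicated AtomicBoolean guard (the name stopped is hypothetical) in place of 
the disposed check, and reuses the same hypothetical SketchTransport 
stand-in with a synchronous callback.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class StopOnceSketch {

    /** Hypothetical stand-in for VMTransport: disposed is set only after the peer is told. */
    static class SketchTransport {
        volatile boolean disposed;
        Runnable peerShutdownCallback = () -> {};

        void stop() {
            peerShutdownCallback.run();
            disposed = true;
        }
    }

    /** Returns the connection count left after stopping one of two connections. */
    static int runScenario() {
        AtomicInteger connectionCount = new AtomicInteger();
        connectionCount.incrementAndGet(); // connection A, about to be stopped
        connectionCount.incrementAndGet(); // connection B, still live

        // Guard that is flipped before the peer is notified, so a second
        // call (reentrant or from another thread) returns immediately.
        AtomicBoolean stopped = new AtomicBoolean(false);

        SketchTransport client = new SketchTransport() {
            @Override
            void stop() {
                if (!stopped.compareAndSet(false, true)) {
                    return;
                }
                super.stop();
                connectionCount.decrementAndGet();
            }
        };

        // Same premature callback as in the failure scenario.
        boolean[] calledBack = {false};
        client.peerShutdownCallback = () -> {
            if (!calledBack[0]) {
                calledBack[0] = true;
                client.stop();
            }
        };

        client.stop();
        return connectionCount.get();
    }

    public static void main(String[] args) {
        System.out.println("connections counted after one stop: " + runScenario());
    }
}
```

Because compareAndSet performs the check and the update as one atomic step, 
the second call to stop() is rejected regardless of when disposed is set, and 
the count drops from 2 to 1 as intended.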

-- 
This message is automatically generated by JIRA.
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
