[ https://issues.apache.org/jira/browse/AMQ-3185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish reassigned AMQ-3185:
---------------------------------

    Assignee: Timothy Bish

> Closing a VMTransport can cause all other VMTransports to be prematurely closed
> -------------------------------------------------------------------------------
>
>                 Key: AMQ-3185
>                 URL: https://issues.apache.org/jira/browse/AMQ-3185
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Assignee: Timothy Bish
>            Priority: Critical
>         Attachments: VMTransportClosureTest.java, patch.diff
>
>
> Symptom
> =======
> We have eight servers running AMQ 5.3.1 connected in a network-of-brokers
> over HTTP.  Each broker maintains local connections to internal consumers
> using the VM transport.  We noticed that about once a day, all the local VM
> connections on a broker, as well as its outbound network bridge connections,
> would fail with the following error:
> 2010-12-10 04:29:11,663 [processBroker-process-pool-thread-4] ERROR - The worker encountered an exception and will pause for 5 seconds before continuing.
> javax.jms.JMSException: Peer (vm://broker-mbus-200005#1052452) disposed.
>       at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
>       at org.apache.activemq.ActiveMQMessageConsumer.dequeue(ActiveMQMessageConsumer.java:453)
>       at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:570)
>       at com.invoqsystems.foundation.component.communication.jms.source.JMSMessageSource.getMessage(JMSMessageSource.java:33)
>       at com.invoqsystems.foundation.component.communication.jms.source.JMSMessageSource.getMessage(JMSMessageSource.java:95)
>       at com.invoqsystems.foundation.component.communication.jms.worker.MessageProcessingWorker.getTask(MessageProcessingWorker.java:9)
>       at com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.iterate(AbstractWorker.java:14)
>       at com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.runUntilStop(AbstractWorker.java:17)
>       at com.invoqsystems.foundation.component.communication.jms.worker.AbstractWorker.run(AbstractWorker.java:41)
>       at java.lang.Thread.run(Unknown Source)
> Caused by: org.apache.activemq.transport.TransportDisposedIOException: Peer (vm://broker-mbus-200005#1052452) disposed.
>       at org.apache.activemq.transport.vm.VMTransport.stop(VMTransport.java:70)
>       at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
>       at org.apache.activemq.transport.TransportFilter.stop(TransportFilter.java:64)
>       at org.apache.activemq.transport.ResponseCorrelator.stop(ResponseCorrelator.java:132)
>       at org.apache.activemq.broker.TransportConnection.doStop(TransportConnection.java:956)
>       at org.apache.activemq.broker.TransportConnection$3.run(TransportConnection.java:918)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>       ... 1 more
>
> This was quite unexpected, since we create the VM transport connection at
> broker startup and keep that single connection open for the life of the
> application.  It is only closed when the process terminates and the broker
> is stopped.
> Since we wrote our consumers against the JMS spec, we handle periodic
> connection failures by creating a new connection.  This sometimes works;
> however, because of AMQ-3127, re-registering our consumers at the same time
> as the bridges are being re-created frequently causes a deadlock that can
> only be resolved by restarting the systems.
>
> Cause
> =====
> We were unable to reproduce the failure reliably, which suggested that a
> timing issue was involved.  Eventually, we traced the VM transport failure
> to the following code in VMTransportServer:
> {code:title=VMTransportServer.java}
>     public VMTransport connect() throws IOException {
>         TransportAcceptListener al;
>         synchronized (this) {
>             if (disposed) {
>                 throw new IOException("Server has been disposed.");
>             }
>             al = acceptListener;
>         }
>         if (al == null) {
>             throw new IOException("Server TransportAcceptListener is null.");
>         }
>         connectionCount.incrementAndGet();
>         VMTransport client = new VMTransport(location) {
>             public void stop() throws Exception {
>                 if (disposed) {
>                     return;
>                 }
>                 super.stop();
>                 if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
>                     VMTransportServer.this.stop();
>                 }
>             };
>         };
>         VMTransport server = new VMTransport(location);
>         client.setPeer(server);
>         server.setPeer(client);
>         al.onAccept(configure(server));
>         return client;
>     }
> {code}
> At issue is the overridden VMTransport.stop() method:
> {code:title=VMTransportServer.java}
>             public void stop() throws Exception {
>                 if (disposed) {
>                     return;
>                 }
>                 super.stop();
>                 if (connectionCount.decrementAndGet() == 0 && disposeOnDisconnect) {
>                     VMTransportServer.this.stop();
>                 }
>             };
> {code}
> Note that VMTransport.disposed is used to guard against multiple calls to
> stop() and, consequently, multiple decrements of connectionCount.  However,
> in the implementation of super.stop(), the disposed flag is only set after
> the peer transport has been told about the stop:
> {code:title=VMTransport.java}
>     public void stop() throws Exception {
>         stopping.set(true);
>         
>         // If stop() is called while being start()ed.. then we can't stop until we return to the start() method.
>         if( enqueueValve.isOn() ) {
>               
>             // let the peer know that we are disconnecting..
>             try {
>                 peer.transportListener.onCommand(new ShutdownInfo()); // >>> peer is notified first
>             } catch (Exception ignore) {
>             }
>               
>               
>             TaskRunner tr = null;
>             try {
>                 enqueueValve.turnOff();
>                 if (!disposed) {
>                     started = false;
>                     disposed = true; // >>> disposed is set only afterwards
>                     if (taskRunner != null) {
>                         tr = taskRunner;
>                         taskRunner = null;
>                     }
>                 }
>             } finally {
>                 stopping.set(false);
>                 enqueueValve.turnOn();
>             }
>             if (tr != null) {
>                 tr.shutdown(1000);
>             }
>             
>         }
>         
>     }
> {code}
> TransportConnection implements peer.transportListener.onCommand(new
> ShutdownInfo()) by launching an asynchronous task that eventually calls back
> into the same transport that initiated the closure.  If the timing is right,
> the stop() method of VMTransportServer's anonymous VMTransport subclass is
> called a second time before the disposed flag has been set to true.  As a
> result, connectionCount is decremented *TWICE* instead of just once.
> In other words, the disposed check and the decrement in VMTransportServer's
> anonymous VMTransport subclass are not atomic and therefore not thread-safe.
> If VMTransportServer miscounts its connections, it can stop itself while
> live connections remain.  Those live connections then see their peer (the
> server side of the VMTransport pair) closed unexpectedly.
>
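To make the interleaving concrete, here is a minimal, self-contained Java model of the race under simplifying assumptions. `DoubleDecrementDemo`, `RacyClient`, `runScenario()` and the `CountDownLatch` "window" are hypothetical names invented for illustration; they are not ActiveMQ classes. The latch stands in for the gap inside super.stop() between notifying the peer and setting disposed, and it forces two callers through that gap together so the outcome is deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical driver: two connections are open, one of them is stopped by two threads at once.
class DoubleDecrementDemo {
    static final AtomicInteger connectionCount = new AtomicInteger();
    static volatile boolean serverStopped;

    static int runScenario() throws InterruptedException {
        connectionCount.set(2);
        serverStopped = false;
        RacyClient closing = new RacyClient(connectionCount, () -> serverStopped = true, new CountDownLatch(2));
        Runnable callStop = () -> {
            try {
                closing.stop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread appThread = new Thread(callStop); // e.g. the application closing its connection
        Thread asyncTask = new Thread(callStop); // e.g. the asynchronous TransportConnection stop task
        appThread.start();
        asyncTask.start();
        appThread.join();
        asyncTask.join();
        return connectionCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int remaining = runScenario();
        System.out.println("connectionCount = " + remaining);    // 0, though one connection is still open
        System.out.println("server stopped = " + serverStopped); // true
    }
}

// Hypothetical stand-in for VMTransportServer's anonymous VMTransport subclass (not ActiveMQ code).
class RacyClient {
    private final AtomicInteger connectionCount; // shared with the owning "server"
    private final Runnable stopServer;           // stands in for VMTransportServer.this.stop()
    private final CountDownLatch window;         // forces two stop() calls into the same gap
    volatile boolean disposed = false;

    RacyClient(AtomicInteger connectionCount, Runnable stopServer, CountDownLatch window) {
        this.connectionCount = connectionCount;
        this.stopServer = stopServer;
        this.window = window;
    }

    void stop() throws InterruptedException {
        if (disposed) {  // check: both callers get past this while disposed is still false
            return;
        }
        // Models super.stop() notifying the peer before it sets disposed = true.
        window.countDown();
        window.await();  // wait until the other caller has also passed the check
        disposed = true; // act: too late to turn the other caller away
        if (connectionCount.decrementAndGet() == 0) {
            stopServer.run();
        }
    }
}
```

In this model, stopping one of two open connections from two threads drives connectionCount from 2 to 0, so the model "server" stops even though the second connection was never closed.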
> Solution
> ========
> The attached patch prevents multiple decrements of connectionCount by
> guarding against reentrant calls to VMTransportServer's VMTransport stop()
> method.
> A patch is also included that demonstrates the problem with the existing
> AMQ trunk code.
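The attached patch.diff is not reproduced here. As a rough illustration only, one common way to make the check and the decrement safe is to guard them with an `AtomicBoolean` and `compareAndSet`, so that only the first call to stop() can decrement the count. `GuardedStopDemo`, `GuardedClient` and `runScenario()` are hypothetical names, and disposeOnDisconnect is omitted for brevity; this is a sketch, not the actual fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical driver: closes one of two connections from several threads at once.
class GuardedStopDemo {
    static final AtomicInteger connectionCount = new AtomicInteger();
    static final AtomicBoolean serverStopped = new AtomicBoolean(false);

    // Returns the connection count left after all threads have called stop().
    static int runScenario(int threads) throws InterruptedException {
        connectionCount.set(2);
        serverStopped.set(false);
        GuardedClient closing = new GuardedClient(connectionCount, () -> serverStopped.set(true));
        CountDownLatch startGate = new CountDownLatch(1);
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    startGate.await(); // release all threads together to maximize contention
                    closing.stop();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(t);
            t.start();
        }
        startGate.countDown();
        for (Thread t : workers) {
            t.join();
        }
        return connectionCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("connectionCount = " + runScenario(8));     // 1
        System.out.println("server stopped = " + serverStopped.get()); // false
    }
}

// Hypothetical stand-in for the anonymous VMTransport subclass created in connect().
class GuardedClient {
    private final AtomicInteger connectionCount;  // shared with the owning "server"
    private final Runnable stopServer;            // stands in for VMTransportServer.this.stop()
    private final AtomicBoolean stopCalled = new AtomicBoolean(false);

    GuardedClient(AtomicInteger connectionCount, Runnable stopServer) {
        this.connectionCount = connectionCount;
        this.stopServer = stopServer;
    }

    void stop() {
        // compareAndSet turns "check" and "mark" into one atomic step: exactly one
        // caller, whether reentrant or concurrent, gets past this line.
        if (!stopCalled.compareAndSet(false, true)) {
            return;
        }
        // super.stop() would run here; any repeated stop() it triggers returns above.
        if (connectionCount.decrementAndGet() == 0) {
            stopServer.run();
        }
    }
}
```

Using a dedicated flag, rather than reusing VMTransport.disposed, keeps the counting independent of when super.stop() gets around to setting disposed.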

-- 
This message is automatically generated by JIRA.
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
