Author: chirino
Date: Thu Oct 18 17:00:45 2007
New Revision: 586185

URL: http://svn.apache.org/viewvc?rev=586185&view=rev
Log:
The VM transport could deadlock between the iterate() method and the oneway() 
method when the async message buffer used by the transport fills up.  Change 
the synchronization logic to make use of a Valve to avoid needing to lock 
mutexes for so long.


Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=586185&r1=586184&r2=586185&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
 Thu Oct 18 17:00:45 2007
@@ -17,14 +17,17 @@
 package org.apache.activemq.transport.vm;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.URI;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.thread.Valve;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
@@ -54,23 +57,17 @@
     protected final URI location;
     protected final long id;
     private TaskRunner taskRunner;
-    private final Object mutex = new Object();
-
+    private final Object lazyInitMutext = new Object();
+    private final Valve enqueueValve = new Valve(true);
+    private final AtomicBoolean stopping = new AtomicBoolean();
+    
     public VMTransport(URI location) {
         this.location = location;
         this.id = NEXT_ID.getAndIncrement();
     }
 
-    public VMTransport getPeer() {
-        synchronized (mutex) {
-            return peer;
-        }
-    }
-
     public void setPeer(VMTransport peer) {
-        synchronized (mutex) {
-            this.peer = peer;
-        }
+        this.peer = peer;
     }
 
     public void oneway(Object command) throws IOException {
@@ -81,78 +78,42 @@
             throw new IOException("Peer not connected.");
         }
 
-        TransportListener tl = null;
-        synchronized (peer.mutex) {
-            if (peer.disposed) {
+        try {
+            // Disable the peer from changing his state while we try to 
enqueue onto him.
+            peer.enqueueValve.increment();
+        
+            if (peer.disposed || peer.stopping.get()) {
                 throw new TransportDisposedIOException("Peer (" + 
peer.toString() + ") disposed.");
             }
+            
             if (peer.started) {
                 if (peer.async) {
-                    peer.enqueue(command);
+                    peer.getMessageQueue().put(command);
                     peer.wakeup();
                 } else {
-                    tl = peer.transportListener;
+                    peer.transportListener.onCommand(command);
                 }
+                enqueueValve.decrement();
             } else {
-                peer.enqueue(command);
+                peer.getMessageQueue().put(command);
             }
-        }
-
-        if (tl != null) {
-            tl.onCommand(command);
-        }
-
-    }
-
-    private void enqueue(Object command) throws IOException {
-        try {
-            getMessageQueue().put(command);
-        } catch (final InterruptedException e) {
+            
+        } catch (InterruptedException e) {
             throw IOExceptionSupport.create(e);
+        } finally {
+            // Allow the peer to change state again...
+            peer.enqueueValve.decrement();
         }
-    }
 
-    public FutureResponse asyncRequest(Object command, ResponseCallback 
responseCallback) throws IOException {
-        throw new AssertionError("Unsupported Method");
-    }
-
-    public Object request(Object command) throws IOException {
-        throw new AssertionError("Unsupported Method");
-    }
-
-    public Object request(Object command, int timeout) throws IOException {
-        throw new AssertionError("Unsupported Method");
-    }
-
-    public TransportListener getTransportListener() {
-        synchronized (mutex) {
-            return transportListener;
-        }
-    }
-
-    public void setTransportListener(TransportListener commandListener) {
-        synchronized (mutex) {
-            this.transportListener = commandListener;
-            wakeup();
-        }
-    }
-
-    private LinkedBlockingQueue<Object> getMessageQueue() {
-        synchronized (mutex) {
-            if (messageQueue == null) {
-                messageQueue = new 
LinkedBlockingQueue<Object>(this.asyncQueueDepth);
-            }
-            return messageQueue;
-        }
     }
 
     public void start() throws Exception {
         if (transportListener == null) {
             throw new IOException("TransportListener not set.");
         }
-
-        synchronized (mutex) {
-            if (messageQueue != null) {
+        try {
+            enqueueValve.turnOff();
+            if (messageQueue != null && !async) {
                 Object command;
                 while ((command = messageQueue.poll()) != null) {
                     transportListener.onCommand(command);
@@ -160,12 +121,16 @@
             }
             started = true;
             wakeup();
+        } finally {
+            enqueueValve.turnOn();
         }
     }
 
     public void stop() throws Exception {
         TaskRunner tr = null;
-        synchronized (mutex) {
+        try {
+            stopping.set(true);
+            enqueueValve.turnOff();
             if (!disposed) {
                 started = false;
                 disposed = true;
@@ -174,11 +139,88 @@
                     taskRunner = null;
                 }
             }
+        } finally {
+            stopping.set(false);
+            enqueueValve.turnOn();
         }
         if (tr != null) {
             tr.shutdown(1000);
         }
     }
+    
+    /**
+     * @see org.apache.activemq.thread.Task#iterate()
+     */
+    public boolean iterate() {
+        
+        final TransportListener tl;
+        try {
+            // Disable changing the state variables while we are running... 
+            enqueueValve.increment();
+            tl = transportListener;
+            if (!started || disposed || tl == null || stopping.get()) {
+                if( stopping.get() ) {
+                    // drain the queue it since folks could be blocked putting 
on to
+                    // it and that would not allow the stop() method for 
finishing up.
+                    getMessageQueue().clear();  
+                }
+                return false;
+            }
+        } catch (InterruptedException e) {
+            return false;
+        } finally {
+            enqueueValve.decrement();
+        }
+
+        LinkedBlockingQueue<Object> mq = getMessageQueue();
+        Command command = (Command)mq.poll();
+        if (command != null) {
+            tl.onCommand(command);
+            return !mq.isEmpty();
+        } else {
+            return false;
+        }
+        
+    }
+
+    public void setTransportListener(TransportListener commandListener) {
+        try {
+            try {
+                enqueueValve.turnOff();
+                this.transportListener = commandListener;
+                wakeup();
+            } finally {
+                enqueueValve.turnOn();
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private LinkedBlockingQueue<Object> getMessageQueue() {
+        synchronized (lazyInitMutext) {
+            if (messageQueue == null) {
+                messageQueue = new 
LinkedBlockingQueue<Object>(this.asyncQueueDepth);
+            }
+            return messageQueue;
+        }
+    }
+
+    public FutureResponse asyncRequest(Object command, ResponseCallback 
responseCallback) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    public Object request(Object command) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    public Object request(Object command, int timeout) throws IOException {
+        throw new AssertionError("Unsupported Method");
+    }
+
+    public TransportListener getTransportListener() {
+        return transportListener;
+    }
 
     public <T> T narrow(Class<T> target) {
         if (target.isAssignableFrom(getClass())) {
@@ -215,28 +257,6 @@
     }
 
     /**
-     * @see org.apache.activemq.thread.Task#iterate()
-     */
-    public boolean iterate() {
-        final TransportListener tl;
-        synchronized (mutex) {
-            tl = transportListener;
-            if (!started || disposed || tl == null) {
-                return false;
-            }
-        }
-
-        LinkedBlockingQueue<Object> mq = getMessageQueue();
-        final Command command = (Command)mq.poll();
-        if (command != null) {
-            tl.onCommand(command);
-            return !mq.isEmpty();
-        } else {
-            return false;
-        }
-    }
-
-    /**
      * @return the async
      */
     public boolean isAsync() {
@@ -266,7 +286,7 @@
 
     protected void wakeup() {
         if (async) {
-            synchronized (mutex) {
+            synchronized (lazyInitMutext) {
                 if (taskRunner == null) {
                     taskRunner = TASK_RUNNER_FACTORY.createTaskRunner(this, 
"VMTransport: " + toString());
                 }


Reply via email to