Repository: qpid-jms
Updated Branches:
  refs/heads/master 989bce31b -> 477df6443


Handle remote close of the Connection by firing the normal exception
events and letting things close down normally. 

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/477df644
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/477df644
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/477df644

Branch: refs/heads/master
Commit: 477df644369ab515daf2d5a600dd245d35c05335
Parents: 989bce3
Author: Timothy Bish <tabish...@gmail.com>
Authored: Tue Feb 24 09:33:04 2015 -0500
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Tue Feb 24 09:33:04 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  6 +++-
 .../jms/provider/amqp/AmqpAbstractResource.java | 35 ++++++++++----------
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  7 ++++
 .../qpid/jms/provider/amqp/AmqpResource.java    |  5 ++-
 .../integration/ConnectionIntegrationTest.java  | 26 +++++++++++++++
 5 files changed, 60 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/477df644/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 0b73cb4..32470ee 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -1101,7 +1101,11 @@ public class JmsConnection implements Connection, 
TopicConnection, QueueConnecti
 
     @Override
     public void onResourceRemotelyClosed(JmsResource resource, Exception 
cause) {
-        LOG.info("A JMS resource has been remotely closed: {}", resource);
+        if (resource.equals(this.connectionInfo)) {
+            onException(cause);
+        } else {
+            LOG.info("A JMS resource has been remotely closed: {}", resource);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/477df644/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 6612786..33b47cb 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -22,6 +22,7 @@ import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.JMSSecurityException;
 
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -162,25 +163,25 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
     }
 
     @Override
-    public void remotelyClosed() {
-        if (isAwaitingOpen()) {
-            Exception error = getRemoteError();
-            if (error == null) {
-                error = new IOException("Remote has closed without error 
information");
-            }
-
-            if (endpoint != null) {
-                // TODO: if this is a producer/consumer link then we may only 
be detached,
-                // rather than fully closed, and should respond appropriately.
-                endpoint.close();
-            }
+    public void remotelyClosed(AmqpProvider provider) {
+        Exception error = getRemoteError();
+        if (error == null) {
+            error = new IOException("Remote has closed without error 
information");
+        }
 
-            openRequest.onFailure(error);
-            openRequest = null;
+        if (endpoint != null) {
+            // TODO: if this is a producer/consumer link then we may only be 
detached,
+            // rather than fully closed, and should respond appropriately.
+            endpoint.close();
         }
 
-        // TODO - We need a way to signal that the remote closed unexpectedly.
-        LOG.info("Resource was remotely closed");
+        LOG.info("Resource {} was remotely closed", getJmsResource());
+
+        if (getJmsResource() instanceof JmsConnectionInfo) {
+            provider.fireProviderException(error);
+        } else {
+            provider.fireResourceRemotelyClosed(getJmsResource(), error);
+        }
     }
 
     public E getEndpoint() {
@@ -270,7 +271,7 @@ public abstract class AmqpAbstractResource<R extends 
JmsResource, E extends Endp
 
                 failed(openError);
             } else {
-                remotelyClosed();
+                remotelyClosed(provider);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/477df644/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 3c4cae6..0face7b 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -802,6 +802,13 @@ public class AmqpProvider implements Provider, 
TransportListener {
         }
     }
 
+    void fireResourceRemotelyClosed(JmsResource resource, Exception ex) {
+        ProviderListener listener = this.listener;
+        if (listener != null) {
+            listener.onResourceRemotelyClosed(resource, ex);
+        }
+    }
+
     private void checkClosed() throws ProviderClosedException {
         if (closed.get()) {
             throw new ProviderClosedException("This Provider is already 
closed");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/477df644/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
index f577b21..0a2a398 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java
@@ -88,8 +88,11 @@ public interface AmqpResource {
      * Called to indicate that the remote end has become closed but the 
resource
      * was not awaiting a close.  This could happen during an open request 
where
      * the remote does not set an error condition or during normal operation.
+     *
+     * @param provider
+     *        a reference to the AMQP provider to use to send the remote close 
event.
      */
-    void remotelyClosed();
+    void remotelyClosed(AmqpProvider provider);
 
     /**
      * Sets the failed state for this Resource and triggers a failure signal 
for

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/477df644/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index c6b8632..ad1a6f6 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -25,9 +25,14 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionMetaData;
+import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 import javax.jms.Session;
@@ -89,6 +94,27 @@ public class ConnectionIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 10000)
+    public void testRemotelyEndConnectionListenerInvoked() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch done = new CountDownLatch(1);
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.setExceptionListener(new ExceptionListener() {
+
+                @Override
+                public void onException(JMSException exception) {
+                    done.countDown();
+                }
+            });
+
+            testPeer.remotelyEndConnection(true);
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            assertTrue("Connection should report failure", done.await(5, 
TimeUnit.SECONDS));
+        }
+    }
+
     @Ignore // TODO: resolve related issues and enable
     @Test(timeout = 5000)
     public void testRemotelyEndConnectionWithSessionWithConsumer() throws 
Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to