Repository: qpid-jms Updated Branches: refs/heads/master 989bce31b -> 477df6443
Handle remote close of the Connection by firing the normal exception events and letting things close down normally. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/477df644 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/477df644 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/477df644 Branch: refs/heads/master Commit: 477df644369ab515daf2d5a600dd245d35c05335 Parents: 989bce3 Author: Timothy Bish <tabish...@gmail.com> Authored: Tue Feb 24 09:33:04 2015 -0500 Committer: Timothy Bish <tabish...@gmail.com> Committed: Tue Feb 24 09:33:04 2015 -0500 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 6 +++- .../jms/provider/amqp/AmqpAbstractResource.java | 35 ++++++++++---------- .../qpid/jms/provider/amqp/AmqpProvider.java | 7 ++++ .../qpid/jms/provider/amqp/AmqpResource.java | 5 ++- .../integration/ConnectionIntegrationTest.java | 26 +++++++++++++++ 5 files changed, 60 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/477df644/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 0b73cb4..32470ee 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -1101,7 +1101,11 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti @Override public void onResourceRemotelyClosed(JmsResource resource, Exception cause) { - LOG.info("A JMS resource has been remotely closed: {}", resource); + if (resource.equals(this.connectionInfo)) { + onException(cause); + } else { + LOG.info("A JMS resource has been remotely closed: {}", resource); + } } /** http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/477df644/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index 6612786..33b47cb 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -22,6 +22,7 @@ import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import javax.jms.JMSSecurityException; +import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsResource; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.proton.amqp.Symbol; @@ -162,25 +163,25 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp } @Override - public void remotelyClosed() { - if (isAwaitingOpen()) { - Exception error = getRemoteError(); - if (error == null) { - error = new IOException("Remote has closed without error information"); - } - - if (endpoint != null) { - // TODO: if this is a producer/consumer link then we may only be detached, - // rather than fully closed, and should respond appropriately. - endpoint.close(); - } + public void remotelyClosed(AmqpProvider provider) { + Exception error = getRemoteError(); + if (error == null) { + error = new IOException("Remote has closed without error information"); + } - openRequest.onFailure(error); - openRequest = null; + if (endpoint != null) { + // TODO: if this is a producer/consumer link then we may only be detached, + // rather than fully closed, and should respond appropriately. + endpoint.close(); } - // TODO - We need a way to signal that the remote closed unexpectedly. - LOG.info("Resource was remotely closed"); + LOG.info("Resource {} was remotely closed", getJmsResource()); + + if (getJmsResource() instanceof JmsConnectionInfo) { + provider.fireProviderException(error); + } else { + provider.fireResourceRemotelyClosed(getJmsResource(), error); + } } public E getEndpoint() { @@ -270,7 +271,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp failed(openError); } else { - remotelyClosed(); + remotelyClosed(provider); } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/477df644/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 3c4cae6..0face7b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -802,6 +802,13 @@ public class AmqpProvider implements Provider, TransportListener { } } + void fireResourceRemotelyClosed(JmsResource resource, Exception ex) { + ProviderListener listener = this.listener; + if (listener != null) { + listener.onResourceRemotelyClosed(resource, ex); + } + } + private void checkClosed() throws ProviderClosedException { if (closed.get()) { throw new ProviderClosedException("This Provider is already closed"); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/477df644/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java index f577b21..0a2a398 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResource.java @@ -88,8 +88,11 @@ public interface AmqpResource { * Called to indicate that the remote end has become closed but the resource * was not awaiting a close. This could happen during an open request where * the remote does not set an error condition or during normal operation. + * + * @param provider + * a reference to the AMQP provider to use to send the remote close event. */ - void remotelyClosed(); + void remotelyClosed(AmqpProvider provider); /** * Sets the failed state for this Resource and triggers a failure signal for http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/477df644/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index c6b8632..ad1a6f6 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -25,9 +25,14 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import javax.jms.Connection; import javax.jms.ConnectionMetaData; +import javax.jms.ExceptionListener; import javax.jms.IllegalStateException; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; @@ -89,6 +94,27 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout = 10000) + public void testRemotelyEndConnectionListenerInvoked() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final CountDownLatch done = new CountDownLatch(1); + + Connection connection = testFixture.establishConnecton(testPeer); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + done.countDown(); + } + }); + + testPeer.remotelyEndConnection(true); + testPeer.waitForAllHandlersToComplete(1000); + + assertTrue("Connection should report failure", done.await(5, TimeUnit.SECONDS)); + } + } + @Ignore // TODO: resolve related issues and enable @Test(timeout = 5000) public void testRemotelyEndConnectionWithSessionWithConsumer() throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org