Update handling of producer/consumer link refusal - Ensure the initial attach doesn't result in the open request succeeding. - Use the remote error if given in detach, otherwise supply a more meaningful fallback when failing the open request.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/664f8ad1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/664f8ad1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/664f8ad1 Branch: refs/heads/master Commit: 664f8ad1982e4907c447c538b01a3cbc3319d118 Parents: 72e601e Author: Robert Gemmell <rob...@apache.org> Authored: Tue Feb 24 14:54:06 2015 +0000 Committer: Robert Gemmell <rob...@apache.org> Committed: Tue Feb 24 14:54:06 2015 +0000 ---------------------------------------------------------------------- .../jms/provider/amqp/AmqpAbstractResource.java | 13 +++++++++-- .../qpid/jms/provider/amqp/AmqpConsumer.java | 24 ++++++++++++++++++++ .../jms/provider/amqp/AmqpFixedProducer.java | 20 ++++++++++++---- 3 files changed, 51 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/664f8ad1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java index 850c4ba..a0d308a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java @@ -252,9 +252,9 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp LOG.warn("Open of {} failed: ", this); Exception openError; if (hasRemoteError()) { - openError = this.getRemoteError(); + openError = getRemoteError(); } else { - openError = new IOException("Open failed unexpectedly."); + openError = getOpenAbortException(); } failed(openError); @@ -292,6 +292,15 @@ public abstract class 
AmqpAbstractResource<R extends JmsResource, E extends Endp } /** + * When aborting the open operation, and there isnt an error condition, + * provided by the peer, the returned exception will be used instead. + * A subclass may override this method to provide alternative behaviour. + */ + protected Exception getOpenAbortException() { + return new IOException("Open failed unexpectedly."); + } + + /** * Perform the close operation on the managed endpoint. A subclass may * override this method to provide additional close actions or alter the * standard close path such as endpoint detach etc. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/664f8ad1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 1f4fbd1..bc5b26b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -28,6 +28,7 @@ import java.util.ListIterator; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import org.apache.qpid.jms.JmsDestination; @@ -177,6 +178,29 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } @Override + protected void doOpenCompletion() { + // Verify the attach response contained a non-null Source + org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource(); + if (s != null) { + super.doOpenCompletion(); + } else { + // No link terminus was created, the peer will now detach/close us. 
+ } + } + + @Override + protected Exception getOpenAbortException() { + // Verify the attach response contained a non-null Source + org.apache.qpid.proton.amqp.transport.Source s = getEndpoint().getRemoteSource(); + if (s != null) { + return super.getOpenAbortException(); + } else { + // No link terminus was created, the peer has detach/closed us, create IDE. + return new InvalidDestinationException("Link creation was refused"); + } + } + + @Override public void opened() { this.session.addResource(this); super.opened(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/664f8ad1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 3fd6035..3643dc8 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Set; +import javax.jms.InvalidDestinationException; import javax.jms.JMSException; import org.apache.qpid.jms.JmsDestination; @@ -280,11 +281,22 @@ public class AmqpFixedProducer extends AmqpProducer { protected void doOpenCompletion() { // Verify the attach response contained a non-null target org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget(); - if (t == null) { - // No link terminus was created, the peer should now detach us. Producer creation has failed. - failed(new RuntimeException("link was refused")); //TODO: proper exception. - } else { + if (t != null) { super.doOpenCompletion(); + } else { + // No link terminus was created, the peer will now detach/close us. 
+ } + } + + @Override + protected Exception getOpenAbortException() { + // Verify the attach response contained a non-null target + org.apache.qpid.proton.amqp.transport.Target t = getEndpoint().getRemoteTarget(); + if (t != null) { + return super.getOpenAbortException(); + } else { + // No link terminus was created, the peer has detach/closed us, create IDE. + return new InvalidDestinationException("Link creation was refused"); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org