QPID-7981: [Broker-J, AMQP 1.0] Handle erroneous null termini cases
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/948b284f Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/948b284f Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/948b284f Branch: refs/heads/master Commit: 948b284fdfa51ac91a65004447a527582411c047 Parents: 47dcf14 Author: Lorenz Quack <c...@lorenzquack.de> Authored: Wed Oct 25 11:52:51 2017 +0100 Committer: Lorenz Quack <c...@lorenzquack.de> Committed: Wed Oct 25 11:54:34 2017 +0100 ---------------------------------------------------------------------- .../protocol/v1_0/AbstractLinkEndpoint.java | 6 ++ .../v1_0/transport/link/AttachTest.java | 96 +++++++++++++++++++- 2 files changed, 99 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/948b284f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java index 6328a17..04cb94e 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java @@ -108,9 +108,15 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT public void receiveAttach(final Attach attach) throws AmqpErrorException { _errored = false; + boolean isAttachingRemoteTerminusNull = (attach.getRole() == Role.SENDER ? attach.getSource() == null : attach.getTarget() == null); boolean isAttachingLocalTerminusNull = (attach.getRole() == Role.SENDER ? attach.getTarget() == null : attach.getSource() == null); boolean isLocalTerminusNull = (attach.getRole() == Role.SENDER ? getTarget() == null : getSource() == null); + if (isAttachingRemoteTerminusNull) + { + throw new AmqpErrorException(AmqpError.INVALID_FIELD, "received Attach with remote null terminus."); + } + if (isAttachingLocalTerminusNull) { recoverLink(attach); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/948b284f/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java index 0e33f3a..c559399 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java @@ -19,13 +19,14 @@ package org.apache.qpid.tests.protocol.v1_0.transport.link; -import static org.hamcrest.CoreMatchers.both; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import java.net.InetSocketAddress; @@ -36,12 +37,15 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; import org.apache.qpid.server.protocol.v1_0.type.transport.Close; +import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; +import org.apache.qpid.server.protocol.v1_0.type.transport.End; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; -import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; +import org.apache.qpid.tests.protocol.v1_0.Interaction; import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; public class AttachTest extends BrokerAdminUsingTestBase { @@ -117,4 +121,90 @@ public class AttachTest extends BrokerAdminUsingTestBase assertThat(responseAttach.getSource(), is(notNullValue())); } } + + @Test + @SpecificationTest(section = "2.6.3", + description = "Note that if the application chooses not to create a terminus, the session endpoint will" + + " still create a link endpoint and issue an attach indicating that the link endpoint has" + + " no associated local terminus. In this case, the session endpoint MUST immediately" + + " detach the newly created link endpoint.") + public void attachReceiverWithNullTarget() throws Exception + { + String queueName = "testQueue"; + getBrokerAdmin().createQueue(queueName); + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + try (FrameTransport transport = new FrameTransport(addr).connect()) + { + Interaction interaction = transport.newInteraction(); + final Attach responseAttach = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.RECEIVER) + .attachSourceAddress(queueName) + .attachTarget(null) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + assertThat(responseAttach.getName(), is(notNullValue())); + assertThat(responseAttach.getHandle().longValue(), is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue())))); + assertThat(responseAttach.getRole(), is(Role.SENDER)); + assertThat(responseAttach.getSource(), is(nullValue())); + assertThat(responseAttach.getTarget(), is(nullValue())); + + final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class); + assertThat(responseDetach.getClosed(), is(true)); + assertThat(responseDetach.getError(), is(notNullValue())); + assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.INVALID_FIELD))); + + final End endResponse = interaction.flowHandleFromLinkHandle() + .flowEcho(true) + .flow() + .consumeResponse().getLatestResponse(End.class); + assertThat(endResponse.getError(), is(notNullValue())); + // QPID-7954 + //assertThat(endResponse.getError().getCondition(), is(equalTo(SessionError.ERRANT_LINK))); + } + } + @Test + @SpecificationTest(section = "2.6.3", + description = "Note that if the application chooses not to create a terminus, the session endpoint will" + + " still create a link endpoint and issue an attach indicating that the link endpoint has" + + " no associated local terminus. In this case, the session endpoint MUST immediately" + + " detach the newly created link endpoint.") + public void attachSenderWithNullSource() throws Exception + { + String queueName = "testQueue"; + getBrokerAdmin().createQueue(queueName); + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + try (FrameTransport transport = new FrameTransport(addr).connect()) + { + Interaction interaction = transport.newInteraction(); + final Attach responseAttach = interaction.negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .attachRole(Role.SENDER) + .attachSource(null) + .attachTargetAddress(queueName) + .attachInitialDeliveryCount(UnsignedInteger.ZERO) + .attach().consumeResponse() + .getLatestResponse(Attach.class); + assertThat(responseAttach.getName(), is(notNullValue())); + assertThat(responseAttach.getHandle().longValue(), is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue())))); + assertThat(responseAttach.getRole(), is(Role.RECEIVER)); + assertThat(responseAttach.getSource(), is(nullValue())); + assertThat(responseAttach.getTarget(), is(nullValue())); + + final Detach responseDetach = interaction.consumeResponse().getLatestResponse(Detach.class); + assertThat(responseDetach.getClosed(), is(true)); + assertThat(responseDetach.getError(), is(notNullValue())); + assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.INVALID_FIELD))); + + final End endResponse = interaction.flowHandleFromLinkHandle() + .flowEcho(true) + .flow() + .consumeResponse().getLatestResponse(End.class); + assertThat(endResponse.getError(), is(notNullValue())); + // QPID-7954 + //assertThat(endResponse.getError().getCondition(), is(equalTo(SessionError.ERRANT_LINK))); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org