QPID-7981: [Broker-J, AMQP 1.0] Handle erroneous null termini cases

Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/948b284f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/948b284f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/948b284f

Branch: refs/heads/master
Commit: 948b284fdfa51ac91a65004447a527582411c047
Parents: 47dcf14
Author: Lorenz Quack <c...@lorenzquack.de>
Authored: Wed Oct 25 11:52:51 2017 +0100
Committer: Lorenz Quack <c...@lorenzquack.de>
Committed: Wed Oct 25 11:54:34 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AbstractLinkEndpoint.java     |  6 ++
 .../v1_0/transport/link/AttachTest.java         | 96 +++++++++++++++++++-
 2 files changed, 99 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/948b284f/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 6328a17..04cb94e 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -108,9 +108,15 @@ public abstract class AbstractLinkEndpoint<S extends 
BaseSource, T extends BaseT
     public void receiveAttach(final Attach attach) throws AmqpErrorException
     {
         _errored = false;
+        boolean isAttachingRemoteTerminusNull = (attach.getRole() == 
Role.SENDER ? attach.getSource() == null : attach.getTarget() == null);
         boolean isAttachingLocalTerminusNull = (attach.getRole() == 
Role.SENDER ? attach.getTarget() == null : attach.getSource() == null);
         boolean isLocalTerminusNull = (attach.getRole() == Role.SENDER ? 
getTarget() == null : getSource() == null);
 
+        if (isAttachingRemoteTerminusNull)
+        {
+            throw new AmqpErrorException(AmqpError.INVALID_FIELD, "received 
Attach with remote null terminus.");
+        }
+
         if (isAttachingLocalTerminusNull)
         {
             recoverLink(attach);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/948b284f/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index 0e33f3a..c559399 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -19,13 +19,14 @@
 
 package org.apache.qpid.tests.protocol.v1_0.transport.link;
 
-import static org.hamcrest.CoreMatchers.both;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 import java.net.InetSocketAddress;
 
@@ -36,12 +37,15 @@ import 
org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
 public class AttachTest extends BrokerAdminUsingTestBase
 {
@@ -117,4 +121,90 @@ public class AttachTest extends BrokerAdminUsingTestBase
             assertThat(responseAttach.getSource(), is(notNullValue()));
         }
     }
+
+    @Test
+    @SpecificationTest(section = "2.6.3",
+            description = "Note that if the application chooses not to create 
a terminus, the session endpoint will"
+                          + " still create a link endpoint and issue an attach 
indicating that the link endpoint has"
+                          + " no associated local terminus. In this case, the 
session endpoint MUST immediately"
+                          + " detach the newly created link endpoint.")
+    public void attachReceiverWithNullTarget() throws Exception
+    {
+        String queueName = "testQueue";
+        getBrokerAdmin().createQueue(queueName);
+        final InetSocketAddress addr = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            Interaction interaction = transport.newInteraction();
+            final Attach responseAttach = 
interaction.negotiateProtocol().consumeResponse()
+                                                     
.open().consumeResponse(Open.class)
+                                                     
.begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.RECEIVER)
+                                                     
.attachSourceAddress(queueName)
+                                                     .attachTarget(null)
+                                                     
.attach().consumeResponse()
+                                                     
.getLatestResponse(Attach.class);
+            assertThat(responseAttach.getName(), is(notNullValue()));
+            assertThat(responseAttach.getHandle().longValue(), 
is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach.getRole(), is(Role.SENDER));
+            assertThat(responseAttach.getSource(), is(nullValue()));
+            assertThat(responseAttach.getTarget(), is(nullValue()));
+
+            final Detach responseDetach = 
interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(true));
+            assertThat(responseDetach.getError(), is(notNullValue()));
+            assertThat(responseDetach.getError().getCondition(), 
is(equalTo(AmqpError.INVALID_FIELD)));
+
+            final End endResponse = interaction.flowHandleFromLinkHandle()
+                                               .flowEcho(true)
+                                               .flow()
+                                               
.consumeResponse().getLatestResponse(End.class);
+            assertThat(endResponse.getError(), is(notNullValue()));
+            // QPID-7954
+            //assertThat(endResponse.getError().getCondition(), 
is(equalTo(SessionError.ERRANT_LINK)));
+        }
+    }
+    @Test
+    @SpecificationTest(section = "2.6.3",
+            description = "Note that if the application chooses not to create 
a terminus, the session endpoint will"
+                          + " still create a link endpoint and issue an attach 
indicating that the link endpoint has"
+                          + " no associated local terminus. In this case, the 
session endpoint MUST immediately"
+                          + " detach the newly created link endpoint.")
+    public void attachSenderWithNullSource() throws Exception
+    {
+        String queueName = "testQueue";
+        getBrokerAdmin().createQueue(queueName);
+        final InetSocketAddress addr = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            Interaction interaction = transport.newInteraction();
+            final Attach responseAttach = 
interaction.negotiateProtocol().consumeResponse()
+                                                     
.open().consumeResponse(Open.class)
+                                                     
.begin().consumeResponse(Begin.class)
+                                                     .attachRole(Role.SENDER)
+                                                     .attachSource(null)
+                                                     
.attachTargetAddress(queueName)
+                                                     
.attachInitialDeliveryCount(UnsignedInteger.ZERO)
+                                                     
.attach().consumeResponse()
+                                                     
.getLatestResponse(Attach.class);
+            assertThat(responseAttach.getName(), is(notNullValue()));
+            assertThat(responseAttach.getHandle().longValue(), 
is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+            assertThat(responseAttach.getRole(), is(Role.RECEIVER));
+            assertThat(responseAttach.getSource(), is(nullValue()));
+            assertThat(responseAttach.getTarget(), is(nullValue()));
+
+            final Detach responseDetach = 
interaction.consumeResponse().getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(true));
+            assertThat(responseDetach.getError(), is(notNullValue()));
+            assertThat(responseDetach.getError().getCondition(), 
is(equalTo(AmqpError.INVALID_FIELD)));
+
+            final End endResponse = interaction.flowHandleFromLinkHandle()
+                                               .flowEcho(true)
+                                               .flow()
+                                               
.consumeResponse().getLatestResponse(End.class);
+            assertThat(endResponse.getError(), is(notNullValue()));
+            // QPID-7954
+            //assertThat(endResponse.getError().getCondition(), 
is(equalTo(SessionError.ERRANT_LINK)));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to