Repository: qpid-broker-j
Updated Branches:
  refs/heads/master e64e28268 -> 948b284fd


QPID-7988: [Broker-J, AMQP 1.0] Receiving a Flow with an unknown handle MUST 
result in a session error


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/47dcf144
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/47dcf144
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/47dcf144

Branch: refs/heads/master
Commit: 47dcf144ec4c2beafa858d67312a08c02884c17f
Parents: e64e282
Author: Lorenz Quack <c...@lorenzquack.de>
Authored: Wed Oct 25 11:27:43 2017 +0100
Committer: Lorenz Quack <c...@lorenzquack.de>
Committed: Wed Oct 25 11:54:19 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/protocol/v1_0/Session_1_0.java  | 24 ++++++++++------
 .../protocol/v1_0/transport/link/FlowTest.java  | 30 ++++++++++++++++++--
 2 files changed, 43 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/47dcf144/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 753fcec..d18712a 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -397,16 +397,24 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
             _remoteOutgoingWindow = flow.getOutgoingWindow();
 
             UnsignedInteger handle = flow.getHandle();
-            final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
endpoint =
-                    handle == null ? null : _inputHandleToEndpoint.get(handle);
-
-            if (endpoint != null)
+            if (handle != null)
             {
-                endpoint.receiveFlow(flow);
-
-                if (Boolean.TRUE.equals(flow.getEcho()))
+                final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> 
endpoint = _inputHandleToEndpoint.get(handle);
+                if (endpoint == null)
                 {
-                    endpoint.sendFlow();
+                    End end = new End();
+                    end.setError(new Error(SessionError.UNATTACHED_HANDLE,
+                                           String.format("Received Flow with 
unknown handle %d", handle.intValue())));
+                    end(end);
+                }
+                else
+                {
+                    endpoint.receiveFlow(flow);
+
+                    if (Boolean.TRUE.equals(flow.getEcho()))
+                    {
+                        endpoint.sendFlow();
+                    }
                 }
             }
             else

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/47dcf144/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index a5537f0..ed72ba3 100644
--- 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ 
b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -36,14 +36,16 @@ import 
org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
 public class FlowTest extends BrokerAdminUsingTestBase
 {
@@ -136,7 +138,6 @@ public class FlowTest extends BrokerAdminUsingTestBase
         assertThat(data, is(equalTo("foo")));
     }
 
-
     @Test
     @SpecificationTest(section = "2.6.7",
             description = "If the sender's drain flag is set and there are no 
available messages,"
@@ -171,4 +172,27 @@ public class FlowTest extends BrokerAdminUsingTestBase
             assertThat(responseFlow.getDrain(), is(equalTo(Boolean.TRUE)));
         }
     }
+
+    @Test
+    @SpecificationTest(section = "2.7.4",
+            description = "If set to a handle that is not currently associated 
with an attached link, the recipient"
+                          + " MUST respond by ending the session with an 
unattached-handle session error.")
+    public void flowWithUnknownHandle() throws Exception
+    {
+        final InetSocketAddress addr = 
getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            End responseEnd = transport.newInteraction()
+                                       .negotiateProtocol().consumeResponse()
+                                       .open().consumeResponse(Open.class)
+                                       .begin().consumeResponse(Begin.class)
+                                       .flowEcho(true)
+                                       .flowHandle(UnsignedInteger.ONE)
+                                       .flow()
+                                       
.consumeResponse().getLatestResponse(End.class);
+
+            assertThat(responseEnd.getError(), is(notNullValue()));
+            assertThat(responseEnd.getError().getCondition(), 
is(equalTo(SessionError.UNATTACHED_HANDLE)));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to