Repository: qpid-broker-j Updated Branches: refs/heads/master e64e28268 -> 948b284fd
QPID-7988: [Broker-J, AMQP 1.0] Receiving a Flow with an unknown handle MUST result in a session error Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/47dcf144 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/47dcf144 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/47dcf144 Branch: refs/heads/master Commit: 47dcf144ec4c2beafa858d67312a08c02884c17f Parents: e64e282 Author: Lorenz Quack <c...@lorenzquack.de> Authored: Wed Oct 25 11:27:43 2017 +0100 Committer: Lorenz Quack <c...@lorenzquack.de> Committed: Wed Oct 25 11:54:19 2017 +0100 ---------------------------------------------------------------------- .../qpid/server/protocol/v1_0/Session_1_0.java | 24 ++++++++++------ .../protocol/v1_0/transport/link/FlowTest.java | 30 ++++++++++++++++++-- 2 files changed, 43 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/47dcf144/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 753fcec..d18712a 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -397,16 +397,24 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget _remoteOutgoingWindow = flow.getOutgoingWindow(); UnsignedInteger handle = flow.getHandle(); - final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint = - handle == null ? null : _inputHandleToEndpoint.get(handle); - - if (endpoint != null) + if (handle != null) { - endpoint.receiveFlow(flow); - - if (Boolean.TRUE.equals(flow.getEcho())) + final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint = _inputHandleToEndpoint.get(handle); + if (endpoint == null) { - endpoint.sendFlow(); + End end = new End(); + end.setError(new Error(SessionError.UNATTACHED_HANDLE, + String.format("Received Flow with unknown handle %d", handle.intValue()))); + end(end); + } + else + { + endpoint.receiveFlow(flow); + + if (Boolean.TRUE.equals(flow.getEcho())) + { + endpoint.sendFlow(); + } } } else http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/47dcf144/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java index a5537f0..ed72ba3 100644 --- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java @@ -36,14 +36,16 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; import org.apache.qpid.server.protocol.v1_0.type.transport.Close; +import org.apache.qpid.server.protocol.v1_0.type.transport.End; import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.protocol.v1_0.type.transport.Role; -import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError; import org.apache.qpid.tests.protocol.v1_0.FrameTransport; -import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; import org.apache.qpid.tests.protocol.v1_0.Utils; +import org.apache.qpid.tests.utils.BrokerAdmin; +import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; public class FlowTest extends BrokerAdminUsingTestBase { @@ -136,7 +138,6 @@ public class FlowTest extends BrokerAdminUsingTestBase assertThat(data, is(equalTo("foo"))); } - @Test @SpecificationTest(section = "2.6.7", description = "If the sender's drain flag is set and there are no available messages," @@ -171,4 +172,27 @@ public class FlowTest extends BrokerAdminUsingTestBase assertThat(responseFlow.getDrain(), is(equalTo(Boolean.TRUE))); } } + + @Test + @SpecificationTest(section = "2.7.4", + description = "If set to a handle that is not currently associated with an attached link, the recipient" + + " MUST respond by ending the session with an unattached-handle session error.") + public void flowWithUnknownHandle() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); + try (FrameTransport transport = new FrameTransport(addr).connect()) + { + End responseEnd = transport.newInteraction() + .negotiateProtocol().consumeResponse() + .open().consumeResponse(Open.class) + .begin().consumeResponse(Begin.class) + .flowEcho(true) + .flowHandle(UnsignedInteger.ONE) + .flow() + .consumeResponse().getLatestResponse(End.class); + + assertThat(responseEnd.getError(), is(notNullValue())); + assertThat(responseEnd.getError().getCondition(), is(equalTo(SessionError.UNATTACHED_HANDLE))); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org