This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new 3bc5b16 PROTON-2383 Adds basic channel max validation to peer begin handler 3bc5b16 is described below commit 3bc5b164ff935344aeca3ae30c05138f692a4fea Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Fri May 14 16:15:07 2021 -0400 PROTON-2383 Adds basic channel max validation to peer begin handler Adds some basic validation of the channel that incoming begin performatives arrive on in the test peer. --- .../qpid/protonj2/test/driver/AMQPTestDriver.java | 39 ++++++++++++-- .../qpid/protonj2/test/driver/DriverSessions.java | 7 +++ .../test/driver/codec/primitives/UnsignedLong.java | 2 + .../driver/codec/primitives/UnsignedShort.java | 4 +- .../protonj2/test/driver/SessionHandlingTest.java | 61 ++++++++++++++++++++++ .../driver/codec/primitives/UnsignedShortTest.java | 7 +++ 6 files changed, 115 insertions(+), 5 deletions(-) diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java index b339174..eb48d66 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java @@ -31,8 +31,8 @@ import org.apache.qpid.protonj2.test.driver.codec.security.SaslDescribedType; import org.apache.qpid.protonj2.test.driver.codec.security.SaslOutcome; import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader; import org.apache.qpid.protonj2.test.driver.codec.transport.HeartBeat; +import org.apache.qpid.protonj2.test.driver.codec.transport.Open; import org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType; -import org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType.PerformativeType; import org.apache.qpid.protonj2.test.driver.exceptions.UnexpectedPerformativeError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +51,9 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> { private final FrameDecoder frameParser; private final FrameEncoder frameEncoder; + private Open localOpen; + private Open remoteOpen; + private final DriverSessions sessions = new DriverSessions(this); private final Consumer<ByteBuffer> frameConsumer; @@ -228,6 +231,20 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> { } } + /** + * @return the remote {@link Open} that this peer received (if any). + */ + public Open getRemoteOpen() { + return remoteOpen; + } + + /** + * @return the local {@link Open} that this peer sent (if any). + */ + public Open getLocalOpen() { + return localOpen; + } + //----- Test driver handling of decoded AMQP frames void handleHeader(AMQPHeader header) throws AssertionError { @@ -281,8 +298,14 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> { } void handlePerformative(PerformativeDescribedType amqp, int channel, ByteBuf payload) throws AssertionError { - if (!amqp.getPerformativeType().equals(PerformativeType.HEARTBEAT)) { - performativeCount++; + switch (amqp.getPerformativeType()) { + case HEARTBEAT: + break; + case OPEN: + remoteOpen = (Open) amqp; + default: + performativeCount++; + break; } synchronized (script) { @@ -465,7 +488,15 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> { */ public void sendAMQPFrame(int channel, DescribedType performative, ByteBuf payload) { LOG.trace("{} Sending performative: {}", driverName, performative); - // TODO - handle split frames when frame size requires it + + if (performative instanceof PerformativeDescribedType) { + switch (((PerformativeDescribedType) performative).getPerformativeType()) { + case OPEN: + localOpen = (Open) performative; + default: + break; + } + } try { final ByteBuf buffer = frameEncoder.handleWrite(performative, channel, payload, null); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java index 2ac4935..ba548ea 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java @@ -78,6 +78,13 @@ public class DriverSessions { throw new AssertionError("Received duplicate Begin for already opened session on channel: " + remoteChannel); } + final UnsignedShort localChannelMax = driver.getLocalOpen() == null ? UnsignedShort.ZERO : + driver.getLocalOpen().getChannelMax() == null ? UnsignedShort.MAX_VALUE : driver.getLocalOpen().getChannelMax(); + + if (remoteChannel.compareTo(localChannelMax) > 0) { + throw new AssertionError("Channel Max [" + localChannelMax + "] Exceeded for session Begin: " + remoteChannel); + } + final SessionTracker sessionTracker; // Result that we need to update here once validation is complete. if (remoteBegin.getRemoteChannel() != null) { diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedLong.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedLong.java index 63fe45c..351f1fa 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedLong.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedLong.java @@ -34,6 +34,8 @@ public final class UnsignedLong extends Number implements Comparable<UnsignedLon } public static final UnsignedLong ZERO = cachedValues[0]; + public static final UnsignedLong ONE = cachedValues[1]; + public static final UnsignedLong MAX_VALUE = new UnsignedLong(0xffffffffffffffffl); private final long underlying; diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShort.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShort.java index 007cc12..a96de9b 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShort.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShort.java @@ -27,7 +27,9 @@ public final class UnsignedShort extends Number implements Comparable<UnsignedSh } } - public static final UnsignedShort MAX_VALUE = new UnsignedShort((short) -1); + public static final UnsignedShort ZERO = cachedValues[0]; + public static final UnsignedShort ONE = cachedValues[1]; + public static final UnsignedShort MAX_VALUE = new UnsignedShort((short) 65535); private final short underlying; diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java index 01b5948..caf3cb7 100644 --- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java @@ -247,4 +247,65 @@ class SessionHandlingTest extends TestPeerTestsBase { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } + + @Test + public void testPeerEndsConnectionIfRemoteRespondsWithToHighChannelValue() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().withChannelMax(0).respond(); + peer.expectBegin(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); + client.remoteOpen().withChannelMax(0).now(); + client.remoteBegin().now(); + + // Wait for the above and then script next steps + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + client.expectBegin(); + + // Now we respond to the last begin we saw at the server side. + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.respondToLastBegin().onChannel(42).now(); + + assertThrows(AssertionError.class, () -> client.waitForScriptToComplete(5, TimeUnit.SECONDS)); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + + @Test + public void testPeerEnforcesChannelMaxOfZeroOnPipelinedOpenBegin() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen(); + peer.expectBegin(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.remoteHeader(AMQPHeader.getAMQPHeader()).now(); + client.remoteOpen().now(); + client.remoteBegin().onChannel(42).now(); + + // Wait for the above and then script next steps + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + + assertThrows(AssertionError.class, () -> peer.waitForScriptToComplete(5, TimeUnit.SECONDS)); + } + } } diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShortTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShortTest.java index 6ae7369..783f8b0 100644 --- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShortTest.java +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShortTest.java @@ -45,6 +45,13 @@ public class UnsignedShortTest { } @Test + public void testUnsignedShortConstants() { + assertEquals((short) 0, UnsignedShort.ZERO.shortValue()); + assertEquals((short) 65535, UnsignedShort.MAX_VALUE.shortValue()); + assertEquals((short) 1, UnsignedShort.ONE.shortValue()); + } + + @Test public void testShortValue() { assertEquals((short) 0, UnsignedShort.valueOf((short) 0).shortValue()); assertEquals((short) 65535, UnsignedShort.valueOf((short) 65535).shortValue()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org