This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 3bc5b16  PROTON-2383 Adds basic channel max validation to peer begin handler
3bc5b16 is described below

commit 3bc5b164ff935344aeca3ae30c05138f692a4fea
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Fri May 14 16:15:07 2021 -0400

    PROTON-2383 Adds basic channel max validation to peer begin handler
    
    Adds some basic validation of the channel that incoming begin
    performatives arrive on in the test peer.
---
 .../qpid/protonj2/test/driver/AMQPTestDriver.java  | 39 ++++++++++++--
 .../qpid/protonj2/test/driver/DriverSessions.java  |  7 +++
 .../test/driver/codec/primitives/UnsignedLong.java |  2 +
 .../driver/codec/primitives/UnsignedShort.java     |  4 +-
 .../protonj2/test/driver/SessionHandlingTest.java  | 61 ++++++++++++++++++++++
 .../driver/codec/primitives/UnsignedShortTest.java |  7 +++
 6 files changed, 115 insertions(+), 5 deletions(-)

diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index b339174..eb48d66 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -31,8 +31,8 @@ import 
org.apache.qpid.protonj2.test.driver.codec.security.SaslDescribedType;
 import org.apache.qpid.protonj2.test.driver.codec.security.SaslOutcome;
 import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
 import org.apache.qpid.protonj2.test.driver.codec.transport.HeartBeat;
+import org.apache.qpid.protonj2.test.driver.codec.transport.Open;
 import 
org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType;
-import 
org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType.PerformativeType;
 import 
org.apache.qpid.protonj2.test.driver.exceptions.UnexpectedPerformativeError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +51,9 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
     private final FrameDecoder frameParser;
     private final FrameEncoder frameEncoder;
 
+    private Open localOpen;
+    private Open remoteOpen;
+
     private final DriverSessions sessions = new DriverSessions(this);
 
     private final Consumer<ByteBuffer> frameConsumer;
@@ -228,6 +231,20 @@ public class AMQPTestDriver implements 
Consumer<ByteBuffer> {
         }
     }
 
+    /**
+     * @return the remote {@link Open} that this peer received (if any).
+     */
+    public Open getRemoteOpen() {
+        return remoteOpen;
+    }
+
+    /**
+     * @return the local {@link Open} that this peer sent (if any).
+     */
+    public Open getLocalOpen() {
+        return localOpen;
+    }
+
     //----- Test driver handling of decoded AMQP frames
 
     void handleHeader(AMQPHeader header) throws AssertionError {
@@ -281,8 +298,14 @@ public class AMQPTestDriver implements 
Consumer<ByteBuffer> {
     }
 
     void handlePerformative(PerformativeDescribedType amqp, int channel, 
ByteBuf payload) throws AssertionError {
-        if (!amqp.getPerformativeType().equals(PerformativeType.HEARTBEAT)) {
-            performativeCount++;
+        switch (amqp.getPerformativeType()) {
+            case HEARTBEAT:
+                break;
+            case OPEN:
+                remoteOpen = (Open) amqp;
+            default:
+                performativeCount++;
+                break;
         }
 
         synchronized (script) {
@@ -465,7 +488,15 @@ public class AMQPTestDriver implements 
Consumer<ByteBuffer> {
      */
     public void sendAMQPFrame(int channel, DescribedType performative, ByteBuf 
payload) {
         LOG.trace("{} Sending performative: {}", driverName, performative);
-        // TODO - handle split frames when frame size requires it
+
+        if (performative instanceof PerformativeDescribedType) {
+            switch (((PerformativeDescribedType) 
performative).getPerformativeType()) {
+                case OPEN:
+                    localOpen = (Open) performative;
+                default:
+                    break;
+            }
+        }
 
         try {
             final ByteBuf buffer = frameEncoder.handleWrite(performative, 
channel, payload, null);
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java
index 2ac4935..ba548ea 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java
@@ -78,6 +78,13 @@ public class DriverSessions {
             throw new AssertionError("Received duplicate Begin for already 
opened session on channel: " + remoteChannel);
         }
 
+        final UnsignedShort localChannelMax = driver.getLocalOpen() == null ? 
UnsignedShort.ZERO :
+            driver.getLocalOpen().getChannelMax() == null ? 
UnsignedShort.MAX_VALUE : driver.getLocalOpen().getChannelMax();
+
+        if (remoteChannel.compareTo(localChannelMax) > 0) {
+            throw new AssertionError("Channel Max [" + localChannelMax + "] 
Exceeded for session Begin: " + remoteChannel);
+        }
+
         final SessionTracker sessionTracker;  // Result that we need to update 
here once validation is complete.
 
         if (remoteBegin.getRemoteChannel() != null) {
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedLong.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedLong.java
index 63fe45c..351f1fa 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedLong.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedLong.java
@@ -34,6 +34,8 @@ public final class UnsignedLong extends Number implements 
Comparable<UnsignedLon
     }
 
     public static final UnsignedLong ZERO = cachedValues[0];
+    public static final UnsignedLong ONE = cachedValues[1];
+    public static final UnsignedLong MAX_VALUE = new 
UnsignedLong(0xffffffffffffffffl);
 
     private final long underlying;
 
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShort.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShort.java
index 007cc12..a96de9b 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShort.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShort.java
@@ -27,7 +27,9 @@ public final class UnsignedShort extends Number implements 
Comparable<UnsignedSh
         }
     }
 
-    public static final UnsignedShort MAX_VALUE = new UnsignedShort((short) 
-1);
+    public static final UnsignedShort ZERO = cachedValues[0];
+    public static final UnsignedShort ONE = cachedValues[1];
+    public static final UnsignedShort MAX_VALUE = new UnsignedShort((short) 
65535);
 
     private final short underlying;
 
diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java
index 01b5948..caf3cb7 100644
--- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java
+++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SessionHandlingTest.java
@@ -247,4 +247,65 @@ class SessionHandlingTest extends TestPeerTestsBase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testPeerEndsConnectionIfRemoteRespondsWithToHighChannelValue() 
throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().withChannelMax(0).respond();
+            peer.expectBegin();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+            client.remoteOpen().withChannelMax(0).now();
+            client.remoteBegin().now();
+
+            // Wait for the above and then script next steps
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            client.expectBegin();
+
+            // Now we respond to the last begin we saw at the server side.
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.respondToLastBegin().onChannel(42).now();
+
+            assertThrows(AssertionError.class, () -> 
client.waitForScriptToComplete(5, TimeUnit.SECONDS));
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testPeerEnforcesChannelMaxOfZeroOnPipelinedOpenBegin() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen();
+            peer.expectBegin();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+            client.remoteOpen().now();
+            client.remoteBegin().onChannel(42).now();
+
+            // Wait for the above and then script next steps
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            assertThrows(AssertionError.class, () -> 
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+        }
+    }
 }
diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShortTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShortTest.java
index 6ae7369..783f8b0 100644
--- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShortTest.java
+++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/codec/primitives/UnsignedShortTest.java
@@ -45,6 +45,13 @@ public class UnsignedShortTest {
     }
 
     @Test
+    public void testUnsignedShortConstants() {
+        assertEquals((short) 0, UnsignedShort.ZERO.shortValue());
+        assertEquals((short) 65535, UnsignedShort.MAX_VALUE.shortValue());
+        assertEquals((short) 1, UnsignedShort.ONE.shortValue());
+    }
+
+    @Test
     public void testShortValue() {
         assertEquals((short) 0, UnsignedShort.valueOf((short) 0).shortValue());
         assertEquals((short) 65535, UnsignedShort.valueOf((short) 
65535).shortValue());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to