This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new eab91726 PROTON-2739 Better handle cases of scripted connect and reconnect
eab91726 is described below

commit eab91726b7259f5779ed73ccba0b80e0903afc10
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Fri May 26 15:23:01 2023 -0400

    PROTON-2739 Better handle cases of scripted connect and reconnect
    
    Better handle cases of a peer connection that drops in certain scenarios
    and then reconnects. Adds an expectation element to let the script await
    the connection drop before proceeding with additional actions etc.
---
 .../qpid/protonj2/test/driver/AMQPTestDriver.java  | 36 +++++++++---
 .../qpid/protonj2/test/driver/DriverSessions.java  |  9 +++
 .../qpid/protonj2/test/driver/ScriptWriter.java    | 66 ++++++++++++----------
 .../expectations/ConnectionDropExpectation.java    | 38 +++++++++++++
 .../expectations/SaslOutcomeExpectation.java       | 28 +++++++++
 .../protonj2/test/driver/ProtonTestClientTest.java | 51 +++++++++++++++++
 6 files changed, 189 insertions(+), 39 deletions(-)

diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index 9403e01d..a13d9c3a 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -36,6 +36,8 @@ import 
org.apache.qpid.protonj2.test.driver.codec.transport.HeartBeat;
 import org.apache.qpid.protonj2.test.driver.codec.transport.Open;
 import 
org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType;
 import 
org.apache.qpid.protonj2.test.driver.exceptions.UnexpectedPerformativeError;
+import 
org.apache.qpid.protonj2.test.driver.expectations.ConnectionDropExpectation;
+import 
org.apache.qpid.protonj2.test.driver.expectations.SaslOutcomeExpectation;
 import org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -255,10 +257,20 @@ public class AMQPTestDriver implements 
Consumer<ByteBuffer> {
 
     void handleConnectedDropped() throws AssertionError {
         synchronized (script) {
-            // For now we just reset the parse as any new connection would 
need to
+            // For now we just reset the parser as any new connection would 
need to
             // send an AMQP header, other validation could be added if we 
expand
             // processing on client disconnect events.
-            frameParser.resetToExpectingHeader();
+            resetToExpectingAMQPHeader();
+            // Reset connection tracking state to empty as the connection is 
gone
+            // and we want a new connection to be able to reuse handles etc.
+            sessions.reset();
+            // Check if the currently pending scripted expectation is for the 
connection
+            // to drop in which case we remove it and unblock any waiters on 
the script
+            // to complete, if this is the final scripted entry.
+            final ScriptedElement scriptEntry = script.peek();
+            if (scriptEntry instanceof ConnectionDropExpectation) {
+                processScript(script.poll());
+            }
         }
     }
 
@@ -294,12 +306,6 @@ public class AMQPTestDriver implements 
Consumer<ByteBuffer> {
             }
 
             try {
-                // When the outcome of SASL is read the decoder should revert 
to initial state
-                // as the only valid next incoming value is an AMQP header.
-                if (sasl instanceof SaslOutcome) {
-                    frameParser.resetToExpectingHeader();
-                }
-
                 sasl.invoke(scriptEntry, frameSize, this);
             } catch (UnexpectedPerformativeError e) {
                 if (scriptEntry.isOptional()) {
@@ -381,6 +387,18 @@ public class AMQPTestDriver implements 
Consumer<ByteBuffer> {
 
     //----- Test driver actions
 
+    /**
+     * Resets the frame parser to a state where it expects to read an AMQP 
Header type instead
+     * of a normal AMQP frame type.  By default the parser starts in this 
state and then switches
+     * to frames after the first header is read.  In cases where SASL is in 
use this needs to be
+     * reset to the header read state when the outcome is known.  Normally the 
script should handle
+     * this via a {@link SaslOutcomeExpectation} but other script variations 
might want to drive
+     * this manually.
+     */
+    public void resetToExpectingAMQPHeader() {
+        this.frameParser.resetToExpectingHeader();
+    }
+
     /**
      * Waits indefinitely for the scripted expectations and actions to be 
performed.  If the script
      * execution encounters an error this method will throw an {@link 
AssertionError} that describes
@@ -798,7 +816,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> 
{
     }
 
     private void processScript(ScriptedElement current) {
-        while (current.performAfterwards() != null && failureCause == null) {
+        if (current.performAfterwards() != null && failureCause == null) {
             current.performAfterwards().perform(this);
         }
 
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java
index 92a07beb..a9c3ef5a 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java
@@ -71,6 +71,15 @@ public class DriverSessions {
         return remoteSessions.get(remoteChannel);
     }
 
+    public void reset() {
+        localSessions.clear();
+        remoteSessions.clear();
+
+        lastRemotelyOpenedSession = null;
+        lastLocallyOpenedSession = null;
+        lastCoordinator = null;
+    }
+
     //----- Process performatives that require session level tracking
 
     public SessionTracker handleBegin(Begin remoteBegin, UnsignedShort 
remoteChannel) {
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java
index 38cddeae..62ac6899 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java
@@ -56,6 +56,7 @@ import 
org.apache.qpid.protonj2.test.driver.expectations.AMQPHeaderExpectation;
 import org.apache.qpid.protonj2.test.driver.expectations.AttachExpectation;
 import org.apache.qpid.protonj2.test.driver.expectations.BeginExpectation;
 import org.apache.qpid.protonj2.test.driver.expectations.CloseExpectation;
+import 
org.apache.qpid.protonj2.test.driver.expectations.ConnectionDropExpectation;
 import org.apache.qpid.protonj2.test.driver.expectations.DeclareExpectation;
 import org.apache.qpid.protonj2.test.driver.expectations.DetachExpectation;
 import org.apache.qpid.protonj2.test.driver.expectations.DischargeExpectation;
@@ -88,67 +89,73 @@ public abstract class ScriptWriter {
     //----- AMQP Performative expectations
 
     public AMQPHeaderExpectation expectAMQPHeader() {
-        AMQPHeaderExpectation expecting = new 
AMQPHeaderExpectation(AMQPHeader.getAMQPHeader(), getDriver());
+        final AMQPHeaderExpectation expecting = new 
AMQPHeaderExpectation(AMQPHeader.getAMQPHeader(), getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public OpenExpectation expectOpen() {
-        OpenExpectation expecting = new OpenExpectation(getDriver());
+        final OpenExpectation expecting = new OpenExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public CloseExpectation expectClose() {
-        CloseExpectation expecting = new CloseExpectation(getDriver());
+        final CloseExpectation expecting = new CloseExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public BeginExpectation expectBegin() {
-        BeginExpectation expecting = new BeginExpectation(getDriver());
+        final BeginExpectation expecting = new BeginExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public EndExpectation expectEnd() {
-        EndExpectation expecting = new EndExpectation(getDriver());
+        final EndExpectation expecting = new EndExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public AttachExpectation expectAttach() {
-        AttachExpectation expecting = new AttachExpectation(getDriver());
+        final AttachExpectation expecting = new AttachExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public DetachExpectation expectDetach() {
-        DetachExpectation expecting = new DetachExpectation(getDriver());
+        final DetachExpectation expecting = new DetachExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public FlowExpectation expectFlow() {
-        FlowExpectation expecting = new FlowExpectation(getDriver());
+        final FlowExpectation expecting = new FlowExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public TransferExpectation expectTransfer() {
-        TransferExpectation expecting = new TransferExpectation(getDriver());
+        final TransferExpectation expecting = new 
TransferExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public DispositionExpectation expectDisposition() {
-        DispositionExpectation expecting = new 
DispositionExpectation(getDriver());
+        final DispositionExpectation expecting = new 
DispositionExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public EmptyFrameExpectation expectEmptyFrame() {
-        EmptyFrameExpectation expecting = new 
EmptyFrameExpectation(getDriver());
+        final EmptyFrameExpectation expecting = new 
EmptyFrameExpectation(getDriver());
+        getDriver().addScriptedElement(expecting);
+        return expecting;
+    }
+
+    public ConnectionDropExpectation expectConnectionToDrop() {
+        final ConnectionDropExpectation expecting = new 
ConnectionDropExpectation();
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
@@ -156,7 +163,7 @@ public abstract class ScriptWriter {
     //----- Transaction expectations
 
     public AttachExpectation expectCoordinatorAttach() {
-        AttachExpectation expecting = new AttachExpectation(getDriver());
+        final AttachExpectation expecting = new AttachExpectation(getDriver());
 
         expecting.withRole(Role.SENDER);
         expecting.withCoordinator(isA(Coordinator.class));
@@ -167,7 +174,7 @@ public abstract class ScriptWriter {
     }
 
     public DeclareExpectation expectDeclare() {
-        DeclareExpectation expecting = new DeclareExpectation(getDriver());
+        final DeclareExpectation expecting = new 
DeclareExpectation(getDriver());
 
         expecting.withHandle(notNullValue());
         expecting.withDeliveryId(notNullValue());
@@ -179,7 +186,7 @@ public abstract class ScriptWriter {
     }
 
     public DischargeExpectation expectDischarge() {
-        DischargeExpectation expecting = new DischargeExpectation(getDriver());
+        final DischargeExpectation expecting = new 
DischargeExpectation(getDriver());
 
         expecting.withHandle(notNullValue());
         expecting.withDeliveryId(notNullValue());
@@ -193,37 +200,37 @@ public abstract class ScriptWriter {
     //----- SASL performative expectations
 
     public AMQPHeaderExpectation expectSASLHeader() {
-        AMQPHeaderExpectation expecting = new 
AMQPHeaderExpectation(AMQPHeader.getSASLHeader(), getDriver());
+        final AMQPHeaderExpectation expecting = new 
AMQPHeaderExpectation(AMQPHeader.getSASLHeader(), getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public SaslMechanismsExpectation expectSaslMechanisms() {
-        SaslMechanismsExpectation expecting = new 
SaslMechanismsExpectation(getDriver());
+        final SaslMechanismsExpectation expecting = new 
SaslMechanismsExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public SaslInitExpectation expectSaslInit() {
-        SaslInitExpectation expecting = new SaslInitExpectation(getDriver());
+        final SaslInitExpectation expecting = new 
SaslInitExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public SaslChallengeExpectation expectSaslChallenge() {
-        SaslChallengeExpectation expecting = new 
SaslChallengeExpectation(getDriver());
+        final SaslChallengeExpectation expecting = new 
SaslChallengeExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public SaslResponseExpectation expectSaslResponse() {
-        SaslResponseExpectation expecting = new 
SaslResponseExpectation(getDriver());
+        final SaslResponseExpectation expecting = new 
SaslResponseExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
 
     public SaslOutcomeExpectation expectSaslOutcome() {
-        SaslOutcomeExpectation expecting = new 
SaslOutcomeExpectation(getDriver());
+        final SaslOutcomeExpectation expecting = new 
SaslOutcomeExpectation(getDriver());
         getDriver().addScriptedElement(expecting);
         return expecting;
     }
@@ -628,9 +635,9 @@ public abstract class ScriptWriter {
     //----- Utility methods for tests writing raw scripted SASL tests
 
     public byte[] saslPlainInitialResponse(String username, String password) {
-        byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
-        byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
-        byte[] initialResponse = new 
byte[usernameBytes.length+passwordBytes.length+2];
+        final byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
+        final byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
+        final byte[] initialResponse = new 
byte[usernameBytes.length+passwordBytes.length+2];
         System.arraycopy(usernameBytes, 0, initialResponse, 1, 
usernameBytes.length);
         System.arraycopy(passwordBytes, 0, initialResponse, 2 + 
usernameBytes.length, passwordBytes.length);
 
@@ -638,9 +645,9 @@ public abstract class ScriptWriter {
     }
 
     public byte[] saslXOauth2InitialResponse(String username, String password) 
{
-        byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
-        byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
-        byte[] initialResponse = new 
byte[usernameBytes.length+passwordBytes.length+20];
+        final byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
+        final byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
+        final byte[] initialResponse = new 
byte[usernameBytes.length+passwordBytes.length+20];
 
         System.arraycopy("user=".getBytes(StandardCharsets.US_ASCII), 0, 
initialResponse, 0, 5);
         System.arraycopy(usernameBytes, 0, initialResponse, 5, 
usernameBytes.length);
@@ -665,9 +672,9 @@ public abstract class ScriptWriter {
      * @throws IllegalStateException if no Begin has yet been received from 
the remote.
      */
     public BeginInjectAction respondToLastBegin() {
-        BeginInjectAction response = new BeginInjectAction(getDriver());
+        final BeginInjectAction response = new BeginInjectAction(getDriver());
 
-        SessionTracker session = 
getDriver().sessions().getLastRemotelyOpenedSession();
+        final SessionTracker session = 
getDriver().sessions().getLastRemotelyOpenedSession();
         if (session == null) {
             throw new IllegalStateException("Cannot create response to Begin 
before one has been received.");
         }
@@ -688,8 +695,7 @@ public abstract class ScriptWriter {
      * @throws IllegalStateException if no Attach has yet been received from 
the remote.
      */
     public AttachInjectAction respondToLastAttach() {
-        AttachInjectAction response = new AttachInjectAction(getDriver());
-
+        final AttachInjectAction response = new 
AttachInjectAction(getDriver());
         final SessionTracker session = 
getDriver().sessions().getLastRemotelyOpenedSession();
         final LinkTracker link = session.getLastRemotelyOpenedLink();
 
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/ConnectionDropExpectation.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/ConnectionDropExpectation.java
new file mode 100644
index 00000000..d57f26b0
--- /dev/null
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/ConnectionDropExpectation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.test.driver.expectations;
+
+import org.apache.qpid.protonj2.test.driver.ScriptedExpectation;
+
+/**
+ * Expectation used to script an expected connection drop from the remotely
+ * connected peer.
+ * <p>
+ * This expectation type is best used to await a remote peer response of
+ * dropping the connection in relation to some scripted action that will
+ * result in it closing its side of the connection.
+ */
+public class ConnectionDropExpectation implements ScriptedExpectation {
+
+    /**
+     * Creates a simple connection dropped expectation instance.
+     */
+    public ConnectionDropExpectation() {
+        // No setup needed as the driver should handle this explicitly.
+    }
+}
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
index 36c81e8d..eecf87e2 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
@@ -19,6 +19,7 @@ package org.apache.qpid.protonj2.test.driver.expectations;
 import static org.hamcrest.CoreMatchers.equalTo;
 
 import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
+import org.apache.qpid.protonj2.test.driver.ScriptedAction;
 import org.apache.qpid.protonj2.test.driver.codec.ListDescribedType;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedByte;
@@ -38,6 +39,33 @@ public class SaslOutcomeExpectation extends 
AbstractExpectation<SaslOutcome> {
         super(driver);
     }
 
+    @Override
+    public ScriptedAction performAfterwards() {
+        return new ScriptedAction() {
+
+            @Override
+            public ScriptedAction queue() {
+                throw new UnsupportedOperationException("Cannot be called on 
this action");
+            }
+
+            @Override
+            public ScriptedAction perform(AMQPTestDriver driver) {
+                driver.resetToExpectingAMQPHeader();
+                return null;
+            }
+
+            @Override
+            public ScriptedAction now() {
+                throw new UnsupportedOperationException("Cannot be called on 
this action");
+            }
+
+            @Override
+            public ScriptedAction later(int waitTime) {
+                throw new UnsupportedOperationException("Cannot be called on 
this action");
+            }
+        };
+    }
+
     //----- Type specific with methods that perform simple equals checks
 
     public SaslOutcomeExpectation withCode(byte code) {
diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
index 2cc9caa2..c671b594 100644
--- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
+++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
@@ -55,6 +55,8 @@ class ProtonTestClientTest extends TestPeerTestsBase {
 
             LOG.info("Test started, peer listening on: {}", remoteURI);
 
+            Thread.sleep(100);
+
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
@@ -90,6 +92,55 @@ class ProtonTestClientTest extends TestPeerTestsBase {
         }
     }
 
+    @Test
+    public void testTwoClientConnectionsHandlesOpenBeginAttach() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().respond();
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            // Server can accept two connections, although not at the same time.
+
+            try (ProtonTestClient client = new ProtonTestClient()) {
+                client.connect(remoteURI.getHost(), remoteURI.getPort());
+                client.expectAMQPHeader();
+                client.expectOpen();
+                client.expectBegin();
+                client.expectAttach();
+                client.dropAfterLastHandler(10);
+                client.remoteAMQPHeader().now();
+                client.remoteOpen().now();
+                client.remoteBegin().now();
+                client.remoteAttach().ofSender().now();
+                client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            }
+
+            try (ProtonTestClient client = new ProtonTestClient()) {
+                client.connect(remoteURI.getHost(), remoteURI.getPort());
+                client.expectAMQPHeader();
+                client.expectOpen();
+                client.expectBegin();
+                client.expectAttach();
+                client.remoteAMQPHeader().now();
+                client.remoteOpen().now();
+                client.remoteBegin().now();
+                client.remoteAttach().ofSender().now();
+                client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            }
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
     @Test
     public void testClientDetectsUnexpectedPerformativeResponseToAMQPHeader() 
throws Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to