This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new eab91726 PROTON-2739 Better handle cases of scripted connect and reconnect eab91726 is described below commit eab91726b7259f5779ed73ccba0b80e0903afc10 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Fri May 26 15:23:01 2023 -0400 PROTON-2739 Better handle cases of scripted connect and reconnect Better handle cases of a peer connection that drops in certain scenarios and then reconnects. Adds an expectation element to let the script await the connection drop before proceeding with additional actions etc. --- .../qpid/protonj2/test/driver/AMQPTestDriver.java | 36 +++++++++--- .../qpid/protonj2/test/driver/DriverSessions.java | 9 +++ .../qpid/protonj2/test/driver/ScriptWriter.java | 66 ++++++++++++---------- .../expectations/ConnectionDropExpectation.java | 38 +++++++++++++ .../expectations/SaslOutcomeExpectation.java | 28 +++++++++ .../protonj2/test/driver/ProtonTestClientTest.java | 51 +++++++++++++++++ 6 files changed, 189 insertions(+), 39 deletions(-) diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java index 9403e01d..a13d9c3a 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java @@ -36,6 +36,8 @@ import org.apache.qpid.protonj2.test.driver.codec.transport.HeartBeat; import org.apache.qpid.protonj2.test.driver.codec.transport.Open; import org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType; import org.apache.qpid.protonj2.test.driver.exceptions.UnexpectedPerformativeError; +import org.apache.qpid.protonj2.test.driver.expectations.ConnectionDropExpectation; +import org.apache.qpid.protonj2.test.driver.expectations.SaslOutcomeExpectation; import 
org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -255,10 +257,20 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> { void handleConnectedDropped() throws AssertionError { synchronized (script) { - // For now we just reset the parse as any new connection would need to + // For now we just reset the parser as any new connection would need to // send an AMQP header, other validation could be added if we expand // processing on client disconnect events. - frameParser.resetToExpectingHeader(); + resetToExpectingAMQPHeader(); + // Reset connection tracking state to empty as the connection is gone + // and we want a new connection to be able to reuse handles etc. + sessions.reset(); + // Check if the currently pending scripted expectation is for the connection + // to drop in which case we remove it and unblock any waiters on the script + // to complete, if this is the final scripted entry. + final ScriptedElement scriptEntry = script.peek(); + if (scriptEntry instanceof ConnectionDropExpectation) { + processScript(script.poll()); + } } } @@ -294,12 +306,6 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> { } try { - // When the outcome of SASL is read the decoder should revert to initial state - // as the only valid next incoming value is an AMQP header. - if (sasl instanceof SaslOutcome) { - frameParser.resetToExpectingHeader(); - } - sasl.invoke(scriptEntry, frameSize, this); } catch (UnexpectedPerformativeError e) { if (scriptEntry.isOptional()) { @@ -381,6 +387,18 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> { //----- Test driver actions + /** + * Resets the frame parser to a state where it expects to read an AMQP Header type instead + * of a normal AMQP frame type. By default the parser starts in this state and then switches + * to frames after the first header is read. 
In cases where SASL is in use this needs to be + * reset to the header read state when the outcome is known. Normally the script should handle + * this via a {@link SaslOutcomeExpectation} but other script variations might want to drive + * this manually. + */ + public void resetToExpectingAMQPHeader() { + this.frameParser.resetToExpectingHeader(); + } + /** * Waits indefinitely for the scripted expectations and actions to be performed. If the script * execution encounters an error this method will throw an {@link AssertionError} that describes @@ -798,7 +816,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> { } private void processScript(ScriptedElement current) { - while (current.performAfterwards() != null && failureCause == null) { + if (current.performAfterwards() != null && failureCause == null) { current.performAfterwards().perform(this); } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java index 92a07beb..a9c3ef5a 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/DriverSessions.java @@ -71,6 +71,15 @@ public class DriverSessions { return remoteSessions.get(remoteChannel); } + public void reset() { + localSessions.clear(); + remoteSessions.clear(); + + lastRemotelyOpenedSession = null; + lastLocallyOpenedSession = null; + lastCoordinator = null; + } + //----- Process performatives that require session level tracking public SessionTracker handleBegin(Begin remoteBegin, UnsignedShort remoteChannel) { diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java index 38cddeae..62ac6899 100644 --- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ScriptWriter.java @@ -56,6 +56,7 @@ import org.apache.qpid.protonj2.test.driver.expectations.AMQPHeaderExpectation; import org.apache.qpid.protonj2.test.driver.expectations.AttachExpectation; import org.apache.qpid.protonj2.test.driver.expectations.BeginExpectation; import org.apache.qpid.protonj2.test.driver.expectations.CloseExpectation; +import org.apache.qpid.protonj2.test.driver.expectations.ConnectionDropExpectation; import org.apache.qpid.protonj2.test.driver.expectations.DeclareExpectation; import org.apache.qpid.protonj2.test.driver.expectations.DetachExpectation; import org.apache.qpid.protonj2.test.driver.expectations.DischargeExpectation; @@ -88,67 +89,73 @@ public abstract class ScriptWriter { //----- AMQP Performative expectations public AMQPHeaderExpectation expectAMQPHeader() { - AMQPHeaderExpectation expecting = new AMQPHeaderExpectation(AMQPHeader.getAMQPHeader(), getDriver()); + final AMQPHeaderExpectation expecting = new AMQPHeaderExpectation(AMQPHeader.getAMQPHeader(), getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public OpenExpectation expectOpen() { - OpenExpectation expecting = new OpenExpectation(getDriver()); + final OpenExpectation expecting = new OpenExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public CloseExpectation expectClose() { - CloseExpectation expecting = new CloseExpectation(getDriver()); + final CloseExpectation expecting = new CloseExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public BeginExpectation expectBegin() { - BeginExpectation expecting = new BeginExpectation(getDriver()); + final BeginExpectation expecting = new BeginExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public EndExpectation 
expectEnd() { - EndExpectation expecting = new EndExpectation(getDriver()); + final EndExpectation expecting = new EndExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public AttachExpectation expectAttach() { - AttachExpectation expecting = new AttachExpectation(getDriver()); + final AttachExpectation expecting = new AttachExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public DetachExpectation expectDetach() { - DetachExpectation expecting = new DetachExpectation(getDriver()); + final DetachExpectation expecting = new DetachExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public FlowExpectation expectFlow() { - FlowExpectation expecting = new FlowExpectation(getDriver()); + final FlowExpectation expecting = new FlowExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public TransferExpectation expectTransfer() { - TransferExpectation expecting = new TransferExpectation(getDriver()); + final TransferExpectation expecting = new TransferExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public DispositionExpectation expectDisposition() { - DispositionExpectation expecting = new DispositionExpectation(getDriver()); + final DispositionExpectation expecting = new DispositionExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public EmptyFrameExpectation expectEmptyFrame() { - EmptyFrameExpectation expecting = new EmptyFrameExpectation(getDriver()); + final EmptyFrameExpectation expecting = new EmptyFrameExpectation(getDriver()); + getDriver().addScriptedElement(expecting); + return expecting; + } + + public ConnectionDropExpectation expectConnectionToDrop() { + final ConnectionDropExpectation expecting = new ConnectionDropExpectation(); getDriver().addScriptedElement(expecting); return expecting; } @@ -156,7 +163,7 @@ public abstract class 
ScriptWriter { //----- Transaction expectations public AttachExpectation expectCoordinatorAttach() { - AttachExpectation expecting = new AttachExpectation(getDriver()); + final AttachExpectation expecting = new AttachExpectation(getDriver()); expecting.withRole(Role.SENDER); expecting.withCoordinator(isA(Coordinator.class)); @@ -167,7 +174,7 @@ public abstract class ScriptWriter { } public DeclareExpectation expectDeclare() { - DeclareExpectation expecting = new DeclareExpectation(getDriver()); + final DeclareExpectation expecting = new DeclareExpectation(getDriver()); expecting.withHandle(notNullValue()); expecting.withDeliveryId(notNullValue()); @@ -179,7 +186,7 @@ public abstract class ScriptWriter { } public DischargeExpectation expectDischarge() { - DischargeExpectation expecting = new DischargeExpectation(getDriver()); + final DischargeExpectation expecting = new DischargeExpectation(getDriver()); expecting.withHandle(notNullValue()); expecting.withDeliveryId(notNullValue()); @@ -193,37 +200,37 @@ public abstract class ScriptWriter { //----- SASL performative expectations public AMQPHeaderExpectation expectSASLHeader() { - AMQPHeaderExpectation expecting = new AMQPHeaderExpectation(AMQPHeader.getSASLHeader(), getDriver()); + final AMQPHeaderExpectation expecting = new AMQPHeaderExpectation(AMQPHeader.getSASLHeader(), getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public SaslMechanismsExpectation expectSaslMechanisms() { - SaslMechanismsExpectation expecting = new SaslMechanismsExpectation(getDriver()); + final SaslMechanismsExpectation expecting = new SaslMechanismsExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public SaslInitExpectation expectSaslInit() { - SaslInitExpectation expecting = new SaslInitExpectation(getDriver()); + final SaslInitExpectation expecting = new SaslInitExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public 
SaslChallengeExpectation expectSaslChallenge() { - SaslChallengeExpectation expecting = new SaslChallengeExpectation(getDriver()); + final SaslChallengeExpectation expecting = new SaslChallengeExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public SaslResponseExpectation expectSaslResponse() { - SaslResponseExpectation expecting = new SaslResponseExpectation(getDriver()); + final SaslResponseExpectation expecting = new SaslResponseExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } public SaslOutcomeExpectation expectSaslOutcome() { - SaslOutcomeExpectation expecting = new SaslOutcomeExpectation(getDriver()); + final SaslOutcomeExpectation expecting = new SaslOutcomeExpectation(getDriver()); getDriver().addScriptedElement(expecting); return expecting; } @@ -628,9 +635,9 @@ public abstract class ScriptWriter { //----- Utility methods for tests writing raw scripted SASL tests public byte[] saslPlainInitialResponse(String username, String password) { - byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8); - byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8); - byte[] initialResponse = new byte[usernameBytes.length+passwordBytes.length+2]; + final byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8); + final byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8); + final byte[] initialResponse = new byte[usernameBytes.length+passwordBytes.length+2]; System.arraycopy(usernameBytes, 0, initialResponse, 1, usernameBytes.length); System.arraycopy(passwordBytes, 0, initialResponse, 2 + usernameBytes.length, passwordBytes.length); @@ -638,9 +645,9 @@ public abstract class ScriptWriter { } public byte[] saslXOauth2InitialResponse(String username, String password) { - byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8); - byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8); - byte[] initialResponse = new 
byte[usernameBytes.length+passwordBytes.length+20]; + final byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8); + final byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8); + final byte[] initialResponse = new byte[usernameBytes.length+passwordBytes.length+20]; System.arraycopy("user=".getBytes(StandardCharsets.US_ASCII), 0, initialResponse, 0, 5); System.arraycopy(usernameBytes, 0, initialResponse, 5, usernameBytes.length); @@ -665,9 +672,9 @@ public abstract class ScriptWriter { * @throws IllegalStateException if no Begin has yet been received from the remote. */ public BeginInjectAction respondToLastBegin() { - BeginInjectAction response = new BeginInjectAction(getDriver()); + final BeginInjectAction response = new BeginInjectAction(getDriver()); - SessionTracker session = getDriver().sessions().getLastRemotelyOpenedSession(); + final SessionTracker session = getDriver().sessions().getLastRemotelyOpenedSession(); if (session == null) { throw new IllegalStateException("Cannot create response to Begin before one has been received."); } @@ -688,8 +695,7 @@ public abstract class ScriptWriter { * @throws IllegalStateException if no Attach has yet been received from the remote. 
*/ public AttachInjectAction respondToLastAttach() { - AttachInjectAction response = new AttachInjectAction(getDriver()); - + final AttachInjectAction response = new AttachInjectAction(getDriver()); final SessionTracker session = getDriver().sessions().getLastRemotelyOpenedSession(); final LinkTracker link = session.getLastRemotelyOpenedLink(); diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/ConnectionDropExpectation.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/ConnectionDropExpectation.java new file mode 100644 index 00000000..d57f26b0 --- /dev/null +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/ConnectionDropExpectation.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.protonj2.test.driver.expectations; + +import org.apache.qpid.protonj2.test.driver.ScriptedExpectation; + +/** + * Expectation used to script an expected connection drop from the remotely + * connected peer. 
+ * <p> + * This expectation type is best used to await a remote peer response of + * dropping the connection in relation to some scripted action that will + * result in it closing its side of the connection. + */ +public class ConnectionDropExpectation implements ScriptedExpectation { + + /** + * Creates a simple connection dropped expectation instance. + */ + public ConnectionDropExpectation() { + // No setup needed as the driver should handle this explicitly. + } +} diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java index 36c81e8d..eecf87e2 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java @@ -19,6 +19,7 @@ package org.apache.qpid.protonj2.test.driver.expectations; import static org.hamcrest.CoreMatchers.equalTo; import org.apache.qpid.protonj2.test.driver.AMQPTestDriver; +import org.apache.qpid.protonj2.test.driver.ScriptedAction; import org.apache.qpid.protonj2.test.driver.codec.ListDescribedType; import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary; import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedByte; @@ -38,6 +39,33 @@ public class SaslOutcomeExpectation extends AbstractExpectation<SaslOutcome> { super(driver); } + @Override + public ScriptedAction performAfterwards() { + return new ScriptedAction() { + + @Override + public ScriptedAction queue() { + throw new UnsupportedOperationException("Cannot be called on this action"); + } + + @Override + public ScriptedAction perform(AMQPTestDriver driver) { + driver.resetToExpectingAMQPHeader(); + return null; + } + + @Override + public ScriptedAction now() { + throw new 
UnsupportedOperationException("Cannot be called on this action"); + } + + @Override + public ScriptedAction later(int waitTime) { + throw new UnsupportedOperationException("Cannot be called on this action"); + } + }; + } + //----- Type specific with methods that perform simple equals checks public SaslOutcomeExpectation withCode(byte code) { diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java index 2cc9caa2..c671b594 100644 --- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java @@ -55,6 +55,8 @@ class ProtonTestClientTest extends TestPeerTestsBase { LOG.info("Test started, peer listening on: {}", remoteURI); + Thread.sleep(100); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } @@ -90,6 +92,55 @@ class ProtonTestClientTest extends TestPeerTestsBase { } } + @Test + public void testTwoClientConnectionsHandlesOpenBeginAttach() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().respond(); + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().respond(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + // Server can accept two connections, although not at the same time. 
+ + try (ProtonTestClient client = new ProtonTestClient()) { + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach(); + client.dropAfterLastHandler(10); + client.remoteAMQPHeader().now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + + try (ProtonTestClient client = new ProtonTestClient()) { + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach(); + client.remoteAMQPHeader().now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + + LOG.info("Test started, peer listening on: {}", remoteURI); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } @Test public void testClientDetectsUnexpectedPerformativeResponseToAMQPHeader() throws Exception { try (ProtonTestServer peer = new ProtonTestServer()) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org