This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new a52bc0f PROTON-2383 Fixes, tests and enhancements for transaction
testing
a52bc0f is described below
commit a52bc0f2205107ad4691598c0f3c22430f98219a
Author: Timothy Bish <[email protected]>
AuthorDate: Mon May 24 16:04:15 2021 -0400
PROTON-2383 Fixes, tests and enhancements for transaction testing
Fixes some issues and adds tests for the test driver when used to test
AMQP transactions and adds some better auto complete for performative
fields when scripting tests.
---
.../actions/DetachLastCoordinatorInjectAction.java | 37 +++-
.../test/driver/codec/transactions/Declared.java | 4 +
.../driver/expectations/DeclareExpectation.java | 31 +++
.../expectations/DispositionExpectation.java | 19 ++
.../test/driver/TransactionHandlingTest.java | 220 +++++++++++++++++++++
.../engine/impl/ProtonTransactionManager.java | 5 +-
.../engine/impl/ProtonTransactionManagerTest.java | 16 +-
7 files changed, 317 insertions(+), 15 deletions(-)
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DetachLastCoordinatorInjectAction.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DetachLastCoordinatorInjectAction.java
index b4b1441..7bd1d26 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DetachLastCoordinatorInjectAction.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DetachLastCoordinatorInjectAction.java
@@ -18,6 +18,7 @@ package org.apache.qpid.protonj2.test.driver.actions;
import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
import org.apache.qpid.protonj2.test.driver.LinkTracker;
+import org.apache.qpid.protonj2.test.driver.codec.transport.Role;
/**
* Detach actions that ignores any other handle or channel configuration
@@ -41,12 +42,40 @@ public class DetachLastCoordinatorInjectAction extends
DetachInjectAction {
throw new AssertionError("Cannot send coordinator dectach as
scripted, no active coordinator found.");
}
+ onChannel(tracker.getSession().getLocalChannel().intValue());
+
if (!tracker.isLocallyAttached()) {
- // TODO: We could attempt to create an attach here and send that
first
- throw new AssertionError("Cannot send coordinator dectach as
scripted, Coordinator link was not locally attached.");
- }
+ AttachInjectAction attach = new AttachInjectAction(driver);
- onChannel(tracker.getSession().getLocalChannel().intValue());
+ attach.onChannel(onChannel());
+ attach.withName(tracker.getName());
+ attach.withSource(tracker.getRemoteSource());
+ if (tracker.getRemoteTarget() != null) {
+ attach.withTarget(tracker.getRemoteTarget());
+ } else {
+ attach.withTarget(tracker.getRemoteCoordinator());
+ }
+
+ if (tracker.isSender()) {
+ attach.withRole(Role.SENDER);
+ // An error was set, so signal that a detach is incoming by using a null
+ // source; the action will not override an explicitly null source.
+ if (getPerformative().getError() != null) {
+ attach.withNullSource();
+ }
+ } else {
+ attach.withRole(Role.RECEIVER);
+ // An error was set, so signal that a detach is incoming by using a null
+ // target; the action will not override an explicitly null target.
+ if (getPerformative().getError() != null) {
+ if (getPerformative().getError() != null) {
+ attach.withNullTarget();
+ }
+ }
+ }
+
+ attach.perform(driver);
+ }
getPerformative().setHandle(tracker.getHandle());
}
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/transactions/Declared.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/transactions/Declared.java
index fc2026c..f2cc44f 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/transactions/Declared.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/codec/transactions/Declared.java
@@ -56,6 +56,10 @@ public class Declared extends ListDescribedType implements
DeliveryState, Outcom
return DESCRIPTOR_SYMBOL;
}
+ public Declared setTxnId(byte[] array) {
+ return setTxnId(new Binary(array));
+ }
+
public Declared setTxnId(Binary o) {
getList().set(Field.TXN_ID.ordinal(), o);
return this;
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DeclareExpectation.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DeclareExpectation.java
index a0b867d..88708b3 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DeclareExpectation.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DeclareExpectation.java
@@ -49,6 +49,15 @@ public class DeclareExpectation extends TransferExpectation {
return accept(txnId);
}
+ /**
+ * Indicates a successful transaction declaration by returning a {@link
Declared}
+ * disposition with the given transaction Id.
+ *
+ * @param txnId
+ * byte array containing the transaction id that has been
declared.
+ *
+ * @return the newly created {@link DispositionInjectAction} response.
+ */
public DispositionInjectAction accept(byte[] txnId) {
response = new DispositionInjectAction(driver);
response.withSettled(true);
@@ -62,6 +71,28 @@ public class DeclareExpectation extends TransferExpectation {
return response;
}
+ /**
+ * Indicates a successful transaction declaration by returning a {@link
Declared}
+ * disposition with the given transaction Id.
+ *
+ * @param txnId
+ * byte array containing the transaction id that has been
declared.
+ *
+ * @return the newly created {@link DispositionInjectAction} response.
+ */
+ public DispositionInjectAction declared(byte[] txnId) {
+ response = new DispositionInjectAction(driver);
+ response.withSettled(true);
+ if (txnId != null) {
+ response.withState(new Declared().setTxnId(new Binary(txnId)));
+ } else {
+ response.withState(new Declared());
+ }
+
+ driver.addScriptedElement(response);
+ return response;
+ }
+
//----- Type specific with methods that perform simple equals checks
@Override
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
index 4d7afc4..246a5cb 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
@@ -19,6 +19,8 @@ package org.apache.qpid.protonj2.test.driver.expectations;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
+import java.util.Random;
+
import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
import org.apache.qpid.protonj2.test.driver.SessionTracker;
import org.apache.qpid.protonj2.test.driver.codec.ListDescribedType;
@@ -30,6 +32,7 @@ import
org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger;
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedShort;
+import org.apache.qpid.protonj2.test.driver.codec.transactions.Declared;
import org.apache.qpid.protonj2.test.driver.codec.transport.DeliveryState;
import org.apache.qpid.protonj2.test.driver.codec.transport.Disposition;
import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition;
@@ -215,6 +218,22 @@ public class DispositionExpectation extends
AbstractExpectation<Disposition> {
return DispositionExpectation.this;
}
+ public DispositionExpectation declared() {
+ final byte[] txnId = new byte[4];
+
+ Random rand = new Random();
+ rand.setSeed(System.nanoTime());
+ rand.nextBytes(txnId);
+
+ withState(new Declared().setTxnId(txnId));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation declared(byte[] txnId) {
+ withState(new Declared().setTxnId(txnId));
+ return DispositionExpectation.this;
+ }
+
public DispositionTransactionalStateMatcher transactional() {
DispositionTransactionalStateMatcher matcher = new
DispositionTransactionalStateMatcher(DispositionExpectation.this);
withState(matcher);
diff --git
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/TransactionHandlingTest.java
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/TransactionHandlingTest.java
new file mode 100644
index 0000000..3690890
--- /dev/null
+++
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/TransactionHandlingTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.test.driver;
+
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
+import org.apache.qpid.protonj2.test.driver.utils.TestPeerTestsBase;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test various aspects of how the test peer handles and responds to
transactional work.
+ */
+@Timeout(20)
+class TransactionHandlingTest extends TestPeerTestsBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TransactionHandlingTest.class);
+
+ @Test
+ public void testCoordinatorAttachAndDetachHandling() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectCoordinatorAttach().ofSender().respond();
+ peer.expectDetach().respond();
+ peer.expectEnd().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver().onChannel(0).withHandle(0);
+ client.expectDetach();
+ client.expectEnd();
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender()
+ .withCoordinator().also()
+ .withSource().withAddress("txn-address")
+ .and().now();
+ client.remoteDetach().now();
+ client.remoteEnd().now();
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.close();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testRemoteDetachLastCoordiantorLink() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectCoordinatorAttach().ofSender().respond();
+ peer.remoteDetachLastCoordinatorLink().queue();
+ peer.expectEnd().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver().onChannel(0).withHandle(0);
+ client.expectDetach();
+ client.expectEnd();
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender()
+ .withCoordinator().also()
+ .withSource().withAddress("txn-address")
+ .and().now();
+ client.remoteEnd().now();
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.close();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testRemoteDetachLastCoordiantorLinkAttachesFirstIfNeeded()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectCoordinatorAttach().ofSender();
+ peer.remoteDetachLastCoordinatorLink().queue();
+ peer.expectDetach();
+ peer.expectEnd().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver()
+ .onChannel(0)
+ .withHandle(0)
+ .withSource(notNullValue())
+ .withTarget(notNullValue())
+ .withName("txn");
+ client.expectDetach().respond();
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender()
+ .withName("txn")
+ .withCoordinator().also()
+ .withSource().withAddress("txn-address")
+ .and().now();
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.expectEnd();
+ client.remoteEnd().now();
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.close();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testTxnDeclarationAndDischarge() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectCoordinatorAttach().ofSender().respond();
+ peer.remoteFlow().withLinkCredit(2).queue();
+ peer.expectDeclare().declared(new byte[] { 0, 1, 2, 3 });
+ peer.expectDischarge().accept();
+ peer.expectDetach().respond();
+ peer.expectEnd().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectCoordinatorAttach().ofReceiver();
+
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender()
+ .withName("txn")
+ .withCoordinator().also()
+ .withSource().withAddress("txn-address")
+ .and().now();
+ client.expectFlow().withLinkCredit(2);
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.expectDisposition().withState().declared(new byte[] {0, 1,
2, 3});
+ client.remoteDeclare().withDeliveryTag(new byte[]
{0}).withDeliveryId(0).now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.expectDisposition().withState().accepted();
+ client.expectDetach();
+ client.expectEnd();
+
+ client.remoteDischarge().withDeliveryId(1).withDeliveryTag(new
byte[] {1}).now();
+ client.remoteDetach().now();
+ client.remoteEnd().now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ client.close();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+}
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonTransactionManager.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonTransactionManager.java
index bac5e9c..94821ed 100644
---
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonTransactionManager.java
+++
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonTransactionManager.java
@@ -41,9 +41,9 @@ import org.apache.qpid.protonj2.types.messaging.Rejected;
import org.apache.qpid.protonj2.types.messaging.Source;
import org.apache.qpid.protonj2.types.transactions.Coordinator;
import org.apache.qpid.protonj2.types.transactions.Declare;
+import org.apache.qpid.protonj2.types.transactions.Declared;
import org.apache.qpid.protonj2.types.transactions.Discharge;
import org.apache.qpid.protonj2.types.transactions.TransactionErrors;
-import org.apache.qpid.protonj2.types.transactions.TransactionalState;
import org.apache.qpid.protonj2.types.transport.ErrorCondition;
/**
@@ -124,8 +124,7 @@ public final class ProtonTransactionManager extends
ProtonEndpoint<TransactionMa
// Start tracking this transaction as active.
transactions.put(txnId.asProtonBuffer(), txn);
- TransactionalState declaration = new TransactionalState();
- declaration.setOutcome(Accepted.getInstance());
+ Declared declaration = new Declared();
declaration.setTxnId(txnId);
txn.getDeclare().disposition(declaration, true);
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonTransactionManagerTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonTransactionManagerTest.java
index e21653f..b1665a5 100644
---
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonTransactionManagerTest.java
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonTransactionManagerTest.java
@@ -468,7 +468,7 @@ class ProtonTransactionManagerTest extends
ProtonEngineTestSupport {
manager.addCredit(2);
peer.waitForScriptToComplete();
- peer.expectDisposition().withState().transactional().withTxnId(TXN_ID);
+ peer.expectDisposition().withState().declared(TXN_ID);
peer.remoteDischarge().withTxnId(TXN_ID).withFail(txnFailed).withDeliveryId(1).withDeliveryTag(new
byte[] {1}).queue();
peer.expectDisposition().withState().accepted();
peer.expectDetach().respond();
@@ -612,7 +612,7 @@ class ProtonTransactionManagerTest extends
ProtonEngineTestSupport {
manager.addCredit(2);
peer.waitForScriptToComplete();
- peer.expectDisposition().withState().transactional().withTxnId(TXN_ID);
+ peer.expectDisposition().withState().declared(TXN_ID);
peer.remoteDischarge().withTxnId(TXN_ID).withFail(false).withDeliveryId(1).withDeliveryTag(new
byte[] {1}).queue();
peer.expectDisposition().withState().rejected(failureError.getCondition().toString(),
failureError.getDescription());
peer.expectDetach().respond();
@@ -683,7 +683,7 @@ class ProtonTransactionManagerTest extends
ProtonEngineTestSupport {
manager.addCredit(1);
peer.waitForScriptToComplete();
-
peer.expectDisposition().withState().transactional().withTxnId(TXN_ID).withAccepted();
+ peer.expectDisposition().withState().declared(TXN_ID);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
@@ -762,7 +762,7 @@ class ProtonTransactionManagerTest extends
ProtonEngineTestSupport {
manager.addCredit(1);
peer.waitForScriptToComplete();
-
peer.expectDisposition().withState().transactional().withTxnId(TXN_ID).withAccepted();
+ peer.expectDisposition().withState().declared(TXN_ID);
peer.expectDetach().respond();
peer.expectEnd().respond();
peer.expectClose().respond();
@@ -864,7 +864,7 @@ class ProtonTransactionManagerTest extends
ProtonEngineTestSupport {
manager2.addCredit(2);
peer.waitForScriptToComplete();
- peer.expectDisposition().withState().transactional().withTxnId(TXN_ID);
+ peer.expectDisposition().withState().declared(TXN_ID);
peer.remoteDischarge().withHandle(0).withTxnId(TXN_ID).withFail(false).withDeliveryId(1).withDeliveryTag(new
byte[] {1}).queue();
peer.expectDisposition().withState().accepted();
peer.expectDetach().withHandle(0).respond();
@@ -1070,7 +1070,7 @@ class ProtonTransactionManagerTest extends
ProtonEngineTestSupport {
manager2.addCredit(2);
peer.waitForScriptToComplete();
- peer.expectDisposition().withState().transactional().withTxnId(TXN_ID);
+ peer.expectDisposition().withState().declared(TXN_ID);
peer.remoteDischarge().withHandle(0).withTxnId(TXN_ID).withFail(false).withDeliveryId(1).withDeliveryTag(new
byte[] {1}).queue();
// Starts the flow of Transaction frames
@@ -1179,7 +1179,7 @@ class ProtonTransactionManagerTest extends
ProtonEngineTestSupport {
manager2.addCredit(2);
peer.waitForScriptToComplete();
- peer.expectDisposition().withState().transactional().withTxnId(TXN_ID);
+ peer.expectDisposition().withState().declared(TXN_ID);
peer.remoteDischarge().withHandle(0).withTxnId(TXN_ID).withFail(false).withDeliveryId(1).withDeliveryTag(new
byte[] {1}).queue();
// Starts the flow of Transaction frames
@@ -1261,7 +1261,7 @@ class ProtonTransactionManagerTest extends
ProtonEngineTestSupport {
manager.addCredit(2);
peer.waitForScriptToComplete();
- peer.expectDisposition().withState().transactional().withTxnId(TXN_ID);
+ peer.expectDisposition().withState().declared(TXN_ID);
peer.remoteDischarge().withTxnId(TXN_ID_UNKNOWN).withFail(false).withDeliveryId(1).withDeliveryTag(new
byte[] {1}).queue();
peer.expectDisposition().withState().rejected(failureError.getCondition().toString(),
failureError.getDescription());
peer.expectDetach().respond();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]