This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new e65f05ba PROTON-2845 Expand the test peer APIs for handling dispositions e65f05ba is described below commit e65f05ba774b899f97f2b0dac22f97169f4b7f73 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Mon Aug 5 18:33:48 2024 -0400 PROTON-2845 Expand the test peer APIs for handling dispositions Expands the test peer API for scripting sends of and matching of dispositions. --- .../protonj2/client/impl/TransactionsTest.java | 61 ++++++++- .../driver/actions/DispositionInjectAction.java | 28 ++++ .../test/driver/actions/TransferInjectAction.java | 49 ++++++- .../expectations/DispositionExpectation.java | 151 ++++++++++++++++++--- .../driver/matchers/messaging/RejectedMatcher.java | 28 ++++ .../protonj2/test/driver/ReceiverHandlingTest.java | 123 +++++++++++++++++ 6 files changed, 415 insertions(+), 25 deletions(-) diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java index 55429625..1a640daa 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java @@ -1536,7 +1536,7 @@ public class TransactionsTest extends ImperativeClientTestCase { } @Test - public void testAcceptAndRejectInSameTransaction() throws Exception { + public void testAcceptAndReleaseInSameTransaction() throws Exception { final byte[] txnId = new byte[] { 0, 1, 2, 3 }; try (ProtonTestServer peer = new ProtonTestServer()) { @@ -1603,4 +1603,63 @@ public class TransactionsTest extends ImperativeClientTestCase { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } + + @Test + public void testModifiedDispositionInTransaction() throws Exception { + final byte[] txnId = new byte[] { 0, 1, 2, 3 }; + + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().respond(); + peer.expectFlow(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World")); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); + Session session = connection.openSession(); + ReceiverOptions options = new ReceiverOptions().autoAccept(false).autoSettle(false); + Receiver receiver = session.openReceiver("test-queue", options).openFuture().get(); + + peer.expectCoordinatorAttach().respond(); + peer.remoteFlow().withLinkCredit(2).queue(); + peer.expectDeclare().accept(txnId); + peer.remoteTransfer().withHandle(0) + .withDeliveryId(0) + .withDeliveryTag(new byte[] { 1 }) + .withMore(false) + .withMessageFormat(0) + .withPayload(payload).queue(); + peer.expectDisposition().withSettled(true) + .withState() + .transactional() + .withTxnId(txnId) + .withModified(true, true); + peer.expectDischarge().withFail(false).withTxnId(txnId).accept(); + peer.expectDetach().respond(); + peer.expectClose().respond(); + + session.beginTransaction(); + + final Delivery delivery = receiver.receive(100, TimeUnit.MILLISECONDS); + + assertNotNull(delivery); + assertFalse(delivery.settled()); + assertNull(delivery.state()); + + delivery.modified(true, true); + + session.commitTransaction(); + receiver.closeAsync(); + connection.closeAsync().get(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java index b4f7958d..114efbb3 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java @@ -16,6 +16,8 @@ */ package org.apache.qpid.protonj2.test.driver.actions; +import java.util.Map; + import org.apache.qpid.protonj2.test.driver.AMQPTestDriver; import org.apache.qpid.protonj2.test.driver.SessionTracker; import org.apache.qpid.protonj2.test.driver.codec.messaging.Accepted; @@ -31,6 +33,7 @@ import org.apache.qpid.protonj2.test.driver.codec.transport.DeliveryState; import org.apache.qpid.protonj2.test.driver.codec.transport.Disposition; import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition; import org.apache.qpid.protonj2.test.driver.codec.transport.Role; +import org.apache.qpid.protonj2.test.driver.codec.util.TypeMapper; /** * AMQP Disposition injection action which can be added to a driver for write at a specific time or @@ -169,6 +172,11 @@ public class DispositionInjectAction extends AbstractPerformativeInjectAction<Di return DispositionInjectAction.this; } + public DispositionInjectAction modified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) { + withState(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations))); + return DispositionInjectAction.this; + } + public TransactionalStateBuilder transactional() { TransactionalStateBuilder builder = new TransactionalStateBuilder(DispositionInjectAction.this); withState(builder.getState()); @@ -236,6 +244,21 @@ public class DispositionInjectAction extends AbstractPerformativeInjectAction<Di return this; } + public TransactionalStateBuilder withRejected(Symbol condition, String description) { + withOutcome(new Rejected().setError(new ErrorCondition(condition, description))); + return this; + } + + public TransactionalStateBuilder withRejected(String condition, String description, Map<String, Object> info) { + withOutcome(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description, TypeMapper.toSymbolKeyedMap(info)))); + return this; + } + + public TransactionalStateBuilder withRejected(Symbol condition, String description, Map<Symbol, Object> info) { + withOutcome(new Rejected().setError(new ErrorCondition(condition, description, info))); + return this; + } + public TransactionalStateBuilder withModified() { withOutcome(new Modified()); return this; @@ -250,5 +273,10 @@ public class DispositionInjectAction extends AbstractPerformativeInjectAction<Di withOutcome(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere)); return this; } + + public TransactionalStateBuilder withModified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) { + withOutcome(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations))); + return this; + } } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java index f4d561a5..b95951be 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java @@ -52,6 +52,7 @@ import org.apache.qpid.protonj2.test.driver.codec.transport.DeliveryState; import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition; import org.apache.qpid.protonj2.test.driver.codec.transport.ReceiverSettleMode; import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer; +import org.apache.qpid.protonj2.test.driver.codec.util.TypeMapper; /** * AMQP Close injection action which can be added to a driver for write at a specific time or @@ -347,6 +348,10 @@ public class TransferInjectAction extends AbstractPerformativeInjectAction<Trans public TransferInjectAction also() { return TransferInjectAction.this; } + + public TransferInjectAction and() { + return TransferInjectAction.this; + } } public final class HeaderBuilder extends SectionBuilder { @@ -582,18 +587,38 @@ public class TransferInjectAction extends AbstractPerformativeInjectAction<Trans return TransferInjectAction.this; } + public TransferInjectAction rejected(Symbol condition, String description) { + withState(new Rejected().setError(new ErrorCondition(condition, description))); + return TransferInjectAction.this; + } + + public TransferInjectAction rejected(String condition, String description, Map<String, Object> info) { + withState(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description, TypeMapper.toSymbolKeyedMap(info)))); + return TransferInjectAction.this; + } + + public TransferInjectAction rejected(Symbol condition, String description, Map<Symbol, Object> info) { + withState(new Rejected().setError(new ErrorCondition(condition, description, info))); + return TransferInjectAction.this; + } + public TransferInjectAction modified() { withState(new Modified()); return TransferInjectAction.this; } public TransferInjectAction modified(boolean failed) { - withState(new Modified()); + withState(new Modified().setDeliveryFailed(failed)); return TransferInjectAction.this; } public TransferInjectAction modified(boolean failed, boolean undeliverableHere) { - withState(new Modified()); + withState(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere)); + return TransferInjectAction.this; + } + + public TransferInjectAction modified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) { + withState(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations))); return TransferInjectAction.this; } @@ -664,6 +689,21 @@ public class TransferInjectAction extends AbstractPerformativeInjectAction<Trans return this; } + public TransactionalStateBuilder withRejected(Symbol condition, String description) { + withOutcome(new Rejected().setError(new ErrorCondition(condition, description))); + return this; + } + + public TransactionalStateBuilder withRejected(String condition, String description, Map<String, Object> info) { + withOutcome(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description, TypeMapper.toSymbolKeyedMap(info)))); + return this; + } + + public TransactionalStateBuilder withRejected(Symbol condition, String description, Map<Symbol, Object> info) { + withOutcome(new Rejected().setError(new ErrorCondition(condition, description, info))); + return this; + } + public TransactionalStateBuilder withModified() { withOutcome(new Modified()); return this; @@ -678,6 +718,11 @@ public class TransferInjectAction extends AbstractPerformativeInjectAction<Trans withOutcome(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere)); return this; } + + public TransactionalStateBuilder modified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) { + withOutcome(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations))); + return this; + } } public final class MessageBuilder extends SectionBuilder { diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java index 1e9b5936..add9614f 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java @@ -20,6 +20,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import java.nio.ByteBuffer; +import java.util.Map; import java.util.Random; import java.util.function.Consumer; import java.util.function.Predicate; @@ -27,21 +28,21 @@ import java.util.function.Predicate; import org.apache.qpid.protonj2.test.driver.AMQPTestDriver; import org.apache.qpid.protonj2.test.driver.SessionTracker; import org.apache.qpid.protonj2.test.driver.codec.ListDescribedType; -import org.apache.qpid.protonj2.test.driver.codec.messaging.Accepted; -import org.apache.qpid.protonj2.test.driver.codec.messaging.Modified; -import org.apache.qpid.protonj2.test.driver.codec.messaging.Rejected; -import org.apache.qpid.protonj2.test.driver.codec.messaging.Released; import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary; import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol; import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger; import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedShort; -import org.apache.qpid.protonj2.test.driver.codec.transactions.Declared; import org.apache.qpid.protonj2.test.driver.codec.transport.DeliveryState; import org.apache.qpid.protonj2.test.driver.codec.transport.Disposition; -import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition; import org.apache.qpid.protonj2.test.driver.codec.transport.Role; +import org.apache.qpid.protonj2.test.driver.matchers.messaging.AcceptedMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.messaging.ModifiedMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.messaging.RejectedMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.messaging.ReleasedMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.transactions.DeclaredMatcher; import org.apache.qpid.protonj2.test.driver.matchers.transactions.TransactionalStateMatcher; import org.apache.qpid.protonj2.test.driver.matchers.transport.DispositionMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.transport.ErrorConditionMatcher; import org.hamcrest.Matcher; /** @@ -197,37 +198,107 @@ public class DispositionExpectation extends AbstractExpectation<Disposition> { public final class DeliveryStateBuilder { public DispositionExpectation accepted() { - withState(Accepted.getInstance()); + withState(new AcceptedMatcher()); return DispositionExpectation.this; } public DispositionExpectation released() { - withState(Released.getInstance()); + withState(new ReleasedMatcher()); return DispositionExpectation.this; } public DispositionExpectation rejected() { - withState(new Rejected()); + withState(new RejectedMatcher()); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(String condition) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(Symbol condition) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(Matcher<?> condition) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition))); return DispositionExpectation.this; } public DispositionExpectation rejected(String condition, String description) { - withState(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description))); + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(Symbol condition, String description) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(String condition, Matcher<?> description) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(Symbol condition, Matcher<?> description) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(String condition, String description, Map<String, Object> info) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(Symbol condition, String description, Map<String, Object> info) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(String condition, String description, Matcher<?> info) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(Symbol condition, String description, Matcher<?> info) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(Matcher<?> condition, Matcher<?> description) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description))); + return DispositionExpectation.this; + } + + public DispositionExpectation rejected(Matcher<?> condition, Matcher<?> description, Matcher<?> info) { + withState(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info))); return DispositionExpectation.this; } public DispositionExpectation modified() { - withState(new Modified()); + withState(new ModifiedMatcher()); return DispositionExpectation.this; } public DispositionExpectation modified(boolean failed) { - withState(new Modified()); + withState(new ModifiedMatcher().withDeliveryFailed(failed)); return DispositionExpectation.this; } public DispositionExpectation modified(boolean failed, boolean undeliverableHere) { - withState(new Modified()); + withState(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere)); + return DispositionExpectation.this; + } + + public DispositionExpectation modified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) { + withState(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations)); + return DispositionExpectation.this; + } + + public DispositionExpectation modified(boolean failed, boolean undeliverableHere, Matcher<?> annotations) { + withState(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations)); return DispositionExpectation.this; } @@ -238,12 +309,13 @@ public class DispositionExpectation extends AbstractExpectation<Disposition> { rand.setSeed(System.nanoTime()); rand.nextBytes(txnId); - withState(new Declared().setTxnId(txnId)); + withState(new DeclaredMatcher().withTxnId(txnId)); + return DispositionExpectation.this; } public DispositionExpectation declared(byte[] txnId) { - withState(new Declared().setTxnId(txnId)); + withState(new DeclaredMatcher().withTxnId(txnId)); return DispositionExpectation.this; } @@ -307,37 +379,72 @@ public class DispositionExpectation extends AbstractExpectation<Disposition> { // ----- Add a layer to allow configuring the outcome without specific type dependencies public DispositionTransactionalStateMatcher withAccepted() { - super.withOutcome(Accepted.getInstance()); + super.withOutcome(new AcceptedMatcher()); return this; } public DispositionTransactionalStateMatcher withReleased() { - super.withOutcome(Released.getInstance()); + super.withOutcome(new ReleasedMatcher()); return this; } public DispositionTransactionalStateMatcher withRejected() { - super.withOutcome(new Rejected()); + super.withOutcome(new RejectedMatcher()); return this; } public DispositionTransactionalStateMatcher withRejected(String condition, String description) { - super.withOutcome(new Rejected().setError(new ErrorCondition(Symbol.valueOf(condition), description))); + super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description))); + return this; + } + + public DispositionTransactionalStateMatcher withRejected(Symbol condition, String description) { + super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description))); + return this; + } + + public DispositionTransactionalStateMatcher withRejected(String condition, String description, Map<String, Object> info) { + super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info))); + return this; + } + + public DispositionTransactionalStateMatcher withRejected(Symbol condition, String description, Map<String, Object> info) { + super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info))); + return this; + } + + public DispositionTransactionalStateMatcher withRejected(String condition, String description, Matcher<?> info) { + super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info))); + return this; + } + + public DispositionTransactionalStateMatcher withRejected(Symbol condition, String description, Matcher<?> info) { + super.withOutcome(new RejectedMatcher().withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info))); return this; } public DispositionTransactionalStateMatcher withModified() { - super.withOutcome(new Modified()); + super.withOutcome(new ModifiedMatcher()); return this; } public DispositionTransactionalStateMatcher withModified(boolean failed) { - super.withOutcome(new Modified().setDeliveryFailed(failed)); + super.withOutcome(new ModifiedMatcher().withDeliveryFailed(failed)); return this; } public DispositionTransactionalStateMatcher withModified(boolean failed, boolean undeliverableHere) { - super.withOutcome(new Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere)); + super.withOutcome(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere)); + return this; + } + + public DispositionTransactionalStateMatcher withModified(boolean failed, boolean undeliverableHere, Map<String, Object> annotations) { + super.withOutcome(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations)); + return this; + } + + public DispositionTransactionalStateMatcher withModified(boolean failed, boolean undeliverableHere, Matcher<?> annotations) { + super.withOutcome(new ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations)); return this; } } diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java index 7e32cbd4..3ada7cea 100644 --- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java +++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java @@ -56,6 +56,18 @@ public class RejectedMatcher extends ListDescribedTypeMatcher { return withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)); } + public RejectedMatcher withError(String condition, Matcher<?> description) { + return withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)); + } + + public RejectedMatcher withError(Symbol condition, String description) { + return withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)); + } + + public RejectedMatcher withError(Symbol condition, Matcher<?> description) { + return withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description)); + } + public RejectedMatcher withError(String condition, String description, Map<String, Object> info) { return withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)); } @@ -64,6 +76,22 @@ public class RejectedMatcher extends ListDescribedTypeMatcher { return withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfoMap(info)); } + public RejectedMatcher withError(String condition, String description, Matcher<?> info) { + return withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)); + } + + public RejectedMatcher withError(Symbol condition, String description, Matcher<?> info) { + return withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)); + } + + public RejectedMatcher withError(String condition, Matcher<?> description, Matcher<?> info) { + return withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)); + } + + public RejectedMatcher withError(Symbol condition, Matcher<?> description, Matcher<?> info) { + return withError(new ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)); + } + //----- Matcher based with methods for more complex validation public RejectedMatcher withError(Matcher<?> m) { diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java index 408c2cac..d468c8a4 100644 --- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java +++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java @@ -20,6 +20,7 @@ package org.apache.qpid.protonj2.test.driver; import static org.junit.jupiter.api.Assertions.assertThrows; import java.net.URI; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader; @@ -476,4 +477,126 @@ class ReceiverHandlingTest extends TestPeerTestsBase { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } + + @Test + public void testReceiverSendsRejectedDisposition() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respondInKind(); + peer.remoteFlow().withLinkCredit(1).queue(); + peer.expectTransfer().respond().withSettled(true).withState().rejected("error", "Error Code: 111222"); + peer.expectEnd().respond(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach().ofReceiver(); + client.expectFlow().withLinkCredit(1); + client.remoteTransfer().withDeliveryId(0).withMessage().withBody().withValue("test").and().queue(); + client.expectDisposition().withSettled(true).withState().rejected("error", Matchers.containsString("111222")); + client.remoteEnd().queue(); + client.expectEnd(); + + // Initiate the exchange + client.remoteAMQPHeader().now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + + @Test + public void testReceiverSendsModifiedDisposition() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respondInKind(); + peer.remoteFlow().withLinkCredit(1).queue(); + peer.expectTransfer().respond().withSettled(true).withState().modified(true, true); + peer.expectEnd().respond(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach().ofReceiver(); + client.expectFlow().withLinkCredit(1); + client.remoteTransfer().withDeliveryId(0).withMessage().withBody().withValue("test").and().queue(); + client.expectDisposition().withSettled(true).withState().modified(true, true); + client.remoteEnd().queue(); + client.expectEnd(); + + // Initiate the exchange + client.remoteAMQPHeader().now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + + @Test + public void testReceiverSendsModifiedDispositionWithAnnotations() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer(); + ProtonTestClient client = new ProtonTestClient()) { + + final Map<String, Object> annotations = Map.of("test", "value"); + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respondInKind(); + peer.remoteFlow().withLinkCredit(1).queue(); + peer.expectTransfer().respond().withSettled(true).withState().modified(true, true, annotations); + peer.expectEnd().respond(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + client.connect(remoteURI.getHost(), remoteURI.getPort()); + client.expectAMQPHeader(); + client.expectOpen(); + client.expectBegin(); + client.expectAttach().ofReceiver(); + client.expectFlow().withLinkCredit(1); + client.remoteTransfer().withDeliveryId(0).withMessage().withBody().withValue("test").and().queue(); + client.expectDisposition().withSettled(true).withState().modified(true, true, annotations); + client.remoteEnd().queue(); + client.expectEnd(); + + // Initiate the exchange + client.remoteAMQPHeader().now(); + client.remoteOpen().now(); + client.remoteBegin().now(); + client.remoteAttach().ofSender().now(); + + client.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org