This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new e65f05ba PROTON-2845 Expand the test peer APIs for handling dispositions
e65f05ba is described below

commit e65f05ba774b899f97f2b0dac22f97169f4b7f73
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Mon Aug 5 18:33:48 2024 -0400

    PROTON-2845 Expand the test peer APIs for handling dispositions
    
    Expands the test peer API for scripting the sending and matching of
    dispositions.
---
 .../protonj2/client/impl/TransactionsTest.java     |  61 ++++++++-
 .../driver/actions/DispositionInjectAction.java    |  28 ++++
 .../test/driver/actions/TransferInjectAction.java  |  49 ++++++-
 .../expectations/DispositionExpectation.java       | 151 ++++++++++++++++++---
 .../driver/matchers/messaging/RejectedMatcher.java |  28 ++++
 .../protonj2/test/driver/ReceiverHandlingTest.java | 123 +++++++++++++++++
 6 files changed, 415 insertions(+), 25 deletions(-)

diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
index 55429625..1a640daa 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
@@ -1536,7 +1536,7 @@ public class TransactionsTest extends 
ImperativeClientTestCase {
     }
 
     @Test
-    public void testAcceptAndRejectInSameTransaction() throws Exception {
+    public void testAcceptAndReleaseInSameTransaction() throws Exception {
         final byte[] txnId = new byte[] { 0, 1, 2, 3 };
 
         try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -1603,4 +1603,63 @@ public class TransactionsTest extends 
ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testModifiedDispositionInTransaction() throws Exception {
+        final byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow();
+            peer.start();
+
+            final URI remoteURI = peer.getServerURI();
+            final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello 
World"));
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort());
+            Session session = connection.openSession();
+            ReceiverOptions options = new 
ReceiverOptions().autoAccept(false).autoSettle(false);
+            Receiver receiver = session.openReceiver("test-queue", 
options).openFuture().get();
+
+            peer.expectCoordinatorAttach().respond();
+            peer.remoteFlow().withLinkCredit(2).queue();
+            peer.expectDeclare().accept(txnId);
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] { 1 })
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.expectDisposition().withSettled(true)
+                                    .withState()
+                                    .transactional()
+                                    .withTxnId(txnId)
+                                    .withModified(true, true);
+            peer.expectDischarge().withFail(false).withTxnId(txnId).accept();
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            session.beginTransaction();
+
+            final Delivery delivery = receiver.receive(100, 
TimeUnit.MILLISECONDS);
+
+            assertNotNull(delivery);
+            assertFalse(delivery.settled());
+            assertNull(delivery.state());
+
+            delivery.modified(true, true);
+
+            session.commitTransaction();
+            receiver.closeAsync();
+            connection.closeAsync().get();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java
index b4f7958d..114efbb3 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.protonj2.test.driver.actions;
 
+import java.util.Map;
+
 import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
 import org.apache.qpid.protonj2.test.driver.SessionTracker;
 import org.apache.qpid.protonj2.test.driver.codec.messaging.Accepted;
@@ -31,6 +33,7 @@ import 
org.apache.qpid.protonj2.test.driver.codec.transport.DeliveryState;
 import org.apache.qpid.protonj2.test.driver.codec.transport.Disposition;
 import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition;
 import org.apache.qpid.protonj2.test.driver.codec.transport.Role;
+import org.apache.qpid.protonj2.test.driver.codec.util.TypeMapper;
 
 /**
  * AMQP Disposition injection action which can be added to a driver for write 
at a specific time or
@@ -169,6 +172,11 @@ public class DispositionInjectAction extends 
AbstractPerformativeInjectAction<Di
             return DispositionInjectAction.this;
         }
 
+        public DispositionInjectAction modified(boolean failed, boolean 
undeliverableHere, Map<String, Object> annotations) {
+            withState(new 
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
+            return DispositionInjectAction.this;
+        }
+
         public TransactionalStateBuilder transactional() {
             TransactionalStateBuilder builder = new 
TransactionalStateBuilder(DispositionInjectAction.this);
             withState(builder.getState());
@@ -236,6 +244,21 @@ public class DispositionInjectAction extends 
AbstractPerformativeInjectAction<Di
             return this;
         }
 
+        public TransactionalStateBuilder withRejected(Symbol condition, String 
description) {
+            withOutcome(new Rejected().setError(new ErrorCondition(condition, 
description)));
+            return this;
+        }
+
+        public TransactionalStateBuilder withRejected(String condition, String 
description, Map<String, Object> info) {
+            withOutcome(new Rejected().setError(new 
ErrorCondition(Symbol.valueOf(condition), description, 
TypeMapper.toSymbolKeyedMap(info))));
+            return this;
+        }
+
+        public TransactionalStateBuilder withRejected(Symbol condition, String 
description, Map<Symbol, Object> info) {
+            withOutcome(new Rejected().setError(new ErrorCondition(condition, 
description, info)));
+            return this;
+        }
+
         public TransactionalStateBuilder withModified() {
             withOutcome(new Modified());
             return this;
@@ -250,5 +273,10 @@ public class DispositionInjectAction extends 
AbstractPerformativeInjectAction<Di
             withOutcome(new 
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
             return this;
         }
+
+        public TransactionalStateBuilder withModified(boolean failed, boolean 
undeliverableHere, Map<String, Object> annotations) {
+            withOutcome(new 
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
+            return this;
+        }
     }
 }
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
index f4d561a5..b95951be 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
@@ -52,6 +52,7 @@ import 
org.apache.qpid.protonj2.test.driver.codec.transport.DeliveryState;
 import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition;
 import org.apache.qpid.protonj2.test.driver.codec.transport.ReceiverSettleMode;
 import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer;
+import org.apache.qpid.protonj2.test.driver.codec.util.TypeMapper;
 
 /**
  * AMQP Close injection action which can be added to a driver for write at a 
specific time or
@@ -347,6 +348,10 @@ public class TransferInjectAction extends 
AbstractPerformativeInjectAction<Trans
         public TransferInjectAction also() {
             return TransferInjectAction.this;
         }
+
+        public TransferInjectAction and() {
+            return TransferInjectAction.this;
+        }
     }
 
     public final class HeaderBuilder extends SectionBuilder {
@@ -582,18 +587,38 @@ public class TransferInjectAction extends 
AbstractPerformativeInjectAction<Trans
             return TransferInjectAction.this;
         }
 
+        public TransferInjectAction rejected(Symbol condition, String 
description) {
+            withState(new Rejected().setError(new ErrorCondition(condition, 
description)));
+            return TransferInjectAction.this;
+        }
+
+        public TransferInjectAction rejected(String condition, String 
description, Map<String, Object> info) {
+            withState(new Rejected().setError(new 
ErrorCondition(Symbol.valueOf(condition), description, 
TypeMapper.toSymbolKeyedMap(info))));
+            return TransferInjectAction.this;
+        }
+
+        public TransferInjectAction rejected(Symbol condition, String 
description, Map<Symbol, Object> info) {
+            withState(new Rejected().setError(new ErrorCondition(condition, 
description, info)));
+            return TransferInjectAction.this;
+        }
+
         public TransferInjectAction modified() {
             withState(new Modified());
             return TransferInjectAction.this;
         }
 
         public TransferInjectAction modified(boolean failed) {
-            withState(new Modified());
+            withState(new Modified().setDeliveryFailed(failed));
             return TransferInjectAction.this;
         }
 
         public TransferInjectAction modified(boolean failed, boolean 
undeliverableHere) {
-            withState(new Modified());
+            withState(new 
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
+            return TransferInjectAction.this;
+        }
+
+        public TransferInjectAction modified(boolean failed, boolean 
undeliverableHere, Map<String, Object> annotations) {
+            withState(new 
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
             return TransferInjectAction.this;
         }
 
@@ -664,6 +689,21 @@ public class TransferInjectAction extends 
AbstractPerformativeInjectAction<Trans
             return this;
         }
 
+        public TransactionalStateBuilder withRejected(Symbol condition, String 
description) {
+            withOutcome(new Rejected().setError(new ErrorCondition(condition, 
description)));
+            return this;
+        }
+
+        public TransactionalStateBuilder withRejected(String condition, String 
description, Map<String, Object> info) {
+            withOutcome(new Rejected().setError(new 
ErrorCondition(Symbol.valueOf(condition), description, 
TypeMapper.toSymbolKeyedMap(info))));
+            return this;
+        }
+
+        public TransactionalStateBuilder withRejected(Symbol condition, String 
description, Map<Symbol, Object> info) {
+            withOutcome(new Rejected().setError(new ErrorCondition(condition, 
description, info)));
+            return this;
+        }
+
         public TransactionalStateBuilder withModified() {
             withOutcome(new Modified());
             return this;
@@ -678,6 +718,11 @@ public class TransferInjectAction extends 
AbstractPerformativeInjectAction<Trans
             withOutcome(new 
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
             return this;
         }
+
+        public TransactionalStateBuilder modified(boolean failed, boolean 
undeliverableHere, Map<String, Object> annotations) {
+            withOutcome(new 
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
+            return this;
+        }
     }
 
     public final class MessageBuilder extends SectionBuilder {
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
index 1e9b5936..add9614f 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
@@ -20,6 +20,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.notNullValue;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 import java.util.Random;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -27,21 +28,21 @@ import java.util.function.Predicate;
 import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
 import org.apache.qpid.protonj2.test.driver.SessionTracker;
 import org.apache.qpid.protonj2.test.driver.codec.ListDescribedType;
-import org.apache.qpid.protonj2.test.driver.codec.messaging.Accepted;
-import org.apache.qpid.protonj2.test.driver.codec.messaging.Modified;
-import org.apache.qpid.protonj2.test.driver.codec.messaging.Rejected;
-import org.apache.qpid.protonj2.test.driver.codec.messaging.Released;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedShort;
-import org.apache.qpid.protonj2.test.driver.codec.transactions.Declared;
 import org.apache.qpid.protonj2.test.driver.codec.transport.DeliveryState;
 import org.apache.qpid.protonj2.test.driver.codec.transport.Disposition;
-import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition;
 import org.apache.qpid.protonj2.test.driver.codec.transport.Role;
+import org.apache.qpid.protonj2.test.driver.matchers.messaging.AcceptedMatcher;
+import org.apache.qpid.protonj2.test.driver.matchers.messaging.ModifiedMatcher;
+import org.apache.qpid.protonj2.test.driver.matchers.messaging.RejectedMatcher;
+import org.apache.qpid.protonj2.test.driver.matchers.messaging.ReleasedMatcher;
+import 
org.apache.qpid.protonj2.test.driver.matchers.transactions.DeclaredMatcher;
 import 
org.apache.qpid.protonj2.test.driver.matchers.transactions.TransactionalStateMatcher;
 import 
org.apache.qpid.protonj2.test.driver.matchers.transport.DispositionMatcher;
+import 
org.apache.qpid.protonj2.test.driver.matchers.transport.ErrorConditionMatcher;
 import org.hamcrest.Matcher;
 
 /**
@@ -197,37 +198,107 @@ public class DispositionExpectation extends 
AbstractExpectation<Disposition> {
     public final class DeliveryStateBuilder {
 
         public DispositionExpectation accepted() {
-            withState(Accepted.getInstance());
+            withState(new AcceptedMatcher());
             return DispositionExpectation.this;
         }
 
         public DispositionExpectation released() {
-            withState(Released.getInstance());
+            withState(new ReleasedMatcher());
             return DispositionExpectation.this;
         }
 
         public DispositionExpectation rejected() {
-            withState(new Rejected());
+            withState(new RejectedMatcher());
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(String condition) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(Symbol condition) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(Matcher<?> condition) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition)));
             return DispositionExpectation.this;
         }
 
         public DispositionExpectation rejected(String condition, String 
description) {
-            withState(new Rejected().setError(new 
ErrorCondition(Symbol.valueOf(condition), description)));
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(Symbol condition, String 
description) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(String condition, Matcher<?> 
description) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(Symbol condition, Matcher<?> 
description) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(String condition, String 
description, Map<String, Object> info) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(Symbol condition, String 
description, Map<String, Object> info) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(String condition, String 
description, Matcher<?> info) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(Symbol condition, String 
description, Matcher<?> info) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(Matcher<?> condition, 
Matcher<?> description) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation rejected(Matcher<?> condition, 
Matcher<?> description, Matcher<?> info) {
+            withState(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
             return DispositionExpectation.this;
         }
 
         public DispositionExpectation modified() {
-            withState(new Modified());
+            withState(new ModifiedMatcher());
             return DispositionExpectation.this;
         }
 
         public DispositionExpectation modified(boolean failed) {
-            withState(new Modified());
+            withState(new ModifiedMatcher().withDeliveryFailed(failed));
             return DispositionExpectation.this;
         }
 
         public DispositionExpectation modified(boolean failed, boolean 
undeliverableHere) {
-            withState(new Modified());
+            withState(new 
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation modified(boolean failed, boolean 
undeliverableHere, Map<String, Object> annotations) {
+            withState(new 
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
+            return DispositionExpectation.this;
+        }
+
+        public DispositionExpectation modified(boolean failed, boolean 
undeliverableHere, Matcher<?> annotations) {
+            withState(new 
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
             return DispositionExpectation.this;
         }
 
@@ -238,12 +309,13 @@ public class DispositionExpectation extends 
AbstractExpectation<Disposition> {
             rand.setSeed(System.nanoTime());
             rand.nextBytes(txnId);
 
-            withState(new Declared().setTxnId(txnId));
+            withState(new DeclaredMatcher().withTxnId(txnId));
+
             return DispositionExpectation.this;
         }
 
         public DispositionExpectation declared(byte[] txnId) {
-            withState(new Declared().setTxnId(txnId));
+            withState(new DeclaredMatcher().withTxnId(txnId));
             return DispositionExpectation.this;
         }
 
@@ -307,37 +379,72 @@ public class DispositionExpectation extends 
AbstractExpectation<Disposition> {
         // ----- Add a layer to allow configuring the outcome without specific 
type dependencies
 
         public DispositionTransactionalStateMatcher withAccepted() {
-            super.withOutcome(Accepted.getInstance());
+            super.withOutcome(new AcceptedMatcher());
             return this;
         }
 
         public DispositionTransactionalStateMatcher withReleased() {
-            super.withOutcome(Released.getInstance());
+            super.withOutcome(new ReleasedMatcher());
             return this;
         }
 
         public DispositionTransactionalStateMatcher withRejected() {
-            super.withOutcome(new Rejected());
+            super.withOutcome(new RejectedMatcher());
             return this;
         }
 
         public DispositionTransactionalStateMatcher withRejected(String 
condition, String description) {
-            super.withOutcome(new Rejected().setError(new 
ErrorCondition(Symbol.valueOf(condition), description)));
+            super.withOutcome(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+            return this;
+        }
+
+        public DispositionTransactionalStateMatcher withRejected(Symbol 
condition, String description) {
+            super.withOutcome(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+            return this;
+        }
+
+        public DispositionTransactionalStateMatcher withRejected(String 
condition, String description, Map<String, Object> info) {
+            super.withOutcome(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+            return this;
+        }
+
+        public DispositionTransactionalStateMatcher withRejected(Symbol 
condition, String description, Map<String, Object> info) {
+            super.withOutcome(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+            return this;
+        }
+
+        public DispositionTransactionalStateMatcher withRejected(String 
condition, String description, Matcher<?> info) {
+            super.withOutcome(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+            return this;
+        }
+
+        public DispositionTransactionalStateMatcher withRejected(Symbol 
condition, String description, Matcher<?> info) {
+            super.withOutcome(new RejectedMatcher().withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
             return this;
         }
 
         public DispositionTransactionalStateMatcher withModified() {
-            super.withOutcome(new Modified());
+            super.withOutcome(new ModifiedMatcher());
             return this;
         }
 
         public DispositionTransactionalStateMatcher withModified(boolean 
failed) {
-            super.withOutcome(new Modified().setDeliveryFailed(failed));
+            super.withOutcome(new 
ModifiedMatcher().withDeliveryFailed(failed));
             return this;
         }
 
         public DispositionTransactionalStateMatcher withModified(boolean 
failed, boolean undeliverableHere) {
-            super.withOutcome(new 
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
+            super.withOutcome(new 
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere));
+            return this;
+        }
+
+        public DispositionTransactionalStateMatcher withModified(boolean 
failed, boolean undeliverableHere, Map<String, Object> annotations) {
+            super.withOutcome(new 
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
+            return this;
+        }
+
+        public DispositionTransactionalStateMatcher withModified(boolean 
failed, boolean undeliverableHere, Matcher<?> annotations) {
+            super.withOutcome(new 
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
             return this;
         }
     }
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java
index 7e32cbd4..3ada7cea 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java
@@ -56,6 +56,18 @@ public class RejectedMatcher extends 
ListDescribedTypeMatcher {
         return withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description));
     }
 
+    public RejectedMatcher withError(String condition, Matcher<?> description) 
{
+        return withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description));
+    }
+
+    public RejectedMatcher withError(Symbol condition, String description) {
+        return withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description));
+    }
+
+    public RejectedMatcher withError(Symbol condition, Matcher<?> description) 
{
+        return withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description));
+    }
+
     public RejectedMatcher withError(String condition, String description, 
Map<String, Object> info) {
         return withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info));
     }
@@ -64,6 +76,22 @@ public class RejectedMatcher extends 
ListDescribedTypeMatcher {
         return withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfoMap(info));
     }
 
+    public RejectedMatcher withError(String condition, String description, 
Matcher<?> info) {
+        return withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info));
+    }
+
+    public RejectedMatcher withError(Symbol condition, String description, 
Matcher<?> info) {
+        return withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info));
+    }
+
+    public RejectedMatcher withError(String condition, Matcher<?> description, 
Matcher<?> info) {
+        return withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info));
+    }
+
+    public RejectedMatcher withError(Symbol condition, Matcher<?> description, 
Matcher<?> info) {
+        return withError(new 
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info));
+    }
+
     //----- Matcher based with methods for more complex validation
 
     public RejectedMatcher withError(Matcher<?> m) {
diff --git 
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java
 
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java
index 408c2cac..d468c8a4 100644
--- 
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java
+++ 
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java
@@ -20,6 +20,7 @@ package org.apache.qpid.protonj2.test.driver;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.net.URI;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
@@ -476,4 +477,126 @@ class ReceiverHandlingTest extends TestPeerTestsBase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testReceiverSendsRejectedDisposition() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respondInKind();
+            peer.remoteFlow().withLinkCredit(1).queue();
+            peer.expectTransfer().respond().withSettled(true).withState().rejected("error", "Error Code: 111222");
+            peer.expectEnd().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin();
+            client.expectAttach().ofReceiver();
+            client.expectFlow().withLinkCredit(1);
+            client.remoteTransfer().withDeliveryId(0).withMessage().withBody().withValue("test").and().queue();
+            client.expectDisposition().withSettled(true).withState().rejected("error", Matchers.containsString("111222"));
+            client.remoteEnd().queue();
+            client.expectEnd();
+
+            // Initiate the exchange
+            client.remoteAMQPHeader().now();
+            client.remoteOpen().now();
+            client.remoteBegin().now();
+            client.remoteAttach().ofSender().now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testReceiverSendsModifiedDisposition() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respondInKind();
+            peer.remoteFlow().withLinkCredit(1).queue();
+            peer.expectTransfer().respond().withSettled(true).withState().modified(true, true);
+            peer.expectEnd().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin();
+            client.expectAttach().ofReceiver();
+            client.expectFlow().withLinkCredit(1);
+            client.remoteTransfer().withDeliveryId(0).withMessage().withBody().withValue("test").and().queue();
+            client.expectDisposition().withSettled(true).withState().modified(true, true);
+            client.remoteEnd().queue();
+            client.expectEnd();
+
+            // Initiate the exchange
+            client.remoteAMQPHeader().now();
+            client.remoteOpen().now();
+            client.remoteBegin().now();
+            client.remoteAttach().ofSender().now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test
+    public void testReceiverSendsModifiedDispositionWithAnnotations() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            final Map<String, Object> annotations = Map.of("test", "value");
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respondInKind();
+            peer.remoteFlow().withLinkCredit(1).queue();
+            peer.expectTransfer().respond().withSettled(true).withState().modified(true, true, annotations);
+            peer.expectEnd().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin();
+            client.expectAttach().ofReceiver();
+            client.expectFlow().withLinkCredit(1);
+            client.remoteTransfer().withDeliveryId(0).withMessage().withBody().withValue("test").and().queue();
+            client.expectDisposition().withSettled(true).withState().modified(true, true, annotations);
+            client.remoteEnd().queue();
+            client.expectEnd();
+
+            // Initiate the exchange
+            client.remoteAMQPHeader().now();
+            client.remoteOpen().now();
+            client.remoteBegin().now();
+            client.remoteAttach().ofSender().now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org


Reply via email to