This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new e65f05ba PROTON-2845 Expand the test peer APIs for handling
dispositions
e65f05ba is described below
commit e65f05ba774b899f97f2b0dac22f97169f4b7f73
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Aug 5 18:33:48 2024 -0400
PROTON-2845 Expand the test peer APIs for handling dispositions
Expands the test peer API for scripting the sending and matching of
dispositions.
---
.../protonj2/client/impl/TransactionsTest.java | 61 ++++++++-
.../driver/actions/DispositionInjectAction.java | 28 ++++
.../test/driver/actions/TransferInjectAction.java | 49 ++++++-
.../expectations/DispositionExpectation.java | 151 ++++++++++++++++++---
.../driver/matchers/messaging/RejectedMatcher.java | 28 ++++
.../protonj2/test/driver/ReceiverHandlingTest.java | 123 +++++++++++++++++
6 files changed, 415 insertions(+), 25 deletions(-)
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
index 55429625..1a640daa 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
@@ -1536,7 +1536,7 @@ public class TransactionsTest extends
ImperativeClientTestCase {
}
@Test
- public void testAcceptAndRejectInSameTransaction() throws Exception {
+ public void testAcceptAndReleaseInSameTransaction() throws Exception {
final byte[] txnId = new byte[] { 0, 1, 2, 3 };
try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -1603,4 +1603,63 @@ public class TransactionsTest extends
ImperativeClientTestCase {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testModifiedDispositionInTransaction() throws Exception {
+ final byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow();
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello
World"));
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort());
+ Session session = connection.openSession();
+ ReceiverOptions options = new
ReceiverOptions().autoAccept(false).autoSettle(false);
+ Receiver receiver = session.openReceiver("test-queue",
options).openFuture().get();
+
+ peer.expectCoordinatorAttach().respond();
+ peer.remoteFlow().withLinkCredit(2).queue();
+ peer.expectDeclare().accept(txnId);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withDeliveryTag(new byte[] { 1 })
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.expectDisposition().withSettled(true)
+ .withState()
+ .transactional()
+ .withTxnId(txnId)
+ .withModified(true, true);
+ peer.expectDischarge().withFail(false).withTxnId(txnId).accept();
+ peer.expectDetach().respond();
+ peer.expectClose().respond();
+
+ session.beginTransaction();
+
+ final Delivery delivery = receiver.receive(100,
TimeUnit.MILLISECONDS);
+
+ assertNotNull(delivery);
+ assertFalse(delivery.settled());
+ assertNull(delivery.state());
+
+ delivery.modified(true, true);
+
+ session.commitTransaction();
+ receiver.closeAsync();
+ connection.closeAsync().get();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java
index b4f7958d..114efbb3 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/DispositionInjectAction.java
@@ -16,6 +16,8 @@
*/
package org.apache.qpid.protonj2.test.driver.actions;
+import java.util.Map;
+
import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
import org.apache.qpid.protonj2.test.driver.SessionTracker;
import org.apache.qpid.protonj2.test.driver.codec.messaging.Accepted;
@@ -31,6 +33,7 @@ import
org.apache.qpid.protonj2.test.driver.codec.transport.DeliveryState;
import org.apache.qpid.protonj2.test.driver.codec.transport.Disposition;
import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition;
import org.apache.qpid.protonj2.test.driver.codec.transport.Role;
+import org.apache.qpid.protonj2.test.driver.codec.util.TypeMapper;
/**
* AMQP Disposition injection action which can be added to a driver for write
at a specific time or
@@ -169,6 +172,11 @@ public class DispositionInjectAction extends
AbstractPerformativeInjectAction<Di
return DispositionInjectAction.this;
}
+ public DispositionInjectAction modified(boolean failed, boolean
undeliverableHere, Map<String, Object> annotations) {
+ withState(new
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
+ return DispositionInjectAction.this;
+ }
+
public TransactionalStateBuilder transactional() {
TransactionalStateBuilder builder = new
TransactionalStateBuilder(DispositionInjectAction.this);
withState(builder.getState());
@@ -236,6 +244,21 @@ public class DispositionInjectAction extends
AbstractPerformativeInjectAction<Di
return this;
}
+ public TransactionalStateBuilder withRejected(Symbol condition, String
description) {
+ withOutcome(new Rejected().setError(new ErrorCondition(condition,
description)));
+ return this;
+ }
+
+ public TransactionalStateBuilder withRejected(String condition, String
description, Map<String, Object> info) {
+ withOutcome(new Rejected().setError(new
ErrorCondition(Symbol.valueOf(condition), description,
TypeMapper.toSymbolKeyedMap(info))));
+ return this;
+ }
+
+ public TransactionalStateBuilder withRejected(Symbol condition, String
description, Map<Symbol, Object> info) {
+ withOutcome(new Rejected().setError(new ErrorCondition(condition,
description, info)));
+ return this;
+ }
+
public TransactionalStateBuilder withModified() {
withOutcome(new Modified());
return this;
@@ -250,5 +273,10 @@ public class DispositionInjectAction extends
AbstractPerformativeInjectAction<Di
withOutcome(new
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
return this;
}
+
+ public TransactionalStateBuilder withModified(boolean failed, boolean
undeliverableHere, Map<String, Object> annotations) {
+ withOutcome(new
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
+ return this;
+ }
}
}
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
index f4d561a5..b95951be 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
@@ -52,6 +52,7 @@ import
org.apache.qpid.protonj2.test.driver.codec.transport.DeliveryState;
import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition;
import org.apache.qpid.protonj2.test.driver.codec.transport.ReceiverSettleMode;
import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer;
+import org.apache.qpid.protonj2.test.driver.codec.util.TypeMapper;
/**
* AMQP Close injection action which can be added to a driver for write at a
specific time or
@@ -347,6 +348,10 @@ public class TransferInjectAction extends
AbstractPerformativeInjectAction<Trans
public TransferInjectAction also() {
return TransferInjectAction.this;
}
+
+ public TransferInjectAction and() {
+ return TransferInjectAction.this;
+ }
}
public final class HeaderBuilder extends SectionBuilder {
@@ -582,18 +587,38 @@ public class TransferInjectAction extends
AbstractPerformativeInjectAction<Trans
return TransferInjectAction.this;
}
+ public TransferInjectAction rejected(Symbol condition, String
description) {
+ withState(new Rejected().setError(new ErrorCondition(condition,
description)));
+ return TransferInjectAction.this;
+ }
+
+ public TransferInjectAction rejected(String condition, String
description, Map<String, Object> info) {
+ withState(new Rejected().setError(new
ErrorCondition(Symbol.valueOf(condition), description,
TypeMapper.toSymbolKeyedMap(info))));
+ return TransferInjectAction.this;
+ }
+
+ public TransferInjectAction rejected(Symbol condition, String
description, Map<Symbol, Object> info) {
+ withState(new Rejected().setError(new ErrorCondition(condition,
description, info)));
+ return TransferInjectAction.this;
+ }
+
public TransferInjectAction modified() {
withState(new Modified());
return TransferInjectAction.this;
}
public TransferInjectAction modified(boolean failed) {
- withState(new Modified());
+ withState(new Modified().setDeliveryFailed(failed));
return TransferInjectAction.this;
}
public TransferInjectAction modified(boolean failed, boolean
undeliverableHere) {
- withState(new Modified());
+ withState(new
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
+ return TransferInjectAction.this;
+ }
+
+ public TransferInjectAction modified(boolean failed, boolean
undeliverableHere, Map<String, Object> annotations) {
+ withState(new
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
return TransferInjectAction.this;
}
@@ -664,6 +689,21 @@ public class TransferInjectAction extends
AbstractPerformativeInjectAction<Trans
return this;
}
+ public TransactionalStateBuilder withRejected(Symbol condition, String
description) {
+ withOutcome(new Rejected().setError(new ErrorCondition(condition,
description)));
+ return this;
+ }
+
+ public TransactionalStateBuilder withRejected(String condition, String
description, Map<String, Object> info) {
+ withOutcome(new Rejected().setError(new
ErrorCondition(Symbol.valueOf(condition), description,
TypeMapper.toSymbolKeyedMap(info))));
+ return this;
+ }
+
+ public TransactionalStateBuilder withRejected(Symbol condition, String
description, Map<Symbol, Object> info) {
+ withOutcome(new Rejected().setError(new ErrorCondition(condition,
description, info)));
+ return this;
+ }
+
public TransactionalStateBuilder withModified() {
withOutcome(new Modified());
return this;
@@ -678,6 +718,11 @@ public class TransferInjectAction extends
AbstractPerformativeInjectAction<Trans
withOutcome(new
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
return this;
}
+
+ public TransactionalStateBuilder modified(boolean failed, boolean
undeliverableHere, Map<String, Object> annotations) {
+ withOutcome(new
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere).setMessageAnnotations(TypeMapper.toSymbolKeyedMap(annotations)));
+ return this;
+ }
}
public final class MessageBuilder extends SectionBuilder {
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
index 1e9b5936..add9614f 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/DispositionExpectation.java
@@ -20,6 +20,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import java.nio.ByteBuffer;
+import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -27,21 +28,21 @@ import java.util.function.Predicate;
import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
import org.apache.qpid.protonj2.test.driver.SessionTracker;
import org.apache.qpid.protonj2.test.driver.codec.ListDescribedType;
-import org.apache.qpid.protonj2.test.driver.codec.messaging.Accepted;
-import org.apache.qpid.protonj2.test.driver.codec.messaging.Modified;
-import org.apache.qpid.protonj2.test.driver.codec.messaging.Rejected;
-import org.apache.qpid.protonj2.test.driver.codec.messaging.Released;
import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger;
import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedShort;
-import org.apache.qpid.protonj2.test.driver.codec.transactions.Declared;
import org.apache.qpid.protonj2.test.driver.codec.transport.DeliveryState;
import org.apache.qpid.protonj2.test.driver.codec.transport.Disposition;
-import org.apache.qpid.protonj2.test.driver.codec.transport.ErrorCondition;
import org.apache.qpid.protonj2.test.driver.codec.transport.Role;
+import org.apache.qpid.protonj2.test.driver.matchers.messaging.AcceptedMatcher;
+import org.apache.qpid.protonj2.test.driver.matchers.messaging.ModifiedMatcher;
+import org.apache.qpid.protonj2.test.driver.matchers.messaging.RejectedMatcher;
+import org.apache.qpid.protonj2.test.driver.matchers.messaging.ReleasedMatcher;
+import
org.apache.qpid.protonj2.test.driver.matchers.transactions.DeclaredMatcher;
import
org.apache.qpid.protonj2.test.driver.matchers.transactions.TransactionalStateMatcher;
import
org.apache.qpid.protonj2.test.driver.matchers.transport.DispositionMatcher;
+import
org.apache.qpid.protonj2.test.driver.matchers.transport.ErrorConditionMatcher;
import org.hamcrest.Matcher;
/**
@@ -197,37 +198,107 @@ public class DispositionExpectation extends
AbstractExpectation<Disposition> {
public final class DeliveryStateBuilder {
public DispositionExpectation accepted() {
- withState(Accepted.getInstance());
+ withState(new AcceptedMatcher());
return DispositionExpectation.this;
}
public DispositionExpectation released() {
- withState(Released.getInstance());
+ withState(new ReleasedMatcher());
return DispositionExpectation.this;
}
public DispositionExpectation rejected() {
- withState(new Rejected());
+ withState(new RejectedMatcher());
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(String condition) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(Symbol condition) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(Matcher<?> condition) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition)));
return DispositionExpectation.this;
}
public DispositionExpectation rejected(String condition, String
description) {
- withState(new Rejected().setError(new
ErrorCondition(Symbol.valueOf(condition), description)));
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(Symbol condition, String
description) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(String condition, Matcher<?>
description) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(Symbol condition, Matcher<?>
description) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(String condition, String
description, Map<String, Object> info) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(Symbol condition, String
description, Map<String, Object> info) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(String condition, String
description, Matcher<?> info) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(Symbol condition, String
description, Matcher<?> info) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(Matcher<?> condition,
Matcher<?> description) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation rejected(Matcher<?> condition,
Matcher<?> description, Matcher<?> info) {
+ withState(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
return DispositionExpectation.this;
}
public DispositionExpectation modified() {
- withState(new Modified());
+ withState(new ModifiedMatcher());
return DispositionExpectation.this;
}
public DispositionExpectation modified(boolean failed) {
- withState(new Modified());
+ withState(new ModifiedMatcher().withDeliveryFailed(failed));
return DispositionExpectation.this;
}
public DispositionExpectation modified(boolean failed, boolean
undeliverableHere) {
- withState(new Modified());
+ withState(new
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation modified(boolean failed, boolean
undeliverableHere, Map<String, Object> annotations) {
+ withState(new
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
+ return DispositionExpectation.this;
+ }
+
+ public DispositionExpectation modified(boolean failed, boolean
undeliverableHere, Matcher<?> annotations) {
+ withState(new
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
return DispositionExpectation.this;
}
@@ -238,12 +309,13 @@ public class DispositionExpectation extends
AbstractExpectation<Disposition> {
rand.setSeed(System.nanoTime());
rand.nextBytes(txnId);
- withState(new Declared().setTxnId(txnId));
+ withState(new DeclaredMatcher().withTxnId(txnId));
+
return DispositionExpectation.this;
}
public DispositionExpectation declared(byte[] txnId) {
- withState(new Declared().setTxnId(txnId));
+ withState(new DeclaredMatcher().withTxnId(txnId));
return DispositionExpectation.this;
}
@@ -307,37 +379,72 @@ public class DispositionExpectation extends
AbstractExpectation<Disposition> {
// ----- Add a layer to allow configuring the outcome without specific
type dependencies
public DispositionTransactionalStateMatcher withAccepted() {
- super.withOutcome(Accepted.getInstance());
+ super.withOutcome(new AcceptedMatcher());
return this;
}
public DispositionTransactionalStateMatcher withReleased() {
- super.withOutcome(Released.getInstance());
+ super.withOutcome(new ReleasedMatcher());
return this;
}
public DispositionTransactionalStateMatcher withRejected() {
- super.withOutcome(new Rejected());
+ super.withOutcome(new RejectedMatcher());
return this;
}
public DispositionTransactionalStateMatcher withRejected(String
condition, String description) {
- super.withOutcome(new Rejected().setError(new
ErrorCondition(Symbol.valueOf(condition), description)));
+ super.withOutcome(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+ return this;
+ }
+
+ public DispositionTransactionalStateMatcher withRejected(Symbol
condition, String description) {
+ super.withOutcome(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description)));
+ return this;
+ }
+
+ public DispositionTransactionalStateMatcher withRejected(String
condition, String description, Map<String, Object> info) {
+ super.withOutcome(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+ return this;
+ }
+
+ public DispositionTransactionalStateMatcher withRejected(Symbol
condition, String description, Map<String, Object> info) {
+ super.withOutcome(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+ return this;
+ }
+
+ public DispositionTransactionalStateMatcher withRejected(String
condition, String description, Matcher<?> info) {
+ super.withOutcome(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
+ return this;
+ }
+
+ public DispositionTransactionalStateMatcher withRejected(Symbol
condition, String description, Matcher<?> info) {
+ super.withOutcome(new RejectedMatcher().withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info)));
return this;
}
public DispositionTransactionalStateMatcher withModified() {
- super.withOutcome(new Modified());
+ super.withOutcome(new ModifiedMatcher());
return this;
}
public DispositionTransactionalStateMatcher withModified(boolean
failed) {
- super.withOutcome(new Modified().setDeliveryFailed(failed));
+ super.withOutcome(new
ModifiedMatcher().withDeliveryFailed(failed));
return this;
}
public DispositionTransactionalStateMatcher withModified(boolean
failed, boolean undeliverableHere) {
- super.withOutcome(new
Modified().setDeliveryFailed(failed).setUndeliverableHere(undeliverableHere));
+ super.withOutcome(new
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere));
+ return this;
+ }
+
+ public DispositionTransactionalStateMatcher withModified(boolean
failed, boolean undeliverableHere, Map<String, Object> annotations) {
+ super.withOutcome(new
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
+ return this;
+ }
+
+ public DispositionTransactionalStateMatcher withModified(boolean
failed, boolean undeliverableHere, Matcher<?> annotations) {
+ super.withOutcome(new
ModifiedMatcher().withDeliveryFailed(failed).withUndeliverableHere(undeliverableHere).withMessageAnnotations(annotations));
return this;
}
}
diff --git
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java
index 7e32cbd4..3ada7cea 100644
---
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java
+++
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/RejectedMatcher.java
@@ -56,6 +56,18 @@ public class RejectedMatcher extends
ListDescribedTypeMatcher {
return withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description));
}
+ public RejectedMatcher withError(String condition, Matcher<?> description)
{
+ return withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description));
+ }
+
+ public RejectedMatcher withError(Symbol condition, String description) {
+ return withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description));
+ }
+
+ public RejectedMatcher withError(Symbol condition, Matcher<?> description)
{
+ return withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description));
+ }
+
public RejectedMatcher withError(String condition, String description,
Map<String, Object> info) {
return withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info));
}
@@ -64,6 +76,22 @@ public class RejectedMatcher extends
ListDescribedTypeMatcher {
return withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfoMap(info));
}
+ public RejectedMatcher withError(String condition, String description,
Matcher<?> info) {
+ return withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info));
+ }
+
+ public RejectedMatcher withError(Symbol condition, String description,
Matcher<?> info) {
+ return withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info));
+ }
+
+ public RejectedMatcher withError(String condition, Matcher<?> description,
Matcher<?> info) {
+ return withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info));
+ }
+
+ public RejectedMatcher withError(Symbol condition, Matcher<?> description,
Matcher<?> info) {
+ return withError(new
ErrorConditionMatcher().withCondition(condition).withDescription(description).withInfo(info));
+ }
+
//----- Matcher based with methods for more complex validation
public RejectedMatcher withError(Matcher<?> m) {
diff --git
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java
index 408c2cac..d468c8a4 100644
---
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java
+++
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ReceiverHandlingTest.java
@@ -20,6 +20,7 @@ package org.apache.qpid.protonj2.test.driver;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.net.URI;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
@@ -476,4 +477,126 @@ class ReceiverHandlingTest extends TestPeerTestsBase {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testReceiverSendsRejectedDisposition() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respondInKind();
+ peer.remoteFlow().withLinkCredit(1).queue();
+
peer.expectTransfer().respond().withSettled(true).withState().rejected("error",
"Error Code: 111222");
+ peer.expectEnd().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver();
+ client.expectFlow().withLinkCredit(1);
+
client.remoteTransfer().withDeliveryId(0).withMessage().withBody().withValue("test").and().queue();
+
client.expectDisposition().withSettled(true).withState().rejected("error",
Matchers.containsString("111222"));
+ client.remoteEnd().queue();
+ client.expectEnd();
+
+ // Initiate the exchange
+ client.remoteAMQPHeader().now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testReceiverSendsModifiedDisposition() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respondInKind();
+ peer.remoteFlow().withLinkCredit(1).queue();
+
peer.expectTransfer().respond().withSettled(true).withState().modified(true,
true);
+ peer.expectEnd().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver();
+ client.expectFlow().withLinkCredit(1);
+
client.remoteTransfer().withDeliveryId(0).withMessage().withBody().withValue("test").and().queue();
+
client.expectDisposition().withSettled(true).withState().modified(true, true);
+ client.remoteEnd().queue();
+ client.expectEnd();
+
+ // Initiate the exchange
+ client.remoteAMQPHeader().now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testReceiverSendsModifiedDispositionWithAnnotations() throws
Exception {
+ try (ProtonTestServer peer = new ProtonTestServer();
+ ProtonTestClient client = new ProtonTestClient()) {
+
+ final Map<String, Object> annotations = Map.of("test", "value");
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender().respondInKind();
+ peer.remoteFlow().withLinkCredit(1).queue();
+
peer.expectTransfer().respond().withSettled(true).withState().modified(true,
true, annotations);
+ peer.expectEnd().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.expectOpen();
+ client.expectBegin();
+ client.expectAttach().ofReceiver();
+ client.expectFlow().withLinkCredit(1);
+
client.remoteTransfer().withDeliveryId(0).withMessage().withBody().withValue("test").and().queue();
+
client.expectDisposition().withSettled(true).withState().modified(true, true,
annotations);
+ client.remoteEnd().queue();
+ client.expectEnd();
+
+ // Initiate the exchange
+ client.remoteAMQPHeader().now();
+ client.remoteOpen().now();
+ client.remoteBegin().now();
+ client.remoteAttach().ofSender().now();
+
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]