This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new cf366744 PROTON-2802 Make scripting message payload of a transfer 
simpler
cf366744 is described below

commit cf3667444bce4ff9da8c9097d7c7a2fa2f767386
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Wed Mar 6 17:59:05 2024 -0500

    PROTON-2802 Make scripting message payload of a transfer simpler
    
    Provide a simpler API for scripting the expected message payload that
    will accompany an incoming transfer and provide some updates to match
    that API on the remote inject of transfer with message payloads.
---
 .../qpid/protonj2/client/impl/MessageSendTest.java |  21 +-
 .../test/driver/actions/TransferInjectAction.java  |  52 +-
 .../driver/expectations/TransferExpectation.java   |   5 +
 .../messaging/AbstractMessageSectionMatcher.java   |  10 +-
 .../matchers/transport/TransferMessageMatcher.java | 811 +++++++++++++++++++++
 .../transport/TransferPayloadCompositeMatcher.java |   2 +-
 .../matchers/types/EncodedAmqpTypeMatcher.java     |  16 +-
 .../protonj2/test/driver/SenderHandlingTest.java   | 366 ++++++++++
 8 files changed, 1264 insertions(+), 19 deletions(-)

diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
index 319597c3..3746ea08 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java
@@ -95,19 +95,16 @@ class MessageSendTest extends ImperativeClientTestCase {
             // Gates send on remote flow having been sent and received
             session.openReceiver("dummy").openFuture().get();
 
-            HeaderMatcher headerMatcher = new HeaderMatcher(true);
-            headerMatcher.withDurable(true);
-            headerMatcher.withPriority((byte) 1);
-            headerMatcher.withTtl(65535);
-            headerMatcher.withFirstAcquirer(true);
-            headerMatcher.withDeliveryCount(2);
-            EncodedAmqpValueMatcher bodyMatcher = new 
EncodedAmqpValueMatcher("Hello World");
-            TransferPayloadCompositeMatcher payloadMatcher = new 
TransferPayloadCompositeMatcher();
-            payloadMatcher.setHeadersMatcher(headerMatcher);
-            payloadMatcher.setMessageContentMatcher(bodyMatcher);
-
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-            
peer.expectTransfer().withMessageFormat(0).withPayload(payloadMatcher).accept();
+            peer.expectTransfer().withMessage().withMessageFormat(0)
+                                               .withHeader()
+                                               .withDurability(true)
+                                               .withPriority((byte) 1)
+                                               .withTimeToLive(65535)
+                                               .withFirstAcquirer(true)
+                                               .withDeliveryCount(2)
+                                               .also()
+                                               .withValue("Hello World");
             peer.expectDetach().respond();
             peer.expectClose().respond();
 
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
index dd2c6d01..e434a486 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/TransferInjectAction.java
@@ -257,6 +257,10 @@ public class TransferInjectAction extends 
AbstractPerformativeInjectAction<Trans
         return new FooterBuilder();
     }
 
+    /**
+     * Starts scripting of the message payload sent with the injected transfer.
+     *
+     * @return a new {@link MessageBuilder} tied to this inject action.
+     */
+    public MessageBuilder withMessage() {
+        final MessageBuilder messageBuilder = new MessageBuilder();
+        return messageBuilder;
+    }
+
     private Header getOrCreateHeader() {
         if (header == null) {
             header = new Header();
@@ -479,6 +483,11 @@ public class TransferInjectAction extends 
AbstractPerformativeInjectAction<Trans
 
     public final class ApplicationPropertiesBuilder extends SectionBuilder {
 
+        /**
+         * Short-form alias of {@link #withApplicationProperty(String, Object)}.
+         *
+         * @param key
+         *      the application property name.
+         * @param value
+         *      the value stored under that name.
+         *
+         * @return this builder.
+         */
+        public ApplicationPropertiesBuilder withProperty(String key, Object value) {
+            return withApplicationProperty(key, value);
+        }
+
         public ApplicationPropertiesBuilder withApplicationProperty(String 
key, Object value) {
             getOrCreateApplicationProperties().setApplicationProperty(key, 
value);
             return this;
@@ -540,7 +549,12 @@ public class TransferInjectAction extends 
AbstractPerformativeInjectAction<Trans
 
     public final class FooterBuilder extends SectionBuilder {
 
-        public FooterBuilder withFooter(Object key, Object value) {
+        /**
+         * Adds a footer entry whose key is given as a String; the key is converted
+         * to a {@link Symbol} before being stored.
+         *
+         * @param key
+         *      the footer key in String form.
+         * @param value
+         *      the value stored under that key.
+         *
+         * @return this builder.
+         */
+        public FooterBuilder withFooter(String key, Object value) {
+            return withFooter(Symbol.valueOf(key), value);
+        }
+
+        public FooterBuilder withFooter(Symbol key, Object value) {
             getOrCreateFooter().setFooterProperty(key, value);
             return this;
         }
@@ -666,6 +680,42 @@ public class TransferInjectAction extends 
AbstractPerformativeInjectAction<Trans
         }
     }
 
+    /**
+     * Entry point for scripting the message carried by the injected transfer. Each
+     * section accessor hands off to the matching builder method of the enclosing
+     * {@link TransferInjectAction}.
+     */
+    public final class MessageBuilder extends SectionBuilder {
+
+        /**
+         * Sets the message format on the enclosing transfer inject action.
+         *
+         * @param format
+         *      the message format value to apply.
+         *
+         * @return this builder.
+         */
+        public MessageBuilder withMessageFormat(int format) {
+            TransferInjectAction.this.withMessageFormat(format);
+            return this;
+        }
+
+        /** @return the header builder of the enclosing inject action. */
+        public HeaderBuilder withHeader() {
+            return TransferInjectAction.this.withHeader();
+        }
+
+        /** @return the delivery annotations builder of the enclosing inject action. */
+        public DeliveryAnnotationsBuilder withDeliveryAnnotations() {
+            return TransferInjectAction.this.withDeliveryAnnotations();
+        }
+
+        /** @return the message annotations builder of the enclosing inject action. */
+        public MessageAnnotationsBuilder withMessageAnnotations() {
+            return TransferInjectAction.this.withMessageAnnotations();
+        }
+
+        /** @return the properties builder of the enclosing inject action. */
+        public PropertiesBuilder withProperties() {
+            return TransferInjectAction.this.withProperties();
+        }
+
+        /** @return the application properties builder of the enclosing inject action. */
+        public ApplicationPropertiesBuilder withApplicationProperties() {
+            return TransferInjectAction.this.withApplicationProperties();
+        }
+
+        /** @return the body section builder of the enclosing inject action. */
+        public BodySectionBuilder withBody() {
+            return TransferInjectAction.this.withBody();
+        }
+
+        /** @return the footer builder of the enclosing inject action. */
+        public FooterBuilder withFooter() {
+            return TransferInjectAction.this.withFooter();
+        }
+    }
+
     private static byte[] generateUniqueDeliveryTag() {
         final byte[] tag = new byte[Long.BYTES + Long.BYTES];
         final UUID uuid = UUID.randomUUID();
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java
index 6476a2ac..0a0398f0 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/TransferExpectation.java
@@ -44,6 +44,7 @@ import 
org.apache.qpid.protonj2.test.driver.codec.transport.ReceiverSettleMode;
 import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer;
 import 
org.apache.qpid.protonj2.test.driver.matchers.transactions.TransactionalStateMatcher;
 import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferMatcher;
+import 
org.apache.qpid.protonj2.test.driver.matchers.transport.TransferMessageMatcher;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 
@@ -285,6 +286,10 @@ public class TransferExpectation extends 
AbstractExpectation<Transfer> {
         return this;
     }
 
+    /**
+     * Installs a new {@link TransferMessageMatcher} as the payload matcher of this
+     * expectation, replacing any payload matcher set earlier.
+     *
+     * @return the newly installed message matcher for further configuration.
+     */
+    public TransferMessageMatcher withMessage() {
+        final TransferMessageMatcher messageMatcher = new TransferMessageMatcher(this);
+        this.payloadMatcher = messageMatcher;
+        return messageMatcher;
+    }
+
     //----- Matcher based with methods for more complex validation
 
     public TransferExpectation withHandle(Matcher<?> m) {
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java
index ac174685..9fb76013 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/messaging/AbstractMessageSectionMatcher.java
@@ -41,7 +41,7 @@ public abstract class AbstractMessageSectionMatcher<T extends 
AbstractMessageSec
     private final Map<Object, Matcher<?>> fieldMatchers;
     private Map<Object, Object> receivedFields;
 
-    private final boolean allowTrailingBytes;
+    private boolean allowTrailingBytes;
 
     protected AbstractMessageSectionMatcher(UnsignedLong numericDescriptor, 
Symbol symbolicDescriptor, Map<Object, Matcher<?>> fieldMatchers, boolean 
expectTrailingBytes) {
         this.numericDescriptor = numericDescriptor;
@@ -50,6 +50,14 @@ public abstract class AbstractMessageSectionMatcher<T 
extends AbstractMessageSec
         this.allowTrailingBytes = expectTrailingBytes;
     }
 
+    /**
+     * Updates the allow-trailing-bytes setting originally supplied to the constructor.
+     *
+     * @param allowTrailingBytes
+     *      the new value of the setting.
+     */
+    public void setAllowTrailingBytes(final boolean allowTrailingBytes) {
+        this.allowTrailingBytes = allowTrailingBytes;
+    }
+
+    /**
+     * @return the current allow-trailing-bytes setting of this matcher.
+     */
+    public boolean isAllowTrailingBytes() {
+        return allowTrailingBytes;
+    }
+
+    /**
+     * Misspelled variant of {@link #isAllowTrailingBytes()}, retained so that code
+     * written against this name keeps compiling.
+     *
+     * @return the current allow-trailing-bytes setting of this matcher.
+     *
+     * @deprecated use {@link #isAllowTrailingBytes()} instead.
+     */
+    @Deprecated
+    public boolean isAllowTrailngBytes() {
+        return allowTrailingBytes;
+    }
+
     protected Map<Object, Matcher<?>> getMatchers() {
         return fieldMatchers;
     }
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferMessageMatcher.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferMessageMatcher.java
new file mode 100644
index 00000000..25cb5a80
--- /dev/null
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferMessageMatcher.java
@@ -0,0 +1,811 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.test.driver.matchers.transport;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.Symbol;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedByte;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger;
+import org.apache.qpid.protonj2.test.driver.codec.transport.Transfer;
+import org.apache.qpid.protonj2.test.driver.expectations.TransferExpectation;
+import 
org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpSequenceMatcher;
+import 
org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpTypeMatcher;
+import 
org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedDataMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.StringDescription;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matcher used by a {@link TransferExpectation} to build a matcher for the 
message
+ * payload that accompanies the {@link Transfer}. The matcher generally 
adheres to
+ * the standard AMQP message format zero layout.
+ */
+public class TransferMessageMatcher extends TypeSafeMatcher<ByteBuffer> {
+
+    private final TransferExpectation expectation;
+
+    private HeaderMatcher headersMatcher;
+    private DeliveryAnnotationsMatcher deliveryAnnotationsMatcher;
+    private MessageAnnotationsMatcher messageAnnotationsMatcher;
+    private PropertiesMatcher propertiesMatcher;
+    private ApplicationPropertiesMatcher applicationPropertiesMatcher;
+    private List<EncodedAmqpTypeMatcher> bodySectionMatchers = new 
ArrayList<>();
+    private FooterMatcher footersMatcher;
+
+    // String buckets for mismatch error descriptions.
+    private String headerMatcherFailureDescription;
+    private String deliveryAnnotationsMatcherFailureDescription;
+    private String messageAnnotationsMatcherFailureDescription;
+    private String propertiesMatcherFailureDescription;
+    private String applicationPropertiesMatcherFailureDescription;
+    private String msgContentMatcherFailureDescription;
+    private String footerMatcherFailureDescription;
+
+    /**
+     * Creates a message matcher linked to the expectation that owns it.
+     *
+     * @param expectation
+     *      the {@link TransferExpectation} returned by {@link #also()} and {@link #and()}.
+     */
+    public TransferMessageMatcher(final TransferExpectation expectation) {
+        this.expectation = expectation;
+    }
+
+    /**
+     * Leaves message scripting and returns to the owning expectation.
+     *
+     * @return the {@link TransferExpectation} this matcher was created for.
+     */
+    public TransferExpectation also() {
+        return this.expectation;
+    }
+
+    /**
+     * Leaves message scripting and returns to the owning expectation; behaves
+     * the same as {@code also()}.
+     *
+     * @return the {@link TransferExpectation} this matcher was created for.
+     */
+    public TransferExpectation and() {
+        return this.expectation;
+    }
+
+    /**
+     * Checks the encoded payload against each configured section matcher in the order
+     * header, delivery annotations, message annotations, properties, application
+     * properties, body sections and footer. A section with no matcher configured
+     * consumes no bytes. On the first failing section a description of the failure
+     * is recorded for {@link #describeMismatchSafely(ByteBuffer, Description)} and
+     * the match stops.
+     *
+     * @param receivedBinary
+     *      the encoded payload; it is only accessed through a read-only slice.
+     *
+     * @return true if every configured section matcher accepted its part of the payload.
+     */
+    @Override
+    protected boolean matchesSafely(ByteBuffer receivedBinary) {
+        // Clear diagnostics left by any earlier invocation so that a later mismatch
+        // description reflects only the outcome of this attempt.
+        headerMatcherFailureDescription = null;
+        deliveryAnnotationsMatcherFailureDescription = null;
+        messageAnnotationsMatcherFailureDescription = null;
+        propertiesMatcherFailureDescription = null;
+        applicationPropertiesMatcherFailureDescription = null;
+        msgContentMatcherFailureDescription = null;
+        footerMatcherFailureDescription = null;
+
+        final ByteBuffer receivedSlice = receivedBinary.slice().asReadOnlyBuffer();
+
+        // Running count of bytes accepted so far; also the next read position in receivedSlice.
+        int bytesConsumed = 0;
+
+        // MessageHeader Section
+        if (headersMatcher != null) {
+            try {
+                bytesConsumed += headersMatcher.getInnerMatcher().verify(receivedSlice.slice());
+                receivedSlice.position(bytesConsumed);
+            } catch (Throwable t) {
+                headerMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to MessageHeaderMatcher: " + receivedSlice;
+                headerMatcherFailureDescription += "\nMessageHeaderMatcher generated throwable: " + t;
+
+                return false;
+            }
+        }
+
+        // DeliveryAnnotations Section
+        if (deliveryAnnotationsMatcher != null) {
+            try {
+                bytesConsumed += deliveryAnnotationsMatcher.getInnerMatcher().verify(receivedSlice.slice());
+                receivedSlice.position(bytesConsumed);
+            } catch (Throwable t) {
+                deliveryAnnotationsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed " +
+                                                               "to DeliveryAnnotationsMatcher: " + receivedSlice;
+                deliveryAnnotationsMatcherFailureDescription += "\nDeliveryAnnotationsMatcher generated throwable: " + t;
+
+                return false;
+            }
+        }
+
+        // MessageAnnotations Section
+        if (messageAnnotationsMatcher != null) {
+            try {
+                bytesConsumed += messageAnnotationsMatcher.getInnerMatcher().verify(receivedSlice.slice());
+                receivedSlice.position(bytesConsumed);
+            } catch (Throwable t) {
+                messageAnnotationsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " +
+                                                              "MessageAnnotationsMatcher: " + receivedSlice;
+                messageAnnotationsMatcherFailureDescription += "\nMessageAnnotationsMatcher generated throwable: " + t;
+
+                return false;
+            }
+        }
+
+        // Properties Section
+        if (propertiesMatcher != null) {
+            try {
+                bytesConsumed += propertiesMatcher.getInnerMatcher().verify(receivedSlice.slice());
+                receivedSlice.position(bytesConsumed);
+            } catch (Throwable t) {
+                propertiesMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " +
+                                                      "PropertiesMatcher: " + receivedSlice;
+                propertiesMatcherFailureDescription += "\nPropertiesMatcher generated throwable: " + t;
+
+                return false;
+            }
+        }
+
+        // Application Properties Section
+        if (applicationPropertiesMatcher != null) {
+            try {
+                bytesConsumed += applicationPropertiesMatcher.getInnerMatcher().verify(receivedSlice.slice());
+                receivedSlice.position(bytesConsumed);
+            } catch (Throwable t) {
+                applicationPropertiesMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " +
+                                                                 "ApplicationPropertiesMatcher: " + receivedSlice;
+                applicationPropertiesMatcherFailureDescription += "\nApplicationPropertiesMatcher generated throwable: " + t;
+
+                return false;
+            }
+        }
+
+        // Message Content Body Section, already a Matcher<Binary>
+        if (!bodySectionMatchers.isEmpty()) {
+            // One slice is shared by all body matchers; the bytes each one consumed
+            // are derived from how far it advanced this slice.
+            final ByteBuffer slicedMsgContext = receivedSlice.slice();
+
+            for (Matcher<ByteBuffer> msgContentMatcher : bodySectionMatchers) {
+                final int originalReadableBytes = slicedMsgContext.remaining();
+                final boolean contentMatches = msgContentMatcher.matches(slicedMsgContext);
+                if (!contentMatches) {
+                    Description desc = new StringDescription();
+                    msgContentMatcher.describeTo(desc);
+                    msgContentMatcher.describeMismatch(receivedSlice, desc);
+
+                    msgContentMatcherFailureDescription = "\nMessageContentMatcher mismatch Description:";
+                    msgContentMatcherFailureDescription += desc.toString();
+
+                    return false;
+                }
+
+                bytesConsumed += originalReadableBytes - slicedMsgContext.remaining();
+                receivedSlice.position(bytesConsumed);
+            }
+        }
+
+        // Footers Section
+        if (footersMatcher != null) {
+            try {
+                bytesConsumed += footersMatcher.getInnerMatcher().verify(receivedSlice.slice());
+            } catch (Throwable t) {
+                footerMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to " +
+                                                  "FooterMatcher: " + receivedSlice;
+                footerMatcherFailureDescription += "\nFooterMatcher generated throwable: " + t;
+
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * Forwards the expected message format to the owning {@link TransferExpectation}.
+     *
+     * @param format
+     *      the message format value the transfer is expected to carry.
+     *
+     * @return this message matcher.
+     */
+    public TransferMessageMatcher withMessageFormat(final int format) {
+        expectation.withMessageFormat(format);
+        return this;
+    }
+
+    /**
+     * Returns the header section matcher, creating it on first use. The header
+     * matcher is set to allow trailing bytes only when some later section
+     * (annotations, properties, body or footer) has already been scripted.
+     *
+     * @return the header section matcher for further configuration.
+     */
+    public HeaderMatcher withHeader() {
+        if (headersMatcher == null) {
+            headersMatcher = new HeaderMatcher(this);
+        }
+
+        final boolean laterSectionScripted =
+            deliveryAnnotationsMatcher != null || messageAnnotationsMatcher != null ||
+            propertiesMatcher != null || applicationPropertiesMatcher != null ||
+            !bodySectionMatchers.isEmpty() || footersMatcher != null;
+
+        headersMatcher.getInnerMatcher().setAllowTrailingBytes(laterSectionScripted);
+
+        return headersMatcher;
+    }
+
+    /**
+     * Returns the delivery annotations section matcher, creating it on first use.
+     * A previously scripted header matcher is switched to allow trailing bytes, and
+     * this section allows them only when a later section has already been scripted.
+     *
+     * @return the delivery annotations section matcher for further configuration.
+     */
+    public DeliveryAnnotationsMatcher withDeliveryAnnotations() {
+        if (deliveryAnnotationsMatcher == null) {
+            deliveryAnnotationsMatcher = new DeliveryAnnotationsMatcher(this);
+        }
+
+        // Sections ahead of this one now have encoded bytes following them.
+        if (headersMatcher != null) {
+            headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+
+        final boolean laterSectionScripted =
+            messageAnnotationsMatcher != null || propertiesMatcher != null ||
+            applicationPropertiesMatcher != null ||
+            !bodySectionMatchers.isEmpty() || footersMatcher != null;
+
+        deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(laterSectionScripted);
+
+        return deliveryAnnotationsMatcher;
+    }
+
+    /**
+     * Returns the message annotations section matcher, creating it on first use.
+     * Previously scripted header and delivery annotations matchers are switched to
+     * allow trailing bytes, and this section allows them only when a later section
+     * has already been scripted.
+     *
+     * @return the message annotations section matcher for further configuration.
+     */
+    public MessageAnnotationsMatcher withMessageAnnotations() {
+        if (messageAnnotationsMatcher == null) {
+            messageAnnotationsMatcher = new MessageAnnotationsMatcher(this);
+        }
+
+        // Sections ahead of this one now have encoded bytes following them.
+        if (headersMatcher != null) {
+            headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (deliveryAnnotationsMatcher != null) {
+            deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+
+        final boolean laterSectionScripted =
+            propertiesMatcher != null || applicationPropertiesMatcher != null ||
+            !bodySectionMatchers.isEmpty() || footersMatcher != null;
+
+        messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(laterSectionScripted);
+
+        return messageAnnotationsMatcher;
+    }
+
+    /**
+     * Returns the properties section matcher, creating it on first use. Previously
+     * scripted header, delivery annotations and message annotations matchers are
+     * switched to allow trailing bytes, and this section allows them only when a
+     * later section has already been scripted.
+     *
+     * @return the properties section matcher for further configuration.
+     */
+    public PropertiesMatcher withProperties() {
+        if (propertiesMatcher == null) {
+            propertiesMatcher = new PropertiesMatcher(this);
+        }
+
+        // Sections ahead of this one now have encoded bytes following them.
+        if (headersMatcher != null) {
+            headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (deliveryAnnotationsMatcher != null) {
+            deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (messageAnnotationsMatcher != null) {
+            messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+
+        final boolean laterSectionScripted =
+            applicationPropertiesMatcher != null || !bodySectionMatchers.isEmpty() || footersMatcher != null;
+
+        propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(laterSectionScripted);
+
+        return propertiesMatcher;
+    }
+
+    /**
+     * Returns the application properties section matcher, creating it on first use.
+     * Every previously scripted section that precedes it is switched to allow
+     * trailing bytes, and this section allows them only when a body section or
+     * the footer has already been scripted.
+     *
+     * @return the application properties section matcher for further configuration.
+     */
+    public ApplicationPropertiesMatcher withApplicationProperties() {
+        if (applicationPropertiesMatcher == null) {
+            applicationPropertiesMatcher = new ApplicationPropertiesMatcher(this);
+        }
+
+        // Sections ahead of this one now have encoded bytes following them.
+        if (headersMatcher != null) {
+            headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (deliveryAnnotationsMatcher != null) {
+            deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (messageAnnotationsMatcher != null) {
+            messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (propertiesMatcher != null) {
+            propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+
+        final boolean laterSectionScripted = !bodySectionMatchers.isEmpty() || footersMatcher != null;
+
+        applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(laterSectionScripted);
+
+        return applicationPropertiesMatcher;
+    }
+
+    /**
+     * Appends a body section matcher expecting an AMQP sequence with the given
+     * contents. As with {@link #withData(byte[])} and {@link #withValue(Object)},
+     * every section scripted ahead of the body, and any earlier body section, is
+     * switched to allow trailing bytes since this section now follows them.
+     *
+     * @param sequence
+     *      the list the sequence body section is expected to contain.
+     *
+     * @return this message matcher.
+     */
+    public TransferMessageMatcher withSequence(List<?> sequence) {
+        final EncodedAmqpSequenceMatcher matcher = new EncodedAmqpSequenceMatcher(sequence, footersMatcher != null);
+
+        // Previously missing here: without these the sections preceding a sequence
+        // body kept rejecting the body bytes that trail them.
+        if (headersMatcher != null) {
+            headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (deliveryAnnotationsMatcher != null) {
+            deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (messageAnnotationsMatcher != null) {
+            messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (propertiesMatcher != null) {
+            propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (applicationPropertiesMatcher != null) {
+            applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+
+        if (!bodySectionMatchers.isEmpty()) {
+            bodySectionMatchers.get(bodySectionMatchers.size() - 1).setAllowTrailingBytes(true);
+        }
+
+        bodySectionMatchers.add(matcher);
+
+        return this;
+    }
+
+    /**
+     * Appends a body section matcher expecting a Data section with the given bytes.
+     * Every section scripted ahead of the body, and any earlier body section, is
+     * switched to allow trailing bytes; the new matcher allows them only when the
+     * footer has already been scripted.
+     *
+     * @param payload
+     *      the bytes the Data section is expected to contain.
+     *
+     * @return this message matcher.
+     */
+    public TransferMessageMatcher withData(byte[] payload) {
+        final boolean footerScripted = footersMatcher != null;
+        final EncodedDataMatcher dataMatcher = new EncodedDataMatcher(payload, footerScripted);
+
+        // Sections ahead of the body now have encoded bytes following them.
+        if (headersMatcher != null) {
+            headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (deliveryAnnotationsMatcher != null) {
+            deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (messageAnnotationsMatcher != null) {
+            messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (propertiesMatcher != null) {
+            propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (applicationPropertiesMatcher != null) {
+            applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+
+        // The body section that used to be last is now followed by this one.
+        final int scriptedBodies = bodySectionMatchers.size();
+        if (scriptedBodies > 0) {
+            bodySectionMatchers.get(scriptedBodies - 1).setAllowTrailingBytes(true);
+        }
+
+        bodySectionMatchers.add(dataMatcher);
+
+        return this;
+    }
+
+    /**
+     * Appends a body section matcher expecting an AmqpValue section with the given
+     * value. Every section scripted ahead of the body, and any earlier body section,
+     * is switched to allow trailing bytes; the new matcher allows them only when
+     * the footer has already been scripted.
+     *
+     * @param value
+     *      the value the AmqpValue section is expected to carry.
+     *
+     * @return this message matcher.
+     */
+    public TransferMessageMatcher withValue(Object value) {
+        final boolean footerScripted = footersMatcher != null;
+        final EncodedAmqpValueMatcher valueMatcher = new EncodedAmqpValueMatcher(value, footerScripted);
+
+        // Sections ahead of the body now have encoded bytes following them.
+        if (headersMatcher != null) {
+            headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (deliveryAnnotationsMatcher != null) {
+            deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (messageAnnotationsMatcher != null) {
+            messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (propertiesMatcher != null) {
+            propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (applicationPropertiesMatcher != null) {
+            applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+
+        // The body section that used to be last is now followed by this one.
+        final int scriptedBodies = bodySectionMatchers.size();
+        if (scriptedBodies > 0) {
+            bodySectionMatchers.get(scriptedBodies - 1).setAllowTrailingBytes(true);
+        }
+
+        bodySectionMatchers.add(valueMatcher);
+
+        return this;
+    }
+
+    /**
+     * Returns the footer section matcher, creating it on first use. Every section
+     * scripted ahead of the footer, including the most recently added body section,
+     * is switched to allow trailing bytes.
+     *
+     * @return the footer section matcher for further configuration.
+     */
+    public FooterMatcher withFooters() {
+        if (footersMatcher == null) {
+            footersMatcher = new FooterMatcher(this);
+        }
+
+        // Everything scripted before the footer now has encoded bytes following it.
+        if (headersMatcher != null) {
+            headersMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (deliveryAnnotationsMatcher != null) {
+            deliveryAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (messageAnnotationsMatcher != null) {
+            messageAnnotationsMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (propertiesMatcher != null) {
+            propertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+        if (applicationPropertiesMatcher != null) {
+            applicationPropertiesMatcher.getInnerMatcher().setAllowTrailingBytes(true);
+        }
+
+        final int scriptedBodies = bodySectionMatchers.size();
+        if (scriptedBodies > 0) {
+            bodySectionMatchers.get(scriptedBodies - 1).setAllowTrailingBytes(true);
+        }
+
+        return footersMatcher;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+        description.appendText("a Binary encoding of a Transfer frames 
payload, containing an AMQP message");
+    }
+
+    /**
+     * Appends the full payload, followed by the stored failure description of the
+     * first section (in message order) that recorded one during matching.
+     *
+     * @param item
+     *      the payload that was passed to the matcher.
+     * @param mismatchDescription
+     *      the description the failure details are appended to.
+     */
+    @Override
+    protected void describeMismatchSafely(ByteBuffer item, Description mismatchDescription) {
+        mismatchDescription.appendText("\nActual encoded form of the full Transfer frame payload: ").appendValue(item);
+
+        // MessageHeaders Section
+        if (headerMatcherFailureDescription != null) {
+            mismatchDescription.appendText("\nMessageHeadersMatcherFailed!");
+            mismatchDescription.appendText(headerMatcherFailureDescription);
+            return;
+        }
+
+        // DeliveryAnnotations Section
+        if (deliveryAnnotationsMatcherFailureDescription != null) {
+            mismatchDescription.appendText("\nDeliveryAnnotationsMatcherFailed!");
+            mismatchDescription.appendText(deliveryAnnotationsMatcherFailureDescription);
+            return;
+        }
+
+        // MessageAnnotations Section
+        if (messageAnnotationsMatcherFailureDescription != null) {
+            mismatchDescription.appendText("\nMessageAnnotationsMatcherFailed!");
+            mismatchDescription.appendText(messageAnnotationsMatcherFailureDescription);
+            return;
+        }
+
+        // Properties Section
+        if (propertiesMatcherFailureDescription != null) {
+            mismatchDescription.appendText("\nPropertiesMatcherFailed!");
+            mismatchDescription.appendText(propertiesMatcherFailureDescription);
+            return;
+        }
+
+        // Application Properties Section
+        if (applicationPropertiesMatcherFailureDescription != null) {
+            mismatchDescription.appendText("\nApplicationPropertiesMatcherFailed!");
+            mismatchDescription.appendText(applicationPropertiesMatcherFailureDescription);
+            return;
+        }
+
+        // Message Content Body Section
+        if (msgContentMatcherFailureDescription != null) {
+            mismatchDescription.appendText("\nContentMatcherFailed!");
+            mismatchDescription.appendText(msgContentMatcherFailureDescription);
+            return;
+        }
+
+        // Footer Section: labelled as a footer failure so it is not mistaken for a body mismatch
+        if (footerMatcherFailureDescription != null) {
+            mismatchDescription.appendText("\nFooterMatcherFailed!");
+            mismatchDescription.appendText(footerMatcherFailureDescription);
+        }
+    }
+
+    /**
+     * Fluent wrapper that scripts expected Header section values onto an inner
+     * messaging {@code HeaderMatcher} and links back to the owning
+     * {@link TransferMessageMatcher}. Each {@code with} method registers an
+     * equality matcher for one header field.
+     */
+    public static final class HeaderMatcher {
+
+        private final org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher matcher =
+            new org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher(false);
+
+        private final TransferMessageMatcher transferMatcher;
+
+        /**
+         * @param transferMatcher
+         *      the message matcher returned by {@link #also()} and {@link #and()}.
+         */
+        public HeaderMatcher(TransferMessageMatcher transferMatcher) {
+            this.transferMatcher = transferMatcher;
+        }
+
+        /** @return the owning message matcher, to continue scripting other sections. */
+        public TransferMessageMatcher also() {
+            return transferMatcher;
+        }
+
+        /** @return the owning message matcher, to continue scripting other sections. */
+        public TransferMessageMatcher and() {
+            return transferMatcher;
+        }
+
+        /** Expects the durable field to equal the given value. */
+        public HeaderMatcher withDurability(boolean durable) {
+            matcher.withDurable(equalTo(durable));
+            return this;
+        }
+
+        /** Expects the durable field to equal the given (possibly null) value. */
+        public HeaderMatcher withDurability(Boolean durable) {
+            matcher.withDurable(equalTo(durable));
+            return this;
+        }
+
+        /** Expects the priority field to equal the given byte converted to an {@link UnsignedByte}. */
+        public HeaderMatcher withPriority(byte priority) {
+            matcher.withPriority(equalTo(UnsignedByte.valueOf(priority)));
+            return this;
+        }
+
+        /** Expects the priority field to equal the given value. */
+        public HeaderMatcher withPriority(UnsignedByte priority) {
+            matcher.withPriority(equalTo(priority));
+            return this;
+        }
+
+        /** Expects the ttl field to equal the given int converted to an {@link UnsignedInteger}. */
+        public HeaderMatcher withTimeToLive(int timeToLive) {
+            matcher.withTtl(equalTo(UnsignedInteger.valueOf(timeToLive)));
+            return this;
+        }
+
+        /** Expects the ttl field to equal the given long converted to an {@link UnsignedInteger}. */
+        public HeaderMatcher withTimeToLive(long timeToLive) {
+            matcher.withTtl(equalTo(UnsignedInteger.valueOf(timeToLive)));
+            return this;
+        }
+
+        /** Expects the ttl field to equal the given value. */
+        public HeaderMatcher withTimeToLive(UnsignedInteger timeToLive) {
+            matcher.withTtl(equalTo(timeToLive));
+            return this;
+        }
+
+        /** Expects the first-acquirer field to equal the given value. */
+        public HeaderMatcher withFirstAcquirer(boolean firstAcquirer) {
+            matcher.withFirstAcquirer(equalTo(firstAcquirer));
+            return this;
+        }
+
+        /** Expects the first-acquirer field to equal the given (possibly null) value. */
+        public HeaderMatcher withFirstAcquirer(Boolean firstAcquirer) {
+            matcher.withFirstAcquirer(equalTo(firstAcquirer));
+            return this;
+        }
+
+        /** Expects the delivery-count field to equal the given int converted to an {@link UnsignedInteger}. */
+        public HeaderMatcher withDeliveryCount(int deliveryCount) {
+            matcher.withDeliveryCount(equalTo(UnsignedInteger.valueOf(deliveryCount)));
+            return this;
+        }
+
+        /** Expects the delivery-count field to equal the given long converted to an {@link UnsignedInteger}. */
+        public HeaderMatcher withDeliveryCount(long deliveryCount) {
+            matcher.withDeliveryCount(equalTo(UnsignedInteger.valueOf(deliveryCount)));
+            return this;
+        }
+
+        /** Expects the delivery-count field to equal the given value. */
+        public HeaderMatcher withDeliveryCount(UnsignedInteger deliveryCount) {
+            matcher.withDeliveryCount(equalTo(deliveryCount));
+            return this;
+        }
+
+        /** @return the wrapped section matcher used by the enclosing class to verify encoded bytes. */
+        org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher getInnerMatcher() {
+            return matcher;
+        }
+    }
+
+    /**
+     * Fluent scripting helper for the Delivery Annotations section of an expected transfer
+     * payload. Entries are forwarded to the wrapped section matcher; {@link #also()} and
+     * {@link #and()} return the owning {@link TransferMessageMatcher}.
+     */
+    public static final class DeliveryAnnotationsMatcher {
+
+        private final TransferMessageMatcher transferMatcher;
+
+        private final org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher matcher;
+
+        /**
+         * @param transferMatcher the parent matcher handed back by {@link #also()} and {@link #and()}
+         */
+        public DeliveryAnnotationsMatcher(TransferMessageMatcher transferMatcher) {
+            this.matcher =
+                new org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher(false);
+            this.transferMatcher = transferMatcher;
+        }
+
+        /**
+         * @return the parent {@link TransferMessageMatcher} supplied at construction
+         */
+        public TransferMessageMatcher also() {
+            return transferMatcher;
+        }
+
+        /**
+         * Synonym for {@link #also()}.
+         */
+        public TransferMessageMatcher and() {
+            return also();
+        }
+
+        /**
+         * Registers an expected entry, converting the key with {@code Symbol.valueOf(String)}.
+         */
+        public DeliveryAnnotationsMatcher withAnnotation(String key, Object value) {
+            return withAnnotation(Symbol.valueOf(key), value);
+        }
+
+        /**
+         * Registers an expected entry on the wrapped section matcher.
+         */
+        public DeliveryAnnotationsMatcher withAnnotation(Symbol key, Object value) {
+            matcher.withEntry(key, value);
+            return this;
+        }
+
+        /**
+         * @return the wrapped section matcher that collects the scripted expectations
+         */
+        org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher getInnerMatcher() {
+            return matcher;
+        }
+    }
+
+    /**
+     * Fluent scripting helper for the Message Annotations section of an expected transfer
+     * payload. Entries are forwarded to the wrapped section matcher; {@link #also()} and
+     * {@link #and()} return the owning {@link TransferMessageMatcher}.
+     */
+    public static final class MessageAnnotationsMatcher {
+
+        private final TransferMessageMatcher transferMatcher;
+
+        private final org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher matcher;
+
+        /**
+         * @param transferMatcher the parent matcher handed back by {@link #also()} and {@link #and()}
+         */
+        public MessageAnnotationsMatcher(TransferMessageMatcher transferMatcher) {
+            this.matcher =
+                new org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher(false);
+            this.transferMatcher = transferMatcher;
+        }
+
+        /**
+         * @return the parent {@link TransferMessageMatcher} supplied at construction
+         */
+        public TransferMessageMatcher also() {
+            return transferMatcher;
+        }
+
+        /**
+         * Synonym for {@link #also()}.
+         */
+        public TransferMessageMatcher and() {
+            return also();
+        }
+
+        /**
+         * Registers an expected entry, converting the key with {@code Symbol.valueOf(String)}.
+         */
+        public MessageAnnotationsMatcher withAnnotation(String key, Object value) {
+            return withAnnotation(Symbol.valueOf(key), value);
+        }
+
+        /**
+         * Registers an expected entry on the wrapped section matcher.
+         */
+        public MessageAnnotationsMatcher withAnnotation(Symbol key, Object value) {
+            matcher.withEntry(key, value);
+            return this;
+        }
+
+        /**
+         * @return the wrapped section matcher that collects the scripted expectations
+         */
+        org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher getInnerMatcher() {
+            return matcher;
+        }
+    }
+
+    /**
+     * Fluent scripting helper for the Properties section of an expected transfer payload.
+     * <p>
+     * Each {@code withXXX} method forwards its argument unchanged to the method of the same
+     * name (and same parameter type) on the wrapped section matcher and returns this
+     * instance for chaining; {@link #also()} and {@link #and()} return the owning
+     * {@link TransferMessageMatcher}.
+     */
+    public static final class PropertiesMatcher {
+
+        private final TransferMessageMatcher transferMatcher;
+
+        private final org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher matcher;
+
+        /**
+         * @param transferMatcher the parent matcher handed back by {@link #also()} and {@link #and()}
+         */
+        public PropertiesMatcher(TransferMessageMatcher transferMatcher) {
+            this.matcher =
+                new org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher(false);
+            this.transferMatcher = transferMatcher;
+        }
+
+        /**
+         * @return the parent {@link TransferMessageMatcher} supplied at construction
+         */
+        public TransferMessageMatcher also() {
+            return transferMatcher;
+        }
+
+        /**
+         * Synonym for {@link #also()}.
+         */
+        public TransferMessageMatcher and() {
+            return also();
+        }
+
+        // ----- Identity and addressing fields
+
+        public PropertiesMatcher withMessageId(Object messageId) {
+            matcher.withMessageId(messageId);
+            return this;
+        }
+
+        public PropertiesMatcher withUserId(byte[] userId) {
+            matcher.withUserId(userId);
+            return this;
+        }
+
+        public PropertiesMatcher withUserId(Binary userId) {
+            matcher.withUserId(userId);
+            return this;
+        }
+
+        public PropertiesMatcher withTo(String to) {
+            matcher.withTo(to);
+            return this;
+        }
+
+        public PropertiesMatcher withSubject(String subject) {
+            matcher.withSubject(subject);
+            return this;
+        }
+
+        public PropertiesMatcher withReplyTo(String replyTo) {
+            matcher.withReplyTo(replyTo);
+            return this;
+        }
+
+        public PropertiesMatcher withCorrelationId(Object correlationId) {
+            matcher.withCorrelationId(correlationId);
+            return this;
+        }
+
+        // ----- Content description fields
+
+        public PropertiesMatcher withContentType(String contentType) {
+            matcher.withContentType(contentType);
+            return this;
+        }
+
+        public PropertiesMatcher withContentType(Symbol contentType) {
+            matcher.withContentType(contentType);
+            return this;
+        }
+
+        public PropertiesMatcher withContentEncoding(String contentEncoding) {
+            matcher.withContentEncoding(contentEncoding);
+            return this;
+        }
+
+        public PropertiesMatcher withContentEncoding(Symbol contentEncoding) {
+            matcher.withContentEncoding(contentEncoding);
+            return this;
+        }
+
+        // ----- Time fields; each overload deliberately calls the inner overload of the same type
+
+        public PropertiesMatcher withAbsoluteExpiryTime(int absoluteExpiryTime) {
+            matcher.withAbsoluteExpiryTime(absoluteExpiryTime);
+            return this;
+        }
+
+        public PropertiesMatcher withAbsoluteExpiryTime(long absoluteExpiryTime) {
+            matcher.withAbsoluteExpiryTime(absoluteExpiryTime);
+            return this;
+        }
+
+        public PropertiesMatcher withAbsoluteExpiryTime(Long absoluteExpiryTime) {
+            matcher.withAbsoluteExpiryTime(absoluteExpiryTime);
+            return this;
+        }
+
+        public PropertiesMatcher withCreationTime(int creationTime) {
+            matcher.withCreationTime(creationTime);
+            return this;
+        }
+
+        public PropertiesMatcher withCreationTime(long creationTime) {
+            matcher.withCreationTime(creationTime);
+            return this;
+        }
+
+        public PropertiesMatcher withCreationTime(Long creationTime) {
+            matcher.withCreationTime(creationTime);
+            return this;
+        }
+
+        // ----- Group fields
+
+        public PropertiesMatcher withGroupId(String groupId) {
+            matcher.withGroupId(groupId);
+            return this;
+        }
+
+        public PropertiesMatcher withGroupSequence(int groupSequence) {
+            matcher.withGroupSequence(groupSequence);
+            return this;
+        }
+
+        public PropertiesMatcher withGroupSequence(long groupSequence) {
+            matcher.withGroupSequence(groupSequence);
+            return this;
+        }
+
+        public PropertiesMatcher withGroupSequence(Long groupSequence) {
+            matcher.withGroupSequence(groupSequence);
+            return this;
+        }
+
+        public PropertiesMatcher withReplyToGroupId(String replyToGroupId) {
+            matcher.withReplyToGroupId(replyToGroupId);
+            return this;
+        }
+
+        /**
+         * @return the wrapped section matcher that collects the scripted expectations
+         */
+        org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher getInnerMatcher() {
+            return matcher;
+        }
+    }
+
+    /**
+     * Fluent scripting helper for the Application Properties section of an expected
+     * transfer payload. Entries are forwarded to the wrapped section matcher;
+     * {@link #also()} and {@link #and()} return the owning {@link TransferMessageMatcher}.
+     */
+    public static final class ApplicationPropertiesMatcher {
+
+        private final TransferMessageMatcher transferMatcher;
+
+        private final org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher matcher;
+
+        /**
+         * @param transferMatcher the parent matcher handed back by {@link #also()} and {@link #and()}
+         */
+        public ApplicationPropertiesMatcher(TransferMessageMatcher transferMatcher) {
+            this.matcher =
+                new org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher(false);
+            this.transferMatcher = transferMatcher;
+        }
+
+        /**
+         * @return the parent {@link TransferMessageMatcher} supplied at construction
+         */
+        public TransferMessageMatcher also() {
+            return transferMatcher;
+        }
+
+        /**
+         * Synonym for {@link #also()}.
+         */
+        public TransferMessageMatcher and() {
+            return also();
+        }
+
+        /**
+         * Registers an expected entry on the wrapped section matcher.
+         */
+        public ApplicationPropertiesMatcher withProperty(String key, Object value) {
+            matcher.withEntry(key, value);
+            return this;
+        }
+
+        /**
+         * @return the wrapped section matcher that collects the scripted expectations
+         */
+        org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher getInnerMatcher() {
+            return matcher;
+        }
+    }
+
+    /**
+     * Fluent scripting helper for the Footer section of an expected transfer payload.
+     * Entries are forwarded to the wrapped section matcher; {@link #also()} and
+     * {@link #and()} return the owning {@link TransferMessageMatcher}.
+     */
+    public static final class FooterMatcher {
+
+        private final TransferMessageMatcher transferMatcher;
+
+        private final org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher matcher;
+
+        /**
+         * @param transferMatcher the parent matcher handed back by {@link #also()} and {@link #and()}
+         */
+        public FooterMatcher(TransferMessageMatcher transferMatcher) {
+            this.matcher =
+                new org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher(false);
+            this.transferMatcher = transferMatcher;
+        }
+
+        /**
+         * @return the parent {@link TransferMessageMatcher} supplied at construction
+         */
+        public TransferMessageMatcher also() {
+            return transferMatcher;
+        }
+
+        /**
+         * Synonym for {@link #also()}.
+         */
+        public TransferMessageMatcher and() {
+            return also();
+        }
+
+        /**
+         * Registers an expected entry, converting the key with {@code Symbol.valueOf(String)}.
+         */
+        public FooterMatcher withFooter(String key, Object value) {
+            return withFooter(Symbol.valueOf(key), value);
+        }
+
+        /**
+         * Registers an expected entry on the wrapped section matcher.
+         */
+        public FooterMatcher withFooter(Symbol key, Object value) {
+            matcher.withEntry(key, value);
+            return this;
+        }
+
+        /**
+         * @return the wrapped section matcher that collects the scripted expectations
+         */
+        org.apache.qpid.protonj2.test.driver.matchers.messaging.FooterMatcher getInnerMatcher() {
+            return matcher;
+        }
+    }
+}
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java
index 95dcd64d..9e78b181 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/transport/TransferPayloadCompositeMatcher.java
@@ -173,7 +173,7 @@ public class TransferPayloadCompositeMatcher extends 
TypeSafeMatcher<ByteBuffer>
             }
         }
 
-        // MessageAnnotations Section
+        // Footers Section
         if (footersMatcher != null) {
             try {
                 bytesConsumed += footersMatcher.verify(receivedSlice.slice());
diff --git 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java
 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java
index 2c6539a3..0eaf4de0 100644
--- 
a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java
+++ 
b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/matchers/types/EncodedAmqpTypeMatcher.java
@@ -40,7 +40,7 @@ public abstract class EncodedAmqpTypeMatcher extends 
TypeSafeMatcher<ByteBuffer>
     private final Symbol descriptorSymbol;
     private final UnsignedLong descriptorCode;
     private final Object expectedValue;
-    private boolean permitTrailingBytes;
+    private boolean allowTrailingBytes;
     private DescribedType decodedDescribedType;
     private boolean unexpectedTrailingBytes;
 
@@ -48,11 +48,19 @@ public abstract class EncodedAmqpTypeMatcher extends 
TypeSafeMatcher<ByteBuffer>
         this(symbol, code, expectedValue, false);
     }
 
-    public EncodedAmqpTypeMatcher(Symbol symbol, UnsignedLong code, Object 
expectedValue, boolean permitTrailingBytes) {
+    public EncodedAmqpTypeMatcher(Symbol symbol, UnsignedLong code, Object 
expectedValue, boolean allowTrailingBytes) {
         this.descriptorSymbol = symbol;
         this.descriptorCode = code;
         this.expectedValue = expectedValue;
-        this.permitTrailingBytes = permitTrailingBytes;
+        this.allowTrailingBytes = allowTrailingBytes;
+    }
+
+    /**
+     * Configures whether bytes left over after the expected type has been decoded are
+     * tolerated; when not allowed the match fails and the unexpected trailing bytes are
+     * recorded.
+     *
+     * @param allowTrailingBytes true if trailing bytes should not fail the match
+     */
+    public void setAllowTrailingBytes(boolean allowTrailingBytes) {
+        this.allowTrailingBytes = allowTrailingBytes;
+    }
+
+    /**
+     * @return true if bytes remaining after the decoded type do not fail the match
+     */
+    public boolean isAllowTrailingBytes() {
+        return allowTrailingBytes;
+    }
+
+    /**
+     * Misspelled variant of {@link #isAllowTrailingBytes()}, kept so that existing callers
+     * of this name continue to compile.
+     *
+     * @return true if bytes remaining after the decoded type do not fail the match
+     */
+    public boolean isAllowTrailngBytes() {
+        return isAllowTrailingBytes();
     }
 
     protected Object getExpectedValue() {
@@ -110,7 +118,7 @@ public abstract class EncodedAmqpTypeMatcher extends 
TypeSafeMatcher<ByteBuffer>
             }
         }
 
-        if (decoded < length && !permitTrailingBytes) {
+        if (decoded < length && !allowTrailingBytes) {
             unexpectedTrailingBytes = true;
             return false;
         }
diff --git 
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
 
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
index e5720836..5982968b 100644
--- 
a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
+++ 
b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/SenderHandlingTest.java
@@ -580,4 +580,370 @@ class SenderHandlingTest extends TestPeerTestsBase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test // Happy path: the client injects a transfer whose sections all equal what the peer scripted
+    public void testTransferInjectAndExpectAPIs() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond().withHandle(42);
+            peer.remoteFlow().withLinkCredit(1).queue();
+            // Script the full expected message on the peer using the expect API
+            peer.expectTransfer().withMessage()
+                                 
.withProperties().withCorrelationId("test").and()
+                                 
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                 
.withApplicationProperties().withProperty("ap", "pa").and()
+                                 
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                 .withData(new byte[] {0, 1, 2})
+                                 .withHeader().withDurability(true).and()
+                                 .withFooters().withFooter("footer", "value");
+            peer.expectDetach().respond();
+            peer.expectEnd().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin();
+            client.expectAttach().ofReceiver().withHandle(42);
+            client.expectFlow().withLinkCredit(1).withHandle(42);
+            client.remoteTransfer().withHeader().withDurability(true).also() // inject API: queued to follow the expected flow
+                                   
.withApplicationProperties().withProperty("ap", "pa").also()
+                                   
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                   
.withProperties().withCorrelationId("test").also()
+                                   
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                   .withFooter().withFooter("footer", 
"value").also()
+                                   .withBody().withData(new byte[] {0, 1, 
2}).also()
+                                   .queue();
+
+            // Now start and then await the remote grant of credit and our 
send of a transfer
+            client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+            client.remoteOpen().now();
+            client.remoteBegin().now();
+            client.remoteAttach().ofSender().withHandle(2).now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            client.expectDetach().withHandle(42);
+            client.expectEnd();
+
+            client.remoteDetach().now();
+            client.remoteEnd().now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @Test // Peer script must fail with an AssertionError when the injected Header differs from the expected one
+    public void testTransferInjectAndExpectAPIsFailOnNoMatchInHeader() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond().withHandle(42);
+            peer.remoteFlow().withLinkCredit(1).queue();
+            // Script the full expected message on the peer using the expect API
+            peer.expectTransfer().withMessage()
+                                 
.withProperties().withCorrelationId("test").and()
+                                 
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                 
.withApplicationProperties().withProperty("ap", "pa").and()
+                                 
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                 .withData(new byte[] {0, 1, 2})
+                                 .withHeader().withDurability(true).and()
+                                 .withFooters().withFooter("footer", "value");
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin();
+            client.expectAttach().ofReceiver().withHandle(42);
+            client.expectFlow().withLinkCredit(1).withHandle(42);
+            client.remoteTransfer().withHeader().withDurability(false).also() // deliberate mismatch: peer expects durable == true
+                                   
.withApplicationProperties().withProperty("ap", "pa").also()
+                                   
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                   
.withProperties().withCorrelationId("test").also()
+                                   
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                   .withFooter().withFooter("footer", 
"value").also()
+                                   .withBody().withData(new byte[] {0, 1, 
2}).also()
+                                   .queue();
+
+            // Now start and then await the remote grant of credit and our 
send of a transfer
+            client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+            client.remoteOpen().now();
+            client.remoteBegin().now();
+            client.remoteAttach().ofSender().withHandle(2).now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            assertThrows(AssertionError.class, () -> 
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+        }
+    }
+
+    @Test // Peer script must fail with an AssertionError when the injected Delivery Annotations differ from the expected ones
+    public void 
testTransferInjectAndExpectAPIsFailOnNoMatchDeliveryAnnotations() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond().withHandle(42);
+            peer.remoteFlow().withLinkCredit(1).queue();
+            // Script the full expected message on the peer using the expect API
+            peer.expectTransfer().withMessage()
+                                 
.withProperties().withCorrelationId("test").and()
+                                 
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                 
.withApplicationProperties().withProperty("ap", "pa").and()
+                                 
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                 .withData(new byte[] {0, 1, 2})
+                                 .withHeader().withDurability(true).and()
+                                 .withFooters().withFooter("footer", "value");
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin();
+            client.expectAttach().ofReceiver().withHandle(42);
+            client.expectFlow().withLinkCredit(1).withHandle(42);
+            client.remoteTransfer().withHeader().withDurability(true).also()
+                                   
.withApplicationProperties().withProperty("ap", "pa").also()
+                                   
.withDeliveryAnnotations().withAnnotation("da", "1").also() // deliberate mismatch: peer expects "da" -> "ad"
+                                   
.withProperties().withCorrelationId("test").also()
+                                   
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                   .withFooter().withFooter("footer", 
"value").also()
+                                   .withBody().withData(new byte[] {0, 1, 
2}).also()
+                                   .queue();
+
+            // Now start and then await the remote grant of credit and our 
send of a transfer
+            client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+            client.remoteOpen().now();
+            client.remoteBegin().now();
+            client.remoteAttach().ofSender().withHandle(2).now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            assertThrows(AssertionError.class, () -> 
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+        }
+    }
+
+    @Test // Peer script must fail with an AssertionError when the injected Message Annotations differ from the expected ones
+    public void 
testTransferInjectAndExpectAPIsFailOnNoMatchMessageAnnotations() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond().withHandle(42);
+            peer.remoteFlow().withLinkCredit(1).queue();
+            // Script the full expected message on the peer using the expect API
+            peer.expectTransfer().withMessage()
+                                 
.withProperties().withCorrelationId("test").and()
+                                 
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                 
.withApplicationProperties().withProperty("ap", "pa").and()
+                                 
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                 .withData(new byte[] {0, 1, 2})
+                                 .withHeader().withDurability(true).and()
+                                 .withFooters().withFooter("footer", "value");
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin();
+            client.expectAttach().ofReceiver().withHandle(42);
+            client.expectFlow().withLinkCredit(1).withHandle(42);
+            client.remoteTransfer().withHeader().withDurability(true).also()
+                                   
.withApplicationProperties().withProperty("ap", "pa").also()
+                                   
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                   
.withProperties().withCorrelationId("test").also()
+                                   
.withMessageAnnotations().withAnnotation("ma", "1").also() // deliberate mismatch: peer expects "ma" -> "am"
+                                   .withFooter().withFooter("footer", 
"value").also()
+                                   .withBody().withData(new byte[] {0, 1, 
2}).also()
+                                   .queue();
+
+            // Now start and then await the remote grant of credit and our 
send of a transfer
+            client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+            client.remoteOpen().now();
+            client.remoteBegin().now();
+            client.remoteAttach().ofSender().withHandle(2).now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            assertThrows(AssertionError.class, () -> 
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+        }
+    }
+
+    @Test // Peer script must fail with an AssertionError when the injected Footer differs from the expected one
+    public void testTransferInjectAndExpectAPIsFailOnNoMatchFooter() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond().withHandle(42);
+            peer.remoteFlow().withLinkCredit(1).queue();
+            // Script the full expected message on the peer using the expect API
+            peer.expectTransfer().withMessage()
+                                 
.withProperties().withCorrelationId("test").and()
+                                 
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                 
.withApplicationProperties().withProperty("ap", "pa").and()
+                                 
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                 .withData(new byte[] {0, 1, 2})
+                                 .withHeader().withDurability(true).and()
+                                 .withFooters().withFooter("footer", "value");
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin();
+            client.expectAttach().ofReceiver().withHandle(42);
+            client.expectFlow().withLinkCredit(1).withHandle(42);
+            client.remoteTransfer().withHeader().withDurability(true).also()
+                                   
.withApplicationProperties().withProperty("ap", "pa").also()
+                                   
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                   
.withProperties().withCorrelationId("test").also()
+                                   
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                   .withFooter().withFooter("footer", 
"1").also() // deliberate mismatch: peer expects "footer" -> "value"
+                                   .withBody().withData(new byte[] {0, 1, 
2}).also()
+                                   .queue();
+
+            // Now start and then await the remote grant of credit and our 
send of a transfer
+            client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+            client.remoteOpen().now();
+            client.remoteBegin().now();
+            client.remoteAttach().ofSender().withHandle(2).now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            assertThrows(AssertionError.class, () -> 
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+        }
+    }
+
+    @Test // Peer script must fail with an AssertionError when an expected section (the Footer) is not sent at all
+    public void testTransferInjectAndExpectAPIsFailOnMissingSection() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond().withHandle(42);
+            peer.remoteFlow().withLinkCredit(1).queue();
+            // Script the full expected message on the peer using the expect API
+            peer.expectTransfer().withMessage()
+                                 
.withProperties().withCorrelationId("test").and()
+                                 
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                 
.withApplicationProperties().withProperty("ap", "pa").and()
+                                 
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                 .withData(new byte[] {0, 1, 2})
+                                 .withHeader().withDurability(true).and()
+                                 .withFooters().withFooter("footer", "value");
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin();
+            client.expectAttach().ofReceiver().withHandle(42);
+            client.expectFlow().withLinkCredit(1).withHandle(42);
+            client.remoteTransfer().withHeader().withDurability(true).also() // note: no withFooter() below, the Footer section is deliberately omitted
+                                   
.withApplicationProperties().withProperty("ap", "pa").also()
+                                   
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                   
.withProperties().withCorrelationId("test").also()
+                                   
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                   .withBody().withData(new byte[] {0, 1, 
2}).also()
+                                   .queue();
+
+            // Now start and then await the remote grant of credit and our 
send of a transfer
+            client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+            client.remoteOpen().now();
+            client.remoteBegin().now();
+            client.remoteAttach().ofSender().withHandle(2).now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            assertThrows(AssertionError.class, () -> 
peer.waitForScriptToComplete(5, TimeUnit.SECONDS));
+        }
+    }
+
+    @Test // Peer scripts the map-type sections without any entries while the client sends populated ones; the script is expected to pass
+    public void testTransferInjectAndExpectAPIsMapTypePresenceOnly() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer();
+             ProtonTestClient client = new ProtonTestClient()) {
+
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond().withHandle(42);
+            peer.remoteFlow().withLinkCredit(1).queue();
+            // Script the expected message on the peer using the expect API, naming the map sections but no entries
+            peer.expectTransfer().withMessage().withMessageFormat(1)
+                                                
.withProperties().withCorrelationId("test").and()
+                                                
.withDeliveryAnnotations().also()
+                                                
.withApplicationProperties().and()
+                                                
.withMessageAnnotations().also()
+                                                .withData(new byte[] {0, 1, 2})
+                                                
.withHeader().withDurability(true).and()
+                                                .withFooters();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            client.connect(remoteURI.getHost(), remoteURI.getPort());
+            client.expectAMQPHeader();
+            client.expectOpen();
+            client.expectBegin();
+            client.expectAttach().ofReceiver().withHandle(42);
+            client.expectFlow().withLinkCredit(1).withHandle(42);
+            client.remoteTransfer().withMessage().withMessageFormat(1)
+                                                 
.withHeader().withDurability(true).also()
+                                                 
.withApplicationProperties().withProperty("ap", "pa").also()
+                                                 
.withDeliveryAnnotations().withAnnotation("da", "ad").also()
+                                                 
.withProperties().withCorrelationId("test").also()
+                                                 
.withMessageAnnotations().withAnnotation("ma", "am").also()
+                                                 
.withFooter().withFooter("footer", "1").also()
+                                                 .withBody().withData(new 
byte[] {0, 1, 2}).also()
+                                                 .queue();
+
+            // Now start and then await the remote grant of credit and our 
send of a transfer
+            client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+            client.remoteOpen().now();
+            client.remoteBegin().now();
+            client.remoteAttach().ofSender().withHandle(2).now();
+
+            client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to