This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new b5ac05d7 PROTON-2795 Ensure message format is set on first transfer frame b5ac05d7 is described below commit b5ac05d77ce697d0290643709bb2f5d718a1a673 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Fri Feb 23 17:54:55 2024 -0500 PROTON-2795 Ensure message format is set on first transfer frame Fix code to enforce send of message format on first transfer and omit from subsequent frames as it is not needed after the first Transfer frame. --- .../qpid/protonj2/client/impl/MessageSendTest.java | 2 +- .../engine/impl/ProtonSessionOutgoingWindow.java | 2 +- .../protonj2/engine/impl/ProtonSenderTest.java | 52 +++++++++++++++++++++- 3 files changed, 52 insertions(+), 4 deletions(-) diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java index c893e3fb..73aa64d4 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/MessageSendTest.java @@ -107,7 +107,7 @@ class MessageSendTest extends ImperativeClientTestCase { payloadMatcher.setMessageContentMatcher(bodyMatcher); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); - peer.expectTransfer().withPayload(payloadMatcher).accept(); + peer.expectTransfer().withMessageFormat(0).withPayload(payloadMatcher).accept(); peer.expectDetach().respond(); peer.expectClose().respond(); diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java index c25e5b44..35729bd8 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionOutgoingWindow.java @@ -267,7 +267,7 @@ public class ProtonSessionOutgoingWindow { try { cachedTransfer.setDeliveryId(delivery.getDeliveryId()); - if (delivery.getMessageFormat() != 0) { + if (delivery.getTransferCount() == 0) { cachedTransfer.setMessageFormat(delivery.getMessageFormat()); } else { cachedTransfer.clearMessageFormat(); diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java index 064370e2..22843ed2 100644 --- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java +++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSenderTest.java @@ -57,6 +57,7 @@ import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException; import org.apache.qpid.protonj2.logging.ProtonLogger; import org.apache.qpid.protonj2.logging.ProtonLoggerFactory; import org.apache.qpid.protonj2.test.driver.ProtonTestConnector; +import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedInteger; import org.apache.qpid.protonj2.test.driver.matchers.messaging.AcceptedMatcher; import org.apache.qpid.protonj2.test.driver.matchers.messaging.ModifiedMatcher; import org.apache.qpid.protonj2.test.driver.matchers.messaging.RejectedMatcher; @@ -1267,6 +1268,53 @@ public class ProtonSenderTest extends ProtonEngineTestSupport { assertNull(failure); } + @Test + public void testSendTransferWithDefaultMessageFormat() throws Exception { + Engine engine = EngineFactory.PROTON.createNonSaslEngine(); + engine.errorHandler(result -> failure = result.failureCause()); + ProtonTestConnector peer = createTestPeer(engine); + + final byte [] payloadBuffer = new byte[] {0, 1, 2, 3, 4}; + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond().withContainerId("driver"); + peer.expectBegin().respond(); + peer.expectAttach().withRole(Role.SENDER.getValue()).respond(); + peer.remoteFlow().withDeliveryCount(0) + .withLinkCredit(10) + .withIncomingWindow(1024) + .withOutgoingWindow(10) + .withNextIncomingId(0) + .withNextOutgoingId(1).queue(); + peer.expectTransfer().withMessageFormat(0).withPayload(payloadBuffer); + peer.expectDetach().withHandle(0).respond(); + + Connection connection = engine.start(); + + connection.open(); + Session session = connection.session(); + session.open(); + + ProtonBuffer payload = ProtonBufferAllocator.defaultAllocator().copy(payloadBuffer); + + Sender sender = session.sender("sender-1"); + + assertFalse(sender.isSendable()); + + sender.creditStateUpdateHandler(handler -> { + if (handler.isSendable()) { + handler.next().setTag(new byte[] {0}).writeBytes(payload); + } + }); + + sender.open(); + sender.close(); + + peer.waitForScriptToComplete(); + + assertNull(failure); + } + @Test public void testSenderSignalsDeliveryUpdatedOnSettledThenSettleFromLinkAPI() throws Exception { doTestSenderSignalsDeliveryUpdatedOnSettled(true); @@ -3342,7 +3390,7 @@ public class ProtonSenderTest extends ProtonEngineTestSupport { peer.expectTransfer().withHandle(0) .withState(nullValue()) .withDeliveryId(0) - .withMessageFormat(42) + .withMessageFormat((UnsignedInteger) null) .withAborted(anyOf(nullValue(), is(false))) .withSettled(false) .withMore(anyOf(nullValue(), is(false))) @@ -3424,7 +3472,7 @@ public class ProtonSenderTest extends ProtonEngineTestSupport { peer.expectTransfer().withHandle(0) .withState().accepted() .withDeliveryId(0) - .withMessageFormat(42) + .withMessageFormat((UnsignedInteger) null) .withAborted(anyOf(nullValue(), is(false))) .withSettled(settle) .withMore(anyOf(nullValue(), is(false))) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org