This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new e007fc98 PROTON-2780 Fix race in test that leads to unexpected
performative
e007fc98 is described below
commit e007fc98ee4d6240f24c1a23bc642ff921b8ece5
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Dec 8 11:47:15 2023 -0500
PROTON-2780 Fix race in test that leads to unexpected performative
Fix a test and add one new one to cover a similar scenario, ensure the
test stages frames to avoid race on flow vs dispoistion.
---
.../protonj2/client/impl/StreamReceiverTest.java | 78 +++++++++++++++++++++-
1 file changed, 76 insertions(+), 2 deletions(-)
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
index 7824b220..d7843ee4 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
@@ -1079,17 +1079,18 @@ class StreamReceiverTest extends
ImperativeClientTestCase {
// from the incoming delivery and the session window opens which
allows the second chunk to
// arrive and again the session window will be opened as that
chunk is moved to the reader's
// buffer for return from the read request.
+ final byte[] combinedPayloads = new byte[payload1.length +
payload2.length];
+
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
peer.remoteTransfer().withHandle(0)
.withDeliveryId(0)
.withMore(false)
.withMessageFormat(0)
- .withPayload(payload2).queue();
+
.withPayload(payload2).queue().afterDelay(100);
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
- byte[] combinedPayloads = new byte[payload1.length +
payload2.length];
rawStream.read(combinedPayloads);
assertTrue(Arrays.equals(payload1, 0, payload1.length,
combinedPayloads, 0, payload1.length));
@@ -1110,6 +1111,79 @@ class StreamReceiverTest extends
ImperativeClientTestCase {
}
}
+ @Test
+ public void
testStreamDeliveryRawInputStreamReadInChunksOpensSessionWindowForAdditionalInput()
throws Exception {
+ final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+ final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
+ final byte[] payload1 = createEncodedMessage(new Data(body1));
+ final byte[] payload2 = createEncodedMessage(new Data(body2));
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().withMaxFrameSize(1000).respond();
+ peer.expectBegin().withIncomingWindow(1).respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withDeliveryTag(new byte[] { 1 })
+ .withMore(true)
+ .withMessageFormat(0)
+ .withPayload(payload1).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions connectionOptions = new
ConnectionOptions().maxFrameSize(1000);
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort(), connectionOptions);
+ StreamReceiverOptions streamOptions = new
StreamReceiverOptions().readBufferSize(2000);
+ StreamReceiver receiver =
connection.openStreamReceiver("test-queue", streamOptions);
+ StreamDelivery delivery = receiver.receive();
+ assertNotNull(delivery);
+ InputStream rawStream = delivery.rawInputStream();
+ assertNotNull(rawStream);
+
+ // An initial frame has arrived but more than that is requested so
the first chuck is pulled
+ // from the incoming delivery and the session window opens which
allows the second chunk to
+ // arrive and again the session window will be opened as that
chunk is moved to the reader's
+ // buffer for return from the read request.
+ final byte[] chunk1 = new byte[payload1.length];
+ final byte[] chunk2 = new byte[payload2.length];
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+
.withPayload(payload2).queue().afterDelay(100);
+
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
+
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
+
+ rawStream.read(chunk1);
+ rawStream.read(chunk2);
+
+ assertTrue(Arrays.equals(payload1, 0, payload1.length, chunk1, 0,
payload1.length));
+ assertTrue(Arrays.equals(payload2, 0, payload2.length, chunk2, 0,
payload2.length));
+
+ rawStream.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach().respond();
+ peer.expectEnd().respond();
+ peer.expectClose().respond();
+
+ receiver.openFuture().get();
+ receiver.closeAsync().get();
+ connection.closeAsync().get();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
@Test
public void testStreamDeliveryRawInputStreamBlockedReadBytesAborted()
throws Exception {
final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello
World"));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]