This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new e007fc98 PROTON-2780 Fix race in test that leads to unexpected 
performative
e007fc98 is described below

commit e007fc98ee4d6240f24c1a23bc642ff921b8ece5
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Dec 8 11:47:15 2023 -0500

    PROTON-2780 Fix race in test that leads to unexpected performative
    
    Fix a test and add one new one to cover a similar scenario, ensure the
    test stages frames to avoid race on flow vs dispoistion.
---
 .../protonj2/client/impl/StreamReceiverTest.java   | 78 +++++++++++++++++++++-
 1 file changed, 76 insertions(+), 2 deletions(-)

diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
index 7824b220..d7843ee4 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
@@ -1079,17 +1079,18 @@ class StreamReceiverTest extends 
ImperativeClientTestCase {
             // from the incoming delivery and the session window opens which 
allows the second chunk to
             // arrive and again the session window will be opened as that 
chunk is moved to the reader's
             // buffer for return from the read request.
+            final byte[] combinedPayloads = new byte[payload1.length + 
payload2.length];
+
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
             peer.remoteTransfer().withHandle(0)
                                  .withDeliveryId(0)
                                  .withMore(false)
                                  .withMessageFormat(0)
-                                 .withPayload(payload2).queue();
+                                 
.withPayload(payload2).queue().afterDelay(100);
             
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
             
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
 
-            byte[] combinedPayloads = new byte[payload1.length + 
payload2.length];
             rawStream.read(combinedPayloads);
 
             assertTrue(Arrays.equals(payload1, 0, payload1.length, 
combinedPayloads, 0, payload1.length));
@@ -1110,6 +1111,79 @@ class StreamReceiverTest extends 
ImperativeClientTestCase {
         }
     }
 
+    @Test
+    public void 
testStreamDeliveryRawInputStreamReadInChunksOpensSessionWindowForAdditionalInput()
 throws Exception {
+        final byte[] body1 = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
+        final byte[] body2 = new byte[] { 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 };
+        final byte[] payload1 = createEncodedMessage(new Data(body1));
+        final byte[] payload2 = createEncodedMessage(new Data(body2));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().withMaxFrameSize(1000).respond();
+            peer.expectBegin().withIncomingWindow(1).respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withIncomingWindow(1).withLinkCredit(10);
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] { 1 })
+                                 .withMore(true)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload1).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions connectionOptions = new 
ConnectionOptions().maxFrameSize(1000);
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort(), connectionOptions);
+            StreamReceiverOptions streamOptions = new 
StreamReceiverOptions().readBufferSize(2000);
+            StreamReceiver receiver = 
connection.openStreamReceiver("test-queue", streamOptions);
+            StreamDelivery delivery = receiver.receive();
+            assertNotNull(delivery);
+            InputStream rawStream = delivery.rawInputStream();
+            assertNotNull(rawStream);
+
+            // An initial frame has arrived but more than that is requested so 
the first chuck is pulled
+            // from the incoming delivery and the session window opens which 
allows the second chunk to
+            // arrive and again the session window will be opened as that 
chunk is moved to the reader's
+            // buffer for return from the read request.
+            final byte[] chunk1 = new byte[payload1.length];
+            final byte[] chunk2 = new byte[payload2.length];
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 
.withPayload(payload2).queue().afterDelay(100);
+            
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
+            
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
+
+            rawStream.read(chunk1);
+            rawStream.read(chunk2);
+
+            assertTrue(Arrays.equals(payload1, 0, payload1.length, chunk1, 0, 
payload1.length));
+            assertTrue(Arrays.equals(payload2, 0, payload2.length, chunk2, 0, 
payload2.length));
+
+            rawStream.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectDetach().respond();
+            peer.expectEnd().respond();
+            peer.expectClose().respond();
+
+            receiver.openFuture().get();
+            receiver.closeAsync().get();
+            connection.closeAsync().get();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
     @Test
     public void testStreamDeliveryRawInputStreamBlockedReadBytesAborted() 
throws Exception {
         final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello 
World"));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to