This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 26a62ac  PROTON-2524 Ensure that StreamReceiver refills the credit 
window
26a62ac is described below

commit 26a62ac0f9f0f21406cb1d52ba924e06a4127d6e
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Fri Mar 25 18:51:13 2022 -0400

    PROTON-2524 Ensure that StreamReceiver refills the credit window
    
    The stream receiver needs to refill the credit window when deliveries
    that are already complete are returned from receive calls, and also
    when a streamed delivery receives its final transfer.  In addition,
    auto accept should actively accept deliveries that have completed.  The
    credit window replenishment logic for the standard receiver and the
    stream receiver implementations should be mostly the same.
---
 .../protonj2/client/impl/ClientStreamDelivery.java |  54 +++---
 .../protonj2/client/impl/ClientStreamReceiver.java |  13 +-
 .../qpid/protonj2/client/impl/ReceiverTest.java    | 126 +++++++++++++
 .../client/impl/ReconnectStreamReceiverTest.java   |   1 +
 .../protonj2/client/impl/StreamReceiverTest.java   | 197 ++++++++++++++++++---
 5 files changed, 341 insertions(+), 50 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java
index df8a42e..f6e8158 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java
@@ -61,6 +61,9 @@ public final class ClientStreamDelivery implements 
StreamDelivery {
         this.receiver = receiver;
         this.protonDelivery = protonDelivery.setLinkedResource(this);
 
+        // Already fully received delivery could be settled now
+        autoAcceptDeliveryIfNecessary();
+
         // Capture inbound events and route to an active stream or message
         protonDelivery.deliveryReadHandler(this::handleDeliveryRead)
                       .deliveryAbortedHandler(this::handleDeliveryAborted);
@@ -184,14 +187,25 @@ public final class ClientStreamDelivery implements 
StreamDelivery {
     //----- Event Handlers for Delivery updates
 
     void handleDeliveryRead(IncomingDelivery delivery) {
-        if (rawInputStream != null) {
-            rawInputStream.handleDeliveryRead(delivery);
+        try {
+            if (rawInputStream != null) {
+                rawInputStream.handleDeliveryRead(delivery);
+            }
+        } finally {
+            autoAcceptDeliveryIfNecessary();
         }
     }
 
     void handleDeliveryAborted(IncomingDelivery delivery) {
-        if (rawInputStream != null) {
-            rawInputStream.handleDeliveryAborted(delivery);
+        try {
+            if (rawInputStream != null) {
+                rawInputStream.handleDeliveryAborted(delivery);
+            }
+        } finally {
+            try {
+                receiver.disposition(delivery, null, true);
+            } catch (Exception error) {
+            }
         }
     }
 
@@ -201,6 +215,18 @@ public final class ClientStreamDelivery implements 
StreamDelivery {
         }
     }
 
+    //----- Private stream delivery API
+
+    private void autoAcceptDeliveryIfNecessary() {
+        if (receiver.receiverOptions().autoAccept() && 
!protonDelivery.isSettled() && !protonDelivery.isPartial()) {
+            try {
+                receiver.disposition(protonDelivery, Accepted.getInstance(), 
receiver.receiverOptions().autoSettle());
+            } catch (Exception error) {
+                LOG.trace("Caught error while attempting to auto accept the 
fully read delivery.", error);
+            }
+        }
+    }
+
     //----- Raw InputStream Implementation
 
     private class RawDeliveryInputStream extends InputStream {
@@ -226,8 +252,6 @@ public final class ClientStreamDelivery implements 
StreamDelivery {
 
                 try {
                     executor.execute(() -> {
-                        autoAcceptDeliveryIfNecessary();
-
                         // If the deliver wasn't fully read either because 
there are remaining
                         // bytes locally we need to discard those to aid in 
retention avoidance.
                         // and to potentially open the session window to allow 
for fully reading
@@ -417,7 +441,6 @@ public final class ClientStreamDelivery implements 
StreamDelivery {
                         buffer.append(protonDelivery.readAll());
                         readRequest.complete(buffer.getReadableBytes());
                     } else if (!delivery.isPartial()) {
-                        autoAcceptDeliveryIfNecessary();
                         readRequest.complete(-1);
                     }
 
@@ -430,8 +453,6 @@ public final class ClientStreamDelivery implements 
StreamDelivery {
             if (readRequest != null) {
                 readRequest.failed(new ClientDeliveryAbortedException("The 
remote sender has aborted this delivery"));
             }
-
-            delivery.settle();
         }
 
         private void handleReceiverClosed(ClientStreamReceiver receiver) {
@@ -453,7 +474,6 @@ public final class ClientStreamDelivery implements 
StreamDelivery {
                     } else if (protonDelivery.isAborted()) {
                         request.failed(new ClientDeliveryAbortedException("The 
remote sender has aborted this delivery"));
                     } else if (!protonDelivery.isPartial()) {
-                        autoAcceptDeliveryIfNecessary();
                         request.complete(-1);
                     } else {
                         readRequest = request;
@@ -466,20 +486,6 @@ public final class ClientStreamDelivery implements 
StreamDelivery {
             }
         }
 
-        private void autoAcceptDeliveryIfNecessary() {
-            if (receiver.receiverOptions().autoAccept() && 
!protonDelivery.isSettled()) {
-                if (!buffer.isReadable() && protonDelivery.available() == 0 &&
-                    (protonDelivery.isAborted() || 
!protonDelivery.isPartial())) {
-
-                    try {
-                        receiver.disposition(protonDelivery, 
Accepted.getInstance(), receiver.receiverOptions().autoSettle());
-                    } catch (Exception error) {
-                        LOG.trace("Caught error while attempting to auto 
accept the fully read delivery.", error);
-                    }
-                }
-            }
-        }
-
         private void checkStreamStateIsValid() throws IOException {
             if (closed.get()) {
                 throw new IOException("The InputStream has been explicitly 
closed");
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
index b1ae55c..c512a8c 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
@@ -240,6 +240,7 @@ public final class ClientStreamReceiver implements 
StreamReceiver {
                     }
                 } else {
                     receive.complete(new ClientStreamDelivery(this, delivery));
+                    asyncReplenishCreditIfNeeded();
                 }
             }
         });
@@ -555,6 +556,7 @@ public final class ClientStreamReceiver implements 
StreamReceiver {
                     entry.getKey().complete(new ClientStreamDelivery(this, 
delivery));
                 } finally {
                     entries.remove();
+                    asyncReplenishCreditIfNeeded();
                 }
             }
         }
@@ -598,12 +600,21 @@ public final class ClientStreamReceiver implements 
StreamReceiver {
         });
     }
 
+    private void asyncReplenishCreditIfNeeded() {
+        int creditWindow = options.creditWindow();
+        if (creditWindow > 0) {
+            executor.execute(() -> replenishCreditIfNeeded());
+        }
+    }
+
     private void replenishCreditIfNeeded() {
         int creditWindow = options.creditWindow();
         if (creditWindow > 0) {
             int currentCredit = protonReceiver.getCredit();
             if (currentCredit <= creditWindow * 0.5) {
-                int potentialPrefetch = currentCredit + 
protonReceiver.unsettled().size();
+                //int potentialPrefetch = currentCredit + 
protonReceiver.unsettled().size();
+                int potentialPrefetch = currentCredit +
+                    (int)protonReceiver.unsettled().stream().filter((delivery) 
-> delivery.getLinkedResource() == null).count();
 
                 if (potentialPrefetch <= creditWindow * 0.7) {
                     int additionalCredit = creditWindow - potentialPrefetch;
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index c13bdbc..0ef9d90 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -64,6 +64,7 @@ import 
org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
 import 
org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
 import 
org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
+import org.apache.qpid.protonj2.client.test.Wait;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.apache.qpid.protonj2.test.driver.codec.messaging.Modified;
 import org.apache.qpid.protonj2.test.driver.codec.messaging.Released;
@@ -2633,4 +2634,129 @@ public class ReceiverTest extends 
ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testReceiverCreditReplenishedAfterSyncReceiveAutoAccept() 
throws Exception {
+       doTestReceiverCreditReplenishedAfterSyncReceive(true);
+    }
+
+    @Test
+    public void testReceiverCreditReplenishedAfterSyncReceiveManualAccept() 
throws Exception {
+       doTestReceiverCreditReplenishedAfterSyncReceive(false);
+    }
+
+    public void doTestReceiverCreditReplenishedAfterSyncReceive(boolean 
autoAccept) throws Exception {
+       byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello 
World"));
+
+       try (ProtonTestServer peer = new ProtonTestServer()) {
+          peer.expectSASLAnonymousConnect();
+          peer.expectOpen().respond();
+          peer.expectBegin().respond();
+          peer.expectAttach().ofReceiver().respond();
+          peer.expectFlow().withLinkCredit(10);
+          for (int i = 0; i < 10; ++i) {
+             peer.remoteTransfer().withDeliveryId(i)
+                                  .withMore(false)
+                                  .withMessageFormat(0)
+                                  .withPayload(payload).queue();
+          }
+          peer.start();
+
+          URI remoteURI = peer.getServerURI();
+
+          LOG.info("Test started, peer listening on: {}", remoteURI);
+
+          Client container = Client.create();
+          Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort());
+
+          ReceiverOptions options = new ReceiverOptions();
+          options.autoAccept(autoAccept);
+          options.creditWindow(10);
+
+          Receiver receiver = connection.openReceiver("test-receiver", 
options);
+
+          Wait.waitFor(() -> receiver.queuedDeliveries() == 10);
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+              peer.expectDisposition();
+          }
+
+          // Consume messages 1 and 2 which should not provoke credit 
replenishment
+          // as there are still 8 outstanding which is above the 70% mark
+          assertNotNull(receiver.receive()); // #1
+          assertNotNull(receiver.receive()); // #2
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+          }
+          peer.expectFlow().withLinkCredit(3);
+
+          // Now consume message 3 which will trip the replenish barrier and 
the
+          // credit should be updated to reflect that we still have 7 queued
+          assertNotNull(receiver.receive());  // #3
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+              peer.expectDisposition();
+          }
+
+          // Consume messages 4 and 5 which should not provoke credit 
replenishment
+          // as there are still 5 outstanding plus the credit we sent last time
+          // which is above the 70% mark
+          assertNotNull(receiver.receive()); // #4
+          assertNotNull(receiver.receive()); // #5
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+          }
+          peer.expectFlow().withLinkCredit(6);
+
+          // Consume number 6 which means we only have 4 outstanding plus the 
three
+          // that we sent last time we flowed which is 70% of possible 
prefetch so
+          // we should flow to top off credit which would be 6 since we have 
four
+          // still pending
+          assertNotNull(receiver.receive()); // #6
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+              peer.expectDisposition();
+          }
+
+          // Consume deliveries 7 and 8 which should not flow as we should be
+          // above the threshold of 70% since we would now have 2 outstanding
+          // and 6 credits on the link
+          assertNotNull(receiver.receive()); // #7
+          assertNotNull(receiver.receive()); // #8
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+              peer.expectDisposition();
+          }
+
+          // Now consume 9 and 10 but we still shouldn't flow more credit 
because
+          // the link credit is above the 50% mark for overall credit 
windowing.
+          assertNotNull(receiver.receive()); // #9
+          assertNotNull(receiver.receive()); // #10
+
+          peer.waitForScriptToComplete();
+          peer.expectClose().respond();
+
+          connection.close();
+
+          peer.waitForScriptToComplete();
+       }
+    }
 }
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java
index a41f984..944a97c 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java
@@ -68,6 +68,7 @@ class ReconnectStreamReceiverTest extends 
ImperativeClientTestCase {
                                       .withMore(false)
                                       .withMessageFormat(0)
                                       .withPayload(payload).queue();
+            
finalPeer.expectDisposition().withSettled(true).withState().accepted();
             finalPeer.start();
 
             final URI primaryURI = firstPeer.getServerURI();
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
index 5fa3a71..196a624 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
@@ -516,7 +516,8 @@ class StreamReceiverTest extends ImperativeClientTestCase {
 
             final Client container = Client.create();
             final Connection connection = 
container.connect(remoteURI.getHost(), remoteURI.getPort());
-            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue");
+            final StreamReceiverOptions options = new 
StreamReceiverOptions().autoAccept(false);
+            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue", options);
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
@@ -574,6 +575,7 @@ class StreamReceiverTest extends ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             peer.expectBegin().respond();
             peer.expectAttach().ofSender().respond();
+            peer.expectDisposition().withState().accepted().withSettled(true);
 
             // Ensures that stream receiver has the delivery in its queue.
             connection.openSender("test-sender").openFuture().get();
@@ -632,6 +634,7 @@ class StreamReceiverTest extends ImperativeClientTestCase {
             assertFalse(delivery.completed());
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectDisposition().withSettled(true);
 
             peer.remoteTransfer().withHandle(0)
                                  .withDeliveryId(0)
@@ -728,6 +731,7 @@ class StreamReceiverTest extends ImperativeClientTestCase {
                                  .withMore(false)
                                  .withMessageFormat(0)
                                  .withPayload(payload).queue();
+            peer.expectDisposition().withState().accepted().withSettled(true);
             peer.start();
 
             URI remoteURI = peer.getServerURI();
@@ -1207,9 +1211,7 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
 
             // An initial frame has arrived but no reads have been performed 
and then if closed
             // the delivery will be consumed to allow the session window to be 
opened and prevent
-            // a stall due to an un-consumed delivery.  The stream delivery 
will not auto accept
-            // or auto settle the delivery as the user closed early which 
should indicate they
-            // are rejecting the message otherwise it is a programming error 
on their part.
+            // a stall due to an un-consumed delivery.
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10);
             peer.remoteTransfer().withHandle(0)
@@ -1218,6 +1220,7 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
                                  .withMessageFormat(0)
                                  .withPayload(payload2).queue();
             
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
+            peer.expectDisposition().withSettled(true).withState().accepted();
 
             rawStream.close();
 
@@ -1262,7 +1265,7 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
             Client container = Client.create();
             ConnectionOptions connectionOptions = new 
ConnectionOptions().maxFrameSize(1000);
             Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort(), connectionOptions);
-            StreamReceiverOptions streamOptions = new 
StreamReceiverOptions().readBufferSize(2000);
+            StreamReceiverOptions streamOptions = new 
StreamReceiverOptions().readBufferSize(2000).autoAccept(false);
             StreamReceiver receiver = 
connection.openStreamReceiver("test-queue", streamOptions);
             StreamDelivery delivery = receiver.receive();
             assertNotNull(delivery);
@@ -1508,6 +1511,7 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
                                  .withMore(false)
                                  .withMessageFormat(0)
                                  .withPayload(payload).queue();
+            peer.expectDisposition().withSettled(true).withState().accepted();
             peer.start();
 
             URI remoteURI = peer.getServerURI();
@@ -2270,6 +2274,7 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
                                  .withMore(false)
                                  .withMessageFormat(0)
                                  .withPayload(payload2).queue();
+            
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
 
             InputStream bodyStream = message.body();
             assertNotNull(bodyStream);
@@ -2279,7 +2284,6 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
             // mode and there is nothing more to read.
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9);
-            
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
 
             byte[] combinedPayloads = new byte[body1.length + body2.length];
             bodyStream.read(combinedPayloads);
@@ -2340,7 +2344,9 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
             // Creating the input stream instance should read the first chunk 
of data from the incoming
             // delivery which should result in a new credit being available to 
expand the session window.
             // An additional transfer should be placed into the delivery 
buffer but not yet read since
-            // the user hasn't read anything.
+            // the user hasn't read anything. Since we are in auto settle the 
completed transfer should
+            // trigger settlement and also open the credit window but the 
session window should not be
+            // expanded since we haven't read the data yet.
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             
peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(1);
             peer.remoteTransfer().withHandle(0)
@@ -2348,21 +2354,24 @@ class StreamReceiverTest extends 
ImperativeClientTestCase {
                                  .withMore(false)
                                  .withMessageFormat(0)
                                  .withPayload(payload2).queue();
+            peer.expectDisposition().withSettled(true).withState().accepted();
+            
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(0).withLinkCredit(1);
 
             InputStream bodyStream = message.body();
             assertNotNull(bodyStream);
 
-            // Once the read of all data completes the session window should 
be opened and the
-            // stream should mark the delivery as accepted and settled since 
we are in auto settle
-            // mode and there is nothing more to read.
+            // Once the read of all data completes the session window should 
be opened
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-            
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(0);
-            
peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true);
             
peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(1);
 
             byte[] combinedPayloads = new byte[body1.length + body2.length];
             bodyStream.read(combinedPayloads);
 
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+            // No frames should be triggered by closing the stream since we 
already auto settled
+            // and updated the session window on the remote.
+
             assertTrue(Arrays.equals(body1, 0, body1.length, combinedPayloads, 
0, body1.length));
             assertTrue(Arrays.equals(body2, 0, body2.length, combinedPayloads, 
body1.length, body1.length + body2.length));
 
@@ -2412,7 +2421,7 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
             Client container = Client.create();
             ConnectionOptions connectionOptions = new 
ConnectionOptions().maxFrameSize(1000);
             Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort(), connectionOptions);
-            StreamReceiverOptions streamOptions = new 
StreamReceiverOptions().readBufferSize(2000);
+            StreamReceiverOptions streamOptions = new 
StreamReceiverOptions().readBufferSize(2000).autoAccept(false);
             StreamReceiver receiver = 
connection.openStreamReceiver("test-queue", streamOptions);
             StreamDelivery delivery = receiver.receive();
             assertNotNull(delivery);
@@ -2467,6 +2476,8 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
                 assertEquals(value, "test");
             });
 
+            delivery.accept();
+
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             peer.expectDetach().respond();
             peer.expectEnd().respond();
@@ -2498,11 +2509,6 @@ class StreamReceiverTest extends 
ImperativeClientTestCase {
                                  .withMore(false)
                                  .withMessageFormat(0)
                                  .withPayload(payload).queue();
-            peer.expectCoordinatorAttach().respond();
-            peer.remoteFlow().withLinkCredit(2).queue();
-            peer.expectDeclare().accept(txnId);
-            
peer.expectDisposition().withSettled(true).withState().transactional().withTxnId(txnId).withAccepted();
-            peer.expectDischarge().withFail(false).withTxnId(txnId).accept();
             peer.start();
 
             URI remoteURI = peer.getServerURI();
@@ -2512,10 +2518,18 @@ class StreamReceiverTest extends 
ImperativeClientTestCase {
             final Client container = Client.create();
             final Connection connection = 
container.connect(remoteURI.getHost(), remoteURI.getPort());
             final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue");
-            final StreamDelivery delivery = receiver.receive();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectCoordinatorAttach().respond();
+            peer.remoteFlow().withLinkCredit(2).queue();
+            peer.expectDeclare().accept(txnId);
+            
peer.expectDisposition().withSettled(true).withState().transactional().withTxnId(txnId).withAccepted();
+            peer.expectDischarge().withFail(false).withTxnId(txnId).accept();
 
             receiver.session().beginTransaction();
 
+            final StreamDelivery delivery = receiver.receive();
+
             assertNotNull(delivery);
             assertTrue(delivery.completed());
             assertFalse(delivery.aborted());
@@ -2540,6 +2554,8 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
 
             receiver.session().commitTransaction();
 
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
             peer.expectDetach().respond();
             peer.expectEnd().respond();
             peer.expectClose().respond();
@@ -2575,7 +2591,8 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
 
             final Client container = Client.create();
             final Connection connection = 
container.connect(remoteURI.getHost(), remoteURI.getPort());
-            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue");
+            final StreamReceiverOptions options = new 
StreamReceiverOptions().autoAccept(false);
+            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue", options);
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             peer.expectDisposition().withState().rejected("decode-error", 
"failed reading message header");
@@ -2623,7 +2640,8 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
 
             final Client container = Client.create();
             final Connection connection = 
container.connect(remoteURI.getHost(), remoteURI.getPort());
-            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue");
+            final StreamReceiverOptions options = new 
StreamReceiverOptions().autoAccept(false);
+            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue", options);
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             peer.expectDisposition().withState().rejected("decode-error", 
"failed reading message header");
@@ -2671,7 +2689,8 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
 
             final Client container = Client.create();
             final Connection connection = 
container.connect(remoteURI.getHost(), remoteURI.getPort());
-            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue");
+            final StreamReceiverOptions options = new 
StreamReceiverOptions().autoAccept(false);
+            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue", options);
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             peer.expectDisposition().withState().rejected("decode-error", 
"failed reading message header");
@@ -2719,7 +2738,8 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
 
             final Client container = Client.create();
             final Connection connection = 
container.connect(remoteURI.getHost(), remoteURI.getPort());
-            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue");
+            final StreamReceiverOptions options = new 
StreamReceiverOptions().autoAccept(false);
+            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue", options);
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             peer.expectDisposition().withState().rejected("decode-error", 
"failed reading message header");
@@ -2767,7 +2787,8 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
 
             final Client container = Client.create();
             final Connection connection = 
container.connect(remoteURI.getHost(), remoteURI.getPort());
-            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue");
+            final StreamReceiverOptions options = new 
StreamReceiverOptions().autoAccept(false);
+            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue", options);
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             peer.expectDisposition().withState().rejected("decode-error", 
"failed reading message header");
@@ -2815,7 +2836,8 @@ class StreamReceiverTest extends ImperativeClientTestCase 
{
 
             final Client container = Client.create();
             final Connection connection = 
container.connect(remoteURI.getHost(), remoteURI.getPort());
-            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue");
+            final StreamReceiverOptions options = new 
StreamReceiverOptions().autoAccept(false);
+            final StreamReceiver receiver = 
connection.openStreamReceiver("test-queue", options);
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
             peer.expectDisposition().withState().rejected("decode-error", "failed reading message header");
@@ -3707,6 +3729,131 @@ class StreamReceiverTest extends ImperativeClientTestCase {
         }
     }
 
+    @Test
+    public void testReceiverCreditReplenishedAfterSyncReceiveAutoAccept() throws Exception {
+       doTestReceiverCreditReplenishedAfterSyncReceive(true);
+    }
+
+    @Test
+    public void testReceiverCreditReplenishedAfterSyncReceiveManualAccept() throws Exception {
+       doTestReceiverCreditReplenishedAfterSyncReceive(false);
+    }
+
+    public void doTestReceiverCreditReplenishedAfterSyncReceive(boolean autoAccept) throws Exception {
+       byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+       try (ProtonTestServer peer = new ProtonTestServer()) {
+          peer.expectSASLAnonymousConnect();
+          peer.expectOpen().respond();
+          peer.expectBegin().respond();
+          peer.expectAttach().ofReceiver().respond();
+          peer.expectFlow().withLinkCredit(10);
+          for (int i = 0; i < 10; ++i) {
+             peer.remoteTransfer().withDeliveryId(i)
+                                  .withMore(false)
+                                  .withMessageFormat(0)
+                                  .withPayload(payload).queue();
+          }
+          peer.start();
+
+          URI remoteURI = peer.getServerURI();
+
+          LOG.info("Test started, peer listening on: {}", remoteURI);
+
+          Client container = Client.create();
+          Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+
+          StreamReceiverOptions options = new StreamReceiverOptions();
+          options.autoAccept(autoAccept);
+          options.creditWindow(10);
+
+          StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options);
+
+          Wait.waitFor(() -> receiver.queuedDeliveries() == 10);
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+              peer.expectDisposition();
+          }
+
+          // Consume messages 1 and 2 which should not provoke credit replenishment
+          // as there are still 8 outstanding which is above the 70% mark
+          assertNotNull(receiver.receive()); // #1
+          assertNotNull(receiver.receive()); // #2
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+          }
+          peer.expectFlow().withLinkCredit(3);
+
+          // Now consume message 3 which will trip the replenish barrier and the
+          // credit should be updated to reflect that we still have 7 queued
+          assertNotNull(receiver.receive());  // #3
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+              peer.expectDisposition();
+          }
+
+          // Consume messages 4 and 5 which should not provoke credit replenishment
+          // as there are still 5 outstanding plus the credit we sent last time
+          // which is above the 70% mark
+          assertNotNull(receiver.receive()); // #4
+          assertNotNull(receiver.receive()); // #5
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+          }
+          peer.expectFlow().withLinkCredit(6);
+
+          // Consume number 6 which means we only have 4 outstanding plus the three
+          // that we sent last time we flowed which is 70% of possible prefetch so
+          // we should flow to top off credit which would be 6 since we have four
+          // still pending
+          assertNotNull(receiver.receive()); // #6
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+              peer.expectDisposition();
+          }
+
+          // Consume deliveries 7 and 8 which should not flow as we should be
+          // above the threshold of 70% since we would now have 2 outstanding
+          // and 6 credits on the link
+          assertNotNull(receiver.receive()); // #7
+          assertNotNull(receiver.receive()); // #8
+
+          peer.waitForScriptToComplete();
+          if (autoAccept)
+          {
+              peer.expectDisposition();
+              peer.expectDisposition();
+          }
+
+          // Now consume 9 and 10 but we still shouldn't flow more credit because
+          // the link credit is above the 50% mark for overall credit windowing.
+          assertNotNull(receiver.receive()); // #9
+          assertNotNull(receiver.receive()); // #10
+
+          peer.waitForScriptToComplete();
+
+          peer.expectClose().respond();
+          connection.close();
+
+          peer.waitForScriptToComplete();
+       }
+    }
+
     private byte[] createInvalidHeaderEncoding() {
         final byte[] buffer = new byte[12];
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to