This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new 26a62ac PROTON-2524 Ensure that StreamReceiver refills the credit window 26a62ac is described below commit 26a62ac0f9f0f21406cb1d52ba924e06a4127d6e Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Fri Mar 25 18:51:13 2022 -0400 PROTON-2524 Ensure that StreamReceiver refills the credit window The stream receiver needs to refill the credit window as deliveries are returned from receive calls when they are already completed and when a streamed delivery receives the final transfer as well. Also, auto accept should actively accept deliveries that have completed. The credit window replenishment for the standard receiver and the stream receiver impl should be mostly the same. --- .../protonj2/client/impl/ClientStreamDelivery.java | 54 +++--- .../protonj2/client/impl/ClientStreamReceiver.java | 13 +- .../qpid/protonj2/client/impl/ReceiverTest.java | 126 +++++++++++++ .../client/impl/ReconnectStreamReceiverTest.java | 1 + .../protonj2/client/impl/StreamReceiverTest.java | 197 ++++++++++++++++++--- 5 files changed, 341 insertions(+), 50 deletions(-) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java index df8a42e..f6e8158 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java @@ -61,6 +61,9 @@ public final class ClientStreamDelivery implements StreamDelivery { this.receiver = receiver; this.protonDelivery = protonDelivery.setLinkedResource(this); + // Already fully received delivery could be settled now + autoAcceptDeliveryIfNecessary(); + // Capture inbound events and route to an active stream or message protonDelivery.deliveryReadHandler(this::handleDeliveryRead) 
.deliveryAbortedHandler(this::handleDeliveryAborted); @@ -184,14 +187,25 @@ public final class ClientStreamDelivery implements StreamDelivery { //----- Event Handlers for Delivery updates void handleDeliveryRead(IncomingDelivery delivery) { - if (rawInputStream != null) { - rawInputStream.handleDeliveryRead(delivery); + try { + if (rawInputStream != null) { + rawInputStream.handleDeliveryRead(delivery); + } + } finally { + autoAcceptDeliveryIfNecessary(); } } void handleDeliveryAborted(IncomingDelivery delivery) { - if (rawInputStream != null) { - rawInputStream.handleDeliveryAborted(delivery); + try { + if (rawInputStream != null) { + rawInputStream.handleDeliveryAborted(delivery); + } + } finally { + try { + receiver.disposition(delivery, null, true); + } catch (Exception error) { + } } } @@ -201,6 +215,18 @@ public final class ClientStreamDelivery implements StreamDelivery { } } + //----- Private stream delivery API + + private void autoAcceptDeliveryIfNecessary() { + if (receiver.receiverOptions().autoAccept() && !protonDelivery.isSettled() && !protonDelivery.isPartial()) { + try { + receiver.disposition(protonDelivery, Accepted.getInstance(), receiver.receiverOptions().autoSettle()); + } catch (Exception error) { + LOG.trace("Caught error while attempting to auto accept the fully read delivery.", error); + } + } + } + //----- Raw InputStream Implementation private class RawDeliveryInputStream extends InputStream { @@ -226,8 +252,6 @@ public final class ClientStreamDelivery implements StreamDelivery { try { executor.execute(() -> { - autoAcceptDeliveryIfNecessary(); - // If the deliver wasn't fully read either because there are remaining // bytes locally we need to discard those to aid in retention avoidance. 
// and to potentially open the session window to allow for fully reading @@ -417,7 +441,6 @@ public final class ClientStreamDelivery implements StreamDelivery { buffer.append(protonDelivery.readAll()); readRequest.complete(buffer.getReadableBytes()); } else if (!delivery.isPartial()) { - autoAcceptDeliveryIfNecessary(); readRequest.complete(-1); } @@ -430,8 +453,6 @@ public final class ClientStreamDelivery implements StreamDelivery { if (readRequest != null) { readRequest.failed(new ClientDeliveryAbortedException("The remote sender has aborted this delivery")); } - - delivery.settle(); } private void handleReceiverClosed(ClientStreamReceiver receiver) { @@ -453,7 +474,6 @@ public final class ClientStreamDelivery implements StreamDelivery { } else if (protonDelivery.isAborted()) { request.failed(new ClientDeliveryAbortedException("The remote sender has aborted this delivery")); } else if (!protonDelivery.isPartial()) { - autoAcceptDeliveryIfNecessary(); request.complete(-1); } else { readRequest = request; @@ -466,20 +486,6 @@ public final class ClientStreamDelivery implements StreamDelivery { } } - private void autoAcceptDeliveryIfNecessary() { - if (receiver.receiverOptions().autoAccept() && !protonDelivery.isSettled()) { - if (!buffer.isReadable() && protonDelivery.available() == 0 && - (protonDelivery.isAborted() || !protonDelivery.isPartial())) { - - try { - receiver.disposition(protonDelivery, Accepted.getInstance(), receiver.receiverOptions().autoSettle()); - } catch (Exception error) { - LOG.trace("Caught error while attempting to auto accept the fully read delivery.", error); - } - } - } - } - private void checkStreamStateIsValid() throws IOException { if (closed.get()) { throw new IOException("The InputStream has been explicitly closed"); diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java index 
b1ae55c..c512a8c 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java @@ -240,6 +240,7 @@ public final class ClientStreamReceiver implements StreamReceiver { } } else { receive.complete(new ClientStreamDelivery(this, delivery)); + asyncReplenishCreditIfNeeded(); } } }); @@ -555,6 +556,7 @@ public final class ClientStreamReceiver implements StreamReceiver { entry.getKey().complete(new ClientStreamDelivery(this, delivery)); } finally { entries.remove(); + asyncReplenishCreditIfNeeded(); } } } @@ -598,12 +600,21 @@ public final class ClientStreamReceiver implements StreamReceiver { }); } + private void asyncReplenishCreditIfNeeded() { + int creditWindow = options.creditWindow(); + if (creditWindow > 0) { + executor.execute(() -> replenishCreditIfNeeded()); + } + } + private void replenishCreditIfNeeded() { int creditWindow = options.creditWindow(); if (creditWindow > 0) { int currentCredit = protonReceiver.getCredit(); if (currentCredit <= creditWindow * 0.5) { - int potentialPrefetch = currentCredit + protonReceiver.unsettled().size(); + //int potentialPrefetch = currentCredit + protonReceiver.unsettled().size(); + int potentialPrefetch = currentCredit + + (int)protonReceiver.unsettled().stream().filter((delivery) -> delivery.getLinkedResource() == null).count(); if (potentialPrefetch <= creditWindow * 0.7) { int additionalCredit = creditWindow - potentialPrefetch; diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java index c13bdbc..0ef9d90 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java @@ -64,6 +64,7 @@ import 
org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException; import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException; import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException; import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase; +import org.apache.qpid.protonj2.client.test.Wait; import org.apache.qpid.protonj2.test.driver.ProtonTestServer; import org.apache.qpid.protonj2.test.driver.codec.messaging.Modified; import org.apache.qpid.protonj2.test.driver.codec.messaging.Released; @@ -2633,4 +2634,129 @@ public class ReceiverTest extends ImperativeClientTestCase { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } + + @Test + public void testReceiverCreditReplenishedAfterSyncReceiveAutoAccept() throws Exception { + doTestReceiverCreditReplenishedAfterSyncReceive(true); + } + + @Test + public void testReceiverCreditReplenishedAfterSyncReceiveManualAccept() throws Exception { + doTestReceiverCreditReplenishedAfterSyncReceive(false); + } + + public void doTestReceiverCreditReplenishedAfterSyncReceive(boolean autoAccept) throws Exception { + byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World")); + + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().respond(); + peer.expectFlow().withLinkCredit(10); + for (int i = 0; i < 10; ++i) { + peer.remoteTransfer().withDeliveryId(i) + .withMore(false) + .withMessageFormat(0) + .withPayload(payload).queue(); + } + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); + + ReceiverOptions options = new ReceiverOptions(); + options.autoAccept(autoAccept); + options.creditWindow(10); + + Receiver receiver = 
connection.openReceiver("test-receiver", options); + + Wait.waitFor(() -> receiver.queuedDeliveries() == 10); + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + peer.expectDisposition(); + } + + // Consume messages 1 and 2 which should not provoke credit replenishment + // as there are still 8 outstanding which is above the 70% mark + assertNotNull(receiver.receive()); // #1 + assertNotNull(receiver.receive()); // #2 + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + } + peer.expectFlow().withLinkCredit(3); + + // Now consume message 3 which will trip the replenish barrier and the + // credit should be updated to reflect that we still have 7 queued + assertNotNull(receiver.receive()); // #3 + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + peer.expectDisposition(); + } + + // Consume messages 4 and 5 which should not provoke credit replenishment + // as there are still 5 outstanding plus the credit we sent last time + // which is above the 70% mark + assertNotNull(receiver.receive()); // #4 + assertNotNull(receiver.receive()); // #5 + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + } + peer.expectFlow().withLinkCredit(6); + + // Consume number 6 which means we only have 4 outstanding plus the three + // that we sent last time we flowed which is 70% of possible prefetch so + // we should flow to top off credit which would be 6 since we have four + // still pending + assertNotNull(receiver.receive()); // #6 + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + peer.expectDisposition(); + } + + // Consume deliveries 7 and 8 which should not flow as we should be + // above the threshold of 70% since we would now have 2 outstanding + // and 6 credits on the link + assertNotNull(receiver.receive()); // #7 + assertNotNull(receiver.receive()); // #8 + + peer.waitForScriptToComplete(); + if 
(autoAccept) + { + peer.expectDisposition(); + peer.expectDisposition(); + } + + // Now consume 9 and 10 but we still shouldn't flow more credit because + // the link credit is above the 50% mark for overall credit windowing. + assertNotNull(receiver.receive()); // #9 + assertNotNull(receiver.receive()); // #10 + + peer.waitForScriptToComplete(); + peer.expectClose().respond(); + + connection.close(); + + peer.waitForScriptToComplete(); + } + } } diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java index a41f984..944a97c 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java @@ -68,6 +68,7 @@ class ReconnectStreamReceiverTest extends ImperativeClientTestCase { .withMore(false) .withMessageFormat(0) .withPayload(payload).queue(); + finalPeer.expectDisposition().withSettled(true).withState().accepted(); finalPeer.start(); final URI primaryURI = firstPeer.getServerURI(); diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java index 5fa3a71..196a624 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java @@ -516,7 +516,8 @@ class StreamReceiverTest extends ImperativeClientTestCase { final Client container = Client.create(); final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); - final StreamReceiver receiver = connection.openStreamReceiver("test-queue"); + final StreamReceiverOptions options = new 
StreamReceiverOptions().autoAccept(false); + final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); @@ -574,6 +575,7 @@ class StreamReceiverTest extends ImperativeClientTestCase { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectBegin().respond(); peer.expectAttach().ofSender().respond(); + peer.expectDisposition().withState().accepted().withSettled(true); // Ensures that stream receiver has the delivery in its queue. connection.openSender("test-sender").openFuture().get(); @@ -632,6 +634,7 @@ class StreamReceiverTest extends ImperativeClientTestCase { assertFalse(delivery.completed()); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDisposition().withSettled(true); peer.remoteTransfer().withHandle(0) .withDeliveryId(0) @@ -728,6 +731,7 @@ class StreamReceiverTest extends ImperativeClientTestCase { .withMore(false) .withMessageFormat(0) .withPayload(payload).queue(); + peer.expectDisposition().withState().accepted().withSettled(true); peer.start(); URI remoteURI = peer.getServerURI(); @@ -1207,9 +1211,7 @@ class StreamReceiverTest extends ImperativeClientTestCase { // An initial frame has arrived but no reads have been performed and then if closed // the delivery will be consumed to allow the session window to be opened and prevent - // a stall due to an un-consumed delivery. The stream delivery will not auto accept - // or auto settle the delivery as the user closed early which should indicate they - // are rejecting the message otherwise it is a programming error on their part. + // a stall due to an un-consumed delivery. 
peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(10); peer.remoteTransfer().withHandle(0) @@ -1218,6 +1220,7 @@ class StreamReceiverTest extends ImperativeClientTestCase { .withMessageFormat(0) .withPayload(payload2).queue(); peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9); + peer.expectDisposition().withSettled(true).withState().accepted(); rawStream.close(); @@ -1262,7 +1265,7 @@ class StreamReceiverTest extends ImperativeClientTestCase { Client container = Client.create(); ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000); Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions); - StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000); + StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000).autoAccept(false); StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions); StreamDelivery delivery = receiver.receive(); assertNotNull(delivery); @@ -1508,6 +1511,7 @@ class StreamReceiverTest extends ImperativeClientTestCase { .withMore(false) .withMessageFormat(0) .withPayload(payload).queue(); + peer.expectDisposition().withSettled(true).withState().accepted(); peer.start(); URI remoteURI = peer.getServerURI(); @@ -2270,6 +2274,7 @@ class StreamReceiverTest extends ImperativeClientTestCase { .withMore(false) .withMessageFormat(0) .withPayload(payload2).queue(); + peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true); InputStream bodyStream = message.body(); assertNotNull(bodyStream); @@ -2279,7 +2284,6 @@ class StreamReceiverTest extends ImperativeClientTestCase { // mode and there is nothing more to read. 
peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(9); - peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true); byte[] combinedPayloads = new byte[body1.length + body2.length]; bodyStream.read(combinedPayloads); @@ -2340,7 +2344,9 @@ class StreamReceiverTest extends ImperativeClientTestCase { // Creating the input stream instance should read the first chunk of data from the incoming // delivery which should result in a new credit being available to expand the session window. // An additional transfer should be placed into the delivery buffer but not yet read since - // the user hasn't read anything. + // the user hasn't read anything. Since we are in auto settle the completed transfer should + // trigger settlement and also open the credit window but the session window should not be + // expanded since we haven't read the data yet. peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectFlow().withDeliveryCount(0).withIncomingWindow(1).withLinkCredit(1); peer.remoteTransfer().withHandle(0) @@ -2348,21 +2354,24 @@ class StreamReceiverTest extends ImperativeClientTestCase { .withMore(false) .withMessageFormat(0) .withPayload(payload2).queue(); + peer.expectDisposition().withSettled(true).withState().accepted(); + peer.expectFlow().withDeliveryCount(1).withIncomingWindow(0).withLinkCredit(1); InputStream bodyStream = message.body(); assertNotNull(bodyStream); - // Once the read of all data completes the session window should be opened and the - // stream should mark the delivery as accepted and settled since we are in auto settle - // mode and there is nothing more to read. 
+ // Once the read of all data completes the session window should be opened peer.waitForScriptToComplete(5, TimeUnit.SECONDS); - peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(0); - peer.expectDisposition().withFirst(0).withState().accepted().withSettled(true); peer.expectFlow().withDeliveryCount(1).withIncomingWindow(1).withLinkCredit(1); byte[] combinedPayloads = new byte[body1.length + body2.length]; bodyStream.read(combinedPayloads); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + // No frames should be triggered by closing the stream since we already auto settled + // and updated the session window on the remote. + assertTrue(Arrays.equals(body1, 0, body1.length, combinedPayloads, 0, body1.length)); assertTrue(Arrays.equals(body2, 0, body2.length, combinedPayloads, body1.length, body1.length + body2.length)); @@ -2412,7 +2421,7 @@ class StreamReceiverTest extends ImperativeClientTestCase { Client container = Client.create(); ConnectionOptions connectionOptions = new ConnectionOptions().maxFrameSize(1000); Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions); - StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000); + StreamReceiverOptions streamOptions = new StreamReceiverOptions().readBufferSize(2000).autoAccept(false); StreamReceiver receiver = connection.openStreamReceiver("test-queue", streamOptions); StreamDelivery delivery = receiver.receive(); assertNotNull(delivery); @@ -2467,6 +2476,8 @@ class StreamReceiverTest extends ImperativeClientTestCase { assertEquals(value, "test"); }); + delivery.accept(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectDetach().respond(); peer.expectEnd().respond(); @@ -2498,11 +2509,6 @@ class StreamReceiverTest extends ImperativeClientTestCase { .withMore(false) .withMessageFormat(0) .withPayload(payload).queue(); - peer.expectCoordinatorAttach().respond(); - 
peer.remoteFlow().withLinkCredit(2).queue(); - peer.expectDeclare().accept(txnId); - peer.expectDisposition().withSettled(true).withState().transactional().withTxnId(txnId).withAccepted(); - peer.expectDischarge().withFail(false).withTxnId(txnId).accept(); peer.start(); URI remoteURI = peer.getServerURI(); @@ -2512,10 +2518,18 @@ class StreamReceiverTest extends ImperativeClientTestCase { final Client container = Client.create(); final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); final StreamReceiver receiver = connection.openStreamReceiver("test-queue"); - final StreamDelivery delivery = receiver.receive(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectCoordinatorAttach().respond(); + peer.remoteFlow().withLinkCredit(2).queue(); + peer.expectDeclare().accept(txnId); + peer.expectDisposition().withSettled(true).withState().transactional().withTxnId(txnId).withAccepted(); + peer.expectDischarge().withFail(false).withTxnId(txnId).accept(); receiver.session().beginTransaction(); + final StreamDelivery delivery = receiver.receive(); + assertNotNull(delivery); assertTrue(delivery.completed()); assertFalse(delivery.aborted()); @@ -2540,6 +2554,8 @@ class StreamReceiverTest extends ImperativeClientTestCase { receiver.session().commitTransaction(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectDetach().respond(); peer.expectEnd().respond(); peer.expectClose().respond(); @@ -2575,7 +2591,8 @@ class StreamReceiverTest extends ImperativeClientTestCase { final Client container = Client.create(); final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); - final StreamReceiver receiver = connection.openStreamReceiver("test-queue"); + final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false); + final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); 
peer.expectDisposition().withState().rejected("decode-error", "failed reading message header"); @@ -2623,7 +2640,8 @@ class StreamReceiverTest extends ImperativeClientTestCase { final Client container = Client.create(); final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); - final StreamReceiver receiver = connection.openStreamReceiver("test-queue"); + final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false); + final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectDisposition().withState().rejected("decode-error", "failed reading message header"); @@ -2671,7 +2689,8 @@ class StreamReceiverTest extends ImperativeClientTestCase { final Client container = Client.create(); final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); - final StreamReceiver receiver = connection.openStreamReceiver("test-queue"); + final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false); + final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectDisposition().withState().rejected("decode-error", "failed reading message header"); @@ -2719,7 +2738,8 @@ class StreamReceiverTest extends ImperativeClientTestCase { final Client container = Client.create(); final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); - final StreamReceiver receiver = connection.openStreamReceiver("test-queue"); + final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false); + final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectDisposition().withState().rejected("decode-error", "failed reading message header"); @@ -2767,7 +2787,8 @@ class StreamReceiverTest extends 
ImperativeClientTestCase { final Client container = Client.create(); final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); - final StreamReceiver receiver = connection.openStreamReceiver("test-queue"); + final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false); + final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectDisposition().withState().rejected("decode-error", "failed reading message header"); @@ -2815,7 +2836,8 @@ class StreamReceiverTest extends ImperativeClientTestCase { final Client container = Client.create(); final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); - final StreamReceiver receiver = connection.openStreamReceiver("test-queue"); + final StreamReceiverOptions options = new StreamReceiverOptions().autoAccept(false); + final StreamReceiver receiver = connection.openStreamReceiver("test-queue", options); peer.waitForScriptToComplete(5, TimeUnit.SECONDS); peer.expectDisposition().withState().rejected("decode-error", "failed reading message header"); @@ -3707,6 +3729,131 @@ class StreamReceiverTest extends ImperativeClientTestCase { } } + @Test + public void testReceiverCreditReplenishedAfterSyncReceiveAutoAccept() throws Exception { + doTestReceiverCreditReplenishedAfterSyncReceive(true); + } + + @Test + public void testReceiverCreditReplenishedAfterSyncReceiveManualAccept() throws Exception { + doTestReceiverCreditReplenishedAfterSyncReceive(false); + } + + public void doTestReceiverCreditReplenishedAfterSyncReceive(boolean autoAccept) throws Exception { + byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World")); + + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofReceiver().respond(); + 
peer.expectFlow().withLinkCredit(10); + for (int i = 0; i < 10; ++i) { + peer.remoteTransfer().withDeliveryId(i) + .withMore(false) + .withMessageFormat(0) + .withPayload(payload).queue(); + } + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()); + + StreamReceiverOptions options = new StreamReceiverOptions(); + options.autoAccept(autoAccept); + options.creditWindow(10); + + StreamReceiver receiver = connection.openStreamReceiver("test-receiver", options); + + Wait.waitFor(() -> receiver.queuedDeliveries() == 10); + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + peer.expectDisposition(); + } + + // Consume messages 1 and 2 which should not provoke credit replenishment + // as there are still 8 outstanding which is above the 70% mark + assertNotNull(receiver.receive()); // #1 + assertNotNull(receiver.receive()); // #2 + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + } + peer.expectFlow().withLinkCredit(3); + + // Now consume message 3 which will trip the replenish barrier and the + // credit should be updated to reflect that we still have 7 queued + assertNotNull(receiver.receive()); // #3 + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + peer.expectDisposition(); + } + + // Consume messages 4 and 5 which should not provoke credit replenishment + // as there are still 5 outstanding plus the credit we sent last time + // which is above the 70% mark + assertNotNull(receiver.receive()); // #4 + assertNotNull(receiver.receive()); // #5 + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + } + peer.expectFlow().withLinkCredit(6); + + // Consume number 6 which means we only have 4 outstanding plus the three + // that we sent last time 
we flowed which is 70% of possible prefetch so + // we should flow to top off credit which would be 6 since we have four + // still pending + assertNotNull(receiver.receive()); // #6 + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + peer.expectDisposition(); + } + + // Consume deliveries 7 and 8 which should not flow as we should be + // above the threshold of 70% since we would now have 2 outstanding + // and 6 credits on the link + assertNotNull(receiver.receive()); // #7 + assertNotNull(receiver.receive()); // #8 + + peer.waitForScriptToComplete(); + if (autoAccept) + { + peer.expectDisposition(); + peer.expectDisposition(); + } + + // Now consume 9 and 10 but we still shouldn't flow more credit because + // the link credit is above the 50% mark for overall credit windowing. + assertNotNull(receiver.receive()); // #9 + assertNotNull(receiver.receive()); // #10 + + peer.waitForScriptToComplete(); + + peer.expectClose().respond(); + connection.close(); + + peer.waitForScriptToComplete(); + } + } + private byte[] createInvalidHeaderEncoding() { final byte[] buffer = new byte[12]; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org