This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new b0abbf3c PROTON-2646 Update sendable state in sender on check b0abbf3c is described below commit b0abbf3cd8313aebf6acc7ad1fb03ced7ba38f98 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Mon Nov 7 15:28:16 2022 -0500 PROTON-2646 Update sendable state in sender on check Ensure that the sendable state is updated in the case that the sender is queried and the caller then takes action to respond to the not sendable state. --- .../qpid/protonj2/engine/impl/ProtonSender.java | 2 +- .../protonj2/engine/impl/ProtonSessionTest.java | 155 +++++++++++++++++++++ 2 files changed, 156 insertions(+), 1 deletion(-) diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java index a5f0fa74..a0ba9f64 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java @@ -88,7 +88,7 @@ public class ProtonSender extends ProtonLink<Sender> implements Sender { @Override public boolean isSendable() { - return sendable && sessionWindow.isSendable(); + return sendable = sendable && sessionWindow.isSendable(); } @Override diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java index 9e5719c0..767d7774 100644 --- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java +++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java @@ -2514,6 +2514,161 @@ public class ProtonSessionTest extends ProtonEngineTestSupport { assertNull(failure); } + @Test + public void testBothSendersNotifiedAfterSessionOutgoingWindowOpenedWhenBothRequestedSendableState() throws Exception { + Engine engine = EngineFactory.PROTON.createNonSaslEngine(); + engine.errorHandler(result -> failure = result.failureCause()); + Queue<Runnable> asyncIOCallbacks = new ArrayDeque<>(); + ProtonTestConnector peer = createTestPeer(engine, asyncIOCallbacks); + + final byte[] payload = new byte[] {0, 1, 2, 3, 4}; + final DeliveryTagGenerator generator = ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator(); + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().withMaxFrameSize(1024).respond(); + peer.expectBegin().withNextOutgoingId(0).respond().withNextOutgoingId(0); + peer.expectAttach().respond(); + peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue(); + peer.expectAttach().respond(); + peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue(); + + Connection connection = engine.start().setMaxFrameSize(1024).open(); + Session session = connection.session().setOutgoingCapacity(1024).open(); + Sender sender1 = session.sender("test1").setDeliveryTagGenerator(generator).open(); + Sender sender2 = session.sender("test2").setDeliveryTagGenerator(generator).open(); + + peer.waitForScriptToComplete(); + peer.expectTransfer().withPayload(payload); + + final AtomicInteger sender1CreditStateUpdated = new AtomicInteger(); + sender1.creditStateUpdateHandler((self) -> { + sender1CreditStateUpdated.incrementAndGet(); + }); + + final AtomicInteger sender2CreditStateUpdated = new AtomicInteger(); + sender2.creditStateUpdateHandler((self) -> { + sender2CreditStateUpdated.incrementAndGet(); + }); + + assertTrue(sender1.isSendable()); + assertEquals(1024, session.getRemainingOutgoingCapacity()); + assertTrue(sender2.isSendable()); + assertEquals(1024, session.getRemainingOutgoingCapacity()); + + // Open, Begin, Attach, Attach + assertEquals(4, asyncIOCallbacks.size()); + asyncIOCallbacks.forEach(runner -> runner.run()); + asyncIOCallbacks.clear(); + + OutgoingDelivery delivery = sender1.next(); + delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload)); + + peer.waitForScriptToComplete(); + + assertEquals(1, asyncIOCallbacks.size()); + + assertFalse(sender1.isSendable()); + assertEquals(0, session.getRemainingOutgoingCapacity()); + // Sender 2 shouldn't be able to send since sender 1 consumed the outgoing window + assertFalse(sender2.isSendable()); + assertEquals(0, session.getRemainingOutgoingCapacity()); + + // Free a frame's worth of window which should trigger both senders sendable update event + asyncIOCallbacks.poll().run(); + assertEquals(0, asyncIOCallbacks.size()); + + assertTrue(sender1.isSendable()); + assertEquals(1024, session.getRemainingOutgoingCapacity()); + assertEquals(1, sender1CreditStateUpdated.get()); + assertTrue(sender2.isSendable()); + assertEquals(1024, session.getRemainingOutgoingCapacity()); + assertEquals(1, sender2CreditStateUpdated.get()); + + peer.waitForScriptToComplete(); + assertNull(failure); + } + + @Test + public void testSingleSenderUpdatedWhenOutgoingWindowOpenedForTwoIfFirstConsumesSessionOutgoingWindow() throws Exception { + Engine engine = EngineFactory.PROTON.createNonSaslEngine(); + engine.errorHandler(result -> failure = result.failureCause()); + Queue<Runnable> asyncIOCallbacks = new ArrayDeque<>(); + ProtonTestConnector peer = createTestPeer(engine, asyncIOCallbacks); + + final byte[] payload = new byte[] {0, 1, 2, 3, 4}; + final DeliveryTagGenerator generator = ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator(); + + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().withMaxFrameSize(1024).respond(); + peer.expectBegin().withNextOutgoingId(0).respond().withNextOutgoingId(0); + peer.expectAttach().respond(); + peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue(); + peer.expectAttach().respond(); + peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue(); + + Connection connection = engine.start().setMaxFrameSize(1024).open(); + Session session = connection.session().setOutgoingCapacity(1024).open(); + Sender sender1 = session.sender("test1").setDeliveryTagGenerator(generator).open(); + Sender sender2 = session.sender("test2").setDeliveryTagGenerator(generator).open(); + + peer.waitForScriptToComplete(); + peer.expectTransfer().withPayload(payload); + + final AtomicInteger sender1CreditStateUpdated = new AtomicInteger(); + sender1.creditStateUpdateHandler((self) -> { + sender1CreditStateUpdated.incrementAndGet(); + if (self.isSendable()) { + OutgoingDelivery delivery = self.next(); + delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload)); + } + }); + + final AtomicInteger sender2CreditStateUpdated = new AtomicInteger(); + sender2.creditStateUpdateHandler((self) -> { + sender2CreditStateUpdated.incrementAndGet(); + }); + + assertTrue(sender1.isSendable()); + assertEquals(1024, session.getRemainingOutgoingCapacity()); + assertTrue(sender2.isSendable()); + assertEquals(1024, session.getRemainingOutgoingCapacity()); + + // Open, Begin, Attach, Attach + assertEquals(4, asyncIOCallbacks.size()); + asyncIOCallbacks.forEach(runner -> runner.run()); + asyncIOCallbacks.clear(); + + OutgoingDelivery delivery = sender1.next(); + delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload)); + + peer.waitForScriptToComplete(); + peer.expectTransfer().withPayload(payload); + + assertEquals(1, asyncIOCallbacks.size()); + + assertFalse(sender1.isSendable()); + assertEquals(0, session.getRemainingOutgoingCapacity()); + // Sender 2 shouldn't be able to send since sender 1 consumed the outgoing window + assertFalse(sender2.isSendable()); + assertEquals(0, session.getRemainingOutgoingCapacity()); + + // Should trigger sender 1 to send which should exhaust the outgoing credit + asyncIOCallbacks.poll().run(); + assertEquals(1, asyncIOCallbacks.size()); // Sender one should have sent + + assertFalse(sender1.isSendable()); + assertEquals(0, session.getRemainingOutgoingCapacity()); + assertEquals(1, sender1CreditStateUpdated.get()); + assertFalse(sender2.isSendable()); + assertEquals(0, session.getRemainingOutgoingCapacity()); + // Should not have triggered an event for sender 2 being able to send since + // sender one consumed the outgoing window already. + assertEquals(0, sender2CreditStateUpdated.get()); + + peer.waitForScriptToComplete(); + assertNull(failure); + } + @Test public void testHandleInUseErrorReturnedIfAttachWithAlreadyBoundHandleArrives() throws Exception { Engine engine = EngineFactory.PROTON.createNonSaslEngine(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org