This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new b0abbf3c PROTON-2646 Update sendable state in sender on check
b0abbf3c is described below
commit b0abbf3cd8313aebf6acc7ad1fb03ced7ba38f98
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Nov 7 15:28:16 2022 -0500
PROTON-2646 Update sendable state in sender on check
Ensure that the sender's stored sendable state is updated when the sender
is queried via isSendable(), covering the case where the caller then takes
action in response to the not-sendable state.
---
.../qpid/protonj2/engine/impl/ProtonSender.java | 2 +-
.../protonj2/engine/impl/ProtonSessionTest.java | 155 +++++++++++++++++++++
2 files changed, 156 insertions(+), 1 deletion(-)
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
index a5f0fa74..a0ba9f64 100644
---
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
+++
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
@@ -88,7 +88,7 @@ public class ProtonSender extends ProtonLink<Sender>
implements Sender {
@Override
public boolean isSendable() {
- return sendable && sessionWindow.isSendable();
+ return sendable = sendable && sessionWindow.isSendable();
}
@Override
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
index 9e5719c0..767d7774 100644
---
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
@@ -2514,6 +2514,161 @@ public class ProtonSessionTest extends
ProtonEngineTestSupport {
assertNull(failure);
}
+ @Test
+ public void
testBothSendersNotifiedAfterSessionOutgoingWindowOpenedWhenBothRequestedSendableState()
throws Exception {
+ Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ Queue<Runnable> asyncIOCallbacks = new ArrayDeque<>();
+ ProtonTestConnector peer = createTestPeer(engine, asyncIOCallbacks);
+
+ final byte[] payload = new byte[] {0, 1, 2, 3, 4};
+ final DeliveryTagGenerator generator =
ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator();
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().withMaxFrameSize(1024).respond();
+
peer.expectBegin().withNextOutgoingId(0).respond().withNextOutgoingId(0);
+ peer.expectAttach().respond();
+
peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue();
+ peer.expectAttach().respond();
+
peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue();
+
+ Connection connection = engine.start().setMaxFrameSize(1024).open();
+ Session session =
connection.session().setOutgoingCapacity(1024).open();
+ Sender sender1 =
session.sender("test1").setDeliveryTagGenerator(generator).open();
+ Sender sender2 =
session.sender("test2").setDeliveryTagGenerator(generator).open();
+
+ peer.waitForScriptToComplete();
+ peer.expectTransfer().withPayload(payload);
+
+ final AtomicInteger sender1CreditStateUpdated = new AtomicInteger();
+ sender1.creditStateUpdateHandler((self) -> {
+ sender1CreditStateUpdated.incrementAndGet();
+ });
+
+ final AtomicInteger sender2CreditStateUpdated = new AtomicInteger();
+ sender2.creditStateUpdateHandler((self) -> {
+ sender2CreditStateUpdated.incrementAndGet();
+ });
+
+ assertTrue(sender1.isSendable());
+ assertEquals(1024, session.getRemainingOutgoingCapacity());
+ assertTrue(sender2.isSendable());
+ assertEquals(1024, session.getRemainingOutgoingCapacity());
+
+ // Open, Begin, Attach, Attach
+ assertEquals(4, asyncIOCallbacks.size());
+ asyncIOCallbacks.forEach(runner -> runner.run());
+ asyncIOCallbacks.clear();
+
+ OutgoingDelivery delivery = sender1.next();
+ delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
+
+ peer.waitForScriptToComplete();
+
+ assertEquals(1, asyncIOCallbacks.size());
+
+ assertFalse(sender1.isSendable());
+ assertEquals(0, session.getRemainingOutgoingCapacity());
+ // Sender 2 shouldn't be able to send since sender 1 consumed the
outgoing window
+ assertFalse(sender2.isSendable());
+ assertEquals(0, session.getRemainingOutgoingCapacity());
+
+ // Free a frame's worth of window, which should trigger both senders'
sendable update events
+ asyncIOCallbacks.poll().run();
+ assertEquals(0, asyncIOCallbacks.size());
+
+ assertTrue(sender1.isSendable());
+ assertEquals(1024, session.getRemainingOutgoingCapacity());
+ assertEquals(1, sender1CreditStateUpdated.get());
+ assertTrue(sender2.isSendable());
+ assertEquals(1024, session.getRemainingOutgoingCapacity());
+ assertEquals(1, sender2CreditStateUpdated.get());
+
+ peer.waitForScriptToComplete();
+ assertNull(failure);
+ }
+
+ @Test
+ public void
testSingleSenderUpdatedWhenOutgoingWindowOpenedForTwoIfFirstConsumesSessionOutgoingWindow()
throws Exception {
+ Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ Queue<Runnable> asyncIOCallbacks = new ArrayDeque<>();
+ ProtonTestConnector peer = createTestPeer(engine, asyncIOCallbacks);
+
+ final byte[] payload = new byte[] {0, 1, 2, 3, 4};
+ final DeliveryTagGenerator generator =
ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator();
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().withMaxFrameSize(1024).respond();
+
peer.expectBegin().withNextOutgoingId(0).respond().withNextOutgoingId(0);
+ peer.expectAttach().respond();
+
peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue();
+ peer.expectAttach().respond();
+
peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue();
+
+ Connection connection = engine.start().setMaxFrameSize(1024).open();
+ Session session =
connection.session().setOutgoingCapacity(1024).open();
+ Sender sender1 =
session.sender("test1").setDeliveryTagGenerator(generator).open();
+ Sender sender2 =
session.sender("test2").setDeliveryTagGenerator(generator).open();
+
+ peer.waitForScriptToComplete();
+ peer.expectTransfer().withPayload(payload);
+
+ final AtomicInteger sender1CreditStateUpdated = new AtomicInteger();
+ sender1.creditStateUpdateHandler((self) -> {
+ sender1CreditStateUpdated.incrementAndGet();
+ if (self.isSendable()) {
+ OutgoingDelivery delivery = self.next();
+
delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
+ }
+ });
+
+ final AtomicInteger sender2CreditStateUpdated = new AtomicInteger();
+ sender2.creditStateUpdateHandler((self) -> {
+ sender2CreditStateUpdated.incrementAndGet();
+ });
+
+ assertTrue(sender1.isSendable());
+ assertEquals(1024, session.getRemainingOutgoingCapacity());
+ assertTrue(sender2.isSendable());
+ assertEquals(1024, session.getRemainingOutgoingCapacity());
+
+ // Open, Begin, Attach, Attach
+ assertEquals(4, asyncIOCallbacks.size());
+ asyncIOCallbacks.forEach(runner -> runner.run());
+ asyncIOCallbacks.clear();
+
+ OutgoingDelivery delivery = sender1.next();
+ delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
+
+ peer.waitForScriptToComplete();
+ peer.expectTransfer().withPayload(payload);
+
+ assertEquals(1, asyncIOCallbacks.size());
+
+ assertFalse(sender1.isSendable());
+ assertEquals(0, session.getRemainingOutgoingCapacity());
+ // Sender 2 shouldn't be able to send since sender 1 consumed the
outgoing window
+ assertFalse(sender2.isSendable());
+ assertEquals(0, session.getRemainingOutgoingCapacity());
+
+ // Should trigger sender 1 to send, which should exhaust the outgoing
window
+ asyncIOCallbacks.poll().run();
+ assertEquals(1, asyncIOCallbacks.size()); // Sender one should have
sent
+
+ assertFalse(sender1.isSendable());
+ assertEquals(0, session.getRemainingOutgoingCapacity());
+ assertEquals(1, sender1CreditStateUpdated.get());
+ assertFalse(sender2.isSendable());
+ assertEquals(0, session.getRemainingOutgoingCapacity());
+ // Should not have triggered an event for sender 2 being able to send
since
+ // sender one consumed the outgoing window already.
+ assertEquals(0, sender2CreditStateUpdated.get());
+
+ peer.waitForScriptToComplete();
+ assertNull(failure);
+ }
+
@Test
public void
testHandleInUseErrorReturnedIfAttachWithAlreadyBoundHandleArrives() throws
Exception {
Engine engine = EngineFactory.PROTON.createNonSaslEngine();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]