This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new b0abbf3c PROTON-2646 Update sendable state in sender on check
b0abbf3c is described below

commit b0abbf3cd8313aebf6acc7ad1fb03ced7ba38f98
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Mon Nov 7 15:28:16 2022 -0500

    PROTON-2646 Update sendable state in sender on check
    
    Ensure that the sender's stored sendable state is updated whenever the
    sender is queried, so that a caller which then acts on a not-sendable
    result is notified when the sender becomes sendable again.
---
 .../qpid/protonj2/engine/impl/ProtonSender.java    |   2 +-
 .../protonj2/engine/impl/ProtonSessionTest.java    | 155 +++++++++++++++++++++
 2 files changed, 156 insertions(+), 1 deletion(-)

diff --git 
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java 
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
index a5f0fa74..a0ba9f64 100644
--- 
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
+++ 
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSender.java
@@ -88,7 +88,7 @@ public class ProtonSender extends ProtonLink<Sender> 
implements Sender {
 
     @Override
     public boolean isSendable() {
-        return sendable && sessionWindow.isSendable();
+        return sendable = sendable && sessionWindow.isSendable();
     }
 
     @Override
diff --git 
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
index 9e5719c0..767d7774 100644
--- 
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
+++ 
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
@@ -2514,6 +2514,161 @@ public class ProtonSessionTest extends 
ProtonEngineTestSupport {
         assertNull(failure);
     }
 
+    @Test
+    public void 
testBothSendersNotifiedAfterSessionOutgoingWindowOpenedWhenBothRequestedSendableState()
 throws Exception {
+        Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        Queue<Runnable> asyncIOCallbacks = new ArrayDeque<>();
+        ProtonTestConnector peer = createTestPeer(engine, asyncIOCallbacks);
+
+        final byte[] payload = new byte[] {0, 1, 2, 3, 4};
+        final DeliveryTagGenerator generator = 
ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator();
+
+        peer.expectAMQPHeader().respondWithAMQPHeader();
+        peer.expectOpen().withMaxFrameSize(1024).respond();
+        
peer.expectBegin().withNextOutgoingId(0).respond().withNextOutgoingId(0);
+        peer.expectAttach().respond();
+        
peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue();
+        peer.expectAttach().respond();
+        
peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue();
+
+        Connection connection = engine.start().setMaxFrameSize(1024).open();
+        Session session = 
connection.session().setOutgoingCapacity(1024).open();
+        Sender sender1 = 
session.sender("test1").setDeliveryTagGenerator(generator).open();
+        Sender sender2 = 
session.sender("test2").setDeliveryTagGenerator(generator).open();
+
+        peer.waitForScriptToComplete();
+        peer.expectTransfer().withPayload(payload);
+
+        final AtomicInteger sender1CreditStateUpdated = new AtomicInteger();
+        sender1.creditStateUpdateHandler((self) -> {
+            sender1CreditStateUpdated.incrementAndGet();
+        });
+
+        final AtomicInteger sender2CreditStateUpdated = new AtomicInteger();
+        sender2.creditStateUpdateHandler((self) -> {
+            sender2CreditStateUpdated.incrementAndGet();
+        });
+
+        assertTrue(sender1.isSendable());
+        assertEquals(1024, session.getRemainingOutgoingCapacity());
+        assertTrue(sender2.isSendable());
+        assertEquals(1024, session.getRemainingOutgoingCapacity());
+
+        // Open, Begin, Attach, Attach
+        assertEquals(4, asyncIOCallbacks.size());
+        asyncIOCallbacks.forEach(runner -> runner.run());
+        asyncIOCallbacks.clear();
+
+        OutgoingDelivery delivery = sender1.next();
+        delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
+
+        peer.waitForScriptToComplete();
+
+        assertEquals(1, asyncIOCallbacks.size());
+
+        assertFalse(sender1.isSendable());
+        assertEquals(0, session.getRemainingOutgoingCapacity());
+        // Sender 2 shouldn't be able to send since sender 1 consumed the 
outgoing window
+        assertFalse(sender2.isSendable());
+        assertEquals(0, session.getRemainingOutgoingCapacity());
+
+        // Free a frame's worth of window, which should trigger both senders' 
sendable update events
+        asyncIOCallbacks.poll().run();
+        assertEquals(0, asyncIOCallbacks.size());
+
+        assertTrue(sender1.isSendable());
+        assertEquals(1024, session.getRemainingOutgoingCapacity());
+        assertEquals(1, sender1CreditStateUpdated.get());
+        assertTrue(sender2.isSendable());
+        assertEquals(1024, session.getRemainingOutgoingCapacity());
+        assertEquals(1, sender2CreditStateUpdated.get());
+
+        peer.waitForScriptToComplete();
+        assertNull(failure);
+    }
+
+    @Test
+    public void 
testSingleSenderUpdatedWhenOutgoingWindowOpenedForTwoIfFirstConsumesSessionOutgoingWindow()
 throws Exception {
+        Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        Queue<Runnable> asyncIOCallbacks = new ArrayDeque<>();
+        ProtonTestConnector peer = createTestPeer(engine, asyncIOCallbacks);
+
+        final byte[] payload = new byte[] {0, 1, 2, 3, 4};
+        final DeliveryTagGenerator generator = 
ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator();
+
+        peer.expectAMQPHeader().respondWithAMQPHeader();
+        peer.expectOpen().withMaxFrameSize(1024).respond();
+        
peer.expectBegin().withNextOutgoingId(0).respond().withNextOutgoingId(0);
+        peer.expectAttach().respond();
+        
peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue();
+        peer.expectAttach().respond();
+        
peer.remoteFlow().withLinkCredit(20).withNextIncomingId(0).withIncomingWindow(8192).queue();
+
+        Connection connection = engine.start().setMaxFrameSize(1024).open();
+        Session session = 
connection.session().setOutgoingCapacity(1024).open();
+        Sender sender1 = 
session.sender("test1").setDeliveryTagGenerator(generator).open();
+        Sender sender2 = 
session.sender("test2").setDeliveryTagGenerator(generator).open();
+
+        peer.waitForScriptToComplete();
+        peer.expectTransfer().withPayload(payload);
+
+        final AtomicInteger sender1CreditStateUpdated = new AtomicInteger();
+        sender1.creditStateUpdateHandler((self) -> {
+            sender1CreditStateUpdated.incrementAndGet();
+            if (self.isSendable()) {
+                OutgoingDelivery delivery = self.next();
+                
delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
+            }
+        });
+
+        final AtomicInteger sender2CreditStateUpdated = new AtomicInteger();
+        sender2.creditStateUpdateHandler((self) -> {
+            sender2CreditStateUpdated.incrementAndGet();
+        });
+
+        assertTrue(sender1.isSendable());
+        assertEquals(1024, session.getRemainingOutgoingCapacity());
+        assertTrue(sender2.isSendable());
+        assertEquals(1024, session.getRemainingOutgoingCapacity());
+
+        // Open, Begin, Attach, Attach
+        assertEquals(4, asyncIOCallbacks.size());
+        asyncIOCallbacks.forEach(runner -> runner.run());
+        asyncIOCallbacks.clear();
+
+        OutgoingDelivery delivery = sender1.next();
+        delivery.writeBytes(ProtonByteBufferAllocator.DEFAULT.wrap(payload));
+
+        peer.waitForScriptToComplete();
+        peer.expectTransfer().withPayload(payload);
+
+        assertEquals(1, asyncIOCallbacks.size());
+
+        assertFalse(sender1.isSendable());
+        assertEquals(0, session.getRemainingOutgoingCapacity());
+        // Sender 2 shouldn't be able to send since sender 1 consumed the 
outgoing window
+        assertFalse(sender2.isSendable());
+        assertEquals(0, session.getRemainingOutgoingCapacity());
+
+        // Should trigger sender 1 to send, which should exhaust the outgoing 
window
+        asyncIOCallbacks.poll().run();
+        assertEquals(1, asyncIOCallbacks.size()); // Sender one should have 
sent
+
+        assertFalse(sender1.isSendable());
+        assertEquals(0, session.getRemainingOutgoingCapacity());
+        assertEquals(1, sender1CreditStateUpdated.get());
+        assertFalse(sender2.isSendable());
+        assertEquals(0, session.getRemainingOutgoingCapacity());
+        // Should not have triggered an event for sender 2 being able to send 
since
+        // sender one consumed the outgoing window already.
+        assertEquals(0, sender2CreditStateUpdated.get());
+
+        peer.waitForScriptToComplete();
+        assertNull(failure);
+    }
+
     @Test
     public void 
testHandleInUseErrorReturnedIfAttachWithAlreadyBoundHandleArrives() throws 
Exception {
         Engine engine = EngineFactory.PROTON.createNonSaslEngine();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to