This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ead4dea PROTON-2826 Remove timed out sends from blocked queue
1ead4dea is described below

commit 1ead4dea40a3af59af2fbfc4a536c08304951f43
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Fri Jun 7 16:50:48 2024 -0400

    PROTON-2826 Remove timed out sends from blocked queue
    
    Ensure that sends that time out waiting for credit are removed from the
    blocked queue and also abort any partial send that was blocked waiting
    for credit to ensure no leak of deliveries from the sender link.
---
 .../client/impl/ClientNextReceiverSelector.java    |   3 +
 .../qpid/protonj2/client/impl/ClientSender.java    |  16 ++-
 .../protonj2/client/impl/ClientStreamSender.java   |  16 ++-
 .../transport/netty5/WebSocketTransport.java       |   1 +
 .../qpid/protonj2/client/impl/SenderTest.java      | 110 +++++++++++++++++++++
 .../protonj2/client/impl/StreamSenderTest.java     |  56 +++++++++++
 6 files changed, 194 insertions(+), 8 deletions(-)

diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java
index a6f04b5c..8d8e8619 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java
@@ -162,6 +162,7 @@ final class ClientNextReceiverSelector {
         return result != null ? result : selectFirstAvailable();
     }
 
+    @SuppressWarnings("resource")
     private ClientReceiver selectFirstAvailable() {
         return session.getProtonSession().receivers().stream()
                 .filter((r) -> r.getLinkedResource() instanceof ClientReceiver 
&&
@@ -171,6 +172,7 @@ final class ClientNextReceiverSelector {
                 .orElse(null);
     }
 
+    @SuppressWarnings("resource")
     private ClientReceiver selectLargestBacklog() {
         return session.getProtonSession().receivers().stream()
                 .filter((r) -> r.getLinkedResource() instanceof ClientReceiver 
&&
@@ -180,6 +182,7 @@ final class ClientNextReceiverSelector {
                 .orElse(null);
     }
 
+    @SuppressWarnings("resource")
     private ClientReceiver selectSmallestBacklog() {
         return session.getProtonSession().receivers().stream()
                 .filter((r) -> r.getLinkedResource() instanceof ClientReceiver 
&&
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index a2df99d6..ae78c60e 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -131,23 +131,23 @@ public final class ClientSender extends 
ClientSenderLinkType<Sender> implements
     }
 
     private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
+        blocked.addLast(send);
         if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
             send.sendTimeout(executor.schedule(() -> {
+                blocked.remove(send);
                 send.failed(send.createSendTimedOutException());
             }, options.sendTimeout(), TimeUnit.MILLISECONDS));
         }
-
-        blocked.addLast(send);
     }
 
     private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) {
+        blocked.addFirst(send);
         if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
             send.sendTimeout(executor.schedule(() -> {
+                blocked.remove(send);
                 send.failed(send.createSendTimedOutException());
             }, options.sendTimeout(), TimeUnit.MILLISECONDS));
         }
-
-        blocked.addFirst(send);
     }
 
     private Tracker sendMessage(AdvancedMessage<?> message, Map<String, 
Object> deliveryAnnotations, boolean waitForCredit) throws ClientException {
@@ -311,6 +311,14 @@ public final class ClientSender extends 
ClientSenderLinkType<Sender> implements
                 sendTimeout.cancel(true);
             }
 
+            if (delivery != null) {
+                try {
+                    delivery.abort();
+                } catch (Exception ex) {
+                    // Attempted abort could fail if offline so we ignore it.
+                }
+            }
+
             payload.close();
 
             request.failed(exception);
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index b158e072..58d21891 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -131,23 +131,23 @@ public final class ClientStreamSender extends 
ClientSenderLinkType<StreamSender>
     }
 
     private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
+        blocked.addLast(send);
         if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
             send.sendTimeout(executor.schedule(() -> {
+                blocked.remove(send);
                 send.failed(send.createSendTimedOutException());
             }, options.sendTimeout(), TimeUnit.MILLISECONDS));
         }
-
-        blocked.addLast(send);
     }
 
     private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) {
+        blocked.addFirst(send);
         if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
             send.sendTimeout(executor.schedule(() -> {
+                blocked.remove(send);
                 send.failed(send.createSendTimedOutException());
             }, options.sendTimeout(), TimeUnit.MILLISECONDS));
         }
-
-        blocked.addFirst(send);
     }
 
     private StreamTracker sendMessage(AdvancedMessage<?> message, Map<String, 
Object> deliveryAnnotations, boolean waitForCredit) throws ClientException {
@@ -499,6 +499,14 @@ public final class ClientStreamSender extends 
ClientSenderLinkType<StreamSender>
                 sendTimeout.cancel(true);
             }
 
+            if (delivery != null) {
+                try {
+                    delivery.abort();
+                } catch (Exception ex) {
+                    // Attempted abort could fail if offline so we ignore it.
+                }
+            }
+
             if (payload != null) {
                 payload.close();
             }
diff --git 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java
 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java
index 59337461..0c7fb599 100644
--- 
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java
+++ 
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java
@@ -169,6 +169,7 @@ public class WebSocketTransport extends TcpTransport {
             super.channelActive(context);
         }
 
+        @SuppressWarnings("resource")
         @Override
         protected void messageReceived(ChannelHandlerContext ctx, Object 
message) throws Exception {
             LOG.trace("New data read: incoming: {}", message);
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
index 7a576ef7..a16b360c 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
@@ -561,6 +561,60 @@ public class SenderTest extends ImperativeClientTestCase {
         }
     }
 
+    @Test
+    public void testSendTimesOutWhenNoCreditIssuedAndThenIssueCredit() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions options = new ConnectionOptions();
+            options.sendTimeout(10);
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort(), options);
+            Session session = connection.openSession();
+            Sender sender = session.openSender("test-queue");
+            sender.openFuture().get(10, TimeUnit.SECONDS);
+
+            Message<String> message = Message.create("Hello World");
+            try {
+                sender.send(message);
+                fail("Should throw a send timed out exception");
+            } catch (ClientSendTimedOutException ex) {
+                // Expected error, ignore
+            }
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.remoteFlow().withLinkCredit(1).now();
+            peer.expectAttach().ofSender().respond();
+            peer.expectTransfer().withMessage().withValue("Hello World 2");
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            // Ensure the send happens after the remote has sent a flow with 
credit
+            session.openSender("test-queue-2").openFuture().get();
+
+            try {
+                sender.send(Message.create("Hello World 2"));
+            } catch (ClientException ex) {
+                LOG.trace("Error on second send", ex);
+                fail("Should not throw an exception");
+            }
+
+            sender.closeAsync().get(10, TimeUnit.SECONDS);
+
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
     @Test
     public void testSendCompletesWhenCreditEventuallyOffered() throws 
Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {
@@ -1999,6 +2053,62 @@ public class SenderTest extends ImperativeClientTestCase 
{
         }
     }
 
+    @Test
+    public void testSendTimesOutIfNotAllMessageFramesCanBeSent() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().withNextOutgoingId(0).respond();
+            peer.expectAttach().ofSender().respond();
+            
peer.remoteFlow().withIncomingWindow(2).withNextIncomingId(0).withLinkCredit(1).queue();
+            
peer.expectTransfer().withDeliveryId(0).withNonNullPayload().withMore(true);
+            peer.expectTransfer().withNonNullPayload().withMore(true);
+            peer.expectTransfer().withNullPayload().withAborted(true);
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions options = new 
ConnectionOptions().maxFrameSize(1024);
+            options.sendTimeout(25);
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort(), options);
+            Sender sender = 
connection.openSender("test-queue").openFuture().get();
+
+            final byte[] payload = new byte[4800];
+            Arrays.fill(payload, (byte) 1);
+
+            try {
+                sender.send(Message.create(payload));
+            } catch (ClientSendTimedOutException e) {
+                LOG.trace("send failed with expected error: ", e);
+            }
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            
peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(1).now();
+            peer.expectAttach().ofSender().respond();
+            
peer.expectTransfer().withDeliveryId(1).withMessage().withValue("Hello World 
2");
+            peer.expectDetach().respond();
+            peer.expectClose().respond();
+
+            // Ensure the send happens after the remote has sent a flow with 
credit
+            connection.openSender("test-queue-2").openFuture().get();
+
+            try {
+                sender.send(Message.create("Hello World 2"));
+            } catch (ClientException ex) {
+                LOG.trace("Error on second send", ex);
+                fail("Should not throw an exception");
+            }
+
+            sender.closeAsync().get();
+            connection.closeAsync().get();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
     @Test
     void testConcurrentSendOnlyBlocksForInitialSendInProgress() throws 
Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {
diff --git 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
index 07d6d91f..ab69063c 100644
--- 
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
+++ 
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
@@ -59,6 +59,7 @@ import org.apache.qpid.protonj2.client.StreamSenderOptions;
 import org.apache.qpid.protonj2.client.StreamTracker;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
+import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
 import 
org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
 import org.apache.qpid.protonj2.client.test.Wait;
@@ -2918,4 +2919,59 @@ public class StreamSenderTest extends 
ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testSendTimesOutWhenNoCreditIssuedAndThenIssueCredit() throws 
Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Sender test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions options = new ConnectionOptions();
+            options.sendTimeout(10);
+            Connection connection = container.connect(remoteURI.getHost(), 
remoteURI.getPort(), options);
+            StreamSender sender = connection.openStreamSender("test-queue");
+            sender.openFuture().get(10, TimeUnit.SECONDS);
+
+            Message<String> message = Message.create("Hello World");
+            try {
+                sender.send(message);
+                fail("Should throw a send timed out exception");
+            } catch (ClientSendTimedOutException ex) {
+                // Expected error, ignore
+            }
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.remoteFlow().withLinkCredit(1).now();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofSender().respond();
+            peer.expectTransfer().withMessage().withValue("Hello World 2");
+            peer.expectDetach().respond();
+            peer.expectEnd().respond();
+            peer.expectClose().respond();
+
+            // Ensure the send happens after the remote has sent a flow with 
credit
+            connection.openSender("test-queue-2").openFuture().get();
+
+            try {
+                sender.send(Message.create("Hello World 2"));
+            } catch (ClientException ex) {
+                LOG.trace("Error on second send", ex);
+                fail("Should not throw an exception");
+            }
+
+            sender.closeAsync().get(10, TimeUnit.SECONDS);
+
+            connection.closeAsync().get(10, TimeUnit.SECONDS);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to