This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new 1ead4dea PROTON-2826 Remove timed out sends from blocked queue 1ead4dea is described below commit 1ead4dea40a3af59af2fbfc4a536c08304951f43 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Fri Jun 7 16:50:48 2024 -0400 PROTON-2826 Remove timed out sends from blocked queue Ensure that sends that time out waiting for credit are removed from the blocked queue and also abort any partial send that was blocked waiting for credit to ensure no leak of deliveries from the sender link. --- .../client/impl/ClientNextReceiverSelector.java | 3 + .../qpid/protonj2/client/impl/ClientSender.java | 16 ++- .../protonj2/client/impl/ClientStreamSender.java | 16 ++- .../transport/netty5/WebSocketTransport.java | 1 + .../qpid/protonj2/client/impl/SenderTest.java | 110 +++++++++++++++++++++ .../protonj2/client/impl/StreamSenderTest.java | 56 +++++++++++ 6 files changed, 194 insertions(+), 8 deletions(-) diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java index a6f04b5c..8d8e8619 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java @@ -162,6 +162,7 @@ final class ClientNextReceiverSelector { return result != null ? result : selectFirstAvailable(); } + @SuppressWarnings("resource") private ClientReceiver selectFirstAvailable() { return session.getProtonSession().receivers().stream() .filter((r) -> r.getLinkedResource() instanceof ClientReceiver && @@ -171,6 +172,7 @@ final class ClientNextReceiverSelector { .orElse(null); } + @SuppressWarnings("resource") private ClientReceiver selectLargestBacklog() { return session.getProtonSession().receivers().stream() .filter((r) -> r.getLinkedResource() instanceof ClientReceiver && @@ -180,6 +182,7 @@ final class ClientNextReceiverSelector { .orElse(null); } + @SuppressWarnings("resource") private ClientReceiver selectSmallestBacklog() { return session.getProtonSession().receivers().stream() .filter((r) -> r.getLinkedResource() instanceof ClientReceiver && diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java index a2df99d6..ae78c60e 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java @@ -131,23 +131,23 @@ public final class ClientSender extends ClientSenderLinkType<Sender> implements } private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) { + blocked.addLast(send); if (options.sendTimeout() > 0 && send.sendTimeout() == null) { send.sendTimeout(executor.schedule(() -> { + blocked.remove(send); send.failed(send.createSendTimedOutException()); }, options.sendTimeout(), TimeUnit.MILLISECONDS)); } - - blocked.addLast(send); } private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) { + blocked.addFirst(send); if (options.sendTimeout() > 0 && send.sendTimeout() == null) { send.sendTimeout(executor.schedule(() -> { + blocked.remove(send); send.failed(send.createSendTimedOutException()); }, options.sendTimeout(), TimeUnit.MILLISECONDS)); } - - blocked.addFirst(send); } private Tracker sendMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations, boolean waitForCredit) throws ClientException { @@ -311,6 +311,14 @@ public final class ClientSender extends ClientSenderLinkType<Sender> implements sendTimeout.cancel(true); } + if (delivery != null) { + try { + delivery.abort(); + } catch (Exception ex) { + // Attempted abort could fail if offline so we ignore it. + } + } + payload.close(); request.failed(exception); diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java index b158e072..58d21891 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java @@ -131,23 +131,23 @@ public final class ClientStreamSender extends ClientSenderLinkType<StreamSender> } private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) { + blocked.addLast(send); if (options.sendTimeout() > 0 && send.sendTimeout() == null) { send.sendTimeout(executor.schedule(() -> { + blocked.remove(send); send.failed(send.createSendTimedOutException()); }, options.sendTimeout(), TimeUnit.MILLISECONDS)); } - - blocked.addLast(send); } private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) { + blocked.addFirst(send); if (options.sendTimeout() > 0 && send.sendTimeout() == null) { send.sendTimeout(executor.schedule(() -> { + blocked.remove(send); send.failed(send.createSendTimedOutException()); }, options.sendTimeout(), TimeUnit.MILLISECONDS)); } - - blocked.addFirst(send); } private StreamTracker sendMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations, boolean waitForCredit) throws ClientException { @@ -499,6 +499,14 @@ public final class ClientStreamSender extends ClientSenderLinkType<StreamSender> sendTimeout.cancel(true); } + if (delivery != null) { + try { + delivery.abort(); + } catch (Exception ex) { + // Attempted abort could fail if offline so we ignore it. + } + } + if (payload != null) { payload.close(); } diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java index 59337461..0c7fb599 100644 --- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java +++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/netty5/WebSocketTransport.java @@ -169,6 +169,7 @@ public class WebSocketTransport extends TcpTransport { super.channelActive(context); } + @SuppressWarnings("resource") @Override protected void messageReceived(ChannelHandlerContext ctx, Object message) throws Exception { LOG.trace("New data read: incoming: {}", message); diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java index 7a576ef7..a16b360c 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java @@ -561,6 +561,60 @@ public class SenderTest extends ImperativeClientTestCase { } } + @Test + public void testSendTimesOutWhenNoCreditIssuedAndThenIssueCredit() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Sender test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + ConnectionOptions options = new ConnectionOptions(); + options.sendTimeout(10); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options); + Session session = connection.openSession(); + Sender sender = session.openSender("test-queue"); + sender.openFuture().get(10, TimeUnit.SECONDS); + + Message<String> message = Message.create("Hello World"); + try { + sender.send(message); + fail("Should throw a send timed out exception"); + } catch (ClientSendTimedOutException ex) { + // Expected error, ignore + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.remoteFlow().withLinkCredit(1).now(); + peer.expectAttach().ofSender().respond(); + peer.expectTransfer().withMessage().withValue("Hello World 2"); + peer.expectDetach().respond(); + peer.expectClose().respond(); + + // Ensure the send happens after the remote has sent a flow with credit + session.openSender("test-queue-2").openFuture().get(); + + try { + sender.send(Message.create("Hello World 2")); + } catch (ClientException ex) { + LOG.trace("Error on second send", ex); + fail("Should not throw an exception"); + } + + sender.closeAsync().get(10, TimeUnit.SECONDS); + + connection.closeAsync().get(10, TimeUnit.SECONDS); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + @Test public void testSendCompletesWhenCreditEventuallyOffered() throws Exception { try (ProtonTestServer peer = new ProtonTestServer()) { @@ -1999,6 +2053,62 @@ public class SenderTest extends ImperativeClientTestCase { } } + @Test + public void testSendTimesOutIfNotAllMessageFramesCanBeSent() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().withNextOutgoingId(0).respond(); + peer.expectAttach().ofSender().respond(); + peer.remoteFlow().withIncomingWindow(2).withNextIncomingId(0).withLinkCredit(1).queue(); + peer.expectTransfer().withDeliveryId(0).withNonNullPayload().withMore(true); + peer.expectTransfer().withNonNullPayload().withMore(true); + peer.expectTransfer().withNullPayload().withAborted(true); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + ConnectionOptions options = new ConnectionOptions().maxFrameSize(1024); + options.sendTimeout(25); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options); + Sender sender = connection.openSender("test-queue").openFuture().get(); + + final byte[] payload = new byte[4800]; + Arrays.fill(payload, (byte) 1); + + try { + sender.send(Message.create(payload)); + } catch (ClientSendTimedOutException e) { + LOG.trace("send failed with expected error: ", e); + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.remoteFlow().withIncomingWindow(1).withNextIncomingId(4).withLinkCredit(1).now(); + peer.expectAttach().ofSender().respond(); + peer.expectTransfer().withDeliveryId(1).withMessage().withValue("Hello World 2"); + peer.expectDetach().respond(); + peer.expectClose().respond(); + + // Ensure the send happens after the remote has sent a flow with credit + connection.openSender("test-queue-2").openFuture().get(); + + try { + sender.send(Message.create("Hello World 2")); + } catch (ClientException ex) { + LOG.trace("Error on second send", ex); + fail("Should not throw an exception"); + } + + sender.closeAsync().get(); + connection.closeAsync().get(); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } + @Test void testConcurrentSendOnlyBlocksForInitialSendInProgress() throws Exception { try (ProtonTestServer peer = new ProtonTestServer()) { diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java index 07d6d91f..ab69063c 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java @@ -59,6 +59,7 @@ import org.apache.qpid.protonj2.client.StreamSenderOptions; import org.apache.qpid.protonj2.client.StreamTracker; import org.apache.qpid.protonj2.client.exceptions.ClientException; import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException; +import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException; import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException; import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase; import org.apache.qpid.protonj2.client.test.Wait; @@ -2918,4 +2919,59 @@ public class StreamSenderTest extends ImperativeClientTestCase { peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } } + + @Test + public void testSendTimesOutWhenNoCreditIssuedAndThenIssueCredit() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond(); + peer.start(); + + URI remoteURI = peer.getServerURI(); + + LOG.info("Sender test started, peer listening on: {}", remoteURI); + + Client container = Client.create(); + ConnectionOptions options = new ConnectionOptions(); + options.sendTimeout(10); + Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options); + StreamSender sender = connection.openStreamSender("test-queue"); + sender.openFuture().get(10, TimeUnit.SECONDS); + + Message<String> message = Message.create("Hello World"); + try { + sender.send(message); + fail("Should throw a send timed out exception"); + } catch (ClientSendTimedOutException ex) { + // Expected error, ignore + } + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.remoteFlow().withLinkCredit(1).now(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond(); + peer.expectTransfer().withMessage().withValue("Hello World 2"); + peer.expectDetach().respond(); + peer.expectEnd().respond(); + peer.expectClose().respond(); + + // Ensure the send happens after the remote has sent a flow with credit + connection.openSender("test-queue-2").openFuture().get(); + + try { + sender.send(Message.create("Hello World 2")); + } catch (ClientException ex) { + LOG.trace("Error on second send", ex); + fail("Should not throw an exception"); + } + + sender.closeAsync().get(10, TimeUnit.SECONDS); + + connection.closeAsync().get(10, TimeUnit.SECONDS); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org