This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new dce813b8 PROTON-2618 Add additional sender test to validate recovery state dce813b8 is described below commit dce813b8838b17d0a4a0e47116d7a8f08f1fab34 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Tue Oct 11 18:08:59 2022 -0400 PROTON-2618 Add additional sender test to validate recovery state Ensure sends still function as expected after link recovery and previous blocked send failed. --- .../protonj2/client/impl/ReconnectSenderTest.java | 72 ++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java index 24711f16..8ece08be 100644 --- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java +++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java @@ -229,6 +229,78 @@ class ReconnectSenderTest extends ImperativeClientTestCase { } } + @Test + public void testInFlightSendFailedAfterConnectionForcedCloseAndNotResentNextSendSucceeds() throws Exception { + try (ProtonTestServer firstPeer = new ProtonTestServer(); + ProtonTestServer finalPeer = new ProtonTestServer()) { + + firstPeer.expectSASLAnonymousConnect(); + firstPeer.expectOpen().respond(); + firstPeer.expectBegin().respond(); + firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond(); + firstPeer.remoteFlow().withLinkCredit(1).queue(); + firstPeer.expectTransfer().withNonNullPayload(); + firstPeer.remoteClose() + .withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced disconnect").queue().afterDelay(20); + firstPeer.expectClose(); + firstPeer.start(); + + finalPeer.expectSASLAnonymousConnect(); + finalPeer.expectOpen().respond(); + finalPeer.expectBegin().respond(); + finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond(); + finalPeer.remoteFlow().withLinkCredit(1).queue(); + finalPeer.start(); + + final URI primaryURI = firstPeer.getServerURI(); + final URI backupURI = finalPeer.getServerURI(); + + ConnectionOptions options = new ConnectionOptions(); + options.reconnectOptions().reconnectEnabled(true); + options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort()); + + Client container = Client.create(); + Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options); + Session session = connection.openSession(); + Sender sender = session.openSender("test"); + + final AtomicReference<Tracker> tracker = new AtomicReference<>(); + final AtomicReference<ClientException> error = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + + ForkJoinPool.commonPool().execute(() -> { + try { + tracker.set(sender.send(Message.create("Hello"))); + } catch (ClientException e) { + error.set(e); + } finally { + latch.countDown(); + } + }); + + firstPeer.waitForScriptToComplete(); + finalPeer.waitForScriptToComplete(); + finalPeer.expectTransfer().withNonNullPayload().accept(); + finalPeer.expectDetach().withClosed(true).respond(); + finalPeer.expectEnd().respond(); + finalPeer.expectClose().respond(); + + assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have failed previously sent message"); + assertNotNull(tracker.get()); + assertNull(error.get()); + assertThrows(ClientConnectionRemotelyClosedException.class, () -> tracker.get().awaitSettlement()); + + Tracker tracked = sender.send(Message.create("Hello")); + assertTrue(tracked.awaitAccepted().remoteSettled()); + + sender.close(); + session.close(); + connection.close(); + + finalPeer.waitForScriptToComplete(); + } + } + @Test public void testSendBlockedOnCreditGetsSentAfterReconnectFromForcedCloseAndCreditGranted() throws Exception { try (ProtonTestServer firstPeer = new ProtonTestServer(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org