This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new dce813b8 PROTON-2618 Add additional sender test to validate recovery state
dce813b8 is described below

commit dce813b8838b17d0a4a0e47116d7a8f08f1fab34
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Tue Oct 11 18:08:59 2022 -0400

    PROTON-2618 Add additional sender test to validate recovery state
    
    Ensure sends still function as expected after link recovery and previous
    blocked send failed.
---
 .../protonj2/client/impl/ReconnectSenderTest.java  | 72 ++++++++++++++++++++++
 1 file changed, 72 insertions(+)

diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
index 24711f16..8ece08be 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
@@ -229,6 +229,78 @@ class ReconnectSenderTest extends ImperativeClientTestCase {
        }
     }
 
+    @Test
+    public void 
testInFlightSendFailedAfterConnectionForcedCloseAndNotResentNextSendSucceeds() 
throws Exception {
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+           // Scenario: the first send is in flight when firstPeer force-closes the connection; its tracker must fail and a later send must succeed.
+           firstPeer.expectSASLAnonymousConnect();
+           firstPeer.expectOpen().respond();
+           firstPeer.expectBegin().respond();
+           
firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+           firstPeer.remoteFlow().withLinkCredit(1).queue(); // grant one link credit
+           firstPeer.expectTransfer().withNonNullPayload(); // first send's transfer; no disposition is scripted in reply
+           firstPeer.remoteClose()
+                    
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced 
disconnect").queue().afterDelay(20); // queued remote close; the delay of 20 is presumably milliseconds - confirm
+           firstPeer.expectClose();
+           firstPeer.start();
+           // Backup peer: same connect/open/begin/attach script plus one credit; no transfer is scripted yet.
+           finalPeer.expectSASLAnonymousConnect();
+           finalPeer.expectOpen().respond();
+           finalPeer.expectBegin().respond();
+           
finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+           finalPeer.remoteFlow().withLinkCredit(1).queue();
+           finalPeer.start();
+
+           final URI primaryURI = firstPeer.getServerURI();
+           final URI backupURI = finalPeer.getServerURI();
+           // Enable reconnect and register the backup peer as the additional reconnect location.
+           ConnectionOptions options = new ConnectionOptions();
+           options.reconnectOptions().reconnectEnabled(true);
+           
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
+
+           Client container = Client.create();
+           Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
+           Session session = connection.openSession();
+           Sender sender = session.openSender("test");
+           // Holders for the outcome of the send that is issued from the pool thread below.
+           final AtomicReference<Tracker> tracker = new AtomicReference<>();
+           final AtomicReference<ClientException> error = new 
AtomicReference<>();
+           final CountDownLatch latch = new CountDownLatch(1);
+           // Issue the first send on a common-pool thread; the latch is released once send() returns or throws.
+           ForkJoinPool.commonPool().execute(() -> {
+               try {
+                   tracker.set(sender.send(Message.create("Hello")));
+               } catch (ClientException e) {
+                   error.set(e);
+               } finally {
+                   latch.countDown();
+               }
+           });
+           // After both initial scripts have completed, script the backup peer for the second send and the shutdown.
+           firstPeer.waitForScriptToComplete();
+           finalPeer.waitForScriptToComplete();
+           finalPeer.expectTransfer().withNonNullPayload().accept();
+           finalPeer.expectDetach().withClosed(true).respond();
+           finalPeer.expectEnd().respond();
+           finalPeer.expectClose().respond();
+           // The first send() must have returned a tracker without throwing, but awaiting its settlement must fail.
+           assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have failed 
previously sent message");
+           assertNotNull(tracker.get());
+           assertNull(error.get());
+           assertThrows(ClientConnectionRemotelyClosedException.class, () -> 
tracker.get().awaitSettlement());
+           // A fresh send issued after the failure must be accepted and remotely settled.
+           Tracker tracked = sender.send(Message.create("Hello"));
+           assertTrue(tracked.awaitAccepted().remoteSettled());
+
+           sender.close();
+           session.close();
+           connection.close();
+
+           finalPeer.waitForScriptToComplete();
+       }
+    }
+
     @Test
     public void 
testSendBlockedOnCreditGetsSentAfterReconnectFromForcedCloseAndCreditGranted() 
throws Exception {
         try (ProtonTestServer firstPeer = new ProtonTestServer();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to