This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new dce813b8 PROTON-2618 Add additional sender test to validate recovery state
dce813b8 is described below
commit dce813b8838b17d0a4a0e47116d7a8f08f1fab34
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Oct 11 18:08:59 2022 -0400
PROTON-2618 Add additional sender test to validate recovery state
Ensure sends still function as expected after link recovery when a previous
blocked send has failed.
---
.../protonj2/client/impl/ReconnectSenderTest.java | 72 ++++++++++++++++++++++
1 file changed, 72 insertions(+)
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
index 24711f16..8ece08be 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
@@ -229,6 +229,78 @@ class ReconnectSenderTest extends ImperativeClientTestCase
{
}
}
+ @Test
+ public void
testInFlightSendFailedAfterConnectionForcedCloseAndNotResentNextSendSucceeds()
throws Exception {
+ try (ProtonTestServer firstPeer = new ProtonTestServer();
+ ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+ firstPeer.expectSASLAnonymousConnect();
+ firstPeer.expectOpen().respond();
+ firstPeer.expectBegin().respond();
+
firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+ firstPeer.remoteFlow().withLinkCredit(1).queue();
+ firstPeer.expectTransfer().withNonNullPayload();
+ firstPeer.remoteClose()
+
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced
disconnect").queue().afterDelay(20);
+ firstPeer.expectClose();
+ firstPeer.start();
+
+ finalPeer.expectSASLAnonymousConnect();
+ finalPeer.expectOpen().respond();
+ finalPeer.expectBegin().respond();
+
finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+ finalPeer.remoteFlow().withLinkCredit(1).queue();
+ finalPeer.start();
+
+ final URI primaryURI = firstPeer.getServerURI();
+ final URI backupURI = finalPeer.getServerURI();
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.reconnectOptions().reconnectEnabled(true);
+
options.reconnectOptions().addReconnectLocation(backupURI.getHost(),
backupURI.getPort());
+
+ Client container = Client.create();
+ Connection connection = container.connect(primaryURI.getHost(),
primaryURI.getPort(), options);
+ Session session = connection.openSession();
+ Sender sender = session.openSender("test");
+
+ final AtomicReference<Tracker> tracker = new AtomicReference<>();
+ final AtomicReference<ClientException> error = new
AtomicReference<>();
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ForkJoinPool.commonPool().execute(() -> {
+ try {
+ tracker.set(sender.send(Message.create("Hello")));
+ } catch (ClientException e) {
+ error.set(e);
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ firstPeer.waitForScriptToComplete();
+ finalPeer.waitForScriptToComplete();
+ finalPeer.expectTransfer().withNonNullPayload().accept();
+ finalPeer.expectDetach().withClosed(true).respond();
+ finalPeer.expectEnd().respond();
+ finalPeer.expectClose().respond();
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have failed
previously sent message");
+ assertNotNull(tracker.get());
+ assertNull(error.get());
+ assertThrows(ClientConnectionRemotelyClosedException.class, () ->
tracker.get().awaitSettlement());
+
+ Tracker tracked = sender.send(Message.create("Hello"));
+ assertTrue(tracked.awaitAccepted().remoteSettled());
+
+ sender.close();
+ session.close();
+ connection.close();
+
+ finalPeer.waitForScriptToComplete();
+ }
+ }
+
@Test
public void
testSendBlockedOnCreditGetsSentAfterReconnectFromForcedCloseAndCreditGranted()
throws Exception {
try (ProtonTestServer firstPeer = new ProtonTestServer();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]