This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new f2159293 PROTON-2618 Better handle remote forced close vs connection
drop
f2159293 is described below
commit f215929395e44a8a0679befe15f96fd95b634c16
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Oct 5 17:42:15 2022 -0400
PROTON-2618 Better handle remote forced close vs connection drop
Ensure that the receiver doesn't react to a remote forced close of the
connection when the connection is going to reconnect, so that blocked
receive calls don't unblock with an error while the underlying
connection is reconnecting.
---
.../protonj2/client/impl/ClientConnection.java | 2 +
.../qpid/protonj2/client/impl/ClientLinkType.java | 7 +-
.../client/impl/ReconnectReceiverTest.java | 70 ++++++++++
.../protonj2/client/impl/ReconnectSenderTest.java | 150 ++++++++++++++++++++-
.../protonj2/client/impl/StreamReceiverTest.java | 68 ++++++++++
5 files changed, 293 insertions(+), 4 deletions(-)
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index a1f2dcca..a1b0ce20 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -639,6 +639,8 @@ public final class ClientConnection implements Connection {
}
private void handleLocalClose(org.apache.qpid.protonj2.engine.Connection
connection) {
+ // Don't react if engine was shutdown and parent closed as a result
instead wait to get the
+ // shutdown notification and respond to that change.
if (connection.isRemotelyClosed()) {
final ClientException failureCause;
diff --git
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
index 8907a8f1..62552f18 100644
---
a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
+++
b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
@@ -369,9 +369,10 @@ public abstract class ClientLinkType<LinkType extends
Link<LinkType>,
}
protected final void handleParentEndpointClosed(ProtonType link) {
- // Don't react if engine was shutdown and parent closed as a result
instead wait to get the
- // shutdown notification and respond to that change.
- if (link.getEngine().isRunning()) {
+ // This handle is only for the case that the parent session was
remotely or locally
+ // closed. In all other cases we want to allow natural engine shutdown
handling to
+ // trigger shutdown as we can check there if the parent is
reconnecting or not.
+ if (link.getEngine().isRunning() &&
!link.getConnection().isLocallyClosed()) {
final ClientException failureCause;
if (link.getConnection().getRemoteCondition() != null) {
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java
index e3f0db95..b10df8f9 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectReceiverTest.java
@@ -33,6 +33,7 @@ import
org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosed
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
+import org.apache.qpid.protonj2.types.transport.ConnectionError;
import org.apache.qpid.protonj2.types.transport.Role;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -236,4 +237,73 @@ class ReconnectReceiverTest extends
ImperativeClientTestCase {
assertNotNull(delivery);
}
}
+
+ @Test
+ public void testReceiverWaitsWhenConnectionForcedDisconnect() throws
Exception {
+ final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello
World"));
+
+ try (ProtonTestServer firstPeer = new ProtonTestServer();
+ ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+ firstPeer.expectSASLAnonymousConnect();
+ firstPeer.expectOpen().respond();
+ firstPeer.expectBegin().respond();
+
firstPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+ firstPeer.expectFlow().withLinkCredit(10);
+ firstPeer.remoteClose()
+
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced
disconnect").queue().afterDelay(20);
+ firstPeer.expectClose();
+ firstPeer.start();
+
+ finalPeer.expectSASLAnonymousConnect();
+ finalPeer.expectOpen().respond();
+ finalPeer.expectBegin().respond();
+
finalPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+ finalPeer.expectFlow().withLinkCredit(10);
+ finalPeer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withDeliveryTag(new byte[] { 1 })
+ .withMore(false)
+ .withSettled(true)
+ .withMessageFormat(0)
+
.withPayload(payload).queue().afterDelay(5);
+ finalPeer.start();
+
+ final URI primaryURI = firstPeer.getServerURI();
+ final URI backupURI = finalPeer.getServerURI();
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.reconnectOptions().reconnectEnabled(true);
+
options.reconnectOptions().addReconnectLocation(backupURI.getHost(),
backupURI.getPort());
+
+ Client container = Client.create();
+ Connection connection = container.connect(primaryURI.getHost(),
primaryURI.getPort(), options);
+ Session session = connection.openSession();
+ ReceiverOptions rcvOpts = new ReceiverOptions().autoAccept(false);
+ Receiver receiver = session.openReceiver("test-queue", rcvOpts);
+
+ Delivery delivery = null;
+ try {
+ delivery = receiver.receive(10, TimeUnit.SECONDS);
+ } catch (Exception ex) {
+ fail("Should not have failed on blocking receive call." +
ex.getMessage());
+ }
+
+ assertNotNull(delivery);
+
+ firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ finalPeer.waitForScriptToComplete();
+ finalPeer.expectDetach().respond();
+ finalPeer.expectEnd().respond();
+ finalPeer.expectClose().respond();
+
+ delivery.accept();
+
+ receiver.close();
+ session.close();
+ connection.close();
+
+ assertNotNull(delivery);
+ }
+ }
}
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
index 3aa88a8c..24711f16 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSenderTest.java
@@ -43,6 +43,7 @@ import
org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosed
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.apache.qpid.protonj2.types.transport.ConnectionError;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -161,6 +162,153 @@ class ReconnectSenderTest extends
ImperativeClientTestCase {
}
}
+ @Test
+ public void testInFlightSendFailedAfterConnectionForcedCloseAndNotResent()
throws Exception {
+ try (ProtonTestServer firstPeer = new ProtonTestServer();
+ ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+ firstPeer.expectSASLAnonymousConnect();
+ firstPeer.expectOpen().respond();
+ firstPeer.expectBegin().respond();
+
firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+ firstPeer.remoteFlow().withLinkCredit(1).queue();
+ firstPeer.expectTransfer().withNonNullPayload();
+ firstPeer.remoteClose()
+
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced
disconnect").queue().afterDelay(20);
+ firstPeer.expectClose();
+ firstPeer.start();
+
+ finalPeer.expectSASLAnonymousConnect();
+ finalPeer.expectOpen().respond();
+ finalPeer.expectBegin().respond();
+
finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+ finalPeer.start();
+
+ final URI primaryURI = firstPeer.getServerURI();
+ final URI backupURI = finalPeer.getServerURI();
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.reconnectOptions().reconnectEnabled(true);
+
options.reconnectOptions().addReconnectLocation(backupURI.getHost(),
backupURI.getPort());
+
+ Client container = Client.create();
+ Connection connection = container.connect(primaryURI.getHost(),
primaryURI.getPort(), options);
+ Session session = connection.openSession();
+ Sender sender = session.openSender("test");
+
+ final AtomicReference<Tracker> tracker = new AtomicReference<>();
+ final AtomicReference<ClientException> error = new
AtomicReference<>();
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ForkJoinPool.commonPool().execute(() -> {
+ try {
+ tracker.set(sender.send(Message.create("Hello")));
+ } catch (ClientException e) {
+ error.set(e);
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ firstPeer.waitForScriptToComplete();
+ finalPeer.waitForScriptToComplete();
+ finalPeer.expectDetach().withClosed(true).respond();
+ finalPeer.expectEnd().respond();
+ finalPeer.expectClose().respond();
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have failed
previously sent message");
+ assertNotNull(tracker.get());
+ assertNull(error.get());
+ assertThrows(ClientConnectionRemotelyClosedException.class, () ->
tracker.get().awaitSettlement());
+
+ sender.close();
+ session.close();
+ connection.close();
+
+ finalPeer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void
testSendBlockedOnCreditGetsSentAfterReconnectFromForcedCloseAndCreditGranted()
throws Exception {
+ try (ProtonTestServer firstPeer = new ProtonTestServer();
+ ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+ firstPeer.expectSASLAnonymousConnect();
+ firstPeer.expectOpen().respond();
+ firstPeer.expectBegin().respond();
+
firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+ firstPeer.remoteClose()
+
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced
disconnect").queue().afterDelay(20);
+ firstPeer.expectClose();
+ firstPeer.start();
+
+ finalPeer.expectSASLAnonymousConnect();
+ finalPeer.expectOpen().respond();
+ finalPeer.expectBegin().respond();
+
finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+ finalPeer.start();
+
+ final URI primaryURI = firstPeer.getServerURI();
+ final URI backupURI = finalPeer.getServerURI();
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.reconnectOptions().reconnectEnabled(true);
+
options.reconnectOptions().addReconnectLocation(backupURI.getHost(),
backupURI.getPort());
+
+ Client container = Client.create();
+ Connection connection = container.connect(primaryURI.getHost(),
primaryURI.getPort(), options);
+ Session session = connection.openSession();
+ Sender sender = session.openSender("test");
+
+ final AtomicReference<Tracker> tracker = new AtomicReference<>();
+ final AtomicReference<Exception> sendError = new
AtomicReference<>();
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ForkJoinPool.commonPool().execute(() -> {
+ try {
+ tracker.set(sender.send(Message.create("Hello")));
+ } catch (ClientException e) {
+ sendError.set(e);
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ firstPeer.waitForScriptToComplete();
+ finalPeer.waitForScriptToComplete();
+ finalPeer.expectTransfer().withNonNullPayload()
+ .respond()
+ .withSettled(true).withState().accepted();
+ finalPeer.expectDetach().withClosed(true).respond();
+ finalPeer.expectEnd().respond();
+ finalPeer.expectClose().respond();
+
+ // Grant credit now and await expected message send.
+ finalPeer.remoteFlow().withDeliveryCount(0)
+ .withLinkCredit(10)
+ .withIncomingWindow(10)
+ .withOutgoingWindow(10)
+ .withNextIncomingId(0)
+ .withNextOutgoingId(1).now();
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have sent
blocked message");
+ assertNull(sendError.get());
+ assertNotNull(tracker.get());
+
+ Tracker send = tracker.get();
+ assertSame(tracker.get(), send.awaitSettlement(10,
TimeUnit.SECONDS));
+ assertTrue(send.remoteSettled());
+ assertEquals(DeliveryState.accepted(), send.remoteState());
+
+ sender.close();
+ session.close();
+ connection.close();
+
+ finalPeer.waitForScriptToComplete();
+ }
+ }
+
@Test
public void
testSendBlockedOnCreditGetsSentAfterReconnectAndCreditGranted() throws
Exception {
try (ProtonTestServer firstPeer = new ProtonTestServer();
@@ -297,7 +445,7 @@ class ReconnectSenderTest extends ImperativeClientTestCase {
@Test
public void
testMultipleSenderCreationRecoversAfterDropWithNoAttachResponse() throws
Exception {
try (ProtonTestServer firstPeer = new ProtonTestServer();
- ProtonTestServer intermediatePeer = new ProtonTestServer();
+ ProtonTestServer intermediatePeer = new ProtonTestServer();
ProtonTestServer finalPeer = new ProtonTestServer()) {
firstPeer.expectSASLAnonymousConnect();
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
index 2ca78f0a..00d1e60e 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
@@ -78,6 +78,7 @@ import org.apache.qpid.protonj2.types.messaging.Footer;
import org.apache.qpid.protonj2.types.messaging.Header;
import org.apache.qpid.protonj2.types.messaging.MessageAnnotations;
import org.apache.qpid.protonj2.types.messaging.Properties;
+import org.apache.qpid.protonj2.types.transport.ConnectionError;
import org.apache.qpid.protonj2.types.transport.Role;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -3860,6 +3861,73 @@ class StreamReceiverTest extends
ImperativeClientTestCase {
}
}
+ @Test
+ public void testReceiverWaitsWhenConnectionForcedDisconnect() throws
Exception {
+ final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello
World"));
+
+ try (ProtonTestServer firstPeer = new ProtonTestServer();
+ ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+ firstPeer.expectSASLAnonymousConnect();
+ firstPeer.expectOpen().respond();
+ firstPeer.expectBegin().respond();
+
firstPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+ firstPeer.expectFlow().withLinkCredit(10);
+ firstPeer.remoteClose()
+
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced
disconnect").queue().afterDelay(20);
+ firstPeer.expectClose();
+ firstPeer.start();
+
+ finalPeer.expectSASLAnonymousConnect();
+ finalPeer.expectOpen().respond();
+ finalPeer.expectBegin().respond();
+
finalPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+ finalPeer.expectFlow().withLinkCredit(10);
+ finalPeer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withDeliveryTag(new byte[] { 1 })
+ .withMore(false)
+ .withSettled(true)
+ .withMessageFormat(0)
+
.withPayload(payload).queue().afterDelay(5);
+ finalPeer.start();
+
+ final URI primaryURI = firstPeer.getServerURI();
+ final URI backupURI = finalPeer.getServerURI();
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.reconnectOptions().reconnectEnabled(true);
+
options.reconnectOptions().addReconnectLocation(backupURI.getHost(),
backupURI.getPort());
+
+ Client container = Client.create();
+ Connection connection = container.connect(primaryURI.getHost(),
primaryURI.getPort(), options);
+ StreamReceiverOptions rcvOpts = new
StreamReceiverOptions().autoAccept(false);
+ StreamReceiver receiver =
connection.openStreamReceiver("test-receiver", rcvOpts);
+
+ StreamDelivery delivery = null;
+ try {
+ delivery = receiver.receive(10, TimeUnit.SECONDS);
+ } catch (Exception ex) {
+ fail("Should not have failed on blocking receive call." +
ex.getMessage());
+ }
+
+ assertNotNull(delivery);
+
+ firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ finalPeer.waitForScriptToComplete();
+ finalPeer.expectDetach().respond();
+ finalPeer.expectEnd().respond();
+ finalPeer.expectClose().respond();
+
+ delivery.accept();
+
+ receiver.close();
+ connection.close();
+
+ assertNotNull(delivery);
+ }
+ }
+
private byte[] createInvalidHeaderEncoding() {
final byte[] buffer = new byte[12];
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]