This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new fb8f7ce3 PROTON-2618 Add additional tests and fix minor stream sender issue
fb8f7ce3 is described below
commit fb8f7ce3d9c5d27815ba73e7f4aeb1418344415e
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Oct 7 14:42:34 2022 -0400
PROTON-2618 Add additional tests and fix minor stream sender issue
Ensure that stream sender can fail pending unsettled sends when the
connection is dropped or closed.
---
.../qpid/protonj2/client/impl/ClientSender.java | 3 +-
.../protonj2/client/impl/ClientStreamSender.java | 3 +-
.../client/impl/ReconnectStreamReceiverTest.java | 71 +++++++++++++++++++++
.../client/impl/ReconnectStreamSenderTest.java | 74 ++++++++++++++++++++++
.../protonj2/client/impl/StreamReceiverTest.java | 68 --------------------
5 files changed, 147 insertions(+), 72 deletions(-)
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index 3d73c99a..15a45e39 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -213,8 +213,7 @@ public final class ClientSender extends ClientSenderLinkType<Sender> implements
// Cancel all settlement futures for in-flight sends passing an appropriate error to the future
protonSender.unsettled().forEach((delivery) -> {
try {
- final ClientTracker tracker = delivery.getLinkedResource();
- tracker.settlementFuture().failed(cause);
+ delivery.getLinkedResource(ClientTrackable.class).settlementFuture().failed(cause);
} catch (Exception e) {
}
});
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 2e0ef82a..12700a1f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -358,8 +358,7 @@ public final class ClientStreamSender extends ClientSenderLinkType<StreamSender>
// Cancel all settlement futures for in-flight sends passing an appropriate error to the future
protonSender.unsettled().forEach((delivery) -> {
try {
- final ClientTracker tracker = delivery.getLinkedResource();
- tracker.settlementFuture().failed(cause);
+ delivery.getLinkedResource(ClientTrackable.class).settlementFuture().failed(cause);
} catch (Exception e) {
}
});
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java
index 944a97cf..bc5c3214 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java
@@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
import java.net.URI;
@@ -30,9 +31,12 @@ import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.StreamDelivery;
import org.apache.qpid.protonj2.client.StreamReceiver;
+import org.apache.qpid.protonj2.client.StreamReceiverOptions;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.types.messaging.AmqpValue;
+import org.apache.qpid.protonj2.types.transport.ConnectionError;
+import org.apache.qpid.protonj2.types.transport.Role;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -160,4 +164,71 @@ class ReconnectStreamReceiverTest extends ImperativeClientTestCase {
finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testReceiverWaitsWhenConnectionForcedDisconnect() throws Exception {
+ final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
+
+ try (ProtonTestServer firstPeer = new ProtonTestServer();
+ ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+ firstPeer.expectSASLAnonymousConnect();
+ firstPeer.expectOpen().respond();
+ firstPeer.expectBegin().respond();
+ firstPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+ firstPeer.expectFlow().withLinkCredit(10);
+ firstPeer.remoteClose()
+ .withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced disconnect").queue().afterDelay(20);
+ firstPeer.expectClose();
+ firstPeer.start();
+
+ finalPeer.expectSASLAnonymousConnect();
+ finalPeer.expectOpen().respond();
+ finalPeer.expectBegin().respond();
+ finalPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+ finalPeer.expectFlow().withLinkCredit(10);
+ finalPeer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withDeliveryTag(new byte[] { 1 })
+ .withMore(false)
+ .withSettled(true)
+ .withMessageFormat(0)
+ .withPayload(payload).queue().afterDelay(5);
+ finalPeer.start();
+
+ final URI primaryURI = firstPeer.getServerURI();
+ final URI backupURI = finalPeer.getServerURI();
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.reconnectOptions().reconnectEnabled(true);
+ options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+
+ Client container = Client.create();
+ Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+ StreamReceiverOptions rcvOpts = new StreamReceiverOptions().autoAccept(false);
+ StreamReceiver receiver = connection.openStreamReceiver("test-receiver", rcvOpts);
+
+ StreamDelivery delivery = null;
+ try {
+ delivery = receiver.receive(10, TimeUnit.SECONDS);
+ } catch (Exception ex) {
+ fail("Should not have failed on blocking receive call." + ex.getMessage());
+ }
+
+ assertNotNull(delivery);
+
+ firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ finalPeer.waitForScriptToComplete();
+ finalPeer.expectDetach().respond();
+ finalPeer.expectEnd().respond();
+ finalPeer.expectClose().respond();
+
+ delivery.accept();
+
+ receiver.close();
+ connection.close();
+
+ assertNotNull(delivery);
+ }
+ }
}
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
index 0796d072..a872bd31 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
@@ -17,6 +17,9 @@
package org.apache.qpid.protonj2.client.impl;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -24,15 +27,20 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.OutputStreamOptions;
import org.apache.qpid.protonj2.client.StreamSender;
import org.apache.qpid.protonj2.client.StreamSenderMessage;
import org.apache.qpid.protonj2.client.StreamSenderOptions;
+import org.apache.qpid.protonj2.client.StreamTracker;
import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
@@ -40,6 +48,7 @@ import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedDataMatcher;
+import org.apache.qpid.protonj2.types.transport.ConnectionError;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -389,4 +398,69 @@ class ReconnectStreamSenderTest extends ImperativeClientTestCase {
finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testInFlightSendFailedAfterConnectionForcedCloseAndNotResent() throws Exception {
+ try (ProtonTestServer firstPeer = new ProtonTestServer();
+ ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+ firstPeer.expectSASLAnonymousConnect();
+ firstPeer.expectOpen().respond();
+ firstPeer.expectBegin().respond();
+ firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+ firstPeer.remoteFlow().withLinkCredit(1).queue();
+ firstPeer.expectTransfer().withNonNullPayload();
+ firstPeer.remoteClose()
+ .withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced disconnect").queue().afterDelay(20);
+ firstPeer.expectClose();
+ firstPeer.start();
+
+ finalPeer.expectSASLAnonymousConnect();
+ finalPeer.expectOpen().respond();
+ finalPeer.expectBegin().respond();
+ finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+ finalPeer.start();
+
+ final URI primaryURI = firstPeer.getServerURI();
+ final URI backupURI = finalPeer.getServerURI();
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.reconnectOptions().reconnectEnabled(true);
+ options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
+
+ Client container = Client.create();
+ Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
+ StreamSender sender = connection.openStreamSender("test");
+
+ final AtomicReference<StreamTracker> tracker = new AtomicReference<>();
+ final AtomicReference<ClientException> error = new AtomicReference<>();
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ForkJoinPool.commonPool().execute(() -> {
+ try {
+ tracker.set(sender.send(Message.create("Hello")));
+ } catch (ClientException e) {
+ error.set(e);
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ firstPeer.waitForScriptToComplete();
+ finalPeer.waitForScriptToComplete();
+ finalPeer.expectDetach().withClosed(true).respond();
+ finalPeer.expectEnd().respond();
+ finalPeer.expectClose().respond();
+
+ assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have failed previously sent message");
+ assertNotNull(tracker.get());
+ assertNull(error.get());
+ assertThrows(ClientConnectionRemotelyClosedException.class, () -> tracker.get().awaitSettlement());
+
+ sender.close();
+ connection.close();
+
+ finalPeer.waitForScriptToComplete();
+ }
+ }
}
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
index 00d1e60e..2ca78f0a 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
@@ -78,7 +78,6 @@ import org.apache.qpid.protonj2.types.messaging.Footer;
import org.apache.qpid.protonj2.types.messaging.Header;
import org.apache.qpid.protonj2.types.messaging.MessageAnnotations;
import org.apache.qpid.protonj2.types.messaging.Properties;
-import org.apache.qpid.protonj2.types.transport.ConnectionError;
import org.apache.qpid.protonj2.types.transport.Role;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -3861,73 +3860,6 @@ class StreamReceiverTest extends ImperativeClientTestCase {
}
}
- @Test
- public void testReceiverWaitsWhenConnectionForcedDisconnect() throws Exception {
- final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World"));
-
- try (ProtonTestServer firstPeer = new ProtonTestServer();
- ProtonTestServer finalPeer = new ProtonTestServer()) {
-
- firstPeer.expectSASLAnonymousConnect();
- firstPeer.expectOpen().respond();
- firstPeer.expectBegin().respond();
- firstPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
- firstPeer.expectFlow().withLinkCredit(10);
- firstPeer.remoteClose()
- .withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced disconnect").queue().afterDelay(20);
- firstPeer.expectClose();
- firstPeer.start();
-
- finalPeer.expectSASLAnonymousConnect();
- finalPeer.expectOpen().respond();
- finalPeer.expectBegin().respond();
- finalPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
- finalPeer.expectFlow().withLinkCredit(10);
- finalPeer.remoteTransfer().withHandle(0)
- .withDeliveryId(0)
- .withDeliveryTag(new byte[] { 1 })
- .withMore(false)
- .withSettled(true)
- .withMessageFormat(0)
- .withPayload(payload).queue().afterDelay(5);
- finalPeer.start();
-
- final URI primaryURI = firstPeer.getServerURI();
- final URI backupURI = finalPeer.getServerURI();
-
- ConnectionOptions options = new ConnectionOptions();
- options.reconnectOptions().reconnectEnabled(true);
- options.reconnectOptions().addReconnectLocation(backupURI.getHost(), backupURI.getPort());
-
- Client container = Client.create();
- Connection connection = container.connect(primaryURI.getHost(), primaryURI.getPort(), options);
- StreamReceiverOptions rcvOpts = new StreamReceiverOptions().autoAccept(false);
- StreamReceiver receiver = connection.openStreamReceiver("test-receiver", rcvOpts);
-
- StreamDelivery delivery = null;
- try {
- delivery = receiver.receive(10, TimeUnit.SECONDS);
- } catch (Exception ex) {
- fail("Should not have failed on blocking receive call." + ex.getMessage());
- }
-
- assertNotNull(delivery);
-
- firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
- finalPeer.waitForScriptToComplete();
- finalPeer.expectDetach().respond();
- finalPeer.expectEnd().respond();
- finalPeer.expectClose().respond();
-
- delivery.accept();
-
- receiver.close();
- connection.close();
-
- assertNotNull(delivery);
- }
- }
-
private byte[] createInvalidHeaderEncoding() {
final byte[] buffer = new byte[12];
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]