This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new fb8f7ce3 PROTON-2618 Add additional tests and fix minor stream sender issue
fb8f7ce3 is described below

commit fb8f7ce3d9c5d27815ba73e7f4aeb1418344415e
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Oct 7 14:42:34 2022 -0400

    PROTON-2618 Add additional tests and fix minor stream sender issue
    
    Ensure that stream sender can fail pending unsettled sends when the
    connection is dropped or closed.
---
 .../qpid/protonj2/client/impl/ClientSender.java    |  3 +-
 .../protonj2/client/impl/ClientStreamSender.java   |  3 +-
 .../client/impl/ReconnectStreamReceiverTest.java   | 71 +++++++++++++++++++++
 .../client/impl/ReconnectStreamSenderTest.java     | 74 ++++++++++++++++++++++
 .../protonj2/client/impl/StreamReceiverTest.java   | 68 --------------------
 5 files changed, 147 insertions(+), 72 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index 3d73c99a..15a45e39 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -213,8 +213,7 @@ public final class ClientSender extends ClientSenderLinkType<Sender> implements
         // Cancel all settlement futures for in-flight sends passing an appropriate error to the future
         protonSender.unsettled().forEach((delivery) -> {
             try {
-                final ClientTracker tracker = delivery.getLinkedResource();
-                tracker.settlementFuture().failed(cause);
+                delivery.getLinkedResource(ClientTrackable.class).settlementFuture().failed(cause);
             } catch (Exception e) {
             }
         });
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 2e0ef82a..12700a1f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -358,8 +358,7 @@ public final class ClientStreamSender extends ClientSenderLinkType<StreamSender>
         // Cancel all settlement futures for in-flight sends passing an appropriate error to the future
         protonSender.unsettled().forEach((delivery) -> {
             try {
-                final ClientTracker tracker = delivery.getLinkedResource();
-                tracker.settlementFuture().failed(cause);
+                delivery.getLinkedResource(ClientTrackable.class).settlementFuture().failed(cause);
             } catch (Exception e) {
             }
         });
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java
index 944a97cf..bc5c3214 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamReceiverTest.java
@@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.IOException;
 import java.net.URI;
@@ -30,9 +31,12 @@ import org.apache.qpid.protonj2.client.Connection;
 import org.apache.qpid.protonj2.client.ConnectionOptions;
 import org.apache.qpid.protonj2.client.StreamDelivery;
 import org.apache.qpid.protonj2.client.StreamReceiver;
+import org.apache.qpid.protonj2.client.StreamReceiverOptions;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.apache.qpid.protonj2.types.messaging.AmqpValue;
+import org.apache.qpid.protonj2.types.transport.ConnectionError;
+import org.apache.qpid.protonj2.types.transport.Role;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -160,4 +164,71 @@ class ReconnectStreamReceiverTest extends ImperativeClientTestCase {
             finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testReceiverWaitsWhenConnectionForcedDisconnect() throws 
Exception {
+        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello 
World"));
+
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            
firstPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+            firstPeer.expectFlow().withLinkCredit(10);
+            firstPeer.remoteClose()
+                     
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced 
disconnect").queue().afterDelay(20);
+            firstPeer.expectClose();
+            firstPeer.start();
+
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            
finalPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
+            finalPeer.expectFlow().withLinkCredit(10);
+            finalPeer.remoteTransfer().withHandle(0)
+                                      .withDeliveryId(0)
+                                      .withDeliveryTag(new byte[] { 1 })
+                                      .withMore(false)
+                                      .withSettled(true)
+                                      .withMessageFormat(0)
+                                      
.withPayload(payload).queue().afterDelay(5);
+            finalPeer.start();
+
+            final URI primaryURI = firstPeer.getServerURI();
+            final URI backupURI = finalPeer.getServerURI();
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.reconnectOptions().reconnectEnabled(true);
+            
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
+            StreamReceiverOptions rcvOpts = new 
StreamReceiverOptions().autoAccept(false);
+            StreamReceiver receiver = 
connection.openStreamReceiver("test-receiver", rcvOpts);
+
+            StreamDelivery delivery = null;
+            try {
+                delivery = receiver.receive(10, TimeUnit.SECONDS);
+            } catch (Exception ex) {
+                fail("Should not have failed on blocking receive call." + 
ex.getMessage());
+            }
+
+            assertNotNull(delivery);
+
+            firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            finalPeer.waitForScriptToComplete();
+            finalPeer.expectDetach().respond();
+            finalPeer.expectEnd().respond();
+            finalPeer.expectClose().respond();
+
+            delivery.accept();
+
+            receiver.close();
+            connection.close();
+
+            assertNotNull(delivery);
+        }
+    }
 }
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
index 0796d072..a872bd31 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectStreamSenderTest.java
@@ -17,6 +17,9 @@
 package org.apache.qpid.protonj2.client.impl;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -24,15 +27,20 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
 import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.protonj2.client.Client;
 import org.apache.qpid.protonj2.client.Connection;
 import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.Message;
 import org.apache.qpid.protonj2.client.OutputStreamOptions;
 import org.apache.qpid.protonj2.client.StreamSender;
 import org.apache.qpid.protonj2.client.StreamSenderMessage;
 import org.apache.qpid.protonj2.client.StreamSenderOptions;
+import org.apache.qpid.protonj2.client.StreamTracker;
 import 
org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
@@ -40,6 +48,7 @@ import 
org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import 
org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher;
 import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedDataMatcher;
+import org.apache.qpid.protonj2.types.transport.ConnectionError;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -389,4 +398,69 @@ class ReconnectStreamSenderTest extends ImperativeClientTestCase {
             finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testInFlightSendFailedAfterConnectionForcedCloseAndNotResent() 
throws Exception {
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+           firstPeer.expectSASLAnonymousConnect();
+           firstPeer.expectOpen().respond();
+           firstPeer.expectBegin().respond();
+           
firstPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+           firstPeer.remoteFlow().withLinkCredit(1).queue();
+           firstPeer.expectTransfer().withNonNullPayload();
+           firstPeer.remoteClose()
+                    
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced 
disconnect").queue().afterDelay(20);
+           firstPeer.expectClose();
+           firstPeer.start();
+
+           finalPeer.expectSASLAnonymousConnect();
+           finalPeer.expectOpen().respond();
+           finalPeer.expectBegin().respond();
+           
finalPeer.expectAttach().ofSender().withTarget().withAddress("test").and().respond();
+           finalPeer.start();
+
+           final URI primaryURI = firstPeer.getServerURI();
+           final URI backupURI = finalPeer.getServerURI();
+
+           ConnectionOptions options = new ConnectionOptions();
+           options.reconnectOptions().reconnectEnabled(true);
+           
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
+
+           Client container = Client.create();
+           Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
+           StreamSender sender = connection.openStreamSender("test");
+
+           final AtomicReference<StreamTracker> tracker = new 
AtomicReference<>();
+           final AtomicReference<ClientException> error = new 
AtomicReference<>();
+           final CountDownLatch latch = new CountDownLatch(1);
+
+           ForkJoinPool.commonPool().execute(() -> {
+               try {
+                   tracker.set(sender.send(Message.create("Hello")));
+               } catch (ClientException e) {
+                   error.set(e);
+               } finally {
+                   latch.countDown();
+               }
+           });
+
+           firstPeer.waitForScriptToComplete();
+           finalPeer.waitForScriptToComplete();
+           finalPeer.expectDetach().withClosed(true).respond();
+           finalPeer.expectEnd().respond();
+           finalPeer.expectClose().respond();
+
+           assertTrue(latch.await(10, TimeUnit.SECONDS), "Should have failed 
previously sent message");
+           assertNotNull(tracker.get());
+           assertNull(error.get());
+           assertThrows(ClientConnectionRemotelyClosedException.class, () -> 
tracker.get().awaitSettlement());
+
+           sender.close();
+           connection.close();
+
+           finalPeer.waitForScriptToComplete();
+       }
+    }
 }
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
index 00d1e60e..2ca78f0a 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
@@ -78,7 +78,6 @@ import org.apache.qpid.protonj2.types.messaging.Footer;
 import org.apache.qpid.protonj2.types.messaging.Header;
 import org.apache.qpid.protonj2.types.messaging.MessageAnnotations;
 import org.apache.qpid.protonj2.types.messaging.Properties;
-import org.apache.qpid.protonj2.types.transport.ConnectionError;
 import org.apache.qpid.protonj2.types.transport.Role;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -3861,73 +3860,6 @@ class StreamReceiverTest extends ImperativeClientTestCase {
        }
     }
 
-    @Test
-    public void testReceiverWaitsWhenConnectionForcedDisconnect() throws 
Exception {
-        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello 
World"));
-
-        try (ProtonTestServer firstPeer = new ProtonTestServer();
-             ProtonTestServer finalPeer = new ProtonTestServer()) {
-
-            firstPeer.expectSASLAnonymousConnect();
-            firstPeer.expectOpen().respond();
-            firstPeer.expectBegin().respond();
-            
firstPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
-            firstPeer.expectFlow().withLinkCredit(10);
-            firstPeer.remoteClose()
-                     
.withErrorCondition(ConnectionError.CONNECTION_FORCED.toString(), "Forced 
disconnect").queue().afterDelay(20);
-            firstPeer.expectClose();
-            firstPeer.start();
-
-            finalPeer.expectSASLAnonymousConnect();
-            finalPeer.expectOpen().respond();
-            finalPeer.expectBegin().respond();
-            
finalPeer.expectAttach().withRole(Role.RECEIVER.getValue()).respond();
-            finalPeer.expectFlow().withLinkCredit(10);
-            finalPeer.remoteTransfer().withHandle(0)
-                                      .withDeliveryId(0)
-                                      .withDeliveryTag(new byte[] { 1 })
-                                      .withMore(false)
-                                      .withSettled(true)
-                                      .withMessageFormat(0)
-                                      
.withPayload(payload).queue().afterDelay(5);
-            finalPeer.start();
-
-            final URI primaryURI = firstPeer.getServerURI();
-            final URI backupURI = finalPeer.getServerURI();
-
-            ConnectionOptions options = new ConnectionOptions();
-            options.reconnectOptions().reconnectEnabled(true);
-            
options.reconnectOptions().addReconnectLocation(backupURI.getHost(), 
backupURI.getPort());
-
-            Client container = Client.create();
-            Connection connection = container.connect(primaryURI.getHost(), 
primaryURI.getPort(), options);
-            StreamReceiverOptions rcvOpts = new 
StreamReceiverOptions().autoAccept(false);
-            StreamReceiver receiver = 
connection.openStreamReceiver("test-receiver", rcvOpts);
-
-            StreamDelivery delivery = null;
-            try {
-                delivery = receiver.receive(10, TimeUnit.SECONDS);
-            } catch (Exception ex) {
-                fail("Should not have failed on blocking receive call." + 
ex.getMessage());
-            }
-
-            assertNotNull(delivery);
-
-            firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-            finalPeer.waitForScriptToComplete();
-            finalPeer.expectDetach().respond();
-            finalPeer.expectEnd().respond();
-            finalPeer.expectClose().respond();
-
-            delivery.accept();
-
-            receiver.close();
-            connection.close();
-
-            assertNotNull(delivery);
-        }
-    }
-
     private byte[] createInvalidHeaderEncoding() {
         final byte[] buffer = new byte[12];
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to