This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 06fdd8a9 PROTON-2600 Fix potential leak of tracked deliveries in the session
06fdd8a9 is described below

commit 06fdd8a97b3e7cba875244d4c1dc9a90cbeed80d
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Sep 1 15:41:52 2022 -0400

    PROTON-2600 Fix potential leak of tracked deliveries in the session
    
    Ensure that if the receiver sets the disposition immediately in the
    handler we don't add the delivery to the tracking map.
---
 .../qpid/protonj2/client/impl/ReceiverTest.java    | 76 ++++++++++++++++++++++
 .../engine/impl/ProtonSessionIncomingWindow.java   |  2 +-
 .../apache/qpid/protonj2/engine/util/SplayMap.java | 18 +++--
 .../protonj2/engine/impl/ProtonReceiverTest.java   | 62 ++++++++++++++++++
 4 files changed, 151 insertions(+), 7 deletions(-)

diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index 3ecfd5e7..d3fbae21 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -2810,4 +2810,80 @@ public class ReceiverTest extends ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    @Test
+    public void testSyncReceiverAutoSettlingOnEachIncomingDelivery() throws Exception {
+        final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello World!"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond().withNextOutgoingId(0);
+            peer.expectAttach().respond();
+            peer.expectFlow().withLinkCredit(3);
+            peer.remoteTransfer().withDeliveryId(0)
+                                 .withDeliveryTag(new byte[] {1})
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.expectDisposition().withFirst(0)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.remoteTransfer().withDeliveryId(1)
+                                 .withDeliveryTag(new byte[] {2})
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.expectDisposition().withFirst(1)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.remoteTransfer().withDeliveryId(2)
+                                 .withDeliveryTag(new byte[] {3})
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.expectDisposition().withFirst(2)
+                                    .withSettled(true)
+                                    .withState().accepted();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            ReceiverOptions options = new ReceiverOptions();
+            options.creditWindow(0);
+            options.autoSettle(true);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+            Session session = connection.openSession();
+            Receiver receiver = session.openReceiver("test-queue", options);
+
+            receiver.addCredit(3);
+
+            Delivery delivery1 = receiver.receive();
+            Wait.assertTrue(() -> delivery1.settled());
+            Delivery delivery2 = receiver.receive();
+            Wait.assertTrue(() -> delivery2.settled());
+            Delivery delivery3 = receiver.receive();
+            Wait.assertTrue(() -> delivery3.settled());
+
+            assertNotNull(delivery1);
+            assertNotNull(delivery2);
+            assertNotNull(delivery3);
+
+            peer.waitForScriptToComplete();
+            peer.expectDetach().respond();
+            peer.expectEnd().respond();
+            peer.expectClose().respond();
+
+            receiver.close();
+            session.close();
+            connection.close();
+
+            // Check post conditions and done.
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
index 5e43a56c..e6ec2a59 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
@@ -141,7 +141,7 @@ public class ProtonSessionIncomingWindow {
         nextIncomingId++;
 
         ProtonIncomingDelivery delivery = link.remoteTransfer(transfer, payload);
-        if (!delivery.isRemotelySettled() && delivery.isFirstTransfer()) {
+        if (!delivery.isSettled() && !delivery.isRemotelySettled() && delivery.isFirstTransfer()) {
             unsettled.put((int) delivery.getDeliveryId(), delivery);
         }
 
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java
index 7cffd291..240ca81a 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java
@@ -55,7 +55,11 @@ public class SplayMap<E> implements NavigableMap<UnsignedInteger, E> {
     protected static final Comparator<UnsignedInteger> COMPARATOR = new UnsignedComparator();
     protected static final Comparator<UnsignedInteger> REVERSE_COMPARATOR = Collections.reverseOrder(COMPARATOR);
 
-    protected final RingQueue<SplayedEntry<E>> entryPool = new RingQueue<>(64);
+    protected final int DEFAULT_ENTRY_POOL_SIZE = 64;
+
+    protected final RingQueue<SplayedEntry<E>> entryPool = new RingQueue<>(DEFAULT_ENTRY_POOL_SIZE);
+
+    protected int entriesInExistence = 0;
 
     /**
      * Root node which can be null if the tree has no elements (size == 0)
@@ -154,14 +158,14 @@ public class SplayMap<E> implements NavigableMap<UnsignedInteger, E> {
         E oldValue = null;
 
         if (root == null) {
-            root = entryPool.poll(SplayMap::createEntry).initialize(key, value);
+            root = entryPool.poll(() -> createEntry()).initialize(key, value);
         } else {
             root = splay(root, key);
             if (root.key == key) {
                 oldValue = root.value;
                 root.value = value;
             } else {
-                final SplayedEntry<E> node = entryPool.poll(SplayMap::createEntry).initialize(key, value);
+                final SplayedEntry<E> node = entryPool.poll(() -> createEntry()).initialize(key, value);
 
                 if (compare(key, root.key) < 0) {
                     shiftRootRightOf(node);
@@ -198,13 +202,13 @@ public class SplayMap<E> implements NavigableMap<UnsignedInteger, E> {
      */
     public E putIfAbsent(int key, E value) {
         if (root == null) {
-            root = entryPool.poll(SplayMap::createEntry).initialize(key, value);
+            root = entryPool.poll(() -> createEntry()).initialize(key, value);
         } else {
             root = splay(root, key);
             if (root.key == key) {
                 return root.value;
             } else {
-                final SplayedEntry<E> node = entryPool.poll(SplayMap::createEntry).initialize(key, value);
+                final SplayedEntry<E> node = entryPool.poll(() -> createEntry()).initialize(key, value);
 
                 if (compare(key, root.key) < 0) {
                     shiftRootRightOf(node);
@@ -857,7 +861,7 @@ public class SplayMap<E> implements NavigableMap<UnsignedInteger, E> {
         return Integer.compareUnsigned(lhs, rhs);
     }
 
-    private static <E> SplayedEntry<E> createEntry() {
+    private SplayedEntry<E> createEntry() {
         return new SplayedEntry<>();
     }
 
@@ -1307,6 +1311,8 @@ public class SplayMap<E> implements NavigableMap<UnsignedInteger, E> {
     /**
      * An immutable {@link Map} entry that can be used when exposing raw entry mappings
      * via the {@link Map} API.
+     *
+     * @param <E> Type of the value portion of this immutable entry.
      */
     public static class ImmutableSplayMapEntry<E> implements Map.Entry<UnsignedInteger, E> {
 
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
index bedfcf6f..b8be5c3d 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
@@ -4592,4 +4592,66 @@ public class ProtonReceiverTest extends ProtonEngineTestSupport {
 
         assertNull(failure);
     }
+
+    @Test
+    public void testReceiveDeliveriesAndSendDispositionUponReceipt() {
+        Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        ProtonTestConnector peer = createTestPeer(engine);
+
+        final byte[] payload = new byte[] { 1 };
+
+        peer.expectAMQPHeader().respondWithAMQPHeader();
+        peer.expectOpen().respond().withContainerId("driver");
+        peer.expectBegin().respond().withNextOutgoingId(0);
+        peer.expectAttach().respond();
+        peer.expectFlow().withLinkCredit(3);
+        peer.remoteTransfer().withDeliveryId(0)
+                             .withDeliveryTag(new byte[] {1})
+                             .withMore(false)
+                             .withMessageFormat(0)
+                             .withPayload(payload).queue();
+        peer.expectDisposition().withFirst(0)
+                                .withSettled(true)
+                                .withState().accepted();
+        peer.remoteTransfer().withDeliveryId(1)
+                             .withDeliveryTag(new byte[] {2})
+                             .withMore(false)
+                             .withMessageFormat(0)
+                             .withPayload(payload).queue();
+        peer.expectDisposition().withFirst(1)
+                                .withSettled(true)
+                                .withState().accepted();
+        peer.remoteTransfer().withDeliveryId(2)
+                             .withDeliveryTag(new byte[] {3})
+                             .withMore(false)
+                             .withMessageFormat(0)
+                             .withPayload(payload).queue();
+        peer.expectDisposition().withFirst(2)
+                                .withSettled(true)
+                                .withState().accepted();
+
+        Connection connection = engine.start().open();
+        Session session = connection.session().open();
+        Receiver receiver = session.receiver("receiver");
+        receiver.deliveryReadHandler((delivery) -> {
+            delivery.disposition(Accepted.getInstance(), true);
+        });
+
+        receiver.addCredit(3);
+        receiver.open();
+
+        peer.waitForScriptToComplete();
+        peer.expectDetach().respond();
+        peer.expectEnd().respond();
+        peer.expectClose().respond();
+
+        receiver.close();
+        session.close();
+        connection.close();
+
+        // Check post conditions and done.
+        peer.waitForScriptToComplete();
+        assertNull(failure);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to