This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 06fdd8a9 PROTON-2600 Fix potential leak of tracked deliveries in the
session
06fdd8a9 is described below
commit 06fdd8a97b3e7cba875244d4c1dc9a90cbeed80d
Author: Timothy Bish <[email protected]>
AuthorDate: Thu Sep 1 15:41:52 2022 -0400
PROTON-2600 Fix potential leak of tracked deliveries in the session
Ensure that if the receiver sets the disposition and settles the delivery
immediately in the handler, we don't add the delivery to the tracking map.
---
.../qpid/protonj2/client/impl/ReceiverTest.java | 76 ++++++++++++++++++++++
.../engine/impl/ProtonSessionIncomingWindow.java | 2 +-
.../apache/qpid/protonj2/engine/util/SplayMap.java | 18 +++--
.../protonj2/engine/impl/ProtonReceiverTest.java | 62 ++++++++++++++++++
4 files changed, 151 insertions(+), 7 deletions(-)
diff --git
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index 3ecfd5e7..d3fbae21 100644
---
a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++
b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -2810,4 +2810,80 @@ public class ReceiverTest extends
ImperativeClientTestCase {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testSyncReceiverAutoSettlingOnEachIncomingDelivery() throws
Exception {
+ final byte[] payload = createEncodedMessage(new AmqpValue<>("Hello
World!"));
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond().withNextOutgoingId(0);
+ peer.expectAttach().respond();
+ peer.expectFlow().withLinkCredit(3);
+ peer.remoteTransfer().withDeliveryId(0)
+ .withDeliveryTag(new byte[] {1})
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.expectDisposition().withFirst(0)
+ .withSettled(true)
+ .withState().accepted();
+ peer.remoteTransfer().withDeliveryId(1)
+ .withDeliveryTag(new byte[] {2})
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.expectDisposition().withFirst(1)
+ .withSettled(true)
+ .withState().accepted();
+ peer.remoteTransfer().withDeliveryId(2)
+ .withDeliveryTag(new byte[] {3})
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.expectDisposition().withFirst(2)
+ .withSettled(true)
+ .withState().accepted();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ ReceiverOptions options = new ReceiverOptions();
+ options.creditWindow(0);
+ options.autoSettle(true);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(),
remoteURI.getPort());
+ Session session = connection.openSession();
+ Receiver receiver = session.openReceiver("test-queue", options);
+
+ receiver.addCredit(3);
+
+ Delivery delivery1 = receiver.receive();
+ Wait.assertTrue(() -> delivery1.settled());
+ Delivery delivery2 = receiver.receive();
+ Wait.assertTrue(() -> delivery2.settled());
+ Delivery delivery3 = receiver.receive();
+ Wait.assertTrue(() -> delivery3.settled());
+
+ assertNotNull(delivery1);
+ assertNotNull(delivery2);
+ assertNotNull(delivery3);
+
+ peer.waitForScriptToComplete();
+ peer.expectDetach().respond();
+ peer.expectEnd().respond();
+ peer.expectClose().respond();
+
+ receiver.close();
+ session.close();
+ connection.close();
+
+ // Check post conditions and done.
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
}
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
index 5e43a56c..e6ec2a59 100644
---
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
+++
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java
@@ -141,7 +141,7 @@ public class ProtonSessionIncomingWindow {
nextIncomingId++;
ProtonIncomingDelivery delivery = link.remoteTransfer(transfer,
payload);
- if (!delivery.isRemotelySettled() && delivery.isFirstTransfer()) {
+ if (!delivery.isSettled() && !delivery.isRemotelySettled() &&
delivery.isFirstTransfer()) {
unsettled.put((int) delivery.getDeliveryId(), delivery);
}
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java
index 7cffd291..240ca81a 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/SplayMap.java
@@ -55,7 +55,11 @@ public class SplayMap<E> implements
NavigableMap<UnsignedInteger, E> {
protected static final Comparator<UnsignedInteger> COMPARATOR = new
UnsignedComparator();
protected static final Comparator<UnsignedInteger> REVERSE_COMPARATOR =
Collections.reverseOrder(COMPARATOR);
- protected final RingQueue<SplayedEntry<E>> entryPool = new RingQueue<>(64);
+ protected final int DEFAULT_ENTRY_POOL_SIZE = 64;
+
+ protected final RingQueue<SplayedEntry<E>> entryPool = new
RingQueue<>(DEFAULT_ENTRY_POOL_SIZE);
+
+ protected int entriesInExistence = 0;
/**
* Root node which can be null if the tree has no elements (size == 0)
@@ -154,14 +158,14 @@ public class SplayMap<E> implements
NavigableMap<UnsignedInteger, E> {
E oldValue = null;
if (root == null) {
- root = entryPool.poll(SplayMap::createEntry).initialize(key,
value);
+ root = entryPool.poll(() -> createEntry()).initialize(key, value);
} else {
root = splay(root, key);
if (root.key == key) {
oldValue = root.value;
root.value = value;
} else {
- final SplayedEntry<E> node =
entryPool.poll(SplayMap::createEntry).initialize(key, value);
+ final SplayedEntry<E> node = entryPool.poll(() ->
createEntry()).initialize(key, value);
if (compare(key, root.key) < 0) {
shiftRootRightOf(node);
@@ -198,13 +202,13 @@ public class SplayMap<E> implements
NavigableMap<UnsignedInteger, E> {
*/
public E putIfAbsent(int key, E value) {
if (root == null) {
- root = entryPool.poll(SplayMap::createEntry).initialize(key,
value);
+ root = entryPool.poll(() -> createEntry()).initialize(key, value);
} else {
root = splay(root, key);
if (root.key == key) {
return root.value;
} else {
- final SplayedEntry<E> node =
entryPool.poll(SplayMap::createEntry).initialize(key, value);
+ final SplayedEntry<E> node = entryPool.poll(() ->
createEntry()).initialize(key, value);
if (compare(key, root.key) < 0) {
shiftRootRightOf(node);
@@ -857,7 +861,7 @@ public class SplayMap<E> implements
NavigableMap<UnsignedInteger, E> {
return Integer.compareUnsigned(lhs, rhs);
}
- private static <E> SplayedEntry<E> createEntry() {
+ private SplayedEntry<E> createEntry() {
return new SplayedEntry<>();
}
@@ -1307,6 +1311,8 @@ public class SplayMap<E> implements
NavigableMap<UnsignedInteger, E> {
/**
* An immutable {@link Map} entry that can be used when exposing raw entry
mappings
* via the {@link Map} API.
+ *
+ * @param <E> Type of the value portion of this immutable entry.
*/
public static class ImmutableSplayMapEntry<E> implements
Map.Entry<UnsignedInteger, E> {
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
index bedfcf6f..b8be5c3d 100644
---
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiverTest.java
@@ -4592,4 +4592,66 @@ public class ProtonReceiverTest extends
ProtonEngineTestSupport {
assertNull(failure);
}
+
+ @Test
+ public void testReceiveDeliveriesAndSendDispositionUponReceipt() {
+ Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ ProtonTestConnector peer = createTestPeer(engine);
+
+ final byte[] payload = new byte[] { 1 };
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond().withContainerId("driver");
+ peer.expectBegin().respond().withNextOutgoingId(0);
+ peer.expectAttach().respond();
+ peer.expectFlow().withLinkCredit(3);
+ peer.remoteTransfer().withDeliveryId(0)
+ .withDeliveryTag(new byte[] {1})
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.expectDisposition().withFirst(0)
+ .withSettled(true)
+ .withState().accepted();
+ peer.remoteTransfer().withDeliveryId(1)
+ .withDeliveryTag(new byte[] {2})
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.expectDisposition().withFirst(1)
+ .withSettled(true)
+ .withState().accepted();
+ peer.remoteTransfer().withDeliveryId(2)
+ .withDeliveryTag(new byte[] {3})
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.expectDisposition().withFirst(2)
+ .withSettled(true)
+ .withState().accepted();
+
+ Connection connection = engine.start().open();
+ Session session = connection.session().open();
+ Receiver receiver = session.receiver("receiver");
+ receiver.deliveryReadHandler((delivery) -> {
+ delivery.disposition(Accepted.getInstance(), true);
+ });
+
+ receiver.addCredit(3);
+ receiver.open();
+
+ peer.waitForScriptToComplete();
+ peer.expectDetach().respond();
+ peer.expectEnd().respond();
+ peer.expectClose().respond();
+
+ receiver.close();
+ session.close();
+ connection.close();
+
+ // Check post conditions and done.
+ peer.waitForScriptToComplete();
+ assertNull(failure);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]