This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new a758479d PROTON-2880 Fix UnsettledMap iterator compaction logic
a758479d is described below
commit a758479dc4c212e237183ed7a6ff3a1703c38ffc
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Mar 18 12:57:49 2025 -0400
PROTON-2880 Fix UnsettledMap iterator compaction logic
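Ensure that when a compaction occurs during an iterator remove, the location
value of the next element is properly computed. Previously the bucket index
placed in the upper 32 bits was overwritten by the entry offset rather than
being combined with it.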
---
.../qpid/protonj2/engine/util/UnsettledMap.java | 57 +++++++++++-----------
.../protonj2/engine/util/UnsettledMapTest.java | 32 ++++++++++++
2 files changed, 61 insertions(+), 28 deletions(-)
diff --git
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/UnsettledMap.java
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/UnsettledMap.java
index cf362dfe..b4f49e0d 100644
---
a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/UnsettledMap.java
+++
b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/util/UnsettledMap.java
@@ -166,18 +166,16 @@ public class UnsettledMap<Delivery> implements
Map<UnsignedInteger, Delivery> {
}
public Delivery get(int deliveryId) {
- if (totalEntries == 0) {
- return null;
- }
-
- // Search every bucket because delivery IDs can wrap around, but we can
- // stop at the first empty bucket as all buckets following it must also
- // be empty buckets.
- for (int i = 0; i <= current; ++i) {
- if (buckets[i].isInRange(deliveryId)) {
- final Delivery result = buckets[i].get(deliveryId);
- if (result != null) {
- return result;
+ if (totalEntries > 0) {
+ // Search every bucket because delivery IDs can wrap around, but
we can
+ // stop at the first empty bucket as all buckets following it must
also
+ // be empty buckets.
+ for (int i = 0; i <= current; ++i) {
+ if (buckets[i].isInRange(deliveryId)) {
+ final Delivery result = buckets[i].get(deliveryId);
+ if (result != null) {
+ return result;
+ }
}
}
}
@@ -228,16 +226,21 @@ public class UnsettledMap<Delivery> implements
Map<UnsignedInteger, Delivery> {
return false;
}
+ /**
+ * Visits each entry within the {@link UnsettledMap} and invokes the
provided action
+ * on each delivery in the tracker.
+ *
+ * @param action
+ * The action to invoke on each visited entry.
+ */
public void forEach(Consumer<Delivery> action) {
Objects.requireNonNull(action);
- if (totalEntries == 0) {
- return;
- }
-
- for (int i = 0; i <= current; ++i) {
- for (int j = buckets[i].readOffset; j < buckets[i].writeOffset;
++j) {
- action.accept(buckets[i].entryAt(j));
+ if (totalEntries > 0) {
+ for (int i = 0; i <= current; ++i) {
+ for (int j = buckets[i].readOffset; j <
buckets[i].writeOffset; ++j) {
+ action.accept(buckets[i].entryAt(j));
+ }
}
}
}
@@ -368,14 +371,12 @@ public class UnsettledMap<Delivery> implements
Map<UnsignedInteger, Delivery> {
public void forEach(BiConsumer<? super UnsignedInteger, ? super Delivery>
action) {
Objects.requireNonNull(action);
- if (totalEntries == 0) {
- return;
- }
-
- for (int i = 0; i <= current; ++i) {
- for (int j = buckets[i].readOffset; j < buckets[i].writeOffset;
++j) {
- final Delivery delivery = buckets[i].entryAt(j);
-
action.accept(UnsignedInteger.valueOf(deliveryIdSupplier.getDeliveryId(delivery)),
delivery);
+ if (totalEntries > 0) {
+ for (int i = 0; i <= current; ++i) {
+ for (int j = buckets[i].readOffset; j <
buckets[i].writeOffset; ++j) {
+ final Delivery delivery = buckets[i].entryAt(j);
+
action.accept(UnsignedInteger.valueOf(deliveryIdSupplier.getDeliveryId(delivery)),
delivery);
+ }
}
}
}
@@ -520,7 +521,7 @@ public class UnsettledMap<Delivery> implements
Map<UnsignedInteger, Delivery> {
// Moved into the previous bucket so the index being
negative
// give us the located when added to the previous write
offset
result = (long) (prevBucketIndex) << 32;
- result = prevBucket.writeOffset + nextEntryOffset;
+ result |= prevBucket.writeOffset + nextEntryOffset;
} else if (nextBucket.entries + (bucket.entries -
toCopyBackward) > 0) {
// Moved into the next bucket gives of the raw index so
long
// as we compact entries to zero (otherwise it is the read
offset
diff --git
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/util/UnsettledMapTest.java
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/util/UnsettledMapTest.java
index e6889b0e..5f7d4b3c 100644
---
a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/util/UnsettledMapTest.java
+++
b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/util/UnsettledMapTest.java
@@ -1892,6 +1892,38 @@ public class UnsettledMapTest {
assertEquals(uintArray.length, removed.get());
}
+ @Test
+ public void testRemoveFromIteratorFromMiddleBucket() {
+ final int numBuckets = 5;
+ final int bucketSize = 10;
+ final int numEntries = numBuckets * bucketSize;
+
+ final UnsettledMap<DeliveryType> map = createMap(numBuckets,
bucketSize);
+
+ for (int i = 0; i < numEntries; ++i) {
+ map.put(i, new DeliveryType(i));
+ }
+
+ assertEquals(numEntries, map.size());
+
+ Iterator<UnsignedInteger> entries = map.keySet().iterator();
+
+ // Move to center of bucket two
+ for (int i = 0; i < bucketSize + (bucketSize / 2); ++i) {
+ entries.next();
+ }
+
+ UnsignedInteger lastValue = null;
+
+ // Remove from center of bucket two into bucket three until a
compaction event should occur.
+ for (int i = 0; i < bucketSize; ++i) {
+ lastValue = entries.next();
+ entries.remove();
+ }
+
+ assertEquals(lastValue.intValue() + 1, entries.next().intValue());
+ }
+
protected void dumpRandomDataSet(int iterations, boolean bounded) {
final int[] dataSet = new int[iterations];
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]