This is an automated email from the ASF dual-hosted git repository.
thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 998bb51f5ea [fix][broker] fix delay queue sequence issue. (#24035)
998bb51f5ea is described below
commit 998bb51f5ea256c59110e5b8bef0f9daef1ac48d
Author: Wenzhi Feng <[email protected]>
AuthorDate: Mon Mar 3 10:50:29 2025 +0800
[fix][broker] fix delay queue sequence issue. (#24035)
---
.../delayed/InMemoryDelayedDeliveryTracker.java | 8 +++----
.../delayed/InMemoryDeliveryTrackerTest.java | 26 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 5796fcbd785..a48ed416138 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -22,7 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timer;
import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import it.unimi.dsi.fastutil.longs.LongSet;
@@ -42,7 +42,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
// timestamp -> ledgerId -> entryId
// AVL tree -> OpenHashMap -> RoaringBitmap
- protected final Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>>
+ protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>
delayedMessageMap = new Long2ObjectAVLTreeMap<>();
// If we detect that all messages have fixed delay time, such that the delivery is
@@ -122,7 +122,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
}
long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
- delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
+ delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
.add(entryId);
updateTimer();
@@ -173,7 +173,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
}
LongSet ledgerIdToDelete = new LongOpenHashSet();
- Long2ObjectMap<Roaring64Bitmap> ledgerMap = delayedMessageMap.get(timestamp);
+ Long2ObjectSortedMap<Roaring64Bitmap> ledgerMap = delayedMessageMap.get(timestamp);
for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
long ledgerId = ledgerEntry.getLongKey();
Roaring64Bitmap entryIds = ledgerEntry.getValue();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index dc6f623c82b..92d4719a5a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -34,9 +34,12 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Method;
import java.time.Clock;
import java.util.NavigableMap;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -249,4 +252,27 @@ public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest {
timer.stop();
}
+
+ @Test(dataProvider = "delayedTracker")
+ public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exception {
+ assertFalse(tracker.hasMessageAvailable());
+
+ int messageCount = 5;
+ for(int i = 1; i <= messageCount; i++) {
+ assertTrue(tracker.addMessage(i, i, 1));
+ }
+ clockTime.set(10);
+ assertTrue(tracker.hasMessageAvailable());
+ assertEquals(tracker.getNumberOfDelayedMessages(), messageCount);
+
+ for (int i = 1; i <= messageCount; i++) {
+ Set<Position> scheduled = tracker.getScheduledMessages(1);
+ assertEquals(scheduled.size(), 1);
+ Position position = scheduled.iterator().next();
+ assertEquals(position.getLedgerId(), i);
+ assertEquals(position.getEntryId(), i);
+ }
+ tracker.close();
+ }
+
}