This is an automated email from the ASF dual-hosted git repository.

thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 998bb51f5ea [fix][broker] fix delay queue sequence issue. (#24035)
998bb51f5ea is described below

commit 998bb51f5ea256c59110e5b8bef0f9daef1ac48d
Author: Wenzhi Feng <[email protected]>
AuthorDate: Mon Mar 3 10:50:29 2025 +0800

    [fix][broker] fix delay queue sequence issue. (#24035)
---
 .../delayed/InMemoryDelayedDeliveryTracker.java    |  8 +++----
 .../delayed/InMemoryDeliveryTrackerTest.java       | 26 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index 5796fcbd785..a48ed416138 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -22,7 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Timer;
 import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
 import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
 import it.unimi.dsi.fastutil.longs.LongSet;
@@ -42,7 +42,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
 
     // timestamp -> ledgerId -> entryId
     // AVL tree -> OpenHashMap -> RoaringBitmap
-    protected final Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>>
+    protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>
             delayedMessageMap = new Long2ObjectAVLTreeMap<>();
 
     // If we detect that all messages have fixed delay time, such that the delivery is
@@ -122,7 +122,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
         }
 
         long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
-        delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectOpenHashMap<>())
+        delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>())
                 .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
                 .add(entryId);
         updateTimer();
@@ -173,7 +173,7 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
             }
 
             LongSet ledgerIdToDelete = new LongOpenHashSet();
-            Long2ObjectMap<Roaring64Bitmap> ledgerMap = delayedMessageMap.get(timestamp);
+            Long2ObjectSortedMap<Roaring64Bitmap> ledgerMap = delayedMessageMap.get(timestamp);
             for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : ledgerMap.long2ObjectEntrySet()) {
                 long ledgerId = ledgerEntry.getLongKey();
                 Roaring64Bitmap entryIds = ledgerEntry.getValue();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index dc6f623c82b..92d4719a5a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -34,9 +34,12 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Method;
 import java.time.Clock;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -249,4 +252,27 @@ public class InMemoryDeliveryTrackerTest extends AbstractDeliveryTrackerTest {
 
         timer.stop();
     }
+
+    @Test(dataProvider = "delayedTracker")
+    public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exception {
+        assertFalse(tracker.hasMessageAvailable());
+
+        int messageCount = 5;
+        for(int i = 1; i <= messageCount; i++) {
+            assertTrue(tracker.addMessage(i, i, 1));
+        }
+        clockTime.set(10);
+        assertTrue(tracker.hasMessageAvailable());
+        assertEquals(tracker.getNumberOfDelayedMessages(), messageCount);
+
+        for (int i = 1; i <= messageCount; i++) {
+            Set<Position> scheduled = tracker.getScheduledMessages(1);
+            assertEquals(scheduled.size(), 1);
+            Position position = scheduled.iterator().next();
+            assertEquals(position.getLedgerId(), i);
+            assertEquals(position.getEntryId(), i);
+        }
+        tracker.close();
+    }
+
 }

Reply via email to