This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new f962d223870 [fix][broker][branch-4.0] Revert "[improve][broker] Reduce memory occupation of the delayed message queue (#23611)" (#24429)
f962d223870 is described below

commit f962d22387081e00b38e6a85912ea4368db4f8d2
Author: Penghui Li <[email protected]>
AuthorDate: Thu Jun 19 11:25:28 2025 +0800

    [fix][broker][branch-4.0] Revert "[improve][broker] Reduce memory occupation of the delayed message queue (#23611)" (#24429)
---
 .../delayed/InMemoryDelayedDeliveryTracker.java    | 114 ++++-----------------
 .../delayed/InMemoryDeliveryTrackerTest.java       |  33 +-----
 2 files changed, 22 insertions(+), 125 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index a48ed416138..bdc6e4c814e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -20,12 +20,6 @@ package org.apache.pulsar.broker.delayed;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Timer;
-import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
-import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
-import it.unimi.dsi.fastutil.longs.LongSet;
 import java.time.Clock;
 import java.util.NavigableSet;
 import java.util.TreeSet;
@@ -35,15 +29,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
-import org.roaringbitmap.longlong.Roaring64Bitmap;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
 
 @Slf4j
 public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker {
 
-    // timestamp -> ledgerId -> entryId
-    // AVL tree -> OpenHashMap -> RoaringBitmap
-    protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>
-            delayedMessageMap = new Long2ObjectAVLTreeMap<>();
+    protected final TripleLongPriorityQueue priorityQueue = new 
TripleLongPriorityQueue();
 
     // If we detect that all messages have fixed delay time, such that the 
delivery is
     // always going to be in FIFO order, then we can avoid pulling all the 
messages in
@@ -61,9 +52,6 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
     // Track whether we have seen all messages with fixed delay so far.
     private boolean messagesHaveFixedDelay = true;
 
-    // The bit count to trim to reduce memory occupation.
-    private final int timestampPrecisionBitCnt;
-
     
InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
                                    long tickTimeMillis,
                                    boolean 
isDelayedDeliveryDeliverAtTimeStrict,
@@ -78,35 +66,6 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
                                           long fixedDelayDetectionLookahead) {
         super(dispatcher, timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict);
         this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
-        this.timestampPrecisionBitCnt = 
calculateTimestampPrecisionBitCnt(tickTimeMillis);
-    }
-
-    /**
-     * The tick time is used to determine the precision of the delivery time. 
As the redelivery time
-     * is not accurate, we can bucket the delivery time and group multiple 
message ids into the same
-     * bucket to reduce the memory usage. THe default value is 1 second, which 
means we accept 1 second
-     * deviation for the delivery time, so that we can trim the lower 9 bits 
of the delivery time, because
-     * 2**9ms = 512ms < 1s, 2**10ms = 1024ms > 1s.
-     * @param tickTimeMillis
-     * @return
-     */
-    private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
-        int bitCnt = 0;
-        while (tickTimeMillis > 0) {
-            tickTimeMillis >>= 1;
-            bitCnt++;
-        }
-        return bitCnt > 0 ? bitCnt - 1 : 0;
-    }
-
-    /**
-     * trim the lower bits of the timestamp to reduce the memory usage.
-     * @param timestamp
-     * @param bits
-     * @return
-     */
-    private static long trimLowerBit(long timestamp, int bits) {
-        return timestamp & (-1L << bits);
     }
 
     @Override
@@ -121,10 +80,7 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
                     deliverAt - clock.millis());
         }
 
-        long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
-        delayedMessageMap.computeIfAbsent(timestamp, k -> new 
Long2ObjectRBTreeMap<>())
-                .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
-                .add(entryId);
+        priorityQueue.add(deliverAt, ledgerId, entryId);
         updateTimer();
 
         checkAndUpdateHighest(deliverAt);
@@ -149,8 +105,7 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
      */
     @Override
     public boolean hasMessageAvailable() {
-        boolean hasMessageAvailable = !delayedMessageMap.isEmpty()
-                && delayedMessageMap.firstLongKey() <= getCutoffTime();
+        boolean hasMessageAvailable = !priorityQueue.isEmpty() && 
priorityQueue.peekN1() <= getCutoffTime();
         if (!hasMessageAvailable) {
             updateTimer();
         }
@@ -166,49 +121,25 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
         NavigableSet<Position> positions = new TreeSet<>();
         long cutoffTime = getCutoffTime();
 
-        while (n > 0 && !delayedMessageMap.isEmpty()) {
-            long timestamp = delayedMessageMap.firstLongKey();
+        while (n > 0 && !priorityQueue.isEmpty()) {
+            long timestamp = priorityQueue.peekN1();
             if (timestamp > cutoffTime) {
                 break;
             }
 
-            LongSet ledgerIdToDelete = new LongOpenHashSet();
-            Long2ObjectSortedMap<Roaring64Bitmap> ledgerMap = 
delayedMessageMap.get(timestamp);
-            for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : 
ledgerMap.long2ObjectEntrySet()) {
-                long ledgerId = ledgerEntry.getLongKey();
-                Roaring64Bitmap entryIds = ledgerEntry.getValue();
-                int cardinality = (int) entryIds.getLongCardinality();
-                if (cardinality <= n) {
-                    entryIds.forEach(entryId -> {
-                        positions.add(PositionFactory.create(ledgerId, 
entryId));
-                    });
-                    n -= cardinality;
-                    ledgerIdToDelete.add(ledgerId);
-                } else {
-                    long[] entryIdsArray = entryIds.toArray();
-                    for (int i = 0; i < n; i++) {
-                        positions.add(PositionFactory.create(ledgerId, 
entryIdsArray[i]));
-                        entryIds.removeLong(entryIdsArray[i]);
-                    }
-                    n = 0;
-                }
-                if (n <= 0) {
-                    break;
-                }
-            }
-            for (long ledgerId : ledgerIdToDelete) {
-                ledgerMap.remove(ledgerId);
-            }
-            if (ledgerMap.isEmpty()) {
-                delayedMessageMap.remove(timestamp);
-            }
+            long ledgerId = priorityQueue.peekN2();
+            long entryId = priorityQueue.peekN3();
+            positions.add(PositionFactory.create(ledgerId, entryId));
+
+            priorityQueue.pop();
+            --n;
         }
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Get scheduled messages - found {}", 
dispatcher.getName(), positions.size());
         }
 
-        if (delayedMessageMap.isEmpty()) {
+        if (priorityQueue.isEmpty()) {
             // Reset to initial state
             highestDeliveryTimeTracked = 0;
             messagesHaveFixedDelay = true;
@@ -220,33 +151,24 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
 
     @Override
     public CompletableFuture<Void> clear() {
-        this.delayedMessageMap.clear();
+        this.priorityQueue.clear();
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public long getNumberOfDelayedMessages() {
-        return delayedMessageMap.values().stream().mapToLong(
-                ledgerMap -> ledgerMap.values().stream().mapToLong(
-                        Roaring64Bitmap::getLongCardinality).sum()).sum();
+        return priorityQueue.size();
     }
 
-    /**
-     * This method rely on Roaring64Bitmap::getLongSizeInBytes to calculate 
the memory usage of the buffer.
-     * The memory usage of the buffer is not accurate, because 
Roaring64Bitmap::getLongSizeInBytes will
-     * overestimate the memory usage of the buffer a lot.
-     * @return the memory usage of the buffer
-     */
     @Override
     public long getBufferMemoryUsage() {
-        return delayedMessageMap.values().stream().mapToLong(
-                ledgerMap -> ledgerMap.values().stream().mapToLong(
-                        Roaring64Bitmap::getLongSizeInBytes).sum()).sum();
+        return priorityQueue.bytesCapacity();
     }
 
     @Override
     public void close() {
         super.close();
+        priorityQueue.close();
     }
 
     @Override
@@ -259,6 +181,6 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
     }
 
     protected long nextDeliveryTime() {
-        return delayedMessageMap.firstLongKey();
+        return priorityQueue.peekN1();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index c03dc5b17bb..96bee4282e4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -34,12 +34,10 @@ import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Method;
 import java.time.Clock;
 import java.util.NavigableMap;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import lombok.Cleanup;
-import org.apache.bookkeeper.mledger.Position;
 import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -95,7 +93,7 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
                             false, 0)
             }};
             case "testAddMessageWithStrictDelay" -> new Object[][]{{
-                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, 
clock,
                             true, 0)
             }};
             case 
"testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> 
new Object[][]{{
@@ -103,7 +101,7 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
                             true, 0)
             }};
             case 
"testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" -> 
new Object[][]{{
-                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
100000, clock,
                             true, 0)
             }};
             case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict" 
-> new Object[][]{{
@@ -111,7 +109,7 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
                             true, 0)
             }};
             case "testWithFixedDelays", 
"testWithMixedDelays","testWithNoDelays" -> new Object[][]{{
-                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 8, 
clock,
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, 
clock,
                             true, 100)
             }};
             default -> new Object[][]{{
@@ -234,7 +232,7 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
                     return;
                 }
                 try {
-                    this.delayedMessageMap.firstLongKey();
+                    this.priorityQueue.peekN1();
                 } catch (Exception e) {
                     e.printStackTrace();
                     exceptions[0] = e;
@@ -251,27 +249,4 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
 
         assertNull(exceptions[0]);
     }
-
-    @Test(dataProvider = "delayedTracker")
-    public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) 
throws Exception {
-        assertFalse(tracker.hasMessageAvailable());
-
-        int messageCount = 5;
-        for(int i = 1; i <= messageCount; i++) {
-            assertTrue(tracker.addMessage(i, i, 1));
-        }
-        clockTime.set(10);
-        assertTrue(tracker.hasMessageAvailable());
-        assertEquals(tracker.getNumberOfDelayedMessages(), messageCount);
-
-        for (int i = 1; i <= messageCount; i++) {
-            Set<Position> scheduled = tracker.getScheduledMessages(1);
-            assertEquals(scheduled.size(), 1);
-            Position position = scheduled.iterator().next();
-            assertEquals(position.getLedgerId(), i);
-            assertEquals(position.getEntryId(), i);
-        }
-        tracker.close();
-    }
-
 }

Reply via email to