This is an automated email from the ASF dual-hosted git repository.

thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d33cc20761b [improve][broker] Reduce memory occupation of the delayed 
message queue (#23611)
d33cc20761b is described below

commit d33cc20761b97d103d52ce7e24638edcd43a2a1e
Author: Wenzhi Feng <[email protected]>
AuthorDate: Fri Nov 22 14:10:05 2024 +0800

    [improve][broker] Reduce memory occupation of the delayed message queue 
(#23611)
---
 .../delayed/InMemoryDelayedDeliveryTracker.java    | 114 +++++++++++++++++----
 .../delayed/InMemoryDeliveryTrackerTest.java       |   8 +-
 2 files changed, 100 insertions(+), 22 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index bdc6e4c814e..5796fcbd785 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -20,6 +20,12 @@ package org.apache.pulsar.broker.delayed;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Timer;
+import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
 import java.time.Clock;
 import java.util.NavigableSet;
 import java.util.TreeSet;
@@ -29,12 +35,15 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.PositionFactory;
 import 
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
-import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
 
 @Slf4j
 public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker {
 
-    protected final TripleLongPriorityQueue priorityQueue = new 
TripleLongPriorityQueue();
+    // timestamp -> ledgerId -> entryId
+    // AVL tree -> OpenHashMap -> RoaringBitmap
+    protected final Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>>
+            delayedMessageMap = new Long2ObjectAVLTreeMap<>();
 
     // If we detect that all messages have fixed delay time, such that the 
delivery is
     // always going to be in FIFO order, then we can avoid pulling all the 
messages in
@@ -52,6 +61,9 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
     // Track whether we have seen all messages with fixed delay so far.
     private boolean messagesHaveFixedDelay = true;
 
+    // The bit count to trim to reduce memory occupation.
+    private final int timestampPrecisionBitCnt;
+
     
InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers 
dispatcher, Timer timer,
                                    long tickTimeMillis,
                                    boolean 
isDelayedDeliveryDeliverAtTimeStrict,
@@ -66,6 +78,35 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
                                           long fixedDelayDetectionLookahead) {
         super(dispatcher, timer, tickTimeMillis, clock, 
isDelayedDeliveryDeliverAtTimeStrict);
         this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+        this.timestampPrecisionBitCnt = 
calculateTimestampPrecisionBitCnt(tickTimeMillis);
+    }
+
+    /**
+     * The tick time is used to determine the precision of the delivery time. 
As the redelivery time
+     * is not accurate, we can bucket the delivery time and group multiple 
message ids into the same
+     * bucket to reduce the memory usage. The default value is 1 second, which 
means we accept 1 second
+     * deviation for the delivery time, so that we can trim the lower 9 bits 
of the delivery time, because
+     * 2**9ms = 512ms < 1s, 2**10ms = 1024ms > 1s.
+     * @param tickTimeMillis
+     * @return
+     */
+    private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
+        int bitCnt = 0;
+        while (tickTimeMillis > 0) {
+            tickTimeMillis >>= 1;
+            bitCnt++;
+        }
+        return bitCnt > 0 ? bitCnt - 1 : 0;
+    }
+
+    /**
+     * Trim the lower bits of the timestamp to reduce the memory usage.
+     * @param timestamp
+     * @param bits
+     * @return
+     */
+    private static long trimLowerBit(long timestamp, int bits) {
+        return timestamp & (-1L << bits);
     }
 
     @Override
@@ -80,7 +121,10 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
                     deliverAt - clock.millis());
         }
 
-        priorityQueue.add(deliverAt, ledgerId, entryId);
+        long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
+        delayedMessageMap.computeIfAbsent(timestamp, k -> new 
Long2ObjectOpenHashMap<>())
+                .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
+                .add(entryId);
         updateTimer();
 
         checkAndUpdateHighest(deliverAt);
@@ -105,7 +149,8 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
      */
     @Override
     public boolean hasMessageAvailable() {
-        boolean hasMessageAvailable = !priorityQueue.isEmpty() && 
priorityQueue.peekN1() <= getCutoffTime();
+        boolean hasMessageAvailable = !delayedMessageMap.isEmpty()
+                && delayedMessageMap.firstLongKey() <= getCutoffTime();
         if (!hasMessageAvailable) {
             updateTimer();
         }
@@ -121,25 +166,49 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
         NavigableSet<Position> positions = new TreeSet<>();
         long cutoffTime = getCutoffTime();
 
-        while (n > 0 && !priorityQueue.isEmpty()) {
-            long timestamp = priorityQueue.peekN1();
+        while (n > 0 && !delayedMessageMap.isEmpty()) {
+            long timestamp = delayedMessageMap.firstLongKey();
             if (timestamp > cutoffTime) {
                 break;
             }
 
-            long ledgerId = priorityQueue.peekN2();
-            long entryId = priorityQueue.peekN3();
-            positions.add(PositionFactory.create(ledgerId, entryId));
-
-            priorityQueue.pop();
-            --n;
+            LongSet ledgerIdToDelete = new LongOpenHashSet();
+            Long2ObjectMap<Roaring64Bitmap> ledgerMap = 
delayedMessageMap.get(timestamp);
+            for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry : 
ledgerMap.long2ObjectEntrySet()) {
+                long ledgerId = ledgerEntry.getLongKey();
+                Roaring64Bitmap entryIds = ledgerEntry.getValue();
+                int cardinality = (int) entryIds.getLongCardinality();
+                if (cardinality <= n) {
+                    entryIds.forEach(entryId -> {
+                        positions.add(PositionFactory.create(ledgerId, 
entryId));
+                    });
+                    n -= cardinality;
+                    ledgerIdToDelete.add(ledgerId);
+                } else {
+                    long[] entryIdsArray = entryIds.toArray();
+                    for (int i = 0; i < n; i++) {
+                        positions.add(PositionFactory.create(ledgerId, 
entryIdsArray[i]));
+                        entryIds.removeLong(entryIdsArray[i]);
+                    }
+                    n = 0;
+                }
+                if (n <= 0) {
+                    break;
+                }
+            }
+            for (long ledgerId : ledgerIdToDelete) {
+                ledgerMap.remove(ledgerId);
+            }
+            if (ledgerMap.isEmpty()) {
+                delayedMessageMap.remove(timestamp);
+            }
         }
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Get scheduled messages - found {}", 
dispatcher.getName(), positions.size());
         }
 
-        if (priorityQueue.isEmpty()) {
+        if (delayedMessageMap.isEmpty()) {
             // Reset to initial state
             highestDeliveryTimeTracked = 0;
             messagesHaveFixedDelay = true;
@@ -151,24 +220,33 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
 
     @Override
     public CompletableFuture<Void> clear() {
-        this.priorityQueue.clear();
+        this.delayedMessageMap.clear();
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public long getNumberOfDelayedMessages() {
-        return priorityQueue.size();
+        return delayedMessageMap.values().stream().mapToLong(
+                ledgerMap -> ledgerMap.values().stream().mapToLong(
+                        Roaring64Bitmap::getLongCardinality).sum()).sum();
     }
 
+    /**
+     * This method relies on Roaring64Bitmap::getLongSizeInBytes to calculate 
the memory usage of the buffer.
+     * The memory usage of the buffer is not accurate, because 
Roaring64Bitmap::getLongSizeInBytes will
+     * overestimate the memory usage of the buffer a lot.
+     * @return the memory usage of the buffer
+     */
     @Override
     public long getBufferMemoryUsage() {
-        return priorityQueue.bytesCapacity();
+        return delayedMessageMap.values().stream().mapToLong(
+                ledgerMap -> ledgerMap.values().stream().mapToLong(
+                        Roaring64Bitmap::getLongSizeInBytes).sum()).sum();
     }
 
     @Override
     public void close() {
         super.close();
-        priorityQueue.close();
     }
 
     @Override
@@ -181,6 +259,6 @@ public class InMemoryDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTrack
     }
 
     protected long nextDeliveryTime() {
-        return priorityQueue.peekN1();
+        return delayedMessageMap.firstLongKey();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index ff7763927d8..dc6f623c82b 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -92,7 +92,7 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
                             false, 0)
             }};
             case "testAddMessageWithStrictDelay" -> new Object[][]{{
-                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100, 
clock,
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
                             true, 0)
             }};
             case 
"testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" -> 
new Object[][]{{
@@ -100,7 +100,7 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
                             true, 0)
             }};
             case 
"testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" -> 
new Object[][]{{
-                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 
100000, clock,
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1, 
clock,
                             true, 0)
             }};
             case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict" 
-> new Object[][]{{
@@ -108,7 +108,7 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
                             true, 0)
             }};
             case "testWithFixedDelays", 
"testWithMixedDelays","testWithNoDelays" -> new Object[][]{{
-                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500, 
clock,
+                    new InMemoryDelayedDeliveryTracker(dispatcher, timer, 8, 
clock,
                             true, 100)
             }};
             default -> new Object[][]{{
@@ -230,7 +230,7 @@ public class InMemoryDeliveryTrackerTest extends 
AbstractDeliveryTrackerTest {
                     return;
                 }
                 try {
-                    this.priorityQueue.peekN1();
+                    this.delayedMessageMap.firstLongKey();
                 } catch (Exception e) {
                     e.printStackTrace();
                     exceptions[0] = e;

Reply via email to