This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new f962d223870 [fix][broker][branch-4.0] Revert "[improve][broker] Reduce
memory occupation of the delayed message queue (#23611)" (#24429)
f962d223870 is described below
commit f962d22387081e00b38e6a85912ea4368db4f8d2
Author: Penghui Li <[email protected]>
AuthorDate: Thu Jun 19 11:25:28 2025 +0800
[fix][broker][branch-4.0] Revert "[improve][broker] Reduce memory
occupation of the delayed message queue (#23611)" (#24429)
---
.../delayed/InMemoryDelayedDeliveryTracker.java | 114 ++++-----------------
.../delayed/InMemoryDeliveryTrackerTest.java | 33 +-----
2 files changed, 22 insertions(+), 125 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index a48ed416138..bdc6e4c814e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -20,12 +20,6 @@ package org.apache.pulsar.broker.delayed;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timer;
-import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectRBTreeMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
-import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
-import it.unimi.dsi.fastutil.longs.LongSet;
import java.time.Clock;
import java.util.NavigableSet;
import java.util.TreeSet;
@@ -35,15 +29,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
-import org.roaringbitmap.longlong.Roaring64Bitmap;
+import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
@Slf4j
public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker {
- // timestamp -> ledgerId -> entryId
- // AVL tree -> OpenHashMap -> RoaringBitmap
- protected final Long2ObjectSortedMap<Long2ObjectSortedMap<Roaring64Bitmap>>
- delayedMessageMap = new Long2ObjectAVLTreeMap<>();
+ protected final TripleLongPriorityQueue priorityQueue = new
TripleLongPriorityQueue();
// If we detect that all messages have fixed delay time, such that the
delivery is
// always going to be in FIFO order, then we can avoid pulling all the
messages in
@@ -61,9 +52,6 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
// Track whether we have seen all messages with fixed delay so far.
private boolean messagesHaveFixedDelay = true;
- // The bit count to trim to reduce memory occupation.
- private final int timestampPrecisionBitCnt;
-
InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers
dispatcher, Timer timer,
long tickTimeMillis,
boolean
isDelayedDeliveryDeliverAtTimeStrict,
@@ -78,35 +66,6 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
long fixedDelayDetectionLookahead) {
super(dispatcher, timer, tickTimeMillis, clock,
isDelayedDeliveryDeliverAtTimeStrict);
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
- this.timestampPrecisionBitCnt =
calculateTimestampPrecisionBitCnt(tickTimeMillis);
- }
-
- /**
- * The tick time is used to determine the precision of the delivery time.
As the redelivery time
- * is not accurate, we can bucket the delivery time and group multiple
message ids into the same
- * bucket to reduce the memory usage. THe default value is 1 second, which
means we accept 1 second
- * deviation for the delivery time, so that we can trim the lower 9 bits
of the delivery time, because
- * 2**9ms = 512ms < 1s, 2**10ms = 1024ms > 1s.
- * @param tickTimeMillis
- * @return
- */
- private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
- int bitCnt = 0;
- while (tickTimeMillis > 0) {
- tickTimeMillis >>= 1;
- bitCnt++;
- }
- return bitCnt > 0 ? bitCnt - 1 : 0;
- }
-
- /**
- * trim the lower bits of the timestamp to reduce the memory usage.
- * @param timestamp
- * @param bits
- * @return
- */
- private static long trimLowerBit(long timestamp, int bits) {
- return timestamp & (-1L << bits);
}
@Override
@@ -121,10 +80,7 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
deliverAt - clock.millis());
}
- long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
- delayedMessageMap.computeIfAbsent(timestamp, k -> new
Long2ObjectRBTreeMap<>())
- .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
- .add(entryId);
+ priorityQueue.add(deliverAt, ledgerId, entryId);
updateTimer();
checkAndUpdateHighest(deliverAt);
@@ -149,8 +105,7 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
*/
@Override
public boolean hasMessageAvailable() {
- boolean hasMessageAvailable = !delayedMessageMap.isEmpty()
- && delayedMessageMap.firstLongKey() <= getCutoffTime();
+ boolean hasMessageAvailable = !priorityQueue.isEmpty() &&
priorityQueue.peekN1() <= getCutoffTime();
if (!hasMessageAvailable) {
updateTimer();
}
@@ -166,49 +121,25 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
NavigableSet<Position> positions = new TreeSet<>();
long cutoffTime = getCutoffTime();
- while (n > 0 && !delayedMessageMap.isEmpty()) {
- long timestamp = delayedMessageMap.firstLongKey();
+ while (n > 0 && !priorityQueue.isEmpty()) {
+ long timestamp = priorityQueue.peekN1();
if (timestamp > cutoffTime) {
break;
}
- LongSet ledgerIdToDelete = new LongOpenHashSet();
- Long2ObjectSortedMap<Roaring64Bitmap> ledgerMap =
delayedMessageMap.get(timestamp);
- for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry :
ledgerMap.long2ObjectEntrySet()) {
- long ledgerId = ledgerEntry.getLongKey();
- Roaring64Bitmap entryIds = ledgerEntry.getValue();
- int cardinality = (int) entryIds.getLongCardinality();
- if (cardinality <= n) {
- entryIds.forEach(entryId -> {
- positions.add(PositionFactory.create(ledgerId,
entryId));
- });
- n -= cardinality;
- ledgerIdToDelete.add(ledgerId);
- } else {
- long[] entryIdsArray = entryIds.toArray();
- for (int i = 0; i < n; i++) {
- positions.add(PositionFactory.create(ledgerId,
entryIdsArray[i]));
- entryIds.removeLong(entryIdsArray[i]);
- }
- n = 0;
- }
- if (n <= 0) {
- break;
- }
- }
- for (long ledgerId : ledgerIdToDelete) {
- ledgerMap.remove(ledgerId);
- }
- if (ledgerMap.isEmpty()) {
- delayedMessageMap.remove(timestamp);
- }
+ long ledgerId = priorityQueue.peekN2();
+ long entryId = priorityQueue.peekN3();
+ positions.add(PositionFactory.create(ledgerId, entryId));
+
+ priorityQueue.pop();
+ --n;
}
if (log.isDebugEnabled()) {
log.debug("[{}] Get scheduled messages - found {}",
dispatcher.getName(), positions.size());
}
- if (delayedMessageMap.isEmpty()) {
+ if (priorityQueue.isEmpty()) {
// Reset to initial state
highestDeliveryTimeTracked = 0;
messagesHaveFixedDelay = true;
@@ -220,33 +151,24 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
@Override
public CompletableFuture<Void> clear() {
- this.delayedMessageMap.clear();
+ this.priorityQueue.clear();
return CompletableFuture.completedFuture(null);
}
@Override
public long getNumberOfDelayedMessages() {
- return delayedMessageMap.values().stream().mapToLong(
- ledgerMap -> ledgerMap.values().stream().mapToLong(
- Roaring64Bitmap::getLongCardinality).sum()).sum();
+ return priorityQueue.size();
}
- /**
- * This method rely on Roaring64Bitmap::getLongSizeInBytes to calculate
the memory usage of the buffer.
- * The memory usage of the buffer is not accurate, because
Roaring64Bitmap::getLongSizeInBytes will
- * overestimate the memory usage of the buffer a lot.
- * @return the memory usage of the buffer
- */
@Override
public long getBufferMemoryUsage() {
- return delayedMessageMap.values().stream().mapToLong(
- ledgerMap -> ledgerMap.values().stream().mapToLong(
- Roaring64Bitmap::getLongSizeInBytes).sum()).sum();
+ return priorityQueue.bytesCapacity();
}
@Override
public void close() {
super.close();
+ priorityQueue.close();
}
@Override
@@ -259,6 +181,6 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
}
protected long nextDeliveryTime() {
- return delayedMessageMap.firstLongKey();
+ return priorityQueue.peekN1();
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index c03dc5b17bb..96bee4282e4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -34,12 +34,10 @@ import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Method;
import java.time.Clock;
import java.util.NavigableMap;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
-import org.apache.bookkeeper.mledger.Position;
import
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -95,7 +93,7 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
false, 0)
}};
case "testAddMessageWithStrictDelay" -> new Object[][]{{
- new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100,
clock,
true, 0)
}};
case
"testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" ->
new Object[][]{{
@@ -103,7 +101,7 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
true, 0)
}};
case
"testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" ->
new Object[][]{{
- new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer,
100000, clock,
true, 0)
}};
case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict"
-> new Object[][]{{
@@ -111,7 +109,7 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
true, 0)
}};
case "testWithFixedDelays",
"testWithMixedDelays","testWithNoDelays" -> new Object[][]{{
- new InMemoryDelayedDeliveryTracker(dispatcher, timer, 8,
clock,
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500,
clock,
true, 100)
}};
default -> new Object[][]{{
@@ -234,7 +232,7 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
return;
}
try {
- this.delayedMessageMap.firstLongKey();
+ this.priorityQueue.peekN1();
} catch (Exception e) {
e.printStackTrace();
exceptions[0] = e;
@@ -251,27 +249,4 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
assertNull(exceptions[0]);
}
-
- @Test(dataProvider = "delayedTracker")
- public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker)
throws Exception {
- assertFalse(tracker.hasMessageAvailable());
-
- int messageCount = 5;
- for(int i = 1; i <= messageCount; i++) {
- assertTrue(tracker.addMessage(i, i, 1));
- }
- clockTime.set(10);
- assertTrue(tracker.hasMessageAvailable());
- assertEquals(tracker.getNumberOfDelayedMessages(), messageCount);
-
- for (int i = 1; i <= messageCount; i++) {
- Set<Position> scheduled = tracker.getScheduledMessages(1);
- assertEquals(scheduled.size(), 1);
- Position position = scheduled.iterator().next();
- assertEquals(position.getLedgerId(), i);
- assertEquals(position.getEntryId(), i);
- }
- tracker.close();
- }
-
}