This is an automated email from the ASF dual-hosted git repository.
thetumbled pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d33cc20761b [improve][broker] Reduce memory occupation of the delayed
message queue (#23611)
d33cc20761b is described below
commit d33cc20761b97d103d52ce7e24638edcd43a2a1e
Author: Wenzhi Feng <[email protected]>
AuthorDate: Fri Nov 22 14:10:05 2024 +0800
[improve][broker] Reduce memory occupation of the delayed message queue
(#23611)
---
.../delayed/InMemoryDelayedDeliveryTracker.java | 114 +++++++++++++++++----
.../delayed/InMemoryDeliveryTrackerTest.java | 8 +-
2 files changed, 100 insertions(+), 22 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
index bdc6e4c814e..5796fcbd785 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
@@ -20,6 +20,12 @@ package org.apache.pulsar.broker.delayed;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Timer;
+import it.unimi.dsi.fastutil.longs.Long2ObjectAVLTreeMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
import java.time.Clock;
import java.util.NavigableSet;
import java.util.TreeSet;
@@ -29,12 +35,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import
org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
-import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
+import org.roaringbitmap.longlong.Roaring64Bitmap;
@Slf4j
public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker {
- protected final TripleLongPriorityQueue priorityQueue = new
TripleLongPriorityQueue();
+ // timestamp -> ledgerId -> entryId
+ // AVL tree -> OpenHashMap -> RoaringBitmap
+ protected final Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>>
+ delayedMessageMap = new Long2ObjectAVLTreeMap<>();
// If we detect that all messages have fixed delay time, such that the
delivery is
// always going to be in FIFO order, then we can avoid pulling all the
messages in
@@ -52,6 +61,9 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
// Track whether we have seen all messages with fixed delay so far.
private boolean messagesHaveFixedDelay = true;
+ // The bit count to trim to reduce memory occupation.
+ private final int timestampPrecisionBitCnt;
+
InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers
dispatcher, Timer timer,
long tickTimeMillis,
boolean
isDelayedDeliveryDeliverAtTimeStrict,
@@ -66,6 +78,35 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
long fixedDelayDetectionLookahead) {
super(dispatcher, timer, tickTimeMillis, clock,
isDelayedDeliveryDeliverAtTimeStrict);
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
+ this.timestampPrecisionBitCnt =
calculateTimestampPrecisionBitCnt(tickTimeMillis);
+ }
+
+ /**
+ * The tick time is used to determine the precision of the delivery time.
As the redelivery time
+ * is not accurate, we can bucket the delivery time and group multiple
message ids into the same
+ * bucket to reduce the memory usage. THe default value is 1 second, which
means we accept 1 second
+ * deviation for the delivery time, so that we can trim the lower 9 bits
of the delivery time, because
+ * 2**9ms = 512ms < 1s, 2**10ms = 1024ms > 1s.
+ * @param tickTimeMillis
+ * @return
+ */
+ private static int calculateTimestampPrecisionBitCnt(long tickTimeMillis) {
+ int bitCnt = 0;
+ while (tickTimeMillis > 0) {
+ tickTimeMillis >>= 1;
+ bitCnt++;
+ }
+ return bitCnt > 0 ? bitCnt - 1 : 0;
+ }
+
+ /**
+ * trim the lower bits of the timestamp to reduce the memory usage.
+ * @param timestamp
+ * @param bits
+ * @return
+ */
+ private static long trimLowerBit(long timestamp, int bits) {
+ return timestamp & (-1L << bits);
}
@Override
@@ -80,7 +121,10 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
deliverAt - clock.millis());
}
- priorityQueue.add(deliverAt, ledgerId, entryId);
+ long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
+ delayedMessageMap.computeIfAbsent(timestamp, k -> new
Long2ObjectOpenHashMap<>())
+ .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
+ .add(entryId);
updateTimer();
checkAndUpdateHighest(deliverAt);
@@ -105,7 +149,8 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
*/
@Override
public boolean hasMessageAvailable() {
- boolean hasMessageAvailable = !priorityQueue.isEmpty() &&
priorityQueue.peekN1() <= getCutoffTime();
+ boolean hasMessageAvailable = !delayedMessageMap.isEmpty()
+ && delayedMessageMap.firstLongKey() <= getCutoffTime();
if (!hasMessageAvailable) {
updateTimer();
}
@@ -121,25 +166,49 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
NavigableSet<Position> positions = new TreeSet<>();
long cutoffTime = getCutoffTime();
- while (n > 0 && !priorityQueue.isEmpty()) {
- long timestamp = priorityQueue.peekN1();
+ while (n > 0 && !delayedMessageMap.isEmpty()) {
+ long timestamp = delayedMessageMap.firstLongKey();
if (timestamp > cutoffTime) {
break;
}
- long ledgerId = priorityQueue.peekN2();
- long entryId = priorityQueue.peekN3();
- positions.add(PositionFactory.create(ledgerId, entryId));
-
- priorityQueue.pop();
- --n;
+ LongSet ledgerIdToDelete = new LongOpenHashSet();
+ Long2ObjectMap<Roaring64Bitmap> ledgerMap =
delayedMessageMap.get(timestamp);
+ for (Long2ObjectMap.Entry<Roaring64Bitmap> ledgerEntry :
ledgerMap.long2ObjectEntrySet()) {
+ long ledgerId = ledgerEntry.getLongKey();
+ Roaring64Bitmap entryIds = ledgerEntry.getValue();
+ int cardinality = (int) entryIds.getLongCardinality();
+ if (cardinality <= n) {
+ entryIds.forEach(entryId -> {
+ positions.add(PositionFactory.create(ledgerId,
entryId));
+ });
+ n -= cardinality;
+ ledgerIdToDelete.add(ledgerId);
+ } else {
+ long[] entryIdsArray = entryIds.toArray();
+ for (int i = 0; i < n; i++) {
+ positions.add(PositionFactory.create(ledgerId,
entryIdsArray[i]));
+ entryIds.removeLong(entryIdsArray[i]);
+ }
+ n = 0;
+ }
+ if (n <= 0) {
+ break;
+ }
+ }
+ for (long ledgerId : ledgerIdToDelete) {
+ ledgerMap.remove(ledgerId);
+ }
+ if (ledgerMap.isEmpty()) {
+ delayedMessageMap.remove(timestamp);
+ }
}
if (log.isDebugEnabled()) {
log.debug("[{}] Get scheduled messages - found {}",
dispatcher.getName(), positions.size());
}
- if (priorityQueue.isEmpty()) {
+ if (delayedMessageMap.isEmpty()) {
// Reset to initial state
highestDeliveryTimeTracked = 0;
messagesHaveFixedDelay = true;
@@ -151,24 +220,33 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
@Override
public CompletableFuture<Void> clear() {
- this.priorityQueue.clear();
+ this.delayedMessageMap.clear();
return CompletableFuture.completedFuture(null);
}
@Override
public long getNumberOfDelayedMessages() {
- return priorityQueue.size();
+ return delayedMessageMap.values().stream().mapToLong(
+ ledgerMap -> ledgerMap.values().stream().mapToLong(
+ Roaring64Bitmap::getLongCardinality).sum()).sum();
}
+ /**
+ * This method rely on Roaring64Bitmap::getLongSizeInBytes to calculate
the memory usage of the buffer.
+ * The memory usage of the buffer is not accurate, because
Roaring64Bitmap::getLongSizeInBytes will
+ * overestimate the memory usage of the buffer a lot.
+ * @return the memory usage of the buffer
+ */
@Override
public long getBufferMemoryUsage() {
- return priorityQueue.bytesCapacity();
+ return delayedMessageMap.values().stream().mapToLong(
+ ledgerMap -> ledgerMap.values().stream().mapToLong(
+ Roaring64Bitmap::getLongSizeInBytes).sum()).sum();
}
@Override
public void close() {
super.close();
- priorityQueue.close();
}
@Override
@@ -181,6 +259,6 @@ public class InMemoryDelayedDeliveryTracker extends
AbstractDelayedDeliveryTrack
}
protected long nextDeliveryTime() {
- return priorityQueue.peekN1();
+ return delayedMessageMap.firstLongKey();
}
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
index ff7763927d8..dc6f623c82b 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java
@@ -92,7 +92,7 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
false, 0)
}};
case "testAddMessageWithStrictDelay" -> new Object[][]{{
- new InMemoryDelayedDeliveryTracker(dispatcher, timer, 100,
clock,
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
true, 0)
}};
case
"testAddMessageWithDeliverAtTimeAfterNowBeforeTickTimeFrequencyWithStrict" ->
new Object[][]{{
@@ -100,7 +100,7 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
true, 0)
}};
case
"testAddMessageWithDeliverAtTimeAfterNowAfterTickTimeFrequencyWithStrict" ->
new Object[][]{{
- new InMemoryDelayedDeliveryTracker(dispatcher, timer,
100000, clock,
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 1,
clock,
true, 0)
}};
case "testAddMessageWithDeliverAtTimeAfterFullTickTimeWithStrict"
-> new Object[][]{{
@@ -108,7 +108,7 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
true, 0)
}};
case "testWithFixedDelays",
"testWithMixedDelays","testWithNoDelays" -> new Object[][]{{
- new InMemoryDelayedDeliveryTracker(dispatcher, timer, 500,
clock,
+ new InMemoryDelayedDeliveryTracker(dispatcher, timer, 8,
clock,
true, 100)
}};
default -> new Object[][]{{
@@ -230,7 +230,7 @@ public class InMemoryDeliveryTrackerTest extends
AbstractDeliveryTrackerTest {
return;
}
try {
- this.priorityQueue.peekN1();
+ this.delayedMessageMap.firstLongKey();
} catch (Exception e) {
e.printStackTrace();
exceptions[0] = e;