This is an automated email from the ASF dual-hosted git repository. mattisonchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6dc0b0ea38a [improve][broker] Optimize delayed metadata index bitmap (#20136) 6dc0b0ea38a is described below commit 6dc0b0ea38ab9dc410a24b19fd7567b9e013837d Author: Qiang Zhao <mattisonc...@apache.org> AuthorDate: Thu Apr 20 08:21:08 2023 +0800 [improve][broker] Optimize delayed metadata index bitmap (#20136) --- .../bucket/BucketDelayedDeliveryTracker.java | 3 ++ .../broker/delayed/bucket/ImmutableBucket.java | 53 +++++++++++++--------- .../broker/delayed/bucket/MutableBucket.java | 11 +++-- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index ad2fc6fae4c..5d1dc2e2703 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -485,6 +485,9 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker }); }); } + + // optimize bm + delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize); immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap); afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 82e98cefa5d..57de5c84fcd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -20,11 +20,10 @@ package org.apache.pulsar.broker.delayed.bucket; import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry; import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE; -import com.google.protobuf.ByteString; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -37,8 +36,8 @@ import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotF import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; import org.apache.pulsar.common.util.FutureUtil; +import org.roaringbitmap.InvalidRoaringFormat; import org.roaringbitmap.RoaringBitmap; -import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @Slf4j class ImmutableBucket extends Bucket { @@ -98,7 +97,7 @@ class ImmutableBucket extends Bucket { this.setLastSegmentEntryId(metadataList.size()); this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList); List<Long> firstScheduleTimestamps = metadataList.stream().map( - SnapshotSegmentMetadata::getMinScheduleTimestamp).toList(); + SnapshotSegmentMetadata::getMinScheduleTimestamp).toList(); this.setFirstScheduleTimestamps(firstScheduleTimestamps); return nextSnapshotEntryIndex + 1; @@ -139,25 +138,37 @@ class ImmutableBucket extends Bucket { }); } + /** + * Recover delayed index bit map and message numbers. + * @throws InvalidRoaringFormat invalid bitmap serialization format + */ private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex, - List<SnapshotSegmentMetadata> segmentMetadata) { - this.delayedIndexBitMap.clear(); - MutableLong numberMessages = new MutableLong(0); - for (int i = startSnapshotIndex; i < segmentMetadata.size(); i++) { - Map<Long, ByteString> bitByteStringMap = segmentMetadata.get(i).getDelayedIndexBitMapMap(); - bitByteStringMap.forEach((leaderId, bitSetString) -> { - boolean exist = this.delayedIndexBitMap.containsKey(leaderId); - RoaringBitmap bitSet = - new ImmutableRoaringBitmap(bitSetString.asReadOnlyByteBuffer()).toRoaringBitmap(); - numberMessages.add(bitSet.getCardinality()); - if (!exist) { - this.delayedIndexBitMap.put(leaderId, bitSet); - } else { - this.delayedIndexBitMap.get(leaderId).or(bitSet); + List<SnapshotSegmentMetadata> segmentMetaList) { + delayedIndexBitMap.clear(); // cleanup dirty bm + final var numberMessages = new MutableLong(0); + for (int i = startSnapshotIndex; i < segmentMetaList.size(); i++) { + for (final var entry : segmentMetaList.get(i).getDelayedIndexBitMapMap().entrySet()) { + final var ledgerId = entry.getKey(); + final var bs = entry.getValue(); + final var sbm = new RoaringBitmap(); + try { + sbm.deserialize(bs.asReadOnlyByteBuffer()); + } catch (IOException e) { + throw new InvalidRoaringFormat(e.getMessage()); } - }); + numberMessages.add(sbm.getCardinality()); + delayedIndexBitMap.compute(ledgerId, (lId, bm) -> { + if (bm == null) { + return sbm; + } + bm.or(sbm); + return bm; + }); + } } - this.setNumberBucketDelayedMessages(numberMessages.getValue()); + // optimize bm + delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize); + setNumberBucketDelayedMessages(numberMessages.getValue()); } CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() { @@ -193,7 +204,7 @@ class ImmutableBucket extends Bucket { stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.delete); } else { log.info("[{}] Delete bucket snapshot finish, bucketId: {}, bucketKey: {}", - dispatcherName, bucketId, bucketKey); + dispatcherName, bucketId, bucketKey); stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete, System.currentTimeMillis() - deleteStartTime); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java index e49ebe9606e..f404d5d02c1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java @@ -116,10 +116,13 @@ class MutableBucket extends Bucket implements AutoCloseable { Iterator<Map.Entry<Long, RoaringBitmap>> iterator = bitMap.entrySet().iterator(); while (iterator.hasNext()) { - Map.Entry<Long, RoaringBitmap> entry = iterator.next(); - byte[] array = new byte[entry.getValue().serializedSizeInBytes()]; - entry.getValue().serialize(ByteBuffer.wrap(array)); - segmentMetadataBuilder.putDelayedIndexBitMap(entry.getKey(), ByteString.copyFrom(array)); + final var entry = iterator.next(); + final var lId = entry.getKey(); + final var bm = entry.getValue(); + bm.runOptimize(); + final var array = new byte[bm.serializedSizeInBytes()]; + bm.serialize(ByteBuffer.wrap(array)); + segmentMetadataBuilder.putDelayedIndexBitMap(lId, ByteString.copyFrom(array)); iterator.remove(); }