This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dc0b0ea38a [improve][broker] Optimize delayed metadata index bitmap (#20136)
6dc0b0ea38a is described below

commit 6dc0b0ea38ab9dc410a24b19fd7567b9e013837d
Author: Qiang Zhao <mattisonc...@apache.org>
AuthorDate: Thu Apr 20 08:21:08 2023 +0800

    [improve][broker] Optimize delayed metadata index bitmap (#20136)
---
 .../bucket/BucketDelayedDeliveryTracker.java       |  3 ++
 .../broker/delayed/bucket/ImmutableBucket.java     | 53 +++++++++++++---------
 .../broker/delayed/bucket/MutableBucket.java       | 11 +++--
 3 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index ad2fc6fae4c..5d1dc2e2703 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -485,6 +485,9 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker
                                     });
                                 });
                             }
+
+                            // optimize bm
+                            delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
                             immutableBucketDelayedIndexPair.getLeft().setDelayedIndexBitMap(delayedIndexBitMap);
 
                             afterCreateImmutableBucket(immutableBucketDelayedIndexPair, createStartTime);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
index 82e98cefa5d..57de5c84fcd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java
@@ -20,11 +20,10 @@ package org.apache.pulsar.broker.delayed.bucket;
 
 import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
 import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.NULL_LONG_PROMISE;
-import com.google.protobuf.ByteString;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
@@ -37,8 +36,8 @@ import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotF
 import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex;
 import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.roaringbitmap.InvalidRoaringFormat;
 import org.roaringbitmap.RoaringBitmap;
-import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 
 @Slf4j
 class ImmutableBucket extends Bucket {
@@ -98,7 +97,7 @@ class ImmutableBucket extends Bucket {
                         this.setLastSegmentEntryId(metadataList.size());
                         this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList);
                         List<Long> firstScheduleTimestamps = metadataList.stream().map(
-                                        SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
+                                SnapshotSegmentMetadata::getMinScheduleTimestamp).toList();
                         this.setFirstScheduleTimestamps(firstScheduleTimestamps);
 
                         return nextSnapshotEntryIndex + 1;
@@ -139,25 +138,37 @@ class ImmutableBucket extends Bucket {
         });
     }
 
+    /**
+     * Recover delayed index bit map and message numbers.
+     * @throws InvalidRoaringFormat invalid bitmap serialization format
+     */
     private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex,
-                                                    List<SnapshotSegmentMetadata> segmentMetadata) {
-        this.delayedIndexBitMap.clear();
-        MutableLong numberMessages = new MutableLong(0);
-        for (int i = startSnapshotIndex; i < segmentMetadata.size(); i++) {
-            Map<Long, ByteString> bitByteStringMap = segmentMetadata.get(i).getDelayedIndexBitMapMap();
-            bitByteStringMap.forEach((leaderId, bitSetString) -> {
-                boolean exist = this.delayedIndexBitMap.containsKey(leaderId);
-                RoaringBitmap bitSet =
-                        new ImmutableRoaringBitmap(bitSetString.asReadOnlyByteBuffer()).toRoaringBitmap();
-                numberMessages.add(bitSet.getCardinality());
-                if (!exist) {
-                    this.delayedIndexBitMap.put(leaderId, bitSet);
-                } else {
-                    this.delayedIndexBitMap.get(leaderId).or(bitSet);
+                                                    List<SnapshotSegmentMetadata> segmentMetaList) {
+        delayedIndexBitMap.clear(); // cleanup dirty bm
+        final var numberMessages = new MutableLong(0);
+        for (int i = startSnapshotIndex; i < segmentMetaList.size(); i++) {
+            for (final var entry : segmentMetaList.get(i).getDelayedIndexBitMapMap().entrySet()) {
+                final var ledgerId = entry.getKey();
+                final var bs = entry.getValue();
+                final var sbm = new RoaringBitmap();
+                try {
+                    sbm.deserialize(bs.asReadOnlyByteBuffer());
+                } catch (IOException e) {
+                    throw new InvalidRoaringFormat(e.getMessage());
                 }
-            });
+                numberMessages.add(sbm.getCardinality());
+                delayedIndexBitMap.compute(ledgerId, (lId, bm) -> {
+                    if (bm == null) {
+                        return sbm;
+                    }
+                    bm.or(sbm);
+                    return bm;
+                });
+            }
         }
-        this.setNumberBucketDelayedMessages(numberMessages.getValue());
+        // optimize bm
+        delayedIndexBitMap.values().forEach(RoaringBitmap::runOptimize);
+        setNumberBucketDelayedMessages(numberMessages.getValue());
     }
 
     CompletableFuture<List<DelayedMessageIndexBucketSnapshotFormat.SnapshotSegment>> getRemainSnapshotSegment() {
@@ -193,7 +204,7 @@ class ImmutableBucket extends Bucket {
                         stats.recordFailEvent(BucketDelayedMessageIndexStats.Type.delete);
                     } else {
                         log.info("[{}] Delete bucket snapshot finish, bucketId: {}, bucketKey: {}",
-                                 dispatcherName, bucketId, bucketKey);
+                                dispatcherName, bucketId, bucketKey);
 
                         stats.recordSuccessEvent(BucketDelayedMessageIndexStats.Type.delete,
                                 System.currentTimeMillis() - deleteStartTime);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
index e49ebe9606e..f404d5d02c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/MutableBucket.java
@@ -116,10 +116,13 @@ class MutableBucket extends Bucket implements AutoCloseable {
 
                 Iterator<Map.Entry<Long, RoaringBitmap>> iterator = bitMap.entrySet().iterator();
                 while (iterator.hasNext()) {
-                    Map.Entry<Long, RoaringBitmap> entry = iterator.next();
-                    byte[] array = new byte[entry.getValue().serializedSizeInBytes()];
-                    entry.getValue().serialize(ByteBuffer.wrap(array));
-                    segmentMetadataBuilder.putDelayedIndexBitMap(entry.getKey(), ByteString.copyFrom(array));
+                    final var entry = iterator.next();
+                    final var lId = entry.getKey();
+                    final var bm = entry.getValue();
+                    bm.runOptimize();
+                    final var array = new byte[bm.serializedSizeInBytes()];
+                    bm.serialize(ByteBuffer.wrap(array));
+                    segmentMetadataBuilder.putDelayedIndexBitMap(lId, ByteString.copyFrom(array));
                     iterator.remove();
                 }
 

Reply via email to