This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 6b59c35e5fa [improve][broker] Optimize 
`ConcurrentOpenLongPairRangeSet` by RoaringBitmap (#22908)
6b59c35e5fa is described below

commit 6b59c35e5fac43fa782a082026a7c56bcc453c38
Author: 道君 <dao...@apache.org>
AuthorDate: Thu Jun 20 15:18:03 2024 +0800

    [improve][broker] Optimize `ConcurrentOpenLongPairRangeSet` by 
RoaringBitmap (#22908)
    
    (cherry picked from commit 5b1f653e65ccd967fd9642e6d6959de4b1b01a63)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   3 +-
 distribution/shell/src/assemble/LICENSE.bin.txt    |   2 +
 pom.xml                                            |   2 +-
 pulsar-common/pom.xml                              |   5 +
 .../ConcurrentOpenLongPairRangeSet.java            |  12 +-
 .../util/collections/ConcurrentRoaringBitSet.java  | 439 +++++++++++++++++++++
 6 files changed, 453 insertions(+), 10 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 1a66ab6d70a..3b30a40ff83 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -514,8 +514,7 @@ The Apache Software License, Version 2.0
   * RxJava
     - io.reactivex.rxjava3-rxjava-3.0.1.jar
   * RoaringBitmap
-    - org.roaringbitmap-RoaringBitmap-0.9.44.jar
-    - org.roaringbitmap-shims-0.9.44.jar
+    - org.roaringbitmap-RoaringBitmap-1.1.0.jar
   * OpenTelemetry
     - io.opentelemetry-opentelemetry-api-1.38.0.jar
     - io.opentelemetry-opentelemetry-api-incubator-1.38.0-alpha.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 758f2240df0..72a8e5863cf 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -383,6 +383,8 @@ The Apache Software License, Version 2.0
     - simpleclient_tracer_common-0.16.0.jar
     - simpleclient_tracer_otel-0.16.0.jar
     - simpleclient_tracer_otel_agent-0.16.0.jar
+ * RoaringBitmap
+    - RoaringBitmap-1.1.0.jar
  * Log4J
     - log4j-api-2.23.1.jar
     - log4j-core-2.23.1.jar
diff --git a/pom.xml b/pom.xml
index fd5cd34bfd0..54ebfb087c9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -313,7 +313,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <j2objc-annotations.version>1.3</j2objc-annotations.version>
     <lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
     <dependency-check-maven.version>9.1.0</dependency-check-maven.version>
-    <roaringbitmap.version>0.9.44</roaringbitmap.version>
+    <roaringbitmap.version>1.1.0</roaringbitmap.version>
     <extra-enforcer-rules.version>1.6.1</extra-enforcer-rules.version>
     <oshi.version>6.4.0</oshi.version>
     <checkerframework.version>3.33.0</checkerframework.version>
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index df211d6ac87..95a61c1d187 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -244,6 +244,11 @@
       <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 72215d7296c..b5ad89d1695 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -29,6 +29,7 @@ import java.util.NavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.lang.mutable.MutableInt;
+import org.roaringbitmap.RoaringBitSet;
 
 /**
  * A Concurrent set comprising zero or more ranges of type {@link LongPair}. 
This can be alternative of
@@ -44,7 +45,7 @@ import org.apache.commons.lang.mutable.MutableInt;
 public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> 
implements LongPairRangeSet<T> {
 
     protected final NavigableMap<Long, BitSet> rangeBitSetMap = new 
ConcurrentSkipListMap<>();
-    private boolean threadSafe = true;
+    private final boolean threadSafe;
     private final int bitSetSize;
     private final LongPairConsumer<T> consumer;
 
@@ -95,9 +96,7 @@ public class ConcurrentOpenLongPairRangeSet<T extends 
Comparable<T>> implements
             // (2) set 0th-index to upper-index in upperRange.getKey()
             if (isValid(upperKey, upperValue)) {
                 BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(upperKey, 
(key) -> createNewBitSet());
-                if (rangeBitSet != null) {
-                    rangeBitSet.set(0, (int) upperValue + 1);
-                }
+                rangeBitSet.set(0, (int) upperValue + 1);
             }
             // No-op if values are not valid eg: if lower == LongPair.earliest 
or upper == LongPair.latest then nothing
             // to set
@@ -414,7 +413,6 @@ public class ConcurrentOpenLongPairRangeSet<T extends 
Comparable<T>> implements
     }
 
     private BitSet createNewBitSet() {
-        return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new 
BitSet(bitSetSize);
+        return this.threadSafe ? new ConcurrentRoaringBitSet() : new 
RoaringBitSet();
     }
-
-}
+}
\ No newline at end of file
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
new file mode 100644
index 00000000000..814e5840099
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.BitSet;
+import java.util.concurrent.locks.StampedLock;
+import java.util.stream.IntStream;
+import org.roaringbitmap.RoaringBitSet;
+
+public class ConcurrentRoaringBitSet extends RoaringBitSet {
+    private final StampedLock rwLock = new StampedLock();
+
+    public ConcurrentRoaringBitSet() {
+        super();
+    }
+
+    @Override
+    public boolean get(int bitIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        boolean isSet = super.get(bitIndex);
+        if (!rwLock.validate(stamp)) {
+            stamp = rwLock.readLock();
+            try {
+                isSet = super.get(bitIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return isSet;
+    }
+
+    @Override
+    public void set(int bitIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(bitIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void clear(int bitIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.clear(bitIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void set(int fromIndex, int toIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(fromIndex, toIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void clear(int fromIndex, int toIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.clear(fromIndex, toIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void clear() {
+        long stamp = rwLock.writeLock();
+        try {
+            super.clear();
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public int nextSetBit(int fromIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        int nextSetBit = super.nextSetBit(fromIndex);
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                nextSetBit = super.nextSetBit(fromIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return nextSetBit;
+    }
+
+    @Override
+    public int nextClearBit(int fromIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        int nextClearBit = super.nextClearBit(fromIndex);
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                nextClearBit = super.nextClearBit(fromIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return nextClearBit;
+    }
+
+    @Override
+    public int previousSetBit(int fromIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        int previousSetBit = super.previousSetBit(fromIndex);
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                previousSetBit = super.previousSetBit(fromIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return previousSetBit;
+    }
+
+    @Override
+    public int previousClearBit(int fromIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        int previousClearBit = super.previousClearBit(fromIndex);
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                previousClearBit = super.previousClearBit(fromIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return previousClearBit;
+    }
+
+    @Override
+    public int length() {
+        long stamp = rwLock.tryOptimisticRead();
+        int length = super.length();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                length = super.length();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return length;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        long stamp = rwLock.tryOptimisticRead();
+        boolean isEmpty = super.isEmpty();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                isEmpty = super.isEmpty();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return isEmpty;
+    }
+
+    @Override
+    public int cardinality() {
+        long stamp = rwLock.tryOptimisticRead();
+        int cardinality = super.cardinality();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                cardinality = super.cardinality();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return cardinality;
+    }
+
+    @Override
+    public int size() {
+        long stamp = rwLock.tryOptimisticRead();
+        int size = super.size();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                size = super.size();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return size;
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        long stamp = rwLock.tryOptimisticRead();
+        byte[] byteArray = super.toByteArray();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                byteArray = super.toByteArray();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return byteArray;
+    }
+
+    @Override
+    public long[] toLongArray() {
+        long stamp = rwLock.tryOptimisticRead();
+        long[] longArray = super.toLongArray();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                longArray = super.toLongArray();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return longArray;
+    }
+
+    @Override
+    public void flip(int bitIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.flip(bitIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void flip(int fromIndex, int toIndex) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.flip(fromIndex, toIndex);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void set(int bitIndex, boolean value) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(bitIndex, value);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void set(int fromIndex, int toIndex, boolean value) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.set(fromIndex, toIndex, value);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public BitSet get(int fromIndex, int toIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        BitSet bitSet = super.get(fromIndex, toIndex);
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                bitSet = super.get(fromIndex, toIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return bitSet;
+    }
+
+    @Override
+    public boolean intersects(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            return super.intersects(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void and(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.and(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void or(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.or(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void xor(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.xor(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    @Override
+    public void andNot(BitSet set) {
+        long stamp = rwLock.writeLock();
+        try {
+            super.andNot(set);
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
+    }
+
+    /**
+     * Returns the clone of the internal wrapped {@code BitSet}.
+     * This won't be a clone of the {@code ConcurrentBitSet} object.
+     *
+     * @return a clone of the internal wrapped {@code BitSet}
+     */
+    @Override
+    public Object clone() {
+        long stamp = rwLock.tryOptimisticRead();
+        RoaringBitSet clone = (RoaringBitSet) super.clone();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                clone = (RoaringBitSet) super.clone();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return clone;
+    }
+
+    @Override
+    public String toString() {
+        long stamp = rwLock.tryOptimisticRead();
+        String str = super.toString();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                str = super.toString();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return str;
+    }
+
+    /**
+     * This operation is not supported on {@code ConcurrentBitSet}.
+     */
+    @Override
+    public IntStream stream() {
+        throw new UnsupportedOperationException("stream is not supported");
+    }
+
+    public boolean equals(final Object o) {
+        long stamp = rwLock.tryOptimisticRead();
+        boolean isEqual = super.equals(o);
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                isEqual = super.equals(o);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return isEqual;
+    }
+
+    public int hashCode() {
+        long stamp = rwLock.tryOptimisticRead();
+        int hashCode = super.hashCode();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                hashCode = super.hashCode();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return hashCode;
+    }
+}

Reply via email to