This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 6b59c35e5fa [improve][broker] Optimize `ConcurrentOpenLongPairRangeSet` by RoaringBitmap (#22908) 6b59c35e5fa is described below commit 6b59c35e5fac43fa782a082026a7c56bcc453c38 Author: 道君 <dao...@apache.org> AuthorDate: Thu Jun 20 15:18:03 2024 +0800 [improve][broker] Optimize `ConcurrentOpenLongPairRangeSet` by RoaringBitmap (#22908) (cherry picked from commit 5b1f653e65ccd967fd9642e6d6959de4b1b01a63) --- distribution/server/src/assemble/LICENSE.bin.txt | 3 +- distribution/shell/src/assemble/LICENSE.bin.txt | 2 + pom.xml | 2 +- pulsar-common/pom.xml | 5 + .../ConcurrentOpenLongPairRangeSet.java | 12 +- .../util/collections/ConcurrentRoaringBitSet.java | 439 +++++++++++++++++++++ 6 files changed, 453 insertions(+), 10 deletions(-) diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 1a66ab6d70a..3b30a40ff83 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -514,8 +514,7 @@ The Apache Software License, Version 2.0 * RxJava - io.reactivex.rxjava3-rxjava-3.0.1.jar * RoaringBitmap - - org.roaringbitmap-RoaringBitmap-0.9.44.jar - - org.roaringbitmap-shims-0.9.44.jar + - org.roaringbitmap-RoaringBitmap-1.1.0.jar * OpenTelemetry - io.opentelemetry-opentelemetry-api-1.38.0.jar - io.opentelemetry-opentelemetry-api-incubator-1.38.0-alpha.jar diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt index 758f2240df0..72a8e5863cf 100644 --- a/distribution/shell/src/assemble/LICENSE.bin.txt +++ b/distribution/shell/src/assemble/LICENSE.bin.txt @@ -383,6 +383,8 @@ The Apache Software License, Version 2.0 - simpleclient_tracer_common-0.16.0.jar - simpleclient_tracer_otel-0.16.0.jar - simpleclient_tracer_otel_agent-0.16.0.jar + * RoaringBitmap + - RoaringBitmap-1.1.0.jar * Log4J - log4j-api-2.23.1.jar - log4j-core-2.23.1.jar diff --git a/pom.xml b/pom.xml index fd5cd34bfd0..54ebfb087c9 100644 --- a/pom.xml +++ b/pom.xml @@ -313,7 +313,7 @@ flexible messaging model and an intuitive client API.</description> <j2objc-annotations.version>1.3</j2objc-annotations.version> <lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version> <dependency-check-maven.version>9.1.0</dependency-check-maven.version> - <roaringbitmap.version>0.9.44</roaringbitmap.version> + <roaringbitmap.version>1.1.0</roaringbitmap.version> <extra-enforcer-rules.version>1.6.1</extra-enforcer-rules.version> <oshi.version>6.4.0</oshi.version> <checkerframework.version>3.33.0</checkerframework.version> diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml index df211d6ac87..95a61c1d187 100644 --- a/pulsar-common/pom.xml +++ b/pulsar-common/pom.xml @@ -244,6 +244,11 @@ <artifactId>awaitility</artifactId> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.roaringbitmap</groupId> + <artifactId>RoaringBitmap</artifactId> + </dependency> </dependencies> <build> diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java index 72215d7296c..b5ad89d1695 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java @@ -29,6 +29,7 @@ import java.util.NavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.mutable.MutableInt; +import org.roaringbitmap.RoaringBitSet; /** * A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of @@ -44,7 +45,7 @@ import org.apache.commons.lang.mutable.MutableInt; public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> { protected final NavigableMap<Long, BitSet> rangeBitSetMap = new ConcurrentSkipListMap<>(); - private boolean threadSafe = true; + private final boolean threadSafe; private final int bitSetSize; private final LongPairConsumer<T> consumer; @@ -95,9 +96,7 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements // (2) set 0th-index to upper-index in upperRange.getKey() if (isValid(upperKey, upperValue)) { BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(upperKey, (key) -> createNewBitSet()); - if (rangeBitSet != null) { - rangeBitSet.set(0, (int) upperValue + 1); - } + rangeBitSet.set(0, (int) upperValue + 1); } // No-op if values are not valid eg: if lower == LongPair.earliest or upper == LongPair.latest then nothing // to set @@ -414,7 +413,6 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements } private BitSet createNewBitSet() { - return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new BitSet(bitSetSize); + return this.threadSafe ? new ConcurrentRoaringBitSet() : new RoaringBitSet(); } - -} +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java new file mode 100644 index 00000000000..814e5840099 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util.collections; + +import java.util.BitSet; +import java.util.concurrent.locks.StampedLock; +import java.util.stream.IntStream; +import org.roaringbitmap.RoaringBitSet; + +public class ConcurrentRoaringBitSet extends RoaringBitSet { + private final StampedLock rwLock = new StampedLock(); + + public ConcurrentRoaringBitSet() { + super(); + } + + @Override + public boolean get(int bitIndex) { + long stamp = rwLock.tryOptimisticRead(); + boolean isSet = super.get(bitIndex); + if (!rwLock.validate(stamp)) { + stamp = rwLock.readLock(); + try { + isSet = super.get(bitIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return isSet; + } + + @Override + public void set(int bitIndex) { + long stamp = rwLock.writeLock(); + try { + super.set(bitIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void clear(int bitIndex) { + long stamp = rwLock.writeLock(); + try { + super.clear(bitIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void set(int fromIndex, int toIndex) { + long stamp = rwLock.writeLock(); + try { + super.set(fromIndex, toIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void clear(int fromIndex, int toIndex) { + long stamp = rwLock.writeLock(); + try { + super.clear(fromIndex, toIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void clear() { + long stamp = rwLock.writeLock(); + try { + super.clear(); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public int nextSetBit(int fromIndex) { + long stamp = rwLock.tryOptimisticRead(); + int nextSetBit = super.nextSetBit(fromIndex); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + nextSetBit = super.nextSetBit(fromIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return nextSetBit; + } + + @Override + public int nextClearBit(int fromIndex) { + long stamp = rwLock.tryOptimisticRead(); + int nextClearBit = super.nextClearBit(fromIndex); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + nextClearBit = super.nextClearBit(fromIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return nextClearBit; + } + + @Override + public int previousSetBit(int fromIndex) { + long stamp = rwLock.tryOptimisticRead(); + int previousSetBit = super.previousSetBit(fromIndex); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + previousSetBit = super.previousSetBit(fromIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return previousSetBit; + } + + @Override + public int previousClearBit(int fromIndex) { + long stamp = rwLock.tryOptimisticRead(); + int previousClearBit = super.previousClearBit(fromIndex); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + previousClearBit = super.previousClearBit(fromIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return previousClearBit; + } + + @Override + public int length() { + long stamp = rwLock.tryOptimisticRead(); + int length = super.length(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + length = super.length(); + } finally { + rwLock.unlockRead(stamp); + } + } + return length; + } + + @Override + public boolean isEmpty() { + long stamp = rwLock.tryOptimisticRead(); + boolean isEmpty = super.isEmpty(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + isEmpty = super.isEmpty(); + } finally { + rwLock.unlockRead(stamp); + } + } + return isEmpty; + } + + @Override + public int cardinality() { + long stamp = rwLock.tryOptimisticRead(); + int cardinality = super.cardinality(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + cardinality = super.cardinality(); + } finally { + rwLock.unlockRead(stamp); + } + } + return cardinality; + } + + @Override + public int size() { + long stamp = rwLock.tryOptimisticRead(); + int size = super.size(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + size = super.size(); + } finally { + rwLock.unlockRead(stamp); + } + } + return size; + } + + @Override + public byte[] toByteArray() { + long stamp = rwLock.tryOptimisticRead(); + byte[] byteArray = super.toByteArray(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + byteArray = super.toByteArray(); + } finally { + rwLock.unlockRead(stamp); + } + } + return byteArray; + } + + @Override + public long[] toLongArray() { + long stamp = rwLock.tryOptimisticRead(); + long[] longArray = super.toLongArray(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + longArray = super.toLongArray(); + } finally { + rwLock.unlockRead(stamp); + } + } + return longArray; + } + + @Override + public void flip(int bitIndex) { + long stamp = rwLock.writeLock(); + try { + super.flip(bitIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void flip(int fromIndex, int toIndex) { + long stamp = rwLock.writeLock(); + try { + super.flip(fromIndex, toIndex); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void set(int bitIndex, boolean value) { + long stamp = rwLock.writeLock(); + try { + super.set(bitIndex, value); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void set(int fromIndex, int toIndex, boolean value) { + long stamp = rwLock.writeLock(); + try { + super.set(fromIndex, toIndex, value); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public BitSet get(int fromIndex, int toIndex) { + long stamp = rwLock.tryOptimisticRead(); + BitSet bitSet = super.get(fromIndex, toIndex); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + bitSet = super.get(fromIndex, toIndex); + } finally { + rwLock.unlockRead(stamp); + } + } + return bitSet; + } + + @Override + public boolean intersects(BitSet set) { + long stamp = rwLock.writeLock(); + try { + return super.intersects(set); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void and(BitSet set) { + long stamp = rwLock.writeLock(); + try { + super.and(set); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void or(BitSet set) { + long stamp = rwLock.writeLock(); + try { + super.or(set); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void xor(BitSet set) { + long stamp = rwLock.writeLock(); + try { + super.xor(set); + } finally { + rwLock.unlockWrite(stamp); + } + } + + @Override + public void andNot(BitSet set) { + long stamp = rwLock.writeLock(); + try { + super.andNot(set); + } finally { + rwLock.unlockWrite(stamp); + } + } + + /** + * Returns the clone of the internal wrapped {@code BitSet}. + * This won't be a clone of the {@code ConcurrentBitSet} object. + * + * @return a clone of the internal wrapped {@code BitSet} + */ + @Override + public Object clone() { + long stamp = rwLock.tryOptimisticRead(); + RoaringBitSet clone = (RoaringBitSet) super.clone(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + clone = (RoaringBitSet) super.clone(); + } finally { + rwLock.unlockRead(stamp); + } + } + return clone; + } + + @Override + public String toString() { + long stamp = rwLock.tryOptimisticRead(); + String str = super.toString(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + str = super.toString(); + } finally { + rwLock.unlockRead(stamp); + } + } + return str; + } + + /** + * This operation is not supported on {@code ConcurrentBitSet}. + */ + @Override + public IntStream stream() { + throw new UnsupportedOperationException("stream is not supported"); + } + + public boolean equals(final Object o) { + long stamp = rwLock.tryOptimisticRead(); + boolean isEqual = super.equals(o); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + isEqual = super.equals(o); + } finally { + rwLock.unlockRead(stamp); + } + } + return isEqual; + } + + public int hashCode() { + long stamp = rwLock.tryOptimisticRead(); + int hashCode = super.hashCode(); + if (!rwLock.validate(stamp)) { + // Fallback to read lock + stamp = rwLock.readLock(); + try { + hashCode = super.hashCode(); + } finally { + rwLock.unlockRead(stamp); + } + } + return hashCode; + } +}