This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2da4ee8b54a [revert] "[improve][broker] Optimize `ConcurrentOpenLongPairRangeSet` by RoaringBitmap (#22908)" (#22968)
2da4ee8b54a is described below

commit 2da4ee8b54aa1e15d501b57cd4c476186aff92eb
Author: Lari Hotari <lhot...@users.noreply.github.com>
AuthorDate: Tue Jun 25 18:04:13 2024 +0300

    [revert] "[improve][broker] Optimize `ConcurrentOpenLongPairRangeSet` by RoaringBitmap (#22908)" (#22968)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +-
 distribution/shell/src/assemble/LICENSE.bin.txt    |   2 -
 pom.xml                                            |   2 +-
 pulsar-common/pom.xml                              |   5 -
 .../ConcurrentOpenLongPairRangeSet.java            |  12 +-
 .../util/collections/ConcurrentRoaringBitSet.java  | 439 ---------------------
 6 files changed, 9 insertions(+), 453 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 24c601b184a..cfbe991a8ed 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -513,7 +513,7 @@ The Apache Software License, Version 2.0
   * RxJava
     - io.reactivex.rxjava3-rxjava-3.0.1.jar
   * RoaringBitmap
-    - org.roaringbitmap-RoaringBitmap-1.1.0.jar
+    - org.roaringbitmap-RoaringBitmap-1.0.6.jar
   * OpenTelemetry
     - io.opentelemetry-opentelemetry-api-1.38.0.jar
     - io.opentelemetry-opentelemetry-api-incubator-1.38.0-alpha.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index 2971147c2c8..0da56c6afa8 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -382,8 +382,6 @@ The Apache Software License, Version 2.0
     - simpleclient_tracer_common-0.16.0.jar
     - simpleclient_tracer_otel-0.16.0.jar
     - simpleclient_tracer_otel_agent-0.16.0.jar
- * RoaringBitmap
-    - RoaringBitmap-1.1.0.jar
  * Log4J
     - log4j-api-2.23.1.jar
     - log4j-core-2.23.1.jar
diff --git a/pom.xml b/pom.xml
index 1e200d04d68..7c556fa1277 100644
--- a/pom.xml
+++ b/pom.xml
@@ -317,7 +317,7 @@ flexible messaging model and an intuitive client API.</description>
     <j2objc-annotations.version>1.3</j2objc-annotations.version>
     <lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
     <dependency-check-maven.version>9.1.0</dependency-check-maven.version>
-    <roaringbitmap.version>1.1.0</roaringbitmap.version>
+    <roaringbitmap.version>1.0.6</roaringbitmap.version>
     <extra-enforcer-rules.version>1.6.1</extra-enforcer-rules.version>
     <oshi.version>6.4.0</oshi.version>
     <checkerframework.version>3.33.0</checkerframework.version>
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 3f73a43698e..aa7e4998e5c 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -252,11 +252,6 @@
       <artifactId>awaitility</artifactId>
       <scope>test</scope>
     </dependency>
-
-    <dependency>
-      <groupId>org.roaringbitmap</groupId>
-      <artifactId>RoaringBitmap</artifactId>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index b5ad89d1695..72215d7296c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -29,7 +29,6 @@ import java.util.NavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.lang.mutable.MutableInt;
-import org.roaringbitmap.RoaringBitSet;
 
 /**
  * A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of
@@ -45,7 +44,7 @@ import org.roaringbitmap.RoaringBitSet;
 public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {
 
     protected final NavigableMap<Long, BitSet> rangeBitSetMap = new ConcurrentSkipListMap<>();
-    private final boolean threadSafe;
+    private boolean threadSafe = true;
     private final int bitSetSize;
     private final LongPairConsumer<T> consumer;
 
@@ -96,7 +95,9 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
             // (2) set 0th-index to upper-index in upperRange.getKey()
             if (isValid(upperKey, upperValue)) {
                 BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(upperKey, (key) -> createNewBitSet());
-                rangeBitSet.set(0, (int) upperValue + 1);
+                if (rangeBitSet != null) {
+                    rangeBitSet.set(0, (int) upperValue + 1);
+                }
             }
             // No-op if values are not valid eg: if lower == LongPair.earliest or upper == LongPair.latest then nothing
             // to set
@@ -413,6 +414,7 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
     }
 
     private BitSet createNewBitSet() {
-        return this.threadSafe ? new ConcurrentRoaringBitSet() : new RoaringBitSet();
+        return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new BitSet(bitSetSize);
     }
-}
\ No newline at end of file
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
deleted file mode 100644
index 814e5840099..00000000000
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util.collections;
-
-import java.util.BitSet;
-import java.util.concurrent.locks.StampedLock;
-import java.util.stream.IntStream;
-import org.roaringbitmap.RoaringBitSet;
-
-public class ConcurrentRoaringBitSet extends RoaringBitSet {
-    private final StampedLock rwLock = new StampedLock();
-
-    public ConcurrentRoaringBitSet() {
-        super();
-    }
-
-    @Override
-    public boolean get(int bitIndex) {
-        long stamp = rwLock.tryOptimisticRead();
-        boolean isSet = super.get(bitIndex);
-        if (!rwLock.validate(stamp)) {
-            stamp = rwLock.readLock();
-            try {
-                isSet = super.get(bitIndex);
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return isSet;
-    }
-
-    @Override
-    public void set(int bitIndex) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.set(bitIndex);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void clear(int bitIndex) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.clear(bitIndex);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void set(int fromIndex, int toIndex) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.set(fromIndex, toIndex);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void clear(int fromIndex, int toIndex) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.clear(fromIndex, toIndex);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void clear() {
-        long stamp = rwLock.writeLock();
-        try {
-            super.clear();
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public int nextSetBit(int fromIndex) {
-        long stamp = rwLock.tryOptimisticRead();
-        int nextSetBit = super.nextSetBit(fromIndex);
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                nextSetBit = super.nextSetBit(fromIndex);
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return nextSetBit;
-    }
-
-    @Override
-    public int nextClearBit(int fromIndex) {
-        long stamp = rwLock.tryOptimisticRead();
-        int nextClearBit = super.nextClearBit(fromIndex);
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                nextClearBit = super.nextClearBit(fromIndex);
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return nextClearBit;
-    }
-
-    @Override
-    public int previousSetBit(int fromIndex) {
-        long stamp = rwLock.tryOptimisticRead();
-        int previousSetBit = super.previousSetBit(fromIndex);
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                previousSetBit = super.previousSetBit(fromIndex);
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return previousSetBit;
-    }
-
-    @Override
-    public int previousClearBit(int fromIndex) {
-        long stamp = rwLock.tryOptimisticRead();
-        int previousClearBit = super.previousClearBit(fromIndex);
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                previousClearBit = super.previousClearBit(fromIndex);
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return previousClearBit;
-    }
-
-    @Override
-    public int length() {
-        long stamp = rwLock.tryOptimisticRead();
-        int length = super.length();
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                length = super.length();
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return length;
-    }
-
-    @Override
-    public boolean isEmpty() {
-        long stamp = rwLock.tryOptimisticRead();
-        boolean isEmpty = super.isEmpty();
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                isEmpty = super.isEmpty();
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return isEmpty;
-    }
-
-    @Override
-    public int cardinality() {
-        long stamp = rwLock.tryOptimisticRead();
-        int cardinality = super.cardinality();
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                cardinality = super.cardinality();
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return cardinality;
-    }
-
-    @Override
-    public int size() {
-        long stamp = rwLock.tryOptimisticRead();
-        int size = super.size();
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                size = super.size();
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return size;
-    }
-
-    @Override
-    public byte[] toByteArray() {
-        long stamp = rwLock.tryOptimisticRead();
-        byte[] byteArray = super.toByteArray();
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                byteArray = super.toByteArray();
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return byteArray;
-    }
-
-    @Override
-    public long[] toLongArray() {
-        long stamp = rwLock.tryOptimisticRead();
-        long[] longArray = super.toLongArray();
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                longArray = super.toLongArray();
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return longArray;
-    }
-
-    @Override
-    public void flip(int bitIndex) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.flip(bitIndex);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void flip(int fromIndex, int toIndex) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.flip(fromIndex, toIndex);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void set(int bitIndex, boolean value) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.set(bitIndex, value);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void set(int fromIndex, int toIndex, boolean value) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.set(fromIndex, toIndex, value);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public BitSet get(int fromIndex, int toIndex) {
-        long stamp = rwLock.tryOptimisticRead();
-        BitSet bitSet = super.get(fromIndex, toIndex);
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                bitSet = super.get(fromIndex, toIndex);
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return bitSet;
-    }
-
-    @Override
-    public boolean intersects(BitSet set) {
-        long stamp = rwLock.writeLock();
-        try {
-            return super.intersects(set);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void and(BitSet set) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.and(set);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void or(BitSet set) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.or(set);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void xor(BitSet set) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.xor(set);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    @Override
-    public void andNot(BitSet set) {
-        long stamp = rwLock.writeLock();
-        try {
-            super.andNot(set);
-        } finally {
-            rwLock.unlockWrite(stamp);
-        }
-    }
-
-    /**
-     * Returns the clone of the internal wrapped {@code BitSet}.
-     * This won't be a clone of the {@code ConcurrentBitSet} object.
-     *
-     * @return a clone of the internal wrapped {@code BitSet}
-     */
-    @Override
-    public Object clone() {
-        long stamp = rwLock.tryOptimisticRead();
-        RoaringBitSet clone = (RoaringBitSet) super.clone();
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                clone = (RoaringBitSet) super.clone();
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return clone;
-    }
-
-    @Override
-    public String toString() {
-        long stamp = rwLock.tryOptimisticRead();
-        String str = super.toString();
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                str = super.toString();
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return str;
-    }
-
-    /**
-     * This operation is not supported on {@code ConcurrentBitSet}.
-     */
-    @Override
-    public IntStream stream() {
-        throw new UnsupportedOperationException("stream is not supported");
-    }
-
-    public boolean equals(final Object o) {
-        long stamp = rwLock.tryOptimisticRead();
-        boolean isEqual = super.equals(o);
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                isEqual = super.equals(o);
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return isEqual;
-    }
-
-    public int hashCode() {
-        long stamp = rwLock.tryOptimisticRead();
-        int hashCode = super.hashCode();
-        if (!rwLock.validate(stamp)) {
-            // Fallback to read lock
-            stamp = rwLock.readLock();
-            try {
-                hashCode = super.hashCode();
-            } finally {
-                rwLock.unlockRead(stamp);
-            }
-        }
-        return hashCode;
-    }
-}

Reply via email to