This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0117c66f951 [fix][broker] ConcurrentLongHashMap throw
ArrayIndexOutOfBoundsException (#25644)
0117c66f951 is described below
commit 0117c66f9510f516358c40ca73febbfbd4d4d15f
Author: void-ptr974 <[email protected]>
AuthorDate: Sat May 2 22:38:33 2026 +0800
[fix][broker] ConcurrentLongHashMap throw ArrayIndexOutOfBoundsException
(#25644)
---
.../ConcurrentLongHashMapBenchmark.java | 411 +++++++++++++++++++++
.../common/util/collections/package-info.java | 20 +
.../util/collections/ConcurrentLongHashMap.java | 99 ++---
.../collections/ConcurrentLongHashMapTest.java | 389 +++++++++++++++++--
4 files changed, 842 insertions(+), 77 deletions(-)
diff --git
a/microbench/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapBenchmark.java
b/microbench/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapBenchmark.java
new file mode 100644
index 00000000000..e11f12f914c
--- /dev/null
+++
b/microbench/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapBenchmark.java
@@ -0,0 +1,411 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.util.collections;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * Benchmarks for {@link ConcurrentLongHashMap}.
+ *
+ * <p>Compares two implementations:
+ * <ul>
+ * <li>{@code clhm} – {@link ConcurrentLongHashMap} (immutable Table
snapshot, primitive long
+ * keys, zero allocation on the key path),</li>
+ * <li>{@code chm} – {@link java.util.concurrent.ConcurrentHashMap} as the
JDK baseline.</li>
+ * </ul>
+ *
+ * <p>Workload mix:
+ * <ul>
+ * <li>{@link #getHit}/{@link #getMiss}/{@link #putRemove} – single-thread
basics.</li>
+ * <li>{@link #concurrentGetHit} – read-only, 16 threads.</li>
+ * <li>{@link #concurrentMixedReader}/{@link #concurrentMixedWriter} – an
asymmetric concurrent
+ * group: 12 readers + 4 writers operating on disjoint key partitions,
so any concurrency
+ * bug (torn rehash, lost-update, partial-publish) shows up either as a
JMH error or a
+ * reduced ops/sec number on the suspect implementation.</li>
+ * <li>{@link #concurrentExpandShrinkWriter}/{@link
#concurrentExpandShrinkReader} – starts the
+ * map at the smallest legal capacity and hammers a single section with
put/remove so that
+ * every writer constantly forces a rehash. Highest-pressure
rehash-vs-read benchmark; this
+ * is the workload that originally surfaced the OOB race in the
pre-Table design.</li>
+ * <li>{@link #boxingGetHit}/{@link #boxingPutGetRemove}/{@link
#boxingConcurrentGetHit}/
+ * {@link #boxingConcurrentPutRemove} – use keys above the {@code
Long.valueOf} cache range
+ * so every CHM operation has to allocate a fresh boxed Long. Run with
{@code -prof gc} to
+ * see the alloc-rate divergence vs the primitive-long map.</li>
+ * </ul>
+ *
+ * <p>Run from the repo root:
+ * <pre>{@code
+ * ./gradlew :microbench:shadowJar
+ * java -jar microbench/build/libs/microbench-*-benchmarks.jar \
+ * "ConcurrentLongHashMapBenchmark.concurrentExpandShrink" -prof gc
+ * }</pre>
+ */
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@BenchmarkMode(Mode.AverageTime)
+@Fork(1)
+@Warmup(iterations = 2, time = 5, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
+public class ConcurrentLongHashMapBenchmark {
+
+ /**
+ * Shared benchmark state for the steady-state benchmarks (the map is
fully populated up
+ * front and the workload only mutates keys outside the resident set).
+ *
+ * <p>{@link #setup()} builds and fills both backing maps regardless of the selected
+ * {@code implementation}; the selector only decides which map each call is routed to.
+ */
+ @State(Scope.Benchmark)
+ public static class MapState {
+ @Param({"clhm", "chm"})
+ private String implementation; // "clhm" routes to ConcurrentLongHashMap, any other value to ConcurrentHashMap
+
+ @Param({"1024", "65536"})
+ private int entries;
+
+ private long[] presentKeys; // the keys 0..entries-1, all inserted by setup()
+ private long[] absentKeys; // presentKeys[i] with the sign bit flipped; setup() never inserts these
+ private ConcurrentLongHashMap<String> clhm;
+ private ConcurrentHashMap<Long, String> chm;
+ private AtomicLong writeKey; // source of the keys returned by nextWriteKey()
+
+ @Setup(Level.Trial)
+ public void setup() {
+ presentKeys = new long[entries];
+ absentKeys = new long[entries];
+ clhm = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(entries)
+ .concurrencyLevel(16)
+ .build();
+ chm = new ConcurrentHashMap<>(entries, 0.66f, 16);
+
+ for (int i = 0; i < entries; i++) {
+ long key = i;
+ presentKeys[i] = key;
+ absentKeys[i] = key ^ Long.MIN_VALUE; // XOR with MIN_VALUE toggles only the sign bit, giving a negative key
+ clhm.put(key, "value");
+ chm.put(key, "value");
+ }
+
+ writeKey = new AtomicLong(1L << 48); // starts far above the resident keys 0..entries-1
+ }
+
+ String get(long key) {
+ return "clhm".equals(implementation) ? clhm.get(key) :
chm.get(key);
+ }
+
+ void put(long key, String value) {
+ if ("clhm".equals(implementation)) {
+ clhm.put(key, value);
+ } else {
+ chm.put(key, value); // the long key is autoboxed to Long for the JDK map
+ }
+ }
+
+ void remove(long key) {
+ if ("clhm".equals(implementation)) {
+ clhm.remove(key);
+ } else {
+ chm.remove(key);
+ }
+ }
+
+ long nextWriteKey() {
+ return writeKey.getAndIncrement(); // each call returns a key no earlier call returned
+ }
+ }
+
+ /**
+ * Per-thread cursor state.
+ *
+ * <p>{@link #next(int)} returns the current counter masked with {@code length - 1} and then
+ * advances the counter. The mask is equivalent to a modulo only when {@code length} is a
+ * power of two; for any positive {@code length} the result stays non-negative even after
+ * the {@code int} counter overflows, because the mask has its sign bit clear.
+ */
+ @State(Scope.Thread)
+ public static class CursorState {
+ private int index;
+
+ int next(int length) {
+ int value = index;
+ index = value + 1;
+ return value & (length - 1); // NOTE(review): assumes length is a power of two (the default entries values are)
+ }
+ }
+
+ /**
+ * Independent key-stream state for concurrent benchmarks. Each thread that
+ * injects this state owns a unique partition of the long key-space so the
+ * mutations don't trample each other and the steady state remains bounded;
+ * readers also walk a private cursor so they don't hot-spot a single
+ * bucket.
+ *
+ * <p>Partition ids start at 1, not 0. Partition 0 would yield the keys
+ * {@code 0..WORKING_SET-1}, which are keys that {@code MapState.setup()}
+ * pre-populates; a writer running put + remove on them would evict the
+ * resident set that the reader half of the mixed group expects to hit.
+ * {@code MapState}'s own write counter starts at {@code 1L << 48}, which
+ * is only reached after 256 partitions have been handed out.
+ */
+ @State(Scope.Thread)
+ public static class WriterState {
+ private static final AtomicLong NEXT_PARTITION = new AtomicLong();
+ private long base;
+ private long offset;
+ // Keep a small per-writer working set so put + remove pair cleanly
+ // without unbounded growth. Must stay a power of two because nextKey()
+ // masks the offset with WORKING_SET - 1.
+ private static final int WORKING_SET = 1024;
+
+ /** Claims a fresh partition and rewinds the cursor before every iteration. */
+ @Setup(Level.Iteration)
+ public void setup() {
+ // incrementAndGet(): the first partition handed out is 1, so base is
+ // never 0 and cannot overlap the keys 0..entries-1 held by MapState.
+ base = NEXT_PARTITION.incrementAndGet() << 40;
+ offset = 0;
+ }
+
+ /** Returns the next key, cycling through WORKING_SET keys of this partition. */
+ long nextKey() {
+ long k = base + (offset & (WORKING_SET - 1));
+ offset++;
+ return k;
+ }
+ }
+
+ @Benchmark
+ public void getHit(MapState map, CursorState cursor, Blackhole blackhole) { // looks up keys taken from presentKeys, which MapState.setup() inserted
+
blackhole.consume(map.get(map.presentKeys[cursor.next(map.presentKeys.length)])); // result goes to the Blackhole so the lookup is not dead code
+ }
+
+ @Benchmark
+ public void getMiss(MapState map, CursorState cursor, Blackhole blackhole)
{ // looks up keys taken from absentKeys, which MapState.setup() never inserts
+
blackhole.consume(map.get(map.absentKeys[cursor.next(map.absentKeys.length)]));
+ }
+
+ @Benchmark
+ public void putRemove(MapState map, Blackhole blackhole) {
+ long key = map.nextWriteKey(); // fresh key from the counter that setup() seeds at 1L << 48
+ map.put(key, "value");
+ blackhole.consume(map.get(key));
+ map.remove(key); // the key is removed again before the method returns
+ }
+
+ @Benchmark
+ @Threads(16) // same lookup as getHit; MapState is Scope.Benchmark (shared), CursorState is Scope.Thread
+ public void concurrentGetHit(MapState map, CursorState cursor, Blackhole
blackhole) {
+
blackhole.consume(map.get(map.presentKeys[cursor.next(map.presentKeys.length)]));
+ }
+
+ /** Reader half of the asymmetric mixed-workload group: 12 reader threads.
*/
+ @Benchmark
+ @Group("concurrentMixed") // same group name as concurrentMixedWriter
+ @GroupThreads(12)
+ public void concurrentMixedReader(MapState map, CursorState cursor,
Blackhole blackhole) {
+
blackhole.consume(map.get(map.presentKeys[cursor.next(map.presentKeys.length)])); // same lookup expression as getHit
+ }
+
+ /** Writer half of the asymmetric mixed-workload group: 4 writer threads.
*/
+ @Benchmark
+ @Group("concurrentMixed") // same group name as concurrentMixedReader
+ @GroupThreads(4)
+ public void concurrentMixedWriter(MapState map, WriterState w, Blackhole
blackhole) {
+ long key = w.nextKey(); // next key from this thread's WriterState
+ map.put(key, "value");
+ blackhole.consume(map.get(key));
+ map.remove(key); // the key this call inserted is removed again before returning
+ }
+
+ /**
+ * Holds an aggressively-shrinking, single-section map. Each writer
thread's put/remove pair
+ * crosses the expand and shrink thresholds, so the rehash code path is
exercised on nearly
+ * every operation. Reader threads chase the writers to surface
read-vs-rehash races. This is
+ * the workload that originally surfaced the OOB race in the pre-Table
design.
+ *
+ * <p>Both maps are recreated by {@link #setup()}, which is registered at
+ * {@code Level.Iteration}; unlike {@code MapState}, nothing is pre-populated.
+ */
+ @State(Scope.Benchmark)
+ public static class ChurningMapState {
+ @Param({"clhm", "chm"})
+ private String implementation;
+
+ private ConcurrentLongHashMap<String> clhm;
+ private ConcurrentHashMap<Long, String> chm;
+
+ @Setup(Level.Iteration)
+ public void setup() {
+ clhm = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ chm = new ConcurrentHashMap<>(4, 0.66f, 1); // JDK baseline, created empty as well
+ }
+
+ String get(long key) {
+ return "clhm".equals(implementation) ? clhm.get(key) :
chm.get(key);
+ }
+
+ String put(long key, String value) { // unlike MapState.put, the map's return value is passed back to the caller
+ return "clhm".equals(implementation) ? clhm.put(key, value) :
chm.put(key, value);
+ }
+
+ String remove(long key) {
+ return "clhm".equals(implementation) ? clhm.remove(key) :
chm.remove(key);
+ }
+ }
+
+ /** Writer driving constant expand+shrink on a single section. */
+ @Benchmark
+ @Group("concurrentExpandShrink")
+ @GroupThreads(4)
+ public void concurrentExpandShrinkWriter(ChurningMapState map, WriterState
w, Blackhole bh) {
+ long k1 = w.nextKey(); // two consecutive keys from this thread's WriterState
+ long k2 = w.nextKey();
+ bh.consume(map.put(k1, "v")); // both keys are inserted first ...
+ bh.consume(map.put(k2, "v"));
+ bh.consume(map.remove(k1)); // ... then both are removed, so this call leaves neither key behind
+ bh.consume(map.remove(k2));
+ }
+
+ /** Reader chasing the writers; reads must not throw or return torn
values. */
+ @Benchmark
+ @Group("concurrentExpandShrink")
+ @GroupThreads(4)
+ public void concurrentExpandShrinkReader(ChurningMapState map, WriterState
w, Blackhole bh) {
+ bh.consume(map.get(w.nextKey())); // NOTE(review): WriterState is Scope.Thread and takes its own partition, so these keys look disjoint from the writers' keys - confirm the lookups are meant to miss
+ }
+
+ //
------------------------------------------------------------------------------------------
+ // Boxing-impact workload
+ //
+ // Pulsar's actual usage of ConcurrentLongHashMap stores object values
(CompletableFuture,
+ // Producer, Consumer, ...) and the choice to keep a primitive-long map
instead of switching
+ // to ConcurrentHashMap<Long, V> hinges on whether the long->Long autobox
on every operation
+ // materially hurts throughput and GC pressure in practice.
+ //
+ // To make that visible to JMH we have to defeat the JDK's Long.valueOf
cache (which short-
+ // circuits values in [-128, 127] to a shared instance). Keys here are
seeded above the cache
+ // range and monotonically increase, so every CHM operation has to
allocate a fresh boxed
+ // Long; the primitive-long map sees zero allocation on the key path. Run
with `-prof gc` to
+ // see the alloc-rate divergence on top of the throughput numbers.
+ //
------------------------------------------------------------------------------------------
+
+ @State(Scope.Benchmark)
+ public static class BoxingMapState { // pre-populated maps for the boxing benchmarks; setup() fills both, calls are routed by `implementation`
+ @Param({"clhm", "chm"})
+ private String implementation;
+
+ @Param({"1024", "65536"})
+ private int entries;
+
+ // Lock granularity. Override with `-p concurrency=...` to match CHM's
bucket-level
+ // striping (default JDK CHM uses one synchronized monitor per bucket,
so concurrency=1024
+ // on a 1024-bucket map effectively gives the primitive map a
comparable lock count).
+ @Param({"16"})
+ private int concurrency;
+
+ private long[] presentKeys; // (1L << 32) + i * 31 for i in [0, entries), all inserted by setup()
+ private ConcurrentLongHashMap<String> clhm;
+ private ConcurrentHashMap<Long, String> chm;
+
+ @Setup(Level.Trial)
+ public void setup() {
+ presentKeys = new long[entries];
+ int cl = Math.min(concurrency, entries); // builder requires
expectedItems >= concurrencyLevel
+ clhm = ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(entries).concurrencyLevel(cl).build();
+ chm = new ConcurrentHashMap<>(entries, 0.66f, cl); // the same clamped level is passed to the JDK map
+
+ // Start the key space well above 127 so Long.valueOf cannot serve
from its cache.
+ // Use an odd stride so consecutive keys land in different
sections / cache lines.
+ final long base = 1L << 32;
+ for (int i = 0; i < entries; i++) {
+ long key = base + ((long) i) * 31L; // i is widened to long before the multiply
+ presentKeys[i] = key;
+ clhm.put(key, "value");
+ chm.put(key, "value");
+ }
+ }
+
+ String get(long key) {
+ return "clhm".equals(implementation) ? clhm.get(key) :
chm.get(key);
+ }
+
+ String put(long key, String value) {
+ return "clhm".equals(implementation) ? clhm.put(key, value) :
chm.put(key, value);
+ }
+
+ String remove(long key) {
+ return "clhm".equals(implementation) ? clhm.remove(key) :
chm.remove(key);
+ }
+ }
+
+ @State(Scope.Thread)
+ public static class BoxingCursor { // per-thread index counter; same logic as CursorState
+ private int index;
+
+ int next(int length) {
+ int v = index;
+ index = v + 1;
+ return v & (length - 1); // mask equals a modulo only when length is a power of two
+ }
+ }
+
+ @Benchmark
+ public void boxingGetHit(BoxingMapState map, BoxingCursor cur, Blackhole
bh) {
+ bh.consume(map.get(map.presentKeys[cur.next(map.presentKeys.length)])); // key comes from presentKeys, which BoxingMapState.setup() inserted
+ }
+
+ @Benchmark
+ @Threads(16) // same lookup as boxingGetHit, run by 16 threads against the shared BoxingMapState
+ public void boxingConcurrentGetHit(BoxingMapState map, BoxingCursor cur,
Blackhole bh) {
+ bh.consume(map.get(map.presentKeys[cur.next(map.presentKeys.length)]));
+ }
+
+ @Benchmark
+ public void boxingPutGetRemove(BoxingMapState map, BoxingCursor cur,
Blackhole bh) {
+ long key = map.presentKeys[cur.next(map.presentKeys.length)]; // operates on a key from presentKeys, not a fresh one
+ bh.consume(map.put(key, "value"));
+ bh.consume(map.get(key));
+ bh.consume(map.remove(key));
+ bh.consume(map.put(key, "value")); // re-insert after the remove so the key is present again when the method returns
+ }
+
+ @State(Scope.Thread)
+ public static class BoxingWriterState { // per-thread key stream for boxingConcurrentPutRemove
+ private static final AtomicLong NEXT_PARTITION = new AtomicLong();
+ private long base;
+ private long offset;
+
+ @Setup(Level.Iteration)
+ public void setup() {
+ base = (1L << 40) | (NEXT_PARTITION.getAndIncrement() << 32); // bit 40 is always set, so base is at least 1L << 40 even for partition 0
+ offset = 0;
+ }
+
+ long nextKey() {
+ return base + (offset++) * 31L; // offset only grows until the next setup(), so keys advance by 31 per call
+ }
+ }
+
+ @Benchmark
+ @Threads(16)
+ public void boxingConcurrentPutRemove(BoxingMapState map,
BoxingWriterState w, Blackhole bh) {
+ long key = w.nextKey(); // next key from this thread's BoxingWriterState
+ bh.consume(map.put(key, "value"));
+ bh.consume(map.remove(key)); // the key is removed again before the method returns
+ }
+}
diff --git
a/microbench/src/main/java/org/apache/pulsar/common/util/collections/package-info.java
b/microbench/src/main/java/org/apache/pulsar/common/util/collections/package-info.java
new file mode 100644
index 00000000000..9b2a21422d6
--- /dev/null
+++
b/microbench/src/main/java/org/apache/pulsar/common/util/collections/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.util.collections;
\ No newline at end of file
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index cdeff550618..00a3565e5e3 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.common.util.collections;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import com.google.common.collect.Lists;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -187,7 +188,7 @@ public class ConcurrentLongHashMap<V> {
public long capacity() {
long capacity = 0;
for (Section<V> s : sections) {
- capacity += s.capacity;
+ capacity += s.table.capacity();
}
return capacity;
}
@@ -288,13 +289,18 @@ public class ConcurrentLongHashMap<V> {
void accept(long key, V value);
}
- // A section is a portion of the hash map that is covered by a single
+ // A section is a portion of the hash map that is covered by a single
lock. The keys, values
+ // and capacity arrays are bundled into an immutable Table snapshot so
that readers always see
+ // a consistent (key, value, length) triple, eliminating the
partial-publish race that the
+ // previous design had to paper over with Math.min(keys.length,
values.length).
@SuppressWarnings("serial")
private static final class Section<V> extends StampedLock {
- private volatile long[] keys;
- private volatile V[] values;
+ private record Table<V>(long[] keys, V[] values, int capacity) { }
+
+ // Section is Serializable only by inheritance from StampedLock; never
actually serialized.
+ @SuppressFBWarnings("SE_BAD_FIELD")
+ private volatile Table<V> table;
- private volatile int capacity;
private final int initCapacity;
@SuppressWarnings("rawtypes") // AtomicIntegerFieldUpdater requires
raw type for class parameter
private static final AtomicIntegerFieldUpdater<Section> SIZE_UPDATER =
@@ -312,10 +318,9 @@ public class ConcurrentLongHashMap<V> {
Section(int capacity, float mapFillFactor, float mapIdleFactor,
boolean autoShrink,
float expandFactor, float shrinkFactor) {
- this.capacity = alignToPowerOfTwo(capacity);
- this.initCapacity = this.capacity;
- this.keys = new long[this.capacity];
- this.values = (V[]) new Object[this.capacity];
+ int initial = alignToPowerOfTwo(capacity);
+ this.initCapacity = initial;
+ this.table = new Table<>(new long[initial], (V[]) new
Object[initial], initial);
this.size = 0;
this.usedBuckets = 0;
this.autoShrink = autoShrink;
@@ -323,19 +328,18 @@ public class ConcurrentLongHashMap<V> {
this.mapIdleFactor = mapIdleFactor;
this.expandFactor = expandFactor;
this.shrinkFactor = shrinkFactor;
- this.resizeThresholdUp = (int) (this.capacity * mapFillFactor);
- this.resizeThresholdBelow = (int) (this.capacity * mapIdleFactor);
+ this.resizeThresholdUp = (int) (initial * mapFillFactor);
+ this.resizeThresholdBelow = (int) (initial * mapIdleFactor);
}
V get(long key, int keyHash) {
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
- // add local variable here, so OutOfBound won't happen
- long[] keys = this.keys;
- V[] values = this.values;
- // calculate table.length as capacity to avoid rehash changing
capacity
- int bucket = signSafeMod(keyHash, values.length);
+ Table<V> table = this.table;
+ long[] keys = table.keys();
+ V[] values = table.values();
+ int bucket = signSafeMod(keyHash, table.capacity());
try {
while (true) {
@@ -357,10 +361,10 @@ public class ConcurrentLongHashMap<V> {
stamp = readLock();
acquiredLock = true;
- // update local variable
- keys = this.keys;
- values = this.values;
- bucket = signSafeMod(keyHash, values.length);
+ table = this.table;
+ keys = table.keys();
+ values = table.values();
+ bucket = signSafeMod(keyHash, table.capacity());
storedKey = keys[bucket];
storedValue = values[bucket];
}
@@ -372,7 +376,7 @@ public class ConcurrentLongHashMap<V> {
return null;
}
}
- bucket = (bucket + 1) & (values.length - 1);
+ bucket = (bucket + 1) & (table.capacity() - 1);
}
} finally {
if (acquiredLock) {
@@ -385,7 +389,10 @@ public class ConcurrentLongHashMap<V> {
int bucket = keyHash;
long stamp = writeLock();
- int capacity = this.capacity;
+ Table<V> table = this.table;
+ long[] keys = table.keys();
+ V[] values = table.values();
+ int capacity = table.capacity();
// Remember where we find the first available spot
int firstDeletedKey = -1;
@@ -453,10 +460,13 @@ public class ConcurrentLongHashMap<V> {
private V remove(long key, Object value, int keyHash) {
int bucket = keyHash;
long stamp = writeLock();
+ Table<V> table = this.table;
+ long[] keys = table.keys();
+ V[] values = table.values();
+ int capacity = table.capacity();
try {
while (true) {
- int capacity = this.capacity;
bucket = signSafeMod(bucket, capacity);
long storedKey = keys[bucket];
@@ -524,11 +534,12 @@ public class ConcurrentLongHashMap<V> {
long stamp = writeLock();
try {
- if (autoShrink && capacity > initCapacity) {
+ Table<V> table = this.table;
+ if (autoShrink && table.capacity() > initCapacity) {
shrinkToInitCapacity();
} else {
- Arrays.fill(keys, 0);
- Arrays.fill(values, EmptyValue);
+ Arrays.fill(table.keys(), 0);
+ Arrays.fill(table.values(), EmptyValue);
this.size = 0;
this.usedBuckets = 0;
}
@@ -540,19 +551,20 @@ public class ConcurrentLongHashMap<V> {
public void forEach(EntryProcessor<V> processor) {
long stamp = tryOptimisticRead();
- // We need to make sure that we read these 3 variables in a
consistent way
- int capacity = this.capacity;
- long[] keys = this.keys;
- V[] values = this.values;
+ Table<V> table = this.table;
+ int capacity = table.capacity();
+ long[] keys = table.keys();
+ V[] values = table.values();
// Validate no rehashing
if (!validate(stamp)) {
// Fallback to read lock
stamp = readLock();
- capacity = this.capacity;
- keys = this.keys;
- values = this.values;
+ table = this.table;
+ capacity = table.capacity();
+ keys = table.keys();
+ values = table.values();
unlockRead(stamp);
}
@@ -590,6 +602,9 @@ public class ConcurrentLongHashMap<V> {
// Expand the hashmap
long[] newKeys = new long[newCapacity];
V[] newValues = (V[]) new Object[newCapacity];
+ Table<V> table = this.table;
+ long[] keys = table.keys();
+ V[] values = table.values();
// Re-hash table
for (int i = 0; i < keys.length; i++) {
@@ -600,27 +615,21 @@ public class ConcurrentLongHashMap<V> {
}
}
- keys = newKeys;
- values = newValues;
- capacity = newCapacity;
+ this.table = new Table<>(newKeys, newValues, newCapacity);
usedBuckets = size;
- resizeThresholdUp = (int) (capacity * mapFillFactor);
- resizeThresholdBelow = (int) (capacity * mapIdleFactor);
+ resizeThresholdUp = (int) (newCapacity * mapFillFactor);
+ resizeThresholdBelow = (int) (newCapacity * mapIdleFactor);
}
private void shrinkToInitCapacity() {
long[] newKeys = new long[initCapacity];
V[] newValues = (V[]) new Object[initCapacity];
- keys = newKeys;
- values = newValues;
+ table = new Table<>(newKeys, newValues, initCapacity);
size = 0;
usedBuckets = 0;
- // Capacity needs to be updated after the values, so that we won't
see
- // a capacity value bigger than the actual array size
- capacity = initCapacity;
- resizeThresholdUp = (int) (capacity * mapFillFactor);
- resizeThresholdBelow = (int) (capacity * mapIdleFactor);
+ resizeThresholdUp = (int) (initCapacity * mapFillFactor);
+ resizeThresholdBelow = (int) (initCapacity * mapIdleFactor);
}
private static <V> void insertKeyValueNoLock(long[] keys, V[] values,
long key, V value) {
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index 8ba985ed329..bf9eede4837 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -28,6 +28,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
@@ -35,7 +36,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongFunction;
import lombok.Cleanup;
@@ -211,65 +214,387 @@ public class ConcurrentLongHashMapTest {
assertTrue(map.capacity() == initCapacity);
}
+ /**
+ * Spins many readers against a section that is constantly expanding and
shrinking. The
+ * stable key '1' is never removed, so every read must observe "v1";
volatile keys 2/3 may or
+ * may not be present at any instant. Any torn read or sentinel leak
surfaces as an
+ * AssertionError or runtime exception captured in {@code ex}.
+ */
@Test
- public void testConcurrentExpandAndShrinkAndGet() throws Throwable {
+ public void testConcurrentExpandAndShrinkAndGet() throws Throwable {
ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
.expectedItems(2)
.concurrencyLevel(1)
.autoShrink(true)
.mapIdleFactor(0.25f)
.build();
- assertEquals(map.capacity(), 4);
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final int readThreads = 16;
final int writeThreads = 1;
final int n = 1_000;
- CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
- Future<?> future = null;
- AtomicReference<Exception> ex = new AtomicReference<>();
+
+ CyclicBarrier barrier = new CyclicBarrier(readThreads + writeThreads);
+ AtomicReference<Throwable> ex = new AtomicReference<>();
+ List<Future<?>> futures = new ArrayList<>();
+ AtomicBoolean writerDone = new AtomicBoolean(false);
+
+ assertNull(map.put(1, "v1"));
for (int i = 0; i < readThreads; i++) {
- executor.submit(() -> {
+ futures.add(executor.submit(() -> {
+ barrier.await();
try {
- barrier.await();
- } catch (Exception e) {
- throw new RuntimeException(e);
+ while (!writerDone.get()) {
+ assertEquals(map.get(1), "v1");
+ map.get(2);
+ map.get(3);
+ }
+ } catch (Throwable t) {
+ ex.compareAndSet(null, t);
+ }
+ return null;
+ }));
+ }
+
+ futures.add(executor.submit(() -> {
+ barrier.await();
+ try {
+ for (int i = 0; i < n; i++) {
+ assertNull(map.put(2, "v2"));
+ assertNull(map.put(3, "v3"));
+ assertEquals(map.capacity(), 8);
+
+ assertTrue(map.remove(2, "v2"));
+ assertTrue(map.remove(3, "v3"));
+ assertEquals(map.capacity(), 4);
+ }
+ } finally {
+ writerDone.set(true);
+ }
+ return null;
+ }));
+
+ for (Future<?> future : futures) {
+ future.get(60, TimeUnit.SECONDS);
+ }
+
+ assertNull(ex.get());
+ }
+
+ /**
+ * Many concurrent writers all targeting the same section so {@code
put}/{@code remove} race
+ * against {@code rehash} (both expand and shrink). Each writer owns a
disjoint key range so
+ * the post-condition is deterministic. Readers concurrently look up every
key written.
+ */
+ @Test
+ public void testConcurrentMultiWriterExpandShrink() throws Throwable {
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(4)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.20f)
+ .build();
+
+ final int writeThreads = 8;
+ final int readThreads = 8;
+ final int rounds = 200;
+ final int keysPerThread = 64;
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newCachedThreadPool();
+ CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+ AtomicReference<Throwable> ex = new AtomicReference<>();
+ AtomicBoolean writersDone = new AtomicBoolean(false);
+ List<Future<?>> futures = new ArrayList<>();
+
+ for (int t = 0; t < writeThreads; t++) {
+ final long base = (long) t * keysPerThread;
+ futures.add(executor.submit(() -> {
+ barrier.await();
+ try {
+ for (int round = 0; round < rounds; round++) {
+ for (int k = 0; k < keysPerThread; k++) {
+ map.put(base + k, "v-" + (base + k));
+ }
+ for (int k = 0; k < keysPerThread; k++) {
+ assertEquals(map.get(base + k), "v-" + (base + k));
+ }
+ for (int k = 0; k < keysPerThread; k++) {
+ assertEquals(map.remove(base + k), "v-" + (base +
k));
+ }
+ for (int k = 0; k < keysPerThread; k++) {
+ assertNull(map.get(base + k));
+ }
+ }
+ } catch (Throwable th) {
+ ex.compareAndSet(null, th);
}
+ return null;
+ }));
+ }
+
+ for (int r = 0; r < readThreads; r++) {
+ futures.add(executor.submit(() -> {
+ barrier.await();
try {
- map.get(1);
- } catch (Exception e) {
- ex.set(e);
+ long total = (long) writeThreads * keysPerThread;
+ long key = 0;
+ while (!writersDone.get()) {
+ String v = map.get(key);
+ if (v != null && !v.equals("v-" + key)) {
+ throw new AssertionError("torn read for key " +
key + ": " + v);
+ }
+ key = (key + 1) % total;
+ }
+ } catch (Throwable th) {
+ ex.compareAndSet(null, th);
}
- });
+ return null;
+ }));
}
- assertNull(map.put(1, "v1"));
- future = executor.submit(() -> {
- try {
+ for (int i = 0; i < writeThreads; i++) {
+ futures.get(i).get(120, TimeUnit.SECONDS);
+ }
+ writersDone.set(true);
+ for (int i = writeThreads; i < futures.size(); i++) {
+ futures.get(i).get(60, TimeUnit.SECONDS);
+ }
+
+ assertNull(ex.get());
+ assertEquals(map.size(), 0);
+ }
+
+ /**
+ * Differential test against {@link java.util.concurrent.ConcurrentHashMap}. Each thread owns
+ * a disjoint key partition (so any single-key sequence is linearizable), but every operation
+ * is mirrored onto both maps. Per-call return values must agree, and after the workload the
+ * two maps must contain exactly the same entries — including the reverse direction.
+ */
+ @Test
+ public void testCorrectnessAgainstConcurrentHashMap() throws Throwable {
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(8)
+ .concurrencyLevel(4)
+ .autoShrink(true)
+ .mapIdleFactor(0.20f)
+ .build();
+ ConcurrentHashMap<Long, String> reference = new ConcurrentHashMap<>(); // oracle that receives the same operations
+
+ final int nThreads = 8;
+ final int opsPerThread = 50_000;
+ final int keyRange = 2048;
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+ CyclicBarrier barrier = new CyclicBarrier(nThreads); // releases all workers at the same time
+ List<Future<?>> futures = new ArrayList<>();
+
+ for (int t = 0; t < nThreads; t++) {
+ final int threadId = t;
+ final long base = (long) threadId << 40; // keys are base + [0, keyRange), so partitions never overlap
+ futures.add(executor.submit(() -> {
+ Random rnd = new Random(threadId); // fixed seed per thread
 barrier.await();
- } catch (Exception e) {
- throw new RuntimeException(e);
+ for (int i = 0; i < opsPerThread; i++) {
+ long key = base + rnd.nextInt(keyRange);
+ int op = rnd.nextInt(5); // 0=put, 1=putIfAbsent, 2=remove, 3=get, otherwise containsKey
+ String value = "v-" + threadId + "-" + i;
+ switch (op) {
+ case 0:
+ assertEquals(map.put(key, value),
reference.put(key, value));
+ break;
+ case 1:
+ assertEquals(map.putIfAbsent(key, value),
reference.putIfAbsent(key, value));
+ break;
+ case 2:
+ assertEquals(map.remove(key),
reference.remove(key));
+ break;
+ case 3:
+ assertEquals(map.get(key), reference.get(key));
+ break;
+ default:
+ assertEquals(map.containsKey(key),
reference.containsKey(key));
+ break;
+ }
+ }
+ return null;
+ }));
+ }
+
+ for (Future<?> future : futures) {
+ future.get(120, TimeUnit.SECONDS); // a failed assertion in a worker surfaces here as ExecutionException
+ }
+
+ assertEquals(map.size(), (long) reference.size());
+ for (Map.Entry<Long, String> e : reference.entrySet()) { // direction 1: every reference entry is in map
+ assertEquals(map.get(e.getKey()), e.getValue());
+ }
+ AtomicLong observed = new AtomicLong();
+ map.forEach((k, v) -> { // direction 2: every map entry is in reference
+ observed.incrementAndGet();
+ assertEquals(v, reference.get(k));
+ });
+ assertEquals(observed.get(), (long) reference.size()); // forEach must visit exactly as many entries as reference holds
+ }
+
+ /**
+ * Cross-thread put-publish-then-read invariant: once a {@code put(k, v)} has returned and the
+ * writer has published k via a volatile counter, EVERY reader that observes that counter must
+ * see a non-null value for k. A failure here would mean a successful put was "lost" by the
+ * map's get path — the failure mode the Table-snapshot design exists to prevent.
+ *
+ * <p>The map starts at the smallest legal capacity with autoShrink enabled, so the rehash
+ * code path is exercised on virtually every put. This is the most aggressive workload for
+ * the rehash-vs-get race that the previous separate-volatile-arrays design couldn't survive.
+ */
+ @Test
+ public void testNoLostGetAfterPublish() throws Throwable {
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+
+ final int totalKeys = 50_000;
+ final int readerThreads = 8;
+
+ AtomicLong highestPublished = new AtomicLong(-1); // -1 until the first key has been published
+ AtomicReference<Throwable> ex = new AtomicReference<>(); // first failure seen by any reader
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newCachedThreadPool();
+ CyclicBarrier barrier = new CyclicBarrier(readerThreads + 1); // all readers plus the single writer
+ List<Future<?>> futures = new ArrayList<>();
+
+ // Writer: put then publish. The volatile-set on highestPublished
establishes
+ // happens-before with any reader that observes the published value.
+ futures.add(executor.submit(() -> { // writer failures are not stored in ex; they surface through this Future
+ barrier.await();
+ for (int i = 0; i < totalKeys; i++) {
+ assertNull(map.put(i, "v" + i)); // each key is inserted once into an initially empty map
+ highestPublished.set(i); // published only after put has returned
 }
+ return null;
+ }));
+
+ // Readers: observe the published counter, then verify every key in
[0, counter] is
+ // present with the expected value. The reader pulls the counter once
per cycle and
+ // catches up to it before pulling again.
+ for (int r = 0; r < readerThreads; r++) {
+ futures.add(executor.submit(() -> {
+ barrier.await();
+ try {
+ long lastChecked = -1;
+ while (lastChecked < totalKeys - 1) { // NOTE(review): busy-spins; appears to have no exit if the writer stops publishing early
+ long target = highestPublished.get();
+ while (lastChecked < target) {
+ lastChecked++;
+ String v = map.get(lastChecked);
+ if (v == null) {
+ throw new AssertionError(
+ "lost get for key " + lastChecked
+ + "; highestPublished=" +
target);
+ }
+ if (!v.equals("v" + lastChecked)) {
+ throw new AssertionError(
+ "wrong value for key " + lastChecked +
": " + v);
+ }
+ }
+ }
+ } catch (Throwable t) {
+ ex.compareAndSet(null, t); // keep only the first failure
+ }
+ return null;
+ }));
+ }
+
+ for (Future<?> f : futures) {
+ f.get(120, TimeUnit.SECONDS);
+ }
+
+ assertNull(ex.get());
+ assertEquals(map.size(), (long) totalKeys); // nothing is removed in this test, so every key must remain
+ }
- for (int i = 0; i < n; i++) {
- // expand hashmap
- assertNull(map.put(2, "v2"));
- assertNull(map.put(3, "v3"));
- assertEquals(map.capacity(), 8);
+ /**
+ * forEach during concurrent writes is documented as not strongly thread-safe, but it must
+ * never throw, never expose {@code DeletedValue}/{@code EmptyValue} sentinels, and every
+ * observed (key, value) pair must be a legitimate pair that was written at some point.
+ */
+ @Test
+ public void testForEachDuringWrites() throws Throwable {
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(8)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
- // shrink hashmap
- assertTrue(map.remove(2, "v2"));
- assertTrue(map.remove(3, "v3"));
- assertEquals(map.capacity(), 4);
+ final int writers = 4;
+ final int keysPerWriter = 256;
+ final int writeRounds = 200;
+ final int forEachRounds = 100;
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newCachedThreadPool();
+ CyclicBarrier barrier = new CyclicBarrier(writers + 1); // all writers plus the single forEach thread
+ AtomicReference<Throwable> ex = new AtomicReference<>(); // first failure from any worker
+ AtomicBoolean writersDone = new AtomicBoolean(false);
+ List<Future<?>> futures = new ArrayList<>();
+
+ for (int t = 0; t < writers; t++) {
+ final long base = (long) t * keysPerWriter; // writer t owns keys [base, base + keysPerWriter)
+ futures.add(executor.submit(() -> {
+ barrier.await();
+ try {
+ for (int round = 0; round < writeRounds; round++) { // each round fills, then empties, this writer's key range
+ for (int k = 0; k < keysPerWriter; k++) {
+ map.put(base + k, "v-" + (base + k));
+ }
+ for (int k = 0; k < keysPerWriter; k++) {
+ map.remove(base + k);
+ }
+ }
+ } catch (Throwable th) {
+ ex.compareAndSet(null, th);
+ }
+ return null;
+ }));
+ }
+
+ futures.add(executor.submit(() -> {
+ barrier.await();
+ try {
+ for (int round = 0; round < forEachRounds &&
!writersDone.get(); round++) {
+ AtomicInteger seen = new AtomicInteger(); // incremented per entry but never asserted on
+ map.forEach((k, v) -> {
+ seen.incrementAndGet();
+ String expected = "v-" + k; // writers only ever store "v-" + key
+ if (!expected.equals(v)) {
+ throw new AssertionError("Inconsistent (k,v): (" +
k + "," + v + ")");
+ }
+ });
+ long sz = map.size();
+ assertTrue(sz >= 0, "size went negative: " + sz);
+ assertTrue(sz <= (long) writers * keysPerWriter, "size >
universe: " + sz);
+ }
+ } catch (Throwable th) {
+ ex.compareAndSet(null, th);
 }
- });
+ return null;
+ }));
+
+ for (int i = 0; i < writers; i++) {
+ futures.get(i).get(120, TimeUnit.SECONDS); // futures[0..writers) are the writer tasks
+ }
+ writersDone.set(true); // lets the forEach thread stop before reaching forEachRounds
+ futures.get(writers).get(60, TimeUnit.SECONDS); // futures[writers] is the forEach task
- future.get();
- assertTrue(ex.get() == null);
- // shut down pool
- executor.shutdown();
+ assertNull(ex.get());
 }
@Test