This is an automated email from the ASF dual-hosted git repository.
shoothzj pushed a commit to branch branch-4.17
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.17 by this push:
new a2dc07c194 [fix] Fix ArrayIndexOut0fBoundsException caused by
optimistic lock (#4066)
a2dc07c194 is described below
commit a2dc07c194697929e391bc3121852ed23127c2bd
Author: thetumbled <[email protected]>
AuthorDate: Mon Apr 29 06:02:48 2024 +0800
[fix] Fix ArrayIndexOut0fBoundsException caused by optimistic lock (#4066)
---
.../util/collections/ConcurrentLongHashMap.java | 22 ++++----
.../util/collections/ConcurrentLongHashSet.java | 10 +++-
.../collections/ConcurrentLongLongHashMap.java | 46 ++++++++++------
.../collections/ConcurrentLongLongPairHashMap.java | 41 ++++++++------
.../util/collections/ConcurrentOpenHashMap.java | 41 +++++++++-----
.../util/collections/ConcurrentOpenHashSet.java | 22 +++-----
.../collections/ConcurrentLongHashMapTest.java | 61 +++++++++++++++++++++
.../collections/ConcurrentLongHashSetTest.java | 64 ++++++++++++++++++++++
.../collections/ConcurrentLongLongHashMapTest.java | 63 +++++++++++++++++++++
.../ConcurrentLongLongPairHashMapTest.java | 64 ++++++++++++++++++++++
.../collections/ConcurrentOpenHashMapTest.java | 64 ++++++++++++++++++++++
.../collections/ConcurrentOpenHashSetTest.java | 64 ++++++++++++++++++++++
12 files changed, 488 insertions(+), 74 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index c95497a861..20094a106f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -346,15 +346,18 @@ public class ConcurrentLongHashMap<V> {
}
V get(long key, int keyHash) {
- int bucket = keyHash;
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
+ // add local variable here, so OutOfBound won't happen
+ long[] keys = this.keys;
+ V[] values = this.values;
+ // calculate table.length as capacity to avoid rehash changing
capacity
+ int bucket = signSafeMod(keyHash, values.length);
+
try {
while (true) {
- int capacity = this.capacity;
- bucket = signSafeMod(bucket, capacity);
// First try optimistic locking
long storedKey = keys[bucket];
@@ -373,16 +376,15 @@ public class ConcurrentLongHashMap<V> {
if (!acquiredLock) {
stamp = readLock();
acquiredLock = true;
+
+ // update local variable
+ keys = this.keys;
+ values = this.values;
+ bucket = signSafeMod(keyHash, values.length);
storedKey = keys[bucket];
storedValue = values[bucket];
}
- if (capacity != this.capacity) {
- // There has been a rehashing. We need to restart
the search
- bucket = keyHash;
- continue;
- }
-
if (storedKey == key) {
return storedValue != DeletedValue ? storedValue :
null;
} else if (storedValue == EmptyValue) {
@@ -391,7 +393,7 @@ public class ConcurrentLongHashMap<V> {
}
}
- ++bucket;
+ bucket = (bucket + 1) & (values.length - 1);
}
} finally {
if (acquiredLock) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
index 78f99cef73..8383f5fac2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
@@ -303,7 +303,11 @@ public class ConcurrentLongHashSet {
boolean contains(long item, int hash) {
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
- int bucket = signSafeMod(hash, capacity);
+
+ // add local variable here, so OutOfBound won't happen
+ long[] table = this.table;
+ // calculate table.length as capacity to avoid rehash changing
capacity
+ int bucket = signSafeMod(hash, table.length);
try {
while (true) {
@@ -324,7 +328,9 @@ public class ConcurrentLongHashSet {
stamp = readLock();
acquiredLock = true;
- bucket = signSafeMod(hash, capacity);
+ // update local variable
+ table = this.table;
+ bucket = signSafeMod(hash, table.length);
storedItem = table[bucket];
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
index 1ad3b781f1..2c8d7a07f5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
@@ -385,6 +385,9 @@ public class ConcurrentLongLongHashMap {
// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section extends StampedLock {
+ // Each item take up 2 continuous array space.
+ private static final int ITEM_SIZE = 2;
+
// Keys and values are stored interleaved in the table array
private volatile long[] table;
@@ -404,7 +407,7 @@ public class ConcurrentLongLongHashMap {
float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
this.initCapacity = this.capacity;
- this.table = new long[2 * this.capacity];
+ this.table = new long[ITEM_SIZE * this.capacity];
this.size = 0;
this.usedBuckets = 0;
this.autoShrink = autoShrink;
@@ -420,7 +423,10 @@ public class ConcurrentLongLongHashMap {
long get(long key, int keyHash) {
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
- int bucket = signSafeMod(keyHash, capacity);
+ // add local variable here, so OutOfBound won't happen
+ long[] table = this.table;
+ // calculate table.length/2 as capacity to avoid rehash changing
capacity
+ int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE);
try {
while (true) {
@@ -442,7 +448,9 @@ public class ConcurrentLongLongHashMap {
stamp = readLock();
acquiredLock = true;
- bucket = signSafeMod(keyHash, capacity);
+ // update local variable
+ table = this.table;
+ bucket = signSafeMod(keyHash, table.length /
ITEM_SIZE);
storedKey = table[bucket];
storedValue = table[bucket + 1];
}
@@ -455,7 +463,7 @@ public class ConcurrentLongLongHashMap {
}
}
- bucket = (bucket + 2) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
if (acquiredLock) {
@@ -508,7 +516,7 @@ public class ConcurrentLongLongHashMap {
}
}
- bucket = (bucket + 2) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
if (usedBuckets > resizeThresholdUp) {
@@ -566,7 +574,7 @@ public class ConcurrentLongLongHashMap {
}
}
- bucket = (bucket + 2) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
if (usedBuckets > resizeThresholdUp) {
@@ -626,7 +634,7 @@ public class ConcurrentLongLongHashMap {
}
}
- bucket = (bucket + 2) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
if (usedBuckets > resizeThresholdUp) {
@@ -665,7 +673,7 @@ public class ConcurrentLongLongHashMap {
return ValueNotFound;
}
- bucket = (bucket + 2) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
@@ -696,7 +704,7 @@ public class ConcurrentLongLongHashMap {
int removedCount = 0;
try {
// Go through all the buckets for this section
- for (int bucket = 0; size > 0 && bucket < table.length; bucket
+= 2) {
+ for (int bucket = 0; size > 0 && bucket < table.length; bucket
+= ITEM_SIZE) {
long storedKey = table[bucket];
if (storedKey != DeletedKey && storedKey != EmptyKey) {
@@ -734,7 +742,7 @@ public class ConcurrentLongLongHashMap {
int removedCount = 0;
try {
// Go through all the buckets for this section
- for (int bucket = 0; size > 0 && bucket < table.length; bucket
+= 2) {
+ for (int bucket = 0; size > 0 && bucket < table.length; bucket
+= ITEM_SIZE) {
long storedKey = table[bucket];
long storedValue = table[bucket + 1];
@@ -768,20 +776,20 @@ public class ConcurrentLongLongHashMap {
}
private void cleanBucket(int bucket) {
- int nextInArray = (bucket + 2) & (table.length - 1);
+ int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1);
if (table[nextInArray] == EmptyKey) {
table[bucket] = EmptyKey;
table[bucket + 1] = ValueNotFound;
--usedBuckets;
// Cleanup all the buckets that were in `DeletedKey` state, so
that we can reduce unnecessary expansions
- bucket = (bucket - 2) & (table.length - 1);
+ bucket = (bucket - ITEM_SIZE) & (table.length - 1);
while (table[bucket] == DeletedKey) {
table[bucket] = EmptyKey;
table[bucket + 1] = ValueNotFound;
--usedBuckets;
- bucket = (bucket - 2) & (table.length - 1);
+ bucket = (bucket - ITEM_SIZE) & (table.length - 1);
}
} else {
table[bucket] = DeletedKey;
@@ -822,7 +830,7 @@ public class ConcurrentLongLongHashMap {
}
// Go through all the buckets for this section
- for (int bucket = 0; bucket < table.length; bucket += 2) {
+ for (int bucket = 0; bucket < table.length; bucket +=
ITEM_SIZE) {
long storedKey = table[bucket];
long storedValue = table[bucket + 1];
@@ -848,11 +856,11 @@ public class ConcurrentLongLongHashMap {
private void rehash(int newCapacity) {
// Expand the hashmap
- long[] newTable = new long[2 * newCapacity];
+ long[] newTable = new long[ITEM_SIZE * newCapacity];
Arrays.fill(newTable, EmptyKey);
// Re-hash table
- for (int i = 0; i < table.length; i += 2) {
+ for (int i = 0; i < table.length; i += ITEM_SIZE) {
long storedKey = table[i];
long storedValue = table[i + 1];
if (storedKey != EmptyKey && storedKey != DeletedKey) {
@@ -870,7 +878,7 @@ public class ConcurrentLongLongHashMap {
}
private void shrinkToInitCapacity() {
- long[] newTable = new long[2 * initCapacity];
+ long[] newTable = new long[ITEM_SIZE * initCapacity];
Arrays.fill(newTable, EmptyKey);
table = newTable;
@@ -896,7 +904,7 @@ public class ConcurrentLongLongHashMap {
return;
}
- bucket = (bucket + 2) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
}
}
@@ -912,6 +920,8 @@ public class ConcurrentLongLongHashMap {
}
static final int signSafeMod(long n, int max) {
+ // as the ITEM_SIZE of Section is 2, so the index is the multiple of 2
+ // that is to left shift 1 bit
return (int) (n & (max - 1)) << 1;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
index c254393cdf..24259609a5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
@@ -329,6 +329,9 @@ public class ConcurrentLongLongPairHashMap {
// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section extends StampedLock {
+ // Each item take up 4 continuous array space.
+ private static final int ITEM_SIZE = 4;
+
// Keys and values are stored interleaved in the table array
private volatile long[] table;
@@ -348,7 +351,7 @@ public class ConcurrentLongLongPairHashMap {
float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
this.initCapacity = this.capacity;
- this.table = new long[4 * this.capacity];
+ this.table = new long[ITEM_SIZE * this.capacity];
this.size = 0;
this.usedBuckets = 0;
this.autoShrink = autoShrink;
@@ -364,7 +367,10 @@ public class ConcurrentLongLongPairHashMap {
LongPair get(long key1, long key2, int keyHash) {
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
- int bucket = signSafeMod(keyHash, capacity);
+ // add local variable here, so OutOfBound won't happen
+ long[] table = this.table;
+ // calculate table.length / 4 as capacity to avoid rehash changing
capacity
+ int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE);
try {
while (true) {
@@ -387,8 +393,9 @@ public class ConcurrentLongLongPairHashMap {
if (!acquiredLock) {
stamp = readLock();
acquiredLock = true;
-
- bucket = signSafeMod(keyHash, capacity);
+ // update local variable
+ table = this.table;
+ bucket = signSafeMod(keyHash, table.length /
ITEM_SIZE);
storedKey1 = table[bucket];
storedKey2 = table[bucket + 1];
storedValue1 = table[bucket + 2];
@@ -403,7 +410,7 @@ public class ConcurrentLongLongPairHashMap {
}
}
- bucket = (bucket + 4) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
if (acquiredLock) {
@@ -455,7 +462,7 @@ public class ConcurrentLongLongPairHashMap {
}
}
- bucket = (bucket + 4) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
if (usedBuckets > resizeThresholdUp) {
@@ -496,7 +503,7 @@ public class ConcurrentLongLongPairHashMap {
return false;
}
- bucket = (bucket + 4) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
@@ -522,7 +529,7 @@ public class ConcurrentLongLongPairHashMap {
}
private void cleanBucket(int bucket) {
- int nextInArray = (bucket + 4) & (table.length - 1);
+ int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1);
if (table[nextInArray] == EmptyKey) {
table[bucket] = EmptyKey;
table[bucket + 1] = EmptyKey;
@@ -532,7 +539,7 @@ public class ConcurrentLongLongPairHashMap {
// Cleanup all the buckets that were in `DeletedKey` state,
// so that we can reduce unnecessary expansions
- bucket = (bucket - 4) & (table.length - 1);
+ bucket = (bucket - ITEM_SIZE) & (table.length - 1);
while (table[bucket] == DeletedKey) {
table[bucket] = EmptyKey;
table[bucket + 1] = EmptyKey;
@@ -540,7 +547,7 @@ public class ConcurrentLongLongPairHashMap {
table[bucket + 3] = ValueNotFound;
--usedBuckets;
- bucket = (bucket - 4) & (table.length - 1);
+ bucket = (bucket - ITEM_SIZE) & (table.length - 1);
}
} else {
table[bucket] = DeletedKey;
@@ -583,7 +590,7 @@ public class ConcurrentLongLongPairHashMap {
}
// Go through all the buckets for this section
- for (int bucket = 0; bucket < table.length; bucket += 4) {
+ for (int bucket = 0; bucket < table.length; bucket +=
ITEM_SIZE) {
long storedKey1 = table[bucket];
long storedKey2 = table[bucket + 1];
long storedValue1 = table[bucket + 2];
@@ -612,11 +619,11 @@ public class ConcurrentLongLongPairHashMap {
}
private void rehash(int newCapacity) {
- long[] newTable = new long[4 * newCapacity];
+ long[] newTable = new long[ITEM_SIZE * newCapacity];
Arrays.fill(newTable, EmptyKey);
// Re-hash table
- for (int i = 0; i < table.length; i += 4) {
+ for (int i = 0; i < table.length; i += ITEM_SIZE) {
long storedKey1 = table[i];
long storedKey2 = table[i + 1];
long storedValue1 = table[i + 2];
@@ -636,7 +643,7 @@ public class ConcurrentLongLongPairHashMap {
}
private void shrinkToInitCapacity() {
- long[] newTable = new long[4 * initCapacity];
+ long[] newTable = new long[ITEM_SIZE * initCapacity];
Arrays.fill(newTable, EmptyKey);
table = newTable;
@@ -650,7 +657,7 @@ public class ConcurrentLongLongPairHashMap {
}
private static void insertKeyValueNoLock(long[] table, int capacity,
long key1, long key2, long value1,
- long value2) {
+ long value2) {
int bucket = signSafeMod(hash(key1, key2), capacity);
while (true) {
@@ -665,7 +672,7 @@ public class ConcurrentLongLongPairHashMap {
return;
}
- bucket = (bucket + 4) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
}
}
@@ -684,6 +691,8 @@ public class ConcurrentLongLongPairHashMap {
}
static final int signSafeMod(long n, int max) {
+ // as the ITEM_SIZE of Section is 4, so the index is the multiple of 4
+ // that is to left shift 2 bits
return (int) (n & (max - 1)) << 2;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
index cf258e0c14..44215c63d7 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
@@ -298,6 +298,9 @@ public class ConcurrentOpenHashMap<K, V> {
// A section is a portion of the hash map that is covered by a single
@SuppressWarnings("serial")
private static final class Section<K, V> extends StampedLock {
+ // Each item take up 2 continuous array space.
+ private static final int ITEM_SIZE = 2;
+
// Keys and values are stored interleaved in the table array
private volatile Object[] table;
@@ -317,7 +320,7 @@ public class ConcurrentOpenHashMap<K, V> {
float expandFactor, float shrinkFactor) {
this.capacity = alignToPowerOfTwo(capacity);
this.initCapacity = this.capacity;
- this.table = new Object[2 * this.capacity];
+ this.table = new Object[ITEM_SIZE * this.capacity];
this.size = 0;
this.usedBuckets = 0;
this.autoShrink = autoShrink;
@@ -332,7 +335,11 @@ public class ConcurrentOpenHashMap<K, V> {
V get(K key, int keyHash) {
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
- int bucket = signSafeMod(keyHash, capacity);
+
+ // add local variable here, so OutOfBound won't happen
+ Object[] table = this.table;
+ // calculate table.length / 2 as capacity to avoid rehash changing
capacity
+ int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE);
try {
while (true) {
@@ -354,7 +361,9 @@ public class ConcurrentOpenHashMap<K, V> {
stamp = readLock();
acquiredLock = true;
- bucket = signSafeMod(keyHash, capacity);
+ // update local variable
+ table = this.table;
+ bucket = signSafeMod(keyHash, table.length /
ITEM_SIZE);
storedKey = (K) table[bucket];
storedValue = (V) table[bucket + 1];
}
@@ -367,7 +376,7 @@ public class ConcurrentOpenHashMap<K, V> {
}
}
- bucket = (bucket + 2) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
if (acquiredLock) {
@@ -420,7 +429,7 @@ public class ConcurrentOpenHashMap<K, V> {
}
}
- bucket = (bucket + 2) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
if (usedBuckets > resizeThresholdUp) {
@@ -458,7 +467,7 @@ public class ConcurrentOpenHashMap<K, V> {
return null;
}
- bucket = (bucket + 2) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
} finally {
@@ -516,7 +525,7 @@ public class ConcurrentOpenHashMap<K, V> {
}
// Go through all the buckets for this section
- for (int bucket = 0; bucket < table.length; bucket += 2) {
+ for (int bucket = 0; bucket < table.length; bucket +=
ITEM_SIZE) {
K storedKey = (K) table[bucket];
V storedValue = (V) table[bucket + 1];
@@ -546,7 +555,7 @@ public class ConcurrentOpenHashMap<K, V> {
int removedCount = 0;
try {
// Go through all the buckets for this section
- for (int bucket = 0; size > 0 && bucket < table.length; bucket
+= 2) {
+ for (int bucket = 0; size > 0 && bucket < table.length; bucket
+= ITEM_SIZE) {
K storedKey = (K) table[bucket];
V storedValue = (V) table[bucket + 1];
@@ -584,7 +593,7 @@ public class ConcurrentOpenHashMap<K, V> {
}
private void cleanBucket(int bucket) {
- int nextInArray = (bucket + 2) & (table.length - 1);
+ int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1);
if (table[nextInArray] == EmptyKey) {
table[bucket] = EmptyKey;
table[bucket + 1] = null;
@@ -592,13 +601,13 @@ public class ConcurrentOpenHashMap<K, V> {
// Cleanup all the buckets that were in `DeletedKey` state,
// so that we can reduce unnecessary expansions
- bucket = (bucket - 2) & (table.length - 1);
+ bucket = (bucket - ITEM_SIZE) & (table.length - 1);
while (table[bucket] == DeletedKey) {
table[bucket] = EmptyKey;
table[bucket + 1] = null;
--usedBuckets;
- bucket = (bucket - 2) & (table.length - 1);
+ bucket = (bucket - ITEM_SIZE) & (table.length - 1);
}
} else {
table[bucket] = DeletedKey;
@@ -608,10 +617,10 @@ public class ConcurrentOpenHashMap<K, V> {
private void rehash(int newCapacity) {
// Expand the hashmap
- Object[] newTable = new Object[2 * newCapacity];
+ Object[] newTable = new Object[ITEM_SIZE * newCapacity];
// Re-hash table
- for (int i = 0; i < table.length; i += 2) {
+ for (int i = 0; i < table.length; i += ITEM_SIZE) {
K storedKey = (K) table[i];
V storedValue = (V) table[i + 1];
if (storedKey != EmptyKey && storedKey != DeletedKey) {
@@ -629,7 +638,7 @@ public class ConcurrentOpenHashMap<K, V> {
}
private void shrinkToInitCapacity() {
- Object[] newTable = new Object[2 * initCapacity];
+ Object[] newTable = new Object[ITEM_SIZE * initCapacity];
table = newTable;
size = 0;
@@ -654,7 +663,7 @@ public class ConcurrentOpenHashMap<K, V> {
return;
}
- bucket = (bucket + 2) & (table.length - 1);
+ bucket = (bucket + ITEM_SIZE) & (table.length - 1);
}
}
}
@@ -670,6 +679,8 @@ public class ConcurrentOpenHashMap<K, V> {
}
static final int signSafeMod(long n, int max) {
+ // as the ITEM_SIZE of Section is 2, so the index is the multiple of 2
+ // that is to left shift 1 bit
return (int) (n & (max - 1)) << 1;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
index ae50ab67b9..6aa5a389ce 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
@@ -283,16 +283,16 @@ public class ConcurrentOpenHashSet<V> {
}
boolean contains(V value, int keyHash) {
- int bucket = keyHash;
-
long stamp = tryOptimisticRead();
boolean acquiredLock = false;
+ // add local variable here, so OutOfBound won't happen
+ V[] values = this.values;
+ // calculate table.length as capacity to avoid rehash changing
capacity
+ int bucket = signSafeMod(keyHash, values.length);
+
try {
while (true) {
- int capacity = this.capacity;
- bucket = signSafeMod(bucket, capacity);
-
// First try optimistic locking
V storedValue = values[bucket];
@@ -310,15 +310,12 @@ public class ConcurrentOpenHashSet<V> {
stamp = readLock();
acquiredLock = true;
+ // update local variable
+ values = this.values;
+ bucket = signSafeMod(keyHash, values.length);
storedValue = values[bucket];
}
- if (capacity != this.capacity) {
- // There has been a rehashing. We need to restart
the search
- bucket = keyHash;
- continue;
- }
-
if (value.equals(storedValue)) {
return true;
} else if (storedValue == EmptyValue) {
@@ -326,8 +323,7 @@ public class ConcurrentOpenHashSet<V> {
return false;
}
}
-
- ++bucket;
+ bucket = (bucket + 1) & (values.length - 1);
}
} finally {
if (acquiredLock) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index f1372b2894..b1f1b5437d 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongFunction;
import org.junit.Test;
@@ -187,6 +188,66 @@ public class ConcurrentLongHashMapTest {
assertTrue(map.capacity() == 8);
}
+ @Test
+ public void testConcurrentExpandAndShrinkAndGet() throws Throwable {
+ ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertEquals(map.capacity(), 4);
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ final int readThreads = 16;
+ final int writeThreads = 1;
+ final int n = 1_000;
+ CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+ Future<?> future = null;
+ AtomicReference<Exception> ex = new AtomicReference<>();
+
+ for (int i = 0; i < readThreads; i++) {
+ executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ map.get(1);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ });
+ }
+
+ assertNull(map.put(1, "v1"));
+ future = executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ for (int i = 0; i < n; i++) {
+ // expand hashmap
+ assertNull(map.put(2, "v2"));
+ assertNull(map.put(3, "v3"));
+ assertEquals(map.capacity(), 8);
+
+ // shrink hashmap
+ assertTrue(map.remove(2, "v2"));
+ assertTrue(map.remove(3, "v3"));
+ assertEquals(map.capacity(), 4);
+ }
+ });
+
+ future.get();
+ assertTrue(ex.get() == null);
+ // shut down pool
+ executor.shutdown();
+ }
+
@Test
public void testExpandShrinkAndClear() {
ConcurrentLongHashMap<String> map =
ConcurrentLongHashMap.<String>newBuilder()
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
index 2c28e88293..1c6bf12c69 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
@@ -31,9 +31,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
/**
@@ -295,6 +297,68 @@ public class ConcurrentLongHashSetTest {
assertTrue(map.capacity() == 8);
}
+ @Test
+ public void testConcurrentExpandAndShrinkAndGet() throws Throwable {
+ ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertEquals(set.capacity(), 4);
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ final int readThreads = 16;
+ final int writeThreads = 1;
+ final int n = 1_000;
+ CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+ Future<?> future = null;
+ AtomicReference<Exception> ex = new AtomicReference<>();
+
+ for (int i = 0; i < readThreads; i++) {
+ executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ while (true) {
+ try {
+ set.contains(1);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ }
+ });
+ }
+
+ assertTrue(set.add(1));
+ future = executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ for (int i = 0; i < n; i++) {
+ // expand hashmap
+ assertTrue(set.add(2));
+ assertTrue(set.add(3));
+ assertEquals(set.capacity(), 8);
+
+ // shrink hashmap
+ assertTrue(set.remove(2));
+ assertTrue(set.remove(3));
+ assertEquals(set.capacity(), 4);
+ }
+ });
+
+ future.get();
+ assertTrue(ex.get() == null);
+ // shut down pool
+ executor.shutdown();
+ }
+
@Test
public void testExpandShrinkAndClear() {
ConcurrentLongHashSet map = ConcurrentLongHashSet.newBuilder()
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
index aca4694674..8121e3364c 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
@@ -32,10 +32,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import
org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.LongLongFunction;
import org.junit.Test;
@@ -159,6 +161,67 @@ public class ConcurrentLongLongHashMapTest {
assertTrue(map.capacity() == 8);
}
+ @Test
+ public void testConcurrentExpandAndShrinkAndGet() throws Throwable {
+ ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertEquals(map.capacity(), 4);
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ final int readThreads = 16;
+ final int writeThreads = 1;
+ final int n = 1_000;
+ CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+ Future<?> future = null;
+ AtomicReference<Exception> ex = new AtomicReference<>();
+
+ for (int i = 0; i < readThreads; i++) {
+ executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ while (true) {
+ try {
+ map.get(1);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ }
+ });
+ }
+ map.put(1, 11);
+ future = executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ for (int i = 0; i < n; i++) {
+ // expand hashmap
+ map.put(2, 22);
+ map.put(3, 33);
+ assertEquals(map.capacity(), 8);
+
+ // shrink hashmap
+ map.remove(2, 22);
+ map.remove(3, 33);
+ assertEquals(map.capacity(), 4);
+ }
+ });
+
+ future.get();
+ assertTrue(ex.get() == null);
+ // shut down pool
+ executor.shutdown();
+ }
+
@Test
public void testExpandShrinkAndClear() {
ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
index b13625fcc9..36605c5b96 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
@@ -32,9 +32,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
import
org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.junit.Test;
@@ -175,6 +177,68 @@ public class ConcurrentLongLongPairHashMapTest {
assertTrue(map.capacity() == 8);
}
+ @Test
+ public void testConcurrentExpandAndShrinkAndGet() throws Throwable {
+ ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertEquals(map.capacity(), 4);
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ final int readThreads = 16;
+ final int writeThreads = 1;
+ final int n = 1_000;
+ CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+ Future<?> future = null;
+ AtomicReference<Exception> ex = new AtomicReference<>();
+
+ for (int i = 0; i < readThreads; i++) {
+ executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ while (true) {
+ try {
+ map.get(1, 1);
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ }
+ });
+ }
+
+ assertTrue(map.put(1, 1, 11, 11));
+ future = executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ for (int i = 0; i < n; i++) {
+ // expand hashmap
+ assertTrue(map.put(2, 2, 22, 22));
+ assertTrue(map.put(3, 3, 33, 33));
+ assertEquals(map.capacity(), 8);
+
+ // shrink hashmap
+ assertTrue(map.remove(2, 2, 22, 22));
+ assertTrue(map.remove(3, 3, 33, 33));
+ assertEquals(map.capacity(), 4);
+ }
+ });
+
+ future.get();
+ assertTrue(ex.get() == null);
+ // shut down pool
+ executor.shutdown();
+ }
+
@Test
public void testExpandShrinkAndClear() {
ConcurrentLongLongPairHashMap map =
ConcurrentLongLongPairHashMap.newBuilder()
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
index 3ed17edb57..a7835e6389 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
@@ -33,11 +33,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiPredicate;
import java.util.function.Function;
import org.junit.Test;
@@ -180,6 +182,68 @@ public class ConcurrentOpenHashMapTest {
assertTrue(map.capacity() == 8);
}
+ @Test
+ public void testConcurrentExpandAndShrinkAndGet() throws Throwable {
+ ConcurrentOpenHashMap<String, String> map =
ConcurrentOpenHashMap.<String, String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertEquals(map.capacity(), 4);
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ final int readThreads = 16;
+ final int writeThreads = 1;
+ final int n = 1_000;
+ CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+ Future<?> future = null;
+ AtomicReference<Exception> ex = new AtomicReference<>();
+
+ for (int i = 0; i < readThreads; i++) {
+ executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ while (true) {
+ try {
+ map.get("k3");
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ }
+ });
+ }
+
+ assertNull(map.put("k1", "v1"));
+ future = executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ for (int i = 0; i < n; i++) {
+ // expand hashmap
+ assertNull(map.put("k2", "v2"));
+ assertNull(map.put("k3", "v3"));
+ assertEquals(map.capacity(), 8);
+
+ // shrink hashmap
+ assertTrue(map.remove("k2", "v2"));
+ assertTrue(map.remove("k3", "v3"));
+ assertEquals(map.capacity(), 4);
+ }
+ });
+
+ future.get();
+ assertTrue(ex.get() == null);
+ // shut down pool
+ executor.shutdown();
+ }
+
@Test
public void testExpandShrinkAndClear() {
ConcurrentOpenHashMap<String, String> map =
ConcurrentOpenHashMap.<String, String>newBuilder()
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
index 8875be2f57..8840eacb09 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
@@ -30,9 +30,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
/**
@@ -190,6 +192,68 @@ public class ConcurrentOpenHashSetTest {
assertTrue(map.capacity() == initCapacity);
}
+ @Test
+ public void testConcurrentExpandAndShrinkAndGet() throws Throwable {
+ ConcurrentOpenHashSet<String> set =
ConcurrentOpenHashSet.<String>newBuilder()
+ .expectedItems(2)
+ .concurrencyLevel(1)
+ .autoShrink(true)
+ .mapIdleFactor(0.25f)
+ .build();
+ assertEquals(set.capacity(), 4);
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ final int readThreads = 16;
+ final int writeThreads = 1;
+ final int n = 1_000;
+ CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+ Future<?> future = null;
+ AtomicReference<Exception> ex = new AtomicReference<>();
+
+ for (int i = 0; i < readThreads; i++) {
+ executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ while (true) {
+ try {
+ set.contains("k2");
+ } catch (Exception e) {
+ ex.set(e);
+ }
+ }
+ });
+ }
+
+ assertTrue(set.add("k1"));
+ future = executor.submit(() -> {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ for (int i = 0; i < n; i++) {
+ // expand hashmap
+ assertTrue(set.add("k2"));
+ assertTrue(set.add("k3"));
+ assertEquals(set.capacity(), 8);
+
+ // shrink hashmap
+ assertTrue(set.remove("k2"));
+ assertTrue(set.remove("k3"));
+ assertEquals(set.capacity(), 4);
+ }
+ });
+
+ future.get();
+ assertTrue(ex.get() == null);
+ // shut down pool
+ executor.shutdown();
+ }
+
@Test
public void testReduceUnnecessaryExpansions(){
ConcurrentOpenHashSet<String> set =