This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch branch-4.16
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/branch-4.16 by this push:
     new f0ec6b2208 [fix] Fix ArrayIndexOut0fBoundsException caused by 
optimistic lock (#4066)
f0ec6b2208 is described below

commit f0ec6b22081833c8ed8041b6c536686dd47824f0
Author: thetumbled <[email protected]>
AuthorDate: Mon Apr 29 06:02:48 2024 +0800

    [fix] Fix ArrayIndexOut0fBoundsException caused by optimistic lock (#4066)
---
 .../util/collections/ConcurrentLongHashMap.java    | 22 ++++----
 .../util/collections/ConcurrentLongHashSet.java    | 10 +++-
 .../collections/ConcurrentLongLongHashMap.java     | 46 ++++++++++------
 .../collections/ConcurrentLongLongPairHashMap.java | 41 ++++++++------
 .../util/collections/ConcurrentOpenHashMap.java    | 41 +++++++++-----
 .../util/collections/ConcurrentOpenHashSet.java    | 22 +++-----
 .../collections/ConcurrentLongHashMapTest.java     | 61 +++++++++++++++++++++
 .../collections/ConcurrentLongHashSetTest.java     | 64 ++++++++++++++++++++++
 .../collections/ConcurrentLongLongHashMapTest.java | 63 +++++++++++++++++++++
 .../ConcurrentLongLongPairHashMapTest.java         | 64 ++++++++++++++++++++++
 .../collections/ConcurrentOpenHashMapTest.java     | 64 ++++++++++++++++++++++
 .../collections/ConcurrentOpenHashSetTest.java     | 64 ++++++++++++++++++++++
 12 files changed, 488 insertions(+), 74 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
index 7ff465f079..ec8b700c99 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java
@@ -327,15 +327,18 @@ public class ConcurrentLongHashMap<V> {
         }
 
         V get(long key, int keyHash) {
-            int bucket = keyHash;
 
             long stamp = tryOptimisticRead();
             boolean acquiredLock = false;
 
+            // add local variable here, so OutOfBound won't happen
+            long[] keys = this.keys;
+            V[] values = this.values;
+            // calculate table.length as capacity to avoid rehash changing 
capacity
+            int bucket = signSafeMod(keyHash, values.length);
+
             try {
                 while (true) {
-                    int capacity = this.capacity;
-                    bucket = signSafeMod(bucket, capacity);
 
                     // First try optimistic locking
                     long storedKey = keys[bucket];
@@ -354,16 +357,15 @@ public class ConcurrentLongHashMap<V> {
                         if (!acquiredLock) {
                             stamp = readLock();
                             acquiredLock = true;
+
+                            // update local variable
+                            keys = this.keys;
+                            values = this.values;
+                            bucket = signSafeMod(keyHash, values.length);
                             storedKey = keys[bucket];
                             storedValue = values[bucket];
                         }
 
-                        if (capacity != this.capacity) {
-                            // There has been a rehashing. We need to restart 
the search
-                            bucket = keyHash;
-                            continue;
-                        }
-
                         if (storedKey == key) {
                             return storedValue != DeletedValue ? storedValue : 
null;
                         } else if (storedValue == EmptyValue) {
@@ -372,7 +374,7 @@ public class ConcurrentLongHashMap<V> {
                         }
                     }
 
-                    ++bucket;
+                    bucket = (bucket + 1) & (values.length - 1);
                 }
             } finally {
                 if (acquiredLock) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
index a66de9ed8b..d98b3062a7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java
@@ -290,7 +290,11 @@ public class ConcurrentLongHashSet {
         boolean contains(long item, int hash) {
             long stamp = tryOptimisticRead();
             boolean acquiredLock = false;
-            int bucket = signSafeMod(hash, capacity);
+
+            // add local variable here, so OutOfBound won't happen
+            long[] table = this.table;
+            // calculate table.length as capacity to avoid rehash changing 
capacity
+            int bucket = signSafeMod(hash, table.length);
 
             try {
                 while (true) {
@@ -311,7 +315,9 @@ public class ConcurrentLongHashSet {
                             stamp = readLock();
                             acquiredLock = true;
 
-                            bucket = signSafeMod(hash, capacity);
+                            // update local variable
+                            table = this.table;
+                            bucket = signSafeMod(hash, table.length);
                             storedItem = table[bucket];
                         }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
index 25bcb4061a..3cf5be37d5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java
@@ -370,6 +370,9 @@ public class ConcurrentLongLongHashMap {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
+        // Each item take up 2 continuous array space.
+        private static final int ITEM_SIZE = 2;
+
         // Keys and values are stored interleaved in the table array
         private volatile long[] table;
 
@@ -389,7 +392,7 @@ public class ConcurrentLongLongHashMap {
                 float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
             this.initCapacity = this.capacity;
-            this.table = new long[2 * this.capacity];
+            this.table = new long[ITEM_SIZE * this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
             this.autoShrink = autoShrink;
@@ -405,7 +408,10 @@ public class ConcurrentLongLongHashMap {
         long get(long key, int keyHash) {
             long stamp = tryOptimisticRead();
             boolean acquiredLock = false;
-            int bucket = signSafeMod(keyHash, capacity);
+            // add local variable here, so OutOfBound won't happen
+            long[] table = this.table;
+            // calculate table.length/2 as capacity to avoid rehash changing 
capacity
+            int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE);
 
             try {
                 while (true) {
@@ -427,7 +433,9 @@ public class ConcurrentLongLongHashMap {
                             stamp = readLock();
                             acquiredLock = true;
 
-                            bucket = signSafeMod(keyHash, capacity);
+                            // update local variable
+                            table = this.table;
+                            bucket = signSafeMod(keyHash, table.length / 
ITEM_SIZE);
                             storedKey = table[bucket];
                             storedValue = table[bucket + 1];
                         }
@@ -440,7 +448,7 @@ public class ConcurrentLongLongHashMap {
                         }
                     }
 
-                    bucket = (bucket + 2) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
             } finally {
                 if (acquiredLock) {
@@ -493,7 +501,7 @@ public class ConcurrentLongLongHashMap {
                         }
                     }
 
-                    bucket = (bucket + 2) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
             } finally {
                 if (usedBuckets > resizeThresholdUp) {
@@ -551,7 +559,7 @@ public class ConcurrentLongLongHashMap {
                         }
                     }
 
-                    bucket = (bucket + 2) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
             } finally {
                 if (usedBuckets > resizeThresholdUp) {
@@ -611,7 +619,7 @@ public class ConcurrentLongLongHashMap {
                         }
                     }
 
-                    bucket = (bucket + 2) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
             } finally {
                 if (usedBuckets > resizeThresholdUp) {
@@ -650,7 +658,7 @@ public class ConcurrentLongLongHashMap {
                         return ValueNotFound;
                     }
 
-                    bucket = (bucket + 2) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
 
             } finally {
@@ -681,7 +689,7 @@ public class ConcurrentLongLongHashMap {
             int removedCount = 0;
             try {
                 // Go through all the buckets for this section
-                for (int bucket = 0; size > 0 && bucket < table.length; bucket 
+= 2) {
+                for (int bucket = 0; size > 0 && bucket < table.length; bucket 
+= ITEM_SIZE) {
                     long storedKey = table[bucket];
 
                     if (storedKey != DeletedKey && storedKey != EmptyKey) {
@@ -719,7 +727,7 @@ public class ConcurrentLongLongHashMap {
             int removedCount = 0;
             try {
                 // Go through all the buckets for this section
-                for (int bucket = 0; size > 0 && bucket < table.length; bucket 
+= 2) {
+                for (int bucket = 0; size > 0 && bucket < table.length; bucket 
+= ITEM_SIZE) {
                     long storedKey = table[bucket];
                     long storedValue = table[bucket + 1];
 
@@ -753,20 +761,20 @@ public class ConcurrentLongLongHashMap {
         }
 
         private void cleanBucket(int bucket) {
-            int nextInArray = (bucket + 2) & (table.length - 1);
+            int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1);
             if (table[nextInArray] == EmptyKey) {
                 table[bucket] = EmptyKey;
                 table[bucket + 1] = ValueNotFound;
                 --usedBuckets;
 
                 // Cleanup all the buckets that were in `DeletedKey` state, so 
that we can reduce unnecessary expansions
-                bucket = (bucket - 2) & (table.length - 1);
+                bucket = (bucket - ITEM_SIZE) & (table.length - 1);
                 while (table[bucket] == DeletedKey) {
                     table[bucket] = EmptyKey;
                     table[bucket + 1] = ValueNotFound;
                     --usedBuckets;
 
-                    bucket = (bucket - 2) & (table.length - 1);
+                    bucket = (bucket - ITEM_SIZE) & (table.length - 1);
                 }
             } else {
                 table[bucket] = DeletedKey;
@@ -807,7 +815,7 @@ public class ConcurrentLongLongHashMap {
                 }
 
                 // Go through all the buckets for this section
-                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                for (int bucket = 0; bucket < table.length; bucket += 
ITEM_SIZE) {
                     long storedKey = table[bucket];
                     long storedValue = table[bucket + 1];
 
@@ -833,11 +841,11 @@ public class ConcurrentLongLongHashMap {
 
         private void rehash(int newCapacity) {
             // Expand the hashmap
-            long[] newTable = new long[2 * newCapacity];
+            long[] newTable = new long[ITEM_SIZE * newCapacity];
             Arrays.fill(newTable, EmptyKey);
 
             // Re-hash table
-            for (int i = 0; i < table.length; i += 2) {
+            for (int i = 0; i < table.length; i += ITEM_SIZE) {
                 long storedKey = table[i];
                 long storedValue = table[i + 1];
                 if (storedKey != EmptyKey && storedKey != DeletedKey) {
@@ -855,7 +863,7 @@ public class ConcurrentLongLongHashMap {
         }
 
         private void shrinkToInitCapacity() {
-            long[] newTable = new long[2 * initCapacity];
+            long[] newTable = new long[ITEM_SIZE * initCapacity];
             Arrays.fill(newTable, EmptyKey);
 
             table = newTable;
@@ -881,7 +889,7 @@ public class ConcurrentLongLongHashMap {
                     return;
                 }
 
-                bucket = (bucket + 2) & (table.length - 1);
+                bucket = (bucket + ITEM_SIZE) & (table.length - 1);
             }
         }
     }
@@ -897,6 +905,8 @@ public class ConcurrentLongLongHashMap {
     }
 
     static final int signSafeMod(long n, int max) {
+        // as the ITEM_SIZE of Section is 2, so the index is the multiple of 2
+        // that is to left shift 1 bit
         return (int) (n & (max - 1)) << 1;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
index 536eeb7448..5b23acbe01 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java
@@ -309,6 +309,9 @@ public class ConcurrentLongLongPairHashMap {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
+        // Each item take up 4 continuous array space.
+        private static final int ITEM_SIZE = 4;
+
         // Keys and values are stored interleaved in the table array
         private volatile long[] table;
 
@@ -328,7 +331,7 @@ public class ConcurrentLongLongPairHashMap {
                 float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
             this.initCapacity = this.capacity;
-            this.table = new long[4 * this.capacity];
+            this.table = new long[ITEM_SIZE * this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
             this.autoShrink = autoShrink;
@@ -344,7 +347,10 @@ public class ConcurrentLongLongPairHashMap {
         LongPair get(long key1, long key2, int keyHash) {
             long stamp = tryOptimisticRead();
             boolean acquiredLock = false;
-            int bucket = signSafeMod(keyHash, capacity);
+            // add local variable here, so OutOfBound won't happen
+            long[] table = this.table;
+            // calculate table.length / 4 as capacity to avoid rehash changing 
capacity
+            int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE);
 
             try {
                 while (true) {
@@ -367,8 +373,9 @@ public class ConcurrentLongLongPairHashMap {
                         if (!acquiredLock) {
                             stamp = readLock();
                             acquiredLock = true;
-
-                            bucket = signSafeMod(keyHash, capacity);
+                            // update local variable
+                            table = this.table;
+                            bucket = signSafeMod(keyHash, table.length / 
ITEM_SIZE);
                             storedKey1 = table[bucket];
                             storedKey2 = table[bucket + 1];
                             storedValue1 = table[bucket + 2];
@@ -383,7 +390,7 @@ public class ConcurrentLongLongPairHashMap {
                         }
                     }
 
-                    bucket = (bucket + 4) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
             } finally {
                 if (acquiredLock) {
@@ -435,7 +442,7 @@ public class ConcurrentLongLongPairHashMap {
                         }
                     }
 
-                    bucket = (bucket + 4) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
             } finally {
                 if (usedBuckets > resizeThresholdUp) {
@@ -476,7 +483,7 @@ public class ConcurrentLongLongPairHashMap {
                         return false;
                     }
 
-                    bucket = (bucket + 4) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
 
             } finally {
@@ -502,7 +509,7 @@ public class ConcurrentLongLongPairHashMap {
         }
 
         private void cleanBucket(int bucket) {
-            int nextInArray = (bucket + 4) & (table.length - 1);
+            int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1);
             if (table[nextInArray] == EmptyKey) {
                 table[bucket] = EmptyKey;
                 table[bucket + 1] = EmptyKey;
@@ -512,7 +519,7 @@ public class ConcurrentLongLongPairHashMap {
 
                 // Cleanup all the buckets that were in `DeletedKey` state,
                 // so that we can reduce unnecessary expansions
-                bucket = (bucket - 4) & (table.length - 1);
+                bucket = (bucket - ITEM_SIZE) & (table.length - 1);
                 while (table[bucket] == DeletedKey) {
                     table[bucket] = EmptyKey;
                     table[bucket + 1] = EmptyKey;
@@ -520,7 +527,7 @@ public class ConcurrentLongLongPairHashMap {
                     table[bucket + 3] = ValueNotFound;
                     --usedBuckets;
 
-                    bucket = (bucket - 4) & (table.length - 1);
+                    bucket = (bucket - ITEM_SIZE) & (table.length - 1);
                 }
             } else {
                 table[bucket] = DeletedKey;
@@ -563,7 +570,7 @@ public class ConcurrentLongLongPairHashMap {
                 }
 
                 // Go through all the buckets for this section
-                for (int bucket = 0; bucket < table.length; bucket += 4) {
+                for (int bucket = 0; bucket < table.length; bucket += 
ITEM_SIZE) {
                     long storedKey1 = table[bucket];
                     long storedKey2 = table[bucket + 1];
                     long storedValue1 = table[bucket + 2];
@@ -592,11 +599,11 @@ public class ConcurrentLongLongPairHashMap {
         }
 
         private void rehash(int newCapacity) {
-            long[] newTable = new long[4 * newCapacity];
+            long[] newTable = new long[ITEM_SIZE * newCapacity];
             Arrays.fill(newTable, EmptyKey);
 
             // Re-hash table
-            for (int i = 0; i < table.length; i += 4) {
+            for (int i = 0; i < table.length; i += ITEM_SIZE) {
                 long storedKey1 = table[i];
                 long storedKey2 = table[i + 1];
                 long storedValue1 = table[i + 2];
@@ -616,7 +623,7 @@ public class ConcurrentLongLongPairHashMap {
         }
 
         private void shrinkToInitCapacity() {
-            long[] newTable = new long[4 * initCapacity];
+            long[] newTable = new long[ITEM_SIZE * initCapacity];
             Arrays.fill(newTable, EmptyKey);
 
             table = newTable;
@@ -630,7 +637,7 @@ public class ConcurrentLongLongPairHashMap {
         }
 
         private static void insertKeyValueNoLock(long[] table, int capacity, 
long key1, long key2, long value1,
-                long value2) {
+                                                 long value2) {
             int bucket = signSafeMod(hash(key1, key2), capacity);
 
             while (true) {
@@ -645,7 +652,7 @@ public class ConcurrentLongLongPairHashMap {
                     return;
                 }
 
-                bucket = (bucket + 4) & (table.length - 1);
+                bucket = (bucket + ITEM_SIZE) & (table.length - 1);
             }
         }
     }
@@ -664,6 +671,8 @@ public class ConcurrentLongLongPairHashMap {
     }
 
     static final int signSafeMod(long n, int max) {
+        // as the ITEM_SIZE of Section is 4, so the index is the multiple of 4
+        // that is to left shift 2 bits
         return (int) (n & (max - 1)) << 2;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
index cab3ce8ea3..163918adbc 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java
@@ -278,6 +278,9 @@ public class ConcurrentOpenHashMap<K, V> {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<K, V> extends StampedLock {
+        // Each item take up 2 continuous array space.
+        private static final int ITEM_SIZE = 2;
+
         // Keys and values are stored interleaved in the table array
         private volatile Object[] table;
 
@@ -297,7 +300,7 @@ public class ConcurrentOpenHashMap<K, V> {
                 float expandFactor, float shrinkFactor) {
             this.capacity = alignToPowerOfTwo(capacity);
             this.initCapacity = this.capacity;
-            this.table = new Object[2 * this.capacity];
+            this.table = new Object[ITEM_SIZE * this.capacity];
             this.size = 0;
             this.usedBuckets = 0;
             this.autoShrink = autoShrink;
@@ -312,7 +315,11 @@ public class ConcurrentOpenHashMap<K, V> {
         V get(K key, int keyHash) {
             long stamp = tryOptimisticRead();
             boolean acquiredLock = false;
-            int bucket = signSafeMod(keyHash, capacity);
+
+            // add local variable here, so OutOfBound won't happen
+            Object[] table = this.table;
+            // calculate table.length / 2 as capacity to avoid rehash changing 
capacity
+            int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE);
 
             try {
                 while (true) {
@@ -334,7 +341,9 @@ public class ConcurrentOpenHashMap<K, V> {
                             stamp = readLock();
                             acquiredLock = true;
 
-                            bucket = signSafeMod(keyHash, capacity);
+                            // update local variable
+                            table = this.table;
+                            bucket = signSafeMod(keyHash, table.length / 
ITEM_SIZE);
                             storedKey = (K) table[bucket];
                             storedValue = (V) table[bucket + 1];
                         }
@@ -347,7 +356,7 @@ public class ConcurrentOpenHashMap<K, V> {
                         }
                     }
 
-                    bucket = (bucket + 2) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
             } finally {
                 if (acquiredLock) {
@@ -400,7 +409,7 @@ public class ConcurrentOpenHashMap<K, V> {
                         }
                     }
 
-                    bucket = (bucket + 2) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
             } finally {
                 if (usedBuckets > resizeThresholdUp) {
@@ -438,7 +447,7 @@ public class ConcurrentOpenHashMap<K, V> {
                         return null;
                     }
 
-                    bucket = (bucket + 2) & (table.length - 1);
+                    bucket = (bucket + ITEM_SIZE) & (table.length - 1);
                 }
 
             } finally {
@@ -496,7 +505,7 @@ public class ConcurrentOpenHashMap<K, V> {
                 }
 
                 // Go through all the buckets for this section
-                for (int bucket = 0; bucket < table.length; bucket += 2) {
+                for (int bucket = 0; bucket < table.length; bucket += 
ITEM_SIZE) {
                     K storedKey = (K) table[bucket];
                     V storedValue = (V) table[bucket + 1];
 
@@ -526,7 +535,7 @@ public class ConcurrentOpenHashMap<K, V> {
             int removedCount = 0;
             try {
                 // Go through all the buckets for this section
-                for (int bucket = 0; size > 0 && bucket < table.length; bucket 
+= 2) {
+                for (int bucket = 0; size > 0 && bucket < table.length; bucket 
+= ITEM_SIZE) {
                     K storedKey = (K) table[bucket];
                     V storedValue = (V) table[bucket + 1];
 
@@ -564,7 +573,7 @@ public class ConcurrentOpenHashMap<K, V> {
         }
 
         private void cleanBucket(int bucket) {
-            int nextInArray = (bucket + 2) & (table.length - 1);
+            int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1);
             if (table[nextInArray] == EmptyKey) {
                 table[bucket] = EmptyKey;
                 table[bucket + 1] = null;
@@ -572,13 +581,13 @@ public class ConcurrentOpenHashMap<K, V> {
 
                 // Cleanup all the buckets that were in `DeletedKey` state,
                 // so that we can reduce unnecessary expansions
-                bucket = (bucket - 2) & (table.length - 1);
+                bucket = (bucket - ITEM_SIZE) & (table.length - 1);
                 while (table[bucket] == DeletedKey) {
                     table[bucket] = EmptyKey;
                     table[bucket + 1] = null;
                     --usedBuckets;
 
-                    bucket = (bucket - 2) & (table.length - 1);
+                    bucket = (bucket - ITEM_SIZE) & (table.length - 1);
                 }
             } else {
                 table[bucket] = DeletedKey;
@@ -588,10 +597,10 @@ public class ConcurrentOpenHashMap<K, V> {
 
         private void rehash(int newCapacity) {
             // Expand the hashmap
-            Object[] newTable = new Object[2 * newCapacity];
+            Object[] newTable = new Object[ITEM_SIZE * newCapacity];
 
             // Re-hash table
-            for (int i = 0; i < table.length; i += 2) {
+            for (int i = 0; i < table.length; i += ITEM_SIZE) {
                 K storedKey = (K) table[i];
                 V storedValue = (V) table[i + 1];
                 if (storedKey != EmptyKey && storedKey != DeletedKey) {
@@ -609,7 +618,7 @@ public class ConcurrentOpenHashMap<K, V> {
         }
 
         private void shrinkToInitCapacity() {
-            Object[] newTable = new Object[2 * initCapacity];
+            Object[] newTable = new Object[ITEM_SIZE * initCapacity];
 
             table = newTable;
             size = 0;
@@ -634,7 +643,7 @@ public class ConcurrentOpenHashMap<K, V> {
                     return;
                 }
 
-                bucket = (bucket + 2) & (table.length - 1);
+                bucket = (bucket + ITEM_SIZE) & (table.length - 1);
             }
         }
     }
@@ -650,6 +659,8 @@ public class ConcurrentOpenHashMap<K, V> {
     }
 
     static final int signSafeMod(long n, int max) {
+        // as the ITEM_SIZE of Section is 2, so the index is the multiple of 2
+        // that is to left shift 1 bit
         return (int) (n & (max - 1)) << 1;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
index a7f39173d3..a5e12d10fa 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java
@@ -265,16 +265,16 @@ public class ConcurrentOpenHashSet<V> {
         }
 
         boolean contains(V value, int keyHash) {
-            int bucket = keyHash;
-
             long stamp = tryOptimisticRead();
             boolean acquiredLock = false;
 
+            // add local variable here, so OutOfBound won't happen
+            V[] values = this.values;
+            // calculate table.length as capacity to avoid rehash changing 
capacity
+            int bucket = signSafeMod(keyHash, values.length);
+
             try {
                 while (true) {
-                    int capacity = this.capacity;
-                    bucket = signSafeMod(bucket, capacity);
-
                     // First try optimistic locking
                     V storedValue = values[bucket];
 
@@ -292,15 +292,12 @@ public class ConcurrentOpenHashSet<V> {
                             stamp = readLock();
                             acquiredLock = true;
 
+                            // update local variable
+                            values = this.values;
+                            bucket = signSafeMod(keyHash, values.length);
                             storedValue = values[bucket];
                         }
 
-                        if (capacity != this.capacity) {
-                            // There has been a rehashing. We need to restart 
the search
-                            bucket = keyHash;
-                            continue;
-                        }
-
                         if (value.equals(storedValue)) {
                             return true;
                         } else if (storedValue == EmptyValue) {
@@ -308,8 +305,7 @@ public class ConcurrentOpenHashSet<V> {
                             return false;
                         }
                     }
-
-                    ++bucket;
+                    bucket = (bucket + 1) & (values.length - 1);
                 }
             } finally {
                 if (acquiredLock) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index f1372b2894..b1f1b5437d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -41,6 +41,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.LongFunction;
 import org.junit.Test;
 
@@ -187,6 +188,66 @@ public class ConcurrentLongHashMapTest {
         assertTrue(map.capacity() == 8);
     }
 
+    @Test
+    public void testConcurrentExpandAndShrinkAndGet()  throws Throwable {
+        ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertEquals(map.capacity(), 4);
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final int readThreads = 16;
+        final int writeThreads = 1;
+        final int n = 1_000;
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        Future<?> future = null;
+        AtomicReference<Exception> ex = new AtomicReference<>();
+
+        for (int i = 0; i < readThreads; i++) {
+            executor.submit(() -> {
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                try {
+                    map.get(1);
+                } catch (Exception e) {
+                    ex.set(e);
+                }
+            });
+        }
+
+        assertNull(map.put(1, "v1"));
+        future = executor.submit(() -> {
+            try {
+                barrier.await();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            for (int i = 0; i < n; i++) {
+                // expand hashmap
+                assertNull(map.put(2, "v2"));
+                assertNull(map.put(3, "v3"));
+                assertEquals(map.capacity(), 8);
+
+                // shrink hashmap
+                assertTrue(map.remove(2, "v2"));
+                assertTrue(map.remove(3, "v3"));
+                assertEquals(map.capacity(), 4);
+            }
+        });
+
+        future.get();
+        assertTrue(ex.get() == null);
+        // shut down pool
+        executor.shutdown();
+    }
+
     @Test
     public void testExpandShrinkAndClear() {
         ConcurrentLongHashMap<String> map = 
ConcurrentLongHashMap.<String>newBuilder()
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
index 2c28e88293..1c6bf12c69 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java
@@ -31,9 +31,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 
 /**
@@ -295,6 +297,68 @@ public class ConcurrentLongHashSetTest {
         assertTrue(map.capacity() == 8);
     }
 
+    @Test
+    public void testConcurrentExpandAndShrinkAndGet()  throws Throwable {
+        ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertEquals(set.capacity(), 4);
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final int readThreads = 16;
+        final int writeThreads = 1;
+        final int n = 1_000;
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        Future<?> future = null;
+        AtomicReference<Exception> ex = new AtomicReference<>();
+
+        for (int i = 0; i < readThreads; i++) {
+            executor.submit(() -> {
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                while (true) {
+                    try {
+                        set.contains(1);
+                    } catch (Exception e) {
+                        ex.set(e);
+                    }
+                }
+            });
+        }
+
+        assertTrue(set.add(1));
+        future = executor.submit(() -> {
+            try {
+                barrier.await();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            for (int i = 0; i < n; i++) {
+                // expand hashmap
+                assertTrue(set.add(2));
+                assertTrue(set.add(3));
+                assertEquals(set.capacity(), 8);
+
+                // shrink hashmap
+                assertTrue(set.remove(2));
+                assertTrue(set.remove(3));
+                assertEquals(set.capacity(), 4);
+            }
+        });
+
+        future.get();
+        assertTrue(ex.get() == null);
+        // shut down pool
+        executor.shutdown();
+    }
+
     @Test
     public void testExpandShrinkAndClear() {
         ConcurrentLongHashSet map = ConcurrentLongHashSet.newBuilder()
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
index aca4694674..8121e3364c 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java
@@ -32,10 +32,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import 
org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.LongLongFunction;
 import org.junit.Test;
 
@@ -159,6 +161,67 @@ public class ConcurrentLongLongHashMapTest {
         assertTrue(map.capacity() == 8);
     }
 
+    @Test
+    public void testConcurrentExpandAndShrinkAndGet()  throws Throwable {
+        ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertEquals(map.capacity(), 4);
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final int readThreads = 16;
+        final int writeThreads = 1;
+        final int n = 1_000;
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        Future<?> future = null;
+        AtomicReference<Exception> ex = new AtomicReference<>();
+
+        for (int i = 0; i < readThreads; i++) {
+            executor.submit(() -> {
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                while (true) {
+                    try {
+                        map.get(1);
+                    } catch (Exception e) {
+                        ex.set(e);
+                    }
+                }
+            });
+        }
+        map.put(1, 11);
+        future = executor.submit(() -> {
+            try {
+                barrier.await();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            for (int i = 0; i < n; i++) {
+                // expand hashmap
+                map.put(2, 22);
+                map.put(3, 33);
+                assertEquals(map.capacity(), 8);
+
+                // shrink hashmap
+                map.remove(2, 22);
+                map.remove(3, 33);
+                assertEquals(map.capacity(), 4);
+            }
+        });
+
+        future.get();
+        assertTrue(ex.get() == null);
+        // shut down pool
+        executor.shutdown();
+    }
+
     @Test
     public void testExpandShrinkAndClear() {
         ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder()
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
index b13625fcc9..36605c5b96 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java
@@ -32,9 +32,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 import 
org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
 import org.junit.Test;
 
@@ -175,6 +177,68 @@ public class ConcurrentLongLongPairHashMapTest {
         assertTrue(map.capacity() == 8);
     }
 
+    @Test
+    public void testConcurrentExpandAndShrinkAndGet()  throws Throwable {
+        ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertEquals(map.capacity(), 4);
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final int readThreads = 16;
+        final int writeThreads = 1;
+        final int n = 1_000;
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        Future<?> future = null;
+        AtomicReference<Exception> ex = new AtomicReference<>();
+
+        for (int i = 0; i < readThreads; i++) {
+            executor.submit(() -> {
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                while (true) {
+                    try {
+                        map.get(1, 1);
+                    } catch (Exception e) {
+                        ex.set(e);
+                    }
+                }
+            });
+        }
+
+        assertTrue(map.put(1, 1, 11, 11));
+        future = executor.submit(() -> {
+            try {
+                barrier.await();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            for (int i = 0; i < n; i++) {
+                // expand hashmap
+                assertTrue(map.put(2, 2, 22, 22));
+                assertTrue(map.put(3, 3, 33, 33));
+                assertEquals(map.capacity(), 8);
+
+                // shrink hashmap
+                assertTrue(map.remove(2, 2, 22, 22));
+                assertTrue(map.remove(3, 3, 33, 33));
+                assertEquals(map.capacity(), 4);
+            }
+        });
+
+        future.get();
+        assertTrue(ex.get() == null);
+        // shut down pool
+        executor.shutdown();
+    }
+
     @Test
     public void testExpandShrinkAndClear() {
         ConcurrentLongLongPairHashMap map = 
ConcurrentLongLongPairHashMap.newBuilder()
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
index 3ed17edb57..a7835e6389 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java
@@ -33,11 +33,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiPredicate;
 import java.util.function.Function;
 import org.junit.Test;
@@ -180,6 +182,68 @@ public class ConcurrentOpenHashMapTest {
         assertTrue(map.capacity() == 8);
     }
 
+    @Test
+    public void testConcurrentExpandAndShrinkAndGet()  throws Throwable {
+        ConcurrentOpenHashMap<String, String> map = 
ConcurrentOpenHashMap.<String, String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertEquals(map.capacity(), 4);
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final int readThreads = 16;
+        final int writeThreads = 1;
+        final int n = 1_000;
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        Future<?> future = null;
+        AtomicReference<Exception> ex = new AtomicReference<>();
+
+        for (int i = 0; i < readThreads; i++) {
+            executor.submit(() -> {
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                while (true) {
+                    try {
+                        map.get("k3");
+                    } catch (Exception e) {
+                        ex.set(e);
+                    }
+                }
+            });
+        }
+
+        assertNull(map.put("k1", "v1"));
+        future = executor.submit(() -> {
+            try {
+                barrier.await();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            for (int i = 0; i < n; i++) {
+                // expand hashmap
+                assertNull(map.put("k2", "v2"));
+                assertNull(map.put("k3", "v3"));
+                assertEquals(map.capacity(), 8);
+
+                // shrink hashmap
+                assertTrue(map.remove("k2", "v2"));
+                assertTrue(map.remove("k3", "v3"));
+                assertEquals(map.capacity(), 4);
+            }
+        });
+
+        future.get();
+        assertTrue(ex.get() == null);
+        // shut down pool
+        executor.shutdown();
+    }
+
     @Test
     public void testExpandShrinkAndClear() {
         ConcurrentOpenHashMap<String, String> map = 
ConcurrentOpenHashMap.<String, String>newBuilder()
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
index 8875be2f57..8840eacb09 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java
@@ -30,9 +30,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Test;
 
 /**
@@ -190,6 +192,68 @@ public class ConcurrentOpenHashSetTest {
         assertTrue(map.capacity() == initCapacity);
     }
 
+    @Test
+    public void testConcurrentExpandAndShrinkAndGet()  throws Throwable {
+        ConcurrentOpenHashSet<String> set = 
ConcurrentOpenHashSet.<String>newBuilder()
+                .expectedItems(2)
+                .concurrencyLevel(1)
+                .autoShrink(true)
+                .mapIdleFactor(0.25f)
+                .build();
+        assertEquals(set.capacity(), 4);
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final int readThreads = 16;
+        final int writeThreads = 1;
+        final int n = 1_000;
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        Future<?> future = null;
+        AtomicReference<Exception> ex = new AtomicReference<>();
+
+        for (int i = 0; i < readThreads; i++) {
+            executor.submit(() -> {
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                while (true) {
+                    try {
+                        set.contains("k2");
+                    } catch (Exception e) {
+                        ex.set(e);
+                    }
+                }
+            });
+        }
+
+        assertTrue(set.add("k1"));
+        future = executor.submit(() -> {
+            try {
+                barrier.await();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            for (int i = 0; i < n; i++) {
+                // expand hashmap
+                assertTrue(set.add("k2"));
+                assertTrue(set.add("k3"));
+                assertEquals(set.capacity(), 8);
+
+                // shrink hashmap
+                assertTrue(set.remove("k2"));
+                assertTrue(set.remove("k3"));
+                assertEquals(set.capacity(), 4);
+            }
+        });
+
+        future.get();
+        assertTrue(ex.get() == null);
+        // shut down pool
+        executor.shutdown();
+    }
+
     @Test
     public void testReduceUnnecessaryExpansions(){
         ConcurrentOpenHashSet<String> set =

Reply via email to