This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 6146ca9  Fixed race condition during expansion of concurrent open hash 
maps (#2387)
6146ca9 is described below

commit 6146ca9cc4c307bf4b19915ca6f13280efd7d428
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Fri Aug 17 08:47:30 2018 -0700

    Fixed race condition during expansion of concurrent open hash maps (#2387)
    
    ### Motivation
    
    Porting the same fix as https://github.com/apache/bookkeeper/pull/1607 to 
correct the issue reported in https://github.com/apache/bookkeeper/issues/1606.
    
    There is a race condition in the concurrent open hash map implementations. 
The race happens when a map gets re-hashed after an expansion and the new 
arrays are replacing the old ones.
    
    The race itself is that a thread doing a `get()` on the map first 
reads the current capacity of the map, uses that to compute the bucket, and then 
tries to do an optimistic read of the value in that bucket.
    
    This assumes the `capacity` update becomes visible only after the `values` 
array has already been swapped, but that is not always the case in the current 
code: a reader may observe a capacity value bigger than the actual array size.
    
    ### Changes
    
     * Use the `volatile` qualifier for the `capacity` field and the `values` 
arrays to ensure the ordering of memory reads is respected by the compiler
     * During rehashing, update `capacity` after `values` wherever that was not 
already the case
---
 .../util/collections/ConcurrentLongHashMap.java    |  8 +--
 .../util/collections/ConcurrentLongPairSet.java    | 15 +++---
 .../util/collections/ConcurrentOpenHashMap.java    |  6 +--
 .../util/collections/ConcurrentOpenHashSet.java    |  6 +--
 .../collections/ConcurrentLongHashMapTest.java     | 57 ++++++++++++++++++++--
 5 files changed, 72 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
index fcb0c10..60c24c0 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMap.java
@@ -30,7 +30,7 @@ import com.google.common.collect.Lists;
 
 /**
  * Map from long to an Object.
- * 
+ *
  * Provides similar methods as a ConcurrentMap<long,Object> with 2 differences:
  * <ol>
  * <li>No boxing/unboxing from long -> Long
@@ -187,10 +187,10 @@ public class ConcurrentLongHashMap<V> {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private long[] keys;
-        private V[] values;
+        private volatile long[] keys;
+        private volatile V[] values;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
index 74d4314..4634b40 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSet.java
@@ -26,7 +26,6 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.StampedLock;
-import java.util.function.Predicate;
 
 /**
  * Concurrent hash set where values are composed of pairs of longs.
@@ -163,11 +162,11 @@ public class ConcurrentLongPairSet {
 
     /**
      * Removes all of the elements of this collection that satisfy the given 
predicate.
-     * 
+     *
      * @param filter
      *            a predicate which returns {@code true} for elements to be 
removed
      * @return {@code true} if any elements were removed
-     * 
+     *
      * @return number of removed values
      */
     public int removeIf(LongPairPredicate filter) {
@@ -209,9 +208,9 @@ public class ConcurrentLongPairSet {
     @SuppressWarnings("serial")
     private static final class Section extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private long[] table;
+        private volatile long[] table;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
@@ -449,9 +448,11 @@ public class ConcurrentLongPairSet {
                 }
             }
 
-            capacity = newCapacity;
             table = newTable;
             usedBuckets = size;
+            // Capacity needs to be updated after the values, so that we won't 
see
+            // a capacity value bigger than the actual array size
+            capacity = newCapacity;
             resizeThreshold = (int) (capacity * SetFillFactor);
         }
 
@@ -532,7 +533,7 @@ public class ConcurrentLongPairSet {
             }
         }
     }
-    
+
     @Override
     public String toString() {
         StringBuilder sb = new StringBuilder();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
index 585471c..94f64de 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
@@ -31,7 +31,7 @@ import com.google.common.collect.Lists;
 
 /**
  * Concurrent hash map
- * 
+ *
  * Provides similar methods as a ConcurrentMap<K,V> but since it's an open 
hash map with linear probing, no node
  * allocations are required to store the values
  *
@@ -180,9 +180,9 @@ public class ConcurrentOpenHashMap<K, V> {
     @SuppressWarnings("serial")
     private static final class Section<K, V> extends StampedLock {
         // Keys and values are stored interleaved in the table array
-        private Object[] table;
+        private volatile Object[] table;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
index 93ca6e8..2f27913 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSet.java
@@ -32,7 +32,7 @@ import com.google.common.collect.Lists;
 
 /**
  * Concurrent hash set
- * 
+ *
  * Provides similar methods as a ConcurrentMap<K,V> but since it's an open 
hash map with linear probing, no node
  * allocations are required to store the values
  *
@@ -175,9 +175,9 @@ public class ConcurrentOpenHashSet<V> {
     // A section is a portion of the hash map that is covered by a single
     @SuppressWarnings("serial")
     private static final class Section<V> extends StampedLock {
-        private V[] values;
+        private volatile V[] values;
 
-        private int capacity;
+        private volatile int capacity;
         private volatile int size;
         private int usedBuckets;
         private int resizeThreshold;
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
index c4215f3..e4cbd47 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java
@@ -24,12 +24,15 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -37,11 +40,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.LongFunction;
 
-import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-
 public class ConcurrentLongHashMapTest {
 
     @Test
@@ -235,6 +235,57 @@ public class ConcurrentLongHashMapTest {
     }
 
     @Test
+    public void stressConcurrentInsertionsAndReads() throws Throwable {
+        ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>(4, 1);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        final int writeThreads = 16;
+        final int readThreads = 16;
+        final int n = 1_000_000;
+        String value = "value";
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < writeThreads; i++) {
+            final int threadIdx = i;
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+                    map.put(key, value);
+                }
+            }));
+        }
+        for (int i = 0; i < readThreads; i++) {
+            final int threadIdx = i;
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are uniques
+                    key -= key % (threadIdx + 1);
+                    map.get(key);
+                }
+            }));
+        }
+        for (Future<?> future : futures) {
+            future.get();
+        }
+        assertEquals(map.size(), n * writeThreads);
+        executor.shutdown();
+    }
+
+    @Test
     public void testIteration() {
         ConcurrentLongHashMap<String> map = new ConcurrentLongHashMap<>();
 

Reply via email to