lhotari commented on a change in pull request #9787:
URL: https://github.com/apache/pulsar/pull/9787#discussion_r586749878



##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMap.java
##########
@@ -354,42 +354,33 @@ void clear() {
         }
 
         public void forEach(BiConsumer<? super K, ? super V> processor) {
-            long stamp = tryOptimisticRead();
-
+            // Take a reference to the data table, if there is a rehashing event, we'll be
+            // simply iterating over a snapshot of the data.
             Object[] table = this.table;
-            boolean acquiredReadLock = false;
-
-            try {
 
-                // Validate no rehashing
-                if (!validate(stamp)) {
-                    // Fallback to read lock
-                    stamp = readLock();
-                    acquiredReadLock = true;
-                    table = this.table;
+            // Go through all the buckets for this section. We try to renew the stamp only after a validation
+            // error, otherwise we keep going with the same.
+            long stamp = 0;
+            for (int bucket = 0; bucket < table.length; bucket += 2) {
+                if (stamp == 0) {
+                    stamp = tryOptimisticRead();
                 }
 
-                // Go through all the buckets for this section
-                for (int bucket = 0; bucket < table.length; bucket += 2) {
-                    K storedKey = (K) table[bucket];
-                    V storedValue = (V) table[bucket + 1];
-
-                    if (!acquiredReadLock && !validate(stamp)) {
-                        // Fallback to acquiring read lock
-                        stamp = readLock();
-                        acquiredReadLock = true;
+                K storedKey = (K) table[bucket];
+                V storedValue = (V) table[bucket + 1];
 
-                        storedKey = (K) table[bucket];
-                        storedValue = (V) table[bucket + 1];
-                    }
+                if (!validate(stamp)) {
+                    // Fallback to acquiring read lock
+                    stamp = readLock();
 
-                    if (storedKey != DeletedKey && storedKey != EmptyKey) {
-                        processor.accept(storedKey, storedValue);
-                    }
-                }
-            } finally {
-                if (acquiredReadLock) {
+                    storedKey = (K) table[bucket];
+                    storedValue = (V) table[bucket + 1];
                     unlockRead(stamp);

Review comment:
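       Should `unlockRead(stamp)` go in a `finally` block, so that the read lock is released even if an exception is thrown while it is held? (The same applies to all unlock locations.)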
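
       For illustration, a minimal sketch of the suggested shape, not the actual Pulsar code. `SectionSketch` and `putAt` are hypothetical names, the table is simplified to use `null` for empty slots, and resetting `stamp` to 0 after the unlock is an assumption, since the hunk above is cut off at that point. Only `java.util.concurrent.locks.StampedLock` calls are used:

```java
import java.util.concurrent.locks.StampedLock;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for one map section; not the Pulsar class.
class SectionSketch<K, V> {
    private final StampedLock lock = new StampedLock();
    // Keys at even indexes, values at odd indexes, as in the hunk above.
    private volatile Object[] table;

    SectionSketch(int capacity) {
        table = new Object[2 * capacity];
    }

    // Simplified writer: stores into a fixed bucket (no hashing, no resizing).
    void putAt(int bucket, K key, V value) {
        long stamp = lock.writeLock();
        try {
            table[2 * bucket] = key;
            table[2 * bucket + 1] = value;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    @SuppressWarnings("unchecked")
    void forEach(BiConsumer<? super K, ? super V> processor) {
        Object[] table = this.table; // snapshot of the array reference
        long stamp = 0;
        for (int bucket = 0; bucket < table.length; bucket += 2) {
            if (stamp == 0) {
                // Returns 0 if a writer currently holds the lock.
                stamp = lock.tryOptimisticRead();
            }

            K storedKey = (K) table[bucket];
            V storedValue = (V) table[bucket + 1];

            // validate() always returns false for a zero stamp.
            if (!lock.validate(stamp)) {
                // Fall back to the read lock; finally guarantees the release.
                stamp = lock.readLock();
                try {
                    storedKey = (K) table[bucket];
                    storedValue = (V) table[bucket + 1];
                } finally {
                    lock.unlockRead(stamp);
                }
                // Assumption: request a fresh optimistic stamp next iteration.
                stamp = 0;
            }

            // The callback runs without any lock held in this sketch.
            if (storedKey != null) {
                processor.accept(storedKey, storedValue);
            }
        }
    }
}
```

       The two array reads are unlikely to throw, so the `finally` is mostly defensive: it keeps the lock/unlock pairing obvious and safe if more code is later added inside the locked region.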



