This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b429c790e14 HADOOP-19712. S3A: Deadlock in EvaluatingStatisticsMap.entryset() (#8006)
b429c790e14 is described below

commit b429c790e140c7d66d272b2c36fb1a7a46695f38
Author: Steve Loughran <[email protected]>
AuthorDate: Thu Oct 16 20:17:07 2025 +0100

    HADOOP-19712. S3A: Deadlock in EvaluatingStatisticsMap.entryset() (#8006)
    
    
    EvaluatingStatisticsMap used parallelStream() to process values; this
    runs on the JRE's fixed-size shared thread pool (the ForkJoinPool
    common pool) and can sporadically deadlock if there is not enough
    capacity and the direct/indirect map operations themselves take place
    in a worker thread of that pool.
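
    A minimal, hypothetical sketch of the hazard (not code from this patch):
    parallelStream() evaluates its lambdas on the JRE's shared ForkJoinPool
    common pool, so when the evaluation is itself triggered from a
    common-pool worker and every worker ends up blocked on work queued to
    that same pool, progress can stall.

        import java.util.concurrent.CompletableFuture;
        import java.util.stream.IntStream;

        public class CommonPoolStarvationSketch {
          public static void main(String[] args) {
            // Saturate the common pool with tasks that themselves block on
            // further common-pool work; with no spare capacity this pattern
            // can sporadically stall, which is the pattern this patch removes.
            IntStream.range(0, 64).parallel().forEach(i ->
                CompletableFuture.supplyAsync(() -> i * i).join());
          }
        }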
    
    * Reworked how entrySet() and values() work, using .forEach()
    iteration after reviewing what ConcurrentHashMap does internally;
    it performs a (safe) traversal.
    
    * Added an EvaluatingStatisticsMap.forEach() implementation which maps
    the passed-in BiConsumer down to evaluators.forEach(), evaluating each
    value as it goes (a short usage sketch follows these notes).
    
    * Use that in IOStatisticsBinding.snapshot() code.
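
    A minimal usage sketch of that forEach() (hypothetical caller code in
    the org.apache.hadoop.fs.statistics.impl package, along the lines of
    the new test added below): each value is evaluated on the calling
    thread as the iteration proceeds, so no shared pool is involved.

        EvaluatingStatisticsMap<Long> counters = new EvaluatingStatisticsMap<>();
        counters.addFunction("bytes_read", key -> 42L);
        // each registered function is evaluated inline as forEach() walks the map
        counters.forEach((key, value) ->
            System.out.println(key + " = " + value));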
    
    Contributed by Steve Loughran
---
 .../statistics/impl/EvaluatingStatisticsMap.java   | 64 ++++++++++++++-----
 .../fs/statistics/impl/IOStatisticsBinding.java    |  5 +-
 .../fs/statistics/TestIOStatisticsStore.java       | 61 +++++++++++++++++++
 .../impl/TestEvaluatingStatisticsMap.java          | 71 ++++++++++++++++++++++
 4 files changed, 182 insertions(+), 19 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java
index e4680f2d81f..fc1ab9e2bdd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java
@@ -19,12 +19,16 @@
 package org.apache.hadoop.fs.statistics.impl;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /**
  * A map of functions which can be invoked to dynamically
@@ -132,11 +136,10 @@ public Set<String> keySet() {
    */
   @Override
   public Collection<E> values() {
-    Set<Entry<String, Function<String, E>>> evalEntries =
-        evaluators.entrySet();
-    return evalEntries.parallelStream().map((e) ->
-        e.getValue().apply(e.getKey()))
-        .collect(Collectors.toList());
+    List<E> result = new ArrayList<>(size());
+    evaluators.forEach((k, f) ->
+        result.add(f.apply(k)));
+    return result;
   }
 
   /**
@@ -149,22 +152,37 @@ public Map<String, E> snapshot() {
 
   /**
    * Creating the entry set forces an evaluation of the functions.
-   *
+   * <p>
+   * Not synchronized, though thread safe.
+   * <p>
    * This is not a snapshot, so if the evaluators actually return
    * references to mutable objects (e.g. a MeanStatistic instance)
    * then that value may still change.
    *
-   * The evaluation may be parallelized.
    * @return an evaluated set of values
    */
   @Override
-  public synchronized Set<Entry<String, E>> entrySet() {
-    Set<Entry<String, Function<String, E>>> evalEntries =
-        evaluators.entrySet();
-    Set<Entry<String, E>> r = evalEntries.parallelStream().map((e) ->
-        new EntryImpl<>(e.getKey(), e.getValue().apply(e.getKey())))
-        .collect(Collectors.toSet());
-    return r;
+  public Set<Entry<String, E>> entrySet() {
+    Set<Entry<String, E>> result = new LinkedHashSet<>(size());
+    evaluators.forEach((key, evaluator) -> {
+      final E current = evaluator.apply(key);
+      result.add(new EntryImpl<>(key, current));
+    });
+    return result;
+  }
+
+
+  /**
+   * Hand down to the foreach iterator of the evaluators, by evaluating as each
+   * entry is processed and passing that in to the {@code action} consumer.
+   * @param action consumer of entries.
+   */
+  @Override
+  public void forEach(final BiConsumer<? super String, ? super E> action) {
+    BiConsumer<String, Function<String, E>> biConsumer = (key, value) -> {
+      action.accept(key, value.apply(key));
+    };
+    evaluators.forEach(biConsumer);
   }
 
   /**
@@ -173,7 +191,7 @@ public synchronized Set<Entry<String, E>> entrySet() {
    */
   private static final class EntryImpl<E> implements Entry<String, E> {
 
-    private String key;
+    private final String key;
 
     private E value;
 
@@ -197,6 +215,20 @@ public E setValue(final E val) {
       this.value = val;
       return val;
     }
+
+    @Override
+    public boolean equals(final Object o) {
+      if (!(o instanceof Entry)) {
+        return false;
+      }
+      Entry<String, ?> entry = (Entry<String, ?>) o;
+      return Objects.equals(key, entry.getKey()) && Objects.equals(value, entry.getValue());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(key);
+    }
   }
 
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
index 6a5d01fb3b0..882648abd20 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
@@ -170,9 +170,8 @@ private static <E> Map<String, E> copyMap(
     // we have to clone the values so that they aren't
     // bound to the original values
     dest.clear();
-    source.entrySet()
-        .forEach(entry ->
-            dest.put(entry.getKey(), copyFn.apply(entry.getValue())));
+    source.forEach((key, current) ->
+            dest.put(key, copyFn.apply(current)));
     return dest;
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java
index 200c87e4332..9963ff9456c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java
@@ -18,6 +18,12 @@
 
 package org.apache.hadoop.fs.statistics;
 
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -174,4 +180,59 @@ public void testNegativeCounterIncrementIgnored() throws Throwable {
         .isEqualTo(2);
   }
 
+  @Test
+  public void testForeach() throws Throwable {
+
+    final IOStatisticsStore store = iostatisticsStore()
+        .withCounters(COUNT, "c1", "c2")
+        .withGauges(GAUGE)
+        .withMinimums(MIN)
+        .withMaximums(MAX)
+        .withMeanStatistics(MEAN)
+        .build();
+    store.setCounter(COUNT, 10);
+    store.setCounter("c1", 1);
+    store.setCounter("c2", 2);
+
+    // get the counter map, which is evaluated on demand
+    final Map<String, Long> counters = store.counters();
+    LongAdder entryCount = new LongAdder();
+    LongAdder sum = new LongAdder();
+
+    // apply the foreach iteration
+    counters.forEach((k, v) -> {
+      entryCount.increment();
+      sum.add(v);
+    });
+    Assertions.assertThat(entryCount.longValue())
+        .describedAs("entry count")
+        .isEqualTo(3);
+    Assertions.assertThat(sum.longValue())
+        .describedAs("sum of values")
+        .isEqualTo(13);
+
+    // keyset is as expected
+    final Set<String> keys = counters.keySet();
+    Assertions.assertThat(keys)
+        .describedAs("keys")
+        .hasSize(3)
+        .contains("c1", "c2", COUNT);
+
+    // values are as expected
+    final Collection<Long> values = counters.values();
+    Assertions.assertThat(values)
+        .describedAs("values")
+        .hasSize(3)
+        .contains(10L, 1L, 2L);
+
+    // entries will all be evaluated
+    final Set<Map.Entry<String, Long>> entries = counters.entrySet();
+    entryCount.reset();
+    sum.reset();
+    entries.forEach(e -> {
+      entryCount.increment();
+      sum.add(e.getValue());
+    });
+  }
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/impl/TestEvaluatingStatisticsMap.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/impl/TestEvaluatingStatisticsMap.java
new file mode 100644
index 00000000000..d1cc0e362fb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/impl/TestEvaluatingStatisticsMap.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestEvaluatingStatisticsMap extends AbstractHadoopTestBase {
+
+
+  @Test
+  public void testEvaluatingStatisticsMap() {
+    EvaluatingStatisticsMap<String> map = new EvaluatingStatisticsMap<>();
+
+    Assertions.assertThat(map).isEmpty();
+    Assertions.assertThat(map.keySet()).isEmpty();
+    Assertions.assertThat(map.values()).isEmpty();
+    Assertions.assertThat(map.entrySet()).isEmpty();
+
+    // fill the map with the environment vars
+    final Map<String, String> env = System.getenv();
+    env.forEach((k, v) -> map.addFunction(k, any -> v));
+
+    // verify the keys match
+    assertThat(map.keySet())
+        .describedAs("keys")
+        .containsExactlyInAnyOrderElementsOf(env.keySet());
+
+    // and that the values do
+    assertThat(map.values())
+        .describedAs("Evaluated values")
+        .containsExactlyInAnyOrderElementsOf(env.values());
+
+    // now assert that this holds for the entryset.
+    env.forEach((k, v) ->
+        assertThat(map.get(k))
+            .describedAs("looked up key %s", k)
+            .isNotNull()
+            .isEqualTo(v));
+
+    map.forEach((k, v) ->
+        assertThat(env.get(k))
+            .describedAs("env var %s", k)
+            .isNotNull()
+            .isEqualTo(v));
+
+
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
