This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 89102d62434 HADOOP-19712. S3A: Deadlock in
EvaluatingStatisticsMap.entryset() (#8006)
89102d62434 is described below
commit 89102d624342e1860992b9927c15fd411cc067fc
Author: Steve Loughran <[email protected]>
AuthorDate: Thu Oct 16 20:17:07 2025 +0100
HADOOP-19712. S3A: Deadlock in EvaluatingStatisticsMap.entryset() (#8006)
EvaluatingStatisticsMap used parallelStream() to process values; this runs on
the JRE's shared, fixed-size thread pool and can sporadically deadlock if the
pool lacks capacity and the direct/indirect map operations are themselves
invoked from one of that pool's worker threads.
* Reworked how entrySet() and values() work, using .forEach()
iterators after reviewing what ConcurrentHashMap does internally;
it does a (safe) traverse.
* Added EvaluatingStatisticsMap.forEach() implementation which maps the
passed-in BiConsumer down to evaluators.forEach(), evaluating each value as
it goes.
* Use that in IOStatisticsBinding.snapshot() code.
Contributed by Steve Loughran
---
.../statistics/impl/EvaluatingStatisticsMap.java | 64 ++++++++++++++-----
.../fs/statistics/impl/IOStatisticsBinding.java | 5 +-
.../fs/statistics/TestIOStatisticsStore.java | 61 +++++++++++++++++++
.../impl/TestEvaluatingStatisticsMap.java | 71 ++++++++++++++++++++++
4 files changed, 182 insertions(+), 19 deletions(-)
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java
index e4680f2d81f..fc1ab9e2bdd 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EvaluatingStatisticsMap.java
@@ -19,12 +19,16 @@
package org.apache.hadoop.fs.statistics.impl;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
import java.util.function.Function;
-import java.util.stream.Collectors;
/**
* A map of functions which can be invoked to dynamically
@@ -132,11 +136,10 @@ public Set<String> keySet() {
*/
@Override
public Collection<E> values() {
- Set<Entry<String, Function<String, E>>> evalEntries =
- evaluators.entrySet();
- return evalEntries.parallelStream().map((e) ->
- e.getValue().apply(e.getKey()))
- .collect(Collectors.toList());
+ List<E> result = new ArrayList<>(size());
+ evaluators.forEach((k, f) ->
+ result.add(f.apply(k)));
+ return result;
}
/**
@@ -149,22 +152,37 @@ public Map<String, E> snapshot() {
/**
* Creating the entry set forces an evaluation of the functions.
- *
+ * <p>
+ * Not synchronized, though thread safe.
+ * <p>
* This is not a snapshot, so if the evaluators actually return
* references to mutable objects (e.g. a MeanStatistic instance)
* then that value may still change.
*
- * The evaluation may be parallelized.
* @return an evaluated set of values
*/
@Override
- public synchronized Set<Entry<String, E>> entrySet() {
- Set<Entry<String, Function<String, E>>> evalEntries =
- evaluators.entrySet();
- Set<Entry<String, E>> r = evalEntries.parallelStream().map((e) ->
- new EntryImpl<>(e.getKey(), e.getValue().apply(e.getKey())))
- .collect(Collectors.toSet());
- return r;
+ public Set<Entry<String, E>> entrySet() {
+ Set<Entry<String, E>> result = new LinkedHashSet<>(size());
+ evaluators.forEach((key, evaluator) -> {
+ final E current = evaluator.apply(key);
+ result.add(new EntryImpl<>(key, current));
+ });
+ return result;
+ }
+
+
+ /**
+ * Iterate over the evaluators, applying each function to its key as the
+ * entry is reached and passing the key and result to the {@code action}.
+ * @param action consumer of each key and its evaluated value.
+ */
+ @Override
+ public void forEach(final BiConsumer<? super String, ? super E> action) {
+ BiConsumer<String, Function<String, E>> biConsumer = (key, value) -> {
+ action.accept(key, value.apply(key));
+ };
+ evaluators.forEach(biConsumer);
}
/**
@@ -173,7 +191,7 @@ public synchronized Set<Entry<String, E>> entrySet() {
*/
private static final class EntryImpl<E> implements Entry<String, E> {
- private String key;
+ private final String key;
private E value;
@@ -197,6 +215,20 @@ public E setValue(final E val) {
this.value = val;
return val;
}
+
+ @Override
+ public boolean equals(final Object o) {
+ if (!(o instanceof Entry)) {
+ return false;
+ }
+ Entry<String, ?> entry = (Entry<String, ?>) o;
+ return Objects.equals(key, entry.getKey()) && Objects.equals(value, entry.getValue());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(key);
+ }
}
}
diff --git
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
index 6a5d01fb3b0..882648abd20 100644
---
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
+++
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsBinding.java
@@ -170,9 +170,8 @@ private static <E> Map<String, E> copyMap(
// we have to clone the values so that they aren't
// bound to the original values
dest.clear();
- source.entrySet()
- .forEach(entry ->
- dest.put(entry.getKey(), copyFn.apply(entry.getValue())));
+ source.forEach((key, current) ->
+ dest.put(key, copyFn.apply(current)));
return dest;
}
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java
index 778eab8315a..1ef9bb12e81 100644
---
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/TestIOStatisticsStore.java
@@ -18,10 +18,16 @@
package org.apache.hadoop.fs.statistics;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
+
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -174,4 +180,59 @@ public void testNegativeCounterIncrementIgnored() throws Throwable {
.isEqualTo(2);
}
+ @Test
+ public void testForeach() throws Throwable {
+
+ final IOStatisticsStore store = iostatisticsStore()
+ .withCounters(COUNT, "c1", "c2")
+ .withGauges(GAUGE)
+ .withMinimums(MIN)
+ .withMaximums(MAX)
+ .withMeanStatistics(MEAN)
+ .build();
+ store.setCounter(COUNT, 10);
+ store.setCounter("c1", 1);
+ store.setCounter("c2", 2);
+
+ // get the counter map, which is evaluated on demand
+ final Map<String, Long> counters = store.counters();
+ LongAdder entryCount = new LongAdder();
+ LongAdder sum = new LongAdder();
+
+ // apply the foreach iteration
+ counters.forEach((k, v) -> {
+ entryCount.increment();
+ sum.add(v);
+ });
+ Assertions.assertThat(entryCount.longValue())
+ .describedAs("entry count")
+ .isEqualTo(3);
+ Assertions.assertThat(sum.longValue())
+ .describedAs("sum of values")
+ .isEqualTo(13);
+
+ // keyset is as expected
+ final Set<String> keys = counters.keySet();
+ Assertions.assertThat(keys)
+ .describedAs("keys")
+ .hasSize(3)
+ .contains("c1", "c2", COUNT);
+
+ // values are as expected
+ final Collection<Long> values = counters.values();
+ Assertions.assertThat(values)
+ .describedAs("values")
+ .hasSize(3)
+ .contains(10L, 1L, 2L);
+
+ // entries will all be evaluated; the recomputed count and sum are not asserted
+ final Set<Map.Entry<String, Long>> entries = counters.entrySet();
+ entryCount.reset();
+ sum.reset();
+ entries.forEach(e -> {
+ entryCount.increment();
+ sum.add(e.getValue());
+ });
+ }
+
}
diff --git
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/impl/TestEvaluatingStatisticsMap.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/impl/TestEvaluatingStatisticsMap.java
new file mode 100644
index 00000000000..d1cc0e362fb
--- /dev/null
+++
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/impl/TestEvaluatingStatisticsMap.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.util.Map;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestEvaluatingStatisticsMap extends AbstractHadoopTestBase {
+
+
+ @Test
+ public void testEvaluatingStatisticsMap() {
+ EvaluatingStatisticsMap<String> map = new EvaluatingStatisticsMap<>();
+
+ Assertions.assertThat(map).isEmpty();
+ Assertions.assertThat(map.keySet()).isEmpty();
+ Assertions.assertThat(map.values()).isEmpty();
+ Assertions.assertThat(map.entrySet()).isEmpty();
+
+ // fill the map with the environment vars
+ final Map<String, String> env = System.getenv();
+ env.forEach((k, v) -> map.addFunction(k, any -> v));
+
+ // verify the keys match
+ assertThat(map.keySet())
+ .describedAs("keys")
+ .containsExactlyInAnyOrderElementsOf(env.keySet());
+
+ // and that the values do
+ assertThat(map.values())
+ .describedAs("Evaluated values")
+ .containsExactlyInAnyOrderElementsOf(env.values());
+
+ // now assert that a lookup of each key returns the expected value.
+ env.forEach((k, v) ->
+ assertThat(map.get(k))
+ .describedAs("looked up key %s", k)
+ .isNotNull()
+ .isEqualTo(v));
+
+ map.forEach((k, v) ->
+ assertThat(env.get(k))
+ .describedAs("env var %s", k)
+ .isNotNull()
+ .isEqualTo(v));
+
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]