This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/maven.git
The following commit(s) were added to refs/heads/master by this push: new b6a84fcac6 [MNG-8014] Fix multithreaded builder (#1386) b6a84fcac6 is described below commit b6a84fcac6689e9b6b8052d0d204f1442bbbe88f Author: Guillaume Nodet <gno...@gmail.com> AuthorDate: Mon Jan 22 18:27:48 2024 +0100 [MNG-8014] Fix multithreaded builder (#1386) --- .../maven/project/DefaultProjectBuilder.java | 105 +++++++++++++++------ .../maven/model/building/DefaultModelBuilder.java | 41 ++++---- .../building/DefaultModelBuildingRequest.java | 4 +- .../building/DefaultTransformerContextBuilder.java | 6 +- .../org/apache/maven/model/building/Graph.java | 26 ++--- .../apache/maven/model/building/ModelCache.java | 4 +- .../repository/internal/DefaultModelCache.java | 63 ++++++++++--- .../maven/internal/xml}/ImmutableCollections.java | 30 +++--- .../org/apache/maven/internal/xml/XmlNodeImpl.java | 7 +- src/mdo/java/ImmutableCollections.java | 14 ++- 10 files changed, 183 insertions(+), 117 deletions(-) diff --git a/maven-core/src/main/java/org/apache/maven/project/DefaultProjectBuilder.java b/maven-core/src/main/java/org/apache/maven/project/DefaultProjectBuilder.java index 76791a82b8..7c99136b6a 100644 --- a/maven-core/src/main/java/org/apache/maven/project/DefaultProjectBuilder.java +++ b/maven-core/src/main/java/org/apache/maven/project/DefaultProjectBuilder.java @@ -25,9 +25,8 @@ import javax.inject.Singleton; import java.io.File; import java.io.IOException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -88,7 +87,7 @@ import org.slf4j.LoggerFactory; @Singleton public class DefaultProjectBuilder implements ProjectBuilder { public static final String BUILDER_PARALLELISM = "maven.projectBuilder.parallelism"; - public static 
final int DEFAULT_BUILDER_PARALLELISM = 1; + public static final int DEFAULT_BUILDER_PARALLELISM = Runtime.getRuntime().availableProcessors() / 2 + 1; private final Logger logger = LoggerFactory.getLogger(getClass()); private final ModelBuilder modelBuilder; @@ -208,14 +207,14 @@ public class DefaultProjectBuilder implements ProjectBuilder { private final List<RemoteRepository> repositories; private final ReactorModelPool modelPool; private final TransformerContextBuilder transformerContextBuilder; - private final ForkJoinPool forkJoinPool; + private final ExecutorService executor; BuildSession(ProjectBuildingRequest request, boolean localProjects) { this.request = request; this.session = RepositoryUtils.overlay(request.getLocalRepository(), request.getRepositorySession(), repoSystem); this.repositories = RepositoryUtils.toRepos(request.getRemoteRepositories()); - this.forkJoinPool = new ForkJoinPool(getParallelism(request)); + this.executor = createExecutor(getParallelism(request)); if (localProjects) { this.modelPool = new ReactorModelPool(); this.transformerContextBuilder = modelBuilder.newTransformerContextBuilder(); @@ -225,9 +224,42 @@ public class DefaultProjectBuilder implements ProjectBuilder { } } + ExecutorService createExecutor(int parallelism) { + // + // We need an executor that will not block. + // We can't use work stealing, as we are building a graph + // and this could lead to cycles where a thread waits for + // a task to finish, then execute another one which waits + // for the initial task... + // In order to work around that problem, we override the + // invokeAll method, so that whenever the method is called, + // the pool core size will be incremented before submitting + // all the tasks, then the thread will block waiting for all + // those subtasks to finish. 
+ // This ensures the number of running workers is no more than + // the defined parallelism, while making sure the pool will not + // be exhausted + // + return new ThreadPoolExecutor( + parallelism, Integer.MAX_VALUE, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) { + final AtomicInteger parked = new AtomicInteger(); + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + setCorePoolSize(parallelism + parked.incrementAndGet()); + try { + return super.invokeAll(tasks); + } finally { + setCorePoolSize(parallelism + parked.decrementAndGet()); + } + } + }; + } + @Override public void close() { - this.forkJoinPool.shutdownNow(); + this.executor.shutdownNow(); } private int getParallelism(ProjectBuildingRequest request) { @@ -355,18 +387,7 @@ public class DefaultProjectBuilder implements ProjectBuilder { } List<ProjectBuildingResult> build(List<File> pomFiles, boolean recursive) throws ProjectBuildingException { - ForkJoinTask<List<ProjectBuildingResult>> task = forkJoinPool.submit(() -> doBuild(pomFiles, recursive)); - - // ForkJoinTask.getException rewraps the exception in a weird way - // which cause an additional layer of exception, so try to unwrap it - task.quietlyJoin(); - if (task.isCompletedAbnormally()) { - Throwable e = task.getException(); - Throwable c = e.getCause(); - uncheckedThrow(c != null && c.getClass() == e.getClass() ? 
c : e); - } - - List<ProjectBuildingResult> results = task.getRawResult(); + List<ProjectBuildingResult> results = doBuild(pomFiles, recursive); if (results.stream() .flatMap(r -> r.getProblems().stream()) .anyMatch(p -> p.getSeverity() != ModelProblem.Severity.WARNING)) { @@ -417,14 +438,21 @@ public class DefaultProjectBuilder implements ProjectBuilder { Set<File> aggregatorFiles, boolean root, boolean recursive) { - List<ForkJoinTask<InterimResult>> tasks = pomFiles.stream() - .map(pomFile -> ForkJoinTask.adapt( + List<Callable<InterimResult>> tasks = pomFiles.stream() + .map(pomFile -> ((Callable<InterimResult>) () -> build(projectIndex, pomFile, concat(aggregatorFiles, pomFile), root, recursive))) .collect(Collectors.toList()); - - return ForkJoinTask.invokeAll(tasks).stream() - .map(ForkJoinTask::getRawResult) - .collect(Collectors.toList()); + try { + List<Future<InterimResult>> futures = executor.invokeAll(tasks); + List<InterimResult> list = new ArrayList<>(); + for (Future<InterimResult> future : futures) { + InterimResult interimResult = future.get(); + list.add(interimResult); + } + return list; + } catch (Exception e) { + throw new RuntimeException(e); + } } private <T> Set<T> concat(Set<T> set, T elem) { @@ -571,10 +599,31 @@ public class DefaultProjectBuilder implements ProjectBuilder { } } - return interimResults.parallelStream() - .map(interimResult -> doBuild(projectIndex, interimResult)) - .flatMap(List::stream) + List<Callable<List<ProjectBuildingResult>>> callables = interimResults.stream() + .map(interimResult -> + (Callable<List<ProjectBuildingResult>>) () -> doBuild(projectIndex, interimResult)) .collect(Collectors.toList()); + + try { + List<Future<List<ProjectBuildingResult>>> futures = executor.invokeAll(callables); + return futures.stream() + .map(listFuture -> { + try { + return listFuture.get(); + } catch (InterruptedException e) { + uncheckedThrow(e); + return null; + } catch (ExecutionException e) { + uncheckedThrow(e.getCause()); + 
return null; + } + }) + .flatMap(List::stream) + .collect(Collectors.toList()); + } catch (InterruptedException e) { + uncheckedThrow(e); + return null; + } } private List<ProjectBuildingResult> doBuild(Map<File, MavenProject> projectIndex, InterimResult interimResult) { diff --git a/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultModelBuilder.java b/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultModelBuilder.java index c27242a145..7d832654a1 100644 --- a/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultModelBuilder.java +++ b/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultModelBuilder.java @@ -27,8 +27,6 @@ import java.io.IOException; import java.lang.reflect.Field; import java.util.*; import java.util.concurrent.Callable; -import java.util.concurrent.ForkJoinTask; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -1870,39 +1868,32 @@ public class DefaultModelBuilder implements ModelBuilder { String version, ModelCacheTag<T> tag, Callable<T> supplier) { - return doWithCache(cache, supplier, s -> cache.computeIfAbsent(groupId, artifactId, version, tag.getName(), s)); + Supplier<T> s = asSupplier(supplier); + if (cache == null) { + return s.get(); + } else { + return cache.computeIfAbsent(groupId, artifactId, version, tag.getName(), s); + } } private static <T> T cache(ModelCache cache, Source source, ModelCacheTag<T> tag, Callable<T> supplier) { - return doWithCache(cache, supplier, s -> cache.computeIfAbsent(source, tag.getName(), s)); - } - - private static <T> T doWithCache( - ModelCache cache, Callable<T> supplier, Function<Supplier<Supplier<T>>, T> asyncSupplierConsumer) { - if (cache != null) { - return asyncSupplierConsumer.apply(() -> { - ForkJoinTask<T> task = ForkJoinTask.adapt(supplier); - task.fork(); - return () -> { - task.quietlyJoin(); - if (task.isCompletedAbnormally()) { - Throwable e = 
task.getException(); - while (e instanceof RuntimeException && e.getCause() != null) { - e = e.getCause(); - } - uncheckedThrow(e); - } - return task.getRawResult(); - }; - }); + Supplier<T> s = asSupplier(supplier); + if (cache == null) { + return s.get(); } else { + return cache.computeIfAbsent(source, tag.getName(), s); + } + } + + private static <T> Supplier<T> asSupplier(Callable<T> supplier) { + return () -> { try { return supplier.call(); } catch (Exception e) { uncheckedThrow(e); return null; } - } + }; } static <T extends Throwable> void uncheckedThrow(Throwable t) throws T { diff --git a/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultModelBuildingRequest.java b/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultModelBuildingRequest.java index 9348098075..e19a72caef 100644 --- a/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultModelBuildingRequest.java +++ b/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultModelBuildingRequest.java @@ -252,8 +252,8 @@ public class DefaultModelBuildingRequest implements ModelBuildingRequest { public DefaultModelBuildingRequest setSystemProperties(Properties systemProperties) { if (systemProperties != null) { this.systemProperties = new Properties(); - synchronized (systemProperties) { // avoid concurrent modification if someone else sets/removes an unrelated - // system property + // avoid concurrent modification if someone else sets/removes an unrelated system property + synchronized (systemProperties) { this.systemProperties.putAll(systemProperties); } } else { diff --git a/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultTransformerContextBuilder.java b/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultTransformerContextBuilder.java index 98e8be6cfd..15799e1f73 100644 --- a/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultTransformerContextBuilder.java +++ 
b/maven-model-builder/src/main/java/org/apache/maven/model/building/DefaultTransformerContextBuilder.java @@ -46,6 +46,8 @@ class DefaultTransformerContextBuilder implements TransformerContextBuilder { private final Map<String, Set<FileModelSource>> mappedSources = new ConcurrentHashMap<>(64); + private volatile boolean fullReactorLoaded; + DefaultTransformerContextBuilder(DefaultModelBuilder defaultModelBuilder) { this.defaultModelBuilder = defaultModelBuilder; this.context = new DefaultTransformerContext(defaultModelBuilder.getModelProcessor()); @@ -60,8 +62,6 @@ class DefaultTransformerContextBuilder implements TransformerContextBuilder { DefaultModelProblemCollector problems = (DefaultModelProblemCollector) collector; return new TransformerContext() { - private volatile boolean fullReactorLoaded; - @Override public Path locate(Path path) { return context.locate(path); @@ -119,7 +119,7 @@ class DefaultTransformerContextBuilder implements TransformerContextBuilder { private void loadFullReactor() { if (!fullReactorLoaded) { - synchronized (this) { + synchronized (DefaultTransformerContextBuilder.this) { if (!fullReactorLoaded) { doLoadFullReactor(); fullReactorLoaded = true; diff --git a/maven-model-builder/src/main/java/org/apache/maven/model/building/Graph.java b/maven-model-builder/src/main/java/org/apache/maven/model/building/Graph.java index cded3a47d1..14656a7e07 100644 --- a/maven-model-builder/src/main/java/org/apache/maven/model/building/Graph.java +++ b/maven-model-builder/src/main/java/org/apache/maven/model/building/Graph.java @@ -18,26 +18,20 @@ */ package org.apache.maven.model.building; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; class Graph { - final Map<String, List<String>> graph = new LinkedHashMap<>(); + final Map<String, Set<String>> graph = 
new LinkedHashMap<>(); synchronized void addEdge(String from, String to) throws CycleDetectedException { - graph.computeIfAbsent(from, l -> new ArrayList<>()).add(to); - List<String> cycle = visitCycle(graph, Collections.singleton(to), new HashMap<>(), new LinkedList<>()); - if (cycle != null) { - // remove edge which introduced cycle - throw new CycleDetectedException( - "Edge between '" + from + "' and '" + to + "' introduces to cycle in the graph", cycle); + if (graph.computeIfAbsent(from, l -> new HashSet<>()).add(to)) { + List<String> cycle = visitCycle(graph, Collections.singleton(to), new HashMap<>(), new LinkedList<>()); + if (cycle != null) { + // remove edge which introduced cycle + throw new CycleDetectedException( + "Edge between '" + from + "' and '" + to + "' introduces to cycle in the graph", cycle); + } } } @@ -47,7 +41,7 @@ class Graph { } private static List<String> visitCycle( - Map<String, List<String>> graph, + Map<String, Set<String>> graph, Collection<String> children, Map<String, DfsState> stateMap, LinkedList<String> cycle) { diff --git a/maven-model-builder/src/main/java/org/apache/maven/model/building/ModelCache.java b/maven-model-builder/src/main/java/org/apache/maven/model/building/ModelCache.java index cc9d07e5fc..e5538f7d6d 100644 --- a/maven-model-builder/src/main/java/org/apache/maven/model/building/ModelCache.java +++ b/maven-model-builder/src/main/java/org/apache/maven/model/building/ModelCache.java @@ -32,7 +32,7 @@ import org.apache.maven.building.Source; */ public interface ModelCache { - <T> T computeIfAbsent(String groupId, String artifactId, String version, String tag, Supplier<Supplier<T>> data); + <T> T computeIfAbsent(String groupId, String artifactId, String version, String tag, Supplier<T> data); - <T> T computeIfAbsent(Source path, String tag, Supplier<Supplier<T>> data); + <T> T computeIfAbsent(Source path, String tag, Supplier<T> data); } diff --git 
a/maven-resolver-provider/src/main/java/org/apache/maven/repository/internal/DefaultModelCache.java b/maven-resolver-provider/src/main/java/org/apache/maven/repository/internal/DefaultModelCache.java index ec2f078a06..004fb6ef05 100644 --- a/maven-resolver-provider/src/main/java/org/apache/maven/repository/internal/DefaultModelCache.java +++ b/maven-resolver-provider/src/main/java/org/apache/maven/repository/internal/DefaultModelCache.java @@ -57,23 +57,19 @@ public class DefaultModelCache implements ModelCache { } @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public <T> T computeIfAbsent( - String groupId, String artifactId, String version, String tag, Supplier<Supplier<T>> data) { - Object obj = computeIfAbsent(new GavCacheKey(groupId, artifactId, version, tag), (Supplier) data); - return (T) obj; + @SuppressWarnings({"unchecked"}) + public <T> T computeIfAbsent(String groupId, String artifactId, String version, String tag, Supplier<T> data) { + return (T) computeIfAbsent(new GavCacheKey(groupId, artifactId, version, tag), data); } @Override - @SuppressWarnings({"unchecked", "rawtypes"}) - public <T> T computeIfAbsent(Source path, String tag, Supplier<Supplier<T>> data) { - Object obj = computeIfAbsent(new SourceCacheKey(path, tag), (Supplier) data); - return (T) obj; + @SuppressWarnings({"unchecked"}) + public <T> T computeIfAbsent(Source path, String tag, Supplier<T> data) { + return (T) computeIfAbsent(new SourceCacheKey(path, tag), data); } - protected Object computeIfAbsent(Object key, Supplier<Supplier<?>> data) { - Supplier<?> s = cache.computeIfAbsent(key, k -> data.get()); - return s != null ? 
s.get() : null; + protected Object computeIfAbsent(Object key, Supplier<?> data) { + return cache.computeIfAbsent(key, k -> new CachingSupplier<>(data)).get(); } static class GavCacheKey { @@ -168,4 +164,47 @@ public class DefaultModelCache implements ModelCache { return hash; } } + + static class CachingSupplier<T> implements Supplier<T> { + final Supplier<T> supplier; + volatile Object value; + + CachingSupplier(Supplier<T> supplier) { + this.supplier = supplier; + } + + @Override + @SuppressWarnings({"unchecked", "checkstyle:InnerAssignment"}) + public T get() { + Object v; + if ((v = value) == null) { + synchronized (this) { + if ((v = value) == null) { + try { + v = value = supplier.get(); + } catch (Exception e) { + v = value = new AltRes(e); + } + } + } + } + if (v instanceof AltRes) { + uncheckedThrow(((AltRes) v).t); + } + return (T) v; + } + + static class AltRes { + final Throwable t; + + AltRes(Throwable t) { + this.t = t; + } + } + } + + @SuppressWarnings("unchecked") + static <T extends Throwable> void uncheckedThrow(Throwable t) throws T { + throw (T) t; // rely on vacuous cast + } } diff --git a/src/mdo/java/ImmutableCollections.java b/maven-xml-impl/src/main/java/org/apache/maven/internal/xml/ImmutableCollections.java similarity index 96% copy from src/mdo/java/ImmutableCollections.java copy to maven-xml-impl/src/main/java/org/apache/maven/internal/xml/ImmutableCollections.java index fe4faef8d9..506424874b 100644 --- a/src/mdo/java/ImmutableCollections.java +++ b/maven-xml-impl/src/main/java/org/apache/maven/internal/xml/ImmutableCollections.java @@ -16,22 +16,10 @@ * specific language governing permissions and limitations * under the License. 
*/ -package ${package}; +package org.apache.maven.internal.xml; import java.io.Serializable; -import java.util.AbstractList; -import java.util.AbstractMap; -import java.util.AbstractSet; -import java.util.Collection; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Properties; -import java.util.RandomAccess; -import java.util.Set; +import java.util.*; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -485,11 +473,15 @@ class ImmutableCollections { private final Object[] entries; private MapN(Map<K, V> map) { - entries = map != null - ? map.entrySet().stream() - .map(e -> new SimpleImmutableEntry<>(e.getKey(), e.getValue())) - .toArray() - : new Object[0]; + if (map != null) { + entries = new Object[map.size()]; + int idx = 0; + for (Entry<K, V> e : map.entrySet()) { + entries[idx++] = new SimpleImmutableEntry<>(e.getKey(), e.getValue()); + } + } else { + entries = new Object[0]; + } } @Override diff --git a/maven-xml-impl/src/main/java/org/apache/maven/internal/xml/XmlNodeImpl.java b/maven-xml-impl/src/main/java/org/apache/maven/internal/xml/XmlNodeImpl.java index 1210b81320..0be92e9369 100644 --- a/maven-xml-impl/src/main/java/org/apache/maven/internal/xml/XmlNodeImpl.java +++ b/maven-xml-impl/src/main/java/org/apache/maven/internal/xml/XmlNodeImpl.java @@ -23,7 +23,6 @@ import javax.xml.stream.XMLStreamException; import java.io.Serializable; import java.io.StringWriter; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -87,10 +86,8 @@ public class XmlNodeImpl implements Serializable, XmlNode { this.namespaceUri = namespaceUri == null ? "" : namespaceUri; this.name = Objects.requireNonNull(name); this.value = value; - this.attributes = - attributes != null ? 
Collections.unmodifiableMap(new HashMap<>(attributes)) : Collections.emptyMap(); - this.children = - children != null ? Collections.unmodifiableList(new ArrayList<>(children)) : Collections.emptyList(); + this.attributes = ImmutableCollections.copy(attributes); + this.children = ImmutableCollections.copy(children); this.location = location; } diff --git a/src/mdo/java/ImmutableCollections.java b/src/mdo/java/ImmutableCollections.java index fe4faef8d9..4b69b4d3f5 100644 --- a/src/mdo/java/ImmutableCollections.java +++ b/src/mdo/java/ImmutableCollections.java @@ -485,11 +485,15 @@ class ImmutableCollections { private final Object[] entries; private MapN(Map<K, V> map) { - entries = map != null - ? map.entrySet().stream() - .map(e -> new SimpleImmutableEntry<>(e.getKey(), e.getValue())) - .toArray() - : new Object[0]; + if (map != null) { + entries = new Object[map.size()]; + int idx = 0; + for (Map.Entry<K, V> e : map.entrySet()) { + entries[idx++] = new SimpleImmutableEntry<>(e.getKey(), e.getValue()); + } + } else { + entries = new Object[0]; + } } @Override