Cleaner layout for assembler
Project: http://git-wip-us.apache.org/repos/asf/jena/repo Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/4c03c05d Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/4c03c05d Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/4c03c05d Branch: refs/heads/ThreadPerGraphDataset Commit: 4c03c05decbd32e361a6e17a0434bd5caf059e8b Parents: cea2f65 Author: ajs6f <[email protected]> Authored: Sat Jan 7 21:14:30 2017 -0500 Committer: ajs6f <[email protected]> Committed: Tue Feb 14 09:39:03 2017 -0500 ---------------------------------------------------------------------- .../WriterPerGraphDatasetAssembler.java | 25 +++++++++++--------- 1 file changed, 14 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/jena/blob/4c03c05d/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/WriterPerGraphDatasetAssembler.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/WriterPerGraphDatasetAssembler.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/WriterPerGraphDatasetAssembler.java index 255e24c..cbb3582 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/WriterPerGraphDatasetAssembler.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/WriterPerGraphDatasetAssembler.java @@ -26,7 +26,7 @@ import static org.apache.jena.sparql.util.graph.GraphUtils.multiValueResource; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.function.Consumer; import org.apache.jena.assembler.Assembler; import org.apache.jena.query.Dataset; @@ -60,16 +60,7 @@ public class WriterPerGraphDatasetAssembler extends TransactionalInMemDatasetAss // take advantage of writer-per-graph to load in parallel, one thread per named graph ExecutorService loaderThreadPool = newFixedThreadPool(namedGraphs.size()); try { - loaderThreadPool - .submit(() -> namedGraphs.stream().parallel().forEach(namedGraphResource -> { - dataset.begin(WRITE); - try { - loadNamedGraph(dataset, namedGraphResource); - dataset.commit(); - } finally { - dataset.end(); - } - })).get(); + loaderThreadPool.submit(() -> namedGraphs.parallelStream().forEach(loadIntoDataset(dataset))).get(); } catch (InterruptedException | ExecutionException e) { loaderThreadPool.shutdownNow(); throw new JenaException(e); @@ -79,6 +70,18 @@ public class WriterPerGraphDatasetAssembler extends TransactionalInMemDatasetAss // load using only this thread, default mode namedGraphs.forEach(namedGraphResource -> loadNamedGraph(dataset, namedGraphResource)); } + + private static Consumer<Resource> loadIntoDataset(Dataset ds) { + return namedGraphResource -> { + ds.begin(WRITE); + try { + loadNamedGraph(ds, namedGraphResource); + ds.commit(); + } finally { + ds.end(); + } + }; + } private static void loadNamedGraph(Dataset dataset, Resource namedGraphResource) { final String graphName = getAsStringValue(namedGraphResource, pGraphName);
