More tests, renaming central type more accurately
Project: http://git-wip-us.apache.org/repos/asf/jena/repo Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/ade1e941 Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/ade1e941 Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/ade1e941 Branch: refs/heads/ThreadPerGraphDataset Commit: ade1e9418e6cec37ffaf710e4b50024449025864 Parents: 2727c6b Author: ajs6f <[email protected]> Authored: Sun Jan 8 11:56:09 2017 -0500 Committer: ajs6f <[email protected]> Committed: Tue Feb 14 09:39:04 2017 -0500 ---------------------------------------------------------------------- .../sparql/core/DatasetGraphGraphPerTxn.java | 355 +++++++++++++++++++ .../core/DatasetGraphPerGraphLocking.java | 318 ----------------- .../sparql/core/assembler/AssemblerUtils.java | 2 +- .../assembler/GraphPerTxnDatasetAssembler.java | 96 +++++ .../WriterPerGraphDatasetAssembler.java | 96 ----- .../java/org/apache/jena/sparql/TC_General.java | 4 +- .../jena/sparql/core/pergraph/BasicTest.java | 4 +- .../sparql/core/pergraph/FindPatternsTest.java | 4 +- .../jena/sparql/core/pergraph/FindTest.java | 4 +- .../jena/sparql/core/pergraph/LockTest.java | 4 +- .../core/pergraph/MultithreadingTest.java | 59 ++- .../pergraph/TS_DatasetGraphGraphPerTxn.java | 28 ++ .../TS_DatasetGraphPerGraphLocking.java | 28 -- .../core/pergraph/TransactionLifecycleTest.java | 4 +- .../jena/sparql/core/pergraph/ViewTest.java | 4 +- 15 files changed, 538 insertions(+), 472 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphGraphPerTxn.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphGraphPerTxn.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphGraphPerTxn.java new file mode 100644 index 0000000..3476f6f --- /dev/null +++ 
b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphGraphPerTxn.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.jena.sparql.core; + +import static java.util.stream.Collectors.toList; +import static org.apache.jena.atlas.iterator.Iter.filter; +import static org.apache.jena.graph.GraphUtil.addInto; +import static org.apache.jena.query.ReadWrite.READ; +import static org.apache.jena.query.ReadWrite.WRITE; +import static org.apache.jena.sparql.core.Quad.defaultGraphIRI; +import static org.apache.jena.sparql.core.Quad.isDefaultGraph; +import static org.apache.jena.util.iterator.WrappedIterator.createNoRemove; + +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +import org.apache.jena.graph.Graph; +import org.apache.jena.graph.Node; +import org.apache.jena.graph.Triple; +import org.apache.jena.graph.impl.GraphBase; +import org.apache.jena.query.ReadWrite; +import org.apache.jena.sparql.JenaTransactionException; +import org.apache.jena.sparql.core.mem.TransactionalComponent; 
+import org.apache.jena.sparql.core.mem.TriTable; +import org.apache.jena.util.iterator.ExtendedIterator; + +public class DatasetGraphGraphPerTxn extends DatasetGraphCollection { + + private final Map<Node, LockableGraph> graphs = new ConcurrentHashMap<>(); + + /** + * The graph being used as part of a WRITE transaction. + */ + private final ThreadLocal<LockableGraph> graphInTransaction = new ThreadLocal<>(); + + private LockableGraph graphInTransaction() { + return graphInTransaction.get(); + } + + private final ThreadLocal<ReadWrite> transactionType = new ThreadLocal<>(); + + private ReadWrite transactionType() { + return transactionType.get(); + } + + @Override + public Iterator<Node> listGraphNodes() { + return filter(graphs.keySet().iterator(), gn -> !isDefaultGraph(gn)); + } + + @Override + public boolean supportsTransactions() { + return true; + } + + @Override + public boolean isInTransaction() { + return transactionType() != null; + } + + @Override + public void begin(ReadWrite readWrite) { + if (isInTransaction()) throw new JenaTransactionException("Cannot nest transactions!"); + transactionType.set(readWrite); + } + + private void actOnGraphInTransaction(Consumer<LockableGraph> action) { + if (graphInTransaction() != null) action.accept(graphInTransaction()); + } + + @Override + public void commit() { + if (!isInTransaction()) throw new JenaTransactionException("Cannot commit outside a transaction!"); + actOnGraphInTransaction(LockableGraph::commit); + cleanAfterTransaction(); + } + + @Override + public void abort() { + if (!isInTransaction()) throw new JenaTransactionException("Cannot abort outside a transaction!"); + actOnGraphInTransaction(LockableGraph::abort); + cleanAfterTransaction(); + } + + @Override + public void end() { + cleanAfterTransaction(); + } + + private void cleanAfterTransaction() { + graphInTransaction.remove(); + transactionType.remove(); + } + + @Override + public Graph getDefaultGraph() { + return getGraph(defaultGraphIRI); + } + 
+ @Override + public void setDefaultGraph(final Graph g) { + addGraph(defaultGraphIRI, g); + } + + @Override + public void add(Quad quad) { + mutate(super::add, quad); + } + + @Override + public void delete(Quad quad) { + mutate(super::delete, quad); + } + + @Override + public void addGraph(Node graphName, Graph triples) { + wholeGraphAction(graphName, g -> addInto(getGraph(g), triples)); + } + + @Override + public void removeGraph(Node graphName) { + wholeGraphAction(graphName, graphs::remove); + } + + /** + * Perform an action on a graph after clearing it. + * + * @param graphName the graph to mutate + * @param action the action to take with that whole graph + */ + private void wholeGraphAction(Node graphName, Consumer<Node> action) { + mutate(gN -> { + getGraph(gN).clear(); + action.accept(gN); + }, graphName, graphName); + } + + private void mutate(Consumer<Quad> action, Quad quad) { + mutate(action, quad, quad.getGraph()); + } + + /** + * Take an action inside a transaction for a given graph. Assumes that {@code action} and {@code payload} are + * appropriately related to {@code graphName}. + * + * @param action action to take + * @param payload payload for action + * @param graphName graph with which to act + */ + private <T> void mutate(Consumer<T> action, T payload, Node graphName) { + if (isInTransaction()) { + switch (transactionType()) { + case READ: + throw new JenaTransactionException("Cannot write during a READ transaction!"); + case WRITE: + // are we trying to mutate the current graph-in-hand? + if (graphInTransaction() != null && graphName.equals(graphInTransaction().graphName())) + action.accept(payload); + // are we starting work with a newly-chosen graph? + else if (graphInTransaction() == null) { + LockableGraph graph = graphs.computeIfAbsent(graphName, LockableGraph::new); + graph.begin(transactionType()); + graphInTransaction.set(graph); + action.accept(payload); + } else // we already have a different graph in hand and cannot use another! 
+ throw new JenaTransactionRegionException("Cannot work with more than one graph per transaction!"); + } + + } + // we must work without a transaction + action.accept(payload); + } + + @Override + public void clear() { + if (!isInTransaction()) _clear(graphs.values()); + else switch (transactionType()) { + case READ: + throw new JenaTransactionException("Cannot clear during a READ transaction!"); + case WRITE: + // avoid trying to nest a transaction on a graph we may have in hand + _clear(graphs.values().stream().filter(g -> !g.equals(graphInTransaction())).collect(toList())); + actOnGraphInTransaction(LockableGraph::clear); + } + + } + + private static void _clear(Collection<LockableGraph> graphs) { + try { + graphs.forEach(LockableGraph::beginWrite); + graphs.forEach(LockableGraph::clear); + graphs.forEach(LockableGraph::commit); + } finally { + graphs.forEach(LockableGraph::end); + } + } + + @Override + public Graph getGraph(final Node graphName) { + return graphs.computeIfAbsent(graphName, LockableGraph::new); + } + + /** + * A graph with a distinguished node, which we use as a name. + */ + private abstract static class PointedGraph extends GraphBase { + + private final Node distinguishedNode; + + public PointedGraph(Node graphName) { + this.distinguishedNode = graphName; + } + + public Node graphName() { + return distinguishedNode; + } + } + + /** + * A {@link PointedGraph} that features a write-lock and supports transactions. If a mutation is made outside a + * transaction, it is auto-wrapped in a transaction. + */ + private static class LockableGraph extends PointedGraph implements TransactionalComponent { + + public LockableGraph(Node graphName) { + super(graphName); + } + + /** + * We permit only one concurrent writer per named graph. 
+ */ + private final Lock writeLock = new ReentrantLock(true); + + private final ThreadLocal<ReadWrite> currentTransactionType = new ThreadLocal<>(); + + private ReadWrite currentTransactionType() { + return currentTransactionType.get(); + } + + private final TriTable table = new TriTable(); + + @Override + protected ExtendedIterator<Triple> graphBaseFind(Triple t) { + if (currentTransactionType() == null) { + begin(READ); + try { + return _find(t); + } finally { + end(); + } + } + return _find(t); + } + + /** + * Must be called inside a transaction! + * + * @param t the triple-pattern to search + * @return matches from the in-transaction table + */ + private ExtendedIterator<Triple> _find(Triple t) { + return createNoRemove(table.find(t.getSubject(), t.getPredicate(), t.getObject()).iterator()); + } + + @Override + public void performAdd(Triple t) { + performMutation(t, table::add); + } + + @Override + public void performDelete(Triple t) { + performMutation(t, table::delete); + } + + private void performMutation(Triple t, Consumer<Triple> action) { + final ReadWrite readWrite = currentTransactionType(); + if (readWrite == null) { + begin(WRITE); + try { + action.accept(t); + commit(); + } finally { + end(); + } + } else switch (readWrite) { + case READ: + throw new JenaTransactionException("Cannot write during a READ transaction!"); + case WRITE: + action.accept(t); + } + } + + @Override + protected int graphBaseSize() { + // TODO make this efficient, somehow + return super.graphBaseSize(); + } + + @Override + public void begin(ReadWrite readWrite) { + if (currentTransactionType() != null) throw new JenaTransactionException("Cannot nest transactions!"); + switch (readWrite) { + case WRITE: + writeLock.lock(); + default: + currentTransactionType.set(readWrite); + table.begin(readWrite); + } + } + + public void beginWrite() { + begin(WRITE); + } + + @Override + public void commit() { + table.commit(); + finishTransaction(); + } + + @Override + public void abort() { 
+ table.abort(); + finishTransaction(); + } + + @Override + public void end() { + table.end(); + finishTransaction(); + } + + private void finishTransaction() { + final ReadWrite readWrite = currentTransactionType(); + currentTransactionType.remove(); + if (WRITE.equals(readWrite)) writeLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java deleted file mode 100644 index 9c0b6cc..0000000 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.jena.sparql.core; - -import static java.util.stream.Collectors.toList; -import static org.apache.jena.atlas.iterator.Iter.filter; -import static org.apache.jena.graph.GraphUtil.addInto; -import static org.apache.jena.query.ReadWrite.READ; -import static org.apache.jena.query.ReadWrite.WRITE; -import static org.apache.jena.sparql.core.Quad.defaultGraphIRI; -import static org.apache.jena.sparql.core.Quad.isDefaultGraph; -import static org.apache.jena.util.iterator.WrappedIterator.createNoRemove; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; - -import org.apache.jena.graph.Graph; -import org.apache.jena.graph.Node; -import org.apache.jena.graph.Triple; -import org.apache.jena.graph.impl.GraphBase; -import org.apache.jena.query.ReadWrite; -import org.apache.jena.sparql.JenaTransactionException; -import org.apache.jena.sparql.core.mem.TransactionalComponent; -import org.apache.jena.sparql.core.mem.TriTable; -import org.apache.jena.util.iterator.ExtendedIterator; - -public class DatasetGraphPerGraphLocking extends DatasetGraphCollection { - - private final Map<Node, LockableGraph> graphs = new ConcurrentHashMap<>(); - - private final ThreadLocal<LockableGraph> graphInTransaction = new ThreadLocal<>(); - - private LockableGraph graphInTransaction() { - return graphInTransaction.get(); - } - - private final ThreadLocal<ReadWrite> transactionType = new ThreadLocal<>(); - - private ReadWrite transactionType() { - return transactionType.get(); - } - - @Override - public Iterator<Node> listGraphNodes() { - return filter(graphs.keySet().iterator(), gn -> !isDefaultGraph(gn)); - } - - @Override - public boolean supportsTransactions() { - return true; - } - - @Override - public boolean isInTransaction() { - return transactionType() != null; - } 
- - @Override - public void begin(ReadWrite readWrite) { - if (isInTransaction()) throw new JenaTransactionException("Cannot nest transactions!"); - transactionType.set(readWrite); - } - - private void graphAction(Consumer<LockableGraph> action) { - if (graphInTransaction() != null) action.accept(graphInTransaction()); - } - - @Override - public void commit() { - if (!isInTransaction()) throw new JenaTransactionException("Cannot commit outside a transaction!"); - graphAction(LockableGraph::commit); - cleanAfterTransaction(); - } - - @Override - public void abort() { - if (!isInTransaction()) throw new JenaTransactionException("Cannot abort outside a transaction!"); - graphAction(LockableGraph::abort); - cleanAfterTransaction(); - } - - @Override - public void end() { - cleanAfterTransaction(); - } - - private void cleanAfterTransaction() { - graphInTransaction.remove(); - transactionType.remove(); - } - - @Override - public Graph getDefaultGraph() { - return getGraph(defaultGraphIRI); - } - - @Override - public void setDefaultGraph(final Graph g) { - addGraph(defaultGraphIRI, g); - } - - @Override - public Graph getGraph(final Node graphName) { - if (isInTransaction()) { - // are we using the current graph? - if (graphInTransaction() != null && graphName.equals(graphInTransaction().graphName())) - return graphInTransaction(); - // are we starting work with a graph? - if (graphInTransaction() == null) { - LockableGraph graph = graphs.computeIfAbsent(graphName, LockableGraph::new); - graph.begin(transactionType()); - graphInTransaction.set(graph); - return graph; - } - // we already have a graph in hand and cannot use another! 
- throw new JenaTransactionRegionException("Cannot work with more than one graph per transaction!"); - } - // we must work without a transaction - return graphs.computeIfAbsent(graphName, LockableGraph::new); - } - - @Override - public void clear() { - if (!isInTransaction()) _clear(graphs.values()); - else switch (transactionType()) { - case READ: - throw new JenaTransactionException("Cannot clear during a READ transaction!"); - case WRITE: - // avoid trying to nest a transaction on a graph we may have in hand - _clear(graphs.values().stream().filter(g -> !g.equals(graphInTransaction())).collect(toList())); - graphAction(LockableGraph::clear); - } - - } - - private static void _clear(Collection<LockableGraph> graphs) { - try { - graphs.forEach(LockableGraph::beginWrite); - graphs.forEach(LockableGraph::clear); - graphs.forEach(LockableGraph::commit); - } finally { - graphs.forEach(LockableGraph::end); - } - } - - @Override - public void addGraph(Node graphName, Graph triples) { - wholeGraphAction(graphName, g -> addInto(getGraph(g), triples)); - } - - @Override - public void removeGraph(Node graphName) { - wholeGraphAction(graphName, graphs::remove); - } - - private void wholeGraphAction(Node graphName, Consumer<Node> action) { - getGraph(graphName).clear(); - action.accept(graphName); - } - - /** - * A graph with a distinguished node, which we use as a name. - */ - private abstract static class PointedGraph extends GraphBase { - - private final Node distinguishedNode; - - public PointedGraph(Node graphName) { - this.distinguishedNode = graphName; - } - - public Node graphName() { - return distinguishedNode; - } - } - - /** - * A {@link PointedGraph} that features a write-lock and supports transactions. If a mutation is made outside a - * transaction, it is auto-wrapped in a transaction. 
- */ - private static class LockableGraph extends PointedGraph implements TransactionalComponent { - - public LockableGraph(Node graphName) { - super(graphName); - } - - /** - * We permit only one concurrent writer per named graph. - */ - private final Lock writeLock = new ReentrantLock(true); - - private final ThreadLocal<ReadWrite> currentTransactionType = new ThreadLocal<>(); - - private ReadWrite currentTransactionType() { - return currentTransactionType.get(); - } - - private final TriTable table = new TriTable(); - - @Override - protected ExtendedIterator<Triple> graphBaseFind(Triple t) { - if (currentTransactionType() == null) { - begin(READ); - try { - return _find(t); - } finally { - end(); - } - } - return _find(t); - } - - /** - * Must be called inside a transaction! - * - * @param t the triple-pattern to search - * @return matches from the in-transaction table - */ - private ExtendedIterator<Triple> _find(Triple t) { - return createNoRemove(table.find(t.getSubject(), t.getPredicate(), t.getObject()).iterator()); - } - - @Override - public void performAdd(Triple t) { - performMutation(t, table::add); - } - - @Override - public void performDelete(Triple t) { - performMutation(t, table::delete); - } - - private void performMutation(Triple t, Consumer<Triple> action) { - final ReadWrite readWrite = currentTransactionType(); - if (readWrite == null) { - begin(WRITE); - try { - action.accept(t); - commit(); - } finally { - end(); - } - } else switch (readWrite) { - case READ: - throw new JenaTransactionException("Cannot write during a READ transaction!"); - case WRITE: - action.accept(t); - } - } - - @Override - protected int graphBaseSize() { - // TODO make this efficient, somehow - return super.graphBaseSize(); - } - - @Override - public void begin(ReadWrite readWrite) { - if (currentTransactionType() != null) throw new JenaTransactionException("Cannot nest transactions!"); - switch (readWrite) { - case WRITE: - writeLock.lock(); - default: - 
currentTransactionType.set(readWrite); - table.begin(readWrite); - } - } - - public void beginWrite() { - begin(WRITE); - } - - @Override - public void commit() { - table.commit(); - finishTransaction(); - } - - @Override - public void abort() { - table.abort(); - finishTransaction(); - } - - @Override - public void end() { - table.end(); - finishTransaction(); - } - - private void finishTransaction() { - final ReadWrite readWrite = currentTransactionType(); - currentTransactionType.remove(); - if (WRITE.equals(readWrite)) writeLock.unlock(); - } - } -} http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/AssemblerUtils.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/AssemblerUtils.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/AssemblerUtils.java index d124393..e62f50e 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/AssemblerUtils.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/AssemblerUtils.java @@ -60,7 +60,7 @@ public class AssemblerUtils registerDataset(DatasetAssembler.getType(), new DatasetAssembler()) ; TxnInMemDatasetAssembler txnInMemDatasetAssembler = new TxnInMemDatasetAssembler(); registerDataset(txnInMemDatasetAssembler.getType(), txnInMemDatasetAssembler); - WriterPerGraphDatasetAssembler writerPerGraphDatasetAssembler = new WriterPerGraphDatasetAssembler(); + GraphPerTxnDatasetAssembler writerPerGraphDatasetAssembler = new GraphPerTxnDatasetAssembler(); registerDataset(writerPerGraphDatasetAssembler.getType(), writerPerGraphDatasetAssembler); } http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/GraphPerTxnDatasetAssembler.java ---------------------------------------------------------------------- diff --git 
a/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/GraphPerTxnDatasetAssembler.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/GraphPerTxnDatasetAssembler.java new file mode 100644 index 0000000..8218173 --- /dev/null +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/GraphPerTxnDatasetAssembler.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.sparql.core.assembler; + +import static java.util.concurrent.Executors.newFixedThreadPool; +import static org.apache.jena.assembler.JA.data; +import static org.apache.jena.query.ReadWrite.WRITE; +import static org.apache.jena.riot.RDFDataMgr.read; +import static org.apache.jena.sparql.core.assembler.DatasetAssemblerVocab.pGraphName; +import static org.apache.jena.sparql.core.assembler.DatasetAssemblerVocab.pNamedGraph; +import static org.apache.jena.sparql.util.graph.GraphUtils.getAsStringValue; +import static org.apache.jena.sparql.util.graph.GraphUtils.getResourceValue; +import static org.apache.jena.sparql.util.graph.GraphUtils.multiValueResource; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.function.Consumer; + +import org.apache.jena.assembler.Assembler; +import org.apache.jena.query.Dataset; +import org.apache.jena.query.DatasetFactory; +import org.apache.jena.rdf.model.Resource; +import org.apache.jena.shared.JenaException; +import org.apache.jena.sparql.core.DatasetGraphGraphPerTxn; + +/** + * An {@link Assembler} that creates writer-per-graph in-memory {@link Dataset}s. + */ +public class GraphPerTxnDatasetAssembler extends TransactionalInMemDatasetAssembler { + + @Override + public Resource getType() { + return DatasetAssemblerVocab.tDatasetPGraphWriter; + } + + @Override + public Dataset createDataset() { + return DatasetFactory.wrap(new DatasetGraphGraphPerTxn()); + } + + @Override + protected void loadNamedGraphs(Resource root, Dataset dataset) { + List<Resource> namedGraphs = multiValueResource(root, pNamedGraph); + Resource parallelizeValue = getResourceValue(root, DatasetAssemblerVocab.pParallelize); + // defaults to false + boolean parallelize = parallelizeValue != null ? 
parallelizeValue.asLiteral().getBoolean() : false; + if (parallelize) { + // take advantage of writer-per-graph to load in parallel, one thread per named graph + ExecutorService loaderThreadPool = newFixedThreadPool(namedGraphs.size()); + try { + loaderThreadPool.submit(() -> namedGraphs.parallelStream().forEach(loadIntoDataset(dataset))).get(); + } catch (InterruptedException | ExecutionException e) { + loaderThreadPool.shutdownNow(); + throw new JenaException(e); + } + loaderThreadPool.shutdown(); + } else + // load using only this thread, default mode + namedGraphs.forEach(namedGraphResource -> loadNamedGraph(dataset, namedGraphResource)); + } + + private static Consumer<Resource> loadIntoDataset(Dataset ds) { + return namedGraphResource -> { + ds.begin(WRITE); + try { + loadNamedGraph(ds, namedGraphResource); + ds.commit(); + } finally { + ds.end(); + } + }; + } + + private static void loadNamedGraph(Dataset dataset, Resource namedGraphResource) { + final String graphName = getAsStringValue(namedGraphResource, pGraphName); + if (namedGraphResource.hasProperty(data)) multiValueResource(namedGraphResource, data) + .forEach(namedGraphData -> read(dataset.getNamedModel(graphName), namedGraphData.getURI())); + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/WriterPerGraphDatasetAssembler.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/WriterPerGraphDatasetAssembler.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/WriterPerGraphDatasetAssembler.java deleted file mode 100644 index 3310bff..0000000 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/assembler/WriterPerGraphDatasetAssembler.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.sparql.core.assembler; - -import static java.util.concurrent.Executors.newFixedThreadPool; -import static org.apache.jena.assembler.JA.data; -import static org.apache.jena.query.ReadWrite.WRITE; -import static org.apache.jena.riot.RDFDataMgr.read; -import static org.apache.jena.sparql.core.assembler.DatasetAssemblerVocab.pGraphName; -import static org.apache.jena.sparql.core.assembler.DatasetAssemblerVocab.pNamedGraph; -import static org.apache.jena.sparql.util.graph.GraphUtils.getAsStringValue; -import static org.apache.jena.sparql.util.graph.GraphUtils.getResourceValue; -import static org.apache.jena.sparql.util.graph.GraphUtils.multiValueResource; - -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.function.Consumer; - -import org.apache.jena.assembler.Assembler; -import org.apache.jena.query.Dataset; -import org.apache.jena.query.DatasetFactory; -import org.apache.jena.rdf.model.Resource; -import org.apache.jena.shared.JenaException; -import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking; - -/** - * An {@link Assembler} that creates writer-per-graph in-memory {@link Dataset}s. 
- */ -public class WriterPerGraphDatasetAssembler extends TransactionalInMemDatasetAssembler { - - @Override - public Resource getType() { - return DatasetAssemblerVocab.tDatasetPGraphWriter; - } - - @Override - public Dataset createDataset() { - return DatasetFactory.wrap(new DatasetGraphPerGraphLocking()); - } - - @Override - protected void loadNamedGraphs(Resource root, Dataset dataset) { - List<Resource> namedGraphs = multiValueResource(root, pNamedGraph); - Resource parallelizeValue = getResourceValue(root, DatasetAssemblerVocab.pParallelize); - // defaults to false - boolean parallelize = parallelizeValue != null ? parallelizeValue.asLiteral().getBoolean() : false; - if (parallelize) { - // take advantage of writer-per-graph to load in parallel, one thread per named graph - ExecutorService loaderThreadPool = newFixedThreadPool(namedGraphs.size()); - try { - loaderThreadPool.submit(() -> namedGraphs.parallelStream().forEach(loadIntoDataset(dataset))).get(); - } catch (InterruptedException | ExecutionException e) { - loaderThreadPool.shutdownNow(); - throw new JenaException(e); - } - loaderThreadPool.shutdown(); - } else - // load using only this thread, default mode - namedGraphs.forEach(namedGraphResource -> loadNamedGraph(dataset, namedGraphResource)); - } - - private static Consumer<Resource> loadIntoDataset(Dataset ds) { - return namedGraphResource -> { - ds.begin(WRITE); - try { - loadNamedGraph(ds, namedGraphResource); - ds.commit(); - } finally { - ds.end(); - } - }; - } - - private static void loadNamedGraph(Dataset dataset, Resource namedGraphResource) { - final String graphName = getAsStringValue(namedGraphResource, pGraphName); - if (namedGraphResource.hasProperty(data)) multiValueResource(namedGraphResource, data) - .forEach(namedGraphData -> read(dataset.getNamedModel(graphName), namedGraphData.getURI())); - } -} http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/test/java/org/apache/jena/sparql/TC_General.java 
---------------------------------------------------------------------- diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/TC_General.java b/jena-arq/src/test/java/org/apache/jena/sparql/TC_General.java index c9ddab3..1d14f7a 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/TC_General.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/TC_General.java @@ -26,7 +26,7 @@ import org.apache.jena.sparql.api.TS_API ; import org.apache.jena.sparql.core.TS_Core ; import org.apache.jena.sparql.core.assembler.TS_Assembler ; import org.apache.jena.sparql.core.mem.TS_DatasetTxnMem ; -import org.apache.jena.sparql.core.pergraph.TS_DatasetGraphPerGraphLocking; +import org.apache.jena.sparql.core.pergraph.TS_DatasetGraphGraphPerTxn; import org.apache.jena.sparql.engine.TS_Engine ; import org.apache.jena.sparql.engine.join.TS_Join ; import org.apache.jena.sparql.expr.E_Function ; @@ -72,7 +72,7 @@ import org.junit.runners.Suite ; , TS_Core.class , TS_Assembler.class , TS_DatasetTxnMem.class - , TS_DatasetGraphPerGraphLocking.class + , TS_DatasetGraphGraphPerTxn.class , TS_Path.class , TS_ParamString.class , TS_Update.class http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java index cda0471..a78d64d 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java @@ -20,13 +20,13 @@ package org.apache.jena.sparql.core.pergraph; import org.apache.jena.sparql.core.AbstractDatasetGraphTests; import org.apache.jena.sparql.core.DatasetGraph; -import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking; +import 
org.apache.jena.sparql.core.DatasetGraphGraphPerTxn; public class BasicTest extends AbstractDatasetGraphTests { @Override protected DatasetGraph emptyDataset() { - return new DatasetGraphPerGraphLocking(); + return new DatasetGraphGraphPerTxn(); } } http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java index 9012e16..815bc8e 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java @@ -20,13 +20,13 @@ package org.apache.jena.sparql.core.pergraph; import org.apache.jena.sparql.core.AbstractDatasetGraphFindPatterns; import org.apache.jena.sparql.core.DatasetGraph; -import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking; +import org.apache.jena.sparql.core.DatasetGraphGraphPerTxn; public class FindPatternsTest extends AbstractDatasetGraphFindPatterns { @Override protected DatasetGraph create() { - return new DatasetGraphPerGraphLocking(); + return new DatasetGraphGraphPerTxn(); } } http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java index dbe4243..d292a9f 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java @@ -20,13 +20,13 @@ package org.apache.jena.sparql.core.pergraph; import 
org.apache.jena.sparql.core.AbstractDatasetGraphFind; import org.apache.jena.sparql.core.DatasetGraph; -import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking; +import org.apache.jena.sparql.core.DatasetGraphGraphPerTxn; public class FindTest extends AbstractDatasetGraphFind { @Override protected DatasetGraph create() { - return new DatasetGraphPerGraphLocking(); + return new DatasetGraphGraphPerTxn(); } } http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java index 8d8ab10..d30b9ab 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java @@ -21,14 +21,14 @@ package org.apache.jena.sparql.core.pergraph; import static org.apache.jena.query.DatasetFactory.wrap; import org.apache.jena.query.Dataset; -import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking; +import org.apache.jena.sparql.core.DatasetGraphGraphPerTxn; import org.apache.jena.sparql.core.mem.TestDatasetGraphInMemoryLock; public class LockTest extends TestDatasetGraphInMemoryLock { @Override protected Dataset createDataset() { - return wrap(new DatasetGraphPerGraphLocking()); + return wrap(new DatasetGraphGraphPerTxn()); } } http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java index 3df0582..a5b7679 100644 --- 
a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java @@ -1,19 +1,14 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
*/ package org.apache.jena.sparql.core.pergraph; @@ -23,12 +18,15 @@ import static org.apache.jena.graph.NodeFactory.createLiteral; import static org.apache.jena.graph.NodeFactory.createURI; import static org.apache.jena.query.ReadWrite.READ; import static org.apache.jena.query.ReadWrite.WRITE; +import static org.apache.jena.sparql.core.Quad.defaultGraphIRI; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.jena.atlas.junit.BaseTest; import org.apache.jena.graph.Node; -import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking; +import org.apache.jena.query.ReadWrite; +import org.apache.jena.sparql.core.DatasetGraphGraphPerTxn; +import org.apache.jena.sparql.core.Quad; import org.junit.Test; public class MultithreadingTest extends BaseTest { @@ -43,9 +41,15 @@ public class MultithreadingTest extends BaseTest { private static final Node graph2 = createURI("info:graph2"); + private static final Node graph3 = createURI("info:graph3"); + + private static final Quad[] QUADS = new Quad[] { Quad.create(defaultGraphIRI, dummy, dummy, dummy), + Quad.create(graph1, dummy, dummy, dummy), Quad.create(graph2, dummy, dummy, dummy), + Quad.create(graph3, dummy, dummy, dummy) }; + @Test public void loadTwoGraphsAtOnce() { - DatasetGraphPerGraphLocking dataset = new DatasetGraphPerGraphLocking(); + DatasetGraphGraphPerTxn dataset = new DatasetGraphGraphPerTxn(); // We start a thread loading a graph, then wait for the main thread to start loading a different graph. 
The // first thread must wait to see that the main thread has successfully started loading its graph to finish its @@ -88,4 +92,29 @@ public class MultithreadingTest extends BaseTest { dataset.end(); } } + + @Test + public void readSeveralGraphsInOneTxn() { + DatasetGraphGraphPerTxn dataset = new DatasetGraphGraphPerTxn(); + // write some quads into different graphs, using a txn-per-graph + for (Quad q : QUADS) { + dataset.begin(WRITE); + try { + dataset.add(q); + dataset.commit(); + } finally { + dataset.end(); + } + } + // check that they are all there in one big txn + dataset.begin(READ); + try { + for (Quad q : QUADS) { + assertTrue("Couldn't find quad in its graph!", dataset.contains(q)); + assertTrue("Couldn't find quad in its graph!", dataset.getGraph(q.getGraph()).contains(q.asTriple())); + } + } finally { + dataset.end(); + } + } } http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TS_DatasetGraphGraphPerTxn.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TS_DatasetGraphGraphPerTxn.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TS_DatasetGraphGraphPerTxn.java new file mode 100644 index 0000000..b7f1b99 --- /dev/null +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TS_DatasetGraphGraphPerTxn.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.sparql.core.pergraph; + +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + + +@RunWith(Suite.class) +@Suite.SuiteClasses({ BasicTest.class, FindPatternsTest.class, FindTest.class, LockTest.class, TransactionLifecycleTest.class, + ViewTest.class, MultithreadingTest.class }) +public class TS_DatasetGraphGraphPerTxn {} http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TS_DatasetGraphPerGraphLocking.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TS_DatasetGraphPerGraphLocking.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TS_DatasetGraphPerGraphLocking.java deleted file mode 100644 index 4f31459..0000000 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TS_DatasetGraphPerGraphLocking.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.jena.sparql.core.pergraph; - -import org.junit.runner.RunWith; -import org.junit.runners.Suite; - - -@RunWith(Suite.class) -@Suite.SuiteClasses({ BasicTest.class, FindPatternsTest.class, FindTest.class, LockTest.class, TransactionLifecycleTest.class, - ViewTest.class, MultithreadingTest.class }) -public class TS_DatasetGraphPerGraphLocking {} http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java index b3ec3d7..883412d 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java @@ -21,7 +21,7 @@ package org.apache.jena.sparql.core.pergraph; import static org.apache.jena.query.DatasetFactory.wrap; import org.apache.jena.query.Dataset; -import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking; +import org.apache.jena.sparql.core.DatasetGraphGraphPerTxn; import org.apache.jena.sparql.transaction.AbstractTestTransactionLifecycle; import org.junit.Ignore; import org.junit.Test; @@ -30,7 +30,7 @@ public class TransactionLifecycleTest extends AbstractTestTransactionLifecycle { @Override protected Dataset create() { - return wrap(new 
DatasetGraphPerGraphLocking()); + return wrap(new DatasetGraphGraphPerTxn()); } @Test http://git-wip-us.apache.org/repos/asf/jena/blob/ade1e941/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java ---------------------------------------------------------------------- diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java index cc1c029..1a8a272 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java @@ -19,13 +19,13 @@ package org.apache.jena.sparql.core.pergraph; import org.apache.jena.sparql.core.DatasetGraph; -import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking; +import org.apache.jena.sparql.core.DatasetGraphGraphPerTxn; import org.apache.jena.sparql.core.TestDatasetGraphViewGraphs; public class ViewTest extends TestDatasetGraphViewGraphs { @Override protected DatasetGraph createBaseDSG() { - return new DatasetGraphPerGraphLocking(); + return new DatasetGraphGraphPerTxn(); } }
