Repository: jena Updated Branches: refs/heads/ThreadPerGraphDataset [created] 8601cd302
Trialing a per-graph writer dataset


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/8601cd30
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/8601cd30
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/8601cd30

Branch: refs/heads/ThreadPerGraphDataset
Commit: 8601cd3028c0e57dbc805efc5d54d7b9ee272c3f
Parents: c987c1b
Author: ajs6f <[email protected]>
Authored: Sat Jan 7 15:33:01 2017 -0500
Committer: ajs6f <[email protected]>
Committed: Sat Jan 7 15:33:01 2017 -0500

----------------------------------------------------------------------
 .../core/DatasetGraphPerGraphLocking.java       | 295 +++++++++++++++++++
 .../core/JenaTransactionRegionException.java    |  14 +
 .../core/TS_DatasetGraphPerGraphLocking.java    |  11 +
 .../jena/sparql/core/pergraph/BasicTest.java    |  14 +
 .../sparql/core/pergraph/FindPatternsTest.java  |  14 +
 .../jena/sparql/core/pergraph/FindTest.java     |  14 +
 .../jena/sparql/core/pergraph/LockTest.java     |  16 +
 .../core/pergraph/MultithreadingTest.java       |  75 +++++
 .../core/pergraph/TransactionLifecycleTest.java |  22 ++
 .../jena/sparql/core/pergraph/ViewTest.java     |  13 +
 10 files changed, 488 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java
new file mode 100644
index 0000000..30da6ed
--- /dev/null
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphPerGraphLocking.java
@@ -0,0 +1,295 @@
+package org.apache.jena.sparql.core;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.jena.atlas.iterator.Iter.filter;
+import static org.apache.jena.graph.GraphUtil.addInto;
+import static org.apache.jena.query.ReadWrite.READ;
+import static org.apache.jena.query.ReadWrite.WRITE;
+import static org.apache.jena.sparql.core.Quad.defaultGraphIRI;
+import static org.apache.jena.sparql.core.Quad.isDefaultGraph;
+import static org.apache.jena.util.iterator.WrappedIterator.createNoRemove;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.graph.impl.GraphBase;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.sparql.JenaTransactionException;
+import org.apache.jena.sparql.core.mem.TransactionalComponent;
+import org.apache.jena.sparql.core.mem.TriTable;
+import org.apache.jena.util.iterator.ExtendedIterator;
+
+/**
+ * A dataset that keeps one independently lockable graph per graph name, so that writers working on different graphs
+ * do not block each other.
+ * <p>
+ * Transaction state is tracked per thread. A dataset-level transaction records only its {@link ReadWrite} type when
+ * it begins; the first graph requested through {@link #getGraph(Node)} is then opened with that type and becomes the
+ * single graph the transaction may use. Requesting a different graph in the same transaction raises a
+ * {@link JenaTransactionRegionException}.
+ */
+public class DatasetGraphPerGraphLocking extends DatasetGraphCollection {
+
+    /** All graphs of this dataset, keyed by graph name; the default graph is stored under {@code defaultGraphIRI}. */
+    private final Map<Node, LockableGraph> graphs = new ConcurrentHashMap<>();
+
+    /** The graph the current thread's transaction is working with, or unset if it has not touched a graph yet. */
+    private final ThreadLocal<LockableGraph> graphInTransaction = new ThreadLocal<>();
+
+    private LockableGraph graphInTransaction() {
+        return graphInTransaction.get();
+    }
+
+    /** The type of the current thread's dataset-level transaction, or unset when the thread is not in one. */
+    private final ThreadLocal<ReadWrite> transactionType = new ThreadLocal<>();
+
+    private ReadWrite transactionType() {
+        return transactionType.get();
+    }
+
+    /**
+     * @return the names of all graphs held by this dataset except the default graph
+     */
+    @Override
+    public Iterator<Node> listGraphNodes() {
+        return filter(graphs.keySet().iterator(), gn -> !isDefaultGraph(gn));
+    }
+
+    @Override
+    public boolean supportsTransactions() {
+        return true;
+    }
+
+    /**
+     * @return {@code true} if the calling thread has begun a dataset-level transaction that has not yet finished
+     */
+    @Override
+    public boolean isInTransaction() {
+        return transactionType() != null;
+    }
+
+    /**
+     * Starts a dataset-level transaction for the calling thread. No graph is opened or locked yet; that happens on
+     * the first call to {@link #getGraph(Node)}.
+     *
+     * @param readWrite the type of transaction to start
+     * @throws JenaTransactionException if the calling thread is already in a transaction
+     */
+    @Override
+    public void begin(ReadWrite readWrite) {
+        if (isInTransaction()) throw new JenaTransactionException("Cannot nest transactions!");
+        transactionType.set(readWrite);
+    }
+
+    /**
+     * Applies {@code action} to the graph in hand for the current thread's transaction, if there is one.
+     */
+    private void graphAction(Consumer<LockableGraph> action) {
+        if (graphInTransaction() != null) action.accept(graphInTransaction());
+    }
+
+    /**
+     * Commits the graph in hand (if any) and clears the calling thread's transaction state.
+     *
+     * @throws JenaTransactionException if the calling thread is not in a transaction
+     */
+    @Override
+    public void commit() {
+        if (!isInTransaction()) throw new JenaTransactionException("Cannot commit outside a transaction!");
+        graphAction(LockableGraph::commit);
+        cleanAfterTransaction();
+    }
+
+    /**
+     * Aborts the graph in hand (if any) and clears the calling thread's transaction state.
+     *
+     * @throws JenaTransactionException if the calling thread is not in a transaction
+     */
+    @Override
+    public void abort() {
+        if (!isInTransaction()) throw new JenaTransactionException("Cannot abort outside a transaction!");
+        graphAction(LockableGraph::abort);
+        cleanAfterTransaction();
+    }
+
+    /**
+     * Finishes the calling thread's transaction, if any. After {@link #commit()} or {@link #abort()} there is no
+     * graph left in hand and this only clears already-clean state.
+     * <p>
+     * When {@code end()} is reached without a prior commit or abort (the usual way to finish a READ transaction, and
+     * the path taken when a WRITE transaction fails part-way), the graph in hand still has an open transaction for
+     * this thread and, for WRITE, still holds its write lock. That graph transaction is ended here; otherwise the
+     * next transaction on this thread that touches the same graph would fail with "Cannot nest transactions!" and
+     * other writers of that graph would block indefinitely.
+     */
+    @Override
+    public void end() {
+        try {
+            graphAction(LockableGraph::end);
+        } finally {
+            // always forget the dataset-level state, even if ending the graph failed
+            cleanAfterTransaction();
+        }
+    }
+
+    /** Forgets the calling thread's graph in hand and transaction type. */
+    private void cleanAfterTransaction() {
+        graphInTransaction.remove();
+        transactionType.remove();
+    }
+
+    @Override
+    public Graph getDefaultGraph() {
+        return getGraph(defaultGraphIRI);
+    }
+
+    @Override
+    public void setDefaultGraph(final Graph g) {
+        addGraph(defaultGraphIRI, g);
+    }
+
+    /**
+     * Returns the graph with the given name, creating an empty one if none exists yet.
+     * <p>
+     * Inside a transaction, the first graph requested is opened with the transaction's type and becomes the only
+     * graph the transaction may use. Outside a transaction the graph is returned as-is.
+     *
+     * @param graphName the name of the graph to fetch
+     * @return the (possibly newly created) graph
+     * @throws JenaTransactionRegionException if the current transaction already works with a different graph
+     */
+    @Override
+    public Graph getGraph(final Node graphName) {
+        if (isInTransaction()) {
+            // are we using the current graph?
+            if (graphInTransaction() != null && graphName.equals(graphInTransaction().graphName())) return graphInTransaction();
+            // are we starting work with a graph?
+            if (graphInTransaction() == null) {
+                LockableGraph graph = graphs.computeIfAbsent(graphName, LockableGraph::new);
+                graph.begin(transactionType());
+                graphInTransaction.set(graph);
+                return graph;
+            }
+            // we already have a graph in hand and cannot use another!
+            throw new JenaTransactionRegionException("Cannot work with more than one graph per transaction!");
+        }
+        // we must work without a transaction
+        return graphs.computeIfAbsent(graphName, LockableGraph::new);
+    }
+
+    /**
+     * Empties every graph in this dataset.
+     *
+     * @throws JenaTransactionException if called during a READ transaction
+     */
+    @Override
+    public void clear() {
+        if (!isInTransaction()) _clear(graphs.values());
+        else switch (transactionType()) {
+        case READ:
+            throw new JenaTransactionException("Cannot clear during a READ transaction!");
+        case WRITE:
+            // avoid trying to nest a transaction on a graph we may have in hand
+            _clear(graphs.values().stream().filter(g -> !g.equals(graphInTransaction())).collect(toList()));
+            graphAction(LockableGraph::clear);
+        }
+
+    }
+
+    /**
+     * Opens a WRITE transaction on each of the given graphs, clears and commits them, and finally ends every one of
+     * those transactions.
+     */
+    private static void _clear(Collection<LockableGraph> graphs) {
+        try {
+            graphs.forEach(LockableGraph::beginWrite);
+            graphs.forEach(LockableGraph::clear);
+            graphs.forEach(LockableGraph::commit);
+        } finally {
+            graphs.forEach(LockableGraph::end);
+        }
+    }
+
+    /**
+     * Replaces the contents of the named graph with the given triples: the graph is cleared, then {@code triples}
+     * is copied into it.
+     */
+    @Override
+    public void addGraph(Node graphName, Graph triples) {
+        wholeGraphAction(graphName, g -> addInto(getGraph(g), triples));
+    }
+
+    /**
+     * Clears the named graph and then drops it from this dataset.
+     */
+    @Override
+    public void removeGraph(Node graphName) {
+        wholeGraphAction(graphName, graphs::remove);
+    }
+
+    /** Clears the named graph, then applies {@code action} to its name. */
+    private void wholeGraphAction(Node graphName, Consumer<Node> action) {
+        getGraph(graphName).clear();
+        action.accept(graphName);
+    }
+
+    /**
+     * A graph with a distinguished node, which we use as a name.
+     */
+    private abstract static class PointedGraph extends GraphBase {
+
+        private final Node distinguishedNode;
+
+        public PointedGraph(Node graphName) {
+            this.distinguishedNode = graphName;
+        }
+
+        public Node graphName() {
+            return distinguishedNode;
+        }
+    }
+
+    /**
+     * A {@link PointedGraph} that features a write-lock and supports transactions. If a mutation is made outside a
+     * transaction, it is auto-wrapped in a transaction.
+     * <p>
+     * The transaction type is tracked per thread; the write lock is held by a thread from {@link #begin(ReadWrite)}
+     * with {@code WRITE} until that thread calls {@link #commit()}, {@link #abort()} or {@link #end()}.
+     */
+    private static class LockableGraph extends PointedGraph implements TransactionalComponent {
+
+        public LockableGraph(Node graphName) {
+            super(graphName);
+        }
+
+        /**
+         * We permit only one concurrent writer per named graph.
+         */
+        private final Lock writeLock = new ReentrantLock(true);
+
+        /** The type of the calling thread's transaction on this graph, or unset when it has none. */
+        private final ThreadLocal<ReadWrite> currentTransactionType = new ThreadLocal<>();
+
+        private ReadWrite currentTransactionType() {
+            return currentTransactionType.get();
+        }
+
+        private final TriTable table = new TriTable();
+
+        /**
+         * Finds matching triples, wrapping the lookup in a READ transaction when the calling thread has none.
+         * <p>
+         * NOTE(review): in the auto-wrapped case the iterator is handed to the caller after {@code end()} has run;
+         * confirm that results of {@code TriTable.find} stay usable once the transaction has ended.
+         */
+        @Override
+        protected ExtendedIterator<Triple> graphBaseFind(Triple t) {
+            if (currentTransactionType() == null) {
+                begin(READ);
+                try {
+                    return _find(t);
+                } finally {
+                    end();
+                }
+            }
+            return _find(t);
+        }
+
+        /**
+         * Must be called inside a transaction!
+         *
+         * @param t the triple-pattern to search
+         * @return matches from the in-transaction table
+         */
+        private ExtendedIterator<Triple> _find(Triple t) {
+            return createNoRemove(table.find(t.getSubject(), t.getPredicate(), t.getObject()).iterator());
+        }
+
+        @Override
+        public void performAdd(Triple t) {
+            performMutation(t, table::add);
+        }
+
+        @Override
+        public void performDelete(Triple t) {
+            performMutation(t, table::delete);
+        }
+
+        /**
+         * Applies a mutation to the table. Outside a transaction the mutation is wrapped in its own WRITE
+         * transaction and committed; inside a WRITE transaction it is applied directly.
+         *
+         * @throws JenaTransactionException if the calling thread is in a READ transaction on this graph
+         */
+        private void performMutation(Triple t, Consumer<Triple> action) {
+            final ReadWrite readWrite = currentTransactionType();
+            if (readWrite == null) {
+                begin(WRITE);
+                try {
+                    action.accept(t);
+                    commit();
+                } finally {
+                    end();
+                }
+            } else switch (readWrite) {
+            case READ:
+                throw new JenaTransactionException("Cannot write during a READ transaction!");
+            case WRITE:
+                action.accept(t);
+            }
+        }
+
+        @Override
+        protected int graphBaseSize() {
+            // TODO make this efficient, somehow
+            return super.graphBaseSize();
+        }
+
+        /**
+         * Starts a transaction on this graph for the calling thread, taking the write lock first when the
+         * transaction is a WRITE (which blocks while another thread holds it).
+         *
+         * @throws JenaTransactionException if the calling thread already has a transaction on this graph
+         */
+        @Override
+        public void begin(ReadWrite readWrite) {
+            if (currentTransactionType() != null) throw new JenaTransactionException("Cannot nest transactions!");
+            if (WRITE.equals(readWrite)) writeLock.lock();
+            currentTransactionType.set(readWrite);
+            table.begin(readWrite);
+        }
+
+        /** Shorthand for {@code begin(WRITE)}, usable as a method reference. */
+        public void beginWrite() {
+            begin(WRITE);
+        }
+
+        @Override
+        public void commit() {
+            table.commit();
+            finishTransaction();
+        }
+
+        @Override
+        public void abort() {
+            table.abort();
+            finishTransaction();
+        }
+
+        @Override
+        public void end() {
+            table.end();
+            finishTransaction();
+        }
+
+        /**
+         * Forgets the calling thread's transaction type and releases the write lock if that transaction was a WRITE.
+         * Calling this again once the type has been cleared does nothing, so {@code end()} after {@code commit()} or
+         * {@code abort()} does not unlock twice.
+         */
+        private void finishTransaction() {
+            final ReadWrite readWrite = currentTransactionType();
+            currentTransactionType.remove();
+            if (WRITE.equals(readWrite)) writeLock.unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/main/java/org/apache/jena/sparql/core/JenaTransactionRegionException.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/JenaTransactionRegionException.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/JenaTransactionRegionException.java
new file mode 100644
index 0000000..aaaef6c
--- /dev/null
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/JenaTransactionRegionException.java
@@ -0,0 +1,14 @@
+package org.apache.jena.sparql.core;
+
+import org.apache.jena.sparql.JenaTransactionException;
+
+/**
+ * Thrown when a transaction attempts to work outside its controlled region of data.
+ */
+public class JenaTransactionRegionException extends JenaTransactionException {
+
+    /**
+     * @param message a description of the region violation
+     */
+    public JenaTransactionRegionException(String message) {
+        super(message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/TS_DatasetGraphPerGraphLocking.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/TS_DatasetGraphPerGraphLocking.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/TS_DatasetGraphPerGraphLocking.java
new file mode 100644
index 0000000..60e363e
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/TS_DatasetGraphPerGraphLocking.java
@@ -0,0 +1,11 @@
+package org.apache.jena.sparql.core;
+
+import org.apache.jena.sparql.core.pergraph.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+
+/** JUnit suite that groups the per-graph-locking dataset tests. */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({ BasicTest.class, FindPatternsTest.class, FindTest.class, LockTest.class, TransactionLifecycleTest.class,
+        ViewTest.class, MultithreadingTest.class })
+public class TS_DatasetGraphPerGraphLocking {}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java
new file mode 100644
index 0000000..770973b
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/BasicTest.java
@@ -0,0 +1,14 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import org.apache.jena.sparql.core.AbstractDatasetGraphTests;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+
+/** Runs the inherited {@link AbstractDatasetGraphTests} against a {@link DatasetGraphPerGraphLocking}. */
+public class BasicTest extends AbstractDatasetGraphTests {
+
+    @Override
+    protected DatasetGraph emptyDataset() {
+        return new DatasetGraphPerGraphLocking();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java
new file mode 100644
index 0000000..5934a9f
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindPatternsTest.java
@@ -0,0 +1,14 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import org.apache.jena.sparql.core.AbstractDatasetGraphFindPatterns;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+
+/** Runs the inherited {@link AbstractDatasetGraphFindPatterns} tests against a {@link DatasetGraphPerGraphLocking}. */
+public class FindPatternsTest extends AbstractDatasetGraphFindPatterns {
+
+    @Override
+    protected DatasetGraph create() {
+        return new DatasetGraphPerGraphLocking();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java
new file mode 100644
index 0000000..447dd98
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/FindTest.java
@@ -0,0 +1,14 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import org.apache.jena.sparql.core.AbstractDatasetGraphFind;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+
+/** Runs the inherited {@link AbstractDatasetGraphFind} tests against a {@link DatasetGraphPerGraphLocking}. */
+public class FindTest extends AbstractDatasetGraphFind {
+
+    @Override
+    protected DatasetGraph create() {
+        return new DatasetGraphPerGraphLocking();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java
new file mode 100644
index 0000000..782fde5
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/LockTest.java
@@ -0,0 +1,16 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import static org.apache.jena.query.DatasetFactory.wrap;
+
+import org.apache.jena.query.Dataset;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+import org.apache.jena.sparql.core.mem.TestDatasetGraphInMemoryLock;
+
+/**
+ * Runs the tests inherited from {@link TestDatasetGraphInMemoryLock} against a {@link DatasetGraphPerGraphLocking}
+ * wrapped as a {@link Dataset}.
+ */
+public class LockTest extends TestDatasetGraphInMemoryLock {
+
+    @Override
+    protected Dataset createDataset() {
+        return wrap(new DatasetGraphPerGraphLocking());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java
new file mode 100644
index 0000000..e99c372
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/MultithreadingTest.java
@@ -0,0 +1,75 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import static com.jayway.awaitility.Awaitility.waitAtMost;
+import static com.jayway.awaitility.Duration.TEN_SECONDS;
+import static org.apache.jena.graph.NodeFactory.createLiteral;
+import static org.apache.jena.graph.NodeFactory.createURI;
+import static org.apache.jena.query.ReadWrite.READ;
+import static org.apache.jena.query.ReadWrite.WRITE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.jena.atlas.junit.BaseTest;
+import org.apache.jena.graph.Node;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+/**
+ * Checks that two threads can each hold an open WRITE transaction on a different named graph of the same
+ * {@link DatasetGraphPerGraphLocking} at the same time.
+ */
+public class MultithreadingTest extends BaseTest {
+
+    // NOTE(review): this logger is never used within this class.
+    private static final Logger log = getLogger(MultithreadingTest.class);
+
+    /** Used as both subject and predicate of every quad written by the test. */
+    private static final Node dummy = createURI("info:test");
+
+    /** The graph written by the spawned thread. */
+    private static final Node graph1 = createURI("info:graph1");
+
+    /** The graph written by the main thread. */
+    private static final Node graph2 = createURI("info:graph2");
+
+    @Test
+    public void loadTwoGraphsAtOnce() {
+        DatasetGraphPerGraphLocking dataset = new DatasetGraphPerGraphLocking();
+
+        // We start a thread loading a graph, then wait for the main thread to start loading a different graph. The
+        // first thread must wait to see that the main thread has successfully started loading its graph to finish its
+        // load. So when the first thread does finish, this proves that two graphs were being loaded simultaneously.
+
+        // startMain: set by the spawned thread once its first write is done, releasing the main thread
+        // baton: set by the main thread once its own first write is done, releasing the spawned thread
+        // finishLine: set by the spawned thread after it has committed
+        AtomicBoolean startMain = new AtomicBoolean(), baton = new AtomicBoolean(), finishLine = new AtomicBoolean();
+
+        new Thread(() -> {
+            dataset.begin(WRITE);
+            try {
+                dataset.add(graph1, dummy, dummy, createLiteral("before"));
+                // wait for the baton
+                startMain.set(true);
+                waitAtMost(TEN_SECONDS).untilTrue(baton);
+                dataset.add(graph1, dummy, dummy, createLiteral("after"));
+                dataset.commit();
+                finishLine.set(true);
+            } finally {
+                dataset.end();
+            }
+        }).start();
+
+        // do not begin the main thread's transaction until the spawned thread is already writing to graph1
+        waitAtMost(TEN_SECONDS).untilTrue(startMain);
+        dataset.begin(WRITE);
+        try {
+            dataset.add(graph2, dummy, dummy, createLiteral("before"));
+            // pass the baton
+            baton.set(true);
+            dataset.add(graph2, dummy, dummy, createLiteral("after"));
+            dataset.commit();
+        } finally {
+            dataset.end();
+        }
+        // only inspect graph1 after the spawned thread has committed
+        waitAtMost(TEN_SECONDS).untilTrue(finishLine);
+        dataset.begin(READ);
+        try {
+            assertTrue("Failed to find the triple that proves that the first thread finished!",
+                    dataset.contains(graph1, dummy, dummy, createLiteral("after")));
+        } finally {
+            dataset.end();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java
new file mode 100644
index 0000000..65d91ff
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/TransactionLifecycleTest.java
@@ -0,0 +1,22 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import static org.apache.jena.query.DatasetFactory.wrap;
+
+import org.apache.jena.query.Dataset;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+import org.apache.jena.sparql.transaction.AbstractTestTransactionLifecycle;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Runs the tests inherited from {@link AbstractTestTransactionLifecycle} against a
+ * {@link DatasetGraphPerGraphLocking} wrapped as a {@link Dataset}, with one inherited test disabled.
+ */
+public class TransactionLifecycleTest extends AbstractTestTransactionLifecycle {
+
+    @Override
+    protected Dataset create() {
+        return wrap(new DatasetGraphPerGraphLocking());
+    }
+
+    // Overridden with an empty, ignored body so that the superclass version of this test does not run.
+    @Test
+    @Override
+    @Ignore("Block this test in the superclass because we can have multiple writers.")
+    public synchronized void transaction_concurrency_writer() {}
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/8601cd30/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java
new file mode 100644
index 0000000..5bd5b05
--- /dev/null
+++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/pergraph/ViewTest.java
@@ -0,0 +1,13 @@
+package org.apache.jena.sparql.core.pergraph;
+
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.core.DatasetGraphPerGraphLocking;
+import org.apache.jena.sparql.core.TestDatasetGraphViewGraphs;
+
+/** Runs the tests inherited from {@link TestDatasetGraphViewGraphs} with a {@link DatasetGraphPerGraphLocking} as the base dataset. */
+public class ViewTest extends TestDatasetGraphViewGraphs {
+
+    @Override
+    protected DatasetGraph createBaseDSG() {
+        return new DatasetGraphPerGraphLocking();
+    }
+}
