This is an automated email from the ASF dual-hosted git repository. andy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/jena.git
The following commit(s) were added to refs/heads/main by this push: new 807df6a9cf GH-1961: Thread-safe and consistent GraphTxn.find new 8e6c4bd734 Merge pull request #1964 from afs/graphtxn 807df6a9cf is described below commit 807df6a9cf75d63318ab4269876f5b58315a5fee Author: Andy Seaborne <a...@apache.org> AuthorDate: Thu Jul 20 13:14:26 2023 +0100 GH-1961: Thread-safe and consistent GraphTxn.find --- .../org/apache/jena/sparql/graph/GraphTxn.java | 52 +++++++++++++++++++++- .../src/main/java/org/apache/jena/system/Txn.java | 32 ++++++------- 2 files changed, 67 insertions(+), 17 deletions(-) diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java b/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java index 691bae5141..813d0c06b8 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java @@ -18,13 +18,19 @@ package org.apache.jena.sparql.graph; +import java.util.List; + import org.apache.jena.graph.Graph; +import org.apache.jena.graph.Node; +import org.apache.jena.graph.Triple; import org.apache.jena.query.ReadWrite; import org.apache.jena.query.TxnType; import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.sparql.core.DatasetGraphFactory; import org.apache.jena.sparql.core.Transactional; import org.apache.jena.sparql.core.mem.DatasetGraphInMemory; +import org.apache.jena.util.iterator.ExtendedIterator; +import org.apache.jena.util.iterator.WrappedIterator; /** * In-memory, transactional graph. @@ -32,7 +38,9 @@ import org.apache.jena.sparql.core.mem.DatasetGraphInMemory; * @implNote * The implementation uses the default graph of {@link DatasetGraphInMemory}. * The graph transaction handler continues to work. - * This class adds the {@link Transactional} to the graph itself. + * This class adds the {@link Transactional} to the graph itself + * and provides {@link ExtendedIterator ExtendedIterators} that provide + * read access to the data if used outside a transaction. */ public class GraphTxn extends GraphWrapper implements Transactional { @@ -98,4 +106,46 @@ public class GraphTxn extends GraphWrapper implements Transactional { public boolean isInTransaction() { return getT().isInTransaction(); } + + private static class IteratorTxn<T> extends WrappedIterator<T> { + + private final GraphTxn graph; + private final boolean needIterTxn; + + IteratorTxn(GraphTxn graph, ExtendedIterator<T> base) { + super(base, true); // removeDenied. + this.graph = graph; + needIterTxn = graph.getT().isInTransaction(); + if ( needIterTxn ) + graph.begin(TxnType.READ); + } + + @Override + public void close() { + if ( needIterTxn ) { + graph.commit(); + graph.end(); + } + } + } + + @Override + public ExtendedIterator<Triple> find(Triple triple) { + if ( false ) + return isolate(get().find(triple)); + return new IteratorTxn<Triple>(this, get().find(triple)); + } + + @Override + public ExtendedIterator<Triple> find(Node s, Node p, Node o) { + if ( false ) + return isolate(get().find(s, p, o)); + return new IteratorTxn<Triple>(this, get().find(s, p, o)); + } + + /** Isolate by materializing the iterator. */ + private ExtendedIterator<Triple> isolate(ExtendedIterator<Triple> iter) { + List<Triple> list = iter.toList(); + return WrappedIterator.create(list.iterator()); + } } diff --git a/jena-arq/src/main/java/org/apache/jena/system/Txn.java b/jena-arq/src/main/java/org/apache/jena/system/Txn.java index 61d66c8e2a..603f0fe145 100644 --- a/jena-arq/src/main/java/org/apache/jena/system/Txn.java +++ b/jena-arq/src/main/java/org/apache/jena/system/Txn.java @@ -18,10 +18,10 @@ package org.apache.jena.system; -import java.util.function.Supplier ; +import java.util.function.Supplier; import org.apache.jena.query.TxnType; -import org.apache.jena.sparql.core.Transactional ; +import org.apache.jena.sparql.core.Transactional; /** Application utilities for executing code in transactions. * <p> @@ -69,43 +69,43 @@ public class Txn { /** Execute application code in a transaction with the given {@link TxnType transaction type}. */ public static <T extends Transactional> void exec(T txn, TxnType txnType, Runnable r) { - boolean b = txn.isInTransaction() ; + boolean b = txn.isInTransaction(); if ( b ) TxnOp.compatibleWithPromote(txnType, txn); else - txn.begin(txnType) ; - try { r.run() ; } + txn.begin(txnType); + try { r.run(); } catch (Throwable th) { onThrowable(th, txn); - throw th ; + throw th; } if ( !b ) { if ( txn.isInTransaction() ) // May have been explicit commit or abort. - txn.commit() ; - txn.end() ; + txn.commit(); + txn.end(); } } /** Execute and return a value in a transaction with the given {@link TxnType transaction type}. */ public static <T extends Transactional, X> X calc(T txn, TxnType txnType, Supplier<X> r) { - boolean b = txn.isInTransaction() ; + boolean b = txn.isInTransaction(); if ( b ) TxnOp.compatibleWithPromote(txnType, txn); else - txn.begin(txnType) ; + txn.begin(txnType); X x; - try { x = r.get() ; } + try { x = r.get(); } catch (Throwable th) { onThrowable(th, txn); - throw th ; + throw th; } if ( !b ) { if ( txn.isInTransaction() ) // May have been explicit commit or abort. - txn.commit() ; - txn.end() ; + txn.commit(); + txn.end(); } return x; } @@ -133,8 +133,8 @@ public class Txn { // Attempt some kind of cleanup. private static <T extends Transactional> void onThrowable(Throwable th, T txn) { try { - txn.abort() ; - txn.end() ; + txn.abort(); + txn.end(); } catch (Throwable th2) { th.addSuppressed(th2); } } }