This is an automated email from the ASF dual-hosted git repository.

andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git


The following commit(s) were added to refs/heads/main by this push:
     new 807df6a9cf GH-1961: Thread-safe and consistent GraphTxn.find
     new 8e6c4bd734 Merge pull request #1964 from afs/graphtxn
807df6a9cf is described below

commit 807df6a9cf75d63318ab4269876f5b58315a5fee
Author: Andy Seaborne <a...@apache.org>
AuthorDate: Thu Jul 20 13:14:26 2023 +0100

    GH-1961: Thread-safe and consistent GraphTxn.find
---
 .../org/apache/jena/sparql/graph/GraphTxn.java     | 52 +++++++++++++++++++++-
 .../src/main/java/org/apache/jena/system/Txn.java  | 32 ++++++-------
 2 files changed, 67 insertions(+), 17 deletions(-)

diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java b/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java
index 691bae5141..813d0c06b8 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java
@@ -18,13 +18,19 @@
 
 package org.apache.jena.sparql.graph;
 
+import java.util.List;
+
 import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.Triple;
 import org.apache.jena.query.ReadWrite;
 import org.apache.jena.query.TxnType;
 import org.apache.jena.sparql.core.DatasetGraph;
 import org.apache.jena.sparql.core.DatasetGraphFactory;
 import org.apache.jena.sparql.core.Transactional;
 import org.apache.jena.sparql.core.mem.DatasetGraphInMemory;
+import org.apache.jena.util.iterator.ExtendedIterator;
+import org.apache.jena.util.iterator.WrappedIterator;
 
 /**
  * In-memory, transactional graph.
@@ -32,7 +38,9 @@ import org.apache.jena.sparql.core.mem.DatasetGraphInMemory;
  * @implNote
  * The implementation uses the default graph of {@link DatasetGraphInMemory}.
  * The graph transaction handler continues to work.
- * This class adds the {@link Transactional} to the graph itself.
+ * This class adds the {@link Transactional} to the graph itself
+ * and provides {@link ExtendedIterator ExtendedIterators} that provide
+ * read access to the data if used outside a transaction.
  */
 public class GraphTxn extends GraphWrapper implements Transactional {
 
@@ -98,4 +106,46 @@ public class GraphTxn extends GraphWrapper implements Transactional {
     public boolean isInTransaction() {
         return getT().isInTransaction();
     }
+
+    /**
+     * Iterator over the results of a {@code find} on a {@link GraphTxn}.
+     * <p>
+     * If the graph is not in a transaction when the iterator is created, the
+     * iterator begins a READ transaction of its own and finishes it
+     * (commit then end) in {@link #close()}. If the caller is already inside
+     * a transaction, that transaction is used and is left untouched.
+     * <p>
+     * The base iterator is wrapped with removal denied.
+     */
+    private static class IteratorTxn<T> extends WrappedIterator<T> {
+
+        private final GraphTxn graph;
+        /** True when this iterator began the transaction and so must finish it. */
+        private final boolean needIterTxn;
+
+        IteratorTxn(GraphTxn graph, ExtendedIterator<T> base) {
+            super(base, true);  // removeDenied.
+            this.graph = graph;
+            // An iterator-owned transaction is needed only when the caller has
+            // NOT already started one; beginning another inside an active
+            // transaction would be a nested begin.
+            needIterTxn = ! graph.getT().isInTransaction();
+            if ( needIterTxn )
+                graph.begin(TxnType.READ);
+        }
+
+        /**
+         * Finish the transaction this iterator began, if any.
+         * NOTE(review): the wrapped base iterator is not closed here (there is
+         * no call to super.close()) - confirm that this is intended.
+         */
+        @Override
+        public void close() {
+            if ( needIterTxn ) {
+                graph.commit();
+                graph.end();
+            }
+        }
+    }
+
+    /**
+     * Find the triples of the wrapped graph that match a triple pattern.
+     * The result is handed back inside an {@link IteratorTxn} bound to this
+     * graph; close the returned iterator so that it can finish any
+     * transaction it began.
+     */
+    @Override
+    public ExtendedIterator<Triple> find(Triple triple) {
+        ExtendedIterator<Triple> matches = get().find(triple);
+        if ( false )
+            // Alternative strategy, switched off: copy the results up front.
+            return isolate(matches);
+        return new IteratorTxn<>(this, matches);
+    }
+
+    /**
+     * Find the triples of the wrapped graph that match the (s, p, o) node
+     * pattern. The result is handed back inside an {@link IteratorTxn} bound
+     * to this graph; close the returned iterator so that it can finish any
+     * transaction it began.
+     */
+    @Override
+    public ExtendedIterator<Triple> find(Node s, Node p, Node o) {
+        ExtendedIterator<Triple> matches = get().find(s, p, o);
+        if ( false )
+            // Alternative strategy, switched off: copy the results up front.
+            return isolate(matches);
+        return new IteratorTxn<>(this, matches);
+    }
+
+    /**
+     * Isolate by materializing: drain the given iterator into a list and
+     * return a fresh iterator over that copy.
+     */
+    private ExtendedIterator<Triple> isolate(ExtendedIterator<Triple> iter) {
+        return WrappedIterator.create(iter.toList().iterator());
+    }
 }
diff --git a/jena-arq/src/main/java/org/apache/jena/system/Txn.java b/jena-arq/src/main/java/org/apache/jena/system/Txn.java
index 61d66c8e2a..603f0fe145 100644
--- a/jena-arq/src/main/java/org/apache/jena/system/Txn.java
+++ b/jena-arq/src/main/java/org/apache/jena/system/Txn.java
@@ -18,10 +18,10 @@
 
 package org.apache.jena.system;
 
-import java.util.function.Supplier ;
+import java.util.function.Supplier;
 
 import org.apache.jena.query.TxnType;
-import org.apache.jena.sparql.core.Transactional ;
+import org.apache.jena.sparql.core.Transactional;
 
 /** Application utilities for executing code in transactions.
  * <p>
@@ -69,43 +69,43 @@ public class Txn {
 
     /** Execute application code in a transaction with the given {@link 
TxnType transaction type}. */
     public static <T extends Transactional> void exec(T txn, TxnType txnType, 
Runnable r) {
-        boolean b = txn.isInTransaction() ;
+        boolean b = txn.isInTransaction(); // true: a transaction is already open on txn, so this call does not own it
         if ( b )
             TxnOp.compatibleWithPromote(txnType, txn); // NOTE(review): presumably checks the open transaction against txnType, possibly promoting it - confirm in TxnOp
         else
-            txn.begin(txnType) ;
-        try { r.run() ; }
+            txn.begin(txnType); // no transaction open: start one of the requested type
+        try { r.run(); }
         catch (Throwable th) {
             onThrowable(th, txn); // abort + end; called whether or not this call began the transaction
-            throw th ;
+            throw th; // the original failure is always propagated to the caller
         }
         if ( !b ) { // finish the transaction only if this call began it
             if ( txn.isInTransaction() )
                 // May have been explicit commit or abort.
-                txn.commit() ;
-            txn.end() ;
+                txn.commit();
+            txn.end(); // end() is called even when the application code already committed or aborted
         }
     }
 
     /** Execute and return a value in a transaction with the given {@link 
TxnType transaction type}. */
     public static <T extends Transactional, X> X calc(T txn, TxnType txnType, 
Supplier<X> r) {
-        boolean b = txn.isInTransaction() ;
+        boolean b = txn.isInTransaction(); // true: a transaction is already open on txn, so this call does not own it
         if ( b )
             TxnOp.compatibleWithPromote(txnType, txn); // NOTE(review): presumably checks the open transaction against txnType, possibly promoting it - confirm in TxnOp
         else
-            txn.begin(txnType) ;
+            txn.begin(txnType); // no transaction open: start one of the requested type
         X x; // result of the supplier, returned after the transaction is finished
-        try { x = r.get() ; }
+        try { x = r.get(); }
         catch (Throwable th) {
             onThrowable(th, txn); // abort + end; called whether or not this call began the transaction
-            throw th ;
+            throw th; // the original failure is always propagated to the caller
         }
 
         if ( !b ) { // finish the transaction only if this call began it
             if ( txn.isInTransaction() )
                 // May have been explicit commit or abort.
-                txn.commit() ;
-            txn.end() ;
+                txn.commit();
+            txn.end(); // end() is called even when the application code already committed or aborted
         }
         return x;
     }
@@ -133,8 +133,8 @@ public class Txn {
     // Attempt some kind of cleanup.
     private static <T extends Transactional> void onThrowable(Throwable th, T 
txn) {
         try {
-            txn.abort() ;
-            txn.end() ;
+            txn.abort(); // discard the transaction's work after the failure
+            txn.end(); // then release the transaction
         } catch (Throwable th2) { th.addSuppressed(th2); } // a cleanup failure is attached to the original throwable rather than replacing it
     }
 }

Reply via email to