afs commented on code in PR #3184:
URL: https://github.com/apache/jena/pull/3184#discussion_r2303435989


##########
jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/DatasetGraphOverSparql.java:
##########
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.dispatch;
+
+import static org.apache.jena.query.ReadWrite.WRITE;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorCloseable;
+import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.query.Query;
+import org.apache.jena.query.QueryFactory;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.riot.system.PrefixMap;
+import org.apache.jena.riot.system.PrefixMapFactory;
+import org.apache.jena.riot.system.Prefixes;
+import org.apache.jena.riot.system.StreamRDF;
+import org.apache.jena.sparql.JenaTransactionException;
+import org.apache.jena.sparql.core.BasicPattern;
+import org.apache.jena.sparql.core.DatasetGraphBase;
+import org.apache.jena.sparql.core.GraphView;
+import org.apache.jena.sparql.core.Quad;
+import org.apache.jena.sparql.core.Substitute;
+import org.apache.jena.sparql.core.Transactional;
+import org.apache.jena.sparql.core.TransactionalNull;
+import org.apache.jena.sparql.core.Var;
+import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.engine.binding.BindingFactory;
+import org.apache.jena.sparql.exec.QueryExec;
+import org.apache.jena.sparql.exec.UpdateExec;
+import org.apache.jena.sparql.expr.Expr;
+import org.apache.jena.sparql.expr.aggregate.AggCount;
+import org.apache.jena.sparql.modify.request.QuadAcc;
+import org.apache.jena.sparql.modify.request.QuadDataAcc;
+import org.apache.jena.sparql.modify.request.UpdateDataDelete;
+import org.apache.jena.sparql.modify.request.UpdateDataInsert;
+import org.apache.jena.sparql.modify.request.UpdateDeleteWhere;
+import org.apache.jena.sparql.modify.request.UpdateDrop;
+import org.apache.jena.sparql.syntax.Element;
+import org.apache.jena.sparql.syntax.ElementNamedGraph;
+import org.apache.jena.sparql.syntax.ElementTriplesBlock;
+import org.apache.jena.sparql.syntax.ElementUnion;
+import org.apache.jena.sparql.util.Context;
+import org.apache.jena.update.Update;
+import org.apache.jena.update.UpdateRequest;
+
+/**
+ * This class provides a base implementation of the Jena DatasetGraph interface
+ * to a remote SPARQL endpoint. Efficiency not guaranteed.
+ *
+ * Any returned iterators must be closed to free the resources.
+ * This base class does not support transactions.
+ *
+ * All inserts are passed on as SPARQL update requests.
+ * Blank nodes should be avoided because they are likely to become renamed across separate requests.
+ *
+ * Invocation of deleteAny() across the default graph and all named graphs fires two requests.
+ * All other methods fire a single request.
+ */
+public abstract class DatasetGraphOverSparql
+    extends DatasetGraphBase
+{
+    private PrefixMap prefixes = PrefixMapFactory.create();
+    private Transactional transactional = TransactionalNull.create();
+
+    public DatasetGraphOverSparql() {
+        super();
+        initContext();
+    }
+
+    protected PrefixMap getPrefixes() {
+        return prefixes;
+    }
+
+    protected Transactional getTransactional() {
+        return transactional;
+    }
+
+    protected void initContext() {
+        Context cxt = getContext();
+        // Use the context to advertise that SPARQL statements should not be parsed.
+        SparqlDispatcherRegistry.setParseCheck(cxt, false);
+    }
+
+    protected abstract QueryExec query(Query query);
+    protected abstract UpdateExec update(UpdateRequest UpdateRequest);
+
+    protected void execUpdate(Update update) {
+        execUpdate(new UpdateRequest(update));
+    }
+
+    protected void execUpdate(UpdateRequest updateRequest) {
+        UpdateExec uExec = update(updateRequest);
+        uExec.execute();
+    }
+
+    /**
+     * This method must return a StreamRDF instance that handles bulk inserts of RDF tuples (triples or quads).
+     * The default implementation flushes every 1000 tuples.
+     * Alternative implementations could e.g. flush by the string length of the update request.
+     */
+    protected StreamRDF newUpdateSink() {
+        StreamRDF sink = new StreamRDFToUpdateRequest(this::execUpdate, Prefixes.adapt(getPrefixes()), 1000);
+        return sink;
+    }
+
+    @Override
+    public Iterator<Node> listGraphNodes() {
+        QueryExec qExec = query(graphsQuery);
+        return Iter.onClose(
+            Iter.map(qExec.select(), b -> b.get(vg)),
+            qExec::close);
+    }
+
+    @Override
+    public Iterator<Quad> find(Node g, Node s, Node p, Node o) {
+        Iterator<Quad> result;
+        if (g == null || Node.ANY.equals(g)) {
+            result = findTriplesOrQuads(this::query, s, p, o);
+        } else if (Quad.isDefaultGraph(g)) {
+            Iterator<Triple> base = findTriples(this::query, s, p, o);
+            result = Iter.map(base, t -> Quad.create(Quad.defaultGraphIRI, t));
+        } else {
+            result = findQuads(this::query, g, s, p, o);
+        }
+        return result;
+    }
+
+    @Override
+    public Iterator<Quad> findNG(Node g, Node s, Node p, Node o) {
+        Iterator<Quad> result = findQuads(this::query, g, s, p, o);
+        return result;
+    }
+
+    @Override
+    public Graph getDefaultGraph() {
+        DatasetGraphOverSparql self = this;
+        return new GraphView(this, Quad.defaultGraphNodeGenerated) {
+            @Override
+            protected int graphBaseSize() {
+                long size = sizeLong();
+                return (size < Integer.MAX_VALUE) ? (int)size : Integer.MAX_VALUE;
+            }
+
+            @Override
+            public long sizeLong() {
+                long result = fetchLong(self::query, defaultGraphSizeQuery, vc);
+                return result;
+            }
+        };
+    }
+
+    @Override
+    public Graph getGraph(Node graphNode) {
+        DatasetGraphOverSparql self = this;
+        return new GraphView(this, graphNode) {
+            @Override
+            protected int graphBaseSize() {
+                long size = sizeLong();
+                return (size < Integer.MAX_VALUE) ? (int)size : Integer.MAX_VALUE;
+            }
+
+            @Override
+            public long sizeLong() {
+                Query q = createQueryNamedGraphSize(graphNode, vc);
+                long result = fetchLong(self::query, q, vc);
+                return result;
+            }
+        };
+    }
+
+    @Override
+    public void addGraph(Node graphName, Graph graph) {
+        StreamRDF sink = newUpdateSink();
+        try {
+            sink.start();
+            StreamRDFToUpdateRequest.sendGraphTriplesToStream(graph, graphName, sink);
+        } finally {
+            sink.finish();
+        }
+    }
+
+    @Override
+    public void removeGraph(Node graphName) {
+        Objects.requireNonNull(graphName);
+        UpdateRequest ur = new UpdateRequest(new UpdateDrop(graphName));
+        execUpdate(ur);
+    }
+
+    @Override
+    public void add(Quad quad) {
+        Quad q = harmonizeTripleInQuad(quad);
+        if (!q.isConcrete()) {
+            throw new IllegalArgumentException("Concrete quad expected.");
+        }
+        Update update = new UpdateDataInsert(new QuadDataAcc(List.of(q)));
+        execUpdate(new UpdateRequest(update));
+    }
+
+    @Override
+    public void delete(Quad quad) {
+        Quad q = harmonizeTripleInQuad(quad);
+        if (!q.isConcrete()) {
+            throw new IllegalArgumentException("Concrete quad expected.");
+        }
+        Update update = new UpdateDataDelete(new QuadDataAcc(List.of(q)));
+        execUpdate(update);
+    }
+
+    @Override
+    public void deleteAny(Node g, Node s, Node p, Node o) {
+        deleteAnyInternal(g, s, p, o);
+        boolean isAnyGraph = g == null || Node.ANY.equals(g);
+        if (isAnyGraph) {
+            deleteAnyInternal(Quad.defaultGraphIRI, s, p, o);
+        }
+    }
+
+    @Override
+    public long size() {
+        long result = fetchLong(this::query, graphsCountQuery, vc);
+        return result;
+    }
+
+    @Override
+    public boolean supportsTransactions() {
+        return false;
+    }
+
+    @Override
+    public void abort() {
+        getTransactional().abort();
+    }
+
+    @Override
+    public void begin(ReadWrite readWrite) {
+        getTransactional().begin(readWrite);
+    }
+
+    @Override
+    public void commit() {
+        getTransactional().commit();
+    }
+
+    @Override
+    public void end() {
+        // Note: AbstractTestRDFConnection.transaction_bad_01() expects

Review Comment:
   I don't understand this comment.



##########
jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/UpdateDispatcher.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.dispatch;
+
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.exec.UpdateExec;
+import org.apache.jena.sparql.util.Context;
+import org.apache.jena.update.UpdateRequest;
+
+public interface UpdateDispatcher {

Review Comment:
   Please document with some javadoc.



##########
jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/DatasetGraphOverSparql.java:
##########
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.dispatch;
+
+import static org.apache.jena.query.ReadWrite.WRITE;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorCloseable;
+import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.query.Query;
+import org.apache.jena.query.QueryFactory;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.riot.system.PrefixMap;
+import org.apache.jena.riot.system.PrefixMapFactory;
+import org.apache.jena.riot.system.Prefixes;
+import org.apache.jena.riot.system.StreamRDF;
+import org.apache.jena.sparql.JenaTransactionException;
+import org.apache.jena.sparql.core.BasicPattern;
+import org.apache.jena.sparql.core.DatasetGraphBase;
+import org.apache.jena.sparql.core.GraphView;
+import org.apache.jena.sparql.core.Quad;
+import org.apache.jena.sparql.core.Substitute;
+import org.apache.jena.sparql.core.Transactional;
+import org.apache.jena.sparql.core.TransactionalNull;
+import org.apache.jena.sparql.core.Var;
+import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.engine.binding.BindingFactory;
+import org.apache.jena.sparql.exec.QueryExec;
+import org.apache.jena.sparql.exec.UpdateExec;
+import org.apache.jena.sparql.expr.Expr;
+import org.apache.jena.sparql.expr.aggregate.AggCount;
+import org.apache.jena.sparql.modify.request.QuadAcc;
+import org.apache.jena.sparql.modify.request.QuadDataAcc;
+import org.apache.jena.sparql.modify.request.UpdateDataDelete;
+import org.apache.jena.sparql.modify.request.UpdateDataInsert;
+import org.apache.jena.sparql.modify.request.UpdateDeleteWhere;
+import org.apache.jena.sparql.modify.request.UpdateDrop;
+import org.apache.jena.sparql.syntax.Element;
+import org.apache.jena.sparql.syntax.ElementNamedGraph;
+import org.apache.jena.sparql.syntax.ElementTriplesBlock;
+import org.apache.jena.sparql.syntax.ElementUnion;
+import org.apache.jena.sparql.util.Context;
+import org.apache.jena.update.Update;
+import org.apache.jena.update.UpdateRequest;
+
+/**
+ * This class provides a base implementation of the Jena DatasetGraph interface
+ * to a remote SPARQL endpoint. Efficiency not guaranteed.

Review Comment:
   I don't see anything making this remote-specific.
   
   It would be weird normally ... maybe for logging/mocking for development and
   testing?
   
   Maybe this class should be alongside the other `DatasetGraph`.



##########
jena-fuseki2/jena-fuseki-mod-exectracker/src/main/java/org/apache/jena/fuseki/mod/exec/tracker/ExecTrackerService.java:
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.fuseki.mod.exec.tracker;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jena.fuseki.FusekiException;
+import org.apache.jena.fuseki.server.Endpoint;
+import org.apache.jena.fuseki.servlets.BaseActionREST;
+import org.apache.jena.fuseki.servlets.HttpAction;
+import org.apache.jena.riot.WebContent;
+import org.apache.jena.sparql.exec.tracker.BasicTaskExec;
+import org.apache.jena.sparql.exec.tracker.TaskEventHistory;
+import org.apache.jena.sparql.exec.tracker.TaskListener;
+import org.apache.jena.sparql.util.Context;
+import org.apache.jena.web.HttpSC;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.internal.bind.JsonTreeWriter;
+import com.google.gson.stream.JsonWriter;
+
+import jakarta.servlet.AsyncContext;
+import jakarta.servlet.AsyncEvent;
+import jakarta.servlet.AsyncListener;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+
+/**
+ * REST action handler for listing running query executions and stopping them.

Review Comment:
   Please add some javadoc that describes the commands supported.
   
   The `fuseki:operation fuseki:tracker` will also need documentation for the
   website.
   
   



##########
jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/QueryDispatcher.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.dispatch;
+
+import org.apache.jena.query.Query;
+import org.apache.jena.query.Syntax;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.exec.QueryExec;
+import org.apache.jena.sparql.util.Context;
+
+public interface QueryDispatcher {

Review Comment:
   Please document with some javadoc.



##########
jena-arq/src/main/java/org/apache/jena/sparql/engine/dispatch/DatasetGraphOverSparql.java:
##########
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.dispatch;
+
+import static org.apache.jena.query.ReadWrite.WRITE;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import org.apache.jena.atlas.iterator.Iter;
+import org.apache.jena.atlas.iterator.IteratorCloseable;
+import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.query.Query;
+import org.apache.jena.query.QueryFactory;
+import org.apache.jena.query.ReadWrite;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.riot.system.PrefixMap;
+import org.apache.jena.riot.system.PrefixMapFactory;
+import org.apache.jena.riot.system.Prefixes;
+import org.apache.jena.riot.system.StreamRDF;
+import org.apache.jena.sparql.JenaTransactionException;
+import org.apache.jena.sparql.core.BasicPattern;
+import org.apache.jena.sparql.core.DatasetGraphBase;
+import org.apache.jena.sparql.core.GraphView;
+import org.apache.jena.sparql.core.Quad;
+import org.apache.jena.sparql.core.Substitute;
+import org.apache.jena.sparql.core.Transactional;
+import org.apache.jena.sparql.core.TransactionalNull;
+import org.apache.jena.sparql.core.Var;
+import org.apache.jena.sparql.engine.binding.Binding;
+import org.apache.jena.sparql.engine.binding.BindingFactory;
+import org.apache.jena.sparql.exec.QueryExec;
+import org.apache.jena.sparql.exec.UpdateExec;
+import org.apache.jena.sparql.expr.Expr;
+import org.apache.jena.sparql.expr.aggregate.AggCount;
+import org.apache.jena.sparql.modify.request.QuadAcc;
+import org.apache.jena.sparql.modify.request.QuadDataAcc;
+import org.apache.jena.sparql.modify.request.UpdateDataDelete;
+import org.apache.jena.sparql.modify.request.UpdateDataInsert;
+import org.apache.jena.sparql.modify.request.UpdateDeleteWhere;
+import org.apache.jena.sparql.modify.request.UpdateDrop;
+import org.apache.jena.sparql.syntax.Element;
+import org.apache.jena.sparql.syntax.ElementNamedGraph;
+import org.apache.jena.sparql.syntax.ElementTriplesBlock;
+import org.apache.jena.sparql.syntax.ElementUnion;
+import org.apache.jena.sparql.util.Context;
+import org.apache.jena.update.Update;
+import org.apache.jena.update.UpdateRequest;
+
+/**
+ * This class provides a base implementation of the Jena DatasetGraph interface
+ * to a remote SPARQL endpoint. Efficiency not guaranteed.
+ *
+ * Any returned iterators must be closed to free the resources.
+ * This base class does not support transactions.
+ *
+ * All inserts are passed on as SPARQL update requests.
+ * Blank nodes should be avoided because they are likely to become renamed across separate requests.
+ *
+ * Invocation of deleteAny() across the default graph and all named graphs fires two requests.
+ * All other methods fire a single request.
+ */
+public abstract class DatasetGraphOverSparql
+    extends DatasetGraphBase
+{
+    private PrefixMap prefixes = PrefixMapFactory.create();
+    private Transactional transactional = TransactionalNull.create();
+
+    public DatasetGraphOverSparql() {
+        super();
+        initContext();
+    }
+
+    protected PrefixMap getPrefixes() {
+        return prefixes;
+    }
+
+    protected Transactional getTransactional() {
+        return transactional;
+    }
+
+    protected void initContext() {
+        Context cxt = getContext();
+        // Use the context to advertise that SPARQL statements should not be parsed.
+        SparqlDispatcherRegistry.setParseCheck(cxt, false);
+    }
+
+    protected abstract QueryExec query(Query query);
+    protected abstract UpdateExec update(UpdateRequest UpdateRequest);
+
+    protected void execUpdate(Update update) {
+        execUpdate(new UpdateRequest(update));
+    }
+
+    protected void execUpdate(UpdateRequest updateRequest) {
+        UpdateExec uExec = update(updateRequest);
+        uExec.execute();
+    }
+
+    /**
+     * This method must return a StreamRDF instance that handles bulk inserts of RDF tuples (triples or quads).
+     * The default implementation flushes every 1000 tuples.
+     * Alternative implementations could e.g. flush by the string length of the update request.
+     */
+    protected StreamRDF newUpdateSink() {
+        StreamRDF sink = new StreamRDFToUpdateRequest(this::execUpdate, Prefixes.adapt(getPrefixes()), 1000);
+        return sink;
+    }
+
+    @Override
+    public Iterator<Node> listGraphNodes() {
+        QueryExec qExec = query(graphsQuery);
+        return Iter.onClose(
+            Iter.map(qExec.select(), b -> b.get(vg)),
+            qExec::close);
+    }
+
+    @Override
+    public Iterator<Quad> find(Node g, Node s, Node p, Node o) {
+        Iterator<Quad> result;
+        if (g == null || Node.ANY.equals(g)) {
+            result = findTriplesOrQuads(this::query, s, p, o);
+        } else if (Quad.isDefaultGraph(g)) {
+            Iterator<Triple> base = findTriples(this::query, s, p, o);
+            result = Iter.map(base, t -> Quad.create(Quad.defaultGraphIRI, t));
+        } else {
+            result = findQuads(this::query, g, s, p, o);
+        }
+        return result;
+    }
+
+    @Override
+    public Iterator<Quad> findNG(Node g, Node s, Node p, Node o) {
+        Iterator<Quad> result = findQuads(this::query, g, s, p, o);
+        return result;
+    }
+
+    @Override
+    public Graph getDefaultGraph() {
+        DatasetGraphOverSparql self = this;
+        return new GraphView(this, Quad.defaultGraphNodeGenerated) {
+            @Override
+            protected int graphBaseSize() {
+                long size = sizeLong();
+                return (size < Integer.MAX_VALUE) ? (int)size : Integer.MAX_VALUE;
+            }
+
+            @Override
+            public long sizeLong() {
+                long result = fetchLong(self::query, defaultGraphSizeQuery, vc);
+                return result;
+            }
+        };
+    }
+
+    @Override
+    public Graph getGraph(Node graphNode) {
+        DatasetGraphOverSparql self = this;
+        return new GraphView(this, graphNode) {
+            @Override
+            protected int graphBaseSize() {
+                long size = sizeLong();
+                return (size < Integer.MAX_VALUE) ? (int)size : Integer.MAX_VALUE;
+            }
+
+            @Override
+            public long sizeLong() {
+                Query q = createQueryNamedGraphSize(graphNode, vc);
+                long result = fetchLong(self::query, q, vc);
+                return result;
+            }
+        };
+    }
+
+    @Override
+    public void addGraph(Node graphName, Graph graph) {
+        StreamRDF sink = newUpdateSink();
+        try {
+            sink.start();
+            StreamRDFToUpdateRequest.sendGraphTriplesToStream(graph, graphName, sink);
+        } finally {
+            sink.finish();
+        }
+    }
+
+    @Override
+    public void removeGraph(Node graphName) {
+        Objects.requireNonNull(graphName);
+        UpdateRequest ur = new UpdateRequest(new UpdateDrop(graphName));
+        execUpdate(ur);
+    }
+
+    @Override
+    public void add(Quad quad) {
+        Quad q = harmonizeTripleInQuad(quad);
+        if (!q.isConcrete()) {
+            throw new IllegalArgumentException("Concrete quad expected.");
+        }
+        Update update = new UpdateDataInsert(new QuadDataAcc(List.of(q)));
+        execUpdate(new UpdateRequest(update));
+    }
+
+    @Override
+    public void delete(Quad quad) {
+        Quad q = harmonizeTripleInQuad(quad);
+        if (!q.isConcrete()) {
+            throw new IllegalArgumentException("Concrete quad expected.");
+        }
+        Update update = new UpdateDataDelete(new QuadDataAcc(List.of(q)));
+        execUpdate(update);
+    }
+
+    @Override
+    public void deleteAny(Node g, Node s, Node p, Node o) {
+        deleteAnyInternal(g, s, p, o);
+        boolean isAnyGraph = g == null || Node.ANY.equals(g);
+        if (isAnyGraph) {
+            deleteAnyInternal(Quad.defaultGraphIRI, s, p, o);
+        }
+    }
+
+    @Override
+    public long size() {
+        long result = fetchLong(this::query, graphsCountQuery, vc);
+        return result;
+    }
+
+    @Override
+    public boolean supportsTransactions() {
+        return false;
+    }
+

Review Comment:
   For clarity, please add
   ```java
       public boolean supportsTransactionAbort() {
           return false;
       }
   ```



##########
jena-fuseki2/jena-fuseki-mod-exectracker/src/main/java/org/apache/jena/fuseki/mod/exec/tracker/ExecTrackerService.java:
##########
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.fuseki.mod.exec.tracker;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.jena.fuseki.FusekiException;
+import org.apache.jena.fuseki.server.Endpoint;
+import org.apache.jena.fuseki.servlets.BaseActionREST;
+import org.apache.jena.fuseki.servlets.HttpAction;
+import org.apache.jena.riot.WebContent;
+import org.apache.jena.sparql.exec.tracker.BasicTaskExec;
+import org.apache.jena.sparql.exec.tracker.TaskEventHistory;
+import org.apache.jena.sparql.exec.tracker.TaskListener;
+import org.apache.jena.sparql.util.Context;
+import org.apache.jena.web.HttpSC;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.internal.bind.JsonTreeWriter;
+import com.google.gson.stream.JsonWriter;
+
+import jakarta.servlet.AsyncContext;
+import jakarta.servlet.AsyncEvent;
+import jakarta.servlet.AsyncListener;
+import jakarta.servlet.http.HttpServletRequest;
+import jakarta.servlet.http.HttpServletResponse;
+
+/**
+ * REST action handler for listing running query executions and stopping them.
+ */
+public class ExecTrackerService extends BaseActionREST {
+    private static final Logger logger = 
LoggerFactory.getLogger(ExecTrackerService.class);
+
+    // Gson for formatting server side event (SSE) JSON.
+    // Note: Pretty printing breaks SSE events due to newlines!
+    private static Gson gsonForSseEvents = new Gson();
+
+    /** Helper class to track SSE clients. */
+    private class Clients {
+        // Lock to prevent concurrent addition/removal of listeners while 
broadcasting events.
+        Object listenerLock = new Object();
+
+        // The endpoint of the clients.
+        Endpoint endpoint;
+
+        // Single listener on an TaskTrackerRegistry. - Not needed here; this 
listener initialized during FMOD init.
+        Runnable taskTrackerListenerDisposer;
+
+        // Web clients on the ExecTracker.
+        Map<AsyncContext, Runnable> eventListeners = 
Collections.synchronizedMap(new IdentityHashMap<>()); // new 
ConcurrentHashMap<>();
+
+        // The history tracker connected to the taskTracker.
+        // TaskEventHistory historyTracker;
+    }
+
+    /** Registered clients listening to server side events for indexer status 
updates. */
+    private Map<TaskEventHistory, Clients> trackerToClients = new 
ConcurrentHashMap<>(); // Collections.synchronizedMap(new IdentityHashMap<>());
+
+    public ExecTrackerService() {}
+
+    private static long getExecId(HttpAction action) {
+        String str = action.getRequest().getParameter("requestId");
+        Objects.requireNonNull(str);
+        long result = Long.parseLong(str);
+        return result;
+    }
+
+    /**
+     * The GET command can serve: the website, the notification stream from 
task execution
+     * and the latest task execution status.
+     */
+    @Override
+    protected void doGet(HttpAction action) {
+        String rawCommand = action.getRequestParameter("command");
+        String command = Optional.ofNullable(rawCommand).orElse("page");
+        switch (command) {
+        case "page": servePage(action); break;
+        case "events": serveEvents(action); break;
+        case "status": serveStatus(action); break;
+        case "stop": stopExec(action); break;
+        default:
+            throw new UnsupportedOperationException("Unsupported operation: " 
+ command);
+        }
+    }
+
+    protected void stopExec(HttpAction action) {
+        checkIsAbortAllowed(action);
+
+        long execId = getExecId(action);
+
+        TaskEventHistory taskEventHistory = requireTaskEventHistory(action);
+        BasicTaskExec task = taskEventHistory.getTaskBySerialId(execId);
+
+        if (task != null) {
+            try {
+                task.abort();
+            } catch (Throwable t) {
+                logger.warn("Exception raised during abort.", t);
+            }
+            logger.info("Sending stop request to execution: " + execId);
+        } else {
+            logger.warn("No such execution to abort: " + execId);
+        }
+
+        respond(action, HttpSC.OK_200, WebContent.contentTypeTextPlain, "Abort 
request accepted.");
+    }
+
+    protected void servePage(HttpAction action) {
+        // Serves the minimal graphql ui
+        String resourceName = "exec-tracker/index.html";
+        String str = null;
+        try (InputStream in = 
ExecTrackerService.class.getClassLoader().getResourceAsStream(resourceName)) {
+            str = IOUtils.toString(in, StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            throw new FusekiException(e);
+        }
+
+        if (str == null) {
+            respond(action, HttpSC.INTERNAL_SERVER_ERROR_500, 
WebContent.contentTypeTextPlain,
+                "Failed to load classpath resource " + resourceName);
+        } else {
+            respond(action, HttpSC.OK_200, WebContent.contentTypeHTML, str);
+        }
+    }
+
+    protected TaskEventHistory requireTaskEventHistory(HttpAction action) {
+        Context cxt = action.getEndpoint().getContext();
+        TaskEventHistory taskEventHistory = TaskEventHistory.require(cxt);
+        return taskEventHistory;
+    }
+
+    protected Runnable registerTaskEventListener(TaskEventHistory 
taskEventHistory, Clients clients) {
+        // Register the SSE handler to the history tracker
+        InternalListener listener = new InternalListener(taskEventHistory, 
clients);
+        Runnable disposeTaskEventListener = 
taskEventHistory.addListener(BasicTaskExec.class, listener);
+        return disposeTaskEventListener;
+    }
+
+    protected class InternalListener implements TaskListener<BasicTaskExec> {
+        protected TaskEventHistory taskEvenHistory;
+        protected Clients clients;
+
+        Long getTaskId(BasicTaskExec task) {
+            long taskId = taskEvenHistory.getId(task);
+            Long serialId = taskEvenHistory.getSerialId(taskId);
+            return serialId;
+        }
+
+        public InternalListener(TaskEventHistory taskEventHistory, Clients 
clients) {
+            super();
+            this.taskEvenHistory = taskEventHistory;
+            this.clients = clients;
+        }
+
+        @Override
+        public void onStateChange(BasicTaskExec task) {
+            switch (task.getTaskState()) {
+            case STARTING: onStart(task); break;
+            case TERMINATED: onTerminated(task); break;
+            default: // ignored
+            }
+        }
+
+        public void onStart(BasicTaskExec startRecord) {
+            Long serialId = getTaskId(startRecord);
+            if (serialId != null) {
+                try (JsonTreeWriter writer = new JsonTreeWriter()) {
+                    writer.beginObject();
+                    TaskStatusWriter.writeStartRecordMembers(writer, serialId, 
startRecord);
+                    TaskStatusWriter.writeCanAbort(writer, 
isAbortAllowed(clients.endpoint));
+                    writer.endObject();
+                    JsonElement json = writer.get();
+                    synchronized (clients.listenerLock) {
+                        Iterator<Entry<AsyncContext, Runnable>> it = 
clients.eventListeners.entrySet().iterator();
+                        broadcastJson(it, json);
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        public void onTerminated(BasicTaskExec endRecord) {
+            Long serialId = getTaskId(endRecord);
+            if (serialId != null) {
+                try (JsonTreeWriter writer = new JsonTreeWriter()) {
+                    TaskStatusWriter.writeCompletionRecordObject(writer, 
serialId, endRecord);
+                    JsonElement json = writer.get();
+                    synchronized (clients.listenerLock) {
+                        Iterator<Entry<AsyncContext, Runnable>> it = 
clients.eventListeners.entrySet().iterator();
+                        broadcastJson(it, json);
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+    protected void serveEvents(HttpAction action) {

Review Comment:
   This is showing up as (-1, -1) in the log. Maybe this will help:
   
   ```
           action.setResponseStatus(HttpSC.OK_200);
   ```
   
   It would also benefit from a `try-catch` in case an internal error happens.



##########
jena-fuseki2/jena-fuseki-mod-exectracker/src/main/java/org/apache/jena/fuseki/mod/exec/tracker/FMod_ExecTracker.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.fuseki.mod.exec.tracker;

Review Comment:
   Shouldn't this be `exectracker`?
   What else would be in `org.apache.jena.fuseki.mod.exec`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to