Author: sallen
Date: Mon Mar 26 00:31:06 2012
New Revision: 1305184
URL: http://svn.apache.org/viewvc?rev=1305184&view=rev
Log:
JENA-205 (Streaming results for CONSTRUCT queries). Added support to ARQ for
streaming CONSTRUCT queries for both local and remote query execution.
Added:
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotParsePuller.java
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotQuadParsePuller.java
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotTripleParsePuller.java
Modified:
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/query/QueryExecution.java
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/http/QueryEngineHTTP.java
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/modify/TemplateLib.java
Modified:
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/query/QueryExecution.java
URL:
http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/query/QueryExecution.java?rev=1305184&r1=1305183&r2=1305184&view=diff
==============================================================================
---
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/query/QueryExecution.java
(original)
+++
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/query/QueryExecution.java
Mon Mar 26 00:31:06 2012
@@ -18,8 +18,10 @@
package com.hp.hpl.jena.query;
+import java.util.Iterator ;
import java.util.concurrent.TimeUnit ;
+import com.hp.hpl.jena.graph.Triple ;
import com.hp.hpl.jena.rdf.model.Model ;
import com.hp.hpl.jena.sparql.util.Context ;
import com.hp.hpl.jena.util.FileManager ;
@@ -76,6 +78,16 @@ public interface QueryExecution
* @return Model The model argument for casaded code.
*/
public Model execConstruct(Model model);
+
+ /**
+ * Execute a CONSTRUCT query, returning the results as an iterator of
{@link Triple}.
+ * <b>Caution:</b> This method may return duplicate Triples. This method
may be useful if you only
+ * need the results for stream processing, as it can avoid having to place
the results in a Model.
+ *
+ * @return An iterator of Triple objects (possibly containing duplicates)
generated
+ * by applying the CONSTRUCT template of the query to the bindings in the
WHERE clause.
+ */
+ public Iterator<Triple> execConstructTriples();
/** Execute a DESCRIBE query */
public Model execDescribe();
Modified:
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java
URL:
http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java?rev=1305184&r1=1305183&r2=1305184&view=diff
==============================================================================
---
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java
(original)
+++
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java
Mon Mar 26 00:31:06 2012
@@ -18,11 +18,9 @@
package com.hp.hpl.jena.sparql.engine;
-import java.util.HashMap ;
import java.util.HashSet ;
import java.util.Iterator ;
import java.util.List ;
-import java.util.Map ;
import java.util.Set ;
import java.util.concurrent.TimeUnit ;
@@ -57,6 +55,7 @@ import com.hp.hpl.jena.sparql.engine.bin
import com.hp.hpl.jena.sparql.engine.iterator.QueryIteratorBase ;
import com.hp.hpl.jena.sparql.engine.iterator.QueryIteratorWrapper ;
import com.hp.hpl.jena.sparql.graph.GraphFactory ;
+import com.hp.hpl.jena.sparql.modify.TemplateLib ;
import com.hp.hpl.jena.sparql.syntax.ElementGroup ;
import com.hp.hpl.jena.sparql.syntax.Template ;
import com.hp.hpl.jena.sparql.util.Context ;
@@ -223,7 +222,6 @@ public class QueryExecutionBase implemen
return execConstruct(GraphFactory.makeJenaDefaultModel()) ;
}
-// TODO: enable this?
// /**
// * Executes as a construct query, placing the results into a newly
constructed {@link com.hp.hpl.jena.sparql.graph.GraphDistinctDataBag}.
// * The threshold policy is set from the current context.
@@ -234,10 +232,35 @@ public class QueryExecutionBase implemen
// ThresholdPolicy<Triple> thresholdPolicy =
ThresholdPolicyFactory.policyFromContext(context);
// return execConstruct(GraphFactory.makeDataBagModel(thresholdPolicy))
;
// }
-
+
@Override
public Model execConstruct(Model model)
{
+ try
+ {
+ Iterator<Triple> it = execConstructTriples();
+
+ // Prefixes for result
+ insertPrefixesInto(model);
+
+ while (it.hasNext())
+ {
+ Triple t = it.next();
+ Statement stmt = ModelUtils.tripleToStatement(model, t);
+ if ( stmt != null )
+ model.add(stmt);
+ }
+ }
+ finally
+ {
+ this.close();
+ }
+ return model;
+ }
+
+ @Override
+ public Iterator<Triple> execConstructTriples()
+ {
if ( ! query.isConstructType() )
throw new QueryExecException("Attempt to get a CONSTRUCT model
from a "+labelForQuery(query)+" query") ;
// This causes there to be no PROJECT around the pattern.
@@ -246,29 +269,9 @@ public class QueryExecutionBase implemen
startQueryIterator() ;
- // Prefixes for result
- insertPrefixesInto(model) ;
Template template = query.getConstructTemplate() ;
-
- // Build each template substitution as triples.
- for ( ; queryIterator.hasNext() ; )
- {
- Set<Triple> set = new HashSet<Triple>() ;
- Map<Node, Node> bNodeMap = new HashMap<Node, Node>() ;
- Binding binding = queryIterator.nextBinding() ;
- template.subst(set, bNodeMap, binding) ;
-
- // Convert and merge into Model.
- for ( Iterator<Triple> iter = set.iterator() ; iter.hasNext() ; )
- {
- Triple t = iter.next() ;
- Statement stmt = ModelUtils.tripleToStatement(model, t) ;
- if ( stmt != null )
- model.add(stmt) ;
- }
- }
- this.close() ;
- return model ;
+
+ return TemplateLib.calcTriples(template.getTriples(), queryIterator);
}
@Override
Modified:
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/http/QueryEngineHTTP.java
URL:
http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/http/QueryEngineHTTP.java?rev=1305184&r1=1305183&r2=1305184&view=diff
==============================================================================
---
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/http/QueryEngineHTTP.java
(original)
+++
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/engine/http/QueryEngineHTTP.java
Mon Mar 26 00:31:06 2012
@@ -27,12 +27,24 @@ import java.util.Map ;
import java.util.concurrent.TimeUnit ;
import org.openjena.atlas.io.IO ;
+import org.openjena.atlas.iterator.IteratorResourceClosing ;
import org.openjena.riot.Lang ;
+import org.openjena.riot.RiotReader ;
+import org.openjena.riot.RiotTripleParsePuller ;
import org.openjena.riot.WebContent ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
-import com.hp.hpl.jena.query.* ;
+import com.hp.hpl.jena.graph.Triple ;
+import com.hp.hpl.jena.query.ARQ ;
+import com.hp.hpl.jena.query.Dataset ;
+import com.hp.hpl.jena.query.Query ;
+import com.hp.hpl.jena.query.QueryException ;
+import com.hp.hpl.jena.query.QueryExecException ;
+import com.hp.hpl.jena.query.QueryExecution ;
+import com.hp.hpl.jena.query.QuerySolution ;
+import com.hp.hpl.jena.query.ResultSet ;
+import com.hp.hpl.jena.query.ResultSetFactory ;
import com.hp.hpl.jena.rdf.model.Model ;
import com.hp.hpl.jena.sparql.ARQException ;
import com.hp.hpl.jena.sparql.graph.GraphFactory ;
@@ -269,6 +281,41 @@ public class QueryEngineHTTP implements
}
@Override
+ public Iterator<Triple> execConstructTriples()
+ {
+ HttpQuery httpQuery = makeHttpQuery() ;
+ httpQuery.setAccept(modelContentType) ;
+ InputStream in = httpQuery.exec() ;
+
+ //Don't assume the endpoint actually gives back the content type we
asked for
+ String actualContentType = httpQuery.getContentType();
+
+ //If the server fails to return a Content-Type then we will assume
+ //the server returned the type we asked for
+ if (actualContentType == null || actualContentType.equals(""))
+ {
+ actualContentType = modelContentType;
+ }
+
+ //Try to select language appropriately here based on the model content
type
+ Lang lang = WebContent.contentTypeToLang(actualContentType);
+ if (!lang.isTriples()) throw new QueryException("Endpoint returned
Content Type: " + actualContentType + " is not a valid RDF Graph syntax");
+
+ // Special case N-Triples, because the RIOT reader has a pull interface
+ if (lang == Lang.NTRIPLES)
+ {
+ return new
IteratorResourceClosing<Triple>(RiotReader.createParserNTriples(in, null), in);
+ }
+ else
+ {
+ // Otherwise, we have to spin up a thread to deal with it
+ RiotTripleParsePuller parsePuller = new RiotTripleParsePuller(in,
lang, null);
+ parsePuller.parse();
+ return parsePuller;
+ }
+ }
+
+ @Override
public boolean execAsk()
{
HttpQuery httpQuery = makeHttpQuery() ;
Modified:
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/modify/TemplateLib.java
URL:
http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/modify/TemplateLib.java?rev=1305184&r1=1305183&r2=1305184&view=diff
==============================================================================
---
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/modify/TemplateLib.java
(original)
+++
incubator/jena/Jena2/ARQ/trunk/src/main/java/com/hp/hpl/jena/sparql/modify/TemplateLib.java
Mon Mar 26 00:31:06 2012
@@ -35,6 +35,8 @@ public class TemplateLib
// See also Substitute -- combine?
// Or is this specifc enough to CONSTRUCT/Update template processing?
+ // TODO We could eliminate some of the duplication in this class by
writing generic methods and adding a shared super-interface to Triple and Quad
+
/**
* Take a template, as a list of quad patterns, a default graph, and an
iterator of bindings,
* and produce an iterator of quads that results from applying the
template to the bindings.
@@ -66,6 +68,34 @@ public class TemplateLib
return quads;
}
+ /** Substitute into triple patterns */
+ public static Iterator<Triple> calcTriples(final List<Triple> triples,
Iterator<Binding> bindings)
+ {
+ return Iter.mapMany(bindings, new Transform<Binding,
Iterator<Triple>>()
+ {
+ Map<Node, Node> bNodeMap = new HashMap<Node, Node>() ;
+ @Override
+ public Iterator<Triple> convert(final Binding b)
+ {
+ // Iteration is a new mapping of bnodes.
+ bNodeMap.clear() ;
+
+ List<Triple> tripleList = new
ArrayList<Triple>(triples.size());
+ for (Triple triple : triples)
+ {
+ Triple q = subst(triple, b, bNodeMap) ;
+ if ( ! q.isConcrete() )
+ {
+ //Log.warn(TemplateLib.class, "Unbound quad:
"+FmtUtils.stringForQuad(quad)) ;
+ continue ;
+ }
+ tripleList.add(q);
+ }
+ return tripleList.iterator();
+ }
+ });
+ }
+
/** Substitute into quad patterns */
public static Iterator<Quad> calcQuads(final List<Quad> quads,
Iterator<Binding> bindings)
{
Added:
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotParsePuller.java
URL:
http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotParsePuller.java?rev=1305184&view=auto
==============================================================================
---
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotParsePuller.java
(added)
+++
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotParsePuller.java
Mon Mar 26 00:31:06 2012
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openjena.riot;
+
+import java.io.IOException ;
+import java.io.InputStream ;
+import java.util.Iterator ;
+import java.util.NoSuchElementException ;
+import java.util.concurrent.ArrayBlockingQueue ;
+import java.util.concurrent.BlockingQueue ;
+import java.util.concurrent.CancellationException ;
+import java.util.concurrent.TimeUnit ;
+
+import org.openjena.atlas.AtlasException ;
+import org.openjena.atlas.lib.Closeable ;
+import org.openjena.atlas.lib.Sink ;
+import org.openjena.riot.lang.LangRIOT ;
+import org.openjena.riot.system.ParserProfile ;
+
+/**
+ * A wrapper that converts the RIOT parsing API from push to pull. It does
this by starting up a
+ * thread that feeds results through a {@link BlockingQueue}. You must call
{@link #parse()} in
+ * order to start the background thread parsing the InputStream. You then use
the iterator methods
+ * to retrieve the statements.
+ * <p/>
+ * Note: You can avoid using this class if you are dealing with N-Triples or
N-Quads, because RIOT
+ * provides pull interfaces directly for those parsers.
+ */
+public abstract class RiotParsePuller<T> implements Iterator<T>, Closeable,
LangRIOT
+{
+ private static final int QUEUE_CAPACITY = 1000;
+ private static final int ITERATOR_POLL_TIMEOUT = 1000; // one second
+ private static final TimeUnit ITERATOR_POLL_TIMEUNIT =
TimeUnit.MILLISECONDS;
+
+ protected final InputStream in;
+ protected final Lang lang;
+ protected final String baseIRI;
+ protected final LangRIOT parser;
+ private final BlockingQueue<T> queue;
+
+ @SuppressWarnings("unchecked")
+ private final T endMarker = (T)new Object();
+
+ private Thread readThread;
+ private volatile RuntimeException uncaughtException;
+
+ // For the iterator bit
+ private boolean finished;
+ private T slot;
+
+ public RiotParsePuller(InputStream in, Lang lang, String baseIRI)
+ {
+ this.in = in;
+ this.lang = lang;
+ this.baseIRI = baseIRI;
+ this.queue = new ArrayBlockingQueue<T>(QUEUE_CAPACITY);
+
+ Sink<T> sink = createSink();
+ this.parser = createParser(sink);
+ }
+
+ private Sink<T> createSink()
+ {
+ // Executes within the context of the thread
+ Sink<T> sink = new Sink<T>()
+ {
+ @Override
+ public void send(T item)
+ {
+ try
+ {
+ if (Thread.interrupted()) throw new InterruptedException();
+
+ // Hopefully we'll never get passed null... but just in
case
+ if (null == item) return;
+
+ queue.put(item);
+ }
+ catch (InterruptedException e)
+ {
+ // Presumably throwing an exception is how the parsers
expect you to cancel?
+ throw new CancellationException();
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+ // do nothing
+ }
+
+ @Override
+ public void close()
+ {
+ // do nothing
+ }
+ };
+ return sink;
+ }
+
+ @Override
+ public Lang getLang()
+ {
+ return lang;
+ }
+
+ @Override
+ public ParserProfile getProfile()
+ {
+ return parser.getProfile();
+ }
+
+ @Override
+ public void setProfile(ParserProfile profile)
+ {
+ parser.setProfile(profile);
+ }
+
+ protected abstract LangRIOT createParser(Sink<T> sink);
+
+ /**
+ * Starts the background thread parsing the InputStream. This method
+ * returns immediately. To retrieve the results, use the iterator methods.
+ */
+ @Override
+ public void parse()
+ {
+ readThread = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ boolean noExceptions = true;
+ try
+ {
+ parser.parse();
+ }
+ catch (RuntimeException e)
+ {
+ uncaughtException = e;
+ noExceptions = false;
+ }
+ finally
+ {
+ if (noExceptions)
+ {
+ try
+ {
+ queue.put(endMarker);
+ }
+ catch (InterruptedException e)
+ {
+ // Someone cancelled right as we were trying to
add the endMarker!
+ uncaughtException = new CancellationException();
+ }
+ }
+
+ try
+ {
+ in.close();
+ }
+ catch (IOException e)
+ {
+ uncaughtException = new AtlasException("Error closing
input stream", e);
+ }
+ }
+ }
+ });
+ readThread.start();
+ }
+
+ @Override
+ public void close()
+ {
+ if (null != readThread)
+ {
+ readThread.interrupt();
+ }
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ if ( finished ) return false ;
+ if ( slot != null ) return true ;
+ while (true)
+ {
+ try
+ {
+ slot = queue.poll(ITERATOR_POLL_TIMEOUT,
ITERATOR_POLL_TIMEUNIT);
+ }
+ catch (InterruptedException e)
+ {
+ // Someone wants us to finish I guess
+ return false;
+ }
+
+ if (null != slot) break;
+
+ // put this down here so we can drain as much as possible from the
queue before rethrowing the exception
+ if (null != uncaughtException)
+ {
+ finished = true;
+ // Don't throw an exception if cancellation was requested
+ if (uncaughtException instanceof CancellationException)
+ {
+ return false;
+ }
+ else
+ {
+ throw uncaughtException;
+ }
+ }
+ }
+
+ if ( slot == endMarker )
+ {
+ finished = true ;
+ slot = null ;
+ return false ;
+ }
+ return true ;
+ }
+
+ @Override
+ public T next()
+ {
+ if ( ! hasNext() )
+ throw new NoSuchElementException() ;
+ T item = slot ;
+ slot = null ;
+ return item ;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException() ;
+ }
+
+
+}
+
Added:
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotQuadParsePuller.java
URL:
http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotQuadParsePuller.java?rev=1305184&view=auto
==============================================================================
---
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotQuadParsePuller.java
(added)
+++
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotQuadParsePuller.java
Mon Mar 26 00:31:06 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openjena.riot;
+
+import java.io.InputStream ;
+
+import org.openjena.atlas.lib.Sink ;
+import org.openjena.riot.lang.LangRIOT ;
+
+import com.hp.hpl.jena.sparql.core.Quad ;
+
+/**
+ * A RiotParsePuller that operates on Quads.
+ *
+ * @see RiotParsePuller
+ */
+public class RiotQuadParsePuller extends RiotParsePuller<Quad>
+{
+ public RiotQuadParsePuller(InputStream in, Lang lang, String baseIRI)
+ {
+ super(in, lang, baseIRI) ;
+ }
+
+ @Override
+ protected LangRIOT createParser(Sink<Quad> sink)
+ {
+ return RiotReader.createParserQuads(in, lang, baseIRI, sink) ;
+ }
+}
+
Added:
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotTripleParsePuller.java
URL:
http://svn.apache.org/viewvc/incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotTripleParsePuller.java?rev=1305184&view=auto
==============================================================================
---
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotTripleParsePuller.java
(added)
+++
incubator/jena/Jena2/ARQ/trunk/src/main/java/org/openjena/riot/RiotTripleParsePuller.java
Mon Mar 26 00:31:06 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.openjena.riot;
+
+import java.io.InputStream ;
+
+import org.openjena.atlas.lib.Sink ;
+import org.openjena.riot.lang.LangRIOT ;
+
+import com.hp.hpl.jena.graph.Triple ;
+
+/**
+ * A RiotParsePuller that operates on Triples.
+ *
+ * @see RiotParsePuller
+ */
+public class RiotTripleParsePuller extends RiotParsePuller<Triple>
+{
+ public RiotTripleParsePuller(InputStream in, Lang lang, String baseIRI)
+ {
+ super(in, lang, baseIRI) ;
+ }
+
+ @Override
+ protected LangRIOT createParser(Sink<Triple> sink)
+ {
+ return RiotReader.createParserTriples(in, lang, baseIRI, sink) ;
+ }
+}
+