Hi all, According to the RIOT docs, iterating over triples/quads with piped streams requires separate threads for producer/consumer.
For some applications this isn't practical. In my case I am running a Hadoop job on N-Triples datasets, so I am parsing one triple at a time. The overhead and extra code complexity of kicking off a thread to parse each triple are too high, and this may be true for other use cases involving small datasets. I wrote some StreamRDF implementations which store the results in Java Collections, so that parsing can be run on a single thread. Attached is a patch with the implementations, tests and an example (I borrowed the term 'Collector' from Apache Lucene). But I now suspect that I've overlooked some simple existing API call to do this. Any feedback appreciated. Regards, Barry
Index: src-examples/arq/examples/riot/ExRIOT_7.java =================================================================== --- src-examples/arq/examples/riot/ExRIOT_7.java (revision 0) +++ src-examples/arq/examples/riot/ExRIOT_7.java (working copy) @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package arq.examples.riot; + +import org.apache.jena.riot.RDFDataMgr; +import org.apache.jena.riot.lang.CollectorStreamBase; +import org.apache.jena.riot.lang.CollectorStreamTriples; + +import com.hp.hpl.jena.graph.Triple; + +/** + * Example of using RIOT for streaming RDF to be stored into a Collection. + * + * Suitable for single-threaded parsing, for use with small data or distributed + * computing frameworks (e.g. Hadoop) where the overhead of creating many threads + * is significant. + * + * @see CollectorStreamBase + */ +public class ExRIOT_7 { + + public static void main(String... 
argv) { + final String filename = "data.ttl"; + + CollectorStreamTriples inputStream = new CollectorStreamTriples(); + RDFDataMgr.parse(inputStream, filename); + + for (Triple triple : inputStream.getCollected()) { + System.out.println(triple); + } + } + +} Index: src/main/java/org/apache/jena/riot/lang/CollectorStreamBase.java =================================================================== --- src/main/java/org/apache/jena/riot/lang/CollectorStreamBase.java (revision 0) +++ src/main/java/org/apache/jena/riot/lang/CollectorStreamBase.java (working copy) @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.riot.lang; + +import java.util.Collection; + +import org.apache.jena.atlas.lib.Tuple; +import org.apache.jena.riot.system.PrefixMap; +import org.apache.jena.riot.system.PrefixMapFactory; +import org.apache.jena.riot.system.StreamRDF; + +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * Base class for StreamRDF implementations which store received <T> + * objects in a {@link java.util.Collection}. + * + * The resulting collection can be retrieved via the {@link #getCollected()} + * method. 
+ * + * Implementations are suitable for single-threaded parsing, for use with small + * data or distributed computing frameworks (e.g. Hadoop) where the overhead + * of creating many threads is significant. + * + * @param <T> Type of the value stored in the collection + */ +public abstract class CollectorStreamBase<T> implements StreamRDF { + private final PrefixMap prefixes = PrefixMapFactory.createForInput(); + private String baseIri; + + @Override + public void finish() {} + + @Override + public void triple(Triple triple) {} + + @Override + public void tuple(Tuple<Node> tuple) {} + + @Override + public void quad(Quad quad) {} + + @Override + public void start() {} + + @Override + public void base(String base) { + this.baseIri = base; + } + + @Override + public void prefix(String prefix, String iri) { + prefixes.add(prefix, iri); + } + + public PrefixMap getPrefixes() { + return prefixes; + } + + public String getBaseIri() { + return baseIri; + } + + /** + * @return The collection of items this instance has accumulated. + */ + public abstract Collection<T> getCollected(); +} Index: src/main/java/org/apache/jena/riot/lang/CollectorStreamTuples.java =================================================================== --- src/main/java/org/apache/jena/riot/lang/CollectorStreamTuples.java (revision 0) +++ src/main/java/org/apache/jena/riot/lang/CollectorStreamTuples.java (working copy) @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.riot.lang; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.jena.atlas.lib.Tuple; +import org.apache.jena.riot.system.StreamRDF; + +import com.hp.hpl.jena.graph.Node; + +/** + * Collector stream for tuples. + * + * @see CollectorStreamBase + */ +public class CollectorStreamTuples extends CollectorStreamBase<Tuple<Node>> implements StreamRDF { + private List<Tuple<Node>> tuples = new ArrayList<Tuple<Node>>(); + + @Override + public void start() { + tuples.clear(); + } + + @Override + public void tuple(Tuple<Node> tuple) { + tuples.add(tuple); + } + + @Override + public Collection<Tuple<Node>> getCollected() { + return tuples; + } +} Index: src/main/java/org/apache/jena/riot/lang/CollectorStreamQuads.java =================================================================== --- src/main/java/org/apache/jena/riot/lang/CollectorStreamQuads.java (revision 0) +++ src/main/java/org/apache/jena/riot/lang/CollectorStreamQuads.java (working copy) @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.riot.lang; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.jena.riot.system.StreamRDF; + +import com.hp.hpl.jena.sparql.core.Quad; + +/** + * Collector stream for quads: each quad received is appended to a list, which start() clears. + * + * @see CollectorStreamBase + */ +public class CollectorStreamQuads extends CollectorStreamBase<Quad> implements StreamRDF { + private List<Quad> quads = new ArrayList<Quad>(); + + @Override + public void start() { + quads.clear(); + } + + @Override + public void quad(Quad quad) { + quads.add(quad); + } + + @Override + public Collection<Quad> getCollected() { + return quads; + } +} Index: src/main/java/org/apache/jena/riot/lang/CollectorStreamTriples.java =================================================================== --- src/main/java/org/apache/jena/riot/lang/CollectorStreamTriples.java (revision 0) +++ src/main/java/org/apache/jena/riot/lang/CollectorStreamTriples.java (working copy) @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.riot.lang; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.jena.riot.system.StreamRDF; + +import com.hp.hpl.jena.graph.Triple; + +/** + * Collector stream for triples. + * + * @see CollectorStreamBase + */ +public class CollectorStreamTriples extends CollectorStreamBase<Triple> implements StreamRDF { + private List<Triple> triples = new ArrayList<Triple>(); + + @Override + public void start() { + triples.clear(); + } + + @Override + public void triple(Triple triple) { + triples.add(triple); + } + + @Override + public Collection<Triple> getCollected() { + return triples; + } +} Index: src/test/java/org/apache/jena/riot/lang/TestCollectorStream.java =================================================================== --- src/test/java/org/apache/jena/riot/lang/TestCollectorStream.java (revision 0) +++ src/test/java/org/apache/jena/riot/lang/TestCollectorStream.java (working copy) @@ -0,0 +1,83 @@ +package org.apache.jena.riot.lang; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.jena.atlas.lib.Tuple; +import org.apache.jena.riot.system.StreamRDF; +import org.junit.Assert; +import org.junit.Test; + +import com.hp.hpl.jena.graph.Node; +import com.hp.hpl.jena.graph.Triple; +import com.hp.hpl.jena.sparql.core.Quad; +import com.hp.hpl.jena.sparql.util.NodeFactoryExtra; + +public class TestCollectorStream { + + private List<Triple> writeTriples(StreamRDF out, int size) { + List<Triple> results = new ArrayList<Triple>(); + out.start(); + for (int 
i = 1; i <= size; i++) { + Triple t = new Triple(com.hp.hpl.jena.graph.NodeFactory.createAnon(), + com.hp.hpl.jena.graph.NodeFactory.createURI("http://predicate"), NodeFactoryExtra.intToNode(i)); + out.triple(t); + results.add(t); + } + out.finish(); + return results; + } + + @Test + public void test_streamed_triples() { + CollectorStreamTriples out = new CollectorStreamTriples(); + List<Triple> expected = writeTriples(out, 10); + + Assert.assertEquals(expected, out.getCollected()); + } + + + private List<Quad> writeQuads(StreamRDF out, int size) { + List<Quad> results = new ArrayList<Quad>(); + out.start(); + for (int i = 1; i <= size; i++) { + Quad q = new Quad(com.hp.hpl.jena.graph.NodeFactory.createURI("http://graph"), + com.hp.hpl.jena.graph.NodeFactory.createAnon(), + com.hp.hpl.jena.graph.NodeFactory.createURI("http://predicate"), NodeFactoryExtra.intToNode(i)); + out.quad(q); + results.add(q); + } + out.finish(); + return results; + } + + @Test + public void test_streamed_quads() { + CollectorStreamQuads out = new CollectorStreamQuads(); + List<Quad> expected = writeQuads(out, 10); + + Assert.assertEquals(expected, out.getCollected()); + } + + private List<Tuple<Node>> writeTuples(StreamRDF out, int size) { + List<Tuple<Node>> results = new ArrayList<Tuple<Node>>(); + out.start(); + for (int i = 1; i <= size; i++) { + Tuple<Node> t = Tuple.createTuple(com.hp.hpl.jena.graph.NodeFactory.createURI("http://graph"), + com.hp.hpl.jena.graph.NodeFactory.createAnon(), + com.hp.hpl.jena.graph.NodeFactory.createURI("http://predicate"), NodeFactoryExtra.intToNode(i)); + out.tuple(t); + results.add(t); + } + out.finish(); + return results; + } + + @Test + public void test_streamed_tuples() { + CollectorStreamTuples out = new CollectorStreamTuples(); + List<Tuple<Node>> expected = writeTuples(out, 10); + + Assert.assertEquals(expected, out.getCollected()); + } +}
