RYA-377 Update the Projection processor to handle Construct queries.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/538393fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/538393fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/538393fe Branch: refs/heads/master Commit: 538393fe2dc303f14fa87beea7e777b9d6be717b Parents: 83d09f4 Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Wed Nov 15 19:46:47 2017 -0500 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- .../api/function/projection/BNodeIdFactory.java | 34 +++ .../projection/MultiProjectionEvaluator.java | 130 +++++++++++ .../projection/ProjectionEvaluator.java | 187 ++++++++++++++++ .../function/projection/RandomUUIDFactory.java | 36 +++ .../MultiProjectionEvaluatorTest.java | 193 ++++++++++++++++ .../projection/ProjectionEvaluatorTest.java | 221 +++++++++++++++++++ .../rya/api/model/VisibilityStatement.java | 5 + .../kafka/processors/ProcessorResult.java | 48 ++++ .../StatementPatternProcessorSupplier.java | 2 + .../MultiProjectionProcessorSupplier.java | 119 ++++++++++ .../projection/ProjectionProcessorSupplier.java | 58 ++--- .../apache/rya/streams/kafka/RdfTestUtil.java | 46 ++++ .../kafka/processors/ProjectionProcessorIT.java | 152 ------------- .../projection/MultiProjectionProcessorIT.java | 155 +++++++++++++ .../MultiProjectionProcessorTest.java | 119 ++++++++++ .../projection/ProjectionProcessorIT.java | 115 ++++++++++ .../projection/ProjectionProcessorTest.java | 82 +++++++ 17 files changed, 1516 insertions(+), 186 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/BNodeIdFactory.java ---------------------------------------------------------------------- diff --git 
a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/BNodeIdFactory.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/BNodeIdFactory.java new file mode 100644 index 0000000..99f545a --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/BNodeIdFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.projection; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Creates the IDs that are assigned to Blank Nodes. + */ +@DefaultAnnotation(NonNull.class) +public interface BNodeIdFactory { + + /** + * @return The ID to use for the next Blank Node. (not null) 
+ */ + public String nextId(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java new file mode 100644 index 0000000..e2b7046 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/MultiProjectionEvaluator.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.function.projection; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.BNode; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.BNodeGenerator; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.TupleExpr; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Processes a {@link MultiProjection} node from a SPARQL query. + * + * @see ProjectionEvaluator + */ +@DefaultAnnotation(NonNull.class) +public class MultiProjectionEvaluator { + + private final ValueFactory vf = new ValueFactoryImpl(); + + private final Set<ProjectionEvaluator> projections; + private final Set<String> blankNodeSourceNames; + private final BNodeIdFactory bNodeIdFactory; + + /** + * Constructs an instance of {@link MultiProjectionEvaluator}. + * + * @param projections - The {@link ProjectionEvaluator}s that handle each projection within the multi. (not null) + * @param blankNodeSourceNames - If there are blank nodes in the projection, this is a set of their names + * so that they may be re-labeled to have the same node IDs. (not null) + * @param bNodeIdFactory - Creates the IDs for Blank Nodes. 
(not null) + */ + public MultiProjectionEvaluator( + final Set<ProjectionEvaluator> projections, + final Set<String> blankNodeSourceNames, + final BNodeIdFactory bnodeIdFactory) { + this.projections = requireNonNull(projections); + this.blankNodeSourceNames = requireNonNull(blankNodeSourceNames); + this.bNodeIdFactory = requireNonNull(bnodeIdFactory); + } + + /** + * Make a {@link MultiProjectionEvaluator} that processes the logic of a {@link MultiProjection}. + * + * @param multiProjection - Defines the projections that will be processed. (not null) + * @param bNodeIdFactory - Creates the IDs for Blank Nodes. (not null) + * @return A {@link MultiProjectionEvaluator} for the provided {@link MultiProjection}. + */ + public static MultiProjectionEvaluator make(final MultiProjection multiProjection, final BNodeIdFactory bNodeIdFactory) { + requireNonNull(multiProjection); + + // Figure out if there are extensions. + final TupleExpr arg = multiProjection.getArg(); + final Optional<Extension> extension = (arg instanceof Extension) ? Optional.of((Extension)arg): Optional.empty(); + + // If there are, iterate through them and find any blank node source names. + final Set<String> blankNodeSourceNames = new HashSet<>(); + if(extension.isPresent()) { + for(final ExtensionElem elem : extension.get().getElements()) { + if(elem.getExpr() instanceof BNodeGenerator) { + blankNodeSourceNames.add( elem.getName() ); + } + } + } + + // Create a ProjectionEvaluator for each projection that is part of the multi. + final Set<ProjectionEvaluator> projections = new HashSet<>(); + for(final ProjectionElemList projectionElemList : multiProjection.getProjections()) { + projections.add( new ProjectionEvaluator(projectionElemList, extension) ); + } + + return new MultiProjectionEvaluator(projections, blankNodeSourceNames, bNodeIdFactory); + } + + /** + * Apply the projections against a {@link VisibilityBindingSet}. + * + * @param bs - The value the projection will be applied to. 
(not null) + * @return A set of values that result from the projection. + */ + public Set<VisibilityBindingSet> project(final VisibilityBindingSet bs) { + requireNonNull(bs); + + // Generate an ID for each blank node that will appear in the results. + final Map<String, BNode> blankNodes = new HashMap<>(); + for(final String blankNodeSourceName : blankNodeSourceNames) { + blankNodes.put(blankNodeSourceName, vf.createBNode(bNodeIdFactory.nextId())); + } + + // Iterate through each of the projections and create the results from them. + final Set<VisibilityBindingSet> results = new HashSet<>(); + for(final ProjectionEvaluator projection : projections) { + results.add( projection.project(bs, blankNodes) ); + } + + return results; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java new file mode 100644 index 0000000..a0b59c1 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/ProjectionEvaluator.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.projection; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.BNode; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.BNodeGenerator; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.impl.MapBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Processes a {@link Projection} node from a SPARQL query. + * <p> + * A projection is used to transform the bindings that are in a Binding Set. 
It may do the following things: + * <ul> + * <li>Change the binding name for a value.</li> + * <li>Completely remove a binding from the Binding Set.</li> + * <li>Insert a binding that has a constant value.</li> + * <li>Insert a binding with a blank node that identifies some resource.</li> + * </ul> + * <p> + * If you do not supply ID values for the blank nodes that may result from a projection, then a random {@link UUID} + * is used. + */ +@DefaultAnnotation(NonNull.class) +public class ProjectionEvaluator { + + private final ValueFactory vf = new ValueFactoryImpl(); + + /** + * All of the projection elements that define what will appear in the resulting binding sets. + */ + private final ProjectionElemList projectionElems; + + /** + * Maps from a ProjectionElem's source name to the constant value that should be used for that name + * in resulting binding sets. + */ + private final Map<String, Value> constantSources = new HashMap<>(); + + /** + * A set of names for the anonymous source names. These values will be blank node UUIDs. + */ + private final Set<String> anonymousSources = new HashSet<>(); + + /** + * Constructs an instance of {@link ProjectionEvaluator}. + * + * @param projectionElems - Defines the structure of the resulting value. (not null) + * @param extensions - Extra information about the projection elements when there are anonymous constants or blank + * nodes within the projection elements. (not null) + */ + public ProjectionEvaluator(final ProjectionElemList projectionElems, final Optional<Extension> extensions) { + this.projectionElems = requireNonNull(projectionElems); + requireNonNull(extensions); + + // Find all extensions that represent constant insertions. + if(extensions.isPresent()) { + for(final ExtensionElem extensionElem : extensions.get().getElements()) { + final ValueExpr valueExpr = extensionElem.getExpr(); + + // If the extension is a ValueConstant, store it so that they may be added to the binding sets. 
+ if(valueExpr instanceof ValueConstant) { + final String sourceName = extensionElem.getName(); + final Value targetValue = ((ValueConstant) valueExpr).getValue(); + constantSources.put(sourceName, targetValue); + } + + // If the extension is a BNodeGenerator, keep track of the name so that we know we have to generate an ID for it. + else if(valueExpr instanceof BNodeGenerator) { + final String sourceName = extensionElem.getName(); + anonymousSources.add( sourceName ); + } + } + } + } + + /** + * Make a {@link ProjectionEvaluator} that processes the logic of a {@link Projection}. + * + * @param projection - Defines the projection that will be processed. (not null) + * @return A {@link ProjectionEvaluator} for the provided {@link Projection}. + */ + public static ProjectionEvaluator make(final Projection projection) { + requireNonNull(projection); + + final ProjectionElemList projectionElems = projection.getProjectionElemList(); + + final TupleExpr arg = projection.getArg(); + final Optional<Extension> extension = arg instanceof Extension ? Optional.of((Extension)arg) : Optional.empty(); + + return new ProjectionEvaluator(projectionElems, extension); + } + + /** + * Applies the projection to a value. If the result has any blank nodes, those nodes will use random UUIDs. + * If you want to control what those IDs are, then use {@link #project(VisibilityBindingSet, Map)} instead. + * + * @param bs - The value the projection will be applied to. (not null) + * @return A new value that is the result of the projection. + */ + public VisibilityBindingSet project(final VisibilityBindingSet bs) { + return project(bs, new HashMap<>()); + } + + /** + * Applies the projection to a value. If the result has a blank node whose source name is not mapped to a value in + * {@code blankNodes}, then a random UUID will be used. + * + * @param bs - The value the projection will be applied to. 
(not null) + * @param blankNodes - A map from node source names to the blank nodes that will be used for those names. (not null) + * @return A new value that is the result of the projection. + */ + public VisibilityBindingSet project(final VisibilityBindingSet bs, final Map<String, BNode> blankNodes) { + requireNonNull(bs); + requireNonNull(blankNodes); + + // Apply the projection elements against the original binding set. NOTE(review): if none of the branches below match, value is still null when it is added to the result - confirm MapBindingSet tolerates a null value. + final MapBindingSet result = new MapBindingSet(); + for (final ProjectionElem elem : projectionElems.getElements()) { + final String sourceName = elem.getSourceName(); + + Value value = null; + + // If the binding set already has the source name, then use its value for the target name. + if (bs.hasBinding(sourceName)) { + value = bs.getValue(elem.getSourceName()); + } + + // If the source name represents a constant value, then use the constant. + else if(constantSources.containsKey(sourceName)) { + value = constantSources.get(sourceName); + } + + // If the source name represents an anonymous value, then create a Blank Node. 
+ else if(anonymousSources.contains(sourceName)) { + if(blankNodes.containsKey(sourceName)) { + value = blankNodes.get(sourceName); + } else { + value = vf.createBNode( UUID.randomUUID().toString() ); + } + } + + result.addBinding(elem.getTargetName(), value); + } + + return new VisibilityBindingSet(result, bs.getVisibility()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/RandomUUIDFactory.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/RandomUUIDFactory.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/RandomUUIDFactory.java new file mode 100644 index 0000000..59740b0 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/projection/RandomUUIDFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.function.projection; + +import java.util.UUID; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link BNodeIdFactory} whose Blank Node IDs are random {@link UUID}s. + */ +@DefaultAnnotation(NonNull.class) +public class RandomUUIDFactory implements BNodeIdFactory { + + @Override + public String nextId() { + return UUID.randomUUID().toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/common/rya.api.function/src/test/java/org/apache/rya/api/function/projection/MultiProjectionEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/test/java/org/apache/rya/api/function/projection/MultiProjectionEvaluatorTest.java b/common/rya.api.function/src/test/java/org/apache/rya/api/function/projection/MultiProjectionEvaluatorTest.java new file mode 100644 index 0000000..c02e87b --- /dev/null +++ b/common/rya.api.function/src/test/java/org/apache/rya/api/function/projection/MultiProjectionEvaluatorTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.function.projection; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertEquals; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.BNode; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Unit tests the methods of {@link MultiProjectionEvaluator}. + */ +public class MultiProjectionEvaluatorTest { + + @Test + public void singleBlankNode() throws Exception { + // Read the multi projection object from a SPARQL query. + final MultiProjection multiProjection = getMultiProjection( + "CONSTRUCT {" + + "_:b a <urn:movementObservation> ; " + + "<urn:location> ?location ; " + + "<urn:direction> ?direction ; " + + "}" + + "WHERE {" + + "?thing <urn:corner> ?location ." + + "?thing <urn:compass> ?direction." + + "}"); + + // Create a Binding Set that contains the result of the WHERE clause. + final ValueFactory vf = new ValueFactoryImpl(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("location", vf.createLiteral("South St and 5th St")); + bs.addBinding("direction", vf.createLiteral("NW")); + final VisibilityBindingSet original = new VisibilityBindingSet(bs, "a|b"); + + // Create the expected results. 
+ final Set<VisibilityBindingSet> expected = new HashSet<>(); + final String blankNodeId = UUID.randomUUID().toString(); + final BNode blankNode = vf.createBNode(blankNodeId); + + bs = new MapBindingSet(); + bs.addBinding("subject", blankNode); + bs.addBinding("predicate", RDF.TYPE); + bs.addBinding("object", vf.createURI("urn:movementObservation")); + expected.add( new VisibilityBindingSet(bs, "a|b") ); + + bs = new MapBindingSet(); + bs.addBinding("subject", blankNode); + bs.addBinding("predicate", vf.createURI("urn:location")); + bs.addBinding("object", vf.createLiteral("South St and 5th St")); + expected.add( new VisibilityBindingSet(bs, "a|b") ); + + bs = new MapBindingSet(); + bs.addBinding("subject", blankNode); + bs.addBinding("predicate", vf.createURI("urn:direction")); + bs.addBinding("object", vf.createLiteral("NW")); + expected.add( new VisibilityBindingSet(bs, "a|b") ); + + // Run the projection evaluator. + final Set<VisibilityBindingSet> results = MultiProjectionEvaluator.make(multiProjection, () -> blankNodeId).project(original); + + // Verify the results match the expected binding sets. + assertEquals(expected, results); + } + + @Test + public void multipleBlanknodes() throws Exception { + // Read the multi projection object from a SPARQL query. + final MultiProjection multiProjection = getMultiProjection( + "CONSTRUCT {" + + "_:b a <urn:vehicle> . " + + "_:b <urn:tiresCount> 4 ." + + "_:c a <urn:pet> . " + + "_:c <urn:isDead> false . " + + "}" + + "WHERE {" + + "?vehicle <urn:owner> ?owner . " + + "?vehicle <urn:plates> ?plates . " + + "?pet <urn:owner> ?owner . " + + "?pet <urn:isLiving> true . " + + "}"); + + // Create a Binding Set that contains the result of the WHERE clause. 
+ final ValueFactory vf = new ValueFactoryImpl(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("vehicle", vf.createLiteral("Alice's car")); + bs.addBinding("owner", vf.createURI("urn:Alice")); + bs.addBinding("plates", vf.createLiteral("XXXXXXX")); + bs.addBinding("pet", vf.createURI("urn:Kitty")); + final VisibilityBindingSet original = new VisibilityBindingSet(bs, "a|b"); + + // Run the projection evaluator. + final Set<VisibilityBindingSet> results = MultiProjectionEvaluator.make(multiProjection, new RandomUUIDFactory()).project(original); + + // Figure out the blank nodes. + Value vehicalBNode = null; + Value petBNode = null; + for(final VisibilityBindingSet result : results) { + final Value object = result.getValue("object"); + if(object.equals(vf.createURI("urn:vehicle"))) { + vehicalBNode = result.getValue("subject"); + } else if(object.equals(vf.createURI("urn:pet"))) { + petBNode = result.getValue("subject"); + } + } + + // The expected binding sets. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + + bs = new MapBindingSet(); + bs.addBinding("subject", vehicalBNode); + bs.addBinding("predicate", RDF.TYPE); + bs.addBinding("object", vf.createURI("urn:vehicle")); + expected.add( new VisibilityBindingSet(bs, "a|b") ); + + bs = new MapBindingSet(); + bs.addBinding("subject", vehicalBNode); + bs.addBinding("predicate", vf.createURI("urn:tiresCount")); + bs.addBinding("object", vf.createLiteral("4", XMLSchema.INTEGER)); + expected.add( new VisibilityBindingSet(bs, "a|b") ); + + bs = new MapBindingSet(); + bs.addBinding("subject", petBNode); + bs.addBinding("predicate", RDF.TYPE); + bs.addBinding("object", vf.createURI("urn:pet")); + expected.add( new VisibilityBindingSet(bs, "a|b") ); + + bs = new MapBindingSet(); + bs.addBinding("subject", petBNode); + bs.addBinding("predicate", vf.createURI("urn:isDead")); + bs.addBinding("object", vf.createLiteral(false)); + expected.add( new VisibilityBindingSet(bs, "a|b") ); + + 
assertEquals(expected, results); + } + + /** + * Get the first {@link MultiProjection} node from a SPARQL query. + * + * @param sparql - The query that contains a single MultiProjection node. (not null) + * @return The first {@link MultiProjection} that is encountered, or {@code null} if there is none. + * @throws Exception The query could not be parsed. + */ + public static @Nullable MultiProjection getMultiProjection(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<MultiProjection> multiProjection = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final MultiProjection node) throws Exception { + multiProjection.set(node); + } + }); + + return multiProjection.get(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/common/rya.api.function/src/test/java/org/apache/rya/api/function/projection/ProjectionEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/test/java/org/apache/rya/api/function/projection/ProjectionEvaluatorTest.java b/common/rya.api.function/src/test/java/org/apache/rya/api/function/projection/ProjectionEvaluatorTest.java new file mode 100644 index 0000000..e4a26a0 --- /dev/null +++ b/common/rya.api.function/src/test/java/org/apache/rya/api/function/projection/ProjectionEvaluatorTest.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.projection; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Unit tests the methods of {@link ProjectionEvaluator}. + */ +public class ProjectionEvaluatorTest { + + /** + * This Projection enumerates all of the variables that were in the query, none of them are anonymous, and + * none of them insert constants. + */ + @Test + public void changesNothing() throws Exception { + // Read the projection object from a SPARQL query. + final Projection projection = getProjection( + "SELECT ?person ?employee ?business " + + "WHERE { " + + "?person <urn:talksTo> ?employee . " + + "?employee <urn:worksAt> ?business . " + + "}"); + + // Create a Binding Set that contains the result of the WHERE clause. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + final VisibilityBindingSet original = new VisibilityBindingSet(bs, "a|b"); + + // Execute the projection. + final VisibilityBindingSet result = ProjectionEvaluator.make(projection).project(original); + assertEquals(original, result); + } + + /** + * This Projection replaces some of the variables names with different names. + */ + @Test + public void renameBindings() throws Exception { + // Read the projection object from a SPARQL query. + final Projection projection = getProjection( + "SELECT (?person AS ?p) (?employee AS ?e) ?business " + + "WHERE { " + + "?person <urn:talksTo> ?employee . " + + "?employee <urn:worksAt> ?business . " + + "}"); + + // Create a Binding Set that contains the result of the WHERE clause. + final ValueFactory vf = new ValueFactoryImpl(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + final VisibilityBindingSet original = new VisibilityBindingSet(bs, "a|b"); + + // The expected binding set changes the "person" binding name to "p" and "employee" to "e". + bs = new MapBindingSet(); + bs.addBinding("p", vf.createURI("urn:Alice")); + bs.addBinding("e", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + final VisibilityBindingSet expected = new VisibilityBindingSet(bs, "a|b"); + + // Execute the projection. + final VisibilityBindingSet result = ProjectionEvaluator.make(projection).project(original); + assertEquals(expected, result); + } + + /** + * This projection drops a binding from the original Binding Set. 
+ */ + @Test + public void dropsBinding() throws Exception { + // Read the projection object from a SPARQL query. + final Projection projection = getProjection( + "SELECT ?person " + + "WHERE { " + + "?person <urn:talksTo> ?employee . " + + "?employee <urn:worksAt> ?business . " + + "}"); + + // Create a Binding Set that contains the result of the WHERE clause. + final ValueFactory vf = new ValueFactoryImpl(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + final VisibilityBindingSet original = new VisibilityBindingSet(bs, "a|b"); + + // The expected binding set only has the "person" binding. + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + final VisibilityBindingSet expected = new VisibilityBindingSet(bs, "a|b"); + + // Execute the projection. + final VisibilityBindingSet result = ProjectionEvaluator.make(projection).project(original); + assertEquals(expected, result); + } + + /** + * This projection creates a Binding Set that represents a Statement and add a constant value to it. + */ + @Test + public void addsConstantBinding() throws Exception { + // Read the projection object from a SPARQL query. + final Projection projection = getProjection( + "CONSTRUCT { ?person <urn:hasGrandchild> ?grandchild } " + + "WHERE {" + + "?person <urn:hasChild> ?child ." + + "?child <urn:hasChild> ?grandchild . " + + "}"); + + // Create a Binding Set that contains the result of the WHERE clause. + final ValueFactory vf = new ValueFactoryImpl(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("child", vf.createURI("urn:Bob")); + bs.addBinding("grandchild", vf.createURI("urn:Charlie")); + final VisibilityBindingSet original = new VisibilityBindingSet(bs, "a|b"); + + // The expected binding set represents a statement. 
+ bs = new MapBindingSet(); + bs.addBinding("subject", vf.createURI("urn:Alice")); + bs.addBinding("predicate", vf.createURI("urn:hasGrandchild")); + bs.addBinding("object", vf.createURI("urn:Charlie")); + final VisibilityBindingSet expected = new VisibilityBindingSet(bs, "a|b"); + + // Execute the projection. + final VisibilityBindingSet result = ProjectionEvaluator.make(projection).project(original); + assertEquals(expected, result); + } + + /** + * This projection creates a Binding Set that represents a Statement that has a blank node added to it. + */ + @Test + public void addsBlankNodeBinding() throws Exception { + // Read the projection object from a SPARQL query. + final Projection projection = getProjection( + "CONSTRUCT { ?person <urn:hasChild> _:b } " + + "WHERE {" + + "?person <urn:hasGrandchild> ?grandchild ." + + "}"); + + // Create a Binding Set that contains the result of the WHERE clause. + final ValueFactory vf = new ValueFactoryImpl(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("hasGrandchild", vf.createURI("urn:Bob")); + final VisibilityBindingSet original = new VisibilityBindingSet(bs, "a|b"); + + // Execute the projection. + final VisibilityBindingSet result = ProjectionEvaluator.make(projection).project(original); + + // The expected binding set represents a statement. We need to get the blank node's id from the + // result since that is different every time. + bs = new MapBindingSet(); + bs.addBinding("subject", vf.createURI("urn:Alice")); + bs.addBinding("predicate", vf.createURI("urn:hasChild")); + bs.addBinding("object", result.getValue("object")); + final VisibilityBindingSet expected = new VisibilityBindingSet(bs, "a|b"); + + assertEquals(expected, result); + } + + /** + * Get the first {@link Projection} node from a SPARQL query. + * + * @param sparql - The query that contains a single Projection node. + * @return The first {@link Projection} that is encountered. 
+ * @throws Exception The query could not be parsed. + */ + public static @Nullable Projection getProjection(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<Projection> projection = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final Projection node) throws Exception { + projection.set(node); + } + }); + + return projection.get(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/common/rya.api.model/src/main/java/org/apache/rya/api/model/VisibilityStatement.java ---------------------------------------------------------------------- diff --git a/common/rya.api.model/src/main/java/org/apache/rya/api/model/VisibilityStatement.java b/common/rya.api.model/src/main/java/org/apache/rya/api/model/VisibilityStatement.java index 21e60d8..ffe95d7 100644 --- a/common/rya.api.model/src/main/java/org/apache/rya/api/model/VisibilityStatement.java +++ b/common/rya.api.model/src/main/java/org/apache/rya/api/model/VisibilityStatement.java @@ -84,4 +84,9 @@ public class VisibilityStatement extends StatementDecorator { public int hashCode() { return Objects.hash(visibility, super.getStatement()); } + + @Override + public String toString() { + return "Statement: " + super.getStatement().toString() + ", Visibility: " + visibility; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java index ac3d849..5f7a06b 100644 
--- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java @@ -21,6 +21,8 @@ package org.apache.rya.streams.kafka.processors; import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; +import java.util.Objects; + import org.apache.kafka.streams.processor.Processor; import org.apache.rya.api.model.VisibilityBindingSet; @@ -92,6 +94,23 @@ public class ProcessorResult { return binary.get(); } + @Override + public int hashCode() { + return Objects.hash(type, unary, binary); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof ProcessorResult) { + final ProcessorResult other = (ProcessorResult) o; + return Objects.equals(type, other.type) && + Objects.equals(unary, other.unary) && + Objects.equals(binary, other.binary); + } + return false; + } + + /** * Creates a {@link ProcessorResult} using the supplied value. * @@ -151,6 +170,20 @@ public class ProcessorResult { public VisibilityBindingSet getResult() { return result; } + + @Override + public int hashCode() { + return Objects.hash(result); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof UnaryResult) { + final UnaryResult other = (UnaryResult) o; + return Objects.equals(result, other.result); + } + return false; + } } /** @@ -186,6 +219,21 @@ public class ProcessorResult { return result; } + @Override + public int hashCode() { + return Objects.hash(side, result); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof BinaryResult) { + final BinaryResult other = (BinaryResult) o; + return Objects.equals(side, other.side) && + Objects.equals(result, other.result); + } + return false; + } + /** * A label that is used to by the downstream binary prcoessor to distinguish which upstream processor * produced the {@link BinaryResult}. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java index bc99a7b..f7c2e5e 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java @@ -97,6 +97,8 @@ public class StatementPatternProcessorSupplier implements ProcessorSupplier<Stri @Override public void process(final String key, final VisibilityStatement statement) { + log.debug("\nINPUT:\n{}\n", statement); + // Check to see if the Statement matches the Statement Pattern. final Optional<BindingSet> bs = spMatcher.match(statement); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorSupplier.java new file mode 100644 index 0000000..68aec0a --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorSupplier.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.projection; + +import static java.util.Objects.requireNonNull; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.rya.api.function.projection.MultiProjectionEvaluator; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.ResultType; +import org.apache.rya.streams.kafka.processors.ProcessorResultFactory; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Supplies {@link MultiProjectionProcessor} instances. + */ +@DefaultAnnotation(NonNull.class) +public class MultiProjectionProcessorSupplier extends RyaStreamsProcessorSupplier { + + private final MultiProjectionEvaluator multiProjection; + + /** + * Constructs an instance of {@link MultiProjectionProcessorSupplier}. 
+ * + * @param multiProjection - Defines the MultiProjection work that will be performed by supplied processors. (not null) + * @param resultFactory - The factory that the supplied processors will use to create results. (not null) + */ + public MultiProjectionProcessorSupplier( + final MultiProjectionEvaluator multiProjection, + final ProcessorResultFactory resultFactory) { + super(resultFactory); + this.multiProjection = requireNonNull(multiProjection); + } + + @Override + public Processor<Object, ProcessorResult> get() { + return new MultiProjectionProcessor(multiProjection, super.getResultFactory()); + } + + /** + * Evaluates {@link ProcessorResult}s against a {@link MultiProjectionEvaluator} and forwards its results + * to a downstream processor. + */ + @DefaultAnnotation(NonNull.class) + public static final class MultiProjectionProcessor extends RyaStreamsProcessor { + private static final Logger log = LoggerFactory.getLogger(MultiProjectionProcessor.class); + + private final MultiProjectionEvaluator multiProjection; + + private ProcessorContext context; + + /** + * Constructs an instance of {@link MultiProjectionProcessor}. + * + * @param multiProjection - Defines the MultiProjection work that will be performed by this processor. (not null) + * @param resultFactory - The factory that the supplied processors will use to create results. 
(not null) + */ + public MultiProjectionProcessor( + final MultiProjectionEvaluator multiProjection, + final ProcessorResultFactory resultFactory) { + super(resultFactory); + this.multiProjection = requireNonNull(multiProjection); + } + + @Override + public void init(final ProcessorContext context) { + this.context = context; + } + + @Override + public void process(final Object key, final ProcessorResult value) { + // projections can only be unary + if (value.getType() != ResultType.UNARY) { + throw new RuntimeException("The ProcessorResult to be processed must be Unary."); + } + + // Apply the projection to the binding set and forward the results. + final VisibilityBindingSet bs = value.getUnary().getResult(); + for(final VisibilityBindingSet newVisBs : multiProjection.project(bs)) { + log.debug("\nOUTPUT:\n{}", newVisBs); + context.forward(key, super.getResultFactory().make(newVisBs)); + } + } + + @Override + public void punctuate(final long timestamp) { + // Do nothing. + } + + @Override + public void close() { + // Do nothing. 
+ } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java index 67a777f..bc46d4f 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java @@ -22,16 +22,15 @@ import static java.util.Objects.requireNonNull; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.rya.api.function.projection.ProjectionEvaluator; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.streams.kafka.processors.ProcessorResult; import org.apache.rya.streams.kafka.processors.ProcessorResult.ResultType; -import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; import org.apache.rya.streams.kafka.processors.ProcessorResultFactory; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor; import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier; -import org.openrdf.query.algebra.Projection; -import org.openrdf.query.algebra.ProjectionElem; -import org.openrdf.query.algebra.ProjectionElemList; -import org.openrdf.query.impl.MapBindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -42,49 +41,47 @@ import 
edu.umd.cs.findbugs.annotations.NonNull; @DefaultAnnotation(NonNull.class) public class ProjectionProcessorSupplier extends RyaStreamsProcessorSupplier { - private final ProjectionElemList projectionElems; + private final ProjectionEvaluator projection; /** * Constructs an instance of {@link ProjectionProcessorSupplier}. * - * @param projectionElems - The {@link ProjectionElemList} that defines which bindings get forwarded or changed. (not null) + * @param projection - Defines the projection work that will be performed by supplied processors. (not null) * @param resultFactory - The factory that the supplied processors will use to create results. (not null) */ public ProjectionProcessorSupplier( - final ProjectionElemList projectionElems, + final ProjectionEvaluator projection, final ProcessorResultFactory resultFactory) { super(resultFactory); - this.projectionElems = requireNonNull(projectionElems); + this.projection = requireNonNull(projection); } @Override public Processor<Object, ProcessorResult> get() { - return new ProjectionProcessor(projectionElems, super.getResultFactory()); + return new ProjectionProcessor(projection, super.getResultFactory()); } /** - * Evaluates {@link ProcessorResult}s against a {@link Projection}. Any results found in - * the {@link ProjectionElemList} will be modified and/or forwarded. A {@link ProjectionElemList} defines - * a source and target name for a binding, so if a binding name appears in the source list of the {@link ProjectionElemList}, - * then it will be renamed with the associated target name. + * Evaluates {@link ProcessorResult}s against a {@link ProjectionEvaluator} and forwards its result + * to a downstream processor. 
*/ @DefaultAnnotation(NonNull.class) - public static final class ProjectionProcessor implements Processor<Object, ProcessorResult> { + public static final class ProjectionProcessor extends RyaStreamsProcessor { + private static final Logger log = LoggerFactory.getLogger(ProjectionProcessor.class); - private final ProjectionElemList projectionElems; - private final ProcessorResultFactory resultFactory; + private final ProjectionEvaluator projection; private ProcessorContext context; /** * Constructs an instance of {@link ProjectionProcessor}. * - * @param projectionElems - The projection elems that will determine what to do with the bindings. (not null) + * @param projection - Defines the projection work that will be performed by this processor. (not null) * @param resultFactory - The factory that the processor will use to create results. (not null) */ - public ProjectionProcessor(final ProjectionElemList projectionElems, final ProcessorResultFactory resultFactory) { - this.projectionElems = requireNonNull(projectionElems); - this.resultFactory = requireNonNull(resultFactory); + public ProjectionProcessor(final ProjectionEvaluator projection, final ProcessorResultFactory resultFactory) { + super(resultFactory); + this.projection = requireNonNull(projection); } @Override @@ -99,20 +96,13 @@ public class ProjectionProcessorSupplier extends RyaStreamsProcessorSupplier { throw new RuntimeException("The ProcessorResult to be processed must be Unary."); } - final UnaryResult unary = result.getUnary(); - final VisibilityBindingSet bindingSet = unary.getResult(); + // Apply the projection to the binding set. 
+ final VisibilityBindingSet bs = result.getUnary().getResult(); + final VisibilityBindingSet newVisBs = projection.project(bs); - final MapBindingSet newBindingSet = new MapBindingSet(bindingSet.size()); - for (final ProjectionElem elem : projectionElems.getElements()) { - if (bindingSet.hasBinding(elem.getSourceName())) { - newBindingSet.addBinding(elem.getTargetName(), bindingSet.getValue(elem.getSourceName())); - } - } - - // wrap the new binding set with the original's visibility. - final VisibilityBindingSet newVisiSet = new VisibilityBindingSet(newBindingSet, bindingSet.getVisibility()); - final ProcessorResult resultValue = resultFactory.make(newVisiSet); - context.forward(key, resultValue); + // Forward the result to the downstream processor. + log.debug("\nOUTPUT:\n{}", newVisBs); + context.forward(key, super.getResultFactory().make(newVisBs)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java index 109e40d..190bad3 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java @@ -22,6 +22,8 @@ import static java.util.Objects.requireNonNull; import java.util.concurrent.atomic.AtomicReference; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Projection; import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; import org.openrdf.query.parser.ParsedQuery; @@ -59,4 +61,48 @@ public final class RdfTestUtil { }); return statementPattern.get(); } + + /** + * Get the first {@link 
Projection} node from a SPARQL query. + * + * @param sparql - The query that contains a single Projection node. + * @return The first {@link Projection} that is encountered. + * @throws Exception The query could not be parsed. + */ + public static @Nullable Projection getProjection(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<Projection> projection = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final Projection node) throws Exception { + projection.set(node); + } + }); + + return projection.get(); + } + + /** + * Get the first {@link MultiProjection} node from a SPARQL query. + * + * @param sparql - The query that contains a single Projection node. + * @return The first {@link MultiProjection} that is encountered. + * @throws Exception The query could not be parsed. + */ + public static @Nullable MultiProjection getMultiProjection(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<MultiProjection> multiProjection = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visit(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final MultiProjection node) throws Exception { + multiProjection.set(node); + } + }); + + return multiProjection.get(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java 
deleted file mode 100644 index f58387e..0000000 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.streams.kafka.processors; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.UUID; - -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.api.model.VisibilityStatement; -import org.apache.rya.streams.kafka.KafkaTestUtil; -import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RdfTestUtil; -import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; -import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; -import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter; -import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; -import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; -import org.apache.rya.test.kafka.KafkaTestInstanceRule; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.algebra.ProjectionElem; -import org.openrdf.query.algebra.ProjectionElemList; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.collect.Sets; - -/** - * Integration tests the methods of {@link StatementPatternProcessor}. 
- */ -public class ProjectionProcessorIT { - - @Rule - public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); - - @Test - public void projection_renameOne() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Get the StatementPattern object that will be evaluated. - final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"); - - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS"); - - // Add a processor that handles the projection. - final ProjectionElemList elems = new ProjectionElemList(); - elems.addElement(new ProjectionElem("otherPerson", "dog")); - elems.addElement(new ProjectionElem("person", "person")); - builder.addProcessor("P1", new ProjectionProcessorSupplier(elems, result -> ProcessorResult.make(new UnaryResult(result))), "SP1"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "P1"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); - - // Load some data into the input topic. 
- final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Sparky")), "a") ); - - // Show the correct binding set results from the job. - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("dog", vf.createURI("urn:Sparky")); - final VisibilityBindingSet binding = new VisibilityBindingSet(bs, "a"); - final Set<VisibilityBindingSet> expected = new HashSet<>(); - expected.add(binding); - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected)); - } - - @Test - public void projection_keepOneDropOne() throws Exception { - // Enumerate some topics that will be re-used - final String ryaInstance = UUID.randomUUID().toString(); - final UUID queryId = UUID.randomUUID(); - final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); - final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - - // Get the StatementPattern object that will be evaluated. - final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"); - - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS"); - - // Add a processor that handles the projection. 
- final ProjectionElemList elems = new ProjectionElemList(); - elems.addElement(new ProjectionElem("otherPerson", "otherPerson")); - builder.addProcessor("P1", new ProjectionProcessorSupplier(elems, result -> ProcessorResult.make(new UnaryResult(result))), "SP1"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "P1"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); - - // Load some data into the input topic. - final ValueFactory vf = new ValueFactoryImpl(); - final List<VisibilityStatement> statements = new ArrayList<>(); - statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); - - // Show the correct binding set results from the job. - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("otherPerson", vf.createURI("urn:Bob")); - final VisibilityBindingSet binding = new VisibilityBindingSet(bs, "a"); - final Set<VisibilityBindingSet> expected = new HashSet<>(); - expected.add(binding); - KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected)); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java new file mode 100644 index 0000000..ee0e55b --- /dev/null 
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.projection; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.rya.api.function.join.NaturalJoin; +import org.apache.rya.api.function.projection.MultiProjectionEvaluator; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTestUtil; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RdfTestUtil; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; +import 
org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; +import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; +import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; +import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier; +import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter; +import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.BNode; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Lists; + +/** + * Integration tests the methods of {@link MultiProjectionProcessor}. + */ +public class MultiProjectionProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test + public void showProcessorWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the RDF model objects that will be used to build the query. + final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?thing <urn:corner> ?location . 
}"); + final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?thing <urn:compass> ?direction . }"); + final MultiProjection multiProjection = RdfTestUtil.getMultiProjection( + "CONSTRUCT {" + + "_:b a <urn:movementObservation> ; " + + "<urn:location> ?location ; " + + "<urn:direction> ?direction ; " + + "}" + + "WHERE {" + + "?thing <urn:corner> ?location ." + + "?thing <urn:compass> ?direction." + + "}"); + + // Setup a topology. + final TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); + builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, + result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); + builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, + result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); + + builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier( + "NATURAL_JOIN", + new NaturalJoin(), + Lists.newArrayList("thing"), + Lists.newArrayList("thing", "location", "direction"), + result -> ProcessorResult.make( new UnaryResult(result) )), "SP1", "SP2"); + + final StateStoreSupplier joinStoreSupplier = + Stores.create( "NATURAL_JOIN" ) + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); + builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN"); + + final String blankNodeId = UUID.randomUUID().toString(); + builder.addProcessor("MULTIPROJECTION", new MultiProjectionProcessorSupplier( + MultiProjectionEvaluator.make(multiProjection, () -> blankNodeId), + result -> ProcessorResult.make(new UnaryResult(result))), "NATURAL_JOIN"); + + builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "MULTIPROJECTION"); + builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + + // Create the statements that 
will be input into the query. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:compass"), vf.createURI("urn:NW")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:corner"), vf.createURI("urn:corner1")), "a") ); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final BNode blankNode = vf.createBNode(blankNodeId); + + MapBindingSet expectedBs = new MapBindingSet(); + expectedBs.addBinding("subject", blankNode); + expectedBs.addBinding("predicate", RDF.TYPE); + expectedBs.addBinding("object", vf.createURI("urn:movementObservation")); + + expectedBs = new MapBindingSet(); + expectedBs.addBinding("subject", blankNode); + expectedBs.addBinding("predicate", vf.createURI("urn:direction")); + expectedBs.addBinding("object", vf.createURI("urn:NW")); + + + expectedBs = new MapBindingSet(); + expectedBs.addBinding("subject", blankNode); + expectedBs.addBinding("predicate", vf.createURI("urn:location")); + expectedBs.addBinding("object", vf.createURI("urn:corner1")); + + // Run the test. 
+ KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/538393fe/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorTest.java new file mode 100644 index 0000000..d25db23 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors.projection; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.rya.api.function.projection.MultiProjectionEvaluator; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.RdfTestUtil; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.ResultType; +import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; +import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.openrdf.model.BNode; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Unit test the methods of {@link MultiProjectionProcessor}. + */ +public class MultiProjectionProcessorTest { + + @Test + public void showProjectionFunctionIsCalled() throws Exception { + // The SPARQL that will define the projection. + final MultiProjection multiProjection = RdfTestUtil.getMultiProjection( + "CONSTRUCT {" + + "_:b a <urn:movementObservation> ; " + + "<urn:location> ?location ; " + + "<urn:direction> ?direction ; " + + "}" + + "WHERE {" + + "?thing <urn:corner> ?location ." + + "?thing <urn:compass> ?direction." + + "}"); + + // Create a Binding Set that contains the result of the WHERE clause. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final MapBindingSet inputBs = new MapBindingSet(); + inputBs.addBinding("location", vf.createURI("urn:corner1")); + inputBs.addBinding("direction", vf.createURI("urn:NW")); + final VisibilityBindingSet inputVisBs = new VisibilityBindingSet(inputBs, "a|b"); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final String blankNodeId = UUID.randomUUID().toString(); + final BNode blankNode = vf.createBNode(blankNodeId); + + MapBindingSet expectedBs = new MapBindingSet(); + expectedBs.addBinding("subject", blankNode); + expectedBs.addBinding("predicate", RDF.TYPE); + expectedBs.addBinding("object", vf.createURI("urn:movementObservation")); + expected.add(new VisibilityBindingSet(expectedBs, "a|b")); + + expectedBs = new MapBindingSet(); + expectedBs.addBinding("subject", blankNode); + expectedBs.addBinding("predicate", vf.createURI("urn:direction")); + expectedBs.addBinding("object", vf.createURI("urn:NW")); + expected.add(new VisibilityBindingSet(expectedBs, "a|b")); + + expectedBs = new MapBindingSet(); + expectedBs.addBinding("subject", blankNode); + expectedBs.addBinding("predicate", vf.createURI("urn:location")); + expectedBs.addBinding("object", vf.createURI("urn:corner1")); + expected.add(new VisibilityBindingSet(expectedBs, "a|b")); + + // Mock the processor context that will be invoked. + final ProcessorContext context = mock(ProcessorContext.class); + + // Run the test. 
+ final MultiProjectionProcessor processor = new MultiProjectionProcessor( + MultiProjectionEvaluator.make(multiProjection, () -> blankNodeId), + result -> ProcessorResult.make(new UnaryResult(result))); + processor.init(context); + processor.process("key", ProcessorResult.make(new UnaryResult(inputVisBs))); + + final ArgumentCaptor<ProcessorResult> results = ArgumentCaptor.forClass(ProcessorResult.class); + verify(context, times(3)).forward(any(), results.capture()); + + final Set<VisibilityBindingSet> resultBindingSets = results.getAllValues().stream() + .map(result -> { + return (result.getType() == ResultType.UNARY) ? result.getUnary().getResult() : result.getBinary().getResult(); + }) + .collect(Collectors.toSet()); + + assertEquals(expected, resultBindingSets); + } +} \ No newline at end of file