RYA-377 TopologyBuilder: a factory for turning a TupleExpr parsed from SPARQL into the TopologyBuilder objects used by Rya Streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/83d09f42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/83d09f42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/83d09f42 Branch: refs/heads/master Commit: 83d09f42c98f16150aab69c42465157b55c28a14 Parents: b8b0a12 Author: Andrew Smith <smith...@gmail.com> Authored: Tue Nov 14 16:28:43 2017 -0500 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:00 2018 -0500 ---------------------------------------------------------------------- .../streams/kafka/topology/TopologyFactory.java | 414 +++++++++++++++++++ .../processors/StatementPatternProcessorIT.java | 124 ++---- .../kafka/processors/join/JoinProcessorIT.java | 86 ++-- .../kafka/topology/TopologyFactoryTest.java | 106 +++++ 4 files changed, 573 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java new file mode 100644 index 0000000..782a58b --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/topology/TopologyFactory.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.topology; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.rya.api.function.join.IterativeJoin; +import org.apache.rya.api.function.join.LeftOuterJoin; +import org.apache.rya.api.function.join.NaturalJoin; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; +import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; +import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; +import 
org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier; +import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier; +import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.BinaryTupleOperator; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.LeftJoin; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Factory for building {@link TopologyBuilder}s from a SPARQL query. + */ +@DefaultAnnotation(NonNull.class) +public class TopologyFactory implements TopologyBuilderFactory { + private static final String SOURCE = "SOURCE"; + private static final String STATEMENT_PATTERN_PREFIX = "SP_"; + private static final String JOIN_PREFIX = "JOIN_"; + private static final String PROJECTION_PREFIX = "PROJECTION_"; + private static final String SINK = "SINK"; + + private List<ProcessorEntry> processorEntryList; + + /** + * Builds a {@link TopologyBuilder} based on the provided sparql query. + * + * @param sparqlQuery - The SPARQL query to build a topology for. 
(not null) + * @param statementTopic - The topic for the source to read from. (not null) + * @param resultTopic - The topic for the sink to write to. (not null) + * @return - The created {@link TopologyBuilder}. + * @throws MalformedQueryException - The provided query is not a valid SPARQL query. + * @throws TopologyBuilderException - A problem occurred while constructing the topology. + */ + @Override + public TopologyBuilder build(final String sparqlQuery, final String statementTopic, final String resultTopic) + throws MalformedQueryException, TopologyBuilderException { + requireNonNull(sparqlQuery); + requireNonNull(statementTopic); + requireNonNull(resultTopic); + + final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparqlQuery, null); + final TopologyBuilder builder = new TopologyBuilder(); + + final TupleExpr expr = parsedQuery.getTupleExpr(); + final QueryVisitor visitor = new QueryVisitor(); + expr.visit(visitor); + + processorEntryList = visitor.getProcessorEntryList(); + final Map<TupleExpr, String> idMap = visitor.getIDs(); + // add source node + builder.addSource(SOURCE, new StringDeserializer(), new VisibilityStatementDeserializer(), statementTopic); + + // processing the processor entry list in reverse order means we go from leaf + // nodes -> parent nodes. + // So, when the parent processing nodes get added, the upstream + // processing node will already exist. + + ProcessorEntry entry = null; + for (int ii = processorEntryList.size() - 1; ii >= 0; ii--) { + entry = processorEntryList.get(ii); + //statement patterns need to be connected to the Source. 
+ if(entry.getNode() instanceof StatementPattern) { + builder.addProcessor(entry.getID(), entry.getSupplier(), SOURCE); + } else { + final List<TupleExpr> parents = entry.getUpstreamNodes(); + final String[] parentIDs = new String[parents.size()]; + for (int id = 0; id < parents.size(); id++) { + parentIDs[id] = idMap.get(parents.get(id)); + } + builder.addProcessor(entry.getID(), entry.getSupplier(), parentIDs); + } + + if (entry.getNode() instanceof Join || entry.getNode() instanceof LeftJoin) { + // Add a state store for the join processor. + final StateStoreSupplier joinStoreSupplier = + Stores.create( entry.getID() ) + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .persistent() + .build(); + builder.addStateStore(joinStoreSupplier, entry.getID()); + } + } + + // convert processing results to visibility binding sets + builder.addProcessor("OUTPUT_FORMATTER", new BindingSetOutputFormatterSupplier(), entry.getID()); + + // add sink + builder.addSink(SINK, resultTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "OUTPUT_FORMATTER"); + + return builder; + } + + @VisibleForTesting + public List<ProcessorEntry> getProcessorEntry() { + return processorEntryList; + } + + /** + * An entry to be added as a Processing node in kafka streams' + * TopologyBuilder. + */ + final static class ProcessorEntry { + private final TupleExpr node; + private final String id; + private final Optional<Side> downstreamSide; + private final ProcessorSupplier<?, ?> supplier; + private final List<TupleExpr> upstreamNodes; + + /** + * Creates a new {@link ProcessorEntry}. + * + * @param node - The RDF node to be added as a processor. (not null) + * @param id - The id for the {@link TupleExpr} node. (not null) + * @param downstreamSide - Which side the current node is on from its downstream processor. (not null) + * @param supplier - Supplies the {@link Processor} for this node. 
(not null) + * @param upstreamNodes - The RDF nodes that will become upstream processing nodes. (not null) + */ + public ProcessorEntry(final TupleExpr node, final String id, final Optional<Side> downstreamSide, final ProcessorSupplier<?, ?> supplier, final List<TupleExpr> upstreamNodes) { + this.node = requireNonNull(node); + this.id = requireNonNull(id); + this.downstreamSide = requireNonNull(downstreamSide); + this.supplier = requireNonNull(supplier); + this.upstreamNodes = requireNonNull(upstreamNodes); + } + + /** + * @return - The RDF node to added as a processor. + */ + public TupleExpr getNode() { + return node; + } + + /** + * @return - The side the node is on from its downstream processor. + */ + public Optional<Side> getDownstreamSide() { + return downstreamSide; + } + + /** + * @return - The upstream parents to this node. These parent nodes must + * result in a {@link ProcessorEntry} + */ + public List<TupleExpr> getUpstreamNodes() { + return upstreamNodes; + } + + /** + * @return - The processor id of the node. + */ + public String getID() { + return id; + } + + /** + * @return - The {@link ProcessorSupplier} used to supply the + * {@link Processor} for this node. 
+ */ + public ProcessorSupplier<?, ?> getSupplier() { + return supplier; + } + + @Override + public boolean equals(final Object other) { + if (!(other instanceof ProcessorEntry)) { + return false; + } + final ProcessorEntry o = (ProcessorEntry) other; + return Objects.equals(node, o.node) && + Objects.equals(id, o.id) && + Objects.equals(downstreamSide, o.downstreamSide) && + Objects.equals(supplier, o.supplier) && + Objects.equals(upstreamNodes, o.upstreamNodes); + } + + @Override + public int hashCode() { + return Objects.hash(node, downstreamSide, upstreamNodes, id, supplier); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("ID: " + id + "\n"); + if (downstreamSide.isPresent()) { + sb.append("***********************************\n"); + sb.append("SIDE: " + downstreamSide.get() + "\n"); + } + sb.append("***********************************\n"); + sb.append("PARENTS: "); + for (final TupleExpr expr : upstreamNodes) { + sb.append(expr.toString() + ","); + } + sb.append("\n***********************************\n"); + sb.append("NODE: " + node.toString()); + sb.append("\n"); + return sb.toString(); + } + } + + /** + * Visits each node in a {@link TupleExpr} and creates a + * {@link ProcessorSupplier} and meta information needed for creating a + * {@link TopologyBuilder}. + */ + final static class QueryVisitor extends QueryModelVisitorBase<TopologyBuilderException> { + // Each node needs a ProcessorEntry to be a processor node in the + // TopologyBuilder. + private final List<ProcessorEntry> entries = new ArrayList<>(); + private final Map<TupleExpr, String> idMap = new HashMap<>(); + + /** + * @return The {@link ProcessorEntry}s used to create a Topology. + */ + public List<ProcessorEntry> getProcessorEntryList() { + return entries; + } + + /** + * @return The IDs created for each {@link TupleExpr} node in the query that resulted in a {@link ProcessorEntry}. 
+ */ + public Map<TupleExpr, String> getIDs() { + return idMap; + } + + @Override + public void meet(final StatementPattern node) throws TopologyBuilderException { + // topology parent for Statement Patterns will always be a source + final String id = STATEMENT_PATTERN_PREFIX + UUID.randomUUID(); + final Optional<Side> side = getSide(node); + final StatementPatternProcessorSupplier supplier = new StatementPatternProcessorSupplier(node, result -> getResult(side, result)); + entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList())); + idMap.put(node, id); + super.meet(node); + } + + @Override + public void meet(final Projection node) throws TopologyBuilderException { + final String id = PROJECTION_PREFIX + UUID.randomUUID(); + final Optional<Side> side = getSide(node); + TupleExpr arg = node.getArg(); + // If the arg is an Extension, there are rebindings that need to be + // ignored since they do not have a processor node. + if (arg instanceof Extension) { + arg = ((Extension) arg).getArg(); + } + final ProjectionProcessorSupplier supplier = new ProjectionProcessorSupplier(node.getProjectionElemList(), result -> getResult(side, result)); + entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(arg))); + idMap.put(node, id); + super.meet(node); + } + + @Override + public void meet(final Join node) throws TopologyBuilderException { + final String id = JOIN_PREFIX + UUID.randomUUID(); + meetJoin(id, new NaturalJoin(), node); + super.meet(node); + } + + @Override + public void meet(final LeftJoin node) throws TopologyBuilderException { + final String id = JOIN_PREFIX + UUID.randomUUID(); + meetJoin(id, new LeftOuterJoin(), node); + super.meet(node); + } + + /** + * Gets the {@link Side} the current node in the visitor is on relative to the provided node. + * @param node - The node used to determine the side of the current visitor node. + * @return The {@link Side} the current node is on. 
+ */ + private Optional<Side> getSide(final QueryModelNode node) { + // if query parent is a binary operator, need to determine if its left or right. + if (node.getParentNode() instanceof BinaryTupleOperator) { + final BinaryTupleOperator binary = (BinaryTupleOperator) node.getParentNode(); + if (node.equals(binary.getLeftArg())) { + return Optional.of(Side.LEFT); + } else { + return Optional.of(Side.RIGHT); + } + } else { + return Optional.empty(); + } + } + + /** + * Creates a join entry based on a provided {@link IterativeJoin} and the Join's + * {@link BinaryTupleOperator}. + * + * @param id - The ID of the join. + * @param joinFunction - The {@link IterativeJoin} function to perform during processing. + * @param node - The {@link BinaryTupleOperator} used to create the process. + */ + private void meetJoin(final String id, final IterativeJoin joinFunction, final BinaryTupleOperator node) { + final Set<String> leftArgs = node.getLeftArg().getBindingNames(); + final Set<String> rightArgs = node.getRightArg().getBindingNames(); + final List<String> joinVars = Lists.newArrayList(Sets.intersection(leftArgs, rightArgs)); + + leftArgs.removeAll(joinVars); + rightArgs.removeAll(joinVars); + + final List<String> otherVars = new ArrayList<>(); + otherVars.addAll(leftArgs); + otherVars.addAll(rightArgs); + + // the join variables need to be sorted so that when compared to all + // the variables, the start of the all variable list is congruent to + // the join var list. 
+ joinVars.sort(Comparator.naturalOrder()); + otherVars.sort(Comparator.naturalOrder()); + + final List<String> allVars = new ArrayList<>(); + allVars.addAll(joinVars); + allVars.addAll(otherVars); + + final Optional<Side> side = getSide(node); + final JoinProcessorSupplier supplier = new JoinProcessorSupplier(id, joinFunction, joinVars, allVars, result -> getResult(side, result)); + entries.add(new ProcessorEntry(node, id, side, supplier, Lists.newArrayList(node.getLeftArg(), node.getRightArg()))); + idMap.put(node, id); + } + + /** + * Creates a {@link ProcessorResult} based on a side and result. + * + * @param side - If one is present, a {@link BinaryResult} is created. + * @param result - The result to wrap in a {@link ProcessorResult}. + * @return The {@link ProcessorResult} used by the {@link Processor}. + */ + private ProcessorResult getResult(final Optional<Side> side, final VisibilityBindingSet result) { + if (side.isPresent()) { + return ProcessorResult.make(new BinaryResult(side.get(), result)); + } else { + return ProcessorResult.make(new UnaryResult(result)); + } + } + } + + /** + * An Exception thrown when a problem occurs when constructing the processor + * topology in the {@link TopologyFactory}. 
+ */ + public class TopologyBuilderException extends Exception { + private static final long serialVersionUID = 1L; + + public TopologyBuilderException(final String message, final Throwable cause) { + super(message, cause); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java index 0b2ff60..e55ec2e 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java @@ -24,25 +24,18 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.api.model.VisibilityStatement; import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.KafkaTopics; -import org.apache.rya.streams.kafka.RdfTestUtil; -import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; -import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterSupplier.BindingSetOutputFormatter; -import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; -import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import 
org.apache.rya.streams.kafka.topology.TopologyFactory; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Rule; import org.junit.Test; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.algebra.evaluation.QueryBindingSet; /** @@ -61,23 +54,10 @@ public class StatementPatternProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the StatementPattern object that will be evaluated. - final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"); - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic); // Create a statement that generate an SP result. 
final ValueFactory vf = new ValueFactoryImpl(); @@ -104,23 +84,10 @@ public class StatementPatternProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the StatementPattern object that will be evaluated. - final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"); - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final String query = "SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic); // Create some statements where some generates SP results and others do not. final ValueFactory vf = new ValueFactoryImpl(); @@ -155,27 +122,13 @@ public class StatementPatternProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the StatementPattern object that will be evaluated. 
- final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"); - final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?person ?action <urn:Bob> }"); - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS"); - - // Add a processor that handles the second statement pattern. - builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1", "SP2"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final String query = "SELECT * WHERE { " + + "?person <urn:talksTo> ?otherPerson . " + + "?person ?action <urn:Bob>" + + "}"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic); // Create some statements where some generates SP results and others do not. final ValueFactory vf = new ValueFactoryImpl(); @@ -185,14 +138,10 @@ public class StatementPatternProcessorIT { // Show the correct binding set results from the job. 
final Set<VisibilityBindingSet> expected = new HashSet<>(); - QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("otherPerson", vf.createURI("urn:Bob")); - expected.add( new VisibilityBindingSet(bs, "a") ); - - bs = new QueryBindingSet(); + final QueryBindingSet bs = new QueryBindingSet(); bs.addBinding("person", vf.createURI("urn:Alice")); bs.addBinding("action", vf.createURI("urn:talksTo")); + bs.addBinding("otherPerson", vf.createURI("urn:Bob")); expected.add( new VisibilityBindingSet(bs, "a") ); // Run the test. @@ -207,27 +156,13 @@ public class StatementPatternProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the StatementPattern object that will be evaluated. - final StatementPattern sp1 = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"); - final StatementPattern sp2 = RdfTestUtil.getSp("SELECT * WHERE { ?person ?action <urn:Bob> }"); - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. - builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp1, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS"); - - // Add a processor that handles the second statement pattern. - builder.addProcessor("SP2", new StatementPatternProcessorSupplier(sp2, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "SP1", "SP2"); - - // Add a sink that writes the data out to a new Kafka topic. 
- builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final String query = "SELECT * WHERE { " + + "?person <urn:talksTo> ?otherPerson ." + + "?person ?action <urn:Bob>" + + "}"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic); // Create some statements where some generates SP results and others do not. final ValueFactory vf = new ValueFactoryImpl(); @@ -240,24 +175,17 @@ public class StatementPatternProcessorIT { final Set<VisibilityBindingSet> expected = new HashSet<>(); QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); - bs.addBinding("otherPerson", vf.createURI("urn:Bob")); - expected.add( new VisibilityBindingSet(bs, "a") ); - bs = new QueryBindingSet(); bs.addBinding("person", vf.createURI("urn:Alice")); bs.addBinding("action", vf.createURI("urn:talksTo")); - expected.add( new VisibilityBindingSet(bs, "a") ); - - bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Alice")); bs.addBinding("otherPerson", vf.createURI("urn:Charlie")); - expected.add( new VisibilityBindingSet(bs, "a|b") ); + expected.add(new VisibilityBindingSet(bs, "a&(a|b)")); bs = new QueryBindingSet(); - bs.addBinding("person", vf.createURI("urn:Charlie")); - bs.addBinding("action", vf.createURI("urn:walksWith")); - expected.add( new VisibilityBindingSet(bs, "b") ); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("action", vf.createURI("urn:talksTo")); + bs.addBinding("otherPerson", vf.createURI("urn:Bob")); + expected.add(new VisibilityBindingSet(bs, "a")); // Run the test. 
KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java index 7051efa..dbad15c 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java @@ -46,6 +46,7 @@ import org.apache.rya.streams.kafka.processors.output.BindingSetOutputFormatterS import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerde; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Rule; import org.junit.Test; @@ -82,46 +83,13 @@ public class JoinProcessorIT { final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); - // Get the StatementPatterns that will be evaluated. - final StatementPattern leftSp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?employee }"); - final StatementPattern rightSp = RdfTestUtil.getSp("SELECT * WHERE { ?employee <urn:worksAt> ?business }"); - // Setup a topology. - final TopologyBuilder builder = new TopologyBuilder(); - - // The topic that Statements are written to is used as a source. 
- builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); - - // Add a processor that handles the first statement pattern. - builder.addProcessor("LEFT_SP", new StatementPatternProcessorSupplier(leftSp, - result -> ProcessorResult.make( new BinaryResult(Side.LEFT, result) )), "STATEMENTS"); - - // Add a processor that handles the second statement pattern. - builder.addProcessor("RIGHT_SP", new StatementPatternProcessorSupplier(rightSp, - result -> ProcessorResult.make( new BinaryResult(Side.RIGHT, result) )), "STATEMENTS"); - - // Add a processor that handles a natrual join over the SPs. - builder.addProcessor("NATURAL_JOIN", new JoinProcessorSupplier( - "NATURAL_JOIN", - new NaturalJoin(), - Lists.newArrayList("employee"), - Lists.newArrayList("employee", "person", "business"), - result -> ProcessorResult.make( new UnaryResult(result) )), "LEFT_SP", "RIGHT_SP"); - - // Add a state store for the join processor. - final StateStoreSupplier joinStoreSupplier = - Stores.create( "NATURAL_JOIN" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); - builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN"); - - // Add a processor that formats the VisibilityBindingSet for output. - builder.addProcessor("SINK_FORMATTER", BindingSetOutputFormatter::new, "NATURAL_JOIN"); - - // Add a sink that writes the data out to a new Kafka topic. - builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + final String query = "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "?employee <urn:worksAt> ?business" + + " }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic); // Create some statements that generate a bunch of right SP results. 
final ValueFactory vf = new ValueFactoryImpl(); @@ -194,10 +162,10 @@ public class JoinProcessorIT { // Add a state store for the join processor. final StateStoreSupplier joinStoreSupplier = Stores.create( "NATURAL_JOIN" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN"); // Add a processor that formats the VisibilityBindingSet for output. @@ -277,10 +245,10 @@ public class JoinProcessorIT { // Add a state store for the join processor. final StateStoreSupplier joinStoreSupplier = Stores.create( "NATURAL_JOIN" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); builder.addStateStore(joinStoreSupplier, "NATURAL_JOIN"); // Add a processor that formats the VisibilityBindingSet for output. @@ -379,18 +347,18 @@ public class JoinProcessorIT { // Setup the join state suppliers. final StateStoreSupplier join1StoreSupplier = Stores.create( "JOIN1" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); builder.addStateStore(join1StoreSupplier, "JOIN1"); final StateStoreSupplier join2StoreSupplier = Stores.create( "JOIN2" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); builder.addStateStore(join2StoreSupplier, "JOIN2"); // Add a processor that formats the VisibilityBindingSet for output. @@ -459,10 +427,10 @@ public class JoinProcessorIT { // Add a state store for the join processor. 
final StateStoreSupplier joinStoreSupplier = Stores.create( "LEFT_JOIN" ) - .withStringKeys() - .withValues(new VisibilityBindingSetSerde()) - .inMemory() - .build(); + .withStringKeys() + .withValues(new VisibilityBindingSetSerde()) + .inMemory() + .build(); builder.addStateStore(joinStoreSupplier, "LEFT_JOIN"); // Add a processor that formats the VisibilityBindingSet for output. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/83d09f42/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java new file mode 100644 index 0000000..eda4c89 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/topology/TopologyFactoryTest.java @@ -0,0 +1,106 @@ +package org.apache.rya.streams.kafka.topology; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.rya.streams.kafka.topology.TopologyFactory.ProcessorEntry; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.Var; + +public class TopologyFactoryTest { + private static TopologyFactory FACTORY; + + private static final ValueFactory VF = ValueFactoryImpl.getInstance(); + private static final Var TALKS_TO = new Var("-const-urn:talksTo", VF.createURI("urn:talksTo")); + private static final Var CHEWS = new Var("-const-urn:chews", VF.createURI("urn:chews")); + + static { + TALKS_TO.setAnonymous(true); + 
TALKS_TO.setConstant(true); + CHEWS.setAnonymous(true); + CHEWS.setConstant(true); + } + + @Before + public void setup() { + FACTORY = new TopologyFactory(); + } + + @Test + public void projectionStatementPattern() throws Exception { + final String query = "SELECT * WHERE { " + + "?person <urn:talksTo> ?otherPerson . " + + "}"; + + FACTORY.build(query, "source", "sink"); + final List<ProcessorEntry> entries = FACTORY.getProcessorEntry(); + + assertTrue(entries.get(0).getNode() instanceof Projection); + assertTrue(entries.get(1).getNode() instanceof StatementPattern); + + final StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson")); + assertEquals(expected, entries.get(1).getNode()); + } + + @Test + public void projectionJoinStatementPattern() throws Exception { + final String query = "SELECT * WHERE { " + + "?person <urn:talksTo> ?otherPerson . " + + "?otherPerson <urn:talksTo> ?dog . " + + "}"; + + FACTORY.build(query, "source", "sink"); + final List<ProcessorEntry> entries = FACTORY.getProcessorEntry(); + + assertTrue(entries.get(0).getNode() instanceof Projection); + assertTrue(entries.get(1).getNode() instanceof Join); + StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson")); + assertEquals(expected, entries.get(2).getNode()); + expected = new StatementPattern(new Var("otherPerson"), TALKS_TO, new Var("dog")); + assertEquals(expected, entries.get(3).getNode()); + } + + @Test + public void projectionJoinJoinStatementPattern() throws Exception { + final String query = "SELECT * WHERE { " + + "?person <urn:talksTo> ?otherPerson . " + + "?otherPerson <urn:talksTo> ?dog . " + + "?dog <urn:chews> ?toy . 
" + + "}"; + + FACTORY.build(query, "source", "sink"); + final List<ProcessorEntry> entries = FACTORY.getProcessorEntry(); + + assertTrue(entries.get(0).getNode() instanceof Projection); + assertTrue(entries.get(1).getNode() instanceof Join); + assertTrue(entries.get(2).getNode() instanceof Join); + StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson")); + assertEquals(expected, entries.get(3).getNode()); + expected = new StatementPattern(new Var("otherPerson"), TALKS_TO, new Var("dog")); + assertEquals(expected, entries.get(4).getNode()); + expected = new StatementPattern(new Var("dog"), CHEWS, new Var("toy")); + assertEquals(expected, entries.get(5).getNode()); + } + + @Test + public void projectionStatementPattern_rebind() throws Exception { + final String query = "CONSTRUCT { ?person <urn:mightKnow> ?otherPerson } WHERE { " + + "?person <urn:talksTo> ?otherPerson . " + + "}"; + + FACTORY.build(query, "source", "sink"); + final List<ProcessorEntry> entries = FACTORY.getProcessorEntry(); + + assertTrue(entries.get(0).getNode() instanceof Projection); + final StatementPattern expected = new StatementPattern(new Var("person"), TALKS_TO, new Var("otherPerson")); + assertEquals(expected, entries.get(1).getNode()); + } +}