RYA-377 ProjectionProcessor
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/98af7aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/98af7aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/98af7aa7

Branch: refs/heads/master
Commit: 98af7aa7fd22784855a1f45b15b342db3f44eaea
Parents: 4acbe5a
Author: Andrew Smith <smith...@gmail.com>
Authored: Mon Nov 13 14:51:28 2017 -0500
Committer: caleb <caleb.me...@parsons.com>
Committed: Tue Jan 9 15:13:00 2018 -0500

----------------------------------------------------------------------
 .../projection/ProjectionProcessorSupplier.java | 128 ++++++++++++++++
 .../kafka/processors/ProjectionProcessorIT.java | 152 +++++++++++++++++++
 2 files changed, 280 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/98af7aa7/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java
new file mode 100644
index 0000000..67a777f
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.projection;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.ResultType;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Supplies {@link ProjectionProcessor} instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class ProjectionProcessorSupplier extends RyaStreamsProcessorSupplier {
+
+    private final ProjectionElemList projectionElems;
+
+    /**
+     * Constructs an instance of {@link ProjectionProcessorSupplier}.
+     *
+     * @param projectionElems - The {@link ProjectionElemList} that defines which bindings get forwarded or changed. (not null)
+     * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
+     */
+    public ProjectionProcessorSupplier(
+            final ProjectionElemList projectionElems,
+            final ProcessorResultFactory resultFactory) {
+        super(resultFactory);
+        this.projectionElems = requireNonNull(projectionElems);
+    }
+
+    @Override
+    public Processor<Object, ProcessorResult> get() {
+        return new ProjectionProcessor(projectionElems, super.getResultFactory());
+    }
+
+    /**
+     * Evaluates unary {@link ProcessorResult}s against a {@link Projection}. Each {@link ProjectionElem} pairs a source
+     * binding name with a target binding name: when the incoming binding set has a value for the source name, that value
+     * is forwarded under the target name. Bindings that no element names as a source are dropped, and the projected
+     * binding set keeps the original's visibility. Non-unary results are rejected with an {@link IllegalArgumentException}.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static final class ProjectionProcessor implements Processor<Object, ProcessorResult> {
+
+        private final ProjectionElemList projectionElems;
+        private final ProcessorResultFactory resultFactory;
+
+        private ProcessorContext context;
+
+        /**
+         * Constructs an instance of {@link ProjectionProcessor}.
+         *
+         * @param projectionElems - The projection elems that will determine what to do with the bindings. (not null)
+         * @param resultFactory - The factory that the processor will use to create results. (not null)
+         */
+        public ProjectionProcessor(final ProjectionElemList projectionElems, final ProcessorResultFactory resultFactory) {
+            this.projectionElems = requireNonNull(projectionElems);
+            this.resultFactory = requireNonNull(resultFactory);
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            this.context = requireNonNull(context);
+        }
+
+        @Override
+        public void process(final Object key, final ProcessorResult result) {
+            // A projection operates on a single binding set, so only unary results are accepted.
+            if (result.getType() != ResultType.UNARY) {
+                throw new IllegalArgumentException("The ProcessorResult to be processed must be Unary, but was " + result.getType() + ".");
+            }
+
+            final UnaryResult unary = result.getUnary();
+            final VisibilityBindingSet bindingSet = unary.getResult();
+
+            final MapBindingSet newBindingSet = new MapBindingSet(bindingSet.size());
+            for (final ProjectionElem elem : projectionElems.getElements()) {
+                if (bindingSet.hasBinding(elem.getSourceName())) {
+                    newBindingSet.addBinding(elem.getTargetName(), bindingSet.getValue(elem.getSourceName()));
+                }
+            }
+
+            // Wrap the projected bindings with the original's visibility so the output carries the same visibility as its input.
+            final VisibilityBindingSet newVisiSet = new VisibilityBindingSet(newBindingSet, bindingSet.getVisibility());
+            final ProcessorResult resultValue = resultFactory.make(newVisiSet);
+            context.forward(key, resultValue);
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            // Nothing to do; this processor never schedules a punctuation in init().
+        }
+
+        @Override
+        public void close() {
+            // Nothing to do; this processor holds no resources that need releasing.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/98af7aa7/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java
new file mode 100644
index 0000000..53d1765
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RdfTestUtil;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import org.apache.rya.streams.kafka.processors.RyaStreamsSinkFormatterSupplier.RyaStreamsSinkFormatter;
+import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
+import org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the {@link ProjectionProcessorSupplier.ProjectionProcessor} when it is fed by a {@link StatementPatternProcessor}.
+ */
+public class ProjectionProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test
+    public void projection_renameOne() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern object that will be evaluated.
+        final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that projects ?otherPerson as ?dog and keeps ?person under its own name.
+        final ProjectionElemList elems = new ProjectionElemList();
+        elems.addElement(new ProjectionElem("otherPerson", "dog"));
+        elems.addElement(new ProjectionElem("person", "person"));
+        builder.addProcessor("P1", new ProjectionProcessorSupplier(elems, result -> ProcessorResult.make(new UnaryResult(result))), "SP1");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "P1");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Load some data into the input topic.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Sparky")), "a") );
+
+        // Expect ?otherPerson renamed to ?dog, ?person unchanged, and the statement's visibility carried through.
+        final QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("dog", vf.createURI("urn:Sparky"));
+        final VisibilityBindingSet binding = new VisibilityBindingSet(bs, "a");
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        expected.add(binding);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected));
+    }
+
+    @Test
+    public void projection_keepOneDropOne() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern object that will be evaluated.
+        final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that projects only ?otherPerson, so ?person should be dropped.
+        final ProjectionElemList elems = new ProjectionElemList();
+        elems.addElement(new ProjectionElem("otherPerson", "otherPerson"));
+        builder.addProcessor("P1", new ProjectionProcessorSupplier(elems, result -> ProcessorResult.make(new UnaryResult(result))), "SP1");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "P1");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Load some data into the input topic.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Expect only ?otherPerson in the result; the unprojected ?person binding must not appear.
+        final QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        final VisibilityBindingSet binding = new VisibilityBindingSet(bs, "a");
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        expected.add(binding);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected));
+    }
+}
\ No newline at end of file