RYA-377 ProjectionProcessor

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/98af7aa7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/98af7aa7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/98af7aa7

Branch: refs/heads/master
Commit: 98af7aa7fd22784855a1f45b15b342db3f44eaea
Parents: 4acbe5a
Author: Andrew Smith <smith...@gmail.com>
Authored: Mon Nov 13 14:51:28 2017 -0500
Committer: caleb <caleb.me...@parsons.com>
Committed: Tue Jan 9 15:13:00 2018 -0500

----------------------------------------------------------------------
 .../projection/ProjectionProcessorSupplier.java | 128 ++++++++++++++++
 .../kafka/processors/ProjectionProcessorIT.java | 152 +++++++++++++++++++
 2 files changed, 280 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/98af7aa7/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java
new file mode 100644
index 0000000..67a777f
--- /dev/null
+++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorSupplier.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.projection;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.ResultType;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Supplies {@link ProjectionProcessor} instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class ProjectionProcessorSupplier extends RyaStreamsProcessorSupplier {
+
+    private final ProjectionElemList projectionElems;
+
+    /**
+     * Constructs an instance of {@link ProjectionProcessorSupplier}.
+     *
+     * @param projectionElems - The {@link ProjectionElemList} that defines which bindings get forwarded or changed. (not null)
+     * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
+     */
+    public ProjectionProcessorSupplier(
+            final ProjectionElemList projectionElems,
+            final ProcessorResultFactory resultFactory) {
+        super(resultFactory);
+        this.projectionElems = requireNonNull(projectionElems);
+    }
+
+    /**
+     * Creates a new {@link ProjectionProcessor} that uses this supplier's projection elements and result factory.
+     */
+    @Override
+    public Processor<Object, ProcessorResult> get() {
+        return new ProjectionProcessor(projectionElems, super.getResultFactory());
+    }
+
+    /**
+     * Evaluates {@link ProcessorResult}s against a {@link Projection}.
+     * <p>
+     * A {@link ProjectionElemList} defines a source and a target name for each binding. For every
+     * {@link ProjectionElem}, if the incoming binding set has a binding with the element's source name,
+     * that binding's value is copied into the output under the element's target name. This renames the
+     * binding when the two names differ. Bindings that are not named by any element are dropped, so a
+     * projection both renames and filters bindings. The output keeps the input's visibility expression
+     * and is forwarded with the input's key.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static final class ProjectionProcessor implements Processor<Object, ProcessorResult> {
+
+        private final ProjectionElemList projectionElems;
+        private final ProcessorResultFactory resultFactory;
+
+        private ProcessorContext context;
+
+        /**
+         * Constructs an instance of {@link ProjectionProcessor}.
+         *
+         * @param projectionElems - The projection elems that will determine what to do with the bindings. (not null)
+         * @param resultFactory - The factory that the processor will use to create results. (not null)
+         */
+        public ProjectionProcessor(final ProjectionElemList projectionElems, final ProcessorResultFactory resultFactory) {
+            this.projectionElems = requireNonNull(projectionElems);
+            this.resultFactory = requireNonNull(resultFactory);
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            this.context = requireNonNull(context);
+        }
+
+        /**
+         * Projects the bindings of a unary result and forwards the projected result downstream.
+         *
+         * @param key - The key of the incoming record; it is forwarded unchanged.
+         * @param result - The result whose binding set will be projected. (not null)
+         * @throws IllegalArgumentException The result is not a {@link ResultType#UNARY} result.
+         */
+        @Override
+        public void process(final Object key, final ProcessorResult result) {
+            // A projection is a unary operator, so it can only ever receive unary results.
+            if (result.getType() != ResultType.UNARY) {
+                throw new IllegalArgumentException("The ProcessorResult to be processed must be Unary, but was " + result.getType() + ".");
+            }
+
+            final UnaryResult unary = result.getUnary();
+            final VisibilityBindingSet bindingSet = unary.getResult();
+
+            // Copy every projected binding that is present under its target name; all other bindings are dropped.
+            final MapBindingSet newBindingSet = new MapBindingSet(bindingSet.size());
+            for (final ProjectionElem elem : projectionElems.getElements()) {
+                if (bindingSet.hasBinding(elem.getSourceName())) {
+                    newBindingSet.addBinding(elem.getTargetName(), bindingSet.getValue(elem.getSourceName()));
+                }
+            }
+
+            // Wrap the new binding set with the original's visibility so the projected result keeps the same visibility expression.
+            final VisibilityBindingSet newVisiSet = new VisibilityBindingSet(newBindingSet, bindingSet.getVisibility());
+            final ProcessorResult resultValue = resultFactory.make(newVisiSet);
+            context.forward(key, resultValue);
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            // Nothing to do.
+        }
+
+        @Override
+        public void close() {
+            // Nothing to do.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/98af7aa7/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java
new file mode 100644
index 0000000..53d1765
--- /dev/null
+++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/ProjectionProcessorIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RdfTestUtil;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import 
org.apache.rya.streams.kafka.processors.RyaStreamsSinkFormatterSupplier.RyaStreamsSinkFormatter;
+import 
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
+import 
org.apache.rya.streams.kafka.processors.projection.ProjectionProcessorSupplier;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the methods of {@link ProjectionProcessorSupplier.ProjectionProcessor} when it is fed
+ * the results of a {@link StatementPatternProcessor}.
+ */
+public class ProjectionProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test
+    public void projection_renameOne() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Rename ?otherPerson to ?dog and keep ?person under its own name.
+        final ProjectionElemList elems = new ProjectionElemList();
+        elems.addElement(new ProjectionElem("otherPerson", "dog"));
+        elems.addElement(new ProjectionElem("person", "person"));
+        final TopologyBuilder builder = makeTopology(statementsTopic, resultsTopic, elems);
+
+        // Load some data into the input topic.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Sparky")), "a") );
+
+        // Show the correct binding set results from the job.
+        final QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("person", vf.createURI("urn:Alice"));
+        bs.addBinding("dog", vf.createURI("urn:Sparky"));
+        final VisibilityBindingSet binding = new VisibilityBindingSet(bs, "a");
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        expected.add(binding);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected));
+    }
+
+    @Test
+    public void projection_keepOneDropOne() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Keep ?otherPerson; ?person is not projected, so it must be dropped.
+        final ProjectionElemList elems = new ProjectionElemList();
+        elems.addElement(new ProjectionElem("otherPerson", "otherPerson"));
+        final TopologyBuilder builder = makeTopology(statementsTopic, resultsTopic, elems);
+
+        // Load some data into the input topic.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final List<VisibilityStatement> statements = new ArrayList<>();
+        statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+        // Show the correct binding set results from the job.
+        final QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+        final VisibilityBindingSet binding = new VisibilityBindingSet(bs, "a");
+        final Set<VisibilityBindingSet> expected = new HashSet<>();
+        expected.add(binding);
+        KafkaTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, Sets.newHashSet(expected));
+    }
+
+    /**
+     * Builds the topology shared by these tests.
+     * <p>
+     * Statements are read from {@code statementsTopic} and matched against
+     * {@code ?person <urn:talksTo> ?otherPerson}. The matches are projected using {@code elems},
+     * formatted for output, and written to {@code resultsTopic}.
+     *
+     * @param statementsTopic - The topic the statements are read from. (not null)
+     * @param resultsTopic - The topic the projected results are written to. (not null)
+     * @param elems - The projection that is applied to the statement pattern's results. (not null)
+     * @return The wired-up topology.
+     * @throws Exception The statement pattern could not be parsed.
+     */
+    private static TopologyBuilder makeTopology(
+            final String statementsTopic,
+            final String resultsTopic,
+            final ProjectionElemList elems) throws Exception {
+        // Get the StatementPattern object that will be evaluated.
+        final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that handles the projection.
+        builder.addProcessor("P1", new ProjectionProcessorSupplier(elems, result -> ProcessorResult.make(new UnaryResult(result))), "SP1");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "P1");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        return builder;
+    }
+}
\ No newline at end of file

Reply via email to