RYA-377 Implement the StatementPatternProcessor for the Rya Streams project.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/653e4b83 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/653e4b83 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/653e4b83 Branch: refs/heads/master Commit: 653e4b83ac2fecac0c1b2107fbcdffe7bc357371 Parents: 0ad2c51 Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Mon Nov 6 16:39:35 2017 -0500 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:00 2018 -0500 ---------------------------------------------------------------------- common/rya.api.function/pom.xml | 11 +- .../function/sp/StatementPatternMatcher.java | 142 ++++++++++ .../sp/StatementPatternMatcherTest.java | 272 +++++++++++++++++++ .../streams/api/interactor/LoadStatements.java | 11 +- .../client/command/LoadStatementsCommand.java | 2 +- extras/rya.streams/kafka/pom.xml | 4 + .../kafka/interactor/KafkaLoadStatements.java | 13 +- .../RyaStreamsSinkFormatterSupplier.java | 83 ++++++ .../StatementPatternProcessorSupplier.java | 121 +++++++++ .../apache/rya/streams/kafka/KafkaTestUtil.java | 127 +++++++++ .../apache/rya/streams/kafka/RdfTestUtil.java | 62 +++++ .../interactor/KafkaGetQueryResultStreamIT.java | 20 +- .../kafka/interactor/KafkaLoadStatementsIT.java | 61 ++--- .../processors/StatementPatternProcessorIT.java | 135 +++++++++ .../kafka/queries/KafkaQueryChangeLogIT.java | 26 +- .../VisibilityBindingSetKafkaIT.java | 44 ++- .../VisibilityStatementKafkaIT.java | 44 ++- 17 files changed, 1039 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/common/rya.api.function/pom.xml ---------------------------------------------------------------------- diff --git a/common/rya.api.function/pom.xml b/common/rya.api.function/pom.xml index f05dd6f..ce88e36 100644 --- a/common/rya.api.function/pom.xml +++ 
b/common/rya.api.function/pom.xml @@ -31,6 +31,7 @@ under the License. <name>Apache Rya Common API - Functions</name> <dependencies> + <!-- Rya dependencies. --> <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.api.model</artifactId> @@ -38,15 +39,23 @@ under the License. <!-- Third Party Dependencies --> <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryalgebra-model</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryalgebra-evaluation</artifactId> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> - <dependency> <groupId>com.github.stephenc.findbugs</groupId> <artifactId>findbugs-annotations</artifactId> </dependency> + <!-- Testing dependencies. --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/common/rya.api.function/src/main/java/org/apache/rya/api/function/sp/StatementPatternMatcher.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/sp/StatementPatternMatcher.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/sp/StatementPatternMatcher.java new file mode 100644 index 0000000..208f8d1 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/sp/StatementPatternMatcher.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.api.function.sp;

import static java.util.Objects.requireNonNull;

import java.util.Optional;

import org.openrdf.model.Statement;
import org.openrdf.model.Value;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.Var;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;

/**
 * Matches {@link Statement}s against a {@link StatementPattern} and returns {@link BindingSet}s
 * when the statement matched the pattern.
 */
@DefaultAnnotation(NonNull.class)
public class StatementPatternMatcher {

    private final StatementPattern pattern;

    /**
     * Constructs an instance of {@link StatementPatternMatcher}.
     *
     * @param pattern - The pattern that will be matched against. (not null)
     */
    public StatementPatternMatcher(final StatementPattern pattern) {
        this.pattern = requireNonNull(pattern);
    }

    /**
     * Matches a {@link Statement} against the provided {@link StatementPattern} and returns a {@link BindingSet}
     * if the statement matched the pattern.
     *
     * @param statement - The statement that will be matched against the pattern. (not null)
     * @return A {@link BindingSet} containing the statement's values filled in for the pattern's variables if
     *   the statement's values match the pattern's constants; otherwise empty.
     */
    public Optional<BindingSet> match(final Statement statement) {
        requireNonNull(statement);

        // Setup the resulting binding set that could be built from this Statement.
        final QueryBindingSet bs = new QueryBindingSet();

        if(matchesValue(pattern.getSubjectVar(), statement.getSubject(), bs) &&
                matchesValue(pattern.getPredicateVar(), statement.getPredicate(), bs) &&
                matchesValue(pattern.getObjectVar(), statement.getObject(), bs) &&
                matchesContext(pattern.getContextVar(), statement.getContext(), bs)) {
            return Optional.of(bs);
        } else {
            return Optional.empty();
        }
    }

    /**
     * The following table describes how a Subject, Predicate, and Object Var may be handled for a Statement and a
     * Statement Pattern:
     * <table border=1>
     *   <tr> <th>Pattern's var is constant</th> <th>Effect on resulting BS</th> </tr>
     *   <tr> <td>yes</td> <td>Emit a BS if they match, no binding for the constant</td> </tr>
     *   <tr> <td>no</td> <td>Emit a BS with a binding for the variable</td> </tr>
     * </table>
     *
     * @param var - The statement pattern variable that is being matched. (not null)
     * @param stmtValue - The statement's value for the variable. (not null)
     * @param bs - The binding set that may be updated to include a binding for the variable. (not null)
     * @return {@code true} if the variable and the statement value match, otherwise {@code false}.
     */
    private boolean matchesValue(final Var var, final Value stmtValue, final QueryBindingSet bs) {
        requireNonNull(var);
        requireNonNull(stmtValue);
        requireNonNull(bs);

        // If the var is a constant, the statement's value must match the var's value.
        if(var.isConstant()) {
            if(!stmtValue.equals(var.getValue())) {
                return false;
            }
        } else {
            // Otherwise it is a variable to be filled in.
            bs.addBinding(var.getName(), stmtValue);
        }

        // Either the value matched the constant or the binding set was updated.
        return true;
    }

    /**
     * The following table describes how Context may be handled for a Statement and a Statement Pattern:
     * <table border=1>
     *   <tr> <th>Pattern's context state</th> <th>Statement has a context value</th> <th>Effect on resulting BS</th></tr>
     *   <tr> <td>not mentioned</td> <td>yes</td> <td>Emit BS without a Context binding</td> </tr>
     *   <tr> <td>not mentioned</td> <td>no</td> <td>Emit BS without a Context binding</td> </tr>
     *   <tr> <td>has a constant</td> <td>yes</td> <td>Emit BS if they match, no Context binding</td> </tr>
     *   <tr> <td>has a constant</td> <td>no</td> <td>Do not emit a BS</td> </tr>
     *   <tr> <td>has a variable</td> <td>yes</td> <td>Emit BS with Context binding</td> </tr>
     *   <tr> <td>has a variable</td> <td>no</td> <td>Do not emit a BS</td> </tr>
     * </table>
     *
     * @param cntxVar - The statement pattern's context variable. This may be {@code null} when there is no context
     *   specified for the pattern.
     * @param stmtCntx - The statement's context value. This may be {@code null} when there was no context
     *   specified within the statement.
     * @param bs - The binding set that may be updated to include a context binding. (not null)
     * @return {@code true} if the pattern's context variable and statement's context matched, otherwise {@code false}.
     */
    private boolean matchesContext(@Nullable final Var cntxVar, @Nullable final Value stmtCntx, final QueryBindingSet bs) {
        if(cntxVar == null) {
            // The pattern does not mention a context, so it automatically matches.
            return true;
        } else if(stmtCntx == null) {
            // The pattern requires a context, but no value was provided within the statement, so it does not match.
            return false;
        } else {
            // Otherwise handle it like a normal variable.
            return matchesValue(cntxVar, stmtCntx, bs);
        }
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.api.function.sp;

import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import org.openrdf.model.Statement;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
import org.openrdf.query.parser.ParsedQuery;
import org.openrdf.query.parser.sparql.SPARQLParser;

import edu.umd.cs.findbugs.annotations.Nullable;

/**
 * Unit tests the methods of {@link StatementPatternMatcher}.
 */
public class StatementPatternMatcherTest {

    @Test
    public void matchesSubject() throws Exception {
        // Create the matcher against a pattern that matches a specific subject.
        final StatementPatternMatcher matcher = new StatementPatternMatcher(getSp(
                "SELECT * WHERE {" +
                    "<urn:Alice> ?p ?o ." +
                "}"));

        // Create a statement that matches the pattern.
        final ValueFactory vf = new ValueFactoryImpl();
        final Statement statement = vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));

        // Create the expected resulting Binding Set.
        final QueryBindingSet expected = new QueryBindingSet();
        expected.addBinding("p", vf.createURI("urn:talksTo"));
        expected.addBinding("o", vf.createURI("urn:Bob"));

        // Show the expected Binding Set matches the resulting Binding Set.
        final Optional<BindingSet> bs = matcher.match(statement);
        assertEquals(expected, bs.get());
    }

    @Test
    public void doesNotMatchSubject() throws Exception {
        // Create the matcher against a pattern that matches a specific subject.
        final StatementPatternMatcher matcher = new StatementPatternMatcher(getSp(
                "SELECT * WHERE {" +
                    "<urn:Alice> ?p ?o ." +
                "}"));

        // Create a statement that does not match the pattern.
        final ValueFactory vf = new ValueFactoryImpl();
        final Statement statement = vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));

        // Show the statement did not match.
        final Optional<BindingSet> bs = matcher.match(statement);
        assertFalse(bs.isPresent());
    }

    @Test
    public void matchesPredicate() throws Exception {
        // Create the matcher against a pattern that matches a specific predicate.
        final StatementPatternMatcher matcher = new StatementPatternMatcher(getSp(
                "SELECT * WHERE {" +
                    "?s <urn:talksTo> ?o ." +
                "}"));

        // Create a statement that matches the pattern.
        final ValueFactory vf = new ValueFactoryImpl();
        final Statement statement = vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));

        // Create the expected resulting Binding Set.
        final QueryBindingSet expected = new QueryBindingSet();
        expected.addBinding("s", vf.createURI("urn:Alice"));
        expected.addBinding("o", vf.createURI("urn:Bob"));

        // Show the expected Binding Set matches the resulting Binding Set.
        final Optional<BindingSet> bs = matcher.match(statement);
        assertEquals(expected, bs.get());
    }

    @Test
    public void doesNotMatchPredicate() throws Exception {
        // Create the matcher against a pattern that matches a specific predicate.
        final StatementPatternMatcher matcher = new StatementPatternMatcher(getSp(
                "SELECT * WHERE {" +
                    "?s <urn:talksTo> ?o ." +
                "}"));

        // Create a statement that does not match the pattern.
        final ValueFactory vf = new ValueFactoryImpl();
        final Statement statement = vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:knows"), vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));

        // Show the statement did not match.
        final Optional<BindingSet> bs = matcher.match(statement);
        assertFalse(bs.isPresent());
    }

    @Test
    public void matchesObject() throws Exception {
        // Create the matcher against a pattern that matches a specific object.
        final StatementPatternMatcher matcher = new StatementPatternMatcher(getSp(
                "SELECT * WHERE {" +
                    "?s ?p <urn:Bob> ." +
                "}"));

        // Create a statement that matches the pattern.
        final ValueFactory vf = new ValueFactoryImpl();
        final Statement statement = vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));

        // Create the expected resulting Binding Set.
        final QueryBindingSet expected = new QueryBindingSet();
        expected.addBinding("s", vf.createURI("urn:Alice"));
        expected.addBinding("p", vf.createURI("urn:talksTo"));

        // Show the expected Binding Set matches the resulting Binding Set.
        final Optional<BindingSet> bs = matcher.match(statement);
        assertEquals(expected, bs.get());
    }

    @Test
    public void doesNotMatchObject() throws Exception {
        // Create the matcher against a pattern that matches a specific object.
        final StatementPatternMatcher matcher = new StatementPatternMatcher(getSp(
                "SELECT * WHERE {" +
                    "?s ?p <urn:Bob> ." +
                "}"));

        // Create a statement that does not match the pattern.
        final ValueFactory vf = new ValueFactoryImpl();
        final Statement statement = vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:knows"), vf.createURI("urn:Alice"), vf.createURI("urn:testGraph"));

        // Show the statement did not match.
        final Optional<BindingSet> bs = matcher.match(statement);
        assertFalse(bs.isPresent());
    }

    @Test
    public void matchesContext() throws Exception {
        // Create a matcher against a pattern that matches a specific context.
        final StatementPatternMatcher matcher = new StatementPatternMatcher(getSp(
                "SELECT * WHERE {" +
                    "GRAPH <urn:testGraph> {" +
                        "?s ?p ?o ." +
                    "}" +
                "}"));

        // Create a statement that matches the pattern.
        final ValueFactory vf = new ValueFactoryImpl();
        final Statement statement = vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));

        // Create the expected resulting Binding Set.
        final QueryBindingSet expected = new QueryBindingSet();
        expected.addBinding("s", vf.createURI("urn:Alice"));
        expected.addBinding("p", vf.createURI("urn:talksTo"));
        expected.addBinding("o", vf.createURI("urn:Bob"));

        // Show the expected Binding Set matches the resulting Binding Set.
        final Optional<BindingSet> bs = matcher.match(statement);
        assertEquals(expected, bs.get());
    }

    @Test
    public void doesNotMatchContext() throws Exception {
        // Create a matcher against a pattern that matches a specific context.
        final StatementPatternMatcher matcher = new StatementPatternMatcher(getSp(
                "SELECT * WHERE {" +
                    "GRAPH <urn:testGraph> {" +
                        "?s ?p ?o ." +
                    "}" +
                "}"));

        // Create a statement that does not match the pattern.
        final ValueFactory vf = new ValueFactoryImpl();
        final Statement statement = vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob"), vf.createURI("urn:wrong"));

        // Show the statement did not match.
        final Optional<BindingSet> bs = matcher.match(statement);
        assertFalse(bs.isPresent());
    }

    @Test
    public void variableContext() throws Exception {
        // Create a matcher against a pattern that matches a variable context.
        final StatementPatternMatcher matcher = new StatementPatternMatcher(getSp(
                "SELECT * WHERE {" +
                    "GRAPH ?c {" +
                        "?s ?p ?o ." +
                    "}" +
                "}"));

        // Create a statement that matches the pattern.
        final ValueFactory vf = new ValueFactoryImpl();
        final Statement statement = vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));

        // Create the expected resulting Binding Set.
        final QueryBindingSet expected = new QueryBindingSet();
        expected.addBinding("s", vf.createURI("urn:Alice"));
        expected.addBinding("p", vf.createURI("urn:talksTo"));
        expected.addBinding("o", vf.createURI("urn:Bob"));
        expected.addBinding("c", vf.createURI("urn:testGraph"));

        // Show the expected Binding Set matches the resulting Binding Set.
        final Optional<BindingSet> bs = matcher.match(statement);
        assertEquals(expected, bs.get());
    }

    @Test
    public void variableContext_contextFreeStatement() throws Exception {
        // Create a matcher against a pattern that matches a variable context.
        final StatementPatternMatcher matcher = new StatementPatternMatcher(getSp(
                "SELECT * WHERE {" +
                    "GRAPH ?c {" +
                        "?s ?p ?o ." +
                    "}" +
                "}"));

        // Create a statement that does not have a context value.
        final ValueFactory vf = new ValueFactoryImpl();
        final Statement statement = vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob"));

        // Show the statement did not match.
        final Optional<BindingSet> bs = matcher.match(statement);
        assertFalse(bs.isPresent());
    }

    /**
     * Fetch the {@link StatementPattern} from a SPARQL string.
     *
     * @param sparql - A SPARQL query that contains only a single Statement Pattern. (not null)
     * @return The {@link StatementPattern} that was in the query, if it could be found. Otherwise {@code null}
     * @throws Exception The statement pattern could not be found in the parsed SPARQL query.
     */
    public static @Nullable StatementPattern getSp(final String sparql) throws Exception {
        requireNonNull(sparql);

        final AtomicReference<StatementPattern> statementPattern = new AtomicReference<>();
        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
        parsed.getTupleExpr().visitChildren(new QueryModelVisitorBase<Exception>() {
            @Override
            public void meet(final StatementPattern node) throws Exception {
                statementPattern.set(node);
            }
        });
        return statementPattern.get();
    }
}
+ * + * @param statements - The statements that will be loaded. (not null) + * @throws RyaStreamsException The statements could not be loaded. + */ + public void fromCollection(Collection<VisibilityStatement> statements) throws RyaStreamsException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java index 6ae63da..9414b28 100644 --- a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java +++ b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java @@ -127,7 +127,7 @@ public class LoadStatementsCommand implements RyaStreamsCommand { final Properties producerProps = buildProperties(params); try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(producerProps)) { final LoadStatements statements = new KafkaLoadStatements(KafkaTopics.statementsTopic(params.ryaInstance), producer); - statements.load(statementsPath, params.visibilities); + statements.fromFile(statementsPath, params.visibilities); } catch (final Exception e) { System.err.println("Unable to parse statements file: " + statementsPath.toString()); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml index d5fffe0..33cc985 100644 --- a/extras/rya.streams/kafka/pom.xml +++ b/extras/rya.streams/kafka/pom.xml @@ -41,6 +41,10 @@ under the License. 
</dependency> <dependency> <groupId>org.apache.rya</groupId> + <artifactId>rya.api.function</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> <artifactId>rya.streams.api</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java index 4cf8f9b..8ab3ab6 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java @@ -23,6 +23,7 @@ import static java.util.Objects.requireNonNull; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Collection; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -65,7 +66,7 @@ public class KafkaLoadStatements implements LoadStatements { @Override - public void load(final Path statementsPath, final String visibilities) throws RyaStreamsException { + public void fromFile(final Path statementsPath, final String visibilities) throws RyaStreamsException { requireNonNull(statementsPath); requireNonNull(visibilities); @@ -99,4 +100,14 @@ public class KafkaLoadStatements implements LoadStatements { throw new RyaStreamsException("Could not load the RDF file's Statements into Rya Streams.", e); } } + + @Override + public void fromCollection(final Collection<VisibilityStatement> statements) throws RyaStreamsException { + requireNonNull(statements); + + for(final VisibilityStatement statement : statements) { + 
producer.send(new ProducerRecord<>(topic, statement)); + } + producer.flush(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsSinkFormatterSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsSinkFormatterSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsSinkFormatterSupplier.java new file mode 100644 index 0000000..d6a8d2d --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsSinkFormatterSupplier.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.rya.api.model.VisibilityBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Supplies {@link RyaStreamsSinkFormatter} instances. + */ +@DefaultAnnotation(NonNull.class) +public class RyaStreamsSinkFormatterSupplier implements ProcessorSupplier<Object, ProcessorResult> { + + @Override + public Processor<Object, ProcessorResult> get() { + return new RyaStreamsSinkFormatter(); + } + + /** + * Accepts {@link ProcessorResult}s and forwards just their {@link VisibilityBindingSet} so that it may be + * written to a sink. + */ + @DefaultAnnotation(NonNull.class) + public static final class RyaStreamsSinkFormatter implements Processor<Object, ProcessorResult> { + + private ProcessorContext processorContext; + + @Override + public void init(final ProcessorContext context) { + processorContext = context; + } + + @Override + public void process(final Object key, final ProcessorResult value) { + + VisibilityBindingSet result = null; + switch(value.getType()) { + case UNARY: + result = value.getUnary().getResult(); + break; + + case BINARY: + result = value.getBinary().getResult(); + break; + } + + if(result != null) { + processorContext.forward(key, result); + } + } + + @Override + public void punctuate(final long timestamp) { + // Does nothing. + } + + @Override + public void close() { + // Does nothing. 
+ } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java new file mode 100644 index 0000000..6991783 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.streams.kafka.processors;

import static java.util.Objects.requireNonNull;

import java.util.Optional;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.rya.api.function.sp.StatementPatternMatcher;
import org.apache.rya.api.model.VisibilityBindingSet;
import org.apache.rya.api.model.VisibilityStatement;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.StatementPattern;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
 * Supplies {@link StatementPatternProcessor} instances.
 */
@DefaultAnnotation(NonNull.class)
public class StatementPatternProcessorSupplier implements ProcessorSupplier<String, VisibilityStatement> {

    private final StatementPattern sp;
    private final ProcessorResultFactory resultFactory;

    /**
     * Constructs an instance of {@link StatementPatternProcessorSupplier}.
     *
     * @param sp - The statement pattern that the supplied processors will match against. (not null)
     * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
     */
    public StatementPatternProcessorSupplier(
            final StatementPattern sp,
            final ProcessorResultFactory resultFactory) {
        this.sp = requireNonNull(sp);
        this.resultFactory = requireNonNull(resultFactory);
    }

    @Override
    public Processor<String, VisibilityStatement> get() {
        return new StatementPatternProcessor(sp, resultFactory);
    }

    /**
     * Evaluates {@link VisibilityStatement}s against a {@link StatementPattern}. Any statement that matches
     * the pattern is wrapped as a {@link VisibilityBindingSet} result and forwarded to the downstream processor.
     */
    @DefaultAnnotation(NonNull.class)
    public static final class StatementPatternProcessor implements Processor<String, VisibilityStatement> {

        private final StatementPatternMatcher spMatcher;
        private final ProcessorResultFactory resultFactory;

        private ProcessorContext context;

        /**
         * Constructs an instance of {@link StatementPatternProcessor}.
         *
         * @param sp - The statement pattern that the processor will match statements against. (not null)
         * @param resultFactory - The factory that the processor will use to create results. (not null)
         */
        public StatementPatternProcessor(
                final StatementPattern sp,
                final ProcessorResultFactory resultFactory) {
            this.spMatcher = new StatementPatternMatcher( requireNonNull(sp) );
            this.resultFactory = requireNonNull(resultFactory);
        }

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public void process(final String key, final VisibilityStatement statement) {
            // Check to see if the Statement matches the Statement Pattern.
            final Optional<BindingSet> bs = spMatcher.match(statement);

            if(bs.isPresent()) {
                // If it does, wrap the Binding Set with the Statement's visibility expression.
                final VisibilityBindingSet visBs = new VisibilityBindingSet(bs.get(), statement.getVisibility());

                // Wrap the binding set as a result and forward it to the downstream processor.
                final ProcessorResult resultValue = resultFactory.make(visBs);
                context.forward(key, resultValue);
            }
        }

        @Override
        public void punctuate(final long timestamp) {
            // Nothing to do.
        }

        @Override
        public void close() {
            // Nothing to do.
        }
    }
}
+ } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java new file mode 100644 index 0000000..bff4fdb --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A set of utility functions that are useful when writing tests against a Kafka instance. + */ +@DefaultAnnotation(NonNull.class) +public final class KafkaTestUtil { + + private KafkaTestUtil() { } + + /** + * Create a {@link Producer} that is able to write to a topic that is hosted within an embedded instance of Kafka. + * + * @param kafka - The Kafka rule used to connect to the embedded Kafka instance. (not null) + * @param keySerializerClass - Serializes the keys. (not null) + * @param valueSerializerClass - Serializes the values. (not null) + * @return A {@link Producer} that can be used to write records to a topic. + */ + public static <K, V> Producer<K, V> makeProducer( + final KafkaTestInstanceRule kafka, + final Class<? extends Serializer<K>> keySerializerClass, + final Class<? 
extends Serializer<V>> valueSerializerClass) { + requireNonNull(kafka); + requireNonNull(keySerializerClass); + requireNonNull(valueSerializerClass); + + final Properties props = kafka.createBootstrapServerConfig(); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass.getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass.getName()); + return new KafkaProducer<>(props); + } + + /** + * Create a {@link Consumer} that has a unique group ID and reads everything from a topic that is hosted within an + * embedded instance of Kafka starting at the earliest point by default. + * + * @param kafka - The Kafka rule used to connect to the embedded Kafka instance. (not null) + * @param keyDeserializerClass - Deserializes the keys. (not null) + * @param valueDeserializerClass - Deserializes the values. (not null) + * @return A {@link Consumer} that can be used to read records from a topic. + */ + public static <K, V> Consumer<K, V> fromStartConsumer( + final KafkaTestInstanceRule kafka, + final Class<? extends Deserializer<K>> keyDeserializerClass, + final Class<? extends Deserializer<V>> valueDeserializerClass) { + requireNonNull(kafka); + requireNonNull(keyDeserializerClass); + requireNonNull(valueDeserializerClass); + + final Properties props = kafka.createBootstrapServerConfig(); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass.getName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass.getName()); + return new KafkaConsumer<>(props); + } + + /** + * Polls a {@link Consumer} until it has either polled too many times without hitting the target number + * of results, or it hits the target number of results. + * + * @param pollMs - How long each poll could take. 
+ * @param pollIterations - The maximum number of polls that will be attempted. + * @param targetSize - The number of results to read before stopping. + * @param consumer - The consumer that will be polled. + * @return The results that were read from the consumer. + * @throws Exception If the poll failed. + */ + public static <K, V> List<V> pollForResults( + final int pollMs, + final int pollIterations, + final int targetSize, + final Consumer<K, V> consumer) throws Exception { + requireNonNull(consumer); + + final List<V> values = new ArrayList<>(); + + int i = 0; + while(values.size() < targetSize && i < pollIterations) { + for(final ConsumerRecord<K, V> record : consumer.poll(pollMs)) { + values.add( record.value() ); + } + i++; + } + + return values; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java new file mode 100644 index 0000000..109e40d --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.atomic.AtomicReference; + +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A set of utility functions that are useful when writing tests against RDF functions. + */ +@DefaultAnnotation(NonNull.class) +public final class RdfTestUtil { + + private RdfTestUtil() { } + + /** + * Fetch the {@link StatementPattern} from a SPARQL string. + * + * @param sparql - A SPARQL query that contains only a single Statement Pattern. (not null) + * @return The {@link StatementPattern} that was in the query, if it could be found. Otherwise {@code null} + * @throws Exception The statement pattern could not be found in the parsed SPARQL query. 
+ */ + public static @Nullable StatementPattern getSp(final String sparql) throws Exception { + requireNonNull(sparql); + + final AtomicReference<StatementPattern> statementPattern = new AtomicReference<>(); + final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null); + parsed.getTupleExpr().visitChildren(new QueryModelVisitorBase<Exception>() { + @Override + public void meet(final StatementPattern node) throws Exception { + statementPattern.set(node); + } + }); + return statementPattern.get(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java index 3343f76..67889e9 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java @@ -23,17 +23,15 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; -import java.util.Properties; import java.util.UUID; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.streams.api.entity.QueryResultStream; import org.apache.rya.streams.api.interactor.GetQueryResultStream; +import org.apache.rya.streams.kafka.KafkaTestUtil; import 
org.apache.rya.streams.kafka.KafkaTopics; import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; import org.apache.rya.test.kafka.KafkaTestInstanceRule; @@ -52,16 +50,6 @@ public class KafkaGetQueryResultStreamIT { public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); /** - * @return A {@link Producer} that is able to write {@link VisibilityBindingSet}s. - */ - private Producer<?, VisibilityBindingSet> makeProducer() { - final Properties producerProps = kafka.createBootstrapServerConfig(); - producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityBindingSetSerializer.class.getName()); - return new KafkaProducer<>(producerProps); - } - - /** * Polls a {@link QueryResultStream} until it has either polled too many times without hitting * the target number of results, or it hits the target number of results. * @@ -112,7 +100,8 @@ public class KafkaGetQueryResultStreamIT { original.add(new VisibilityBindingSet(bs, "b|c")); // Write some entries to the query result topic in Kafka. - try(final Producer<?, VisibilityBindingSet> producer = makeProducer()) { + try(final Producer<?, VisibilityBindingSet> producer = + KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) { final String resultTopic = KafkaTopics.queryResultsTopic(queryId); for(final VisibilityBindingSet visBs : original) { producer.send(new ProducerRecord<>(resultTopic, visBs)); @@ -132,7 +121,8 @@ public class KafkaGetQueryResultStreamIT { // Create an ID for the query. 
final UUID queryId = UUID.randomUUID(); - try(final Producer<?, VisibilityBindingSet> producer = makeProducer()) { + try(final Producer<?, VisibilityBindingSet> producer = + KafkaTestUtil.makeProducer(kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) { final String resultTopic = KafkaTopics.queryResultsTopic(queryId); // Write a single visibility binding set to the query's result topic. This will not appear in the expected results. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java index 5a81d23..b48addd 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java @@ -24,21 +24,14 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; -import java.util.Properties; -import java.util.UUID; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import 
org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; import org.apache.rya.test.kafka.KafkaITBase; @@ -62,48 +55,30 @@ public class KafkaLoadStatementsIT extends KafkaITBase { @Test(expected = UnsupportedRDFormatException.class) public void test_invalidFile() throws Exception { - final String topic = rule.getKafkaTopicName(); - final String visibilities = "a|b|c"; - final Properties props = rule.createBootstrapServerConfig(); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName()); - try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(props)) { - final KafkaLoadStatements command = new KafkaLoadStatements(topic, producer); - command.load(INVALID, visibilities); + try(final Producer<?, VisibilityStatement> producer = + KafkaTestUtil.makeProducer(rule, StringSerializer.class, VisibilityStatementSerializer.class)) { + final KafkaLoadStatements command = new KafkaLoadStatements(rule.getKafkaTopicName(), producer); + command.fromFile(INVALID, "a|b|c"); } } @Test public void testTurtle() throws Exception { - final String topic = rule.getKafkaTopicName(); final String visibilities = "a|b|c"; - final Properties props = rule.createBootstrapServerConfig(); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName()); - try (final Producer<Object, VisibilityStatement> producer = new KafkaProducer<>(props)) { - final KafkaLoadStatements command = new KafkaLoadStatements(topic, producer); - command.load(TURTLE_FILE, visibilities); - } - - // 
Read a VisibilityBindingSet from the test topic. - final List<VisibilityStatement> read = new ArrayList<>(); - final Properties consumerProps = rule.createBootstrapServerConfig(); - consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName()); + // Load the statements into the kafka topic. + try(final Producer<?, VisibilityStatement> producer = + KafkaTestUtil.makeProducer(rule, StringSerializer.class, VisibilityStatementSerializer.class)) { + final KafkaLoadStatements command = new KafkaLoadStatements(rule.getKafkaTopicName(), producer); + command.fromFile(TURTLE_FILE, visibilities); + } - try (final KafkaConsumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) { + // Read a VisibilityBindingSets from the test topic. + final List<VisibilityStatement> read;// = new ArrayList<>(); + try(Consumer<String, VisibilityStatement> consumer = + KafkaTestUtil.fromStartConsumer(rule, StringDeserializer.class, VisibilityStatementDeserializer.class)) { consumer.subscribe(Arrays.asList(rule.getKafkaTopicName())); - final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(2000); - - assertEquals(3, records.count()); - final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = records.iterator(); - while(iter.hasNext()) { - final VisibilityStatement visiSet = iter.next().value(); - read.add(visiSet); - } + read = KafkaTestUtil.pollForResults(500, 6, 3, consumer); } final List<VisibilityStatement> original = new ArrayList<>(); @@ -121,4 +96,4 @@ public class KafkaLoadStatementsIT extends KafkaITBase { // Show the written statement matches the read one. 
assertEquals(original, read); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java new file mode 100644 index 0000000..1b58b42 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTestUtil; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RdfTestUtil; +import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements; +import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; +import org.apache.rya.streams.kafka.processors.RyaStreamsSinkFormatterSupplier.RyaStreamsSinkFormatter; +import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +/** + * Integration tests the methods of {@link StatementPatternProcessor}. 
+ */ +public class StatementPatternProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test + public void statementPatternMatches() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the StatementPattern object that will be evaluated. + final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { ?person <urn:talksTo> ?otherPerson }"); + + // Setup a topology. + final TopologyBuilder builder = new TopologyBuilder(); + + // The topic that Statements are written to is used as a source. + builder.addSource("STATEMENTS", new StringDeserializer(), new VisibilityStatementDeserializer(), statementsTopic); + + // Add a processor that handles the first statement pattern. + builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS"); + + // Add a processor that formats the VisibilityBindingSet for output. + builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, "SP1"); + + // Add a sink that writes the data out to a new Kafka topic. + builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), new VisibilityBindingSetSerializer(), "SINK_FORMATTER"); + + // Start the streams program. + final Properties props = kafka.createBootstrapServerConfig(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "StatementPatternProcessorIT"); + + final KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props)); + streams.cleanUp(); + try { + streams.start(); + + // Wait for the streams application to start. Streams only see data after their consumers are connected. + Thread.sleep(2000); + + // Load some data into the input topic. 
+ final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + + try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer( + kafka, StringSerializer.class, VisibilityStatementSerializer.class)) { + new KafkaLoadStatements(statementsTopic, producer).fromCollection(statements); + } + + // Wait for the final results to appear in the output topic and verify the expected Binding Set was found. + try(Consumer<String, VisibilityBindingSet> consumer = KafkaTestUtil.fromStartConsumer( + kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class)) { + // Register the topic. + consumer.subscribe(Arrays.asList(resultsTopic)); + + // Poll for the result. + final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 1, consumer); + + // Show the correct binding set results from the job. 
+ final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("otherPerson", vf.createURI("urn:Bob")); + final VisibilityBindingSet expected = new VisibilityBindingSet(bs, "a"); + + final VisibilityBindingSet result = results.iterator().next(); + assertEquals(expected, result); + } + } finally { + streams.close(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java index 9e89ca7..ff2b59b 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java @@ -22,22 +22,18 @@ import static org.junit.Assert.assertEquals; import java.util.ArrayList; import java.util.List; -import java.util.Properties; import java.util.UUID; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.streams.api.queries.ChangeLogEntry; import org.apache.rya.streams.api.queries.QueryChange; import 
org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException; +import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; import org.apache.rya.test.kafka.KafkaITBase; @@ -55,11 +51,10 @@ import info.aduna.iteration.CloseableIteration; * Integration tests the {@link KafkaQueryChangeLog}. */ public class KafkaQueryChangeLogIT extends KafkaITBase { - KafkaQueryChangeLog changeLog; + private KafkaQueryChangeLog changeLog; private Producer<?, QueryChange> producer; private Consumer<?, QueryChange> consumer; - private String topic; @Rule @@ -68,25 +63,14 @@ public class KafkaQueryChangeLogIT extends KafkaITBase { @Before public void setup() { topic = rule.getKafkaTopicName(); - final Properties producerProperties = rule.createBootstrapServerConfig(); - producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); - - final Properties consumerProperties = rule.createBootstrapServerConfig(); - consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); - producer = new KafkaProducer<>(producerProperties); - consumer = new KafkaConsumer<>(consumerProperties); + producer = KafkaTestUtil.makeProducer(rule, StringSerializer.class, QueryChangeSerializer.class); + consumer = KafkaTestUtil.fromStartConsumer(rule, StringDeserializer.class, QueryChangeDeserializer.class); changeLog = new 
KafkaQueryChangeLog(producer, consumer, topic); } @After public void cleanup() { - producer.flush(); producer.close(); - consumer.close(); } @@ -202,4 +186,4 @@ public class KafkaQueryChangeLogIT extends KafkaITBase { } return changes; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java index 6104578..f9129ff 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java @@ -21,18 +21,15 @@ package org.apache.rya.streams.kafka.serialization; import static org.junit.Assert.assertEquals; import java.util.Arrays; -import java.util.Properties; -import java.util.UUID; +import java.util.List; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.KafkaTestUtil; import 
org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Rule; import org.junit.Test; @@ -49,7 +46,7 @@ public class VisibilityBindingSetKafkaIT { public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); @Test - public void readAndWrite() { + public void readAndWrite() throws Exception { // Create the object that will be written to the topic. final ValueFactory vf = new ValueFactoryImpl(); @@ -59,32 +56,23 @@ public class VisibilityBindingSetKafkaIT { final VisibilityBindingSet original = new VisibilityBindingSet(bs, "a|b|c"); // Write a VisibilityBindingSet to the test topic. - final Properties producerProps = kafka.createBootstrapServerConfig(); - producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityBindingSetSerializer.class.getName()); - - try(final KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<>(producerProps)) { + try(Producer<String, VisibilityBindingSet> producer = KafkaTestUtil.makeProducer( + kafka, StringSerializer.class, VisibilityBindingSetSerializer.class)) { producer.send( new ProducerRecord<String, VisibilityBindingSet>(kafka.getKafkaTopicName(), original) ); } // Read a VisibilityBindingSet from the test topic. 
- VisibilityBindingSet read; - - final Properties consumerProps = kafka.createBootstrapServerConfig(); - consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityBindingSetDeserializer.class.getName()); - - try(final KafkaConsumer<String, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps)) { + try(Consumer<String, VisibilityBindingSet> consumer = KafkaTestUtil.fromStartConsumer( + kafka, StringDeserializer.class, VisibilityBindingSetDeserializer.class)) { + // Register the topic. consumer.subscribe(Arrays.asList(kafka.getKafkaTopicName())); - final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(1000); - assertEquals(1, records.count()); - read = records.iterator().next().value(); - } + // Poll for the result. + final List<VisibilityBindingSet> results = KafkaTestUtil.pollForResults(500, 6, 1, consumer); - // Show the written statement matches the read one. - assertEquals(original, read); + // Show the written statement matches the read one. 
+ final VisibilityBindingSet read = results.iterator().next(); + assertEquals(original, read); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java index 62122bd..b85eb0c 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java @@ -21,18 +21,15 @@ package org.apache.rya.streams.kafka.serialization; import static org.junit.Assert.assertEquals; import java.util.Arrays; -import java.util.Properties; -import java.util.UUID; +import java.util.List; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTestUtil; import org.apache.rya.test.kafka.KafkaTestInstanceRule; import org.junit.Rule; import org.junit.Test; @@ -48,7 +45,7 @@ public class VisibilityStatementKafkaIT { public KafkaTestInstanceRule kafka = 
new KafkaTestInstanceRule(true); @Test - public void readAndWrite() { + public void readAndWrite() throws Exception { // Create the object that will be written to the topic. final ValueFactory vf = new ValueFactoryImpl(); final VisibilityStatement original = new VisibilityStatement( @@ -60,32 +57,23 @@ public class VisibilityStatementKafkaIT { "a|b|c"); // Write a VisibilityStatement to the test topic. - final Properties producerProps = kafka.createBootstrapServerConfig(); - producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VisibilityStatementSerializer.class.getName()); - - try(final KafkaProducer<String, VisibilityStatement> producer = new KafkaProducer<>(producerProps)) { + try(Producer<String, VisibilityStatement> producer = KafkaTestUtil.makeProducer( + kafka, StringSerializer.class, VisibilityStatementSerializer.class)) { producer.send( new ProducerRecord<String, VisibilityStatement>(kafka.getKafkaTopicName(), original) ); } // Read a VisibilityStatement from the test topic. - VisibilityStatement read; - - final Properties consumerProps = kafka.createBootstrapServerConfig(); - consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VisibilityStatementDeserializer.class.getName()); - - try(final KafkaConsumer<String, VisibilityStatement> consumer = new KafkaConsumer<>(consumerProps)) { + try(Consumer<String, VisibilityStatement> consumer = KafkaTestUtil.fromStartConsumer( + kafka, StringDeserializer.class, VisibilityStatementDeserializer.class)) { + // Register the topic. 
consumer.subscribe(Arrays.asList(kafka.getKafkaTopicName())); - final ConsumerRecords<String, VisibilityStatement> records = consumer.poll(1000); - assertEquals(1, records.count()); - read = records.iterator().next().value(); - } + // Poll for the result. + final List<VisibilityStatement> results = KafkaTestUtil.pollForResults(500, 6, 1, consumer); - // Show the written statement matches the read one. - assertEquals(original, read); + // Show the written statement matches the read one. + final VisibilityStatement read = results.iterator().next(); + assertEquals(original, read); + } } } \ No newline at end of file