RYA-377 add temporal equals function Also added core functionality for adding other temporal functions.
Updated geo tests to remove temporal stuff. Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/bc925911 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/bc925911 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/bc925911 Branch: refs/heads/master Commit: bc92591185ee8cc4598daf5f30bf4cf5235c23bb Parents: bd36443 Author: Andrew Smith <smith...@gmail.com> Authored: Thu Nov 30 13:11:49 2017 -0500 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- .../api/function/temporal/EqualsTemporal.java | 46 +++++++ .../temporal/TemporalRelationFunction.java | 65 ++++++++++ ...f.query.algebra.evaluation.function.Function | 17 +++ .../temporal/TemporalFunctionsTest.java | 75 +++++++++++ .../kafka/processors/filter/GeoFilterIT.java | 6 +- .../processors/filter/TemporalFilterIT.java | 127 +++++++++++++++++++ 6 files changed, 332 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bc925911/common/rya.api.function/src/main/java/org/apache/rya/api/function/temporal/EqualsTemporal.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/temporal/EqualsTemporal.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/temporal/EqualsTemporal.java new file mode 100644 index 0000000..c8a6041 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/temporal/EqualsTemporal.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.rya.api.function.temporal; + +import java.time.ZonedDateTime; +import java.util.Objects; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Filter function in a SPARQL query used to filter equality over time. + */ +@DefaultAnnotation(NonNull.class) +public class EqualsTemporal extends TemporalRelationFunction { + private static final String URI = BASE_URI + "equals"; + + @Override + public String getURI() { + return URI; + } + + @Override + protected boolean relation(final ZonedDateTime d1, final ZonedDateTime d2) { + Objects.requireNonNull(d1); + Objects.requireNonNull(d2); + return d1.isEqual(d2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bc925911/common/rya.api.function/src/main/java/org/apache/rya/api/function/temporal/TemporalRelationFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/temporal/TemporalRelationFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/temporal/TemporalRelationFunction.java new file mode 100644 index 0000000..02710d9 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/temporal/TemporalRelationFunction.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.temporal; + +import java.time.ZonedDateTime; +import java.time.format.DateTimeParseException; + +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; +import org.openrdf.query.algebra.evaluation.function.Function; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Function for comparing 2 {@link ZonedDateTime} objects in a SPARQL filter. + */ +@DefaultAnnotation(NonNull.class) +abstract class TemporalRelationFunction implements Function { + public static final String BASE_URI = "http://rya.apache.org/ns/temporal/"; + + @Override + public Value evaluate(final ValueFactory valueFactory, final Value... args) throws ValueExprEvaluationException { + if (args.length != 2) { + throw new ValueExprEvaluationException(getURI() + " requires exactly 2 arguments, got " + args.length); + } + + try { + final ZonedDateTime date1 = ZonedDateTime.parse(args[0].stringValue()); + final ZonedDateTime date2 = ZonedDateTime.parse(args[1].stringValue()); + final boolean result = relation(date1, date2); + + return valueFactory.createLiteral(result); + } catch (final DateTimeParseException e) { + throw new ValueExprEvaluationException("Date/Times provided must be of the ISO-8601 format. Example: 2007-04-05T14:30Z"); + } + } + + /** + * The comparison function to perform between 2 {@link ZonedDateTime} + * objects. + * + * @param d1 first {@link ZonedDateTime} to compare. (not null) + * @param d2 second {@link ZonedDateTime} to compare. (not null) + * @return The result of the comparison between {@link ZonedDateTime}s. + */ + protected abstract boolean relation(ZonedDateTime d1, ZonedDateTime d2); +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bc925911/common/rya.api.function/src/main/resources/META-INF/services/org.openrdf.query.algebra.evaluation.function.Function ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/resources/META-INF/services/org.openrdf.query.algebra.evaluation.function.Function b/common/rya.api.function/src/main/resources/META-INF/services/org.openrdf.query.algebra.evaluation.function.Function new file mode 100644 index 0000000..475b9dd --- /dev/null +++ b/common/rya.api.function/src/main/resources/META-INF/services/org.openrdf.query.algebra.evaluation.function.Function @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +org.apache.rya.api.function.temporal.EqualsTemporal \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bc925911/common/rya.api.function/src/main/test/org/apache/rya/api/function/temporal/TemporalFunctionsTest.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/test/org/apache/rya/api/function/temporal/TemporalFunctionsTest.java b/common/rya.api.function/src/main/test/org/apache/rya/api/function/temporal/TemporalFunctionsTest.java new file mode 100644 index 0000000..e0dabe1 --- /dev/null +++ b/common/rya.api.function/src/main/test/org/apache/rya/api/function/temporal/TemporalFunctionsTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.temporal; + +import static org.junit.Assert.assertEquals; + +import java.time.ZonedDateTime; + +import org.junit.Test; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; + +public class TemporalFunctionsTest { + private static final ZonedDateTime TIME = ZonedDateTime.parse("2015-12-30T12:00:00Z"); + private static final ZonedDateTime TIME_10 = ZonedDateTime.parse("2015-12-30T12:00:10Z"); + private static final ZonedDateTime TIME_20 = ZonedDateTime.parse("2015-12-30T12:00:20Z"); + + final ValueFactory VF = ValueFactoryImpl.getInstance(); + + @Test + public void testEquals_equal() throws Exception { + final EqualsTemporal function = new EqualsTemporal(); + + // 2 times equal + final Value[] args = new Value[2]; + args[0] = VF.createLiteral(TIME.toString()); + args[1] = VF.createLiteral(TIME.toString()); + final Value rez = function.evaluate(VF, args); + + assertEquals(VF.createLiteral(true), rez); + } + + @Test + public void testEquals_before() throws Exception { + final EqualsTemporal function = new EqualsTemporal(); + + // first time is before + final Value[] args = new Value[2]; + args[0] = VF.createLiteral(TIME.toString()); + args[1] = VF.createLiteral(TIME_10.toString()); + final Value rez = function.evaluate(VF, args); + + assertEquals(VF.createLiteral(false), rez); + } + + @Test + public void testEquals_after() throws Exception { + final EqualsTemporal function = new EqualsTemporal(); + + // first time is after + final Value[] args = new Value[2]; + args[0] = VF.createLiteral(TIME_20.toString()); + args[1] = VF.createLiteral(TIME_10.toString()); + final Value rez = function.evaluate(VF, args); + + assertEquals(VF.createLiteral(false), rez); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bc925911/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java index b9d81ef..642ecbc 100644 --- a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java @@ -57,7 +57,7 @@ import com.vividsolutions.jts.geom.GeometryFactory; import com.vividsolutions.jts.io.WKTWriter; /** - * Integration tests the methods of {@link FilterProcessor}. + * Integration tests the geo methods of {@link FilterProcessor}. */ public class GeoFilterIT { private static final String GEO = "http://www.opengis.net/def/function/geosparql/"; @@ -92,9 +92,7 @@ public class GeoFilterIT { // Get the RDF model objects that will be used to build the query. final String sparql = - "PREFIX time: <http://www.w3.org/2006/time#> \n" - + "PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> \n" - + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>\n" + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>\n" + "PREFIX geof: <" + GEO + ">\n" + "SELECT * \n" + "WHERE { \n" http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/bc925911/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java new file mode 100644 index 0000000..2bc98ca --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.filter; + +import static org.junit.Assert.assertEquals; + +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; +import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.evaluation.function.Function; +import org.openrdf.query.algebra.evaluation.function.FunctionRegistry; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Integration tests the temporal methods of {@link FilterProcessor}. + */ +public class TemporalFilterIT { + private static final ValueFactory vf = new ValueFactoryImpl(); + private static final String TEMPORAL = "http://rya.apache.org/ns/temporal"; + private static final ZonedDateTime time1 = ZonedDateTime.parse("2015-12-30T12:00:00Z"); + private static final ZonedDateTime time2 = ZonedDateTime.parse("2015-12-30T12:00:10Z"); + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false); + + @Test + public void temporalFunctionsRegistered() { + int count = 0; + final Collection<Function> funcs = FunctionRegistry.getInstance().getAll(); + for (final Function fun : funcs) { + if (fun.getURI().startsWith(TEMPORAL)) { + count++; + } + } + + // There are 1 temporal functions registered, ensure that there are 1. + assertEquals(1, count); + } + + @Test + public void showProcessorWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the RDF model objects that will be used to build the query. + final String sparql = + "PREFIX time: <http://www.w3.org/2006/time/> \n" + + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n" + + "SELECT * \n" + + "WHERE { \n" + + " <urn:time> time:atTime ?date .\n" + + " FILTER(tempf:equals(?date, \"" + time1.toString() + "\")) " + + "}"; + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create the statements that will be input into the query. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = getStatements(); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("date", vf.createLiteral(time1.toString())); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, 2000, statements, expected, VisibilityBindingSetDeserializer.class); + } + + private List<VisibilityStatement> getStatements() throws Exception { + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement(statement(time1), "a")); + statements.add(new VisibilityStatement(statement(time2), "a")); + return statements; + } + + private static Statement statement(final ZonedDateTime time) { + final Resource subject = vf.createURI("urn:time"); + final URI predicate = vf.createURI("http://www.w3.org/2006/time/atTime"); + final Value object = vf.createLiteral(time.toString()); + return new StatementImpl(subject, predicate, object); + } +} \ No newline at end of file