RYA-377 Code review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/f0725df5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/f0725df5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/f0725df5 Branch: refs/heads/master Commit: f0725df5921a45adc37bc5cf73b5f70dfd886ac5 Parents: f365521 Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Mon Jan 8 16:41:30 2018 -0500 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:02 2018 -0500 ---------------------------------------------------------------------- common/rya.api.function/pom.xml | 4 +- .../function/aggregation/AverageFunction.java | 3 + .../api/function/aggregation/CountFunction.java | 3 + .../api/function/aggregation/MaxFunction.java | 3 + .../api/function/aggregation/MinFunction.java | 3 + .../api/function/aggregation/SumFunction.java | 3 + .../apache/rya/api/utils/CloseableIterator.java | 31 ++++++++ .../accumulo/AccumuloBatchUpdatePCJIT.java | 2 +- .../TimestampedNotificationProcessor.java | 2 +- .../PeriodicNotificationApplicationIT.java | 2 +- .../pruner/PeriodicNotificationBinPrunerIT.java | 14 ++-- .../pcj/storage/PeriodicQueryResultStorage.java | 28 +++---- .../pcj/storage/PrecomputedJoinStorage.java | 11 +-- .../storage/accumulo/AccumuloPcjStorage.java | 1 + .../AccumuloPeriodicQueryResultStorage.java | 2 +- .../AccumuloValueBindingSetIterator.java | 2 +- .../pcj/storage/accumulo/PcjTables.java | 3 +- .../accumulo/ScannerBindingSetIterator.java | 2 +- .../pcj/storage/accumulo/PcjTablesIT.java | 4 +- .../integration/AccumuloPcjStorageIT.java | 2 +- .../AccumuloPeriodicQueryResultStorageIT.java | 84 ++++++++++---------- extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 2 +- .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 2 +- .../indexing/pcj/fluo/integration/InputIT.java | 2 +- .../indexing/pcj/fluo/integration/QueryIT.java | 2 +- .../RyaInputIncrementalUpdateIT.java | 2 +- .../pcj/fluo/integration/StreamingTestIT.java | 2 +- .../pcj/fluo/visibility/PcjVisibilityIT.java | 2 +- extras/rya.streams/api/pom.xml | 5 -- extras/rya.streams/kafka/pom.xml | 2 +- .../kafka/processors/ProcessorResult.java | 2 +- .../processors/join/CloseableIterator.java | 32 -------- .../processors/join/JoinProcessorSupplier.java | 3 +- .../kafka/processors/join/JoinStateStore.java | 1 + .../processors/join/KeyValueJoinStateStore.java | 1 + extras/rya.streams/pom.xml | 10 ++- pom.xml | 2 +- 37 files changed, 146 insertions(+), 135 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/pom.xml ---------------------------------------------------------------------- diff --git a/common/rya.api.function/pom.xml b/common/rya.api.function/pom.xml index ce88e36..5b7ee0a 100644 --- a/common/rya.api.function/pom.xml +++ b/common/rya.api.function/pom.xml @@ -27,8 +27,8 @@ under the License. <version>3.2.12-incubating-SNAPSHOT</version> </parent> - <artifactId>rya.api.function</artifactId> - <name>Apache Rya Common API - Functions</name> + <artifactId>rya.api.evaluation</artifactId> + <name>Apache Rya Common API - Evaluation Functions</name> <dependencies> <!-- Rya dependencies. --> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java index a73d5ac..4a31fce 100644 --- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java @@ -19,6 +19,7 @@ package org.apache.rya.api.function.aggregation; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import java.math.BigDecimal; import java.math.BigInteger; @@ -51,6 +52,8 @@ public final class AverageFunction implements AggregationFunction { @Override public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements."); + requireNonNull(state); + requireNonNull(childBindingSet); // Only update the average if the child contains the binding that we are averaging. final String aggregatedName = aggregation.getAggregatedBindingName(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java index 7dd5b21..879df5e 100644 --- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java @@ -19,6 +19,7 @@ package org.apache.rya.api.function.aggregation; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import java.math.BigInteger; @@ -39,6 +40,8 @@ public final class CountFunction implements AggregationFunction { @Override public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements."); + requireNonNull(state); + requireNonNull(childBindingSet); // Only add one to the count if the child contains the binding that we are counting. final String aggregatedName = aggregation.getAggregatedBindingName(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java index 3295fbb..5b5d493 100644 --- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java @@ -19,6 +19,7 @@ package org.apache.rya.api.function.aggregation; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import org.apache.rya.api.model.VisibilityBindingSet; import org.openrdf.model.Value; @@ -40,6 +41,8 @@ public final class MaxFunction implements AggregationFunction { @Override public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements."); + requireNonNull(state); + requireNonNull(childBindingSet); // Only update the max if the child contains the binding that we are finding the max value for. final String aggregatedName = aggregation.getAggregatedBindingName(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java index d6bf751..f1b083c 100644 --- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java @@ -19,6 +19,7 @@ package org.apache.rya.api.function.aggregation; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import org.apache.rya.api.model.VisibilityBindingSet; import org.openrdf.model.Value; @@ -40,6 +41,8 @@ public final class MinFunction implements AggregationFunction { @Override public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements."); + requireNonNull(state); + requireNonNull(childBindingSet); // Only update the min if the child contains the binding that we are finding the min value for. final String aggregatedName = aggregation.getAggregatedBindingName(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java index 97735f2..7ddc9ae 100644 --- a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java @@ -19,6 +19,7 @@ package org.apache.rya.api.function.aggregation; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import java.math.BigInteger; @@ -48,6 +49,8 @@ public final class SumFunction implements AggregationFunction { @Override public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements."); + requireNonNull(state); + requireNonNull(childBindingSet); // Only add values to the sum if the child contains the binding that we are summing. final String aggregatedName = aggregation.getAggregatedBindingName(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java b/common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java new file mode 100644 index 0000000..c29f5e0 --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/utils/CloseableIterator.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.utils; + +import java.util.Iterator; + +/** + * An {@link Iterator} that also extends {@link AutoCloseable} because it has reference to resources + * that need to be released once you are done iterating. + * + * @param <T> The type of object that is iterated over. + */ +public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java index 40941c8..5028454 100644 --- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java @@ -27,11 +27,11 @@ import org.apache.rya.accumulo.AccumuloITBase; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.api.client.Install.InstallConfiguration; import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.sail.config.RyaSailFactory; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java index ae586da..dcc47b6 100644 --- a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java +++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java @@ -22,8 +22,8 @@ import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.periodic.notification.api.BinPruner; import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.NodeBin; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java index 92e3276..cd06f2a 100644 --- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java @@ -54,6 +54,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; @@ -61,7 +62,6 @@ import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory; import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.apache.rya.periodic.notification.notification.CommandNotification; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java ---------------------------------------------------------------------- diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java index 830fa46..ac2202c 100644 --- a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java +++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java @@ -39,6 +39,7 @@ import org.apache.fluo.api.data.ColumnValue; import org.apache.fluo.api.data.Span; import org.apache.fluo.core.client.FluoClientImpl; import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; @@ -48,7 +49,6 @@ import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; @@ -68,7 +68,7 @@ import com.google.common.collect.Sets; public class PeriodicNotificationBinPrunerIT extends RyaExportITBase { - + @Test public void periodicPrunerTest() throws Exception { @@ -238,7 +238,7 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase { pruner.stop(); } - + private void compareResults(PeriodicQueryResultStorage periodicStorage, String queryId, long bin, Set<BindingSet> expected) throws PeriodicQueryStorageException, Exception { try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(bin))) { Set<BindingSet> actual = new HashSet<>(); @@ -248,13 +248,13 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase { Assert.assertEquals(expected, actual); } } - + private void compareFluoCounts(FluoClient client, String pcjId, long bin) { QueryBindingSet bs = new QueryBindingSet(); bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG)); - + VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID); - + try(Snapshot sx = client.newSnapshot()) { String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); Set<String> ids = new HashSet<>(); @@ -279,5 +279,5 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase { } } } - + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java index 6637dde..2936738 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Optional; import org.apache.rya.api.model.VisibilityBindingSet; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.openrdf.query.BindingSet; @@ -32,7 +32,7 @@ import org.openrdf.query.BindingSet; * */ public interface PeriodicQueryResultStorage { - + /** * Binding name for the periodic bin id */ @@ -45,27 +45,27 @@ public interface PeriodicQueryResultStorage { * @throws PeriodicQueryStorageException */ public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException; - + /** * Creates a PeriodicQuery result storage layer for the given SPARQL query with the given id * @param queryId - id of the storage layer for the given SPARQL query * @param sparql - SPARQL query whose periodic results will be stored - * @return - id of the storage layer + * @return - id of the storage layer * @throws PeriodicQueryStorageException */ public String createPeriodicQuery(String queryId, String sparql) throws PeriodicQueryStorageException; - + /** * Creates a PeriodicQuery result storage layer for the given SPARQL query with the given id * whose results are written in the order indicated by the specified VariableOrder. * @param queryId - id of the storage layer for the given SPARQL query * @param sparql - SPARQL query whose periodic results will be stored * @param varOrder - VariableOrder indicating the order that results will be written in - * @return - id of the storage layer + * @return - id of the storage layer * @throws PeriodicQueryStorageException */ public void createPeriodicQuery(String queryId, String sparql, VariableOrder varOrder) throws PeriodicQueryStorageException; - + /** * Retrieve the {@link PeriodicQueryStorageMetdata} for the give query id * @param queryID - id of the query whose metadata will be returned @@ -73,7 +73,7 @@ public interface PeriodicQueryResultStorage { * @throws PeriodicQueryStorageException */ public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(String queryID) throws PeriodicQueryStorageException;; - + /** * Add periodic query results to the storage layer indicated by the given query id * @param queryId - id indicating the storage layer that results will be added to @@ -81,7 +81,7 @@ public interface PeriodicQueryResultStorage { * @throws PeriodicQueryStorageException */ public void addPeriodicQueryResults(String queryId, Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException;; - + /** * Deletes periodic query results from the storage layer * @param queryId - id indicating the storage layer that results will be deleted from @@ -89,14 +89,14 @@ public interface PeriodicQueryResultStorage { * @throws PeriodicQueryStorageException */ public void deletePeriodicQueryResults(String queryId, long binID) throws PeriodicQueryStorageException;; - + /** - * Deletes all results for the storage layer indicated by the given query id + * Deletes all results for the storage layer indicated by the given query id * @param queryID - id indicating the storage layer whose results will be deleted * @throws PeriodicQueryStorageException */ public void deletePeriodicQuery(String queryID) throws PeriodicQueryStorageException;; - + /** * List results in the given storage layer indicated by the query id * @param queryId - id indicating the storage layer whose results will be listed @@ -105,11 +105,11 @@ public interface PeriodicQueryResultStorage { * @throws PeriodicQueryStorageException */ public CloseableIterator<BindingSet> listResults(String queryId, Optional<Long> binID) throws PeriodicQueryStorageException;; - + /** * List all storage tables containing periodic results. * @return List of Strings with names of all tables containing periodic results */ public List<String> listPeriodicTables(); - + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java index 4988035..70c8b0e 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.List; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; import org.openrdf.query.BindingSet; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -103,16 +104,6 @@ public interface PrecomputedJoinStorage extends AutoCloseable { public void close() throws PCJStorageException; /** - * An {@link Iterator} that also extends {@link AutoCloseable} because it has reference to resources - * that need to be released once you are done iterating. - * - * @param <T> The type of object that is iterated over. - */ - public static interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { - - } - - /** * An operation of {@link PrecomputedJoinStorage} failed. */ public static class PCJStorageException extends PcjException { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java index 3d0f11b..f3d078d 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java @@ -39,6 +39,7 @@ import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryExce import org.apache.rya.api.instance.RyaDetailsUpdater; import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PCJIdFactory; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java index f8547f5..8124aff 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java @@ -38,11 +38,11 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.io.Text; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PCJIdFactory; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.openrdf.model.impl.LiteralImpl; import org.openrdf.model.vocabulary.XMLSchema; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java index ff8ff14..c488d36 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java @@ -25,8 +25,8 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.fluo.api.data.Bytes; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.openrdf.query.BindingSet; /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java index 40db32a..9346c00 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java @@ -56,10 +56,9 @@ import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.openrdf.query.BindingSet; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.QueryEvaluationException; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java index 26fd8c9..b457dfd 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java @@ -27,7 +27,7 @@ import java.util.Map.Entry; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.openrdf.query.BindingSet; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java index e689f9d..b95c812 100644 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java @@ -45,9 +45,9 @@ import org.apache.rya.accumulo.MiniAccumuloSingleton; import org.apache.rya.accumulo.RyaTestInstanceRule; import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.apache.rya.rdftriplestore.RdfCloudTripleStore; @@ -120,7 +120,7 @@ public class PcjTablesIT { private String getRyaInstanceName() { return testInstance.getRyaInstanceName(); } - + /** * Format a Mini Accumulo to be a Rya repository. * http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java index 5ba5e40..33571f7 100644 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java @@ -39,9 +39,9 @@ import org.apache.rya.api.instance.RyaDetailsRepository; import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException; import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java index 1e21bf2..2d9da4d 100644 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPeriodicQueryResultStorageIT.java @@ -30,10 +30,10 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.security.Authorizations; import org.apache.rya.accumulo.AccumuloITBase; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.accumulo.PeriodicQueryTableNameFactory; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; @@ -53,86 +53,86 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase { private static final String RYA = "rya_"; private static final PeriodicQueryTableNameFactory nameFactory = new PeriodicQueryTableNameFactory(); private static final ValueFactory vf = new ValueFactoryImpl(); - + @Before public void init() throws AccumuloException, AccumuloSecurityException { super.getConnector().securityOperations().changeUserAuthorizations("root", new Authorizations("U")); periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getConnector(), RYA); } - - + + @Test public void testCreateAndMeta() throws PeriodicQueryStorageException { - + String sparql = "select ?x where { ?x <urn:pred> ?y.}"; VariableOrder varOrder = new VariableOrder("periodicBinId", "x"); PeriodicQueryStorageMetadata expectedMeta = new PeriodicQueryStorageMetadata(sparql, varOrder); - + String id = periodicStorage.createPeriodicQuery(sparql); Assert.assertEquals(expectedMeta, periodicStorage.getPeriodicQueryMetadata(id)); Assert.assertEquals(Arrays.asList(nameFactory.makeTableName(RYA, id)), periodicStorage.listPeriodicTables()); periodicStorage.deletePeriodicQuery(id); } - - + + @Test public void testAddListDelete() throws Exception { - + String sparql = "select ?x where { ?x <urn:pred> ?y.}"; String id = periodicStorage.createPeriodicQuery(sparql); - + Set<BindingSet> expected = new HashSet<>(); Set<VisibilityBindingSet> storageSet = new HashSet<>(); - + //add result matching user's visibility QueryBindingSet bs = new QueryBindingSet(); bs.addBinding("periodicBinId", vf.createLiteral(1L)); bs.addBinding("x",vf.createURI("uri:uri123")); expected.add(bs); storageSet.add(new VisibilityBindingSet(bs,"U")); - + //add result with different visibility that is not expected bs = new QueryBindingSet(); bs.addBinding("periodicBinId", vf.createLiteral(1L)); bs.addBinding("x",vf.createURI("uri:uri456")); storageSet.add(new VisibilityBindingSet(bs,"V")); - + periodicStorage.addPeriodicQueryResults(id, storageSet); - + Set<BindingSet> actual = new HashSet<>(); try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(1L))) { iter.forEachRemaining(x -> actual.add(x)); } - + Assert.assertEquals(expected, actual); - + periodicStorage.deletePeriodicQueryResults(id, 1L); - + Set<BindingSet> actual2 = new HashSet<>(); try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(1L))) { iter.forEachRemaining(x -> actual2.add(x)); } - + Assert.assertEquals(new HashSet<>(), actual2); periodicStorage.deletePeriodicQuery(id); - + } - + @Test public void multiBinTest() throws PeriodicQueryStorageException, Exception { - + String sparql = "prefix function: <http://org.apache.rya/function#> " //n + "prefix time: <http://www.w3.org/2006/time#> " //n + "select ?id (count(?obs) as ?total) where {" //n + "Filter(function:periodic(?time, 2, .5, time:hours)) " //n + "?obs <uri:hasTime> ?time. " //n + "?obs <uri:hasId> ?id } group by ?id"; //n - - + + final ValueFactory vf = new ValueFactoryImpl(); long currentTime = System.currentTimeMillis(); String queryId = UUID.randomUUID().toString().replace("-", ""); - + // Create the expected results of the SPARQL query once the PCJ has been computed. final Set<BindingSet> expected1 = new HashSet<>(); final Set<BindingSet> expected2 = new HashSet<>(); @@ -142,81 +142,81 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase { long period = 1800000; long binId = (currentTime/period)*period; - + MapBindingSet bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expected1.add(bs); storageResults.add(new VisibilityBindingSet(bs)); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expected1.add(bs); storageResults.add(new VisibilityBindingSet(bs)); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expected1.add(bs); storageResults.add(new VisibilityBindingSet(bs)); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId)); expected1.add(bs); storageResults.add(new VisibilityBindingSet(bs)); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); expected2.add(bs); storageResults.add(new VisibilityBindingSet(bs)); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); expected2.add(bs); storageResults.add(new VisibilityBindingSet(bs)); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + period)); expected2.add(bs); storageResults.add(new VisibilityBindingSet(bs)); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); expected3.add(bs); storageResults.add(new VisibilityBindingSet(bs)); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + 2*period)); expected3.add(bs); storageResults.add(new VisibilityBindingSet(bs)); - + bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER)); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); bs.addBinding("periodicBinId", vf.createLiteral(binId + 3*period)); expected4.add(bs); storageResults.add(new VisibilityBindingSet(bs)); - - + + String id = periodicStorage.createPeriodicQuery(queryId, sparql); periodicStorage.addPeriodicQueryResults(queryId, storageResults); - + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId))) { Set<BindingSet> actual1 = new HashSet<>(); while(iter.hasNext()) { @@ -224,7 +224,7 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase { } Assert.assertEquals(expected1, actual1); } - + periodicStorage.deletePeriodicQueryResults(queryId, binId); try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId))) { Set<BindingSet> actual1 = new HashSet<>(); @@ -233,7 +233,7 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase { } Assert.assertEquals(Collections.emptySet(), actual1); } - + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + period))) { Set<BindingSet> actual2 = new HashSet<>(); while(iter.hasNext()) { @@ -241,7 +241,7 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase { } Assert.assertEquals(expected2, actual2); } - + periodicStorage.deletePeriodicQueryResults(queryId, binId + period); try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + period))) { Set<BindingSet> actual2 = new HashSet<>(); @@ -250,7 +250,7 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase { } Assert.assertEquals(Collections.emptySet(), actual2); } - + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + 2*period))) { Set<BindingSet> actual3 = new HashSet<>(); while(iter.hasNext()) { @@ -258,7 +258,7 @@ public class AccumuloPeriodicQueryResultStorageIT extends AccumuloITBase { } Assert.assertEquals(expected3, actual3); } - + try(CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(binId + 3*period))) { Set<BindingSet> actual4 = new HashSet<>(); while(iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index f2e8cf9..5493a5f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -62,7 +62,7 @@ under the License. <dependency> <groupId>org.apache.rya</groupId> - <artifactId>rya.api.function</artifactId> + <artifactId>rya.api.evaluation</artifactId> </dependency> <!-- 3rd Party Runtime Dependencies. --> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java index 3fea6ed..181f322 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java @@ -33,12 +33,12 @@ import org.apache.rya.api.domain.RyaType; import org.apache.rya.api.domain.RyaURI; import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.rdftriplestore.RyaSailRepository; import org.openrdf.model.Statement; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java index d623043..866d32b 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java @@ -29,10 +29,10 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index 3e72f1b..610f502 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -39,11 +39,11 @@ import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.api.functions.DateTimeWithinPeriod; import org.apache.rya.api.functions.OWLTime; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java index 5cd3ab1..65083e8 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java @@ -27,10 +27,10 @@ import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.external.PrecomputedJoinIndexer; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java index e83a894..6135920 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java @@ -28,9 +28,9 @@ import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.log4j.Logger; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java index 8529bd5..90ed01a 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java @@ -47,12 +47,12 @@ import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/pom.xml b/extras/rya.streams/api/pom.xml index 250028f..55c0e79 100644 --- a/extras/rya.streams/api/pom.xml +++ b/extras/rya.streams/api/pom.xml @@ -54,11 +54,6 @@ under the License. <artifactId>guava</artifactId> </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <!-- Test dependences --> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml index 778630d..16b07b2 100644 --- a/extras/rya.streams/kafka/pom.xml +++ b/extras/rya.streams/kafka/pom.xml @@ -60,7 +60,7 @@ under the License. </dependency> <dependency> <groupId>org.apache.rya</groupId> - <artifactId>rya.api.function</artifactId> + <artifactId>rya.api.evaluation</artifactId> </dependency> <dependency> <groupId>org.apache.rya</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java index 5f7a06b..124bc76 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java @@ -36,7 +36,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; * {@link VisibilityBindingSet} because some downstream processors require more information about * which upstream processor is emitting the result in order to do their work. * </p> - * Currently there are only two types processors: + * Currently there are only two types of processors: * <ul> * <li>Unary Processor - A processor that only has a single upstream node feeding it input.</li> * <li>Binary Processor - A processor that has two upstream nodes feeding it input.</li> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java deleted file mode 100644 index 9ea927d..0000000 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.streams.kafka.processors.join; - -import java.util.Iterator; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -/** - * An {@link Iterator} that is also {@link AutoCloseable}. - * - * @param <T> - The type of elements that will be iterated over. - */ -@DefaultAnnotation(NonNull.class) -public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java index 9ed2363..367ca6f 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.rya.api.function.join.IterativeJoin; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.streams.kafka.processors.ProcessorResult; import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; @@ -75,7 +76,7 @@ public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier { this.allVars = requireNonNull(allVars); if(!allVars.subList(0, joinVars.size()).equals(joinVars)) { - throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " + + throw new IllegalArgumentException("The allVars list must start with the joinVars list, but it did not. " + "Join Vars: " + joinVars + ", All Vars: " + allVars); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java index 17a6ebb..2afc1d8 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java @@ -19,6 +19,7 @@ package org.apache.rya.streams.kafka.processors.join; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java index d12957a..254f226 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java @@ -28,6 +28,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; import org.openrdf.query.impl.MapBindingSet; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/extras/rya.streams/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/pom.xml b/extras/rya.streams/pom.xml index dd876a0..93b6b1c 100644 --- a/extras/rya.streams/pom.xml +++ b/extras/rya.streams/pom.xml @@ -38,7 +38,15 @@ <module>kafka-test</module> <module>api</module> <module>client</module> - <module>geo</module> <module>integration</module> </modules> + + <profiles> + <profile> + <id>geoindexing</id> + <modules> + <module>geo</module> + </modules> + </profile> + </profiles> </project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f0725df5/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 31b17f8..99640f2 100644 --- a/pom.xml +++ b/pom.xml @@ -217,7 +217,7 @@ under the License. </dependency> <dependency> <groupId>org.apache.rya</groupId> - <artifactId>rya.api.function</artifactId> + <artifactId>rya.api.evaluation</artifactId> <version>${project.version}</version> </dependency> <dependency>