RYA-416 A new query node type to represent a MongoDB aggregation pipeline whose results can be converted to binding sets, and tools for optionally transforming some SPARQL expressions into such a node. Closes #254.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/d5ebb731 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/d5ebb731 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/d5ebb731 Branch: refs/heads/master Commit: d5ebb7315509edbc212c7481f8abe76ee0eec934 Parents: fff50c4 Author: Jesse Hatfield <[email protected]> Authored: Thu Dec 21 17:32:47 2017 -0500 Committer: caleb <[email protected]> Committed: Thu Jan 11 09:50:03 2018 -0500 ---------------------------------------------------------------------- dao/mongodb.rya/pom.xml | 5 + .../AbstractMongoDBRdfConfigurationBuilder.java | 16 + .../rya/mongodb/MongoDBRdfConfiguration.java | 40 +- .../AggregationPipelineQueryNode.java | 856 +++++++++++++++++++ .../AggregationPipelineQueryOptimizer.java | 73 ++ .../aggregation/PipelineResultIteration.java | 135 +++ .../SparqlToPipelineTransformVisitor.java | 196 +++++ .../dao/SimpleMongoDBStorageStrategy.java | 21 +- .../AggregationPipelineQueryNodeTest.java | 331 +++++++ .../mongodb/aggregation/PipelineQueryIT.java | 421 +++++++++ .../PipelineResultIterationTest.java | 152 ++++ .../SparqlToPipelineTransformVisitorTest.java | 207 +++++ 12 files changed, 2446 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/pom.xml ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/pom.xml b/dao/mongodb.rya/pom.xml index 0803aa8..0afac81 100644 --- a/dao/mongodb.rya/pom.xml +++ b/dao/mongodb.rya/pom.xml @@ -86,5 +86,10 @@ Tests will fail with the following error when using 32bit JVM on either Linux or <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> 
</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java index bb14a39..369f7a0 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java @@ -43,6 +43,7 @@ public abstract class AbstractMongoDBRdfConfigurationBuilder<B extends AbstractM protected static final String DEFAULT_MONGO_PORT = "27017"; private String mongoCollectionPrefix = "rya_"; private String mongoDBName = "rya"; + private boolean usePipeline = false; protected static final String MONGO_USER = "mongo.user"; protected static final String MONGO_PASSWORD = "mongo.password"; @@ -142,6 +143,20 @@ public abstract class AbstractMongoDBRdfConfigurationBuilder<B extends AbstractM } /** + * Enable or disable an optimization that executes queries, to the extent + * possible, using the MongoDB aggregation pipeline. Defaults to false. + * If true, replaces a query tree or subtree with a single node representing + * a series of pipeline steps. Transformation may not be supported for all + * query algebra expressions; these expressions are left unchanged and the + * optimization is attempted on their child subtrees. + * @param usePipeline whether to use aggregation pipeline optimization. 
+ */ + public B setUseAggregationPipeline(boolean usePipeline) { + this.usePipeline = usePipeline; + return confBuilder(); + } + + /** * @return extension of {@link MongoDBRdfConfiguration} with specified parameters set */ @Override @@ -171,6 +186,7 @@ public abstract class AbstractMongoDBRdfConfigurationBuilder<B extends AbstractM conf.setTablePrefix(mongoCollectionPrefix); conf.setMongoHostname(host); conf.setMongoPort(port); + conf.setUseAggregationPipeline(usePipeline); return conf; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java index 835ed27..44dc851 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java @@ -20,11 +20,14 @@ package org.apache.rya.mongodb; import static java.util.Objects.requireNonNull; +import java.util.List; import java.util.Properties; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.conf.Configuration; import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.mongodb.aggregation.AggregationPipelineQueryOptimizer; +import org.openrdf.query.algebra.evaluation.QueryOptimizer; import edu.umd.cs.findbugs.annotations.Nullable; @@ -51,6 +54,8 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration { public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers"; public static final String MONGO_GEO_MAXDISTANCE = "mongo.geo.maxdist"; + public static final String USE_AGGREGATION_PIPELINE = "rya.mongodb.query.pipeline"; + /** * Constructs an empty instance of {@link 
MongoDBRdfConfiguration}. */ @@ -251,4 +256,37 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration { public void setFlush(final boolean flush){ setBoolean(CONF_FLUSH_EACH_UPDATE, flush); } -} \ No newline at end of file + + /** + * Whether aggregation pipeline optimization is enabled. + * @return true if queries will be evaluated using MongoDB aggregation. + */ + public boolean getUseAggregationPipeline() { + return getBoolean(USE_AGGREGATION_PIPELINE, false); + } + + /** + * Enable or disable an optimization that executes queries, to the extent + * possible, using the MongoDB aggregation pipeline. Replaces a query tree + * or subtree with a single node representing a series of pipeline steps. + * Transformation may not be supported for all query algebra expressions; + * these expressions are left unchanged and the optimization is attempted + * on their child subtrees. + * @param value whether to use aggregation pipeline optimization. + */ + public void setUseAggregationPipeline(boolean value) { + setBoolean(USE_AGGREGATION_PIPELINE, value); + } + + @Override + public List<Class<QueryOptimizer>> getOptimizers() { + List<Class<QueryOptimizer>> optimizers = super.getOptimizers(); + if (getUseAggregationPipeline()) { + Class<?> cl = AggregationPipelineQueryOptimizer.class; + @SuppressWarnings("unchecked") + Class<QueryOptimizer> optCl = (Class<QueryOptimizer>) cl; + optimizers.add(optCl); + } + return optimizers; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java new file mode 100644 index 0000000..7a84f5d --- /dev/null +++ 
b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java @@ -0,0 +1,856 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; 
+import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.function.Function; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.domain.StatementMetadata; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.mongodb.MongoDbRdfConstants; +import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; +import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; +import org.apache.rya.mongodb.document.operators.query.ConditionalOperators; +import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.openrdf.model.Literal; +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Compare; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.impl.ExternalSet; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.mongodb.BasicDBObject; +import com.mongodb.DBObject; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.Aggregates; +import com.mongodb.client.model.BsonField; +import 
com.mongodb.client.model.Filters; +import com.mongodb.client.model.Projections; + +import info.aduna.iteration.CloseableIteration; + +/** + * Represents a portion of a query tree as MongoDB aggregation pipeline. Should + * be built bottom-up: start with a statement pattern implemented as a $match + * step, then add steps to the pipeline to handle higher levels of the query + * tree. Methods are provided to add certain supported query operations to the + * end of the internal pipeline. In some cases, specific arguments may be + * unsupported, in which case the pipeline is unchanged and the method returns + * false. + */ +public class AggregationPipelineQueryNode extends ExternalSet { + /** + * An aggregation result corresponding to a solution should map this key + * to an object which itself maps variable names to variable values. + */ + static final String VALUES = "<VALUES>"; + + /** + * An aggregation result corresponding to a solution should map this key + * to an object which itself maps variable names to the corresponding hashes + * of their values. + */ + static final String HASHES = "<HASHES>"; + + /** + * An aggregation result corresponding to a solution should map this key + * to an object which itself maps variable names to their datatypes, if any. 
+ */ + static final String TYPES = "<TYPES>"; + + private static final String LEVEL = "derivation_level"; + private static final String[] FIELDS = { VALUES, HASHES, TYPES, LEVEL, TIMESTAMP }; + + private static final String JOINED_TRIPLE = "<JOINED_TRIPLE>"; + private static final String FIELDS_MATCH = "<JOIN_FIELDS_MATCH>"; + + private static final MongoDBStorageStrategy<RyaStatement> strategy = new SimpleMongoDBStorageStrategy(); + + private static final Bson DEFAULT_TYPE = new Document("$literal", XMLSchema.ANYURI.stringValue()); + private static final Bson DEFAULT_CONTEXT = new Document("$literal", ""); + private static final Bson DEFAULT_DV = DocumentVisibilityAdapter.toDBObject(MongoDbRdfConstants.EMPTY_DV); + private static final Bson DEFAULT_METADATA = new Document("$literal", + StatementMetadata.EMPTY_METADATA.toString()); + + private static boolean isValidFieldName(String name) { + return !(name == null || name.contains(".") || name.contains("$") + || name.equals("_id")); + } + + /** + * For a given statement pattern, represents a mapping from query variables + * to their corresponding parts of matching triples. If necessary, also + * substitute variable names including invalid characters with temporary + * replacements, while producing a map back to the original names. 
+ */ + private static class StatementVarMapping { + private final Map<String, String> varToTripleValue = new HashMap<>(); + private final Map<String, String> varToTripleHash = new HashMap<>(); + private final Map<String, String> varToTripleType = new HashMap<>(); + private final BiMap<String, String> varToOriginalName; + + String valueField(String varName) { + return varToTripleValue.get(varName); + } + String hashField(String varName) { + return varToTripleHash.get(varName); + } + String typeField(String varName) { + return varToTripleType.get(varName); + } + + Set<String> varNames() { + return varToTripleValue.keySet(); + } + + private String replace(String original) { + if (varToOriginalName.containsValue(original)) { + return varToOriginalName.inverse().get(original); + } + else { + String replacement = "field-" + UUID.randomUUID(); + varToOriginalName.put(replacement, original); + return replacement; + } + } + + private String sanitize(String name) { + if (varToOriginalName.containsValue(name)) { + return varToOriginalName.inverse().get(name); + } + else if (name != null && !isValidFieldName(name)) { + return replace(name); + } + return name; + } + + StatementVarMapping(StatementPattern sp, BiMap<String, String> varToOriginalName) { + this.varToOriginalName = varToOriginalName; + if (sp.getSubjectVar() != null && !sp.getSubjectVar().hasValue()) { + String name = sanitize(sp.getSubjectVar().getName()); + varToTripleValue.put(name, SUBJECT); + varToTripleHash.put(name, SUBJECT_HASH); + } + if (sp.getPredicateVar() != null && !sp.getPredicateVar().hasValue()) { + String name = sanitize(sp.getPredicateVar().getName()); + varToTripleValue.put(name, PREDICATE); + varToTripleHash.put(name, PREDICATE_HASH); + } + if (sp.getObjectVar() != null && !sp.getObjectVar().hasValue()) { + String name = sanitize(sp.getObjectVar().getName()); + varToTripleValue.put(name, OBJECT); + varToTripleHash.put(name, OBJECT_HASH); + varToTripleType.put(name, OBJECT_TYPE); + } + if 
(sp.getContextVar() != null && !sp.getContextVar().hasValue()) { + String name = sanitize(sp.getContextVar().getName()); + varToTripleValue.put(name, CONTEXT); + } + } + + Bson getProjectExpression() { + return getProjectExpression(new LinkedList<>(), str -> "$" + str); + } + + Bson getProjectExpression(Iterable<String> alsoInclude, + Function<String, String> getFieldExpr) { + Document values = new Document(); + Document hashes = new Document(); + Document types = new Document(); + for (String varName : varNames()) { + values.append(varName, getFieldExpr.apply(valueField(varName))); + if (varToTripleHash.containsKey(varName)) { + hashes.append(varName, getFieldExpr.apply(hashField(varName))); + } + if (varToTripleType.containsKey(varName)) { + types.append(varName, getFieldExpr.apply(typeField(varName))); + } + } + for (String varName : alsoInclude) { + values.append(varName, 1); + hashes.append(varName, 1); + types.append(varName, 1); + } + List<Bson> fields = new LinkedList<>(); + fields.add(Projections.excludeId()); + fields.add(Projections.computed(VALUES, values)); + fields.add(Projections.computed(HASHES, hashes)); + if (!types.isEmpty()) { + fields.add(Projections.computed(TYPES, types)); + } + fields.add(Projections.computed(LEVEL, new Document("$max", + Arrays.asList("$" + LEVEL, getFieldExpr.apply(LEVEL), 0)))); + fields.add(Projections.computed(TIMESTAMP, new Document("$max", + Arrays.asList("$" + TIMESTAMP, getFieldExpr.apply(TIMESTAMP), 0)))); + return Projections.fields(fields); + } + } + + /** + * Given a StatementPattern, generate an object representing the arguments + * to a "$match" command that will find matching triples. + * @param sp The StatementPattern to search for + * @param path If given, specify the field that should be matched against + * the statement pattern, using an ordered list of field names for a nested + * field. E.g. to match records { "x": { "y": <statement pattern } }, pass + * "x" followed by "y". 
+ * @return The argument of a "$match" query + */ + private static BasicDBObject getMatchExpression(StatementPattern sp, String ... path) { + final Var subjVar = sp.getSubjectVar(); + final Var predVar = sp.getPredicateVar(); + final Var objVar = sp.getObjectVar(); + final Var contextVar = sp.getContextVar(); + RyaURI s = null; + RyaURI p = null; + RyaType o = null; + RyaURI c = null; + if (subjVar != null && subjVar.getValue() instanceof Resource) { + s = RdfToRyaConversions.convertResource((Resource) subjVar.getValue()); + } + if (predVar != null && predVar.getValue() instanceof URI) { + p = RdfToRyaConversions.convertURI((URI) predVar.getValue()); + } + if (objVar != null && objVar.getValue() != null) { + o = RdfToRyaConversions.convertValue(objVar.getValue()); + } + if (contextVar != null && contextVar.getValue() instanceof URI) { + c = RdfToRyaConversions.convertURI((URI) contextVar.getValue()); + } + RyaStatement rs = new RyaStatement(s, p, o, c); + DBObject obj = strategy.getQuery(rs); + // Add path prefix, if given + if (path.length > 0) { + StringBuilder sb = new StringBuilder(); + for (String str : path) { + sb.append(str).append("."); + } + String prefix = sb.toString(); + Set<String> originalKeys = new HashSet<>(obj.keySet()); + originalKeys.forEach(key -> { + Object value = obj.removeField(key); + obj.put(prefix + key, value); + }); + } + return (BasicDBObject) obj; + } + + private static String valueFieldExpr(String varName) { + return "$" + VALUES + "." + varName; + } + private static String hashFieldExpr(String varName) { + return "$" + HASHES + "." + varName; + } + private static String typeFieldExpr(String varName) { + return "$" + TYPES + "." + varName; + } + private static String joinFieldExpr(String triplePart) { + return "$" + JOINED_TRIPLE + "." + triplePart; + } + + /** + * Get an object representing the value field of some value expression, or + * return null if the expression isn't supported. 
+ */ + private Object valueFieldExpr(ValueExpr expr) { + if (expr instanceof Var) { + return valueFieldExpr(((Var) expr).getName()); + } + else if (expr instanceof ValueConstant) { + return new Document("$literal", ((ValueConstant) expr).getValue().stringValue()); + } + else { + return null; + } + } + + private final List<Bson> pipeline; + private final MongoCollection<Document> collection; + private final Set<String> assuredBindingNames; + private final Set<String> bindingNames; + private final BiMap<String, String> varToOriginalName; + + private String replace(String original) { + if (varToOriginalName.containsValue(original)) { + return varToOriginalName.inverse().get(original); + } + else { + String replacement = "field-" + UUID.randomUUID(); + varToOriginalName.put(replacement, original); + return replacement; + } + } + + /** + * Create a pipeline query node based on a StatementPattern. + * @param collection The collection of triples to query. + * @param baseSP The leaf node in the query tree. 
+ */ + public AggregationPipelineQueryNode(MongoCollection<Document> collection, StatementPattern baseSP) { + this.collection = Preconditions.checkNotNull(collection); + Preconditions.checkNotNull(baseSP); + this.varToOriginalName = HashBiMap.create(); + StatementVarMapping mapping = new StatementVarMapping(baseSP, varToOriginalName); + this.assuredBindingNames = new HashSet<>(mapping.varNames()); + this.bindingNames = new HashSet<>(mapping.varNames()); + this.pipeline = new LinkedList<>(); + this.pipeline.add(Aggregates.match(getMatchExpression(baseSP))); + this.pipeline.add(Aggregates.project(mapping.getProjectExpression())); + } + + AggregationPipelineQueryNode(MongoCollection<Document> collection, + List<Bson> pipeline, Set<String> assuredBindingNames, + Set<String> bindingNames, BiMap<String, String> varToOriginalName) { + this.collection = Preconditions.checkNotNull(collection); + this.pipeline = Preconditions.checkNotNull(pipeline); + this.assuredBindingNames = Preconditions.checkNotNull(assuredBindingNames); + this.bindingNames = Preconditions.checkNotNull(bindingNames); + this.varToOriginalName = Preconditions.checkNotNull(varToOriginalName); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o instanceof AggregationPipelineQueryNode) { + AggregationPipelineQueryNode other = (AggregationPipelineQueryNode) o; + if (this.collection.equals(other.collection) + && this.assuredBindingNames.equals(other.assuredBindingNames) + && this.bindingNames.equals(other.bindingNames) + && this.varToOriginalName.equals(other.varToOriginalName) + && this.pipeline.size() == other.pipeline.size()) { + // Check pipeline steps for equality -- underlying types don't + // have well-behaved equals methods, so check for equivalent + // string representations. 
+ for (int i = 0; i < this.pipeline.size(); i++) { + Bson doc1 = this.pipeline.get(i); + Bson doc2 = other.pipeline.get(i); + if (!doc1.toString().equals(doc2.toString())) { + return false; + } + } + return true; + } + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(collection, pipeline, assuredBindingNames, + bindingNames, varToOriginalName); + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) + throws QueryEvaluationException { + return new PipelineResultIteration(collection.aggregate(pipeline), varToOriginalName, bindings); + } + + @Override + public Set<String> getAssuredBindingNames() { + Set<String> names = new HashSet<>(); + for (String name : assuredBindingNames) { + names.add(varToOriginalName.getOrDefault(name, name)); + } + return names; + } + + @Override + public Set<String> getBindingNames() { + Set<String> names = new HashSet<>(); + for (String name : bindingNames) { + names.add(varToOriginalName.getOrDefault(name, name)); + } + return names; + } + + @Override + public AggregationPipelineQueryNode clone() { + return new AggregationPipelineQueryNode(collection, + new LinkedList<>(pipeline), + new HashSet<>(assuredBindingNames), + new HashSet<>(bindingNames), + HashBiMap.create(varToOriginalName)); + } + + @Override + public String getSignature() { + super.getSignature(); + Set<String> assured = getAssuredBindingNames(); + Set<String> any = getBindingNames(); + StringBuilder sb = new StringBuilder("AggregationPipelineQueryNode (binds: "); + sb.append(String.join(", ", assured)); + if (any.size() > assured.size()) { + Set<String> optionalBindingNames = any; + optionalBindingNames.removeAll(assured); + sb.append(" [") + .append(String.join(", ", optionalBindingNames)) + .append("]"); + } + sb.append(")\n"); + for (Bson doc : pipeline) { + sb.append(doc.toString()).append("\n"); + } + return sb.toString(); + } + + /** + * Get the internal list of 
aggregation pipeline steps. Note that documents + * resulting from this pipeline will be structured using an internal + * intermediate representation. For documents representing triples, see + * {@link #getTriplePipeline}, and for query solutions, see + * {@link #evaluate}. + * @return The current internal pipeline. + */ + List<Bson> getPipeline() { + return pipeline; + } + + /** + * Add a join with an individual {@link StatementPattern} to the pipeline. + * @param sp The statement pattern to join with + * @return true if the join was successfully added to the pipeline. + */ + public boolean joinWith(StatementPattern sp) { + Preconditions.checkNotNull(sp); + // 1. Determine shared variables and new variables + StatementVarMapping spMap = new StatementVarMapping(sp, varToOriginalName); + NavigableSet<String> sharedVars = new ConcurrentSkipListSet<>(spMap.varNames()); + sharedVars.retainAll(assuredBindingNames); + // 2. Join on one shared variable + String joinKey = sharedVars.pollFirst(); + String collectionName = collection.getNamespace().getCollectionName(); + Bson join; + if (joinKey == null) { + return false; + } + else { + join = Aggregates.lookup(collectionName, + HASHES + "." + joinKey, + spMap.hashField(joinKey), + JOINED_TRIPLE); + } + pipeline.add(join); + // 3. Unwind the joined triples so each document represents a binding + // set (solution) from the base branch and a triple that may match. + pipeline.add(Aggregates.unwind("$" + JOINED_TRIPLE)); + // 4. (Optional) If there are any shared variables that weren't used as + // the join key, project all existing fields plus a new field that + // tests the equality of those shared variables. 
+ BasicDBObject matchOpts = getMatchExpression(sp, JOINED_TRIPLE); + if (!sharedVars.isEmpty()) { + List<Bson> eqTests = new LinkedList<>(); + for (String varName : sharedVars) { + String oldField = valueFieldExpr(varName); + String newField = joinFieldExpr(spMap.valueField(varName)); + Bson eqTest = new Document("$eq", Arrays.asList(oldField, newField)); + eqTests.add(eqTest); + } + Bson eqProjectOpts = Projections.fields( + Projections.computed(FIELDS_MATCH, Filters.and(eqTests)), + Projections.include(JOINED_TRIPLE, VALUES, HASHES, TYPES, LEVEL, TIMESTAMP)); + pipeline.add(Aggregates.project(eqProjectOpts)); + matchOpts.put(FIELDS_MATCH, true); + } + // 5. Filter for solutions whose triples match the joined statement + // pattern, and, if applicable, whose additional shared variables + // match the current solution. + pipeline.add(Aggregates.match(matchOpts)); + // 6. Project the results to include variables from the new SP (with + // appropriate renaming) and variables referenced only in the base + // pipeline (with previous names). + Bson finalProjectOpts = new StatementVarMapping(sp, varToOriginalName) + .getProjectExpression(assuredBindingNames, + str -> joinFieldExpr(str)); + assuredBindingNames.addAll(spMap.varNames()); + bindingNames.addAll(spMap.varNames()); + pipeline.add(Aggregates.project(finalProjectOpts)); + return true; + } + + /** + * Add a SPARQL projection or multi-projection operation to the pipeline. + * The number of documents produced by the pipeline after this operation + * will be the number of documents entering this stage (the number of + * intermediate results) multiplied by the number of + * {@link ProjectionElemList}s supplied here. + * @param projections One or more projections, i.e. mappings from the result + * at this stage of the query into a set of variables. + * @return true if the projection(s) were added to the pipeline. 
+ */ + public boolean project(Iterable<ProjectionElemList> projections) { + if (projections == null || !projections.iterator().hasNext()) { + return false; + } + List<Bson> projectOpts = new LinkedList<>(); + Set<String> bindingNamesUnion = new HashSet<>(); + Set<String> bindingNamesIntersection = null; + for (ProjectionElemList projection : projections) { + Document valueDoc = new Document(); + Document hashDoc = new Document(); + Document typeDoc = new Document(); + Set<String> projectionBindingNames = new HashSet<>(); + for (ProjectionElem elem : projection.getElements()) { + String to = elem.getTargetName(); + // If the 'to' name is invalid, replace it internally + if (!isValidFieldName(to)) { + to = replace(to); + } + String from = elem.getSourceName(); + // If the 'from' name is invalid, use the internal substitute + if (varToOriginalName.containsValue(from)) { + from = varToOriginalName.inverse().get(from); + } + projectionBindingNames.add(to); + if (to.equals(from)) { + valueDoc.append(to, 1); + hashDoc.append(to, 1); + typeDoc.append(to, 1); + } + else { + valueDoc.append(to, valueFieldExpr(from)); + hashDoc.append(to, hashFieldExpr(from)); + typeDoc.append(to, typeFieldExpr(from)); + } + } + bindingNamesUnion.addAll(projectionBindingNames); + if (bindingNamesIntersection == null) { + bindingNamesIntersection = new HashSet<>(projectionBindingNames); + } + else { + bindingNamesIntersection.retainAll(projectionBindingNames); + } + projectOpts.add(new Document() + .append(VALUES, valueDoc) + .append(HASHES, hashDoc) + .append(TYPES, typeDoc) + .append(LEVEL, "$" + LEVEL) + .append(TIMESTAMP, "$" + TIMESTAMP)); + } + if (projectOpts.size() == 1) { + pipeline.add(Aggregates.project(projectOpts.get(0))); + } + else { + String listKey = "PROJECTIONS"; + Bson projectIndividual = Projections.fields( + Projections.computed(VALUES, "$" + listKey + "." + VALUES), + Projections.computed(HASHES, "$" + listKey + "." 
+ HASHES), + Projections.computed(TYPES, "$" + listKey + "." + TYPES), + Projections.include(LEVEL), + Projections.include(TIMESTAMP)); + pipeline.add(Aggregates.project(Projections.computed(listKey, projectOpts))); + pipeline.add(Aggregates.unwind("$" + listKey)); + pipeline.add(Aggregates.project(projectIndividual)); + } + assuredBindingNames.clear(); + bindingNames.clear(); + assuredBindingNames.addAll(bindingNamesIntersection); + bindingNames.addAll(bindingNamesUnion); + return true; + } + + /** + * Add a SPARQL extension to the pipeline, if possible. An extension adds + * some number of variables to the result. Adds a "$project" step to the + * pipeline, but differs from the SPARQL project operation in that + * 1) pre-existing variables are always kept, and 2) values of new variables + * are defined by expressions, which may be more complex than simply + * variable names. Not all expressions are supported. If unsupported + * expression types are used in the extension, the pipeline will remain + * unchanged and this method will return false. + * @param extensionElements A list of new variables and their expressions + * @return True if the extension was successfully converted into a pipeline + * step, false otherwise. + */ + public boolean extend(Iterable<ExtensionElem> extensionElements) { + List<Bson> valueFields = new LinkedList<>(); + List<Bson> hashFields = new LinkedList<>(); + List<Bson> typeFields = new LinkedList<>(); + for (String varName : bindingNames) { + valueFields.add(Projections.include(varName)); + hashFields.add(Projections.include(varName)); + typeFields.add(Projections.include(varName)); + } + Set<String> newVarNames = new HashSet<>(); + for (ExtensionElem elem : extensionElements) { + String name = elem.getName(); + if (!isValidFieldName(name)) { + // If the field name is invalid, replace it internally + name = replace(name); + } + // We can only handle certain kinds of value expressions; return + // failure for any others. 
+ ValueExpr expr = elem.getExpr(); + final Object valueField; + final Object hashField; + final Object typeField; + if (expr instanceof Var) { + String varName = ((Var) expr).getName(); + valueField = "$" + varName; + hashField = "$" + varName; + typeField = "$" + varName; + } + else if (expr instanceof ValueConstant) { + Value val = ((ValueConstant) expr).getValue(); + valueField = new Document("$literal", val.stringValue()); + hashField = new Document("$literal", SimpleMongoDBStorageStrategy.hash(val.stringValue())); + if (val instanceof Literal) { + typeField = new Document("$literal", ((Literal) val).getDatatype().stringValue()); + } + else { + typeField = null; + } + } + else { + // if not understood, return failure + return false; + } + valueFields.add(Projections.computed(name, valueField)); + hashFields.add(Projections.computed(name, hashField)); + if (typeField != null) { + typeFields.add(Projections.computed(name, typeField)); + } + newVarNames.add(name); + } + assuredBindingNames.addAll(newVarNames); + bindingNames.addAll(newVarNames); + Bson projectOpts = Projections.fields( + Projections.computed(VALUES, Projections.fields(valueFields)), + Projections.computed(HASHES, Projections.fields(hashFields)), + Projections.computed(TYPES, Projections.fields(typeFields)), + Projections.include(LEVEL), + Projections.include(TIMESTAMP)); + pipeline.add(Aggregates.project(projectOpts)); + return true; + } + + /** + * Add a SPARQL filter to the pipeline, if possible. A filter eliminates + * results that don't satisfy a given condition. Not all conditional + * expressions are supported. If unsupported expressions are used in the + * filter, the pipeline will remain unchanged and this method will return + * false. Currently only supports binary {@link Compare} conditions among + * variables and/or literals. + * @param condition The filter condition + * @return True if the filter was successfully converted into a pipeline + * step, false otherwise. 
+ */ + public boolean filter(ValueExpr condition) { + if (condition instanceof Compare) { + Compare compare = (Compare) condition; + Compare.CompareOp operator = compare.getOperator(); + Object leftArg = valueFieldExpr(compare.getLeftArg()); + Object rightArg = valueFieldExpr(compare.getRightArg()); + if (leftArg == null || rightArg == null) { + // unsupported value expression, can't convert filter + return false; + } + final String opFunc; + switch (operator) { + case EQ: + opFunc = "$eq"; + break; + case NE: + opFunc = "$ne"; + break; + case LT: + opFunc = "$lt"; + break; + case LE: + opFunc = "$lte"; + break; + case GT: + opFunc = "$gt"; + break; + case GE: + opFunc = "$gte"; + break; + default: + // unrecognized comparison operator, can't convert filter + return false; + } + Document compareDoc = new Document(opFunc, Arrays.asList(leftArg, rightArg)); + pipeline.add(Aggregates.project(Projections.fields( + Projections.computed("FILTER", compareDoc), + Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP)))); + pipeline.add(Aggregates.match(new Document("FILTER", true))); + pipeline.add(Aggregates.project(Projections.fields( + Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP)))); + return true; + } + return false; + } + + /** + * Add a $group step to filter out redundant solutions. + * @return True if the distinct operation was successfully appended. + */ + public boolean distinct() { + List<String> key = new LinkedList<>(); + for (String varName : bindingNames) { + key.add(hashFieldExpr(varName)); + } + List<BsonField> reduceOps = new LinkedList<>(); + for (String field : FIELDS) { + reduceOps.add(new BsonField(field, new Document("$first", "$" + field))); + } + pipeline.add(Aggregates.group(new Document("$concat", key), reduceOps)); + return true; + } + + /** + * Add a step to the end of the current pipeline which prunes the results + * according to the recorded derivation level of their sources. 
At least one + * triple that was used to construct the result must have a derivation level + * at least as high as the parameter, indicating that it was derived via + * that many steps from the original data. (A value of zero is equivalent to + * input data that was not derived at all.) Use in conjunction with + * getTriplePipeline (which sets source level for generated triples) to + * avoid repeatedly deriving the same results. + * @param requiredLevel Required derivation depth. Reject a solution to the + * query if all of the triples involved in producing that solution have a + * lower derivation depth than this. If zero, does nothing. + */ + public void requireSourceDerivationDepth(int requiredLevel) { + if (requiredLevel > 0) { + pipeline.add(Aggregates.match(new Document(LEVEL, + new Document("$gte", requiredLevel)))); + } + } + + /** + * Add a step to the end of the current pipeline which prunes the results + * according to the timestamps of their sources. At least one triple that + * was used to construct the result must have a timestamp at least as + * recent as the parameter. Use in iterative applications to avoid deriving + * solutions that would have been generated in an earlier iteration. + * @param t Minimum required timestamp. Reject a solution to the query if + * all of the triples involved in producing that solution have an earlier + * timestamp than this. + */ + public void requireSourceTimestamp(long t) { + pipeline.add(Aggregates.match(new Document(TIMESTAMP, + new Document("$gte", t)))); + } + + /** + * Given that the current state of the pipeline produces data that can be + * interpreted as triples, add a project step to map each result from the + * intermediate result structure to a structure that can be stored in the + * triple store. Does not modify the internal pipeline, which will still + * produce intermediate results suitable for query evaluation. + * @param timestamp Attach this timestamp to the resulting triples. 
+ * @param requireNew If true, add an additional step to check constructed + * triples against existing triples and only include new ones in the + * result. Adds a potentially expensive $lookup step. + * @throws IllegalStateException if the results produced by the current + * pipeline do not have variable names allowing them to be interpreted as + * triples (i.e. "subject", "predicate", and "object"). + */ + public List<Bson> getTriplePipeline(long timestamp, boolean requireNew) { + if (!assuredBindingNames.contains(SUBJECT) + || !assuredBindingNames.contains(PREDICATE) + || !assuredBindingNames.contains(OBJECT)) { + throw new IllegalStateException("Current pipeline does not produce " + + "records that can be converted into triples.\n" + + "Required variable names: <" + SUBJECT + ", " + PREDICATE + + ", " + OBJECT + ">\nCurrent variable names: " + + assuredBindingNames); + } + List<Bson> triplePipeline = new LinkedList<>(pipeline); + List<Bson> fields = new LinkedList<>(); + fields.add(Projections.computed(SUBJECT, valueFieldExpr(SUBJECT))); + fields.add(Projections.computed(SUBJECT_HASH, hashFieldExpr(SUBJECT))); + fields.add(Projections.computed(PREDICATE, valueFieldExpr(PREDICATE))); + fields.add(Projections.computed(PREDICATE_HASH, hashFieldExpr(PREDICATE))); + fields.add(Projections.computed(OBJECT, valueFieldExpr(OBJECT))); + fields.add(Projections.computed(OBJECT_HASH, hashFieldExpr(OBJECT))); + fields.add(Projections.computed(OBJECT_TYPE, + ConditionalOperators.ifNull(typeFieldExpr(OBJECT), DEFAULT_TYPE))); + fields.add(Projections.computed(CONTEXT, DEFAULT_CONTEXT)); + fields.add(Projections.computed(STATEMENT_METADATA, DEFAULT_METADATA)); + fields.add(DEFAULT_DV); + fields.add(Projections.computed(TIMESTAMP, new Document("$literal", timestamp))); + fields.add(Projections.computed(LEVEL, new Document("$add", Arrays.asList("$" + LEVEL, 1)))); + triplePipeline.add(Aggregates.project(Projections.fields(fields))); + if (requireNew) { + // Prune any triples 
that already exist in the data store + String collectionName = collection.getNamespace().getCollectionName(); + Bson includeAll = Projections.include(SUBJECT, SUBJECT_HASH, + PREDICATE, PREDICATE_HASH, OBJECT, OBJECT_HASH, + OBJECT_TYPE, CONTEXT, STATEMENT_METADATA, + DOCUMENT_VISIBILITY, TIMESTAMP, LEVEL); + List<Bson> eqTests = new LinkedList<>(); + eqTests.add(new Document("$eq", Arrays.asList("$$this." + PREDICATE_HASH, "$" + PREDICATE_HASH))); + eqTests.add(new Document("$eq", Arrays.asList("$$this." + OBJECT_HASH, "$" + OBJECT_HASH))); + Bson redundantFilter = new Document("$filter", new Document("input", "$" + JOINED_TRIPLE) + .append("as", "this").append("cond", new Document("$and", eqTests))); + triplePipeline.add(Aggregates.lookup(collectionName, SUBJECT_HASH, + SUBJECT_HASH, JOINED_TRIPLE)); + String numRedundant = "REDUNDANT"; + triplePipeline.add(Aggregates.project(Projections.fields(includeAll, + Projections.computed(numRedundant, new Document("$size", redundantFilter))))); + triplePipeline.add(Aggregates.match(Filters.eq(numRedundant, 0))); + triplePipeline.add(Aggregates.project(Projections.fields(includeAll))); + } + return triplePipeline; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java new file mode 100644 index 0000000..fb1f558 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryOptimizer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.openrdf.query.BindingSet; +import org.openrdf.query.Dataset; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.evaluation.QueryOptimizer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +/** + * MongoDB-specific query optimizer that replaces part or all of a SPARQL query + * tree with a MongoDB aggregation pipeline. + * <p> + * Transforms query trees using {@link SparqlToPipelineTransformVisitor}. If + * possible, this visitor will replace portions of the query tree, or the entire + * query, with an equivalent aggregation pipeline (contained in an + * {@link AggregationPipelineQueryNode}), thereby allowing query logic to be + * evaluated by the MongoDB server rather than by the client. 
+ */ +public class AggregationPipelineQueryOptimizer implements QueryOptimizer, Configurable { + private StatefulMongoDBRdfConfiguration conf; + private Logger logger = LoggerFactory.getLogger(getClass()); + + @Override + public void optimize(TupleExpr tupleExpr, Dataset dataset, BindingSet bindings) { + SparqlToPipelineTransformVisitor pipelineVisitor = new SparqlToPipelineTransformVisitor(conf); + try { + tupleExpr.visit(pipelineVisitor); + } catch (Exception e) { + logger.error("Error attempting to transform query using the aggregation pipeline", e); + } + } + + /** + * @throws IllegalArgumentException if conf is not a {@link StatefulMongoDBRdfConfiguration}. + */ + @Override + public void setConf(Configuration conf) { + Preconditions.checkNotNull(conf); + Preconditions.checkArgument(conf instanceof StatefulMongoDBRdfConfiguration, + "Expected an instance of %s; received %s", + StatefulMongoDBRdfConfiguration.class.getName(), conf.getClass().getName()); + this.conf = (StatefulMongoDBRdfConfiguration) conf; + } + + @Override + public StatefulMongoDBRdfConfiguration getConf() { + return conf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java new file mode 100644 index 0000000..c533efc --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/PipelineResultIteration.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Map; + +import org.bson.Document; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Preconditions; +import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoCursor; + +import info.aduna.iteration.CloseableIteration; + +/** + * An iterator that converts the documents resulting from an + * {@link AggregationPipelineQueryNode} into {@link BindingSet}s. + */ +public class PipelineResultIteration implements CloseableIteration<BindingSet, QueryEvaluationException> { + private static final int BATCH_SIZE = 1000; + private static final ValueFactory VF = ValueFactoryImpl.getInstance(); + + private final MongoCursor<Document> cursor; + private final Map<String, String> varToOriginalName; + private final BindingSet bindings; + private BindingSet nextSolution = null; + + /** + * Constructor. + * @param aggIter Iterator of documents in AggregationPipelineQueryNode's + * intermediate solution representation. 
+ * @param varToOriginalName A mapping from field names in the pipeline + * result documents to equivalent variable names in the original query. + * Where an entry does not exist for a field, the field name and variable + * name are assumed to be the same. + * @param bindings A partial solution. May be empty. + */ + public PipelineResultIteration(AggregateIterable<Document> aggIter, + Map<String, String> varToOriginalName, + BindingSet bindings) { + this.varToOriginalName = Preconditions.checkNotNull(varToOriginalName); + this.bindings = Preconditions.checkNotNull(bindings); + Preconditions.checkNotNull(aggIter); + aggIter.batchSize(BATCH_SIZE); + this.cursor = aggIter.iterator(); + } + + private void lookahead() { + while (nextSolution == null && cursor.hasNext()) { + nextSolution = docToBindingSet(cursor.next()); + } + } + + @Override + public boolean hasNext() throws QueryEvaluationException { + lookahead(); + return nextSolution != null; + } + + @Override + public BindingSet next() throws QueryEvaluationException { + lookahead(); + BindingSet solution = nextSolution; + nextSolution = null; + return solution; + } + + /** + * @throws UnsupportedOperationException always. + */ + @Override + public void remove() throws QueryEvaluationException { + throw new UnsupportedOperationException("remove() undefined for query result iteration"); + } + + @Override + public void close() throws QueryEvaluationException { + cursor.close(); + } + + private QueryBindingSet docToBindingSet(Document result) { + QueryBindingSet bindingSet = new QueryBindingSet(bindings); + Document valueSet = result.get(AggregationPipelineQueryNode.VALUES, Document.class); + Document typeSet = result.get(AggregationPipelineQueryNode.TYPES, Document.class); + if (valueSet != null) { + for (Map.Entry<String, Object> entry : valueSet.entrySet()) { + String fieldName = entry.getKey(); + String valueString = entry.getValue().toString(); + String typeString = typeSet == null ? 
null : typeSet.getString(fieldName); + String varName = varToOriginalName.getOrDefault(fieldName, fieldName); + Value varValue; + if (typeString == null || typeString.equals(XMLSchema.ANYURI.stringValue())) { + varValue = VF.createURI(valueString); + } + else { + varValue = VF.createLiteral(valueString, VF.createURI(typeString)); + } + Binding existingBinding = bindingSet.getBinding(varName); + // If this variable is not already bound, add it. + if (existingBinding == null) { + bindingSet.addBinding(varName, varValue); + } + // If it's bound to something else, the solutions are incompatible. + else if (!existingBinding.getValue().equals(varValue)) { + return null; + } + } + } + return bindingSet; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java new file mode 100644 index 0000000..b7f5a67 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Arrays; + +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.bson.Document; +import org.openrdf.query.algebra.Distinct; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.Reduced; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +import com.google.common.base.Preconditions; +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; + +/** + * Visitor that transforms a SPARQL query tree by replacing as much of the tree + * as possible with one or more {@code AggregationPipelineQueryNode}s. + * <p> + * Each {@link AggregationPipelineQueryNode} contains a MongoDB aggregation + * pipeline which is equivalent to the replaced portion of the original query. + * Evaluating this node executes the pipeline and converts the results into + * query solutions. If only part of the query was transformed, the remaining + * query logic (higher up in the query tree) can be applied to those + * intermediate solutions as normal. + * <p> + * In general, processes the tree in bottom-up order: A leaf node + * ({@link StatementPattern}) is replaced with a pipeline that matches the + * corresponding statements. 
Then, if the parent node's semantics are supported + * by the visitor, stages are appended to the pipeline and the subtree at the + * parent node is replaced with the extended pipeline. This continues up the + * tree until reaching a node that cannot be transformed, in which case that + * node's child is now a single {@code AggregationPipelineQueryNode} (a leaf + * node) instead of the previous subtree, or until the entire tree has been + * subsumed into a single pipeline node. + * <p> + * Nodes which are transformed into pipeline stages: + * <p><ul> + * <li>A {@code StatementPattern} node forms the beginning of each pipeline. + * <li>Single-argument operations {@link Projection}, {@link MultiProjection}, + * {@link Extension}, {@link Distinct}, and {@link Reduced} will be transformed + * into pipeline stages whenever the child {@link TupleExpr} represents a + * pipeline. + * <li>A {@link Filter} operation will be appended to the pipeline when its + * child {@code TupleExpr} represents a pipeline and the filter condition is a + * type of {@link ValueExpr} understood by {@code AggregationPipelineQueryNode}. + * <li>A {@link Join} operation will be appended to the pipeline when one child + * is a {@code StatementPattern} and the other is an + * {@code AggregationPipelineQueryNode}. + * </ul> + */ +public class SparqlToPipelineTransformVisitor extends QueryModelVisitorBase<Exception> { + private final MongoCollection<Document> inputCollection; + + /** + * Instantiate a visitor directly from a {@link MongoCollection}. + * @param inputCollection Stores triples. + */ + public SparqlToPipelineTransformVisitor(MongoCollection<Document> inputCollection) { + this.inputCollection = Preconditions.checkNotNull(inputCollection); + } + + /** + * Instantiate a visitor from a {@link MongoDBRdfConfiguration}. + * @param conf Contains database connection information. 
+ */ + public SparqlToPipelineTransformVisitor(StatefulMongoDBRdfConfiguration conf) { + Preconditions.checkNotNull(conf); + MongoClient mongo = conf.getMongoClient(); + MongoDatabase db = mongo.getDatabase(conf.getMongoDBName()); + this.inputCollection = db.getCollection(conf.getTriplesCollectionName()); + } + + @Override + public void meet(StatementPattern sp) { + sp.replaceWith(new AggregationPipelineQueryNode(inputCollection, sp)); + } + + @Override + public void meet(Join join) throws Exception { + // If one branch is a single statement pattern, then try replacing the + // other with a pipeline. + AggregationPipelineQueryNode pipelineNode = null; + StatementPattern joinWithSP = null; + if (join.getRightArg() instanceof StatementPattern) { + join.getLeftArg().visit(this); + if (join.getLeftArg() instanceof AggregationPipelineQueryNode) { + pipelineNode = (AggregationPipelineQueryNode) join.getLeftArg(); + joinWithSP = (StatementPattern) join.getRightArg(); + } + } + else if (join.getLeftArg() instanceof StatementPattern) { + join.getRightArg().visit(this); + if (join.getRightArg() instanceof AggregationPipelineQueryNode) { + pipelineNode = (AggregationPipelineQueryNode) join.getRightArg(); + joinWithSP = (StatementPattern) join.getLeftArg(); + } + } + else { + // Otherwise, visit the children to try to replace smaller subtrees + join.visitChildren(this); + } + // If this is now a join between a pipeline node and a statement + // pattern, add the join step at the end of the pipeline, and replace + // this node with the extended pipeline node. 
+ if (pipelineNode != null && joinWithSP != null && pipelineNode.joinWith(joinWithSP)) { + join.replaceWith(pipelineNode); + } + } + + @Override + public void meet(Projection projectionNode) throws Exception { + projectionNode.visitChildren(this); + if (projectionNode.getArg() instanceof AggregationPipelineQueryNode && projectionNode.getParentNode() != null) { + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projectionNode.getArg(); + if (pipelineNode.project(Arrays.asList(projectionNode.getProjectionElemList()))) { + projectionNode.replaceWith(pipelineNode); + } + } + } + + @Override + public void meet(MultiProjection projectionNode) throws Exception { + projectionNode.visitChildren(this); + if (projectionNode.getArg() instanceof AggregationPipelineQueryNode && projectionNode.getParentNode() != null) { + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projectionNode.getArg(); + if (pipelineNode.project(projectionNode.getProjections())) { + projectionNode.replaceWith(pipelineNode); + } + } + } + + @Override + public void meet(Extension extensionNode) throws Exception { + extensionNode.visitChildren(this); + if (extensionNode.getArg() instanceof AggregationPipelineQueryNode && extensionNode.getParentNode() != null) { + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) extensionNode.getArg(); + if (pipelineNode.extend(extensionNode.getElements())) { + extensionNode.replaceWith(pipelineNode); + } + } + } + + @Override + public void meet(Reduced reducedNode) throws Exception { + reducedNode.visitChildren(this); + if (reducedNode.getArg() instanceof AggregationPipelineQueryNode && reducedNode.getParentNode() != null) { + reducedNode.replaceWith(reducedNode.getArg()); + } + } + + @Override + public void meet(Distinct distinctNode) throws Exception { + distinctNode.visitChildren(this); + if (distinctNode.getArg() instanceof AggregationPipelineQueryNode && distinctNode.getParentNode() != null) 
{ + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) distinctNode.getArg(); + pipelineNode.distinct(); + distinctNode.replaceWith(pipelineNode); + } + } + + @Override + public void meet(Filter filterNode) throws Exception { + filterNode.visitChildren(this); + if (filterNode.getArg() instanceof AggregationPipelineQueryNode && filterNode.getParentNode() != null) { + AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) filterNode.getArg(); + if (pipelineNode.filter(filterNode.getCondition())) { + filterNode.replaceWith(pipelineNode); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/d5ebb731/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java index db33181..ecad9c6 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/dao/SimpleMongoDBStorageStrategy.java @@ -63,6 +63,15 @@ public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaS public static final String STATEMENT_METADATA = "statementMetadata"; public static final String DOCUMENT_VISIBILITY = "documentVisibility"; + /** + * Generate the hash that will be used to index and retrieve a given value. + * @param value A value to be stored or accessed (e.g. a URI or literal). + * @return the hash associated with that value in MongoDB. 
+ */ + public static String hash(String value) { + return DigestUtils.sha256Hex(value); + } + protected ValueFactoryImpl factory = new ValueFactoryImpl(); @Override @@ -91,14 +100,14 @@ public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaS final RyaURI context = stmt.getContext(); final BasicDBObject query = new BasicDBObject(); if (subject != null){ - query.append(SUBJECT_HASH, DigestUtils.sha256Hex(subject.getData())); + query.append(SUBJECT_HASH, hash(subject.getData())); } if (object != null){ - query.append(OBJECT_HASH, DigestUtils.sha256Hex(object.getData())); + query.append(OBJECT_HASH, hash(object.getData())); query.append(OBJECT_TYPE, object.getDataType().toString()); } if (predicate != null){ - query.append(PREDICATE_HASH, DigestUtils.sha256Hex(predicate.getData())); + query.append(PREDICATE_HASH, hash(predicate.getData())); } if (context != null){ query.append(CONTEXT, context.getData()); @@ -179,11 +188,11 @@ public class SimpleMongoDBStorageStrategy implements MongoDBStorageStrategy<RyaS final BasicDBObject dvObject = DocumentVisibilityAdapter.toDBObject(statement.getColumnVisibility()); final BasicDBObject doc = new BasicDBObject(ID, new String(Hex.encodeHex(bytes))) .append(SUBJECT, statement.getSubject().getData()) - .append(SUBJECT_HASH, DigestUtils.sha256Hex(statement.getSubject().getData())) + .append(SUBJECT_HASH, hash(statement.getSubject().getData())) .append(PREDICATE, statement.getPredicate().getData()) - .append(PREDICATE_HASH, DigestUtils.sha256Hex(statement.getPredicate().getData())) + .append(PREDICATE_HASH, hash(statement.getPredicate().getData())) .append(OBJECT, statement.getObject().getData()) - .append(OBJECT_HASH, DigestUtils.sha256Hex(statement.getObject().getData())) + .append(OBJECT_HASH, hash(statement.getObject().getData())) .append(OBJECT_TYPE, statement.getObject().getDataType().toString()) .append(CONTEXT, context) .append(STATEMENT_METADATA, statement.getMetadata().toString()) 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.mongodb.aggregation;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.bson.Document;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.openrdf.model.URI;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
import org.openrdf.model.vocabulary.RDF;
import org.openrdf.query.algebra.Compare;
import org.openrdf.query.algebra.ExtensionElem;
import org.openrdf.query.algebra.IsLiteral;
import org.openrdf.query.algebra.Not;
import org.openrdf.query.algebra.ProjectionElem;
import org.openrdf.query.algebra.ProjectionElemList;
import org.openrdf.query.algebra.StatementPattern;
import org.openrdf.query.algebra.ValueConstant;
import org.openrdf.query.algebra.Var;

import com.google.common.collect.HashBiMap;
import com.google.common.collect.Sets;
import com.mongodb.MongoNamespace;
import com.mongodb.client.MongoCollection;

/**
 * Unit tests for {@link AggregationPipelineQueryNode}: equality/cloning
 * semantics, and the pipeline-building operations (statement patterns, joins,
 * projections, extensions, distinct, filters, and derivation/timestamp
 * requirements). The underlying {@link MongoCollection} is mocked, so these
 * tests only verify the pipeline structure and binding-name bookkeeping, not
 * actual query results.
 */
public class AggregationPipelineQueryNodeTest {
    private static final ValueFactory VF = ValueFactoryImpl.getInstance();

    private static final String LUBM = "urn:lubm";
    private static final URI UNDERGRAD = VF.createURI(LUBM, "UndergraduateStudent");
    private static final URI TAKES = VF.createURI(LUBM, "takesCourse");

    /** Wraps a constant value in a {@link Var}, as the query algebra represents constants. */
    private static Var constant(final URI value) {
        return new Var(value.stringValue(), value);
    }

    private MongoCollection<Document> collection;

    @Before
    @SuppressWarnings("unchecked")
    public void setUp() {
        collection = Mockito.mock(MongoCollection.class);
        Mockito.when(collection.getNamespace()).thenReturn(new MongoNamespace("db", "collection"));
    }

    /**
     * Builds the node every test starts from: an empty pipeline whose assured
     * binding names are {x, y} and whose full binding names additionally
     * include the optional variable "opt".
     */
    private AggregationPipelineQueryNode baseNode() {
        return new AggregationPipelineQueryNode(
                collection,
                new LinkedList<>(),
                Sets.newHashSet("x", "y"),
                Sets.newHashSet("x", "y", "opt"),
                HashBiMap.create());
    }

    @Test
    public void testEquals() {
        final AggregationPipelineQueryNode node1 = baseNode();
        final AggregationPipelineQueryNode node2 = baseNode();
        Assert.assertEquals(node1, node2);
        Assert.assertEquals(node1.hashCode(), node2.hashCode());
        // Differs from the base in the set of optional binding names
        final AggregationPipelineQueryNode diff1 = new AggregationPipelineQueryNode(
                collection,
                new LinkedList<>(),
                Sets.newHashSet("x", "y"),
                Sets.newHashSet("x", "y"),
                HashBiMap.create());
        // Differs from the base in the pipeline contents
        final AggregationPipelineQueryNode diff2 = new AggregationPipelineQueryNode(
                collection,
                Arrays.asList(new Document()),
                Sets.newHashSet("x", "y"),
                Sets.newHashSet("x", "y", "opt"),
                HashBiMap.create());
        // Differs from the base in the variable-to-field mapping
        final HashBiMap<String, String> varMapping = HashBiMap.create();
        varMapping.put("field-x", "x");
        final AggregationPipelineQueryNode diff3 = new AggregationPipelineQueryNode(
                collection,
                Arrays.asList(new Document()),
                Sets.newHashSet("x", "y"),
                Sets.newHashSet("x", "y", "opt"),
                varMapping);
        Assert.assertNotEquals(diff1, node1);
        Assert.assertNotEquals(diff2, node1);
        Assert.assertNotEquals(diff3, node1);
        // Applying the same join to both keeps them equal; applying it twice
        // to only one of them makes them unequal.
        node1.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c")));
        node2.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c")));
        Assert.assertEquals(node1, node2);
        node2.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c")));
        Assert.assertNotEquals(node1, node2);
    }

    @Test
    public void testClone() {
        final AggregationPipelineQueryNode base = baseNode();
        final AggregationPipelineQueryNode copy = base.clone();
        Assert.assertEquals(base, copy);
        // The clone's pipeline must be independent of the original's
        copy.getPipeline().add(new Document("$project", new Document()));
        Assert.assertNotEquals(base, copy);
        base.getPipeline().add(new Document("$project", new Document()));
        Assert.assertEquals(base, copy);
    }

    @Test
    public void testStatementPattern() throws Exception {
        // All variables
        StatementPattern sp = new StatementPattern(new Var("s"), new Var("p"), new Var("o"));
        AggregationPipelineQueryNode node = new AggregationPipelineQueryNode(collection, sp);
        Assert.assertEquals(Sets.newHashSet("s", "p", "o"), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet("s", "p", "o"), node.getAssuredBindingNames());
        Assert.assertEquals(2, node.getPipeline().size());
        // All constants
        sp = new StatementPattern(constant(VF.createURI("urn:Alice")), constant(RDF.TYPE), constant(UNDERGRAD));
        node = new AggregationPipelineQueryNode(collection, sp);
        Assert.assertEquals(Sets.newHashSet(), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet(), node.getAssuredBindingNames());
        Assert.assertEquals(2, node.getPipeline().size());
        // Mixture
        sp = new StatementPattern(new Var("student"), constant(RDF.TYPE), constant(UNDERGRAD));
        node = new AggregationPipelineQueryNode(collection, sp);
        Assert.assertEquals(Sets.newHashSet("student"), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet("student"), node.getAssuredBindingNames());
        Assert.assertEquals(2, node.getPipeline().size());
    }

    @Test
    public void testJoin() throws Exception {
        final AggregationPipelineQueryNode base = baseNode();
        // Join on one shared variable
        AggregationPipelineQueryNode node = base.clone();
        boolean success = node.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("c")));
        Assert.assertTrue(success);
        Assert.assertEquals(Sets.newHashSet("x", "y", "c", "opt"), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet("x", "y", "c"), node.getAssuredBindingNames());
        Assert.assertEquals(4, node.getPipeline().size());
        // Join on multiple shared variables
        node = base.clone();
        success = node.joinWith(new StatementPattern(new Var("x"), constant(TAKES), new Var("y")));
        Assert.assertTrue(success);
        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
        Assert.assertEquals(5, node.getPipeline().size());
    }

    @Test
    public void testProject() {
        final AggregationPipelineQueryNode base = baseNode();
        // Add a single projection
        final ProjectionElemList singleProjection = new ProjectionElemList();
        singleProjection.addElement(new ProjectionElem("x", "z"));
        singleProjection.addElement(new ProjectionElem("y", "y"));
        List<ProjectionElemList> projections = Arrays.asList(singleProjection);
        AggregationPipelineQueryNode node = base.clone();
        boolean success = node.project(projections);
        Assert.assertTrue(success);
        Assert.assertEquals(1, node.getPipeline().size());
        Assert.assertEquals(Sets.newHashSet("z", "y"),
                node.getAssuredBindingNames());
        Assert.assertEquals(Sets.newHashSet("z", "y"),
                node.getBindingNames());
        // Add a multi-projection: only "solution" appears in every list, so it
        // is the only assured binding afterward.
        final ProjectionElemList p1 = new ProjectionElemList();
        p1.addElement(new ProjectionElem("x", "solution"));
        final ProjectionElemList p2 = new ProjectionElemList();
        p2.addElement(new ProjectionElem("y", "solution"));
        final ProjectionElemList p3 = new ProjectionElemList();
        p3.addElement(new ProjectionElem("x", "x"));
        p3.addElement(new ProjectionElem("x", "solution"));
        p3.addElement(new ProjectionElem("y", "y"));
        projections = Arrays.asList(p1, p2, p3);
        node = base.clone();
        success = node.project(projections);
        Assert.assertTrue(success);
        Assert.assertEquals(3, node.getPipeline().size());
        Assert.assertEquals(Sets.newHashSet("solution"),
                node.getAssuredBindingNames());
        Assert.assertEquals(Sets.newHashSet("x", "y", "solution"),
                node.getBindingNames());
        // Add no projections: the operation fails and the node is unchanged
        node = base.clone();
        success = node.project(Arrays.asList());
        Assert.assertFalse(success);
        Assert.assertEquals(base, node);
    }

    @Test
    public void testExtend() {
        final AggregationPipelineQueryNode base = baseNode();
        // Extend with a mix of variables and constants
        List<ExtensionElem> extensionElements = Arrays.asList(
                new ExtensionElem(new Var("x"), "subject"),
                new ExtensionElem(new ValueConstant(RDF.TYPE), "predicate"),
                new ExtensionElem(new Var("y"), "object"));
        AggregationPipelineQueryNode node = base.clone();
        boolean success = node.extend(extensionElements);
        Assert.assertTrue(success);
        Assert.assertEquals(1, node.getPipeline().size());
        Assert.assertEquals(Sets.newHashSet("x", "y", "subject", "predicate", "object"),
                node.getAssuredBindingNames());
        Assert.assertEquals(Sets.newHashSet("x", "y", "subject", "predicate", "object", "opt"),
                node.getBindingNames());
        // Attempt to extend with an unsupported expression: the operation
        // fails and the node is unchanged.
        extensionElements = Arrays.asList(
                new ExtensionElem(new Var("x"), "subject"),
                new ExtensionElem(new Not(new ValueConstant(VF.createLiteral(true))), "notTrue"));
        node = base.clone();
        success = node.extend(extensionElements);
        Assert.assertFalse(success);
        Assert.assertEquals(base, node);
    }

    @Test
    public void testDistinct() {
        final AggregationPipelineQueryNode base = baseNode();
        final AggregationPipelineQueryNode node = base.clone();
        final boolean success = node.distinct();
        Assert.assertTrue(success);
        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
        Assert.assertEquals(1, node.getPipeline().size());
    }

    @Test
    public void testFilter() {
        final AggregationPipelineQueryNode base = baseNode();
        // Extend with a supported filter
        AggregationPipelineQueryNode node = base.clone();
        boolean success = node.filter(new Compare(new Var("x"), new Var("y"), Compare.CompareOp.EQ));
        Assert.assertTrue(success);
        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
        Assert.assertEquals(3, node.getPipeline().size());
        // Extend with an unsupported filter: no pipeline stages are added
        node = base.clone();
        success = node.filter(new IsLiteral(new Var("opt")));
        Assert.assertFalse(success);
        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
        Assert.assertEquals(0, node.getPipeline().size());
    }

    @Test
    public void testRequireSourceDerivationLevel() throws Exception {
        final AggregationPipelineQueryNode base = baseNode();
        // Require a level greater than zero: adds one pipeline stage
        AggregationPipelineQueryNode node = base.clone();
        node.requireSourceDerivationDepth(3);
        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
        Assert.assertEquals(1, node.getPipeline().size());
        // Require a level of zero (no effect)
        node = base.clone();
        node.requireSourceDerivationDepth(0);
        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
        Assert.assertEquals(0, node.getPipeline().size());
    }

    @Test
    public void testRequireSourceTimestamp() {
        final AggregationPipelineQueryNode base = baseNode();
        // Requiring a source timestamp adds one pipeline stage and leaves the
        // binding names untouched.
        final AggregationPipelineQueryNode node = base.clone();
        node.requireSourceTimestamp(System.currentTimeMillis());
        Assert.assertEquals(Sets.newHashSet("x", "y", "opt"), node.getBindingNames());
        Assert.assertEquals(Sets.newHashSet("x", "y"), node.getAssuredBindingNames());
        Assert.assertEquals(1, node.getPipeline().size());
    }
}