[ https://issues.apache.org/jira/browse/RYA-416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16313643#comment-16313643 ]
ASF GitHub Bot commented on RYA-416: ------------------------------------ Github user jessehatfield commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/254#discussion_r159932088 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java --- @@ -0,0 +1,882 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.mongodb.aggregation; + +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.function.Function; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.domain.StatementMetadata; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.mongodb.MongoDbRdfConstants; +import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; +import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; +import org.apache.rya.mongodb.document.operators.query.ConditionalOperators; +import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.openrdf.model.Literal; +import 
org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Compare; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.impl.ExternalSet; + +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.mongodb.BasicDBObject; +import com.mongodb.DBObject; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.Aggregates; +import com.mongodb.client.model.BsonField; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.Projections; + +import info.aduna.iteration.CloseableIteration; + +/** + * Represents a portion of a query tree as MongoDB aggregation pipeline. Should + * be built bottom-up: start with a statement pattern implemented as a $match + * step, then add steps to the pipeline to handle higher levels of the query + * tree. Methods are provided to add certain supported query operations to the + * end of the internal pipeline. In some cases, specific arguments may be + * unsupported, in which case the pipeline is unchanged and the method returns + * false. + */ +public class AggregationPipelineQueryNode extends ExternalSet { + /** + * An aggregation result corresponding to a solution should map this key + * to an object which itself maps variable names to variable values. 
+ */ + static final String VALUES = "<VALUES>"; + + /** + * An aggregation result corresponding to a solution should map this key + * to an object which itself maps variable names to the corresponding hashes + * of their values. + */ + static final String HASHES = "<HASHES>"; + + /** + * An aggregation result corresponding to a solution should map this key + * to an object which itself maps variable names to their datatypes, if any. + */ + static final String TYPES = "<TYPES>"; + + private static final String LEVEL = "derivation_level"; + private static final String[] FIELDS = { VALUES, HASHES, TYPES, LEVEL, TIMESTAMP }; + + private static final String JOINED_TRIPLE = "<JOINED_TRIPLE>"; + private static final String FIELDS_MATCH = "<JOIN_FIELDS_MATCH>"; + + private static final MongoDBStorageStrategy<RyaStatement> strategy = new SimpleMongoDBStorageStrategy(); + + private static final Bson DEFAULT_TYPE = new Document("$literal", XMLSchema.ANYURI.stringValue()); + private static final Bson DEFAULT_CONTEXT = new Document("$literal", ""); + private static final Bson DEFAULT_DV = DocumentVisibilityAdapter.toDBObject(MongoDbRdfConstants.EMPTY_DV); + private static final Bson DEFAULT_METADATA = new Document("$literal", + StatementMetadata.EMPTY_METADATA.toString()); + + private static boolean isValidFieldName(String name) { + return !(name == null || name.contains(".") || name.contains("$") + || name.equals("_id")); + } + + /** + * For a given statement pattern, represents a mapping from query variables + * to their corresponding parts of matching triples. If necessary, also + * substitute variable names including invalid characters with temporary + * replacements, while producing a map back to the original names. 
+ */ + private static class StatementVarMapping { + private final Map<String, String> varToTripleValue = new HashMap<>(); + private final Map<String, String> varToTripleHash = new HashMap<>(); + private final Map<String, String> varToTripleType = new HashMap<>(); + private final BiMap<String, String> varToOriginalName; + + String valueField(String varName) { + return varToTripleValue.get(varName); + } + String hashField(String varName) { + return varToTripleHash.get(varName); + } + String typeField(String varName) { + return varToTripleType.get(varName); + } + + Set<String> varNames() { + return varToTripleValue.keySet(); + } + + private String replace(String original) { + if (varToOriginalName.containsValue(original)) { + return varToOriginalName.inverse().get(original); + } + else { + String replacement = "field-" + UUID.randomUUID(); + varToOriginalName.put(replacement, original); + return replacement; + } + } + + private String sanitize(String name) { + if (varToOriginalName.containsValue(name)) { + return varToOriginalName.inverse().get(name); + } + else if (name != null && !isValidFieldName(name)) { + return replace(name); + } + return name; + } + + StatementVarMapping(StatementPattern sp, BiMap<String, String> varToOriginalName) { + this.varToOriginalName = varToOriginalName; + if (sp.getSubjectVar() != null && !sp.getSubjectVar().hasValue()) { + String name = sanitize(sp.getSubjectVar().getName()); + varToTripleValue.put(name, SUBJECT); + varToTripleHash.put(name, SUBJECT_HASH); + } + if (sp.getPredicateVar() != null && !sp.getPredicateVar().hasValue()) { + String name = sanitize(sp.getPredicateVar().getName()); + varToTripleValue.put(name, PREDICATE); + varToTripleHash.put(name, PREDICATE_HASH); + } + if (sp.getObjectVar() != null && !sp.getObjectVar().hasValue()) { + String name = sanitize(sp.getObjectVar().getName()); + varToTripleValue.put(name, OBJECT); + varToTripleHash.put(name, OBJECT_HASH); + varToTripleType.put(name, OBJECT_TYPE); + } + if 
(sp.getContextVar() != null && !sp.getContextVar().hasValue()) { + String name = sanitize(sp.getContextVar().getName()); + varToTripleValue.put(name, CONTEXT); + } + } + + Bson getProjectExpression() { + return getProjectExpression(new LinkedList<>(), str -> "$" + str); + } + + Bson getProjectExpression(Iterable<String> alsoInclude, + Function<String, String> getFieldExpr) { + Document values = new Document(); + Document hashes = new Document(); + Document types = new Document(); + for (String varName : varNames()) { + values.append(varName, getFieldExpr.apply(valueField(varName))); + if (varToTripleHash.containsKey(varName)) { + hashes.append(varName, getFieldExpr.apply(hashField(varName))); + } + if (varToTripleType.containsKey(varName)) { + types.append(varName, getFieldExpr.apply(typeField(varName))); + } + } + for (String varName : alsoInclude) { + values.append(varName, 1); + hashes.append(varName, 1); + types.append(varName, 1); + } + List<Bson> fields = new LinkedList<>(); + fields.add(Projections.excludeId()); + fields.add(Projections.computed(VALUES, values)); + fields.add(Projections.computed(HASHES, hashes)); + if (!types.isEmpty()) { + fields.add(Projections.computed(TYPES, types)); + } + fields.add(Projections.computed(LEVEL, + maxValueExpr("$" + LEVEL, getFieldExpr.apply(LEVEL), 0))); + fields.add(Projections.computed(TIMESTAMP, + maxValueExpr("$" + TIMESTAMP, getFieldExpr.apply(TIMESTAMP), 0))); + return Projections.fields(fields); + } + } + + /** + * Generate a projection expression that evaluates to the maximum of two + * fields and a default value. 
+ */ + private static Document maxValueExpr(String field1, String field2, Object defaultValue) { + if (field1.equals(field2)) { + return ConditionalOperators.ifNull(field1, defaultValue); + } + else { + Document vars = new Document("x", ConditionalOperators.ifNull(field1, defaultValue)) + .append("y", ConditionalOperators.ifNull(field2, defaultValue)); + Document gt = new Document("$gt", Arrays.asList("$$x", "$$y")); + Document maxExpr = new Document("$cond", + new Document("if", gt).append("then", "$$x").append("else", "$$y")); + return new Document("$let", new Document("vars", vars).append("in", maxExpr)); + } + } + + /** + * Given a StatementPattern, generate an object representing the arguments + * to a "$match" command that will find matching triples. + * @param sp The StatementPattern to search for + * @param path If given, specify the field that should be matched against + * the statement pattern, using an ordered list of field names for a nested + * field. E.g. to match records { "x": { "y": <statement pattern } }, pass + * "x" followed by "y". + * @return The argument of a "$match" query + */ + private static BasicDBObject getMatchExpression(StatementPattern sp, String ... 
path) { + final Var subjVar = sp.getSubjectVar(); + final Var predVar = sp.getPredicateVar(); + final Var objVar = sp.getObjectVar(); + final Var contextVar = sp.getContextVar(); + RyaURI s = null; + RyaURI p = null; + RyaType o = null; + RyaURI c = null; + if (subjVar != null && subjVar.getValue() instanceof Resource) { + s = RdfToRyaConversions.convertResource((Resource) subjVar.getValue()); + } + if (predVar != null && predVar.getValue() instanceof URI) { + p = RdfToRyaConversions.convertURI((URI) predVar.getValue()); + } + if (objVar != null && objVar.getValue() != null) { + o = RdfToRyaConversions.convertValue(objVar.getValue()); + } + if (contextVar != null && contextVar.getValue() instanceof URI) { + c = RdfToRyaConversions.convertURI((URI) contextVar.getValue()); + } + RyaStatement rs = new RyaStatement(s, p, o, c); + DBObject obj = strategy.getQuery(rs); + // Add path prefix, if given + if (path.length > 0) { + StringBuilder sb = new StringBuilder(); + for (String str : path) { + sb.append(str).append("."); + } + String prefix = sb.toString(); + Set<String> originalKeys = new HashSet<>(obj.keySet()); + originalKeys.forEach(key -> { + Object value = obj.removeField(key); + obj.put(prefix + key, value); + }); + } + return (BasicDBObject) obj; + } + + private static String valueFieldExpr(String varName) { + return "$" + VALUES + "." + varName; + } + private static String hashFieldExpr(String varName) { + return "$" + HASHES + "." + varName; + } + private static String typeFieldExpr(String varName) { + return "$" + TYPES + "." + varName; + } + private static String joinFieldExpr(String triplePart) { + return "$" + JOINED_TRIPLE + "." + triplePart; + } + + /** + * Get an object representing the value field of some value expression, or + * return null if the expression isn't supported. 
+ */ + private Object valueFieldExpr(ValueExpr expr) { + if (expr instanceof Var) { + return valueFieldExpr(((Var) expr).getName()); + } + else if (expr instanceof ValueConstant) { + return new Document("$literal", ((ValueConstant) expr).getValue().stringValue()); + } + else { + return null; + } + } + + private final List<Bson> pipeline; + private final MongoCollection<Document> collection; + private final Set<String> assuredBindingNames; + private final Set<String> bindingNames; + private final BiMap<String, String> varToOriginalName; + + private String replace(String original) { + if (varToOriginalName.containsValue(original)) { + return varToOriginalName.inverse().get(original); + } + else { + String replacement = "field-" + UUID.randomUUID(); + varToOriginalName.put(replacement, original); + return replacement; + } + } + + /** + * Create a pipeline based on a StatementPattern. + * @param baseSP The leaf node in the query tree. + */ + public AggregationPipelineQueryNode(MongoCollection<Document> collection, StatementPattern baseSP) { + Preconditions.checkNotNull(collection); + Preconditions.checkNotNull(baseSP); + this.collection = collection; + this.varToOriginalName = HashBiMap.create(); + StatementVarMapping mapping = new StatementVarMapping(baseSP, varToOriginalName); + this.assuredBindingNames = new HashSet<>(mapping.varNames()); + this.bindingNames = new HashSet<>(mapping.varNames()); + this.pipeline = new LinkedList<>(); + this.pipeline.add(Aggregates.match(getMatchExpression(baseSP))); + this.pipeline.add(Aggregates.project(mapping.getProjectExpression())); + } + + AggregationPipelineQueryNode(MongoCollection<Document> collection, + List<Bson> pipeline, Set<String> assuredBindingNames, + Set<String> bindingNames, BiMap<String, String> varToOriginalName) { + Preconditions.checkNotNull(collection); + Preconditions.checkNotNull(pipeline); + Preconditions.checkNotNull(assuredBindingNames); + Preconditions.checkNotNull(bindingNames); + 
Preconditions.checkNotNull(varToOriginalName); + this.collection = collection; + this.pipeline = pipeline; + this.assuredBindingNames = assuredBindingNames; + this.bindingNames = bindingNames; + this.varToOriginalName = varToOriginalName; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o instanceof AggregationPipelineQueryNode) { + AggregationPipelineQueryNode other = (AggregationPipelineQueryNode) o; + if (this.collection.equals(other.collection) + && this.assuredBindingNames.equals(other.assuredBindingNames) + && this.bindingNames.equals(other.bindingNames) + && this.varToOriginalName.equals(other.varToOriginalName) + && this.pipeline.size() == other.pipeline.size()) { + // Check pipeline steps for equality -- underlying types don't + // have well-behaved equals methods, so check for equivalent + // string representations. + for (int i = 0; i < this.pipeline.size(); i++) { + Bson doc1 = this.pipeline.get(i); + Bson doc2 = other.pipeline.get(i); + if (!doc1.toString().equals(doc2.toString())) { + return false; + } + } + return true; + } + } + return false; + } + + @Override + public int hashCode() { + int result = collection.hashCode(); + for (Bson step : pipeline) { + result = result * 37 + step.toString().hashCode(); + } + result = result * 37 + assuredBindingNames.hashCode(); + result = result * 37 + bindingNames.hashCode(); + result = result * 37 + varToOriginalName.hashCode(); + return result; + } + + @Override + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) + throws QueryEvaluationException { + return new PipelineResultIteration(collection.aggregate(pipeline), varToOriginalName, bindings); + } + + @Override + public Set<String> getAssuredBindingNames() { + Set<String> names = new HashSet<>(); + for (String name : assuredBindingNames) { + names.add(varToOriginalName.getOrDefault(name, name)); + } + return names; + } + + @Override + public Set<String> 
getBindingNames() { + Set<String> names = new HashSet<>(); + for (String name : bindingNames) { + names.add(varToOriginalName.getOrDefault(name, name)); + } + return names; + } + + @Override + public AggregationPipelineQueryNode clone() { + return new AggregationPipelineQueryNode(collection, + new LinkedList<>(pipeline), + new HashSet<>(assuredBindingNames), + new HashSet<>(bindingNames), + HashBiMap.create(varToOriginalName)); + } + + @Override + public String getSignature() { + super.getSignature(); + Set<String> assured = getAssuredBindingNames(); + Set<String> any = getBindingNames(); + StringBuilder sb = new StringBuilder("AggregationPipelineQueryNode (binds: "); + sb.append(String.join(", ", assured)); + if (any.size() > assured.size()) { + Set<String> optionalBindingNames = any; + optionalBindingNames.removeAll(assured); + sb.append(" [") + .append(String.join(", ", optionalBindingNames)) + .append("]"); + } + sb.append(")\n"); + for (Bson doc : pipeline) { + sb.append(doc.toString()).append("\n"); + } + return sb.toString(); + } + + /** + * Get the internal list of aggregation pipeline steps. Note that documents + * resulting from this pipeline will be structured using an internal + * intermediate representation. For documents representing triples, see + * {@link #getTriplePipeline}, and for query solutions, see + * {@link #evaluate}. + * @return The current internal pipeline. + */ + List<Bson> getPipeline() { + return pipeline; + } + + /** + * Add a join with an individual {@link StatementPattern} to the pipeline. + * @param sp The statement pattern to join with + * @return true if the join was successfully added to the pipeline. + */ + public boolean joinWith(StatementPattern sp) { + // 1. Determine shared variables and new variables + StatementVarMapping spMap = new StatementVarMapping(sp, varToOriginalName); + NavigableSet<String> sharedVars = new ConcurrentSkipListSet<>(spMap.varNames()); + sharedVars.retainAll(assuredBindingNames); + // 2. 
Join on one shared variable + String joinKey = sharedVars.pollFirst(); + String collectionName = collection.getNamespace().getCollectionName(); + Bson join; + if (joinKey == null) { + return false; + } + else { + join = Aggregates.lookup(collectionName, + HASHES + "." + joinKey, + spMap.hashField(joinKey), + JOINED_TRIPLE); + } + pipeline.add(join); + // 3. Unwind the joined triples so each document represents a binding + // set (solution) from the base branch and a triple that may match. + pipeline.add(Aggregates.unwind("$" + JOINED_TRIPLE)); + // 4. (Optional) If there are any shared variables that weren't used as + // the join key, project all existing fields plus a new field that + // tests the equality of those shared variables. + BasicDBObject matchOpts = getMatchExpression(sp, JOINED_TRIPLE); + if (!sharedVars.isEmpty()) { + List<Bson> eqTests = new LinkedList<>(); + for (String varName : sharedVars) { + String oldField = valueFieldExpr(varName); + String newField = joinFieldExpr(spMap.valueField(varName)); + Bson eqTest = new Document("$eq", Arrays.asList(oldField, newField)); + eqTests.add(eqTest); + } + Bson eqProjectOpts = Projections.fields( + Projections.computed(FIELDS_MATCH, Filters.and(eqTests)), + Projections.include(JOINED_TRIPLE, VALUES, HASHES, TYPES, LEVEL, TIMESTAMP)); + pipeline.add(Aggregates.project(eqProjectOpts)); + matchOpts.put(FIELDS_MATCH, true); + } + // 5. Filter for solutions whose triples match the joined statement + // pattern, and, if applicable, whose additional shared variables + // match the current solution. + pipeline.add(Aggregates.match(matchOpts)); + // 5. Project the results to include variables from the new SP (with + // appropriate renaming) and variables referenced only in the base + // pipeline (with previous names). 
+ Bson finalProjectOpts = new StatementVarMapping(sp, varToOriginalName) + .getProjectExpression(assuredBindingNames, + str -> joinFieldExpr(str)); + assuredBindingNames.addAll(spMap.varNames()); + bindingNames.addAll(spMap.varNames()); + pipeline.add(Aggregates.project(finalProjectOpts)); + return true; + } + + /** + * Add a SPARQL projection or multi-projection operation to the pipeline. + * The number of documents produced by the pipeline after this operation + * will be the number of documents entering this stage (the number of + * intermediate results) multiplied by the number of + * {@link ProjectionElemList}s supplied here. + * @param projections One or more projections, i.e. mappings from the result + * at this stage of the query into a set of variables. + * @return true if the projection(s) were added to the pipeline. + */ + public boolean project(Iterable<ProjectionElemList> projections) { + if (projections == null || !projections.iterator().hasNext()) { + return false; + } + List<Bson> projectOpts = new LinkedList<>(); + Set<String> bindingNamesUnion = new HashSet<>(); + Set<String> bindingNamesIntersection = null; + for (ProjectionElemList projection : projections) { + Document valueDoc = new Document(); + Document hashDoc = new Document(); + Document typeDoc = new Document(); + Set<String> projectionBindingNames = new HashSet<>(); + for (ProjectionElem elem : projection.getElements()) { + String to = elem.getTargetName(); + // If the 'to' name is invalid, replace it internally + if (!isValidFieldName(to)) { + to = replace(to); + } + String from = elem.getSourceName(); + // If the 'from' name is invalid, use the internal substitute + if (varToOriginalName.containsValue(from)) { + from = varToOriginalName.inverse().get(from); + } + projectionBindingNames.add(to); + if (to.equals(from)) { + valueDoc.append(to, 1); + hashDoc.append(to, 1); + typeDoc.append(to, 1); + } + else { + valueDoc.append(to, valueFieldExpr(from)); + hashDoc.append(to, 
hashFieldExpr(from)); + typeDoc.append(to, typeFieldExpr(from)); + } + } + bindingNamesUnion.addAll(projectionBindingNames); + if (bindingNamesIntersection == null) { + bindingNamesIntersection = new HashSet<>(projectionBindingNames); + } + else { + bindingNamesIntersection.retainAll(projectionBindingNames); + } + projectOpts.add(new Document() + .append(VALUES, valueDoc) + .append(HASHES, hashDoc) + .append(TYPES, typeDoc) + .append(LEVEL, "$" + LEVEL) + .append(TIMESTAMP, "$" + TIMESTAMP)); + } + if (projectOpts.size() == 1) { + pipeline.add(Aggregates.project(projectOpts.get(0))); + } + else { + String listKey = "PROJECTIONS"; + Bson projectIndividual = Projections.fields( + Projections.computed(VALUES, "$" + listKey + "." + VALUES), + Projections.computed(HASHES, "$" + listKey + "." + HASHES), + Projections.computed(TYPES, "$" + listKey + "." + TYPES), + Projections.include(LEVEL), + Projections.include(TIMESTAMP)); + pipeline.add(Aggregates.project(Projections.computed(listKey, projectOpts))); + pipeline.add(Aggregates.unwind("$" + listKey)); + pipeline.add(Aggregates.project(projectIndividual)); + } + assuredBindingNames.clear(); + bindingNames.clear(); + assuredBindingNames.addAll(bindingNamesIntersection); + bindingNames.addAll(bindingNamesUnion); + return true; + } + + /** + * Add a SPARQL extension to the pipeline, if possible. An extension adds + * some number of variables to the result. Adds a "$project" step to the + * pipeline, but differs from the SPARQL project operation in that + * 1) pre-existing variables are always kept, and 2) values of new variables + * are defined by expressions, which may be more complex than simply + * variable names. Not all expressions are supported. If unsupported + * expression types are used in the extension, the pipeline will remain + * unchanged and this method will return false. 
+ * @param extensionElements A list of new variables and their expressions + * @return True if the extension was successfully converted into a pipeline + * step, false otherwise. + */ + public boolean extend(Iterable<ExtensionElem> extensionElements) { + List<Bson> valueFields = new LinkedList<>(); + List<Bson> hashFields = new LinkedList<>(); + List<Bson> typeFields = new LinkedList<>(); + for (String varName : bindingNames) { + valueFields.add(Projections.include(varName)); + hashFields.add(Projections.include(varName)); + typeFields.add(Projections.include(varName)); + } + Set<String> newVarNames = new HashSet<>(); + for (ExtensionElem elem : extensionElements) { + String name = elem.getName(); + if (!isValidFieldName(name)) { + // If the field name is invalid, replace it internally + name = replace(name); + } + // We can only handle certain kinds of value expressions; return + // failure for any others. + ValueExpr expr = elem.getExpr(); + final Object valueField; + final Object hashField; + final Object typeField; + if (expr instanceof Var) { + String varName = ((Var) expr).getName(); + valueField = "$" + varName; + hashField = "$" + varName; + typeField = "$" + varName; + } + else if (expr instanceof ValueConstant) { + Value val = ((ValueConstant) expr).getValue(); + valueField = new Document("$literal", val.stringValue()); + hashField = new Document("$literal", SimpleMongoDBStorageStrategy.hash(val.stringValue())); + if (val instanceof Literal) { + typeField = new Document("$literal", ((Literal) val).getDatatype().stringValue()); + } + else { + typeField = null; + } + } + else { + // if not understood, return failure + return false; + } + valueFields.add(Projections.computed(name, valueField)); + hashFields.add(Projections.computed(name, hashField)); + if (typeField != null) { + typeFields.add(Projections.computed(name, typeField)); + } + newVarNames.add(name); + } + assuredBindingNames.addAll(newVarNames); + bindingNames.addAll(newVarNames); + Bson 
projectOpts = Projections.fields( + Projections.computed(VALUES, Projections.fields(valueFields)), + Projections.computed(HASHES, Projections.fields(hashFields)), + Projections.computed(TYPES, Projections.fields(typeFields)), + Projections.include(LEVEL), + Projections.include(TIMESTAMP)); + pipeline.add(Aggregates.project(projectOpts)); + return true; + } + + /** + * Add a SPARQL filter to the pipeline, if possible. A filter eliminates + * results that don't satisfy a given condition. Not all conditional + * expressions are supported. If unsupported expressions are used in the + * filter, the pipeline will remain unchanged and this method will return + * false. + * @param condition The filter condition + * @return True if the filter was successfully converted into a pipeline + * step, false otherwise. + */ + public boolean filter(ValueExpr condition) { + if (condition instanceof Compare) { + Compare compare = (Compare) condition; + Compare.CompareOp operator = compare.getOperator(); + Object leftArg = valueFieldExpr(compare.getLeftArg()); + Object rightArg = valueFieldExpr(compare.getRightArg()); + if (leftArg == null || rightArg == null) { + // unsupported value expression, can't convert filter + return false; + } + final String opFunc; + switch (operator) { + case EQ: + opFunc = "$eq"; + break; + case NE: + opFunc = "$ne"; + break; + case LT: + opFunc = "$lt"; + break; + case LE: + opFunc = "$le"; + break; + case GT: + opFunc = "$gt"; + break; + case GE: + opFunc = "$ge"; + break; + default: + // unrecognized comparison operator, can't convert filter + return false; + } + Document compareDoc = new Document(opFunc, Arrays.asList(leftArg, rightArg)); + pipeline.add(Aggregates.project(Projections.fields( + Projections.computed("FILTER", compareDoc), + Projections.include(VALUES, HASHES, TYPES, LEVEL, TIMESTAMP)))); + pipeline.add(Aggregates.match(new Document("FILTER", true))); + pipeline.add(Aggregates.project(Projections.fields( + Projections.include(VALUES, 
HASHES, TYPES, LEVEL, TIMESTAMP)))); + return true; + } + return false; + } + + /** + * Add a $group step to filter out redundant solutions. + * @return True if the distinct operation was successfully appended. + */ + public boolean distinct() { --- End diff -- Can you elaborate? If there's a cleaner way to accomplish something like a SPARQL "DISTINCT" operation, I haven't found it yet -- keep in mind we don't want a field containing a list of distinct values, but rather a separate document returned for each distinct value. This solution is based on [this example](https://docs.mongodb.com/manual/reference/operator/aggregation/group/#retrieve-distinct-values) which uses $group to retrieve distinct results, except that the key is constructed from the variable hashes and all the fields are kept (using "$first" to choose one value for each since the values within the group should be identical). > Add the ability to use the MongoDB aggregation pipeline to evaluate simple > SPARQL expressions > ------------------------------------------------------------------------------------------ > > Key: RYA-416 > URL: https://issues.apache.org/jira/browse/RYA-416 > Project: Rya > Issue Type: New Feature > Reporter: Jesse Hatfield > Assignee: Jesse Hatfield > > MongoDB provides the [aggregation pipeline > framework|https://docs.mongodb.com/manual/core/aggregation-pipeline/] for > multi-stage data processing. Currently, the query engine invokes this > framework to apply individual statement patterns (using a "$match" expression > for each and iterating through the results), then applies higher-level query > operations (join, filter, select, project, etc) client-side. > In principle, those high-level query operations could be rewritten as > aggregation pipeline stages as well ($group, $match, $project, etc). This > would allow more query evaluation logic to be executed by the MongoDB server > itself, enabling server-side optimization. 
This could be used as a general > query optimization, but would additionally be useful for any tool that only > needed to write query results back to the server: adding a write step to the > end of the resulting pipeline could obviate the need to communicate > individual results to the client at all. -- This message was sent by Atlassian JIRA (v6.4.14#64029)