GitHub user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/255#discussion_r160698418
  
    --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java ---
    @@ -0,0 +1,862 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.mongodb.aggregation;
    +
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT;
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY;
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT;
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH;
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE;
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE;
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH;
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA;
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT;
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH;
    +import static 
org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP;
    +
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.NavigableSet;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.ConcurrentSkipListSet;
    +import java.util.function.Function;
    +
    +import org.apache.rya.api.domain.RyaStatement;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.domain.RyaURI;
    +import org.apache.rya.api.domain.StatementMetadata;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.mongodb.MongoDbRdfConstants;
    +import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
    +import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
    +import 
org.apache.rya.mongodb.document.operators.query.ConditionalOperators;
    +import 
org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter;
    +import org.bson.Document;
    +import org.bson.conversions.Bson;
    +import org.openrdf.model.Literal;
    +import org.openrdf.model.Resource;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.QueryEvaluationException;
    +import org.openrdf.query.algebra.Compare;
    +import org.openrdf.query.algebra.ExtensionElem;
    +import org.openrdf.query.algebra.ProjectionElem;
    +import org.openrdf.query.algebra.ProjectionElemList;
    +import org.openrdf.query.algebra.StatementPattern;
    +import org.openrdf.query.algebra.ValueConstant;
    +import org.openrdf.query.algebra.ValueExpr;
    +import org.openrdf.query.algebra.Var;
    +import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
    +
    +import com.google.common.base.Objects;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.BiMap;
    +import com.google.common.collect.HashBiMap;
    +import com.mongodb.BasicDBObject;
    +import com.mongodb.DBObject;
    +import com.mongodb.client.MongoCollection;
    +import com.mongodb.client.model.Aggregates;
    +import com.mongodb.client.model.BsonField;
    +import com.mongodb.client.model.Filters;
    +import com.mongodb.client.model.Projections;
    +
    +import info.aduna.iteration.CloseableIteration;
    +
    +/**
    + * Represents a portion of a query tree as MongoDB aggregation pipeline. 
Should
    + * be built bottom-up: start with a statement pattern implemented as a 
$match
    + * step, then add steps to the pipeline to handle higher levels of the 
query
    + * tree. Methods are provided to add certain supported query operations to 
the
    + * end of the internal pipeline. In some cases, specific arguments may be
    + * unsupported, in which case the pipeline is unchanged and the method 
returns
    + * false.
    + */
    +public class AggregationPipelineQueryNode extends ExternalSet {
    +    /**
    +     * An aggregation result corresponding to a solution should map this 
key
    +     * to an object which itself maps variable names to variable values.
    +     */
    +    static final String VALUES = "<VALUES>";
    +
    +    /**
    +     * An aggregation result corresponding to a solution should map this 
key
    +     * to an object which itself maps variable names to the corresponding 
hashes
    +     * of their values.
    +     */
    +    static final String HASHES = "<HASHES>";
    +
    +    /**
    +     * An aggregation result corresponding to a solution should map this 
key
    +     * to an object which itself maps variable names to their datatypes, 
if any.
    +     */
    +    static final String TYPES = "<TYPES>";
    +
    +    private static final String LEVEL = "derivation_level";
    +    private static final String[] FIELDS = { VALUES, HASHES, TYPES, LEVEL, 
TIMESTAMP };
    +
    +    private static final String JOINED_TRIPLE = "<JOINED_TRIPLE>";
    +    private static final String FIELDS_MATCH = "<JOIN_FIELDS_MATCH>";
    +
    +    private static final MongoDBStorageStrategy<RyaStatement> strategy = 
new SimpleMongoDBStorageStrategy();
    +
    +    private static final Bson DEFAULT_TYPE = new Document("$literal", 
XMLSchema.ANYURI.stringValue());
    +    private static final Bson DEFAULT_CONTEXT = new Document("$literal", 
"");
    +    private static final Bson DEFAULT_DV = 
DocumentVisibilityAdapter.toDBObject(MongoDbRdfConstants.EMPTY_DV);
    +    private static final Bson DEFAULT_METADATA = new Document("$literal",
    +            StatementMetadata.EMPTY_METADATA.toString());
    +
    +    private static boolean isValidFieldName(String name) {
    +        return !(name == null || name.contains(".") || name.contains("$")
    +                || name.equals("_id"));
    +    }
    +
    +    /**
    +     * For a given statement pattern, represents a mapping from query 
variables
    +     * to their corresponding parts of matching triples. If necessary, also
    +     * substitute variable names including invalid characters with 
temporary
    +     * replacements, while producing a map back to the original names.
    +     */
    +    private static class StatementVarMapping {
    +        private final Map<String, String> varToTripleValue = new 
HashMap<>();
    +        private final Map<String, String> varToTripleHash = new 
HashMap<>();
    +        private final Map<String, String> varToTripleType = new 
HashMap<>();
    +        private final BiMap<String, String> varToOriginalName;
    +
    +        String valueField(String varName) {
    +            return varToTripleValue.get(varName);
    +        }
    +        String hashField(String varName) {
    +            return varToTripleHash.get(varName);
    +        }
    +        String typeField(String varName) {
    +            return varToTripleType.get(varName);
    +        }
    +
    +        Set<String> varNames() {
    +            return varToTripleValue.keySet();
    +        }
    +
    +        private String replace(String original) {
    +            if (varToOriginalName.containsValue(original)) {
    +                return varToOriginalName.inverse().get(original);
    +            }
    +            else {
    +                String replacement = "field-" + UUID.randomUUID();
    +                varToOriginalName.put(replacement, original);
    +                return replacement;
    +            }
    +        }
    +
    +        private String sanitize(String name) {
    +            if (varToOriginalName.containsValue(name)) {
    +                return varToOriginalName.inverse().get(name);
    +            }
    +            else if (name != null && !isValidFieldName(name)) {
    +                return replace(name);
    +            }
    +            return name;
    +        }
    +
    +        StatementVarMapping(StatementPattern sp, BiMap<String, String> 
varToOriginalName) {
    +            this.varToOriginalName = varToOriginalName;
    +            if (sp.getSubjectVar() != null && 
!sp.getSubjectVar().hasValue()) {
    +                String name = sanitize(sp.getSubjectVar().getName());
    +                varToTripleValue.put(name, SUBJECT);
    +                varToTripleHash.put(name, SUBJECT_HASH);
    +            }
    +            if (sp.getPredicateVar() != null && 
!sp.getPredicateVar().hasValue()) {
    +                String name = sanitize(sp.getPredicateVar().getName());
    +                varToTripleValue.put(name, PREDICATE);
    +                varToTripleHash.put(name, PREDICATE_HASH);
    +            }
    +            if (sp.getObjectVar() != null && 
!sp.getObjectVar().hasValue()) {
    +                String name = sanitize(sp.getObjectVar().getName());
    +                varToTripleValue.put(name, OBJECT);
    +                varToTripleHash.put(name, OBJECT_HASH);
    +                varToTripleType.put(name, OBJECT_TYPE);
    +            }
    +            if (sp.getContextVar() != null && 
!sp.getContextVar().hasValue()) {
    +                String name = sanitize(sp.getContextVar().getName());
    +                varToTripleValue.put(name, CONTEXT);
    +            }
    +        }
    +
    +        Bson getProjectExpression() {
    +            return getProjectExpression(new LinkedList<>(), str -> "$" + 
str);
    +        }
    +
    +        Bson getProjectExpression(Iterable<String> alsoInclude,
    +                Function<String, String> getFieldExpr) {
    +            Document values = new Document();
    +            Document hashes = new Document();
    +            Document types = new Document();
    +            for (String varName : varNames()) {
    +                values.append(varName, 
getFieldExpr.apply(valueField(varName)));
    +                if (varToTripleHash.containsKey(varName)) {
    +                    hashes.append(varName, 
getFieldExpr.apply(hashField(varName)));
    +                }
    +                if (varToTripleType.containsKey(varName)) {
    +                    types.append(varName, 
getFieldExpr.apply(typeField(varName)));
    +                }
    +            }
    +            for (String varName : alsoInclude) {
    +                values.append(varName, 1);
    +                hashes.append(varName, 1);
    +                types.append(varName, 1);
    +            }
    +            List<Bson> fields = new LinkedList<>();
    +            fields.add(Projections.excludeId());
    +            fields.add(Projections.computed(VALUES, values));
    +            fields.add(Projections.computed(HASHES, hashes));
    +            if (!types.isEmpty()) {
    +                fields.add(Projections.computed(TYPES, types));
    +            }
    +            fields.add(Projections.computed(LEVEL, new Document("$max",
    +                    Arrays.asList("$" + LEVEL, getFieldExpr.apply(LEVEL), 
0))));
    +            fields.add(Projections.computed(TIMESTAMP, new Document("$max",
    +                    Arrays.asList("$" + TIMESTAMP, 
getFieldExpr.apply(TIMESTAMP), 0))));
    +            return Projections.fields(fields);
    +        }
    +    }
    +
    +    /**
    +     * Given a StatementPattern, generate an object representing the 
arguments
    +     * to a "$match" command that will find matching triples.
    +     * @param sp The StatementPattern to search for
    +     * @param path If given, specify the field that should be matched 
against
    +     *  the statement pattern, using an ordered list of field names for a 
nested
    +     *  field. E.g. to match records { "x": { "y": <statement pattern } }, 
pass
    +     *  "x" followed by "y".
    +     * @return The argument of a "$match" query
    +     */
    +    private static BasicDBObject getMatchExpression(StatementPattern sp, 
String ... path) {
    --- End diff --
    
    Okay, I see.


---

Reply via email to