[ https://issues.apache.org/jira/browse/RYA-303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16319366#comment-16319366 ]
ASF GitHub Bot commented on RYA-303: ------------------------------------ Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r160547307 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java --- @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.mongo; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.security.Authorizations; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; + +import com.mongodb.MongoClient; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.util.JSON; + +/** + * Creates and modifies PCJs in MongoDB. 
PCJs are stored as follows: + * + * <pre> + * <code> + * ----- PCJ Metadata Doc ----- + * { + * _id: [table_name]_METADATA, + * sparql: [sparql query to match results], + * cardinality: [number of results] + * } + * + * ----- PCJ Results Doc ----- + * { + * pcjName: [table_name], + * auths: [auths], + * [binding_var1]: { + * uri: [type_uri], + * value: value + * } + * . + * . + * . + * [binding_varn]: { + * uri: [type_uri], + * value: value + * } + * } + * </code> + * </pre> + */ +public class MongoPcjDocuments { + public static final String PCJ_COLLECTION_NAME = "pcjs"; + + // metadata fields + public static final String CARDINALITY_FIELD = "cardinality"; + public static final String SPARQL_FIELD = "sparql"; + public static final String PCJ_ID = "_id"; + public static final String VAR_ORDER_ID = "varOrders"; + + // pcj results fields + private static final String BINDING_VALUE = "value"; + private static final String BINDING_TYPE = "uri"; + private static final String AUTHS_FIELD = "auths"; + private static final String PCJ_NAME = "pcjName"; + + private final MongoCollection<Document> pcjCollection; + private static final PcjVarOrderFactory pcjVarOrderFactory = new ShiftVarOrderFactory(); + + /** + * Creates a new {@link MongoPcjDocuments}. + * @param client - The {@link MongoClient} to use to connect to mongo. + * @param ryaInstanceName - The rya instance to connect to. + */ + public MongoPcjDocuments(final MongoClient client, final String ryaInstanceName) { + requireNonNull(client); + requireNonNull(ryaInstanceName); + pcjCollection = client.getDatabase(ryaInstanceName).getCollection(PCJ_COLLECTION_NAME); + } + + private String getMetadataID(final String pcjName) { + return pcjName + "_METADATA"; + } + + /** + * Creates a {@link Document} containing the metadata defining the PCJ. + * @param pcjName - The name of the PCJ. (not null) + * @param sparql - The sparql query the PCJ will use. + * @return The document built around the provided metadata. 
+ * @throws PCJStorageException - Thrown when the sparql query is malformed. + */ + public Document getMetadataDocument(final String pcjName, final String sparql) throws PCJStorageException { + requireNonNull(pcjName); + requireNonNull(sparql); + + final Set<VariableOrder> varOrders; + try { + varOrders = pcjVarOrderFactory.makeVarOrders(sparql); + } catch (final MalformedQueryException e) { + throw new PCJStorageException("Can not create the PCJ. The SPARQL is malformed.", e); + } + + return new Document() + .append(PCJ_ID, getMetadataID(pcjName)) + .append(SPARQL_FIELD, sparql) + .append(CARDINALITY_FIELD, 0) + .append(VAR_ORDER_ID, varOrders); + + } + + /** + * Creates a new PCJ based on the provided metadata. The initial pcj results will be empty. + * @param pcjName - The unique name of the PCJ. + * @param varOrders - The {@link VariableOrder}s. + * @param sparql - The query the pcj is assigned to. + * @throws PCJStorageException - Thrown when the sparql query is malformed. + */ + public void createPcj(final String pcjName, final String sparql) throws PCJStorageException { + pcjCollection.insertOne(getMetadataDocument(pcjName, sparql)); + } + + /** + * Creates a new PCJ document and populates it by scanning an instance of + * Rya for historic matches. + * <p> + * If any portion of this operation fails along the way, the partially + * created PCJ table will be left in Accumulo. + * + * @param ryaConn - Connects to the Rya that will be scanned. (not null) + * @param pcjTableName - The name of the PCJ table that will be created. (not null) + * @param sparql - The SPARQL query whose results will be loaded into the table. (not null) + * @throws PCJStorageException The PCJ table could not be created or the + * values from Rya were not able to be loaded into it. 
+ */ + public void createAndPopulatePcj( + final RepositoryConnection ryaConn, + final String pcjTableName, + final String sparql) throws PCJStorageException { + checkNotNull(pcjTableName); + checkNotNull(sparql); + + // Create the PCJ document in Mongo. + createPcj(pcjTableName, sparql); + + // Load historic matches from Rya into the PCJ table. + populatePcj(pcjTableName, ryaConn); + } + + /** + * Gets the {@link PcjMetadata} from a provided PCJ name. + * + * @param pcjName - The PCJ to get from MongoDB. (not null) + * @return - The {@link PcjMetadata} of the Pcj specified. + * @throws PCJStorageException The PCJ Table does not exist. + */ + public PcjMetadata getPcjMetadata(final String pcjName) throws PCJStorageException { + requireNonNull(pcjName); + + // since query by ID, there will only be one. + final Document result = pcjCollection.find(new Document(PCJ_ID, getMetadataID(pcjName))).first(); + + if(result == null) { + throw new PCJStorageException("The PCJ: " + pcjName + " does not exist."); + } + + final String sparql = result.getString(SPARQL_FIELD); + final int cardinality = result.getInteger(CARDINALITY_FIELD, 0); + final List<List<String>> varOrders= (List<List<String>>) result.get(VAR_ORDER_ID); + final Set<VariableOrder> varOrder = new HashSet<>(); + for(final List<String> vars : varOrders) { + varOrder.add(new VariableOrder(vars)); + } + //MongoDB does not need to use VarOrders + return new PcjMetadata(sparql, cardinality, varOrder); + } + + /** + * Adds binding set results to a specific PCJ. + * @param pcjName - The PCJ to add the results to. + * @param results - The binding set results. 
+ */ + public void addResults(final String pcjName, final Collection<VisibilityBindingSet> results) { --- End diff -- preconditions: `pcjName` and `results` should be null-checked here (e.g. with `requireNonNull`), as the other public methods in this class do. > Mongo PCJ indexer support > ------------------------- > > Key: RYA-303 > URL: https://issues.apache.org/jira/browse/RYA-303 > Project: Rya > Issue Type: Improvement > Reporter: Andrew Smith > Assignee: Andrew Smith > -- This message was sent by Atlassian JIRA (v6.4.14#64029)