This is an automated email from the ASF dual-hosted git repository. pujav65 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-rya.git
commit 31edfb474da298629b703ba318bce4f0b96a0fd4 Author: eric.white <eric.wh...@parsons.com> AuthorDate: Wed May 8 09:32:55 2019 -0400 Updates for Mongo aggregation --- .../aggregation/AggregationPipelineQueryNode.java | 16 +- .../iter/RyaStatementBindingSetCursorIterator.java | 23 +-- .../metadata/matching/StatementMetadataNode.java | 169 +++++++++++---------- .../strategy/MongoPipelineStrategy.java | 67 ++++---- .../apache/rya/forwardchain/batch/MongoSpinIT.java | 95 ++++++------ 5 files changed, 194 insertions(+), 176 deletions(-) diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java index 6996fcd..fda8538 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java @@ -95,6 +95,8 @@ import com.mongodb.client.model.Projections; * false. */ public class AggregationPipelineQueryNode extends ExternalSet { + private static final long serialVersionUID = 1L; + /** * An aggregation result corresponding to a solution should map this key * to an object which itself maps variable names to variable values. @@ -230,8 +232,12 @@ public class AggregationPipelineQueryNode extends ExternalSet { } final List<Bson> fields = new LinkedList<>(); fields.add(Projections.excludeId()); - fields.add(Projections.computed(VALUES, values)); - fields.add(Projections.computed(HASHES, hashes)); + if (!values.isEmpty()) { + fields.add(Projections.computed(VALUES, values)); + } + if (!hashes.isEmpty()) { + fields.add(Projections.computed(HASHES, hashes)); + } if (!types.isEmpty()) { fields.add(Projections.computed(TYPES, types)); } @@ -778,8 +784,7 @@ public class AggregationPipelineQueryNode extends ExternalSet { */ public void requireSourceDerivationDepth(final int requiredLevel) { if (requiredLevel > 0) { - pipeline.add(Aggregates.match(new Document(LEVEL, - new Document("$gte", requiredLevel)))); + pipeline.add(Aggregates.match(Filters.gte(LEVEL, requiredLevel))); } } @@ -794,8 +799,7 @@ public class AggregationPipelineQueryNode extends ExternalSet { * timestamp than this. */ public void requireSourceTimestamp(final long t) { - pipeline.add(Aggregates.match(new Document(TIMESTAMP, - new Document("$gte", t)))); + pipeline.add(Aggregates.match(Filters.gte(TIMESTAMP, t))); } /** diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java index 462da1c..179b3d4 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java @@ -34,16 +34,19 @@ import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil; import org.bson.Document; +import org.bson.conversions.Bson; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.query.BindingSet; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterators; import com.google.common.collect.Multimap; +import com.mongodb.BasicDBObject; import com.mongodb.DBObject; import com.mongodb.client.AggregateIterable; import com.mongodb.client.MongoCollection; -import com.mongodb.util.JSON; +import com.mongodb.client.model.Aggregates; +import com.mongodb.client.model.Filters; public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> { private static final Logger log = Logger.getLogger(RyaStatementBindingSetCursorIterator.class); @@ -102,7 +105,7 @@ public class RyaStatementBindingSetCursorIterator implements CloseableIteration< if (currentBatchQueryResultCursorIsValid()) { // convert to Rya Statement final Document queryResult = batchQueryResultsIterator.next(); - final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson()); + final DBObject dbo = BasicDBObject.parse(queryResult.toJson()); currentResultStatement = strategy.deserializeDBObject(dbo); // Find all of the queries in the executed RangeMap that this result matches @@ -136,21 +139,23 @@ public class RyaStatementBindingSetCursorIterator implements CloseableIteration< private void submitBatchQuery() { int count = 0; executedRangeMap.clear(); - final List<Document> pipeline = new ArrayList<>(); - final List<DBObject> match = new ArrayList<>(); + final List<Bson> pipeline = new ArrayList<>(); + final List<Bson> matches = new ArrayList<>(); while (queryIterator.hasNext() && count < QUERY_BATCH_SIZE){ count++; final RyaStatement query = queryIterator.next(); executedRangeMap.putAll(query, rangeMap.get(query)); final DBObject currentQuery = strategy.getQuery(query); - match.add(currentQuery); + final Document doc = Document.parse(currentQuery.toString()); + matches.add(doc); } - if (match.size() > 1) { - pipeline.add(new Document("$match", new Document("$or", match))); - } else if (match.size() == 1) { - pipeline.add(new Document("$match", match.get(0))); + final int numMatches = matches.size(); + if (numMatches > 1) { + pipeline.add(Aggregates.match(Filters.or(matches))); + } else if (numMatches == 1) { + pipeline.add(Aggregates.match(matches.get(0))); } else { batchQueryResultsIterator = Iterators.emptyIterator(); return; diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java b/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java index a5f8b3a..439d06a 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java @@ -8,9 +8,9 @@ package org.apache.rya.indexing.statement.metadata.matching; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -37,9 +37,9 @@ import java.util.Set; import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.api.RdfCloudTripleStoreUtils; +import org.apache.rya.api.domain.RyaIRI; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaType; -import org.apache.rya.api.domain.RyaIRI; import org.apache.rya.api.domain.StatementMetadata; import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.api.persist.query.RyaQueryEngine; @@ -81,7 +81,7 @@ import com.google.common.base.Preconditions; * expensive from a storage perspective. It is also expensive from a query * perspective in that three joins are required to evaluate a query that is * reduced to a single scan in non-reified form. - * + * * This class provides Rya with the ability to issue reified queries even though * statements are not reified. Each {@link RyaStatement} contains a * {@link StatementMetadata} field that allows users to store additional @@ -112,18 +112,18 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e private StatementPattern statement; private Map<RyaIRI, Var> properties; - private Collection<StatementPattern> patterns; - private List<RyaIRI> uriList = Arrays.asList(TYPE_ID_URI, SUBJ_ID_URI, PRED_ID_URI, OBJ_ID_URI); - private C conf; + private final Collection<StatementPattern> patterns; + private final List<RyaIRI> uriList = Arrays.asList(TYPE_ID_URI, SUBJ_ID_URI, PRED_ID_URI, OBJ_ID_URI); + private final C conf; private Set<String> bindingNames; private RyaQueryEngine<C> queryEngine; - public StatementMetadataNode(final Collection<StatementPattern> patterns, C conf) { + public StatementMetadataNode(final Collection<StatementPattern> patterns, final C conf) { this.conf = conf; this.patterns = patterns; verifySameSubjects(patterns); verifyAllPredicatesAreConstants(patterns); - boolean correctForm = verifyHasCorrectTypePattern(patterns); + final boolean correctForm = verifyHasCorrectTypePattern(patterns); if (!correctForm) { throw new IllegalArgumentException("Invalid reified StatementPatterns."); } @@ -132,7 +132,7 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e /** * Get {@link StatementPattern}s representing the underlying reified query. - * + * * @return Collection of StatementPatterns */ public Collection<StatementPattern> getReifiedStatementPatterns() { @@ -148,7 +148,7 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e * @throws IllegalStateException * If all of the Subjects are not the same. */ - private static void verifySameSubjects(Collection<StatementPattern> patterns) throws IllegalStateException { + private static void verifySameSubjects(final Collection<StatementPattern> patterns) throws IllegalStateException { requireNonNull(patterns); final Iterator<StatementPattern> it = patterns.iterator(); @@ -204,7 +204,7 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e boolean valid = true; boolean contextSet = false; Var context = null; - + for (final StatementPattern pattern : patterns) { final RyaIRI predicate = new RyaIRI(pattern.getPredicateVar().getValue().toString()); @@ -216,15 +216,20 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e return false; } } - + if (predicate.equals(TYPE_ID_URI)) { - final RyaIRI statementID = new RyaIRI(pattern.getObjectVar().getValue().stringValue()); - if (statementID.equals(STATEMENT_ID_URI)) { - statementFound = true; + final Value objectValue = pattern.getObjectVar().getValue(); + if (objectValue != null) { + final RyaIRI statementID = new RyaIRI(objectValue.stringValue()); + if (statementID.equals(STATEMENT_ID_URI)) { + statementFound = true; + } else { + // contains more than one Statement containing TYPE_ID_URI + // as Predicate + // and STATEMENT_ID_URI as Object + valid = false; + } } else { - // contains more than one Statement containing TYPE_ID_URI - // as Predicate - // and STATEMENT_ID_URI as Object valid = false; } } @@ -274,20 +279,20 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e * the user specified metadata properties and is used for comparison with * the metadata properties extracted from RyaStatements passed back by the * {@link RyaQueryEngine}. - * + * * @param patterns * - collection of patterns representing a reified query */ - private void setStatementPatternAndProperties(Collection<StatementPattern> patterns) { + private void setStatementPatternAndProperties(final Collection<StatementPattern> patterns) { - StatementPattern sp = new StatementPattern(); - Map<RyaIRI, Var> properties = new HashMap<>(); + final StatementPattern sp = new StatementPattern(); + final Map<RyaIRI, Var> properties = new HashMap<>(); for (final StatementPattern pattern : patterns) { final RyaIRI predicate = new RyaIRI(pattern.getPredicateVar().getValue().toString()); if (!uriList.contains(predicate)) { - Var objVar = pattern.getObjectVar(); + final Var objVar = pattern.getObjectVar(); properties.put(predicate, objVar); continue; } @@ -315,17 +320,17 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e * {@link RyaQueryEngine}. */ @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Collection<BindingSet> bindingset) + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset) throws QueryEvaluationException { if (bindingset.size() == 0) { return new EmptyIteration<>(); } queryEngine = RyaQueryEngineFactory.getQueryEngine(conf); - Set<Map.Entry<RyaStatement, BindingSet>> statements = new HashSet<>(); - Iterator<BindingSet> iter = bindingset.iterator(); + final Set<Map.Entry<RyaStatement, BindingSet>> statements = new HashSet<>(); + final Iterator<BindingSet> iter = bindingset.iterator(); while (iter.hasNext()) { - BindingSet bs = iter.next(); + final BindingSet bs = iter.next(); statements.add(new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>( getRyaStatementFromBindings(bs), bs)); } @@ -333,7 +338,7 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> iteration; try { iteration = queryEngine.queryWithBindingSet(statements, conf); - } catch (RyaDAOException e) { + } catch (final RyaDAOException e) { throw new RuntimeException(e); } @@ -344,17 +349,17 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e * Uses StatementPattern constraints to form a RyaStatement, and fills in * any null values with {@link BindingSet} values corresponding to the * variable for that position. - * + * * @param bs * @return RyaStatement whose values are determined by StatementPattern and * BindingSet constraints */ - private RyaStatement getRyaStatementFromBindings(BindingSet bs) { + private RyaStatement getRyaStatementFromBindings(final BindingSet bs) { - Value subjValue = getVarValue(statement.getSubjectVar(), bs); - Value predValue = getVarValue(statement.getPredicateVar(), bs); - Value objValue = getVarValue(statement.getObjectVar(), bs); - Value contextValue = getVarValue(statement.getContextVar(), bs); + final Value subjValue = getVarValue(statement.getSubjectVar(), bs); + final Value predValue = getVarValue(statement.getPredicateVar(), bs); + final Value objValue = getVarValue(statement.getObjectVar(), bs); + final Value contextValue = getVarValue(statement.getContextVar(), bs); RyaIRI subj = null; RyaIRI pred = null; RyaType obj = null; @@ -373,7 +378,7 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e if (objValue != null) { obj = RdfToRyaConversions.convertValue(objValue); } - + if(contextValue != null) { context = RdfToRyaConversions.convertIRI((IRI) contextValue); } @@ -386,12 +391,12 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e * otherwise returns the BindingSet Value corresponding to * {@link Var#getName()}. If no such Binding exits, this method returns * null. - * + * * @param var * @param bindings * @return Value */ - private Value getVarValue(Var var, BindingSet bindings) { + private Value getVarValue(final Var var, final BindingSet bindings) { if (var == null) { return null; } else if (var.hasValue()) { @@ -402,20 +407,20 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e } @Override - public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings) + public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings) throws QueryEvaluationException { return evaluate(Collections.singleton(bindings)); } @Override - public boolean equals(Object other) { + public boolean equals(final Object other) { if (this == other) { return true; } if (other instanceof StatementMetadataNode) { - StatementMetadataNode<?> meta = (StatementMetadataNode<?>) other; + final StatementMetadataNode<?> meta = (StatementMetadataNode<?>) other; if (meta.patterns.size() != this.patterns.size()) { return false; } @@ -424,8 +429,8 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e return false; } - Set<StatementPattern> thisSet = new HashSet<>(patterns); - Set<StatementPattern> thatSet = new HashSet<>(meta.patterns); + final Set<StatementPattern> thisSet = new HashSet<>(patterns); + final Set<StatementPattern> thatSet = new HashSet<>(meta.patterns); return thisSet.equals(thatSet); } else { return false; @@ -435,7 +440,7 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e @Override public int hashCode() { int hashcode = 0; - for (StatementPattern sp : patterns) { + for (final StatementPattern sp : patterns) { hashcode += sp.hashCode(); } return hashcode; @@ -465,9 +470,9 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e } private Set<String> getVariableNames() { - Set<String> vars = new HashSet<>(); - for (StatementPattern pattern : patterns) { - for (Var var : pattern.getVarList()) { + final Set<String> vars = new HashSet<>(); + for (final StatementPattern pattern : patterns) { + for (final Var var : pattern.getVarList()) { if (var.getValue() == null) { vars.add(var.getName()); } @@ -495,16 +500,16 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e */ class PropertyFilterAndBindingSetJoinIteration implements CloseableIteration<BindingSet, QueryEvaluationException> { - private CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements; - private Map<RyaIRI, Var> properties; - private StatementPattern sp; + private final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements; + private final Map<RyaIRI, Var> properties; + private final StatementPattern sp; private BindingSet next; private boolean hasNextCalled = false; private boolean hasNext = false; public PropertyFilterAndBindingSetJoinIteration( - CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements, - Map<RyaIRI, Var> properties, StatementPattern sp) { + final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements, + final Map<RyaIRI, Var> properties, final StatementPattern sp) { this.statements = statements; this.properties = properties; this.sp = sp; @@ -562,7 +567,7 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e public void close() throws QueryEvaluationException { try { statements.close(); - } catch (RyaDAOException e) { + } catch (final RyaDAOException e) { throw new QueryEvaluationException(e); } } @@ -570,14 +575,14 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e /** * Fast-forwards Iteration to next valid Entry and builds the * BindingSet. - * + * * @return BindingSet * @throws RyaDAOException */ private Optional<BindingSet> getNext() throws RyaDAOException { Optional<BindingSet> optionalBs = Optional.empty(); while (statements.hasNext() && !optionalBs.isPresent()) { - Map.Entry<RyaStatement, BindingSet> next = statements.next(); + final Map.Entry<RyaStatement, BindingSet> next = statements.next(); optionalBs = buildBindingSet(next.getKey(), next.getValue()); } return optionalBs; @@ -590,7 +595,7 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e * {@link StatementMetadata} properties for the specified RyaStatement * and if the BindingSet built form the StatementMetadata properties can * be joined with specified BindingSet. - * + * * @param statement * - RyaStatement * @param bindingSet @@ -598,15 +603,15 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e * @return - Optional containing BindingSet is a valid BindingSet could * be built */ - private Optional<BindingSet> buildBindingSet(RyaStatement statement, BindingSet bindingSet) { + private Optional<BindingSet> buildBindingSet(final RyaStatement statement, final BindingSet bindingSet) { - QueryBindingSet bs = new QueryBindingSet(); - Optional<BindingSet> optPropBs = buildPropertyBindingSet(statement); + final QueryBindingSet bs = new QueryBindingSet(); + final Optional<BindingSet> optPropBs = buildPropertyBindingSet(statement); if (!optPropBs.isPresent()) { return Optional.empty(); } - BindingSet propBs = optPropBs.get(); - BindingSet spBs = buildBindingSetFromStatementPattern(statement); + final BindingSet propBs = optPropBs.get(); + final BindingSet spBs = buildBindingSetFromStatementPattern(statement); if (!canJoinBindingSets(spBs, propBs)) { return Optional.empty(); } @@ -625,24 +630,24 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e * StatementMetadata properties for specified RyaStatement. If * consistent, this method builds the associated BindingSet otherwise an * empty Optional is returned. - * + * * @param statement * @return */ - private Optional<BindingSet> buildPropertyBindingSet(RyaStatement statement) { - StatementMetadata metadata = statement.getMetadata(); - Map<RyaIRI, RyaType> statementProps = metadata.getMetadata(); + private Optional<BindingSet> buildPropertyBindingSet(final RyaStatement statement) { + final StatementMetadata metadata = statement.getMetadata(); + final Map<RyaIRI, RyaType> statementProps = metadata.getMetadata(); if (statementProps.size() < properties.size()) { return Optional.empty(); } - QueryBindingSet bs = new QueryBindingSet(); - for (Map.Entry<RyaIRI, Var> entry : properties.entrySet()) { - RyaIRI key = entry.getKey(); - Var var = entry.getValue(); + final QueryBindingSet bs = new QueryBindingSet(); + for (final Map.Entry<RyaIRI, Var> entry : properties.entrySet()) { + final RyaIRI key = entry.getKey(); + final Var var = entry.getValue(); if (!statementProps.containsKey(key)) { return Optional.empty(); } else { - Value val = RyaToRdfConversions.convertValue(statementProps.get(key)); + final Value val = RyaToRdfConversions.convertValue(statementProps.get(key)); if (var.getValue() == null) { bs.addBinding(var.getName(), val); } else if (!var.getValue().equals(val)) { @@ -661,16 +666,16 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e * If it doesn't have a Value, a Binding is created from the * RyaStatement using the {@link RyaType} for the corresponding position * (Subject, Predicate, Object). - * + * * @param statement * @return BindingSet */ - private BindingSet buildBindingSetFromStatementPattern(RyaStatement statement) { - Var subjVar = sp.getSubjectVar(); - Var predVar = sp.getPredicateVar(); - Var objVar = sp.getObjectVar(); - Var contextVar = sp.getContextVar(); - QueryBindingSet bs = new QueryBindingSet(); + private BindingSet buildBindingSetFromStatementPattern(final RyaStatement statement) { + final Var subjVar = sp.getSubjectVar(); + final Var predVar = sp.getPredicateVar(); + final Var objVar = sp.getObjectVar(); + final Var contextVar = sp.getContextVar(); + final QueryBindingSet bs = new QueryBindingSet(); if (subjVar.getValue() == null) { bs.addBinding(subjVar.getName(), RyaToRdfConversions.convertValue(statement.getSubject())); @@ -683,7 +688,7 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e if (objVar.getValue() == null) { bs.addBinding(objVar.getName(), RyaToRdfConversions.convertValue(statement.getObject())); } - + if (contextVar != null && contextVar.getValue() == null) { bs.addBinding(contextVar.getName(), RyaToRdfConversions.convertValue(statement.getContext())); } @@ -691,10 +696,10 @@ public class StatementMetadataNode<C extends RdfCloudTripleStoreConfiguration> e return bs; } - private boolean canJoinBindingSets(BindingSet bs1, BindingSet bs2) { - for (Binding b : bs1) { - String name = b.getName(); - Value val = b.getValue(); + private boolean canJoinBindingSets(final BindingSet bs1, final BindingSet bs2) { + for (final Binding b : bs1) { + final String name = b.getName(); + final Value val = b.getValue(); if (bs2.hasBinding(name) && (!bs2.getValue(name).equals(val))) { return false; } diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java index 95ad841..4e80428 100644 --- a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java +++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java @@ -50,12 +50,11 @@ import org.eclipse.rdf4j.query.algebra.QueryRoot; import org.eclipse.rdf4j.query.algebra.TupleExpr; import com.google.common.base.Preconditions; -import com.mongodb.Block; +import com.mongodb.BasicDBObject; import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; -import com.mongodb.util.JSON; /** * A rule execution strategy for MongoDB Rya that converts a single rule into an @@ -83,7 +82,7 @@ public class MongoPipelineStrategy extends AbstractRuleExecutionStrategy { * passed a stateful configuration, uses the existing mongo client, * otherwise creates one. */ - public MongoPipelineStrategy(MongoDBRdfConfiguration mongoConf) throws ForwardChainException { + public MongoPipelineStrategy(final MongoDBRdfConfiguration mongoConf) throws ForwardChainException { Preconditions.checkNotNull(mongoConf); final String mongoDBName = mongoConf.getMongoDBName(); final String collectionName = mongoConf.getTriplesCollectionName(); @@ -100,7 +99,7 @@ public class MongoPipelineStrategy extends AbstractRuleExecutionStrategy { this.dao = RyaSailFactory.getMongoDAO(mongoConf); statefulConf = this.dao.getConf(); } - } catch (RyaDAOException e) { + } catch (final RyaDAOException e) { throw new ForwardChainException("Can't connect to Rya.", e); } final MongoClient mongoClient = statefulConf.getMongoClient(); @@ -130,11 +129,11 @@ public class MongoPipelineStrategy extends AbstractRuleExecutionStrategy { * @throws ForwardChainException if execution fails. */ @Override - public long executeConstructRule(AbstractConstructRule rule, - StatementMetadata metadata) throws ForwardChainException { + public long executeConstructRule(final AbstractConstructRule rule, + final StatementMetadata metadata) throws ForwardChainException { Preconditions.checkNotNull(rule); logger.info("Applying inference rule " + rule + "..."); - long timestamp = System.currentTimeMillis(); + final long timestamp = System.currentTimeMillis(); // Get a pipeline that turns individual matches into triples List<Bson> pipeline = null; try { @@ -149,20 +148,20 @@ public class MongoPipelineStrategy extends AbstractRuleExecutionStrategy { } pipeline = toPipeline(rule, requireSourceLevel, timestamp); } - catch (ForwardChainException e) { + catch (final ForwardChainException e) { logger.error(e); } if (pipeline == null) { if (backup == null) { logger.error("Couldn't convert " + rule + " to pipeline:"); - for (String line : rule.getQuery().toString().split("\n")) { + for (final String line : rule.getQuery().toString().split("\n")) { logger.error("\t" + line); } throw new UnsupportedOperationException("Couldn't convert query to pipeline."); } else { logger.debug("Couldn't convert " + rule + " to pipeline:"); - for (String line : rule.getQuery().toString().split("\n")) { + for (final String line : rule.getQuery().toString().split("\n")) { logger.debug("\t" + line); } logger.debug("Using fallback strategy."); @@ -171,32 +170,30 @@ public class MongoPipelineStrategy extends AbstractRuleExecutionStrategy { } } // Execute the pipeline - for (Bson step : pipeline) { + for (final Bson step : pipeline) { logger.debug("\t" + step.toString()); } - LongAdder count = new LongAdder(); + final LongAdder count = new LongAdder(); baseCollection.aggregate(pipeline) .allowDiskUse(true) .batchSize(PIPELINE_BATCH_SIZE) - .forEach(new Block<Document>() { - @Override - public void apply(Document doc) { - final DBObject dbo = (DBObject) JSON.parse(doc.toJson()); - RyaStatement rstmt = storageStrategy.deserializeDBObject(dbo); - if (!statementExists(rstmt)) { - count.increment(); - doc.replace(SimpleMongoDBStorageStrategy.STATEMENT_METADATA, metadata.toString()); - try { - batchWriter.addObjectToQueue(doc); - } catch (MongoDbBatchWriterException e) { - logger.error("Couldn't insert " + rstmt, e); - } + .forEach((final Document doc) -> { + final DBObject dbo = BasicDBObject.parse(doc.toJson()); + final RyaStatement rstmt = storageStrategy.deserializeDBObject(dbo); + if (!statementExists(rstmt)) { + count.increment(); + doc.replace(SimpleMongoDBStorageStrategy.STATEMENT_METADATA, metadata.toString()); + try { + batchWriter.addObjectToQueue(doc); + } catch (final MongoDbBatchWriterException e) { + logger.error("Couldn't insert " + rstmt, e); } } }); + try { batchWriter.flush(); - } catch (MongoDbBatchWriterException e) { + } catch (final MongoDbBatchWriterException e) { throw new ForwardChainException("Error writing to Mongo", e); } logger.info("Added " + count + " new statements."); @@ -211,10 +208,10 @@ public class MongoPipelineStrategy extends AbstractRuleExecutionStrategy { return count.longValue(); } - private boolean statementExists(RyaStatement rstmt) { + private boolean statementExists(final RyaStatement rstmt) { try { return engine.query(new RyaQuery(rstmt)).iterator().hasNext(); - } catch (RyaDAOException e) { + } catch (final RyaDAOException e) { logger.error("Error querying for " + rstmt, e); return false; } @@ -231,7 +228,7 @@ public class MongoPipelineStrategy extends AbstractRuleExecutionStrategy { backup.shutDown(); try { batchWriter.shutdown(); - } catch (MongoDbBatchWriterException e) { + } catch (final MongoDbBatchWriterException e) { throw new ForwardChainException("Error shutting down batch writer", e); } } @@ -247,24 +244,24 @@ public class MongoPipelineStrategy extends AbstractRuleExecutionStrategy { * @return An aggregation pipeline. * @throws ForwardChainException if pipeline construction fails. */ - private List<Bson> toPipeline(AbstractConstructRule rule, int sourceLevel, - long timestamp) throws ForwardChainException { + private List<Bson> toPipeline(final AbstractConstructRule rule, final int sourceLevel, + final long timestamp) throws ForwardChainException { TupleExpr tupleExpr = rule.getQuery().getTupleExpr(); if (!(tupleExpr instanceof QueryRoot)) { tupleExpr = new QueryRoot(tupleExpr); } try { tupleExpr.visit(pipelineVisitor); - } catch (Exception e) { + } catch (final Exception e) { throw new ForwardChainException("Error converting construct rule to an aggregation pipeline", e); } if (tupleExpr instanceof QueryRoot) { - QueryRoot root = (QueryRoot) tupleExpr; + final QueryRoot root = (QueryRoot) tupleExpr; if (root.getArg() instanceof AggregationPipelineQueryNode) { - AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) root.getArg(); + final AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) root.getArg(); pipelineNode.distinct(); // require distinct triples pipelineNode.requireSourceDerivationDepth(sourceLevel); - long latestTime = executionTimes.getOrDefault(rule, 0L); + final long latestTime = executionTimes.getOrDefault(rule, 0L); if (latestTime > 0) { pipelineNode.requireSourceTimestamp(latestTime); } diff --git a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java index 84080e5..efa13dc 100644 --- a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java +++ b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java @@ -18,10 +18,13 @@ */ package org.apache.rya.forwardchain.batch; +import static org.junit.Assert.assertEquals; + import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -37,10 +40,11 @@ import org.apache.rya.sail.config.RyaSailFactory; import org.apache.rya.test.mongo.EmbeddedMongoFactory; import org.eclipse.rdf4j.model.ValueFactory; import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.AbstractTupleQueryResultHandler; import org.eclipse.rdf4j.query.BindingSet; import org.eclipse.rdf4j.query.QueryLanguage; import org.eclipse.rdf4j.query.TupleQuery; -import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.query.TupleQueryResultHandlerException; import org.eclipse.rdf4j.query.impl.ListBindingSet; import org.eclipse.rdf4j.repository.RepositoryException; import org.eclipse.rdf4j.repository.sail.SailRepository; @@ -48,11 +52,10 @@ import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; import org.eclipse.rdf4j.rio.RDFFormat; import org.eclipse.rdf4j.rio.Rio; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; import com.mongodb.MongoClient; import com.mongodb.ServerAddress; @@ -84,48 +87,52 @@ public class MongoSpinIT { } @Test + public void testNoStrategy() throws Exception { + loadDataFiles(); + final Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql")); + final Set<BindingSet> expected = new HashSet<>(); + assertEquals(expected, solutions); + } + + @Test public void testSailStrategy() throws Exception { - insertDataFile(Resources.getResource("data.ttl"), "http://example.org#"); - insertDataFile(Resources.getResource("university.ttl"), "http://example.org#"); - insertDataFile(Resources.getResource("owlrl.ttl"), "http://example.org#"); - Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql")); - Set<BindingSet> expected = new HashSet<>(); - Assert.assertEquals(expected, solutions); + loadDataFiles(); conf.setUseAggregationPipeline(false); - ForwardChainSpinTool tool = new ForwardChainSpinTool(); + final ForwardChainSpinTool tool = new ForwardChainSpinTool(); ToolRunner.run(conf, tool, new String[] {}); - solutions = executeQuery(Resources.getResource("query.sparql")); - expected.add(new ListBindingSet(Arrays.asList("X", "Y"), + final Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql")); + final Set<BindingSet> expected = ImmutableSet.of(new ListBindingSet(Arrays.asList("X", "Y"), VF.createIRI(EX, "Alice"), VF.createIRI(EX, "Department1"))); - Assert.assertEquals(expected, solutions); + assertEquals(expected, solutions); // TODO: Check if spin rules with empty WHERE clauses, such as // rl:scm-cls in the owlrl.ttl test file, should be included. - Assert.assertEquals(48, tool.getNumInferences()); + assertEquals(48, tool.getNumInferences()); } @Test public void testPipelineStrategy() throws Exception { - insertDataFile(Resources.getResource("data.ttl"), "http://example.org#"); - insertDataFile(Resources.getResource("university.ttl"), "http://example.org#"); - insertDataFile(Resources.getResource("owlrl.ttl"), "http://example.org#"); - Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql")); - Set<BindingSet> expected = new HashSet<>(); - Assert.assertEquals(expected, solutions); + loadDataFiles(); conf.setUseAggregationPipeline(true); - ForwardChainSpinTool tool = new ForwardChainSpinTool(); + final ForwardChainSpinTool tool = new ForwardChainSpinTool(); ToolRunner.run(conf, tool, new String[] {}); - solutions = executeQuery(Resources.getResource("query.sparql")); - expected.add(new ListBindingSet(Arrays.asList("X", "Y"), + final Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql")); + final Set<BindingSet> expected = ImmutableSet.of(new ListBindingSet(Arrays.asList("X", "Y"), VF.createIRI(EX, "Alice"), VF.createIRI(EX, "Department1"))); - Assert.assertEquals(expected, solutions); + assertEquals(expected, solutions); // TODO: Check if spin rules with empty WHERE clauses, such as // rl:scm-cls in the owlrl.ttl test file, should be included. - Assert.assertEquals(41, tool.getNumInferences()); + assertEquals(41, tool.getNumInferences()); + } + + private void loadDataFiles() throws Exception { + insertDataFile(Resources.getResource("data.ttl"), "http://example.org#"); + insertDataFile(Resources.getResource("university.ttl"), "http://example.org#"); + insertDataFile(Resources.getResource("owlrl.ttl"), "http://example.org#"); } - private void insertDataFile(URL dataFile, String defaultNamespace) throws Exception { - RDFFormat format = Rio.getParserFormatForFileName(dataFile.getFile()).get(); - SailRepositoryConnection conn = repository.getConnection(); + private void insertDataFile(final URL dataFile, final String defaultNamespace) throws Exception { + final RDFFormat format = Rio.getParserFormatForFileName(dataFile.getFile()).get(); + final SailRepositoryConnection conn = repository.getConnection(); try { conn.add(dataFile, defaultNamespace, format); } finally { @@ -133,29 +140,29 @@ public class MongoSpinIT { } } - private Set<BindingSet> executeQuery(URL queryFile) throws Exception { - SailRepositoryConnection conn = repository.getConnection(); - try { - try( - final InputStream queryIS = queryFile.openStream(); - final BufferedReader br = new BufferedReader(new InputStreamReader(queryIS, Charsets.UTF_8)); - ) { - final String query = br.lines().collect(Collectors.joining("\n")); - final TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, query); - final TupleQueryResult result = tupleQuery.evaluate(); - final Set<BindingSet> solutions = new HashSet<>(); - while (result.hasNext()) { - solutions.add(result.next()); + private Set<BindingSet> executeQuery(final URL queryFile) throws Exception { + final SailRepositoryConnection conn = repository.getConnection(); + try( + final InputStream queryIS = queryFile.openStream(); + final BufferedReader br = new BufferedReader(new InputStreamReader(queryIS, StandardCharsets.UTF_8)); + ) { + final String query = br.lines().collect(Collectors.joining("\n")); + final TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, query); + final Set<BindingSet> solutions = new HashSet<>(); + tupleQuery.evaluate(new AbstractTupleQueryResultHandler() { + @Override + public void handleSolution(final BindingSet bindingSet) throws TupleQueryResultHandlerException { + solutions.add(bindingSet); } - return solutions; - } + }); + return solutions; } finally { closeQuietly(conn); } } private static MongoDBRdfConfiguration getConf() throws Exception { - MongoDBIndexingConfigBuilder builder = MongoIndexingConfiguration.builder().setUseMockMongo(true); + final MongoDBIndexingConfigBuilder builder = MongoIndexingConfiguration.builder().setUseMockMongo(true); final MongoClient c = EmbeddedMongoFactory.newFactory().newMongoClient(); final ServerAddress address = c.getAddress(); builder.setMongoHost(address.getHost());