[GitHub] incubator-rya pull request #267: RYA-440 Added commands to Rya Shell used to...
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r164219745 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java --- @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.streams.api.RyaStreamsClient; +import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries; +import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster. + */ +@DefaultAnnotation(NonNull.class) +public final class KafkaRyaStreamsClientFactory { +private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class); + +/** + * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams + * that is backed by Kafka. + * + * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null) + * @param kafkaHostname - The hostname of the Kafka Broker. + * @param kafkaPort - The port of the Kafka Broker. + * @return The initialized commands. + */ +public static RyaStreamsClient make( +final String ryaInstance, +final String kafkaHostname, +final int kafkaPort) { +requireNonNull(kafkaHostname); --- End diff -- Null check on ryaInstance ---
[GitHub] incubator-rya pull request #267: RYA-440 Added commands to Rya Shell used to...
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r164218885 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java --- @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import org.apache.rya.api.instance.RyaDetailsUpdater; +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A base class that implements the core functionality of the {@link SetRyaStreamsConfiguration} interactor. + * Subclasses just need to implement {@link #getRyaDetailsRepo(String)} so that the common code may update + * any implementation of that repository. 
+ */ +@DefaultAnnotation(NonNull.class) +public abstract class SetRyaStreamsConfigurationBase implements SetRyaStreamsConfiguration { + +private final InstanceExists instanceExists; + +/** + * Constructs an instance of {@link SetRyaStreamsConfigurationBase}. + * + * @param instanceExists - The interactor used to verify Rya instances exist. (not null) + */ +public SetRyaStreamsConfigurationBase(final InstanceExists instanceExists) { +this.instanceExists = requireNonNull(instanceExists); +} + +/** + * Get a {@link RyaDetailsRepository} that is connected to a specific instance of Rya. + * + * @param ryaInstance - The Rya instance the repository must be connected to. (not null) + * @return A {@link RyaDetailsRepository} connected to the specified Rya instance. + */ +protected abstract RyaDetailsRepository getRyaDetailsRepo(String ryaInstance); + +@Override +public void setRyaStreamsConfiguration(final String ryaInstance, final RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException{ --- End diff -- Null check on ryaInstance ---
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159936556 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/PcjIndexer.java --- @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongodb.pcj; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.api.persist.index.RyaSecondaryIndexer; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Updates the PCJs that are in a {@link MongoPcjDocuments}. + */ +public interface PcjIndexer extends RyaSecondaryIndexer { +/** + * Creates the {@link MongoPcjDocuments} that will be used by the indexer. + * + * @param conf - Indicates how the {@link MongoPcjDocuments} is initialized. (not null) + * @return The {@link MongoPcjDocuments} that will be used by this indexer. + */ +public @Nullable MongoPcjDocuments getPcjStorage(Configuration conf); --- End diff -- Seems like the primary purpose of this class is to parse the config object to create the MongoPcjDocuments object. 
I don't think this functionality should be lumped in with the role of the indexer. In the case of PCJs, the primary role of the indexer is to route statements to the updater service, which then routes BindingSet results to Kafka, Mongo, Accumulo, etc., for use with PCJs. Here you are creating and using the indexer primarily to parse the config object and create the MongoPcjDocuments object, not for its primary purpose, which is confusing. You are also adding a Mongo-specific method to a very general indexing interface without indicating that the extension is Mongo-specific, which is very misleading. All of this would be much cleaner if you separated the config parsing and creation of MongoPcjDocuments from the indexing API; combining them goes against established usage patterns. ---
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159935133 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/PcjQueryNode.java --- @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.mongodb.pcj; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; + +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.api.utils.IteratorWrapper; +import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; +import org.apache.rya.indexing.external.tupleSet.ParsedQueryUtil; +import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments; +import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.parser.ParsedTupleQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import info.aduna.iteration.CloseableIteration; + +/** + * Indexing Node for PCJs expressions to be inserted into execution plan to + * delegate entity portion of query to {@link MongoPrecomputedJoinIndexer}. 
+ */ +@DefaultAnnotation(NonNull.class) +public class PcjQueryNode extends ExternalTupleSet implements ExternalBatchingIterator { +private static final Logger log = Logger.getLogger(PcjQueryNode.class); +private final String tablename; +private final PcjIndexer indexer; +private final MongoPcjDocuments pcjDocs; + +/** + * Creates a new {@link PcjQueryNode}. + * + * @param sparql - name of sparql query whose results will be stored in PCJ table + * @param conf - Rya Configuration + * @param tablename - name of an existing PCJ table + * + * @throws MalformedQueryException - The SPARQL query needs to contain a projection. + */ +public PcjQueryNode(final String sparql, final String tablename, final MongoPcjDocuments pcjDocs) throws MalformedQueryException { +this.pcjDocs = checkNotNull(pcjDocs); +indexer = new MongoPrecomputedJoinIndexer(); +this.tablename = tablename; +final SPARQLParser sp = new SPARQLParser(); +final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null); +final TupleExpr te = pq.getTupleExpr(); +Preconditions.checkArgument(PCJOptimizerUtilities.isPCJValid(te), "TupleExpr is an invalid PCJ."); + +final Optional projection = new ParsedQueryUtil().findProjection(pq); +if (!projection.isPresent()) { +throw new MalformedQueryException("SPARQL query '" + sparql + "' does not contain a Projection."); +} +setProjectionExpr(projection.get()); +} + +/** + * Creates a new {@link PcjQueryNode}. + * + * @param accCon - connection to a valid Accumulo instance + * @param tablename - name of an existing PCJ table + */ +public P
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r160796664 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java --- @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.mongo; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.security.Authorizations; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; + +import com.mongodb.MongoClient; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.util.JSON; + +/** + * Creates and modifies PCJs in MongoDB. 
PCJ's are stored as follows: + * + * + * + * - PCJ Metadata Doc - + * { + * _id: [table_name]_METADATA, + * sparql: [sparql query to match results], + * cardinality: [number of results] + * } + * + * - PCJ Results Doc - + * { + * pcjName: [table_name], + * auths: [auths] + * [binding_var1]: { + * uri: [type_uri], + * value: value + * } + * . + * . + * . + * [binding_varn]: { + * uri: [type_uri], + * value: value + * } + * } + * + * + */ +public class MongoPcjDocuments { +public static final String PCJ_COLLECTION_NAME = "pcjs"; + +// metadata fields +public static final String CARDINALITY_FIELD = "cardinality"; +public static final String SPARQL_FIELD = "sparql"; +public static final String PCJ_ID = "_id"; +public static final String VAR_ORDER_ID = "varOrders"; + +// pcj results fields +private static final String BINDING_VALUE = "value"; +private static final String BINDING_TYPE = "uri"; +private static final String AUTHS_FIELD = "auths"; +private static final String PCJ_NAME = "pcjName"; + +private final MongoCollection pcjCollection; +private static final PcjVarOrderFactory pcjVarOrderFactory = new ShiftVarOrderFactory(); + +/** + * Creates a new {@link MongoPcjDocuments}. + * @param client - The {@link MongoClient} to use to connect to mongo. + * @param ryaInstanceName - The rya instance to
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159934928 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/PcjQueryNode.java --- @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.mongodb.pcj; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.api.utils.IteratorWrapper; +import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; +import org.apache.rya.indexing.external.tupleSet.ParsedQueryUtil; +import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments; +import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.parser.ParsedTupleQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import info.aduna.iteration.CloseableIteration; + +/** + * Indexing Node for PCJs expressions to be inserted into execution plan to + * delegate entity portion of query to {@link MongoPrecomputedJoinIndexer}. 
+ */ +@DefaultAnnotation(NonNull.class) +public class PcjQueryNode extends ExternalTupleSet implements ExternalBatchingIterator { --- End diff -- Did this ever get renamed to something more Mongo specific? ---
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/255#discussion_r160768697 --- Diff: extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AntecedentVisitor.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.forwardchain.rule; + +import java.util.HashSet; +import java.util.Set; + +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +/** + * Query visitor that identifies all triple patterns represented as + * {@link StatementPattern}s in a query, which therefore represent triples + * that could potentially contribute to a solution. Considers only the statement + * patterns themselves, i.e. the leaves of the query tree, and does not consider + * other constraints that may restrict the set of triples that may be relevant. + * This means relying on this analysis to determine whether a fact can be part + * of a solution can yield false positives, but not false negatives. + */ +class AntecedentVisitor extends QueryModelVisitorBase { --- End diff -- Got it. 
When you were talking about rules, I didn't realize you meant them within the context of your Rule/Ruleset API. I thought you were using the term "rule" less formally and talking about rules within the context of a CONSTRUCT query (i.e., the rule being triggered was whether or not to generate the statement). It makes sense for one Rule to trigger another if the consequents of rule 1 intersect the antecedents of rule 2. ---
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/255#discussion_r160702104 --- Diff: extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AntecedentVisitor.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.forwardchain.rule; + +import java.util.HashSet; +import java.util.Set; + +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +/** + * Query visitor that identifies all triple patterns represented as + * {@link StatementPattern}s in a query, which therefore represent triples + * that could potentially contribute to a solution. Considers only the statement + * patterns themselves, i.e. the leaves of the query tree, and does not consider + * other constraints that may restrict the set of triples that may be relevant. + * This means relying on this analysis to determine whether a fact can be part + * of a solution can yield false positives, but not false negatives. + */ +class AntecedentVisitor extends QueryModelVisitorBase { --- End diff -- I wasn't aware that the EXISTS graph pattern could be used as a filter operator. 
I see your point. Just for clarity: in your example, you meant that the rule would be triggered by ?x a :T and ?x :p :A, right? That is, whatever values of ?x you use in your CONSTRUCT statements must satisfy the condition that ?x appears both in a statement of the form ?x a :T and in a statement of the form ?x :p :A, right? ---
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/255#discussion_r160698418 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java --- @@ -0,0 +1,862 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.mongodb.aggregation; + +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.function.Function; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.domain.StatementMetadata; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.mongodb.MongoDbRdfConstants; +import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; +import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; +import org.apache.rya.mongodb.document.operators.query.ConditionalOperators; +import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.openrdf.model.Literal; +import 
org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Compare; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.impl.ExternalSet; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.mongodb.BasicDBObject; +import com.mongodb.DBObject; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.Aggregates; +import com.mongodb.client.model.BsonField; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.Projections; + +import info.aduna.iteration.CloseableIteration; + +/** + * Represents a portion of a query tree as MongoDB aggregation pipeline. Should + * be built bottom-up: start with a statement pattern implemented as a $match + * step, then add steps to the pipeline to handle higher levels of the query + * tree. Methods are provided to add certain supported query operations
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/255#discussion_r160699176 --- Diff: dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Arrays; +import java.util.List; + +import org.bson.Document; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.OWL; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Not; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.QueryRoot; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.Var; + +import com.google.common.collect.Sets; +import com.mongodb.MongoNamespace; +import com.mongodb.client.MongoCollection; + +public class SparqlToPipelineTransformVisitorTest { + +private static final ValueFactory VF = ValueFactoryImpl.getInstance(); + +private static final String LUBM = "urn:lubm"; +private static final URI UNDERGRAD = VF.createURI(LUBM, "UndergraduateStudent"); +private static final URI PROFESSOR = VF.createURI(LUBM, "Professor"); +private static final URI COURSE = VF.createURI(LUBM, "Course"); +private static final URI TAKES = VF.createURI(LUBM, "takesCourse"); +private static final URI TEACHES = VF.createURI(LUBM, "teachesCourse"); + +private static Var constant(URI value) { +return new Var(value.stringValue(), value); +} + +MongoCollection collection; + +@Before +@SuppressWarnings("unchecked") +public void setUp() { +collection = Mockito.mock(MongoCollection.class); 
+Mockito.when(collection.getNamespace()).thenReturn(new MongoNamespace("db", "collection")); +} + +@Test +public void testStatementPattern() throws Exception { +QueryRoot query = new QueryRoot(new StatementPattern( +new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD))); +SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection); +query.visit(visitor); +Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode); +AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) query.getArg(); +Assert.assertEquals(Sets.newHashSet("x"), pipelineNode.getAssuredBindingNames()); +} + +@Test +public void testJoin() throws Exception { +QueryRoot query = new QueryRoot(new Join( +new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)), +new StatementPattern(new Var("x"), constant(TAKES), new Var("course"; +SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection); +query.visit(visitor); +Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode); +AggregationPipelineQueryNode pipe
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/255#discussion_r160697260 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Arrays; + +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.bson.Document; +import org.openrdf.query.algebra.Distinct; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.Reduced; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +import com.google.common.base.Preconditions; +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; + +/** + * Visitor that transforms a SPARQL query tree by replacing as much of the tree + * as possible with one or more {@code AggregationPipelineQueryNode}s. + * + * Each {@link AggregationPipelineQueryNode} contains a MongoDB aggregation + * pipeline which is equivalent to the replaced portion of the original query. + * Evaluating this node executes the pipeline and converts the results into + * query solutions. If only part of the query was transformed, the remaining + * query logic (higher up in the query tree) can be applied to those + * intermediate solutions as normal. + * + * In general, processes the tree in bottom-up order: A leaf node + * ({@link StatementPattern}) is replaced with a pipeline that matches the + * corresponding statements. Then, if the parent node's semantics are supported + * by the visitor, stages are appended to the pipeline and the subtree at the + * parent node is replaced with the extended pipeline. 
This continues up the + * tree until reaching a node that cannot be transformed, in which case that + * node's child is now a single {@code AggregationPipelineQueryNode} (a leaf + * node) instead of the previous subtree, or until the entire tree has been + * subsumed into a single pipeline node. + * + * Nodes which are transformed into pipeline stages: + * + * A {@code StatementPattern} node forms the beginning of each pipeline. + * Single-argument operations {@link Projection}, {@link MultiProjection}, + * {@link Extension}, {@link Distinct}, and {@link Reduced} will be transformed + * into pipeline stages whenever the child {@link TupleExpr} represents a + * pipeline. + * A {@link Filter} operation will be appended to the pipeline when its + * child {@code TupleExpr} represents a pipeline and the filter condition is a + * type of {@link ValueExpr} understood by {@code AggregationPipelineQueryNode}. + * A {@link Join} operation will be appended to the pipeline when one child + * is a {@code StatementPattern} and the other is an + * {@code AggregationPipelineQueryNode}. + * + */ +public class SparqlToPipelineTransformVisitor extends QueryModelVisitorBase { --- End diff -- As the trees are left-leaning by default, I don't think you have to worry about the case where you join two joins of statement patterns. ---
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/255#discussion_r160421704 --- Diff: extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AntecedentVisitor.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.forwardchain.rule; + +import java.util.HashSet; +import java.util.Set; + +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +/** + * Query visitor that identifies all triple patterns represented as + * {@link StatementPattern}s in a query, which therefore represent triples + * that could potentially contribute to a solution. Considers only the statement + * patterns themselves, i.e. the leaves of the query tree, and does not consider + * other constraints that may restrict the set of triples that may be relevant. + * This means relying on this analysis to determine whether a fact can be part + * of a solution can yield false positives, but not false negatives. 
+ */ +class AntecedentVisitor extends QueryModelVisitorBase { --- End diff -- It doesn't matter too much, but StatementPatternCollector.process(QueryModelNode node) will give you a list containing all of the StatementPatterns in the node. ---
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/255#discussion_r160318466 --- Diff: dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Arrays; +import java.util.List; + +import org.bson.Document; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.openrdf.model.URI; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.OWL; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Not; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.QueryRoot; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.Var; + +import com.google.common.collect.Sets; +import com.mongodb.MongoNamespace; +import com.mongodb.client.MongoCollection; + +public class SparqlToPipelineTransformVisitorTest { + +private static final ValueFactory VF = ValueFactoryImpl.getInstance(); + +private static final String LUBM = "urn:lubm"; +private static final URI UNDERGRAD = VF.createURI(LUBM, "UndergraduateStudent"); +private static final URI PROFESSOR = VF.createURI(LUBM, "Professor"); +private static final URI COURSE = VF.createURI(LUBM, "Course"); +private static final URI TAKES = VF.createURI(LUBM, "takesCourse"); +private static final URI TEACHES = VF.createURI(LUBM, "teachesCourse"); + +private static Var constant(URI value) { +return new Var(value.stringValue(), value); +} + +MongoCollection collection; + +@Before +@SuppressWarnings("unchecked") +public void setUp() { +collection = Mockito.mock(MongoCollection.class); 
+Mockito.when(collection.getNamespace()).thenReturn(new MongoNamespace("db", "collection")); +} + +@Test +public void testStatementPattern() throws Exception { +QueryRoot query = new QueryRoot(new StatementPattern( +new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD))); +SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection); +query.visit(visitor); +Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode); +AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) query.getArg(); +Assert.assertEquals(Sets.newHashSet("x"), pipelineNode.getAssuredBindingNames()); +} + +@Test +public void testJoin() throws Exception { +QueryRoot query = new QueryRoot(new Join( +new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)), +new StatementPattern(new Var("x"), constant(TAKES), new Var("course"; +SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection); +query.visit(visitor); +Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode); +AggregationPipelineQueryNode pipe
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/255#discussion_r160315599 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java --- @@ -0,0 +1,862 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.mongodb.aggregation; + +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.function.Function; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.domain.StatementMetadata; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.mongodb.MongoDbRdfConstants; +import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; +import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; +import org.apache.rya.mongodb.document.operators.query.ConditionalOperators; +import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.openrdf.model.Literal; +import 
org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Compare; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.impl.ExternalSet; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.mongodb.BasicDBObject; +import com.mongodb.DBObject; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.Aggregates; +import com.mongodb.client.model.BsonField; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.Projections; + +import info.aduna.iteration.CloseableIteration; + +/** + * Represents a portion of a query tree as MongoDB aggregation pipeline. Should + * be built bottom-up: start with a statement pattern implemented as a $match + * step, then add steps to the pipeline to handle higher levels of the query + * tree. Methods are provided to add certain supported query operations
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/255#discussion_r160317536 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.mongodb.aggregation; + +import java.util.Arrays; + +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.bson.Document; +import org.openrdf.query.algebra.Distinct; +import org.openrdf.query.algebra.Extension; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.MultiProjection; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.Reduced; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; + +import com.google.common.base.Preconditions; +import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; + +/** + * Visitor that transforms a SPARQL query tree by replacing as much of the tree + * as possible with one or more {@code AggregationPipelineQueryNode}s. + * + * Each {@link AggregationPipelineQueryNode} contains a MongoDB aggregation + * pipeline which is equivalent to the replaced portion of the original query. + * Evaluating this node executes the pipeline and converts the results into + * query solutions. If only part of the query was transformed, the remaining + * query logic (higher up in the query tree) can be applied to those + * intermediate solutions as normal. + * + * In general, processes the tree in bottom-up order: A leaf node + * ({@link StatementPattern}) is replaced with a pipeline that matches the + * corresponding statements. Then, if the parent node's semantics are supported + * by the visitor, stages are appended to the pipeline and the subtree at the + * parent node is replaced with the extended pipeline. 
This continues up the + * tree until reaching a node that cannot be transformed, in which case that + * node's child is now a single {@code AggregationPipelineQueryNode} (a leaf + * node) instead of the previous subtree, or until the entire tree has been + * subsumed into a single pipeline node. + * + * Nodes which are transformed into pipeline stages: + * + * A {@code StatementPattern} node forms the beginning of each pipeline. + * Single-argument operations {@link Projection}, {@link MultiProjection}, + * {@link Extension}, {@link Distinct}, and {@link Reduced} will be transformed + * into pipeline stages whenever the child {@link TupleExpr} represents a + * pipeline. + * A {@link Filter} operation will be appended to the pipeline when its + * child {@code TupleExpr} represents a pipeline and the filter condition is a + * type of {@link ValueExpr} understood by {@code AggregationPipelineQueryNode}. + * A {@link Join} operation will be appended to the pipeline when one child + * is a {@code StatementPattern} and the other is an + * {@code AggregationPipelineQueryNode}. + * + */ +public class SparqlToPipelineTransformVisitor extends QueryModelVisitorBase { --- End diff -- Does the AggregationPipelineQueryNode have to be evaluated first in the query plan? Do you do anything to group nodes that can be evaluated in an AggregationPipelineQueryNode together? ---
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/255#discussion_r160316024 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java --- @@ -0,0 +1,862 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.mongodb.aggregation; + +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.function.Function; + +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.domain.StatementMetadata; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.mongodb.MongoDbRdfConstants; +import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; +import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; +import org.apache.rya.mongodb.document.operators.query.ConditionalOperators; +import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.openrdf.model.Literal; +import 
org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Compare; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.ProjectionElem; +import org.openrdf.query.algebra.ProjectionElemList; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.impl.ExternalSet; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.mongodb.BasicDBObject; +import com.mongodb.DBObject; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.Aggregates; +import com.mongodb.client.model.BsonField; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.Projections; + +import info.aduna.iteration.CloseableIteration; + +/** + * Represents a portion of a query tree as MongoDB aggregation pipeline. Should + * be built bottom-up: start with a statement pattern implemented as a $match + * step, then add steps to the pipeline to handle higher levels of the query + * tree. Methods are provided to add certain supported query operations
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r160262975 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java --- @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.join; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Provides a mechanism for storing {@link VisibilityBindingSet}s that have been emitted from either side of + * a Join and a way to fetch all {@link VisibilityBindingSet}s that join with it from the other side. + */ +@DefaultAnnotation(NonNull.class) +public interface JoinStateStore { + +/** + * Store a {@link VisibilityBindingSet} based on the side it was emitted from. + * + * @param result - The result whose value will be stored. 
(not null) + */ +public void store(BinaryResult result); --- End diff -- It's just that there are at least two implementations of Serializers for VisibilityBindingSet in the code base. Granted, they might be for specific serialization/encoder interfaces. Were you at least able to reuse VisibilityBindingSetSerDe in the implementation of this interface? ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r160262440 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors.join; + +import static java.util.Objects.requireNonNull; + +import java.util.Iterator; +import java.util.List; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.rya.api.function.join.IterativeJoin; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; +import org.apache.rya.streams.kafka.processors.ProcessorResultFactory; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Supplies {@link JoinProcessor} instances. + */ +@DefaultAnnotation(NonNull.class) +public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier { + +private final String stateStoreName; +private final IterativeJoin join; +private final List joinVars; +private final List allVars; + +/** + * Constructs an instance of {@link JoinProcessorSupplier}. + * + * @param stateStoreName - The name of the state store the processor will use. (not null) + * @param join - The join function the supplied processor will use. (not null) + * @param joinVars - The variables that the supplied processor will join over. (not null) + * @param allVars - An ordered list of all the variables that may appear in resulting Binding Sets. + * This list must lead with the same variables and order as {@code joinVars}. (not null) + * @param resultFactory - The factory that the supplied processors will use to create results. 
(not null) + * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars}. + */ +public JoinProcessorSupplier( +final String stateStoreName, +final IterativeJoin join, +final List joinVars, +final List allVars, +final ProcessorResultFactory resultFactory) throws IllegalArgumentException { +super(resultFactory); +this.stateStoreName = requireNonNull(stateStoreName); +this.join = requireNonNull(join); +this.joinVars = requireNonNull(joinVars); +this.allVars = requireNonNull(allVars); + +if(!allVars.subList(0, joinVars.size()).equals(joinVars)) { +throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " + --- End diff -- "The vars List must start with the joinVars List, but it did not" or "The joinVars List must be a prefix of the vars List". I understand that "start with" and "prefix" are not perfectly well defined in the context of List, but "lead by" stood out as being awkward to me. ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r160261077 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.join; + +import java.util.Iterator; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * An {@link Iterator} that is also {@link AutoCloseable}. + * + * @param - The type of elements that will be iterated over. + */ +@DefaultAnnotation(NonNull.class) +public interface CloseableIterator extends Iterator, AutoCloseable { } --- End diff -- I thought that you implemented another instance of CloseableIterator for similar reasons when you were doing the Fluo work. Did that get deleted? ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r160071035 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.join; + +import java.util.Iterator; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * An {@link Iterator} that is also {@link AutoCloseable}. + * + * @param - The type of elements that will be iterated over. + */ +@DefaultAnnotation(NonNull.class) +public interface CloseableIterator extends Iterator, AutoCloseable { } --- End diff -- Do we really need another CloseableIterator interface? ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r160069169 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/StartQuery.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api.interactor; + +import java.util.UUID; + +import org.apache.rya.streams.api.exception.RyaStreamsException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Starts processing a query in Rya Streams. + */ +@DefaultAnnotation(NonNull.class) +public interface StartQuery { --- End diff -- What's the difference between StartQuery and RunQuery? ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r160071321 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java --- @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.join; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Provides a mechanism for storing {@link VisibilityBindingSet}s that have been emitted from either side of + * a Join and a way to fetch all {@link VisibilityBindingSet}s that join with it from the other side. + */ +@DefaultAnnotation(NonNull.class) +public interface JoinStateStore { + +/** + * Store a {@link VisibilityBindingSet} based on the side it was emitted from. + * + * @param result - The result whose value will be stored. (not null) + */ +public void store(BinaryResult result); --- End diff -- Yet another way of serializing VisibilityBindingSets? Is this really necessary? ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r160071093 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.processors.join; + +import static java.util.Objects.requireNonNull; + +import java.util.Iterator; +import java.util.List; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.rya.api.function.join.IterativeJoin; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side; +import org.apache.rya.streams.kafka.processors.ProcessorResultFactory; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor; +import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Supplies {@link JoinProcessor} instances. + */ +@DefaultAnnotation(NonNull.class) +public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier { + +private final String stateStoreName; +private final IterativeJoin join; +private final List joinVars; +private final List allVars; + +/** + * Constructs an instance of {@link JoinProcessorSupplier}. + * + * @param stateStoreName - The name of the state store the processor will use. (not null) + * @param join - The join function the supplied processor will use. (not null) + * @param joinVars - The variables that the supplied processor will join over. (not null) + * @param allVars - An ordered list of all the variables that may appear in resulting Binding Sets. + * This list must lead with the same variables and order as {@code joinVars}. (not null) + * @param resultFactory - The factory that the supplied processors will use to create results. 
(not null) + * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars}. + */ +public JoinProcessorSupplier( +final String stateStoreName, +final IterativeJoin join, +final List joinVars, +final List allVars, +final ProcessorResultFactory resultFactory) throws IllegalArgumentException { +super(resultFactory); +this.stateStoreName = requireNonNull(stateStoreName); +this.join = requireNonNull(join); +this.joinVars = requireNonNull(joinVars); +this.allVars = requireNonNull(allVars); + +if(!allVars.subList(0, joinVars.size()).equals(joinVars)) { +throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " + --- End diff -- Awkward Exception message. ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r160070703 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import java.util.Objects; + +import org.apache.kafka.streams.processor.Processor; +import org.apache.rya.api.model.VisibilityBindingSet; + +import com.google.common.base.Optional; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Represents a value that is emitted from a Rya Streams {@link Processor}. We can't just emit a + * {@link VisibilityBindingSet} because some downstream processors require more information about + * which upstream processor is emitting the result in order to do their work. + * + * Currently there are only two types processors: --- End diff -- types of ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r160011353 --- Diff: extras/rya.streams/api/pom.xml --- @@ -0,0 +1,74 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> + + +org.apache.rya +rya.streams.parent +3.2.12-incubating-SNAPSHOT + + +4.0.0 +rya.streams.api + +Apache Rya Streams API + +This module contains the Rya Streams API components. + + + + + +org.apache.rya +rya.api.model + + + +org.openrdf.sesame +sesame-queryparser-sparql + + + +org.slf4j +slf4j-api + + +com.google.guava +guava + + + --- End diff -- Included twice. ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r160009857 --- Diff: common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.function.aggregation; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.math.BigInteger; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Literal; +import org.openrdf.model.Value; +import org.openrdf.model.datatypes.XMLDatatypeUtil; +import org.openrdf.model.impl.IntegerLiteralImpl; +import org.openrdf.query.algebra.MathExpr.MathOp; +import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; +import org.openrdf.query.algebra.evaluation.util.MathUtil; +import org.openrdf.query.impl.MapBindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Add to the {@link AggregationState}'s sum if the child Binding Set contains the binding name + * that is being summed by the {@link AggregationElement}. + */ +@DefaultAnnotation(NonNull.class) +public final class SumFunction implements AggregationFunction { +private static final Logger log = LoggerFactory.getLogger(SumFunction.class); + +@Override +public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { +checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements."); --- End diff -- Preconditions ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r159984278 --- Diff: common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Value; +import org.openrdf.query.algebra.evaluation.util.ValueComparator; +import org.openrdf.query.impl.MapBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Update the {@link AggregationState}'s min if the child binding Set contains the binding name that is being + * mined by the {@link AggregationElement}. 
+ */ +@DefaultAnnotation(NonNull.class) +public final class MinFunction implements AggregationFunction { + +private final ValueComparator compare = new ValueComparator(); + +@Override +public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { +checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements."); --- End diff -- Preconditions ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r159984048 --- Diff: common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.math.BigInteger; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Literal; +import org.openrdf.model.impl.IntegerLiteralImpl; +import org.openrdf.query.impl.MapBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Increments the {@link AggregationState}'s count if the child Binding Set contains the binding name + * that is being counted by the {@link AggregationElement}. 
+ */ +@DefaultAnnotation(NonNull.class) +public final class CountFunction implements AggregationFunction { +@Override +public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { +checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements."); --- End diff -- Preconditions ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r159983121 --- Diff: common/pom.xml --- @@ -34,6 +34,8 @@ under the License. rya.api +rya.api.model +rya.api.function --- End diff -- Based on the contents of this module, it seems like rya.api.evaluation would be a better name. Naming the module "function" suggests that the module consists entirely of Filter Functions. Just a nit, though. ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r159984158 --- Diff: common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Value; +import org.openrdf.query.algebra.evaluation.util.ValueComparator; +import org.openrdf.query.impl.MapBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Update the {@link AggregationState}'s max if the child binding Set contains the binding name that is being + * maxed by the {@link AggregationElement}. 
+ */ +@DefaultAnnotation(NonNull.class) +public final class MaxFunction implements AggregationFunction { + +private final ValueComparator compare = new ValueComparator(); + +@Override +public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { +checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements."); --- End diff -- Preconditions ---
[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/257#discussion_r159983828 --- Diff: common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java --- @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.function.aggregation; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Map; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Literal; +import org.openrdf.model.Value; +import org.openrdf.model.datatypes.XMLDatatypeUtil; +import org.openrdf.model.impl.DecimalLiteralImpl; +import org.openrdf.model.impl.IntegerLiteralImpl; +import org.openrdf.query.algebra.MathExpr.MathOp; +import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; +import org.openrdf.query.algebra.evaluation.util.MathUtil; +import org.openrdf.query.impl.MapBindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Update the {@link AggregationState}'s average if the child Binding Set contains the binding name + * that is being averaged by the {@link AggregationElement}. + */ +@DefaultAnnotation(NonNull.class) +public final class AverageFunction implements AggregationFunction { +private static final Logger log = LoggerFactory.getLogger(AverageFunction.class); + +@Override +public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { +checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements."); --- End diff -- Preconditions. ---
[GitHub] incubator-rya pull request #258: RYA-104 Mongo Rya Shell Integration
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/258#discussion_r159947383 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoConnectionDetails.java --- @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; + +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * The information the shell used to connect to Mongo server, not the DB or collections. + */ +@DefaultAnnotation(NonNull.class) +public class MongoConnectionDetails { + +private final Optional username; +private final Optional password; +private final String hostname; +private final int port; + +/** + * Constructs an instance of {@link MongoConnectionDetails}. + * + * @param hostname - The hostname of the Mongo DB that was connected to. (not null) + * @param port - The port of the Mongo DB that was connected to. 
+ * @param username - The username that was used to establish the connection + * when performing administrative operations. (not null) + * @param password - The password that was used to establish the connection + * when performing administrative operations. (not null) + */ +public MongoConnectionDetails( +final String hostname, +final int port, +final Optional username, +final Optional password) { +this.hostname = requireNonNull(hostname); +this.port = port; +this.username = requireNonNull(username); +this.password = requireNonNull(password); +} + +/** + * @return The hostname of the Mongo DB that was connected to. + */ +public String getHostname() { +return hostname; +} + +/** + * @return The port of the Mongo DB that was connected to. + */ +public int getPort() { +return port; +} + + /** + * @return The username that was used to establish the connection when performing administrative operations. + */ + public Optional getUsername() { + return this.username; + } + + /** + * @return The password that was used to establish the connection when performing administrative operations. + */ + public Optional getPassword() { + return password; + } + +/** + * Create a {@link MongoDBRdfConfiguration} that is using this object's values. + * + * @param ryaInstanceName - The Rya instance to connect to. (not null) + * @return Constructs a new {@link MongoDBRdfConfiguration} object with values from this object. + */ +public MongoDBRdfConfiguration build(final String ryaInstanceName) { +// Note, we don't use the MongoDBRdfConfigurationBuilder here because it explicitly sets +// authorizations and visibilities to an empty string if they are not set on the builder. +// If they are null in the MongoRdfConfiguration object, it may do the right thing. 
+final MongoDBRdfConfiguration conf = new MongoDBRdfConfiguration(); +conf.setBoolean(ConfigUtils.USE_MONGO, true); +conf.setMongoHostname(hostname); +conf.setMongoPort("" + port); +conf.setMongoDBName(ryaInstanceName); + +if(username.isPresent()) { +conf.setMongoUser(username.get()); +} + +if(password.isPresent()) { +conf.s
[GitHub] incubator-rya pull request #258: RYA-104 Mongo Rya Shell Integration
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/258#discussion_r159949461 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoUninstall.java --- @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.mongo; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.Uninstall; + +import com.mongodb.MongoClient; +import com.mongodb.MongoException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * An Mongo implementation of the {@link Uninstall} command. + */ +@DefaultAnnotation(NonNull.class) +public class MongoUninstall implements Uninstall { + +private final MongoClient adminClient; +private final InstanceExists instanceExists; + +/** + * Constructs an instance of {@link MongoUninstall}. + * + * @param adminClient- Provides programmatic access to the instance of Mongo that hosts Rya instances. 
(not null) + * @param instanceExists - The interactor used to check if a Rya instance exists. (not null) + */ +public MongoUninstall(final MongoClient adminClient, final MongoInstanceExists instanceExists) { +this.adminClient = requireNonNull(adminClient); +this.instanceExists = requireNonNull(instanceExists); +} + +@Override +public void uninstall(final String ryaInstanceName) throws InstanceDoesNotExistException, RyaClientException { +try { --- End diff -- Add Preconditions checks on the method's arguments here. ---
[GitHub] incubator-rya pull request #258: RYA-104 Mongo Rya Shell Integration
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/258#discussion_r159945788 --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java --- @@ -53,37 +55,37 @@ */ public RyaClient( final Install install, -final CreatePCJ createPcj, -final DeletePCJ deletePcj, -final CreatePeriodicPCJ createPeriodicPcj, -final DeletePeriodicPCJ deletePeriodicPcj, -final ListIncrementalQueries listIncrementalQueries, -final BatchUpdatePCJ batchUpdatePcj, +final Optional createPcj, +final Optional deletePcj, +final Optional createPeriodicPcj, +final Optional deletePeriodicPcj, +final Optional listIncrementalQueries, +final Optional batchUpdatePcj, final GetInstanceDetails getInstanceDetails, final InstanceExists instanceExists, final ListInstances listInstances, -final AddUser addUser, -final RemoveUser removeUser, +final Optional addUser, +final Optional removeUser, final Uninstall uninstall, final LoadStatements loadStatements, final LoadStatementsFile loadStatementsFile, final ExecuteSparqlQuery executeSparqlQuery) { this.install = requireNonNull(install); this.createPcj = requireNonNull(createPcj); this.deletePcj = requireNonNull(deletePcj); -this.createPeriodicPcj = createPeriodicPcj; -this.deletePeriodicPcj = deletePeriodicPcj; -this.listIncrementalQueries = listIncrementalQueries; +this.createPeriodicPcj = requireNonNull(createPeriodicPcj); +this.deletePeriodicPcj = requireNonNull(deletePeriodicPcj); +this.listIncrementalQueries = requireNonNull(listIncrementalQueries); this.bactchUpdatePCJ = requireNonNull(batchUpdatePcj); this.getInstanceDetails = requireNonNull(getInstanceDetails); this.instanceExists = requireNonNull(instanceExists); this.listInstances = requireNonNull(listInstances); this.addUser = requireNonNull(addUser); this.removeUser = requireNonNull(removeUser); this.uninstall = requireNonNull(uninstall); -this.loadStatements = requireNonNull(loadStatements); --- End diff -- Why did the 
null checks get removed? ---
[GitHub] incubator-rya pull request #258: RYA-104 Mongo Rya Shell Integration
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/258#discussion_r159950520 --- Diff: extras/rya.pcj.fluo/pom.xml --- @@ -37,7 +37,7 @@ pcj.fluo.api pcj.fluo.app pcj.fluo.client -pcj.fluo.integration --- End diff -- This probably shouldn't be commented out. ---
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159803931 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java --- @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.indexing.pcj.matching; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; -import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.rya.api.instance.RyaDetailsRepository; -import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; -import org.apache.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator; -import org.apache.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.external.matching.ExternalSetProvider; -import org.apache.rya.indexing.external.matching.QuerySegment; -import org.apache.rya.indexing.external.tupleSet.AccumuloIndexSet; -import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.TupleExpr; -import org.openrdf.sail.SailException; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * Implementation of {@link ExternalSetProvider} that 
provides {@link ExternalTupleSet}s. - * This provider uses either user specified Accumulo configuration information or user a specified - * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets. If Accumulo configuration - * is provided, the provider connects to an instance of RyaDetails and populates the cache with - * PCJs registered in RyaDetails. - * - */ -public class AccumuloIndexSetProvider implements ExternalSetProvider { --- End diff -- Why are you deleting anything related to Accumulo? What was this replaced by? ---
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159803093 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/PcjIndexer.java --- @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongodb.pcj; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.api.persist.index.RyaSecondaryIndexer; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Updates the PCJs that are in a {@link MongoPcjDocuments}. + */ +public interface PcjIndexer extends RyaSecondaryIndexer { +/** + * Creates the {@link MongoPcjDocuments} that will be used by the indexer. + * + * @param conf - Indicates how the {@link MongoPcjDocuments} is initialized. (not null) + * @return The {@link MongoPcjDocuments} that will be used by this indexer. + */ +public @Nullable MongoPcjDocuments getPcjStorage(Configuration conf); --- End diff -- Shouldn't this method be getPcjDocuments? ---
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159801144 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/BasePcjIndexer.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.mongodb.pcj; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Collections.singleton; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.groupingBy; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.entity.model.Entity; +import org.apache.rya.indexing.entity.storage.EntityStorage; +import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.MongoSecondaryIndex; +import org.openrdf.model.URI; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A base class that may be used to update an {@link EntityStorage} as new + * {@link RyaStatement}s are added to/removed from the Rya instance. 
+ */ +@DefaultAnnotation(NonNull.class) +public abstract class BasePcjIndexer implements PcjIndexer, MongoSecondaryIndex { + +private final AtomicReference configuration = new AtomicReference<>(); +private final AtomicReference pcjDocs = new AtomicReference<>(); + +@Override +public void setConf(final Configuration conf) { +requireNonNull(conf); +pcjDocs.set( getPcjStorage(conf) ); +} + +@Override +public Configuration getConf() { +return configuration.get(); +} + +@Override +public void storeStatement(final RyaStatement statement) throws IOException { +requireNonNull(statement); +storeStatements( singleton(statement) ); +} + +@Override +public void storeStatements(final Collection statements) throws IOException { +requireNonNull(statements); + +final Map> groupedBySubject = statements.stream() +.collect(groupingBy(RyaStatement::getSubject)); + +for(final Entry> entry : groupedBySubject.entrySet()) { +try { +updateEntity(entry.getKey(), entry.getValue()); +} catch (final EntityStorageException e) { +throw new IOException("Failed to update the Entity index.", e); +} +} +} + +/** + * Updates a {@link Entity} to reflect new {@link RyaStatement}s. + * + * @param subject - The Subject of the {@link Entity} the statements are for. (not null) + * @param statements - Statements that the {@link Entity} will be updated with. (not null) + */ +private void updateEntity(final RyaURI subject, final Collection statements) throws EntityStorageException { +requireNonNull(subject); --- End diff -- This method still seems off to me. You're going to be adding statements to any updater that you need. There is no need to include this method. ---
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159802000 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPrecomputedJoinIndexer.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongodb.pcj; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments; +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Updates the state of the Precomputed Join indices that are used by Rya. + */ +@DefaultAnnotation(NonNull.class) +public class MongoPrecomputedJoinIndexer extends BasePcjIndexer { +private static final Logger log = Logger.getLogger(MongoPrecomputedJoinIndexer.class); + +@Override +public MongoPcjDocuments getPcjStorage(final Configuration conf) { --- End diff -- Why does this return an instance of MongoPcjDocuments but the method is called getPcjStorage? 
Shouldn't it return an instance of PcjStorage? ---
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159798308 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoBatchUpdatePCJ.java --- @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.client.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.rya.api.client.BatchUpdatePCJ; +import org.apache.rya.api.client.CreatePCJ; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.PCJDoesNotExistException; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import org.apache.rya.api.instance.RyaDetailsUpdater; +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjStorage; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import 
org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResultHandlerBase; +import org.openrdf.query.TupleQueryResultHandlerException; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.mongodb.MongoClient; + +/** + * A Mongo implementation of {@link CreatePCJ}. + */ +public class MongoBatchUpdatePCJ implements BatchUpdatePCJ { +private static final Logger log = LoggerFactory.getLogger(MongoBatchUpdatePCJ.class); + +private final MongoConnectionDetails connectionDetails; +private final InstanceExists instanceExists; +private final MongoClient mongoClient; + +/** + * Constructs an instance of {@link MongoBatchUpdatePCJ}. + * + * @param connectionDetails - Details to connect to the server. (not null) + * @param instanceExists - The interactor used to check if a Rya instance exists. (not null) + * @param mongoClient - The {@link MongoClient} to use when batch updating. (not null) + */ +public MongoBatchUpdatePCJ( +final Mo
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159798116 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoBatchUpdatePCJ.java --- @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.client.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.rya.api.client.BatchUpdatePCJ; +import org.apache.rya.api.client.CreatePCJ; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.PCJDoesNotExistException; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import org.apache.rya.api.instance.RyaDetailsUpdater; +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjStorage; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import 
org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResultHandlerBase; +import org.openrdf.query.TupleQueryResultHandlerException; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.mongodb.MongoClient; + +/** + * A Mongo implementation of {@link CreatePCJ}. + */ +public class MongoBatchUpdatePCJ implements BatchUpdatePCJ { +private static final Logger log = LoggerFactory.getLogger(MongoBatchUpdatePCJ.class); + +private final MongoConnectionDetails connectionDetails; +private final InstanceExists instanceExists; +private final MongoClient mongoClient; + +/** + * Constructs an instance of {@link MongoBatchUpdatePCJ}. + * + * @param connectionDetails - Details to connect to the server. (not null) + * @param instanceExists - The interactor used to check if a Rya instance exists. (not null) + * @param mongoClient - The {@link MongoClient} to use when batch updating. (not null) + */ +public MongoBatchUpdatePCJ( +final Mo
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159803009 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPrecomputedJoinIndexer.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongodb.pcj; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; + +import com.mongodb.MongoClient; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Updates the state of the Precomputed Join indices that are used by Rya. + */ +@DefaultAnnotation(NonNull.class) +public class MongoPrecomputedJoinIndexer extends BasePcjIndexer { --- End diff -- Yeah, still not getting what you're doing here. 
The base class appears to be dealing with entities and there is no updater for the base class to interact with. In general, you wouldn't want the base class to interact with the updater class given that any class that extends it would be locked into using that updater. ---
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159802759 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/BasePcjIndexer.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.mongodb.pcj; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Collections.singleton; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.groupingBy; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.conf.Configuration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.entity.model.Entity; +import org.apache.rya.indexing.entity.storage.EntityStorage; +import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.MongoSecondaryIndex; +import org.openrdf.model.URI; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A base class that may be used to update an {@link EntityStorage} as new + * {@link RyaStatement}s are added to/removed from the Rya instance. 
+ */ +@DefaultAnnotation(NonNull.class) +public abstract class BasePcjIndexer implements PcjIndexer, MongoSecondaryIndex { + +private final AtomicReference configuration = new AtomicReference<>(); +private final AtomicReference pcjDocs = new AtomicReference<>(); + +@Override +public void setConf(final Configuration conf) { +requireNonNull(conf); +pcjDocs.set( getPcjStorage(conf) ); +} + +@Override +public Configuration getConf() { +return configuration.get(); +} + +@Override +public void storeStatement(final RyaStatement statement) throws IOException { +requireNonNull(statement); +storeStatements( singleton(statement) ); +} + +@Override +public void storeStatements(final Collection statements) throws IOException { +requireNonNull(statements); --- End diff -- Yeah, I don't get what this method or this class is doing. In the logic below, it appears that you are dealing with entities, which is not correct. The PCJ indexer feeds statements to the updater. You maintain in your response to my comments about the MongoPCJIndexer that this class does all of the "heavy lifting". That seems impossible to me given that this base class appears to deal with entities and there is no updater to interact with. ---
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159754654 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoBatchUpdatePCJ.java --- @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.client.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.rya.api.client.BatchUpdatePCJ; +import org.apache.rya.api.client.CreatePCJ; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.PCJDoesNotExistException; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import org.apache.rya.api.instance.RyaDetailsUpdater; +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjStorage; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import 
org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResultHandlerBase; +import org.openrdf.query.TupleQueryResultHandlerException; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.mongodb.MongoClient; + +/** + * A Mongo implementation of {@link CreatePCJ}. --- End diff -- This should read "A Mongo implementation of {@link BatchUpdatePCJ}." ---
[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/172#discussion_r159754029 --- Diff: common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaToRdfConversions.java --- @@ -40,8 +40,8 @@ */ public class RyaToRdfConversions { -public static URI convertURI(RyaURI uri) { --- End diff -- RyaType is too general an argument type here. Per our discussion, create a private method that does what you want. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153631342 --- Diff: extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java --- @@ -109,11 +109,11 @@ public void deletePeriodicPCJ() throws Exception { vf.createLiteral(dtf.newXMLGregorianCalendar(time4))), vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4"))); -runTest(query, statements, 29); +runTest(query, statements, 30); --- End diff -- Yes. Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153631182 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.Value; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.hash.Hashing; + +/** + * This class adds and removes a hash to and from the rowId for sharding purposes. + * + */ +public class BindingHashShardingFunction { + +private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter(); +private static final int HASH_LEN = 4; + +/** + * Generates a sharded rowId. 
+ * + * @param nodeId - Node Id with type and UUID + * @param varOrder - VarOrder used to order BindingSet values + * @param bs - BindingSet with partially formed query values + * @return - serialized Bytes rowId for storing BindingSet results in Fluo + */ +public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) { +String[] rowPrefixAndId = nodeId.split("_"); +Preconditions.checkArgument(rowPrefixAndId.length == 2); +String prefix = rowPrefixAndId[0]; +String id = rowPrefixAndId[1]; + +String firstBindingString = ""; +Bytes rowSuffix = Bytes.of(id); +if (varOrder.getVariableOrders().size() > 0) { +VariableOrder first = new VariableOrder(varOrder.getVariableOrders().get(0)); +firstBindingString = BS_CONVERTER.convert(bs, first); +rowSuffix = RowKeyUtil.makeRowKey(id, varOrder, bs); +} + +BytesBuilder builder = Bytes.builder(); +builder.append(Bytes.of(prefix + ":")); +builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + firstBindingString))); +builder.append(":"); +builder.append(rowSuffix); +return builder.toBytes(); +} + +/** + * Generates a sharded rowId. + * + * @param nodeId - Node Id with type and UUID + * @param firstBsVal - String representation of the first BsValue + * @return - serialized Bytes prefix for scanning rows + */ +public static Bytes getShardedScanPrefix(String nodeId, Value firstBsVal) { +Preconditions.checkNotNull(firstBsVal); + +final RyaType ryaValue = RdfToRyaConversions.convertValue(firstBsVal); +final String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType(); + +return getShardedScanPrefix(nodeId, bindingString); +} + +/** + * Generates a sharded rowId from the indicated nodeId and bindingString. + * + * @param nodeId - NodeId with tyep and UUID + * @param bindingString - String representation of BindingSet values, as formed by {@link BindingSetStringConverter} + *. --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153631103 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.Value; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.hash.Hashing; + +/** + * This class adds and removes a hash to and from the rowId for sharding purposes. + * + */ +public class BindingHashShardingFunction { + +private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter(); +private static final int HASH_LEN = 4; + +/** + * Generates a sharded rowId. + * + * @param nodeId - Node Id with type and UUID + * @param varOrder - VarOrder used to order BindingSet values + * @param bs - BindingSet with partially formed query values + * @return - serialized Bytes rowId for storing BindingSet results in Fluo + */ +public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) { +String[] rowPrefixAndId = nodeId.split("_"); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153631009 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.Value; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.hash.Hashing; + +/** + * This class adds and removes a hash to and from the rowId for sharding purposes. --- End diff -- It does both. Reworded to adds or removes... ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153630697 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; + +/** + * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. 
All StatementPattern nodeIds are + * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all + * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and + * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table). + */ +public class StatementPatternIdManager { + +/** + * Add specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also + * updates the hash of the updated nodeId Set and writes that to the Column + * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH} + * + * @param tx + * @param ids - ids to add to the StatementPattern nodeId Set + */ +public static void addStatementPatternIds(TransactionBase tx, Set ids) { +Optional val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS)); +StringBuilder builder = new StringBuilder(); +if (val.isPresent()) { +builder.append(val.get().toString()); +builder.append(VAR_DELIM); +} +String idString = builder.append(Joiner.on(VAR_DELIM).join(ids)).toString(); +tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString)); +tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString())); +} + +/** + * Remove specified Set of ids from the Fluo table and updates the entry with Column + * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also updates the hash of the updated nodeId Set and writes that + * to the Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH} + * + * @param tx + * @param ids - ids to remove from the StatementPattern nodeId Set + */ +public static void removeStatementPatternIds(TransactionBase tx, Set ids) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153630587 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; + +/** + * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. 
All StatementPattern nodeIds are + * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all + * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and + * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table). + */ +public class StatementPatternIdManager { + +/** + * Add specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also + * updates the hash of the updated nodeId Set and writes that to the Column + * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH} + * + * @param tx + * @param ids - ids to add to the StatementPattern nodeId Set + */ +public static void addStatementPatternIds(TransactionBase tx, Set ids) { +Optional val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS)); +StringBuilder builder = new StringBuilder(); +if (val.isPresent()) { +builder.append(val.get().toString()); +builder.append(VAR_DELIM); +} +String idString = builder.append(Joiner.on(VAR_DELIM).join(ids)).toString(); +tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString)); +tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString())); +} + +/** + * Remove specified Set of ids from the Fluo table and updates the entry with Column + * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also updates the hash of the updated nodeId Set and writes that + * to the Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH} + * + * @param tx --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153630245 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StatementPatternIdCacheSupplier { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153630157 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaStatement; + +/** + * This class is a utility class for adding and removing the Triple prefix to + * Statements ingested into the Rya Fluo application. The Triple prefix is added + * to supported range based compactions for removing transient data from the Fluo + * application. This prefix supports the Transient data recipe described on the + * the Fluo website, and reduces the computational load on the system by cleaning up + * old deleted Triples and notifications using a targeted range compaction. 
+ * + */ +public class TriplePrefixUtils { + +private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of("T" + NODEID_BS_DELIM); + +/** + * Adds the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to {@link Bytes}. + * @param tripleBytes - serialized {@link RyaStatement} + * @return - serialized RyaStatement with prepended triple prefix, converted to Bytes + */ +public static Bytes addTriplePrefixAndConvertToBytes(byte[] tripleBytes) { +BytesBuilder builder = Bytes.builder(); +return builder.append(TRIPLE_PREFIX_BYTES).append(tripleBytes).toBytes(); +} + +/** + * Removes the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to a byte array. + * @param prefixedTriple - serialized RyaStatement with prepended triple prefix, converted to Bytes + * @return - serialized {@link RyaStatement} in byte array form + */ +public static byte[] removeTriplePrefixAndConvertToByteArray(Bytes prefixedTriple) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153629832 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaStatement; + +/** + * This class is a utility class for adding and removing the Triple prefix to + * Statements ingested into the Rya Fluo application. The Triple prefix is added + * to supported range based compactions for removing transient data from the Fluo + * application. This prefix supports the Transient data recipe described on the + * the Fluo website, and reduces the computational load on the system by cleaning up + * old deleted Triples and notifications using a targeted range compaction. 
+ * + */ +public class TriplePrefixUtils { + +private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of("T" + NODEID_BS_DELIM); + +/** + * Adds the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to {@link Bytes}. + * @param tripleBytes - serialized {@link RyaStatement} + * @return - serialized RyaStatement with prepended triple prefix, converted to Bytes + */ +public static Bytes addTriplePrefixAndConvertToBytes(byte[] tripleBytes) { +BytesBuilder builder = Bytes.builder(); +return builder.append(TRIPLE_PREFIX_BYTES).append(tripleBytes).toBytes(); +} + +/** + * Removes the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to a byte array. --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153629802 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaStatement; + +/** + * This class is a utility class for adding and removing the Triple prefix to + * Statements ingested into the Rya Fluo application. The Triple prefix is added + * to supported range based compactions for removing transient data from the Fluo + * application. This prefix supports the Transient data recipe described on the + * the Fluo website, and reduces the computational load on the system by cleaning up + * old deleted Triples and notifications using a targeted range compaction. 
+ * + */ +public class TriplePrefixUtils { + +private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of("T" + NODEID_BS_DELIM); + +/** + * Adds the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to {@link Bytes}. + * @param tripleBytes - serialized {@link RyaStatement} + * @return - serialized RyaStatement with prepended triple prefix, converted to Bytes + */ +public static Bytes addTriplePrefixAndConvertToBytes(byte[] tripleBytes) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153629576 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaStatement; + +/** + * This class is a utility class for adding and removing the Triple prefix to + * Statements ingested into the Rya Fluo application. The Triple prefix is added + * to supported range based compactions for removing transient data from the Fluo + * application. This prefix supports the Transient data recipe described on the + * the Fluo website, and reduces the computational load on the system by cleaning up + * old deleted Triples and notifications using a targeted range compaction. 
+ * + */ +public class TriplePrefixUtils { + +private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of("T" + NODEID_BS_DELIM); + +/** + * Adds the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to {@link Bytes}. --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153628506 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaStatement; + +/** + * This class is a utility class for adding and removing the Triple prefix to + * Statements ingested into the Rya Fluo application. The Triple prefix is added + * to supported range based compactions for removing transient data from the Fluo + * application. This prefix supports the Transient data recipe described on the + * the Fluo website, and reduces the computational load on the system by cleaning up + * old deleted Triples and notifications using a targeted range compaction. 
+ * + */ +public class TriplePrefixUtils { + +private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of("T" + NODEID_BS_DELIM); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153627802 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaStatement; + +/** + * This class is a utility class for adding and removing the Triple prefix to + * Statements ingested into the Rya Fluo application. The Triple prefix is added + * to supported range based compactions for removing transient data from the Fluo + * application. This prefix supports the Transient data recipe described on the + * the Fluo website, and reduces the computational load on the system by cleaning up + * old deleted Triples and notifications using a targeted range compaction. + * + */ +public class TriplePrefixUtils { --- End diff -- Nuuupe. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153627442 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.Value; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.hash.Hashing; + +/** + * This class adds and removes a hash to and from the rowId for sharding purposes. + * + */ +public class BindingHashShardingFunction { + +private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter(); +private static final int HASH_LEN = 4; + +/** + * Generates a sharded rowId. + * + * @param nodeId - Node Id with type and UUID + * @param varOrder - VarOrder used to order BindingSet values + * @param bs - BindingSet with partially formed query values + * @return - serialized Bytes rowId for storing BindingSet results in Fluo + */ +public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153627306 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.Value; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.hash.Hashing; + +/** + * This class adds and removes a hash to and from the rowId for sharding purposes. + * + */ +public class BindingHashShardingFunction { --- End diff -- Not doing this. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153627243 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java --- @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.util; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Bytes.BytesBuilder; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.Value; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.hash.Hashing; + +/** + * This class adds and removes a hash to and from the rowId for sharding purposes. --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153627110 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; + +/** + * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. 
All StatementPattern nodeIds are + * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all + * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and + * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table). + */ +public class StatementPatternIdManager { + +/** + * Add specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also + * updates the hash of the updated nodeId Set and writes that to the Column + * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH} + * + * @param tx + * @param ids - ids to add to the StatementPattern nodeId Set + */ +public static void addStatementPatternIds(TransactionBase tx, Set ids) { +Optional val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS)); +StringBuilder builder = new StringBuilder(); +if (val.isPresent()) { +builder.append(val.get().toString()); +builder.append(VAR_DELIM); +} +String idString = builder.append(Joiner.on(VAR_DELIM).join(ids)).toString(); +tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString)); +tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString())); +} + +/** + * Remove specified Set of ids from the Fluo table and updates the entry with Column + * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also updates the hash of the updated nodeId Set and writes that + * to the Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH} + * + * @param tx --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153627036 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; + +/** + * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. 
All StatementPattern nodeIds are + * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all + * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and + * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table). + */ +public class StatementPatternIdManager { + +/** + * Add specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also + * updates the hash of the updated nodeId Set and writes that to the Column + * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH} + * + * @param tx + * @param ids - ids to add to the StatementPattern nodeId Set + */ +public static void addStatementPatternIds(TransactionBase tx, Set ids) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153626670 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; + +/** + * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. 
All StatementPattern nodeIds are + * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all + * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and + * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table). + */ +public class StatementPatternIdManager { + +/** + * Add specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also + * updates the hash of the updated nodeId Set and writes that to the Column + * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH} + * + * @param tx --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153625655 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.google.common.hash.Hashing; + +/** + * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. 
All StatementPattern nodeIds are + * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all + * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and + * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table). + */ +public class StatementPatternIdManager { --- End diff -- Yeah, not going to add that. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153625215 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class StatementPatternIdCacheSupplier { --- End diff -- Done. Going to hold off on refactoring the suppliers to be the same. I could maybe do something with generics, but then I'd have to refactor the cache types to extend some common interface. The caches are different enough that that would be an artificial type def. Probably not worth it for the purposes of combining suppliers... Let me know if you have something different in mind. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153624484 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.collect.Sets; + +/** + * This class caches the StatementPattern Ids so they don't have + * to be looked up each time a new Statement needs to be processed + * in the TripleObserver. 
+ * + */ +public class StatementPatternIdCache { + +private final ReentrantLock lock = new ReentrantLock(); +private static Optional HASH; +private static Set IDS; + +public StatementPatternIdCache() { +HASH = Optional.empty(); +IDS = new HashSet<>(); +} + +/** + * This method retrieves the StatementPattern NodeIds registered in the Fluo table. + * This method looks up the hash of the Statement Pattern Id String hash, and if it + * is the same as the cached hash, then the cache Set of nodeIds is returned. Otherwise, + * this method retrieves the ids from the Fluo table. This method is thread safe. + * @param tx + * @return - Set of StatementPattern nodeIds + */ +public Set getStatementPatternIds(TransactionBase tx) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153608050 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.collect.Sets; + +/** + * This class caches the StatementPattern Ids so they don't have + * to be looked up each time a new Statement needs to be processed + * in the TripleObserver. 
+ * + */ +public class StatementPatternIdCache { + +private final ReentrantLock lock = new ReentrantLock(); +private static Optional HASH; +private static Set IDS; + +public StatementPatternIdCache() { +HASH = Optional.empty(); +IDS = new HashSet<>(); +} + +/** + * This method retrieves the StatementPattern NodeIds registered in the Fluo table. + * This method looks up the hash of the Statement Pattern Id String hash, and if it --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153606055 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.collect.Sets; + +/** + * This class caches the StatementPattern Ids so they don't have + * to be looked up each time a new Statement needs to be processed + * in the TripleObserver. 
+ * + */ +public class StatementPatternIdCache { + +private final ReentrantLock lock = new ReentrantLock(); +private static Optional HASH; +private static Set IDS; + +public StatementPatternIdCache() { --- End diff -- Okay. Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153589892 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; + +import com.google.common.collect.Sets; + +/** + * This class caches the StatementPattern Ids so they don't have + * to be looked up each time a new Statement needs to be processed + * in the TripleObserver. 
+ * + */ +public class StatementPatternIdCache { --- End diff -- Not including that. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153589770 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetadataCacheSupplier { + +private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class); +private static FluoQueryMetadataCache CACHE; +private static boolean initialized = false; +private static final int DEFAULT_CAPACITY = 1; +private static final int DEFAULT_CONCURRENCY = 8; +private static final ReentrantLock lock = new ReentrantLock(); + +/** + * Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the + * indicated capacity and concurrencyLevel if one is provided. 
+ * + * @param capacity - capacity used to create a new cache + * @param concurrencyLevel - concurrencyLevel used to create a new cache + */ +public static FluoQueryMetadataCache getOrCreateCache(int capacity, int concurrencyLevel) { +lock.lock(); +try { +if (!initialized) { +LOG.debug("Cache has not been initialized. Initializing cache with capacity: {} and concurrencylevel: {}", capacity, +concurrencyLevel); +CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel); +initialized = true; +} else { +LOG.warn( +"A cache has already been initialized, so a cache with capacity: {} and concurrency level: {} will not be created. Returning existing cache with capacity: {} and concurrencylevel: {}", +capacity, concurrencyLevel, CACHE.getCapacity(), CACHE.getConcurrencyLevel()); +} +return CACHE; +} finally { +lock.unlock(); +} +} + +/** + * Returns cache with the name {@link FluoQueryMetadataCache#FLUO_CACHE_INSTANCE} if it exists, otherwise creates it --- End diff -- Updated. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153589466 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetadataCacheSupplier { + +private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class); +private static FluoQueryMetadataCache CACHE; +private static boolean initialized = false; +private static final int DEFAULT_CAPACITY = 1; +private static final int DEFAULT_CONCURRENCY = 8; +private static final ReentrantLock lock = new ReentrantLock(); + +/** + * Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the --- End diff -- It does. It enforces "singleton-like" behavior in that it only creates the instance if it doesn't already exist. It allows for the initial cache construction to be configurable. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153588977 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java --- @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.concurrent.locks.ReentrantLock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetadataCacheSupplier { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153588803 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. 
+ * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { + +private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class); +private final FluoQueryMetadataDAO dao; +private final Cache commonNodeMetadataCache; +private final Cache metadataCache; +private int capacity; +private int concurrencyLevel; + +/** + * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary. + * + * @param capacity - max size of the cache + */ +public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) { +this.dao = dao; +commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); +metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); +this.capacity = capacity; +this.concurrencyLevel = concurrencyLevel; +} + +/** + * @return - capacity of this cache in terms of max number of entries + */ +public int getCapacity() { +return capacity; +} + +/** + * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on + * without waiting for other threads + */ +public int getConcurrencyLevel() { +return concurrencyLevel; +} + + +@Override +public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) { +Optional type = NodeType.fromNodeId(nodeId); + +try { +checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN); +LOG.debug("Retrieving Metadata from Cache: {}", nodeId); +return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable() { +@Override +public CommonNodeMetadata call() throws Exception { +LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId); +return dao.readStatementPatternMetadata(tx, nodeId); +} +}); +} catch (Exception e) { +throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e); +} +} 
+ +@Override +public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) { +Optional type = NodeType.fromNodeId(nodeId); +try { +checkArgument(type.isPresent() && type.get() == NodeType
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153588548 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. 
+ * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { + +private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class); +private final FluoQueryMetadataDAO dao; +private final Cache commonNodeMetadataCache; +private final Cache metadataCache; +private int capacity; +private int concurrencyLevel; + +/** + * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary. + * + * @param capacity - max size of the cache + */ +public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) { +this.dao = dao; +commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); +metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); +this.capacity = capacity; +this.concurrencyLevel = concurrencyLevel; +} + +/** + * @return - capacity of this cache in terms of max number of entries + */ +public int getCapacity() { +return capacity; +} + +/** + * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on + * without waiting for other threads + */ +public int getConcurrencyLevel() { +return concurrencyLevel; +} + + +@Override +public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) { +Optional type = NodeType.fromNodeId(nodeId); + +try { +checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN); +LOG.debug("Retrieving Metadata from Cache: {}", nodeId); +return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable() { +@Override +public CommonNodeMetadata call() throws Exception { +LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId); +return dao.readStatementPatternMetadata(tx, nodeId); +} +}); +} catch (Exception e) { +throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e); +} +} 
+ +@Override +public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) { +Optional type = NodeType.fromNodeId(nodeId); +try { +checkArgument(type.isPresent() && type.get() == NodeType
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153587213 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. 
+ * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { + +private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class); +private final FluoQueryMetadataDAO dao; +private final Cache commonNodeMetadataCache; +private final Cache metadataCache; +private int capacity; +private int concurrencyLevel; + +/** + * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary. + * + * @param capacity - max size of the cache + */ +public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) { +this.dao = dao; +commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); +metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); +this.capacity = capacity; +this.concurrencyLevel = concurrencyLevel; +} + +/** + * @return - capacity of this cache in terms of max number of entries + */ +public int getCapacity() { +return capacity; +} + +/** + * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on + * without waiting for other threads + */ +public int getConcurrencyLevel() { +return concurrencyLevel; +} + + +@Override +public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) { +Optional type = NodeType.fromNodeId(nodeId); + +try { +checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN); +LOG.debug("Retrieving Metadata from Cache: {}", nodeId); +return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable() { --- End diff -- Ooh. That is elegant. I'll keep it in mind going forward. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153586913 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. 
+ * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { + +private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class); +private final FluoQueryMetadataDAO dao; +private final Cache commonNodeMetadataCache; +private final Cache metadataCache; +private int capacity; +private int concurrencyLevel; + +/** + * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary. + * + * @param capacity - max size of the cache + */ +public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) { +this.dao = dao; +commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); +metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); +this.capacity = capacity; +this.concurrencyLevel = concurrencyLevel; +} + +/** + * @return - capacity of this cache in terms of max number of entries + */ +public int getCapacity() { +return capacity; +} + +/** + * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on + * without waiting for other threads + */ +public int getConcurrencyLevel() { +return concurrencyLevel; +} + + +@Override +public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) { +Optional type = NodeType.fromNodeId(nodeId); + +try { +checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153585160 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. 
+ * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { + +private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class); +private final FluoQueryMetadataDAO dao; +private final Cache commonNodeMetadataCache; +private final Cache metadataCache; +private int capacity; +private int concurrencyLevel; + +/** + * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary. + * + * @param capacity - max size of the cache + */ +public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) { +this.dao = dao; +commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); +metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build(); +this.capacity = capacity; +this.concurrencyLevel = concurrencyLevel; +} + +/** + * @return - capacity of this cache in terms of max number of entries + */ +public int getCapacity() { +return capacity; +} + +/** + * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on + * without waiting for other threads + */ +public int getConcurrencyLevel() { +return concurrencyLevel; +} + + +@Override +public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153584424 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. 
+ * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { + +private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class); +private final FluoQueryMetadataDAO dao; +private final Cache commonNodeMetadataCache; +private final Cache metadataCache; +private int capacity; +private int concurrencyLevel; + +/** + * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary. + * + * @param capacity - max size of the cache + */ +public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153584456 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. 
+ * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { + +private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class); +private final FluoQueryMetadataDAO dao; +private final Cache commonNodeMetadataCache; +private final Cache metadataCache; +private int capacity; +private int concurrencyLevel; + +/** + * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary. + * + * @param capacity - max size of the cache + */ +public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) { +this.dao = dao; --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153583867 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. 
+ * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { + +private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class); +private final FluoQueryMetadataDAO dao; --- End diff -- I like that. I'll keep it in mind going forward. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153583702 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. 
+ * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { --- End diff -- Yeah, not doing that. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153582032 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */package org.apache.rya.indexing.pcj.fluo.app.query; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.concurrent.Callable; + +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +/** + * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first + * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the + * data. 
+ * + */ +public class FluoQueryMetadataCache extends FluoQueryMetadataDAO { --- End diff -- The delete question might actually be a problem. I'm going to create a Jira ticket for this. We need some way to notify the workers when metadata has been deleted so that the caches on each JVM can purge the metadata for any deleted nodeIds. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153578897 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java --- @@ -70,53 +68,50 @@ public void process(final TransactionBase tx, final Bytes brow, final Column col // Get string representation of triple. final RyaStatement ryaStatement = IncUpdateDAO.deserializeTriple(brow); log.trace("Transaction ID: {}\nRya Statement: {}\n", tx.getStartTimestamp(), ryaStatement); +log.trace("Beginging to process triple."); final String triple = IncUpdateDAO.getTripleString(ryaStatement); -// Iterate over each of the Statement Patterns that are being matched against. -final RowScanner spScanner = tx.scanner() -.over(Span.prefix(SP_PREFIX)) - -// Only fetch rows that have the pattern in them. There will only be a single row with a pattern per SP. -.fetch(FluoQueryColumns.STATEMENT_PATTERN_PATTERN) -.byRow() -.build(); +Set spIDs = SP_ID_CACHE.getStatementPatternIds(tx); //see if triple matches conditions of any of the SP -for (final ColumnScanner colScanner : spScanner) { -// Get the Statement Pattern's node id. -final String spID = colScanner.getsRow(); - +for (String spID: spIDs) { // Fetch its metadata. final StatementPatternMetadata spMetadata = QUERY_METADATA_DAO.readStatementPatternMetadata(tx, spID); +log.trace("Retrieved metadata: {}", spMetadata); + // Attempt to match the triple against the pattern. final String pattern = spMetadata.getStatementPattern(); final VariableOrder varOrder = spMetadata.getVariableOrder(); final String bindingSetString = getBindingSet(triple, pattern, varOrder); +log.trace("Created binding set match string: {}", bindingSetString); + // Statement matches to a binding set. if(bindingSetString.length() != 0) { // Fetch the triple's visibility label. 
final String visibility = tx.gets(brow.toString(), FluoQueryColumns.TRIPLES, ""); // Create the Row ID for the emitted binding set. It does not contain visibilities. -final String row = spID + NODEID_BS_DELIM + bindingSetString; -final Bytes rowBytes = Bytes.of( row.getBytes(Charsets.UTF_8) ); +//final String row = spID + NODEID_BS_DELIM + bindingSetString; --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153578792 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java --- @@ -20,39 +22,79 @@ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; +import com.google.common.base.Preconditions; + /** * This class represents a batch order to delete all entries in the Fluo table indicated * by the given Span and Column. These batch orders are processed by the {@link BatchObserver}, * which uses this batch information along with the nodeId passed into the Observer to perform - * batch deletes. + * batch deletes. * */ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater(); - -public SpanBatchDeleteInformation(int batchSize, Column column, Span span) { +private Optional nodeId; + +/** + * Create a new SpanBatchInformation object. + * @param nodeId - Optional nodeId that is used to filter returned results. Useful if the hash + * is not included in the Span. + * @param batchSize - size of batch to be deleted + * @param column - column whose entries will be deleted + * @param span - Span indicating the range of data to delete. Sometimes the Span cannot contain the hash + * (for example, if you are deleting all of the results associated with a nodeId). In this case, a nodeId + * should be specified along with a Span equal to the prefix of the nodeId. + */ +public SpanBatchDeleteInformation(Optional nodeId, int batchSize, Column column, Span span) { super(batchSize, Task.Delete, column, span); +Preconditions.checkNotNull(nodeId); --- End diff -- checkNotNull and requireNonNull are intermixed everywhere. Not worried about this. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153578221 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java --- @@ -20,39 +22,79 @@ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; +import com.google.common.base.Preconditions; + /** * This class represents a batch order to delete all entries in the Fluo table indicated * by the given Span and Column. These batch orders are processed by the {@link BatchObserver}, * which uses this batch information along with the nodeId passed into the Observer to perform - * batch deletes. + * batch deletes. * */ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater(); - -public SpanBatchDeleteInformation(int batchSize, Column column, Span span) { +private Optional nodeId; + +/** + * Create a new SpanBatchInformation object. + * @param nodeId - Optional nodeId that is used to filter returned results. Useful if the hash --- End diff -- Addressed. Provided a link to BindingHashShardingFunction which documents how sharded row keys are generated. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153577373 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java --- @@ -20,39 +22,79 @@ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.Span; +import com.google.common.base.Preconditions; + /** * This class represents a batch order to delete all entries in the Fluo table indicated * by the given Span and Column. These batch orders are processed by the {@link BatchObserver}, * which uses this batch information along with the nodeId passed into the Observer to perform - * batch deletes. + * batch deletes. * */ public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153575392 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java --- @@ -231,30 +225,38 @@ public void updateJoinResults( * @param siblingId - Id of the sibling node whose BindingSets will be retrieved and joined with the update * @return Span to retrieve sibling node's BindingSets to form join results */ -private Span getSpan(TransactionBase tx, final String childId, final BindingSet childBindingSet, final String siblingId) { +private Span getSpan(TransactionBase tx, final String childId, final VisibilityBindingSet childBindingSet, final String siblingId) { // Get the common variable orders. These are used to build the prefix. final VariableOrder childVarOrder = getVarOrder(tx, childId); final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId); final List commonVars = getCommonVars(childVarOrder, siblingVarOrder); -// Get the Binding strings -final String childBindingSetString = VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder); -final String[] childBindingArray = childBindingSetString.split("\u0001"); -final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingArray[0]); - -// Create the prefix that will be used to scan for binding sets of the sibling node. -// This prefix includes the sibling Node ID and the common variable values from -// childBindingSet. 
-String siblingScanPrefix = ""; -for(int i = 0; i < commonVars.size(); i++) { -if(siblingScanPrefix.length() == 0) { -siblingScanPrefix = childBindingStrings[i]; -} else { -siblingScanPrefix += DELIM + childBindingStrings[i]; -} +//// Get the Binding strings +//final String childBindingSetString = VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder); +//final String[] childBindingArray = childBindingSetString.split("\u0001"); +//final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingArray[0]); + +Bytes scanPrefix = null; +if(!commonVars.isEmpty()) { +scanPrefix = getRowKey(siblingId, new VariableOrder(commonVars), childBindingSet); +} else { +scanPrefix = getRowKey(siblingId, siblingVarOrder, childBindingSet); } -siblingScanPrefix = siblingId + NODEID_BS_DELIM + siblingScanPrefix; -return Span.prefix(siblingScanPrefix); +// --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153575367 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java --- @@ -231,30 +225,38 @@ public void updateJoinResults( * @param siblingId - Id of the sibling node whose BindingSets will be retrieved and joined with the update * @return Span to retrieve sibling node's BindingSets to form join results */ -private Span getSpan(TransactionBase tx, final String childId, final BindingSet childBindingSet, final String siblingId) { +private Span getSpan(TransactionBase tx, final String childId, final VisibilityBindingSet childBindingSet, final String siblingId) { // Get the common variable orders. These are used to build the prefix. final VariableOrder childVarOrder = getVarOrder(tx, childId); final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId); final List commonVars = getCommonVars(childVarOrder, siblingVarOrder); -// Get the Binding strings -final String childBindingSetString = VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder); -final String[] childBindingArray = childBindingSetString.split("\u0001"); -final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingArray[0]); - -// Create the prefix that will be used to scan for binding sets of the sibling node. -// This prefix includes the sibling Node ID and the common variable values from -// childBindingSet. 
-String siblingScanPrefix = ""; -for(int i = 0; i < commonVars.size(); i++) { -if(siblingScanPrefix.length() == 0) { -siblingScanPrefix = childBindingStrings[i]; -} else { -siblingScanPrefix += DELIM + childBindingStrings[i]; -} +//// Get the Binding strings +//final String childBindingSetString = VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder); +//final String[] childBindingArray = childBindingSetString.split("\u0001"); +//final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingArray[0]); + +Bytes scanPrefix = null; --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153575097 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java --- @@ -231,30 +225,38 @@ public void updateJoinResults( * @param siblingId - Id of the sibling node whose BindingSets will be retrieved and joined with the update * @return Span to retrieve sibling node's BindingSets to form join results */ -private Span getSpan(TransactionBase tx, final String childId, final BindingSet childBindingSet, final String siblingId) { +private Span getSpan(TransactionBase tx, final String childId, final VisibilityBindingSet childBindingSet, final String siblingId) { // Get the common variable orders. These are used to build the prefix. final VariableOrder childVarOrder = getVarOrder(tx, childId); final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId); final List commonVars = getCommonVars(childVarOrder, siblingVarOrder); -// Get the Binding strings -final String childBindingSetString = VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder); -final String[] childBindingArray = childBindingSetString.split("\u0001"); -final String[] childBindingStrings = FluoStringConverter.toBindingStrings(childBindingArray[0]); - -// Create the prefix that will be used to scan for binding sets of the sibling node. -// This prefix includes the sibling Node ID and the common variable values from -// childBindingSet. -String siblingScanPrefix = ""; -for(int i = 0; i < commonVars.size(); i++) { -if(siblingScanPrefix.length() == 0) { -siblingScanPrefix = childBindingStrings[i]; -} else { -siblingScanPrefix += DELIM + childBindingStrings[i]; -} +//// Get the Binding strings --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153574758 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java --- @@ -30,15 +30,18 @@ public static final String TYPE_DELIM = "<<~>>"; //to be used in construction of id for each node -public static final String SP_PREFIX = "STATEMENT_PATTERN"; -public static final String JOIN_PREFIX = "JOIN"; -public static final String FILTER_PREFIX = "FILTER"; -public static final String AGGREGATION_PREFIX = "AGGREGATION"; -public static final String QUERY_PREFIX = "QUERY"; -public static final String PROJECTION_PREFIX = "PROJECTION"; -public static final String CONSTRUCT_PREFIX = "CONSTRUCT"; -public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY"; - +public static final String TRIPLE_PREFIX = "T"; --- End diff -- No. Just more efficient. Since I was making so many changes to how results were stored, I figured it was extremely unnecessary to have such large prefixes. Also, when specifying splits in the fluo app, it is necessary to indicate the prefixes that you want to split over. This becomes cumbersome with such large prefixes. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153574297 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java --- @@ -42,7 +43,8 @@ private static final WholeRowTripleResolver tr = new WholeRowTripleResolver(); public static RyaStatement deserializeTriple(final Bytes row) { -final byte[] rowArray = row.toArray(); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153573688 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java --- @@ -77,4 +78,31 @@ public static BindingSetRow make(final Bytes row) { final String bindingSetString = rowArray.length == 2 ? rowArray[1] : ""; return new BindingSetRow(nodeId, bindingSetString); } + +public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes row) { +return make(BindingHashShardingFunction.removeHash(prefixBytes, row)); +} + +@Override +public String toString() { +StringBuilder builder = new StringBuilder(); +builder +.append("NodeId: " + nodeId).append("\n") +.append("BindingSet String: " + bindingSetString); +return builder.toString(); +} + +@Override +public boolean equals(Object other) { +if(other == null) { return false;} +if(this == other) { return true;} + +if (other instanceof BindingSetRow) { +BindingSetRow row = (BindingSetRow) other; +return new EqualsBuilder().append(this.nodeId, row.nodeId).append(this.bindingSetString, row.bindingSetString) --- End diff -- Okay, but only because you were nice enough to code it up for me :) ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153573207 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java --- @@ -77,4 +78,31 @@ public static BindingSetRow make(final Bytes row) { final String bindingSetString = rowArray.length == 2 ? rowArray[1] : ""; return new BindingSetRow(nodeId, bindingSetString); } + +public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes row) { +return make(BindingHashShardingFunction.removeHash(prefixBytes, row)); +} + +@Override +public String toString() { +StringBuilder builder = new StringBuilder(); +builder +.append("NodeId: " + nodeId).append("\n") +.append("BindingSet String: " + bindingSetString); +return builder.toString(); +} + +@Override +public boolean equals(Object other) { +if(other == null) { return false;} --- End diff -- Yep. Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153572801 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java --- @@ -77,4 +78,31 @@ public static BindingSetRow make(final Bytes row) { final String bindingSetString = rowArray.length == 2 ? rowArray[1] : ""; return new BindingSetRow(nodeId, bindingSetString); } + +public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes row) { +return make(BindingHashShardingFunction.removeHash(prefixBytes, row)); +} + +@Override +public String toString() { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153571640 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java --- @@ -77,4 +78,31 @@ public static BindingSetRow make(final Bytes row) { final String bindingSetString = rowArray.length == 2 ? rowArray[1] : ""; return new BindingSetRow(nodeId, bindingSetString); } + +public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes row) { +return make(BindingHashShardingFunction.removeHash(prefixBytes, row)); +} + +@Override +public String toString() { +StringBuilder builder = new StringBuilder(); +builder +.append("NodeId: " + nodeId).append("\n") +.append("BindingSet String: " + bindingSetString); +return builder.toString(); +} + +@Override +public boolean equals(Object other) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153570866 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java --- @@ -77,4 +78,31 @@ public static BindingSetRow make(final Bytes row) { final String bindingSetString = rowArray.length == 2 ? rowArray[1] : ""; return new BindingSetRow(nodeId, bindingSetString); } + +public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes row) { --- End diff -- See above. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153570806 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java --- @@ -21,15 +21,16 @@ import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import org.apache.commons.lang.builder.EqualsBuilder; import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; import net.jcip.annotations.Immutable; /** - * The values of an Accumulo Row ID for a row that stores a Binding set for - * a specific Node ID of a query. + * The values of an Accumulo Row ID for a row that stores a Binding set for a specific Node ID of a query. --- End diff -- I don't think this is the appropriate place to document this. I added documentation to the class BindingHashShardingFunction about how the sharded row id is created and linked that class from the makeFromShardedRow(...) method in BindingSetRow. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153565747 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app; + +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +public abstract class AbstractNodeUpdater { --- End diff -- The class provides the common row-key generation functionality to all of the updaters that extend it. Centralizing sharded row-key creation in the parent limits the number of files I would have to update to change the row-key generation strategy. I think it's fine as is. ---
[GitHub] incubator-rya pull request #251: Candidate/rya 406
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/251#discussion_r153563566 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/pom.xml --- @@ -1,25 +1,16 @@ - -http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +