[GitHub] incubator-rya pull request #267: RYA-440 Added commands to Rya Shell used to...

2018-01-26 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/267#discussion_r164219745
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java
 ---
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.RyaStreamsClient;
+import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
+import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery;
+import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class KafkaRyaStreamsClientFactory {
+private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class);
+
+/**
+ * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams
+ * that is backed by Kafka.
+ *
+ * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null)
+ * @param kafkaHostname - The hostname of the Kafka Broker.
+ * @param kafkaPort - The port of the Kafka Broker.
+ * @return The initialized commands.
+ */
+public static RyaStreamsClient make(
+final String ryaInstance,
+final String kafkaHostname,
+final int kafkaPort) {
+requireNonNull(kafkaHostname);
--- End diff --

Add a null check on ryaInstance as well.
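A minimal sketch of what I mean, assuming ryaInstance should be rejected the same way kafkaHostname already is. MakeArgsCheck and validate are hypothetical names; in the real factory these checks would simply sit at the top of make(...) before any Kafka clients are created.

```java
import static java.util.Objects.requireNonNull;

/**
 * Hypothetical stand-in for the argument checks at the top of
 * KafkaRyaStreamsClientFactory.make(ryaInstance, kafkaHostname, kafkaPort).
 */
final class MakeArgsCheck {
    private MakeArgsCheck() { }

    /** Fails fast with a descriptive NullPointerException before any Kafka client is built. */
    static void validate(final String ryaInstance, final String kafkaHostname, final int kafkaPort) {
        requireNonNull(ryaInstance, "ryaInstance must not be null");
        requireNonNull(kafkaHostname, "kafkaHostname must not be null");
        // kafkaPort is a primitive int, so it needs no null check.
    }
}
```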


---


[GitHub] incubator-rya pull request #267: RYA-440 Added commands to Rya Shell used to...

2018-01-26 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/267#discussion_r164218885
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java
 ---
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsUpdater;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A base class that implements the core functionality of the {@link SetRyaStreamsConfiguration} interactor.
+ * Subclasses just need to implement {@link #getRyaDetailsRepo(String)} so that the common code may update
+ * any implementation of that repository.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class SetRyaStreamsConfigurationBase implements SetRyaStreamsConfiguration {
+
+private final InstanceExists instanceExists;
+
+/**
+ * Constructs an instance of {@link SetRyaStreamsConfigurationBase}.
+ *
+ * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
+ */
+public SetRyaStreamsConfigurationBase(final InstanceExists instanceExists) {
+this.instanceExists = requireNonNull(instanceExists);
+}
+
+/**
+ * Get a {@link RyaDetailsRepository} that is connected to a specific instance of Rya.
+ *
+ * @param ryaInstance - The Rya instance the repository must be connected to. (not null)
+ * @return A {@link RyaDetailsRepository} connected to the specified Rya instance.
+ */
+protected abstract RyaDetailsRepository getRyaDetailsRepo(String ryaInstance);
+
+@Override
+public void setRyaStreamsConfiguration(final String ryaInstance, final RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException{
--- End diff --

Add a null check on ryaInstance here as well.
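Sketch of the check order I'd expect, assuming the existence lookup can be modeled as a Predicate<String>. StreamsConfigSetter, checkPreconditions, and InstanceMissingException are hypothetical stand-ins for SetRyaStreamsConfigurationBase, setRyaStreamsConfiguration, and InstanceDoesNotExistException, and Object stands in for RyaStreamsDetails. The null checks come first so a null name never reaches the existence lookup.

```java
import static java.util.Objects.requireNonNull;

import java.util.function.Predicate;

/** Hypothetical checked exception standing in for InstanceDoesNotExistException. */
class InstanceMissingException extends Exception {
    InstanceMissingException(final String message) {
        super(message);
    }
}

/** Hypothetical, simplified version of the argument handling in setRyaStreamsConfiguration(...). */
final class StreamsConfigSetter {
    private final Predicate<String> instanceExists;

    StreamsConfigSetter(final Predicate<String> instanceExists) {
        this.instanceExists = requireNonNull(instanceExists);
    }

    /** Returns the instance name once all preconditions pass; the real method would then update RyaDetails. */
    String checkPreconditions(final String ryaInstance, final Object streamsDetails)
            throws InstanceMissingException {
        // Null checks first, so a null name is never handed to the existence lookup.
        requireNonNull(ryaInstance, "ryaInstance must not be null");
        requireNonNull(streamsDetails, "streamsDetails must not be null");
        if (!instanceExists.test(ryaInstance)) {
            throw new InstanceMissingException("Rya instance '" + ryaInstance + "' does not exist.");
        }
        return ryaInstance;
    }
}
```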


---


[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-10 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159936556
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/PcjIndexer.java
 ---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.pcj;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.api.persist.index.RyaSecondaryIndexer;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Updates the PCJs that are in a {@link MongoPcjDocuments}.
+ */
+public interface PcjIndexer extends RyaSecondaryIndexer {
+/**
+ * Creates the {@link MongoPcjDocuments} that will be used by the indexer.
+ *
+ * @param conf - Indicates how the {@link MongoPcjDocuments} is initialized. (not null)
+ * @return The {@link MongoPcjDocuments} that will be used by this indexer.
+ */
+public @Nullable MongoPcjDocuments getPcjStorage(Configuration conf);
--- End diff --

Seems like the primary purpose of this class is to parse the config object
to create the MongoPcjDocuments object. I don't think that this functionality
should be lumped in with the role of the indexer. In the case of PCJs, the
primary role of the indexer is to route statements to the updater service,
which then routes BindingSet results to Kafka, Mongo, Accumulo, etc. for use
with PCJs. Here you are creating and using the indexer mainly to parse the
config object into a MongoPcjDocuments object, not for its primary purpose,
which is confusing. You are also adding a Mongo-specific method to a very
general indexing interface without indicating that the extension is
Mongo-specific, which is misleading. I think all of this would be much cleaner
if you separated the config parsing and MongoPcjDocuments creation from the
indexing API. Combining them goes against the established usage pattern of
the indexer.
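Roughly the separation I have in mind, as a sketch only. The config is modeled as a Map<String, String> rather than a Hadoop Configuration, and PcjStorageHandle, MongoPcjStorageFactory, MongoPcjQueryNodeSketch, and the two config keys are hypothetical names. A small Mongo-specific factory owns the config parsing, and the query node receives the storage object directly, so no indexer has to be instantiated just to parse config.

```java
import static java.util.Objects.requireNonNull;

import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for MongoPcjDocuments: only records which Mongo host and Rya instance hold the PCJs. */
final class PcjStorageHandle {
    final String mongoHost;
    final String ryaInstance;

    PcjStorageHandle(final String mongoHost, final String ryaInstance) {
        this.mongoHost = requireNonNull(mongoHost);
        this.ryaInstance = requireNonNull(ryaInstance);
    }
}

/** Hypothetical Mongo-specific factory whose single job is turning configuration into PCJ storage. */
final class MongoPcjStorageFactory {
    // Hypothetical configuration keys, used only for this sketch.
    static final String HOST_KEY = "mongo.host";
    static final String INSTANCE_KEY = "rya.instance";

    private MongoPcjStorageFactory() { }

    /** Returns the storage handle, or empty if the configuration lacks a required key. */
    static Optional<PcjStorageHandle> fromConfig(final Map<String, String> conf) {
        requireNonNull(conf);
        final String host = conf.get(HOST_KEY);
        final String instance = conf.get(INSTANCE_KEY);
        if (host == null || instance == null) {
            return Optional.empty();
        }
        return Optional.of(new PcjStorageHandle(host, instance));
    }
}

/** A query node that receives its storage directly instead of asking an indexer to build it. */
final class MongoPcjQueryNodeSketch {
    private final PcjStorageHandle pcjDocs;

    MongoPcjQueryNodeSketch(final PcjStorageHandle pcjDocs) {
        this.pcjDocs = requireNonNull(pcjDocs);
    }

    String storageInstance() {
        return pcjDocs.ryaInstance;
    }
}
```

The indexer interface then stays general, and anything Mongo-specific is visibly Mongo-specific by name.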


---


[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-10 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159935133
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/PcjQueryNode.java
 ---
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.pcj;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.utils.IteratorWrapper;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.external.tupleSet.ParsedQueryUtil;
+import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.parser.ParsedTupleQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Indexing Node for PCJs expressions to be inserted into execution plan to
+ * delegate entity portion of query to {@link MongoPrecomputedJoinIndexer}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class PcjQueryNode extends ExternalTupleSet implements ExternalBatchingIterator {
+private static final Logger log = Logger.getLogger(PcjQueryNode.class);
+private final String tablename;
+private final PcjIndexer indexer;
+private final MongoPcjDocuments pcjDocs;
+
+/**
+ * Creates a new {@link PcjQueryNode}.
+ *
+ * @param sparql - name of sparql query whose results will be stored in PCJ table
+ * @param conf - Rya Configuration
+ * @param tablename - name of an existing PCJ table
+ *
+ * @throws MalformedQueryException - The SPARQL query needs to contain a projection.
+ */
+public PcjQueryNode(final String sparql, final String tablename, final MongoPcjDocuments pcjDocs) throws MalformedQueryException {
+this.pcjDocs = checkNotNull(pcjDocs);
+indexer = new MongoPrecomputedJoinIndexer();
+this.tablename = tablename;
+final SPARQLParser sp = new SPARQLParser();
+final ParsedTupleQuery pq = (ParsedTupleQuery) sp.parseQuery(sparql, null);
+final TupleExpr te = pq.getTupleExpr();
+Preconditions.checkArgument(PCJOptimizerUtilities.isPCJValid(te), "TupleExpr is an invalid PCJ.");
+
+final Optional<Projection> projection = new ParsedQueryUtil().findProjection(pq);
+if (!projection.isPresent()) {
+throw new MalformedQueryException("SPARQL query '" + sparql + "' does not contain a Projection.");
+}
+setProjectionExpr(projection.get());
+}
+
+/**
+ * Creates a new {@link PcjQueryNode}.
+ *
+ * @param accCon - connection to a valid Accumulo instance
+ * @param tablename - name of an existing PCJ table
+ */
+public P

[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-10 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r160796664
  
--- Diff: 
extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java
 ---
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.mongo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.api.resolver.RyaToRdfConversions;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResult;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.RepositoryException;
+
+import com.mongodb.MongoClient;
+import com.mongodb.client.FindIterable;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.util.JSON;
+
+/**
+ * Creates and modifies PCJs in MongoDB. PCJ's are stored as follows:
+ *
+ * 
+ * 
+ * - PCJ Metadata Doc -
+ * {
+ *   _id: [table_name]_METADATA,
+ *   sparql: [sparql query to match results],
+ *   cardinality: [number of results]
+ * }
+ *
+ * - PCJ Results Doc -
+ * {
+ *   pcjName: [table_name],
+ *   auths: [auths]
+ *   [binding_var1]: {
+ * uri: [type_uri],
+ * value: value
+ *   }
+ *   .
+ *   .
+ *   .
+ *   [binding_varn]: {
+ * uri: [type_uri],
+ * value: value
+ *   }
+ * }
+ * 
+ * 
+ */
+public class MongoPcjDocuments {
+public static final String PCJ_COLLECTION_NAME = "pcjs";
+
+// metadata fields
+public static final String CARDINALITY_FIELD = "cardinality";
+public static final String SPARQL_FIELD = "sparql";
+public static final String PCJ_ID = "_id";
+public static final String VAR_ORDER_ID = "varOrders";
+
+// pcj results fields
+private static final String BINDING_VALUE = "value";
+private static final String BINDING_TYPE = "uri";
+private static final String AUTHS_FIELD = "auths";
+private static final String PCJ_NAME = "pcjName";
+
+private final MongoCollection pcjCollection;
+private static final PcjVarOrderFactory pcjVarOrderFactory = new ShiftVarOrderFactory();
+
+/**
+ * Creates a new {@link MongoPcjDocuments}.
+ * @param client - The {@link MongoClient} to use to connect to mongo.
+ * @param ryaInstanceName - The rya instance to

[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-10 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159934928
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/PcjQueryNode.java
 ---
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.mongodb.pcj;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.api.utils.IteratorWrapper;
+import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
+import org.apache.rya.indexing.external.tupleSet.ParsedQueryUtil;
+import org.apache.rya.indexing.pcj.matching.PCJOptimizerUtilities;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+import org.apache.rya.rdftriplestore.evaluation.ExternalBatchingIterator;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.parser.ParsedTupleQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.SailException;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Indexing Node for PCJs expressions to be inserted into execution plan to
+ * delegate entity portion of query to {@link MongoPrecomputedJoinIndexer}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class PcjQueryNode extends ExternalTupleSet implements ExternalBatchingIterator {
--- End diff --

Did this ever get renamed to something more Mongo-specific?


---


[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine

2018-01-10 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/255#discussion_r160768697
  
--- Diff: 
extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AntecedentVisitor.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+/**
+ * Query visitor that identifies all triple patterns represented as
+ * {@link StatementPattern}s in a query, which therefore represent triples
+ * that could potentially contribute to a solution. Considers only the statement
+ * patterns themselves, i.e. the leaves of the query tree, and does not consider
+ * other constraints that may restrict the set of triples that may be relevant.
+ * This means relying on this analysis to determine whether a fact can be part
+ * of a solution can yield false positives, but not false negatives.
+ */
+class AntecedentVisitor extends QueryModelVisitorBase {
--- End diff --

Got it. When you were talking about rules, I didn't realize you meant them
within the context of your Rule/Ruleset API. I thought you were using the term
"rule" less formally, to mean rules within the context of a CONSTRUCT query
(i.e., the rule being triggered was whether or not to generate the statement).
It makes sense for one Rule to trigger another if the consequents of rule 1
intersect the antecedents of rule 2.
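For reference, a sketch of that trigger check as I understand it, assuming patterns are plain strings with '?' marking variables. TriplePattern and RuleDependency are hypothetical and not part of the Rule/Ruleset API in this PR. Like AntecedentVisitor, it over-approximates: each position is compared independently and repeated variables are ignored, so it can report false positives but not false negatives.

```java
import static java.util.Objects.requireNonNull;

import java.util.List;

/** Hypothetical triple pattern: a term that starts with '?' is a variable, anything else a constant. */
final class TriplePattern {
    final String s;
    final String p;
    final String o;

    TriplePattern(final String s, final String p, final String o) {
        this.s = requireNonNull(s);
        this.p = requireNonNull(p);
        this.o = requireNonNull(o);
    }

    private static boolean isVar(final String term) {
        return term.startsWith("?");
    }

    /** Two terms can match the same value if either is a variable or both are the same constant. */
    private static boolean compatible(final String a, final String b) {
        return isVar(a) || isVar(b) || a.equals(b);
    }

    /** True if some triple might match both patterns (position-wise check only). */
    boolean mayOverlap(final TriplePattern other) {
        return compatible(s, other.s) && compatible(p, other.p) && compatible(o, other.o);
    }
}

final class RuleDependency {
    private RuleDependency() { }

    /** Rule 1 may trigger rule 2 if any consequent of rule 1 may match any antecedent of rule 2. */
    static boolean canTrigger(final List<TriplePattern> consequentsOfRule1,
                              final List<TriplePattern> antecedentsOfRule2) {
        for (final TriplePattern consequent : consequentsOfRule1) {
            for (final TriplePattern antecedent : antecedentsOfRule2) {
                if (consequent.mayOverlap(antecedent)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

For example, a consequent (?x, rdf:type, :T) may trigger a rule with antecedent (?y, rdf:type, :T) but not one with antecedent (?y, rdf:type, :U).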


---


[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine

2018-01-10 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/255#discussion_r160702104
  
--- Diff: 
extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AntecedentVisitor.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+/**
+ * Query visitor that identifies all triple patterns represented as
+ * {@link StatementPattern}s in a query, which therefore represent triples
+ * that could potentially contribute to a solution. Considers only the statement
+ * patterns themselves, i.e. the leaves of the query tree, and does not consider
+ * other constraints that may restrict the set of triples that may be relevant.
+ * This means relying on this analysis to determine whether a fact can be part
+ * of a solution can yield false positives, but not false negatives.
+ */
+class AntecedentVisitor extends QueryModelVisitorBase {
--- End diff --

I wasn't aware that the EXISTS graph pattern could be used as a filter
operator. I see your point. Just for clarity, in your example you meant that
the rule would be triggered by ?x a :T and ?x :p :A, right? That is, whatever
values of ?x you use in your construct statements must satisfy the condition
that ?x occurs in statements of both forms, ?x a :T and ?x :p :A?
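To make sure we are reading it the same way, here is a small sketch of the semantics as I understand them, assuming a hypothetical rule whose WHERE clause contains FILTER EXISTS { ?x a :T . ?x :p :A }. Triple, ExistsFilterSketch, and the data in the test are made up, and the candidate ?x values stand in for whatever the rest of the WHERE clause produces. Since both patterns inside EXISTS are statement patterns in the query algebra, they should also be leaves that AntecedentVisitor picks up as antecedents.

```java
import static java.util.Objects.requireNonNull;

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical ground triple (no variables). */
final class Triple {
    final String s;
    final String p;
    final String o;

    Triple(final String s, final String p, final String o) {
        this.s = requireNonNull(s);
        this.p = requireNonNull(p);
        this.o = requireNonNull(o);
    }
}

/**
 * Evaluates FILTER EXISTS { ?x a :T . ?x :p :A } against an in-memory triple list:
 * a candidate value of ?x survives only if both statements are present for that same value.
 */
final class ExistsFilterSketch {
    private ExistsFilterSketch() { }

    static boolean contains(final List<Triple> data, final String s, final String p, final String o) {
        for (final Triple t : data) {
            if (t.s.equals(s) && t.p.equals(p) && t.o.equals(o)) {
                return true;
            }
        }
        return false;
    }

    /** Keeps the candidates for ?x that satisfy both patterns ("a" is shorthand for rdf:type). */
    static Set<String> passingBindings(final List<Triple> data, final Set<String> candidates) {
        final Set<String> passing = new LinkedHashSet<>();
        for (final String x : candidates) {
            if (contains(data, x, "rdf:type", ":T") && contains(data, x, ":p", ":A")) {
                passing.add(x);
            }
        }
        return passing;
    }
}
```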


---


[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine

2018-01-10 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/255#discussion_r160698418
  
--- Diff: 
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
 ---
@@ -0,0 +1,862 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Function;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.mongodb.MongoDbRdfConstants;
+import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
+import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import org.apache.rya.mongodb.document.operators.query.ConditionalOperators;
+import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Compare;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.Projections;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Represents a portion of a query tree as MongoDB aggregation pipeline. Should
+ * be built bottom-up: start with a statement pattern implemented as a $match
+ * step, then add steps to the pipeline to handle higher levels of the query
+ * tree. Methods are provided to add certain supported query operations

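The Javadoc above describes building the pipeline bottom-up, starting with a $match for one statement pattern and appending stages for higher levels of the tree. A minimal stdlib sketch of that idea follows. It models stage documents as nested LinkedHashMaps rather than org.bson.Document, and the stored field names (subject, predicate, object) and the fromPattern/projectVariable helpers are assumptions for illustration, not the PR's AggregationPipelineQueryNode API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: an aggregation pipeline as a list of stage documents,
 * started by a $match for one statement pattern and extended upward.
 * Field names ("subject", "predicate", "object") are assumptions.
 */
final class PipelineStagesSketch {

    private final List<Map<String, Object>> stages = new ArrayList<>();

    /** Builds a single-key stage document such as {"$match": {...}}. */
    private static Map<String, Object> stage(String operator, Object body) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put(operator, body);
        return doc;
    }

    /**
     * Leaf of the query tree: a $match on the constant positions of a
     * statement pattern. A null argument marks that position as a variable,
     * so it is left unconstrained.
     */
    static PipelineStagesSketch fromPattern(String subject, String predicate, String object) {
        Map<String, Object> criteria = new LinkedHashMap<>();
        if (subject != null) {
            criteria.put("subject", subject);
        }
        if (predicate != null) {
            criteria.put("predicate", predicate);
        }
        if (object != null) {
            criteria.put("object", object);
        }
        PipelineStagesSketch pipeline = new PipelineStagesSketch();
        pipeline.stages.add(stage("$match", criteria));
        return pipeline;
    }

    /**
     * Next level up: a $project stage that exposes a stored field under a
     * query variable's name, e.g. {"x": "$subject"}.
     */
    PipelineStagesSketch projectVariable(String variable, String field) {
        Map<String, Object> spec = new LinkedHashMap<>();
        spec.put(variable, "$" + field);
        stages.add(stage("$project", spec));
        return this;
    }

    /** Read-only view of the stages built so far, in pipeline order. */
    List<Map<String, Object>> stages() {
        return Collections.unmodifiableList(stages);
    }

    public static void main(String[] args) {
        PipelineStagesSketch pipeline =
            fromPattern(null, "rdf:type", "lubm:UndergraduateStudent").projectVariable("x", "subject");
        System.out.println(pipeline.stages());
    }
}
```

Only the constant positions of the pattern become $match criteria; variables are bound later by stages such as $project, which is why each higher tree level can be expressed as another appended stage.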
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine

2018-01-10 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/255#discussion_r160699176
  
--- Diff: 
dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.MultiProjection;
+import org.openrdf.query.algebra.Not;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.QueryRoot;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.Var;
+
+import com.google.common.collect.Sets;
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoCollection;
+
+public class SparqlToPipelineTransformVisitorTest {
+
+private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+private static final String LUBM = "urn:lubm";
+private static final URI UNDERGRAD = VF.createURI(LUBM, "UndergraduateStudent");
+private static final URI PROFESSOR = VF.createURI(LUBM, "Professor");
+private static final URI COURSE = VF.createURI(LUBM, "Course");
+private static final URI TAKES = VF.createURI(LUBM, "takesCourse");
+private static final URI TEACHES = VF.createURI(LUBM, "teachesCourse");
+
+private static Var constant(URI value) {
+return new Var(value.stringValue(), value);
+}
+
+MongoCollection collection;
+
+@Before
+@SuppressWarnings("unchecked")
+public void setUp() {
+collection = Mockito.mock(MongoCollection.class);
+Mockito.when(collection.getNamespace()).thenReturn(new MongoNamespace("db", "collection"));
+}
+
+@Test
+public void testStatementPattern() throws Exception {
+QueryRoot query = new QueryRoot(new StatementPattern(
+new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)));
+SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+query.visit(visitor);
+Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode);
+AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) query.getArg();
+Assert.assertEquals(Sets.newHashSet("x"), pipelineNode.getAssuredBindingNames());
+}
+
+@Test
+public void testJoin() throws Exception {
+QueryRoot query = new QueryRoot(new Join(
+new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)),
+new StatementPattern(new Var("x"), constant(TAKES), new Var("course"))));
+SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+query.visit(visitor);
+Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode);
+AggregationPipelineQueryNode pipe

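The test's constant(URI) helper builds a Var named after the URI with a fixed value, and testStatementPattern expects only "x" as an assured binding name, so constant positions do not count. A stdlib model of that rule, extended to an inner join of two patterns (where every solution binds every variable of both sides), is sketched below. The Var class and helper names are hypothetical stand-ins for the openrdf types, and the truncated testJoin's actual assertion is not shown above, so the join result here is what the model predicts, not what the PR asserts.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Hypothetical model of the rule the test exercises: only unbound variables
 * (not constants) count as assured binding names.
 */
final class AssuredBindingNamesSketch {

    /** Stand-in for openrdf's Var: a name plus an optional fixed value. */
    static final class Var {
        final String name;
        final String value; // null means "unbound variable"

        private Var(String name, String value) {
            this.name = name;
            this.value = value;
        }

        static Var variable(String name) {
            return new Var(name, null);
        }

        /** Mirrors the test's constant(URI) helper: named after its own value. */
        static Var constant(String uri) {
            return new Var(uri, uri);
        }
    }

    /** Names every solution of one statement pattern is guaranteed to bind. */
    static Set<String> assuredNames(Var subject, Var predicate, Var object) {
        Set<String> names = new LinkedHashSet<>();
        for (Var v : Arrays.asList(subject, predicate, object)) {
            if (v.value == null) {
                names.add(v.name);
            }
        }
        return names;
    }

    /** An inner join of two patterns assures every name either side assures. */
    static Set<String> joinAssuredNames(Set<String> left, Set<String> right) {
        Set<String> names = new LinkedHashSet<>(left);
        names.addAll(right);
        return names;
    }

    public static void main(String[] args) {
        Set<String> typed = assuredNames(
            Var.variable("x"), Var.constant("rdf:type"), Var.constant("lubm:UndergraduateStudent"));
        Set<String> takes = assuredNames(
            Var.variable("x"), Var.constant("lubm:takesCourse"), Var.variable("course"));
        System.out.println(typed + " " + joinAssuredNames(typed, takes));
    }
}
```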
[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine

2018-01-10 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/255#discussion_r160697260
  
--- Diff: 
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Arrays;
+
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.bson.Document;
+import org.openrdf.query.algebra.Distinct;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.MultiProjection;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.Reduced;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+/**
+ * Visitor that transforms a SPARQL query tree by replacing as much of the tree
+ * as possible with one or more {@code AggregationPipelineQueryNode}s.
+ * 
+ * Each {@link AggregationPipelineQueryNode} contains a MongoDB aggregation
+ * pipeline which is equivalent to the replaced portion of the original query.
+ * Evaluating this node executes the pipeline and converts the results into
+ * query solutions. If only part of the query was transformed, the remaining
+ * query logic (higher up in the query tree) can be applied to those
+ * intermediate solutions as normal.
+ * 
+ * In general, processes the tree in bottom-up order: A leaf node
+ * ({@link StatementPattern}) is replaced with a pipeline that matches the
+ * corresponding statements. Then, if the parent node's semantics are supported
+ * by the visitor, stages are appended to the pipeline and the subtree at the
+ * parent node is replaced with the extended pipeline. This continues up the
+ * tree until reaching a node that cannot be transformed, in which case that
+ * node's child is now a single {@code AggregationPipelineQueryNode} (a leaf
+ * node) instead of the previous subtree, or until the entire tree has been
+ * subsumed into a single pipeline node.
+ * 
+ * Nodes which are transformed into pipeline stages:
+ * 
+ * A {@code StatementPattern} node forms the beginning of each pipeline.
+ * Single-argument operations {@link Projection}, {@link MultiProjection},
+ * {@link Extension}, {@link Distinct}, and {@link Reduced} will be transformed
+ * into pipeline stages whenever the child {@link TupleExpr} represents a
+ * pipeline.
+ * A {@link Filter} operation will be appended to the pipeline when its
+ * child {@code TupleExpr} represents a pipeline and the filter condition is a
+ * type of {@link ValueExpr} understood by {@code AggregationPipelineQueryNode}.
+ * A {@link Join} operation will be appended to the pipeline when one child
+ * is a {@code StatementPattern} and the other is an
+ * {@code AggregationPipelineQueryNode}.
+ * 
+ */
+public class SparqlToPipelineTransformVisitor extends QueryModelVisitorBase {
--- End diff --

As the trees are left-leaning by default, I don't think that you have to
worry about the case where you join two joins of statement patterns.

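The point about left-leaning join trees can be illustrated with a toy model (the Node, Pattern and Join classes below are hypothetical, not openrdf's). Folding a list of patterns into joins from left to right yields a left-deep tree in which every Join's right child is a bare pattern, so a bottom-up visitor only meets the "pipeline joined with a statement pattern" case. A bushy tree, built by hand or by an optimizer that regroups joins, breaks that property. Whether the parser and Rya's optimizers always produce left-deep trees is the reviewer's claim and is not verified by this sketch.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Toy query-tree model (hypothetical, not the openrdf classes) showing why a
 * left-deep join tree always has a statement pattern as its right child.
 */
final class LeftDeepJoins {

    /** Marker for any node in the toy query tree. */
    interface Node {}

    /** Leaf node standing in for a StatementPattern. */
    static final class Pattern implements Node {
        final String label;
        Pattern(String label) { this.label = label; }
    }

    /** Binary join node standing in for an openrdf Join. */
    static final class Join implements Node {
        final Node left;
        final Node right;
        Join(Node left, Node right) { this.left = left; this.right = right; }
    }

    /** Folds patterns left to right: ((p1 JOIN p2) JOIN p3) JOIN ... */
    static Node leftDeep(List<String> labels) {
        Node tree = new Pattern(labels.get(0));
        for (int i = 1; i < labels.size(); i++) {
            tree = new Join(tree, new Pattern(labels.get(i)));
        }
        return tree;
    }

    /** True if every Join in the tree has a bare pattern as its right child. */
    static boolean everyRightChildIsPattern(Node node) {
        if (node instanceof Pattern) {
            return true;
        }
        Join join = (Join) node;
        return join.right instanceof Pattern && everyRightChildIsPattern(join.left);
    }

    /** Number of Join nodes in the tree. */
    static int joinCount(Node node) {
        if (node instanceof Pattern) {
            return 0;
        }
        Join join = (Join) node;
        return 1 + joinCount(join.left) + joinCount(join.right);
    }

    public static void main(String[] args) {
        Node leftDeep = leftDeep(Arrays.asList("?x a Undergrad", "?x takes ?c", "?p teaches ?c"));
        Node bushy = new Join(
            new Join(new Pattern("a"), new Pattern("b")),
            new Join(new Pattern("c"), new Pattern("d")));
        System.out.println(everyRightChildIsPattern(leftDeep) + " " + everyRightChildIsPattern(bushy));
    }
}
```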

---


[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine

2018-01-09 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/255#discussion_r160421704
  
--- Diff: 
extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AntecedentVisitor.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+/**
+ * Query visitor that identifies all triple patterns represented as
+ * {@link StatementPattern}s in a query, which therefore represent triples
+ * that could potentially contribute to a solution. Considers only the statement
+ * patterns themselves, i.e. the leaves of the query tree, and does not consider
+ * other constraints that may restrict the set of triples that may be relevant.
+ * This means relying on this analysis to determine whether a fact can be part
+ * of a solution can yield false positives, but not false negatives.
+ */
+class AntecedentVisitor extends QueryModelVisitorBase {
--- End diff --

Doesn't matter too much, but 
StatementPatternCollector.process(QueryModelNode node) will give you a list 
containing the StatementPatterns in the node.

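What StatementPatternCollector.process(QueryModelNode) is described as doing above, returning the statement patterns in a node, amounts to a walk that gathers every leaf and ignores operator semantics. That is also why the AntecedentVisitor Javadoc can promise false positives but no false negatives. The sketch below shows that walk on a hypothetical toy tree (Pattern and Operator are not openrdf classes); in the real code the library helper would replace a hand-written visitor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical toy query tree used to show what a leaf-collecting walk
 * returns. Not openrdf's implementation.
 */
final class PatternCollectorSketch {

    interface Node {}

    /** Leaf standing in for a StatementPattern. */
    static final class Pattern implements Node {
        final String text;
        Pattern(String text) { this.text = text; }
        @Override public String toString() { return text; }
    }

    /** Any inner operator (Join, Union, Filter, Projection, ...) with its children. */
    static final class Operator implements Node {
        final String name;
        final List<Node> children;
        Operator(String name, Node... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    /** Returns every leaf pattern, left to right; filters and other constraints are ignored. */
    static List<Pattern> collect(Node root) {
        List<Pattern> found = new ArrayList<>();
        collectInto(root, found);
        return found;
    }

    private static void collectInto(Node node, List<Pattern> found) {
        if (node instanceof Pattern) {
            found.add((Pattern) node);
            return;
        }
        for (Node child : ((Operator) node).children) {
            collectInto(child, found);
        }
    }

    public static void main(String[] args) {
        Node query = new Operator("Projection",
            new Operator("Filter",
                new Operator("Join", new Pattern("?x takes ?c"), new Pattern("?p teaches ?c"))));
        System.out.println(collect(query));
    }
}
```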

---


[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine

2018-01-08 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/255#discussion_r160318466
  
--- Diff: 
dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java
 ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.bson.Document;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.MultiProjection;
+import org.openrdf.query.algebra.Not;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.QueryRoot;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.Var;
+
+import com.google.common.collect.Sets;
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.MongoCollection;
+
+public class SparqlToPipelineTransformVisitorTest {
+
+private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+
+private static final String LUBM = "urn:lubm";
+private static final URI UNDERGRAD = VF.createURI(LUBM, "UndergraduateStudent");
+private static final URI PROFESSOR = VF.createURI(LUBM, "Professor");
+private static final URI COURSE = VF.createURI(LUBM, "Course");
+private static final URI TAKES = VF.createURI(LUBM, "takesCourse");
+private static final URI TEACHES = VF.createURI(LUBM, "teachesCourse");
+
+private static Var constant(URI value) {
+return new Var(value.stringValue(), value);
+}
+
+MongoCollection collection;
+
+@Before
+@SuppressWarnings("unchecked")
+public void setUp() {
+collection = Mockito.mock(MongoCollection.class);
+Mockito.when(collection.getNamespace()).thenReturn(new MongoNamespace("db", "collection"));
+}
+
+@Test
+public void testStatementPattern() throws Exception {
+QueryRoot query = new QueryRoot(new StatementPattern(
+new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)));
+SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+query.visit(visitor);
+Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode);
+AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) query.getArg();
+Assert.assertEquals(Sets.newHashSet("x"), pipelineNode.getAssuredBindingNames());
+}
+
+@Test
+public void testJoin() throws Exception {
+QueryRoot query = new QueryRoot(new Join(
+new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD)),
+new StatementPattern(new Var("x"), constant(TAKES), new Var("course"))));
+SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+query.visit(visitor);
+Assert.assertTrue(query.getArg() instanceof AggregationPipelineQueryNode);
+AggregationPipelineQueryNode pipe

[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine

2018-01-08 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/255#discussion_r160315599
  
--- Diff: 
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
 ---
@@ -0,0 +1,862 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Function;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.mongodb.MongoDbRdfConstants;
+import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
+import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import org.apache.rya.mongodb.document.operators.query.ConditionalOperators;
+import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Compare;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.Projections;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Represents a portion of a query tree as MongoDB aggregation pipeline. Should
+ * be built bottom-up: start with a statement pattern implemented as a $match
+ * step, then add steps to the pipeline to handle higher levels of the query
+ * tree. Methods are provided to add certain supported query operations

[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine

2018-01-08 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/255#discussion_r160317536
  
--- Diff: 
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitor.java
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import java.util.Arrays;
+
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.bson.Document;
+import org.openrdf.query.algebra.Distinct;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.MultiProjection;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.Reduced;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+
+/**
+ * Visitor that transforms a SPARQL query tree by replacing as much of the tree
+ * as possible with one or more {@code AggregationPipelineQueryNode}s.
+ * 
+ * Each {@link AggregationPipelineQueryNode} contains a MongoDB aggregation
+ * pipeline which is equivalent to the replaced portion of the original query.
+ * Evaluating this node executes the pipeline and converts the results into
+ * query solutions. If only part of the query was transformed, the remaining
+ * query logic (higher up in the query tree) can be applied to those
+ * intermediate solutions as normal.
+ * 
+ * In general, processes the tree in bottom-up order: A leaf node
+ * ({@link StatementPattern}) is replaced with a pipeline that matches the
+ * corresponding statements. Then, if the parent node's semantics are supported
+ * by the visitor, stages are appended to the pipeline and the subtree at the
+ * parent node is replaced with the extended pipeline. This continues up the
+ * tree until reaching a node that cannot be transformed, in which case that
+ * node's child is now a single {@code AggregationPipelineQueryNode} (a leaf
+ * node) instead of the previous subtree, or until the entire tree has been
+ * subsumed into a single pipeline node.
+ * 
+ * Nodes which are transformed into pipeline stages:
+ * 
+ * A {@code StatementPattern} node forms the beginning of each pipeline.
+ * Single-argument operations {@link Projection}, {@link MultiProjection},
+ * {@link Extension}, {@link Distinct}, and {@link Reduced} will be transformed
+ * into pipeline stages whenever the child {@link TupleExpr} represents a
+ * pipeline.
+ * A {@link Filter} operation will be appended to the pipeline when its
+ * child {@code TupleExpr} represents a pipeline and the filter condition is a
+ * type of {@link ValueExpr} understood by {@code AggregationPipelineQueryNode}.
+ * A {@link Join} operation will be appended to the pipeline when one child
+ * is a {@code StatementPattern} and the other is an
+ * {@code AggregationPipelineQueryNode}.
+ * 
+ */
+public class SparqlToPipelineTransformVisitor extends QueryModelVisitorBase {
--- End diff --

Does the AggregationPipelineQueryNode have to be evaluated first in the 
query plan?  Do you do anything to group nodes that can be evaluated in an 
AggregationPipelineQueryNode together?

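The bottom-up rewrite described in the quoted Javadoc, and the grouping question above, can be explored with a toy model. The classes below are hypothetical stand-ins (Pipeline plays the role of AggregationPipelineQueryNode, and stages are plain labels, not MongoDB stages), and the rules are a sketch of the Javadoc's description, not the PR's code. In this model, a Join whose children are both composite subtrees stays a Join of two pipelines, and an unsupported operator (Order here) stops the rewrite at that point, leaving everything above it to normal evaluation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model (hypothetical classes) of folding supported operators into one pipeline node. */
final class BottomUpPipelineSketch {

    interface Node {}

    /** Leaf standing in for a StatementPattern. */
    static final class Pattern implements Node {
        final String text;
        Pattern(String text) { this.text = text; }
    }

    /** Single-argument operator such as Projection, Filter or Order. */
    static final class Unary implements Node {
        final String op;
        final Node child;
        Unary(String op, Node child) { this.op = op; this.child = child; }
    }

    /** Binary join. */
    static final class Join implements Node {
        final Node left;
        final Node right;
        Join(Node left, Node right) { this.left = left; this.right = right; }
    }

    /** Stand-in for AggregationPipelineQueryNode: an ordered list of stage labels. */
    static final class Pipeline implements Node {
        final List<String> stages;
        Pipeline(List<String> stages) { this.stages = Collections.unmodifiableList(new ArrayList<>(stages)); }
    }

    /** Operators this toy rewrite can append; anything else blocks it. */
    static final Set<String> SUPPORTED_UNARY = new HashSet<>(Arrays.asList(
        "Projection", "MultiProjection", "Extension", "Distinct", "Reduced", "Filter"));

    private static Pipeline append(Pipeline base, String stage) {
        List<String> stages = new ArrayList<>(base.stages);
        stages.add(stage);
        return new Pipeline(stages);
    }

    /** Rewrites children first, then tries to absorb the current node. */
    static Node transform(Node node) {
        if (node instanceof Pattern) {
            return new Pipeline(Collections.singletonList("match " + ((Pattern) node).text));
        }
        if (node instanceof Unary) {
            Unary unary = (Unary) node;
            Node child = transform(unary.child);
            if (child instanceof Pipeline && SUPPORTED_UNARY.contains(unary.op)) {
                return append((Pipeline) child, unary.op);
            }
            return new Unary(unary.op, child);
        }
        if (node instanceof Join) {
            Join join = (Join) node;
            Node left = transform(join.left);
            Node right = transform(join.right);
            // Absorb the join only when one side is a pipeline and the other a bare pattern.
            if (left instanceof Pipeline && join.right instanceof Pattern) {
                return append((Pipeline) left, "join " + ((Pattern) join.right).text);
            }
            if (right instanceof Pipeline && join.left instanceof Pattern) {
                return append((Pipeline) right, "join " + ((Pattern) join.left).text);
            }
            return new Join(left, right);
        }
        return node; // already a Pipeline
    }

    public static void main(String[] args) {
        Node query = new Unary("Projection", new Join(
            new Pattern("?x a Undergrad"), new Pattern("?x takes ?c")));
        System.out.println(((Pipeline) transform(query)).stages);
    }
}
```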

---


[GitHub] incubator-rya pull request #255: RYA-417 Forward-chaining batch rules engine

2018-01-08 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/255#discussion_r160316024
  
--- Diff: 
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
 ---
@@ -0,0 +1,862 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.mongodb.aggregation;
+
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.CONTEXT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.DOCUMENT_VISIBILITY;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.OBJECT_TYPE;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.PREDICATE_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.STATEMENT_METADATA;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.SUBJECT_HASH;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.function.Function;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.mongodb.MongoDbRdfConstants;
+import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
+import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import org.apache.rya.mongodb.document.operators.query.ConditionalOperators;
+import org.apache.rya.mongodb.document.visibility.DocumentVisibilityAdapter;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Compare;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.impl.ExternalSet;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBObject;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.BsonField;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.Projections;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Represents a portion of a query tree as MongoDB aggregation pipeline. Should
+ * be built bottom-up: start with a statement pattern implemented as a $match
+ * step, then add steps to the pipeline to handle higher levels of the query
+ * tree. Methods are provided to add certain supported query operations

[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-08 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r160262975
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides a mechanism for storing {@link VisibilityBindingSet}s that have been emitted from either side of
+ * a Join and a way to fetch all {@link VisibilityBindingSet}s that join with it from the other side.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface JoinStateStore {
+
+/**
+ * Store a {@link VisibilityBindingSet} based on the side it was emitted from.
+ *
+ * @param result - The result whose value will be stored. (not null)
+ */
+public void store(BinaryResult result);
--- End diff --

It's just that there are at least two implementations of Serializers for
VisibilityBindingSet in the code base. Granted, they might be for specific
serialization/encoder interfaces. Were you at least able to reuse
VisibilityBindingSetSerDe in the implementation of this interface?

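The concern here is duplicated serialization logic. One way to address it, sketched under stated assumptions, is a single codec that owns the byte format, which a Kafka Serializer/Deserializer wrapper and a join state store implementation could both delegate to. SimpleVisibilityBindingSet is a hypothetical stand-in (string bindings plus a visibility label), not Rya's VisibilityBindingSet, and the wire format below is illustrative, not the one VisibilityBindingSetSerDe uses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical single codec that every consumer of the format would share. */
final class BindingSetCodecSketch {

    /** Simplified stand-in for a VisibilityBindingSet: bindings plus a visibility label. */
    static final class SimpleVisibilityBindingSet {
        final Map<String, String> bindings;
        final String visibility;

        SimpleVisibilityBindingSet(Map<String, String> bindings, String visibility) {
            this.bindings = new LinkedHashMap<>(bindings);
            this.visibility = visibility;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof SimpleVisibilityBindingSet)) {
                return false;
            }
            SimpleVisibilityBindingSet other = (SimpleVisibilityBindingSet) o;
            return bindings.equals(other.bindings) && visibility.equals(other.visibility);
        }

        @Override public int hashCode() {
            return Objects.hash(bindings, visibility);
        }
    }

    /** Writes the visibility, the binding count, then each name/value pair. */
    static byte[] serialize(SimpleVisibilityBindingSet set) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(set.visibility);
            out.writeInt(set.bindings.size());
            for (Map.Entry<String, String> e : set.bindings.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads back exactly what serialize wrote, in the same order. */
    static SimpleVisibilityBindingSet deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String visibility = in.readUTF();
            int count = in.readInt();
            Map<String, String> bindings = new LinkedHashMap<>();
            for (int i = 0; i < count; i++) {
                String name = in.readUTF();
                String value = in.readUTF();
                bindings.put(name, value);
            }
            return new SimpleVisibilityBindingSet(bindings, visibility);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the format in one class means a change to it cannot leave the Kafka topic encoding and the state store encoding out of sync.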

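The JoinStateStore Javadoc describes storing results emitted from either side of a join and fetching the results from the other side that join with a new one. A minimal in-memory model of that contract follows, indexing each side by the values of the join variables so a lookup does not scan every stored result. Binding sets are plain string maps, and Side, store and matchesFromOtherSide are hypothetical names; a Kafka Streams implementation would presumably sit on a persistent KeyValueStore and serialize its values instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical in-memory model of a join state store: results from each side
 * are indexed by their join-variable values so the opposite side can find
 * matches directly.
 */
final class InMemoryJoinStateSketch {

    enum Side { LEFT, RIGHT }

    private final List<String> joinVars;
    private final Map<Side, Map<List<String>, List<Map<String, String>>>> bySide =
        new EnumMap<>(Side.class);

    InMemoryJoinStateSketch(List<String> joinVars) {
        this.joinVars = new ArrayList<>(joinVars);
        for (Side side : Side.values()) {
            bySide.put(side, new HashMap<>());
        }
    }

    /** The join variables' values, in a fixed order, used as the index key. */
    private List<String> joinKey(Map<String, String> bindingSet) {
        List<String> key = new ArrayList<>();
        for (String var : joinVars) {
            key.add(bindingSet.get(var));
        }
        return key;
    }

    /** Remembers a result emitted from one side of the join. */
    void store(Side side, Map<String, String> bindingSet) {
        bySide.get(side)
              .computeIfAbsent(joinKey(bindingSet), key -> new ArrayList<>())
              .add(bindingSet);
    }

    /** Results previously stored on the opposite side that share the join-variable values. */
    List<Map<String, String>> matchesFromOtherSide(Side side, Map<String, String> bindingSet) {
        Side other = (side == Side.LEFT) ? Side.RIGHT : Side.LEFT;
        List<Map<String, String>> hits = bySide.get(other).get(joinKey(bindingSet));
        if (hits == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(hits);
    }
}
```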
---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-08 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r160262440
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.function.join.IterativeJoin;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Supplies {@link JoinProcessor} instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier {
+
+private final String stateStoreName;
+private final IterativeJoin join;
+private final List joinVars;
+private final List allVars;
+
+/**
+ * Constructs an instance of {@link JoinProcessorSupplier}.
+ *
+ * @param stateStoreName - The name of the state store the processor will use. (not null)
+ * @param join - The join function the supplied processor will use. (not null)
+ * @param joinVars - The variables that the supplied processor will join over. (not null)
+ * @param allVars - An ordered list of all the variables that may appear in resulting Binding Sets.
+ *   This list must lead with the same variables and order as {@code joinVars}. (not null)
+ * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
+ * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars}.
+ */
+public JoinProcessorSupplier(
+final String stateStoreName,
+final IterativeJoin join,
+final List joinVars,
+final List allVars,
+final ProcessorResultFactory resultFactory) throws IllegalArgumentException {
+super(resultFactory);
+this.stateStoreName = requireNonNull(stateStoreName);
+this.join = requireNonNull(join);
+this.joinVars = requireNonNull(joinVars);
+this.allVars = requireNonNull(allVars);
+
+if(!allVars.subList(0, joinVars.size()).equals(joinVars)) {
+throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
--- End diff --

"The vars List must start with the joinVars List, but it did not" or "The 
joinVars List must be a prefix of the vars List".  I understand that "start 
with" and "prefix" are not perfectly well defined in the context of List, but 
"lead by" stood out as being awkward to me.  
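A sketch of the check using one of the suggested messages. It also tests the sizes first: if `joinVars` is longer than `allVars`, the `allVars.subList(0, joinVars.size())` call shown in the diff throws `IndexOutOfBoundsException` rather than the documented `IllegalArgumentException`. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical helper holding the joinVars prefix check. */
final class JoinVarsCheck {
    private JoinVarsCheck() { }

    /**
     * Verifies that joinVars is a prefix of allVars (same variables, same order).
     *
     * @throws IllegalArgumentException if allVars does not start with joinVars.
     */
    static void requireJoinVarsPrefix(final List<String> joinVars, final List<String> allVars) {
        Objects.requireNonNull(joinVars);
        Objects.requireNonNull(allVars);
        // Check the size first: subList(0, n) throws IndexOutOfBoundsException when n > allVars.size().
        if (joinVars.size() > allVars.size()
                || !allVars.subList(0, joinVars.size()).equals(joinVars)) {
            throw new IllegalArgumentException("The allVars List must start with the joinVars List, but it did not. "
                    + "joinVars: " + joinVars + ", allVars: " + allVars);
        }
    }
}
```

Including both lists in the message makes a misconfigured join easy to diagnose from the log alone.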


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-08 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r160261077
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import java.util.Iterator;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An {@link Iterator} that is also {@link AutoCloseable}.
+ *
+ * @param  - The type of elements that will be iterated over.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface CloseableIterator extends Iterator, AutoCloseable { }
--- End diff --

I thought that you implemented another instance of CloseableIterator for 
similar reasons when you were doing the Fluo work.  Did that get deleted?
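For reference, this is what such an interface typically looks like with a type parameter, and how it is consumed with try-with-resources. The type parameter name `<T>` and the `ListBackedIterator` and `CloseableIteratorDemo` classes are assumptions for illustration; `close()` here only records that it was called.

```java
import java.util.Iterator;
import java.util.List;

/** An Iterator that is also AutoCloseable (type parameter name assumed). */
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { }

/** Hypothetical implementation over a List; close() just records that it was called. */
class ListBackedIterator<T> implements CloseableIterator<T> {
    private final Iterator<T> delegate;
    private boolean closed = false;

    ListBackedIterator(final List<T> values) {
        this.delegate = values.iterator();
    }

    @Override
    public boolean hasNext() {
        return !closed && delegate.hasNext();
    }

    @Override
    public T next() {
        if (closed) {
            throw new IllegalStateException("Iterator is closed.");
        }
        return delegate.next();
    }

    @Override
    public void close() {
        // A real implementation would release the underlying resource here,
        // e.g. a KeyValueIterator obtained from a Kafka Streams state store.
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class CloseableIteratorDemo {
    private CloseableIteratorDemo() { }

    /** Sums the values; try-with-resources guarantees close() runs even if iteration throws. */
    static int sum(final List<Integer> values) {
        int total = 0;
        try (ListBackedIterator<Integer> it = new ListBackedIterator<>(values)) {
            while (it.hasNext()) {
                total += it.next();
            }
        }
        return total;
    }
}
```

Because `AutoCloseable.close()` is declared to throw `Exception`, callers that hold the interface type (rather than a concrete class that narrows the `throws` clause, as above) must handle that checked exception.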


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-07 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r160071035
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/CloseableIterator.java
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import java.util.Iterator;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An {@link Iterator} that is also {@link AutoCloseable}.
+ *
+ * @param  - The type of elements that will be iterated over.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface CloseableIterator extends Iterator, AutoCloseable { }
--- End diff --

Do we really need another CloseableIterator interface?


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-07 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r160069169
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/StartQuery.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.api.interactor;
+
+import java.util.UUID;
+
+import org.apache.rya.streams.api.exception.RyaStreamsException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Starts processing a query in Rya Streams.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface StartQuery {
--- End diff --

What's the difference between StartQuery and RunQuery?


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-07 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r160071321
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinStateStore.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides a mechanism for storing {@link VisibilityBindingSet}s that have been emitted from either side of
+ * a Join and a way to fetch all {@link VisibilityBindingSet}s that join with it from the other side.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface JoinStateStore {
+
+/**
+ * Store a {@link VisibilityBindingSet} based on the side it was emitted from.
+ *
+ * @param result - The result whose value will be stored. (not null)
+ */
+public void store(BinaryResult result);
--- End diff --

Yet another way of serializing VisibilityBindingSets? Is this really 
necessary?


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-07 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r160071093
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorSupplier.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors.join;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.rya.api.function.join.IterativeJoin;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.processors.ProcessorResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.BinaryResult.Side;
+import org.apache.rya.streams.kafka.processors.ProcessorResultFactory;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessor;
+import org.apache.rya.streams.kafka.processors.RyaStreamsProcessorSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Supplies {@link JoinProcessor} instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class JoinProcessorSupplier extends RyaStreamsProcessorSupplier {
+
+private final String stateStoreName;
+private final IterativeJoin join;
+private final List joinVars;
+private final List allVars;
+
+/**
+ * Constructs an instance of {@link JoinProcessorSupplier}.
+ *
+ * @param stateStoreName - The name of the state store the processor will use. (not null)
+ * @param join - The join function the supplied processor will use. (not null)
+ * @param joinVars - The variables that the supplied processor will join over. (not null)
+ * @param allVars - An ordered list of all the variables that may appear in resulting Binding Sets.
+ *   This list must lead with the same variables and order as {@code joinVars}. (not null)
+ * @param resultFactory - The factory that the supplied processors will use to create results. (not null)
+ * @throws IllegalArgumentException Thrown if {@code allVars} does not start with {@code joinVars}.
+ */
+public JoinProcessorSupplier(
+final String stateStoreName,
+final IterativeJoin join,
+final List joinVars,
+final List allVars,
+final ProcessorResultFactory resultFactory) throws IllegalArgumentException {
+super(resultFactory);
+this.stateStoreName = requireNonNull(stateStoreName);
+this.join = requireNonNull(join);
+this.joinVars = requireNonNull(joinVars);
+this.allVars = requireNonNull(allVars);
+
+if(!allVars.subList(0, joinVars.size()).equals(joinVars)) {
+throw new IllegalArgumentException("All vars must be lead by the join vars, but it did not. " +
--- End diff --

Awkward Exception message.


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-07 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r160070703
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/ProcessorResult.java
 ---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Objects;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.rya.api.model.VisibilityBindingSet;
+
+import com.google.common.base.Optional;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Represents a value that is emitted from a Rya Streams {@link Processor}. We can't just emit a
+ * {@link VisibilityBindingSet} because some downstream processors require more information about
+ * which upstream processor is emitting the result in order to do their work.
+ * 
+ * Currently there are only two types processors:
--- End diff --

"types processors" should be "types of processors".
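The Javadoc above describes a result that carries information about its origin alongside the binding set. A minimal sketch of that idea as a tagged value: a unary result comes from a single-input processor, while a binary result also records which side of a two-input processor (such as a join) it feeds. This is a simplified stand-in, not the class under review: `TaggedResult`, `ResultType`, and the factory methods are hypothetical, and a `String` payload replaces `VisibilityBindingSet`.

```java
import java.util.Objects;
import java.util.Optional;

/** Simplified, hypothetical processor result tagged with its origin. */
final class TaggedResult {
    enum ResultType { UNARY, BINARY }
    enum Side { LEFT, RIGHT }

    private final ResultType type;
    private final String value;          // Stand-in for a VisibilityBindingSet.
    private final Optional<Side> side;   // Present only for BINARY results.

    private TaggedResult(final ResultType type, final String value, final Optional<Side> side) {
        this.type = Objects.requireNonNull(type);
        this.value = Objects.requireNonNull(value);
        this.side = Objects.requireNonNull(side);
    }

    /** A result destined for a processor with a single upstream input. */
    static TaggedResult unary(final String value) {
        return new TaggedResult(ResultType.UNARY, value, Optional.empty());
    }

    /** A result destined for a two-input processor, such as a join, from the given side. */
    static TaggedResult binary(final String value, final Side side) {
        return new TaggedResult(ResultType.BINARY, value, Optional.of(side));
    }

    ResultType getType() {
        return type;
    }

    String getValue() {
        return value;
    }

    /** Only meaningful for BINARY results; fails fast otherwise. */
    Side getSide() {
        if (type != ResultType.BINARY) {
            throw new IllegalStateException("Only BINARY results have a side.");
        }
        return side.get();
    }
}
```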


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r160011353
  
--- Diff: extras/rya.streams/api/pom.xml ---
@@ -0,0 +1,74 @@
+
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+
+org.apache.rya
+rya.streams.parent
+3.2.12-incubating-SNAPSHOT
+
+
+4.0.0
+rya.streams.api
+
+Apache Rya Streams API
+
+This module contains the Rya Streams API components.
+
+
+
+
+
+org.apache.rya
+rya.api.model
+
+
+
+org.openrdf.sesame
+sesame-queryparser-sparql
+
+
+
+org.slf4j
+slf4j-api
+
+
+com.google.guava
+guava
+
+
+
--- End diff --

Included twice.


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r160009857
  
--- Diff: 
common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigInteger;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Value;
+import org.openrdf.model.datatypes.XMLDatatypeUtil;
+import org.openrdf.model.impl.IntegerLiteralImpl;
+import org.openrdf.query.algebra.MathExpr.MathOp;
+import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
+import org.openrdf.query.algebra.evaluation.util.MathUtil;
+import org.openrdf.query.impl.MapBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Add to the {@link AggregationState}'s sum if the child Binding Set contains the binding name
+ * that is being summed by the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class SumFunction implements AggregationFunction {
+private static final Logger log = LoggerFactory.getLogger(SumFunction.class);
+
+@Override
+public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements.");
--- End diff --

Preconditions


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r159984278
  
--- Diff: 
common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Value;
+import org.openrdf.query.algebra.evaluation.util.ValueComparator;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update the {@link AggregationState}'s min if the child binding Set contains the binding name that is being
+ * mined by the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class MinFunction implements AggregationFunction {
+
+private final ValueComparator compare = new ValueComparator();
+
+@Override
+public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements.");
--- End diff --

Preconditions


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r159984048
  
--- Diff: 
common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java
 ---
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigInteger;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Literal;
+import org.openrdf.model.impl.IntegerLiteralImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Increments the {@link AggregationState}'s count if the child Binding Set contains the binding name
+ * that is being counted by the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class CountFunction implements AggregationFunction {
+@Override
+public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements.");
--- End diff --

Preconditions


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r159983121
  
--- Diff: common/pom.xml ---
@@ -34,6 +34,8 @@ under the License.
 
 
 rya.api
+rya.api.model
+rya.api.function
--- End diff --

Based on the contents of this module, it seems like rya.api.evaluation 
would be a better name. Naming the module "function" suggests that the module 
consists entirely of Filter Functions. Just a nit, though.



---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r159984158
  
--- Diff: 
common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Value;
+import org.openrdf.query.algebra.evaluation.util.ValueComparator;
+import org.openrdf.query.impl.MapBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update the {@link AggregationState}'s max if the child binding Set contains the binding name that is being
+ * maxed by the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class MaxFunction implements AggregationFunction {
+
+private final ValueComparator compare = new ValueComparator();
+
+@Override
+public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements.");
--- End diff --

Preconditions


---


[GitHub] incubator-rya pull request #257: RYA-377 Rya Streams

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/257#discussion_r159983828
  
--- Diff: 
common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.aggregation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Map;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Value;
+import org.openrdf.model.datatypes.XMLDatatypeUtil;
+import org.openrdf.model.impl.DecimalLiteralImpl;
+import org.openrdf.model.impl.IntegerLiteralImpl;
+import org.openrdf.query.algebra.MathExpr.MathOp;
+import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
+import org.openrdf.query.algebra.evaluation.util.MathUtil;
+import org.openrdf.query.impl.MapBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Update the {@link AggregationState}'s average if the child Binding Set contains the binding name
+ * that is being averaged by the {@link AggregationElement}.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class AverageFunction implements AggregationFunction {
+private static final Logger log = LoggerFactory.getLogger(AverageFunction.class);
+
+@Override
+public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) {
+checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements.");
--- End diff --

Preconditions.
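The aggregation Javadoc in these diffs describes each function updating shared state only when the child binding set contains the binding name being aggregated. A minimal plain-Java sketch of that pattern, with the average derived from a running sum and count. `AggType`, `AggState`, and `AggregationSketch` are hypothetical names; binding sets are modeled as `Map<String, Double>`, whereas the reviewed code works with RDF `Value`s, `ValueComparator`, and `MathUtil`.

```java
import java.util.Map;

/** Hypothetical aggregation types mirroring the functions under review. */
enum AggType { COUNT, SUM, MIN, MAX, AVERAGE }

/** Hypothetical running state for one aggregated binding name. */
final class AggState {
    long count = 0;
    double sum = 0.0;
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;

    /** Derived on demand from the running sum and count. */
    double average() {
        return count == 0 ? 0.0 : sum / count;
    }
}

final class AggregationSketch {
    private AggregationSketch() { }

    /** Folds one child binding set into the state, but only if it binds the aggregated name. */
    static void update(final AggType type, final String bindingName,
                       final AggState state, final Map<String, Double> childBindingSet) {
        final Double value = childBindingSet.get(bindingName);
        if (value == null) {
            // The child binding set does not bind the aggregated name, so the state is unchanged.
            return;
        }
        switch (type) {
            case COUNT:
                state.count++;
                break;
            case SUM:
                state.sum += value;
                break;
            case MIN:
                state.min = Math.min(state.min, value);
                break;
            case MAX:
                state.max = Math.max(state.max, value);
                break;
            case AVERAGE:
                // Keep both the running sum and count so the average can be recomputed incrementally.
                state.sum += value;
                state.count++;
                break;
            default:
                throw new IllegalArgumentException("Unsupported aggregation type: " + type);
        }
    }
}
```

Storing sum and count rather than the average itself is what lets each new binding set be folded in without revisiting earlier ones.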


---


[GitHub] incubator-rya pull request #258: RYA-104 Mongo Rya Shell Integration

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/258#discussion_r159947383
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoConnectionDetails.java
 ---
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * The information the shell used to connect to Mongo server, not the DB or collections.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoConnectionDetails {
+
+private final Optional username;
+private final Optional password;
+private final String hostname;
+private final int port;
+
+/**
+ * Constructs an instance of {@link MongoConnectionDetails}.
+ *
+ * @param hostname - The hostname of the Mongo DB that was connected to. (not null)
+ * @param port - The port of the Mongo DB that was connected to.
+ * @param username - The username that was used to establish the connection
+ *   when performing administrative operations. (not null)
+ * @param password - The password that was used to establish the connection
+ *   when performing administrative operations. (not null)
+ */
+public MongoConnectionDetails(
+final String hostname,
+final int port,
+final Optional username,
+final Optional password) {
+this.hostname = requireNonNull(hostname);
+this.port = port;
+this.username = requireNonNull(username);
+this.password = requireNonNull(password);
+}
+
+/**
+ * @return The hostname of the Mongo DB that was connected to.
+ */
+public String getHostname() {
+return hostname;
+}
+
+/**
+ * @return The port of the Mongo DB that was connected to.
+ */
+public int getPort() {
+return port;
+}
+
+/**
+ * @return The username that was used to establish the connection when performing administrative operations.
+ */
+public Optional getUsername() {
+return this.username;
+}
+
+/**
+ * @return The password that was used to establish the connection when performing administrative operations.
+ */
+public Optional getPassword() {
+return password;
+}
+
+/**
+ * Create a {@link MongoDBRdfConfiguration} that is using this object's values.
+ *
+ * @param ryaInstanceName - The Rya instance to connect to. (not null)
+ * @return Constructs a new {@link MongoDBRdfConfiguration} object with values from this object.
+ */
+public MongoDBRdfConfiguration build(final String ryaInstanceName) {
+// Note, we don't use the MongoDBRdfConfigurationBuilder here because it explicitly sets
+// authorizations and visibilities to an empty string if they are not set on the builder.
+// If they are null in the MongoRdfConfiguration object, it may do the right thing.
+final MongoDBRdfConfiguration conf = new MongoDBRdfConfiguration();
+conf.setBoolean(ConfigUtils.USE_MONGO, true);
+conf.setMongoHostname(hostname);
+conf.setMongoPort("" + port);
+conf.setMongoDBName(ryaInstanceName);
+
+if(username.isPresent()) {
+conf.setMongoUser(username.get());
+}
+
+if(password.isPresent()) {
+conf.s

[GitHub] incubator-rya pull request #258: RYA-104 Mongo Rya Shell Integration

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/258#discussion_r159949461
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoUninstall.java
 ---
@@ -0,0 +1,65 @@
+package org.apache.rya.api.client.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.Uninstall;
+
+import com.mongodb.MongoClient;
+import com.mongodb.MongoException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Mongo implementation of the {@link Uninstall} command.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoUninstall implements Uninstall {
+
+private final MongoClient adminClient;
+private final InstanceExists instanceExists;
+
+/**
+ * Constructs an instance of {@link MongoUninstall}.
+ *
+ * @param adminClient - Provides programmatic access to the instance of Mongo that hosts Rya instances. (not null)
+ * @param instanceExists - The interactor used to check if a Rya instance exists. (not null)
+ */
+public MongoUninstall(final MongoClient adminClient, final MongoInstanceExists instanceExists) {
+this.adminClient = requireNonNull(adminClient);
+this.instanceExists = requireNonNull(instanceExists);
+}
+
+@Override
+public void uninstall(final String ryaInstanceName) throws InstanceDoesNotExistException, RyaClientException {
+try {
--- End diff --

Preconditions
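One plausible reading of this note is that `uninstall` should validate its input (and confirm the instance exists) before entering the `try` block that does the real work. A minimal sketch under that assumption: `InstanceDoesNotExistException` and `InstanceExists` here are hypothetical stand-ins for the Rya types, and a `Set<String>` stands in for the Mongo databases backing each instance.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for Rya's InstanceDoesNotExistException.
class InstanceDoesNotExistException extends Exception {
    InstanceDoesNotExistException(final String message) {
        super(message);
    }
}

// Hypothetical stand-in for the InstanceExists interactor.
interface InstanceExists {
    boolean exists(String ryaInstanceName);
}

public class UninstallSketch {
    private final InstanceExists instanceExists;
    // Stands in for the Mongo databases that back each Rya instance.
    private final Set<String> databases;

    UninstallSketch(final InstanceExists instanceExists, final Set<String> databases) {
        this.instanceExists = Objects.requireNonNull(instanceExists);
        this.databases = Objects.requireNonNull(databases);
    }

    void uninstall(final String ryaInstanceName) throws InstanceDoesNotExistException {
        // Preconditions first: reject bad input before touching the store.
        Objects.requireNonNull(ryaInstanceName);
        if (!instanceExists.exists(ryaInstanceName)) {
            throw new InstanceDoesNotExistException(
                    "There is no Rya instance named '" + ryaInstanceName + "'.");
        }
        // Only now do the work that the real command wraps in try/catch.
        databases.remove(ryaInstanceName);
    }

    public static void main(final String[] args) throws Exception {
        final Set<String> dbs = new HashSet<>();
        dbs.add("rya_");
        final UninstallSketch cmd = new UninstallSketch(dbs::contains, dbs);
        cmd.uninstall("rya_");
        System.out.println(dbs.isEmpty()); // prints true
    }
}
```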


---


[GitHub] incubator-rya pull request #258: RYA-104 Mongo Rya Shell Integration

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/258#discussion_r159945788
  
--- Diff: 
common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java ---
@@ -53,37 +55,37 @@
  */
 public RyaClient(
 final Install install,
-final CreatePCJ createPcj,
-final DeletePCJ deletePcj,
-final CreatePeriodicPCJ createPeriodicPcj,
-final DeletePeriodicPCJ deletePeriodicPcj,
-final ListIncrementalQueries listIncrementalQueries,
-final BatchUpdatePCJ batchUpdatePcj,
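+final Optional<CreatePCJ> createPcj,
+final Optional<DeletePCJ> deletePcj,
+final Optional<CreatePeriodicPCJ> createPeriodicPcj,
+final Optional<DeletePeriodicPCJ> deletePeriodicPcj,
+final Optional<ListIncrementalQueries> listIncrementalQueries,
+final Optional<BatchUpdatePCJ> batchUpdatePcj,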
 final GetInstanceDetails getInstanceDetails,
 final InstanceExists instanceExists,
 final ListInstances listInstances,
-final AddUser addUser,
-final RemoveUser removeUser,
+final Optional<AddUser> addUser,
+final Optional<RemoveUser> removeUser,
 final Uninstall uninstall,
 final LoadStatements loadStatements,
 final LoadStatementsFile loadStatementsFile,
 final ExecuteSparqlQuery executeSparqlQuery) {
 this.install = requireNonNull(install);
 this.createPcj = requireNonNull(createPcj);
 this.deletePcj = requireNonNull(deletePcj);
-this.createPeriodicPcj = createPeriodicPcj;
-this.deletePeriodicPcj = deletePeriodicPcj;
-this.listIncrementalQueries = listIncrementalQueries;
+this.createPeriodicPcj = requireNonNull(createPeriodicPcj);
+this.deletePeriodicPcj = requireNonNull(deletePeriodicPcj);
+this.listIncrementalQueries = requireNonNull(listIncrementalQueries);
 this.bactchUpdatePCJ = requireNonNull(batchUpdatePcj);
 this.getInstanceDetails = requireNonNull(getInstanceDetails);
 this.instanceExists = requireNonNull(instanceExists);
 this.listInstances = requireNonNull(listInstances);
 this.addUser = requireNonNull(addUser);
 this.removeUser = requireNonNull(removeUser);
 this.uninstall = requireNonNull(uninstall);
-this.loadStatements = requireNonNull(loadStatements);
--- End diff --

Why did the null checks get removed?
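Wrapping a parameter in `Optional` does not make the reference itself non-null: a caller can still pass `null` instead of `Optional.empty()`, and every later `isPresent()` call would then throw. A minimal sketch of why `requireNonNull` is still worth keeping on `Optional` parameters; `ClientSketch`, `Install`, and `CreatePcj` are hypothetical names, not the real `RyaClient` types.

```java
import java.util.Objects;
import java.util.Optional;

public class ClientSketch {
    // Hypothetical command types standing in for Install, CreatePCJ, etc.
    interface Install { }
    interface CreatePcj { }

    private final Install install;
    private final Optional<CreatePcj> createPcj;

    ClientSketch(final Install install, final Optional<CreatePcj> createPcj) {
        // A required command must be present.
        this.install = Objects.requireNonNull(install);
        // An optional command may be empty, but the Optional itself must not be null;
        // checking here fails at construction time instead of at some later call site.
        this.createPcj = Objects.requireNonNull(createPcj);
    }

    boolean supportsPcjs() {
        return createPcj.isPresent();
    }

    public static void main(final String[] args) {
        final Install install = new Install() { };
        System.out.println(new ClientSketch(install, Optional.empty()).supportsPcjs()); // prints false
        try {
            new ClientSketch(install, null);
        } catch (final NullPointerException e) {
            System.out.println("null Optional rejected at construction time");
        }
    }
}
```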


---


[GitHub] incubator-rya pull request #258: RYA-104 Mongo Rya Shell Integration

2018-01-05 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/258#discussion_r159950520
  
--- Diff: extras/rya.pcj.fluo/pom.xml ---
@@ -37,7 +37,7 @@
 pcj.fluo.api
 pcj.fluo.app
 pcj.fluo.client
-pcj.fluo.integration
--- End diff --

This probably shouldn't be commented out.


---


[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-04 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159803931
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java
 ---
@@ -1,221 +0,0 @@
-package org.apache.rya.indexing.pcj.matching;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.api.instance.RyaDetailsRepository;
-import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
-import org.apache.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator;
-import org.apache.rya.indexing.IndexPlanValidator.ValidIndexCombinationGenerator;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
-import org.apache.rya.indexing.external.matching.ExternalSetProvider;
-import org.apache.rya.indexing.external.matching.QuerySegment;
-import org.apache.rya.indexing.external.tupleSet.AccumuloIndexSet;
-import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet;
-import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
-import org.openrdf.query.algebra.TupleExpr;
-import org.openrdf.sail.SailException;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Implementation of {@link ExternalSetProvider} that provides {@link ExternalTupleSet}s.
- * This provider uses either user-specified Accumulo configuration information or a user-specified
- * List of ExternalTupleSets to populate an internal cache of ExternalTupleSets. If Accumulo configuration
- * is provided, the provider connects to an instance of RyaDetails and populates the cache with
- * PCJs registered in RyaDetails.
- *
- */
-public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTupleSet> {
--- End diff --

Why are you deleting anything related to Accumulo? What was this replaced by?
  


---


[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-04 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159803093
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/PcjIndexer.java
 ---
@@ -0,0 +1,38 @@
+package org.apache.rya.indexing.mongodb.pcj;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.api.persist.index.RyaSecondaryIndexer;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Updates the PCJs that are in a {@link MongoPcjDocuments}.
+ */
+public interface PcjIndexer extends RyaSecondaryIndexer {
+/**
+ * Creates the {@link MongoPcjDocuments} that will be used by the indexer.
+ *
+ * @param conf - Indicates how the {@link MongoPcjDocuments} is initialized. (not null)
+ * @return The {@link MongoPcjDocuments} that will be used by this indexer.
+ */
+public @Nullable MongoPcjDocuments getPcjStorage(Configuration conf);
--- End diff --

Shouldn't this method be getPcjDocuments?


---


[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-04 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159801144
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/BasePcjIndexer.java
 ---
@@ -0,0 +1,138 @@
+package org.apache.rya.indexing.mongodb.pcj;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Collections.singleton;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.groupingBy;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.entity.model.Entity;
+import org.apache.rya.indexing.entity.storage.EntityStorage;
+import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoSecondaryIndex;
+import org.openrdf.model.URI;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A base class that may be used to update an {@link EntityStorage} as new
+ * {@link RyaStatement}s are added to/removed from the Rya instance.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class BasePcjIndexer implements PcjIndexer, MongoSecondaryIndex {
+
+private final AtomicReference configuration = new AtomicReference<>();
+private final AtomicReference pcjDocs = new AtomicReference<>();
+
+@Override
+public void setConf(final Configuration conf) {
+requireNonNull(conf);
+pcjDocs.set( getPcjStorage(conf) );
+}
+
+@Override
+public Configuration getConf() {
+return configuration.get();
+}
+
+@Override
+public void storeStatement(final RyaStatement statement) throws 
IOException {
+requireNonNull(statement);
+storeStatements( singleton(statement) );
+}
+
+@Override
+public void storeStatements(final Collection<RyaStatement> statements) throws IOException {
+requireNonNull(statements);
+
+final Map<RyaURI, List<RyaStatement>> groupedBySubject = statements.stream()
+.collect(groupingBy(RyaStatement::getSubject));
+
+for(final Entry<RyaURI, List<RyaStatement>> entry : groupedBySubject.entrySet()) {
+try {
+updateEntity(entry.getKey(), entry.getValue());
+} catch (final EntityStorageException e) {
+throw new IOException("Failed to update the Entity index.", e);
+}
+}
+}
+
+/**
+ * Updates a {@link Entity} to reflect new {@link RyaStatement}s.
+ *
+ * @param subject - The Subject of the {@link Entity} the statements are for. (not null)
+ * @param statements - Statements that the {@link Entity} will be updated with. (not null)
+ */
+private void updateEntity(final RyaURI subject, final Collection<RyaStatement> statements) throws EntityStorageException {
+requireNonNull(subject);
--- End diff --

This method still seems off to me. You're going to be adding statements to any updater that you need. There is no need to include this method.
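A minimal sketch of the alternative the comment suggests, assuming the indexer simply forwards each batch to whatever updater it was given instead of grouping statements by subject into entities. `StatementUpdater` and `ForwardingIndexerSketch` are hypothetical names, and plain strings stand in for `RyaStatement`; the real PCJ updater's API may differ.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class ForwardingIndexerSketch {
    // Hypothetical updater contract; the real PCJ updater's API may differ.
    interface StatementUpdater {
        void addStatements(Collection<String> statements);
    }

    private final StatementUpdater updater;

    ForwardingIndexerSketch(final StatementUpdater updater) {
        this.updater = Objects.requireNonNull(updater);
    }

    void storeStatement(final String statement) {
        storeStatements(Collections.singleton(Objects.requireNonNull(statement)));
    }

    void storeStatements(final Collection<String> statements) {
        Objects.requireNonNull(statements);
        // No per-subject grouping and no entity update: hand the batch to the updater as-is.
        updater.addStatements(statements);
    }

    public static void main(final String[] args) {
        final List<String> received = new ArrayList<>();
        final ForwardingIndexerSketch indexer = new ForwardingIndexerSketch(received::addAll);
        indexer.storeStatement("<urn:alice> <urn:talksTo> <urn:bob>");
        System.out.println(received.size()); // prints 1
    }
}
```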


---


[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-04 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159802000
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPrecomputedJoinIndexer.java
 ---
@@ -0,0 +1,51 @@
+package org.apache.rya.indexing.mongodb.pcj;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Updates the state of the Precomputed Join indices that are used by Rya.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoPrecomputedJoinIndexer extends BasePcjIndexer {
+private static final Logger log = Logger.getLogger(MongoPrecomputedJoinIndexer.class);
+
+@Override
+public MongoPcjDocuments getPcjStorage(final Configuration conf) {
--- End diff --

Why does this return an instance of MongoPcjDocuments but the method is called getPcjStorage? Shouldn't it return an instance of PcjStorage?
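Both naming comments on this interface (`getPcjDocuments` vs. `getPcjStorage`) come down to keeping a method's name and its declared return type in agreement. A minimal sketch of the second option, where the method returns a storage interface so callers never depend on the document-backed class. `PcjStorage`, `DocumentPcjStorage`, and `StorageNamingSketch` are hypothetical names, and a `HashMap` stands in for the Mongo collection.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class StorageNamingSketch {
    // Hypothetical storage abstraction, standing in for something like PrecomputedJoinStorage.
    interface PcjStorage {
        void createPcj(String pcjId, String sparql);
        String getSparql(String pcjId);
    }

    // Hypothetical document-backed implementation; a map stands in for the Mongo collection.
    static final class DocumentPcjStorage implements PcjStorage {
        private final Map<String, String> docs = new HashMap<>();

        @Override
        public void createPcj(final String pcjId, final String sparql) {
            docs.put(Objects.requireNonNull(pcjId), Objects.requireNonNull(sparql));
        }

        @Override
        public String getSparql(final String pcjId) {
            return docs.get(pcjId);
        }
    }

    // The name promises "storage" and the declared type is the storage abstraction,
    // so callers never see the concrete document class.
    static PcjStorage getPcjStorage() {
        return new DocumentPcjStorage();
    }

    public static void main(final String[] args) {
        final PcjStorage storage = getPcjStorage();
        storage.createPcj("pcj-1", "SELECT * WHERE { ?s ?p ?o }");
        System.out.println(storage.getSparql("pcj-1"));
    }
}
```

If the document class really is what callers need, the other option is the rename suggested earlier, so that `getPcjDocuments` returns `MongoPcjDocuments`.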


---


[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-04 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159798308
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoBatchUpdatePCJ.java
 ---
@@ -0,0 +1,243 @@
+package org.apache.rya.api.client.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.rya.api.client.BatchUpdatePCJ;
+import org.apache.rya.api.client.CreatePCJ;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.PCJDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsUpdater;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjStorage;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResultHandlerBase;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.MongoClient;
+
+/**
+ * A Mongo implementation of {@link CreatePCJ}.
+ */
+public class MongoBatchUpdatePCJ implements BatchUpdatePCJ {
+private static final Logger log = LoggerFactory.getLogger(MongoBatchUpdatePCJ.class);
+
+private final MongoConnectionDetails connectionDetails;
+private final InstanceExists instanceExists;
+private final MongoClient mongoClient;
+
+/**
+ * Constructs an instance of {@link MongoBatchUpdatePCJ}.
+ *
+ * @param connectionDetails - Details to connect to the server. (not null)
+ * @param instanceExists - The interactor used to check if a Rya instance exists. (not null)
+ * @param mongoClient - The {@link MongoClient} to use when batch updating. (not null)
+ */
+public MongoBatchUpdatePCJ(
+final Mo

[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-04 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159798116
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoBatchUpdatePCJ.java
 ---
@@ -0,0 +1,243 @@
+package org.apache.rya.api.client.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.rya.api.client.BatchUpdatePCJ;
+import org.apache.rya.api.client.CreatePCJ;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.PCJDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsUpdater;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjStorage;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResultHandlerBase;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.MongoClient;
+
+/**
+ * A Mongo implementation of {@link CreatePCJ}.
+ */
+public class MongoBatchUpdatePCJ implements BatchUpdatePCJ {
+private static final Logger log = LoggerFactory.getLogger(MongoBatchUpdatePCJ.class);
+
+private final MongoConnectionDetails connectionDetails;
+private final InstanceExists instanceExists;
+private final MongoClient mongoClient;
+
+/**
+ * Constructs an instance of {@link MongoBatchUpdatePCJ}.
+ *
+ * @param connectionDetails - Details to connect to the server. (not null)
+ * @param instanceExists - The interactor used to check if a Rya instance exists. (not null)
+ * @param mongoClient - The {@link MongoClient} to use when batch updating. (not null)
+ */
+public MongoBatchUpdatePCJ(
+final Mo

[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-04 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159803009
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/MongoPrecomputedJoinIndexer.java
 ---
@@ -0,0 +1,89 @@
+package org.apache.rya.indexing.mongodb.pcj;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+
+import com.mongodb.MongoClient;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Updates the state of the Precomputed Join indices that are used by Rya.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoPrecomputedJoinIndexer extends BasePcjIndexer {
--- End diff --

Yeah, still not getting what you're doing here. The base class appears to be dealing with entities and there is no updater for the base class to interact with. In general, you wouldn't want the base class to interact with the updater class given that any class that extends it would be locked into using that updater.
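One common way to keep a base class free of a hard-wired updater is a factory method: the base class owns the shared plumbing and asks subclasses which updater to use. A minimal sketch under that assumption; `Updater`, `BaseIndexer`, and `RecordingIndexer` are hypothetical names, not Rya's indexer API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

public class IndexerHierarchySketch {
    // Hypothetical updater contract.
    interface Updater {
        void add(Collection<String> statements);
    }

    // The base class handles shared plumbing but never names a concrete updater.
    abstract static class BaseIndexer {
        private Updater updater;

        // Subclasses decide which updater to use.
        protected abstract Updater createUpdater();

        // Called after construction, so subclass fields are already initialized.
        final void init() {
            updater = Objects.requireNonNull(createUpdater());
        }

        final void storeStatements(final Collection<String> statements) {
            if (updater == null) {
                throw new IllegalStateException("init() must be called first");
            }
            updater.add(Objects.requireNonNull(statements));
        }
    }

    // One concrete indexer; another subclass could plug in a different updater.
    static final class RecordingIndexer extends BaseIndexer {
        final List<String> seen = new ArrayList<>();

        @Override
        protected Updater createUpdater() {
            return seen::addAll;
        }
    }

    public static void main(final String[] args) {
        final RecordingIndexer indexer = new RecordingIndexer();
        indexer.init();
        indexer.storeStatements(Arrays.asList("s1", "s2"));
        System.out.println(indexer.seen); // prints [s1, s2]
    }
}
```

The factory method is invoked from `init()` rather than from the base constructor because calling an overridable method during construction would run before the subclass's fields (here `seen`) are initialized.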


---


[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-04 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159802759
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/pcj/BasePcjIndexer.java
 ---
@@ -0,0 +1,138 @@
+package org.apache.rya.indexing.mongodb.pcj;
+
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Collections.singleton;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.groupingBy;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.indexing.entity.model.Entity;
+import org.apache.rya.indexing.entity.storage.EntityStorage;
+import org.apache.rya.indexing.entity.storage.EntityStorage.EntityStorageException;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjDocuments;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoSecondaryIndex;
+import org.openrdf.model.URI;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A base class that may be used to update an {@link EntityStorage} as new
+ * {@link RyaStatement}s are added to/removed from the Rya instance.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class BasePcjIndexer implements PcjIndexer, MongoSecondaryIndex {
+
+private final AtomicReference configuration = new AtomicReference<>();
+private final AtomicReference pcjDocs = new AtomicReference<>();
+
+@Override
+public void setConf(final Configuration conf) {
+requireNonNull(conf);
+pcjDocs.set( getPcjStorage(conf) );
+}
+
+@Override
+public Configuration getConf() {
+return configuration.get();
+}
+
+@Override
+public void storeStatement(final RyaStatement statement) throws IOException {
+requireNonNull(statement);
+storeStatements( singleton(statement) );
+}
+
+@Override
+public void storeStatements(final Collection<RyaStatement> statements) throws IOException {
+requireNonNull(statements);
--- End diff --

Yeah, I don't get what this method or this class is doing.  In the logic 
below, it appears that you are dealing with entities, which is not correct.  
The PCJ indexer should feed statements to the updater.  In your response to 
my comments about the MongoPCJIndexer, you maintain that this class does all of 
the "heavy lifting".  That seems impossible to me given that this base class 
appears to deal with entities and there is no updater to interact with.
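A minimal sketch of the flow described above, where the indexer's only job is to hand statements to the updater. Single-statement calls are wrapped in a singleton collection, mirroring `storeStatements( singleton(statement) )` in the quoted diff. `PcjUpdaterSketch`, `PcjIndexerSketch` and `LoggingUpdater` are hypothetical, and `String` stands in for `RyaStatement`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical updater API: the indexer pushes adds and deletes into it. */
interface PcjUpdaterSketch {
    void addStatements(Collection<String> statements);

    void deleteStatements(Collection<String> statements);
}

/** Indexer that does no PCJ work itself; it only forwards statements. */
final class PcjIndexerSketch {
    private final PcjUpdaterSketch updater;

    PcjIndexerSketch(final PcjUpdaterSketch updater) {
        this.updater = Objects.requireNonNull(updater);
    }

    void storeStatement(final String statement) {
        storeStatements(Collections.singleton(Objects.requireNonNull(statement)));
    }

    void storeStatements(final Collection<String> statements) {
        updater.addStatements(Objects.requireNonNull(statements));
    }

    void deleteStatement(final String statement) {
        updater.deleteStatements(Collections.singleton(Objects.requireNonNull(statement)));
    }
}

/** Test double that logs every call it receives. */
final class LoggingUpdater implements PcjUpdaterSketch {
    final List<String> log = new ArrayList<>();

    @Override
    public void addStatements(final Collection<String> statements) {
        statements.forEach(s -> log.add("add " + s));
    }

    @Override
    public void deleteStatements(final Collection<String> statements) {
        statements.forEach(s -> log.add("delete " + s));
    }
}
```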


---


[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-04 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159754654
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoBatchUpdatePCJ.java
 ---
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.rya.api.client.BatchUpdatePCJ;
+import org.apache.rya.api.client.CreatePCJ;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.InstanceExists;
+import org.apache.rya.api.client.PCJDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsUpdater;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.mongo.MongoPcjStorage;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResultHandlerBase;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.MongoClient;
+
+/**
+ * A Mongo implementation of {@link CreatePCJ}.
--- End diff --

Should read: A Mongo implementation of {@link BatchUpdatePCJ}.


---


[GitHub] incubator-rya pull request #172: RYA-303 Mongo PCJ Support

2018-01-04 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/172#discussion_r159754029
  
--- Diff: 
common/rya.api/src/main/java/org/apache/rya/api/resolver/RyaToRdfConversions.java
 ---
@@ -40,8 +40,8 @@
  */
 public class RyaToRdfConversions {
 
-public static URI convertURI(RyaURI uri) {
--- End diff --

RyaType is too general an argument here.  Per our discussion, create a 
private method that does what you want.
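A sketch of the suggested approach, using hypothetical stand-in classes rather than Rya's real `RyaType`/`RyaURI` (the stand-ins assume, as in Rya, that the URI type extends the general type). The public `convertURI` keeps its narrow parameter type, and the broader type is accepted only by a private helper that outside callers cannot reach.

```java
/** Hypothetical stand-in for a general RDF value type such as RyaType. */
class RyaTypeStandIn {
    private final String data;

    RyaTypeStandIn(final String data) {
        this.data = data;
    }

    String getData() {
        return data;
    }
}

/** Hypothetical stand-in for a URI-specific subtype such as RyaURI. */
final class RyaUriStandIn extends RyaTypeStandIn {
    RyaUriStandIn(final String uri) {
        super(uri);
    }
}

final class ConversionsSketch {
    private ConversionsSketch() {
    }

    /** Public entry point keeps the narrow type, so a literal cannot be passed by mistake. */
    public static String convertURI(final RyaUriStandIn uri) {
        return toUriString(uri);
    }

    /** The broader helper stays private, so only this class decides when calling it is safe. */
    private static String toUriString(final RyaTypeStandIn value) {
        return value.getData();
    }
}
```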


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153631342
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeletePeriodicPCJ.java
 ---
@@ -109,11 +109,11 @@ public void deletePeriodicPCJ() throws Exception {
 vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
 vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")));
 
-runTest(query, statements, 29);
+runTest(query, statements, 30);
--- End diff --

Yes.  Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153631182
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.Value;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.hash.Hashing;
+
+/**
+ * This class adds and removes a hash to and from the rowId for sharding purposes.
+ *
+ */
+public class BindingHashShardingFunction {
+
+private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter();
+private static final int HASH_LEN = 4;
+
+/**
+ * Generates a sharded rowId.
+ *
+ * @param nodeId - Node Id with type and UUID
+ * @param varOrder - VarOrder used to order BindingSet values
+ * @param bs - BindingSet with partially formed query values
+ * @return - serialized Bytes rowId for storing BindingSet results in Fluo
+ */
+public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) {
+String[] rowPrefixAndId = nodeId.split("_");
+Preconditions.checkArgument(rowPrefixAndId.length == 2);
+String prefix = rowPrefixAndId[0];
+String id = rowPrefixAndId[1];
+
+String firstBindingString = "";
+Bytes rowSuffix = Bytes.of(id);
+if (varOrder.getVariableOrders().size() > 0) {
+VariableOrder first = new VariableOrder(varOrder.getVariableOrders().get(0));
+firstBindingString = BS_CONVERTER.convert(bs, first);
+rowSuffix = RowKeyUtil.makeRowKey(id, varOrder, bs);
+}
+
+BytesBuilder builder = Bytes.builder();
+builder.append(Bytes.of(prefix + ":"));
+builder.append(genHash(Bytes.of(id + NODEID_BS_DELIM + firstBindingString)));
+builder.append(":");
+builder.append(rowSuffix);
+return builder.toBytes();
+}
+
+/**
+ * Generates a sharded rowId.
+ *
+ * @param nodeId - Node Id with type and UUID
+ * @param firstBsVal - String representation of the first BsValue
+ * @return - serialized Bytes prefix for scanning rows
+ */
+public static Bytes getShardedScanPrefix(String nodeId, Value firstBsVal) {
+Preconditions.checkNotNull(firstBsVal);
+
+final RyaType ryaValue = RdfToRyaConversions.convertValue(firstBsVal);
+final String bindingString = ryaValue.getData() + TYPE_DELIM + ryaValue.getDataType();
+
+return getShardedScanPrefix(nodeId, bindingString);
+}
+
+/**
+ * Generates a sharded rowId from the indicated nodeId and bindingString.
+ *
+ * @param nodeId - NodeId with type and UUID
+ * @param bindingString - String representation of BindingSet values, as formed by {@link BindingSetStringConverter}
+ *.
--- End diff --

Done.
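The quoted `getShardedScanPrefix(String, Value)` builds the binding string as `data + TYPE_DELIM + dataType` and then delegates to a `String` overload whose body is not shown. The sketch below assumes that overload mirrors `addShard`, producing `prefix + ":" + hash + ":" + id + delimiter + bindingString`. The delimiter values, the `SP_abc` style node id and the `genHash` body (the first `HASH_LEN` hex characters of a SHA-256 digest) are all assumptions, not Rya's code.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ScanPrefixSketch {
    // Placeholder delimiters; the real NODEID_BS_DELIM and TYPE_DELIM are defined in Rya.
    static final String NODEID_BS_DELIM = "|";
    static final String TYPE_DELIM = "^";
    // Same value as HASH_LEN in the quoted class.
    static final int HASH_LEN = 4;

    private ScanPrefixSketch() {
    }

    /** Assumed genHash: first HASH_LEN hex characters of a SHA-256 digest. */
    static String genHash(final String input) {
        try {
            final byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            final StringBuilder hex = new StringBuilder();
            for (final byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.substring(0, HASH_LEN);
        } catch (final NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    /** Mirrors the quoted code: value data, type delimiter, then datatype. */
    static String bindingString(final String data, final String dataType) {
        return data + TYPE_DELIM + dataType;
    }

    /** Builds prefix:hash:id|binding for a node id of the form prefix_id. */
    static String shardedScanPrefix(final String nodeId, final String bindingString) {
        final String[] prefixAndId = nodeId.split("_");
        if (prefixAndId.length != 2) {
            throw new IllegalArgumentException("Expected <prefix>_<id> but got: " + nodeId);
        }
        final String hashInput = prefixAndId[1] + NODEID_BS_DELIM + bindingString;
        return prefixAndId[0] + ":" + genHash(hashInput) + ":" + hashInput;
    }
}
```

Because the hash is computed from the id and the first binding only, every row sharing that first binding lands behind the same short hash, which is what lets a single prefix scan find them.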


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153631103
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.Value;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.hash.Hashing;
+
+/**
+ * This class adds and removes a hash to and from the rowId for sharding purposes.
+ *
+ */
+public class BindingHashShardingFunction {
+
+private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter();
+private static final int HASH_LEN = 4;
+
+/**
+ * Generates a sharded rowId.
+ *
+ * @param nodeId - Node Id with type and UUID
+ * @param varOrder - VarOrder used to order BindingSet values
+ * @param bs - BindingSet with partially formed query values
+ * @return - serialized Bytes rowId for storing BindingSet results in Fluo
+ */
+public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) {
+String[] rowPrefixAndId = nodeId.split("_");
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153631009
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.Value;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.hash.Hashing;
+
+/**
+ * This class adds and removes a hash to and from the rowId for sharding purposes.
--- End diff --

It does both.  Reworded to "adds or removes...".
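Since the class both adds and removes the shard, the two operations should round-trip. A sketch under assumptions: the row layout is `prefix:hash:suffix`, `genHash` is a hypothetical SHA-256 digest truncated to `HASH_LEN` hex characters, and the removal logic is a guess because its body is not in the quoted diff.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class ShardSketch {
    static final int HASH_LEN = 4;

    private ShardSketch() {
    }

    /** Hypothetical hash: first HASH_LEN hex characters of SHA-256. */
    static String genHash(final String input) {
        try {
            final byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            final StringBuilder hex = new StringBuilder();
            for (final byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.substring(0, HASH_LEN);
        } catch (final NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    /** Adds the shard: prefix + ":" + hash(hashInput) + ":" + suffix. */
    static String addShard(final String prefix, final String hashInput, final String suffix) {
        return prefix + ":" + genHash(hashInput) + ":" + suffix;
    }

    /** Removes the HASH_LEN-character shard and the ':' after it, keeping prefix + ":" + suffix. */
    static String removeShard(final String shardedRow) {
        final int firstColon = shardedRow.indexOf(':');
        final int suffixStart = firstColon + HASH_LEN + 2;
        if (firstColon < 0 || shardedRow.length() < suffixStart
                || shardedRow.charAt(suffixStart - 1) != ':') {
            throw new IllegalArgumentException("Not a sharded row: " + shardedRow);
        }
        return shardedRow.substring(0, firstColon + 1) + shardedRow.substring(suffixStart);
    }
}
```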


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153630697
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+/**
+ * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are
+ * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all
+ * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and
+ * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table).
+ */
+public class StatementPatternIdManager {
+
+/**
+ * Adds the specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also
+ * updates the hash of the updated nodeId Set and writes that to the Column
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ *
+ * @param tx
+ * @param ids - ids to add to the StatementPattern nodeId Set
+ */
+public static void addStatementPatternIds(TransactionBase tx, Set<String> ids) {
+Optional<Bytes> val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS));
+StringBuilder builder = new StringBuilder();
+if (val.isPresent()) {
+builder.append(val.get().toString());
+builder.append(VAR_DELIM);
+}
+String idString = builder.append(Joiner.on(VAR_DELIM).join(ids)).toString();
+tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString));
+tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString()));
+}
+
+/**
+ * Removes the specified Set of ids from the Fluo table and updates the entry with Column
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also updates the hash of the updated nodeId Set and writes that
+ * to the Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ *
+ * @param tx
+ * @param ids - ids to remove from the StatementPattern nodeId Set
+ */
+public static void removeStatementPatternIds(TransactionBase tx, Set<String> ids) {
--- End diff --

Done.
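The add/remove pair described in the quoted Javadoc can be sketched with an in-memory map standing in for the Fluo transaction. Every change rewrites the joined id string and its SHA-256 hash in the same step, so readers can detect staleness by comparing hashes alone. `SpIdStoreSketch`, the column names and the `;` delimiter are hypothetical. Unlike the quoted `addStatementPatternIds`, which appends to the stored string, this sketch keeps set semantics so duplicate ids collapse.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

final class SpIdStoreSketch {
    static final String VAR_DELIM = ";"; // placeholder for Rya's VAR_DELIM
    static final String IDS = "ids";
    static final String IDS_HASH = "idsHash";

    /** In-memory stand-in for the single Fluo row that holds both columns. */
    final Map<String, String> table = new HashMap<>();

    void addIds(final Set<String> ids) {
        final Set<String> all = readIds();
        all.addAll(ids);
        write(all);
    }

    void removeIds(final Set<String> ids) {
        final Set<String> all = readIds();
        all.removeAll(ids);
        write(all);
    }

    Set<String> readIds() {
        final Set<String> ids = new LinkedHashSet<>();
        final String stored = table.get(IDS);
        if (stored != null && !stored.isEmpty()) {
            ids.addAll(Arrays.asList(stored.split(Pattern.quote(VAR_DELIM))));
        }
        return ids;
    }

    /** Writes the ids and their hash together so the two columns stay consistent. */
    private void write(final Set<String> ids) {
        final String joined = String.join(VAR_DELIM, ids);
        table.put(IDS, joined);
        table.put(IDS_HASH, sha256Hex(joined));
    }

    static String sha256Hex(final String s) {
        try {
            final byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            final StringBuilder hex = new StringBuilder();
            for (final byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (final NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }
}
```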


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153630587
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+/**
+ * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are
+ * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all
+ * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and
+ * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table).
+ */
+public class StatementPatternIdManager {
+
+/**
+ * Adds the specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also
+ * updates the hash of the updated nodeId Set and writes that to the Column
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ *
+ * @param tx
+ * @param ids - ids to add to the StatementPattern nodeId Set
+ */
+public static void addStatementPatternIds(TransactionBase tx, Set<String> ids) {
+Optional<Bytes> val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS));
+StringBuilder builder = new StringBuilder();
+if (val.isPresent()) {
+builder.append(val.get().toString());
+builder.append(VAR_DELIM);
+}
+String idString = builder.append(Joiner.on(VAR_DELIM).join(ids)).toString();
+tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString));
+tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString()));
+}
+
+/**
+ * Removes the specified Set of ids from the Fluo table and updates the entry with Column
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also updates the hash of the updated nodeId Set and writes that
+ * to the Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ *
+ * @param tx
--- End diff --

Done.
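The local caching the quoted Javadoc describes (re-read the id set only when the stored hash no longer matches the cached one) could look like the following sketch. `CachedSpIdsSketch` is hypothetical; the two `Supplier`s stand in for reads of the hash column and the id column.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;

final class CachedSpIdsSketch {
    private final Supplier<String> remoteHash;
    private final Supplier<Set<String>> remoteIds;
    private String cachedHash;
    private Set<String> cachedIds = Collections.emptySet();
    /** Number of full id-set reads, exposed only to show when the cache refreshes. */
    int fullReads;

    CachedSpIdsSketch(final Supplier<String> remoteHash, final Supplier<Set<String>> remoteIds) {
        this.remoteHash = Objects.requireNonNull(remoteHash);
        this.remoteIds = Objects.requireNonNull(remoteIds);
    }

    /** Reads the small hash on every call; reads the full id set only when the hash changed. */
    Set<String> getIds() {
        final String current = remoteHash.get();
        if (!current.equals(cachedHash)) {
            cachedIds = Collections.unmodifiableSet(new HashSet<>(remoteIds.get()));
            cachedHash = current;
            fullReads++;
        }
        return cachedIds;
    }
}
```

As written this is not thread-safe; a shared instance would need external synchronization.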


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153630245
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StatementPatternIdCacheSupplier {
--- End diff --

Done.
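The quoted `StatementPatternIdCacheSupplier` imports `ReentrantLock`, but its body is not shown. As a hedged illustration only, a lock-guarded, lazily created shared instance might look like this; `LockedCacheSupplierSketch` and its methods are hypothetical.

```java
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

final class LockedCacheSupplierSketch<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Supplier<T> factory;
    private T instance;
    /** Number of times the factory ran, exposed only for demonstration. */
    int creations;

    LockedCacheSupplierSketch(final Supplier<T> factory) {
        this.factory = Objects.requireNonNull(factory);
    }

    /** Creates the shared instance on first use; later callers get the same one. */
    T getOrCreate() {
        lock.lock();
        try {
            if (instance == null) {
                instance = factory.get();
                creations++;
            }
            return instance;
        } finally {
            lock.unlock();
        }
    }

    /** Drops the shared instance so the next call builds a fresh one. */
    void clear() {
        lock.lock();
        try {
            instance = null;
        } finally {
            lock.unlock();
        }
    }
}
```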


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153630157
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaStatement;
+
+/**
+ * This class is a utility class for adding and removing the Triple prefix to
+ * Statements ingested into the Rya Fluo application.  The Triple prefix is added
+ * to support range-based compactions for removing transient data from the Fluo
+ * application.  This prefix supports the Transient data recipe described on
+ * the Fluo website, and reduces the computational load on the system by cleaning up
+ * old deleted Triples and notifications using a targeted range compaction.
+ *
+ */
+public class TriplePrefixUtils {
+
+private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of("T" + NODEID_BS_DELIM);
+
+/**
+ * Adds the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to {@link Bytes}.
+ * @param tripleBytes - serialized {@link RyaStatement}
+ * @return - serialized RyaStatement with prepended triple prefix, converted to Bytes
+ */
+public static Bytes addTriplePrefixAndConvertToBytes(byte[] tripleBytes) {
+BytesBuilder builder = Bytes.builder();
+return builder.append(TRIPLE_PREFIX_BYTES).append(tripleBytes).toBytes();
+}
+
+/**
+ * Removes the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to a byte array.
+ * @param prefixedTriple - serialized RyaStatement with prepended triple prefix, converted to Bytes
+ * @return - serialized {@link RyaStatement} in byte array form
+ */
+public static byte[] removeTriplePrefixAndConvertToByteArray(Bytes prefixedTriple) {
--- End diff --

Done.
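A byte-array sketch of the add/remove pair in `TriplePrefixUtils`, written without Fluo's `Bytes` type so it runs on the JDK alone. The `T|` prefix is a placeholder for `"T" + NODEID_BS_DELIM`, and rejecting input that lacks the prefix is an assumption about desirable behavior, not something the quoted diff shows.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class TriplePrefixSketch {
    // Placeholder for "T" + NODEID_BS_DELIM; the real delimiter is defined in Rya.
    static final byte[] TRIPLE_PREFIX = "T|".getBytes(StandardCharsets.UTF_8);

    private TriplePrefixSketch() {
    }

    /** Returns TRIPLE_PREFIX followed by the serialized triple. */
    static byte[] addTriplePrefix(final byte[] tripleBytes) {
        final byte[] out = Arrays.copyOf(TRIPLE_PREFIX, TRIPLE_PREFIX.length + tripleBytes.length);
        System.arraycopy(tripleBytes, 0, out, TRIPLE_PREFIX.length, tripleBytes.length);
        return out;
    }

    /** Strips TRIPLE_PREFIX, rejecting input that does not start with it. */
    static byte[] removeTriplePrefix(final byte[] prefixed) {
        if (prefixed.length < TRIPLE_PREFIX.length
                || !Arrays.equals(Arrays.copyOf(prefixed, TRIPLE_PREFIX.length), TRIPLE_PREFIX)) {
            throw new IllegalArgumentException("Missing triple prefix");
        }
        return Arrays.copyOfRange(prefixed, TRIPLE_PREFIX.length, prefixed.length);
    }
}
```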


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153629832
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaStatement;
+
+/**
+ * This class is a utility class for adding and removing the Triple prefix to
+ * Statements ingested into the Rya Fluo application.  The Triple prefix is added
+ * to support range-based compactions for removing transient data from the Fluo
+ * application.  This prefix supports the Transient data recipe described on
+ * the Fluo website, and reduces the computational load on the system by cleaning up
+ * old deleted Triples and notifications using a targeted range compaction.
+ *
+ */
+public class TriplePrefixUtils {
+
+private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of("T" + NODEID_BS_DELIM);
+
+/**
+ * Adds the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to {@link Bytes}.
+ * @param tripleBytes - serialized {@link RyaStatement}
+ * @return - serialized RyaStatement with prepended triple prefix, converted to Bytes
+ */
+public static Bytes addTriplePrefixAndConvertToBytes(byte[] tripleBytes) {
+BytesBuilder builder = Bytes.builder();
+return builder.append(TRIPLE_PREFIX_BYTES).append(tripleBytes).toBytes();
+}
+
+/**
+ * Removes the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} 
and converts to a byte array.
--- End diff --

Done.
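
For readers following the thread, a minimal sketch of the add/remove round trip that TriplePrefixUtils describes, written against plain byte arrays instead of Fluo's Bytes so it runs without Fluo on the classpath. The class name TriplePrefixSketch and the delimiter value "<<:>>" are placeholders for illustration; the real delimiter is whatever IncrementalUpdateConstants.NODEID_BS_DELIM holds.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical byte[] version of the prefix round trip (not the Rya class). */
public class TriplePrefixSketch {

    // Placeholder; the real value comes from IncrementalUpdateConstants.NODEID_BS_DELIM.
    static final String NODEID_BS_DELIM = "<<:>>";

    static final byte[] TRIPLE_PREFIX =
            ("T" + NODEID_BS_DELIM).getBytes(StandardCharsets.UTF_8);

    /** Prepends the triple prefix to a serialized statement. */
    static byte[] addTriplePrefix(byte[] tripleBytes) {
        byte[] row = new byte[TRIPLE_PREFIX.length + tripleBytes.length];
        System.arraycopy(TRIPLE_PREFIX, 0, row, 0, TRIPLE_PREFIX.length);
        System.arraycopy(tripleBytes, 0, row, TRIPLE_PREFIX.length, tripleBytes.length);
        return row;
    }

    /** Strips the triple prefix, rejecting rows that do not start with it. */
    static byte[] removeTriplePrefix(byte[] row) {
        boolean hasPrefix = row.length >= TRIPLE_PREFIX.length
                && Arrays.equals(Arrays.copyOf(row, TRIPLE_PREFIX.length), TRIPLE_PREFIX);
        if (!hasPrefix) {
            throw new IllegalArgumentException("Row does not start with the triple prefix");
        }
        return Arrays.copyOfRange(row, TRIPLE_PREFIX.length, row.length);
    }

    public static void main(String[] args) {
        byte[] triple = "urn:s urn:p urn:o".getBytes(StandardCharsets.UTF_8);
        byte[] row = addTriplePrefix(triple);
        System.out.println(new String(row, StandardCharsets.UTF_8)); // T<<:>>urn:s urn:p urn:o
        System.out.println(Arrays.equals(triple, removeTriplePrefix(row))); // true
    }
}
```

Rejecting unprefixed rows (instead of blindly chopping bytes) is a choice of this sketch; it makes a mis-routed row fail loudly.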


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153629802
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaStatement;
+
+/**
+ * This class is a utility class for adding and removing the Triple prefix to
+ * Statements ingested into the Rya Fluo application.  The Triple prefix is added
+ * to support range-based compactions for removing transient data from the Fluo
+ * application.  This prefix supports the Transient data recipe described on the
+ * Fluo website, and reduces the computational load on the system by cleaning up
+ * old deleted Triples and notifications using a targeted range compaction.
+ *
+ */
+public class TriplePrefixUtils {
+
+private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of("T" + NODEID_BS_DELIM);
+
+/**
+ * Adds the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to {@link Bytes}.
+ * @param tripleBytes - serialized {@link RyaStatement}
+ * @return - serialized RyaStatement with prepended triple prefix, converted to Bytes
+ */
+public static Bytes addTriplePrefixAndConvertToBytes(byte[] tripleBytes) {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153629576
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaStatement;
+
+/**
+ * This class is a utility class for adding and removing the Triple prefix to
+ * Statements ingested into the Rya Fluo application.  The Triple prefix is added
+ * to support range-based compactions for removing transient data from the Fluo
+ * application.  This prefix supports the Transient data recipe described on the
+ * Fluo website, and reduces the computational load on the system by cleaning up
+ * old deleted Triples and notifications using a targeted range compaction.
+ *
+ */
+public class TriplePrefixUtils {
+
+private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of("T" + NODEID_BS_DELIM);
+
+/**
+ * Adds the prefix {@link TriplePrefixUtils#TRIPLE_PREFIX_BYTES} and converts to {@link Bytes}.
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153628506
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaStatement;
+
+/**
+ * This class is a utility class for adding and removing the Triple prefix to
+ * Statements ingested into the Rya Fluo application.  The Triple prefix is added
+ * to support range-based compactions for removing transient data from the Fluo
+ * application.  This prefix supports the Transient data recipe described on the
+ * Fluo website, and reduces the computational load on the system by cleaning up
+ * old deleted Triples and notifications using a targeted range compaction.
+ *
+ */
+public class TriplePrefixUtils {
+
+private static final Bytes TRIPLE_PREFIX_BYTES = Bytes.of("T" + NODEID_BS_DELIM);
--- End diff --

Done.
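
The javadoc says the fixed TRIPLE_PREFIX_BYTES prefix lets a compaction target only the triple rows. A sketch of how a row prefix becomes a row range, assuming rows sort as unsigned byte strings (as they do in Accumulo): the range starts at the prefix and ends, exclusively, at the prefix with its last non-0xFF byte incremented and everything after it dropped. PrefixRangeSketch is a hypothetical name, the "T<<:>>" prefix is a placeholder for the real constant, and the code needs Java 9+ for Arrays.compareUnsigned. Accumulo's Range.followingPrefix computes the same kind of end row for a Text prefix.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper: turn a row prefix into a [start, end) range. */
public class PrefixRangeSketch {

    /**
     * Smallest row that sorts after every row beginning with prefix, or null
     * when no such row exists (the prefix is empty or all 0xFF bytes).
     */
    static byte[] exclusiveEnd(byte[] prefix) {
        for (int i = prefix.length - 1; i >= 0; i--) {
            if ((prefix[i] & 0xFF) != 0xFF) {
                byte[] end = Arrays.copyOf(prefix, i + 1);
                end[i]++; // unsigned increment; cannot wrap because the byte was not 0xFF
                return end;
            }
        }
        return null; // unbounded: scan to the end of the table
    }

    /** True if row falls in [prefix, exclusiveEnd(prefix)), comparing bytes as unsigned. */
    static boolean inPrefixRange(byte[] row, byte[] prefix) {
        byte[] end = exclusiveEnd(prefix);
        return Arrays.compareUnsigned(row, prefix) >= 0
                && (end == null || Arrays.compareUnsigned(row, end) < 0);
    }

    public static void main(String[] args) {
        byte[] prefix = "T<<:>>".getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(exclusiveEnd(prefix), StandardCharsets.UTF_8)); // T<<:>?
        byte[] tripleRow = "T<<:>>urn:s urn:p urn:o".getBytes(StandardCharsets.UTF_8);
        byte[] otherRow = "U_join_row".getBytes(StandardCharsets.UTF_8);
        System.out.println(inPrefixRange(tripleRow, prefix)); // true
        System.out.println(inPrefixRange(otherRow, prefix));  // false
    }
}
```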


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153627802
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/TriplePrefixUtils.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaStatement;
+
+/**
+ * This class is a utility class for adding and removing the Triple prefix to
+ * Statements ingested into the Rya Fluo application.  The Triple prefix is added
+ * to support range-based compactions for removing transient data from the Fluo
+ * application.  This prefix supports the Transient data recipe described on the
+ * Fluo website, and reduces the computational load on the system by cleaning up
+ * old deleted Triples and notifications using a targeted range compaction.
+ *
+ */
+public class TriplePrefixUtils {
--- End diff --

Nuuupe.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153627442
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.Value;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.hash.Hashing;
+
+/**
+ * This class adds and removes a hash to and from the rowId for sharding purposes.
+ *
+ */
+public class BindingHashShardingFunction {
+
+private static final BindingSetStringConverter BS_CONVERTER = new BindingSetStringConverter();
+private static final int HASH_LEN = 4;
+
+/**
+ * Generates a sharded rowId.
+ *
+ * @param nodeId - Node Id with type and UUID
+ * @param varOrder - VarOrder used to order BindingSet values
+ * @param bs - BindingSet with partially formed query values
+ * @return - serialized Bytes rowId for storing BindingSet results in Fluo
+ */
+public static Bytes addShard(String nodeId, VariableOrder varOrder, VisibilityBindingSet bs) {
--- End diff --

Done.
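
The idea behind addShard (a short hash prepended to the rowId so rows spread across the table, with HASH_LEN = 4) can be illustrated with the sketch below. It is not the Rya row layout: the hashed input (node id plus first binding value), the ":" separator, the string-based rows, and the use of SHA-256 via java.security.MessageDigest instead of Guava's Hashing are all assumptions. Because the hash has a fixed width, removing the shard is a plain substring.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical string-based illustration of hash-prefix sharding (not Rya's row layout). */
public class HashShardSketch {

    static final int HASH_LEN = 4;         // same width as the constant in the diff
    static final String SHARD_DELIM = ":"; // assumed separator between hash and rowId

    /** First HASH_LEN hex characters of SHA-256(value). */
    static String shardHash(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (int i = 0; hex.length() < HASH_LEN; i++) {
                hex.append(String.format("%02x", digest[i]));
            }
            return hex.substring(0, HASH_LEN);
        } catch (NoSuchAlgorithmException e) {
            // Every Java SE platform is required to provide SHA-256.
            throw new IllegalStateException(e);
        }
    }

    /**
     * Prepends a short hash of nodeId + first binding value, so rows spread over
     * the key space instead of clustering under one nodeId, while rows with the
     * same nodeId and first value still share a hash.
     */
    static String addShard(String nodeId, String firstValue, String bindingSetString) {
        return shardHash(nodeId + firstValue) + SHARD_DELIM + nodeId + SHARD_DELIM + bindingSetString;
    }

    /** The hash has a fixed width, so removing it is a substring. */
    static String removeShard(String shardedRow) {
        return shardedRow.substring(HASH_LEN + SHARD_DELIM.length());
    }

    public static void main(String[] args) {
        String row = addShard("JOIN_1234", "urn:alice", "urn:alice;urn:bob");
        System.out.println(row);              // four hex characters, then ":JOIN_1234:urn:alice;urn:bob"
        System.out.println(removeShard(row)); // JOIN_1234:urn:alice;urn:bob
    }
}
```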


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153627306
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.Value;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.hash.Hashing;
+
+/**
+ * This class adds and removes a hash to and from the rowId for sharding purposes.
+ *
+ */
+public class BindingHashShardingFunction {
--- End diff --

Not doing this.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153627243
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/util/BindingHashShardingFunction.java
 ---
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.util;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter.TYPE_DELIM;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Bytes.BytesBuilder;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.Value;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.hash.Hashing;
+
+/**
+ * This class adds and removes a hash to and from the rowId for sharding purposes.
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153627110
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+/**
+ * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are
+ * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all
+ * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and
+ * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table).
+ */
+public class StatementPatternIdManager {
+
+/**
+ * Adds the specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also
+ * updates the hash of the updated nodeId Set and writes that to the Column
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ *
+ * @param tx
+ * @param ids - ids to add to the StatementPattern nodeId Set
+ */
+public static void addStatementPatternIds(TransactionBase tx, Set<String> ids) {
+Optional<Bytes> val = Optional.fromNullable(tx.get(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS));
+StringBuilder builder = new StringBuilder();
+if (val.isPresent()) {
+builder.append(val.get().toString());
+builder.append(VAR_DELIM);
+}
+String idString = builder.append(Joiner.on(VAR_DELIM).join(ids)).toString();
+tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS, Bytes.of(idString));
+tx.set(Bytes.of(STATEMENT_PATTERN_ID), STATEMENT_PATTERN_IDS_HASH, Bytes.of(Hashing.sha256().hashString(idString).toString()));
+}
+
+/**
+ * Removes the specified Set of ids from the Fluo table and updates the entry with Column
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also updates the hash of the updated nodeId Set and writes that
+ * to the Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ *
+ * @param tx
--- End diff --

Done.
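
A self-contained sketch of the bookkeeping StatementPatternIdManager describes: every StatementPattern id lives in one delimited string under a single entry, and a SHA-256 hash of that string is written alongside it so readers can compare hashes instead of re-reading the ids. A HashMap stands in for the Fluo row; the column names and the ";" delimiter are placeholders, not the values of FluoQueryColumns or VAR_DELIM. Unlike the append in the diff, this version re-reads the set first, so adding an id twice does not duplicate it.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/** Hypothetical in-memory model of the single-entry id set plus its hash. */
public class PatternIdStoreSketch {

    static final String VAR_DELIM = ";";      // placeholder delimiter
    static final String IDS_COL = "ids";      // placeholder column names
    static final String HASH_COL = "idsHash";

    // Stand-in for the one Fluo row holding both columns.
    final Map<String, String> row = new HashMap<>();

    void addStatementPatternIds(Set<String> ids) {
        Set<String> all = readIds();
        all.addAll(ids);
        write(all);
    }

    void removeStatementPatternIds(Set<String> ids) {
        Set<String> all = readIds();
        all.removeAll(ids);
        write(all);
    }

    Set<String> readIds() {
        Set<String> ids = new LinkedHashSet<>();
        String stored = row.get(IDS_COL);
        if (stored != null && !stored.isEmpty()) {
            ids.addAll(Arrays.asList(stored.split(Pattern.quote(VAR_DELIM))));
        }
        return ids;
    }

    // Writes the joined ids together with a hash of exactly that string.
    private void write(Set<String> ids) {
        String joined = String.join(VAR_DELIM, ids);
        row.put(IDS_COL, joined);
        row.put(HASH_COL, sha256Hex(joined));
    }

    static String sha256Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```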


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153627036
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+/**
+ * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are
+ * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all
+ * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and
+ * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table).
+ */
+public class StatementPatternIdManager {
+
+/**
+ * Adds the specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also
+ * updates the hash of the updated nodeId Set and writes that to the Column
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ *
+ * @param tx
+ * @param ids - ids to add to the StatementPattern nodeId Set
+ */
+public static void addStatementPatternIds(TransactionBase tx, Set<String> ids) {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153626670
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+/**
+ * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are
+ * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all
+ * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and
+ * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table).
+ */
+public class StatementPatternIdManager {
+
+/**
+ * Adds the specified Set of ids to the Fluo table with Column {@link FluoQueryColumns#STATEMENT_PATTERN_IDS}. Also
+ * updates the hash of the updated nodeId Set and writes that to the Column
+ * {@link FluoQueryColumns#STATEMENT_PATTERN_IDS_HASH}
+ *
+ * @param tx
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153625655
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdManager.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+
+/**
+ * Utility class for updating and removing StatementPattern nodeIds in the Fluo table. All StatementPattern nodeIds are
+ * stored in a single set under a single entry in the Fluo table. This is to eliminate the need for a scan to find all
+ * StatementPattern nodeIds every time a new Triple enters the Fluo table. Instead, the nodeIds are cached locally, and
+ * only updated when the local hash of the nodeId set is dirty (not equal to the hash in the Fluo table).
+ */
+public class StatementPatternIdManager {
--- End diff --

Yeah, not going to add that.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153625215
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCacheSupplier.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StatementPatternIdCacheSupplier {
--- End diff --

Done.  Gonna hold off on refactoring the suppliers to be the same.  I could
maybe do something with generics, but then I'd have to refactor the cache types
to extend some common interface.  The caches are different enough that that
would be an artificial type def.  Prolly not worth it for the purposes of
combining suppliers... Lemme know if you have something different in mind.
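
For context on the suppliers being discussed, a minimal sketch of a lock-guarded, lazily created shared cache, assuming the supplier's job is to hand every caller in the JVM the same cache instance until it is explicitly cleared. CacheSupplierSketch and its nested IdCache are hypothetical stand-ins, not the classes in the PR; the ReentrantLock mirrors the import shown in the diff.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical lazily initialized, lock-guarded supplier of one shared cache. */
public class CacheSupplierSketch {

    /** Placeholder for the cache type being supplied. */
    static class IdCache {
        final Set<String> ids = new HashSet<>();
    }

    private static final ReentrantLock LOCK = new ReentrantLock();
    private static IdCache cache; // guarded by LOCK

    /** Returns the shared cache, creating it on first use. */
    static IdCache getOrCreateCache() {
        LOCK.lock();
        try {
            if (cache == null) {
                cache = new IdCache();
            }
            return cache;
        } finally {
            LOCK.unlock();
        }
    }

    /** Drops the shared cache so the next caller gets a fresh one. */
    static void clear() {
        LOCK.lock();
        try {
            cache = null;
        } finally {
            LOCK.unlock();
        }
    }
}
```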


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153624484
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class caches the StatementPattern Ids so they don't have
+ * to be looked up each time a new Statement needs to be processed
+ * in the TripleObserver.
+ *
+ */
+public class StatementPatternIdCache {
+
+private final ReentrantLock lock = new ReentrantLock();
+private static Optional HASH;
+private static Set IDS;
+
+public StatementPatternIdCache() {
+HASH = Optional.empty();
+IDS = new HashSet<>();
+}
+
+/**
+ * This method retrieves the StatementPattern NodeIds registered in the Fluo table.
+ * This method looks up the hash of the Statement Pattern Id String hash, and if it
+ * is the same as the cached hash, then the cache Set of nodeIds is returned.  Otherwise,
+ * this method retrieves the ids from the Fluo table.  This method is thread safe.
+ * @param tx
+ * @return - Set of StatementPattern nodeIds
+ */
+public Set getStatementPatternIds(TransactionBase tx) {
--- End diff --

Done.
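The lookup described in the quoted javadoc (compare a cheap stored hash with the cached one and re-read the full id set only when they differ) can be sketched without Fluo as follows. This is a hypothetical illustration, not the PR's code: the class name `HashValidatedIdCache` is invented, the two `Supplier` arguments stand in for the Fluo reads of the STATEMENT_PATTERN_IDS_HASH and STATEMENT_PATTERN_IDS columns, and the hash is assumed to be a String.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/**
 * Hypothetical hash-validated id cache. The Suppliers stand in for the
 * Fluo reads of the stored hash and of the full id set.
 */
final class HashValidatedIdCache {

    private final ReentrantLock lock = new ReentrantLock();

    // Per-instance state, all guarded by lock.
    private Optional<String> cachedHash = Optional.empty();
    private Set<String> cachedIds = Collections.emptySet();
    private int reloads = 0;

    Set<String> getIds(Supplier<String> readHash, Supplier<Set<String>> readIds) {
        lock.lock();
        try {
            // Cheap read first: the stored hash.
            String currentHash = readHash.get();
            // Only pay for the full read when the hash differs from the cached one.
            if (!cachedHash.isPresent() || !cachedHash.get().equals(currentHash)) {
                cachedIds = Collections.unmodifiableSet(new HashSet<>(readIds.get()));
                cachedHash = Optional.of(currentHash);
                reloads++;
            }
            return cachedIds;
        } finally {
            lock.unlock();
        }
    }

    /** Number of times the full id set has been re-read. */
    int reloadCount() {
        lock.lock();
        try {
            return reloads;
        } finally {
            lock.unlock();
        }
    }
}
```

The sketch keeps its state in instance fields guarded by the lock. Inside a Fluo transaction both reads come from one snapshot; outside one, reading the hash before the ids means a concurrent update can cause an extra reload but not a permanently stale set.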


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153608050
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class caches the StatementPattern Ids so they don't have
+ * to be looked up each time a new Statement needs to be processed
+ * in the TripleObserver.
+ *
+ */
+public class StatementPatternIdCache {
+
+private final ReentrantLock lock = new ReentrantLock();
+private static Optional HASH;
+private static Set IDS;
+
+public StatementPatternIdCache() {
+HASH = Optional.empty();
+IDS = new HashSet<>();
+}
+
+/**
+ * This method retrieves the StatementPattern NodeIds registered in the Fluo table.
+ * This method looks up the hash of the Statement Pattern Id String hash, and if it
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153606055
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class caches the StatementPattern Ids so they don't have
+ * to be looked up each time a new Statement needs to be processed
+ * in the TripleObserver.
+ *
+ */
+public class StatementPatternIdCache {
+
+private final ReentrantLock lock = new ReentrantLock();
+private static Optional HASH;
+private static Set IDS;
+
+public StatementPatternIdCache() {
--- End diff --

Okay.  Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153589892
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/StatementPatternIdCache.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.STATEMENT_PATTERN_ID;
+import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.STATEMENT_PATTERN_IDS_HASH;
+
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class caches the StatementPattern Ids so they don't have
+ * to be looked up each time a new Statement needs to be processed
+ * in the TripleObserver.
+ *
+ */
+public class StatementPatternIdCache {
--- End diff --

Not including that.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153589770
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataCacheSupplier {
+
+private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class);
+private static FluoQueryMetadataCache CACHE;
+private static boolean initialized = false;
+private static final int DEFAULT_CAPACITY = 1;
+private static final int DEFAULT_CONCURRENCY = 8;
+private static final ReentrantLock lock = new ReentrantLock();
+
+/**
+ * Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the
+ * indicated capacity and concurrencyLevel if one is provided.
+ *
+ * @param capacity - capacity used to create a new cache
+ * @param concurrencyLevel - concurrencyLevel used to create a new cache
+ */
+public static FluoQueryMetadataCache getOrCreateCache(int capacity, int concurrencyLevel) {
+lock.lock();
+try {
+if (!initialized) {
+LOG.debug("Cache has not been initialized.  Initializing cache with capacity: {} and concurrencylevel: {}", capacity,
+concurrencyLevel);
+CACHE = new FluoQueryMetadataCache(new FluoQueryMetadataDAO(), capacity, concurrencyLevel);
+initialized = true;
+} else {
+LOG.warn(
+"A cache has already been initialized, so a cache with capacity: {} and concurrency level: {} will not be created.  Returning existing cache with capacity: {} and concurrencylevel: {}",
+capacity, concurrencyLevel, CACHE.getCapacity(), CACHE.getConcurrencyLevel());
+}
+return CACHE;
+} finally {
+lock.unlock();
+}
+}
+
+/**
+ * Returns cache with the name {@link FluoQueryMetadataCache#FLUO_CACHE_INSTANCE} if it exists, otherwise creates it
--- End diff --

Updated.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153589466
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataCacheSupplier {
+
+private static final Logger LOG = LoggerFactory.getLogger(MetadataCacheSupplier.class);
+private static FluoQueryMetadataCache CACHE;
+private static boolean initialized = false;
+private static final int DEFAULT_CAPACITY = 1;
+private static final int DEFAULT_CONCURRENCY = 8;
+private static final ReentrantLock lock = new ReentrantLock();
+
+/**
+ * Returns an existing cache with the specified instance name, or creates a cache. The created cache will have the
--- End diff --

It does.  It enforces "singleton-like" behavior in that it only creates the 
instance if it doesn't already exist.  It allows for the initial cache 
construction to be configurable.
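The "singleton-like" behavior described above, where only the first call's capacity and concurrency level take effect, can be sketched as follows. The names `CacheHolder`, `CacheConfig`, and `getOrCreate` are hypothetical; `CacheConfig` stands in for `FluoQueryMetadataCache` and only records its settings.

```java
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hypothetical lazily created, process-wide instance whose first construction
 * is configurable. Later callers receive the existing instance and their
 * arguments are ignored.
 */
final class CacheHolder {

    /** Stand-in for FluoQueryMetadataCache: it only records its settings. */
    static final class CacheConfig {
        final int capacity;
        final int concurrencyLevel;

        CacheConfig(int capacity, int concurrencyLevel) {
            this.capacity = capacity;
            this.concurrencyLevel = concurrencyLevel;
        }
    }

    private static final ReentrantLock LOCK = new ReentrantLock();
    private static CacheConfig instance; // guarded by LOCK

    private CacheHolder() {
        // static access only
    }

    static CacheConfig getOrCreate(int capacity, int concurrencyLevel) {
        LOCK.lock();
        try {
            if (instance == null) {
                // Only the first caller's arguments are used.
                instance = new CacheConfig(capacity, concurrencyLevel);
            }
            // Later callers could log that their arguments were ignored,
            // as the quoted getOrCreateCache does with LOG.warn.
            return instance;
        } finally {
            LOCK.unlock();
        }
    }
}
```

Here a null check on the field plays the role of the separate `initialized` flag in the quoted supplier. The initialization-on-demand holder idiom cannot take constructor arguments, which is why a lock (or a synchronized method) is one straightforward way to make the first construction configurable.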


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153588977
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/MetadataCacheSupplier.java
 ---
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetadataCacheSupplier {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153588803
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+private final FluoQueryMetadataDAO dao;
+private final Cache commonNodeMetadataCache;
+private final Cache metadataCache;
+private int capacity;
+private int concurrencyLevel;
+
+/**
+ * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary.
+ *
+ * @param capacity - max size of the cache
+ */
+public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) {
+this.dao = dao;
+commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+this.capacity = capacity;
+this.concurrencyLevel = concurrencyLevel;
+}
+
+/**
+ * @return - capacity of this cache in terms of max number of entries
+ */
+public int getCapacity() {
+return capacity;
+}
+
+/**
+ * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on
+ * without waiting for other threads
+ */
+public int getConcurrencyLevel() {
+return concurrencyLevel;
+}
+
+
+@Override
+public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
+Optional type = NodeType.fromNodeId(nodeId);
+
+try {
+checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
+LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable() {
+@Override
+public CommonNodeMetadata call() throws Exception {
+LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
+return dao.readStatementPatternMetadata(tx, nodeId);
+}
+});
+} catch (Exception e) {
+throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e);
+}
+}
+
+@Override
+public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) {
+Optional type = NodeType.fromNodeId(nodeId);
+try {
+checkArgument(type.isPresent() && type.get() == NodeType

[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153588548
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+private final FluoQueryMetadataDAO dao;
+private final Cache commonNodeMetadataCache;
+private final Cache metadataCache;
+private int capacity;
+private int concurrencyLevel;
+
+/**
+ * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary.
+ *
+ * @param capacity - max size of the cache
+ */
+public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) {
+this.dao = dao;
+commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+this.capacity = capacity;
+this.concurrencyLevel = concurrencyLevel;
+}
+
+/**
+ * @return - capacity of this cache in terms of max number of entries
+ */
+public int getCapacity() {
+return capacity;
+}
+
+/**
+ * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on
+ * without waiting for other threads
+ */
+public int getConcurrencyLevel() {
+return concurrencyLevel;
+}
+
+
+@Override
+public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
+Optional type = NodeType.fromNodeId(nodeId);
+
+try {
+checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
+LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable() {
+@Override
+public CommonNodeMetadata call() throws Exception {
+LOG.debug("Seeking Metadata from Fluo Table: {}", nodeId);
+return dao.readStatementPatternMetadata(tx, nodeId);
+}
+});
+} catch (Exception e) {
+throw new RuntimeException("Unable to access StatementPatternMetadata for nodeId: " + nodeId, e);
+}
+}
+
+@Override
+public JoinMetadata readJoinMetadata(SnapshotBase tx, String nodeId) {
+Optional type = NodeType.fromNodeId(nodeId);
+try {
+checkArgument(type.isPresent() && type.get() == NodeType

[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153587213
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+private final FluoQueryMetadataDAO dao;
+private final Cache commonNodeMetadataCache;
+private final Cache metadataCache;
+private int capacity;
+private int concurrencyLevel;
+
+/**
+ * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary.
+ *
+ * @param capacity - max size of the cache
+ */
+public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) {
+this.dao = dao;
+commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+this.capacity = capacity;
+this.concurrencyLevel = concurrencyLevel;
+}
+
+/**
+ * @return - capacity of this cache in terms of max number of entries
+ */
+public int getCapacity() {
+return capacity;
+}
+
+/**
+ * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on
+ * without waiting for other threads
+ */
+public int getConcurrencyLevel() {
+return concurrencyLevel;
+}
+
+
+@Override
+public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
+Optional type = NodeType.fromNodeId(nodeId);
+
+try {
+checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
+LOG.debug("Retrieving Metadata from Cache: {}", nodeId);
+return (StatementPatternMetadata) commonNodeMetadataCache.get(nodeId, new Callable() {
--- End diff --

Ooh.  That is elegant.  I'll keep it in mind going forward.
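Without assuming what the reviewer proposed here: Guava's `Cache.get(K, Callable)` takes a `Callable`, which is a functional interface, so the anonymous class in the quoted `readStatementPatternMetadata` could likely be written on Java 8+ as `commonNodeMetadataCache.get(nodeId, () -> dao.readStatementPatternMetadata(tx, nodeId))`. The stdlib sketch below shows the same read-through pattern with `ConcurrentHashMap.computeIfAbsent`, which the JDK documents as applying the mapping function at most once per key. It is a hypothetical stand-in (`ReadThroughCache` is an invented name); unlike the Guava cache in the PR it has no size bound or eviction.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
 * Hypothetical read-through cache: look in the map first and call the loader
 * (a stand-in for the DAO read from the Fluo table) only on a miss.
 * Unbounded: there is no size limit or eviction.
 */
final class ReadThroughCache<K, V> {

    private final ConcurrentMap<K, V> entries = new ConcurrentHashMap<>();
    private final Function<? super K, ? extends V> loader;

    ReadThroughCache(Function<? super K, ? extends V> loader) {
        this.loader = Objects.requireNonNull(loader);
    }

    V get(K key) {
        // ConcurrentHashMap runs the loader atomically, at most once per absent key.
        // If the loader returns null, nothing is cached and null is returned.
        return entries.computeIfAbsent(key, loader);
    }
}
```

Usage: `new ReadThroughCache<String, String>(id -> expensiveLookup(id))`, where `expensiveLookup` is whatever slow read the cache is meant to front.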


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153586913
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+private static final Logger LOG = LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+private final FluoQueryMetadataDAO dao;
+private final Cache commonNodeMetadataCache;
+private final Cache metadataCache;
+private int capacity;
+private int concurrencyLevel;
+
+/**
+ * Creates a FluoQueryMetadataCache with the specified capacity. Old, unused results are evicted as necessary.
+ *
+ * @param capacity - max size of the cache
+ */
+public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, int concurrencyLevel) {
+this.dao = dao;
+commonNodeMetadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+metadataCache = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+this.capacity = capacity;
+this.concurrencyLevel = concurrencyLevel;
+}
+
+/**
+ * @return - capacity of this cache in terms of max number of entries
+ */
+public int getCapacity() {
+return capacity;
+}
+
+/**
+ * @return - concurrencyLevel of this cache,in terms of number of partitions that distinct threads can operate on
+ * without waiting for other threads
+ */
+public int getConcurrencyLevel() {
+return concurrencyLevel;
+}
+
+
+@Override
+public StatementPatternMetadata readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
+Optional type = NodeType.fromNodeId(nodeId);
+
+try {
+checkArgument(type.isPresent() && type.get() == NodeType.STATEMENT_PATTERN);
--- End diff --

Done.
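The constructor javadoc quoted above says old, unused results are evicted once the cache reaches its capacity. As a rough, hypothetical stdlib analogue of Guava's `CacheBuilder.maximumSize`, an access-ordered `LinkedHashMap` can evict its least-recently-used entry; `BoundedLruCache` is an invented name. Guava's eviction is only approximately LRU and, per its documentation, may evict before the limit is reached when `concurrencyLevel` is greater than one, so this sketch illustrates the idea rather than reproducing Guava's behavior.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical size-bounded cache: an access-ordered LinkedHashMap that drops
 * its least-recently-used entry once the capacity is exceeded.
 */
final class BoundedLruCache<K, V> {

    private final Map<K, V> map;

    BoundedLruCache(final int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        // accessOrder = true: get() moves an entry to the most-recently-used end.
        LinkedHashMap<K, V> lru = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Called after each insertion; "this" is the map itself.
                return this.size() > capacity;
            }
        };
        // LinkedHashMap is not thread safe, and in access order even get() mutates it,
        // so every call goes through a synchronized wrapper.
        this.map = Collections.synchronizedMap(lru);
    }

    V get(K key) {
        return map.get(key);
    }

    void put(K key, V value) {
        map.put(key, value);
    }

    int size() {
        return map.size();
    }
}
```

The single synchronized wrapper serializes all access, which is simpler but less concurrent than Guava's segmented cache.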


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153585160
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that 
has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to 
the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+private final FluoQueryMetadataDAO dao;
+private final Cache 
commonNodeMetadataCache;
+private final Cache metadataCache;
+private int capacity;
+private int concurrencyLevel;
+
+/**
+ * Creates a FluoQueryMetadataCache with the specified capacity. Old, 
unused results are evicted as necessary.
+ *
+ * @param capacity - max size of the cache
+ */
+public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, 
int concurrencyLevel) {
+this.dao = dao;
+commonNodeMetadataCache = 
CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+metadataCache = 
CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).maximumSize(capacity).build();
+this.capacity = capacity;
+this.concurrencyLevel = concurrencyLevel;
+}
+
+/**
+ * @return - capacity of this cache in terms of max number of entries
+ */
+public int getCapacity() {
+return capacity;
+}
+
+/**
+ * @return - concurrencyLevel of this cache,in terms of number of 
partitions that distinct threads can operate on
+ * without waiting for other threads
+ */
+public int getConcurrencyLevel() {
+return concurrencyLevel;
+}
+
+
+@Override
+public StatementPatternMetadata 
readStatementPatternMetadata(SnapshotBase tx, String nodeId) {
--- End diff --

Done.
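The class Javadoc describes a read-through cache: check the cache first, and call the underlying DAO only on a miss. The Callable import suggests the class does this with Guava's Cache.get(key, Callable), built with maximumSize and concurrencyLevel. Below is a minimal JDK-only sketch of the same idea, assuming a plain Function stands in for the DAO call. It uses a single lock and strict LRU eviction instead of Guava's partitioned, size-bounded cache. ReadThroughLruCache is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical read-through cache: consults the cache first and only calls the
 * loader (standing in for the underlying DAO) on a miss. Once capacity is
 * exceeded, the least-recently-used entry is evicted.
 */
final class ReadThroughLruCache<K, V> {
    private final Function<K, V> loader;
    private final Map<K, V> entries;

    ReadThroughLruCache(int capacity, Function<K, V> loader) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.loader = loader;
        // accessOrder = true: iteration order runs from least to most recently accessed.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns the cached value, loading and caching it on a miss. */
    synchronized V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            value = loader.apply(key);
            entries.put(key, value);
        }
        return value;
    }

    synchronized int size() {
        return entries.size();
    }
}
```

One coarse lock keeps the sketch simple. Guava's concurrencyLevel setting exists so that threads touching different parts of the cache can avoid blocking each other.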


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153584424
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that 
has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to 
the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+private final FluoQueryMetadataDAO dao;
+private final Cache 
commonNodeMetadataCache;
+private final Cache metadataCache;
+private int capacity;
+private int concurrencyLevel;
+
+/**
+ * Creates a FluoQueryMetadataCache with the specified capacity. Old, 
unused results are evicted as necessary.
+ *
+ * @param capacity - max size of the cache
+ */
+public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, 
int concurrencyLevel) {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153584456
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that 
has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to 
the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+private final FluoQueryMetadataDAO dao;
+private final Cache 
commonNodeMetadataCache;
+private final Cache metadataCache;
+private int capacity;
+private int concurrencyLevel;
+
+/**
+ * Creates a FluoQueryMetadataCache with the specified capacity. Old, 
unused results are evicted as necessary.
+ *
+ * @param capacity - max size of the cache
+ */
+public FluoQueryMetadataCache(FluoQueryMetadataDAO dao, int capacity, 
int concurrencyLevel) {
+this.dao = dao;
--- End diff --

Done.
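In the quoted constructor, dao is assigned without a null check, and capacity and concurrencyLevel are stored in non-final fields. The sketch below shows one way to validate constructor arguments up front and keep the fields final, so a bad argument fails at construction rather than later inside the cache. CacheConfig is a hypothetical stand-in for the constructor, not the class from the PR.

```java
import static java.util.Objects.requireNonNull;

/** Hypothetical: the argument checks a caching DAO wrapper's constructor might perform. */
final class CacheConfig<D> {
    private final D dao;
    private final int capacity;
    private final int concurrencyLevel;

    CacheConfig(D dao, int capacity, int concurrencyLevel) {
        // requireNonNull returns its argument, so the check and the assignment share a line.
        this.dao = requireNonNull(dao, "dao must not be null");
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        if (concurrencyLevel <= 0) {
            throw new IllegalArgumentException("concurrencyLevel must be positive: " + concurrencyLevel);
        }
        this.capacity = capacity;
        this.concurrencyLevel = concurrencyLevel;
    }

    D getDao() { return dao; }
    int getCapacity() { return capacity; }
    int getConcurrencyLevel() { return concurrencyLevel; }
}
```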


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153583867
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that 
has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to 
the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(FluoQueryMetadataCache.class);
+private final FluoQueryMetadataDAO dao;
--- End diff --

I like that.  I'll keep it in mind going forward.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153583702
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that 
has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to 
the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
--- End diff --

Yeah, not doing that.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153582032
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataCache.java
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.indexing.pcj.fluo.app.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.concurrent.Callable;
+
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * Wrapper for {@link FluoQueryMetadataDAO} that caches any metadata that 
has been retrieved from Fluo. This class first
+ * checks the cache to see if the metadata is present before delegating to 
the underlying DAO method to retrieve the
+ * data.
+ *
+ */
+public class FluoQueryMetadataCache extends FluoQueryMetadataDAO {
--- End diff --

The delete question might actually be a problem.  I'm going to create a 
Jira ticket for this.  We need some way to notify the workers when metadata has 
been deleted so that the caches on each JVM can purge the metadata for any 
deleted nodeIds.
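The comment leaves open how workers learn about deletions. Whatever mechanism is chosen, each JVM's handler then only needs to drop cached entries for the deleted node IDs. The sketch assumes that some notification (not specified in this thread) delivers those IDs. With Guava, Cache.invalidateAll(Iterable) does the removal step directly. PurgeableMetadataCache is hypothetical.

```java
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical per-JVM metadata cache whose entries can be purged when nodes are deleted.
 * purge() is what a deletion-notification handler would call.
 */
final class PurgeableMetadataCache<V> {
    private final Map<String, V> byNodeId = new ConcurrentHashMap<>();

    void put(String nodeId, V metadata) {
        byNodeId.put(nodeId, metadata);
    }

    /** Returns the cached metadata, or null if the nodeId is not cached. */
    V getIfPresent(String nodeId) {
        return byNodeId.get(nodeId);
    }

    /** Drops cached metadata for each deleted nodeId; returns how many entries were removed. */
    int purge(Collection<String> deletedNodeIds) {
        int removed = 0;
        for (String nodeId : deletedNodeIds) {
            if (byNodeId.remove(nodeId) != null) {
                removed++;
            }
        }
        return removed;
    }
}
```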


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153578897
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
 ---
@@ -70,53 +68,50 @@ public void process(final TransactionBase tx, final 
Bytes brow, final Column col
 // Get string representation of triple.
 final RyaStatement ryaStatement = 
IncUpdateDAO.deserializeTriple(brow);
 log.trace("Transaction ID: {}\nRya Statement: {}\n", 
tx.getStartTimestamp(), ryaStatement);
+log.trace("Beginning to process triple.");
 
 final String triple = IncUpdateDAO.getTripleString(ryaStatement);
 
-// Iterate over each of the Statement Patterns that are being 
matched against.
-final RowScanner spScanner = tx.scanner()
-.over(Span.prefix(SP_PREFIX))
-
-// Only fetch rows that have the pattern in them. There 
will only be a single row with a pattern per SP.
-.fetch(FluoQueryColumns.STATEMENT_PATTERN_PATTERN)
-.byRow()
-.build();
+Set<String> spIDs = SP_ID_CACHE.getStatementPatternIds(tx);
 
 //see if triple matches conditions of any of the SP
-for (final ColumnScanner colScanner : spScanner) {
-// Get the Statement Pattern's node id.
-final String spID = colScanner.getsRow();
-
+for (String spID: spIDs) {
 // Fetch its metadata.
 final StatementPatternMetadata spMetadata = 
QUERY_METADATA_DAO.readStatementPatternMetadata(tx, spID);
 
+log.trace("Retrieved metadata: {}", spMetadata);
+
 // Attempt to match the triple against the pattern.
 final String pattern = spMetadata.getStatementPattern();
 final VariableOrder varOrder = spMetadata.getVariableOrder();
 final String bindingSetString = getBindingSet(triple, pattern, 
varOrder);
 
+log.trace("Created binding set match string: {}", 
bindingSetString);
+
 // Statement matches to a binding set.
 if(bindingSetString.length() != 0) {
 // Fetch the triple's visibility label.
 final String visibility = tx.gets(brow.toString(), 
FluoQueryColumns.TRIPLES, "");
 
 // Create the Row ID for the emitted binding set. It does 
not contain visibilities.
-final String row = spID + NODEID_BS_DELIM + 
bindingSetString;
-final Bytes rowBytes = Bytes.of( 
row.getBytes(Charsets.UTF_8) );
+//final String row = spID + NODEID_BS_DELIM + 
bindingSetString;
--- End diff --

Done.
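The diff replaces a per-triple scan over statement pattern rows with SP_ID_CACHE.getStatementPatternIds(tx). The thread doesn't show how that cache refreshes. One simple policy is to rescan only after the cached set reaches a given age. The trade-off is that a newly registered statement pattern goes unseen until the next refresh. The sketch assumes a Supplier stands in for the Fluo scan, and it injects a clock so the policy is testable. RefreshingIdCache is hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical time-bounded cache of statement pattern IDs. */
final class RefreshingIdCache {
    private final Supplier<Set<String>> loader;   // stands in for a scan over statement pattern rows
    private final LongSupplier clockMillis;       // e.g. System::currentTimeMillis in production
    private final long refreshMillis;
    private Set<String> cached;                   // null until the first load
    private long loadedAtMillis;

    RefreshingIdCache(Supplier<Set<String>> loader, LongSupplier clockMillis, long refreshMillis) {
        this.loader = loader;
        this.clockMillis = clockMillis;
        this.refreshMillis = refreshMillis;
    }

    /** Returns the cached IDs, rescanning only when the cached copy is at least refreshMillis old. */
    synchronized Set<String> getIds() {
        long now = clockMillis.getAsLong();
        if (cached == null || now - loadedAtMillis >= refreshMillis) {
            cached = Collections.unmodifiableSet(new HashSet<>(loader.get()));
            loadedAtMillis = now;
        }
        return cached;
    }
}
```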


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153578792
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
 ---
@@ -20,39 +22,79 @@
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class represents a batch order to delete all entries in the Fluo 
table indicated
  * by the given Span and Column.  These batch orders are processed by the 
{@link BatchObserver},
  * which uses this batch information along with the nodeId passed into the 
Observer to perform
- * batch deletes.  
+ * batch deletes.
  *
  */
 public class SpanBatchDeleteInformation extends 
AbstractSpanBatchInformation {
 
 private static final BatchBindingSetUpdater updater = new 
SpanBatchBindingSetUpdater();
-
-public SpanBatchDeleteInformation(int batchSize, Column column, Span 
span) {
+private Optional<String> nodeId;
+
+/**
+ * Create a new SpanBatchInformation object.
+ * @param nodeId - Optional nodeId that is used to filter returned 
results.  Useful if the hash
+ * is not included in the Span.
+ * @param batchSize - size of batch to be deleted
+ * @param column - column whose entries will be deleted
+ * @param span - Span indicating the range of data to delete.  
Sometimes the Span cannot contain the hash
+ * (for example, if you are deleting all of the results associated 
with a nodeId).  In this case, a nodeId
+ * should be specified along with a Span equal to the prefix of the 
nodeId.
+ */
+public SpanBatchDeleteInformation(Optional<String> nodeId, int 
batchSize, Column column, Span span) {
 super(batchSize, Task.Delete, column, span);
+Preconditions.checkNotNull(nodeId);
--- End diff --

checkNotNull and requireNonNull are used interchangeably throughout the 
codebase, so I'm not worried about this.
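Guava's Preconditions.checkNotNull and the JDK's Objects.requireNonNull behave the same way: each throws NullPointerException on null and otherwise returns its argument. Mixing them is therefore a style question, not a behavioral one. The sketch uses the JDK form for an Optional nodeId parameter like the one in this constructor, with Optional.empty() meaning "no filter" rather than null. DeleteRequest is a hypothetical stand-in.

```java
import static java.util.Objects.requireNonNull;

import java.util.Optional;

/** Hypothetical holder mirroring a constructor that takes an optional nodeId filter. */
final class DeleteRequest {
    private final Optional<String> nodeId;

    DeleteRequest(Optional<String> nodeId) {
        // The Optional itself must never be null; absence is expressed with Optional.empty().
        this.nodeId = requireNonNull(nodeId, "nodeId must not be null; use Optional.empty()");
    }

    Optional<String> getNodeId() {
        return nodeId;
    }
}
```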


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153578221
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
 ---
@@ -20,39 +22,79 @@
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class represents a batch order to delete all entries in the Fluo 
table indicated
  * by the given Span and Column.  These batch orders are processed by the 
{@link BatchObserver},
  * which uses this batch information along with the nodeId passed into the 
Observer to perform
- * batch deletes.  
+ * batch deletes.
  *
  */
 public class SpanBatchDeleteInformation extends 
AbstractSpanBatchInformation {
 
 private static final BatchBindingSetUpdater updater = new 
SpanBatchBindingSetUpdater();
-
-public SpanBatchDeleteInformation(int batchSize, Column column, Span 
span) {
+private Optional<String> nodeId;
+
+/**
+ * Create a new SpanBatchInformation object.
+ * @param nodeId - Optional nodeId that is used to filter returned 
results.  Useful if the hash
--- End diff --

Addressed.  Provided a link to BindingHashShardingFunction which documents 
how sharded row keys are generated.
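BindingHashShardingFunction is the authoritative description of the row layout. For intuition only, the sketch assumes a hypothetical layout in which a fixed-width hash sits right after a prefix. That layout fits makeFromShardedRow(prefixBytes, row) needing the prefix to strip the hash, but the real byte layout may differ. Sharding like this typically spreads rows that share a prefix across tablets instead of concentrating writes on one. ShardedRowKeys, HASH_LENGTH, and the CRC32-based hash are all assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical illustration of hash-sharded row keys; not Rya's actual format. */
final class ShardedRowKeys {
    static final int HASH_LENGTH = 4; // hex characters reserved for the shard hash

    private ShardedRowKeys() {}

    /** Inserts a fixed-width hash of shardValue between the prefix and the rest of the key. */
    static String addHash(String prefix, String rest, String shardValue) {
        CRC32 crc = new CRC32();
        crc.update(shardValue.getBytes(StandardCharsets.UTF_8));
        // Keep the low 16 bits so the hash is always exactly four hex digits.
        String hash = String.format("%04x", crc.getValue() & 0xFFFF);
        return prefix + hash + rest;
    }

    /** Strips the hash so the row can be parsed like an unsharded key again. */
    static String removeHash(String prefix, String row) {
        if (!row.startsWith(prefix) || row.length() < prefix.length() + HASH_LENGTH) {
            throw new IllegalArgumentException("row lacks the expected prefix and hash");
        }
        return prefix + row.substring(prefix.length() + HASH_LENGTH);
    }
}
```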


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153577373
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java
 ---
@@ -20,39 +22,79 @@
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
 
+import com.google.common.base.Preconditions;
+
 /**
  * This class represents a batch order to delete all entries in the Fluo 
table indicated
  * by the given Span and Column.  These batch orders are processed by the 
{@link BatchObserver},
  * which uses this batch information along with the nodeId passed into the 
Observer to perform
- * batch deletes.  
+ * batch deletes.
  *
  */
 public class SpanBatchDeleteInformation extends 
AbstractSpanBatchInformation {
--- End diff --

Done.
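The constructor Javadoc covers one case: when the Span cannot include the hash (for example, when deleting all results for a nodeId), the Span is set to a prefix and the optional nodeId filters what the scan returns. The sketch emulates that with a sorted TreeMap in place of a Fluo scanner. It assumes a hypothetical row layout in which the nodeId starts at a known offset after the prefix and hash. PrefixScan is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;

/** Hypothetical: scan a prefix "span" of a sorted table, then filter rows by an optional nodeId. */
final class PrefixScan {
    private PrefixScan() {}

    static List<String> scan(NavigableMap<String, String> table, String spanPrefix,
                             int nodeIdOffset, Optional<String> nodeId) {
        List<String> rows = new ArrayList<>();
        // Keys sharing a prefix are contiguous in sorted order and start at the prefix itself.
        for (String row : table.tailMap(spanPrefix, true).keySet()) {
            if (!row.startsWith(spanPrefix)) {
                break; // past the end of the prefix range
            }
            // With no nodeId every row in the span matches; otherwise the nodeId must appear at the offset.
            if (nodeId.map(id -> row.startsWith(id, nodeIdOffset)).orElse(true)) {
                rows.add(row);
            }
        }
        return rows;
    }
}
```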


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153575392
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
 ---
@@ -231,30 +225,38 @@ public void updateJoinResults(
  * @param siblingId - Id of the sibling node whose BindingSets will be 
retrieved and joined with the update
  * @return Span to retrieve sibling node's BindingSets to form join 
results
  */
-private Span getSpan(TransactionBase tx, final String childId, final 
BindingSet childBindingSet, final String siblingId) {
+private Span getSpan(TransactionBase tx, final String childId, final 
VisibilityBindingSet childBindingSet, final String siblingId) {
 // Get the common variable orders. These are used to build the 
prefix.
 final VariableOrder childVarOrder = getVarOrder(tx, childId);
 final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
final List<String> commonVars = getCommonVars(childVarOrder, 
siblingVarOrder);
 
-// Get the Binding strings
-final String childBindingSetString = 
VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder);
-final String[] childBindingArray = 
childBindingSetString.split("\u0001");
-final String[] childBindingStrings = 
FluoStringConverter.toBindingStrings(childBindingArray[0]);
-
-// Create the prefix that will be used to scan for binding sets of 
the sibling node.
-// This prefix includes the sibling Node ID and the common 
variable values from
-// childBindingSet.
-String siblingScanPrefix = "";
-for(int i = 0; i < commonVars.size(); i++) {
-if(siblingScanPrefix.length() == 0) {
-siblingScanPrefix = childBindingStrings[i];
-} else {
-siblingScanPrefix += DELIM + childBindingStrings[i];
-}
+//// Get the Binding strings
+//final String childBindingSetString = 
VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder);
+//final String[] childBindingArray = 
childBindingSetString.split("\u0001");
+//final String[] childBindingStrings = 
FluoStringConverter.toBindingStrings(childBindingArray[0]);
+
+Bytes scanPrefix = null;
+if(!commonVars.isEmpty()) {
+scanPrefix = getRowKey(siblingId, new 
VariableOrder(commonVars), childBindingSet);
+} else {
+scanPrefix = getRowKey(siblingId, siblingVarOrder, 
childBindingSet);
 }
-siblingScanPrefix = siblingId + NODEID_BS_DELIM + 
siblingScanPrefix;
-return Span.prefix(siblingScanPrefix);
+//
--- End diff --

Done.
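The removed string-based getSpan built its scan prefix as the sibling's node ID, then NODEID_BS_DELIM, then the child's values for the common variables joined by DELIM. The sketch reproduces that older logic, not the new Bytes/getRowKey version. It assumes "common variables" means the leading run shared by both variable orders, which is what makes a prefix scan valid. The delimiter values and the class name JoinScanPrefix are placeholders; Rya defines its own constants.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical re-creation of the string-based sibling scan prefix. */
final class JoinScanPrefix {
    // Placeholder delimiters, not Rya's actual constant values.
    static final String NODEID_BS_DELIM = "||";
    static final String DELIM = ":::";

    private JoinScanPrefix() {}

    /** Shared leading variables of two orders; stops at the first position where they differ. */
    static List<String> commonVars(List<String> childOrder, List<String> siblingOrder) {
        List<String> common = new ArrayList<>();
        int n = Math.min(childOrder.size(), siblingOrder.size());
        for (int i = 0; i < n && childOrder.get(i).equals(siblingOrder.get(i)); i++) {
            common.add(childOrder.get(i));
        }
        return common;
    }

    /** siblingId + NODEID_BS_DELIM + the child's values for the common variables, DELIM-separated. */
    static String scanPrefix(String siblingId, List<String> commonVars, Map<String, String> childBindings) {
        List<String> values = new ArrayList<>();
        for (String var : commonVars) {
            values.add(childBindings.get(var));
        }
        // With no common variables the prefix covers every result of the sibling node.
        return siblingId + NODEID_BS_DELIM + String.join(DELIM, values);
    }
}
```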


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153575367
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
 ---
@@ -231,30 +225,38 @@ public void updateJoinResults(
  * @param siblingId - Id of the sibling node whose BindingSets will be 
retrieved and joined with the update
  * @return Span to retrieve sibling node's BindingSets to form join 
results
  */
-private Span getSpan(TransactionBase tx, final String childId, final 
BindingSet childBindingSet, final String siblingId) {
+private Span getSpan(TransactionBase tx, final String childId, final 
VisibilityBindingSet childBindingSet, final String siblingId) {
 // Get the common variable orders. These are used to build the 
prefix.
 final VariableOrder childVarOrder = getVarOrder(tx, childId);
 final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
final List<String> commonVars = getCommonVars(childVarOrder, 
siblingVarOrder);
 
-// Get the Binding strings
-final String childBindingSetString = 
VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder);
-final String[] childBindingArray = 
childBindingSetString.split("\u0001");
-final String[] childBindingStrings = 
FluoStringConverter.toBindingStrings(childBindingArray[0]);
-
-// Create the prefix that will be used to scan for binding sets of 
the sibling node.
-// This prefix includes the sibling Node ID and the common 
variable values from
-// childBindingSet.
-String siblingScanPrefix = "";
-for(int i = 0; i < commonVars.size(); i++) {
-if(siblingScanPrefix.length() == 0) {
-siblingScanPrefix = childBindingStrings[i];
-} else {
-siblingScanPrefix += DELIM + childBindingStrings[i];
-}
+//// Get the Binding strings
+//final String childBindingSetString = 
VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder);
+//final String[] childBindingArray = 
childBindingSetString.split("\u0001");
+//final String[] childBindingStrings = 
FluoStringConverter.toBindingStrings(childBindingArray[0]);
+
+Bytes scanPrefix = null;
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153575097
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java
 ---
@@ -231,30 +225,38 @@ public void updateJoinResults(
  * @param siblingId - Id of the sibling node whose BindingSets will be 
retrieved and joined with the update
  * @return Span to retrieve sibling node's BindingSets to form join 
results
  */
-private Span getSpan(TransactionBase tx, final String childId, final 
BindingSet childBindingSet, final String siblingId) {
+private Span getSpan(TransactionBase tx, final String childId, final 
VisibilityBindingSet childBindingSet, final String siblingId) {
 // Get the common variable orders. These are used to build the 
prefix.
 final VariableOrder childVarOrder = getVarOrder(tx, childId);
 final VariableOrder siblingVarOrder = getVarOrder(tx, siblingId);
final List<String> commonVars = getCommonVars(childVarOrder, 
siblingVarOrder);
 
-// Get the Binding strings
-final String childBindingSetString = 
VIS_BS_CONVERTER.convert(childBindingSet, childVarOrder);
-final String[] childBindingArray = 
childBindingSetString.split("\u0001");
-final String[] childBindingStrings = 
FluoStringConverter.toBindingStrings(childBindingArray[0]);
-
-// Create the prefix that will be used to scan for binding sets of 
the sibling node.
-// This prefix includes the sibling Node ID and the common 
variable values from
-// childBindingSet.
-String siblingScanPrefix = "";
-for(int i = 0; i < commonVars.size(); i++) {
-if(siblingScanPrefix.length() == 0) {
-siblingScanPrefix = childBindingStrings[i];
-} else {
-siblingScanPrefix += DELIM + childBindingStrings[i];
-}
+//// Get the Binding strings
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153574758
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
 ---
@@ -30,15 +30,18 @@
 public static final String TYPE_DELIM = "<<~>>";
 
 //to be used in construction of id for each node
-public static final String SP_PREFIX = "STATEMENT_PATTERN";
-public static final String JOIN_PREFIX = "JOIN";
-public static final String FILTER_PREFIX = "FILTER";
-public static final String AGGREGATION_PREFIX = "AGGREGATION";
-public static final String QUERY_PREFIX = "QUERY";
-public static final String PROJECTION_PREFIX = "PROJECTION";
-public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
-public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
-
+public static final String TRIPLE_PREFIX = "T";
--- End diff --

No, it's just more efficient. Since I was making so many changes to how 
results are stored, I figured there was no need for such long prefixes. Also, 
when specifying splits in the Fluo app, you have to indicate the prefixes you 
want to split over, which becomes cumbersome with prefixes this long.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153574297
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncUpdateDAO.java
 ---
@@ -42,7 +43,8 @@
 private static final WholeRowTripleResolver tr = new 
WholeRowTripleResolver();
 
 public static RyaStatement deserializeTriple(final Bytes row) {
-final byte[] rowArray = row.toArray();
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153573688
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
 ---
@@ -77,4 +78,31 @@ public static BindingSetRow make(final Bytes row) {
 final String bindingSetString = rowArray.length == 2 ? rowArray[1] 
: "";
 return new BindingSetRow(nodeId, bindingSetString);
 }
+
+public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, 
Bytes row) {
+return make(BindingHashShardingFunction.removeHash(prefixBytes, 
row));
+}
+
+@Override
+public String toString() {
+StringBuilder builder = new StringBuilder();
+builder
+.append("NodeId: " + nodeId).append("\n")
+.append("BindingSet String: " + bindingSetString);
+return builder.toString();
+}
+
+@Override
+public boolean equals(Object other) {
+if(other == null) { return false;}
+if(this == other) { return true;}
+
+if (other instanceof BindingSetRow) {
+BindingSetRow row = (BindingSetRow) other;
+return new EqualsBuilder().append(this.nodeId, 
row.nodeId).append(this.bindingSetString, row.bindingSetString)
--- End diff --

Okay, but only because you were nice enough to code it up for me :)
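The equals in this diff uses commons-lang's EqualsBuilder. The sketch shows an equivalent using java.util.Objects instead. It also drops the separate null check, since instanceof is already false for null, and pairs equals with a matching hashCode, because equal objects must have equal hash codes. NodeRow is a hypothetical class shaped like BindingSetRow (a nodeId plus a binding set string).

```java
import java.util.Objects;

/** Hypothetical value class shaped like BindingSetRow. */
final class NodeRow {
    private final String nodeId;
    private final String bindingSetString;

    NodeRow(String nodeId, String bindingSetString) {
        this.nodeId = nodeId;
        this.bindingSetString = bindingSetString;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        // instanceof is false for null, so no explicit null check is needed.
        if (!(other instanceof NodeRow)) {
            return false;
        }
        NodeRow row = (NodeRow) other;
        return Objects.equals(nodeId, row.nodeId)
                && Objects.equals(bindingSetString, row.bindingSetString);
    }

    @Override
    public int hashCode() {
        // Uses the same fields as equals, keeping the two consistent.
        return Objects.hash(nodeId, bindingSetString);
    }

    @Override
    public String toString() {
        return "NodeId: " + nodeId + "\nBindingSet String: " + bindingSetString;
    }
}
```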


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153573207
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
 ---
@@ -77,4 +78,31 @@ public static BindingSetRow make(final Bytes row) {
 final String bindingSetString = rowArray.length == 2 ? rowArray[1] 
: "";
 return new BindingSetRow(nodeId, bindingSetString);
 }
+
+public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, 
Bytes row) {
+return make(BindingHashShardingFunction.removeHash(prefixBytes, 
row));
+}
+
+@Override
+public String toString() {
+StringBuilder builder = new StringBuilder();
+builder
+.append("NodeId: " + nodeId).append("\n")
+.append("BindingSet String: " + bindingSetString);
+return builder.toString();
+}
+
+@Override
+public boolean equals(Object other) {
+if(other == null) { return false;}
--- End diff --

Yep. Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153572801
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
 ---
@@ -77,4 +78,31 @@ public static BindingSetRow make(final Bytes row) {
 final String bindingSetString = rowArray.length == 2 ? rowArray[1] : "";
 return new BindingSetRow(nodeId, bindingSetString);
 }
+
+public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes row) {
+return make(BindingHashShardingFunction.removeHash(prefixBytes, row));
+}
+
+@Override
+public String toString() {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153571640
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
 ---
@@ -77,4 +78,31 @@ public static BindingSetRow make(final Bytes row) {
 final String bindingSetString = rowArray.length == 2 ? rowArray[1] : "";
 return new BindingSetRow(nodeId, bindingSetString);
 }
+
+public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes row) {
+return make(BindingHashShardingFunction.removeHash(prefixBytes, row));
+}
+
+@Override
+public String toString() {
+StringBuilder builder = new StringBuilder();
+builder
+.append("NodeId: " + nodeId).append("\n")
+.append("BindingSet String: " + bindingSetString);
+return builder.toString();
+}
+
+@Override
+public boolean equals(Object other) {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153570866
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
 ---
@@ -77,4 +78,31 @@ public static BindingSetRow make(final Bytes row) {
 final String bindingSetString = rowArray.length == 2 ? rowArray[1] : "";
 return new BindingSetRow(nodeId, bindingSetString);
 }
+
+public static BindingSetRow makeFromShardedRow(Bytes prefixBytes, Bytes row) {
--- End diff --

See above.


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153570806
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java
 ---
@@ -21,15 +21,16 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
 
+import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 import net.jcip.annotations.Immutable;
 
 /**
- * The values of an Accumulo Row ID for a row that stores a Binding set for
- * a specific Node ID of a query.
+ * The values of an Accumulo Row ID for a row that stores a Binding set for a specific Node ID of a query.
--- End diff --

I don't think this is the appropriate place to document this. I added 
documentation to the BindingHashShardingFunction class describing how the sharded 
row ID is created, and linked that class to the makeFromShardedRow(...) method in 
BindingSetRow.
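A minimal sketch of the general idea behind a hash-sharded row ID and its inverse, under loudly stated assumptions: the real BindingHashShardingFunction works on Fluo Bytes, and its actual layout is the one documented in that class, not the one here. This sketch assumes a hypothetical layout of `<prefix><8 hex hash chars><remainder>` and uses Strings in place of Bytes. removeHash mirrors the name used in the diff, but its String signature here is assumed; addHash is a hypothetical name. Hash-prefixing is a common way to spread rows that share a prefix across tablets, and stripping the hash recovers a row that an unsharded parser such as make(...) could handle.

```java
// Hypothetical sketch; NOT the actual BindingHashShardingFunction API.
// Assumed row layout: <prefix><8 hex chars of hash><remainder>.
final class HashShardingSketch {
    // Width of the hash produced by "%08x" for an int (always 8 hex digits).
    static final int HASH_LENGTH = 8;

    private HashShardingSketch() {
    }

    // Inserts a fixed-width hex hash of the remainder directly after the prefix.
    static String addHash(final String prefix, final String remainder) {
        final String hash = String.format("%08x", remainder.hashCode());
        return prefix + hash + remainder;
    }

    // Strips the hash so the row can be parsed as an unsharded row again.
    static String removeHash(final String prefix, final String shardedRow) {
        if (!shardedRow.startsWith(prefix)
                || shardedRow.length() < prefix.length() + HASH_LENGTH) {
            throw new IllegalArgumentException(
                    "Row does not start with the expected prefix and hash: " + shardedRow);
        }
        return prefix + shardedRow.substring(prefix.length() + HASH_LENGTH);
    }
}
```

Using a fixed-width hash keeps removeHash trivial: it only needs the prefix length to know where the hash ends.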


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153565747
  
--- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AbstractNodeUpdater.java
 ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.fluo.app.util.BindingHashShardingFunction;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+public abstract class AbstractNodeUpdater {
--- End diff --

The class provides common row key generation functionality to all of the 
updaters that extend it. Centralizing sharded row creation in the parent limits 
the number of files I would have to update to change the row key generation 
strategy. I think it's fine as is.
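The pattern described in this reply, a parent class that owns row key generation so every updater shares it, can be sketched as follows. NodeUpdaterBase, makeRowKey, the two subclasses, and the key layout are all hypothetical. The actual AbstractNodeUpdater works with Fluo Bytes, VariableOrder, and VisibilityBindingSet, and its method signatures are not shown in the diff.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the pattern; names and key layout are illustrative
// assumptions, not Rya's actual API.
abstract class NodeUpdaterBase {

    // The only place that knows how a row key is built. Changing the sharding
    // strategy means editing this method, not every subclass.
    protected final String makeRowKey(final String nodeId, final List<String> varOrder,
            final Map<String, String> bindings) {
        // Serialize binding values in variable order; missing values become "".
        final String values = varOrder.stream()
                .map(var -> bindings.getOrDefault(var, ""))
                .collect(Collectors.joining(":"));
        // Assumed layout: <nodeId><8 hex hash chars><values>.
        final String hash = String.format("%08x", values.hashCode());
        return nodeId + hash + values;
    }
}

// Two hypothetical updaters that share the key logic through inheritance.
final class JoinUpdaterSketch extends NodeUpdaterBase {
    String rowForJoinResult(final String nodeId, final List<String> varOrder,
            final Map<String, String> bindings) {
        return makeRowKey(nodeId, varOrder, bindings);
    }
}

final class FilterUpdaterSketch extends NodeUpdaterBase {
    String rowForFilterResult(final String nodeId, final List<String> varOrder,
            final Map<String, String> bindings) {
        return makeRowKey(nodeId, varOrder, bindings);
    }
}
```

A shared key-builder object injected into each updater (composition) would give the same single point of change; the reply above opts for inheritance.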


---


[GitHub] incubator-rya pull request #251: Candidate/rya 406

2017-11-28 Thread meiercaleb
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/251#discussion_r153563566
  
--- Diff: extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---
@@ -1,25 +1,16 @@
 
-
http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
