[GitHub] incubator-rya pull request #305: Rya 135 collection name

2018-11-02 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/305#discussion_r230497599
  
--- Diff: 
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
 ---
@@ -35,19 +35,17 @@
  * A {@link RdfCloudTripleStoreConfiguration} that configures how Rya 
connects to a MongoDB Rya triple store.
  */
 public class MongoDBRdfConfiguration extends 
RdfCloudTripleStoreConfiguration {
+public static final String RYA_TRIPLES_COLLECTION = "rya_triples";
 
 // MongoDB Server connection values.
 public static final String MONGO_HOSTNAME = "mongo.db.instance";
 public static final String MONGO_PORT = "mongo.db.port";
 
 // MongoDB Database values.
-public static final String MONGO_DB_NAME = "mongo.db.name";
+public static final String RYA_INSTANCE_NAME = "mongo.db.name";
--- End diff --

I don't think this field should be renamed. It's still the Mongo DB name. 
The decision that the Rya instance name also happens to be the DB name 
should be made elsewhere in the code. If this is in fact just the instance 
name, and the DB name can no longer be configured, then the field's value 
shouldn't be "mongo.db.name" anymore.

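To make the suggestion concrete, here is a minimal sketch of keeping the
constant named after its key while deciding in one place that the instance
name equals the DB name. The class `SketchMongoConfig` and its map-backed
storage are hypothetical stand-ins, not the real MongoDBRdfConfiguration;
only the constant name and its value come from the diff, and the "rya"
default is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the real configuration class.
class SketchMongoConfig {
    // The key still says "db name", so the constant keeps that name.
    static final String MONGO_DB_NAME = "mongo.db.name";

    private final Map<String, String> values = new HashMap<>();

    void setMongoDBName(final String name) {
        values.put(MONGO_DB_NAME, name);
    }

    String getMongoDBName() {
        // "rya" as the default is an assumption made for this sketch.
        return values.getOrDefault(MONGO_DB_NAME, "rya");
    }

    // The rule "instance name == DB name" lives here and nowhere else.
    String getRyaInstanceName() {
        return getMongoDBName();
    }
}
```

If the DB name later stops being configurable, only this accessor and the
key's value would need to change.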

---


[GitHub] incubator-rya pull request #300: RYA-135 Hard code triples collection

2018-09-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/300#discussion_r217849376
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
 ---
@@ -73,12 +72,12 @@
 protected T storageStrategy;
 
 private MongoDbBatchWriter mongoDbBatchWriter;
+protected String collectionName;
--- End diff --

How did it work before, then?


---


[GitHub] incubator-rya pull request #300: RYA-135 Hard code triples collection

2018-09-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/300#discussion_r217848961
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
 ---
@@ -73,12 +72,12 @@
 protected T storageStrategy;
 
 private MongoDbBatchWriter mongoDbBatchWriter;
+protected String collectionName;
--- End diff --

Oh, the subclasses change it directly. That's unclear; why not leave it the 
way it was?
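
One way to make the assignment explicit, sketched under assumptions: the
base class takes the collection name in its constructor instead of exposing
a mutable protected field. This is not necessarily how the code worked
before the change; `SketchIndexer`, `SketchTemporalIndexer`, and the
"temporal" collection name are all hypothetical.

```java
import java.util.Objects;

// Hypothetical base class; not the real AbstractMongoIndexer.
abstract class SketchIndexer {
    // Private and final: subclasses cannot silently reassign it.
    private final String collectionName;

    protected SketchIndexer(final String collectionName) {
        this.collectionName = Objects.requireNonNull(collectionName);
    }

    String getCollectionName() {
        return collectionName;
    }
}

// Hypothetical subclass; the name it passes is visible at the call site.
class SketchTemporalIndexer extends SketchIndexer {
    SketchTemporalIndexer() {
        super("temporal");
    }
}
```

With this shape, the question "is this field ever set anywhere?" is answered
by the constructor signature.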


---


[GitHub] incubator-rya pull request #300: RYA-135 Hard code triples collection

2018-09-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/300#discussion_r217848617
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java
 ---
@@ -73,12 +72,12 @@
 protected T storageStrategy;
 
 private MongoDbBatchWriter mongoDbBatchWriter;
+protected String collectionName;
--- End diff --

Is this field ever set anywhere?


---


[GitHub] incubator-rya pull request #300: RYA-135 Hard code triples collection

2018-09-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/300#discussion_r217847623
  
--- Diff: 
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
 ---
@@ -197,22 +195,22 @@ public void setMongoPassword(final String password) {
  * @return The name of the Rya instance to connect to. (default: rya)
  */
 public String getRyaInstanceName() {
-return get(MONGO_COLLECTION_PREFIX, "rya");
+return get(MONGO_DB_NAME, "rya");
--- End diff --

So an application can call 

setMongoDBName("a")
setRyaInstanceName("b")

And then getMongoDBName() returns "b"? That's unexpected.
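
The surprise can be reproduced with a small sketch. It assumes that
setRyaInstanceName writes the same key that the getter in the diff reads;
`AliasedConfig` and its map-backed storage are hypothetical, not the real
class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical map-backed config in which both names share one key.
class AliasedConfig {
    static final String MONGO_DB_NAME = "mongo.db.name";

    private final Map<String, String> values = new HashMap<>();

    void setMongoDBName(final String name) {
        values.put(MONGO_DB_NAME, name);
    }

    // Assumption: writes the same key, mirroring the getter in the diff.
    void setRyaInstanceName(final String name) {
        values.put(MONGO_DB_NAME, name);
    }

    String getMongoDBName() {
        return values.getOrDefault(MONGO_DB_NAME, "rya");
    }

    String getRyaInstanceName() {
        return values.getOrDefault(MONGO_DB_NAME, "rya");
    }
}

class AliasedConfigDemo {
    public static void main(final String[] args) {
        final AliasedConfig conf = new AliasedConfig();
        conf.setMongoDBName("a");
        conf.setRyaInstanceName("b");
        // The second setter overwrote the value stored by the first.
        System.out.println(conf.getMongoDBName());
    }
}
```

Because both setters share one key, the last call wins regardless of which
setter it was.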


---


[GitHub] incubator-rya pull request #300: RYA-135 Hard code triples collection

2018-09-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/300#discussion_r217846855
  
--- Diff: 
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java
 ---
@@ -182,8 +167,7 @@ private C getConf(C conf) {
 conf.setMongoPassword(pass);
 }
 conf.setMongoDBName(mongoDBName);
-conf.setRyaInstanceName(mongoCollectionPrefix);
-conf.setTablePrefix(mongoCollectionPrefix);
+conf.setRyaInstanceName(mongoDBName);
--- End diff --

If Mongo DB Name and Rya Instance Name are always the same, why not just 
have one of those fields?


---


[GitHub] incubator-rya issue #301: Version fix

2018-09-14 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/301
  
Please make a defect for this.


---


[GitHub] incubator-rya pull request #299: RYA-500: Make RdfFileInputTool to accept mu...

2018-07-04 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/299#discussion_r200156492
  
--- Diff: extras/rya.manual/src/site/markdown/loaddata.md ---
@@ -92,29 +92,55 @@ The default "format" is RDF/XML, but these formats are 
supported : RDFXML, NTRIP
 
 ## Bulk Loading data
 
-Bulk loading data is done through Map Reduce jobs
+Bulk loading data is done through Map Reduce jobs.
 
 ### Bulk Load RDF data
 
-This Map Reduce job will read files into memory and parse them into 
statements. The statements are saved into the store. Here is an example for 
storing in Accumulo:
+This Map Reduce job will read files into memory and parse them into 
statements. The statements are saved into the triplestore. 
+Here are the steps to prepare and run the job:
+
+  * Load the RDF data to HDFS. It can be single of multiple volumes and 
directories in them.
+  * Also load the `mapreduce/target/rya.mapreduce--shaded.jar` 
executable jar file to HDFS.
+  * Run the following sample command:
 
 ```
-hadoop jar target/rya.mapreduce-3.2.10-SNAPSHOT-shaded.jar 
org.apache.rya.accumulo.mr.RdfFileInputTool -Dac.zk=localhost:2181 
-Dac.instance=accumulo -Dac.username=root -Dac.pwd=secret 
-Drdf.tablePrefix=triplestore_ -Drdf.format=N-Triples /tmp/temp.ntrips
+hadoop hdfs://volume/rya.mapreduce--shaded.jar 
org.apache.rya.accumulo.mr.tools.RdfFileInputTool -Dac.zk=localhost:2181 
-Dac.instance=accumulo -Dac.username=root -Dac.pwd=secret 
-Drdf.tablePrefix=triplestore_ -Drdf.format=N-Triples 
hdfs://volume/dir1,hdfs://volume/dir2,hdfs://volume/file1.nt
 ```
 
 Options:
 
-- rdf.tablePrefix : The tables (spo, po, osp) are prefixed with this 
qualifier. The tables become: 
(rdf.tablePrefix)spo,(rdf.tablePrefix)po,(rdf.tablePrefix)osp
-- ac.* : Accumulo connection parameters
-- rdf.format : See RDFFormat from RDF4J, samples include (Trig, N-Triples, 
RDF/XML)
-- sc.use_freetext, sc.use_geo, sc.use_temporal, sc.use_entity : If any of 
these are set to true, statements will also be
+- **rdf.tablePrefix** - The tables (spo, po, osp) are prefixed with this 
qualifier.
+The tables become: 
(rdf.tablePrefix)spo,(rdf.tablePrefix)po,(rdf.tablePrefix)osp
+- **ac.*** - Accumulo connection parameters
+- **rdf.format** - See RDFFormat from RDF4J, samples include (Trig, 
N-Triples, RDF/XML)
+- **sc.use_freetext, sc.use_geo, sc.use_temporal, sc.use_entity** - If any 
of these are set to true, statements will also be
 added to the enabled secondary indices.
-- sc.freetext.predicates, sc.geo.predicates, sc.temporal.predicates: If 
the associated indexer is enabled, these options specify
+- **sc.freetext.predicates, sc.geo.predicates, sc.temporal.predicates** - 
If the associated indexer is enabled, these options specify
 which statements should be sent to that indexer (based on the 
predicate). If not given, all indexers will attempt to index
 all statements.
 
-The argument is the directory/file to load. This file needs to be loaded 
into HDFS before running. If loading a directory, all files should have the 
same RDF
-format.
+The positional argument is a comma separated list of directories/files to 
load.
+They need to be loaded into HDFS before running. If loading a directory,
+all files should have the same RDF format.
+
+Once the data is loaded, it is actually a good practice to compact your 
tables.
+You can do this by opening the accumulo shell shell and running the compact
--- End diff --

"the accumulo shell shell and"


---


[GitHub] incubator-rya pull request #299: RYA-500: Make RdfFileInputTool to accept mu...

2018-07-04 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/299#discussion_r200156998
  
--- Diff: extras/rya.manual/src/site/markdown/loaddata.md ---
@@ -92,29 +92,55 @@ The default "format" is RDF/XML, but these formats are 
supported : RDFXML, NTRIP
 
 ## Bulk Loading data
 
-Bulk loading data is done through Map Reduce jobs
+Bulk loading data is done through Map Reduce jobs.
 
 ### Bulk Load RDF data
 
-This Map Reduce job will read files into memory and parse them into 
statements. The statements are saved into the store. Here is an example for 
storing in Accumulo:
+This Map Reduce job will read files into memory and parse them into 
statements. The statements are saved into the triplestore. 
+Here are the steps to prepare and run the job:
+
+  * Load the RDF data to HDFS. It can be single of multiple volumes and 
directories in them.
+  * Also load the `mapreduce/target/rya.mapreduce--shaded.jar` 
executable jar file to HDFS.
+  * Run the following sample command:
 
 ```
-hadoop jar target/rya.mapreduce-3.2.10-SNAPSHOT-shaded.jar 
org.apache.rya.accumulo.mr.RdfFileInputTool -Dac.zk=localhost:2181 
-Dac.instance=accumulo -Dac.username=root -Dac.pwd=secret 
-Drdf.tablePrefix=triplestore_ -Drdf.format=N-Triples /tmp/temp.ntrips
+hadoop hdfs://volume/rya.mapreduce--shaded.jar 
org.apache.rya.accumulo.mr.tools.RdfFileInputTool -Dac.zk=localhost:2181 
-Dac.instance=accumulo -Dac.username=root -Dac.pwd=secret 
-Drdf.tablePrefix=triplestore_ -Drdf.format=N-Triples 
hdfs://volume/dir1,hdfs://volume/dir2,hdfs://volume/file1.nt
 ```
 
 Options:
 
-- rdf.tablePrefix : The tables (spo, po, osp) are prefixed with this 
qualifier. The tables become: 
(rdf.tablePrefix)spo,(rdf.tablePrefix)po,(rdf.tablePrefix)osp
-- ac.* : Accumulo connection parameters
-- rdf.format : See RDFFormat from RDF4J, samples include (Trig, N-Triples, 
RDF/XML)
-- sc.use_freetext, sc.use_geo, sc.use_temporal, sc.use_entity : If any of 
these are set to true, statements will also be
+- **rdf.tablePrefix** - The tables (spo, po, osp) are prefixed with this 
qualifier.
+The tables become: 
(rdf.tablePrefix)spo,(rdf.tablePrefix)po,(rdf.tablePrefix)osp
+- **ac.*** - Accumulo connection parameters
+- **rdf.format** - See RDFFormat from RDF4J, samples include (Trig, 
N-Triples, RDF/XML)
+- **sc.use_freetext, sc.use_geo, sc.use_temporal, sc.use_entity** - If any 
of these are set to true, statements will also be
 added to the enabled secondary indices.
-- sc.freetext.predicates, sc.geo.predicates, sc.temporal.predicates: If 
the associated indexer is enabled, these options specify
+- **sc.freetext.predicates, sc.geo.predicates, sc.temporal.predicates** - 
If the associated indexer is enabled, these options specify
 which statements should be sent to that indexer (based on the 
predicate). If not given, all indexers will attempt to index
 all statements.
 
-The argument is the directory/file to load. This file needs to be loaded 
into HDFS before running. If loading a directory, all files should have the 
same RDF
-format.
+The positional argument is a comma separated list of directories/files to 
load.
+They need to be loaded into HDFS before running. If loading a directory,
+all files should have the same RDF format.
+
+Once the data is loaded, it is actually a good practice to compact your 
tables.
+You can do this by opening the accumulo shell shell and running the compact
+command on the generated tables. Remember the generated tables will be
+prefixed by the rdf.tablePrefix property you assigned above.
+The default tablePrefix is `rts`.
--- End diff --

"rts" is an unexpected default for table prefix. I think it's "rya_" 
everywhere else in the application. Is this the value this tool was already 
using?


---


[GitHub] incubator-rya issue #298: RYA-497 Make the Rya Accumulo Kafka Connect Sink b...

2018-06-04 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/298
  
Bump.


---


[GitHub] incubator-rya issue #298: RYA-497 Make the Rya Accumulo Kafka Connect Sink b...

2018-05-31 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/298
  
Could somebody pull this in?


---


[GitHub] incubator-rya pull request #298: RYA-497 Make the Rya Accumulo Kafka Connect...

2018-05-22 Thread kchilton2
GitHub user kchilton2 opened a pull request:

https://github.com/apache/incubator-rya/pull/298

RYA-497 Make the Rya Accumulo Kafka Connect Sink batch write instead …

…of flushing after every Statement.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya RYA-497

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/298.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #298


commit 7e0862a4452bab889c0680c3a7c63f104d898f7e
Author: kchilton2 
Date:   2018-05-22T22:02:49Z

RYA-497 Make the Rya Accumulo Kafka Connect Sink batch write instead of 
flushing after every Statement.
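
The difference between the two strategies can be sketched as follows. This
is an illustration only, not the code in the commit: `CountingWriter` and
`SinkSketch` are hypothetical, and strings stand in for Statements.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical writer that counts flushes so the strategies can be compared.
class CountingWriter {
    final List<String> written = new ArrayList<>();
    int flushes = 0;

    void write(final String statement) {
        written.add(statement);
    }

    void flush() {
        flushes++;
    }
}

class SinkSketch {
    // Old behavior: one flush per statement.
    static void putFlushingEach(final CountingWriter writer, final Collection<String> statements) {
        for (final String statement : statements) {
            writer.write(statement);
            writer.flush();
        }
    }

    // New behavior: write the whole batch, then flush once.
    static void putBatched(final CountingWriter writer, final Collection<String> statements) {
        for (final String statement : statements) {
            writer.write(statement);
        }
        writer.flush();
    }
}
```

For a batch of N statements the first method flushes N times and the second
flushes once, which is where the savings would come from.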




---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r188060281
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java
 ---
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandler;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio 
Binary format serialized
+ * set of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements 
Deserializer<Set<Statement>> {
+private static final Logger log = 
LoggerFactory.getLogger(StatementsDeserializer.class);
+
+private static final BinaryRDFParserFactory PARSER_FACTORY = new 
BinaryRDFParserFactory();
+
+@Override
+public void configure(final Map<String, ?> configs, final boolean 
isKey) {
+// Nothing to do.
+}
+
+@Override
+public Set<Statement> deserialize(final String topic, final byte[] 
data) {
+if(data == null || data.length == 0) {
+// Return null because that is the contract of this method.
+return null;
+}
+
+try {
+final RDFParser parser = PARSER_FACTORY.getParser();
+final Set<Statement> statements = new HashSet<>();
+
+parser.setRDFHandler(new RDFHandler() {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187742000
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link 
Statement}s
+ * using the RDF4J Rio Binary format.
--- End diff --

I mean that's true, but I don't think it's worth documenting.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187736288
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when 
creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConnector extends RyaSinkConnector {
+
+@Nullable
+private MongoRyaSinkConfig config = null;
+
+@Override
+public void start(final Map<String, String> props) {
+this.config = new MongoRyaSinkConfig( props );
+}
+
+@Override
+protected AbstractConfig getConfig() {
+if(config == null) {
+throw new IllegalStateException("The configuration has not 
been set yet. Invoke start(props) first.");
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187736164
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java
 ---
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Connect configuration that is used to configure {@link 
MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConfig extends RyaSinkConfig {
+
+public static final String HOSTNAME = "mongo.hostname";
+private static final String HOSTNAME_DOC = "The Mongo DB hostname the 
Sail connections wlll use.";
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187736039
  
--- Diff: 
extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java
 ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.test.mongo.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoRyaSinkTask}.
+ */
+public class MongoRyaSinkTaskIT extends MongoITBase {
+
+@Test
+public void instanceExists() throws Exception {
+// Install an instance of Rya.
+final String ryaInstanceName = "rya";
+final MongoConnectionDetails connectionDetails = new 
MongoConnectionDetails(
+super.getMongoHostname(),
+super.getMongoPort(),
+Optional.empty(),
+Optional.empty());
+
+final InstallConfiguration installConfig = 
InstallConfiguration.builder()
+.setEnableTableHashPrefix(false)
+.setEnableEntityCentricIndex(false)
+.setEnableFreeTextIndex(false)
+.setEnableTemporalIndex(false)
+.setEnablePcjIndex(false)
+.setEnableGeoIndex(false)
+.build();
+
+final RyaClient ryaClient = 
MongoRyaClientFactory.build(connectionDetails, super.getMongoClient());
+ryaClient.getInstall().install(ryaInstanceName, installConfig);
+
+// Create the task that will be tested.
+final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+try {
+// Configure the task to use the embedded Mongo DB instance 
for Rya.
+final Map<String, String> config = new HashMap<>();
+config.put(MongoRyaSinkConfig.HOSTNAME, 
super.getMongoHostname());
+config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya");
+
+// This will pass because the Rya instance exists.
+task.start(config);
+} finally {
+task.stop();
+}
+}
+
+@Test(expected = ConnectException.class)
+public void instanceDoesNotExist() throws Exception {
+// Create the task that will be tested.
+final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+try {
+// Configure the task to use the embedded Mongo DB instance 
for Rya.
+final Map<String, String> config = new HashMap<>();
+config.put(MongoRyaSinkConfig.HOSTNAME, 
super.getMongoHostname());
+config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, 
"instance-does-not-exist");
+
+// Starting the task will fail because the Rya instance does 
not exist.
+task.start(config);
+} finally {
+task.stop();
+}
+}
+
+// TODO show that inserts using visibilities work.
--- End diff --

Oh yea, we're not supporting that. Lemme delete that comment. 


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187735910
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to 
Rya.
+ * 
+ * Implementations of this class only need to specify functionality that 
is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+private static final Logger log = 
LoggerFactory.getLogger(RyaSinkTask.class);
+
+@Nullable
+private SailRepository sailRepo = null;
+
+@Nullable
+private SailRepositoryConnection conn = null;
+
+/**
+ * Throws an exception if the configured Rya Instance is not already 
installed
+ * within the configured database.
+ *
+ * @param taskConfig - The configuration values that were provided to 
the task. (not null)
+ * @throws ConnectException The configured Rya Instance is not 
installed to the configured database
+ *   or we were unable to figure out if it is installed.
+ */
+protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+/**
+ * Creates an initialized {@link Sail} object that may be used to 
write {@link Statement}s to the configured
+ * Rya Instance.
+ *
+ * @param taskConfig - Configures how the Sail object will be created. 
(not null)
+ * @return The created Sail object.
+ * @throws ConnectException The Sail object could not be made.
+ */
+protected abstract Sail makeSail(final Map<String, String> taskConfig) 
throws ConnectException;
+
+@Override
+public String version() {
+return Manifests.exists("Build-Version") ? 
Manifests.read("Build-Version"): "UNKNOWN";
+}
+
+@Override
+public void start(final Map<String, String> props) throws 
ConnectException {
+requireNonNull(props);
+
+// Ensure the configured Rya Instance is installed within the 
configured database.
+checkRyaInstanceExists(props);
+
+// Create the Sail object that is connected to the Rya Instance.
+final Sail sail = makeSail(props);
+sailRepo = new SailRepository( sail );
+conn = sailRepo.getConnection();
+}
+
+@Override
+public void put(final Collection<SinkRecord> records) {
+requireNonNull(records);
+
+// Return immediately if there are no records to handle.
+if(records.isEmpty()) {
+return;
+}
+
+// If a transaction has not been started yet, then start one.
+if(!conn.isActive()) {
+conn.begin();
+}
+
   

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187735067
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to 
Rya.
+ * 
+ * Implementations of this class only need to specify functionality that 
is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --

"/**
 * Indicates that all members of the class or package should be annotated 
with the default value of the supplied
 * annotation class. This would be used for behavior annotations such as 
@NonNull, @CheckForNull,
 * or @CheckReturnValue. In particular, you can use 
@DefaultAnnotation(NonNull.class) on a class or package,
 * and then use @Nullable only on those parameters, methods or fields that 
you want to allow to be null.
 */"


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187734692
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when 
creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkConnector extends RyaSinkConnector {
+
+@Nullable
+private AccumuloRyaSinkConfig config = null;
+
+@Override
+public void start(final Map<String, String> props) {
+this.config = new AccumuloRyaSinkConfig( props );
+}
+
+@Override
+protected AbstractConfig getConfig() {
+if(config == null) {
+throw new IllegalStateException("The configuration has not 
been set yet. Invoke start(props) first.");
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187734887
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to 
Rya.
+ * 
+ * Implementations of this class only need to specify functionality that 
is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --

It indicates that, by default, parameters are treated as non-null.
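
As a rough illustration of that convention, here is a minimal sketch. It assumes two locally declared stand-in annotations, `NonNullByDefault` and `MayBeNull`, plus a hypothetical `GreeterSketch` class, so that it compiles without the FindBugs annotations jar; in the real code the equivalents are `@DefaultAnnotation(NonNull.class)` and `@Nullable`. The annotations only document intent for static analysis, so the method still checks its non-null parameter at runtime.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Objects;

// Hypothetical stand-in for @DefaultAnnotation(NonNull.class): everything in
// the annotated class is assumed non-null unless marked otherwise.
@Retention(RetentionPolicy.CLASS)
@Target(ElementType.TYPE)
@interface NonNullByDefault {}

// Hypothetical stand-in for @Nullable: opts a single element out of the default.
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@interface MayBeNull {}

@NonNullByDefault
class GreeterSketch {

    // 'first' is non-null by default; 'last' is explicitly allowed to be null.
    String greet(final String first, @MayBeNull final String last) {
        // The annotation is not enforced at runtime, so check explicitly.
        Objects.requireNonNull(first);
        return last == null ? "Hello " + first : "Hello " + first + " " + last;
    }
}
```

Only the elements that may legitimately be null need an annotation; everything else inherits the class-level default.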


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187734448
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link 
Statement}s
+ * using the RDF4J Rio Binary format.
--- End diff --

I think stating that the serializer uses the RDF4J Rio Binary format is more 
useful than describing how it does so, since the code already shows that.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187734176
  
--- Diff: 
extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java
 ---
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
+ */
+public class AccumuloRyaSinkConfigTest {
+
+@Test
+public void parses() {
--- End diff --

Could you give an example of a malformed field? Do you just mean fields 
that are not part of the schema? That's not illegal. They just get ignored.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187733815
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to 
store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map 
taskConfig) throws ConnectException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final AccumuloRyaSinkConfig config = new 
AccumuloRyaSinkConfig(taskConfig);
+
+// Connect to the instance of Accumulo.
+final Connector connector;
+try {
+final Instance instance = new 
ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+connector = instance.getConnector(config.getUsername(), new 
PasswordToken( config.getPassword() ));
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new ConnectException("Could not create a Connector to 
the configured Accumulo instance.", e);
+}
+
+// Use a RyaClient to see if the configured instance exists.
+try {
+final AccumuloConnectionDetails connectionDetails = new 
AccumuloConnectionDetails(
--- End diff --

That's true. Feel free to write an improvement ticket for that.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187733573
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when 
creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --

That field is nullable because this is a stateful object, but the 
parameters into the start(...) function may not be null. I'll add a null check 
there.
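
Roughly, the pattern looks like the sketch below. It uses a hypothetical `StatefulConnectorSketch` class with a plain `Map` standing in for the real config object, so it is an illustration of the idea rather than the actual connector code: the field starts out null, `start(...)` rejects a null argument, and the getter fails fast if `start(...)` has not been called yet.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a stateful connector; not the real AccumuloRyaSinkConnector.
class StatefulConnectorSketch {

    // Null until start(...) has been invoked, which is why the field is nullable.
    private Map<String, String> config = null;

    public void start(final Map<String, String> props) {
        // The parameter itself may never be null.
        Objects.requireNonNull(props);
        this.config = new HashMap<>(props);
    }

    public Map<String, String> getConfig() {
        // Fail fast when the object is used before it has been started.
        if (config == null) {
            throw new IllegalStateException(
                    "The configuration has not been set yet. Invoke start(props) first.");
        }
        return config;
    }
}
```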


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187733216
  
--- Diff: extras/kafka.connect/README.md ---
@@ -0,0 +1,22 @@
+
+
+The parent project for all Rya Kafka Connect work. All projects thare are 
part 
+of that system must use this project's pom as their parent pom.
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187732940
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java 
---
@@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final 
Configuration conf) {
 return Optional.fromNullable(conf.get(FLUO_APP_NAME));
 }
 
+public static void setUseMongo(final Configuration conf, final boolean 
useMongo) {
--- End diff --

I don't want to change how our configuration objects are initialized within 
the scope of this ticket.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187732007
  
--- Diff: 
dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
 ---
@@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
 super();
 }
 
-public AccumuloRdfConfiguration(Configuration other) {
+public AccumuloRdfConfiguration(final Configuration other) {
 super(other);
 }
 
-public AccumuloRdfConfigurationBuilder getBuilder() {
+public static AccumuloRdfConfigurationBuilder getBuilder() {
--- End diff --

For me it is. I don't really want to refactor the entire method name in 
this review, though. I just needed it to be static so that I could use it.
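
To show why the static modifier matters, here is a small sketch. `ConfigSketch`, its nested `Builder`, and `setInstanceName` are hypothetical stand-ins, not the real `AccumuloRdfConfiguration` API: with a static `getBuilder()`, a caller can obtain a builder without first constructing a throwaway configuration instance.

```java
// Hypothetical stand-in for a configuration class with a static builder accessor.
class ConfigSketch {
    private final String instanceName;

    private ConfigSketch(final String instanceName) {
        this.instanceName = instanceName;
    }

    public String getInstanceName() {
        return instanceName;
    }

    // Static, so no ConfigSketch instance is needed just to get a builder.
    public static Builder getBuilder() {
        return new Builder();
    }

    static class Builder {
        private String instanceName = "";

        public Builder setInstanceName(final String instanceName) {
            this.instanceName = instanceName;
            return this;
        }

        public ConfigSketch build() {
            return new ConfigSketch(instanceName);
        }
    }
}
```

A caller can then write `ConfigSketch.getBuilder().setInstanceName("rya_").build()` directly, where an instance method would have required something like `new ConfigSketch(...).getBuilder()` first.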


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731624
  
--- Diff: 
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
 ---
@@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() {
  * on their child subtrees.
  * @param value whether to use aggregation pipeline optimization.
  */
-public void setUseAggregationPipeline(boolean value) {
+public void setUseAggregationPipeline(final boolean value) {
 setBoolean(USE_AGGREGATION_PIPELINE, value);
 }
 
 @Override
 public List> getOptimizers() {
-List> optimizers = super.getOptimizers();
+final List> optimizers = 
super.getOptimizers();
 if (getUseAggregationPipeline()) {
-Class cl = AggregationPipelineQueryOptimizer.class;
+final Class cl = AggregationPipelineQueryOptimizer.class;
 @SuppressWarnings("unchecked")
+final
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731454
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
 ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to 
store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map 
taskConfig) throws IllegalStateException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final MongoRyaSinkConfig config = new 
MongoRyaSinkConfig(taskConfig);
+@Nullable
+final String username = 
Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+@Nullable
+final char[] password = 
Strings.isNullOrEmpty(config.getPassword()) ? null : 
config.getPassword().toCharArray();
+
+// Connect a Mongo Client to the configured Mongo DB instance.
+final ServerAddress serverAddr = new 
ServerAddress(config.getHostname(), config.getPort());
+final boolean hasCredentials = username != null && password != 
null;
+
+try(MongoClient mongoClient = hasCredentials ?
+new MongoClient(serverAddr, 
Arrays.asList(MongoCredential.createCredential(username, 
config.getRyaInstanceName(), password))) :
+new MongoClient(serverAddr)) {
+// Use a RyaClient to see if the configured instance exists.
+// Create the Mongo Connection Details that describe the Mongo 
DB Server we are interacting with.
+final MongoConnectionDetails connectionDetails = new 
MongoConnectionDetails(
+config.getHostname(),
+config.getPort(),
+Optional.ofNullable(username),
+Optional.ofNullable(password));
+
+final RyaClient client = 
MongoRyaClientFactory.build(connectionDetails, mongoClient);
+if(!client.getInstanceExists().exists( 
config.getRyaInstanceName() )) {
+throw new ConnectException("The Rya Instance named " +
+config.getRyaInstanceName() + " has not been 
installed.");
+}
+} catch(final RyaClientException e) {
+throw new ConnectException

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731377
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
 ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to 
store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map 
taskConfig) throws IllegalStateException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final MongoRyaSinkConfig config = new 
MongoRyaSinkConfig(taskConfig);
+@Nullable
+final String username = 
Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+@Nullable
+final char[] password = 
Strings.isNullOrEmpty(config.getPassword()) ? null : 
config.getPassword().toCharArray();
+
+// Connect a Mongo Client to the configured Mongo DB instance.
+final ServerAddress serverAddr = new 
ServerAddress(config.getHostname(), config.getPort());
+final boolean hasCredentials = username != null && password != 
null;
+
+try(MongoClient mongoClient = hasCredentials ?
+new MongoClient(serverAddr, 
Arrays.asList(MongoCredential.createCredential(username, 
config.getRyaInstanceName(), password))) :
+new MongoClient(serverAddr)) {
+// Use a RyaClient to see if the configured instance exists.
+// Create the Mongo Connection Details that describe the Mongo 
DB Server we are interacting with.
+final MongoConnectionDetails connectionDetails = new 
MongoConnectionDetails(
+config.getHostname(),
+config.getPort(),
+Optional.ofNullable(username),
+Optional.ofNullable(password));
+
+final RyaClient client = 
MongoRyaClientFactory.build(connectionDetails, mongoClient);
+if(!client.getInstanceExists().exists( 
config.getRyaInstanceName() )) {
+throw new ConnectException("The Rya Instance named " +
+config.getRyaInstanceName() + " has not been 
installed.");
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731146
  
--- Diff: 
extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java
 ---
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.kafka.connect.api.StatementsSerializer;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
+import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.Rio;
+import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
+import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect 
Sink format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class WriteStatementsCommand implements RyaKafkaClientCommand {
+private static final Logger log = 
LoggerFactory.getLogger(WriteStatementsCommand.class);
+
+/**
+ * Command line parameters that are used by this command to configure 
itself.
+ */
+public static class WriteParameters extends KafkaParameters {
+@Parameter(names = {"--statementsFile", "-f"}, required = true, 
description = "The file of RDF statements to load into Rya Streams.")
+public String statementsFile;
+}
+
+@Override
+public String getCommand() {
+return "write";
+}
+
+@Override
+public String getDescription() {
+return "Writes Statements to the specified Kafka topic.";
+}
+
+@Override
+public boolean validArguments(final String[] args) {
+boolean valid = true;
+try {
+new JCommander(new WriteParameters(), args);
+} catch(final ParameterException e) {
+valid = false;
+}
+return valid;
+}
+
+/**
+ * @return Describes what arguments may be provided to the command.
+ */
+@Override
+public String getUsage() {
+final JCommander parser = new JCommander(new WriteParameters());
+
+final StringBuilder usage = new StringBuilder();
+parser.usage(usage);
+return usage.toString();
+}
+
+@Override
+public void execute(final String[] args) throws ArgumentsException, 
ExecutionException {
+requireNonNull(args);
+
+// Parse the command line arguments.
+final WriteParameters params = new WriteParameters();
+try {
+new JCommander(params, args);
+} catch(final ParameterException e) {
+ 

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731028
  
--- Diff: 
extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java
 ---
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import 
org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException;
+import 
org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException;
+import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand;
+import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand;
+import org.eclipse.rdf4j.model.Statement;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic 
using the format
+ * the Rya Kafka Connect Sinks expect.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CLIDriver {
+
+/**
+ * Maps from command strings to the object that performs the command.
+ */
+private static final ImmutableMap<String, RyaKafkaClientCommand> COMMANDS;
+static {
+final Set<Class<? extends RyaKafkaClientCommand>> commandClasses = new HashSet<>();
+commandClasses.add(ReadStatementsCommand.class);
+commandClasses.add(WriteStatementsCommand.class);
+final ImmutableMap.Builder<String, RyaKafkaClientCommand> builder = ImmutableMap.builder();
+for(final Class<? extends RyaKafkaClientCommand> commandClass : commandClasses) {
+try {
+final RyaKafkaClientCommand command = 
commandClass.newInstance();
+builder.put(command.getCommand(), command);
+} catch (InstantiationException | IllegalAccessException e) {
+System.err.println("Could not run the application because 
a RyaKafkaClientCommand is missing its empty constructor.");
+e.printStackTrace();
+}
+}
+COMMANDS = builder.build();
+}
+
+private static final String USAGE = makeUsage(COMMANDS);
+
+public static void main(final String[] args) {
+// If no command provided or the command isn't recognized, then 
print the usage.
+if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
+System.out.println(USAGE);
+System.exit(1);
+}
+
+// Fetch the command that will be executed.
+final String command = args[0];
+final String[] commandArgs = Arrays.copyOfRange(args, 1, 
args.length);
+final RyaKafkaClientCommand clientCommand = COMMANDS.get(command);
+
+// Print usage if the arguments are invalid for the command.
+if(!clientCommand.validArguments(commandArgs)) {
+System.out.println(clientCommand.getUsage());
+System.exit(1);
+}
+
+// Execute the command.
+try {
--- End diff --

Done.
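
For readers following the thread, the dispatch pattern in the CLIDriver diff above can be sketched with only the JDK. This is a minimal illustration, not the Rya code: `SketchCommand`, `EchoCommand`, and `CommandDispatchSketch` are hypothetical names, the interface is a cut-down stand-in for `RyaKafkaClientCommand`, and `dispatch` returns a string instead of calling `System.exit` so the flow is easy to exercise.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for RyaKafkaClientCommand; only what dispatch needs. */
interface SketchCommand {
    String getCommand();
    boolean validArguments(String[] args);
    String execute(String[] args);
}

final class CommandDispatchSketch {
    /** Maps from command strings to the object that performs the command. */
    private static final Map<String, SketchCommand> COMMANDS = new LinkedHashMap<>();
    static {
        register(new EchoCommand("read"));
        register(new EchoCommand("write"));
    }

    private static void register(final SketchCommand command) {
        COMMANDS.put(command.getCommand(), command);
    }

    /** Mirrors the main() flow: unknown command -> usage, bad args -> error, else execute. */
    static String dispatch(final String[] args) {
        if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
            return "usage: " + COMMANDS.keySet();
        }
        final SketchCommand command = COMMANDS.get(args[0]);
        final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);
        if (!command.validArguments(commandArgs)) {
            return "invalid arguments for " + args[0];
        }
        return command.execute(commandArgs);
    }

    /** Assumed toy command: valid when at least one argument is present. */
    private static final class EchoCommand implements SketchCommand {
        private final String name;
        EchoCommand(final String name) { this.name = name; }
        @Override public String getCommand() { return name; }
        @Override public boolean validArguments(final String[] args) { return args.length > 0; }
        @Override public String execute(final String[] args) { return name + " " + String.join(",", args); }
    }
}
```

Registering instances directly, as assumed here, sidesteps the reflective `newInstance()` call and its missing-constructor failure mode; whether that trade-off suits the real driver is a separate question.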


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187730865
  
--- Diff: extras/kafka.connect/client/pom.xml ---
@@ -0,0 +1,135 @@
+
+
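+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<modelVersion>4.0.0</modelVersion>
+
+<parent>
+<groupId>org.apache.rya</groupId>
+<artifactId>rya.kafka.connect.parent</artifactId>
+<version>4.0.0-incubating-SNAPSHOT</version>
+</parent>
+
+<artifactId>rya.kafka.connect.client</artifactId>
+
+<name>Apache Rya Kafka Connect - Client</name>
+<description>Contains a client that may be used to load Statements into a Kafka topic to be read by Kafka Connect.</description>
+
+<dependencies>
+
+<dependency>
+<groupId>org.apache.rya</groupId>
+<artifactId>rya.sail</artifactId>
+</dependency>
+<dependency>
+<groupId>org.apache.rya</groupId>
+<artifactId>rya.kafka.connect.api</artifactId>
+</dependency>
+
+
+<dependency>
+<groupId>org.eclipse.rdf4j</groupId>
+<artifactId>rdf4j-model</artifactId>
+</dependency>
+<dependency>
+<groupId>com.google.guava</groupId>
+<artifactId>guava</artifactId>
+</dependency>
+<dependency>
+<groupId>com.beust</groupId>
+<artifactId>jcommander</artifactId>
+</dependency>
+<dependency>
+<groupId>com.github.stephenc.findbugs</groupId>
+<artifactId>findbugs-annotations</artifactId>
+</dependency>
+<dependency>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-clients</artifactId>
+</dependency>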
+
+
--- End diff --

It does, good call. Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187730515
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to 
Rya.
+ * 
+ * Implementations of this class only need to specify functionality that 
is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+private static final Logger log = 
LoggerFactory.getLogger(RyaSinkTask.class);
+
+@Nullable
+private SailRepository sailRepo = null;
+
+@Nullable
+private SailRepositoryConnection conn = null;
+
+/**
+ * Throws an exception if the configured Rya Instance is not already 
installed
+ * within the configured database.
+ *
+ * @param taskConfig - The configuration values that were provided to 
the task. (not null)
+ * @throws ConnectException The configured Rya Instance is not 
installed to the configured database
+ *   or we were unable to figure out if it is installed.
+ */
+protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+/**
+ * Creates an initialized {@link Sail} object that may be used to 
write {@link Statement}s to the configured
+ * Rya Instance.
+ *
+ * @param taskConfig - Configures how the Sail object will be created. 
(not null)
+ * @return The created Sail object.
+ * @throws ConnectException The Sail object could not be made.
+ */
+protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+@Override
+public String version() {
+return Manifests.exists("Build-Version") ? 
Manifests.read("Build-Version"): "UNKNOWN";
+}
+
+@Override
+public void start(final Map<String, String> props) throws ConnectException {
+requireNonNull(props);
+
+// Ensure the configured Rya Instance is installed within the 
configured database.
+checkRyaInstanceExists(props);
+
+// Create the Sail object that is connected to the Rya Instance.
+final Sail sail = makeSail(props);
+sailRepo = new SailRepository( sail );
+conn = sailRepo.getConnection();
+}
+
+@Override
+public void put(final Collection<SinkRecord> records) {
+requireNonNull(records);
+
+// Return immediately if there are no records to handle.
+if(records.isEmpty()) {
+return;
+}
+
+// If a transaction has not been started yet, then start one.
+if(!conn.isActive()) {
+conn.begin();
+}
+
   

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187729976
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link 
Statement}s
+ * using the RDF4J Rio Binary format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsSerializer implements Serializer<Set<Statement>> {
+private static final Logger log = 
LoggerFactory.getLogger(StatementsSerializer.class);
+
+private static final BinaryRDFWriterFactory WRITER_FACTORY = new 
BinaryRDFWriterFactory();
+
+@Override
+public void configure(final Map<String, ?> configs, final boolean isKey) {
+// Nothing to do.
+}
+
+@Override
+public byte[] serialize(final String topic, final Set<Statement> data) {
+if(data == null) {
+// Returning null because that is the contract of this method.
+return null;
+}
+
+// Write the statements using a Binary RDF Writer.
+final ByteArrayOutputStream boas = new ByteArrayOutputStream();
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187730101
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java
 ---
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Handles the common components required to task {@link RyaSinkTask}s 
that write to Rya.
+ * 
+ * Implementations of this class only need to specify functionality that 
is specific to the Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkConnector extends SinkConnector {
+
+/**
+ * Get the configuration that will be provided to the tasks when 
{@link #taskConfigs(int)} is invoked.
+ * 
+ * Only called after start has been invoked
+ *
+ * @return The configuration object for the connector.
+ * @throws IllegalStateException Thrown if {@link 
SinkConnector#start(Map)} has not been invoked yet.
+ */
+protected abstract AbstractConfig getConfig() throws 
IllegalStateException;
+
+@Override
+public String version() {
+return Manifests.exists("Build-Version") ? 
Manifests.read("Build-Version") : "UNKNOWN";
+}
+
+@Override
+public List<Map<String, String>> taskConfigs(final int maxTasks) {
+final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
+for(int i = 0; i < maxTasks; i++) {
+configs.add( getConfig().originalsStrings() );
+}
+return configs;
+}
+
+@Override
+public void stop() {
+// Nothing to do since the RyaSinkconnector has no background 
monitoring.
--- End diff --

Done.
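
As a side note on the `taskConfigs(int)` method in the diff above, the fan-out it performs can be sketched with plain JDK collections. This is a hedged illustration under stated assumptions: `TaskConfigsSketch` is a hypothetical name, the connector configuration is modelled as a plain `Map<String, String>` instead of Kafka's `AbstractConfig`, and each task receives its own copy of the map (the diff adds whatever `originalsStrings()` returns on each iteration).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TaskConfigsSketch {
    /**
     * Every task gets the same settings as the connector, so the connector
     * configuration is simply repeated maxTasks times.
     */
    static List<Map<String, String>> taskConfigs(final Map<String, String> connectorConfig, final int maxTasks) {
        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            // Copy so that one task mutating its map cannot affect another (an assumption of this sketch).
            configs.add(new HashMap<>(connectorConfig));
        }
        return configs;
    }
}
```

Because the sink tasks do not partition work by configuration, identical maps are enough; Kafka Connect assigns topic partitions to the tasks itself.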


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187729813
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java
 ---
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandler;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio 
Binary format serialized
+ * set of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements Deserializer<Set<Statement>> {
+private static final Logger log = 
LoggerFactory.getLogger(StatementsDeserializer.class);
+
+private static final BinaryRDFParserFactory PARSER_FACTORY = new 
BinaryRDFParserFactory();
+
+@Override
+public void configure(final Map<String, ?> configs, final boolean isKey) {
+// Nothing to do.
+}
+
+@Override
+public Set<Statement> deserialize(final String topic, final byte[] data) {
+if(data == null || data.length == 0) {
+// Return null because that is the contract of this method.
+return null;
+}
+
+try {
+final RDFParser parser = PARSER_FACTORY.getParser();
+final Set<Statement> statements = new HashSet<>();
+
+parser.setRDFHandler(new RDFHandler() {
+@Override
+public void handleStatement(final Statement statement) 
throws RDFHandlerException {
+log.debug("Statement: " + statement);
+statements.add( statement );
+}
+
+@Override public void startRDF() throws 
RDFHandlerException { }
+@Override public void handleNamespace(final String arg0, 
final String arg1) throws RDFHandlerException { }
+@Override public void handleComment(final String arg0) 
throws RDFHandlerException { }
+@Override public void endRDF() throws RDFHandlerException 
{ }
+});
+
+parser.parse(new ByteArrayInputStream(data), null);
+return statements;
+
+} catch(final RDFParseException | RDFHandlerException | 
IOException e) {
+log.error("Could not deserialize a Set of VisibilityStatement 
objects using the RDF4J Rio Binray format.", e);
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187729543
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to 
store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final AccumuloRyaSinkConfig config = new 
AccumuloRyaSinkConfig(taskConfig);
+
+// Connect to the instance of Accumulo.
+final Connector connector;
+try {
+final Instance instance = new 
ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+connector = instance.getConnector(config.getUsername(), new 
PasswordToken( config.getPassword() ));
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new ConnectException("Could not create a Connector to 
the configured Accumulo instance.", e);
+}
+
+// Use a RyaClient to see if the configured instance exists.
+try {
+final AccumuloConnectionDetails connectionDetails = new 
AccumuloConnectionDetails(
+config.getUsername(),
+config.getPassword().toCharArray(),
+config.getClusterName(),
+config.getZookeepers());
+final RyaClient client = 
AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+if(!client.getInstanceExists().exists( 
config.getRyaInstanceName() )) {
+throw new ConnectException("The Rya Instance named " +
+config.getRyaInstanceName() + " has not been 
installed.");
+}
+
+} catch (final RyaClientException e) {
+throw new ConnectException("Unable to determine if the Rya 
Instance named " +
+config.getRyaInstanceName() + " has been installed.", 
e);
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187729286
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to 
store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final AccumuloRyaSinkConfig config = new 
AccumuloRyaSinkConfig(taskConfig);
+
+// Connect to the instance of Accumulo.
+final Connector connector;
+try {
+final Instance instance = new 
ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+connector = instance.getConnector(config.getUsername(), new 
PasswordToken( config.getPassword() ));
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new ConnectException("Could not create a Connector to 
the configured Accumulo instance.", e);
+}
+
+// Use a RyaClient to see if the configured instance exists.
+try {
+final AccumuloConnectionDetails connectionDetails = new 
AccumuloConnectionDetails(
+config.getUsername(),
+config.getPassword().toCharArray(),
+config.getClusterName(),
+config.getZookeepers());
+final RyaClient client = 
AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+if(!client.getInstanceExists().exists( 
config.getRyaInstanceName() )) {
+throw new ConnectException("The Rya Instance named " +
+config.getRyaInstanceName() + " has not been 
installed.");
--- End diff --

I'm assuming rya.api. Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187729157
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to 
store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final AccumuloRyaSinkConfig config = new 
AccumuloRyaSinkConfig(taskConfig);
+
+// Connect to the instance of Accumulo.
+final Connector connector;
+try {
+final Instance instance = new 
ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+connector = instance.getConnector(config.getUsername(), new 
PasswordToken( config.getPassword() ));
+} catch (AccumuloException | AccumuloSecurityException e) {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187728842
  
--- Diff: 
extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java
 ---
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link AccumuloRyaSinkTask}.
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187727065
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
+            }
+
+        } catch (final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya Instance named " +
+                    config.getRyaInstanceName() + " has been installed.", e);
+        }
+    }
+
+    @Override
+    protected Sail makeSail(final Map taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        fina

[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/295#discussion_r187725947
  
--- Diff: 
extras/shell/src/test/java/org/apache/rya/shell/AccumuloRyaCommandsIT.java ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.shell.util.InstallPrompt;
+import org.apache.rya.shell.util.PasswordPrompt;
+import org.apache.rya.shell.util.SparqlPrompt;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.shell.Bootstrap;
+import org.springframework.shell.core.CommandResult;
+import org.springframework.shell.core.JLineShellComponent;
+
+import com.google.common.base.Optional;
+
+/**
+ * Integration tests the methods of {@link RyaCommands}.
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/295#discussion_r187725865
  
--- Diff: 
extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java
 ---
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.BindingSet;
+import org.eclipse.rdf4j.query.TupleQueryResult;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the methods of {@link }.
+ */
+public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase {
+
+    @Test
+    public void queryFindsAllLoadedStatements_fromSet() throws Exception {
+        // Using the Rya Client, install an instance of Rya for the test.
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                getUsername(),
+                getPassword().toCharArray(),
+                getInstanceName(),
+                getZookeepers());
+
+        final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector());
+
+        final String ryaInstance = UUID.randomUUID().toString().replace('-', '_');
+        client.getInstall().install(ryaInstance, InstallConfiguration.builder().build());
+
+        // Load some data into the instance.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+        final Set statements = Sets.newHashSet(
+                vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")),
+                vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")),
+                vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")),
+                vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")),
+                vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")),
+                vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob")));
+        client.getLoadStatements().loadStatements(ryaInstance, statements);
+
+        // Execute a query.
+        final Set fetched = new HashSet<>();
+        try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) {
+            while(result.hasNext()) {
+                final BindingSet bs = result.next();
+
+                // If this is the statement that indicates the Rya version
+                if(bs.getBinding("p").getValue().equals(vf.createIRI("urn:org.apache.rya/2012/05#version"))) {
+                    continue;
+                }
+
+                // Otherwise add it to the list of fetched statements.
+                fetched.add( vf.createStatement(
+                        (Resource

[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/295#discussion_r187725795
  
--- Diff: 
extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java
 ---
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.BindingSet;
+import org.eclipse.rdf4j.query.TupleQueryResult;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the methods of {@link }.
+ */
+public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase {
+
+    @Test
+    public void queryFindsAllLoadedStatements_fromSet() throws Exception {
+        // Using the Rya Client, install an instance of Rya for the test.
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                getUsername(),
+                getPassword().toCharArray(),
+                getInstanceName(),
+                getZookeepers());
+
+        final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector());
+
+        final String ryaInstance = UUID.randomUUID().toString().replace('-', '_');
+        client.getInstall().install(ryaInstance, InstallConfiguration.builder().build());
+
+        // Load some data into the instance.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+        final Set statements = Sets.newHashSet(
+                vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")),
+                vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")),
+                vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")),
+                vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")),
+                vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")),
+                vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob")));
+        client.getLoadStatements().loadStatements(ryaInstance, statements);
+
+        // Execute a query.
+        final Set fetched = new HashSet<>();
+        try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) {
+            while(result.hasNext()) {
+                final BindingSet bs = result.next();
+
+                // If this is the statement that indicates the Rya version
+                if(bs.getBinding("p").getValue().equals(vf.createIRI("urn:org.apache.rya/2012/05#version"))) {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/295#discussion_r187724995
  
--- Diff: 
extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java
 ---
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.BindingSet;
+import org.eclipse.rdf4j.query.TupleQueryResult;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the methods of {@link }.
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187657223
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
+            }
+
+        } catch (final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya Instance named " +
+                    config.getRyaInstanceName() + " has been installed.", e);
+        }
+    }
+
+    @Override
+    protected Sail makeSail(final Map taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        fina

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187657023
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
 ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map taskConfig) throws IllegalStateException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+        @Nullable
+        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+        @Nullable
+        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+        // Connect a Mongo Client to the configured Mongo DB instance.
+        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+        final boolean hasCredentials = username != null && password != null;
+
+        try(MongoClient mongoClient = hasCredentials ?
+                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+                new MongoClient(serverAddr)) {
+            // Use a RyaClient to see if the configured instance exists.
+            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+                    config.getHostname(),
+                    config.getPort(),
+                    Optional.ofNullable(username),
+                    Optional.ofNullable(password));
+
+            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
+            }
+        } catch(final RyaClientException e) {
+            throw new ConnectException

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-10 Thread kchilton2
GitHub user kchilton2 opened a pull request:

https://github.com/apache/incubator-rya/pull/296

RYA-487 Kafka Connect Sinks

Ignore RYA-494. That's a separate PR that this depended on to create 
documentation for the manual.

This PR implements Kafka Connect Sinks for both Accumulo and MongoDB backed 
Rya. I wrote a manual page that explains design decisions that were made, notes 
about deploying the sinks, a quick start, and future work we may want to 
consider.
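
As a rough illustration of the shape visible in the review diffs for this PR,
where each backend's task overrides an existence check and a factory method,
here is a minimal stand-alone sketch. Everything in it is hypothetical: the
class names, the "instance.name" key, the "demo_instance" value, and the
start-up order are assumptions for illustration, not the real RyaSinkTask or
Kafka Connect API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only; none of these types are the real Rya or Kafka classes.
final class SinkTaskSketch {

    // Stand-in for a shared base task with two backend-specific hooks.
    abstract static class BaseTask {
        private String store;

        // Hook: throw if the configured instance has not been installed.
        protected abstract void checkInstanceExists(Map<String, String> taskConfig);

        // Hook: build whatever the task will write records to.
        protected abstract String makeStore(Map<String, String> taskConfig);

        // Assumed start-up order: validate the instance, then build the store.
        public final void start(final Map<String, String> taskConfig) {
            checkInstanceExists(taskConfig);
            store = makeStore(taskConfig);
        }

        public final String getStore() {
            return store;
        }
    }

    // Toy backend that treats one hard-coded instance name as installed.
    static final class ToyTask extends BaseTask {
        static final String INSTANCE_KEY = "instance.name"; // hypothetical config key

        @Override
        protected void checkInstanceExists(final Map<String, String> taskConfig) {
            final String name = taskConfig.get(INSTANCE_KEY);
            if (!"demo_instance".equals(name)) {
                throw new IllegalStateException("The instance named " + name + " has not been installed.");
            }
        }

        @Override
        protected String makeStore(final Map<String, String> taskConfig) {
            return "store-for-" + taskConfig.get(INSTANCE_KEY);
        }
    }

    public static void main(final String[] args) {
        final Map<String, String> config = new HashMap<>();
        config.put(ToyTask.INSTANCE_KEY, "demo_instance");

        final ToyTask task = new ToyTask();
        task.start(config);
        System.out.println(task.getStore());
    }
}
```

Failing fast in the existence check means a misconfigured task stops at
start-up rather than when the first record arrives.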

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya RYA-487

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/296.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #296


commit ed1def04283a2e893a1c45014628dc7fe10717eb
Author: kchilton2 
Date:   2018-05-08T22:33:24Z

RYA-494 Fixed a bug where the shell was not loading or displaying all 
Statements.

commit c2d802bd51d4d6951aa54e48b4038d64462bd8fd
Author: kchilton2 
Date:   2018-04-17T19:10:26Z

RYA-487 Implement Kafka Connect Sink implementations for Accumulo and Mongo 
DB backed Rya.




---


[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...

2018-05-09 Thread kchilton2
GitHub user kchilton2 opened a pull request:

https://github.com/apache/incubator-rya/pull/295

RYA-494 Fixed a bug where the shell was not loading or displaying all…

… Statements.

This ended up being a display bug. The code that iterated over and displayed 
binding sets was skipping one. I left in a bunch of tests that helped narrow 
down where the bug was.
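
For readers unfamiliar with this class of bug, the sketch below shows one
common way an iterate-and-display loop can drop a result. It is a hypothetical
illustration, not the actual shell code: the class and method names are made
up, and plain strings stand in for binding sets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration; none of these names come from the Rya shell.
final class SkipOneDemo {

    // Buggy: next() is called once to test for emptiness, which consumes
    // the first element without ever displaying it.
    static List<String> displaySkippingOne(final Iterator<String> results) {
        final List<String> shown = new ArrayList<>();
        if (results.hasNext()) {
            results.next(); // consumed here, never shown
        }
        while (results.hasNext()) {
            shown.add(results.next());
        }
        return shown;
    }

    // Fixed: every element returned by next() is displayed.
    static List<String> displayAll(final Iterator<String> results) {
        final List<String> shown = new ArrayList<>();
        while (results.hasNext()) {
            shown.add(results.next());
        }
        return shown;
    }

    public static void main(final String[] args) {
        final List<String> bindingSets = Arrays.asList("s=Alice", "s=Bob", "s=Charlie");
        System.out.println(displaySkippingOne(bindingSets.iterator())); // [s=Bob, s=Charlie]
        System.out.println(displayAll(bindingSets.iterator()));         // [s=Alice, s=Bob, s=Charlie]
    }
}
```

A test that loads a known set of statements and compares it with what the
query returns catches this kind of off-by-one directly.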

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya RYA-494

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #295


commit 942c3420bb14bafca4cde6d0b06a3ef100f07772
Author: kchilton2 
Date:   2018-05-08T22:33:24Z

RYA-494 Fixed a bug where the shell was not loading or displaying all 
Statements.




---


[GitHub] incubator-rya issue #293: RYA-491 Repackaged reusable Mongo DB test code int...

2018-04-25 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/293
  
Updated.


---


[GitHub] incubator-rya pull request #293: RYA-491 Repackaged reusable Mongo DB test c...

2018-04-25 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/293#discussion_r184151510
  
--- Diff: 
dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAO2IT.java ---
@@ -40,11 +40,11 @@
 /**
  * Integration tests the methods of {@link MongoDBRyaDAO}.
  */
-public class MongoDBRyaDAO2IT extends MongoITBase {
+public class MongoDBRyaDAO2IT extends MongoRyaITBase {
--- End diff --

I'm not messing with classes that already exist except to deal with my 
repackaging.


---


[GitHub] incubator-rya pull request #293: RYA-491 Repackaged reusable Mongo DB test c...

2018-04-25 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/293#discussion_r184151567
  
--- Diff: 
dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java
 ---
@@ -423,28 +423,28 @@ public void testRequiredTimestamp() throws Exception {
 + "  ?A rdfs:subClassOf ?B .\n"
 + "  ?B rdfs:subClassOf ?A .\n"
 + "}";
-List varNames = Arrays.asList("A", "B");
-Multiset expectedSolutions = HashMultiset.create();
+final List varNames = Arrays.asList("A", "B");
+final Multiset expectedSolutions = HashMultiset.create();
 expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON));
 expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person));
 expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING));
 expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing));
 // Prepare query and convert to pipeline
-QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr());
-SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(getRyaCollection());
+final QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr());
+final SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(getRyaCollection());
 queryTree.visit(visitor);
 Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode);
-AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg();
+final AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg();
 // Extend the pipeline by requiring a timestamp of zero (should have no effect)
 pipelineNode.requireSourceTimestamp(0);
-Multiset solutions = HashMultiset.create();
+final Multiset solutions = HashMultiset.create();
 CloseableIteration iter = pipelineNode.evaluate(new QueryBindingSet());
 while (iter.hasNext()) {
 solutions.add(iter.next());
 }
 Assert.assertEquals(expectedSolutions, solutions);
 // Extend the pipeline by requiring a future timestamp (should produce no results)
-long delta = 1000 * 60 * 60 * 24;
+final long delta = 1000 * 60 * 60 * 24;
--- End diff --

I'm not messing with classes that already exist except to deal with my 
repackaging.


---


[GitHub] incubator-rya pull request #293: RYA-491 Repackaged reusable Mongo DB test c...

2018-04-25 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/293#discussion_r184151234
  
--- Diff: pom.xml ---
@@ -381,6 +458,26 @@ under the License.
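             <artifactId>rya.indexing.pcj</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.kafka.connect.parent</artifactId>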
--- End diff --

Yep, not sure how that got in there.


---


[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...

2018-04-25 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/282
  
Eric, please close this.


---


[GitHub] incubator-rya pull request #293: RYA-491 Repackaged reusable Mongo DB test c...

2018-04-24 Thread kchilton2
GitHub user kchilton2 opened a pull request:

https://github.com/apache/incubator-rya/pull/293

RYA-491 Repackaged reusable Mongo DB test code into a project named '…

…rya.test.mongo'.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya RYA-491

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/293.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #293


commit 6752b3749eb851f9da74f75a516d49e1b24fc2c5
Author: kchilton2 
Date:   2018-04-19T19:18:38Z

RYA-489 Moved common Accumulo integration test code to the 
rya.test.accumulo project.

commit 5d973efe08a5f9d3b6e2f0a8ac8475baf12f71d8
Author: kchilton2 
Date:   2018-04-24T20:32:05Z

RYA-491 Repackaged reusable Mongo DB test code into a project named 
'rya.test.mongo'.




---


[GitHub] incubator-rya issue #292: Rya 489

2018-04-24 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/292
  
Could somebody code review this or pull it in?


---


[GitHub] incubator-rya issue #290: RYA-488 Moved the Geo Indexing specific maven repo...

2018-04-24 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/290
  
Could somebody code review this or pull it in?


---


[GitHub] incubator-rya issue #245: [RYA-405] Migrate from Sesame to rdf4j libs

2018-04-24 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/245
  
This can be closed. Eric finished this work and it has been pulled into 
master.


---


[GitHub] incubator-rya issue #290: RYA-488 Moved the Geo Indexing specific maven repo...

2018-04-23 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/290
  
asfbot build


---


[GitHub] incubator-rya issue #289: RYA-486 Updated the project's version to 4.0.0-inc...

2018-04-20 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/289
  
Could a committer pull this in?


---


[GitHub] incubator-rya issue #291: RYA-405 Migration of OpenRDF Sesame libraries to R...

2018-04-20 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/291
  
I'd say ship it.


---


[GitHub] incubator-rya pull request #292: Rya 489

2018-04-19 Thread kchilton2
GitHub user kchilton2 opened a pull request:

https://github.com/apache/incubator-rya/pull/292

Rya 489

This is pretty much just a repackaging commit. I moved the generic Accumulo 
test code to a new "rya.test.accumulo" project. I had to add some code that 
performs installation steps to one of the test classes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya RYA-489

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/292.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #292


commit 9f8dc746f9e610b6e895acaf39c36c6e96a11f4e
Author: kchilton2 
Date:   2018-04-17T04:20:13Z

RYA-486 Updated the project's version to 4.0.0-incubating-SNAPSHOT.

commit 46f582771e1dfe575522ca82e303a8ad4077f529
Author: Jorge Machado 
Date:   2017-10-18T09:32:39Z

RYA-405 Migration of OpenRDF Sesame libraries to RDF4J

Co-authored-by: eric.white 
Co-authored-by: Jorge Machado 

commit afd2b3ec91a28622a195cb1f1f45d5c6297d350d
Author: eric.white 
Date:   2018-04-19T18:28:51Z

RYA-405 PR Updates

commit 2e692c29f4b2fd30aae52ee863b4776aee9628d6
Author: kchilton2 
Date:   2018-04-19T19:18:38Z

RYA-489 Moved common Accumulo integration test code to the 
rya.test.accumulo project.




---


[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...

2018-04-19 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/291#discussion_r182825471
  
--- Diff: extras/vagrantExample/src/main/vagrant/Vagrantfile ---
@@ -52,8 +52,8 @@ Vagrant.configure(2) do |config|
 export ACCUMULO_VERSION=1.6.5
 ###export ACCUMULO_VERSION=1.7.1
 export HADOOP_VERSION=2.7.2
-export RYA_EXAMPLE_VERSION=3.2.10-SNAPSHOT
-export SESAME_VERSION=2.7.6
+export RYA_EXAMPLE_VERSION=3.2.13-SNAPSHOT
--- End diff --

This will need to be 4.0.0. This commit should be after the version update 
commit.


---


[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...

2018-04-19 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/291#discussion_r182596150
  
--- Diff: common/rya.api/src/main/java/org/apache/rya/api/domain/Node.java 
---
@@ -21,14 +21,14 @@
 
 
 
-import org.openrdf.model.impl.URIImpl;
+import org.eclipse.rdf4j.model.impl.SimpleIRI;
 
 /**
  * A Node is an expected node in the global graph. This typing of the URI 
allows us to dictate the difference between a
--- End diff --

"This typing of the [IRI] ..."


---


[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...

2018-04-19 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/291#discussion_r182596798
  
--- Diff: 
common/rya.api/src/main/java/org/apache/rya/api/persist/index/RyaSecondaryIndexer.java
 ---
@@ -19,31 +19,29 @@
  * under the License.
  */
 
-
 import java.io.Closeable;
 import java.io.Flushable;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configurable;
-import org.openrdf.model.URI;
-
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.domain.RyaURI;
+import org.eclipse.rdf4j.model.IRI;
 
 public interface RyaSecondaryIndexer extends Closeable, Flushable, 
Configurable {
/**
 * initialize after setting configuration.
 */
-public void init();
+   public void init();
--- End diff --

Tab


---


[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...

2018-04-19 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/291#discussion_r182596437
  
--- Diff: 
common/rya.api/src/main/java/org/apache/rya/api/domain/VarNameUtils.java ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.domain;
+
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.rdf4j.model.Value;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.algebra.Var;
+import org.eclipse.rdf4j.query.algebra.helpers.TupleExprs;
+
+/**
+ * Utility methods and constants for RDF {@link Var} names.
+ */
+public final class VarNameUtils {
--- End diff --

Our needing this class at all is a bit of a code smell. Maybe note that using 
it is discouraged if there's any other way to figure out whether your Var is 
constant or anonymous. I think there are methods that give you that information.


---


[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...

2018-04-19 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/291#discussion_r182596172
  
--- Diff: common/rya.api/src/main/java/org/apache/rya/api/domain/Node.java 
---
@@ -21,14 +21,14 @@
 
 
 
-import org.openrdf.model.impl.URIImpl;
+import org.eclipse.rdf4j.model.impl.SimpleIRI;
 
 /**
  * A Node is an expected node in the global graph. This typing of the URI 
allows us to dictate the difference between a
  * URI that is just an Attribute on the subject vs. a URI that is another 
subject Node in the global graph. It does not
--- End diff --

IRI


---


[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...

2018-04-19 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/291#discussion_r182602468
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/IndexPlanValidator/ThreshholdPlanSelector.java
 ---
@@ -111,9 +110,9 @@ public double getCost(TupleExpr te, double indexWeight, 
double commonVarWeight,
 double dirProductScale;
 
 if(queryNodeCount > nodeCount) {
-dirProductScale = 1/((double)(queryNodeCount - nodeCount));
+dirProductScale = 1/ (queryNodeCount - nodeCount);
--- End diff --

Are we sure about getting rid of the double casts here? We don't lose 
precision in the division?
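
For context on the question above, a minimal sketch of what changes when the cast is dropped. It assumes queryNodeCount and nodeCount are ints, which the diff does not show; the class and method names are made up for illustration. If the operands are already doubles, the cast is redundant and nothing is lost.

```java
public class DivisionCastDemo {
    // Hypothetical stand-in for the new line in the diff. With int operands the
    // division is done in integer arithmetic and only the result is widened.
    public static double withoutCast(int queryNodeCount, int nodeCount) {
        return 1 / (queryNodeCount - nodeCount);
    }

    // Hypothetical stand-in for the old line: the cast forces floating-point division.
    public static double withCast(int queryNodeCount, int nodeCount) {
        return 1 / ((double) (queryNodeCount - nodeCount));
    }

    public static void main(String[] args) {
        System.out.println(withoutCast(5, 2)); // 0.0
        System.out.println(withCast(5, 2));    // 0.3333333333333333
    }
}
```

Under that assumption the scale factor collapses to 0 whenever the difference is greater than 1, so the cast (or a 1.0 literal) would need to stay.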


---


[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...

2018-04-19 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/291#discussion_r182601712
  
--- Diff: dao/accumulo.rya/pom.xml ---
@@ -43,25 +43,30 @@ under the License.
 
 
 
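-<groupId>org.openrdf.sesame</groupId>
-<artifactId>sesame-rio-ntriples</artifactId>
+<groupId>org.eclipse.rdf4j</groupId>
+<artifactId>rdf4j-rio-ntriples</artifactId>
 </dependency>
 <dependency>
-<groupId>org.openrdf.sesame</groupId>
-<artifactId>sesame-rio-nquads</artifactId>
+<groupId>org.eclipse.rdf4j</groupId>
+<artifactId>rdf4j-rio-nquads</artifactId>
 </dependency>
 <dependency>
-<groupId>org.openrdf.sesame</groupId>
-<artifactId>sesame-queryalgebra-evaluation</artifactId>
+<groupId>org.eclipse.rdf4j</groupId>
+<artifactId>rdf4j-queryalgebra-evaluation</artifactId>
+</dependency>
+<dependency>
+<groupId>org.eclipse.rdf4j</groupId>
+<artifactId>rdf4j-sail-api</artifactId>
+<version>${org.eclipse.rdf4j.version}</version>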
--- End diff --

Add to rya-project dependency management


---


[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...

2018-04-19 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/291#discussion_r182596490
  
--- Diff: 
common/rya.api/src/main/java/org/apache/rya/api/domain/VarNameUtils.java ---
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.domain;
+
+import org.apache.commons.lang.StringUtils;
+import org.eclipse.rdf4j.model.Value;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.algebra.Var;
+import org.eclipse.rdf4j.query.algebra.helpers.TupleExprs;
+
+/**
+ * Utility methods and constants for RDF {@link Var} names.
+ */
+public final class VarNameUtils {
+private static final ValueFactory VF = 
SimpleValueFactory.getInstance();
+
+/**
+ * Prepended to the start of constant var names.
+ */
+public static final String CONSTANT_PREFIX = "_const_";
+private static final String LEGACY_CONSTANT_PREFIX = "-const-";
+
+/**
+ * Prepended to the start of anonymous var names.
+ */
+public static final String ANONYMOUS_PREFIX = "_anon_";
+private static final String LEGACY_ANONYMOUS_PREFIX = "-anon-";
+
+/**
+ * Private constructor to prevent instantiation.
+ */
+private VarNameUtils() {
+}
+
+/**
+ * Prepends the constant prefix to the specified value.
+ * @param value the value to add the constant prefix to.
+ * @return the value with the constant prefix attached before it.
+ */
+public static String prependConstant(final String value) {
--- End diff --

Params and return type are @Nullable


---


[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...

2018-04-19 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/291#discussion_r182602289
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/FilterFunctionOptimizer.java
 ---
@@ -157,18 +156,18 @@ private void buildQuery(final TupleExpr tupleExpr, 
final StatementPattern matchS
 tupleExpr.visit(fVisitor);
 final List results = Lists.newArrayList();
 for(int i = 0; i < fVisitor.func.size(); i++){
-results.add(new IndexingExpr(fVisitor.func.get(i), 
matchStatement, fVisitor.args.get(i)));
+results.add(new IndexingExpr(fVisitor.func.get(i), 
matchStatement, Arrays.stream(fVisitor.args.get(i)).toArray()));
--- End diff --

This is kind of a weird change.
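
To spell out why the change looks odd, here is a small sketch of what Arrays.stream(...).toArray() does to an array. It assumes fVisitor.args.get(i) is an array of some reference type; String[] stands in for it here, and the class and method names are hypothetical.

```java
import java.util.Arrays;

public class StreamToArrayDemo {
    // Same shape as the expression in the diff, applied to a String[] stand-in.
    public static Object[] viaStream(String[] args) {
        return Arrays.stream(args).toArray();
    }

    public static void main(String[] args) {
        String[] original = { "a", "b" };
        Object[] copy = viaStream(original);

        System.out.println(copy != original);                // true
        System.out.println(copy.getClass().getSimpleName()); // Object[]
        System.out.println(Arrays.equals(copy, original));   // true
    }
}
```

So the call yields a shallow copy whose runtime type is Object[]. If a copy or an Object[] is what the constructor needs, saying so directly (or in a comment) would probably read less strangely.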


---


[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...

2018-04-18 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/282
  
This can be declined.


---


[GitHub] incubator-rya issue #290: RYA-488 Moved the Geo Indexing specific maven repo...

2018-04-18 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/290
  
That test failing has nothing to do with this commit. We should open a 
defect and fix the test.


---


[GitHub] incubator-rya pull request #290: RYA-488 Moved the Geo Indexing specific mav...

2018-04-17 Thread kchilton2
GitHub user kchilton2 opened a pull request:

https://github.com/apache/incubator-rya/pull/290

RYA-488 Moved the Geo Indexing specific maven repositories into the g…



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya RYA-488

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/290.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #290


commit 80a3b76afc3d7aa8391ea7228de1019d7e644529
Author: kchilton2 
Date:   2018-04-17T21:10:59Z

RYA-488 Moved the Geo Indexing specific maven repositories into the 
geoindexing profile.




---


[GitHub] incubator-rya issue #245: [RYA-405] Migrate from Sesame to rdf4j libs

2018-04-17 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/245
  
@ejwhite922 Are you able to squash down the history where it makes sense so 
that this won't be messy when it goes in?


---


[GitHub] incubator-rya pull request #289: RYA-486 Updated the project's version to 4....

2018-04-16 Thread kchilton2
GitHub user kchilton2 opened a pull request:

https://github.com/apache/incubator-rya/pull/289

RYA-486 Updated the project's version to 4.0.0-incubating-SNAPSHOT.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya RYA-486

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/289.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #289


commit 9f8dc746f9e610b6e895acaf39c36c6e96a11f4e
Author: kchilton2 
Date:   2018-04-17T04:20:13Z

RYA-486 Updated the project's version to 4.0.0-incubating-SNAPSHOT.




---


[GitHub] incubator-rya pull request #285: RYA-469 Added tests for Rya Streams join it...

2018-03-30 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/285#discussion_r178385888
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java
 ---
@@ -77,30 +77,39 @@
 /**
  * This is the maximum value of a UTF-8 character.
  */
-private static final String END_RANGE_SUFFIX = new String(new byte[] { 
(byte) 0XFF }, Charsets.UTF_8);
+private static final String END_RANGE_SUFFIX = new String(new byte[] { 
(byte) 0xFF }, Charsets.UTF_8);
+
+/**
+ * Indicates where the end of the join variables occurs.
+ */
+private static final String JOIN_VAR_END_MARKER = new 
String("~!^~".getBytes(Charsets.UTF_8), Charsets.UTF_8);
--- End diff --

I'm not sure that the marker should be in the visually renderable byte 
ranges since, while very unlikely, they could legitimately appear in the data 
that is being indexed.
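
A rough sketch of the concern, not taken from KeyValueJoinStateStore: the key layout (join values, then marker, then the rest) and all names below are assumptions made for illustration.

```java
public class JoinMarkerDemo {
    // The printable marker proposed in the diff.
    public static final String PRINTABLE_MARKER = "~!^~";

    // Hypothetical key layout: <join values><marker><rest of key>.
    public static String makeKey(String joinValues, String marker, String rest) {
        return joinValues + marker + rest;
    }

    // Recovers the join values by cutting the key at the first marker occurrence.
    public static String joinPart(String key, String marker) {
        return key.substring(0, key.indexOf(marker));
    }

    public static void main(String[] args) {
        String clean = makeKey("urn:alice", PRINTABLE_MARKER, "urn:bob");
        String tricky = makeKey("odd~!^~value", PRINTABLE_MARKER, "urn:bob");

        System.out.println(joinPart(clean, PRINTABLE_MARKER));  // urn:alice
        System.out.println(joinPart(tricky, PRINTABLE_MARKER)); // odd
    }
}
```

The second key is cut in the wrong place because the data itself contains the marker. A marker drawn from code points that should not show up in indexed text would shrink that risk, though which code point is safe for this store is something I have not checked.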


---


[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...

2018-03-16 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/282
  
Also, to uninstall it I was using
rpm -qa | grep rya

Then whatever that reported as the installed package I used 
rpm -e 

That is what is not working.


---


[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...

2018-03-16 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/282
  
When it did not work, I was using the RPM you gave me, which was built using 
this patch. I had never installed the bad-state RPM.


---


[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...

2018-03-15 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/282
  
https://github.com/apache/incubator-rya/pull/284


---


[GitHub] incubator-rya pull request #284: Reverting the RPM build to use the RPM comm...

2018-03-15 Thread kchilton2
GitHub user kchilton2 opened a pull request:

https://github.com/apache/incubator-rya/pull/284

Reverting the RPM build to use the RPM command again. The RPM build is now activated using a profile.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya fix-rpm

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/284.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #284


commit c801f69f79547f4cd36e5e996693e7f767fd564a
Author: Kevin Chilton 
Date:   2018-03-15T21:29:49Z

Reverting the RPM build to use the RPM command again. The RPM build is now 
activated using a profile.




---


[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...

2018-03-15 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/282
  
This patch does not create an RPM that can be uninstalled. I'm going to 
revert all of the changes Eric made to how we build RPMs and put my stuff 
back in. I'll make the RPM build a profile so Windows users can still build 
our app.


---


[GitHub] incubator-rya pull request #273: Updating the Rya Manual to include Rya Stre...

2018-03-02 Thread kchilton2
GitHub user kchilton2 reopened a pull request:

https://github.com/apache/incubator-rya/pull/273

Updating the Rya Manual to include Rya Streams.

## Description
Wrote the manual page for Rya Streams.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya manual-improvements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/273.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #273


commit f9c579af6997dbb66405e185ef4bf71eaa56e7d5
Author: kchilton2 
Date:   2018-02-09T17:33:36Z

Updating the Rya Manual to include Rya Streams.




---


[GitHub] incubator-rya issue #273: Updating the Rya Manual to include Rya Streams.

2018-03-02 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/273
  
Never mind. This one was not included.


---


[GitHub] incubator-rya issue #272: RYA-443 Rya Streams Query Manager daemon program.

2018-03-02 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/272
  
I'm closing this because https://github.com/apache/incubator-rya/pull/279 
takes it over.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-03-02 Thread kchilton2
Github user kchilton2 closed the pull request at:

https://github.com/apache/incubator-rya/pull/272


---


[GitHub] incubator-rya issue #275: RYA-466 Update the Rya Streams Client to stream re...

2018-03-02 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/275
  
I'm closing this because https://github.com/apache/incubator-rya/pull/279 
takes it over.


---


[GitHub] incubator-rya pull request #273: Updating the Rya Manual to include Rya Stre...

2018-03-02 Thread kchilton2
Github user kchilton2 closed the pull request at:

https://github.com/apache/incubator-rya/pull/273


---


[GitHub] incubator-rya issue #273: Updating the Rya Manual to include Rya Streams.

2018-03-02 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/273
  
I'm closing this because https://github.com/apache/incubator-rya/pull/279 
takes it over.


---


[GitHub] incubator-rya pull request #275: RYA-466 Update the Rya Streams Client to st...

2018-03-02 Thread kchilton2
Github user kchilton2 closed the pull request at:

https://github.com/apache/incubator-rya/pull/275


---


[GitHub] incubator-rya pull request #277: Rya 460

2018-03-02 Thread kchilton2
Github user kchilton2 closed the pull request at:

https://github.com/apache/incubator-rya/pull/277


---


[GitHub] incubator-rya issue #277: Rya 460

2018-03-02 Thread kchilton2
Github user kchilton2 commented on the issue:

https://github.com/apache/incubator-rya/pull/277
  
I'm closing this because https://github.com/apache/incubator-rya/pull/279 
takes it over.


---


[GitHub] incubator-rya pull request #275: RYA-466 Update the Rya Streams Client to st...

2018-02-27 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/275#discussion_r171065067
  
--- Diff: 
common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java ---
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.utils;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.regex.Pattern;
+
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A utility class that is used to glean insight into the structure of 
SPARQL queries.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryInvestigator {
+
+private static final SPARQLParser PARSER = new SPARQLParser();
+
+private QueryInvestigator() { }
+
+/**
+ * Determines whether a SPARQL command is a CONSTRUCT or not.
+ *
+ * @param sparql - The SPARQL to evaluate. (not null)
+ * @return {@code true} if the provided SPARQL is a CONSTRUCT query; 
otherwise {@code false}.
+ * @throws MalformedQueryException The SPARQL is neither a well formed 
query or update.
+ */
+public static boolean isConstruct(final String sparql) throws 
MalformedQueryException {
+requireNonNull(sparql);
+
+try {
+// Constructs are queries, so try to create a ParsedQuery.
+PARSER.parseQuery(sparql, null);
+
+// Check to see if the SPARQL looks like a CONSTRUCT query.
+return Pattern.matches(".*?construct.*?where.*", 
sparql.toLowerCase());
--- End diff --

Just write a better regex.
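
One possible direction, as a sketch only. The "stricter" pattern below is my own guess at what "better" could mean: skip any BASE/PREFIX declarations, then require CONSTRUCT as the first keyword. It does not handle SPARQL comments or odd whitespace inside IRIs, and the class and method names are hypothetical.

```java
import java.util.regex.Pattern;

public class ConstructRegexDemo {
    // The check from the diff, reproduced for comparison.
    public static boolean original(String sparql) {
        return Pattern.matches(".*?construct.*?where.*", sparql.toLowerCase());
    }

    // Hypothetical alternative: optional BASE/PREFIX prologue, then CONSTRUCT.
    private static final Pattern STARTS_WITH_CONSTRUCT = Pattern.compile(
        "^\\s*(?:(?:PREFIX\\s+[^\\s:]*:\\s*<[^>]*>|BASE\\s+<[^>]*>)\\s*)*CONSTRUCT\\b",
        Pattern.CASE_INSENSITIVE);

    public static boolean stricter(String sparql) {
        return STARTS_WITH_CONSTRUCT.matcher(sparql).find();
    }

    public static void main(String[] args) {
        String multiLine = "PREFIX ex: <http://example.org/>\n"
            + "CONSTRUCT { ?s ex:p ?o }\n"
            + "WHERE { ?s ex:q ?o }";
        String select = "SELECT ?construct WHERE { ?construct ?p ?o }";

        System.out.println(original(multiLine)); // false
        System.out.println(stricter(multiLine)); // true
        System.out.println(original(select));    // true
        System.out.println(stricter(select));    // false
    }
}
```

The two cases show where the current pattern goes wrong: '.' does not match line terminators by default, so a multi-line CONSTRUCT is rejected, and a SELECT that merely mentions "construct" before its WHERE is accepted.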


---


[GitHub] incubator-rya pull request #275: RYA-466 Update the Rya Streams Client to st...

2018-02-27 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/275#discussion_r171064989
  
--- Diff: 
common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java ---
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.utils;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.regex.Pattern;
+
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A utility class that is used to glean insight into the structure of 
SPARQL queries.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryInvestigator {
+
+private static final SPARQLParser PARSER = new SPARQLParser();
+
+private QueryInvestigator() { }
+
+/**
+ * Determines whether a SPARQL command is a CONSTRUCT or not.
+ *
+ * @param sparql - The SPARQL to evaluate. (not null)
+ * @return {@code true} if the provided SPARQL is a CONSTRUCT query; 
otherwise {@code false}.
+ * @throws MalformedQueryException The SPARQL is neither a well formed 
query or update.
+ */
+public static boolean isConstruct(final String sparql) throws 
MalformedQueryException {
+requireNonNull(sparql);
+
+try {
+// Constructs are queries, so try to create a ParsedQuery.
+PARSER.parseQuery(sparql, null);
+
+// Check to see if the SPARQL looks like a CONSTRUCT query.
+return Pattern.matches(".*?construct.*?where.*", 
sparql.toLowerCase());
--- End diff --

Any SELECT could also create a ParsedGraphQuery, so that will not work.


---


[GitHub] incubator-rya pull request #275: RYA-466 Update the Rya Streams Client to st...

2018-02-27 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/275#discussion_r171032022
  
--- Diff: 
common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java ---
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.utils;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.regex.Pattern;
+
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A utility class that is used to glean insight into the structure of 
SPARQL queries.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryInvestigator {
+
+private static final SPARQLParser PARSER = new SPARQLParser();
+
+private QueryInvestigator() { }
+
+/**
+ * Determines whether a SPARQL command is a CONSTRUCT or not.
+ *
+ * @param sparql - The SPARQL to evaluate. (not null)
+ * @return {@code true} if the provided SPARQL is a CONSTRUCT query; 
otherwise {@code false}.
+ * @throws MalformedQueryException The SPARQL is neither a well formed 
query or update.
+ */
+public static boolean isConstruct(final String sparql) throws 
MalformedQueryException {
+requireNonNull(sparql);
+
+try {
+// Constructs are queries, so try to create a ParsedQuery.
+PARSER.parseQuery(sparql, null);
+
+// Check to see if the SPARQL looks like a CONSTRUCT query.
+return Pattern.matches(".*?construct.*?where.*", 
sparql.toLowerCase());
+
+} catch(final MalformedQueryException queryE) {
+try {
+// Maybe it's an update.
+PARSER.parseUpdate(sparql, null);
+
+// It was, so return false.
+return false;
+
+} catch(final MalformedQueryException updateE) {
+// It's not. Actually malformed.
+throw queryE;
+}
+}
+}
+
+/**
+ * Determines whether a SPARQL command is an INSERT with a WHERE 
clause or not.
+ *
+ * @param sparql - The SPARQL to evaluate. (not null)
+ * @return {@code true} if the provided SPARQL is an INSERT update; 
otherwise {@code false}.
+ * @throws MalformedQueryException The SPARQL is neither a well formed 
query or update.
+ */
+public static boolean isInsertWhere(final String sparql) throws 
MalformedQueryException {
+requireNonNull(sparql);
+
+try {
+// Inserts are updated, so try to create a ParsedUpdate.
+PARSER.parseUpdate(sparql, null);
+
+// Check to see if the SPARQL looks like an INSERT query.
+return Pattern.matches(".*?insert.*?where.*", 
sparql.toLowerCase());
--- End diff --

.*? is a reluctant quantifier, part of the regex syntax, so it will not match 
on a literal ?where. But yes, it will definitely hit the 'where' part.
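
A quick sketch to back that up. The pattern is the one from the diff; the class name, method name and sample inputs are made up.

```java
import java.util.regex.Pattern;

public class ReluctantQuantifierDemo {
    // Same expression as in isInsertWhere(), without the parser step.
    public static boolean looksLikeInsertWhere(String sparql) {
        return Pattern.matches(".*?insert.*?where.*", sparql.toLowerCase());
    }

    public static void main(String[] args) {
        // No literal '?' anywhere, yet the pattern matches: ".*?" is a quantifier.
        System.out.println(looksLikeInsertWhere(
            "INSERT { <a> <b> <c> } WHERE { <a> <b> <d> }")); // true

        // 'where' only appears inside a string literal, and it still matches.
        System.out.println(looksLikeInsertWhere(
            "INSERT DATA { <a> <b> \"where\" }"));            // true
    }
}
```

The second case is the part worth tightening: any 'where' after 'insert' satisfies the pattern, whether or not it is the WHERE clause.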


---


[GitHub] incubator-rya pull request #273: Updating the Rya Manual to include Rya Stre...

2018-02-26 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/273#discussion_r170711583
  
--- Diff: extras/rya.manual/src/site/markdown/rya-streams.md ---
@@ -0,0 +1,385 @@
+
+
+# Rya Streams
+
+Introduced in 3.2.12
+
+## Disclaimer
+This is a Beta feature. We do not guarantee newer versions of Rya Streams
+will be compatible with this version. You may need to remove all of your
+queries and their associated data from your Rya Streams system and then 
+reprocess them using the upgraded system.
+
+# Table of Contents
+- [Introduction](#introduction)
+- [Architecture](#architecture)
+- [Quick Start](#quick-start)
+- [Use Cases](#use-cases)
+- [Future Work](#future-work)
+
+
+
+## Introduction 
+Rya Streams is a system that processes SPARQL queries over streams of RDF 
+Statements that may have Visibilities attached to them. It does this by 
+utilizing Kafka as a data processing platform.
+
+There are three basic building blocks that the system depends on:
+
+* **Streams Query** - This is a SPARQL query that is registered with Rya 
Streams.
+  It is associated with a specific Rya instance because that Rya instance 
+  determines which Statements the query will evaluate. It has an ID that 
+  uniquely identifies it across all of the queries that are managed by the 
+  system, whether or not the system should be processing it, and whether 
or not 
+  the results of the query need to be inserted back into the Rya instance 
the 
+  source statements come from.
+  
+* **Query Change Log** - A list of changes that have been performed to the
+  Streams Queries of a specific Rya Instance. This log contains the 
absolute  
+  truth about what queries are registered, which are running, and which 
generate
+  new statements that need to be inserted back into Rya.
+
+* **Query Manager** - A daemon application that reacts to new/deleted 
Query 
+  Change Logs as well as new entries within those logs. It starts and stops
+  Kafka Streams processing jobs for the active Streams Queries. 
+
+The Quick Start section explains how the Rya Streams Client is used to interact
+with the system using a simple SPARQL query and some sample Statements.
+
+
+
+## Architecture ##
+
+The following image is a high-level view of how Rya Streams interacts with
+Rya to process queries.
+
+![alt text](../images/rya-streams-high-level.png)
+
+1. The Rya Streams client is used to register/update/delete queries.
+2. Rya Streams notices the change and starts/stops processing a query based on
+   what the change was.
+3. Statements are discovered for ingest by whatever application is loading data
+   into Rya.
+4. Those Statements are written to Rya Streams so that the Streams Queries may
+   process them and produce results.
+5. Those Statements are also written to the Rya instance for ad-hoc querying.
+6. Rya Streams produces Visibility Binding Sets and/or Visibility Statements
+   that are written back to Rya.
+7. Those same Visibility Binding Sets and/or Visibility Statements are made
+   available to other systems as well.
+
+
+
+## Quick Start ##
+This tutorial demonstrates how to install and start the Rya Streams system. It
+must be configured and running on its own before any Rya instances may use it.
+After performing the steps of this quick start, you will have installed the
+system and demonstrated that it is functioning properly, and you may then use
+the Rya Shell to configure Rya instances to use it.
+
+This tutorial assumes you are starting fresh and have no existing Kafka or
+Zookeeper data. The two services must already be installed and running.
+
+### Step 1: Download the applications ###
+
+You can fetch the artifacts you need to follow this Quick Start from our
+[downloads page](http://rya.apache.org/download/). Click on the release of
+interest and follow the "Central repository for Maven and other dependency 
+managers" URL.
+
+Fetch the following two artifacts:
+
+Artifact Id | Type 
+--- | ---
+rya.streams.client | shaded jar
+rya.streams.query-manager | rpm
+
+### Step 2: Install the Query Manager ###
+
+Copy the RPM to the CentOS 7 machine the Query Manager will be installed on.
+Install it using the following command:
+
+```
+yum install -y rya.streams.query-manager-3.2.12-incubating.noarch.rpm
+```
+
+It will install the program to **/opt/rya-streams-query-manager-3.2.12**. Follow
+the directions that are in the README.txt file within that directory to finish
+configuration.
+
+### Step 3: Register a Str
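
The Introduction quoted above describes the Query Change Log as the source of truth for which Streams Queries are registered, which are running, and which insert results back into Rya. One way to picture this is as a replay of log entries into a map of current query state. The following is only a minimal sketch of that idea and is not part of the pull request or of Rya's API: the class `QueryChangeLogSketch`, the `Change` and `StreamsQuery` types, and the `CREATE`/`UPDATE`/`DELETE` change kinds are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: all names here are hypothetical, not Rya classes. */
public class QueryChangeLogSketch {

    /** Kinds of change a log entry may record (assumed for this sketch). */
    enum ChangeType { CREATE, UPDATE, DELETE }

    /** One entry in the change log. */
    static final class Change {
        final ChangeType type;
        final String queryId;
        final String sparql;    // only meaningful for CREATE
        final boolean active;   // should the system be processing the query?
        final boolean insert;   // should results be written back into Rya?

        Change(ChangeType type, String queryId, String sparql,
               boolean active, boolean insert) {
            this.type = type;
            this.queryId = queryId;
            this.sparql = sparql;
            this.active = active;
            this.insert = insert;
        }
    }

    /** The state of a single registered query after replaying the log. */
    static final class StreamsQuery {
        final String queryId;
        final String sparql;
        final boolean active;
        final boolean insert;

        StreamsQuery(String queryId, String sparql, boolean active, boolean insert) {
            this.queryId = queryId;
            this.sparql = sparql;
            this.active = active;
            this.insert = insert;
        }
    }

    /** Replays the log in order and returns the resulting query state by ID. */
    static Map<String, StreamsQuery> replay(List<Change> log) {
        Map<String, StreamsQuery> state = new LinkedHashMap<>();
        for (Change c : log) {
            switch (c.type) {
                case CREATE:
                    state.put(c.queryId,
                            new StreamsQuery(c.queryId, c.sparql, c.active, c.insert));
                    break;
                case UPDATE:
                    // An update only changes the two flags; the SPARQL is kept.
                    StreamsQuery old = state.get(c.queryId);
                    if (old != null) {
                        state.put(c.queryId,
                                new StreamsQuery(old.queryId, old.sparql, c.active, c.insert));
                    }
                    break;
                case DELETE:
                    state.remove(c.queryId);
                    break;
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Change> log = new ArrayList<>();
        log.add(new Change(ChangeType.CREATE, "q1", "SELECT * WHERE { ?s ?p ?o }", true, false));
        log.add(new Change(ChangeType.CREATE, "q2", "SELECT ?s WHERE { ?s ?p ?o }", true, true));
        log.add(new Change(ChangeType.UPDATE, "q1", null, false, false));
        log.add(new Change(ChangeType.DELETE, "q2", null, false, false));

        for (StreamsQuery q : replay(log).values()) {
            System.out.println(q.queryId + " active=" + q.active + " insert=" + q.insert);
        }
    }
}
```

In this picture, a component playing the role of the Query Manager would compare successive replayed states to decide which processing jobs to start or stop; how Rya actually does this is not shown in the quoted diff.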

[GitHub] incubator-rya pull request #273: Updating the Rya Manual to include Rya Stre...

2018-02-26 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/273#discussion_r170711002
  
--- Diff: extras/rya.manual/src/site/markdown/rya-streams.md ---
@@ -0,0 +1,385 @@

[GitHub] incubator-rya pull request #273: Updating the Rya Manual to include Rya Stre...

2018-02-26 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/273#discussion_r170710886
  
--- Diff: extras/rya.manual/src/site/markdown/rya-streams.md ---
@@ -0,0 +1,385 @@
