[GitHub] incubator-rya pull request #305: Rya 135 collection name
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/305#discussion_r230497599 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java --- @@ -35,19 +35,17 @@ * A {@link RdfCloudTripleStoreConfiguration} that configures how Rya connects to a MongoDB Rya triple store. */ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration { +public static final String RYA_TRIPLES_COLLECTION = "rya_triples"; // MongoDB Server connection values. public static final String MONGO_HOSTNAME = "mongo.db.instance"; public static final String MONGO_PORT = "mongo.db.port"; // MongoDB Database values. -public static final String MONGO_DB_NAME = "mongo.db.name"; +public static final String RYA_INSTANCE_NAME = "mongo.db.name"; --- End diff -- I don't think this field should be renamed. It's still the Mongo DB Name. Elsewhere in the code we should decide that the rya instance name also happens to be the db name. If this is in fact just the instance name, and the DB name isn't allowed to be configured anymore, then the field's value shouldn't be "mongo.db.name" anymore. ---
[GitHub] incubator-rya pull request #300: RYA-135 Hard code triples collection
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/300#discussion_r217849376 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java --- @@ -73,12 +72,12 @@ protected T storageStrategy; private MongoDbBatchWriter mongoDbBatchWriter; +protected String collectionName; --- End diff -- How did it work before, then? ---
[GitHub] incubator-rya pull request #300: RYA-135 Hard code triples collection
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/300#discussion_r217848961 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java --- @@ -73,12 +72,12 @@ protected T storageStrategy; private MongoDbBatchWriter mongoDbBatchWriter; +protected String collectionName; --- End diff -- Oh, the subclasses change it directly. That's unclear; why not leave it the way it was? ---
[GitHub] incubator-rya pull request #300: RYA-135 Hard code triples collection
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/300#discussion_r217848617 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java --- @@ -73,12 +72,12 @@ protected T storageStrategy; private MongoDbBatchWriter mongoDbBatchWriter; +protected String collectionName; --- End diff -- Is this field ever set anywhere? ---
[GitHub] incubator-rya pull request #300: RYA-135 Hard code triples collection
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/300#discussion_r217847623 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java --- @@ -197,22 +195,22 @@ public void setMongoPassword(final String password) { * @return The name of the Rya instance to connect to. (default: rya) */ public String getRyaInstanceName() { -return get(MONGO_COLLECTION_PREFIX, "rya"); +return get(MONGO_DB_NAME, "rya"); --- End diff -- So an application can call setMongoDBName("a") and then setRyaInstanceName("b"), and afterwards getMongoDBName() returns "b"? That's unexpected. ---
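To make the concern in the comment above concrete, here is a minimal sketch of what happens when two setters are backed by the same configuration key. It is not the Rya class: `SharedKeyConfigSketch` is a hypothetical stand-in built on a plain `HashMap`, and it assumes both setters write `mongo.db.name`, which is what the comment implies. The `"rya"` default on `getMongoDBName()` is also an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a configuration class; not the real MongoDBRdfConfiguration. */
public class SharedKeyConfigSketch {
    // Assumption: both logical settings are stored under this single key.
    static final String MONGO_DB_NAME = "mongo.db.name";

    private final Map<String, String> values = new HashMap<>();

    public void setMongoDBName(final String name) {
        values.put(MONGO_DB_NAME, name);
    }

    public String getMongoDBName() {
        // The "rya" default here is an assumption for illustration.
        return values.getOrDefault(MONGO_DB_NAME, "rya");
    }

    public void setRyaInstanceName(final String name) {
        // Writes the same key, so it silently overwrites the DB name.
        values.put(MONGO_DB_NAME, name);
    }

    public String getRyaInstanceName() {
        return values.getOrDefault(MONGO_DB_NAME, "rya");
    }

    public static void main(final String[] args) {
        final SharedKeyConfigSketch conf = new SharedKeyConfigSketch();
        conf.setMongoDBName("a");
        conf.setRyaInstanceName("b");
        System.out.println(conf.getMongoDBName()); // prints "b"
    }
}
```

With one backing key, the second setter wins regardless of which name the caller thought they were setting, which is why exposing a single accessor pair may be less surprising.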
[GitHub] incubator-rya pull request #300: RYA-135 Hard code triples collection
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/300#discussion_r217846855 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/AbstractMongoDBRdfConfigurationBuilder.java --- @@ -182,8 +167,7 @@ private C getConf(C conf) { conf.setMongoPassword(pass); } conf.setMongoDBName(mongoDBName); -conf.setRyaInstanceName(mongoCollectionPrefix); -conf.setTablePrefix(mongoCollectionPrefix); +conf.setRyaInstanceName(mongoDBName); --- End diff -- If Mongo DB Name and Rya Instance Name are always the same, why not just have one of those fields? ---
[GitHub] incubator-rya issue #301: Version fix
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/301 Please make a defect for this. ---
[GitHub] incubator-rya pull request #299: RYA-500: Make RdfFileInputTool to accept mu...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/299#discussion_r200156492 --- Diff: extras/rya.manual/src/site/markdown/loaddata.md --- @@ -92,29 +92,55 @@ The default "format" is RDF/XML, but these formats are supported : RDFXML, NTRIP ## Bulk Loading data -Bulk loading data is done through Map Reduce jobs +Bulk loading data is done through Map Reduce jobs. ### Bulk Load RDF data -This Map Reduce job will read files into memory and parse them into statements. The statements are saved into the store. Here is an example for storing in Accumulo: +This Map Reduce job will read files into memory and parse them into statements. The statements are saved into the triplestore. +Here are the steps to prepare and run the job: + + * Load the RDF data to HDFS. It can be single of multiple volumes and directories in them. + * Also load the `mapreduce/target/rya.mapreduce--shaded.jar` executable jar file to HDFS. + * Run the following sample command: ``` -hadoop jar target/rya.mapreduce-3.2.10-SNAPSHOT-shaded.jar org.apache.rya.accumulo.mr.RdfFileInputTool -Dac.zk=localhost:2181 -Dac.instance=accumulo -Dac.username=root -Dac.pwd=secret -Drdf.tablePrefix=triplestore_ -Drdf.format=N-Triples /tmp/temp.ntrips +hadoop hdfs://volume/rya.mapreduce--shaded.jar org.apache.rya.accumulo.mr.tools.RdfFileInputTool -Dac.zk=localhost:2181 -Dac.instance=accumulo -Dac.username=root -Dac.pwd=secret -Drdf.tablePrefix=triplestore_ -Drdf.format=N-Triples hdfs://volume/dir1,hdfs://volume/dir2,hdfs://volume/file1.nt ``` Options: -- rdf.tablePrefix : The tables (spo, po, osp) are prefixed with this qualifier. 
The tables become: (rdf.tablePrefix)spo,(rdf.tablePrefix)po,(rdf.tablePrefix)osp -- ac.* : Accumulo connection parameters -- rdf.format : See RDFFormat from RDF4J, samples include (Trig, N-Triples, RDF/XML) -- sc.use_freetext, sc.use_geo, sc.use_temporal, sc.use_entity : If any of these are set to true, statements will also be +- **rdf.tablePrefix** - The tables (spo, po, osp) are prefixed with this qualifier. +The tables become: (rdf.tablePrefix)spo,(rdf.tablePrefix)po,(rdf.tablePrefix)osp +- **ac.*** - Accumulo connection parameters +- **rdf.format** - See RDFFormat from RDF4J, samples include (Trig, N-Triples, RDF/XML) +- **sc.use_freetext, sc.use_geo, sc.use_temporal, sc.use_entity** - If any of these are set to true, statements will also be added to the enabled secondary indices. -- sc.freetext.predicates, sc.geo.predicates, sc.temporal.predicates: If the associated indexer is enabled, these options specify +- **sc.freetext.predicates, sc.geo.predicates, sc.temporal.predicates** - If the associated indexer is enabled, these options specify which statements should be sent to that indexer (based on the predicate). If not given, all indexers will attempt to index all statements. -The argument is the directory/file to load. This file needs to be loaded into HDFS before running. If loading a directory, all files should have the same RDF -format. +The positional argument is a comma separated list of directories/files to load. +They need to be loaded into HDFS before running. If loading a directory, +all files should have the same RDF format. + +Once the data is loaded, it is actually a good practice to compact your tables. +You can do this by opening the accumulo shell shell and running the compact --- End diff -- "the accumulo shell shell and" ---
[GitHub] incubator-rya pull request #299: RYA-500: Make RdfFileInputTool to accept mu...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/299#discussion_r200156998 --- Diff: extras/rya.manual/src/site/markdown/loaddata.md --- @@ -92,29 +92,55 @@ The default "format" is RDF/XML, but these formats are supported : RDFXML, NTRIP ## Bulk Loading data -Bulk loading data is done through Map Reduce jobs +Bulk loading data is done through Map Reduce jobs. ### Bulk Load RDF data -This Map Reduce job will read files into memory and parse them into statements. The statements are saved into the store. Here is an example for storing in Accumulo: +This Map Reduce job will read files into memory and parse them into statements. The statements are saved into the triplestore. +Here are the steps to prepare and run the job: + + * Load the RDF data to HDFS. It can be single of multiple volumes and directories in them. + * Also load the `mapreduce/target/rya.mapreduce--shaded.jar` executable jar file to HDFS. + * Run the following sample command: ``` -hadoop jar target/rya.mapreduce-3.2.10-SNAPSHOT-shaded.jar org.apache.rya.accumulo.mr.RdfFileInputTool -Dac.zk=localhost:2181 -Dac.instance=accumulo -Dac.username=root -Dac.pwd=secret -Drdf.tablePrefix=triplestore_ -Drdf.format=N-Triples /tmp/temp.ntrips +hadoop hdfs://volume/rya.mapreduce--shaded.jar org.apache.rya.accumulo.mr.tools.RdfFileInputTool -Dac.zk=localhost:2181 -Dac.instance=accumulo -Dac.username=root -Dac.pwd=secret -Drdf.tablePrefix=triplestore_ -Drdf.format=N-Triples hdfs://volume/dir1,hdfs://volume/dir2,hdfs://volume/file1.nt ``` Options: -- rdf.tablePrefix : The tables (spo, po, osp) are prefixed with this qualifier. 
The tables become: (rdf.tablePrefix)spo,(rdf.tablePrefix)po,(rdf.tablePrefix)osp -- ac.* : Accumulo connection parameters -- rdf.format : See RDFFormat from RDF4J, samples include (Trig, N-Triples, RDF/XML) -- sc.use_freetext, sc.use_geo, sc.use_temporal, sc.use_entity : If any of these are set to true, statements will also be +- **rdf.tablePrefix** - The tables (spo, po, osp) are prefixed with this qualifier. +The tables become: (rdf.tablePrefix)spo,(rdf.tablePrefix)po,(rdf.tablePrefix)osp +- **ac.*** - Accumulo connection parameters +- **rdf.format** - See RDFFormat from RDF4J, samples include (Trig, N-Triples, RDF/XML) +- **sc.use_freetext, sc.use_geo, sc.use_temporal, sc.use_entity** - If any of these are set to true, statements will also be added to the enabled secondary indices. -- sc.freetext.predicates, sc.geo.predicates, sc.temporal.predicates: If the associated indexer is enabled, these options specify +- **sc.freetext.predicates, sc.geo.predicates, sc.temporal.predicates** - If the associated indexer is enabled, these options specify which statements should be sent to that indexer (based on the predicate). If not given, all indexers will attempt to index all statements. -The argument is the directory/file to load. This file needs to be loaded into HDFS before running. If loading a directory, all files should have the same RDF -format. +The positional argument is a comma separated list of directories/files to load. +They need to be loaded into HDFS before running. If loading a directory, +all files should have the same RDF format. + +Once the data is loaded, it is actually a good practice to compact your tables. +You can do this by opening the accumulo shell shell and running the compact +command on the generated tables. Remember the generated tables will be +prefixed by the rdf.tablePrefix property you assigned above. +The default tablePrefix is `rts`. --- End diff -- "rts" is an unexpected default for table prefix. 
I think it's "rya_" everywhere else in the application. Is this the value this tool was already using? ---
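Since the table prefix determines which tables have to be compacted afterwards, a small sketch of the naming rule quoted in the diff may help. It only concatenates the prefix with the three suffixes the documentation lists (spo, po, osp); `TablePrefixSketch` and `tableNames` are hypothetical names, and the sketch makes no claim about what the tool's actual default prefix is.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating the (rdf.tablePrefix)spo/po/osp naming rule from the docs. */
public class TablePrefixSketch {

    /** Returns the table names that result from the given prefix. */
    static List<String> tableNames(final String tablePrefix) {
        final List<String> names = new ArrayList<>();
        for (final String suffix : new String[] {"spo", "po", "osp"}) {
            names.add(tablePrefix + suffix);
        }
        return names;
    }

    public static void main(final String[] args) {
        // Uses the prefix from the sample command in the diff.
        System.out.println(tableNames("triplestore_"));
        // prints [triplestore_spo, triplestore_po, triplestore_osp]
    }
}
```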
[GitHub] incubator-rya issue #298: RYA-497 Make the Rya Accumulo Kafka Connect Sink b...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/298 Bump. ---
[GitHub] incubator-rya issue #298: RYA-497 Make the Rya Accumulo Kafka Connect Sink b...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/298 Could somebody pull this in? ---
[GitHub] incubator-rya pull request #298: RYA-497 Make the Rya Accumulo Kafka Connect...
GitHub user kchilton2 opened a pull request: https://github.com/apache/incubator-rya/pull/298 RYA-497 Make the Rya Accumulo Kafka Connect Sink batch write instead of flushing after every Statement. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kchilton2/incubator-rya RYA-497 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/298.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #298 commit 7e0862a4452bab889c0680c3a7c63f104d898f7e Author: kchilton2 Date: 2018-05-22T22:02:49Z RYA-497 Make the Rya Accumulo Kafka Connect Sink batch write instead of flushing after every Statement. ---
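The difference between flushing after every Statement and writing in batches can be sketched without any Rya or Accumulo code. In the sketch below, `CountingWriter` is a hypothetical stand-in for a store writer that only counts `flush()` calls, and `batchSize` is an illustrative parameter; neither is taken from the pull request.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative comparison of per-statement flushing versus batched flushing. */
public class BatchWriteSketch {

    /** Hypothetical writer that buffers items and counts how often it is flushed. */
    static class CountingWriter {
        final List<String> buffered = new ArrayList<>();
        int flushCount = 0;

        void add(final String statement) {
            buffered.add(statement);
        }

        void flush() {
            buffered.clear();
            flushCount++;
        }
    }

    /** Flushes after every statement; returns the number of flushes performed. */
    static int writeFlushingEach(final List<String> statements) {
        final CountingWriter writer = new CountingWriter();
        for (final String statement : statements) {
            writer.add(statement);
            writer.flush();
        }
        return writer.flushCount;
    }

    /** Flushes once per full batch, plus once for any remainder. */
    static int writeBatched(final List<String> statements, final int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        final CountingWriter writer = new CountingWriter();
        int pending = 0;
        for (final String statement : statements) {
            writer.add(statement);
            pending++;
            if (pending == batchSize) {
                writer.flush();
                pending = 0;
            }
        }
        if (pending > 0) {
            writer.flush();
        }
        return writer.flushCount;
    }
}
```

For ten statements and a batch size of four, the first method flushes ten times and the second three times, which is the kind of saving the change is after when each flush is a round trip to the store.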
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r188060281 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java --- @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Deserializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFHandler; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParseException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized + * set of {@link Statement}s. 
+ */ +@DefaultAnnotation(NonNull.class) +public class StatementsDeserializer implements Deserializer<Set<Statement>> { +private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class); + +private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory(); + +@Override +public void configure(final Map<String, ?> configs, final boolean isKey) { +// Nothing to do. +} + +@Override +public Set<Statement> deserialize(final String topic, final byte[] data) { +if(data == null || data.length == 0) { +// Return null because that is the contract of this method. +return null; +} + +try { +final RDFParser parser = PARSER_FACTORY.getParser(); +final Set<Statement> statements = new HashSet<>(); + +parser.setRDFHandler(new RDFHandler() { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187742000 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. --- End diff -- I mean that's true, but I don't think it's worth documenting. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187736288 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks. 
+ */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkConnector extends RyaSinkConnector { + +@Nullable +private MongoRyaSinkConfig config = null; + +@Override +public void start(final Map<String, String> props) { +this.config = new MongoRyaSinkConfig( props ); +} + +@Override +protected AbstractConfig getConfig() { +if(config == null) { +throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first."); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187736164 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java --- @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkConfig extends RyaSinkConfig { + +public static final String HOSTNAME = "mongo.hostname"; +private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections wlll use."; --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187736039 --- Diff: extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.test.mongo.MongoITBase; +import org.junit.Test; + +/** + * Integration tests the methods of {@link MongoRyaSinkTask}. + */ +public class MongoRyaSinkTaskIT extends MongoITBase { + +@Test +public void instanceExists() throws Exception { +// Install an instance of Rya. 
+final String ryaInstanceName = "rya"; +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +super.getMongoHostname(), +super.getMongoPort(), +Optional.empty(), +Optional.empty()); + +final InstallConfiguration installConfig = InstallConfiguration.builder() +.setEnableTableHashPrefix(false) +.setEnableEntityCentricIndex(false) +.setEnableFreeTextIndex(false) +.setEnableTemporalIndex(false) +.setEnablePcjIndex(false) +.setEnableGeoIndex(false) +.build(); + +final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient()); +ryaClient.getInstall().install(ryaInstanceName, installConfig); + +// Create the task that will be tested. +final MongoRyaSinkTask task = new MongoRyaSinkTask(); + +try { +// Configure the task to use the embedded Mongo DB instance for Rya. +final Map config = new HashMap<>(); +config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname()); +config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort()); +config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya"); + +// This will pass because the Rya instance exists. +task.start(config); +} finally { +task.stop(); +} +} + +@Test(expected = ConnectException.class) +public void instanceDoesNotExist() throws Exception { +// Create the task that will be tested. +final MongoRyaSinkTask task = new MongoRyaSinkTask(); + +try { +// Configure the task to use the embedded Mongo DB instance for Rya. +final Map config = new HashMap<>(); +config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname()); +config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort()); +config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist"); + +// Starting the task will fail because the Rya instance does not exist. +task.start(config); +} finally { +task.stop(); +} +} + +// TODO show that inserts using visibilities work. --- End diff -- Oh yea, we're not supporting that. Lemme delete that comment. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187735910 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { +private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + +@Nullable +private SailRepository sailRepo = null; + +@Nullable +private SailRepositoryConnection conn = null; + +/** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. 
+ */ +protected abstract void checkRyaInstanceExists(final Map taskConfig) throws ConnectException; + +/** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ +protected abstract Sail makeSail(final Map taskConfig) throws ConnectException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; +} + +@Override +public void start(final Map props) throws ConnectException { +requireNonNull(props); + +// Ensure the configured Rya Instance is installed within the configured database. +checkRyaInstanceExists(props); + +// Create the Sail object that is connected to the Rya Instance. +final Sail sail = makeSail(props); +sailRepo = new SailRepository( sail ); +conn = sailRepo.getConnection(); +} + +@Override +public void put(final Collection records) { +requireNonNull(records); + +// Return immediately if there are no records to handle. +if(records.isEmpty()) { +return; +} + +// If a transaction has not been started yet, then start one. +if(!conn.isActive()) { +conn.begin(); +} +
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187735067 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- "/** * Indicates that all members of the class or package should be annotated with the default value of the supplied * annotation class. This would be used for behavior annotations such as @NonNull, @CheckForNull, * or @CheckReturnValue. In particular, you can use @DefaultAnnotation(NonNull.class) on a class or package, * and then use @Nullable only on those parameters, methods or fields that you want to allow to be null. */" ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187734692 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. 
+ */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkConnector extends RyaSinkConnector { + +@Nullable +private AccumuloRyaSinkConfig config = null; + +@Override +public void start(final Map props) { +this.config = new AccumuloRyaSinkConfig( props ); +} + +@Override +protected AbstractConfig getConfig() { +if(config == null) { +throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first."); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187734887 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- It indicates that, by default, the parameters are not null. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187734448 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. --- End diff -- I think saying that I'm using the RDF4J Rio Binary format is more useful than describing how I went about doing that, since the code itself already shows the how. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187734176 --- Diff: extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java --- @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; +import org.junit.Test; + +/** + * Unit tests the methods of {@link AccumuloRyaSinkConfig}. + */ +public class AccumuloRyaSinkConfigTest { + +@Test +public void parses() { --- End diff -- Could you give an example of a malformed field? Do you just mean fields that are not part of the schema? That's not illegal. They just get ignored. ---
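The behavior described in the comment above (fields outside the schema are ignored, not rejected) can be sketched in plain Java. This is a hypothetical stand-in and not Kafka's ConfigDef or AbstractConfig. The class name SketchSinkConfig and the keys "sketch.username" and "sketch.hostname" are assumptions made up for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical schema-backed config; it only mimics the "unknown keys are
// ignored" behavior and is not the real Kafka Connect configuration API.
class SketchSinkConfig {

    // The keys that make up the (made-up) schema.
    static final Set<String> SCHEMA_KEYS =
            new HashSet<>(Arrays.asList("sketch.username", "sketch.hostname"));

    private final Map<String, String> values = new HashMap<>();

    SketchSinkConfig(final Map<String, String> props) {
        for (final Map.Entry<String, String> entry : props.entrySet()) {
            // Keys outside the schema are skipped rather than treated as errors.
            if (SCHEMA_KEYS.contains(entry.getKey())) {
                values.put(entry.getKey(), entry.getValue());
            }
        }
    }

    // Returns the parsed value, or null if the key was absent or not in the schema.
    String get(final String key) {
        return values.get(key);
    }
}
```

Under this reading, a test for "malformed" fields would have nothing to assert on unless the schema itself declares a validator for a known key.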
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187733815 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( --- End diff -- That's true. Feel free to write an improvement ticket for that. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187733573 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- That field is nullable because this is a stateful object, but the parameters into the start(...) function may not be null. I'll add a null check there. ---
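A minimal sketch of the pattern described above, assuming a simplified connector: the field is null until start(...) runs, the start(...) parameter is null checked, and the getter fails fast if it is called too early. SketchConnector is a hypothetical class that uses a plain Map in place of AccumuloRyaSinkConfig; it is not the code that was merged.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a stateful connector such as AccumuloRyaSinkConnector.
class SketchConnector {

    // Null until start(...) is invoked, which is why the field is nullable.
    private Map<String, String> config = null;

    void start(final Map<String, String> props) {
        // The parameter itself may not be null, so reject it immediately.
        Objects.requireNonNull(props, "props must not be null");
        this.config = new HashMap<>(props);
    }

    Map<String, String> getConfig() {
        // Guard against use before start(...) has populated the state.
        if (config == null) {
            throw new IllegalStateException(
                    "The configuration has not been set yet. Invoke start(props) first.");
        }
        return config;
    }
}
```

Keeping the null check in start(...) and the state check in getConfig() separates "bad argument" failures from "wrong call order" failures, which makes each exception easier to diagnose.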
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187733216 --- Diff: extras/kafka.connect/README.md --- @@ -0,0 +1,22 @@ + + +The parent project for all Rya Kafka Connect work. All projects thare are part +of that system must use this project's pom as their parent pom. --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187732940 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java --- @@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) { return Optional.fromNullable(conf.get(FLUO_APP_NAME)); } +public static void setUseMongo(final Configuration conf, final boolean useMongo) { --- End diff -- I don't want to mess with how our configuration objects are initialized within the scope of this ticket. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187732007 --- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java --- @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() { super(); } -public AccumuloRdfConfiguration(Configuration other) { +public AccumuloRdfConfiguration(final Configuration other) { super(other); } -public AccumuloRdfConfigurationBuilder getBuilder() { +public static AccumuloRdfConfigurationBuilder getBuilder() { --- End diff -- For me it is. I don't really want to rename the method as part of this review, though. I just needed it to be static so that I could use it. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731624 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java --- @@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() { * on their child subtrees. * @param value whether to use aggregation pipeline optimization. */ -public void setUseAggregationPipeline(boolean value) { +public void setUseAggregationPipeline(final boolean value) { setBoolean(USE_AGGREGATION_PIPELINE, value); } @Override public List> getOptimizers() { -List> optimizers = super.getOptimizers(); +final List> optimizers = super.getOptimizers(); if (getUseAggregationPipeline()) { -Class cl = AggregationPipelineQueryOptimizer.class; +final Class cl = AggregationPipelineQueryOptimizer.class; @SuppressWarnings("unchecked") +final --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731454 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map taskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} +} catch(final RyaClientException e) { +throw new ConnectException
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731377 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map taskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731146 --- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java --- @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.client.command; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.kafka.connect.api.StatementsSerializer; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand; +import org.apache.rya.rdftriplestore.utils.RdfFormatUtils; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParseException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.Rio; +import org.eclipse.rdf4j.rio.UnsupportedRDFormatException; +import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format. + */ +@DefaultAnnotation(NonNull.class) +public class WriteStatementsCommand implements RyaKafkaClientCommand { +private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class); + +/** + * Command line parameters that are used by this command to configure itself. 
+ */ +public static class WriteParameters extends KafkaParameters { +@Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.") +public String statementsFile; +} + +@Override +public String getCommand() { +return "write"; +} + +@Override +public String getDescription() { +return "Writes Statements to the specified Kafka topic."; +} + +@Override +public boolean validArguments(final String[] args) { +boolean valid = true; +try { +new JCommander(new WriteParameters(), args); +} catch(final ParameterException e) { +valid = false; +} +return valid; +} + +/** + * @return Describes what arguments may be provided to the command. + */ +@Override +public String getUsage() { +final JCommander parser = new JCommander(new WriteParameters()); + +final StringBuilder usage = new StringBuilder(); +parser.usage(usage); +return usage.toString(); +} + +@Override +public void execute(final String[] args) throws ArgumentsException, ExecutionException { +requireNonNull(args); + +// Parse the command line arguments. +final WriteParameters params = new WriteParameters(); +try { +new JCommander(params, args); +} catch(final ParameterException e) { +
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731028 --- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java --- @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.client; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException; +import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand; +import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand; +import org.eclipse.rdf4j.model.Statement; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic using the format + * the Rya Kafka Connect Sinks expect. 
+ */ +@DefaultAnnotation(NonNull.class) +public class CLIDriver { + +/** + * Maps from command strings to the object that performs the command. + */ +private static final ImmutableMap COMMANDS; +static { +final Set> commandClasses = new HashSet<>(); +commandClasses.add(ReadStatementsCommand.class); +commandClasses.add(WriteStatementsCommand.class); +final ImmutableMap.Builder builder = ImmutableMap.builder(); +for(final Class commandClass : commandClasses) { +try { +final RyaKafkaClientCommand command = commandClass.newInstance(); +builder.put(command.getCommand(), command); +} catch (InstantiationException | IllegalAccessException e) { +System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor."); +e.printStackTrace(); +} +} +COMMANDS = builder.build(); +} + +private static final String USAGE = makeUsage(COMMANDS); + +public static void main(final String[] args) { +// If no command provided or the command isn't recognized, then print the usage. +if (args.length == 0 || !COMMANDS.containsKey(args[0])) { +System.out.println(USAGE); +System.exit(1); +} + +// Fetch the command that will be executed. +final String command = args[0]; +final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); +final RyaKafkaClientCommand clientCommand = COMMANDS.get(command); + +// Print usage if the arguments are invalid for the command. +if(!clientCommand.validArguments(commandArgs)) { +System.out.println(clientCommand.getUsage()); +System.exit(1); +} + +// Execute the command. +try { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
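Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187730865 --- Diff: extras/kafka.connect/client/pom.xml --- @@ -0,0 +1,135 @@ + + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<modelVersion>4.0.0</modelVersion> + +<parent> +<groupId>org.apache.rya</groupId> +<artifactId>rya.kafka.connect.parent</artifactId> +<version>4.0.0-incubating-SNAPSHOT</version> +</parent> + +<artifactId>rya.kafka.connect.client</artifactId> + +<name>Apache Rya Kafka Connect - Client</name> +<description>Contains a client that may be used to load Statements into + a Kafka topic to be read by Kafka Connect.</description> + +<dependencies> + +<dependency> +<groupId>org.apache.rya</groupId> +<artifactId>rya.sail</artifactId> +</dependency> +<dependency> +<groupId>org.apache.rya</groupId> +<artifactId>rya.kafka.connect.api</artifactId> +</dependency> + + +<dependency> +<groupId>org.eclipse.rdf4j</groupId> +<artifactId>rdf4j-model</artifactId> +</dependency> +<dependency> +<groupId>com.google.guava</groupId> +<artifactId>guava</artifactId> +</dependency> +<dependency> +<groupId>com.beust</groupId> +<artifactId>jcommander</artifactId> +</dependency> +<dependency> +<groupId>com.github.stephenc.findbugs</groupId> +<artifactId>findbugs-annotations</artifactId> +</dependency> +<dependency> +<groupId>org.apache.kafka</groupId> +<artifactId>kafka-clients</artifactId> +</dependency> + + --- End diff -- It does, good call. Done. ---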
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187730515 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { +private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + +@Nullable +private SailRepository sailRepo = null; + +@Nullable +private SailRepositoryConnection conn = null; + +/** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. 
+ */ +protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException; + +/** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ +protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; +} + +@Override +public void start(final Map<String, String> props) throws ConnectException { +requireNonNull(props); + +// Ensure the configured Rya Instance is installed within the configured database. +checkRyaInstanceExists(props); + +// Create the Sail object that is connected to the Rya Instance. +final Sail sail = makeSail(props); +sailRepo = new SailRepository( sail ); +conn = sailRepo.getConnection(); +} + +@Override +public void put(final Collection<SinkRecord> records) { +requireNonNull(records); + +// Return immediately if there are no records to handle. +if(records.isEmpty()) { +return; +} + +// If a transaction has not been started yet, then start one. +if(!conn.isActive()) { +conn.begin(); +} +
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729976 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. 
+ */ +@DefaultAnnotation(NonNull.class) +public class StatementsSerializer implements Serializer<Set<Statement>> { +private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class); + +private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory(); + +@Override +public void configure(final Map<String, ?> configs, final boolean isKey) { +// Nothing to do. +} + +@Override +public byte[] serialize(final String topic, final Set<Statement> data) { +if(data == null) { +// Returning null because that is the contract of this method. +return null; +} + +// Write the statements using a Binary RDF Writer. +final ByteArrayOutputStream boas = new ByteArrayOutputStream(); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187730101 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java --- @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api.sink; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.sink.SinkConnector; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Handles the common components required to task {@link RyaSinkTask}s that write to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkConnector extends SinkConnector { + +/** + * Get the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked. 
+ * + * Only called after start has been invoked + * + * @return The configuration object for the connector. + * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet. + */ +protected abstract AbstractConfig getConfig() throws IllegalStateException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN"; +} + +@Override +public List<Map<String, String>> taskConfigs(final int maxTasks) { +final List<Map<String, String>> configs = new ArrayList<>(maxTasks); +for(int i = 0; i < maxTasks; i++) { +configs.add( getConfig().originalsStrings() ); +} +return configs; +} + +@Override +public void stop() { +// Nothing to do since the RyaSinkconnector has no background monitoring. --- End diff -- Done. ---
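The `taskConfigs` loop in the diff above hands every task the same settings. A minimal JDK-only sketch of that idea follows. `TaskConfigs.replicate` is a hypothetical helper, and giving each task its own copy of the map is an assumption of this sketch; the diff itself adds the result of `getConfig().originalsStrings()` on each iteration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating one-config-per-task replication. */
final class TaskConfigs {
    private TaskConfigs() { }

    /** Returns maxTasks independent copies of the connector's configuration. */
    static List<Map<String, String>> replicate(final Map<String, String> connectorConfig, final int maxTasks) {
        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            // Copy the map so that a change made for one task cannot leak into another.
            configs.add(new HashMap<>(connectorConfig));
        }
        return configs;
    }
}
```

Because every task receives identical settings, this arrangement suits sinks where the tasks differ only in which topic partitions they are assigned.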
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729813 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java --- @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Deserializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFHandler; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParseException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized + * set of {@link Statement}s. 
+ */ +@DefaultAnnotation(NonNull.class) +public class StatementsDeserializer implements Deserializer<Set<Statement>> { +private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class); + +private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory(); + +@Override +public void configure(final Map<String, ?> configs, final boolean isKey) { +// Nothing to do. +} + +@Override +public Set<Statement> deserialize(final String topic, final byte[] data) { +if(data == null || data.length == 0) { +// Return null because that is the contract of this method. +return null; +} + +try { +final RDFParser parser = PARSER_FACTORY.getParser(); +final Set<Statement> statements = new HashSet<>(); + +parser.setRDFHandler(new RDFHandler() { +@Override +public void handleStatement(final Statement statement) throws RDFHandlerException { +log.debug("Statement: " + statement); +statements.add( statement ); +} + +@Override public void startRDF() throws RDFHandlerException { } +@Override public void handleNamespace(final String arg0, final String arg1) throws RDFHandlerException { } +@Override public void handleComment(final String arg0) throws RDFHandlerException { } +@Override public void endRDF() throws RDFHandlerException { } +}); + +parser.parse(new ByteArrayInputStream(data), null); +return statements; + +} catch(final RDFParseException | RDFHandlerException | IOException e) { +log.error("Could not deserialize a Set of VisibilityStatement objects using the RDF4J Rio Binray format.", e); --- End diff -- Done. ---
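The serializer and deserializer in this pull request share a null contract: the serializer returns null for null input, and the deserializer returns null for a null or empty byte array. The sketch below illustrates only that contract, plus try-with-resources around the streams. It is a stand-in under stated assumptions: it encodes plain strings with `java.io.DataOutputStream` instead of the RDF4J Rio Binary format, it does not implement the Kafka `Serializer` or `Deserializer` interfaces, and `StringSetCodec` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical codec showing the null-in/null-out contract with plain strings. */
final class StringSetCodec {
    private StringSetCodec() { }

    /** Writes the element count followed by each string; null input yields null. */
    static byte[] serialize(final Set<String> data) {
        if (data == null) {
            return null;
        }
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // try-with-resources flushes and closes the wrapper before the bytes are read back.
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(data.size());
            for (final String value : data) {
                out.writeUTF(value);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    /** Reads back what serialize wrote; null or empty input yields null. */
    static Set<String> deserialize(final byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            final int size = in.readInt();
            final Set<String> values = new HashSet<>();
            for (int i = 0; i < size; i++) {
                values.add(in.readUTF());
            }
            return values;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that an empty set still serializes to a non-empty array in this sketch (the count header), so it round-trips to an empty set rather than to null.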
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729543 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729286 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); --- End diff -- I'm assuming rya.api. Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729157 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187728842 --- Diff: extras/kafka.connect/accumulo-it/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTaskIT.java --- @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.junit.Test; + +/** + * Integration tests the methods of {@link AccumuloRyaSinkTask}. --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187727065 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); +} +} + +@Override +protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +fina
[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/295#discussion_r187725947 --- Diff: extras/shell/src/test/java/org/apache/rya/shell/AccumuloRyaCommandsIT.java --- @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.shell; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.shell.util.InstallPrompt; +import org.apache.rya.shell.util.PasswordPrompt; +import org.apache.rya.shell.util.SparqlPrompt; +import org.junit.Test; +import org.springframework.context.ApplicationContext; +import org.springframework.shell.Bootstrap; +import org.springframework.shell.core.CommandResult; +import org.springframework.shell.core.JLineShellComponent; + +import com.google.common.base.Optional; + +/** + * Integration tests the methods of {@link RyaCommands}. --- End diff -- Done. ---
[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/295#discussion_r187725865 --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java --- @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Integration tests the methods of {@link }. 
+ */ +public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase { + +@Test +public void queryFindsAllLoadedStatements_fromSet() throws Exception { +// Using the Rya Client, install an instance of Rya for the test. +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); + +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector()); + +final String ryaInstance = UUID.randomUUID().toString().replace('-', '_'); +client.getInstall().install(ryaInstance, InstallConfiguration.builder().build()); + +// Load some data into the instance. +final ValueFactory vf = SimpleValueFactory.getInstance(); +final Set statements = Sets.newHashSet( +vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")), +vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), +vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")), +vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), +vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")), +vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob"))); +client.getLoadStatements().loadStatements(ryaInstance, statements); + +// Execute a query. +final Set fetched = new HashSet<>(); +try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) { +while(result.hasNext()) { +final BindingSet bs = result.next(); + +// If this is the statement that indicates the Rya version + if(bs.getBinding("p").getValue().equals(vf.createIRI("urn:org.apache.rya/2012/05#version"))) { +continue; +} + +// Otherwise add it to the list of fetched statements. 
+fetched.add( vf.createStatement( +(Resource
[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/295#discussion_r187725795 --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java --- @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Integration tests the methods of {@link }. 
+ */ +public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase { + +@Test +public void queryFindsAllLoadedStatements_fromSet() throws Exception { +// Using the Rya Client, install an instance of Rya for the test. +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); + +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector()); + +final String ryaInstance = UUID.randomUUID().toString().replace('-', '_'); +client.getInstall().install(ryaInstance, InstallConfiguration.builder().build()); + +// Load some data into the instance. +final ValueFactory vf = SimpleValueFactory.getInstance(); +final Set statements = Sets.newHashSet( +vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")), +vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), +vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")), +vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), +vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")), +vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob"))); +client.getLoadStatements().loadStatements(ryaInstance, statements); + +// Execute a query. +final Set fetched = new HashSet<>(); +try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) { +while(result.hasNext()) { +final BindingSet bs = result.next(); + +// If this is the statement that indicates the Rya version + if(bs.getBinding("p").getValue().equals(vf.createIRI("urn:org.apache.rya/2012/05#version"))) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/295#discussion_r187724995 --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java --- @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Integration tests the methods of {@link }. --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187657223 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); +} +} + +@Override +protected Sail makeSail(final Map taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +fina
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187657023 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map taskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} +} catch(final RyaClientException e) { +throw new ConnectException
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
GitHub user kchilton2 opened a pull request: https://github.com/apache/incubator-rya/pull/296 RYA-487 Kafka Connect Sinks Ignore RYA-494. That's a separate PR that this depended on to create documentation for the manual. This PR implements Kafka Connect Sinks for both Accumulo and MongoDB backed Rya. I wrote a manual page that explains design decisions that were made, notes about deploying the sinks, a quick start, and future work we may want to consider. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kchilton2/incubator-rya RYA-487 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/296.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #296 commit ed1def04283a2e893a1c45014628dc7fe10717eb Author: kchilton2 Date: 2018-05-08T22:33:24Z RYA-494 Fixed a bug where the shell was not loading or displaying all Statements. commit c2d802bd51d4d6951aa54e48b4038d64462bd8fd Author: kchilton2 Date: 2018-04-17T19:10:26Z RYA-487 Implement Kafka Connect Sink implementations for Accumulo and Mongo DB backed Rya. ---
[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...
GitHub user kchilton2 opened a pull request: https://github.com/apache/incubator-rya/pull/295 RYA-494 Fixed a bug where the shell was not loading or displaying all Statements. This ended up being a display bug. The code that was iterating and displaying binding sets was skipping one. I left a bunch of tests in that helped narrow down where the bug was. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kchilton2/incubator-rya RYA-494 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/295.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #295 commit 942c3420bb14bafca4cde6d0b06a3ef100f07772 Author: kchilton2 Date: 2018-05-08T22:33:24Z RYA-494 Fixed a bug where the shell was not loading or displaying all Statements. ---
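The message above does not show the defective shell code, so the following is only a hypothetical sketch of one common way an iteration ends up skipping a single result: the first element is consumed before the display loop starts and is never emitted. The class and method names are invented for illustration and are not taken from Rya.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public final class SkippedElementDemo {

    // Hypothetical buggy pattern: the first result is pulled off the iterator
    // (for example to inspect it) and then never added to the output.
    static List<String> collectSkippingFirst(final Iterator<String> results) {
        final List<String> shown = new ArrayList<>();
        if (results.hasNext()) {
            results.next(); // consumed here and lost
        }
        while (results.hasNext()) {
            shown.add(results.next());
        }
        return shown;
    }

    // Corrected pattern: every element returned by next() is kept.
    static List<String> collectAll(final Iterator<String> results) {
        final List<String> shown = new ArrayList<>();
        while (results.hasNext()) {
            shown.add(results.next());
        }
        return shown;
    }

    public static void main(final String[] args) {
        final List<String> data = Arrays.asList("a", "b", "c");
        System.out.println(collectSkippingFirst(data.iterator())); // [b, c]
        System.out.println(collectAll(data.iterator()));           // [a, b, c]
    }
}
```

A test that loads a known set of statements and asserts that the same set comes back, as the integration tests quoted in this thread do, would catch this kind of off-by-one.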
[GitHub] incubator-rya issue #293: RYA-491 Repackaged reusable Mongo DB test code int...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/293 Updated. ---
[GitHub] incubator-rya pull request #293: RYA-491 Repackaged reusable Mongo DB test c...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/293#discussion_r184151510 --- Diff: dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAO2IT.java --- @@ -40,11 +40,11 @@ /** * Integration tests the methods of {@link MongoDBRyaDAO}. */ -public class MongoDBRyaDAO2IT extends MongoITBase { +public class MongoDBRyaDAO2IT extends MongoRyaITBase { --- End diff -- I'm not messing with classes that already exist except to deal with my repackaging. ---
[GitHub] incubator-rya pull request #293: RYA-491 Repackaged reusable Mongo DB test c...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/293#discussion_r184151567 --- Diff: dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java --- @@ -423,28 +423,28 @@ public void testRequiredTimestamp() throws Exception { + " ?A rdfs:subClassOf ?B .\n" + " ?B rdfs:subClassOf ?A .\n" + "}"; -List varNames = Arrays.asList("A", "B"); -Multiset expectedSolutions = HashMultiset.create(); +final List varNames = Arrays.asList("A", "B"); +final Multiset expectedSolutions = HashMultiset.create(); expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON)); expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person)); expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING)); expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing)); // Prepare query and convert to pipeline -QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr()); -SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(getRyaCollection()); +final QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr()); +final SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(getRyaCollection()); queryTree.visit(visitor); Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode); -AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg(); +final AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) queryTree.getArg(); // Extend the pipeline by requiring a timestamp of zero (should have no effect) pipelineNode.requireSourceTimestamp(0); -Multiset solutions = HashMultiset.create(); +final Multiset solutions = HashMultiset.create(); CloseableIteration iter = pipelineNode.evaluate(new QueryBindingSet()); while (iter.hasNext()) { solutions.add(iter.next()); } Assert.assertEquals(expectedSolutions, solutions); // 
Extend the pipeline by requiring a future timestamp (should produce no results) -long delta = 1000 * 60 * 60 * 24; +final long delta = 1000 * 60 * 60 * 24; --- End diff -- I'm not messing with classes that already exist except to deal with my repackaging. ---
[GitHub] incubator-rya pull request #293: RYA-491 Repackaged reusable Mongo DB test c...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/293#discussion_r184151234 --- Diff: pom.xml --- @@ -381,6 +458,26 @@ under the License. rya.indexing.pcj ${project.version} + +org.apache.rya +rya.kafka.connect.parent --- End diff -- Yep, not sure how that got in there. ---
[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/282 Eric, please close this. ---
[GitHub] incubator-rya pull request #293: RYA-491 Repackaged reusable Mongo DB test c...
GitHub user kchilton2 opened a pull request: https://github.com/apache/incubator-rya/pull/293 RYA-491 Repackaged reusable Mongo DB test code into a project named 'rya.test.mongo'. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kchilton2/incubator-rya RYA-491 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/293.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #293 commit 6752b3749eb851f9da74f75a516d49e1b24fc2c5 Author: kchilton2 Date: 2018-04-19T19:18:38Z RYA-489 Moved common Accumulo integration test code to the rya.test.accumulo project. commit 5d973efe08a5f9d3b6e2f0a8ac8475baf12f71d8 Author: kchilton2 Date: 2018-04-24T20:32:05Z RYA-491 Repackaged reusable Mongo DB test code into a project named 'rya.test.mongo'. ---
[GitHub] incubator-rya issue #292: Rya 489
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/292 Could somebody code review this or pull it in? ---
[GitHub] incubator-rya issue #290: RYA-488 Moved the Geo Indexing specific maven repo...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/290 Could somebody code review this or pull it in? ---
[GitHub] incubator-rya issue #245: [RYA-405] Migrate from Sesame to rdf4j libs
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/245 This can be closed. Eric finished this work and it has been pulled into master. ---
[GitHub] incubator-rya issue #290: RYA-488 Moved the Geo Indexing specific maven repo...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/290 asfbot build ---
[GitHub] incubator-rya issue #289: RYA-486 Updated the project's version to 4.0.0-inc...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/289 Could a committer pull this in? ---
[GitHub] incubator-rya issue #291: RYA-405 Migration of OpenRDF Sesame libraries to R...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/291 I'd say ship it. ---
[GitHub] incubator-rya pull request #292: Rya 489
GitHub user kchilton2 opened a pull request: https://github.com/apache/incubator-rya/pull/292 Rya 489 This is pretty much just a repackaging commit. I moved the generic accumulo test code to a new "rya.test.accumulo" project. I had to introduce some code for performing installation steps to one of the test classes. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kchilton2/incubator-rya RYA-489 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/292.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #292 commit 9f8dc746f9e610b6e895acaf39c36c6e96a11f4e Author: kchilton2 Date: 2018-04-17T04:20:13Z RYA-486 Updated the project's version to 4.0.0-incubating-SNAPSHOT. commit 46f582771e1dfe575522ca82e303a8ad4077f529 Author: Jorge Machado Date: 2017-10-18T09:32:39Z RYA-405 Migration of OpenRDF Sesame libraries to RDF4J Co-authored-by: eric.white Co-authored-by: Jorge Machado commit afd2b3ec91a28622a195cb1f1f45d5c6297d350d Author: eric.white Date: 2018-04-19T18:28:51Z RYA-405 PR Updates commit 2e692c29f4b2fd30aae52ee863b4776aee9628d6 Author: kchilton2 Date: 2018-04-19T19:18:38Z RYA-489 Moved common Accumulo integration test code to the rya.test.accumulo project. ---
[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/291#discussion_r182825471 --- Diff: extras/vagrantExample/src/main/vagrant/Vagrantfile --- @@ -52,8 +52,8 @@ Vagrant.configure(2) do |config| export ACCUMULO_VERSION=1.6.5 ###export ACCUMULO_VERSION=1.7.1 export HADOOP_VERSION=2.7.2 -export RYA_EXAMPLE_VERSION=3.2.10-SNAPSHOT -export SESAME_VERSION=2.7.6 +export RYA_EXAMPLE_VERSION=3.2.13-SNAPSHOT --- End diff -- This will need to be 4.0.0. This commit should be after the version update commit. ---
[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/291#discussion_r182596150 --- Diff: common/rya.api/src/main/java/org/apache/rya/api/domain/Node.java --- @@ -21,14 +21,14 @@ -import org.openrdf.model.impl.URIImpl; +import org.eclipse.rdf4j.model.impl.SimpleIRI; /** * A Node is an expected node in the global graph. This typing of the URI allows us to dictate the difference between a --- End diff -- "This typing of the [IRI] ..." ---
[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/291#discussion_r182596798 --- Diff: common/rya.api/src/main/java/org/apache/rya/api/persist/index/RyaSecondaryIndexer.java --- @@ -19,31 +19,29 @@ * under the License. */ - import java.io.Closeable; import java.io.Flushable; import java.io.IOException; import java.util.Collection; import java.util.Set; import org.apache.hadoop.conf.Configurable; -import org.openrdf.model.URI; - import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; +import org.eclipse.rdf4j.model.IRI; public interface RyaSecondaryIndexer extends Closeable, Flushable, Configurable { /** * initialize after setting configuration. */ -public void init(); + public void init(); --- End diff -- Tab ---
[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/291#discussion_r182596437 --- Diff: common/rya.api/src/main/java/org/apache/rya/api/domain/VarNameUtils.java --- @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.domain; + +import org.apache.commons.lang.StringUtils; +import org.eclipse.rdf4j.model.Value; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.algebra.Var; +import org.eclipse.rdf4j.query.algebra.helpers.TupleExprs; + +/** + * Utility methods and constants for RDF {@link Var} names. + */ +public final class VarNameUtils { --- End diff -- Us needing this class at all is kind of a smell. Maybe note that this is discouraged if there's any other way to figure out if your Var is constant or anonymous. I think there are methods that tell you that information. ---
[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/291#discussion_r182596172 --- Diff: common/rya.api/src/main/java/org/apache/rya/api/domain/Node.java --- @@ -21,14 +21,14 @@ -import org.openrdf.model.impl.URIImpl; +import org.eclipse.rdf4j.model.impl.SimpleIRI; /** * A Node is an expected node in the global graph. This typing of the URI allows us to dictate the difference between a * URI that is just an Attribute on the subject vs. a URI that is another subject Node in the global graph. It does not --- End diff -- IRI ---
[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/291#discussion_r182602468 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/IndexPlanValidator/ThreshholdPlanSelector.java --- @@ -111,9 +110,9 @@ public double getCost(TupleExpr te, double indexWeight, double commonVarWeight, double dirProductScale; if(queryNodeCount > nodeCount) { -dirProductScale = 1/((double)(queryNodeCount - nodeCount)); +dirProductScale = 1/ (queryNodeCount - nodeCount); --- End diff -- Are we sure about getting rid of the double casts here? We don't lose precision in the division? ---
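The concern about the dropped cast can be checked with a small sketch. It assumes `queryNodeCount` and `nodeCount` are `int` values, which the diff does not show; if they are already `double`, the cast is redundant and removing it changes nothing. The class and method names below are hypothetical and exist only for this illustration.

```java
public class DivisionCastDemo {

    // Mirrors the original expression: the cast forces floating-point division.
    static double withCast(int queryNodeCount, int nodeCount) {
        return 1 / ((double) (queryNodeCount - nodeCount));
    }

    // Mirrors the changed expression: with int operands (an assumption here) this is
    // integer division, and the int result is widened to double only afterwards.
    static double withoutCast(int queryNodeCount, int nodeCount) {
        return 1 / (queryNodeCount - nodeCount);
    }

    public static void main(String[] args) {
        System.out.println(withCast(5, 3));    // prints 0.5
        System.out.println(withoutCast(5, 3)); // prints 0.0
    }
}
```

Under that assumption the uncast version truncates to 0 whenever the difference is greater than 1, so the question in the review is a real one.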
[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/291#discussion_r182601712 --- Diff: dao/accumulo.rya/pom.xml --- @@ -43,25 +43,30 @@ under the License. -org.openrdf.sesame -sesame-rio-ntriples +org.eclipse.rdf4j +rdf4j-rio-ntriples -org.openrdf.sesame -sesame-rio-nquads +org.eclipse.rdf4j +rdf4j-rio-nquads -org.openrdf.sesame -sesame-queryalgebra-evaluation +org.eclipse.rdf4j +rdf4j-queryalgebra-evaluation + + +org.eclipse.rdf4j +rdf4j-sail-api +${org.eclipse.rdf4j.version} --- End diff -- Add to rya-project dependency management ---
[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/291#discussion_r182596490 --- Diff: common/rya.api/src/main/java/org/apache/rya/api/domain/VarNameUtils.java --- @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.domain; + +import org.apache.commons.lang.StringUtils; +import org.eclipse.rdf4j.model.Value; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.algebra.Var; +import org.eclipse.rdf4j.query.algebra.helpers.TupleExprs; + +/** + * Utility methods and constants for RDF {@link Var} names. + */ +public final class VarNameUtils { +private static final ValueFactory VF = SimpleValueFactory.getInstance(); + +/** + * Prepended to the start of constant var names. + */ +public static final String CONSTANT_PREFIX = "_const_"; +private static final String LEGACY_CONSTANT_PREFIX = "-const-"; + +/** + * Prepended to the start of anonymous var names. + */ +public static final String ANONYMOUS_PREFIX = "_anon_"; +private static final String LEGACY_ANONYMOUS_PREFIX = "-anon-"; + +/** + * Private constructor to prevent instantiation. 
+ */ +private VarNameUtils() { +} + +/** + * Prepends the constant prefix to the specified value. + * @param value the value to add the constant prefix to. + * @return the value with the constant prefix attached before it. + */ +public static String prependConstant(final String value) { --- End diff -- Params and return type are @Nullable ---
[GitHub] incubator-rya pull request #291: RYA-405 Migration of OpenRDF Sesame librari...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/291#discussion_r182602289 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/FilterFunctionOptimizer.java --- @@ -157,18 +156,18 @@ private void buildQuery(final TupleExpr tupleExpr, final StatementPattern matchS tupleExpr.visit(fVisitor); final List results = Lists.newArrayList(); for(int i = 0; i < fVisitor.func.size(); i++){ -results.add(new IndexingExpr(fVisitor.func.get(i), matchStatement, fVisitor.args.get(i))); +results.add(new IndexingExpr(fVisitor.func.get(i), matchStatement, Arrays.stream(fVisitor.args.get(i)).toArray())); --- End diff -- This is kind of a weird change. ---
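To see what the changed expression does, here is a minimal sketch. It assumes `fVisitor.args.get(i)` returns an object array; the element type is not visible in the diff, so `String[]` is used as a stand-in, and the class and method names are hypothetical.

```java
import java.util.Arrays;

public class StreamToArrayDemo {

    // Copies the array through a stream, as in the changed line.
    // Stream.toArray() with no arguments always returns Object[].
    static Object[] viaStream(String[] values) {
        return Arrays.stream(values).toArray();
    }

    public static void main(String[] args) {
        String[] original = {"a", "b"};
        Object[] copy = viaStream(original);

        System.out.println(copy.getClass().getSimpleName()); // prints Object[]
        System.out.println(copy != original);                // prints true: a new array
        System.out.println(Arrays.equals(copy, original));   // prints true: same elements
    }
}
```

So the round trip produces a shallow copy whose runtime type is `Object[]`. If the goal was only to satisfy a changed parameter type, that may be worth stating in a code comment.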
[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/282 This can be declined. ---
[GitHub] incubator-rya issue #290: RYA-488 Moved the Geo Indexing specific maven repo...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/290 That test failing has nothing to do with this commit. We should open a defect and fix the test. ---
[GitHub] incubator-rya pull request #290: RYA-488 Moved the Geo Indexing specific mav...
GitHub user kchilton2 opened a pull request: https://github.com/apache/incubator-rya/pull/290 RYA-488 Moved the Geo Indexing specific maven repositories into the g… You can merge this pull request into a Git repository by running: $ git pull https://github.com/kchilton2/incubator-rya RYA-488 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/290.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #290 commit 80a3b76afc3d7aa8391ea7228de1019d7e644529 Author: kchilton2 Date: 2018-04-17T21:10:59Z RYA-488 Moved the Geo Indexing specific maven repositories into the geoindexing profile. ---
[GitHub] incubator-rya issue #245: [RYA-405] Migrate from Sesame to rdf4j libs
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/245 @ejwhite922 Are you able to squash down the history where it makes sense so that this won't be messy when it goes in? ---
[GitHub] incubator-rya pull request #289: RYA-486 Updated the project's version to 4....
GitHub user kchilton2 opened a pull request: https://github.com/apache/incubator-rya/pull/289 RYA-486 Updated the project's version to 4.0.0-incubating-SNAPSHOT. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kchilton2/incubator-rya RYA-486 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/289.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #289 commit 9f8dc746f9e610b6e895acaf39c36c6e96a11f4e Author: kchilton2 Date: 2018-04-17T04:20:13Z RYA-486 Updated the project's version to 4.0.0-incubating-SNAPSHOT. ---
[GitHub] incubator-rya pull request #285: RYA-469 Added tests for Rya Streams join it...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/285#discussion_r178385888 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/join/KeyValueJoinStateStore.java --- @@ -77,30 +77,39 @@ /** * This is the maximum value of a UTF-8 character. */ -private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0XFF }, Charsets.UTF_8); +private static final String END_RANGE_SUFFIX = new String(new byte[] { (byte) 0xFF }, Charsets.UTF_8); + +/** + * Indicates where the end of the join variables occurs. + */ +private static final String JOIN_VAR_END_MARKER = new String("~!^~".getBytes(Charsets.UTF_8), Charsets.UTF_8); --- End diff -- I'm not sure that the marker should be in the visually renderable byte ranges since, while very unlikely, they could legitimately appear in the data that is being indexed. ---
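The collision risk raised above can be illustrated with a small sketch. The key layout used here (`<joinValue><marker><remainder>`) is a hypothetical simplification, not the actual format used by `KeyValueJoinStateStore`, and the control-character marker is only one possible alternative, chosen for illustration.

```java
public class JoinMarkerDemo {

    // The printable marker from the diff under review.
    static final String PRINTABLE_MARKER = "~!^~";

    // Hypothetical alternative: the NUL control character, which is far less
    // likely to appear inside indexed values than printable punctuation.
    static final String CONTROL_MARKER = "\u0000";

    // Builds a key as joinValue + marker + remainder (hypothetical layout).
    static String makeKey(String joinValue, String marker, String remainder) {
        return joinValue + marker + remainder;
    }

    // Recovers the join value by cutting the key at the first occurrence of the marker.
    static String joinValueOf(String key, String marker) {
        return key.substring(0, key.indexOf(marker));
    }

    public static void main(String[] args) {
        // A value that happens to contain the printable marker.
        String value = "note~!^~draft";

        String printableKey = makeKey(value, PRINTABLE_MARKER, "rest");
        String controlKey = makeKey(value, CONTROL_MARKER, "rest");

        System.out.println(joinValueOf(printableKey, PRINTABLE_MARKER)); // prints note
        System.out.println(joinValueOf(controlKey, CONTROL_MARKER));     // prints note~!^~draft
    }
}
```

With the printable marker the value is cut short at its own embedded `~!^~`. A non-printable marker does not remove the problem entirely, but it makes a legitimate collision much less likely.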
[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/282 Also, to uninstall it I was using `rpm -qa | grep rya`. Then, for whatever that reported as the installed package, I used `rpm -e`. That is what is not working. ---
[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/282 I used the RPM you gave me built using this patch when it did not work. I had never installed the bad state RPM. ---
[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/282 https://github.com/apache/incubator-rya/pull/284 ---
[GitHub] incubator-rya pull request #284: Reverting the RPM build to use the RPM comm...
GitHub user kchilton2 opened a pull request: https://github.com/apache/incubator-rya/pull/284 Reverting the RPM build to use the RPM command again. The RPM build is now activated using a profile. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kchilton2/incubator-rya fix-rpm Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/284.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #284 commit c801f69f79547f4cd36e5e996693e7f767fd564a Author: Kevin Chilton Date: 2018-03-15T21:29:49Z Reverting the RPM build to use the RPM command again. The RPM build is now activated using a profile. ---
[GitHub] incubator-rya issue #282: RYA-443 Fixed Rya Streams Query Manager RPM versio...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/282 This patch does not create an RPM that can be uninstalled. I'm going to revert all of the changes Eric made to how we build RPMs and put my stuff back in. I'll make the RPM build a profile so Windows users can still build our app. ---
[GitHub] incubator-rya pull request #273: Updating the Rya Manual to include Rya Stre...
GitHub user kchilton2 reopened a pull request: https://github.com/apache/incubator-rya/pull/273 Updating the Rya Manual to include Rya Streams. ## Description Wrote the manual page for Rya Streams. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kchilton2/incubator-rya manual-improvements Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/273.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #273 commit f9c579af6997dbb66405e185ef4bf71eaa56e7d5 Author: kchilton2 Date: 2018-02-09T17:33:36Z Updating the Rya Manual to include Rya Streams. ---
[GitHub] incubator-rya issue #273: Updating the Rya Manual to include Rya Streams.
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/273 Never mind. This one was not included. ---
[GitHub] incubator-rya issue #272: RYA-443 Rya Streams Query Manager daemon program.
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/272 I'm closing this because https://github.com/apache/incubator-rya/pull/279 takes it over. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 closed the pull request at: https://github.com/apache/incubator-rya/pull/272 ---
[GitHub] incubator-rya issue #275: RYA-466 Update the Rya Streams Client to stream re...
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/275 I'm closing this because https://github.com/apache/incubator-rya/pull/279 takes it over. ---
[GitHub] incubator-rya pull request #273: Updating the Rya Manual to include Rya Stre...
Github user kchilton2 closed the pull request at: https://github.com/apache/incubator-rya/pull/273 ---
[GitHub] incubator-rya issue #273: Updating the Rya Manual to include Rya Streams.
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/273 I'm closing this because https://github.com/apache/incubator-rya/pull/279 takes it over. ---
[GitHub] incubator-rya pull request #275: RYA-466 Update the Rya Streams Client to st...
Github user kchilton2 closed the pull request at: https://github.com/apache/incubator-rya/pull/275 ---
[GitHub] incubator-rya pull request #277: Rya 460
Github user kchilton2 closed the pull request at: https://github.com/apache/incubator-rya/pull/277 ---
[GitHub] incubator-rya issue #277: Rya 460
Github user kchilton2 commented on the issue: https://github.com/apache/incubator-rya/pull/277 I'm closing this because https://github.com/apache/incubator-rya/pull/279 takes it over. ---
[GitHub] incubator-rya pull request #275: RYA-466 Update the Rya Streams Client to st...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/275#discussion_r171065067 --- Diff: common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java --- @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.utils; + +import static java.util.Objects.requireNonNull; + +import java.util.regex.Pattern; + +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A utility class that is used to glean insight into the structure of SPARQL queries. + */ +@DefaultAnnotation(NonNull.class) +public class QueryInvestigator { + +private static final SPARQLParser PARSER = new SPARQLParser(); + +private QueryInvestigator() { } + +/** + * Determines whether a SPARQL command is a CONSTRUCT or not. + * + * @param sparql - The SPARQL to evaluate. (not null) + * @return {@code true} if the provided SPARQL is a CONSTRUCT query; otherwise {@code false}. + * @throws MalformedQueryException The SPARQL is neither a well formed query or update. 
+ */ +public static boolean isConstruct(final String sparql) throws MalformedQueryException { +requireNonNull(sparql); + +try { +// Constructs are queries, so try to create a ParsedQuery. +PARSER.parseQuery(sparql, null); + +// Check to see if the SPARQL looks like a CONSTRUCT query. +return Pattern.matches(".*?construct.*?where.*", sparql.toLowerCase()); --- End diff -- Just write a better regex. ---
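One possible reading of "a better regex" is sketched below. It is not the pattern the project ended up with. It assumes the query has already been parsed successfully, skips any leading `BASE` and `PREFIX` declarations, and then requires the `CONSTRUCT` keyword. It does not handle `#` comments in the prologue. The class and method names are hypothetical.

```java
import java.util.regex.Pattern;

public class ConstructCheckDemo {

    // The pattern from the diff under review, applied the same way.
    static boolean originalCheck(String sparql) {
        return Pattern.matches(".*?construct.*?where.*", sparql.toLowerCase());
    }

    // Sketch of a stricter pattern: optional whitespace, any number of BASE/PREFIX
    // declarations, then the CONSTRUCT keyword as a whole word.
    private static final Pattern STARTS_WITH_CONSTRUCT = Pattern.compile(
            "^\\s*(?:(?:BASE\\s*<[^>]*>|PREFIX\\s+[^\\s:]*:\\s*<[^>]*>)\\s*)*CONSTRUCT\\b",
            Pattern.CASE_INSENSITIVE);

    // lookingAt() only requires a match at the start, not over the whole input.
    static boolean anchoredCheck(String sparql) {
        return STARTS_WITH_CONSTRUCT.matcher(sparql).lookingAt();
    }

    public static void main(String[] args) {
        String multiLine = "PREFIX ex: <http://example.org/>\n"
                + "CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o }";
        String select = "SELECT ?s WHERE { ?s ?p \"construct where\" }";

        // '.' does not match a line break by default, so the original misses this query.
        System.out.println(originalCheck(multiLine)); // prints false
        System.out.println(anchoredCheck(multiLine)); // prints true

        // The original is fooled by a literal that contains both keywords.
        System.out.println(originalCheck(select));    // prints true
        System.out.println(anchoredCheck(select));    // prints false
    }
}
```

Anchoring on the position of the keyword rather than on its mere presence avoids both the false negative on multi-line input and the false positive on string literals.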
[GitHub] incubator-rya pull request #275: RYA-466 Update the Rya Streams Client to st...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/275#discussion_r171064989 --- Diff: common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java --- @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.utils; + +import static java.util.Objects.requireNonNull; + +import java.util.regex.Pattern; + +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A utility class that is used to glean insight into the structure of SPARQL queries. + */ +@DefaultAnnotation(NonNull.class) +public class QueryInvestigator { + +private static final SPARQLParser PARSER = new SPARQLParser(); + +private QueryInvestigator() { } + +/** + * Determines whether a SPARQL command is a CONSTRUCT or not. + * + * @param sparql - The SPARQL to evaluate. (not null) + * @return {@code true} if the provided SPARQL is a CONSTRUCT query; otherwise {@code false}. + * @throws MalformedQueryException The SPARQL is neither a well formed query or update. 
+ */ +public static boolean isConstruct(final String sparql) throws MalformedQueryException { +requireNonNull(sparql); + +try { +// Constructs are queries, so try to create a ParsedQuery. +PARSER.parseQuery(sparql, null); + +// Check to see if the SPARQL looks like a CONSTRUCT query. +return Pattern.matches(".*?construct.*?where.*", sparql.toLowerCase()); --- End diff -- Any SELECT could also create a ParsedGraphQuery, so that will not work. ---
[GitHub] incubator-rya pull request #275: RYA-466 Update the Rya Streams Client to st...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/275#discussion_r171032022 --- Diff: common/rya.api/src/main/java/org/apache/rya/api/utils/QueryInvestigator.java --- @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.utils; + +import static java.util.Objects.requireNonNull; + +import java.util.regex.Pattern; + +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A utility class that is used to glean insight into the structure of SPARQL queries. + */ +@DefaultAnnotation(NonNull.class) +public class QueryInvestigator { + +private static final SPARQLParser PARSER = new SPARQLParser(); + +private QueryInvestigator() { } + +/** + * Determines whether a SPARQL command is a CONSTRUCT or not. + * + * @param sparql - The SPARQL to evaluate. (not null) + * @return {@code true} if the provided SPARQL is a CONSTRUCT query; otherwise {@code false}. + * @throws MalformedQueryException The SPARQL is neither a well formed query or update. 
+ */ +public static boolean isConstruct(final String sparql) throws MalformedQueryException { +requireNonNull(sparql); + +try { +// Constructs are queries, so try to create a ParsedQuery. +PARSER.parseQuery(sparql, null); + +// Check to see if the SPARQL looks like a CONSTRUCT query. +return Pattern.matches(".*?construct.*?where.*", sparql.toLowerCase()); + +} catch(final MalformedQueryException queryE) { +try { +// Maybe it's an update. +PARSER.parseUpdate(sparql, null); + +// It was, so return false. +return false; + +} catch(final MalformedQueryException updateE) { +// It's not. Actually malformed. +throw queryE; +} +} +} + +/** + * Determines whether a SPARQL command is an INSERT with a WHERE clause or not. + * + * @param sparql - The SPARQL to evaluate. (not null) + * @return {@code true} if the provided SPARQL is an INSERT update; otherwise {@code false}. + * @throws MalformedQueryException The SPARQL is neither a well formed query or update. + */ +public static boolean isInsertWhere(final String sparql) throws MalformedQueryException { +requireNonNull(sparql); + +try { +// Inserts are updated, so try to create a ParsedUpdate. +PARSER.parseUpdate(sparql, null); + +// Check to see if the SPARQL looks like an INSERT query. +return Pattern.matches(".*?insert.*?where.*", sparql.toLowerCase()); --- End diff -- .*? is part of the regex. It will not match on ?where. But yea, it will def hit the 'where' part. ---
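The point about `.*?` can be demonstrated with a short sketch. In `.*?` the `?` makes the preceding `.*` reluctant; it is not a literal question mark, so the pattern never requires a `?` before `where`. The class and method names are hypothetical; the pattern itself is copied from the diff.

```java
import java.util.regex.Pattern;

public class LazyQuantifierDemo {

    // The pattern from the diff under review, applied the same way.
    static boolean looksLikeInsertWhere(String sparql) {
        return Pattern.matches(".*?insert.*?where.*", sparql.toLowerCase());
    }

    public static void main(String[] args) {
        // Matches: "insert" is followed somewhere later by "where".
        System.out.println(looksLikeInsertWhere("INSERT { ?s ?p ?o } WHERE { ?s ?p ?o }")); // prints true

        // Also matches, because "where" is found inside the literal "somewhere",
        // even though INSERT DATA has no WHERE clause.
        System.out.println(looksLikeInsertWhere("INSERT DATA { <urn:a> <urn:b> \"somewhere\" }")); // prints true

        // Does not match: there is no "insert" at all.
        System.out.println(looksLikeInsertWhere("DELETE WHERE { ?s ?p ?o }")); // prints false
    }
}
```

The second case shows why a substring test on `where` is loose: any occurrence of the letters counts, wherever it appears in the update.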
[GitHub] incubator-rya pull request #273: Updating the Rya Manual to include Rya Stre...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/273#discussion_r170711583 --- Diff: extras/rya.manual/src/site/markdown/rya-streams.md --- @@ -0,0 +1,385 @@ + + +# Rya Streams + +Introduced in 3.2.12 + +## Disclaimer +This is a Beta feature. We do not guarantee newer versions of Rya Streams +will be compatible with this version. You may need to remove all of your +queries and their associated data from your Rya Streams system and then +reprocess them using the upgraded system. + +# Table of Contents +- [Introduction](#introduction) +- [Architecture](#architecture) +- [Quick Start](#quick-start) +- [Use Cases](#use-cases) +- [Future Work](#future-work) + + + +## Introduction +Rya Streams is a system that processes SPARQL queries over streams of RDF +Statements that may have Visibilities attached to them. It does this by +utilizing Kafka as a data processing platform. + +There are three basic building blocks that the system depends on: + +* **Streams Query** - This is a SPARQL query that is registered wth Rya Streams. + It is associated with a specific Rya instance because that Rya instance + determines which Statements the query will evaluate. It has an ID that + uniquely identifies it across all of the queries that are managed by the + system, whether or not the system should be processing it, and whether or not + the results of the query needs to be inserted back into the Rya instance the + source statements come from. + +* **Query Change Log** - A list of changes that have been performed to the + Streams Queries of a specific Rya Instance. This log contains the absolute + truth about what queries are registered, which are running, and which generate + new statements that need to be inserted back into Rya. + +* **Query Manager** - A daemon application that reacts to new/deleted Query + Change Logs as well as new entries within those logs. 
It starts and stops + Kafka Streams processing jobs for the active Streams Queries. + +The Quick Start section explains how the Rya Streams Client is used to interact +with the system using a simple SPARQL query and some sample Statements. + + + +## Architecture ## + +The following image is a high level view of how Rya Streams interacts with +Rya to process queries. + +![alt text](../images/rya-streams-high-level.png) + +1. The Rya Streams client is used to register/update/delete queries. +2. Rya Streams notices the change starts/stops processing a query based on + what the change was. +3. Statements are discovered for ingest by whatever application is loading data + into Rya. +4. Those Statements are written to Rya Streams so that the Streams Queries may + process them and produce results. +5. Those Statements are also written to the Rya instance for ad-hoc querying. +6. Rya Streams produces Visibility Binding Sets and/or Visibility Statements + that are written back to Rya. +7. Those same Visibility Binding Sets and/or Visibility Statements are made + available to other systems as well. + + + +## Quick Start ## +This tutorial demonstrates how to install and start the Rya Streams system. It +must be configured and running on its own before any Rya instances may use it. +After performing the steps of this quick start, you will have installed the +system, demonstrated that it is functioning properly, and then may use the Rya +Shell to configure Rya instances to use it. + +This tutorial assumes you are starting fresh and have no existing Kafka, or +Zookeeper data. The two services must already be installed and running. + +### Step 1: Download the applications ### + +You can fetch the artifacts you need to follow this Quick Start from our +[downloads page](http://rya.apache.org/download/). Click on the release of +interest and follow the "Central repository for Maven and other dependency +managers" URL. 
+ +Fetch the following two artifacts: + +Artifact Id | Type +--- | --- +rya.streams.client | shaded jar +rya.streams.query-manager | rpm + +### Step 2: Install the Query Manager ### + +Copy the RPM to the CentOS 7 machine the Query Manager will be installed on. +Install it using the following command: + +``` +yum install -y rya.streams.query-manager-3.2.12-incubating.noarch.rpm +``` + +It will install the program to **/opt/rya-streams-query-manager-3.2.12**. Follow +the directions that are in the README.txt file within that directory to finish +configuration. + +### Step 3: Register a Str
[GitHub] incubator-rya pull request #273: Updating the Rya Manual to include Rya Stre...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/273#discussion_r170711002 --- Diff: extras/rya.manual/src/site/markdown/rya-streams.md --- @@ -0,0 +1,385 @@ + + +# Rya Streams + +Introduced in 3.2.12 + +## Disclaimer +This is a Beta feature. We do not guarantee newer versions of Rya Streams +will be compatible with this version. You may need to remove all of your +queries and their associated data from your Rya Streams system and then +reprocess them using the upgraded system. + +# Table of Contents +- [Introduction](#introduction) +- [Architecture](#architecture) +- [Quick Start](#quick-start) +- [Use Cases](#use-cases) +- [Future Work](#future-work) + + + +## Introduction +Rya Streams is a system that processes SPARQL queries over streams of RDF +Statements that may have Visibilities attached to them. It does this by +utilizing Kafka as a data processing platform. + +There are three basic building blocks that the system depends on: + +* **Streams Query** - This is a SPARQL query that is registered wth Rya Streams. + It is associated with a specific Rya instance because that Rya instance + determines which Statements the query will evaluate. It has an ID that + uniquely identifies it across all of the queries that are managed by the + system, whether or not the system should be processing it, and whether or not + the results of the query needs to be inserted back into the Rya instance the + source statements come from. + +* **Query Change Log** - A list of changes that have been performed to the + Streams Queries of a specific Rya Instance. This log contains the absolute + truth about what queries are registered, which are running, and which generate + new statements that need to be inserted back into Rya. + +* **Query Manager** - A daemon application that reacts to new/deleted Query + Change Logs as well as new entries within those logs. 
It starts and stops + Kafka Streams processing jobs for the active Streams Queries. + +The Quick Start section explains how the Rya Streams Client is used to interact +with the system using a simple SPARQL query and some sample Statements. + + + +## Architecture ## + +The following image is a high level view of how Rya Streams interacts with +Rya to process queries. + +![alt text](../images/rya-streams-high-level.png) + +1. The Rya Streams client is used to register/update/delete queries. +2. Rya Streams notices the change starts/stops processing a query based on + what the change was. +3. Statements are discovered for ingest by whatever application is loading data + into Rya. +4. Those Statements are written to Rya Streams so that the Streams Queries may + process them and produce results. +5. Those Statements are also written to the Rya instance for ad-hoc querying. +6. Rya Streams produces Visibility Binding Sets and/or Visibility Statements + that are written back to Rya. +7. Those same Visibility Binding Sets and/or Visibility Statements are made + available to other systems as well. + + + +## Quick Start ## +This tutorial demonstrates how to install and start the Rya Streams system. It +must be configured and running on its own before any Rya instances may use it. +After performing the steps of this quick start, you will have installed the +system, demonstrated that it is functioning properly, and then may use the Rya +Shell to configure Rya instances to use it. + +This tutorial assumes you are starting fresh and have no existing Kafka, or +Zookeeper data. The two services must already be installed and running. + +### Step 1: Download the applications ### + +You can fetch the artifacts you need to follow this Quick Start from our +[downloads page](http://rya.apache.org/download/). Click on the release of +interest and follow the "Central repository for Maven and other dependency +managers" URL. 
+ +Fetch the following two artifacts: + +Artifact Id | Type +--- | --- +rya.streams.client | shaded jar +rya.streams.query-manager | rpm + +### Step 2: Install the Query Manager ### + +Copy the RPM to the CentOS 7 machine the Query Manager will be installed on. +Install it using the following command: + +``` +yum install -y rya.streams.query-manager-3.2.12-incubating.noarch.rpm +``` + +It will install the program to **/opt/rya-streams-query-manager-3.2.12**. Follow +the directions that are in the README.txt file within that directory to finish +configuration. + +### Step 3: Register a Str
[GitHub] incubator-rya pull request #273: Updating the Rya Manual to include Rya Stre...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/273#discussion_r170710886 --- Diff: extras/rya.manual/src/site/markdown/rya-streams.md --- @@ -0,0 +1,385 @@ + + +# Rya Streams + +Introduced in 3.2.12 + +## Disclaimer +This is a Beta feature. We do not guarantee newer versions of Rya Streams +will be compatible with this version. You may need to remove all of your +queries and their associated data from your Rya Streams system and then +reprocess them using the upgraded system. + +# Table of Contents +- [Introduction](#introduction) +- [Architecture](#architecture) +- [Quick Start](#quick-start) +- [Use Cases](#use-cases) +- [Future Work](#future-work) + + + +## Introduction +Rya Streams is a system that processes SPARQL queries over streams of RDF +Statements that may have Visibilities attached to them. It does this by +utilizing Kafka as a data processing platform. + +There are three basic building blocks that the system depends on: + +* **Streams Query** - This is a SPARQL query that is registered wth Rya Streams. + It is associated with a specific Rya instance because that Rya instance + determines which Statements the query will evaluate. It has an ID that + uniquely identifies it across all of the queries that are managed by the + system, whether or not the system should be processing it, and whether or not + the results of the query needs to be inserted back into the Rya instance the + source statements come from. + +* **Query Change Log** - A list of changes that have been performed to the + Streams Queries of a specific Rya Instance. This log contains the absolute + truth about what queries are registered, which are running, and which generate + new statements that need to be inserted back into Rya. + +* **Query Manager** - A daemon application that reacts to new/deleted Query + Change Logs as well as new entries within those logs. 
It starts and stops + Kafka Streams processing jobs for the active Streams Queries. + +The Quick Start section explains how the Rya Streams Client is used to interact +with the system using a simple SPARQL query and some sample Statements. + + + +## Architecture ## + +The following image is a high-level view of how Rya Streams interacts with +Rya to process queries. + +![alt text](../images/rya-streams-high-level.png) + +1. The Rya Streams client is used to register/update/delete queries. +2. Rya Streams notices the change and starts/stops processing a query based on + what the change was. +3. Statements are discovered for ingest by whatever application is loading data + into Rya. +4. Those Statements are written to Rya Streams so that the Streams Queries may + process them and produce results. +5. Those Statements are also written to the Rya instance for ad-hoc querying. +6. Rya Streams produces Visibility Binding Sets and/or Visibility Statements + that are written back to Rya. +7. Those same Visibility Binding Sets and/or Visibility Statements are made + available to other systems as well. + + + +## Quick Start ## +This tutorial demonstrates how to install and start the Rya Streams system. It +must be configured and running on its own before any Rya instances may use it. +After performing the steps of this quick start, you will have installed the +system, demonstrated that it is functioning properly, and then may use the Rya +Shell to configure Rya instances to use it. + +This tutorial assumes you are starting fresh and have no existing Kafka or +Zookeeper data. The two services must already be installed and running. + +### Step 1: Download the applications ### + +You can fetch the artifacts you need to follow this Quick Start from our +[downloads page](http://rya.apache.org/download/). Click on the release of +interest and follow the "Central repository for Maven and other dependency +managers" URL. 
+ +Fetch the following two artifacts: + +Artifact Id | Type +--- | --- +rya.streams.client | shaded jar +rya.streams.query-manager | rpm + +### Step 2: Install the Query Manager ### + +Copy the RPM to the CentOS 7 machine the Query Manager will be installed on. +Install it using the following command: + +``` +yum install -y rya.streams.query-manager-3.2.12-incubating.noarch.rpm +``` + +It will install the program to **/opt/rya-streams-query-manager-3.2.12**. Follow +the directions that are in the README.txt file within that directory to finish +configuration. + +### Step 3: Register a Str