Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187739974
--- Diff:
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to
Rya.
+ * <p/>
+ * Implementations of this class only need to specify functionality that
is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+ private static final Logger log =
LoggerFactory.getLogger(RyaSinkTask.class);
+
+ @Nullable
+ private SailRepository sailRepo = null;
+
+ @Nullable
+ private SailRepositoryConnection conn = null;
+
+ /**
+ * Throws an exception if the configured Rya Instance is not already
installed
+ * within the configured database.
+ *
+ * @param taskConfig - The configuration values that were provided to
the task. (not null)
+ * @throws ConnectException The configured Rya Instance is not
installed to the configured database
+ * or we were unable to figure out if it is installed.
+ */
+ protected abstract void checkRyaInstanceExists(final Map<String,
String> taskConfig) throws ConnectException;
+
+ /**
+ * Creates an initialized {@link Sail} object that may be used to
write {@link Statement}s to the configured
+ * Rya Instance.
+ *
+ * @param taskConfig - Configures how the Sail object will be created.
(not null)
+ * @return The created Sail object.
+ * @throws ConnectException The Sail object could not be made.
+ */
+ protected abstract Sail makeSail(final Map<String, String> taskConfig)
throws ConnectException;
+
+ @Override
+ public String version() {
+ return Manifests.exists("Build-Version") ?
Manifests.read("Build-Version"): "UNKNOWN";
+ }
+
+ @Override
+ public void start(final Map<String, String> props) throws
ConnectException {
+ requireNonNull(props);
+
+ // Ensure the configured Rya Instance is installed within the
configured database.
+ checkRyaInstanceExists(props);
+
+ // Create the Sail object that is connected to the Rya Instance.
+ final Sail sail = makeSail(props);
+ sailRepo = new SailRepository( sail );
+ conn = sailRepo.getConnection();
+ }
+
+ @Override
+ public void put(final Collection<SinkRecord> records) {
+ requireNonNull(records);
+
+ // Return immediately if there are no records to handle.
+ if(records.isEmpty()) {
+ return;
+ }
+
+ // If a transaction has not been started yet, then start one.
+ if(!conn.isActive()) {
+ conn.begin();
+ }
+
+ // Iterate through the records and write them to the Sail object.
+ for(final SinkRecord record : records) {
+ // If everything has been configured correctly, then the
record's value will be a Set<Statement>.
+ conn.add((Set<? extends Statement>) record.value());
+ }
+ }
+
+ @Override
+ public void flush(final Map<TopicPartition, OffsetAndMetadata>
currentOffets) {
+ // Flush the current transaction.
+ conn.commit();
+ }
+
+ @Override
+ public void stop() {
+ try {
--- End diff --
ð
---