[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472708#comment-16472708 ]
ASF GitHub Bot commented on RYA-487: ------------------------------------ Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739974 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * <p/> + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + + @Nullable + private SailRepository sailRepo = null; + + @Nullable + private SailRepositoryConnection conn = null; + + /** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. 
+ */ + protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException; + + /** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ + protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException; + + @Override + public String version() { + return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; + } + + @Override + public void start(final Map<String, String> props) throws ConnectException { + requireNonNull(props); + + // Ensure the configured Rya Instance is installed within the configured database. + checkRyaInstanceExists(props); + + // Create the Sail object that is connected to the Rya Instance. + final Sail sail = makeSail(props); + sailRepo = new SailRepository( sail ); + conn = sailRepo.getConnection(); + } + + @Override + public void put(final Collection<SinkRecord> records) { + requireNonNull(records); + + // Return immediately if there are no records to handle. + if(records.isEmpty()) { + return; + } + + // If a transaction has not been started yet, then start one. + if(!conn.isActive()) { + conn.begin(); + } + + // Iterate through the records and write them to the Sail object. + for(final SinkRecord record : records) { + // If everything has been configured correctly, then the record's value will be a Set<Statement>. + conn.add((Set<? extends Statement>) record.value()); + } + } + + @Override + public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffets) { + // Flush the current transaction. 
+ conn.commit(); + } + + @Override + public void stop() { + try {
--- End diff --

👍

> Kafka Connect Rya Sink
> ----------------------
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
> Issue Type: New Feature
> Affects Versions: 4.0.0
> Reporter: Kevin Chilton
> Assignee: Kevin Chilton
> Priority: Major
> Fix For: 4.0.0
>
>
> Implement a Kafka Connect Sink that writes to Rya.
> This sink does not have to handle visibilities.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)