Github user ejwhite922 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/296#discussion_r187980159
--- Diff:
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java
---
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandler;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio
Binary format serialized
+ * set of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements
Deserializer<Set<Statement>> {
+ private static final Logger log =
LoggerFactory.getLogger(StatementsDeserializer.class);
+
+ private static final BinaryRDFParserFactory PARSER_FACTORY = new
BinaryRDFParserFactory();
+
+ @Override
+ public void configure(final Map<String, ?> configs, final boolean
isKey) {
+ // Nothing to do.
+ }
+
+ @Override
+ public Set<Statement> deserialize(final String topic, final byte[]
data) {
+ if(data == null || data.length == 0) {
+ // Return null because that is the contract of this method.
+ return null;
+ }
+
+ try {
+ final RDFParser parser = PARSER_FACTORY.getParser();
+ final Set<Statement> statements = new HashSet<>();
+
+ parser.setRDFHandler(new RDFHandler() {
--- End diff --
Use AbstractRDFHandler since only handleStatement() needs to be overridden
and the other overridden methods can be deleted.
---