This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6e9b6c2 Issue #2757: Kafka source connector should just transfer bytes (#2761) 6e9b6c2 is described below commit 6e9b6c2a8ffbeae1edabf88486f8432f23c29d74 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Wed Oct 10 11:08:27 2018 -0700 Issue #2757: Kafka source connector should just transfer bytes (#2761) *Motivation* connector is mostly used for transferring bytes from kafka to pulsar. since we haven't mapped schema between kafka and pulsar, we should just transfer bytes for now. *Changes* Add a bytes source and make bytes source as a default setting. Keep string source there for BC consideration. --- .../pulsar/io/kafka/KafkaAbstractSource.java | 6 ++- .../apache/pulsar/io/kafka/KafkaBytesSource.java | 48 ++++++++++++++++++++++ .../resources/META-INF/services/pulsar-io.yaml | 2 +- 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 494d91b..b6c6840 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -78,6 +78,10 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> { } + protected Properties beforeCreateConsumer(Properties props) { + return props; + } + @Override public void close() throws InterruptedException { LOG.info("Stopping kafka source"); @@ -96,7 +100,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> { public void start() { runnerThread = new Thread(() -> { LOG.info("Starting kafka source"); - consumer = new KafkaConsumer<>(props); + consumer = new KafkaConsumer<>(beforeCreateConsumer(props)); consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic())); LOG.info("Kafka source started."); ConsumerRecords<String, byte[]> 
consumerRecords; diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java new file mode 100644 index 0000000..1e99208 --- /dev/null +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.kafka; + +import java.util.Properties; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; + +/** + * Simple Kafka Source that just transfers the value part of the kafka records + * as raw bytes (byte[]), unchanged; record keys are deserialized as Strings. + */ +@Slf4j +public class KafkaBytesSource extends KafkaAbstractSource<byte[]> { + + @Override + protected Properties beforeCreateConsumer(Properties props) { /* Overwrites any key/value deserializer settings in props (mutating it in place) and returns the same object. */ + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + log.info("Created kafka consumer config : {}", props); /* NOTE(review): props may carry credentials (e.g. SASL/SSL settings); consider redacting before logging. */ + return props; + } + + @Override + public byte[] extractValue(ConsumerRecord<String, byte[]> record) { /* Passes the Kafka record value through as-is; no schema mapping is applied. */ + return record.value(); + } +} diff --git a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml index 7afc154..c3fd86d 100644 --- a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml @@ -19,5 +19,5 @@ name: kafka description: Kafka source and sink connector -sourceClass: org.apache.pulsar.io.kafka.KafkaStringSource +sourceClass: org.apache.pulsar.io.kafka.KafkaBytesSource sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink