This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d13fe5b Debezium: integrate KafkaConnectSource with MySqlConnectorTask (#2791) d13fe5b is described below commit d13fe5bc287d2bc082da1682703d01dc92140a3b Author: Jia Zhai <jiaz...@users.noreply.github.com> AuthorDate: Mon Oct 29 14:41:36 2018 +0800 Debezium: integrate KafkaConnectSource with MySqlConnectorTask (#2791) ### Motivation This change integrates KafkaConnectSource with MySqlConnectorTask. To run it: 1. Start the MySQL server using the image from the Debezium tutorial: ```docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8``` 2. Start Pulsar in standalone mode: ```bin/pulsar standalone ``` 3. Start consuming from the connect source's target topic; once step 4 is running, this consumer should receive data: ```bin/pulsar-client consume -s "my-sub-name" kafka-connect-topic -n 0``` 4. Start the source connector; it should produce data into the Pulsar topics: 
```bin/pulsar-admin source localrun --tenant public --namespace default --name kafka --destination-topic-name kafka-connect-topic --sourceConfigFile debezium-mysql-source-config.yaml --archive connectors/pulsar-io-kafka-connect-adaptor-2.2.0-SNAPSHOT.nar --brokerServiceUrl pulsar://127.0.0.1:6650``` --- distribution/io/src/assemble/io.xml | 6 +++ pom.xml | 2 +- pulsar-io/debezium/pom.xml | 8 ++-- .../pulsar/io/debezium/PulsarDatabaseHistory.java | 2 +- pulsar-io/kafka-connect-adaptor/pom.xml | 22 ++++++++++ .../io/kafka/connect/KafkaConnectSource.java | 27 +++++++++--- .../resources/META-INF/services/pulsar-io.yaml | 22 ++++++++++ .../resources/debezium-mysql-source-config.yaml | 51 ++++++++++++++++++++++ .../io/kafka/connect/KafkaConnectSourceTest.java | 6 +-- 9 files changed, 131 insertions(+), 15 deletions(-) diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index c527fc7..1a190a4 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -93,5 +93,11 @@ <outputDirectory>connectors</outputDirectory> <fileMode>644</fileMode> </file> + + <file> + <source>${basedir}/../../pulsar-io/kafka-connect-adaptor/target/pulsar-io-kafka-connect-adaptor-${project.version}.nar</source> + <outputDirectory>connectors</outputDirectory> + <fileMode>644</fileMode> + </file> </files> </assembly> diff --git a/pom.xml b/pom.xml index eac83c0..ef870ab 100644 --- a/pom.xml +++ b/pom.xml @@ -178,7 +178,7 @@ flexible messaging model and an intuitive client API.</description> <presto.version>0.206</presto.version> <flink.version>1.6.0</flink.version> <scala.binary.version>2.11</scala.binary.version> - <debezium-core.version>0.8.2</debezium-core.version> + <debezium.version>0.8.2</debezium.version> <!-- test dependencies --> <arquillian-cube.version>1.15.1</arquillian-cube.version> diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml index fb79196..3a178ab 100644 --- a/pulsar-io/debezium/pom.xml +++ 
b/pulsar-io/debezium/pom.xml @@ -41,13 +41,13 @@ <dependency> <groupId>io.debezium</groupId> <artifactId>debezium-core</artifactId> - <version>${debezium-core.version}</version> + <version>${debezium.version}</version> </dependency> <dependency> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-io-kafka-connect-adaptor</artifactId> - <version>${project.version}</version> + <groupId>io.debezium</groupId> + <artifactId>debezium-connector-mysql</artifactId> + <version>${debezium.version}</version> </dependency> <dependency> diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java index bc97fc6..820c5a4 100644 --- a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java +++ b/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java @@ -61,7 +61,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory { .withValidation(Field::isRequired); public static final Field SERVICE_URL = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.service.url") - .withDisplayName("Kafka broker addresses") + .withDisplayName("Pulsar broker addresses") .withType(Type.STRING) .withWidth(Width.LONG) .withImportance(Importance.HIGH) diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml index e0f0574..b2d0eea 100644 --- a/pulsar-io/kafka-connect-adaptor/pom.xml +++ b/pulsar-io/kafka-connect-adaptor/pom.xml @@ -39,6 +39,12 @@ </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-io-debezium</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka-client.version}</version> @@ -52,6 +58,12 @@ <dependency> <groupId>org.apache.kafka</groupId> + 
<artifactId>connect-json</artifactId> + <version>${kafka-client.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.kafka</groupId> <artifactId>connect-api</artifactId> <version>${kafka-client.version}</version> </dependency> @@ -102,4 +114,14 @@ </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + </project> diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index 61635f8..5aac5dd 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.IdentityHashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -74,21 +75,25 @@ public class KafkaConnectSource implements Source<byte[]> { }); // get the source class name from config and create source task from reflection - sourceTask = ((Class<? extends SourceTask>)config.get(TaskConfig.TASK_CLASS_CONFIG)) + sourceTask = ((Class<? extends SourceTask>)Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG))) .asSubclass(SourceTask.class) .getDeclaredConstructor() .newInstance(); + // initialize the key and value converter - keyConverter = ((Class<? extends Converter>)config.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG)) + keyConverter = ((Class<? 
extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG))) .asSubclass(Converter.class) .getDeclaredConstructor() .newInstance(); - valueConverter = ((Class<? extends Converter>)config.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG)) + valueConverter = ((Class<? extends Converter>)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG))) .asSubclass(Converter.class) .getDeclaredConstructor() .newInstance(); + keyConverter.configure(config, true); + valueConverter.configure(config, false); + offsetStore = new PulsarOffsetBackingStore(); offsetStore.configure(new PulsarKafkaWorkerConfig(stringConfig)); offsetStore.start(); @@ -117,7 +122,12 @@ public class KafkaConnectSource implements Source<byte[]> { while (true) { if (currentBatch == null) { flushFuture = new CompletableFuture<>(); - currentBatch = sourceTask.poll().iterator(); + List<SourceRecord> recordList = sourceTask.poll(); + if (recordList == null) { + Thread.sleep(1000); + continue; + } + currentBatch = recordList.iterator(); } if (currentBatch.hasNext()) { return processSourceRecord(currentBatch.next()); @@ -126,7 +136,7 @@ public class KafkaConnectSource implements Source<byte[]> { synchronized (this) { hasOutstandingRecords = !outstandingRecords.isEmpty(); } - if (hasOutstandingRecords) { + if (!hasOutstandingRecords) { // there is no records any more, then waiting for the batch to complete writing // to sink and the offsets are committed as well flushFuture.get(); @@ -161,7 +171,7 @@ public class KafkaConnectSource implements Source<byte[]> { @Override public Optional<Long> getEventTime() { - return Optional.of(srcRecord.timestamp()); + return Optional.ofNullable(srcRecord.timestamp()); } @Override @@ -202,6 +212,11 @@ public class KafkaConnectSource implements Source<byte[]> { flushFuture.completeExceptionally(new Exception("Sink Error")); } } + + @Override + public Optional<String> getDestinationTopic() { + return 
Optional.of(srcRecord.topic()); + } }; } diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml new file mode 100644 index 0000000..4887063 --- /dev/null +++ b/pulsar-io/kafka-connect-adaptor/src/main/resources/META-INF/services/pulsar-io.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: kafka-connect-adaptor +description: Kafka source connect adaptor +sourceClass: org.apache.pulsar.io.kafka.connect.KafkaConnectSource diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml b/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml new file mode 100644 index 0000000..2e288cb --- /dev/null +++ b/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +tenant: "test" +namespace: "test-namespace" +name: "debezium-kafka-source" + +##autoAck: true +parallelism: 1 + +configs: + ## sourceTask + task.class: "io.debezium.connector.mysql.MySqlConnectorTask" + + ## config for mysql, docker image: debezium/example-mysql:0.8 + database.hostname: "localhost" + database.port: "3306" + database.user: "debezium" + database.password: "dbz" + database.server.id: "184054" + database.server.name: "dbserver1" + database.whitelist: "inventory" + + database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" + database.history.pulsar.topic: "history-topic" + database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" + ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG + key.converter: "org.apache.kafka.connect.json.JsonConverter" + value.converter: "org.apache.kafka.connect.json.JsonConverter" + ## PULSAR_SERVICE_URL_CONFIG + pulsar.service.url: "pulsar://127.0.0.1:6650" + ## OFFSET_STORAGE_TOPIC_CONFIG + offset.storage.topic: "offset-topic" + + diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java index b82daab..d0f0a63 100644 --- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java +++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java @@ -70,9 +70,9 @@ public class KafkaConnectSourceTest extends ProducerConsumerBase { super.internalSetup(); super.producerBaseSetup(); - config.put(TaskConfig.TASK_CLASS_CONFIG, org.apache.kafka.connect.file.FileStreamSourceTask.class); - config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, org.apache.kafka.connect.storage.StringConverter.class); - config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, org.apache.kafka.connect.storage.StringConverter.class); + config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask"); + config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); + config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter"); this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset"; config.put(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG, brokerUrl.toString());