This is an automated email from the ASF dual-hosted git repository.

huanlimeng pushed a commit to branch pulsar-io-kafka
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0334ddbd30a5c23309c3203d9a89390be0fa4843 Author: HuanliMeng <48120384+huanli-m...@users.noreply.github.com> AuthorDate: Mon May 9 11:41:04 2022 +0800 Update kafka source connector docs --- site2/docs/io-kafka-source.md | 152 +++++++++++---------- .../version-2.10.0/io-kafka-source.md | 152 +++++++++++---------- .../version-2.9.2/io-kafka-source.md | 150 ++++++++++---------- 3 files changed, 243 insertions(+), 211 deletions(-) diff --git a/site2/docs/io-kafka-source.md b/site2/docs/io-kafka-source.md index 1bce28381d6..61de221f4ea 100644 --- a/site2/docs/io-kafka-source.md +++ b/site2/docs/io-kafka-source.md @@ -17,18 +17,18 @@ The configuration of the Kafka source connector has the following properties. | Name | Type| Required | Default | Description |------|----------|---------|-------------|-------------| -| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. | +| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. | | `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. | | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. | | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. | | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. | | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. 
<br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.| | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. | -| `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. | -| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. | +| `topic` | String|true | " " (empty string)| The Kafka topic that sends messages to Pulsar. | +| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. | | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java). | `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values. -| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. | +| `autoOffsetReset` | String | false | earliest | The default offset reset policy. | ### Schema Management @@ -64,83 +64,65 @@ If you want to access the raw key, you can use the `Message#getKeyBytes()` API. Before using the Kafka source connector, you need to create a configuration file through one of the following methods. 
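As an aside on the property table above: the constraint that `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`, plus the three required non-empty properties, are easy to get wrong in a hand-written config file. A minimal pre-flight check could look like the sketch below (the helper name and structure are assumptions for illustration only; this is not part of the connector):

```python
def validate_kafka_source_config(cfg: dict) -> list:
    """Collect problems with a Kafka source connector configuration.

    Mirrors the constraints in the property table: the required fields
    (bootstrapServers, groupId, topic) must be present and non-empty,
    and heartbeatIntervalMs must be smaller than sessionTimeoutMs.
    """
    problems = []
    for required in ("bootstrapServers", "groupId", "topic"):
        if not cfg.get(required):
            problems.append(f"missing required property: {required}")
    # Defaults come from the table: 3000 ms heartbeat, 30000 ms session timeout.
    heartbeat = int(cfg.get("heartbeatIntervalMs", 3000))
    session = int(cfg.get("sessionTimeoutMs", 30000))
    if heartbeat >= session:
        problems.append("heartbeatIntervalMs must be smaller than sessionTimeoutMs")
    return problems

# Example: the tutorial's configuration passes every check.
config = {
    "bootstrapServers": "pulsar-kafka:9092",
    "groupId": "test-pulsar-io",
    "topic": "my-topic",
    "sessionTimeoutMs": "10000",
    "autoCommitEnabled": False,
}
print(validate_kafka_source_config(config))  # → []
```

Numeric values may arrive as strings (as in the YAML example), so the sketch coerces with `int()` before comparing.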
-* JSON +- JSON - ```json + ```json { - "configs": { - "bootstrapServers": "pulsar-kafka:9092", - "groupId": "test-pulsar-io", - "topic": "my-topic", - "sessionTimeoutMs": "10000", - "autoCommitEnabled": false - } + "bootstrapServers": "pulsar-kafka:9092", + "groupId": "test-pulsar-io", + "topic": "my-topic", + "sessionTimeoutMs": "10000", + "autoCommitEnabled": false } - ``` + ``` -* YAML +- YAML - ```yaml + ```yaml configs: - bootstrapServers: "pulsar-kafka:9092" - groupId: "test-pulsar-io" - topic: "my-topic" - sessionTimeoutMs: "10000" + bootstrapServers: "pulsar-kafka:9092" + groupId: "test-pulsar-io" + topic: "my-topic" + sessionTimeoutMs: "10000" autoCommitEnabled: false ``` ## Usage -Here is an example of using the Kafka source connector with the configuration file as shown previously. +You can make the Kafka source connector as a Pulsar built-in connector and use it on a standalone cluster or an on-premises cluster. -1. Download a Kafka client and a Kafka connector. +### Standalone cluster - ```bash - $ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar +This example describes how to use the Kafka source connector to feed data from Kafka and write data to Pulsar topics in the standalone mode. - $ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar - ``` +#### Prerequisites -2. Create a network. - - ```bash - $ docker network create kafka-pulsar - ``` +- Install [Docker](https://docs.docker.com/get-docker/)(Community Edition). -3. Pull a ZooKeeper image and start ZooKeeper. - - ```bash - $ docker pull wurstmeister/zookeeper +#### Steps - $ docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper - ``` +1. Download and start the Confluent Platform. -4. Pull a Kafka image and start Kafka. 
- - ```bash - $ docker pull wurstmeister/kafka:2.11-1.0.2 - - $ docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2 - ``` +For details, see the [documentation](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) to install the Kafka service locally. -5. Pull a Pulsar image and start Pulsar standalone. - - ```bash - $ docker pull apachepulsar/pulsar:{{pulsar:version}} - - $ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone - ``` +2. Pull a Pulsar image and start Pulsar in standalone mode. + + ```bash + docker pull apachepulsar/pulsar:latest + + docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:latest bin/pulsar standalone + ``` + +3. Create a producer file _kafka-producer.py_. -6. Create a producer file _kafka-producer.py_. - ```python from kafka import KafkaProducer - producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092') + producer = KafkaProducer(bootstrap_servers='localhost:9092') future = producer.send('my-topic', b'hello world') future.get() ``` -7. Create a consumer file _pulsar-client.py_. +4. Create a consumer file _pulsar-client.py_. ```python import pulsar @@ -158,23 +140,20 @@ Here is an example of using the Kafka source connector with the configuration fi client.close() ``` -8. Copy the following files to Pulsar. - +5. Copy the following files to Pulsar. 
+ ```bash - $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar - $ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf - $ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/ - $ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/ + docker cp pulsar-io-kafka.nar pulsar-kafka-standalone:/pulsar + docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf ``` -9. Open a new terminal window and start the Kafka source connector in local run mode. +6. Open a new terminal window and start the Kafka source connector in local run mode. ```bash - $ docker exec -it pulsar-kafka-standalone /bin/bash + docker exec -it pulsar-kafka-standalone /bin/bash - $ ./bin/pulsar-admin source localrun \ - --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \ - --classname org.apache.pulsar.io.kafka.KafkaBytesSource \ + ./bin/pulsar-admin source localrun \ + --archive ./pulsar-io-kafka.nar \ --tenant public \ --namespace default \ --name kafka \ @@ -183,18 +162,49 @@ Here is an example of using the Kafka source connector with the configuration fi --parallelism 1 ``` -10. Open a new terminal window and run the consumer. +7. Open a new terminal window and run the Kafka producer locally. ```bash - $ docker exec -it pulsar-kafka-standalone /bin/bash + python3 kafka-producer.py + ``` - $ pip install kafka-python +8. Open a new terminal window and run the Pulsar consumer locally. - $ python3 kafka-producer.py + ```bash + python3 pulsar-client.py ``` - The following information appears on the consumer terminal window. +The following information appears on the consumer terminal window. ```bash Received message: 'hello world' ``` + +### On-premises cluster + +This example explains how to create a Kafka source connector in an on-premises cluster. + +1. Copy the NAR package of the Kafka connector to the Pulsar connectors directory. 
+ + ``` + cp pulsar-io-kafka-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-kafka-{{connector:version}}.nar + ``` + +2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/). + + ``` + PULSAR_HOME/bin/pulsar-admin sources reload + ``` + +3. Check whether the Kafka source connector is available on the list or not. + + ``` + PULSAR_HOME/bin/pulsar-admin sources available-sources + ``` + +4. Create a Kafka source connector on a Pulsar cluster using the [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.8.0-SNAPSHOT/#-em-create-em--14) command. + + ``` + PULSAR_HOME/bin/pulsar-admin sources create \ + --source-config-file <kafka-source-config.yaml> + ``` \ No newline at end of file diff --git a/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md b/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md index 21611696648..e100f7e5f64 100644 --- a/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md +++ b/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md @@ -18,18 +18,18 @@ The configuration of the Kafka source connector has the following properties. | Name | Type| Required | Default | Description |------|----------|---------|-------------|-------------| -| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. | +| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. | | `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. | | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. 
| | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. | | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. | | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.| | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. | -| `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. | -| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. | +| `topic` | String|true | " " (empty string)| The Kafka topic that sends messages to Pulsar. | +| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. | | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java). 
| `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values. -| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. | +| `autoOffsetReset` | String | false | earliest | The default offset reset policy. | ### Schema Management @@ -65,83 +65,65 @@ If you want to access the raw key, you can use the `Message#getKeyBytes()` API. Before using the Kafka source connector, you need to create a configuration file through one of the following methods. -* JSON +- JSON - ```json + ```json { - "configs": { - "bootstrapServers": "pulsar-kafka:9092", - "groupId": "test-pulsar-io", - "topic": "my-topic", - "sessionTimeoutMs": "10000", - "autoCommitEnabled": false - } + "bootstrapServers": "pulsar-kafka:9092", + "groupId": "test-pulsar-io", + "topic": "my-topic", + "sessionTimeoutMs": "10000", + "autoCommitEnabled": false } - ``` + ``` -* YAML +- YAML - ```yaml + ```yaml configs: - bootstrapServers: "pulsar-kafka:9092" - groupId: "test-pulsar-io" - topic: "my-topic" - sessionTimeoutMs: "10000" + bootstrapServers: "pulsar-kafka:9092" + groupId: "test-pulsar-io" + topic: "my-topic" + sessionTimeoutMs: "10000" autoCommitEnabled: false ``` ## Usage -Here is an example of using the Kafka source connector with the configuration file as shown previously. +You can make the Kafka source connector as a Pulsar built-in connector and use it on a standalone cluster or an on-premises cluster. -1. Download a Kafka client and a Kafka connector. +### Standalone cluster - ```bash - $ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar +This example describes how to use the Kafka source connector to feed data from Kafka and write data to Pulsar topics in the standalone mode. - $ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar - ``` +#### Prerequisites -2. 
Create a network. - - ```bash - $ docker network create kafka-pulsar - ``` +- Install [Docker](https://docs.docker.com/get-docker/)(Community Edition). -3. Pull a ZooKeeper image and start ZooKeeper. - - ```bash - $ docker pull wurstmeister/zookeeper +#### Steps - $ docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper - ``` +1. Download and start the Confluent Platform. -4. Pull a Kafka image and start Kafka. - - ```bash - $ docker pull wurstmeister/kafka:2.11-1.0.2 - - $ docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2 - ``` +For details, see the [documentation](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) to install the Kafka service locally. -5. Pull a Pulsar image and start Pulsar standalone. - - ```bash - $ docker pull apachepulsar/pulsar:{{pulsar:version}} - - $ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone - ``` +2. Pull a Pulsar image and start Pulsar in standalone mode. + + ```bash + docker pull apachepulsar/pulsar:latest + + docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:latest bin/pulsar standalone + ``` + +3. Create a producer file _kafka-producer.py_. -6. Create a producer file _kafka-producer.py_. - ```python from kafka import KafkaProducer - producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092') + producer = KafkaProducer(bootstrap_servers='localhost:9092') future = producer.send('my-topic', b'hello world') future.get() ``` -7. Create a consumer file _pulsar-client.py_. +4. Create a consumer file _pulsar-client.py_. 
```python import pulsar @@ -159,23 +141,20 @@ Here is an example of using the Kafka source connector with the configuration fi client.close() ``` -8. Copy the following files to Pulsar. - +5. Copy the following files to Pulsar. + ```bash - $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar - $ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf - $ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/ - $ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/ + docker cp pulsar-io-kafka.nar pulsar-kafka-standalone:/pulsar + docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf ``` -9. Open a new terminal window and start the Kafka source connector in local run mode. +6. Open a new terminal window and start the Kafka source connector in local run mode. ```bash - $ docker exec -it pulsar-kafka-standalone /bin/bash + docker exec -it pulsar-kafka-standalone /bin/bash - $ ./bin/pulsar-admin source localrun \ - --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \ - --classname org.apache.pulsar.io.kafka.KafkaBytesSource \ + ./bin/pulsar-admin source localrun \ + --archive ./pulsar-io-kafka.nar \ --tenant public \ --namespace default \ --name kafka \ @@ -184,18 +163,49 @@ Here is an example of using the Kafka source connector with the configuration fi --parallelism 1 ``` -10. Open a new terminal window and run the consumer. +7. Open a new terminal window and run the Kafka producer locally. ```bash - $ docker exec -it pulsar-kafka-standalone /bin/bash + python3 kafka-producer.py + ``` - $ pip install kafka-python +8. Open a new terminal window and run the Pulsar consumer locally. - $ python3 kafka-producer.py + ```bash + python3 pulsar-client.py ``` - The following information appears on the consumer terminal window. +The following information appears on the consumer terminal window. 
```bash Received message: 'hello world' ``` + +### On-premises cluster + +This example explains how to create a Kafka source connector in an on-premises cluster. + +1. Copy the NAR package of the Kafka connector to the Pulsar connectors directory. + + ``` + cp pulsar-io-kafka-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-kafka-{{connector:version}}.nar + ``` + +2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/). + + ``` + PULSAR_HOME/bin/pulsar-admin sources reload + ``` + +3. Check whether the Kafka source connector is available on the list or not. + + ``` + PULSAR_HOME/bin/pulsar-admin sources available-sources + ``` + +4. Create a Kafka source connector on a Pulsar cluster using the [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.8.0-SNAPSHOT/#-em-create-em--14) command. + + ``` + PULSAR_HOME/bin/pulsar-admin sources create \ + --source-config-file <kafka-source-config.yaml> + ``` \ No newline at end of file diff --git a/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md b/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md index 71122fdf930..a92dda36453 100644 --- a/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md +++ b/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md @@ -18,18 +18,18 @@ The configuration of the Kafka source connector has the following properties. | Name | Type| Required | Default | Description |------|----------|---------|-------------|-------------| -| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. | +| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. 
| | `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. | | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. | | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. | | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. | | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.| | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. | -| `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. | -| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. | +| `topic` | String|true | " " (empty string)| The Kafka topic that sends messages to Pulsar. | +| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. 
| | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java). | `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values. -| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. | +| `autoOffsetReset` | String | false | earliest | The default offset reset policy. | ### Schema Management @@ -65,81 +65,65 @@ If you want to access the raw key, you can use the `Message#getKeyBytes()` API. Before using the Kafka source connector, you need to create a configuration file through one of the following methods. -* JSON +- JSON - ```json + ```json { - "bootstrapServers": "pulsar-kafka:9092", - "groupId": "test-pulsar-io", - "topic": "my-topic", - "sessionTimeoutMs": "10000", - "autoCommitEnabled": false + "bootstrapServers": "pulsar-kafka:9092", + "groupId": "test-pulsar-io", + "topic": "my-topic", + "sessionTimeoutMs": "10000", + "autoCommitEnabled": false } - ``` + ``` -* YAML +- YAML - ```yaml + ```yaml configs: - bootstrapServers: "pulsar-kafka:9092" - groupId: "test-pulsar-io" - topic: "my-topic" - sessionTimeoutMs: "10000" + bootstrapServers: "pulsar-kafka:9092" + groupId: "test-pulsar-io" + topic: "my-topic" + sessionTimeoutMs: "10000" autoCommitEnabled: false ``` ## Usage -Here is an example of using the Kafka source connector with the configuration file as shown previously. +You can make the Kafka source connector as a Pulsar built-in connector and use it on a standalone cluster or an on-premises cluster. -1. Download a Kafka client and a Kafka connector. 
+### Standalone cluster - ```bash - $ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar +This example describes how to use the Kafka source connector to feed data from Kafka and write data to Pulsar topics in the standalone mode. - $ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar - ``` +#### Prerequisites -2. Create a network. - - ```bash - $ docker network create kafka-pulsar - ``` +- Install [Docker](https://docs.docker.com/get-docker/)(Community Edition). -3. Pull a ZooKeeper image and start ZooKeeper. - - ```bash - $ docker pull wurstmeister/zookeeper +#### Steps - $ docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper - ``` +1. Download and start the Confluent Platform. -4. Pull a Kafka image and start Kafka. - - ```bash - $ docker pull wurstmeister/kafka:2.11-1.0.2 - - $ docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2 - ``` +For details, see the [documentation](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp) to install the Kafka service locally. -5. Pull a Pulsar image and start Pulsar standalone. - - ```bash - $ docker pull apachepulsar/pulsar:{{pulsar:version}} - - $ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone - ``` +2. Pull a Pulsar image and start Pulsar in standalone mode. + + ```bash + docker pull apachepulsar/pulsar:latest + + docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:latest bin/pulsar standalone + ``` + +3. Create a producer file _kafka-producer.py_. -6. 
Create a producer file _kafka-producer.py_. - ```python from kafka import KafkaProducer - producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092') + producer = KafkaProducer(bootstrap_servers='localhost:9092') future = producer.send('my-topic', b'hello world') future.get() ``` -7. Create a consumer file _pulsar-client.py_. +4. Create a consumer file _pulsar-client.py_. ```python import pulsar @@ -157,23 +141,20 @@ Here is an example of using the Kafka source connector with the configuration fi client.close() ``` -8. Copy the following files to Pulsar. - +5. Copy the following files to Pulsar. + ```bash - $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar - $ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf - $ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/ - $ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/ + docker cp pulsar-io-kafka.nar pulsar-kafka-standalone:/pulsar + docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf ``` -9. Open a new terminal window and start the Kafka source connector in local run mode. +6. Open a new terminal window and start the Kafka source connector in local run mode. ```bash - $ docker exec -it pulsar-kafka-standalone /bin/bash + docker exec -it pulsar-kafka-standalone /bin/bash - $ ./bin/pulsar-admin source localrun \ - --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \ - --classname org.apache.pulsar.io.kafka.KafkaBytesSource \ + ./bin/pulsar-admin source localrun \ + --archive ./pulsar-io-kafka.nar \ --tenant public \ --namespace default \ --name kafka \ @@ -182,18 +163,49 @@ Here is an example of using the Kafka source connector with the configuration fi --parallelism 1 ``` -10. Open a new terminal window and run the consumer. +7. Open a new terminal window and run the Kafka producer locally. ```bash - $ docker exec -it pulsar-kafka-standalone /bin/bash + python3 kafka-producer.py + ``` - $ pip install kafka-python +8. 
Open a new terminal window and run the Pulsar consumer locally. - $ python3 kafka-producer.py + ```bash + python3 pulsar-client.py ``` - The following information appears on the consumer terminal window. +The following information appears on the consumer terminal window. ```bash Received message: 'hello world' ``` + +### On-premises cluster + +This example explains how to create a Kafka source connector in an on-premises cluster. + +1. Copy the NAR package of the Kafka connector to the Pulsar connectors directory. + + ``` + cp pulsar-io-kafka-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-kafka-{{connector:version}}.nar + ``` + +2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/). + + ``` + PULSAR_HOME/bin/pulsar-admin sources reload + ``` + +3. Check whether the Kafka source connector is available on the list or not. + + ``` + PULSAR_HOME/bin/pulsar-admin sources available-sources + ``` + +4. Create a Kafka source connector on a Pulsar cluster using the [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.8.0-SNAPSHOT/#-em-create-em--14) command. + + ``` + PULSAR_HOME/bin/pulsar-admin sources create \ + --source-config-file <kafka-source-config.yaml> + ``` \ No newline at end of file
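The on-premises steps in the diffs above can be sketched end to end: the snippet below writes the `configs:` YAML file the connector expects and assembles the `pulsar-admin sources create` invocation from it, so the moving parts of the final step are explicit. The file name and the literal `$PULSAR_HOME` path are illustrative assumptions, not values from the commit:

```python
from pathlib import Path

# The connector configuration used throughout the tutorial.
config = {
    "bootstrapServers": "pulsar-kafka:9092",
    "groupId": "test-pulsar-io",
    "topic": "my-topic",
    "sessionTimeoutMs": "10000",
    "autoCommitEnabled": "false",
}

def write_source_config(path: Path, configs: dict) -> None:
    # Emit the nested `configs:` mapping shown in the YAML example.
    # All values are quoted for simplicity; the connector parses them anyway.
    lines = ["configs:"]
    for key, value in configs.items():
        lines.append(f'  {key}: "{value}"')
    path.write_text("\n".join(lines) + "\n")

def create_command(config_file: str) -> list:
    # Mirrors step 4 of the on-premises instructions.
    return [
        "$PULSAR_HOME/bin/pulsar-admin", "sources", "create",
        "--source-config-file", config_file,
    ]

write_source_config(Path("kafka-source-config.yaml"), config)
print(" ".join(create_command("kafka-source-config.yaml")))
```

Running the printed command on a live cluster (after `sources reload` has picked up the NAR) creates the source; `pulsar-admin sources available-sources` should list `kafka` beforehand.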