This is an automated email from the ASF dual-hosted git repository.
liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 77cf09e Document Kafka Source Schema management (#10084)
77cf09e is described below
commit 77cf09eafa4f1626a53a1fe2e65dd25f377c1127
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Apr 1 02:20:50 2021 +0200
Document Kafka Source Schema management (#10084)
* Document Kafka Source Schema management
* Update site2/docs/io-kafka-source.md
Co-authored-by: Yu Liu <[email protected]>
* Update site2/docs/io-kafka-source.md
Co-authored-by: Jennifer Huang <[email protected]>
* fix
* Update site2/docs/io-kafka-source.md
Co-authored-by: Jennifer Huang <[email protected]>
* Apply suggestions from @Anonymitaet
Co-authored-by: Yu Liu <[email protected]>
Co-authored-by: Enrico Olivelli <[email protected]>
Co-authored-by: Yu Liu <[email protected]>
Co-authored-by: Jennifer Huang <[email protected]>
---
site2/docs/io-kafka-source.md | 43 +++++++++++++++++++++++++++++++++++--------
1 file changed, 35 insertions(+), 8 deletions(-)
diff --git a/site2/docs/io-kafka-source.md b/site2/docs/io-kafka-source.md
index d5e1ed8..3872b16 100644
--- a/site2/docs/io-kafka-source.md
+++ b/site2/docs/io-kafka-source.md
@@ -17,7 +17,7 @@ The configuration of the Kafka source connector has the following properties.
| Name | Type| Required | Default | Description
|------|----------|---------|-------------|-------------|
-| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
| `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
| `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. |
| `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. |
@@ -25,11 +25,40 @@ The configuration of the Kafka source connector has the following properties.
| `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
| `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
| `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. |
-| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
| `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
| `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
+### Schema Management
+
+This Kafka source connector applies the schema to the topic depending on the data type that is present on the Kafka topic.
+You can detect the data type from the `keyDeserializationClass` and `valueDeserializationClass` configuration parameters.
+
+If the `valueDeserializationClass` is `org.apache.kafka.common.serialization.StringDeserializer`, you can set Schema.STRING() as schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is `io.confluent.kafka.serializers.KafkaAvroDeserializer`, Pulsar downloads the AVRO schema from the Confluent Schema Registry®
+and sets it properly on the Pulsar topic.
+
+In this case, you need to set `schema.registry.url` inside of the `consumerConfigProperties` configuration entry
+of the source.
+
+If `keyDeserializationClass` is not `org.apache.kafka.common.serialization.StringDeserializer`, it means
+that you do not have a String as key and the Kafka Source uses the KeyValue schema type with the SEPARATED encoding.
+
+Pulsar supports AVRO format for keys.
+
+In this case, you can have a Pulsar topic with the following properties:
+- Schema: KeyValue schema with SEPARATED encoding
+- Key: the content of key of the Kafka message (base64 encoded)
+- Value: the content of value of the Kafka message
+- KeySchema: the schema detected from `keyDeserializationClass`
+- ValueSchema: the schema detected from `valueDeserializationClass`
+
+Topic compaction and partition routing use the Pulsar key, that contains the Kafka key, and so they are driven by the same value that you have on Kafka.
+
+When you consume data from Pulsar topics, you can use the `KeyValue` schema. In this way, you can decode the data properly.
+If you want to access the raw key, you can use the `Message#getKeyBytes()` API.
### Example
@@ -60,7 +89,7 @@ Before using the Kafka source connector, you need to create a configuration file
## Usage
-Here is an example of using the Kafka source connecter with the configuration file as shown previously.
+Here is an example of using the Kafka source connector with the configuration file as shown previously.
1. Download a Kafka client and a Kafka connector.
@@ -95,7 +124,7 @@ Here is an example of using the Kafka source connecter with the configuration fi
5. Pull a Pulsar image and start Pulsar standalone.
```bash
- $ docker pull apachepulsar/pulsar:2.4.0
+ $ docker pull apachepulsar/pulsar:{{pulsar:version}}
$ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
```
@@ -130,9 +159,8 @@ Here is an example of using the Kafka source connecter with the configuration fi
8. Copy the following files to Pulsar.
```bash
- $ docker cp pulsar-io-kafka-2.4.0.nar pulsar-kafka-standalone:/pulsar
+ $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar
$ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
- $ docker cp kafka-clients-0.10.2.1.jar pulsar-kafka-standalone:/pulsar/lib
$ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
$ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
```
@@ -143,7 +171,7 @@ Here is an example of using the Kafka source connecter with the configuration fi
$ docker exec -it pulsar-kafka-standalone /bin/bash
$ ./bin/pulsar-admin source localrun \
- --archive ./pulsar-io-kafka-2.4.0.nar \
+ --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \
--classname org.apache.pulsar.io.kafka.KafkaBytesSource \
--tenant public \
--namespace default \
@@ -168,4 +196,3 @@ Here is an example of using the Kafka source connecter with the configuration fi
```bash
Received message: 'hello world'
```
-
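
The Schema Management section added by this commit says that, for non-String keys, the Pulsar message key holds the Kafka key content base64 encoded, and that the raw key is available through `Message#getKeyBytes()`. The following is a minimal sketch of that round trip using only the Python standard library. It does not call the Pulsar client; the helper names `to_pulsar_key` and `to_raw_key` and the sample key bytes are hypothetical and serve only to illustrate the encoding.

```python
import base64


def to_pulsar_key(kafka_key: bytes) -> str:
    """Encode raw Kafka key bytes as base64 text (hypothetical helper)."""
    return base64.b64encode(kafka_key).decode("ascii")


def to_raw_key(pulsar_key: str) -> bytes:
    """Decode the base64 text key back to the raw bytes (hypothetical helper).

    This mirrors what the doc describes for reading the raw key on the
    consumer side; it is an illustration, not the client implementation.
    """
    return base64.b64decode(pulsar_key)


# Sample binary key, an assumption made for illustration only.
kafka_key = b"\x00\x01user-1"
pulsar_key = to_pulsar_key(kafka_key)

# The base64 text is a one-to-one image of the Kafka key bytes, which is
# consistent with compaction and routing being driven by the same value.
assert to_raw_key(pulsar_key) == kafka_key
```

Because the encoding is reversible and deterministic, two Kafka messages with the same key bytes produce the same text key, and different key bytes produce different text keys.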