This is an automated email from the ASF dual-hosted git repository.

liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 77cf09e  Document Kafka Source Schema management (#10084)
77cf09e is described below

commit 77cf09eafa4f1626a53a1fe2e65dd25f377c1127
Author: Enrico Olivelli <[email protected]>
AuthorDate: Thu Apr 1 02:20:50 2021 +0200

    Document Kafka Source Schema management (#10084)
    
    * Document Kafka Source Schema management
    
    * Update site2/docs/io-kafka-source.md
    
    Co-authored-by: Yu Liu <[email protected]>
    
    * Update site2/docs/io-kafka-source.md
    
    Co-authored-by: Jennifer Huang <[email protected]>
    
    * fix
    
    * Update site2/docs/io-kafka-source.md
    
    Co-authored-by: Jennifer Huang <[email protected]>
    
    * Apply suggestions from @Anonymitaet
    
    Co-authored-by: Yu Liu <[email protected]>
    
    Co-authored-by: Enrico Olivelli <[email protected]>
    Co-authored-by: Yu Liu <[email protected]>
    Co-authored-by: Jennifer Huang <[email protected]>
---
 site2/docs/io-kafka-source.md | 43 +++++++++++++++++++++++++++++++++++--------
 1 file changed, 35 insertions(+), 8 deletions(-)

diff --git a/site2/docs/io-kafka-source.md b/site2/docs/io-kafka-source.md
index d5e1ed8..3872b16 100644
--- a/site2/docs/io-kafka-source.md
+++ b/site2/docs/io-kafka-source.md
@@ -17,7 +17,7 @@ The configuration of the Kafka source connector has the following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. |
@@ -25,11 +25,40 @@ The configuration of the Kafka source connector has the following properties.
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
 | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
 
+### Schema Management
+
+This Kafka source connector applies a schema to the Pulsar topic based on the data type present on the Kafka topic.
+The data type is determined by the `keyDeserializationClass` and `valueDeserializationClass` configuration parameters.
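As a rough illustration (not the connector's actual implementation), the detection described above behaves like a lookup from deserializer class to schema type; the class names below are the real Kafka and Confluent ones, while the mapping and fallback are simplifying assumptions:

```python
# Simplified sketch of the schema detection described above -- an illustration,
# not the connector's actual code.
DESERIALIZER_TO_SCHEMA = {
    "org.apache.kafka.common.serialization.StringDeserializer": "STRING",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer": "BYTES",
    # For the Confluent Avro deserializer, the AVRO schema is downloaded
    # from the Schema Registry (see below).
    "io.confluent.kafka.serializers.KafkaAvroDeserializer": "AVRO",
}

def pulsar_schema_for(deserializer_class: str) -> str:
    # Assumption: unrecognized deserializer classes fall back to BYTES.
    return DESERIALIZER_TO_SCHEMA.get(deserializer_class, "BYTES")
```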
+
+If `valueDeserializationClass` is `org.apache.kafka.common.serialization.StringDeserializer`, the connector applies `Schema.STRING` as the schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is `io.confluent.kafka.serializers.KafkaAvroDeserializer`, Pulsar downloads the AVRO schema from the Confluent Schema Registry®
+and sets it properly on the Pulsar topic.
+
+In this case, you need to set `schema.registry.url` in the `consumerConfigProperties` configuration entry of the source.
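For example, a source configuration using the Confluent Avro deserializer might look like the following sketch (the broker address, topic name, and registry URL are placeholders, and the layout assumes the usual `configs` section of a connector YAML file):

```yaml
configs:
  bootstrapServers: "localhost:9092"
  groupId: "test-pulsar-io"
  topic: "my-topic"
  valueDeserializationClass: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  consumerConfigProperties:
    schema.registry.url: "http://localhost:8081"
```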
+
+If `keyDeserializationClass` is not `org.apache.kafka.common.serialization.StringDeserializer`, the key is not a String,
+and the Kafka source uses the KeyValue schema type with SEPARATED encoding.
+
+Pulsar supports AVRO format for keys.
+
+In this case, the Pulsar topic has the following properties:
+- Schema: KeyValue schema with SEPARATED encoding
+- Key: the content of the key of the Kafka message (base64-encoded)
+- Value: the content of the value of the Kafka message
+- KeySchema: the schema detected from `keyDeserializationClass`
+- ValueSchema: the schema detected from `valueDeserializationClass`
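Because the key is stored base64-encoded, a consumer that reads the raw Pulsar key can recover the original Kafka key bytes by decoding it; for example (the key value here is made up for illustration):

```python
import base64

# The Kafka source stores the Kafka key base64-encoded in the Pulsar message key.
# Decoding it recovers the original Kafka key bytes (example value is illustrative).
encoded_key = "a2V5LTE="            # what a consumer might see as the Pulsar key
kafka_key = base64.b64decode(encoded_key)
print(kafka_key)                     # b'key-1'
```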
+
+Topic compaction and partition routing use the Pulsar key, which contains the Kafka key, so they are driven by the same value that you have on Kafka.
+
+When you consume data from Pulsar topics, use the `KeyValue` schema to decode the data properly.
+If you want to access the raw key, use the `Message#getKeyBytes()` API.
 
 ### Example
 
@@ -60,7 +89,7 @@ Before using the Kafka source connector, you need to create a configuration file
 
 ## Usage
 
-Here is an example of using the Kafka source connecter with the configuration file as shown previously.
+Here is an example of using the Kafka source connector with the configuration file as shown previously.
 
 1. Download a Kafka client and a Kafka connector.
 
@@ -95,7 +124,7 @@ Here is an example of using the Kafka source connecter with the configuration fi
 5. Pull a Pulsar image and start Pulsar standalone.
    
    ```bash
-   $ docker pull apachepulsar/pulsar:2.4.0
+   $ docker pull apachepulsar/pulsar:{{pulsar:version}}
    
    $ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
    ```
@@ -130,9 +159,8 @@ Here is an example of using the Kafka source connecter with the configuration fi
 8. Copy the following files to Pulsar.
    
     ```bash
-    $ docker cp pulsar-io-kafka-2.4.0.nar pulsar-kafka-standalone:/pulsar
+    $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar
     $ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
-    $ docker cp kafka-clients-0.10.2.1.jar pulsar-kafka-standalone:/pulsar/lib
     $ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
     $ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
     ```
@@ -143,7 +171,7 @@ Here is an example of using the Kafka source connecter with the configuration fi
     $ docker exec -it pulsar-kafka-standalone /bin/bash
 
     $ ./bin/pulsar-admin source localrun \
-    --archive ./pulsar-io-kafka-2.4.0.nar \
+    --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \
     --classname org.apache.pulsar.io.kafka.KafkaBytesSource \
     --tenant public \
     --namespace default \
@@ -168,4 +196,3 @@ Here is an example of using the Kafka source connecter with the configuration fi
     ```bash
     Received message: 'hello world'
     ```
-
