> On 8 Feb 2022, at 14:16, Matt Casters <[email protected]> wrote:
>
> For KafkaIO.read() we made a specific provision in the form of class
> ConfluentSchemaRegistryDeserializer but it doesn't look like we covered the
> producer side of Avro values yet.
Regarding reading from Kafka with Avro and Confluent Schema Registry, have you
tried the API that KafkaIO provides [1] via DeserializerProvider?
It would look something like this (a code snippet from the KafkaIO Javadoc):
KafkaIO.<Long, GenericRecord>read()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("my_topic")
    .withKeyDeserializer(LongDeserializer.class)
    // Use Confluent Schema Registry, specify schema registry URL and value subject
    .withValueDeserializer(
        ConfluentSchemaRegistryDeserializerProvider.of(
            "http://localhost:8081", "my_topic-value"))
I think we can add a similar API on the write side as well to make it easier
to use.
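In the meantime, as far as I understand, producing Avro values should already be possible by plugging Confluent's own KafkaAvroSerializer into KafkaIO.write() and passing the registry URL through the producer config. A rough, untested sketch (the raw cast is needed because KafkaAvroSerializer is typed as Serializer<Object>, and "records" stands for some PCollection<KV<Long, GenericRecord>>):

```java
import java.util.Map;

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongSerializer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;

// records: a PCollection<KV<Long, GenericRecord>> built earlier in the pipeline
records.apply(
    KafkaIO.<Long, GenericRecord>write()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopic("my_topic")
        .withKeySerializer(LongSerializer.class)
        // Confluent serializer registers/looks up the value schema in the registry
        .withValueSerializer((Class) KafkaAvroSerializer.class)
        // Point the serializer at the schema registry
        .withProducerConfigUpdates(
            Map.of("schema.registry.url", "http://localhost:8081")));
```

A dedicated provision in KafkaIO.write(), mirroring ConfluentSchemaRegistryDeserializerProvider, would of course be cleaner than the cast.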
> I'd be happy to dive into the code to add proper support for a Confluent
> schema registry in KafkaIO.write() but I was just wondering if there was
> something I might have overlooked. It's hard to find samples or
> documentation on producing Avro messages with Beam.
I agree that there is room for improvement here but, to be fair, the KafkaIO
Javadoc does contain a dedicated section on this [2] (see “Use Avro schema with
Confluent Schema Registry”).
—
Alexey
[1]
https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withValueDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-
[2]
https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.html