So yes, reading the generic records with a consumer worked great. It's
really convenient to have a way of handling both the coder and the
deserializer at once.

To test, I hooked KafkaIO up to a free Confluent Cloud service with a schema
registry. Reading works great, and once I have my next fixes ready for that
wretched Neo4jIO, the generic solution to all of this will go into
apache/hop master. Maybe it can serve as a sample of sorts.

The problem is really on the write/producer side.  Back on the subject of
that coder (and I'm looking at the crazy things folks will do with Apache
Hop), the ability to specify a coder for class GenericRecord
(AvroCoder.of(schema)) in a pipeline works only if you're sending to a
single topic, with a single KafkaIO.write instance and a single schema.
That scheme would fall apart if you wanted to write to multiple topics in
the same pipeline.
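
Concretely, this is the pattern I mean (the schema JSON and the collection
are placeholders):

// AvroCoder.of(schema) pins every element of the PCollection to this one
// schema, which is why a single KafkaIO.write with a single schema is the
// limit.
Schema schema = new Schema.Parser().parse(schemaJson);
PCollection<GenericRecord> records = ...;
records.setCoder(AvroCoder.of(schema));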

So perhaps that scenario would make the case for having a
ConfluentSchemaRegistrySerializer facility.
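
In the meantime, the closest thing I've found is plugging Confluent's own
KafkaAvroSerializer straight into KafkaIO.write(). A rough sketch (broker
and registry addresses are made up; the raw-typed cast is exactly the kind
of awkwardness a dedicated facility could hide):

// KafkaAvroSerializer implements Serializer<Object>, so an unchecked cast
// is needed to satisfy withValueSerializer(). The registry URL goes in via
// the producer config rather than the serializer itself.
KafkaIO.<Long, GenericRecord>write()
      .withBootstrapServers("broker_1:9092")
      .withTopic("my_topic")
      .withKeySerializer(LongSerializer.class)
      .withValueSerializer((Class) KafkaAvroSerializer.class)
      .withProducerConfigUpdates(
          ImmutableMap.<String, Object>of(
              "schema.registry.url", "http://localhost:8081"))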

Cheers,

Matt

On Wed, Feb 9, 2022, 19:18 Alexey Romanenko <aromanenko....@gmail.com> wrote:

>
> On 8 Feb 2022, at 14:16, Matt Casters <matt.cast...@neo4j.com> wrote:
>
> For KafkaIO.read() we made a specific provision in the form of the
> ConfluentSchemaRegistryDeserializerProvider class, but it doesn't look
> like we've covered the producer side of Avro values yet.
>
>
> Talking about reading from Kafka with Avro and the Confluent Schema
> Registry, have you tried the API that KafkaIO provides [1] using a
> DeserializerProvider?
>
> It would be something like this (a code snippet from KafkaIO Javadoc):
>
> KafkaIO.<Long, GenericRecord>read()
>       .withBootstrapServers("broker_1:9092,broker_2:9092")
>       .withTopic("my_topic")
>       .withKeyDeserializer(LongDeserializer.class)
>       // Use Confluent Schema Registry, specify schema registry URL and value subject
>       .withValueDeserializer(
>           ConfluentSchemaRegistryDeserializerProvider.of(
>               "http://localhost:8081", "my_topic-value"))
>
> I think we can add a similar API for the write part as well, to make it
> easier to use.
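>
> Hypothetically, it could mirror the read side (note:
> ConfluentSchemaRegistrySerializerProvider does not exist in Beam yet;
> the class and method here are purely illustrative):
>
> KafkaIO.<Long, GenericRecord>write()
>       .withBootstrapServers("broker_1:9092,broker_2:9092")
>       .withTopic("my_topic")
>       .withKeySerializer(LongSerializer.class)
>       // Hypothetical write-side counterpart of
>       // ConfluentSchemaRegistryDeserializerProvider
>       .withValueSerializer(
>           ConfluentSchemaRegistrySerializerProvider.of(
>               "http://localhost:8081", "my_topic-value"))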
>
>
>   I'd be happy to dive into the code to add proper support for a Confluent
> schema registry in KafkaIO.write(), but I was just wondering if there was
> something I might have overlooked. It's hard to find samples or
> documentation on producing Avro messages with Beam.
>
>
> I agree that there is room for improvement here but, tbh, the KafkaIO
> Javadoc contains a dedicated section for that [2] (see “Use Avro schema
> with Confluent Schema Registry”).
>
> —
> Alexey
>
>
> [1]
> https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withValueDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-
> [2]
> https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
>
