So yes, reading generic records with a consumer worked great. It's really convenient to have a single facility that handles both the coder and the deserializer at once.
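In case it's useful to anyone reading along, this is roughly what the working read side looks like. It's a sketch: the broker, topic, subject and registry URL are placeholders for my Confluent Cloud settings, a Pipeline named pipeline is assumed, and the usual KafkaIO/Kafka imports are omitted.

    PCollection<KV<Long, GenericRecord>> input =
        pipeline.apply(
            KafkaIO.<Long, GenericRecord>read()
                .withBootstrapServers("broker_1:9092")
                .withTopic("my_topic")
                .withKeyDeserializer(LongDeserializer.class)
                // One call hands KafkaIO both the value deserializer and
                // the coder derived from the registry schema.
                .withValueDeserializer(
                    ConfluentSchemaRegistryDeserializerProvider.of(
                        "https://my-registry.example.com", "my_topic-value"))
                .withoutMetadata());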
To test, I hooked KafkaIO up to a free Confluent Cloud service with Schema Registry. Reading works great, and once I have my next fixes ready for that wretched Neo4jIO, the generic solution to all of this will go into apache/hop master. Maybe it can serve as a sample of sorts.

The problem is really on the write/producer side. Back on the subject of that coder, and keeping in mind the crazy things folks will do with Apache Hop: specifying a coder for GenericRecord (AvroCoder.of(schema)) in a pipeline works only if you're sending to a single topic, with a single KafkaIO.write instance and a single schema. That system falls apart the moment you want to write to multiple topics in the same pipeline. So perhaps that scenario makes the case for having a ConfluentSchemaRegistrySerializer facility. (I've sketched the current write-side workaround below the quoted message.)

Cheers,
Matt

On Wed, Feb 9, 2022 at 19:18, Alexey Romanenko <aromanenko....@gmail.com> wrote:

> On 8 Feb 2022, at 14:16, Matt Casters <matt.cast...@neo4j.com> wrote:
>
> For KafkaIO.read() we made a specific provision in the form of class
> ConfluentSchemaRegistryDeserializerProvider, but it doesn't look like we
> covered the producer side of Avro values yet.
>
>
> Talking about reading from Kafka with Avro and the Confluent Schema
> Registry, have you tried the API that KafkaIO provides [1], using a
> DeserializerProvider?
>
> It would be something like this (a code snippet from the KafkaIO Javadoc):
>
>   KafkaIO.<Long, GenericRecord>read()
>       .withBootstrapServers("broker_1:9092,broker_2:9092")
>       .withTopic("my_topic")
>       .withKeyDeserializer(LongDeserializer.class)
>       // Use Confluent Schema Registry: specify the schema registry URL
>       // and the value subject
>       .withValueDeserializer(
>           ConfluentSchemaRegistryDeserializerProvider.of(
>               "http://localhost:8081", "my_topic-value"))
>
> I think we can add a similar API for the write part as well, to make it
> easier to use.
>
>
> I'd be happy to dive into the code to add proper support for a Confluent
> schema registry in KafkaIO.write(), but I was just wondering if there was
> something I might have overlooked. It's hard to find samples or
> documentation on producing Avro messages with Beam.
>
>
> I agree that we have a good field for improvement here but, tbh, the
> KafkaIO Javadoc contains a dedicated section for that [2] (see "Use Avro
> schema with Confluent Schema Registry").
>
> —
> Alexey
>
> [1] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withValueDeserializer-org.apache.beam.sdk.io.kafka.DeserializerProvider-
> [2] https://beam.apache.org/releases/javadoc/2.36.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
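P.S. For completeness, the write-side workaround I described above looks roughly like the sketch below. This is not the actual Hop code: the schema, broker, topic and registry URL are placeholders, and the value serializer is Confluent's own KafkaAvroSerializer, since Beam offers no registry-aware provider on the producer side. Note how AvroCoder.of(schema) bakes a single schema into the pipeline, which is why a second topic with a different schema has nowhere to go.

    import java.util.Map;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.KV;
    import org.apache.kafka.common.serialization.LongSerializer;

    import io.confluent.kafka.serializers.KafkaAvroSerializer;

    public class AvroWriteSketch {

      public static void main(String[] args) {
        // Placeholder schema with a single string field.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Sample\",\"fields\":"
                + "[{\"name\":\"value\",\"type\":\"string\"}]}");

        GenericRecord record = new GenericData.Record(schema);
        record.put("value", "hello");

        Pipeline pipeline = Pipeline.create();

        // The crux: one coder for GenericRecord, registered pipeline-wide.
        // A second topic with a different schema can't be expressed here.
        pipeline.getCoderRegistry()
            .registerCoderForClass(GenericRecord.class, AvroCoder.of(schema));

        pipeline
            .apply(Create.of(KV.of(1L, record))
                // Set explicitly as well, so coder inference can't fail.
                .withCoder(KvCoder.of(VarLongCoder.of(), AvroCoder.of(schema))))
            .apply(KafkaIO.<Long, GenericRecord>write()
                .withBootstrapServers("broker_1:9092")
                .withTopic("my_topic")
                .withKeySerializer(LongSerializer.class)
                // Confluent's serializer registers/fetches schemas itself;
                // the raw cast is needed since it's a Serializer<Object>.
                .withValueSerializer((Class) KafkaAvroSerializer.class)
                .withProducerConfigUpdates(Map.<String, Object>of(
                    "schema.registry.url", "http://localhost:8081")));

        pipeline.run().waitUntilFinish();
      }
    }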