Hi Matt,

Unfortunately, the types don’t play well when using KafkaAvroSerializer. It currently requires a cast :/ The following will work:

    write.withValueSerializer((Class) KafkaAvroSerializer.class)
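
Some background on why the cast is needed: KafkaIO's withValueSerializer() expects a Class<? extends Serializer<V>>, while KafkaAvroSerializer is declared as a Serializer<Object>, so for V = GenericRecord the class literal does not match. Below is a minimal, self-contained sketch of that mismatch. Every type in it (Serializer, FakeAvroSerializer, FakeRecord, Write) is a hypothetical stand-in written for illustration, not the real Beam, Kafka or Confluent class, so it compiles without any dependencies.

```java
import java.nio.charset.StandardCharsets;

public class SerializerCastSketch {

    // Stand-in for org.apache.kafka.common.serialization.Serializer<T>.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for KafkaAvroSerializer, which is a Serializer<Object>.
    static class FakeAvroSerializer implements Serializer<Object> {
        @Override
        public byte[] serialize(String topic, Object data) {
            return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for an Avro GenericRecord.
    static class FakeRecord {
        final String name;

        FakeRecord(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "FakeRecord(" + name + ")";
        }
    }

    // Stand-in for KafkaIO.Write<K, V>, reduced to the one relevant method.
    static class Write<V> {
        Class<? extends Serializer<V>> valueSerializer;

        Write<V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
            this.valueSerializer = valueSerializer;
            return this;
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    static Write<FakeRecord> configure() {
        Write<FakeRecord> write = new Write<>();

        // Does not compile: Class<FakeAvroSerializer> is not a
        // Class<? extends Serializer<FakeRecord>>, because the serializer
        // implements Serializer<Object>.
        // write.withValueSerializer(FakeAvroSerializer.class);

        // Compiles with an unchecked warning: the raw Class cast
        // sidesteps the generic type check.
        write.withValueSerializer((Class) FakeAvroSerializer.class);
        return write;
    }

    // Instantiates the configured serializer reflectively and uses it,
    // showing that the cast is harmless at runtime for this serializer.
    static byte[] serializeOne(Write<FakeRecord> write, FakeRecord record) {
        try {
            Serializer<FakeRecord> serializer =
                write.valueSerializer.getDeclaredConstructor().newInstance();
            return serializer.serialize("demo-topic", record);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not create serializer", e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serializeOne(configure(), new FakeRecord("demo"));
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

The cast is safe here only because a serializer that accepts Object also accepts any record type; the compiler just cannot express that through the Class<? extends Serializer<V>> parameter.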

This seems to be the cause of repeated confusion, so probably worth improving the user experience here!

Cheers,
Moritz

From: Matt Casters <matt.cast...@neo4j.com>
Date: Tuesday, 8. February 2022 at 14:17
To: Beam Development List <dev@beam.apache.org>
Subject: KafkaIO.write and Avro

Dear Beams,

When sending Avro values to Kafka, say GenericRecord, you typically specify option value.serializer as "io.confluent.kafka.serializers.KafkaAvroSerializer". This, along with a bunch of other options for authentication and so on, verifies the schema stored in the Avro record with a schema registry.

Unfortunately, I couldn't figure out how to pass this serializer class to KafkaIO.write() as it's not acceptable to the withValueSerializer() method.

For KafkaIO.read() we made a specific provision in the form of class ConfluentSchemaRegistryDeserializer but it doesn't look like we covered the producer side of Avro values yet.

I'd be happy to dive into the code to add proper support for a Confluent schema registry in KafkaIO.write() but I was just wondering if there was something I might have overlooked. It's hard to find samples or documentation on producing Avro messages with Beam.

Thanks in advance,
Matt