From a quick look, it seems the respective interface in KafkaIO should rather
look like this to support KafkaAvroSerializer, which is a Serializer<Object>:

public Write<K, V> withValueSerializer(Class<? extends Serializer<? super V>> valueSerializer)
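To illustrate, here is a minimal self-contained sketch of why the contravariant bound would help. The types below are simplified stand-ins for the real Kafka/Beam classes, not the actual implementations:

```java
// Simplified stand-ins for illustration only; not the real Kafka/Beam classes.
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

class GenericRecord {}

// KafkaAvroSerializer is effectively a Serializer<Object>.
class KafkaAvroSerializer implements Serializer<Object> {
    public byte[] serialize(String topic, Object data) {
        return new byte[0];
    }
}

class Write<K, V> {
    // Proposed bound: accepts a serializer of V or of any supertype of V.
    Write<K, V> withValueSerializer(Class<? extends Serializer<? super V>> valueSerializer) {
        return this;
    }
}

class Demo {
    static Write<String, GenericRecord> demo() {
        Write<String, GenericRecord> write = new Write<>();
        // Compiles without a cast: Serializer<Object> is a Serializer<? super GenericRecord>.
        return write.withValueSerializer(KafkaAvroSerializer.class);
    }
}
```

With this bound, `KafkaAvroSerializer.class` is accepted directly for any value type, since `Object` is a supertype of every `V`.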

Thoughts?
Cheers, Moritz

From: Moritz Mack <mm...@talend.com>
Date: Tuesday, 8. February 2022 at 15:55
To: dev@beam.apache.org <dev@beam.apache.org>, matt.cast...@neo4j.com 
<matt.cast...@neo4j.com>
Subject: Re: KafkaIO.write and Avro
Hi Matt,

Unfortunately, the types don’t play well when using KafkaAvroSerializer. It 
currently requires a cast :/
The following will work:
write.withValueSerializer((Class) KafkaAvroSerializer.class)

This seems to be the cause of repeated confusion, so probably worth improving 
the user experience here!
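To make the type clash concrete, here is a minimal sketch with simplified stand-in types (not the real Kafka/Beam classes). With a `Serializer<V>` bound, `KafkaAvroSerializer.class` is rejected for `V = GenericRecord`, and the raw `(Class)` cast is what suppresses the check:

```java
// Simplified stand-ins for illustration only; not the real Kafka/Beam classes.
interface Serializer<T> {
    byte[] serialize(String topic, T data);
}

class GenericRecord {}

// KafkaAvroSerializer is effectively a Serializer<Object>.
class KafkaAvroSerializer implements Serializer<Object> {
    public byte[] serialize(String topic, Object data) {
        return new byte[0];
    }
}

class Write<K, V> {
    Class<? extends Serializer<V>> valueSerializer;

    // Current-style bound: requires a Serializer of exactly V.
    @SuppressWarnings("unchecked")
    Write<K, V> withValueSerializer(Class<? extends Serializer<V>> serializer) {
        this.valueSerializer = serializer;
        return this;
    }
}

class Workaround {
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Write<String, GenericRecord> configured() {
        Write<String, GenericRecord> write = new Write<>();
        // Does not compile: Class<KafkaAvroSerializer> is not a
        // Class<? extends Serializer<GenericRecord>>.
        // write.withValueSerializer(KafkaAvroSerializer.class);

        // The raw (Class) cast drops the type argument, so the call
        // compiles (with an unchecked warning):
        return write.withValueSerializer((Class) KafkaAvroSerializer.class);
    }
}
```

The raw cast works, but it throws away compile-time checking entirely, which is why it keeps confusing users.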

Cheers,
Moritz

From: Matt Casters <matt.cast...@neo4j.com>
Date: Tuesday, 8. February 2022 at 14:17
To: Beam Development List <dev@beam.apache.org>
Subject: KafkaIO.write and Avro
Dear Beams,

When sending Avro values to Kafka, say GenericRecord, you typically specify the
option value.serializer as
"io.confluent.kafka.serializers.KafkaAvroSerializer". This, along with a bunch
of other options for authentication and so on, verifies the schema stored in the
Avro record against a schema registry. Unfortunately, I couldn't figure out how
to pass this serializer class to KafkaIO.write(), as it's not acceptable to the
withValueSerializer() method.

For KafkaIO.read() we made a specific provision in the form of the class
ConfluentSchemaRegistryDeserializer, but it doesn't look like we've covered the
producer side of Avro values yet.

I'd be happy to dive into the code to add proper support for a Confluent schema 
registry in KafkaIO.write() but I was just wondering if there was something I 
might have overlooked.  It's hard to find samples or documentation on producing 
Avro messages with Beam.

Thanks in advance,

Matt


As a recipient of an email from Talend, your contact personal data will be on 
our systems. Please see our privacy notice (updated August 2020) at Talend, 
Inc. <https://www.talend.com/contacts-privacy-policy/>

