+Ismael

Doing it the “normal” way, especially for Kafka, may require some additional 
non-obvious steps (which could, of course, be documented). So I’d prefer to 
have a more user-friendly API around it, like the one we have for reading Avro 
messages with a schema stored in Confluent Schema Registry. That one simply 
extends the current API with a new method 
"withValueDeserializer(DeserializerProvider<V>)" and provides a new 
ConfluentSchemaRegistryDeserializerProvider class that encapsulates all the 
business logic inside. So I'd suggest following the same approach for the 
KafkaIO write part.
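
For illustration, a write-side counterpart could look roughly like this (just a 
sketch; the ConfluentSchemaRegistrySerializerProvider class and a provider-based 
withValueSerializer() overload are hypothetical names, modeled on the existing 
read-side API):

// Hypothetical write-side mirror of the read-side provider API:
KafkaIO.<String, GenericRecord>write()
    .withBootstrapServers("broker:9092")
    .withTopic("my_topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(
        ConfluentSchemaRegistrySerializerProvider.of(
            "https://registry:8081", "my_topic-value"));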

Any thoughts on this?

PS: For those (me included) who are curious why KafkaIO has both coders and 
serdes at the same time, this Jira [1] is an interesting read (I just found it 
recently).

[1] https://issues.apache.org/jira/browse/BEAM-1573

—
Alexey


> On 9 Feb 2022, at 17:15, Kenneth Knowles <k...@apache.org> wrote:
> 
> Good point. Doing things the "normal" way for users of the storage system is 
> a good on-ramp. Conversely, having a "normal Beam" way is good for people who 
> use Beam more than Kafka. Can we have both easily?
> 
> Kenn
> 
> On Wed, Feb 9, 2022 at 6:50 AM Matt Casters <matt.cast...@neo4j.com> wrote:
> Of course, IMO it would also be fine not to force developers to use 
> withKeySerializer() / withValueSerializer() in the first place.
> That way you could configure the Kafka serializer classes the standard way, 
> via properties, as described in the Kafka Consumer/Producer documentation.
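> 
> For example, something roughly like this (a sketch of what it could look 
> like; today KafkaIO still requires withKeySerializer()/withValueSerializer(), 
> so this would only work after such a change, though withProducerConfigUpdates() 
> itself already exists):
> 
> // Hypothetical: serializers configured purely via standard producer properties.
> KafkaIO.<String, GenericRecord>write()
>     .withBootstrapServers("broker:9092")
>     .withTopic("my_topic")
>     .withProducerConfigUpdates(ImmutableMap.of(
>         "value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer",
>         "schema.registry.url", "https://registry:8081"));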
> 
> Just an idea.
> Matt
> 
> On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <k...@apache.org> wrote:
> Just a quick type check: is it the case that a Serializer<Object> is expected 
> to be able to properly serde any subclass of Object? More generally, should 
> any Serializer<? super V> be able to properly serde V? Typically this isn't 
> the case. Not saying we shouldn't make the proposed change, but it could 
> result in surprises.
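> 
> A contrived sketch of the hazard (assuming Kafka 2.x+, where Serializer's 
> configure()/close() are default methods so a lambda works):
> 
> // A Serializer<Number> is a valid Serializer<? super Integer>...
> Serializer<Number> asDouble =
>     (topic, value) -> ByteBuffer.allocate(8).putDouble(value.doubleValue()).array();
> // ...but it silently widens every Integer to a double on the wire, so the
> // original Integer can never be restored: it compiles, then surprises later.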
> 
> Another thought, based on experience with coders: I would highlight three 
> types of serde that apply to Serializer just as they do to Coder:
> 
> 1. handles just a single type (VarIntCoder, etc)
> 2. lossy/converts concrete types because it is allowed (ListCoder works for 
> any list, but does not restore the original concrete subclass)
> 3. generic/tagging (SerializableCoder which restores the concrete subclass)
> 
> The current API in KafkaIO is right for types 1 and 2 but too strict for type 
> 3. The new API would be great for type 3, but potentially dangerous for types 
> 2 and 1 (though for type 1 it will mostly be irrelevant).
> 
> We could have a separate entrypoint for type 3, like 
> .withGenericValueCoder(Serializer<? super V>), that makes it very clear that 
> if you call this one you have to pass a Serializer that tags the concrete 
> subclass and restores it. Often the only relevant type will be 
> Serializer<Object>, so we could even make that the parameter type.
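> 
> Roughly (a hypothetical signature, just to make the idea concrete; nothing 
> like this exists in KafkaIO today):
> 
> // Caller promises the serializer tags the concrete subclass and restores it:
> public Write<K, V> withGenericValueCoder(Class<? extends Serializer<Object>> valueSerializer);
> 
> // e.g.: write.withGenericValueCoder(KafkaAvroSerializer.class);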
> 
> Kenn
> 
> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bhule...@google.com> wrote:
> The issue here is that KafkaAvroSerializer implements Serializer<Object>, and 
> not Serializer<GenericRecord> [1]. So you need to erase the type to force it. 
> I think Moritz's suggestion is actually to update the signature here [2] to 
> make the type parameter `? super V`, so that a Serializer<Object> will be 
> acceptable. That change would be preferable to updating the docs.
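> 
> Concretely, the change at [2] would be something like:
> 
> // Before (rejects KafkaAvroSerializer, a Serializer<Object>, when V != Object):
> public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer)
> 
> // After (the contravariant bound accepts any Serializer<? super V>):
> public Write<K, V> withValueSerializer(Class<? extends Serializer<? super V>> valueSerializer)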
> 
> [1] 
> https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
> [2] 
> https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sachi...@google.com> wrote:
> Thanks both, that's great - 
> 
> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <matt.cast...@neo4j.com> wrote:
> Thanks a lot Moritz.  Your suggestion worked immediately.
> 
> You easily end up on the wrong track, since my favorite IDE suggests: 
> 
> .withValueSerializer((Class<? extends Serializer<GenericRecord>>) 
> KafkaAvroSerializer.class)
> 
> ... which simply doesn't even compile for me. 
> 
>  incompatible types: 
> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot be 
> converted to java.lang.Class<? extends 
> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
> 
> It sort of puts you on the wrong footing, hence my question.
> If you don't mind, I'll simply create a PR to amend the Javadoc for KafkaIO.
> 
> https://issues.apache.org/jira/browse/BEAM-13854
> 
> AvroCoder.of(schema) was easier to figure out, but it might make sense to 
> document it in the same context as well.
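> 
> For the record, the combination that worked for me looks roughly like this (a 
> sketch: the broker/registry addresses are placeholders, and the variable 
> schema is the Avro Schema of the records):
> 
> PCollection<KV<String, GenericRecord>> records = ...;
> // Explicit coder, since Beam can't infer one for GenericRecord:
> records.setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)));
> records.apply(KafkaIO.<String, GenericRecord>write()
>     .withBootstrapServers("broker:9092")
>     .withTopic("my_topic")
>     .withKeySerializer(StringSerializer.class)
>     // the raw cast Moritz suggested:
>     .withValueSerializer((Class) KafkaAvroSerializer.class)
>     .withProducerConfigUpdates(
>         ImmutableMap.of("schema.registry.url", "https://registry:8081")));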
> 
> Thanks again!
> 
> Cheers,
> Matt
> 
> 
> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
> Just having a quick look, it looks like the respective interface in KafkaIO 
> should rather look like this to support KafkaAvroSerializer, which is a 
> Serializer<Object>:
> 
> public Write<K, V>  withValueSerializer(Class<? extends Serializer<? super 
> V>> valueSerializer)
> 
> Thoughts?
> 
> Cheers, Moritz
> 
> From: Moritz Mack <mm...@talend.com>
> Date: Tuesday, 8. February 2022 at 15:55
> To: dev@beam.apache.org, matt.cast...@neo4j.com
> Subject: Re: KafkaIO.write and Avro
> 
> Hi Matt,
> 
> Unfortunately, the types don’t play well when using KafkaAvroSerializer. It 
> currently requires a cast :/
> 
> The following will work:
> 
> write.withValueSerializer((Class) KafkaAvroSerializer.class)
> 
> This seems to be the cause of repeated confusion, so probably worth improving 
> the user experience here!
> 
> Cheers,
> 
> Moritz
> 
> From: Matt Casters <matt.cast...@neo4j.com>
> Date: Tuesday, 8. February 2022 at 14:17
> To: Beam Development List <dev@beam.apache.org>
> Subject: KafkaIO.write and Avro
> 
> 
> Dear Beams,
> 
> When sending Avro values to Kafka, say GenericRecord, you typically specify 
> the option value.serializer as 
> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This, along with a 
> bunch of other options for authentication and so on, verifies the schema 
> stored in the Avro record against a schema registry.  Unfortunately, I 
> couldn't figure out how to pass this serializer class to KafkaIO.write(), as 
> it's not acceptable to the withValueSerializer() method.
> 
> For KafkaIO.read() we made a specific provision in the form of the 
> ConfluentSchemaRegistryDeserializerProvider class, but it doesn't look like 
> we've covered the producer side of Avro values yet.
> 
> I'd be happy to dive into the code to add proper support for a Confluent 
> schema registry in KafkaIO.write(), but I was just wondering if there was 
> something I might have overlooked.  It's hard to find samples or 
> documentation on producing Avro messages with Beam.
> 
> Thanks in advance,
> 
> Matt
> 
> -- 
> Neo4j Chief Solutions Architect
> ✉   matt.cast...@neo4j.com
