Good point. Doing things the "normal" way for users of the storage system is a good on-ramp. Conversely, having a "normal Beam" way is good for people who use Beam more than Kafka. Can we have both easily?
Kenn

On Wed, Feb 9, 2022 at 6:50 AM Matt Casters <matt.cast...@neo4j.com> wrote:

> Of course, IMO it would be fine as well to not force developers to use
> withKeySerializer() / withValueSerializer() in the first place.
> That way you could use the standard way of configuring the Kafka
> serializer classes through properties, as per the Kafka Consumer/Producer
> documentation.
>
> Just an idea.
> Matt
>
> On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Just a quick type check: is it the case that a Serializer<Object> is
>> expected to be able to properly serde any subclass of Object? More
>> generally, should any Serializer<? super V> be able to properly serde V?
>> Typically this isn't the case. Not saying we shouldn't make the proposed
>> change, but it could result in surprises.
>>
>> Another possibility, based on experience with coders: I would highlight
>> three types of serde that apply to Serializer just as they do to Coder:
>>
>> 1. handles just a single type (VarIntCoder, etc.)
>> 2. lossy/converts concrete types because it is allowed (ListCoder works
>> for any list, but does *not* restore the original concrete subclass)
>> 3. generic/tagging (SerializableCoder, which restores the concrete
>> subclass)
>>
>> The current API in KafkaIO is right for types 1 and 2 but too strict for
>> type 3. The new API is great for type 3, but potentially dangerous for
>> types 1 and 2 (though for type 1 it will mostly be irrelevant).
>>
>> We could have a separate entry point for type 3, like
>> .withGenericValueCoder(Serializer<? super V>), that makes it very clear
>> that if you call this one you have to pass a Serializer that tags the
>> concrete subclass and restores it. Often, the only relevant type will be
>> Serializer<Object>, so we could even make that the parameter type.
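[Editor's note: the "type 2" surprise described above can be made concrete with a small self-contained sketch. `Serde`, `ListSerde`, and `write` below are invented stand-ins, not the real Kafka `Serializer` API: the `? super V` bound accepts the call at compile time, but a lossy implementation silently loses the concrete subclass on the round trip.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

// Serde is an invented stand-in for Kafka's Serializer/Deserializer pair,
// just to make the variance question concrete.
interface Serde<T> {
    byte[] serialize(T value);
    T deserialize(byte[] data);
}

public class SerdeVariance {
    // "Type 2" in the taxonomy above: accepts any List<String>, but always
    // restores an ArrayList -- the original concrete subclass is lost.
    static class ListSerde implements Serde<List<String>> {
        public byte[] serialize(List<String> value) {
            return String.join(",", value).getBytes();
        }
        public List<String> deserialize(byte[] data) {
            return new ArrayList<>(Arrays.asList(new String(data).split(",")));
        }
    }

    // A Serde<? super V> is accepted wherever values of type V are written:
    static <V> byte[] write(V value, Serde<? super V> serde) {
        return serde.serialize(value);
    }

    public static void main(String[] args) {
        Serde<List<String>> serde = new ListSerde();
        LinkedList<String> input = new LinkedList<>(Arrays.asList("a", "b"));

        // Compiles fine under the `? super V` bound...
        byte[] bytes = write(input, serde);

        // ...but the round trip silently changes the concrete class: the
        // kind of surprise a relaxed signature would allow.
        List<String> restored = serde.deserialize(bytes);
        System.out.println(restored.getClass().getSimpleName()); // ArrayList
    }
}
```

A "type 3" (tagging) implementation would record the concrete class in the bytes and restore it, which is why only that kind of serializer is safe behind a `? super V` signature.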
>>
>> Kenn
>>
>> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bhule...@google.com> wrote:
>>
>>> The issue here is that KafkaAvroSerializer implements
>>> Serializer<Object>, not Serializer<GenericRecord> [1]. So you need to
>>> erase the type to force it. I think Moritz's suggestion is actually to
>>> update the signature here [2] to make the type parameter `? super V`, so
>>> that a Serializer<Object> will be acceptable. That change would be
>>> preferable to updating the docs.
>>>
>>> [1] https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
>>> [2] https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
>>>
>>> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sachi...@google.com> wrote:
>>>
>>>> Thanks both, that's great.
>>>>
>>>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <matt.cast...@neo4j.com> wrote:
>>>>
>>>>> Thanks a lot, Moritz. Your suggestion worked immediately.
>>>>>
>>>>> You get put on the wrong track because my favorite IDE suggests:
>>>>>
>>>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>>>>>     KafkaAvroSerializer.class)
>>>>>
>>>>> ... which simply doesn't compile for me:
>>>>>
>>>>> incompatible types:
>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer>
>>>>> cannot be converted to java.lang.Class<? extends
>>>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>>>
>>>>> It puts you on the wrong footing, hence my question.
>>>>> If you don't mind, I'll simply create a PR to amend the Javadoc for
>>>>> KafkaIO.
>>>>>
>>>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>>>
>>>>> Easier to figure out was AvroCoder.of(schema), but it might make sense
>>>>> to document that in the same context as well.
>>>>>
>>>>> Thanks again!
>>>>>
>>>>> Cheers,
>>>>> Matt
>>>>>
>>>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>>>
>>>>>> Just having a quick look, it looks like the respective interface in
>>>>>> KafkaIO should rather look like this to support KafkaAvroSerializer,
>>>>>> which is a Serializer<Object>:
>>>>>>
>>>>>> public Write<K, V> withValueSerializer(
>>>>>>     Class<? extends Serializer<? super V>> valueSerializer)
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> Cheers, Moritz
>>>>>>
>>>>>> *From:* Moritz Mack <mm...@talend.com>
>>>>>> *Date:* Tuesday, 8. February 2022 at 15:55
>>>>>> *To:* dev@beam.apache.org <dev@beam.apache.org>, matt.cast...@neo4j.com
>>>>>> *Subject:* Re: KafkaIO.write and Avro
>>>>>>
>>>>>> Hi Matt,
>>>>>>
>>>>>> Unfortunately, the types don't play well when using
>>>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>>> The following will work:
>>>>>>
>>>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
>>>>>>
>>>>>> This seems to be the cause of repeated confusion, so it's probably
>>>>>> worth improving the user experience here!
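[Editor's note: the generics behavior Moritz and Matt describe can be reproduced with a toy model. All names below are invented stand-ins (`Serializer` for the Kafka interface, `AvroLikeSerializer` for KafkaAvroSerializer, `FakeGenericRecord` for Avro's GenericRecord, `Write`/`withRelaxedValueSerializer` for KafkaIO.Write and the proposed signature), chosen only to show why the IDE-suggested checked cast is rejected while the raw cast, and the proposed `? super V` bound, compile.]

```java
// Toy model of the KafkaIO.write() typing problem (hypothetical names).
interface Serializer<T> {}

// Like KafkaAvroSerializer: a Serializer<Object>, not a Serializer<V>.
class AvroLikeSerializer implements Serializer<Object> {}

class FakeGenericRecord {}

class Write<K, V> {
    // Current KafkaIO signature: rejects a Serializer<Object> for V.
    Write<K, V> withValueSerializer(Class<? extends Serializer<V>> clazz) {
        return this;
    }

    // Moritz's proposed relaxation: any serializer of a supertype of V.
    Write<K, V> withRelaxedValueSerializer(
            Class<? extends Serializer<? super V>> clazz) {
        return this;
    }
}

public class CastDemo {
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static void main(String[] args) {
        Write<String, FakeGenericRecord> write = new Write<>();

        // Does not compile: Class<AvroLikeSerializer> is not a
        // Class<? extends Serializer<FakeGenericRecord>>:
        // write.withValueSerializer(AvroLikeSerializer.class);

        // The IDE-suggested checked cast is rejected too, because
        // AvroLikeSerializer provably cannot implement
        // Serializer<FakeGenericRecord> in addition to Serializer<Object>:
        // write.withValueSerializer((Class<? extends
        //     Serializer<FakeGenericRecord>>) AvroLikeSerializer.class);

        // The raw-type cast from the thread erases the check and compiles,
        // with an unchecked warning:
        write.withValueSerializer((Class) AvroLikeSerializer.class);

        // Under the proposed `? super V` bound, no cast is needed at all:
        write.withRelaxedValueSerializer(AvroLikeSerializer.class);

        System.out.println("both calls compile");
    }
}
```

The raw cast works only because it discards the type check entirely, which is why relaxing the bound is the cleaner fix.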
>>>>>>
>>>>>> Cheers,
>>>>>> Moritz
>>>>>>
>>>>>> *From:* Matt Casters <matt.cast...@neo4j.com>
>>>>>> *Date:* Tuesday, 8. February 2022 at 14:17
>>>>>> *To:* Beam Development List <dev@beam.apache.org>
>>>>>> *Subject:* KafkaIO.write and Avro
>>>>>>
>>>>>> Dear Beams,
>>>>>>
>>>>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>>>>> specify the option value.serializer as
>>>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer". This, along with
>>>>>> a bunch of other options for authentication and so on, verifies the
>>>>>> schema stored in the Avro record against a schema registry.
>>>>>> Unfortunately, I couldn't figure out how to pass this serializer class
>>>>>> to KafkaIO.write(), as it's not acceptable to the withValueSerializer()
>>>>>> method.
>>>>>>
>>>>>> For KafkaIO.read() we made a specific provision in the form of the
>>>>>> class ConfluentSchemaRegistryDeserializer, but it doesn't look like we
>>>>>> covered the producer side of Avro values yet.
>>>>>>
>>>>>> I'd be happy to dive into the code to add proper support for a
>>>>>> Confluent schema registry in KafkaIO.write(), but I was just wondering
>>>>>> if there was something I might have overlooked. It's hard to find
>>>>>> samples or documentation on producing Avro messages with Beam.
>>>>>>
>>>>>> Thanks in advance,
>>>>>> Matt
>>>>>>
>>>>>> *As a recipient of an email from Talend, your contact personal data
>>>>>> will be on our systems. Please see our privacy notice (updated August
>>>>>> 2020) at Talend, Inc. <https://www.talend.com/contacts-privacy-policy/>*
>>>>>
>>>>> --
>>>>> Neo4j Chief Solutions Architect
>>>>> *✉ *matt.cast...@neo4j.com
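[Editor's note: a minimal sketch of the property-based configuration Matt suggests upthread, using the standard Kafka and Confluent property names from their producer documentation; the hosts are placeholders. Today KafkaIO.write() takes extra producer properties like these via withProducerConfigUpdates(), while the serializer classes themselves still go through the typed withKeySerializer()/withValueSerializer() methods; the suggestion is that KafkaIO could honor key.serializer/value.serializer from the properties directly.]

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerProps {
    // Standard Kafka producer configuration, as documented for the plain
    // Kafka producer and Confluent's Avro serializer.
    static Map<String, Object> producerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", "localhost:9092");          // placeholder host
        config.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer",
            "io.confluent.kafka.serializers.KafkaAvroSerializer");
        config.put("schema.registry.url", "http://localhost:8081"); // placeholder URL
        return config;
    }

    public static void main(String[] args) {
        producerConfig().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```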