+Ismael Doing it the “normal” way, especially for Kafka, may require some additional non-obvious steps (which could of course be documented). So I’d prefer to have a more user-friendly API around it, like we have for reading Avro messages with a schema stored in Confluent Schema Registry. That API actually just extends the current one by adding a new method "withValueDeserializer(DeserializerProvider<V>)" and provides a new ConfluentSchemaRegistryDeserializerProvider class that encapsulates all the business logic. So I'd suggest following the same approach for the KafkaIO write part.
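To make the suggestion concrete, a write-side analogue of the existing read-side DeserializerProvider might look roughly like the sketch below. The names SerializerProvider and the stand-in Serializer interface are illustrative assumptions, not existing Beam or Kafka API; the stand-in exists only so the sketch compiles without Kafka on the classpath.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Minimal stand-in for org.apache.kafka.common.serialization.Serializer so
// this sketch compiles without Kafka on the classpath.
interface Serializer<T> {
  byte[] serialize(String topic, T data);
}

// Hypothetical write-side analogue of KafkaIO's existing DeserializerProvider:
// the provider owns construction and configuration of the serializer (e.g.
// wiring up a schema registry client), so users never touch Class objects.
// Name and shape are illustrative only.
interface SerializerProvider<T> extends Serializable {
  Serializer<T> getSerializer(Map<String, ?> configs, boolean isKey);
}

public class SerializerProviderSketch {
  public static void main(String[] args) {
    // A trivial provider, just to show the shape of the API.
    SerializerProvider<String> utf8 =
        (configs, isKey) -> (topic, data) -> data.getBytes(StandardCharsets.UTF_8);
    byte[] out = utf8.getSerializer(Map.of(), false).serialize("topic", "hi");
    System.out.println(out.length);
  }
}
```

A ConfluentSchemaRegistrySerializerProvider built on this shape could then hide the KafkaAvroSerializer cast entirely, mirroring what ConfluentSchemaRegistryDeserializerProvider does on the read side.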
Any thoughts on this? PS: For those (me included) who are curious why KafkaIO has coders and serdes at the same time, this Jira [1] can be interesting to read (just found it recently). [1] https://issues.apache.org/jira/browse/BEAM-1573 — Alexey > On 9 Feb 2022, at 17:15, Kenneth Knowles <k...@apache.org> wrote: > > Good point. Doing things the "normal" way for users of the storage system is > a good on-ramp. Conversely, having a "normal Beam" way is good for people who > use Beam more than Kafka. Can we have both easily? > > Kenn > > On Wed, Feb 9, 2022 at 6:50 AM Matt Casters <matt.cast...@neo4j.com> wrote: > Of course, IMO it would be fine as well to not force developers to use > withKeySerializer() / withValueSerializer() in the first place. > This way you could use the standard way of configuring the Kafka serializer > classes using properties, as per the Kafka Consumer/Producer documentation. > > Just an idea. > Matt > > On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <k...@apache.org> wrote: > Just a quick type check: is it the case that a Serializer<Object> is expected > to be able to properly serde any subclass of Object? More generally, should any > Serializer<? super V> be able to properly serde V? Typically this > isn't the case. Not saying we shouldn't make the proposed change, but it > could result in surprises. > > Another possibility, based on experience with coders: I would highlight three > types of serde that apply to Serializer just as they do to Coder: > > 1. handles just a single type (VarIntCoder, etc.) > 2. lossy/converts concrete types because it is allowed (ListCoder works for > any list, but does not restore the original concrete subclass) > 3. generic/tagging (SerializableCoder, which restores the concrete subclass) > > The API in KafkaIO is right for types 1 and 2 but too strict for type 3. 
But > the new API is great for type 3, potentially dangerous for types 2 and 1 (though > for type 1 it will mostly be irrelevant). > > We could have a separate entrypoint for type 3, like > .withGenericValueCoder(Serializer<? super V>), that makes it very clear that > if you call this one you have to pass a Serializer that tags the concrete > subclass and restores it. Often the only relevant type will be > Serializer<Object>, so we could even make that the parameter type. > > Kenn > > On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bhule...@google.com> wrote: > The issue here is that KafkaAvroSerializer implements Serializer<Object>, > not Serializer<GenericRecord> [1]. So you need to erase the type to force it. > I think Moritz's suggestion is actually to update the signature here [2] to > make the type parameter `? super V`, so that a Serializer<Object> will be > acceptable. That change would be preferable to updating the docs. > > [1] > https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27 > [2] > https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379 > > On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sachi...@google.com> wrote: > Thanks both, that's great! > > On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <matt.cast...@neo4j.com> wrote: > Thanks a lot Moritz. Your suggestion worked immediately. 
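The variance point being debated above can be shown in plain Java, without Kafka on the classpath. The Serializer interface and ObjectSerializer below are simplified stand-ins (ObjectSerializer plays the role of KafkaAvroSerializer, which implements Serializer<Object>); `strict` and `relaxed` are hypothetical methods mirroring the current and the proposed withValueSerializer signatures.

```java
import java.nio.charset.StandardCharsets;

// Minimal stand-ins so this compiles without Kafka on the classpath.
interface Serializer<T> { byte[] serialize(String topic, T data); }

// Stand-in for KafkaAvroSerializer, which implements Serializer<Object>.
class ObjectSerializer implements Serializer<Object> {
  public byte[] serialize(String topic, Object data) {
    return String.valueOf(data).getBytes(StandardCharsets.UTF_8);
  }
}

public class VarianceDemo {
  // Mirrors the current KafkaIO-style signature: requires exactly Serializer<V>.
  static <V> String strict(Class<? extends Serializer<V>> clazz) {
    return clazz.getSimpleName();
  }

  // Mirrors the proposed widening: any Serializer<? super V> is acceptable.
  static <V> String relaxed(Class<? extends Serializer<? super V>> clazz) {
    return clazz.getSimpleName();
  }

  public static void main(String[] args) {
    // With V pinned to a concrete type (as it is in Write<K, V>), the strict
    // signature rejects a Serializer<Object>; this line would NOT compile:
    //   VarianceDemo.<String>strict(ObjectSerializer.class);

    // The widened signature accepts it, since Object is a supertype of String:
    System.out.println(VarianceDemo.<String>relaxed(ObjectSerializer.class));

    // Kenn's caveat still applies at runtime: a Serializer<? super V> is only
    // safe if it genuinely handles every V it is handed.
    Serializer<? super String> s = new ObjectSerializer();
    System.out.println(s.serialize("topic", "hi").length);
  }
}
```

The compiler guarantees only that the widened signature type-checks; whether the serializer actually round-trips every concrete V (Kenn's type 1/2/3 distinction) remains a runtime contract.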
> > You sort of get put on the wrong track since my favorite IDE suggests: > > .withValueSerializer((Class<? extends Serializer<GenericRecord>>) > KafkaAvroSerializer.class) > > ... which simply doesn't compile for me: > > incompatible types: > java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot be > converted to java.lang.Class<? extends > org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>> > > It sort of puts you on the wrong footing, hence my question. > If you don't mind I'll simply create a PR to amend the Javadoc for KafkaIO. > > https://issues.apache.org/jira/browse/BEAM-13854 > > Easier to figure out was AvroCoder.of(schema), but it might make sense to > document that in the same context as well. > > Thanks again! > > Cheers, > Matt > > > On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote: > Just having a quick look, it looks like the respective interface in KafkaIO > should rather look like this to support KafkaAvroSerializer, which is a > Serializer<Object>: > > > > public Write<K, V> withValueSerializer(Class<? extends Serializer<? super > V>> valueSerializer) > > > > Thoughts? > > Cheers, Moritz > > > > From: Moritz Mack <mm...@talend.com> > Date: Tuesday, 8. February 2022 at 15:55 > To: dev@beam.apache.org, matt.cast...@neo4j.com > Subject: Re: KafkaIO.write and Avro > 
> > Hi Matt, > > Unfortunately, the types don’t play well when using KafkaAvroSerializer. It > currently requires a cast :/ > > The following will work: > > write.withValueSerializer((Class) KafkaAvroSerializer.class) > > This seems to be the cause of repeated confusion, so it's probably worth improving > the user experience here! > > Cheers, > > Moritz > > From: Matt Casters <matt.cast...@neo4j.com> > Date: Tuesday, 8. February 2022 at 14:17 > To: Beam Development List <dev@beam.apache.org> > Subject: KafkaIO.write and Avro > > Dear Beams, > > When sending Avro values to Kafka, say GenericRecord, you typically specify the > option value.serializer as > "io.confluent.kafka.serializers.KafkaAvroSerializer". This, along with a > bunch of other options for authentication and so on, verifies the schema > stored in the Avro record against a schema registry. Unfortunately, I couldn't > figure out how to pass this serializer class to KafkaIO.write() as it's not > acceptable to the withValueSerializer() method. > > For KafkaIO.read() we made a specific provision in the form of class > ConfluentSchemaRegistryDeserializer, but it doesn't look like we covered the > producer side of Avro values yet. 
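For reference, the "normal" non-Beam setup being described here configures the serializers as plain producer properties, per the Kafka producer documentation. The host/port values below are placeholders:

```java
import java.util.Properties;

// Standard Kafka producer configuration for Confluent Avro serialization:
// serializers are named as string properties rather than passed as Class
// objects, which is what a properties-based KafkaIO API could build on.
public class ProducerProps {
  public static Properties avroProducerConfig() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    // Confluent-specific setting consumed by KafkaAvroSerializer:
    props.put("schema.registry.url", "http://localhost:8081");
    return props;
  }

  public static void main(String[] args) {
    System.out.println(avroProducerConfig().getProperty("value.serializer"));
  }
}
```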
> > > > I'd be happy to dive into the code to add proper support for a Confluent > schema registry in KafkaIO.write(), but I was just wondering if there was > something I might have overlooked. It's hard to find samples or > documentation on producing Avro messages with Beam. > > > > Thanks in advance, > > Matt > > > > As a recipient of an email from Talend, your contact personal data will be on > our systems. Please see our privacy notice (updated August 2020) at Talend, > Inc. <https://www.talend.com/contacts-privacy-policy/> > > > > -- > Neo4j Chief Solutions Architect > ✉ matt.cast...@neo4j.com > >
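The raw-cast workaround discussed in the thread can also be reproduced in isolation. Serializer and FakeAvroSerializer below are simplified stand-ins (not the real Kafka/Beam classes), and withValueSerializer is a hypothetical method mirroring the current KafkaIO.Write signature:

```java
// Minimal stand-ins so the example compiles without Kafka/Beam on the classpath.
interface Serializer<T> { byte[] serialize(String topic, T data); }

class FakeAvroSerializer implements Serializer<Object> {
  public byte[] serialize(String topic, Object data) { return new byte[0]; }
}

public class CastWorkaround {
  // Mirrors KafkaIO.Write's current signature: Class<? extends Serializer<V>>.
  static <V> String withValueSerializer(Class<? extends Serializer<V>> clazz) {
    return clazz.getSimpleName();
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  public static void main(String[] args) {
    // The IDE-suggested checked cast does NOT compile, because
    // FakeAvroSerializer is a Serializer<Object>, not a Serializer of the
    // concrete value type; this line would be rejected with V pinned:
    //   CastWorkaround.<Long>withValueSerializer(FakeAvroSerializer.class);

    // The raw cast sidesteps the generic check entirely, at the cost of an
    // unchecked warning and no compile-time type safety:
    String name = withValueSerializer((Class) FakeAvroSerializer.class);
    System.out.println(name);
  }
}
```

This is exactly why the `? super V` signature change (or a provider-style API) would remove the need for users to discover the raw cast on their own.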