Good point. Doing things the "normal" way for users of the storage system
is a good on-ramp. Conversely, having a "normal Beam" way is good for
people who use Beam more than Kafka. Can we have both easily?

Kenn

On Wed, Feb 9, 2022 at 6:50 AM Matt Casters <matt.cast...@neo4j.com> wrote:

> Of course, IMO it would be fine as well not to force developers to use
> withKeySerializer() / withValueSerializer() in the first place.
> That way you could configure the Kafka serializer classes through
> properties, the standard way described in the Kafka Consumer/Producer
> documentation.
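>
> For illustration, a rough sketch of what that could look like
> (hypothetical: today KafkaIO still requires withValueSerializer(), so
> passing the property alone is not enough; servers, topic, and registry
> URL are placeholders):
>
> KafkaIO.<String, GenericRecord>write()
>     .withBootstrapServers("localhost:9092")
>     .withTopic("avro-topic")
>     .withProducerConfigUpdates(ImmutableMap.of(
>         // plain Kafka producer properties, as in the Kafka docs
>         "value.serializer",
>         "io.confluent.kafka.serializers.KafkaAvroSerializer",
>         "schema.registry.url", "http://localhost:8081"));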
>
> Just an idea.
> Matt
>
> On Wed, Feb 9, 2022 at 3:40 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Just a quick type check: is it the case that a Serializer<Object> is
>> expected to be able to properly serde any subclass of Object? More
>> generally, that any Serializer<? super V> should be able to properly serde
>> V? Typically this isn't the case. Not saying we shouldn't make the
>> proposed change, but it could result in surprises.
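>>
>> A concrete instance of the concern (hypothetical serializer, just for
>> illustration; imports: org.apache.kafka.common.serialization.Serializer,
>> java.nio.ByteBuffer):
>>
>> // Once the parameter is Serializer<? super V>, this compiles for
>> // V = Integer, Long, Double, ... but only actually handles Integer.
>> class IntegerOnlySerializer implements Serializer<Number> {
>>   @Override
>>   public byte[] serialize(String topic, Number data) {
>>     // ClassCastException at runtime for any Number that isn't an Integer
>>     return ByteBuffer.allocate(4).putInt((Integer) data).array();
>>   }
>> }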
>>
>> Another thought, based on experience with coders: I would highlight three
>> types of serde, and they apply to Serializer just as they do to Coder:
>>
>> 1. handles just a single type (VarIntCoder, etc)
>> 2. lossy/converts concrete types because it is allowed (ListCoder works
>> for any list, but does *not* restore the original concrete subclass)
>> 3. generic/tagging (SerializableCoder which restores the concrete
>> subclass)
>>
>> The current API in KafkaIO is right for types 1 and 2 but too strict for
>> type 3. The new API is great for type 3 but potentially dangerous for
>> types 1 and 2 (though for type 1 it will mostly be irrelevant).
>>
>> We could have a separate entrypoint for type 3, like
>> .withGenericValueSerializer(Serializer<? super V>), that makes it very
>> clear that if you call this one you have to pass a Serializer that tags
>> the concrete subclass and restores it. Often the only relevant type will
>> be Serializer<Object>, so we could even make that the parameter type.
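>>
>> A minimal sketch of that entrypoint (hypothetical, not in KafkaIO today;
>> the name and parameter type are just for illustration):
>>
>> // Only serializers that tag the concrete subclass and restore it belong
>> // here, e.g. KafkaAvroSerializer, which implements Serializer<Object>.
>> public Write<K, V> withGenericValueSerializer(
>>     Class<? extends Serializer<Object>> valueSerializer) {
>>   // erase and reuse the existing storage path (unchecked, by design)
>>   return withValueSerializer((Class) valueSerializer);
>> }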
>>
>> Kenn
>>
>> On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bhule...@google.com>
>> wrote:
>>
>>> The issue here is that KafkaAvroSerializer implements
>>> Serializer<Object>, and not Serializer<GenericRecord> [1]. So you need to
>>> erase the type to force it. I think Moritz's suggestion is actually to
>>> update the signature here [2] to make the type parameter `? super V`, so
>>> that a Serializer<Object> will be acceptable. That change would be
>>> preferable to updating the docs.
>>>
>>> [1]
>>> https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
>>> [2]
>>> https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
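>>>
>>> With ? super V in the signature, the call site would compile without a
>>> cast. A sketch, assuming a KafkaIO.Write<String, GenericRecord> named
>>> write:
>>>
>>> // KafkaAvroSerializer implements Serializer<Object>, and Object is a
>>> // supertype of GenericRecord, so it satisfies Serializer<? super V>.
>>> write.withValueSerializer(KafkaAvroSerializer.class);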
>>>
>>> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sachi...@google.com>
>>> wrote:
>>>
>>>> Thanks both, that's great -
>>>>
>>>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <matt.cast...@neo4j.com>
>>>> wrote:
>>>>
>>>>> Thanks a lot Moritz.  Your suggestion worked immediately.
>>>>>
>>>>> You sort of get on the wrong track, since my favorite IDE suggests:
>>>>>
>>>>> .withValueSerializer((Class<? extends Serializer<GenericRecord>>)
>>>>> KafkaAvroSerializer.class)
>>>>>
>>>>> ... which simply doesn't even compile for me.
>>>>>
>>>>>  incompatible types:
>>>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer> cannot
>>>>> be converted to java.lang.Class<? extends
>>>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>>>
>>>>> It sort of puts you on the wrong footing, hence my question.
>>>>> If you don't mind, I'll simply create a PR to amend the Javadoc for
>>>>> KafkaIO.
>>>>>
>>>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>>>
>>>>> AvroCoder.of(schema) was easier to figure out, but it might make sense
>>>>> to document it in the same context as well.
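>>>>>
>>>>> For reference, a sketch of that (assuming a PCollection<GenericRecord>
>>>>> named records and an Avro Schema named schema):
>>>>>
>>>>> // AvroCoder.of(Schema) yields an AvroCoder<GenericRecord>
>>>>> records.setCoder(AvroCoder.of(schema));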
>>>>>
>>>>> Thanks again!
>>>>>
>>>>> Cheers,
>>>>> Matt
>>>>>
>>>>>
>>>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>>>
>>>>>> Just having a quick look, it looks like the respective method in
>>>>>> KafkaIO should rather look like this to support KafkaAvroSerializer,
>>>>>> which is a Serializer<Object>:
>>>>>>
>>>>>>
>>>>>>
>>>>>> public Write<K, V> withValueSerializer(
>>>>>>     Class<? extends Serializer<? super V>> valueSerializer)
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> Cheers, Moritz
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *Moritz Mack <mm...@talend.com>
>>>>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>>>>> *To: *dev@beam.apache.org <dev@beam.apache.org>,
>>>>>> matt.cast...@neo4j.com <matt.cast...@neo4j.com>
>>>>>> *Subject: *Re: KafkaIO.write and Avro
>>>>>>
>>>>>> Hi Matt,
>>>>>>
>>>>>>
>>>>>>
>>>>>> Unfortunately, the types don’t play well when using
>>>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>>>
>>>>>> The following will work:
>>>>>>
>>>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
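>>>>>>
>>>>>> In context, a sketch of the full write (assuming records is a
>>>>>> PCollection<KV<String, GenericRecord>>; the raw Class cast deliberately
>>>>>> suppresses the generics check; servers, topic, and registry URL are
>>>>>> placeholders):
>>>>>>
>>>>>> records.apply(KafkaIO.<String, GenericRecord>write()
>>>>>>     .withBootstrapServers("localhost:9092")
>>>>>>     .withTopic("avro-topic")
>>>>>>     .withKeySerializer(StringSerializer.class)
>>>>>>     // unchecked: KafkaAvroSerializer is a Serializer<Object>
>>>>>>     .withValueSerializer((Class) KafkaAvroSerializer.class)
>>>>>>     .withProducerConfigUpdates(ImmutableMap.of(
>>>>>>         "schema.registry.url", "http://localhost:8081")));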
>>>>>>
>>>>>>
>>>>>>
>>>>>> This seems to be the cause of repeated confusion, so probably worth
>>>>>> improving the user experience here!
>>>>>>
>>>>>>
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Moritz
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From: *Matt Casters <matt.cast...@neo4j.com>
>>>>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>>>>> *To: *Beam Development List <dev@beam.apache.org>
>>>>>> *Subject: *KafkaIO.write and Avro
>>>>>>
>>>>>> Dear Beams,
>>>>>>
>>>>>>
>>>>>>
>>>>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>>>>> specify the option value.serializer as
>>>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This, along with
>>>>>> a bunch of other options for authentication and so on, verifies the
>>>>>> schema stored in the Avro record against a schema registry.
>>>>>> Unfortunately, I couldn't figure out how to pass this serializer class
>>>>>> to KafkaIO.write(), as it's not acceptable to the withValueSerializer()
>>>>>> method.
>>>>>>
>>>>>>
>>>>>>
>>>>>> For KafkaIO.read() we made a specific provision in the form of the
>>>>>> class ConfluentSchemaRegistryDeserializer, but it doesn't look like we
>>>>>> have covered the producer side of Avro values yet.
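>>>>>>
>>>>>> (For comparison, the read side looks roughly like this, if I recall the
>>>>>> API correctly; the provider class there is named
>>>>>> ConfluentSchemaRegistryDeserializerProvider, and servers, topic, and
>>>>>> subject below are placeholders:)
>>>>>>
>>>>>> KafkaIO.<String, GenericRecord>read()
>>>>>>     .withBootstrapServers("localhost:9092")
>>>>>>     .withTopic("avro-topic")
>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>     .withValueDeserializer(
>>>>>>         ConfluentSchemaRegistryDeserializerProvider.of(
>>>>>>             "http://localhost:8081", "avro-topic-value"))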
>>>>>>
>>>>>>
>>>>>>
>>>>>> I'd be happy to dive into the code to add proper support for a
>>>>>> Confluent schema registry in KafkaIO.write() but I was just wondering if
>>>>>> there was something I might have overlooked.  It's hard to find samples 
>>>>>> or
>>>>>> documentation on producing Avro messages with Beam.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks in advance,
>>>>>>
>>>>>> Matt
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Neo4j Chief Solutions Architect
>>>>> *✉   *matt.cast...@neo4j.com
>>>>>
>>>>>
>>>>>
>>>>>
>
> --
> Neo4j Chief Solutions Architect
> *✉   *matt.cast...@neo4j.com
>
>
>
>
