Just a quick type check: is a Serializer<Object> expected to be able to
properly serde any subclass of Object? More generally, should any
Serializer<? super V> be able to properly serde V? Typically this isn't
the case. I'm not saying we shouldn't make the proposed change, but it
could result in surprises.

Another thought, based on experience with coders: I would highlight three
types of serde, a taxonomy that applies to Serializer just as well as it
does to Coder:

1. handles just a single type (VarIntCoder, etc.)
2. lossy/converts concrete types because it is allowed (ListCoder works for
any list, but does *not* restore the original concrete subclass)
3. generic/tagging (SerializableCoder, which restores the concrete
subclass; see the sketch below)
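
To make type 3 concrete, here is a minimal sketch of a tagging serializer
(hypothetical, not an existing class): Java serialization writes the
concrete class into the stream, which is exactly the property
SerializableCoder relies on to restore the original subclass.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.kafka.common.serialization.Serializer;

// Type 3 sketch: the payload carries a class tag, so the read side can
// restore the exact subclass that was written.
class TaggingSerializer<T extends Serializable> implements Serializer<T> {
  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos)) {
      oos.writeObject(data); // writes class name + payload
      oos.flush();
      return bos.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}

A type 2 serializer, by contrast, would write only the contents (say, the
elements of a List) and always hand back some fixed concrete type on the
read side.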

The API in KafkaIO today is right for types 1 and 2 but too strict for
type 3. The new API is great for type 3, but potentially dangerous for
types 1 and 2 (though for type 1 it will mostly be irrelevant).

We could have a separate entrypoint for type 3, like
.withGenericValueSerializer(Serializer<? super V>), that makes it very
clear that if you call this one you have to pass a Serializer that tags
the concrete subclass and restores it. Often the only relevant type will
be Serializer<Object>, so we could even make that the parameter type.
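
A sketch of that entrypoint outside KafkaIO (hypothetical names, using the
Class-based style of the existing withValueSerializer; only the variance
matters here):

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serializer;

class WriteSketch<K, V> {
  // Hypothetical type-3-only entrypoint: callers promise the serializer
  // tags the concrete subclass and restores it.
  WriteSketch<K, V> withGenericValueSerializer(
      Class<? extends Serializer<? super V>> serializer) {
    return this;
  }

  static void demo() {
    // Compiles without a cast: KafkaAvroSerializer implements
    // Serializer<Object>, and Object is a supertype of GenericRecord.
    new WriteSketch<String, GenericRecord>()
        .withGenericValueSerializer(KafkaAvroSerializer.class);
  }
}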

Kenn

On Tue, Feb 8, 2022 at 10:57 AM Brian Hulette <bhule...@google.com> wrote:

> The issue here is that KafkaAvroSerializer implements Serializer<Object>,
> and not Serializer<GenericRecord> [1]. So you need to erase the type to
> force it. I think Moritz's suggestion is actually to update the signature
> here [2] to make the type parameter `? super V`, so that a
> Serializer<Object> will be acceptable. That change would be preferable to
> updating the docs.
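>
> Concretely, the change at [2] would be (sketch; only the bound on the
> type parameter changes):
>
> // current: rejects KafkaAvroSerializer.class, because
> // Class<KafkaAvroSerializer> is not a Class<? extends Serializer<V>>
> public Write<K, V> withValueSerializer(
>     Class<? extends Serializer<V>> valueSerializer)
>
> // proposed: a Serializer<Object> now matches, since Object is a
> // supertype of any V
> public Write<K, V> withValueSerializer(
>     Class<? extends Serializer<? super V>> valueSerializer)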
>
> [1]
> https://github.com/confluentinc/schema-registry/blob/cb6ca203ea9d26c84ed1e92d072f793f7bc2417b/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroSerializer.java#L27
> [2]
> https://github.com/apache/beam/blob/963c04ae6d503a97b9a169f07750c5d8c33fdd6c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L2379
>
> On Tue, Feb 8, 2022 at 9:05 AM Sachin Agarwal <sachi...@google.com> wrote:
>
>> Thanks both, that's great -
>>
>> On Tue, Feb 8, 2022 at 8:00 AM Matt Casters <matt.cast...@neo4j.com>
>> wrote:
>>
>>> Thanks a lot Moritz.  Your suggestion worked immediately.
>>>
>>> You sort of get put on the wrong track, since my favorite IDE suggests:
>>>
>>> .withValueSerializer(
>>>     (Class<? extends Serializer<GenericRecord>>) KafkaAvroSerializer.class)
>>>
>>> ... which simply doesn't even compile for me.
>>>
>>>  incompatible types:
>>> java.lang.Class<io.confluent.kafka.serializers.KafkaAvroSerializer>
>>> cannot be converted to java.lang.Class<? extends
>>> org.apache.kafka.common.serialization.Serializer<org.apache.avro.generic.GenericRecord>>
>>>
>>> It sort of puts you on the wrong footing, hence my question.
>>> If you don't mind, I'll simply create a PR to amend the Javadoc for
>>> KafkaIO.
>>>
>>> https://issues.apache.org/jira/browse/BEAM-13854
>>>
>>> AvroCoder.of(schema) was easier to figure out, but it might make sense
>>> to document that in the same context as well (usage sketched below).
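>>>
>>> For reference, that usage is roughly (with schema being the records'
>>> Avro Schema):
>>>
>>> PCollection<GenericRecord> records = ...; // from an upstream transform
>>> records.setCoder(AvroCoder.of(schema));   // an AvroCoder<GenericRecord>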
>>>
>>> Thanks again!
>>>
>>> Cheers,
>>> Matt
>>>
>>>
>>> On Tue, Feb 8, 2022 at 4:09 PM Moritz Mack <mm...@talend.com> wrote:
>>>
>>>> From a quick look, it seems the respective signature in KafkaIO should
>>>> rather look like this to support KafkaAvroSerializer, which is a
>>>> Serializer<Object>:
>>>>
>>>>
>>>>
>>>> public Write<K, V> withValueSerializer(
>>>>     Class<? extends Serializer<? super V>> valueSerializer)
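>>>>
>>>> With that bound, the call site needs no cast, e.g. (sketch; topic name
>>>> is a placeholder):
>>>>
>>>> KafkaIO.<String, GenericRecord>write()
>>>>     .withTopic("my_topic")
>>>>     .withValueSerializer(KafkaAvroSerializer.class);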
>>>>
>>>>
>>>>
>>>> Thoughts?
>>>>
>>>> Cheers, Moritz
>>>>
>>>>
>>>>
>>>> *From: *Moritz Mack <mm...@talend.com>
>>>> *Date: *Tuesday, 8. February 2022 at 15:55
>>>> *To: *dev@beam.apache.org <dev@beam.apache.org>, matt.cast...@neo4j.com
>>>> <matt.cast...@neo4j.com>
>>>> *Subject: *Re: KafkaIO.write and Avro
>>>>
>>>> Hi Matt,
>>>>
>>>>
>>>>
>>>> Unfortunately, the types don’t play well when using
>>>> KafkaAvroSerializer. It currently requires a cast :/
>>>>
>>>> The following will work:
>>>>
>>>> write.withValueSerializer((Class) KafkaAvroSerializer.class)
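>>>>
>>>> In context, that workaround looks roughly like this (sketch; the raw
>>>> Class cast simply opts out of the generics check, so it compiles with
>>>> an unchecked warning):
>>>>
>>>> KafkaIO.<String, GenericRecord>write()
>>>>     .withBootstrapServers("localhost:9092")
>>>>     .withTopic("my_topic")
>>>>     .withValueSerializer((Class) KafkaAvroSerializer.class);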
>>>>
>>>>
>>>>
>>>> This seems to be the cause of repeated confusion, so probably worth
>>>> improving the user experience here!
>>>>
>>>>
>>>>
>>>> Cheers,
>>>>
>>>> Moritz
>>>>
>>>>
>>>>
>>>> *From: *Matt Casters <matt.cast...@neo4j.com>
>>>> *Date: *Tuesday, 8. February 2022 at 14:17
>>>> *To: *Beam Development List <dev@beam.apache.org>
>>>> *Subject: *KafkaIO.write and Avro
>>>>
>>>> Dear Beams,
>>>>
>>>>
>>>>
>>>> When sending Avro values to Kafka, say GenericRecord, you typically
>>>> specify the option value.serializer as
>>>> "io.confluent.kafka.serializers.KafkaAvroSerializer".  This, along with
>>>> a bunch of other options for authentication and so on, validates the
>>>> schema stored in the Avro record against a schema registry.
>>>> Unfortunately, I couldn't figure out how to pass this serializer class
>>>> to KafkaIO.write(), as it's not accepted by the withValueSerializer()
>>>> method.
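>>>>
>>>> (For comparison, with a plain Kafka producer this is just
>>>> configuration; the registry URL below is a placeholder:)
>>>>
>>>> Properties props = new Properties();
>>>> props.put("value.serializer",
>>>>     "io.confluent.kafka.serializers.KafkaAvroSerializer");
>>>> props.put("schema.registry.url", "http://localhost:8081");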
>>>>
>>>>
>>>>
>>>> For KafkaIO.read() we made a specific provision in the form of the
>>>> class ConfluentSchemaRegistryDeserializerProvider, but it doesn't look
>>>> like we covered the producer side of Avro values yet.
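>>>>
>>>> (Roughly, on the read side; URL and subject are placeholders:)
>>>>
>>>> KafkaIO.<String, GenericRecord>read()
>>>>     .withValueDeserializer(
>>>>         ConfluentSchemaRegistryDeserializerProvider.of(
>>>>             "http://localhost:8081", "my_topic-value"));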
>>>>
>>>>
>>>>
>>>> I'd be happy to dive into the code to add proper support for a
>>>> Confluent schema registry in KafkaIO.write() but I was just wondering if
>>>> there was something I might have overlooked.  It's hard to find samples or
>>>> documentation on producing Avro messages with Beam.
>>>>
>>>>
>>>>
>>>> Thanks in advance,
>>>>
>>>> Matt
>>>>
>>>>
>>>>
>>>
>>> --
>>> Neo4j Chief Solutions Architect
>>> *✉   *matt.cast...@neo4j.com
>>>
>>>
>>>
>>>
