Attaching these two links, which point in roughly the same direction as my
previous e-mail:

https://ambitious.systems/avro-writers-vs-readers-schema
https://ambitious.systems/avro-schema-resolution

On Thu, Jul 28, 2022 at 4:31 PM Cristian Constantinescu <zei...@gmail.com>
wrote:

> Hi everyone,
>
> When using KafkaIO to deserialize to Avro SpecificRecords in combination
> with ConfluentSchemaRegistryDeserializerProvider, deserialization fails
> when the schema in the Avro-generated classes (the SpecificRecords) and the
> schema registry schema (used to serialize the given message) don't match.
>
> My scenario is that my Avro-generated classes are ahead of what's in the
> schema registry. So when deserialization happens, the schema registry
> schema is used to deserialize into the SpecificRecord class, and that
> fails with field order mismatches or field type mismatches depending on
> the situation.
>
> I know you're thinking that my schema evolution is bad and that I should
> go sit in the corner. However, only new fields are added to the schema, so
> it should not be an issue.
>
> Has anyone seen this happen to them?
>
> What I think happens:
>
> 1. ConfluentSchemaRegistryDeserializerProvider configures the
> AbstractKafkaDeserializer to use the schema from the Confluent schema
> registry as the reader schema. [1][2]
> 2. If specific.avro.reader is set to true, the
> ConfluentSchemaRegistryDeserializer (Beam-owned) [3] eventually calls
> AbstractKafkaAvroDeserializer (Confluent-owned) [4]. Effectively, the
> ConfluentSchemaRegistryDeserializer sets the reader schema and the
> AbstractKafkaAvroDeserializer sets the writer schema. However, both schemas
> are fetched from the schema registry: each class fetches the same schema
> separately.
> 3. This is a problem because, as I understand it, the writer schema
> tells Avro which schema was used to serialize the object, and the
> reader schema tells Avro what to deserialize those bytes into when it
> is not the same schema. I'm not entirely sure about this, but I think
> that's how it works.
> 4. Because both the reader and writer schemas are fetched from the schema
> registry, the evolved schema of our SpecificRecord class is never used,
> and deserialization into that class fails.
> 5. This is why I think that, if specific.avro.reader is set to true, the
> ConfluentSchemaRegistryDeserializer class should pass the schema fetched
> from the SpecificRecord class at [1].
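For reference, a toy model of how Avro resolves a record against a writer
and a reader schema, following the record rules in the Avro spec: values
are decoded in writer order, then matched to reader fields by name,
reader-only fields take their default, and writer-only fields are skipped.
This is plain Java with hypothetical names (SchemaResolutionSketch, Field,
resolve), not the Avro API, and it ignores type promotion, unions and nested
types. It shows why using the registry schema as the reader schema means
the evolved fields of the generated class are never populated.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model of Avro record schema resolution (NOT the Avro API).
 * Values are "decoded" in writer-schema order, then projected onto the
 * reader schema by field name.
 */
public class SchemaResolutionSketch {

    /** A record field; defaultValue == null means "no default" in this toy model. */
    static final class Field {
        final String name;
        final Object defaultValue;

        Field(String name, Object defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    static Map<String, Object> resolve(List<Field> writer, List<Object> values, List<Field> reader) {
        if (writer.size() != values.size()) {
            throw new IllegalArgumentException("values do not match the writer schema");
        }
        // Step 1: interpret the encoded values with the writer schema (the one used to serialize).
        Map<String, Object> written = new LinkedHashMap<>();
        for (int i = 0; i < writer.size(); i++) {
            written.put(writer.get(i).name, values.get(i));
        }
        // Step 2: build the result in reader-schema order, matching fields by name.
        Map<String, Object> result = new LinkedHashMap<>();
        for (Field f : reader) {
            if (written.containsKey(f.name)) {
                result.put(f.name, written.get(f.name));
            } else if (f.defaultValue != null) {
                result.put(f.name, f.defaultValue); // field added after the data was written
            } else {
                throw new IllegalStateException(
                        "reader field '" + f.name + "' is not in the writer schema and has no default");
            }
        }
        // Fields present only in the writer schema are skipped.
        return result;
    }

    public static void main(String[] args) {
        List<Field> registrySchema = List.of(new Field("id", null), new Field("name", null));
        List<Field> classSchema =
                List.of(new Field("id", null), new Field("name", null), new Field("email", "unknown"));
        List<Object> encoded = List.of(42, "Ada");

        // Registry schema as both writer and reader: the evolved field never shows up.
        System.out.println(resolve(registrySchema, encoded, registrySchema)); // {id=42, name=Ada}
        // Registry schema as writer, generated-class schema as reader: the default fills the gap.
        System.out.println(resolve(registrySchema, encoded, classSchema)); // {id=42, name=Ada, email=unknown}
    }
}
```

Note that this only works because the added field carries a default. In the
real SpecificDatumReader the reader schema also decides which field
positions get filled on the generated class, so a reader schema that
disagrees with the class's own schema would plausibly produce the field
order and type mismatches described above.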
>
> Would anyone be able to confirm or refute whether the above logic makes sense?
>
> As a workaround for my issue, I wrote a DeserializerProvider that does
> exactly what ConfluentSchemaRegistryDeserializerProvider does, but passes
> in the schema fetched from my SpecificRecord class, and deserialization
> works properly.
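On the workaround: only the reader side has to change. The writer schema is
still identified per message, because Confluent-serialized values carry the
schema registry ID in a 5-byte header (magic byte 0, then a 4-byte
big-endian schema ID) ahead of the Avro binary, and that ID is what the
Confluent deserializer looks up. A minimal stdlib sketch of splitting that
header, with hypothetical names (ConfluentFrameSketch, Frame, parse); the
registry lookup and the Avro decoding itself are left out.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Sketch (hypothetical helper, not a Confluent class) that splits a
 * Confluent-framed Kafka value into the writer schema ID and the Avro payload.
 * Assumed layout: byte 0 = magic byte 0x0, bytes 1-4 = big-endian schema ID,
 * remaining bytes = Avro binary encoding.
 */
public class ConfluentFrameSketch {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5;

    static final class Frame {
        final int writerSchemaId; // used to fetch the WRITER schema from the registry
        final byte[] avroPayload; // decoded with writer schema + a separately chosen reader schema

        Frame(int writerSchemaId, byte[] avroPayload) {
            this.writerSchemaId = writerSchemaId;
            this.avroPayload = avroPayload;
        }
    }

    static Frame parse(byte[] message) {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("message too short for Confluent framing");
        }
        ByteBuffer buf = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buf.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte: " + magic);
        }
        int schemaId = buf.getInt();
        byte[] payload = Arrays.copyOfRange(message, HEADER_SIZE, message.length);
        return new Frame(schemaId, payload);
    }

    public static void main(String[] args) {
        // Header 0x00 | 0x0000012A (= 298), followed by two payload bytes.
        byte[] msg = {0x0, 0x0, 0x0, 0x1, 0x2A, 0x54, 0x06};
        Frame f = parse(msg);
        System.out.println(f.writerSchemaId); // 298
        System.out.println(f.avroPayload.length); // 2
    }
}
```

With the writer schema resolved from that ID and the reader schema taken
from the generated class (Avro-generated classes expose it through the
static getClassSchema()), the two schemas are allowed to differ, which is
what Avro schema resolution needs in order to fill in the newer fields.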
>
> Cheers,
> Cristian
>
> [1]
> https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L133
> [2]
> https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L144
> [3]
> https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L175
> [4]
> https://github.com/confluentinc/schema-registry/blob/2c2a356755b000e524123f9676ace98bd3c74f59/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L152
>
