Attaching these two links which kinda point in the same direction as my previous e-mail:
https://ambitious.systems/avro-writers-vs-readers-schema
https://ambitious.systems/avro-schema-resolution

On Thu, Jul 28, 2022 at 4:31 PM Cristian Constantinescu <zei...@gmail.com> wrote:

> Hi everyone,
>
> When using KafkaIO to deserialize to Avro SpecificRecords in combination
> with ConfluentSchemaRegistryDeserializerProvider, deserialization fails
> when the schema in the Avro-generated classes (the SpecificRecords) and
> the schema registry schema (used to serialize the given message) mismatch.
>
> My scenario is that my Avro-generated classes are ahead of what's in the
> schema registry. So when deserialization happens, it tries to use the
> schema registry schema to deserialize to the SpecificRecord class, and
> that fails with field order or field type mismatches, depending on the
> situation.
>
> I know you're thinking that my schema evolution is bad and that I should
> go sit in the corner. However, only new fields are added to the schema,
> so it should not be an issue.
>
> Has anyone seen this happen to them?
>
> What I think happens:
>
> 1. ConfluentSchemaRegistryDeserializerProvider configures the
> AbstractKafkaDeserializer to use the Confluent schema as the reader
> schema. [1][2]
> 2. If specific.avro.reader is set to true, the
> ConfluentSchemaRegistryDeserializer (Beam-owned) [3] eventually calls
> AbstractKafkaAvroDeserializer (Confluent-owned) [4]. Effectively, the
> ConfluentSchemaRegistryDeserializer sets the reader schema and the
> AbstractKafkaAvroDeserializer sets the writer schema. However, both
> schemas are fetched from the schema registry, and each class fetches the
> same schema separately.
> 3. This is a problem because, to my understanding, the writer schema
> tells Avro which schema was used to serialize the object, while the
> reader schema tells Avro what to deserialize those bytes into, in case
> it's not the same schema. I'm really not sure about this, but I think
> that's how it works.
> 4. Because both the reader and writer schemas are fetched from the schema
> registry, the evolved schema of our SpecificRecord class is never used,
> and deserialization to that class fails.
> 5. This is why I think that, when specific.avro.reader is set to true,
> the ConfluentSchemaRegistryDeserializer class should pass the schema
> fetched from the SpecificRecord class on line [1].
>
> Would anyone be able to confirm or refute the above logic?
>
> As for my issue, as a workaround, I just wrote a DeserializerProvider
> that does exactly what ConfluentSchemaRegistryDeserializerProvider does,
> but passes in the schema fetched from my SpecificRecord class, and
> deserialization works properly.
>
> Cheers,
> Cristian
>
> [1] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L133
> [2] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L144
> [3] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L175
> [4] https://github.com/confluentinc/schema-registry/blob/2c2a356755b000e524123f9676ace98bd3c74f59/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L152
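For what it's worth, the writer/reader mechanics described in points 3 and 4 above can be demonstrated with plain Avro generic records, outside of Beam or the registry. This is only a sketch: the `User` record, the `email` field, and its default value are made up for illustration. When the evolved schema is supplied as the reader schema, Avro's schema resolution maps the old bytes onto the new shape and fills the added field from its default; when both schemas come from the registry, that resolution step never sees the evolved schema.

```java
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionDemo {

  // "Writer" schema: what the registry holds and the producer serialized with.
  static final Schema WRITER = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"long\"}]}");

  // "Reader" schema: the evolved schema of the generated class,
  // which only adds a new field with a default value.
  static final Schema READER = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"long\"},"
          + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"none\"}]}");

  static GenericRecord roundTrip() throws Exception {
    // Serialize with the writer schema, as the producer did.
    GenericRecord rec = new GenericData.Record(WRITER);
    rec.put("id", 42L);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(WRITER).write(rec, enc);
    enc.flush();

    // Deserialize with BOTH schemas: resolution maps the old bytes onto
    // the evolved shape and fills the added field from its default.
    BinaryDecoder dec =
        DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    return new GenericDatumReader<GenericRecord>(WRITER, READER)
        .read(null, dec);
  }

  public static void main(String[] args) throws Exception {
    // The decoded record has the added field populated with its default.
    System.out.println(roundTrip());
  }
}
```

Passing `READER` equal to `WRITER` (which is effectively what happens when both schemas come from the registry) yields a record without the `email` field at all, which is consistent with the mismatch failures when Avro then tries to populate the evolved SpecificRecord.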