Thanks for the question! Interesting... I haven't gone deep into the details yet (I will!), but could it be related to this change? [1][2]
[1] https://issues.apache.org/jira/browse/BEAM-10759
[2] https://github.com/apache/beam/pull/12630

— Alexey

> On 28 Jul 2022, at 22:43, Cristian Constantinescu <zei...@gmail.com> wrote:
>
> Attaching these two links, which point in roughly the same direction as my previous e-mail:
>
> https://ambitious.systems/avro-writers-vs-readers-schema
> https://ambitious.systems/avro-schema-resolution
>
> On Thu, Jul 28, 2022 at 4:31 PM Cristian Constantinescu <zei...@gmail.com> wrote:
>
> Hi everyone,
>
> When using KafkaIO to deserialize to Avro SpecificRecords in combination with ConfluentSchemaRegistryDeserializerProvider, deserialization fails when the schema in the Avro-generated classes (the SpecificRecords) and the schema registry schema (used to serialize the given message) mismatch.
>
> My scenario is that my Avro-generated classes are ahead of what's in the schema registry. So when deserialization happens, it tries to use the schema registry schema to deserialize to the SpecificRecord class, and that fails with field order mismatches or field type mismatches, depending on the situation.
>
> I know you're thinking that my schema evolution is bad and that I should go sit in the corner. However, only new fields are added to the schema, so it should not be an issue.
>
> Has anyone seen this happen to them?
>
> What I think happens:
>
> 1. ConfluentSchemaRegistryDeserializerProvider configures the AbstractKafkaDeserializer to use the Confluent schema as the reader schema. [1][2]
> 2. If specific.avro.reader is set to true, the ConfluentSchemaRegistryDeserializer (Beam-owned) [3] eventually calls AbstractKafkaAvroDeserializer (Confluent-owned) [4].
> Effectively, the ConfluentSchemaRegistryDeserializer sets the reader schema and the AbstractKafkaAvroDeserializer sets the writer schema. However, both schemas are fetched from the schema registry; both classes fetch the same schema separately.
> 3. Now this is a problem, because, to my understanding, the writer schema tells Avro which schema was used to serialize the object, and the reader schema tells Avro what to deserialize those bytes into, in case it's not the same schema. I'm really not sure about this, but I think that's how it works.
> 4. Because both the reader and the writer schema are fetched from the schema registry, the evolved schema of our SpecificRecord class is never used, and deserialization to that class fails.
> 5. This is why I think that, if specific.avro.reader is set to true, the ConfluentSchemaRegistryDeserializer class should pass the schema fetched from the SpecificRecord class at line [1].
>
> Would anyone be able to confirm or refute that the above logic makes sense?
>
> As a workaround for my issue, I wrote a DeserializerProvider that does exactly what ConfluentSchemaRegistryDeserializerProvider does, but passes in the schema fetched from my SpecificRecord class, and deserialization works properly.
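For what it's worth, point 3 above matches how plain Avro schema resolution behaves. Below is a minimal sketch using only the core org.apache.avro APIs (no Beam or Confluent classes; the `User` schemas are made up for illustration). Encoding with the "registry" (writer) schema and decoding with writer = registry schema, reader = evolved schema fills the added field with its default — which is why passing the SpecificRecord's own schema as the reader schema makes this kind of evolution safe:

```java
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionDemo {
  // Writer schema: what the registry holds and what serialized the bytes.
  static final String WRITER =
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\"}]}";

  // Reader schema: the evolved, generated-class schema — one extra field with a default.
  static final String READER =
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"n/a\"}]}";

  static GenericRecord resolve() throws Exception {
    Schema writer = new Schema.Parser().parse(WRITER);
    Schema reader = new Schema.Parser().parse(READER);

    // Serialize with the writer (registry) schema only.
    GenericRecord rec = new GenericData.Record(writer);
    rec.put("id", "42");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writer).write(rec, enc);
    enc.flush();

    // Deserialize, resolving writer -> reader: the new field gets its default.
    BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    return new GenericDatumReader<GenericRecord>(writer, reader).read(null, dec);
  }

  public static void main(String[] args) throws Exception {
    GenericRecord decoded = resolve();
    System.out.println(decoded.get("email")); // -> n/a (default filled in by resolution)
  }
}
```

If instead both the writer and the reader schema are the registry schema (as described in steps 1–2), the resolution step never sees the evolved schema, and mapping the decoded record onto the generated SpecificRecord class can fail on field order or type.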
>
> Cheers,
> Cristian
>
> [1] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L133
> [2] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L144
> [3] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L175
> [4] https://github.com/confluentinc/schema-registry/blob/2c2a356755b000e524123f9676ace98bd3c74f59/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L152