nsivabalan opened a new issue, #18891:
URL: https://github.com/apache/hudi/issues/18891

   ### Tips before filing an issue
   
   - [x] Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)?
   
   ### Describe the problem you faced
   
   `org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer` extends 
Confluent's `KafkaAvroDeserializer` and overrides only the 4-arg 
`deserialize(String, Boolean, byte[], Schema)` to inject the user-configured 
`hoodie.kafka.avro.value.deserializer.schema` as the reader schema. However, 
the Kafka consumer / Connect framework can invoke any of three other overloads:
   
   - `deserialize(String, byte[])`
   - `deserialize(String, byte[], Schema)`
   - `deserialize(String, Headers, byte[])`  ← Kafka 3.x `Deserializer` default 
method
   
   Those overloads inherit the parent's behavior and do not pass `sourceSchema` 
as the reader schema. As a result, Avro decodes the record using only the 
writer's schema (the older schema fetched from the registry by the message's 
schema ID), with no resolution against the configured, evolved reader schema. 
When the application then accesses positional fields that exist only in the 
newer reader schema (e.g. fields 20–23 on a Debezium CDC `before`/`after` 
nested record), `GenericData.Record.get` throws 
`ArrayIndexOutOfBoundsException`.
   
   Which overload is called depends on the runtime: Kafka Connect runtimes that 
route through the headers-aware Consumer path land on `deserialize(String, 
Headers, byte[])`. Plain Consumer paths often land on `deserialize(String, 
byte[])`. Either way, the `sourceSchema` injection is silently bypassed.
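
The bypass can be illustrated with a toy model. The sketch below does not use the real Confluent or Hudi classes: `ToyParentDeserializer`, `ToyBuggyDeserializer`, the string schema names, and the `Map` standing in for Kafka `Headers` are all hypothetical. It also assumes the parent's other overloads do not dispatch through the overridden 4-arg method, which is what the behavior described above implies.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-ins: these are NOT the real Confluent or Hudi classes.
class ToyParentDeserializer {
    static final String WRITER_SCHEMA = "writer-v1"; // pretend: looked up by schema ID

    // Returns the name of the schema that shapes the decoded record.
    private String decode(byte[] payload, String readerSchema) {
        return readerSchema != null ? readerSchema : WRITER_SCHEMA;
    }

    public String deserialize(String topic, byte[] payload) {
        return decode(payload, null);
    }

    public String deserialize(String topic, byte[] payload, String readerSchema) {
        return decode(payload, readerSchema);
    }

    // Models the headers-aware overload; a Map stands in for Kafka Headers.
    public String deserialize(String topic, Map<String, byte[]> headers, byte[] payload) {
        return decode(payload, null);
    }

    public String deserialize(String topic, Boolean isKey, byte[] payload, String readerSchema) {
        return decode(payload, readerSchema);
    }
}

// Overrides only the 4-arg overload, mirroring the situation in this issue.
class ToyBuggyDeserializer extends ToyParentDeserializer {
    private final String sourceSchema;

    ToyBuggyDeserializer(String sourceSchema) {
        this.sourceSchema = sourceSchema;
    }

    @Override
    public String deserialize(String topic, Boolean isKey, byte[] payload, String readerSchema) {
        return super.deserialize(topic, isKey, payload, sourceSchema); // injects the reader schema
    }
}

class ToyBypassDemo {
    public static void main(String[] args) {
        ToyBuggyDeserializer d = new ToyBuggyDeserializer("reader-v2");
        byte[] payload = new byte[0];
        System.out.println(d.deserialize("topic", Boolean.FALSE, payload, null)); // injected path
        System.out.println(d.deserialize("topic", payload));                      // bypassed
        System.out.println(d.deserialize("topic", new HashMap<>(), payload));     // bypassed
    }
}
```

In this model only the 4-arg call reports `reader-v2`; the other two report `writer-v1`, which is the silent bypass in miniature.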
   
   ### To Reproduce
   
   Steps to reproduce the behavior:
   
   1. Configure `HoodieStreamer` with `--source-class 
org.apache.hudi.utilities.sources.AvroKafkaSource` and the Confluent schema 
registry deserializer set to 
`org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer`.
   2. Set `hoodie.kafka.avro.value.deserializer.schema` to an evolved schema 
that adds nullable fields (with defaults) to a nested record relative to the 
schema used when the records were originally serialized to the topic.
   3. Consume those older messages on a Kafka client / runtime that invokes 
`deserialize(String, byte[])` or `deserialize(String, Headers, byte[])` (e.g. 
newer Strimzi / Kafka Connect bundles).
   4. Have the application access a positional field that exists only in the 
evolved reader schema.
   5. Observe `ArrayIndexOutOfBoundsException`.
   
   ### Expected behavior
   
   All four `deserialize` overloads should consistently inject the configured 
`sourceSchema` so Avro schema resolution defaults new nullable fields to null 
and positional access on the reader-schema view is safe.
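
One way to picture the expected behavior is a subclass in which every overload funnels into a single schema-injecting path. This is a minimal sketch on hypothetical stand-in classes (`ToyBaseDeserializer`, `ToyFixedDeserializer`, string schema names, a `Map` in place of Kafka `Headers`), not the actual patch and not the real Confluent API.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for the parent deserializer; NOT the real Confluent class.
class ToyBaseDeserializer {
    static final String WRITER_SCHEMA = "writer-v1"; // pretend: looked up by schema ID

    // Returns the name of the schema that shapes the decoded record.
    private String decode(byte[] payload, String readerSchema) {
        return readerSchema != null ? readerSchema : WRITER_SCHEMA;
    }

    public String deserialize(String topic, byte[] payload) {
        return decode(payload, null);
    }

    public String deserialize(String topic, byte[] payload, String readerSchema) {
        return decode(payload, readerSchema);
    }

    public String deserialize(String topic, Map<String, byte[]> headers, byte[] payload) {
        return decode(payload, null);
    }

    public String deserialize(String topic, Boolean isKey, byte[] payload, String readerSchema) {
        return decode(payload, readerSchema);
    }
}

// Every overload delegates to the 4-arg method, which always injects sourceSchema.
class ToyFixedDeserializer extends ToyBaseDeserializer {
    private final String sourceSchema;

    ToyFixedDeserializer(String sourceSchema) {
        this.sourceSchema = sourceSchema;
    }

    @Override
    public String deserialize(String topic, byte[] payload) {
        return deserialize(topic, (Boolean) null, payload, sourceSchema);
    }

    @Override
    public String deserialize(String topic, byte[] payload, String ignoredReaderSchema) {
        // The caller-supplied reader schema is deliberately replaced by the configured one.
        return deserialize(topic, (Boolean) null, payload, sourceSchema);
    }

    @Override
    public String deserialize(String topic, Map<String, byte[]> headers, byte[] payload) {
        return deserialize(topic, (Boolean) null, payload, sourceSchema);
    }

    @Override
    public String deserialize(String topic, Boolean isKey, byte[] payload, String readerSchema) {
        return super.deserialize(topic, isKey, payload, sourceSchema);
    }
}

class ToyFixedDemo {
    public static void main(String[] args) {
        ToyFixedDeserializer d = new ToyFixedDeserializer("reader-v2");
        byte[] payload = new byte[0];
        System.out.println(d.deserialize("topic", payload));
        System.out.println(d.deserialize("topic", new HashMap<>(), payload));
    }
}
```

Routing everything through one method keeps the injection in a single place, so a future overload added to the parent is the only thing that could reintroduce the bypass.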
   
   ### Environment Description
   
   - Hudi version: master (1.x line) and 0.x — both affected
   - Spark version: any
   - Hive version: n/a
   - Hadoop version: n/a
   - Storage (HDFS/S3/GCS..): any
   - Running on Docker?: any
   
   ### Additional context
   
   The bug is latent: many runtimes happen to route only through the 4-arg 
overload and never hit the other paths. A Kafka Connect / Strimzi runtime 
upgrade (or anything that switches the consumer to the headers-aware default 
method in `org.apache.kafka.common.serialization.Deserializer`) makes the bug 
visible.
   
   PR: will link.
   
   ### Stacktrace
   
   ```
   java.lang.ArrayIndexOutOfBoundsException: Index 20 out of bounds for length 20
       at org.apache.avro.generic.GenericData$Record.get(GenericData.java:256)
       ...
   ```
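
To show why index 20 fails on a 20-field record, and why resolving against the evolved reader schema would make the access safe, here is a simplified model of name-based schema resolution. It uses plain lists and arrays rather than Avro; `ToySchemaResolution`, the `f0..f23` field names, and the null default are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: plain lists and arrays stand in for Avro schemas and records.
class ToySchemaResolution {

    // Builds hypothetical field names f0..f(n-1).
    static List<String> fieldNames(int n) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            names.add("f" + i);
        }
        return names;
    }

    // Simplified resolution: each reader field is looked up in the writer by name;
    // a field the writer never wrote takes its default, which is null here.
    static Object[] resolve(List<String> writerFields, Object[] writerValues,
                            List<String> readerFields) {
        Object[] resolved = new Object[readerFields.size()];
        for (int i = 0; i < readerFields.size(); i++) {
            int writerPos = writerFields.indexOf(readerFields.get(i));
            resolved[i] = writerPos >= 0 ? writerValues[writerPos] : null;
        }
        return resolved;
    }

    public static void main(String[] args) {
        List<String> writer = fieldNames(20); // older schema: 20 fields
        List<String> reader = fieldNames(24); // evolved schema: adds fields 20..23
        Object[] writerValues = new Object[20];
        for (int i = 0; i < 20; i++) {
            writerValues[i] = "v" + i;
        }

        try {
            Object unused = writerValues[20]; // positional access past the writer layout
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("unresolved record: " + e.getMessage());
        }

        Object[] resolved = resolve(writer, writerValues, reader);
        System.out.println("resolved[20] = " + resolved[20]);
    }
}
```

The unresolved array has only the writer's 20 slots, so position 20 is out of bounds; the resolved array has 24 slots with the new fields set to null.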

