This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a1bf90ca91 fix: use writer types in Skipper for resolved named record 
types (#9605)
a1bf90ca91 is described below

commit a1bf90ca91d295ff81ce359c37a3e476d3a9ad7d
Author: Ariel Miculas-Trif <[email protected]>
AuthorDate: Thu Apr 9 00:36:51 2026 +0300

    fix: use writer types in Skipper for resolved named record types (#9605)
    
    # Which issue does this PR close?
    #9655
    
    # Rationale for this change
    When a writer-only field references a named Avro type that was
    previously resolved against a reader schema, `parse_type` returns the
    registered reader-resolved type from the shared resolver. This caused
    two problems:
    
    1. The Skipper built its struct sub-skippers from the reader's field
    list, which omits writer-only fields. Their bytes were never consumed,
    leaving the cursor at the wrong position for all subsequent records.
    
    2. Reader fields carry resolution-induced nullability (e.g. a writer
    plain `long` matched against a reader `["null", long]` gains
    `nullability = Some(NullFirst)`). The Skipper read a union-tag byte that
    was never written, causing "Unexpected EOF" errors.
    
    Fix: store the writer's data type in `ResolvedField::ToReader` alongside
    the reader index. The Skipper's `Codec::Struct` arm now iterates
    `rec.writer_fields` and uses the writer type from every entry - both
    `ToReader(_, wdt)` and `Skip(wdt)` - so it always follows the writer's
    wire format.
    
    # What changes are included in this PR?
    
    
    # Are these changes tested?
    Yes, added unit tests.
    
    # Are there any user-facing changes?
    No
---
 arrow-avro/src/codec.rs         |  16 +--
 arrow-avro/src/reader/mod.rs    | 215 ++++++++++++++++++++++++++++++++++++++++
 arrow-avro/src/reader/record.rs |  29 ++++--
 3 files changed, 246 insertions(+), 14 deletions(-)

diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 92a0ed0519..455d4c2f71 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -94,7 +94,9 @@ pub(crate) struct ResolvedRecord {
 #[derive(Debug, Clone, PartialEq)]
 pub(crate) enum ResolvedField {
     /// Resolves to a field indexed in the reader schema.
-    ToReader(usize),
+    /// The `AvroDataType` is the writer's type for this field, used by the 
Skipper
+    /// to correctly consume writer bytes when the whole record is being 
skipped.
+    ToReader(usize, AvroDataType),
     /// For fields present in the writer's schema but not the reader's, this 
stores their data type.
     /// This is needed to correctly skip over these fields during 
deserialization.
     Skip(AvroDataType),
@@ -2341,10 +2343,10 @@ impl<'a> Maker<'a> {
             .iter()
             .enumerate()
             .map(|(writer_index, writer_field)| {
+                let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
                 if let Some(reader_index) = writer_to_reader[writer_index] {
-                    Ok(ResolvedField::ToReader(reader_index))
+                    Ok(ResolvedField::ToReader(reader_index, dt))
                 } else {
-                    let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
                     Ok(ResolvedField::Skip(dt))
                 }
             })
@@ -2888,7 +2890,7 @@ mod tests {
                 default_fields,
             }) => {
                 assert_eq!(writer_fields.len(), 1);
-                assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
+                assert!(matches!(writer_fields[0], ResolvedField::ToReader(0, 
_)));
                 assert_eq!(default_fields.len(), 1);
                 assert_eq!(default_fields[0], 1);
             }
@@ -2981,7 +2983,7 @@ mod tests {
                 default_fields,
             }) => {
                 assert_eq!(writer_fields.len(), 1);
-                assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
+                assert!(matches!(writer_fields[0], ResolvedField::ToReader(0, 
_)));
                 assert_eq!(default_fields.len(), 1);
                 assert_eq!(default_fields[0], 1);
             }
@@ -3802,9 +3804,9 @@ mod tests {
         assert!(matches!(
             &rec.writer_fields[..],
             &[
-                ResolvedField::ToReader(1),
+                ResolvedField::ToReader(1, _),
                 ResolvedField::Skip(_),
-                ResolvedField::ToReader(0),
+                ResolvedField::ToReader(0, _),
             ]
         ));
         assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 070204f2bc..6dbf5b1553 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -9590,4 +9590,219 @@ mod test {
             "entire RecordBatch mismatch (schema, all columns, all rows)"
         );
     }
+
+    // Build Avro OCF bytes whose schema contains a TypeName::Ref
+    //
+    // Schema written to the OCF header verbatim:
+    // ```text
+    // Root {
+    //   ts:    Timestamp { seconds: long, nanos: int },
+    //   extra: Event     { time: "Timestamp" }        <- TypeName::Ref
+    // }
+    // ```
+    fn make_type_ref_ocf() -> Vec<u8> {
+        use apache_avro::{Schema as ApacheSchema, Writer as ApacheWriter, 
types::Value};
+        let schema_json = r#"{
+            "type": "record", "name": "Root",
+            "fields": [
+                {"name": "ts", "type": {"type": "record", "name": "Timestamp", 
"fields": [
+                    {"name": "seconds", "type": "long"},
+                    {"name": "nanos",   "type": "int"}
+                ]}},
+                {"name": "extra", "type": {"type": "record", "name": "Event", 
"fields": [
+                    {"name": "time", "type": "Timestamp"}
+                ]}}
+            ]
+        }"#;
+        let schema = ApacheSchema::parse_str(schema_json).expect("valid 
schema");
+        let mut out = Vec::new();
+        {
+            let mut writer = ApacheWriter::new(&schema, &mut out);
+            let ts_val = |s: i64, n: i32| {
+                Value::Record(vec![
+                    ("seconds".into(), Value::Long(s)),
+                    ("nanos".into(), Value::Int(n)),
+                ])
+            };
+            // Two rows: ts={1000,100}/extra.time={-1,-1}  and  
ts={2000,200}/extra.time={-2,-2}.
+            for (ts_s, ts_n, ex_s, ex_n) in [(1000i64, 100i32, -1i64, -1i32), 
(2000, 200, -2, -2)] {
+                let row = Value::Record(vec![
+                    ("ts".into(), ts_val(ts_s, ts_n)),
+                    (
+                        "extra".into(),
+                        Value::Record(vec![("time".into(), ts_val(ex_s, 
ex_n))]),
+                    ),
+                ]);
+                writer.append_value_ref(&row).expect("append row");
+            }
+            writer.flush().expect("flush");
+        }
+        out
+    }
+
+    // writer-plain / reader-nullable mismatch.
+    //
+    // The writer schema uses a TypeName::Ref ("Timestamp" referenced in 
`extra.time`).
+    // The reader wraps `ts` in `["null", T]` unions and omits `extra`.
+    // The Skipper for `extra.time` resolves "Timestamp" via the resolver and 
must use
+    // the writer's plain field types (long, int) — not the nullable reader 
types - when
+    // consuming bytes.  Without the fix, it skips union-encoded fields from 
plain data,
+    // reads the wrong number of bytes, and corrupts row 2's `ts.seconds`.
+    #[test]
+    fn test_nullable_reader_schema_vs_plain_writer_nested_struct() {
+        let bytes = make_type_ref_ocf();
+        let reader_schema = AvroSchema::new(
+            r#"{"type":"record","name":"Root","fields":[
+                
{"name":"ts","type":["null",{"type":"record","name":"Timestamp","fields":[
+                    {"name":"seconds","type":["null","long"]},
+                    {"name":"nanos",  "type":["null","int"]}
+                ]}]}
+            ]}"#
+            .to_string(),
+        );
+        let mut reader = ReaderBuilder::new()
+            .with_reader_schema(reader_schema)
+            .build(Cursor::new(bytes))
+            .expect("reader should build");
+        let batch = reader
+            .next()
+            .expect("should have a batch")
+            .expect("reading should succeed");
+        assert_eq!(batch.num_rows(), 2);
+        let ts = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+        let seconds = ts
+            .column_by_name("seconds")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(seconds.value(0), 1000);
+        assert_eq!(seconds.value(1), 2000);
+    }
+
+    // Skipper must consume all writer fields, including writer-only ones.
+    //
+    // The writer schema uses a TypeName::Ref ("Timestamp" referenced in 
`extra.time`).
+    // The reader requests only `ts.seconds` (no `nanos`, no `extra`).
+    // The Skipper for `extra.time` resolves "Timestamp" and must skip both 
`seconds`
+    // and `nanos` bytes.  Without the fix it skips only `seconds`, leaving 
the `nanos`
+    // bytes in the buffer and corrupting row 2's `ts.seconds` read.
+    #[test]
+    fn test_skipper_consumes_writer_only_struct_fields() {
+        let bytes = make_type_ref_ocf();
+        let reader_schema = AvroSchema::new(
+            r#"{"type":"record","name":"Root","fields":[
+                
{"name":"ts","type":{"type":"record","name":"Timestamp","fields":[
+                    {"name":"seconds","type":"long"}
+                ]}}
+            ]}"#
+            .to_string(),
+        );
+        let mut reader = ReaderBuilder::new()
+            .with_reader_schema(reader_schema)
+            .build(Cursor::new(bytes))
+            .expect("reader should build");
+        let batch = reader
+            .next()
+            .expect("should have a batch")
+            .expect("Skipper must consume both seconds and nanos for 
extra.time");
+        assert_eq!(batch.num_rows(), 2);
+        let ts = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+        let seconds = ts
+            .column_by_name("seconds")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(seconds.value(0), 1000);
+        assert_eq!(seconds.value(1), 2000);
+    }
+
+    // The Skipper for a skipped array field must consume all bytes of each 
element,
+    // including every field of a nested struct resolved via a TypeName::Ref.
+    //
+    // Writer: `Root { ts: Timestamp{seconds,nanos}, events: 
array<Event{time:"Timestamp"}> }`
+    // Reader: only `ts` with nullable wrappers; `events` is absent (forces a 
Skip).
+    // The Skipper for `events` resolves each element's `time` field as 
"Timestamp"
+    // and must use the writer's plain {seconds,nanos} definition — not the
+    // nullable-wrapped reader type — when consuming bytes.
+    #[test]
+    fn test_skip_array_of_structs_uses_writer_schema_not_resolved() {
+        use apache_avro::{Schema as ApacheSchema, Writer as ApacheWriter, 
types::Value};
+        let schema_json = r#"{
+            "type": "record", "name": "Root",
+            "fields": [
+                {"name": "ts", "type": {"type": "record", "name": "Timestamp", 
"fields": [
+                    {"name": "seconds", "type": "long"},
+                    {"name": "nanos",   "type": "int"}
+                ]}},
+                {"name": "events", "type": {"type": "array", "items": {
+                    "type": "record", "name": "Event", "fields": [
+                        {"name": "time", "type": "Timestamp"}
+                    ]
+                }}}
+            ]
+        }"#;
+        let schema = ApacheSchema::parse_str(schema_json).expect("valid 
schema");
+        let mut bytes = Vec::new();
+        {
+            let mut writer = ApacheWriter::new(&schema, &mut bytes);
+            // One row: ts={100, 5}, events=[{time={200, 1}}]
+            let ts_val = |s: i64, n: i32| {
+                Value::Record(vec![
+                    ("seconds".into(), Value::Long(s)),
+                    ("nanos".into(), Value::Int(n)),
+                ])
+            };
+            let row = Value::Record(vec![
+                ("ts".into(), ts_val(100, 5)),
+                (
+                    "events".into(),
+                    Value::Array(vec![Value::Record(vec![("time".into(), 
ts_val(200, 1))])]),
+                ),
+            ]);
+            writer.append_value_ref(&row).expect("append row");
+            writer.flush().expect("flush");
+        }
+
+        // Reader omits `events` (forces Skip) and wraps `ts` fields in 
nullable unions.
+        let reader_schema = AvroSchema::new(
+            r#"{"type":"record","name":"Root","fields":[
+                
{"name":"ts","type":["null",{"type":"record","name":"Timestamp","fields":[
+                    {"name":"seconds","type":["null","long"]},
+                    {"name":"nanos",  "type":["null","int"]}
+                ]}]}
+            ]}"#
+            .to_string(),
+        );
+        let mut reader = ReaderBuilder::new()
+            .with_reader_schema(reader_schema)
+            .build(Cursor::new(bytes))
+            .expect("reader should build");
+        let batch = reader
+            .next()
+            .expect("should have a batch")
+            .expect("Skipper must consume all events bytes using writer field 
types");
+        assert_eq!(batch.num_rows(), 1);
+        let ts = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+        let seconds = ts
+            .column_by_name("seconds")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        assert_eq!(seconds.value(0), 100);
+    }
 }
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 97cdeed20f..b71a6bdc7c 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -2458,7 +2458,7 @@ impl<'a> ProjectorBuilder<'a> {
             .writer_fields
             .iter()
             .map(|field| match field {
-                ResolvedField::ToReader(index) => 
Ok(FieldProjection::ToReader(*index)),
+                ResolvedField::ToReader(index, _) => 
Ok(FieldProjection::ToReader(*index)),
                 ResolvedField::Skip(datatype) => {
                     let skipper = Skipper::from_avro(datatype)?;
                     Ok(FieldProjection::Skip(skipper))
@@ -2568,12 +2568,27 @@ impl Skipper {
             Codec::Uuid => Self::UuidString, // encoded as string
             Codec::Enum(_) => Self::Enum,
             Codec::List(item) => 
Self::List(Box::new(Skipper::from_avro(item)?)),
-            Codec::Struct(fields) => Self::Struct(
-                fields
-                    .iter()
-                    .map(|f| Skipper::from_avro(f.data_type()))
-                    .collect::<Result<_, _>>()?,
-            ),
+            Codec::Struct(fields) => {
+                if let Some(ResolutionInfo::Record(rec)) = 
dt.resolution.as_ref() {
+                    Self::Struct(
+                        rec.writer_fields
+                            .iter()
+                            .map(|wf| match wf {
+                                ResolvedField::ToReader(_, wdt) | 
ResolvedField::Skip(wdt) => {
+                                    Skipper::from_avro(wdt)
+                                }
+                            })
+                            .collect::<Result<_, _>>()?,
+                    )
+                } else {
+                    Self::Struct(
+                        fields
+                            .iter()
+                            .map(|f| Skipper::from_avro(f.data_type()))
+                            .collect::<Result<_, _>>()?,
+                    )
+                }
+            }
             Codec::Map(values) => 
Self::Map(Box::new(Skipper::from_avro(values)?)),
             Codec::Interval => Self::DurationFixed12,
             Codec::Union(encodings, _, _) => {

Reply via email to