This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a1bf90ca91 fix: use writer types in Skipper for resolved named record
types (#9605)
a1bf90ca91 is described below
commit a1bf90ca91d295ff81ce359c37a3e476d3a9ad7d
Author: Ariel Miculas-Trif <[email protected]>
AuthorDate: Thu Apr 9 00:36:51 2026 +0300
fix: use writer types in Skipper for resolved named record types (#9605)
# Which issue does this PR close?
Closes #9655.
# Rationale for this change
When a writer-only field references a named Avro type that was
previously resolved against a reader schema, `parse_type` returns the
registered reader-resolved type from the shared resolver. This caused
two problems:
1. The Skipper built its struct sub-skippers from the reader's field
list, which omits writer-only fields. Their bytes were never consumed,
leaving the cursor at the wrong position for all subsequent records.
2. Reader fields carry resolution-induced nullability (e.g. a plain writer
`long` matched against a reader `["null", "long"]` gains
`nullability = Some(NullFirst)`). The Skipper therefore read a union-tag
byte that the writer never wrote, causing "Unexpected EOF" errors.
Fix: store the writer's data type in `ResolvedField::ToReader` alongside
the reader index. When a struct carries record-resolution info, the
Skipper's `Codec::Struct` arm now iterates `rec.writer_fields` and uses the
writer type from every entry — both `ToReader(_, wdt)` and `Skip(wdt)` — so
it always follows the writer's wire format. Structs without resolution info
are still skipped using their own field list.
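# What changes are included in this PR?
- `arrow-avro/src/codec.rs`: `ResolvedField::ToReader` now carries the
  writer's `AvroDataType` in addition to the reader index. The writer type
  is parsed for every writer field, not only for skipped ones.
- `arrow-avro/src/reader/record.rs`: `Skipper::from_avro` builds struct
  sub-skippers from the writer fields of a resolved record.
  `ProjectorBuilder` ignores the new payload.
- `arrow-avro/src/reader/mod.rs`: new regression tests cover three cases:
  - the reader-nullable / writer-plain mismatch;
  - writer-only struct fields;
  - a skipped array of structs that reference a named type.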
# Are these changes tested?
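Yes. Added three regression tests to `arrow-avro/src/reader/mod.rs` and
updated the existing `ResolvedField` assertions in `arrow-avro/src/codec.rs`.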
# Are there any user-facing changes?
No
---
arrow-avro/src/codec.rs | 16 +--
arrow-avro/src/reader/mod.rs | 215 ++++++++++++++++++++++++++++++++++++++++
arrow-avro/src/reader/record.rs | 29 ++++--
3 files changed, 246 insertions(+), 14 deletions(-)
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 92a0ed0519..455d4c2f71 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -94,7 +94,9 @@ pub(crate) struct ResolvedRecord {
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum ResolvedField {
/// Resolves to a field indexed in the reader schema.
- ToReader(usize),
+ /// The `AvroDataType` is the writer's type for this field, used by the
Skipper
+ /// to correctly consume writer bytes when the whole record is being
skipped.
+ ToReader(usize, AvroDataType),
/// For fields present in the writer's schema but not the reader's, this
stores their data type.
/// This is needed to correctly skip over these fields during
deserialization.
Skip(AvroDataType),
@@ -2341,10 +2343,10 @@ impl<'a> Maker<'a> {
.iter()
.enumerate()
.map(|(writer_index, writer_field)| {
+ let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
if let Some(reader_index) = writer_to_reader[writer_index] {
- Ok(ResolvedField::ToReader(reader_index))
+ Ok(ResolvedField::ToReader(reader_index, dt))
} else {
- let dt = self.parse_type(&writer_field.r#type, writer_ns)?;
Ok(ResolvedField::Skip(dt))
}
})
@@ -2888,7 +2890,7 @@ mod tests {
default_fields,
}) => {
assert_eq!(writer_fields.len(), 1);
- assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
+ assert!(matches!(writer_fields[0], ResolvedField::ToReader(0,
_)));
assert_eq!(default_fields.len(), 1);
assert_eq!(default_fields[0], 1);
}
@@ -2981,7 +2983,7 @@ mod tests {
default_fields,
}) => {
assert_eq!(writer_fields.len(), 1);
- assert_eq!(writer_fields[0], ResolvedField::ToReader(0));
+ assert!(matches!(writer_fields[0], ResolvedField::ToReader(0,
_)));
assert_eq!(default_fields.len(), 1);
assert_eq!(default_fields[0], 1);
}
@@ -3802,9 +3804,9 @@ mod tests {
assert!(matches!(
&rec.writer_fields[..],
&[
- ResolvedField::ToReader(1),
+ ResolvedField::ToReader(1, _),
ResolvedField::Skip(_),
- ResolvedField::ToReader(0),
+ ResolvedField::ToReader(0, _),
]
));
assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 070204f2bc..6dbf5b1553 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -9590,4 +9590,219 @@ mod test {
"entire RecordBatch mismatch (schema, all columns, all rows)"
);
}
+
+ // Build Avro OCF bytes whose schema contains a TypeName::Ref
+ //
+ // Schema written to the OCF header verbatim:
+ // ```text
+ // Root {
+ // ts: Timestamp { seconds: long, nanos: int },
+ // extra: Event { time: "Timestamp" } <- TypeName::Ref
+ // }
+ // ```
+ fn make_type_ref_ocf() -> Vec<u8> {
+ use apache_avro::{Schema as ApacheSchema, Writer as ApacheWriter,
types::Value};
+ let schema_json = r#"{
+ "type": "record", "name": "Root",
+ "fields": [
+ {"name": "ts", "type": {"type": "record", "name": "Timestamp",
"fields": [
+ {"name": "seconds", "type": "long"},
+ {"name": "nanos", "type": "int"}
+ ]}},
+ {"name": "extra", "type": {"type": "record", "name": "Event",
"fields": [
+ {"name": "time", "type": "Timestamp"}
+ ]}}
+ ]
+ }"#;
+ let schema = ApacheSchema::parse_str(schema_json).expect("valid
schema");
+ let mut out = Vec::new();
+ {
+ let mut writer = ApacheWriter::new(&schema, &mut out);
+ let ts_val = |s: i64, n: i32| {
+ Value::Record(vec![
+ ("seconds".into(), Value::Long(s)),
+ ("nanos".into(), Value::Int(n)),
+ ])
+ };
+ // Two rows: ts={1000,100}/extra.time={-1,-1} and
ts={2000,200}/extra.time={-2,-2}.
+ for (ts_s, ts_n, ex_s, ex_n) in [(1000i64, 100i32, -1i64, -1i32),
(2000, 200, -2, -2)] {
+ let row = Value::Record(vec![
+ ("ts".into(), ts_val(ts_s, ts_n)),
+ (
+ "extra".into(),
+ Value::Record(vec![("time".into(), ts_val(ex_s,
ex_n))]),
+ ),
+ ]);
+ writer.append_value_ref(&row).expect("append row");
+ }
+ writer.flush().expect("flush");
+ }
+ out
+ }
+
+ // writer-plain / reader-nullable mismatch.
+ //
+ // The writer schema uses a TypeName::Ref ("Timestamp" referenced in
`extra.time`).
+ // The reader wraps `ts` in `["null", T]` unions and omits `extra`.
+ // The Skipper for `extra.time` resolves "Timestamp" via the resolver and
must use
+ // the writer's plain field types (long, int) — not the nullable reader
types - when
+ // consuming bytes. Without the fix, it skips union-encoded fields from
plain data,
+ // reads the wrong number of bytes, and corrupts row 2's `ts.seconds`.
+ #[test]
+ fn test_nullable_reader_schema_vs_plain_writer_nested_struct() {
+ let bytes = make_type_ref_ocf();
+ let reader_schema = AvroSchema::new(
+ r#"{"type":"record","name":"Root","fields":[
+
{"name":"ts","type":["null",{"type":"record","name":"Timestamp","fields":[
+ {"name":"seconds","type":["null","long"]},
+ {"name":"nanos", "type":["null","int"]}
+ ]}]}
+ ]}"#
+ .to_string(),
+ );
+ let mut reader = ReaderBuilder::new()
+ .with_reader_schema(reader_schema)
+ .build(Cursor::new(bytes))
+ .expect("reader should build");
+ let batch = reader
+ .next()
+ .expect("should have a batch")
+ .expect("reading should succeed");
+ assert_eq!(batch.num_rows(), 2);
+ let ts = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .unwrap();
+ let seconds = ts
+ .column_by_name("seconds")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap();
+ assert_eq!(seconds.value(0), 1000);
+ assert_eq!(seconds.value(1), 2000);
+ }
+
+ // Skipper must consume all writer fields, including writer-only ones.
+ //
+ // The writer schema uses a TypeName::Ref ("Timestamp" referenced in
`extra.time`).
+ // The reader requests only `ts.seconds` (no `nanos`, no `extra`).
+ // The Skipper for `extra.time` resolves "Timestamp" and must skip both
`seconds`
+ // and `nanos` bytes. Without the fix it skips only `seconds`, leaving
the `nanos`
+ // bytes in the buffer and corrupting row 2's `ts.seconds` read.
+ #[test]
+ fn test_skipper_consumes_writer_only_struct_fields() {
+ let bytes = make_type_ref_ocf();
+ let reader_schema = AvroSchema::new(
+ r#"{"type":"record","name":"Root","fields":[
+
{"name":"ts","type":{"type":"record","name":"Timestamp","fields":[
+ {"name":"seconds","type":"long"}
+ ]}}
+ ]}"#
+ .to_string(),
+ );
+ let mut reader = ReaderBuilder::new()
+ .with_reader_schema(reader_schema)
+ .build(Cursor::new(bytes))
+ .expect("reader should build");
+ let batch = reader
+ .next()
+ .expect("should have a batch")
+ .expect("Skipper must consume both seconds and nanos for
extra.time");
+ assert_eq!(batch.num_rows(), 2);
+ let ts = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .unwrap();
+ let seconds = ts
+ .column_by_name("seconds")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap();
+ assert_eq!(seconds.value(0), 1000);
+ assert_eq!(seconds.value(1), 2000);
+ }
+
+ // The Skipper for a skipped array field must consume all bytes of each
element,
+ // including every field of a nested struct resolved via a TypeName::Ref.
+ //
+ // Writer: `Root { ts: Timestamp{seconds,nanos}, events:
array<Event{time:"Timestamp"}> }`
+ // Reader: only `ts` with nullable wrappers; `events` is absent (forces a
Skip).
+ // The Skipper for `events` resolves each element's `time` field as
"Timestamp"
+ // and must use the writer's plain {seconds,nanos} definition — not the
+ // nullable-wrapped reader type — when consuming bytes.
+ #[test]
+ fn test_skip_array_of_structs_uses_writer_schema_not_resolved() {
+ use apache_avro::{Schema as ApacheSchema, Writer as ApacheWriter,
types::Value};
+ let schema_json = r#"{
+ "type": "record", "name": "Root",
+ "fields": [
+ {"name": "ts", "type": {"type": "record", "name": "Timestamp",
"fields": [
+ {"name": "seconds", "type": "long"},
+ {"name": "nanos", "type": "int"}
+ ]}},
+ {"name": "events", "type": {"type": "array", "items": {
+ "type": "record", "name": "Event", "fields": [
+ {"name": "time", "type": "Timestamp"}
+ ]
+ }}}
+ ]
+ }"#;
+ let schema = ApacheSchema::parse_str(schema_json).expect("valid
schema");
+ let mut bytes = Vec::new();
+ {
+ let mut writer = ApacheWriter::new(&schema, &mut bytes);
+ // One row: ts={100, 5}, events=[{time={200, 1}}]
+ let ts_val = |s: i64, n: i32| {
+ Value::Record(vec![
+ ("seconds".into(), Value::Long(s)),
+ ("nanos".into(), Value::Int(n)),
+ ])
+ };
+ let row = Value::Record(vec![
+ ("ts".into(), ts_val(100, 5)),
+ (
+ "events".into(),
+ Value::Array(vec![Value::Record(vec![("time".into(),
ts_val(200, 1))])]),
+ ),
+ ]);
+ writer.append_value_ref(&row).expect("append row");
+ writer.flush().expect("flush");
+ }
+
+ // Reader omits `events` (forces Skip) and wraps `ts` fields in
nullable unions.
+ let reader_schema = AvroSchema::new(
+ r#"{"type":"record","name":"Root","fields":[
+
{"name":"ts","type":["null",{"type":"record","name":"Timestamp","fields":[
+ {"name":"seconds","type":["null","long"]},
+ {"name":"nanos", "type":["null","int"]}
+ ]}]}
+ ]}"#
+ .to_string(),
+ );
+ let mut reader = ReaderBuilder::new()
+ .with_reader_schema(reader_schema)
+ .build(Cursor::new(bytes))
+ .expect("reader should build");
+ let batch = reader
+ .next()
+ .expect("should have a batch")
+ .expect("Skipper must consume all events bytes using writer field
types");
+ assert_eq!(batch.num_rows(), 1);
+ let ts = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .unwrap();
+ let seconds = ts
+ .column_by_name("seconds")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int64Array>()
+ .unwrap();
+ assert_eq!(seconds.value(0), 100);
+ }
}
diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs
index 97cdeed20f..b71a6bdc7c 100644
--- a/arrow-avro/src/reader/record.rs
+++ b/arrow-avro/src/reader/record.rs
@@ -2458,7 +2458,7 @@ impl<'a> ProjectorBuilder<'a> {
.writer_fields
.iter()
.map(|field| match field {
- ResolvedField::ToReader(index) =>
Ok(FieldProjection::ToReader(*index)),
+ ResolvedField::ToReader(index, _) =>
Ok(FieldProjection::ToReader(*index)),
ResolvedField::Skip(datatype) => {
let skipper = Skipper::from_avro(datatype)?;
Ok(FieldProjection::Skip(skipper))
@@ -2568,12 +2568,27 @@ impl Skipper {
Codec::Uuid => Self::UuidString, // encoded as string
Codec::Enum(_) => Self::Enum,
Codec::List(item) =>
Self::List(Box::new(Skipper::from_avro(item)?)),
- Codec::Struct(fields) => Self::Struct(
- fields
- .iter()
- .map(|f| Skipper::from_avro(f.data_type()))
- .collect::<Result<_, _>>()?,
- ),
+ Codec::Struct(fields) => {
+ if let Some(ResolutionInfo::Record(rec)) =
dt.resolution.as_ref() {
+ Self::Struct(
+ rec.writer_fields
+ .iter()
+ .map(|wf| match wf {
+ ResolvedField::ToReader(_, wdt) |
ResolvedField::Skip(wdt) => {
+ Skipper::from_avro(wdt)
+ }
+ })
+ .collect::<Result<_, _>>()?,
+ )
+ } else {
+ Self::Struct(
+ fields
+ .iter()
+ .map(|f| Skipper::from_avro(f.data_type()))
+ .collect::<Result<_, _>>()?,
+ )
+ }
+ }
Codec::Map(values) =>
Self::Map(Box::new(Skipper::from_avro(values)?)),
Codec::Interval => Self::DurationFixed12,
Codec::Union(encodings, _, _) => {