This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 18be750f18 Follow-up Improvements to Avro union handling (#8385)
18be750f18 is described below
commit 18be750f18b784e7c3c6716dd65bae83146d379a
Author: Connor Sanders <[email protected]>
AuthorDate: Fri Sep 19 14:05:32 2025 -0500
Follow-up Improvements to Avro union handling (#8385)
# Which issue does this PR close?
This work continues arrow-avro schema resolution support and aligns
behavior with the Avro spec.
- **Related to**: #4886 (“Add Avro Support”): ongoing work to round out
the reader/decoder, including schema resolution and type promotion.
- **Follow-ups/Context**: #8348 (Add arrow-avro Reader support for Dense
Union and Union resolution (Part 1))
# Rationale for this change
@scovich left a thorough
[review](https://github.com/apache/arrow-rs/pull/8348#pullrequestreview-3237862269)
on #8348 that was only completed after the PR had been merged. This PR
addresses those suggestions and improves the code accordingly.
# What changes are included in this PR?
* Code quality improvements to `codec.rs`
* Improvements to `schema.rs`, including spec-compliant errors for named
types (`record`, `enum`, `fixed`) that are missing a required `name`.
# Are these changes tested?
1. No functionality was added or modified in `codec.rs`; all existing
tests pass unchanged.
2. Two new unit tests were added to `schema.rs` to cover the
spec-compliant named-type changes.
# Are there any user-facing changes?
N/A
---------
Co-authored-by: Ryan Johnson <[email protected]>
---
arrow-avro/src/codec.rs | 157 +++++++++++++++++++++--------------------------
arrow-avro/src/schema.rs | 41 +++++++++----
2 files changed, 99 insertions(+), 99 deletions(-)
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index b3c8da2b5e..64fc0488e3 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -424,12 +424,12 @@ impl AvroDataType {
}
},
Codec::Union(encodings, _, _) => {
- if encodings.is_empty() {
+ let Some(default_encoding) = encodings.first() else {
return Err(ArrowError::SchemaError(
"Union with no branches cannot have a
default".to_string(),
));
- }
- encodings[0].parse_default_literal(default_json)?
+ };
+ default_encoding.parse_default_literal(default_json)?
}
};
Ok(lit)
@@ -1029,53 +1029,35 @@ enum UnionBranchKey {
}
fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) ->
Option<UnionBranchKey> {
- match s {
- // Primitives
- Schema::TypeName(TypeName::Primitive(p)) =>
Some(UnionBranchKey::Primitive(*p)),
- Schema::Type(Type {
+ let (name, namespace) = match s {
+ Schema::TypeName(TypeName::Primitive(p))
+ | Schema::Type(Type {
r#type: TypeName::Primitive(p),
..
- }) => Some(UnionBranchKey::Primitive(*p)),
- // Named references
- Schema::TypeName(TypeName::Ref(name)) => {
- let (full, _) = make_full_name(name, None, enclosing_ns);
- Some(UnionBranchKey::Named(full))
- }
- Schema::Type(Type {
+ }) => return Some(UnionBranchKey::Primitive(*p)),
+ Schema::TypeName(TypeName::Ref(name))
+ | Schema::Type(Type {
r#type: TypeName::Ref(name),
..
- }) => {
- let (full, _) = make_full_name(name, None, enclosing_ns);
- Some(UnionBranchKey::Named(full))
- }
- // Complex non‑named
- Schema::Complex(ComplexType::Array(_)) => Some(UnionBranchKey::Array),
- Schema::Complex(ComplexType::Map(_)) => Some(UnionBranchKey::Map),
- // Inline named definitions
- Schema::Complex(ComplexType::Record(r)) => {
- let (full, _) = make_full_name(r.name, r.namespace, enclosing_ns);
- Some(UnionBranchKey::Named(full))
- }
- Schema::Complex(ComplexType::Enum(e)) => {
- let (full, _) = make_full_name(e.name, e.namespace, enclosing_ns);
- Some(UnionBranchKey::Named(full))
- }
- Schema::Complex(ComplexType::Fixed(f)) => {
- let (full, _) = make_full_name(f.name, f.namespace, enclosing_ns);
- Some(UnionBranchKey::Named(full))
- }
- // Unions are validated separately (and disallowed as immediate
branches)
- Schema::Union(_) => None,
- }
+ }) => (name, None),
+ Schema::Complex(ComplexType::Array(_)) => return
Some(UnionBranchKey::Array),
+ Schema::Complex(ComplexType::Map(_)) => return
Some(UnionBranchKey::Map),
+ Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
+ Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
+ Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
+ Schema::Union(_) => return None,
+ };
+ let (full, _) = make_full_name(name, namespace, enclosing_ns);
+ Some(UnionBranchKey::Named(full))
}
fn union_first_duplicate<'a>(
branches: &'a [Schema<'a>],
enclosing_ns: Option<&'a str>,
) -> Option<String> {
- let mut seen: HashSet<UnionBranchKey> =
HashSet::with_capacity(branches.len());
- for b in branches {
- if let Some(key) = branch_key_of(b, enclosing_ns) {
+ let mut seen = HashSet::with_capacity(branches.len());
+ for schema in branches {
+ if let Some(key) = branch_key_of(schema, enclosing_ns) {
if !seen.insert(key.clone()) {
let msg = match key {
UnionBranchKey::Named(full) => format!("named type
{full}"),
@@ -1346,31 +1328,29 @@ impl<'a> Maker<'a> {
}
match (writer_schema, reader_schema) {
(Schema::Union(writer_variants), Schema::Union(reader_variants))
=> {
+ let writer_variants = writer_variants.as_slice();
+ let reader_variants = reader_variants.as_slice();
match (
- nullable_union_variants(writer_variants.as_slice()),
- nullable_union_variants(reader_variants.as_slice()),
+ nullable_union_variants(writer_variants),
+ nullable_union_variants(reader_variants),
) {
(Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
let mut dt = self.make_data_type(w_nonnull,
Some(r_nonnull), namespace)?;
dt.nullability = Some(w_nb);
Ok(dt)
}
- _ => self.resolve_unions(
- writer_variants.as_slice(),
- reader_variants.as_slice(),
- namespace,
- ),
+ _ => self.resolve_unions(writer_variants, reader_variants,
namespace),
}
}
(Schema::Union(writer_variants), reader_non_union) => {
- let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
- Vec::with_capacity(writer_variants.len());
- for writer in writer_variants {
- match self.resolve_type(writer, reader_non_union,
namespace) {
- Ok(tmp) => writer_to_reader.push(Some((0usize,
Self::coercion_from(&tmp)))),
- Err(_) => writer_to_reader.push(None),
- }
- }
+ let writer_to_reader: Vec<Option<(usize, Promotion)>> =
writer_variants
+ .iter()
+ .map(|writer| {
+ self.resolve_type(writer, reader_non_union, namespace)
+ .ok()
+ .map(|tmp| (0usize, Self::coercion_from(&tmp)))
+ })
+ .collect();
let mut dt = self.parse_type(reader_non_union, namespace)?;
dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
writer_to_reader: Arc::from(writer_to_reader),
@@ -1380,24 +1360,16 @@ impl<'a> Maker<'a> {
Ok(dt)
}
(writer_non_union, Schema::Union(reader_variants)) => {
- let mut direct: Option<(usize, Promotion)> = None;
- let mut promo: Option<(usize, Promotion)> = None;
- for (reader_index, reader) in
reader_variants.iter().enumerate() {
- if let Ok(tmp) = self.resolve_type(writer_non_union,
reader, namespace) {
- let how = Self::coercion_from(&tmp);
- if how == Promotion::Direct {
- direct = Some((reader_index, how));
- break; // first exact match wins
- } else if promo.is_none() {
- promo = Some((reader_index, how));
- }
- }
- }
- let (reader_index, promotion) = direct.or(promo).ok_or_else(||
{
- ArrowError::SchemaError(
+ let promo = self.find_best_promotion(
+ writer_non_union,
+ reader_variants.as_slice(),
+ namespace,
+ );
+ let Some((reader_index, promotion)) = promo else {
+ return Err(ArrowError::SchemaError(
"Writer schema does not match any reader union
branch".to_string(),
- )
- })?;
+ ));
+ };
let mut dt = self.parse_type(reader_schema, namespace)?;
dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
writer_to_reader: Arc::from(vec![Some((reader_index,
promotion))]),
@@ -1442,6 +1414,28 @@ impl<'a> Maker<'a> {
}
}
+ fn find_best_promotion(
+ &mut self,
+ writer: &Schema<'a>,
+ reader_variants: &[Schema<'a>],
+ namespace: Option<&'a str>,
+ ) -> Option<(usize, Promotion)> {
+ let mut first_promotion: Option<(usize, Promotion)> = None;
+ for (reader_index, reader) in reader_variants.iter().enumerate() {
+ if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
+ let promotion = Self::coercion_from(&tmp);
+ if promotion == Promotion::Direct {
+ // An exact match is best, return immediately.
+ return Some((reader_index, promotion));
+ } else if first_promotion.is_none() {
+ // Store the first valid promotion but keep searching for
a direct match.
+ first_promotion = Some((reader_index, promotion));
+ }
+ }
+ }
+ first_promotion
+ }
+
fn resolve_unions<'s>(
&mut self,
writer_variants: &'s [Schema<'a>],
@@ -1455,20 +1449,7 @@ impl<'a> Maker<'a> {
let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
Vec::with_capacity(writer_variants.len());
for writer in writer_variants {
- let mut direct: Option<(usize, Promotion)> = None;
- let mut promo: Option<(usize, Promotion)> = None;
- for (reader_index, reader) in reader_variants.iter().enumerate() {
- if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
- let promotion = Self::coercion_from(&tmp);
- if promotion == Promotion::Direct {
- direct = Some((reader_index, promotion));
- break;
- } else if promo.is_none() {
- promo = Some((reader_index, promotion));
- }
- }
- }
- writer_to_reader.push(direct.or(promo));
+ writer_to_reader.push(self.find_best_promotion(writer,
reader_variants, namespace));
}
let union_fields = build_union_fields(&reader_encodings);
let mut dt = AvroDataType::new(
@@ -1860,11 +1841,11 @@ mod tests {
fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
Schema::TypeName(TypeName::Primitive(pt))
}
- fn mk_union(branches: Vec<Schema<'static>>) -> Schema<'static> {
+ fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
Schema::Union(branches)
}
- fn mk_record_named(name: &'static str) -> Schema<'static> {
+ fn mk_record_name(name: &str) -> Schema<'_> {
Schema::Complex(ComplexType::Record(Record {
name,
namespace: None,
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 6c501a56ab..1df012f292 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -984,17 +984,10 @@ fn wrap_nullable(inner: Value, null_order: Nullability)
-> Value {
Value::Array(mut union) => {
union.retain(|v| !is_avro_json_null(v));
match null_order {
- Nullability::NullFirst => {
- let mut out = Vec::with_capacity(union.len() + 1);
- out.push(null);
- out.extend(union);
- Value::Array(out)
- }
- Nullability::NullSecond => {
- union.push(null);
- Value::Array(union)
- }
+ Nullability::NullFirst => union.insert(0, null),
+ Nullability::NullSecond => union.push(null),
}
+ Value::Array(union)
}
other => match null_order {
Nullability::NullFirst => Value::Array(vec![null, other]),
@@ -1012,7 +1005,11 @@ fn union_branch_signature(branch: &Value) ->
Result<String, ArrowError> {
})?;
match t {
"record" | "enum" | "fixed" => {
- let name = map.get("name").and_then(|v|
v.as_str()).unwrap_or_default();
+ let name = map.get("name").and_then(|v|
v.as_str()).ok_or_else(|| {
+ ArrowError::SchemaError(format!(
+ "Union branch '{t}' missing required 'name'"
+ ))
+ })?;
Ok(format!("N:{t}:{name}"))
}
"array" | "map" => Ok(format!("C:{t}")),
@@ -2304,4 +2301,26 @@ mod tests {
let b = AvroSchema::from_arrow_with_options(&arrow_schema, None);
assert_eq!(a, b.unwrap().json_string);
}
+
+ #[test]
+ fn test_union_branch_missing_name_errors() {
+ for t in ["record", "enum", "fixed"] {
+ let branch = json!({ "type": t });
+ let err = union_branch_signature(&branch).unwrap_err().to_string();
+ assert!(
+ err.contains(&format!("Union branch '{t}' missing required
'name'")),
+ "expected missing-name error for {t}, got: {err}"
+ );
+ }
+ }
+
+ #[test]
+ fn test_union_branch_named_type_signature_includes_name() {
+ let rec = json!({ "type": "record", "name": "Foo" });
+ assert_eq!(union_branch_signature(&rec).unwrap(), "N:record:Foo");
+ let en = json!({ "type": "enum", "name": "Color", "symbols": ["R",
"G", "B"] });
+ assert_eq!(union_branch_signature(&en).unwrap(), "N:enum:Color");
+ let fx = json!({ "type": "fixed", "name": "Bytes16", "size": 16 });
+ assert_eq!(union_branch_signature(&fx).unwrap(), "N:fixed:Bytes16");
+ }
}