This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 18be750f18 Follow-up Improvements to Avro union handling  (#8385)
18be750f18 is described below

commit 18be750f18b784e7c3c6716dd65bae83146d379a
Author: Connor Sanders <[email protected]>
AuthorDate: Fri Sep 19 14:05:32 2025 -0500

    Follow-up Improvements to Avro union handling  (#8385)
    
    # Which issue does this PR close?
    
    This work continues arrow-avro schema resolution support and aligns
    behavior with the Avro spec.
    
    - **Related to**: #4886 (“Add Avro Support”): ongoing work to round out
    the reader/decoder, including schema resolution and type promotion.
    - **Follow-ups/Context**: #8348 (Add arrow-avro Reader support for Dense
    Union and Union resolution (Part 1))
    
    # Rationale for this change
    
    @scovich left a really solid
    [review](https://github.com/apache/arrow-rs/pull/8348#pullrequestreview-3237862269)
    on #8348 that wasn't completed until after the PR was merged. This PR
    addresses the suggestions from that review and improves the code.
    
    # What changes are included in this PR?
    
    * Code quality improvements to `codec.rs`
    * Improvements to `schema.rs`, including a spec-compliant error when a
    named type (record, enum, or fixed) in a union is missing its `name`.
    
    # Are these changes tested?
    
    1. No functionality was added / modified in `codec.rs` and all existing
    tests are passing without changes.
    2. Two new unit tests were added to `schema.rs` to cover the
    spec-compliant named-type changes.
    
    # Are there any user-facing changes?
    
    N/A
    
    ---------
    
    Co-authored-by: Ryan Johnson <[email protected]>
---
 arrow-avro/src/codec.rs  | 157 +++++++++++++++++++++--------------------------
 arrow-avro/src/schema.rs |  41 +++++++++----
 2 files changed, 99 insertions(+), 99 deletions(-)

diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index b3c8da2b5e..64fc0488e3 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -424,12 +424,12 @@ impl AvroDataType {
                 }
             },
             Codec::Union(encodings, _, _) => {
-                if encodings.is_empty() {
+                let Some(default_encoding) = encodings.first() else {
                     return Err(ArrowError::SchemaError(
                         "Union with no branches cannot have a 
default".to_string(),
                     ));
-                }
-                encodings[0].parse_default_literal(default_json)?
+                };
+                default_encoding.parse_default_literal(default_json)?
             }
         };
         Ok(lit)
@@ -1029,53 +1029,35 @@ enum UnionBranchKey {
 }
 
 fn branch_key_of<'a>(s: &Schema<'a>, enclosing_ns: Option<&'a str>) -> 
Option<UnionBranchKey> {
-    match s {
-        // Primitives
-        Schema::TypeName(TypeName::Primitive(p)) => 
Some(UnionBranchKey::Primitive(*p)),
-        Schema::Type(Type {
+    let (name, namespace) = match s {
+        Schema::TypeName(TypeName::Primitive(p))
+        | Schema::Type(Type {
             r#type: TypeName::Primitive(p),
             ..
-        }) => Some(UnionBranchKey::Primitive(*p)),
-        // Named references
-        Schema::TypeName(TypeName::Ref(name)) => {
-            let (full, _) = make_full_name(name, None, enclosing_ns);
-            Some(UnionBranchKey::Named(full))
-        }
-        Schema::Type(Type {
+        }) => return Some(UnionBranchKey::Primitive(*p)),
+        Schema::TypeName(TypeName::Ref(name))
+        | Schema::Type(Type {
             r#type: TypeName::Ref(name),
             ..
-        }) => {
-            let (full, _) = make_full_name(name, None, enclosing_ns);
-            Some(UnionBranchKey::Named(full))
-        }
-        // Complex non‑named
-        Schema::Complex(ComplexType::Array(_)) => Some(UnionBranchKey::Array),
-        Schema::Complex(ComplexType::Map(_)) => Some(UnionBranchKey::Map),
-        // Inline named definitions
-        Schema::Complex(ComplexType::Record(r)) => {
-            let (full, _) = make_full_name(r.name, r.namespace, enclosing_ns);
-            Some(UnionBranchKey::Named(full))
-        }
-        Schema::Complex(ComplexType::Enum(e)) => {
-            let (full, _) = make_full_name(e.name, e.namespace, enclosing_ns);
-            Some(UnionBranchKey::Named(full))
-        }
-        Schema::Complex(ComplexType::Fixed(f)) => {
-            let (full, _) = make_full_name(f.name, f.namespace, enclosing_ns);
-            Some(UnionBranchKey::Named(full))
-        }
-        // Unions are validated separately (and disallowed as immediate 
branches)
-        Schema::Union(_) => None,
-    }
+        }) => (name, None),
+        Schema::Complex(ComplexType::Array(_)) => return 
Some(UnionBranchKey::Array),
+        Schema::Complex(ComplexType::Map(_)) => return 
Some(UnionBranchKey::Map),
+        Schema::Complex(ComplexType::Record(r)) => (&r.name, r.namespace),
+        Schema::Complex(ComplexType::Enum(e)) => (&e.name, e.namespace),
+        Schema::Complex(ComplexType::Fixed(f)) => (&f.name, f.namespace),
+        Schema::Union(_) => return None,
+    };
+    let (full, _) = make_full_name(name, namespace, enclosing_ns);
+    Some(UnionBranchKey::Named(full))
 }
 
 fn union_first_duplicate<'a>(
     branches: &'a [Schema<'a>],
     enclosing_ns: Option<&'a str>,
 ) -> Option<String> {
-    let mut seen: HashSet<UnionBranchKey> = 
HashSet::with_capacity(branches.len());
-    for b in branches {
-        if let Some(key) = branch_key_of(b, enclosing_ns) {
+    let mut seen = HashSet::with_capacity(branches.len());
+    for schema in branches {
+        if let Some(key) = branch_key_of(schema, enclosing_ns) {
             if !seen.insert(key.clone()) {
                 let msg = match key {
                     UnionBranchKey::Named(full) => format!("named type 
{full}"),
@@ -1346,31 +1328,29 @@ impl<'a> Maker<'a> {
         }
         match (writer_schema, reader_schema) {
             (Schema::Union(writer_variants), Schema::Union(reader_variants)) 
=> {
+                let writer_variants = writer_variants.as_slice();
+                let reader_variants = reader_variants.as_slice();
                 match (
-                    nullable_union_variants(writer_variants.as_slice()),
-                    nullable_union_variants(reader_variants.as_slice()),
+                    nullable_union_variants(writer_variants),
+                    nullable_union_variants(reader_variants),
                 ) {
                     (Some((w_nb, w_nonnull)), Some((_r_nb, r_nonnull))) => {
                         let mut dt = self.make_data_type(w_nonnull, 
Some(r_nonnull), namespace)?;
                         dt.nullability = Some(w_nb);
                         Ok(dt)
                     }
-                    _ => self.resolve_unions(
-                        writer_variants.as_slice(),
-                        reader_variants.as_slice(),
-                        namespace,
-                    ),
+                    _ => self.resolve_unions(writer_variants, reader_variants, 
namespace),
                 }
             }
             (Schema::Union(writer_variants), reader_non_union) => {
-                let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
-                    Vec::with_capacity(writer_variants.len());
-                for writer in writer_variants {
-                    match self.resolve_type(writer, reader_non_union, 
namespace) {
-                        Ok(tmp) => writer_to_reader.push(Some((0usize, 
Self::coercion_from(&tmp)))),
-                        Err(_) => writer_to_reader.push(None),
-                    }
-                }
+                let writer_to_reader: Vec<Option<(usize, Promotion)>> = 
writer_variants
+                    .iter()
+                    .map(|writer| {
+                        self.resolve_type(writer, reader_non_union, namespace)
+                            .ok()
+                            .map(|tmp| (0usize, Self::coercion_from(&tmp)))
+                    })
+                    .collect();
                 let mut dt = self.parse_type(reader_non_union, namespace)?;
                 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                     writer_to_reader: Arc::from(writer_to_reader),
@@ -1380,24 +1360,16 @@ impl<'a> Maker<'a> {
                 Ok(dt)
             }
             (writer_non_union, Schema::Union(reader_variants)) => {
-                let mut direct: Option<(usize, Promotion)> = None;
-                let mut promo: Option<(usize, Promotion)> = None;
-                for (reader_index, reader) in 
reader_variants.iter().enumerate() {
-                    if let Ok(tmp) = self.resolve_type(writer_non_union, 
reader, namespace) {
-                        let how = Self::coercion_from(&tmp);
-                        if how == Promotion::Direct {
-                            direct = Some((reader_index, how));
-                            break; // first exact match wins
-                        } else if promo.is_none() {
-                            promo = Some((reader_index, how));
-                        }
-                    }
-                }
-                let (reader_index, promotion) = direct.or(promo).ok_or_else(|| 
{
-                    ArrowError::SchemaError(
+                let promo = self.find_best_promotion(
+                    writer_non_union,
+                    reader_variants.as_slice(),
+                    namespace,
+                );
+                let Some((reader_index, promotion)) = promo else {
+                    return Err(ArrowError::SchemaError(
                         "Writer schema does not match any reader union 
branch".to_string(),
-                    )
-                })?;
+                    ));
+                };
                 let mut dt = self.parse_type(reader_schema, namespace)?;
                 dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
                     writer_to_reader: Arc::from(vec![Some((reader_index, 
promotion))]),
@@ -1442,6 +1414,28 @@ impl<'a> Maker<'a> {
         }
     }
 
+    fn find_best_promotion(
+        &mut self,
+        writer: &Schema<'a>,
+        reader_variants: &[Schema<'a>],
+        namespace: Option<&'a str>,
+    ) -> Option<(usize, Promotion)> {
+        let mut first_promotion: Option<(usize, Promotion)> = None;
+        for (reader_index, reader) in reader_variants.iter().enumerate() {
+            if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
+                let promotion = Self::coercion_from(&tmp);
+                if promotion == Promotion::Direct {
+                    // An exact match is best, return immediately.
+                    return Some((reader_index, promotion));
+                } else if first_promotion.is_none() {
+                    // Store the first valid promotion but keep searching for 
a direct match.
+                    first_promotion = Some((reader_index, promotion));
+                }
+            }
+        }
+        first_promotion
+    }
+
     fn resolve_unions<'s>(
         &mut self,
         writer_variants: &'s [Schema<'a>],
@@ -1455,20 +1449,7 @@ impl<'a> Maker<'a> {
         let mut writer_to_reader: Vec<Option<(usize, Promotion)>> =
             Vec::with_capacity(writer_variants.len());
         for writer in writer_variants {
-            let mut direct: Option<(usize, Promotion)> = None;
-            let mut promo: Option<(usize, Promotion)> = None;
-            for (reader_index, reader) in reader_variants.iter().enumerate() {
-                if let Ok(tmp) = self.resolve_type(writer, reader, namespace) {
-                    let promotion = Self::coercion_from(&tmp);
-                    if promotion == Promotion::Direct {
-                        direct = Some((reader_index, promotion));
-                        break;
-                    } else if promo.is_none() {
-                        promo = Some((reader_index, promotion));
-                    }
-                }
-            }
-            writer_to_reader.push(direct.or(promo));
+            writer_to_reader.push(self.find_best_promotion(writer, 
reader_variants, namespace));
         }
         let union_fields = build_union_fields(&reader_encodings);
         let mut dt = AvroDataType::new(
@@ -1860,11 +1841,11 @@ mod tests {
     fn mk_primitive(pt: PrimitiveType) -> Schema<'static> {
         Schema::TypeName(TypeName::Primitive(pt))
     }
-    fn mk_union(branches: Vec<Schema<'static>>) -> Schema<'static> {
+    fn mk_union(branches: Vec<Schema<'_>>) -> Schema<'_> {
         Schema::Union(branches)
     }
 
-    fn mk_record_named(name: &'static str) -> Schema<'static> {
+    fn mk_record_name(name: &str) -> Schema<'_> {
         Schema::Complex(ComplexType::Record(Record {
             name,
             namespace: None,
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 6c501a56ab..1df012f292 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -984,17 +984,10 @@ fn wrap_nullable(inner: Value, null_order: Nullability) 
-> Value {
         Value::Array(mut union) => {
             union.retain(|v| !is_avro_json_null(v));
             match null_order {
-                Nullability::NullFirst => {
-                    let mut out = Vec::with_capacity(union.len() + 1);
-                    out.push(null);
-                    out.extend(union);
-                    Value::Array(out)
-                }
-                Nullability::NullSecond => {
-                    union.push(null);
-                    Value::Array(union)
-                }
+                Nullability::NullFirst => union.insert(0, null),
+                Nullability::NullSecond => union.push(null),
             }
+            Value::Array(union)
         }
         other => match null_order {
             Nullability::NullFirst => Value::Array(vec![null, other]),
@@ -1012,7 +1005,11 @@ fn union_branch_signature(branch: &Value) -> 
Result<String, ArrowError> {
             })?;
             match t {
                 "record" | "enum" | "fixed" => {
-                    let name = map.get("name").and_then(|v| 
v.as_str()).unwrap_or_default();
+                    let name = map.get("name").and_then(|v| 
v.as_str()).ok_or_else(|| {
+                        ArrowError::SchemaError(format!(
+                            "Union branch '{t}' missing required 'name'"
+                        ))
+                    })?;
                     Ok(format!("N:{t}:{name}"))
                 }
                 "array" | "map" => Ok(format!("C:{t}")),
@@ -2304,4 +2301,26 @@ mod tests {
         let b = AvroSchema::from_arrow_with_options(&arrow_schema, None);
         assert_eq!(a, b.unwrap().json_string);
     }
+
+    #[test]
+    fn test_union_branch_missing_name_errors() {
+        for t in ["record", "enum", "fixed"] {
+            let branch = json!({ "type": t });
+            let err = union_branch_signature(&branch).unwrap_err().to_string();
+            assert!(
+                err.contains(&format!("Union branch '{t}' missing required 
'name'")),
+                "expected missing-name error for {t}, got: {err}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_union_branch_named_type_signature_includes_name() {
+        let rec = json!({ "type": "record", "name": "Foo" });
+        assert_eq!(union_branch_signature(&rec).unwrap(), "N:record:Foo");
+        let en = json!({ "type": "enum", "name": "Color", "symbols": ["R", 
"G", "B"] });
+        assert_eq!(union_branch_signature(&en).unwrap(), "N:enum:Color");
+        let fx = json!({ "type": "fixed", "name": "Bytes16", "size": 16 });
+        assert_eq!(union_branch_signature(&fx).unwrap(), "N:fixed:Bytes16");
+    }
 }

Reply via email to