This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 567f4415dc Add array/map/fixed schema resolution and default value support to arrow-avro codec (#8292)
567f4415dc is described below

commit 567f4415dc574d5934cfc8913560e7ff14fb6a38
Author: Connor Sanders <[email protected]>
AuthorDate: Thu Sep 11 10:56:22 2025 -0500

    Add array/map/fixed schema resolution and default value support to arrow-avro codec (#8292)
    
    # Which issue does this PR close?
    
    This work continues arrow-avro schema resolution support and aligns
    behavior with the Avro spec.
    
    - **Related to**: #4886 (“Add Avro Support”): ongoing work to round out
    the reader/decoder, including schema resolution and type promotion.
    - **Follow-ups/Context**: #8124 (schema resolution & type promotion for
    the decoder), #8223 (enum mapping for schema resolution). These previous
    efforts established the foundations that this PR extends to default
    values and additional resolvable types.
    
    # Rationale for this change
    
    Avro’s **schema resolution** requires readers to reconcile differences
    between the writer and reader schemas, including:
    - Using record-field **default values** when the writer lacks a field
    present in the reader; defaults must be type-correct (e.g., union
    defaults match the first union member; bytes/fixed defaults are JSON
    strings).
    - Recursively resolving **arrays** (by item schema) and **maps** (by
    value schema).
    - Resolving **fixed** types (size and unqualified name must match) and
    erroring when they do not.
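
    The bytes/fixed default convention above can be sketched as follows.
    This is an illustrative standalone function, not the crate's API: per
    the Avro spec, a JSON-string default for `bytes`/`fixed` maps each
    character's codepoint (which must be <= 0xFF) to one raw byte.

```rust
/// Decode an Avro JSON-string default for `bytes`/`fixed`: every char's
/// codepoint must fit in one byte; `expected_len` enforces a fixed size.
fn decode_bytes_default(s: &str, expected_len: Option<usize>) -> Result<Vec<u8>, String> {
    let mut out = Vec::with_capacity(s.len());
    for ch in s.chars() {
        let cp = ch as u32;
        if cp > 0xFF {
            return Err(format!("invalid codepoint U+{cp:04X}; must be <= 0xFF"));
        }
        out.push(cp as u8);
    }
    if let Some(len) = expected_len {
        if out.len() != len {
            return Err(format!("default length {} != fixed size {len}", out.len()));
        }
    }
    Ok(out)
}

fn main() {
    // "ABC" decodes to the raw bytes [65, 66, 67].
    assert_eq!(decode_bytes_default("ABC", None).unwrap(), vec![65, 66, 67]);
    // "€" is U+20AC, outside the single-byte range, so it is rejected.
    assert!(decode_bytes_default("€", None).is_err());
    // A fixed(2) default must decode to exactly 2 bytes.
    assert!(decode_bytes_default("ABC", Some(2)).is_err());
}
```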
    
    Prior to this change, arrow-avro’s resolution handled some cases but
    lacked full Codec support for **default values** and for resolving
    **array/map/fixed** shapes between writer and reader. This led to gaps
    when reading evolved data or datasets produced by heterogeneous systems.
    This PR implements these missing pieces so the Arrow reader behaves per
    the spec in common evolution scenarios.
    
    # What changes are included in this PR?
    
    This PR modifies **`arrow-avro/src/codec.rs`** to extend the
    schema-resolution path:
    
    - **Default value handling** for record fields
      - Reads and applies default values when the reader expects a field
        absent from the writer, including **nested defaults**.
      - Validates defaults per the Avro spec (e.g., union defaults match the
        first schema; bytes/fixed defaults are JSON strings).

    - **Array / Map / Fixed schema resolution**
      - **Array**: recursively resolves item schemas (writer↔reader).
      - **Map**: recursively resolves value schemas.
      - **Fixed**: enforces matching size and (unqualified) name; otherwise
        signals an error, consistent with the spec.

    - **Codec updates**
      - Refactors internal codec logic to support the above during decoding,
        including resolution for **record fields** and **nested defaults**.
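
    The fixed-resolution rule can be illustrated with a small standalone
    sketch; the `FixedSchema` struct and `resolve_fixed` function below are
    simplified stand-ins, not the crate's actual types. Names must match
    (a reader alias for the writer's name counts) and sizes must agree,
    otherwise resolution fails.

```rust
/// Simplified stand-in for an Avro `fixed` schema (not the crate's type).
struct FixedSchema<'a> {
    name: &'a str,
    aliases: Vec<&'a str>,
    size: usize,
}

/// Resolve writer vs. reader `fixed`: unqualified names must match (or the
/// reader must alias the writer's name) and the sizes must be identical.
fn resolve_fixed(writer: &FixedSchema, reader: &FixedSchema) -> Result<usize, String> {
    let names_match = writer.name == reader.name || reader.aliases.contains(&writer.name);
    if !names_match {
        return Err(format!(
            "Fixed name mismatch: writer={}, reader={}",
            writer.name, reader.name
        ));
    }
    if writer.size != reader.size {
        return Err(format!(
            "Fixed size mismatch for {}: writer={}, reader={}",
            reader.name, writer.size, reader.size
        ));
    }
    Ok(reader.size)
}

fn main() {
    let writer = FixedSchema { name: "md5", aliases: vec![], size: 16 };
    let reader = FixedSchema { name: "digest", aliases: vec!["md5"], size: 16 };
    // An alias match with equal sizes resolves successfully.
    assert_eq!(resolve_fixed(&writer, &reader).unwrap(), 16);
    // A size mismatch is an error, consistent with the Avro spec.
    let shrunk = FixedSchema { name: "md5", aliases: vec![], size: 8 };
    assert!(resolve_fixed(&writer, &shrunk).is_err());
}
```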
    
    # Are these changes tested?
    
    **Yes.** This PR includes new unit tests in `arrow-avro/src/codec.rs`
    covering:
    
    1) **Default validation & persistence**
       - `Null`/union-nullability rules; metadata persistence of defaults
         (`AVRO_FIELD_DEFAULT_METADATA_KEY`).
    2) **`AvroLiteral` parsing**
       - Range checks for `i32`/`f32`; correct literals for `i64`/`f64`;
         `Utf8`/`Utf8View`; `uuid` strings (RFC 4122).
       - Byte-range mapping for `bytes`/`fixed` defaults; `Fixed(n)` length
         enforcement; `decimal` on `fixed` vs `bytes`; `duration`/interval
         fixed **12**-byte enforcement.
    3) **Collections & records**
       - Array/map default shapes; enum symbol validity; record defaults for
         missing fields, required-field errors, and honoring field-level
         defaults; skip-fields retained for writer-only fields.
    4) **Resolution mechanics**
       - Element **promotion** (`int` to `long`) for arrays; **reader
         metadata precedence** for colliding attributes; `fixed` name/size
         matching, including **aliases**.
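
    The promotion mechanics exercised in test group 4 boil down to lossless
    widening; below is a minimal sketch (not the crate's decoder, and
    `promote_int_items_to_long` is a hypothetical helper) of promoting
    writer `int` array elements to the reader's `long`:

```rust
/// Widen writer `int` (i32) array items to the reader's `long` (i64),
/// mirroring Avro's int -> long promotion rule for array elements.
fn promote_int_items_to_long(items: &[i32]) -> Vec<i64> {
    items.iter().map(|&v| i64::from(v)).collect()
}

fn main() {
    // Every i32 value is representable as i64, so promotion never fails.
    assert_eq!(
        promote_int_items_to_long(&[1, -2, i32::MAX]),
        vec![1i64, -2, 2_147_483_647]
    );
}
```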
    
    # Are there any user-facing changes?
    
    N/A
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-avro/src/codec.rs | 863 +++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 810 insertions(+), 53 deletions(-)

diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 0cac8c5786..3f94391c25 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -16,8 +16,9 @@
 // under the License.
 
 use crate::schema::{
-    Attributes, AvroSchema, ComplexType, Enum, Nullability, PrimitiveType, Record, Schema, Type,
-    TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
+    Array, Attributes, AvroSchema, ComplexType, Enum, Fixed, Map, Nullability, PrimitiveType,
+    Record, Schema, Type, TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
+    AVRO_FIELD_DEFAULT_METADATA_KEY,
 };
 use arrow_schema::{
     ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION,
@@ -25,6 +26,8 @@ use arrow_schema::{
 };
 #[cfg(feature = "small_decimals")]
 use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
+use indexmap::IndexMap;
+use serde_json::Value;
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -33,11 +36,11 @@ use std::sync::Arc;
 pub(crate) enum ResolutionInfo {
     /// Indicates that the writer's type should be promoted to the reader's type.
     Promotion(Promotion),
-    /// Indicates that a default value should be used for a field. (Implemented in a Follow-up PR)
+    /// Indicates that a default value should be used for a field.
     DefaultValue(AvroLiteral),
     /// Provides mapping information for resolving enums.
     EnumMapping(EnumMapping),
-    /// Provides resolution information for record fields. (Implemented in a Follow-up PR)
+    /// Provides resolution information for record fields.
     Record(ResolvedRecord),
 }
 
@@ -64,6 +67,10 @@ pub(crate) enum AvroLiteral {
     String(String),
     /// Represents an enum symbol.
     Enum(String),
+    /// Represents a JSON array default for an Avro array, containing element literals.
+    Array(Vec<AvroLiteral>),
+    /// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
+    Map(IndexMap<String, AvroLiteral>),
     /// Represents an unsupported literal type.
     Unsupported,
 }
@@ -193,6 +200,225 @@ impl AvroDataType {
     pub fn nullability(&self) -> Option<Nullability> {
         self.nullability
     }
+
+    #[inline]
+    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
+        fn expect_string<'v>(
+            default_json: &'v Value,
+            data_type: &str,
+        ) -> Result<&'v str, ArrowError> {
+            match default_json {
+                Value::String(s) => Ok(s.as_str()),
+                _ => Err(ArrowError::SchemaError(format!(
+                    "Default value must be a JSON string for {data_type}"
+                ))),
+            }
+        }
+
+        fn parse_bytes_default(
+            default_json: &Value,
+            expected_len: Option<usize>,
+        ) -> Result<Vec<u8>, ArrowError> {
+            let s = expect_string(default_json, "bytes/fixed logical types")?;
+            let mut out = Vec::with_capacity(s.len());
+            for ch in s.chars() {
+                let cp = ch as u32;
+                if cp > 0xFF {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
+                    )));
+                }
+                out.push(cp as u8);
+            }
+            if let Some(len) = expected_len {
+                if out.len() != len {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Default length {} does not match expected fixed size {len}",
+                        out.len(),
+                    )));
+                }
+            }
+            Ok(out)
+        }
+
+        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
+            match default_json {
+                Value::Number(n) => n.as_i64().ok_or_else(|| {
+                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
+                }),
+                _ => Err(ArrowError::SchemaError(format!(
+                    "Default {data_type} must be a JSON integer"
+                ))),
+            }
+        }
+
+        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
+            match default_json {
+                Value::Number(n) => n.as_f64().ok_or_else(|| {
+                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
+                }),
+                _ => Err(ArrowError::SchemaError(format!(
+                    "Default {data_type} must be a JSON number"
+                ))),
+            }
+        }
+
+        // Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
+        if default_json.is_null() {
+            return match self.codec() {
+                Codec::Null => Ok(AvroLiteral::Null),
+                _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
+                _ => Err(ArrowError::SchemaError(
+                    "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
+                        .to_string(),
+                )),
+            };
+        }
+        let lit = match self.codec() {
+            Codec::Null => {
+                return Err(ArrowError::SchemaError(
+                    "Default for `null` type must be JSON null".to_string(),
+                ))
+            }
+            Codec::Boolean => match default_json {
+                Value::Bool(b) => AvroLiteral::Boolean(*b),
+                _ => {
+                    return Err(ArrowError::SchemaError(
+                        "Boolean default must be a JSON boolean".to_string(),
+                    ))
+                }
+            },
+            Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
+                let i = parse_json_i64(default_json, "int")?;
+                if i < i32::MIN as i64 || i > i32::MAX as i64 {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Default int {i} out of i32 range"
+                    )));
+                }
+                AvroLiteral::Int(i as i32)
+            }
+            Codec::Int64
+            | Codec::TimeMicros
+            | Codec::TimestampMillis(_)
+            | Codec::TimestampMicros(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
+            Codec::Float32 => {
+                let f = parse_json_f64(default_json, "float")?;
+                if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Default float {f} out of f32 range or not finite"
+                    )));
+                }
+                AvroLiteral::Float(f as f32)
+            }
+            Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
+            Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
+                AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
+            }
+            Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
+            Codec::Fixed(sz) => {
+                AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
+            }
+            Codec::Decimal(_, _, fixed_size) => {
+                AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
+            }
+            Codec::Enum(symbols) => {
+                let s = expect_string(default_json, "enum")?;
+                if symbols.iter().any(|sym| sym == s) {
+                    AvroLiteral::Enum(s.to_string())
+                } else {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Default enum symbol {s:?} not found in reader enum symbols"
+                    )));
+                }
+            }
+            Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
+            Codec::List(item_dt) => match default_json {
+                Value::Array(items) => AvroLiteral::Array(
+                    items
+                        .iter()
+                        .map(|v| item_dt.parse_default_literal(v))
+                        .collect::<Result<_, _>>()?,
+                ),
+                _ => {
+                    return Err(ArrowError::SchemaError(
+                        "Default value must be a JSON array for Avro array type".to_string(),
+                    ))
+                }
+            },
+            Codec::Map(val_dt) => match default_json {
+                Value::Object(map) => {
+                    let mut out = IndexMap::with_capacity(map.len());
+                    for (k, v) in map {
+                        out.insert(k.clone(), val_dt.parse_default_literal(v)?);
+                    }
+                    AvroLiteral::Map(out)
+                }
+                _ => {
+                    return Err(ArrowError::SchemaError(
+                        "Default value must be a JSON object for Avro map type".to_string(),
+                    ))
+                }
+            },
+            Codec::Struct(fields) => match default_json {
+                Value::Object(obj) => {
+                    let mut out: IndexMap<String, AvroLiteral> =
+                        IndexMap::with_capacity(fields.len());
+                    for f in fields.as_ref() {
+                        let name = f.name().to_string();
+                        if let Some(sub) = obj.get(&name) {
+                            out.insert(name, f.data_type().parse_default_literal(sub)?);
+                        } else {
+                            // Cache metadata lookup once
+                            let stored_default =
+                                f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
+                            if stored_default.is_none()
+                                && f.data_type().nullability() == Some(Nullability::default())
+                            {
+                                out.insert(name, AvroLiteral::Null);
+                            } else if let Some(default_json) = stored_default {
+                                let v: Value =
+                                    serde_json::from_str(default_json).map_err(|e| {
+                                        ArrowError::SchemaError(format!(
+                                            "Failed to parse stored subfield default JSON for '{}': {e}",
+                                            f.name(),
+                                        ))
+                                    })?;
+                                out.insert(name, f.data_type().parse_default_literal(&v)?);
+                            } else {
+                                return Err(ArrowError::SchemaError(format!(
+                                    "Record default missing required subfield '{}' with non-nullable type {:?}",
+                                    f.name(),
+                                    f.data_type().codec()
+                                )));
+                            }
+                        }
+                    }
+                    AvroLiteral::Map(out)
+                }
+                _ => {
+                    return Err(ArrowError::SchemaError(
+                        "Default value for record/struct must be a JSON object".to_string(),
+                    ))
+                }
+            },
+        };
+        Ok(lit)
+    }
+
+    fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
+        let json_text = serde_json::to_string(default_json).map_err(|e| {
+            ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
+        })?;
+        self.metadata
+            .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
+        Ok(())
+    }
+
+    fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
+        let lit = self.parse_default_literal(default_json)?;
+        self.store_default(default_json)?;
+        Ok(lit)
+    }
 }
 
 /// A named [`AvroDataType`]
@@ -625,7 +851,6 @@ impl<'a> Resolver<'a> {
         let (namespace, name) = name
             .rsplit_once('.')
             .unwrap_or_else(|| (namespace.unwrap_or(""), name));
-
         self.map
             .get(&(namespace, name))
             .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
@@ -924,6 +1149,18 @@ impl<'a> Maker<'a> {
             return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
         }
         match (writer_schema, reader_schema) {
+            (
+                Schema::Complex(ComplexType::Array(writer_array)),
+                Schema::Complex(ComplexType::Array(reader_array)),
+            ) => self.resolve_array(writer_array, reader_array, namespace),
+            (
+                Schema::Complex(ComplexType::Map(writer_map)),
+                Schema::Complex(ComplexType::Map(reader_map)),
+            ) => self.resolve_map(writer_map, reader_map, namespace),
+            (
+                Schema::Complex(ComplexType::Fixed(writer_fixed)),
+                Schema::Complex(ComplexType::Fixed(reader_fixed)),
+            ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
             (
                 Schema::Complex(ComplexType::Record(writer_record)),
                 Schema::Complex(ComplexType::Record(reader_record)),
@@ -940,20 +1177,71 @@ impl<'a> Maker<'a> {
                 ),
             (Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
             (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
-            // if both sides are the same complex kind (non-record), adopt the reader type.
-            // This aligns with Avro spec: arrays, maps, and enums resolve recursively;
-            // for identical shapes we can just parse the reader schema.
-            (Schema::Complex(ComplexType::Array(_)), Schema::Complex(ComplexType::Array(_)))
-            | (Schema::Complex(ComplexType::Map(_)), Schema::Complex(ComplexType::Map(_)))
-            | (Schema::Complex(ComplexType::Fixed(_)), Schema::Complex(ComplexType::Fixed(_))) => {
-                self.parse_type(reader_schema, namespace)
-            }
             _ => Err(ArrowError::NotYetImplemented(
                 "Other resolutions not yet implemented".to_string(),
             )),
         }
     }
 
+    fn resolve_array(
+        &mut self,
+        writer_array: &Array<'a>,
+        reader_array: &Array<'a>,
+        namespace: Option<&'a str>,
+    ) -> Result<AvroDataType, ArrowError> {
+        Ok(AvroDataType {
+            nullability: None,
+            metadata: reader_array.attributes.field_metadata(),
+            codec: Codec::List(Arc::new(self.make_data_type(
+                writer_array.items.as_ref(),
+                Some(reader_array.items.as_ref()),
+                namespace,
+            )?)),
+            resolution: None,
+        })
+    }
+
+    fn resolve_map(
+        &mut self,
+        writer_map: &Map<'a>,
+        reader_map: &Map<'a>,
+        namespace: Option<&'a str>,
+    ) -> Result<AvroDataType, ArrowError> {
+        Ok(AvroDataType {
+            nullability: None,
+            metadata: reader_map.attributes.field_metadata(),
+            codec: Codec::Map(Arc::new(self.make_data_type(
+                &writer_map.values,
+                Some(&reader_map.values),
+                namespace,
+            )?)),
+            resolution: None,
+        })
+    }
+
+    fn resolve_fixed<'s>(
+        &mut self,
+        writer_fixed: &Fixed<'a>,
+        reader_fixed: &Fixed<'a>,
+        reader_schema: &'s Schema<'a>,
+        namespace: Option<&'a str>,
+    ) -> Result<AvroDataType, ArrowError> {
+        ensure_names_match(
+            "Fixed",
+            writer_fixed.name,
+            &writer_fixed.aliases,
+            reader_fixed.name,
+            &reader_fixed.aliases,
+        )?;
+        if writer_fixed.size != reader_fixed.size {
+            return Err(ArrowError::SchemaError(format!(
+                "Fixed size mismatch for {}: writer={}, reader={}",
+                reader_fixed.name, writer_fixed.size, reader_fixed.size
+            )));
+        }
+        self.parse_type(reader_schema, namespace)
+    }
+
     fn resolve_primitives(
         &mut self,
         write_primitive: PrimitiveType,
@@ -1135,52 +1423,85 @@ impl<'a> Maker<'a> {
         )?;
         let writer_ns = writer_record.namespace.or(namespace);
         let reader_ns = reader_record.namespace.or(namespace);
-        // Map writer field name -> index
-        let mut writer_index_map =
-            HashMap::<&str, usize>::with_capacity(writer_record.fields.len());
-        for (idx, write_field) in writer_record.fields.iter().enumerate() {
-            writer_index_map.insert(write_field.name, idx);
-        }
-        // Prepare outputs
-        let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
+        let reader_md = reader_record.attributes.field_metadata();
+        let writer_index_map: HashMap<&str, usize> = writer_record
+            .fields
+            .iter()
+            .enumerate()
+            .map(|(idx, wf)| (wf.name, idx))
+            .collect();
         let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
-        let mut skip_fields: Vec<Option<AvroDataType>> = vec![None; writer_record.fields.len()];
-        //let mut default_fields: Vec<usize> = Vec::new();
-        // Build reader fields and mapping
-        for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
-            if let Some(&writer_idx) = writer_index_map.get(r_field.name) {
-                // Field exists in a writer: resolve types (including promotions and union-of-null)
-                let w_schema = &writer_record.fields[writer_idx].r#type;
-                let resolved_dt =
-                    self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
-                reader_fields.push(AvroField {
-                    name: r_field.name.to_string(),
-                    data_type: resolved_dt,
-                });
-                writer_to_reader[writer_idx] = Some(reader_idx);
-            } else {
-                return Err(ArrowError::NotYetImplemented(
-                    "New fields from reader with default values not yet implemented".to_string(),
-                ));
-            }
-        }
-        // Any writer fields not mapped should be skipped
-        for (writer_idx, writer_field) in writer_record.fields.iter().enumerate() {
-            if writer_to_reader[writer_idx].is_none() {
-                // Parse writer field type to know how to skip data
-                let writer_dt = self.parse_type(&writer_field.r#type, writer_ns)?;
-                skip_fields[writer_idx] = Some(writer_dt);
-            }
-        }
-        // Implement writer-only fields to skip in Follow-up PR here
-        // Build resolved record AvroDataType
+        let reader_fields: Vec<AvroField> = reader_record
+            .fields
+            .iter()
+            .enumerate()
+            .map(|(reader_idx, r_field)| -> Result<AvroField, ArrowError> {
+                if let Some(&writer_idx) = writer_index_map.get(r_field.name) {
+                    let w_schema = &writer_record.fields[writer_idx].r#type;
+                    let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
+                    writer_to_reader[writer_idx] = Some(reader_idx);
+                    Ok(AvroField {
+                        name: r_field.name.to_string(),
+                        data_type: dt,
+                    })
+                } else {
+                    let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
+                    match r_field.default.as_ref() {
+                        Some(default_json) => {
+                            dt.resolution = Some(ResolutionInfo::DefaultValue(
+                                dt.parse_and_store_default(default_json)?,
+                            ));
+                        }
+                        None => {
+                            if dt.nullability() == Some(Nullability::NullFirst) {
+                                dt.resolution = Some(ResolutionInfo::DefaultValue(
+                                    dt.parse_and_store_default(&Value::Null)?,
+                                ));
+                            } else {
+                                return Err(ArrowError::SchemaError(format!(
+                                    "Reader field '{}' not present in writer schema must have a default value",
+                                    r_field.name
+                                )));
+                            }
+                        }
+                    }
+                    Ok(AvroField {
+                        name: r_field.name.to_string(),
+                        data_type: dt,
+                    })
+                }
+            })
+            .collect::<Result<_, _>>()?;
+        let default_fields: Vec<usize> = reader_fields
+            .iter()
+            .enumerate()
+            .filter_map(|(index, field)| {
+                matches!(
+                    field.data_type().resolution,
+                    Some(ResolutionInfo::DefaultValue(_))
+                )
+                .then_some(index)
+            })
+            .collect();
+        let skip_fields: Vec<Option<AvroDataType>> = writer_record
+            .fields
+            .iter()
+            .enumerate()
+            .map(|(writer_index, writer_field)| {
+                if writer_to_reader[writer_index].is_some() {
+                    Ok(None)
+                } else {
+                    self.parse_type(&writer_field.r#type, writer_ns).map(Some)
+                }
+            })
+            .collect::<Result<_, ArrowError>>()?;
         let resolved = AvroDataType::new_with_resolution(
             Codec::Struct(Arc::from(reader_fields)),
-            reader_record.attributes.field_metadata(),
+            reader_md,
             None,
             Some(ResolutionInfo::Record(ResolvedRecord {
                 writer_to_reader: Arc::from(writer_to_reader),
-                default_fields: Arc::default(),
+                default_fields: Arc::from(default_fields),
                 skip_fields: Arc::from(skip_fields),
             })),
         );
@@ -1712,4 +2033,440 @@ mod tests {
             panic!("Top-level schema is not a struct");
         }
     }
+
+    fn json_string(s: &str) -> Value {
+        Value::String(s.to_string())
+    }
+
+    fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
+        let stored = dt
+            .metadata
+            .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
+            .cloned()
+            .unwrap_or_default();
+        let expected = serde_json::to_string(default_json).unwrap();
+        assert_eq!(stored, expected, "stored default metadata should match");
+    }
+
+    #[test]
+    fn test_validate_and_store_default_null_and_nullability_rules() {
+        let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
+        let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
+        assert_eq!(lit, AvroLiteral::Null);
+        assert_default_stored(&dt_null, &Value::Null);
+        let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
+        let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("JSON null default is only valid for `null` type"),
+            "unexpected error: {err}"
+        );
+        let mut dt_int_nf =
+            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
+        let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
+        assert_eq!(lit2, AvroLiteral::Null);
+        assert_default_stored(&dt_int_nf, &Value::Null);
+        let mut dt_int_ns =
+            AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
+        let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
+        assert!(
+            err2.to_string()
+                .contains("JSON null default is only valid for `null` type"),
+            "unexpected error: {err2}"
+        );
+    }
+
+    #[test]
+    fn test_validate_and_store_default_primitives_and_temporal() {
+        let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
+        let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
+        assert_eq!(lit, AvroLiteral::Boolean(true));
+        assert_default_stored(&dt_bool, &Value::Bool(true));
+        let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
+        let lit = dt_i32
+            .parse_and_store_default(&serde_json::json!(123))
+            .unwrap();
+        assert_eq!(lit, AvroLiteral::Int(123));
+        assert_default_stored(&dt_i32, &serde_json::json!(123));
+        let err = dt_i32
+            .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
+            .unwrap_err();
+        assert!(format!("{err}").contains("out of i32 range"));
+        let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
+        let lit = dt_i64
+            .parse_and_store_default(&serde_json::json!(1234567890))
+            .unwrap();
+        assert_eq!(lit, AvroLiteral::Long(1234567890));
+        assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
+        let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
+        let lit = dt_f32
+            .parse_and_store_default(&serde_json::json!(1.25))
+            .unwrap();
+        assert_eq!(lit, AvroLiteral::Float(1.25));
+        assert_default_stored(&dt_f32, &serde_json::json!(1.25));
+        let err = dt_f32
+            .parse_and_store_default(&serde_json::json!(1e39))
+            .unwrap_err();
+        assert!(format!("{err}").contains("out of f32 range"));
+        let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
+        let lit = dt_f64
+            .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
+            .unwrap();
+        assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
+        assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
+        let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
+        let l = dt_str
+            .parse_and_store_default(&json_string("hello"))
+            .unwrap();
+        assert_eq!(l, AvroLiteral::String("hello".into()));
+        assert_default_stored(&dt_str, &json_string("hello"));
+        let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
+        let l = dt_strv
+            .parse_and_store_default(&json_string("view"))
+            .unwrap();
+        assert_eq!(l, AvroLiteral::String("view".into()));
+        assert_default_stored(&dt_strv, &json_string("view"));
+        let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
+        let l = dt_uuid
+            .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
+            .unwrap();
+        assert_eq!(
+            l,
+            AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
+        );
+        let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
+        let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
+        assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
+        let err = dt_bin
+            .parse_and_store_default(&json_string("€")) // U+20AC
+            .unwrap_err();
+        assert!(format!("{err}").contains("Invalid codepoint"));
+        let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
+        let ld = dt_date
+            .parse_and_store_default(&serde_json::json!(1))
+            .unwrap();
+        assert_eq!(ld, AvroLiteral::Int(1));
+        let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
+        let lt = dt_tmill
+            .parse_and_store_default(&serde_json::json!(86_400_000))
+            .unwrap();
+        assert_eq!(lt, AvroLiteral::Int(86_400_000));
+        let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
+        let ltm = dt_tmicros
+            .parse_and_store_default(&serde_json::json!(1_000_000))
+            .unwrap();
+        assert_eq!(ltm, AvroLiteral::Long(1_000_000));
+        let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
+        let l1 = dt_ts_milli
+            .parse_and_store_default(&serde_json::json!(123))
+            .unwrap();
+        assert_eq!(l1, AvroLiteral::Long(123));
+        let mut dt_ts_micro =
+            AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
+        let l2 = dt_ts_micro
+            .parse_and_store_default(&serde_json::json!(456))
+            .unwrap();
+        assert_eq!(l2, AvroLiteral::Long(456));
+    }
+
+    #[test]
+    fn test_validate_and_store_default_fixed_decimal_interval() {
+        let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
+        let l = dt_fixed
+            .parse_and_store_default(&json_string("WXYZ"))
+            .unwrap();
+        assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
+        let err = dt_fixed
+            .parse_and_store_default(&json_string("TOO LONG"))
+            .unwrap_err();
+        assert!(err.to_string().contains("Default length"));
+        let mut dt_dec_fixed =
+            AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
+        let l = dt_dec_fixed
+            .parse_and_store_default(&json_string("abc"))
+            .unwrap();
+        assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
+        let err = dt_dec_fixed
+            .parse_and_store_default(&json_string("toolong"))
+            .unwrap_err();
+        assert!(err.to_string().contains("Default length"));
+        let mut dt_dec_bytes =
+            AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
+        let l = dt_dec_bytes
+            .parse_and_store_default(&json_string("freeform"))
+            .unwrap();
+        assert_eq!(
+            l,
+            AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
+        );
+        let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
+        let l = dt_interval
+            .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
+            .unwrap();
+        assert_eq!(
+            l,
+            AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
+        );
+        let err = dt_interval
+            .parse_and_store_default(&json_string("short"))
+            .unwrap_err();
+        assert!(err.to_string().contains("Default length"));
+    }
+
+    #[test]
+    fn test_validate_and_store_default_enum_list_map_struct() {
+        let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
+            .into_iter()
+            .collect();
+        let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
+        let l = dt_enum
+            .parse_and_store_default(&json_string("GREEN"))
+            .unwrap();
+        assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
+        let err = dt_enum
+            .parse_and_store_default(&json_string("YELLOW"))
+            .unwrap_err();
+        assert!(err.to_string().contains("Default enum symbol"));
+        let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
+        let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
+        let val = serde_json::json!([1, 2, 3]);
+        let l = dt_list.parse_and_store_default(&val).unwrap();
+        assert_eq!(
+            l,
+            AvroLiteral::Array(vec![
+                AvroLiteral::Long(1),
+                AvroLiteral::Long(2),
+                AvroLiteral::Long(3)
+            ])
+        );
+        let err = dt_list
+            .parse_and_store_default(&serde_json::json!({"not":"array"}))
+            .unwrap_err();
+        assert!(err.to_string().contains("JSON array"));
+        let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
+        let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
+        let mv = serde_json::json!({"x": 1.5, "y": 2.5});
+        let l = dt_map.parse_and_store_default(&mv).unwrap();
+        let mut expected = IndexMap::new();
+        expected.insert("x".into(), AvroLiteral::Double(1.5));
+        expected.insert("y".into(), AvroLiteral::Double(2.5));
+        assert_eq!(l, AvroLiteral::Map(expected));
+        // Not object -> error
+        let err = dt_map
+            .parse_and_store_default(&serde_json::json!(123))
+            .unwrap_err();
+        assert!(err.to_string().contains("JSON object"));
+        let mut field_a = AvroField {
+            name: "a".into(),
+            data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
+        };
+        let field_b = AvroField {
+            name: "b".into(),
+            data_type: AvroDataType::new(
+                Codec::Int64,
+                HashMap::new(),
+                Some(Nullability::NullFirst),
+            ),
+        };
+        let mut c_md = HashMap::new();
+        c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
+        let field_c = AvroField {
+            name: "c".into(),
+            data_type: AvroDataType::new(Codec::Utf8, c_md, None),
+        };
+        field_a.data_type.metadata.insert("doc".into(), "na".into());
+        let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
+        let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
+        let default_obj = serde_json::json!({"a": 7});
+        let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
+        let mut expected = IndexMap::new();
+        expected.insert("a".into(), AvroLiteral::Int(7));
+        expected.insert("b".into(), AvroLiteral::Null);
+        expected.insert("c".into(), AvroLiteral::String("xyz".into()));
+        assert_eq!(l, AvroLiteral::Map(expected));
+        assert_default_stored(&dt_struct, &default_obj);
+        let req_field = AvroField {
+            name: "req".into(),
+            data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
+        };
+        let mut dt_bad = AvroDataType::new(
+            Codec::Struct(Arc::from(vec![req_field])),
+            HashMap::new(),
+            None,
+        );
+        let err = dt_bad
+            .parse_and_store_default(&serde_json::json!({}))
+            .unwrap_err();
+        assert!(
+            err.to_string().contains("missing required subfield 'req'"),
+            "unexpected error: {err}"
+        );
+        let err = dt_struct
+            .parse_and_store_default(&serde_json::json!(10))
+            .unwrap_err();
+        assert!(err.to_string().contains("must be a JSON object"));
+    }
+
+    #[test]
+    fn test_resolve_array_promotion_and_reader_metadata() {
+        let mut w_add: HashMap<&str, Value> = HashMap::new();
+        w_add.insert("who", json_string("writer"));
+        let mut r_add: HashMap<&str, Value> = HashMap::new();
+        r_add.insert("who", json_string("reader"));
+        let writer_schema = Schema::Complex(ComplexType::Array(Array {
+            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
+            attributes: Attributes {
+                logical_type: None,
+                additional: w_add,
+            },
+        }));
+        let reader_schema = Schema::Complex(ComplexType::Array(Array {
+            items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
+            attributes: Attributes {
+                logical_type: None,
+                additional: r_add,
+            },
+        }));
+        let mut maker = Maker::new(false, false);
+        let dt = maker
+            .make_data_type(&writer_schema, Some(&reader_schema), None)
+            .unwrap();
+        assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
+        if let Codec::List(inner) = dt.codec() {
+            assert!(matches!(inner.codec(), Codec::Int64));
+            assert_eq!(
+                inner.resolution,
+                Some(ResolutionInfo::Promotion(Promotion::IntToLong))
+            );
+        } else {
+            panic!("expected list codec");
+        }
+    }
+
+    #[test]
+    fn test_resolve_fixed_success_name_and_size_match_and_alias() {
+        let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
+            name: "MD5",
+            namespace: None,
+            aliases: vec!["Hash16"],
+            size: 16,
+            attributes: Attributes::default(),
+        }));
+        let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
+            name: "Hash16",
+            namespace: None,
+            aliases: vec![],
+            size: 16,
+            attributes: Attributes::default(),
+        }));
+        let mut maker = Maker::new(false, false);
+        let dt = maker
+            .make_data_type(&writer_schema, Some(&reader_schema), None)
+            .unwrap();
+        assert!(matches!(dt.codec(), Codec::Fixed(16)));
+    }
+
+    #[test]
+    fn test_resolve_records_mapping_default_fields_and_skip_fields() {
+        let writer = Schema::Complex(ComplexType::Record(Record {
+            name: "R",
+            namespace: None,
+            doc: None,
+            aliases: vec![],
+            fields: vec![
+                crate::schema::Field {
+                    name: "a",
+                    doc: None,
+                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+                    default: None,
+                },
+                crate::schema::Field {
+                    name: "skipme",
+                    doc: None,
+                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+                    default: None,
+                },
+                crate::schema::Field {
+                    name: "b",
+                    doc: None,
+                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
+                    default: None,
+                },
+            ],
+            attributes: Attributes::default(),
+        }));
+        let reader = Schema::Complex(ComplexType::Record(Record {
+            name: "R",
+            namespace: None,
+            doc: None,
+            aliases: vec![],
+            fields: vec![
+                crate::schema::Field {
+                    name: "b",
+                    doc: None,
+                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
+                    default: None,
+                },
+                crate::schema::Field {
+                    name: "a",
+                    doc: None,
+                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
+                    default: None,
+                },
+                crate::schema::Field {
+                    name: "name",
+                    doc: None,
+                    r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+                    default: Some(json_string("anon")),
+                },
+                crate::schema::Field {
+                    name: "opt",
+                    doc: None,
+                    r#type: Schema::Union(vec![
+                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+                        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+                    ]),
+                    default: None, // should default to null because NullFirst
+                },
+            ],
+            attributes: Attributes::default(),
+        }));
+        let mut maker = Maker::new(false, false);
+        let dt = maker
+            .make_data_type(&writer, Some(&reader), None)
+            .expect("record resolution");
+        let fields = match dt.codec() {
+            Codec::Struct(f) => f,
+            other => panic!("expected struct, got {other:?}"),
+        };
+        assert_eq!(fields.len(), 4);
+        assert_eq!(fields[0].name(), "b");
+        assert_eq!(fields[1].name(), "a");
+        assert_eq!(fields[2].name(), "name");
+        assert_eq!(fields[3].name(), "opt");
+        assert!(matches!(
+            fields[1].data_type().resolution,
+            Some(ResolutionInfo::Promotion(Promotion::IntToLong))
+        ));
+        let rec = match dt.resolution {
+            Some(ResolutionInfo::Record(ref r)) => r.clone(),
+            other => panic!("expected record resolution, got {other:?}"),
+        };
+        assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
+        assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
+        assert!(rec.skip_fields[0].is_none());
+        assert!(rec.skip_fields[2].is_none());
+        let skip1 = rec.skip_fields[1].as_ref().expect("skip field present");
+        assert!(matches!(skip1.codec(), Codec::Utf8));
+        let name_md = &fields[2].data_type().metadata;
+        assert_eq!(
+            name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
+            Some(&"\"anon\"".to_string())
+        );
+        let opt_md = &fields[3].data_type().metadata;
+        assert_eq!(
+            opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
+            Some(&"null".to_string())
+        );
+    }
 }
