This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 567f4415dc Add array/map/fixed schema resolution and default value support to arrow-avro codec (#8292)
567f4415dc is described below
commit 567f4415dc574d5934cfc8913560e7ff14fb6a38
Author: Connor Sanders <[email protected]>
AuthorDate: Thu Sep 11 10:56:22 2025 -0500
Add array/map/fixed schema resolution and default value support to arrow-avro codec (#8292)
# Which issue does this PR close?
This work continues arrow-avro schema resolution support and aligns
behavior with the Avro spec.
- **Related to**: #4886 (“Add Avro Support”): ongoing work to round out
the reader/decoder, including schema resolution and type promotion.
- **Follow-ups/Context**: #8124 (schema resolution & type promotion for
the decoder), #8223 (enum mapping for schema resolution). These previous
efforts established the foundations that this PR extends to default
values and additional resolvable types.
# Rationale for this change
Avro’s **schema resolution** requires readers to reconcile differences
between the writer and reader schemas, including:
- Using record-field **default values** when the writer lacks a field
present in the reader; defaults must be type-correct (i.e., union
defaults match the first union member; bytes/fixed defaults are JSON
strings).
- Recursively resolving **arrays** (by item schema) and **maps** (by
value schema).
- Resolving **fixed** types (size and unqualified name must match) and
erroring when they do not.
Prior to this change, arrow-avro’s resolution handled some cases but
lacked full Codec support for **default values** and for resolving
**array/map/fixed** shapes between writer and reader. This led to gaps
when reading evolved data or datasets produced by heterogeneous systems.
This PR implements these missing pieces so the Arrow reader behaves per
the spec in common evolution scenarios.
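As a standalone illustration of one of these rules (a minimal sketch, not arrow-avro's actual API): the Avro spec encodes `bytes`/`fixed` defaults as JSON strings in which each Unicode code point stands for one byte, so any code point above 0xFF is invalid, and a `fixed` default must also match the declared size.

```rust
// Hypothetical helper mirroring the spec's bytes/fixed default rule; the
// function name and error type are illustrative, not arrow-avro's.
fn parse_bytes_default(s: &str, expected_len: Option<usize>) -> Result<Vec<u8>, String> {
    let mut out = Vec::with_capacity(s.len());
    for ch in s.chars() {
        let cp = ch as u32;
        if cp > 0xFF {
            // Each code point must encode a single byte.
            return Err(format!("invalid codepoint U+{cp:04X}; must be <= 0xFF"));
        }
        out.push(cp as u8);
    }
    if let Some(len) = expected_len {
        if out.len() != len {
            // A `fixed` default must match the schema's declared size.
            return Err(format!("default length {} != fixed size {len}", out.len()));
        }
    }
    Ok(out)
}

fn main() {
    assert_eq!(parse_bytes_default("ABC", None).unwrap(), vec![65, 66, 67]);
    assert!(parse_bytes_default("€", None).is_err()); // U+20AC > 0xFF
    assert!(parse_bytes_default("AB", Some(4)).is_err()); // wrong fixed size
}
```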
# What changes are included in this PR?
This PR modifies **`arrow-avro/src/codec.rs`** to extend the schema-resolution path:
- **Default value handling** for record fields
- Reads and applies default values when the reader expects a field
absent from the writer, including **nested defaults**.
- Validates defaults per the Avro spec (e.g., union defaults match the
first schema; bytes/fixed defaults are JSON strings).
- **Array / Map / Fixed schema resolution**
- **Array**: recursively resolves item schemas (writer↔reader).
- **Map**: recursively resolves value schemas.
- **Fixed**: enforces matching size and (unqualified) name; otherwise
signals an error, consistent with the spec.
- **Codec updates**
- Refactors internal codec logic to support the above during decoding,
including resolution for **record fields** and **nested defaults**. (See
commit message for the high-level summary.)
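The recursive shape of the array/map/fixed rules above can be sketched with a toy schema model (hypothetical types, not arrow-avro's): arrays resolve by item schema, maps by value schema, and fixed requires a matching name and size.

```rust
// Toy model of Avro schema resolution for the cases this PR adds.
#[derive(Clone, Debug, PartialEq)]
enum Schema {
    Int,
    Long,
    Array(Box<Schema>),
    Map(Box<Schema>),
    Fixed { name: String, size: usize },
}

fn resolve(writer: &Schema, reader: &Schema) -> Result<Schema, String> {
    match (writer, reader) {
        // Arrays resolve recursively by item schema.
        (Schema::Array(w), Schema::Array(r)) => Ok(Schema::Array(Box::new(resolve(w, r)?))),
        // Maps resolve recursively by value schema.
        (Schema::Map(w), Schema::Map(r)) => Ok(Schema::Map(Box::new(resolve(w, r)?))),
        // Fixed requires matching name and size, otherwise errors.
        (Schema::Fixed { name: wn, size: ws }, Schema::Fixed { name: rn, size: rs }) => {
            if wn != rn {
                return Err(format!("fixed name mismatch: {wn} vs {rn}"));
            }
            if ws != rs {
                return Err(format!("fixed size mismatch for {rn}: writer={ws}, reader={rs}"));
            }
            Ok(reader.clone())
        }
        // Promotion: writer int may be read as long.
        (Schema::Int, Schema::Long) => Ok(Schema::Long),
        (w, r) if w == r => Ok(r.clone()),
        (w, r) => Err(format!("cannot resolve {w:?} against {r:?}")),
    }
}

fn main() {
    // Writer array<int> resolves against reader array<long> via promotion.
    let w = Schema::Array(Box::new(Schema::Int));
    let r = Schema::Array(Box::new(Schema::Long));
    assert_eq!(resolve(&w, &r).unwrap(), r);
    // Fixed size mismatch signals an error.
    let wf = Schema::Fixed { name: "MD5".into(), size: 16 };
    let rf = Schema::Fixed { name: "MD5".into(), size: 8 };
    assert!(resolve(&wf, &rf).is_err());
}
```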
# Are these changes tested?
**Yes.** This PR includes new unit tests in `arrow-avro/src/codec.rs`
covering:
1) **Default validation & persistence**
- `Null`/union‑nullability rules; metadata persistence of defaults
(`AVRO_FIELD_DEFAULT_METADATA_KEY`).
2) **`AvroLiteral` Parsing**
- Range checks for `i32`/`f32`; correct literals for `i64`/`f64`;
`Utf8`/`Utf8View`; `uuid` strings (RFC‑4122).
- Byte‑range mapping for `bytes`/`fixed` defaults; `Fixed(n)` length
enforcement; `decimal` on `fixed` vs `bytes`; `duration`/interval fixed
**12**‑byte enforcement.
3) **Collections & records**
- Array/map defaults shape; enum symbol validity; record defaults for
missing fields, required‑field errors, and honoring field‑level
defaults; skip‑fields retained for writer‑only fields.
4) **Resolution mechanics**
- Element **promotion** (`int` to `long`) for arrays; **reader metadata
precedence** for colliding attributes; `fixed` name/size match including
**alias**.
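The element promotion exercised above (`int` to `long` for array items) follows the Avro spec's promotion lattice, which can be sketched as a standalone predicate (illustrative names, not arrow-avro's `Promotion` type):

```rust
// Avro spec promotions: int -> long/float/double, long -> float/double,
// float -> double, and string <-> bytes. Promotion is directional.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Prim { Int, Long, Float, Double, Str, Bytes }

fn can_promote(writer: Prim, reader: Prim) -> bool {
    use Prim::*;
    writer == reader
        || matches!(
            (writer, reader),
            (Int, Long) | (Int, Float) | (Int, Double)
                | (Long, Float) | (Long, Double)
                | (Float, Double)
                | (Str, Bytes) | (Bytes, Str)
        )
}

fn main() {
    assert!(can_promote(Prim::Int, Prim::Long)); // allowed promotion
    assert!(!can_promote(Prim::Long, Prim::Int)); // narrowing is not allowed
}
```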
# Are there any user-facing changes?
N/A
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-avro/src/codec.rs | 863 +++++++++++++++++++++++++++++++++++++++++++++---
1 file changed, 810 insertions(+), 53 deletions(-)
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 0cac8c5786..3f94391c25 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -16,8 +16,9 @@
// under the License.
use crate::schema::{
- Attributes, AvroSchema, ComplexType, Enum, Nullability, PrimitiveType, Record, Schema, Type,
- TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
+ Array, Attributes, AvroSchema, ComplexType, Enum, Fixed, Map, Nullability, PrimitiveType,
+ Record, Schema, Type, TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
+ AVRO_FIELD_DEFAULT_METADATA_KEY,
};
use arrow_schema::{
ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION,
@@ -25,6 +26,8 @@ use arrow_schema::{
};
#[cfg(feature = "small_decimals")]
use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
+use indexmap::IndexMap;
+use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
@@ -33,11 +36,11 @@ use std::sync::Arc;
pub(crate) enum ResolutionInfo {
/// Indicates that the writer's type should be promoted to the reader's type.
Promotion(Promotion),
- /// Indicates that a default value should be used for a field. (Implemented in a Follow-up PR)
+ /// Indicates that a default value should be used for a field.
DefaultValue(AvroLiteral),
/// Provides mapping information for resolving enums.
EnumMapping(EnumMapping),
- /// Provides resolution information for record fields. (Implemented in a Follow-up PR)
+ /// Provides resolution information for record fields.
Record(ResolvedRecord),
}
@@ -64,6 +67,10 @@ pub(crate) enum AvroLiteral {
String(String),
/// Represents an enum symbol.
Enum(String),
+ /// Represents a JSON array default for an Avro array, containing element literals.
+ Array(Vec<AvroLiteral>),
+ /// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
+ Map(IndexMap<String, AvroLiteral>),
/// Represents an unsupported literal type.
Unsupported,
}
@@ -193,6 +200,225 @@ impl AvroDataType {
pub fn nullability(&self) -> Option<Nullability> {
self.nullability
}
+
+ #[inline]
+ fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
+ fn expect_string<'v>(
+ default_json: &'v Value,
+ data_type: &str,
+ ) -> Result<&'v str, ArrowError> {
+ match default_json {
+ Value::String(s) => Ok(s.as_str()),
+ _ => Err(ArrowError::SchemaError(format!(
+ "Default value must be a JSON string for {data_type}"
+ ))),
+ }
+ }
+
+ fn parse_bytes_default(
+ default_json: &Value,
+ expected_len: Option<usize>,
+ ) -> Result<Vec<u8>, ArrowError> {
+ let s = expect_string(default_json, "bytes/fixed logical types")?;
+ let mut out = Vec::with_capacity(s.len());
+ for ch in s.chars() {
+ let cp = ch as u32;
+ if cp > 0xFF {
+ return Err(ArrowError::SchemaError(format!(
+ "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
+ )));
+ }
+ out.push(cp as u8);
+ }
+ if let Some(len) = expected_len {
+ if out.len() != len {
+ return Err(ArrowError::SchemaError(format!(
+ "Default length {} does not match expected fixed size {len}",
+ out.len(),
+ )));
+ }
+ }
+ Ok(out)
+ }
+
+ fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
+ match default_json {
+ Value::Number(n) => n.as_i64().ok_or_else(|| {
+ ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
+ }),
+ _ => Err(ArrowError::SchemaError(format!(
+ "Default {data_type} must be a JSON integer"
+ ))),
+ }
+ }
+
+ fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
+ match default_json {
+ Value::Number(n) => n.as_f64().ok_or_else(|| {
+ ArrowError::SchemaError(format!("Default {data_type} must be a number"))
+ }),
+ _ => Err(ArrowError::SchemaError(format!(
+ "Default {data_type} must be a JSON number"
+ ))),
+ }
+ }
+
+ // Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
+ if default_json.is_null() {
+ return match self.codec() {
+ Codec::Null => Ok(AvroLiteral::Null),
+ _ if self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null),
+ _ => Err(ArrowError::SchemaError(
+ "JSON null default is only valid for `null` type or for a union whose first branch is `null`"
+ .to_string(),
+ )),
+ };
+ }
+ let lit = match self.codec() {
+ Codec::Null => {
+ return Err(ArrowError::SchemaError(
+ "Default for `null` type must be JSON null".to_string(),
+ ))
+ }
+ Codec::Boolean => match default_json {
+ Value::Bool(b) => AvroLiteral::Boolean(*b),
+ _ => {
+ return Err(ArrowError::SchemaError(
+ "Boolean default must be a JSON boolean".to_string(),
+ ))
+ }
+ },
+ Codec::Int32 | Codec::Date32 | Codec::TimeMillis => {
+ let i = parse_json_i64(default_json, "int")?;
+ if i < i32::MIN as i64 || i > i32::MAX as i64 {
+ return Err(ArrowError::SchemaError(format!(
+ "Default int {i} out of i32 range"
+ )));
+ }
+ AvroLiteral::Int(i as i32)
+ }
+ Codec::Int64
+ | Codec::TimeMicros
+ | Codec::TimestampMillis(_)
+ | Codec::TimestampMicros(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?),
+ Codec::Float32 => {
+ let f = parse_json_f64(default_json, "float")?;
+ if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 {
+ return Err(ArrowError::SchemaError(format!(
+ "Default float {f} out of f32 range or not finite"
+ )));
+ }
+ AvroLiteral::Float(f as f32)
+ }
+ Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?),
+ Codec::Utf8 | Codec::Utf8View | Codec::Uuid => {
+ AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string())
+ }
+ Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?),
+ Codec::Fixed(sz) => {
+ AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?)
+ }
+ Codec::Decimal(_, _, fixed_size) => {
+ AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?)
+ }
+ Codec::Enum(symbols) => {
+ let s = expect_string(default_json, "enum")?;
+ if symbols.iter().any(|sym| sym == s) {
+ AvroLiteral::Enum(s.to_string())
+ } else {
+ return Err(ArrowError::SchemaError(format!(
+ "Default enum symbol {s:?} not found in reader enum symbols"
+ )));
+ }
+ }
+ Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?),
+ Codec::List(item_dt) => match default_json {
+ Value::Array(items) => AvroLiteral::Array(
+ items
+ .iter()
+ .map(|v| item_dt.parse_default_literal(v))
+ .collect::<Result<_, _>>()?,
+ ),
+ _ => {
+ return Err(ArrowError::SchemaError(
+ "Default value must be a JSON array for Avro array type".to_string(),
+ ))
+ }
+ },
+ Codec::Map(val_dt) => match default_json {
+ Value::Object(map) => {
+ let mut out = IndexMap::with_capacity(map.len());
+ for (k, v) in map {
+ out.insert(k.clone(), val_dt.parse_default_literal(v)?);
+ }
+ AvroLiteral::Map(out)
+ }
+ _ => {
+ return Err(ArrowError::SchemaError(
+ "Default value must be a JSON object for Avro map type".to_string(),
+ ))
+ }
+ },
+ Codec::Struct(fields) => match default_json {
+ Value::Object(obj) => {
+ let mut out: IndexMap<String, AvroLiteral> =
+ IndexMap::with_capacity(fields.len());
+ for f in fields.as_ref() {
+ let name = f.name().to_string();
+ if let Some(sub) = obj.get(&name) {
+ out.insert(name, f.data_type().parse_default_literal(sub)?);
+ } else {
+ // Cache metadata lookup once
+ let stored_default =
+ f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY);
+ if stored_default.is_none()
+ && f.data_type().nullability() == Some(Nullability::default())
+ {
+ out.insert(name, AvroLiteral::Null);
+ } else if let Some(default_json) = stored_default {
+ let v: Value =
+ serde_json::from_str(default_json).map_err(|e| {
+ ArrowError::SchemaError(format!(
+ "Failed to parse stored subfield default JSON for '{}': {e}",
+ f.name(),
+ ))
+ })?;
+ out.insert(name, f.data_type().parse_default_literal(&v)?);
+ } else {
+ return Err(ArrowError::SchemaError(format!(
+ "Record default missing required subfield '{}' with non-nullable type {:?}",
+ f.name(),
+ f.data_type().codec()
+ )));
+ }
+ }
+ }
+ AvroLiteral::Map(out)
+ }
+ _ => {
+ return Err(ArrowError::SchemaError(
+ "Default value for record/struct must be a JSON object".to_string(),
+ ))
+ }
+ },
+ };
+ Ok(lit)
+ }
+
+ fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> {
+ let json_text = serde_json::to_string(default_json).map_err(|e| {
+ ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}"))
+ })?;
+ self.metadata
+ .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text);
+ Ok(())
+ }
+
+ fn parse_and_store_default(&mut self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
+ let lit = self.parse_default_literal(default_json)?;
+ self.store_default(default_json)?;
+ Ok(lit)
+ }
}
/// A named [`AvroDataType`]
@@ -625,7 +851,6 @@ impl<'a> Resolver<'a> {
let (namespace, name) = name
.rsplit_once('.')
.unwrap_or_else(|| (namespace.unwrap_or(""), name));
-
self.map
.get(&(namespace, name))
.ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}")))
@@ -924,6 +1149,18 @@ impl<'a> Maker<'a> {
return self.resolve_primitives(write_primitive, read_primitive, reader_schema);
}
match (writer_schema, reader_schema) {
+ (
+ Schema::Complex(ComplexType::Array(writer_array)),
+ Schema::Complex(ComplexType::Array(reader_array)),
+ ) => self.resolve_array(writer_array, reader_array, namespace),
+ (
+ Schema::Complex(ComplexType::Map(writer_map)),
+ Schema::Complex(ComplexType::Map(reader_map)),
+ ) => self.resolve_map(writer_map, reader_map, namespace),
+ (
+ Schema::Complex(ComplexType::Fixed(writer_fixed)),
+ Schema::Complex(ComplexType::Fixed(reader_fixed)),
+ ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace),
(
Schema::Complex(ComplexType::Record(writer_record)),
Schema::Complex(ComplexType::Record(reader_record)),
@@ -940,20 +1177,71 @@ impl<'a> Maker<'a> {
),
(Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace),
(_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace),
- // if both sides are the same complex kind (non-record), adopt the reader type.
- // This aligns with Avro spec: arrays, maps, and enums resolve recursively;
- // for identical shapes we can just parse the reader schema.
- (Schema::Complex(ComplexType::Array(_)), Schema::Complex(ComplexType::Array(_)))
- | (Schema::Complex(ComplexType::Map(_)), Schema::Complex(ComplexType::Map(_)))
- | (Schema::Complex(ComplexType::Fixed(_)), Schema::Complex(ComplexType::Fixed(_))) => {
- self.parse_type(reader_schema, namespace)
- }
_ => Err(ArrowError::NotYetImplemented(
"Other resolutions not yet implemented".to_string(),
)),
}
}
+ fn resolve_array(
+ &mut self,
+ writer_array: &Array<'a>,
+ reader_array: &Array<'a>,
+ namespace: Option<&'a str>,
+ ) -> Result<AvroDataType, ArrowError> {
+ Ok(AvroDataType {
+ nullability: None,
+ metadata: reader_array.attributes.field_metadata(),
+ codec: Codec::List(Arc::new(self.make_data_type(
+ writer_array.items.as_ref(),
+ Some(reader_array.items.as_ref()),
+ namespace,
+ )?)),
+ resolution: None,
+ })
+ }
+
+ fn resolve_map(
+ &mut self,
+ writer_map: &Map<'a>,
+ reader_map: &Map<'a>,
+ namespace: Option<&'a str>,
+ ) -> Result<AvroDataType, ArrowError> {
+ Ok(AvroDataType {
+ nullability: None,
+ metadata: reader_map.attributes.field_metadata(),
+ codec: Codec::Map(Arc::new(self.make_data_type(
+ &writer_map.values,
+ Some(&reader_map.values),
+ namespace,
+ )?)),
+ resolution: None,
+ })
+ }
+
+ fn resolve_fixed<'s>(
+ &mut self,
+ writer_fixed: &Fixed<'a>,
+ reader_fixed: &Fixed<'a>,
+ reader_schema: &'s Schema<'a>,
+ namespace: Option<&'a str>,
+ ) -> Result<AvroDataType, ArrowError> {
+ ensure_names_match(
+ "Fixed",
+ writer_fixed.name,
+ &writer_fixed.aliases,
+ reader_fixed.name,
+ &reader_fixed.aliases,
+ )?;
+ if writer_fixed.size != reader_fixed.size {
+ return Err(ArrowError::SchemaError(format!(
+ "Fixed size mismatch for {}: writer={}, reader={}",
+ reader_fixed.name, writer_fixed.size, reader_fixed.size
+ )));
+ }
+ self.parse_type(reader_schema, namespace)
+ }
+
fn resolve_primitives(
&mut self,
write_primitive: PrimitiveType,
@@ -1135,52 +1423,85 @@ impl<'a> Maker<'a> {
)?;
let writer_ns = writer_record.namespace.or(namespace);
let reader_ns = reader_record.namespace.or(namespace);
- // Map writer field name -> index
- let mut writer_index_map = HashMap::<&str, usize>::with_capacity(writer_record.fields.len());
- for (idx, write_field) in writer_record.fields.iter().enumerate() {
- writer_index_map.insert(write_field.name, idx);
- }
- // Prepare outputs
- let mut reader_fields: Vec<AvroField> = Vec::with_capacity(reader_record.fields.len());
+ let reader_md = reader_record.attributes.field_metadata();
+ let writer_index_map: HashMap<&str, usize> = writer_record
+ .fields
+ .iter()
+ .enumerate()
+ .map(|(idx, wf)| (wf.name, idx))
+ .collect();
let mut writer_to_reader: Vec<Option<usize>> = vec![None; writer_record.fields.len()];
- let mut skip_fields: Vec<Option<AvroDataType>> = vec![None; writer_record.fields.len()];
- //let mut default_fields: Vec<usize> = Vec::new();
- // Build reader fields and mapping
- for (reader_idx, r_field) in reader_record.fields.iter().enumerate() {
- if let Some(&writer_idx) = writer_index_map.get(r_field.name) {
- // Field exists in a writer: resolve types (including promotions and union-of-null)
- let w_schema = &writer_record.fields[writer_idx].r#type;
- let resolved_dt =
- self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
- reader_fields.push(AvroField {
- name: r_field.name.to_string(),
- data_type: resolved_dt,
- });
- writer_to_reader[writer_idx] = Some(reader_idx);
- } else {
- return Err(ArrowError::NotYetImplemented(
- "New fields from reader with default values not yet implemented".to_string(),
- ));
- }
- }
- // Any writer fields not mapped should be skipped
- for (writer_idx, writer_field) in writer_record.fields.iter().enumerate() {
- if writer_to_reader[writer_idx].is_none() {
- // Parse writer field type to know how to skip data
- let writer_dt = self.parse_type(&writer_field.r#type, writer_ns)?;
- skip_fields[writer_idx] = Some(writer_dt);
- }
- }
- // Implement writer-only fields to skip in Follow-up PR here
- // Build resolved record AvroDataType
+ let reader_fields: Vec<AvroField> = reader_record
+ .fields
+ .iter()
+ .enumerate()
+ .map(|(reader_idx, r_field)| -> Result<AvroField, ArrowError> {
+ if let Some(&writer_idx) = writer_index_map.get(r_field.name) {
+ let w_schema = &writer_record.fields[writer_idx].r#type;
+ let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?;
+ writer_to_reader[writer_idx] = Some(reader_idx);
+ Ok(AvroField {
+ name: r_field.name.to_string(),
+ data_type: dt,
+ })
+ } else {
+ let mut dt = self.parse_type(&r_field.r#type, reader_ns)?;
+ match r_field.default.as_ref() {
+ Some(default_json) => {
+ dt.resolution = Some(ResolutionInfo::DefaultValue(
+ dt.parse_and_store_default(default_json)?,
+ ));
+ }
+ None => {
+ if dt.nullability() == Some(Nullability::NullFirst) {
+ dt.resolution = Some(ResolutionInfo::DefaultValue(
+ dt.parse_and_store_default(&Value::Null)?,
+ ));
+ } else {
+ return Err(ArrowError::SchemaError(format!(
+ "Reader field '{}' not present in writer schema must have a default value",
+ r_field.name
+ )));
+ }
+ }
+ }
+ Ok(AvroField {
+ name: r_field.name.to_string(),
+ data_type: dt,
+ })
+ }
+ })
+ .collect::<Result<_, _>>()?;
+ let default_fields: Vec<usize> = reader_fields
+ .iter()
+ .enumerate()
+ .filter_map(|(index, field)| {
+ matches!(
+ field.data_type().resolution,
+ Some(ResolutionInfo::DefaultValue(_))
+ )
+ .then_some(index)
+ })
+ .collect();
+ let skip_fields: Vec<Option<AvroDataType>> = writer_record
+ .fields
+ .iter()
+ .enumerate()
+ .map(|(writer_index, writer_field)| {
+ if writer_to_reader[writer_index].is_some() {
+ Ok(None)
+ } else {
+ self.parse_type(&writer_field.r#type, writer_ns).map(Some)
+ }
+ })
+ .collect::<Result<_, ArrowError>>()?;
let resolved = AvroDataType::new_with_resolution(
Codec::Struct(Arc::from(reader_fields)),
- reader_record.attributes.field_metadata(),
+ reader_md,
None,
Some(ResolutionInfo::Record(ResolvedRecord {
writer_to_reader: Arc::from(writer_to_reader),
- default_fields: Arc::default(),
+ default_fields: Arc::from(default_fields),
skip_fields: Arc::from(skip_fields),
})),
);
@@ -1712,4 +2033,440 @@ mod tests {
panic!("Top-level schema is not a struct");
}
}
+
+ fn json_string(s: &str) -> Value {
+ Value::String(s.to_string())
+ }
+
+ fn assert_default_stored(dt: &AvroDataType, default_json: &Value) {
+ let stored = dt
+ .metadata
+ .get(AVRO_FIELD_DEFAULT_METADATA_KEY)
+ .cloned()
+ .unwrap_or_default();
+ let expected = serde_json::to_string(default_json).unwrap();
+ assert_eq!(stored, expected, "stored default metadata should match");
+ }
+
+ #[test]
+ fn test_validate_and_store_default_null_and_nullability_rules() {
+ let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None);
+ let lit = dt_null.parse_and_store_default(&Value::Null).unwrap();
+ assert_eq!(lit, AvroLiteral::Null);
+ assert_default_stored(&dt_null, &Value::Null);
+ let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None);
+ let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err();
+ assert!(
+ err.to_string().contains("JSON null default is only valid for `null` type"),
+ "unexpected error: {err}"
+ );
+ let mut dt_int_nf =
+ AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst));
+ let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap();
+ assert_eq!(lit2, AvroLiteral::Null);
+ assert_default_stored(&dt_int_nf, &Value::Null);
+ let mut dt_int_ns =
+ AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond));
+ let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err();
+ assert!(
+ err2.to_string().contains("JSON null default is only valid for `null` type"),
+ "unexpected error: {err2}"
+ );
+ }
+
+ #[test]
+ fn test_validate_and_store_default_primitives_and_temporal() {
+ let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None);
+ let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap();
+ assert_eq!(lit, AvroLiteral::Boolean(true));
+ assert_default_stored(&dt_bool, &Value::Bool(true));
+ let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None);
+ let lit = dt_i32
+ .parse_and_store_default(&serde_json::json!(123))
+ .unwrap();
+ assert_eq!(lit, AvroLiteral::Int(123));
+ assert_default_stored(&dt_i32, &serde_json::json!(123));
+ let err = dt_i32
+ .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1))
+ .unwrap_err();
+ assert!(format!("{err}").contains("out of i32 range"));
+ let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None);
+ let lit = dt_i64
+ .parse_and_store_default(&serde_json::json!(1234567890))
+ .unwrap();
+ assert_eq!(lit, AvroLiteral::Long(1234567890));
+ assert_default_stored(&dt_i64, &serde_json::json!(1234567890));
+ let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None);
+ let lit = dt_f32
+ .parse_and_store_default(&serde_json::json!(1.25))
+ .unwrap();
+ assert_eq!(lit, AvroLiteral::Float(1.25));
+ assert_default_stored(&dt_f32, &serde_json::json!(1.25));
+ let err = dt_f32
+ .parse_and_store_default(&serde_json::json!(1e39))
+ .unwrap_err();
+ assert!(format!("{err}").contains("out of f32 range"));
+ let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None);
+ let lit = dt_f64
+ .parse_and_store_default(&serde_json::json!(std::f64::consts::PI))
+ .unwrap();
+ assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI));
+ assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI));
+ let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None);
+ let l = dt_str
+ .parse_and_store_default(&json_string("hello"))
+ .unwrap();
+ assert_eq!(l, AvroLiteral::String("hello".into()));
+ assert_default_stored(&dt_str, &json_string("hello"));
+ let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None);
+ let l = dt_strv
+ .parse_and_store_default(&json_string("view"))
+ .unwrap();
+ assert_eq!(l, AvroLiteral::String("view".into()));
+ assert_default_stored(&dt_strv, &json_string("view"));
+ let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None);
+ let l = dt_uuid
+ .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000"))
+ .unwrap();
+ assert_eq!(
+ l,
+ AvroLiteral::String("00000000-0000-0000-0000-000000000000".into())
+ );
+ let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None);
+ let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap();
+ assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67]));
+ let err = dt_bin
+ .parse_and_store_default(&json_string("€")) // U+20AC
+ .unwrap_err();
+ assert!(format!("{err}").contains("Invalid codepoint"));
+ let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None);
+ let ld = dt_date
+ .parse_and_store_default(&serde_json::json!(1))
+ .unwrap();
+ assert_eq!(ld, AvroLiteral::Int(1));
+ let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None);
+ let lt = dt_tmill
+ .parse_and_store_default(&serde_json::json!(86_400_000))
+ .unwrap();
+ assert_eq!(lt, AvroLiteral::Int(86_400_000));
+ let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None);
+ let ltm = dt_tmicros
+ .parse_and_store_default(&serde_json::json!(1_000_000))
+ .unwrap();
+ assert_eq!(ltm, AvroLiteral::Long(1_000_000));
+ let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None);
+ let l1 = dt_ts_milli
+ .parse_and_store_default(&serde_json::json!(123))
+ .unwrap();
+ assert_eq!(l1, AvroLiteral::Long(123));
+ let mut dt_ts_micro =
+ AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None);
+ let l2 = dt_ts_micro
+ .parse_and_store_default(&serde_json::json!(456))
+ .unwrap();
+ assert_eq!(l2, AvroLiteral::Long(456));
+ }
+
+ #[test]
+ fn test_validate_and_store_default_fixed_decimal_interval() {
+ let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None);
+ let l = dt_fixed
+ .parse_and_store_default(&json_string("WXYZ"))
+ .unwrap();
+ assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90]));
+ let err = dt_fixed
+ .parse_and_store_default(&json_string("TOO LONG"))
+ .unwrap_err();
+ assert!(err.to_string().contains("Default length"));
+ let mut dt_dec_fixed =
+ AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None);
+ let l = dt_dec_fixed
+ .parse_and_store_default(&json_string("abc"))
+ .unwrap();
+ assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99]));
+ let err = dt_dec_fixed
+ .parse_and_store_default(&json_string("toolong"))
+ .unwrap_err();
+ assert!(err.to_string().contains("Default length"));
+ let mut dt_dec_bytes =
+ AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None);
+ let l = dt_dec_bytes
+ .parse_and_store_default(&json_string("freeform"))
+ .unwrap();
+ assert_eq!(
+ l,
+ AvroLiteral::Bytes("freeform".bytes().collect::<Vec<_>>())
+ );
+ let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None);
+ let l = dt_interval
+ .parse_and_store_default(&json_string("ABCDEFGHIJKL"))
+ .unwrap();
+ assert_eq!(
+ l,
+ AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::<Vec<_>>())
+ );
+ let err = dt_interval
+ .parse_and_store_default(&json_string("short"))
+ .unwrap_err();
+ assert!(err.to_string().contains("Default length"));
+ }
+
+ #[test]
+ fn test_validate_and_store_default_enum_list_map_struct() {
+ let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()]
+ .into_iter()
+ .collect();
+ let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None);
+ let l = dt_enum
+ .parse_and_store_default(&json_string("GREEN"))
+ .unwrap();
+ assert_eq!(l, AvroLiteral::Enum("GREEN".into()));
+ let err = dt_enum
+ .parse_and_store_default(&json_string("YELLOW"))
+ .unwrap_err();
+ assert!(err.to_string().contains("Default enum symbol"));
+ let item = AvroDataType::new(Codec::Int64, HashMap::new(), None);
+ let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None);
+ let val = serde_json::json!([1, 2, 3]);
+ let l = dt_list.parse_and_store_default(&val).unwrap();
+ assert_eq!(
+ l,
+ AvroLiteral::Array(vec![
+ AvroLiteral::Long(1),
+ AvroLiteral::Long(2),
+ AvroLiteral::Long(3)
+ ])
+ );
+ let err = dt_list
+ .parse_and_store_default(&serde_json::json!({"not":"array"}))
+ .unwrap_err();
+ assert!(err.to_string().contains("JSON array"));
+ let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None);
+ let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None);
+ let mv = serde_json::json!({"x": 1.5, "y": 2.5});
+ let l = dt_map.parse_and_store_default(&mv).unwrap();
+ let mut expected = IndexMap::new();
+ expected.insert("x".into(), AvroLiteral::Double(1.5));
+ expected.insert("y".into(), AvroLiteral::Double(2.5));
+ assert_eq!(l, AvroLiteral::Map(expected));
+ // Not object -> error
+ let err = dt_map
+ .parse_and_store_default(&serde_json::json!(123))
+ .unwrap_err();
+ assert!(err.to_string().contains("JSON object"));
+ let mut field_a = AvroField {
+ name: "a".into(),
+ data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None),
+ };
+ let field_b = AvroField {
+ name: "b".into(),
+ data_type: AvroDataType::new(
+ Codec::Int64,
+ HashMap::new(),
+ Some(Nullability::NullFirst),
+ ),
+ };
+ let mut c_md = HashMap::new();
+ c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into());
+ let field_c = AvroField {
+ name: "c".into(),
+ data_type: AvroDataType::new(Codec::Utf8, c_md, None),
+ };
+ field_a.data_type.metadata.insert("doc".into(), "na".into());
+ let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]);
+ let mut dt_struct = AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None);
+ let default_obj = serde_json::json!({"a": 7});
+ let l = dt_struct.parse_and_store_default(&default_obj).unwrap();
+ let mut expected = IndexMap::new();
+ expected.insert("a".into(), AvroLiteral::Int(7));
+ expected.insert("b".into(), AvroLiteral::Null);
+ expected.insert("c".into(), AvroLiteral::String("xyz".into()));
+ assert_eq!(l, AvroLiteral::Map(expected));
+ assert_default_stored(&dt_struct, &default_obj);
+ let req_field = AvroField {
+ name: "req".into(),
+ data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None),
+ };
+ let mut dt_bad = AvroDataType::new(
+ Codec::Struct(Arc::from(vec![req_field])),
+ HashMap::new(),
+ None,
+ );
+ let err = dt_bad
+ .parse_and_store_default(&serde_json::json!({}))
+ .unwrap_err();
+ assert!(
+ err.to_string().contains("missing required subfield 'req'"),
+ "unexpected error: {err}"
+ );
+ let err = dt_struct
+ .parse_and_store_default(&serde_json::json!(10))
+ .unwrap_err();
+ assert!(err.to_string().contains("must be a JSON object"));
+ }
+
+ #[test]
+ fn test_resolve_array_promotion_and_reader_metadata() {
+ let mut w_add: HashMap<&str, Value> = HashMap::new();
+ w_add.insert("who", json_string("writer"));
+ let mut r_add: HashMap<&str, Value> = HashMap::new();
+ r_add.insert("who", json_string("reader"));
+ let writer_schema = Schema::Complex(ComplexType::Array(Array {
+ items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
+ attributes: Attributes {
+ logical_type: None,
+ additional: w_add,
+ },
+ }));
+ let reader_schema = Schema::Complex(ComplexType::Array(Array {
+ items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))),
+ attributes: Attributes {
+ logical_type: None,
+ additional: r_add,
+ },
+ }));
+ let mut maker = Maker::new(false, false);
+ let dt = maker
+ .make_data_type(&writer_schema, Some(&reader_schema), None)
+ .unwrap();
+ assert_eq!(dt.metadata.get("who"), Some(&"\"reader\"".to_string()));
+ if let Codec::List(inner) = dt.codec() {
+ assert!(matches!(inner.codec(), Codec::Int64));
+ assert_eq!(
+ inner.resolution,
+ Some(ResolutionInfo::Promotion(Promotion::IntToLong))
+ );
+ } else {
+ panic!("expected list codec");
+ }
+ }
+
+ #[test]
+ fn test_resolve_fixed_success_name_and_size_match_and_alias() {
+ let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {
+ name: "MD5",
+ namespace: None,
+ aliases: vec!["Hash16"],
+ size: 16,
+ attributes: Attributes::default(),
+ }));
+ let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed {
+ name: "Hash16",
+ namespace: None,
+ aliases: vec![],
+ size: 16,
+ attributes: Attributes::default(),
+ }));
+ let mut maker = Maker::new(false, false);
+ let dt = maker
+ .make_data_type(&writer_schema, Some(&reader_schema), None)
+ .unwrap();
+ assert!(matches!(dt.codec(), Codec::Fixed(16)));
+ }
+
+ #[test]
+ fn test_resolve_records_mapping_default_fields_and_skip_fields() {
+ let writer = Schema::Complex(ComplexType::Record(Record {
+ name: "R",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![
+ crate::schema::Field {
+ name: "a",
+ doc: None,
+ r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+ default: None,
+ },
+ crate::schema::Field {
+ name: "skipme",
+ doc: None,
+ r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+ default: None,
+ },
+ crate::schema::Field {
+ name: "b",
+ doc: None,
+ r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
+ default: None,
+ },
+ ],
+ attributes: Attributes::default(),
+ }));
+ let reader = Schema::Complex(ComplexType::Record(Record {
+ name: "R",
+ namespace: None,
+ doc: None,
+ aliases: vec![],
+ fields: vec![
+ crate::schema::Field {
+ name: "b",
+ doc: None,
+ r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
+ default: None,
+ },
+ crate::schema::Field {
+ name: "a",
+ doc: None,
+ r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)),
+ default: None,
+ },
+ crate::schema::Field {
+ name: "name",
+ doc: None,
+ r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+ default: Some(json_string("anon")),
+ },
+ crate::schema::Field {
+ name: "opt",
+ doc: None,
+ r#type: Schema::Union(vec![
+ Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
+ Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
+ ]),
+ default: None, // should default to null because NullFirst
+ },
+ ],
+ attributes: Attributes::default(),
+ }));
+ let mut maker = Maker::new(false, false);
+ let dt = maker
+ .make_data_type(&writer, Some(&reader), None)
+ .expect("record resolution");
+ let fields = match dt.codec() {
+ Codec::Struct(f) => f,
+ other => panic!("expected struct, got {other:?}"),
+ };
+ assert_eq!(fields.len(), 4);
+ assert_eq!(fields[0].name(), "b");
+ assert_eq!(fields[1].name(), "a");
+ assert_eq!(fields[2].name(), "name");
+ assert_eq!(fields[3].name(), "opt");
+ assert!(matches!(
+ fields[1].data_type().resolution,
+ Some(ResolutionInfo::Promotion(Promotion::IntToLong))
+ ));
+ let rec = match dt.resolution {
+ Some(ResolutionInfo::Record(ref r)) => r.clone(),
+ other => panic!("expected record resolution, got {other:?}"),
+ };
+ assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]);
+ assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]);
+ assert!(rec.skip_fields[0].is_none());
+ assert!(rec.skip_fields[2].is_none());
+ let skip1 = rec.skip_fields[1].as_ref().expect("skip field present");
+ assert!(matches!(skip1.codec(), Codec::Utf8));
+ let name_md = &fields[2].data_type().metadata;
+ assert_eq!(
+ name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
+ Some(&"\"anon\"".to_string())
+ );
+ let opt_md = &fields[3].data_type().metadata;
+ assert_eq!(
+ opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY),
+ Some(&"null".to_string())
+ );
+ }
}