This is an automated email from the ASF dual-hosted git repository. mgrigorov pushed a commit to branch avro-3683-multiple-schemas in repository https://gitbox.apache.org/repos/asf/avro.git
commit cc5e7d32bf584996cb0d23db787bb88ba3f40498 Author: Martin Tzvetanov Grigorov <[email protected]> AuthorDate: Fri Dec 9 16:16:01 2022 +0200 AVRO-3683: Add support for using multiple schemata for resolve/validate/write Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> --- lang/rust/avro/src/encode.rs | 5 +++ lang/rust/avro/src/schema.rs | 88 ++++++++++++++++++++++---------------------- lang/rust/avro/src/writer.rs | 45 ++++++++++++++++++++++ 3 files changed, 95 insertions(+), 43 deletions(-) diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs index 2ae48f91c..5f0819b60 100644 --- a/lang/rust/avro/src/encode.rs +++ b/lang/rust/avro/src/encode.rs @@ -37,6 +37,11 @@ pub fn encode(value: &Value, schema: &Schema, buffer: &mut Vec<u8>) -> AvroResul encode_internal(value, schema, rs.get_names(), &None, buffer) } +pub fn encode_schemata(value: &Value, schemata: &[&Schema], buffer: &mut Vec<u8>) -> AvroResult<()> { + let rs = ResolvedSchema::try_from(schemata)?; + encode_internal(value, schema, rs.get_names(), &None, buffer) +} + fn encode_bytes<B: AsRef<[u8]> + ?Sized>(s: &B, buffer: &mut Vec<u8>) { let bytes = s.as_ref(); encode_long(bytes.len() as i64, buffer); diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index 6aa1d7fe0..3e73e59a2 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -401,19 +401,19 @@ impl Serialize for Alias { pub(crate) struct ResolvedSchema<'s> { names_ref: NamesRef<'s>, - root_schema: &'s Schema, + schemata: &[&'s Schema], } impl<'s> TryFrom<&'s Schema> for ResolvedSchema<'s> { type Error = Error; - fn try_from(schema: &'s Schema) -> AvroResult<Self> { + fn try_from(schemata: &[&'s Schema]) -> AvroResult<Self> { let names = HashMap::new(); let mut rs = ResolvedSchema { names_ref: names, - root_schema: schema, + schemata, }; - Self::from_internal(rs.root_schema, &mut rs.names_ref, &None)?; + Self::from_internal(rs.schemata, &mut rs.names_ref, &None)?; Ok(rs) } } @@ -427,54 +427,56 @@ impl<'s> ResolvedSchema<'s> { } fn from_internal( - schema: &'s Schema, + schemata: &[&'s Schema], names_ref: &mut NamesRef<'s>, enclosing_namespace: &Namespace, ) -> AvroResult<()> { - match schema { - Schema::Array(schema) | Schema::Map(schema) => { - Self::from_internal(schema, names_ref, enclosing_namespace) - } - Schema::Union(UnionSchema { schemas, .. }) => { - for schema in schemas { - Self::from_internal(schema, names_ref, enclosing_namespace)? + for schema in schemata { + match schema { + Schema::Array(schema) | Schema::Map(schema) => { + Self::from_internal(schema, names_ref, enclosing_namespace) } - Ok(()) - } - Schema::Enum { name, .. } | Schema::Fixed { name, .. } => { - let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); - if names_ref - .insert(fully_qualified_name.clone(), schema) - .is_some() - { - Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) - } else { + Schema::Union(UnionSchema { schemas, .. }) => { + for schema in schemas { + Self::from_internal(schema, names_ref, enclosing_namespace)? + } Ok(()) } - } - Schema::Record { name, fields, .. } => { - let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); - if names_ref - .insert(fully_qualified_name.clone(), schema) - .is_some() - { - Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) - } else { - let record_namespace = fully_qualified_name.namespace; - for field in fields { - Self::from_internal(&field.schema, names_ref, &record_namespace)? + Schema::Enum { name, .. } | Schema::Fixed { name, .. } => { + let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); + if names_ref + .insert(fully_qualified_name.clone(), schema) + .is_some() + { + Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) + } else { + Ok(()) } - Ok(()) } + Schema::Record { name, fields, .. } => { + let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); + if names_ref + .insert(fully_qualified_name.clone(), schema) + .is_some() + { + Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) + } else { + let record_namespace = fully_qualified_name.namespace; + for field in fields { + Self::from_internal(&field.schema, names_ref, &record_namespace)? + } + Ok(()) + } + } + Schema::Ref { name } => { + let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); + names_ref + .get(&fully_qualified_name) + .map(|_| ()) + .ok_or(Error::SchemaResolutionError(fully_qualified_name)) + } + _ => Ok(()), } - Schema::Ref { name } => { - let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); - names_ref - .get(&fully_qualified_name) - .map(|_| ()) - .ok_or(Error::SchemaResolutionError(fully_qualified_name)) - } - _ => Ok(()), } } } diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs index 3bc5c5122..6ed93fbc7 100644 --- a/lang/rust/avro/src/writer.rs +++ b/lang/rust/avro/src/writer.rs @@ -376,6 +376,19 @@ fn write_avro_datum<T: Into<Value>>( Ok(()) } +fn write_avro_datum_schemata<T: Into<Value>>( + schemata: &[&Schema], + value: T, + buffer: &mut Vec<u8>, +) -> Result<(), Error> { + let avro = value.into(); + if !avro.validate(schemata) { + return Err(Error::Validation); + } + encode(&avro, schemata, buffer)?; + Ok(()) +} + /// Writer that encodes messages according to the single object encoding v1 spec /// Uses an API similar to the current File Writer /// Writes all object bytes at once, and drains internal buffer @@ -541,6 +554,12 @@ pub fn to_avro_datum<T: Into<Value>>(schema: &Schema, value: T) -> AvroResult<Ve Ok(buffer) } +pub fn to_avro_datum_schemata<T: Into<Value>>(schemata: &[&Schema], value: T) -> AvroResult<Vec<u8>> { + let mut buffer = Vec::new(); + write_avro_datum(schemata, value, &mut buffer)?; + Ok(buffer) +} + #[cfg(not(target_arch = "wasm32"))] fn generate_sync_marker() -> [u8; 16] { let mut marker = [0_u8; 16]; @@ -1243,4 +1262,30 @@ mod tests { assert_eq!(buf1, buf2); assert_eq!(buf1, buf3); } + + #[test] + fn test_multiple_schemata_to_avro_datum() { + let schema_a_str = r#"{ + "name": "A", + "type": "record", + "fields": [ + {"name": "field_a", "type": "float"} + ] + }"#; + let schema_b_str = r#"{ + "name": "B", + "type": "record", + "fields": [ + {"name": "field_b", "type": "A"} + ] + }"#; + + let schemata = Schema::parse_list(&[schema_a_str, schema_b_str]).unwrap(); + let record = Value::Record(vec![( + "field_b".into(), + Value::Record(vec![("field_a".into(), Value::Float(1.0))]), + )]); + + assert_eq!(to_avro_datum(&schema, record).unwrap(), expected); + } }
