This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git
The following commit(s) were added to refs/heads/main by this push:
new a6debf7cd AVRO-3896: support to custom more logical type (#2569)
a6debf7cd is described below
commit a6debf7cdb026f80f5a364f606845cdd86ff07ef
Author: ZENOTME <[email protected]>
AuthorDate: Tue Nov 7 15:50:07 2023 +0800
AVRO-3896: support to custom more logical type (#2569)
* AVRO-3896 refactor verify logical type
* AVRO-3896 rename internal function and add test case
* AVRO-3896: [Rust] Improve error handling, comments and tests
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---------
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: ZENOTME <[email protected]>
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
---
lang/rust/avro/src/error.rs | 4 +-
lang/rust/avro/src/schema.rs | 319 +++++++++++++++++++++++++++----------------
2 files changed, 203 insertions(+), 120 deletions(-)
diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs
index 00a04c931..8fa146027 100644
--- a/lang/rust/avro/src/error.rs
+++ b/lang/rust/avro/src/error.rs
@@ -307,8 +307,8 @@ pub enum Error {
#[error("No `type` field found for `logicalType`")]
GetLogicalTypeField,
- #[error("logicalType must be a string")]
- GetLogicalTypeFieldType,
+ #[error("logicalType must be a string, but is {0}")]
+ GetLogicalTypeFieldType(serde_json::Value),
#[error("Unknown complex type: {0}")]
GetComplexType(serde_json::Value),
diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs
index 47900e46b..262aa2aff 100644
--- a/lang/rust/avro/src/schema.rs
+++ b/lang/rust/avro/src/schema.rs
@@ -1272,186 +1272,155 @@ impl Parser {
complex: &Map<String, Value>,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
- fn logical_verify_type(
+ // Try to parse this as a native complex type.
+ fn parse_as_native_complex(
complex: &Map<String, Value>,
- kinds: &[SchemaKind],
parser: &mut Parser,
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
match complex.get("type") {
- Some(value) => {
- let ty = match value {
- Value::String(s) if s == "fixed" => {
- parser.parse_fixed(complex, enclosing_namespace)?
- }
- _ => parser.parse(value, enclosing_namespace)?,
- };
-
- if kinds
- .iter()
- .any(|&kind| SchemaKind::from(ty.clone()) == kind)
- {
- Ok(ty)
- } else {
- match get_type_rec(value.clone()) {
- Ok(v) => Err(Error::GetLogicalTypeVariant(v)),
- Err(err) => Err(err),
- }
+ Some(value) => match value {
+ Value::String(s) if s == "fixed" => {
+ parser.parse_fixed(complex, enclosing_namespace)
}
- }
- None => Err(Error::GetLogicalTypeField),
- }
- }
-
- fn get_type_rec(json_value: Value) -> AvroResult<Value> {
- match json_value {
- typ @ Value::String(_) => Ok(typ),
- Value::Object(ref complex) => match complex.get("type") {
- Some(v) => get_type_rec(v.clone()),
- None => Err(Error::GetComplexTypeField),
+ _ => parser.parse(value, enclosing_namespace),
},
- _ => Err(Error::GetComplexTypeField),
+ None => Err(Error::GetLogicalTypeField),
}
}
- // checks whether the logicalType is supported by the type
- fn try_logical_type(
+ // This crate supports some logical types natively, and this function tries to convert
+ // a native complex type with a logical type attribute to one of these logical types.
+ // This function:
+ // 1. Checks whether the native complex type is in the supported kinds.
+ // 2. If it is, uses the `convert` function to build the logical type; otherwise it
+ // logs a warning and returns the native complex type unchanged.
+ fn try_convert_to_logical_type<F>(
logical_type: &str,
- complex: &Map<String, Value>,
- kinds: &[SchemaKind],
- ok_schema: Schema,
- parser: &mut Parser,
- enclosing_namespace: &Namespace,
- ) -> AvroResult<Schema> {
- match logical_verify_type(complex, kinds, parser, enclosing_namespace) {
- // type and logicalType match!
- Ok(_) => Ok(ok_schema),
- // the logicalType is not expected for this type!
- Err(Error::GetLogicalTypeVariant(json_value)) => match json_value {
- Value::String(_) => match parser.parse(&json_value, enclosing_namespace) {
- Ok(schema) => {
- warn!(
- "Ignoring invalid logical type '{}' for schema of type: {:?}!",
- logical_type, schema
- );
- Ok(schema)
- }
- Err(parse_err) => Err(parse_err),
- },
- _ => Err(Error::GetLogicalTypeVariant(json_value)),
- },
- err => err,
+ schema: Schema,
+ supported_schema_kinds: &[SchemaKind],
+ convert: F,
+ ) -> AvroResult<Schema>
+ where
+ F: Fn(Schema) -> AvroResult<Schema>,
+ {
+ let kind = SchemaKind::from(&schema);
+ if supported_schema_kinds.contains(&kind) {
+ convert(schema)
+ } else {
+ warn!(
+ "Ignoring unknown logical type '{}' for schema of type: {:?}!",
+ logical_type, schema
+ );
+ Ok(schema)
}
}
match complex.get("logicalType") {
Some(Value::String(t)) => match t.as_str() {
"decimal" => {
- let inner = Box::new(logical_verify_type(
- complex,
+ return try_convert_to_logical_type(
+ "decimal",
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
&[SchemaKind::Fixed, SchemaKind::Bytes],
- self,
- enclosing_namespace,
- )?);
-
- let (precision, scale) = Self::parse_precision_and_scale(complex)?;
-
- return Ok(Schema::Decimal(DecimalSchema {
- precision,
- scale,
- inner,
- }));
+ |inner| -> AvroResult<Schema> {
+ let (precision, scale) = Self::parse_precision_and_scale(complex)?;
+ Ok(Schema::Decimal(DecimalSchema {
+ precision,
+ scale,
+ inner: Box::new(inner),
+ }))
+ },
+ );
}
"big-decimal" => {
- logical_verify_type(complex, &[SchemaKind::Bytes], self, enclosing_namespace)?;
- return Ok(Schema::BigDecimal);
+ return try_convert_to_logical_type(
+ "big-decimal",
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
+ &[SchemaKind::Bytes],
+ |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
+ );
}
"uuid" => {
- logical_verify_type(complex, &[SchemaKind::String], self, enclosing_namespace)?;
- return Ok(Schema::Uuid);
+ return try_convert_to_logical_type(
+ "uuid",
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
+ &[SchemaKind::String],
+ |_| -> AvroResult<Schema> { Ok(Schema::Uuid) },
+ );
}
"date" => {
- return try_logical_type(
+ return try_convert_to_logical_type(
"date",
- complex,
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
&[SchemaKind::Int],
- Schema::Date,
- self,
- enclosing_namespace,
+ |_| -> AvroResult<Schema> { Ok(Schema::Date) },
);
}
"time-millis" => {
- return try_logical_type(
- "time-millis",
- complex,
+ return try_convert_to_logical_type(
+ "time-millis",
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
&[SchemaKind::Int],
- Schema::TimeMillis,
- self,
- enclosing_namespace,
+ |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
);
}
"time-micros" => {
- return try_logical_type(
+ return try_convert_to_logical_type(
"time-micros",
- complex,
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
&[SchemaKind::Long],
- Schema::TimeMicros,
- self,
- enclosing_namespace,
+ |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
);
}
"timestamp-millis" => {
- return try_logical_type(
+ return try_convert_to_logical_type(
"timestamp-millis",
- complex,
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
&[SchemaKind::Long],
- Schema::TimestampMillis,
- self,
- enclosing_namespace,
+ |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) },
);
}
"timestamp-micros" => {
- return try_logical_type(
+ return try_convert_to_logical_type(
"timestamp-micros",
- complex,
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
&[SchemaKind::Long],
- Schema::TimestampMicros,
- self,
- enclosing_namespace,
+ |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) },
);
}
"local-timestamp-millis" => {
- return try_logical_type(
+ return try_convert_to_logical_type(
"local-timestamp-millis",
- complex,
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
&[SchemaKind::Long],
- Schema::LocalTimestampMillis,
- self,
- enclosing_namespace,
+ |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) },
);
}
"local-timestamp-micros" => {
- return try_logical_type(
+ return try_convert_to_logical_type(
"local-timestamp-micros",
- complex,
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
&[SchemaKind::Long],
- Schema::LocalTimestampMicros,
- self,
- enclosing_namespace,
+ |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) },
);
}
"duration" => {
- logical_verify_type(complex, &[SchemaKind::Fixed], self, enclosing_namespace)?;
- return Ok(Schema::Duration);
+ return try_convert_to_logical_type(
+ "duration",
+ parse_as_native_complex(complex, self, enclosing_namespace)?,
+ &[SchemaKind::Fixed],
+ |_| -> AvroResult<Schema> { Ok(Schema::Duration) },
+ );
}
- // In this case, of an unknown logical type, we just pass through to the underlying
+ // For an unknown logical type, we just pass through the underlying
// type.
_ => {}
},
- // The spec says to ignore invalid logical types and just continue through to the
- // underlying type - It is unclear whether that applies to this case or not, where the
+ // The spec says to ignore invalid logical types and just pass through the
+ // underlying type. It is unclear whether that applies to this case or not, where the
// `logicalType` is not a string.
- Some(_) => return Err(Error::GetLogicalTypeFieldType),
+ Some(value) => return Err(Error::GetLogicalTypeFieldType(value.clone())),
_ => {}
}
match complex.get("type") {
@@ -6213,7 +6182,6 @@ mod tests {
Error::InvalidSchemaName(full_name.to_string(), SCHEMA_NAME_R.as_str()).to_string();
let err = name.map_err(|e| e.to_string()).err().unwrap();
assert_eq!(expected, err);
-
Ok(())
}
@@ -6225,6 +6193,121 @@ mod tests {
let name = Name::new(funny_name);
assert!(name.is_ok());
}
+ Ok(())
+ }
+
+ #[test]
+ fn test_avro_3896_decimal_schema() -> TestResult {
+ // bytes decimal, represented as native logical type.
+ let schema = json!(
+ {
+ "type": "bytes",
+ "name": "BytesDecimal",
+ "logicalType": "decimal",
+ "size": 38,
+ "precision": 9,
+ "scale": 2
+ });
+ let parse_result = Schema::parse(&schema)?;
+ assert!(matches!(
+ parse_result,
+ Schema::Decimal(DecimalSchema {
+ precision: 9,
+ scale: 2,
+ ..
+ })
+ ));
+
+ // long decimal: `long` is not a supported base type for the native `decimal` logical type.
+ let schema = json!(
+ {
+ "type": "long",
+ "name": "LongDecimal",
+ "logicalType": "decimal"
+ });
+ let parse_result = Schema::parse(&schema)?;
+ // The `logicalType` is ignored and the plain `long` schema is returned.
+ assert_eq!(parse_result, Schema::Long);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_avro_3896_uuid_schema() -> TestResult {
+ // string uuid, represented as native logical type.
+ let schema = json!(
+ {
+ "type": "string",
+ "name": "StringUUID",
+ "logicalType": "uuid"
+ });
+ let parse_result = Schema::parse(&schema)?;
+ assert_eq!(parse_result, Schema::Uuid);
+
+ // uuid logical type is not supported for SchemaKind::Fixed, so it is parsed as Schema::Fixed
+ // and the `logicalType` is preserved as an attribute.
+ let schema = json!(
+ {
+ "type": "fixed",
+ "name": "FixedUUID",
+ "size": 16,
+ "logicalType": "uuid"
+ });
+ let parse_result = Schema::parse(&schema)?;
+ assert_eq!(
+ parse_result,
+ Schema::Fixed(FixedSchema {
+ name: Name::new("FixedUUID")?,
+ doc: None,
+ aliases: None,
+ size: 16,
+ attributes: BTreeMap::from([(
+ "logicalType".to_string(),
+ Value::String(String::from("uuid")),
+ )]),
+ })
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_avro_3896_timestamp_millis_schema() -> TestResult {
+ // long timestamp-millis, represented as native logical type.
+ let schema = json!(
+ {
+ "type": "long",
+ "name": "LongTimestampMillis",
+ "logicalType": "timestamp-millis"
+ });
+ let parse_result = Schema::parse(&schema)?;
+ assert_eq!(parse_result, Schema::TimestampMillis);
+
+ // int timestamp-millis: not supported, so it is parsed as the plain `int` schema.
+ let schema = json!(
+ {
+ "type": "int",
+ "name": "IntTimestampMillis",
+ "logicalType": "timestamp-millis"
+ });
+ let parse_result = Schema::parse(&schema)?;
+ assert_eq!(parse_result, Schema::Int);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_avro_3896_custom_bytes_schema() -> TestResult {
+ // custom logical type unknown to this crate: parsed as the plain `bytes` schema.
+ let schema = json!(
+ {
+ "type": "bytes",
+ "name": "BytesLog",
+ "logicalType": "custom"
+ });
+ let parse_result = Schema::parse(&schema)?;
+ assert_eq!(parse_result, Schema::Bytes);
+ assert_eq!(parse_result.custom_attributes(), None);
Ok(())
}