Kriskras99 commented on code in PR #342:
URL: https://github.com/apache/avro-rs/pull/342#discussion_r2609995678
##########
avro/src/schema_compatibility.rs:
##########
@@ -18,268 +18,363 @@
//! Logic for checking schema compatibility
use crate::{
error::CompatibilityError,
- schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
+ schema::{
+ ArraySchema, DecimalSchema, EnumSchema, FixedSchema,
InnerDecimalSchema, MapSchema,
+ RecordSchema, Schema, UuidSchema,
+ },
};
use std::{
- collections::{HashSet, hash_map::DefaultHasher},
+ collections::{HashMap, hash_map::DefaultHasher},
hash::Hasher,
+ iter::once,
+ ops::BitAndAssign,
ptr,
};
-fn match_ref_schemas(
- writers_schema: &Schema,
- readers_schema: &Schema,
-) -> Result<(), CompatibilityError> {
- match (readers_schema, writers_schema) {
- (Schema::Ref { name: r_name }, Schema::Ref { name: w_name }) => {
- if r_name == w_name {
- Ok(())
- } else {
- Err(CompatibilityError::NameMismatch {
- writer_name: w_name.fullname(None),
- reader_name: r_name.fullname(None),
- })
- }
+pub struct SchemaCompatibility;
+
+/// How compatible are two schemas.
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub enum Compatibility {
+ /// Full compatibility, resolving will always work.
+ Full,
+ /// Partial compatibility, resolving may error.
+ ///
+ /// This can happen if an enum doesn't have all fields, or unions don't
entirely overlap.
+ Partial,
+}
+
+impl BitAndAssign for Compatibility {
+ /// Combine two compatibilities.
+ ///
+ /// # Truth table
+ /// | | Full | Partial |
+ /// | ------- | ------- | ------- |
+ /// | Full | Full | Partial |
+ /// | Partial | Partial | Partial |
+ fn bitand_assign(&mut self, rhs: Self) {
+ match (*self, rhs) {
+ (Self::Full, Self::Full) => *self = Self::Full,
+ _ => *self = Self::Partial,
}
- _ => Err(CompatibilityError::WrongType {
- writer_schema_type: format!("{writers_schema:#?}"),
- reader_schema_type: format!("{readers_schema:#?}"),
- }),
}
}
-pub struct SchemaCompatibility;
-
struct Checker {
- recursion: HashSet<(u64, u64)>,
+ recursion: HashMap<(u64, u64), Compatibility>,
}
impl Checker {
/// Create a new checker, with recursion set to an empty set.
pub(crate) fn new() -> Self {
Self {
- recursion: HashSet::new(),
+ recursion: HashMap::new(),
}
}
- pub(crate) fn can_read(
+ /// Check if the reader schema can be resolved from the writer schema.
+ pub(crate) fn full_match_schemas(
&mut self,
writers_schema: &Schema,
readers_schema: &Schema,
- ) -> Result<(), CompatibilityError> {
- self.full_match_schemas(writers_schema, readers_schema)
+ ) -> Result<Compatibility, CompatibilityError> {
+ // Hash both reader and writer based on their pointer value. This is a
fast way to see if
+ // we get the exact same schemas multiple times (because of recursive
types)
+ let key = (
+ Self::pointer_hash(writers_schema),
+ Self::pointer_hash(readers_schema),
+ );
+
+ // If we already saw this pairing, return the previous value
+ if let Some(c) = self.recursion.get(&key).copied() {
+ Ok(c)
+ } else {
+ let c = self.inner_full_match_schemas(writers_schema,
readers_schema)?;
+ // Insert the new value
+ self.recursion.insert(key, c);
+ Ok(c)
+ }
}
- pub(crate) fn full_match_schemas(
+ /// Hash a schema based only on its pointer value.
+ fn pointer_hash(schema: &Schema) -> u64 {
+ let mut hasher = DefaultHasher::new();
+ ptr::hash(schema, &mut hasher);
+ hasher.finish()
+ }
+
+ /// The actual implementation of [`full_match_schemas`] but without the
recursion protection.
+ ///
+ /// This function should never be called directly as it can recurse
infinitely on recursive types.
+ #[rustfmt::skip]
+ fn inner_full_match_schemas(
&mut self,
writers_schema: &Schema,
readers_schema: &Schema,
- ) -> Result<(), CompatibilityError> {
- if self.recursion_in_progress(writers_schema, readers_schema) {
- return Ok(());
- }
-
- SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
-
- let w_type = SchemaKind::from(writers_schema);
- let r_type = SchemaKind::from(readers_schema);
-
- if w_type != SchemaKind::Union
- && (r_type.is_primitive()
- || r_type == SchemaKind::Fixed
- || r_type == SchemaKind::Uuid
- || r_type == SchemaKind::Date
- || r_type == SchemaKind::TimeMillis
- || r_type == SchemaKind::TimeMicros
- || r_type == SchemaKind::TimestampMillis
- || r_type == SchemaKind::TimestampMicros
- || r_type == SchemaKind::TimestampNanos
- || r_type == SchemaKind::LocalTimestampMillis
- || r_type == SchemaKind::LocalTimestampMicros
- || r_type == SchemaKind::LocalTimestampNanos)
+ ) -> Result<Compatibility, CompatibilityError> {
+ // Compare unqualified names if the schemas have them
+ if let Some(w_name) = writers_schema.name()
+ && let Some(r_name) = readers_schema.name()
+ && w_name.name != r_name.name
{
- return Ok(());
+ return Err(CompatibilityError::NameMismatch {
+ writer_name: w_name.name.clone(),
+ reader_name: r_name.name.clone(),
+ });
}
- match r_type {
- SchemaKind::Ref => match_ref_schemas(writers_schema,
readers_schema),
- SchemaKind::Record => self.match_record_schemas(writers_schema,
readers_schema),
- SchemaKind::Map => {
- if let Schema::Map(w_m) = writers_schema {
- match readers_schema {
- Schema::Map(r_m) =>
self.full_match_schemas(&w_m.types, &r_m.types),
- _ => Err(CompatibilityError::WrongType {
- writer_schema_type: format!("{writers_schema:#?}"),
- reader_schema_type: format!("{readers_schema:#?}"),
- }),
- }
+ // Logical types are downgraded to their actual type
+ match (writers_schema, readers_schema) {
+ (Schema::Ref { name: w_name }, Schema::Ref { name: r_name }) => {
+ if r_name == w_name {
+ Ok(Compatibility::Full)
} else {
- Err(CompatibilityError::TypeExpected {
- schema_type: String::from("writers_schema"),
- expected_type: vec![SchemaKind::Record],
+ Err(CompatibilityError::NameMismatch {
+ writer_name: w_name.fullname(None),
+ reader_name: r_name.fullname(None),
})
}
}
- SchemaKind::Array => {
- if let Schema::Array(w_a) = writers_schema {
- match readers_schema {
- Schema::Array(r_a) =>
self.full_match_schemas(&w_a.items, &r_a.items),
- _ => Err(CompatibilityError::WrongType {
- writer_schema_type: format!("{writers_schema:#?}"),
- reader_schema_type: format!("{readers_schema:#?}"),
- }),
- }
+ (Schema::Union(writer), Schema::Union(reader)) => {
+ let mut any = false;
+ let mut all = true;
+ for writer in &writer.schemas {
+ // Try to find a reader variant that is fully compatible
with this writer variant.
+ // In case that does not exist, we keep track of any
partial compatibility we find.
+ let mut local_any = false;
+ all &= reader.schemas.iter().any(|reader| {
+ match self.full_match_schemas(writer, reader) {
+ Ok(Compatibility::Full) => {
+ local_any = true;
+ true
+ }
+ Ok(Compatibility::Partial) => {
+ local_any = true;
+ false
+ }
+ Err(_) => false,
+ }
+ });
+ // Save any match we found
+ any |= local_any;
+ }
+ if all {
+ // All writer variants are fully compatible with reader
variants
+ Ok(Compatibility::Full)
+ } else if any {
+ // At least one writer variant is partially or fully
compatible with a reader variant
+ Ok(Compatibility::Partial)
} else {
- Err(CompatibilityError::TypeExpected {
- schema_type: String::from("writers_schema"),
- expected_type: vec![SchemaKind::Array],
- })
+ Err(CompatibilityError::MissingUnionElements)
}
}
- SchemaKind::Union => self.match_union_schemas(writers_schema,
readers_schema),
- SchemaKind::Enum => {
- // reader's symbols must contain all writer's symbols
- if let Schema::Enum(EnumSchema {
- symbols: w_symbols, ..
- }) = writers_schema
- {
- if let Schema::Enum(EnumSchema {
- symbols: r_symbols, ..
- }) = readers_schema
- {
- if w_symbols.iter().all(|e| r_symbols.contains(e)) {
- return Ok(());
+ (Schema::Union(writer), _) => {
+ // Check if all writer variants are fully compatible with the
reader schema.
+ // We keep track of if we see any (partial) compatibility.
+ let mut any = false;
+ let mut all = true;
+ for writer in &writer.schemas {
+ match self.full_match_schemas(writer, readers_schema) {
+ Ok(Compatibility::Full) => any = true,
+ Ok(Compatibility::Partial) => {
+ any = true;
+ all = false;
+ }
+ Err(_) => {
+ all = false;
}
}
}
- Err(CompatibilityError::MissingSymbols)
+ if all {
+ // All writer variants are fully compatible with the
reader schema
+ Ok(Compatibility::Full)
+ } else if any {
+ // At least one writer variant is partially compatible
with the reader schema
+ Ok(Compatibility::Partial)
+ } else {
+ Err(CompatibilityError::SchemaMismatchAllUnionElements)
+ }
}
- _ => {
- if w_type == SchemaKind::Union {
- if let Schema::Union(r) = writers_schema {
- if r.schemas.len() == 1 {
- return self.full_match_schemas(&r.schemas[0],
readers_schema);
+ (_, Schema::Union(reader)) => {
+ // Try to find a fully compatible reader variant for the
writer schema.
+ // In case that does not exist, we keep track of any partial
compatibility.
+ let mut partial = false;
+ if reader.schemas.iter().any(|reader| {
+ match self.full_match_schemas(writers_schema, reader) {
+ Ok(Compatibility::Full) => true,
+ Ok(Compatibility::Partial) => {
+ partial = true;
+ false
}
+ Err(_) => false,
}
+ }) {
+ // At least one reader variant is fully compatible with
the writer schema
+ Ok(Compatibility::Full)
+ } else if partial {
+ // At least one reader variant is partially compatible
with the writer schema
+ Ok(Compatibility::Partial)
+ } else {
+ Err(CompatibilityError::SchemaMismatchAllUnionElements)
}
- Err(CompatibilityError::Inconclusive(String::from(
- "writers_schema",
- )))
}
- }
- }
-
- fn match_record_schemas(
- &mut self,
- writers_schema: &Schema,
- readers_schema: &Schema,
- ) -> Result<(), CompatibilityError> {
- let w_type = SchemaKind::from(writers_schema);
-
- if w_type == SchemaKind::Union {
- return Err(CompatibilityError::TypeExpected {
- schema_type: String::from("writers_schema"),
- expected_type: vec![SchemaKind::Record],
- });
- }
-
- if let Schema::Record(RecordSchema {
- fields: w_fields,
- lookup: w_lookup,
- ..
- }) = writers_schema
- {
- if let Schema::Record(RecordSchema {
- fields: r_fields, ..
- }) = readers_schema
- {
- for field in r_fields.iter() {
- // get all field names in a vector (field.name + aliases)
- let mut fields_names = vec![&field.name];
- if let Some(ref aliases) = field.aliases {
- for alias in aliases {
- fields_names.push(alias);
- }
- }
-
- // Check whether any of the possible fields names are in
the writer schema.
- // If the field was found, then it must have the exact
same name with the writer field,
- // otherwise we would have a false positive with the
writers aliases
- let position = fields_names.iter().find_map(|field_name| {
- if let Some(pos) = w_lookup.get(*field_name) {
- if &w_fields[*pos].name == *field_name {
- return Some(pos);
- }
- }
- None
+ (Schema::Null, Schema::Null) => Ok(Compatibility::Full),
+ (Schema::Boolean, Schema::Boolean) => Ok(Compatibility::Full),
+ // int promotes to long, float and double
+ (
+ Schema::Int | Schema::Date | Schema::TimeMillis,
+ Schema::Int | Schema::Long | Schema::Float | Schema::Double |
Schema::Date
+ | Schema::TimeMillis | Schema::TimeMicros |
Schema::TimestampMillis
+ | Schema::TimestampMicros | Schema::TimestampNanos |
Schema::LocalTimestampMillis
+ | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
+ ) => Ok(Compatibility::Full),
+ // long promotes to float and double
+ (
+ Schema::Long | Schema::TimeMicros | Schema::TimestampMillis
+ | Schema::TimestampMicros | Schema::TimestampNanos |
Schema::LocalTimestampMillis
+ | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
+ Schema::Long | Schema::Float | Schema::Double |
Schema::TimeMicros
+ | Schema::TimestampMillis | Schema::TimestampMicros |
Schema::TimestampNanos
+ | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
+ | Schema::LocalTimestampNanos,
+ ) => Ok(Compatibility::Full),
+ // float promotes to double
+ (Schema::Float, Schema::Float | Schema::Double) =>
Ok(Compatibility::Full),
+ (Schema::Double, Schema::Double) => Ok(Compatibility::Full),
+ // bytes and strings are interchangeable
+ (
+ Schema::Bytes | Schema::String | Schema::BigDecimal
+ | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
+ | Schema::Decimal(DecimalSchema { inner:
InnerDecimalSchema::Bytes, .. }),
+ Schema::Bytes | Schema::String | Schema::BigDecimal
+ | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
+ | Schema::Decimal(DecimalSchema { inner:
InnerDecimalSchema::Bytes, .. }),
+ ) => Ok(Compatibility::Full),
+ // This should also check the unqualified name but we don't store
that for duration
+ (Schema::Duration, Schema::Duration) => Ok(Compatibility::Full),
+ (Schema::Fixed(FixedSchema { size: 12, .. }), Schema::Duration) =>
{
+ Ok(Compatibility::Full)
+ }
+ (Schema::Duration, Schema::Fixed(FixedSchema { size: 12, .. })) =>
{
+ Ok(Compatibility::Full)
+ }
+ (
+ Schema::Decimal(DecimalSchema { precision: w_precision, scale:
w_scale, .. }),
+ Schema::Decimal(DecimalSchema { precision: r_precision, scale:
r_scale, .. }),
Review Comment:
Looking at the implementation of [`Value::resolve` for
`Decimal`](https://github.com/apache/avro-rs/blob/13625abd7d08caa2eff8535768d5547b21f4bdff/avro/src/types.rs#L746),
the only thing that matters is that the underlying type can store enough
bytes, so the check could actually be slightly relaxed to not require the exact
same precision and scale (but lets not do that for now).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]