martin-g commented on code in PR #342:
URL: https://github.com/apache/avro-rs/pull/342#discussion_r2668724842


##########
avro/tests/shared.rs:
##########
@@ -43,17 +43,16 @@ fn test_schema() -> TestResult {
             Err(e) => core::panic!("Can't get file {}", e),
         };
         log::debug!("{:?}", entry.file_name());
-        if let Ok(ft) = entry.file_type() {
-            if ft.is_dir() {
-                let sub_folder =
-                    ROOT_DIRECTORY.to_owned() + "/" + 
entry.file_name().to_str().unwrap();
-
-                let dir_result = test_folder(sub_folder.as_str());
-                if let Err(ed) = dir_result {
-                    result = match result {
-                        Ok(()) => Err(ed),
-                        Err(e) => Err(e.merge(&ed)),
-                    }
+        if let Ok(ft) = entry.file_type()
+            && ft.is_dir()
+        {
+            let sub_folder = ROOT_DIRECTORY.to_owned() + "/" + 
entry.file_name().to_str().unwrap();

Review Comment:
   Not caused by this PR, but the path concatenation here is not very 
Windows-friendly.



##########
avro/src/types.rs:
##########
@@ -448,9 +448,28 @@ impl Value {
             (&Value::Double(_), &Schema::Double) => None,
             (&Value::Bytes(_), &Schema::Bytes) => None,
             (&Value::Bytes(_), &Schema::Decimal { .. }) => None,
-            (&Value::Bytes(_), &Schema::Uuid(UuidSchema::Bytes)) => None,
+            (Value::Bytes(bytes), &Schema::Uuid(UuidSchema::Bytes)) => {
+                if bytes.len() != 16 {
+                    Some(format!(
+                        "The value's size ({}) is not the right length for a 
fixed UUID (16)",

Review Comment:
   ```suggestion
                           "The value's size ({}) is not the right length for a 
bytes UUID (16)",
   ```



##########
avro/src/schema_compatibility.rs:
##########
@@ -16,271 +16,359 @@
 // under the License.
 
 //! Logic for checking schema compatibility
-use crate::schema::UuidSchema;
 use crate::{
     error::CompatibilityError,
-    schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
+    schema::{
+        ArraySchema, DecimalSchema, EnumSchema, InnerDecimalSchema, MapSchema, 
RecordSchema,
+        Schema, UuidSchema,
+    },
 };
 use std::{
-    collections::{HashSet, hash_map::DefaultHasher},
+    collections::{HashMap, hash_map::DefaultHasher},
     hash::Hasher,
+    iter::once,
+    ops::BitAndAssign,
     ptr,
 };
 
-fn match_ref_schemas(
-    writers_schema: &Schema,
-    readers_schema: &Schema,
-) -> Result<(), CompatibilityError> {
-    match (readers_schema, writers_schema) {
-        (Schema::Ref { name: r_name }, Schema::Ref { name: w_name }) => {
-            if r_name == w_name {
-                Ok(())
-            } else {
-                Err(CompatibilityError::NameMismatch {
-                    writer_name: w_name.fullname(None),
-                    reader_name: r_name.fullname(None),
-                })
-            }
+pub struct SchemaCompatibility;
+
+/// How compatible are two schemas.
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub enum Compatibility {
+    /// Full compatibility, resolving will always work.
+    Full,
+    /// Partial compatibility, resolving may error.
+    ///
+    /// This can happen if an enum doesn't have all fields, or unions don't 
entirely overlap.
+    Partial,
+}
+
+impl BitAndAssign for Compatibility {
+    /// Combine two compatibilities.
+    ///
+    /// # Truth table
+    /// |         | Full    | Partial |
+    /// | ------- | ------- | ------- |
+    /// | Full    | Full    | Partial |
+    /// | Partial | Partial | Partial |
+    fn bitand_assign(&mut self, rhs: Self) {
+        match (*self, rhs) {
+            (Self::Full, Self::Full) => *self = Self::Full,
+            _ => *self = Self::Partial,
         }
-        _ => Err(CompatibilityError::WrongType {
-            writer_schema_type: format!("{writers_schema:#?}"),
-            reader_schema_type: format!("{readers_schema:#?}"),
-        }),
     }
 }
 
-pub struct SchemaCompatibility;
-
 struct Checker {
-    recursion: HashSet<(u64, u64)>,
+    recursion: HashMap<(u64, u64), Compatibility>,
 }
 
 impl Checker {
     /// Create a new checker, with recursion set to an empty set.
     pub(crate) fn new() -> Self {
         Self {
-            recursion: HashSet::new(),
+            recursion: HashMap::new(),
         }
     }
 
-    pub(crate) fn can_read(
+    /// Check if the reader schema can be resolved from the writer schema.
+    pub(crate) fn full_match_schemas(
         &mut self,
         writers_schema: &Schema,
         readers_schema: &Schema,
-    ) -> Result<(), CompatibilityError> {
-        self.full_match_schemas(writers_schema, readers_schema)
+    ) -> Result<Compatibility, CompatibilityError> {
+        // Hash both reader and writer based on their pointer value. This is a 
fast way to see if
+        // we get the exact same schemas multiple times (because of recursive 
types)
+        let key = (
+            Self::pointer_hash(writers_schema),
+            Self::pointer_hash(readers_schema),
+        );
+
+        // If we already saw this pairing, return the previous value
+        if let Some(c) = self.recursion.get(&key).copied() {
+            Ok(c)
+        } else {
+            let c = self.inner_full_match_schemas(writers_schema, 
readers_schema)?;
+            // Insert the new value
+            self.recursion.insert(key, c);
+            Ok(c)
+        }
     }
 
-    pub(crate) fn full_match_schemas(
+    /// Hash a schema based only on its pointer value.
+    fn pointer_hash(schema: &Schema) -> u64 {
+        let mut hasher = DefaultHasher::new();
+        ptr::hash(schema, &mut hasher);
+        hasher.finish()
+    }
+
+    /// The actual implementation of [`full_match_schemas`] but without the 
recursion protection.
+    ///
+    /// This function should never be called directly as it can recurse 
infinitely on recursive types.
+    #[rustfmt::skip]
+    fn inner_full_match_schemas(
         &mut self,
         writers_schema: &Schema,
         readers_schema: &Schema,
-    ) -> Result<(), CompatibilityError> {
-        if self.recursion_in_progress(writers_schema, readers_schema) {
-            return Ok(());
-        }
-
-        SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
-
-        let w_type = SchemaKind::from(writers_schema);
-        let r_type = SchemaKind::from(readers_schema);
-
-        if w_type != SchemaKind::Union
-            && (r_type.is_primitive()
-                || r_type == SchemaKind::Fixed
-                || r_type == SchemaKind::Uuid
-                || r_type == SchemaKind::Date
-                || r_type == SchemaKind::TimeMillis
-                || r_type == SchemaKind::TimeMicros
-                || r_type == SchemaKind::TimestampMillis
-                || r_type == SchemaKind::TimestampMicros
-                || r_type == SchemaKind::TimestampNanos
-                || r_type == SchemaKind::LocalTimestampMillis
-                || r_type == SchemaKind::LocalTimestampMicros
-                || r_type == SchemaKind::LocalTimestampNanos)
+    ) -> Result<Compatibility, CompatibilityError> {
+        // Compare unqualified names if the schemas have them
+        if let Some(w_name) = writers_schema.name()
+            && let Some(r_name) = readers_schema.name()
+            && w_name.name != r_name.name
         {
-            return Ok(());
+            return Err(CompatibilityError::NameMismatch {
+                writer_name: w_name.name.clone(),
+                reader_name: r_name.name.clone(),
+            });
         }
 
-        match r_type {
-            SchemaKind::Ref => match_ref_schemas(writers_schema, 
readers_schema),
-            SchemaKind::Record => self.match_record_schemas(writers_schema, 
readers_schema),
-            SchemaKind::Map => {
-                if let Schema::Map(w_m) = writers_schema {
-                    match readers_schema {
-                        Schema::Map(r_m) => 
self.full_match_schemas(&w_m.types, &r_m.types),
-                        _ => Err(CompatibilityError::WrongType {
-                            writer_schema_type: format!("{writers_schema:#?}"),
-                            reader_schema_type: format!("{readers_schema:#?}"),
-                        }),
-                    }
+        // Logical types are downgraded to their actual type
+        match (writers_schema, readers_schema) {
+            (Schema::Ref { name: w_name }, Schema::Ref { name: r_name }) => {
+                if r_name == w_name {
+                    Ok(Compatibility::Full)
                 } else {
-                    Err(CompatibilityError::TypeExpected {
-                        schema_type: String::from("writers_schema"),
-                        expected_type: vec![SchemaKind::Record],
+                    Err(CompatibilityError::NameMismatch {
+                        writer_name: w_name.fullname(None),
+                        reader_name: r_name.fullname(None),
                     })
                 }
             }
-            SchemaKind::Array => {
-                if let Schema::Array(w_a) = writers_schema {
-                    match readers_schema {
-                        Schema::Array(r_a) => 
self.full_match_schemas(&w_a.items, &r_a.items),
-                        _ => Err(CompatibilityError::WrongType {
-                            writer_schema_type: format!("{writers_schema:#?}"),
-                            reader_schema_type: format!("{readers_schema:#?}"),
-                        }),
-                    }
+            (Schema::Union(writer), Schema::Union(reader)) => {
+                let mut any = false;
+                let mut all = true;
+                for writer in &writer.schemas {
+                    // Try to find a reader variant that is fully compatible 
with this writer variant.
+                    // In case that does not exist, we keep track of any 
partial compatibility we find.
+                    let mut local_any = false;
+                    all &= reader.schemas.iter().any(|reader| {
+                        match self.full_match_schemas(writer, reader) {
+                            Ok(Compatibility::Full) => {
+                                local_any = true;
+                                true
+                            }
+                            Ok(Compatibility::Partial) => {
+                                local_any = true;
+                                false
+                            }
+                            Err(_) => false,
+                        }
+                    });
+                    // Save any match we found
+                    any |= local_any;
+                }
+                if all {
+                    // All writer variants are fully compatible with reader 
variants
+                    Ok(Compatibility::Full)
+                } else if any {
+                    // At least one writer variant is partially or fully 
compatible with a reader variant
+                    Ok(Compatibility::Partial)
                 } else {
-                    Err(CompatibilityError::TypeExpected {
-                        schema_type: String::from("writers_schema"),
-                        expected_type: vec![SchemaKind::Array],
-                    })
+                    Err(CompatibilityError::MissingUnionElements)
                 }
             }
-            SchemaKind::Union => self.match_union_schemas(writers_schema, 
readers_schema),
-            SchemaKind::Enum => {
-                // reader's symbols must contain all writer's symbols
-                if let Schema::Enum(EnumSchema {
-                    symbols: w_symbols, ..
-                }) = writers_schema
-                {
-                    if let Schema::Enum(EnumSchema {
-                        symbols: r_symbols, ..
-                    }) = readers_schema
-                    {
-                        if w_symbols.iter().all(|e| r_symbols.contains(e)) {
-                            return Ok(());
+            (Schema::Union(writer), _) => {
+                // Check if all writer variants are fully compatible with the 
reader schema.
+                // We keep track of if we see any (partial) compatibility.
+                let mut any = false;
+                let mut all = true;
+                for writer in &writer.schemas {
+                    match self.full_match_schemas(writer, readers_schema) {
+                        Ok(Compatibility::Full) => any = true,
+                        Ok(Compatibility::Partial) => {
+                            any = true;
+                            all = false;
+                        }
+                        Err(_) => {
+                            all = false;
                         }
                     }
                 }
-                Err(CompatibilityError::MissingSymbols)
+                if all {
+                    // All writer variants are fully compatible with the 
reader schema
+                    Ok(Compatibility::Full)
+                } else if any {
+                    // At least one writer variant is partially compatible 
with the reader schema
+                    Ok(Compatibility::Partial)
+                } else {
+                    Err(CompatibilityError::SchemaMismatchAllUnionElements)
+                }
             }
-            _ => {
-                if w_type == SchemaKind::Union {
-                    if let Schema::Union(r) = writers_schema {
-                        if r.schemas.len() == 1 {
-                            return self.full_match_schemas(&r.schemas[0], 
readers_schema);
+            (_, Schema::Union(reader)) => {
+                // Try to find a fully compatible reader variant for the 
writer schema.
+                // In case that does not exist, we keep track of any partial 
compatibility.
+                let mut partial = false;
+                if reader.schemas.iter().any(|reader| {
+                    match self.full_match_schemas(writers_schema, reader) {
+                        Ok(Compatibility::Full) => true,
+                        Ok(Compatibility::Partial) => {
+                            partial = true;
+                            false
                         }
+                        Err(_) => false,
                     }
+                }) {
+                    // At least one reader variant is fully compatible with 
the writer schema
+                    Ok(Compatibility::Full)
+                } else if partial {
+                    // At least one reader variant is partially compatible 
with the writer schema
+                    Ok(Compatibility::Partial)
+                } else {
+                    Err(CompatibilityError::SchemaMismatchAllUnionElements)
                 }
-                Err(CompatibilityError::Inconclusive(String::from(
-                    "writers_schema",
-                )))
             }
-        }
-    }
-
-    fn match_record_schemas(
-        &mut self,
-        writers_schema: &Schema,
-        readers_schema: &Schema,
-    ) -> Result<(), CompatibilityError> {
-        let w_type = SchemaKind::from(writers_schema);
-
-        if w_type == SchemaKind::Union {
-            return Err(CompatibilityError::TypeExpected {
-                schema_type: String::from("writers_schema"),
-                expected_type: vec![SchemaKind::Record],
-            });
-        }
-
-        if let Schema::Record(RecordSchema {
-            fields: w_fields,
-            lookup: w_lookup,
-            ..
-        }) = writers_schema
-        {
-            if let Schema::Record(RecordSchema {
-                fields: r_fields, ..
-            }) = readers_schema
-            {
-                for field in r_fields.iter() {
-                    // get all field names in a vector (field.name + aliases)
-                    let mut fields_names = vec![&field.name];
-                    if let Some(ref aliases) = field.aliases {
-                        for alias in aliases {
-                            fields_names.push(alias);
-                        }
-                    }
-
-                    // Check whether any of the possible fields names are in 
the writer schema.
-                    // If the field was found, then it must have the exact 
same name with the writer field,
-                    // otherwise we would have a false positive with the 
writers aliases
-                    let position = fields_names.iter().find_map(|field_name| {
-                        if let Some(pos) = w_lookup.get(*field_name) {
-                            if &w_fields[*pos].name == *field_name {
-                                return Some(pos);
-                            }
-                        }
-                        None
+            (Schema::Null, Schema::Null) => Ok(Compatibility::Full),
+            (Schema::Boolean, Schema::Boolean) => Ok(Compatibility::Full),
+            // int promotes to long, float and double
+            (
+                Schema::Int | Schema::Date | Schema::TimeMillis,
+                Schema::Int | Schema::Long | Schema::Float | Schema::Double | 
Schema::Date
+                | Schema::TimeMillis | Schema::TimeMicros | 
Schema::TimestampMillis
+                | Schema::TimestampMicros | Schema::TimestampNanos | 
Schema::LocalTimestampMillis
+                | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
+            ) => Ok(Compatibility::Full),
+            // long promotes to float and double
+            (
+                Schema::Long | Schema::TimeMicros | Schema::TimestampMillis
+                | Schema::TimestampMicros | Schema::TimestampNanos | 
Schema::LocalTimestampMillis
+                | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
+                Schema::Long | Schema::Float | Schema::Double | 
Schema::TimeMicros
+                | Schema::TimestampMillis | Schema::TimestampMicros | 
Schema::TimestampNanos
+                | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
+                | Schema::LocalTimestampNanos,
+            ) => Ok(Compatibility::Full),
+            // float promotes to double
+            (Schema::Float, Schema::Float | Schema::Double) => 
Ok(Compatibility::Full),
+            (Schema::Double, Schema::Double) => Ok(Compatibility::Full),
+            // bytes and strings are interchangeable

Review Comment:
   Are they always interchangeable?
   UuidSchema::Bytes has to be exactly 16 bytes.
   UuidSchema::String has to be at least 32 chars.
   The checks for the Decimal bytes schema also fail to check the precision and 
scale here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to