This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 10cc7c9  feat: Introduce conversion between iceberg schema and avro 
schema (#40)
10cc7c9 is described below

commit 10cc7c9ac66777ce91464d3454c2bfb68dbead1a
Author: Renjie Liu <[email protected]>
AuthorDate: Thu Aug 31 19:28:43 2023 +0800

    feat: Introduce conversion between iceberg schema and avro schema (#40)
    
    * feat: Introduce conversion between iceberg schema and avro schema
    
    * Fix typo
    
    * Fix comments
    
    * Fix clippy
    
    * Calculate decimal bytes
---
 crates/iceberg/Cargo.toml                          |   4 +-
 crates/iceberg/src/{lib.rs => avro/mod.rs}         |  16 +-
 crates/iceberg/src/avro/schema.rs                  | 913 +++++++++++++++++++++
 crates/iceberg/src/error.rs                        |   6 +
 crates/iceberg/src/lib.rs                          |   1 +
 crates/iceberg/src/spec/datatypes.rs               | 132 ++-
 .../testdata/avro_schema_manifest_entry.json       | 286 +++++++
 .../testdata/avro_schema_manifest_file_v1.json     | 139 ++++
 .../testdata/avro_schema_manifest_file_v2.json     | 141 ++++
 9 files changed, 1614 insertions(+), 24 deletions(-)

diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 053acf8..665ccad 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -27,7 +27,7 @@ license = "Apache-2.0"
 keywords = ["iceberg"]
 
 [dependencies]
-apache-avro = "0.15.0"
+apache-avro = "0.15"
 serde = {version = "^1.0", features = ["rc"]}
 serde_bytes = "0.11.8"
 serde_json = "^1.0"
@@ -43,6 +43,8 @@ serde_repr = "0.1.16"
 itertools = "0.11"
 bimap = "0.6"
 derive_builder = "0.12.0"
+either = "1"
+lazy_static = "1"
 
 
 [dev-dependencies]
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/avro/mod.rs
similarity index 78%
copy from crates/iceberg/src/lib.rs
copy to crates/iceberg/src/avro/mod.rs
index 8e77b97..a6bb073 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/avro/mod.rs
@@ -15,16 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Native Rust implementation of Apache Iceberg
-
-#![deny(missing_docs)]
-
-#[macro_use]
-extern crate derive_builder;
-
-mod error;
-pub use error::Error;
-pub use error::ErrorKind;
-pub use error::Result;
-
-pub mod spec;
+//! Avro related code.
+#[allow(dead_code)]
+mod schema;
diff --git a/crates/iceberg/src/avro/schema.rs 
b/crates/iceberg/src/avro/schema.rs
new file mode 100644
index 0000000..244d208
--- /dev/null
+++ b/crates/iceberg/src/avro/schema.rs
@@ -0,0 +1,913 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Conversion between iceberg and avro schema.
+use crate::spec::{
+    visit_schema, ListType, MapType, NestedField, NestedFieldRef, 
PrimitiveType, Schema,
+    SchemaVisitor, StructType, Type,
+};
+use crate::{ensure_data_valid, Error, ErrorKind, Result};
+use apache_avro::schema::{
+    DecimalSchema, FixedSchema, Name, RecordField as AvroRecordField, 
RecordFieldOrder,
+    RecordSchema, UnionSchema,
+};
+use apache_avro::Schema as AvroSchema;
+use itertools::{Either, Itertools};
+use serde_json::{Number, Value};
+
+// Key of the custom attribute on an avro record field that stores the iceberg field id.
+const FILED_ID_PROP: &str = "field-id";
+
+// Visitor state for converting an iceberg schema into an avro schema.
+struct SchemaToAvroSchema {
+    // Name given to the top-level avro record produced by the conversion.
+    schema: String,
+}
+
+// Intermediate visitor result: `Left` holds a converted avro schema, `Right` holds a converted
+// record field (only produced by the `field` callback and consumed by `r#struct`).
+type AvroSchemaOrField = Either<AvroSchema, AvroRecordField>;
+
+impl SchemaVisitor for SchemaToAvroSchema {
+    type T = AvroSchemaOrField;
+
+    fn schema(&mut self, _schema: &Schema, value: AvroSchemaOrField) -> 
Result<AvroSchemaOrField> {
+        let mut avro_schema = value.unwrap_left();
+
+        if let AvroSchema::Record(record) = &mut avro_schema {
+            record.name = Name::from(self.schema.as_str());
+        } else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Schema result must be avro record!",
+            ));
+        }
+
+        Ok(Either::Left(avro_schema))
+    }
+
+    fn field(
+        &mut self,
+        field: &NestedFieldRef,
+        avro_schema: AvroSchemaOrField,
+    ) -> Result<AvroSchemaOrField> {
+        let mut field_schema = avro_schema.unwrap_left();
+        if let AvroSchema::Record(record) = &mut field_schema {
+            record.name = Name::from(format!("r{}", field.id).as_str());
+        }
+
+        if !field.required {
+            field_schema = avro_optional(field_schema)?;
+        }
+
+        let mut avro_record_field = AvroRecordField {
+            name: field.name.clone(),
+            schema: field_schema,
+            order: RecordFieldOrder::Ignore,
+            position: 0,
+            doc: field.doc.clone(),
+            aliases: None,
+            default: None,
+            custom_attributes: Default::default(),
+        };
+
+        if !field.required {
+            avro_record_field.default = Some(Value::Null);
+        }
+        avro_record_field.custom_attributes.insert(
+            FILED_ID_PROP.to_string(),
+            Value::Number(Number::from(field.id)),
+        );
+
+        Ok(Either::Right(avro_record_field))
+    }
+
+    fn r#struct(
+        &mut self,
+        _struct: &StructType,
+        results: Vec<AvroSchemaOrField>,
+    ) -> Result<AvroSchemaOrField> {
+        let avro_fields = results.into_iter().map(|r| 
r.unwrap_right()).collect();
+
+        Ok(Either::Left(AvroSchema::Record(RecordSchema {
+            // The name of this record schema should be determined later, by 
schema name or field
+            // name, here we use a temporary placeholder to do it.
+            name: Name::new("null")?,
+            aliases: None,
+            doc: None,
+            fields: avro_fields,
+            lookup: Default::default(),
+            attributes: Default::default(),
+        })))
+    }
+
+    fn list(&mut self, list: &ListType, value: AvroSchemaOrField) -> 
Result<AvroSchemaOrField> {
+        let mut field_schema = value.unwrap_left();
+
+        if let AvroSchema::Record(record) = &mut field_schema {
+            record.name = Name::from(format!("r{}", 
list.element_field.id).as_str());
+        }
+
+        if !list.element_field.required {
+            field_schema = avro_optional(field_schema)?;
+        }
+
+        // TODO: We need to add element id prop here, but rust's avro schema 
doesn't support property except record schema.
+        Ok(Either::Left(AvroSchema::Array(Box::new(field_schema))))
+    }
+
+    fn map(
+        &mut self,
+        map: &MapType,
+        key_value: AvroSchemaOrField,
+        value: AvroSchemaOrField,
+    ) -> Result<AvroSchemaOrField> {
+        let key_field_schema = key_value.unwrap_left();
+        let mut value_field_schema = value.unwrap_left();
+        if !map.value_field.required {
+            value_field_schema = avro_optional(value_field_schema)?;
+        }
+
+        if matches!(key_field_schema, AvroSchema::String) {
+            Ok(Either::Left(AvroSchema::Map(Box::new(value_field_schema))))
+        } else {
+            // Avro map requires that key must be string type. Here we convert 
it to array if key is
+            // not string type.
+            let key_field = {
+                let mut field = AvroRecordField {
+                    name: map.key_field.name.clone(),
+                    doc: None,
+                    aliases: None,
+                    default: None,
+                    schema: key_field_schema,
+                    order: RecordFieldOrder::Ascending,
+                    position: 0,
+                    custom_attributes: Default::default(),
+                };
+                field.custom_attributes.insert(
+                    FILED_ID_PROP.to_string(),
+                    Value::Number(Number::from(map.key_field.id)),
+                );
+                field
+            };
+
+            let value_field = {
+                let mut field = AvroRecordField {
+                    name: map.key_field.name.clone(),
+                    doc: None,
+                    aliases: None,
+                    default: None,
+                    schema: value_field_schema,
+                    order: RecordFieldOrder::Ignore,
+                    position: 0,
+                    custom_attributes: Default::default(),
+                };
+                field.custom_attributes.insert(
+                    FILED_ID_PROP.to_string(),
+                    Value::Number(Number::from(map.value_field.id)),
+                );
+                field
+            };
+
+            let item_avro_schema = AvroSchema::Record(RecordSchema {
+                name: Name::from(format!("k{}_v{}", map.key_field.id, 
map.value_field.id).as_str()),
+                aliases: None,
+                doc: None,
+                fields: vec![key_field, value_field],
+                lookup: Default::default(),
+                attributes: Default::default(),
+            });
+
+            Ok(Either::Left(item_avro_schema))
+        }
+    }
+
+    fn primitive(&mut self, p: &PrimitiveType) -> Result<AvroSchemaOrField> {
+        let avro_schema = match p {
+            PrimitiveType::Boolean => AvroSchema::Boolean,
+            PrimitiveType::Int => AvroSchema::Int,
+            PrimitiveType::Long => AvroSchema::Long,
+            PrimitiveType::Float => AvroSchema::Float,
+            PrimitiveType::Double => AvroSchema::Double,
+            PrimitiveType::Date => AvroSchema::Date,
+            PrimitiveType::Time => AvroSchema::TimeMicros,
+            PrimitiveType::Timestamp => AvroSchema::TimestampMicros,
+            PrimitiveType::Timestamptz => AvroSchema::TimestampMicros,
+            PrimitiveType::String => AvroSchema::String,
+            PrimitiveType::Uuid => AvroSchema::Uuid,
+            PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize)?,
+            PrimitiveType::Binary => AvroSchema::Bytes,
+            PrimitiveType::Decimal { precision, scale } => {
+                avro_decimal_schema(*precision as usize, *scale as usize)?
+            }
+        };
+        Ok(Either::Left(avro_schema))
+    }
+}
+
+/// Converts an iceberg schema into an avro record schema named `name`.
+pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result<AvroSchema> {
+    let schema_name = name.to_string();
+    let mut visitor = SchemaToAvroSchema {
+        schema: schema_name,
+    };
+
+    let converted = visit_schema(schema, &mut visitor)?;
+    Ok(converted.unwrap_left())
+}
+
+/// Builds an avro fixed schema of `len` bytes, named `fixed_<len>`.
+pub(crate) fn avro_fixed_schema(len: usize) -> Result<AvroSchema> {
+    let name = Name::new(&format!("fixed_{len}"))?;
+    let fixed = FixedSchema {
+        name,
+        aliases: None,
+        doc: None,
+        size: len,
+        attributes: Default::default(),
+    };
+    Ok(AvroSchema::Fixed(fixed))
+}
+
+/// Builds an avro decimal schema backed by a fixed schema whose size is the number of bytes
+/// reported by `Type::decimal_required_bytes` for `precision`.
+pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> Result<AvroSchema> {
+    let fixed_len = Type::decimal_required_bytes(precision as u32)? as usize;
+    let inner = Box::new(avro_fixed_schema(fixed_len)?);
+
+    Ok(AvroSchema::Decimal(DecimalSchema {
+        precision,
+        scale,
+        inner,
+    }))
+}
+
+/// Wraps `avro_schema` into a `["null", avro_schema]` union.
+fn avro_optional(avro_schema: AvroSchema) -> Result<AvroSchema> {
+    let variants = vec![AvroSchema::Null, avro_schema];
+    let union = UnionSchema::new(variants)?;
+    Ok(AvroSchema::Union(union))
+}
+
+/// Returns `true` only for a union schema that reports itself as nullable.
+fn is_avro_optional(avro_schema: &AvroSchema) -> bool {
+    matches!(avro_schema, AvroSchema::Union(union) if union.is_nullable())
+}
+
+/// Post order avro schema visitor.
+///
+/// [`visit`] converts the children of a node first and passes their results to the callback
+/// of the node itself.
+pub(crate) trait AvroSchemaVisitor {
+    /// Result produced for every visited schema node.
+    type T;
+
+    /// Called for a record schema with the results of its fields, in field order.
+    fn record(&mut self, record: &RecordSchema, fields: Vec<Self::T>) -> 
Result<Self::T>;
+
+    /// Called for a union schema with the results of its variants, in variant order.
+    fn union(&mut self, union: &UnionSchema, options: Vec<Self::T>) -> 
Result<Self::T>;
+
+    /// Called for an array schema (passed as a whole) with the result of its item schema.
+    fn array(&mut self, array: &AvroSchema, item: Self::T) -> Result<Self::T>;
+    /// Called for a map schema (passed as a whole) with the result of its value schema.
+    fn map(&mut self, map: &AvroSchema, value: Self::T) -> Result<Self::T>;
+
+    /// Called for every schema that is not a record, union, array or map.
+    fn primitive(&mut self, schema: &AvroSchema) -> Result<Self::T>;
+}
+
+/// Walks an avro schema in post order: children are visited first and their results are
+/// handed to the visitor callback of the enclosing node. The first error aborts the walk.
+pub(crate) fn visit<V: AvroSchemaVisitor>(schema: &AvroSchema, visitor: &mut V) -> Result<V::T> {
+    match schema {
+        AvroSchema::Record(record) => {
+            let mut field_results = Vec::with_capacity(record.fields.len());
+            for field in &record.fields {
+                field_results.push(visit(&field.schema, visitor)?);
+            }
+
+            visitor.record(record, field_results)
+        }
+        AvroSchema::Union(union) => {
+            let variants = union.variants();
+            let mut option_results = Vec::with_capacity(variants.len());
+            for variant in variants {
+                option_results.push(visit(variant, visitor)?);
+            }
+
+            visitor.union(union, option_results)
+        }
+        AvroSchema::Array(item) => {
+            let element_result = visit(item, visitor)?;
+            visitor.array(schema, element_result)
+        }
+        AvroSchema::Map(inner) => {
+            let value_result = visit(inner, visitor)?;
+            visitor.map(schema, value_result)
+        }
+        other => visitor.primitive(other),
+    }
+}
+
+// Visitor state for converting an avro schema into iceberg types.
+struct AvroSchemaToSchema {
+    // Last id handed out by `next_field_id`; used for list elements and map keys/values,
+    // whose ids are not read from the avro schema.
+    next_id: i32,
+}
+
+impl AvroSchemaToSchema {
+    // Increments the counter and returns the new value, so the first id is `next_id + 1`.
+    fn next_field_id(&mut self) -> i32 {
+        self.next_id += 1;
+        self.next_id
+    }
+}
+
+// Converts avro schema nodes into iceberg types.
+impl AvroSchemaVisitor for AvroSchemaToSchema {
+    // Only `AvroSchema::Null` will return `None`
+    type T = Option<Type>;
+
+    /// Converts a record into a struct type.
+    ///
+    /// Every field must carry a `field-id` attribute that fits into `i32` and must not be of
+    /// avro `null` type, otherwise `ErrorKind::DataInvalid` is returned.
+    fn record(
+        &mut self,
+        record: &RecordSchema,
+        field_types: Vec<Option<Type>>,
+    ) -> Result<Option<Type>> {
+        let mut fields = Vec::with_capacity(field_types.len());
+        for (avro_field, typ) in record.fields.iter().zip_eq(field_types) {
+            let raw_field_id = avro_field
+                .custom_attributes
+                .get(FILED_ID_PROP)
+                .and_then(Value::as_i64)
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!("Can't convert field, missing field id: {avro_field:?}"),
+                    )
+                })?;
+            // Field ids are stored as `i32`; reject values that a cast would silently truncate.
+            let field_id = i32::try_from(raw_field_id).map_err(|_| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Can't convert field, field id out of range: {avro_field:?}"),
+                )
+            })?;
+            let field_type = require_type(typ, &avro_field.schema)?;
+
+            let mut field = if is_avro_optional(&avro_field.schema) {
+                NestedField::optional(field_id, &avro_field.name, field_type)
+            } else {
+                NestedField::required(field_id, &avro_field.name, field_type)
+            };
+
+            if let Some(doc) = &avro_field.doc {
+                field = field.with_doc(doc);
+            }
+
+            fields.push(field.into());
+        }
+
+        Ok(Some(Type::Struct(StructType::new(fields))))
+    }
+
+    /// Resolves a union to its single non-null type.
+    ///
+    /// Accepts only `[T]` and `["null", T]`; everything else is `ErrorKind::DataInvalid`.
+    fn union(
+        &mut self,
+        union: &UnionSchema,
+        mut options: Vec<Option<Type>>,
+    ) -> Result<Option<Type>> {
+        ensure_data_valid!(
+            options.len() <= 2 && !options.is_empty(),
+            "Can't convert avro union type {:?} to iceberg.",
+            union
+        );
+
+        if options.len() > 1 {
+            ensure_data_valid!(
+                options[0].is_none(),
+                "Can't convert avro union type {:?} to iceberg.",
+                union
+            );
+        }
+
+        // The last option is the only candidate for the resolved type; it is `None` when the
+        // union consists of `null` alone, which cannot be represented.
+        match options.pop().flatten() {
+            Some(typ) => Ok(Some(typ)),
+            None => Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Can't convert avro union type {union:?} to iceberg."),
+            )),
+        }
+    }
+
+    /// Converts an array into a list type whose element id comes from the internal counter.
+    fn array(&mut self, array: &AvroSchema, item: Option<Type>) -> Result<Self::T> {
+        if let AvroSchema::Array(item_schema) = array {
+            let element_type = require_type(item, array)?;
+            let element_field = NestedField::list_element(
+                self.next_field_id(),
+                element_type,
+                !is_avro_optional(item_schema),
+            )
+            .into();
+            Ok(Some(Type::List(ListType { element_field })))
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("Expected avro array schema, but {array:?}"),
+            ))
+        }
+    }
+
+    /// Converts a map into a string-keyed map type; key and value ids come from the internal
+    /// counter.
+    fn map(&mut self, map: &AvroSchema, value: Option<Type>) -> Result<Option<Type>> {
+        if let AvroSchema::Map(value_schema) = map {
+            let value_type = require_type(value, map)?;
+            // Due to avro rust implementation's limitation, we can't store attributes in map
+            // schema, we will fix it later when it has been resolved.
+            let key_field = NestedField::map_key_element(
+                self.next_field_id(),
+                Type::Primitive(PrimitiveType::String),
+            );
+            let value_field = NestedField::map_value_element(
+                self.next_field_id(),
+                value_type,
+                !is_avro_optional(value_schema),
+            );
+            Ok(Some(Type::Map(MapType {
+                key_field: key_field.into(),
+                value_field: value_field.into(),
+            })))
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("Expected avro map schema, but {map:?}"),
+            ))
+        }
+    }
+
+    /// Converts a non-nested avro schema; `null` yields `Ok(None)` and unsupported schemas
+    /// yield `ErrorKind::Unexpected`.
+    fn primitive(&mut self, schema: &AvroSchema) -> Result<Option<Type>> {
+        let typ = match schema {
+            AvroSchema::Decimal(decimal) => {
+                Type::decimal(decimal.precision as u32, decimal.scale as u32)?
+            }
+            AvroSchema::Date => Type::Primitive(PrimitiveType::Date),
+            AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time),
+            AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp),
+            AvroSchema::Uuid => Type::Primitive(PrimitiveType::Uuid),
+            AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean),
+            AvroSchema::Int => Type::Primitive(PrimitiveType::Int),
+            AvroSchema::Long => Type::Primitive(PrimitiveType::Long),
+            AvroSchema::Float => Type::Primitive(PrimitiveType::Float),
+            AvroSchema::Double => Type::Primitive(PrimitiveType::Double),
+            AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String),
+            AvroSchema::Fixed(fixed) => Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)),
+            AvroSchema::Bytes => Type::Primitive(PrimitiveType::Binary),
+            AvroSchema::Null => return Ok(None),
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!("Unable to convert avro {schema:?} to iceberg primitive type."),
+                ))
+            }
+        };
+
+        Ok(Some(typ))
+    }
+}
+
+// Unwraps a converted type, turning the `None` produced for avro `null` into a
+// `DataInvalid` error that mentions the offending avro schema.
+fn require_type(typ: Option<Type>, avro_schema: &AvroSchema) -> Result<Type> {
+    typ.ok_or_else(|| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!("Can't convert avro null type to iceberg type: {avro_schema:?}"),
+        )
+    })
+}
+
+/// Converts avro schema to iceberg schema.
+///
+/// Only avro record schemas can be converted; any other schema yields
+/// `ErrorKind::DataInvalid`.
+pub(crate) fn avro_schema_to_schema(avro_schema: &AvroSchema) -> Result<Schema> {
+    if let AvroSchema::Record(_) = avro_schema {
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        // A record is always converted to `Some(Type::Struct(..))`, never to `None`.
+        let typ = visit(avro_schema, &mut converter)?.expect("Iceberg schema should not be none.");
+        if let Type::Struct(s) = typ {
+            Schema::builder()
+                .with_fields(s.fields().iter().cloned())
+                .build()
+        } else {
+            Err(Error::new(
+                ErrorKind::Unexpected,
+                format!("Expected to convert avro record schema to struct type, but {typ}"),
+            ))
+        }
+    } else {
+        // `format!` is required here, a plain literal would print the placeholder verbatim.
+        Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Can't convert non record avro schema to iceberg schema: {avro_schema:?}"),
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::avro::schema::AvroSchemaToSchema;
+    use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, 
StructType, Type};
+    use apache_avro::schema::{Namespace, UnionSchema};
+    use apache_avro::Schema as AvroSchema;
+    use std::fs::read_to_string;
+
+    // Loads `<crate dir>/testdata/<filename>` and parses its content as an avro schema,
+    // panicking if the file cannot be read or parsed.
+    fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema {
+        let path = format!("{}/testdata/{}", env!("CARGO_MANIFEST_DIR"), filename);
+        let input = read_to_string(path).unwrap();
+
+        println!("Input is {input}");
+        AvroSchema::parse_str(&input).unwrap()
+    }
+
+    // Asserts that `expected_iceberg_schema` converts to `avro_schema`, reusing the avro
+    // schema's own name for the generated record. When `check_avro_to_iceberg` is set, the
+    // opposite direction is asserted first.
+    fn check_schema_conversion(
+        avro_schema: AvroSchema,
+        expected_iceberg_schema: Schema,
+        check_avro_to_iceberg: bool,
+    ) {
+        if check_avro_to_iceberg {
+            let converted_iceberg_schema = 
avro_schema_to_schema(&avro_schema).unwrap();
+            assert_eq!(expected_iceberg_schema, converted_iceberg_schema);
+        }
+
+        let converted_avro_schema = schema_to_avro_schema(
+            avro_schema.name().unwrap().fullname(Namespace::None),
+            &expected_iceberg_schema,
+        )
+        .unwrap();
+        assert_eq!(avro_schema, converted_avro_schema);
+    }
+
+    #[test]
+    fn test_manifest_file_v1_schema() {
+        let fields = vec![
+            NestedField::required(500, "manifest_path", 
PrimitiveType::String.into())
+                .with_doc("Location URI with FS scheme")
+                .into(),
+            NestedField::required(501, "manifest_length", 
PrimitiveType::Long.into())
+                .with_doc("Total file size in bytes")
+                .into(),
+            NestedField::required(502, "partition_spec_id", 
PrimitiveType::Int.into())
+                .with_doc("Spec ID used to write")
+                .into(),
+            NestedField::optional(503, "added_snapshot_id", 
PrimitiveType::Long.into())
+                .with_doc("Snapshot ID that added the manifest")
+                .into(),
+            NestedField::optional(504, "added_data_files_count", 
PrimitiveType::Int.into())
+                .with_doc("Added entry count")
+                .into(),
+            NestedField::optional(505, "existing_data_files_count", 
PrimitiveType::Int.into())
+                .with_doc("Existing entry count")
+                .into(),
+            NestedField::optional(506, "deleted_data_files_count", 
PrimitiveType::Int.into())
+                .with_doc("Deleted entry count")
+                .into(),
+            NestedField::optional(
+                507,
+                "partitions",
+                ListType {
+                    element_field: NestedField::list_element(
+                        508,
+                        StructType::new(vec![
+                            NestedField::required(
+                                509,
+                                "contains_null",
+                                PrimitiveType::Boolean.into(),
+                            )
+                            .with_doc("True if any file has a null partition 
value")
+                            .into(),
+                            NestedField::optional(
+                                518,
+                                "contains_nan",
+                                PrimitiveType::Boolean.into(),
+                            )
+                            .with_doc("True if any file has a nan partition 
value")
+                            .into(),
+                            NestedField::optional(510, "lower_bound", 
PrimitiveType::Binary.into())
+                                .with_doc("Partition lower bound for all 
files")
+                                .into(),
+                            NestedField::optional(511, "upper_bound", 
PrimitiveType::Binary.into())
+                                .with_doc("Partition upper bound for all 
files")
+                                .into(),
+                        ])
+                        .into(),
+                        true,
+                    )
+                    .into(),
+                }
+                .into(),
+            )
+            .with_doc("Summary for each partition")
+            .into(),
+            NestedField::optional(512, "added_rows_count", 
PrimitiveType::Long.into())
+                .with_doc("Added rows count")
+                .into(),
+            NestedField::optional(513, "existing_rows_count", 
PrimitiveType::Long.into())
+                .with_doc("Existing rows count")
+                .into(),
+            NestedField::optional(514, "deleted_rows_count", 
PrimitiveType::Long.into())
+                .with_doc("Deleted rows count")
+                .into(),
+        ];
+
+        let iceberg_schema = 
Schema::builder().with_fields(fields).build().unwrap();
+        check_schema_conversion(
+            
read_test_data_file_to_avro_schema("avro_schema_manifest_file_v1.json"),
+            iceberg_schema,
+            false,
+        );
+    }
+
+    #[test]
+    fn test_avro_list_required_primitive() {
+        let avro_schema = {
+            AvroSchema::parse_str(
+                r#"
+{
+    "type": "record",
+    "name": "avro_schema",
+    "fields": [
+        {
+            "name": "array_with_string",
+            "type": {
+                "type": "array",
+                "items": "string",
+                "default": [],
+                "element-id": 101
+            },
+            "field-id": 100
+        }
+    ]
+}"#,
+            )
+            .unwrap()
+        };
+
+        let iceberg_schema = {
+            Schema::builder()
+                .with_fields(vec![NestedField::required(
+                    100,
+                    "array_with_string",
+                    ListType {
+                        element_field: NestedField::list_element(
+                            101,
+                            PrimitiveType::String.into(),
+                            true,
+                        )
+                        .into(),
+                    }
+                    .into(),
+                )
+                .into()])
+                .build()
+                .unwrap()
+        };
+
+        check_schema_conversion(avro_schema, iceberg_schema, false);
+    }
+
+    #[test]
+    fn test_avro_list_wrapped_primitive() {
+        let avro_schema = {
+            AvroSchema::parse_str(
+                r#"
+{
+    "type": "record",
+    "name": "avro_schema",
+    "fields": [
+        {
+            "name": "array_with_string",
+            "type": {
+                "type": "array",
+                "items": {"type": "string"},
+                "default": [],
+                "element-id": 101
+            },
+            "field-id": 100
+        }
+    ]
+}            
+"#,
+            )
+            .unwrap()
+        };
+
+        let iceberg_schema = {
+            Schema::builder()
+                .with_fields(vec![NestedField::required(
+                    100,
+                    "array_with_string",
+                    ListType {
+                        element_field: NestedField::list_element(
+                            101,
+                            PrimitiveType::String.into(),
+                            true,
+                        )
+                        .into(),
+                    }
+                    .into(),
+                )
+                .into()])
+                .build()
+                .unwrap()
+        };
+
+        check_schema_conversion(avro_schema, iceberg_schema, false);
+    }
+
+    #[test]
+    fn test_avro_list_required_record() {
+        let avro_schema = {
+            AvroSchema::parse_str(
+                r#"
+{
+    "type": "record",
+    "name": "avro_schema",
+    "fields": [
+        {
+            "name": "array_with_record",
+            "type": {
+                "type": "array",
+                "items": {
+                    "type": "record",
+                    "name": "r101",
+                    "fields": [
+                        {
+                            "name": "contains_null",
+                            "type": "boolean",
+                            "field-id": 102
+                        },
+                        {
+                            "name": "contains_nan",
+                            "type": ["null", "boolean"],
+                            "field-id": 103
+                        }
+                    ]
+                },
+                "element-id": 101
+            },
+            "field-id": 100
+        }
+    ]
+}            
+"#,
+            )
+            .unwrap()
+        };
+
+        let iceberg_schema = {
+            Schema::builder()
+                .with_fields(vec![NestedField::required(
+                    100,
+                    "array_with_record",
+                    ListType {
+                        element_field: NestedField::list_element(
+                            101,
+                            StructType::new(vec![
+                                NestedField::required(
+                                    102,
+                                    "contains_null",
+                                    PrimitiveType::Boolean.into(),
+                                )
+                                .into(),
+                                NestedField::optional(
+                                    103,
+                                    "contains_nan",
+                                    PrimitiveType::Boolean.into(),
+                                )
+                                .into(),
+                            ])
+                            .into(),
+                            true,
+                        )
+                        .into(),
+                    }
+                    .into(),
+                )
+                .into()])
+                .build()
+                .unwrap()
+        };
+
+        check_schema_conversion(avro_schema, iceberg_schema, false);
+    }
+
+    // A union with three variants exceeds the two options the converter accepts, so
+    // resolving it must fail.
+    #[test]
+    fn test_resolve_union() {
+        let avro_schema = UnionSchema::new(vec![
+            AvroSchema::Null,
+            AvroSchema::String,
+            AvroSchema::Boolean,
+        ])
+        .unwrap();
+
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+
+        // Convert every variant on its own, as the post order walk would do.
+        let options = avro_schema
+            .variants()
+            .iter()
+            .map(|v| converter.primitive(v).unwrap())
+            .collect();
+        assert!(converter.union(&avro_schema, options).is_err());
+    }
+
+    #[test]
+    fn test_string_type() {
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        let avro_schema = AvroSchema::String;
+
+        assert_eq!(
+            Some(PrimitiveType::String.into()),
+            converter.primitive(&avro_schema).unwrap()
+        );
+    }
+
+    // An avro map with optional long values becomes a string-keyed iceberg map with an
+    // optional value field.
+    #[test]
+    fn test_map_type() {
+        let avro_schema = {
+            AvroSchema::parse_str(
+                r#"
+{
+    "type": "map",
+    "values": ["null", "long"],
+    "key-id": 101,
+    "value-id": 102
+}
+"#,
+            )
+            .unwrap()
+        };
+
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+        // The expected ids are 1 and 2, not the `key-id`/`value-id` from the JSON: the
+        // converter takes them from its counter, which starts at 0 here.
+        let iceberg_type = Type::Map(MapType {
+            key_field: NestedField::map_key_element(1, 
PrimitiveType::String.into()).into(),
+            value_field: NestedField::map_value_element(2, 
PrimitiveType::Long.into(), false)
+                .into(),
+        });
+
+        assert_eq!(
+            iceberg_type,
+            converter
+                .map(&avro_schema, Some(PrimitiveType::Long.into()))
+                .unwrap()
+                .unwrap()
+        );
+    }
+
+    #[test]
+    fn test_fixed_type() {
+        let avro_schema = {
+            AvroSchema::parse_str(
+                r#"
+            {"name": "test", "type": "fixed", "size": 22}
+            "#,
+            )
+            .unwrap()
+        };
+
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+
+        let iceberg_type = Type::from(PrimitiveType::Fixed(22));
+
+        assert_eq!(
+            iceberg_type,
+            converter.primitive(&avro_schema).unwrap().unwrap()
+        );
+    }
+
+    #[test]
+    fn test_unknown_primitive() {
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+
+        assert!(converter.primitive(&AvroSchema::Duration).is_err());
+    }
+
+    #[test]
+    fn test_no_field_id() {
+        let avro_schema = {
+            AvroSchema::parse_str(
+                r#"
+{
+    "type": "record",
+    "name": "avro_schema",
+    "fields": [
+        {
+            "name": "array_with_string",
+            "type": "string"
+        }
+    ]
+}               
+"#,
+            )
+            .unwrap()
+        };
+
+        assert!(avro_schema_to_schema(&avro_schema).is_err());
+    }
+
+    #[test]
+    fn test_decimal_type() {
+        let avro_schema = {
+            AvroSchema::parse_str(
+                r#"
+      {"type": "bytes", "logicalType": "decimal", "precision": 25, "scale": 19}
+            "#,
+            )
+            .unwrap()
+        };
+
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+
+        assert_eq!(
+            Type::decimal(25, 19).unwrap(),
+            converter.primitive(&avro_schema).unwrap().unwrap()
+        );
+    }
+
+    #[test]
+    fn test_date_type() {
+        let mut converter = AvroSchemaToSchema { next_id: 0 };
+
+        assert_eq!(
+            Type::from(PrimitiveType::Date),
+            converter.primitive(&AvroSchema::Date).unwrap().unwrap()
+        );
+    }
+}
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index b2a4ba8..48bdebc 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -289,6 +289,12 @@ define_from_err!(
     "Failed to convert between uuid und iceberg value"
 );
 
+define_from_err!( // NOTE(review): presumably implements `From<apache_avro::Error>`; macro body not in view
+    apache_avro::Error, // source error type handed to the macro
+    ErrorKind::DataInvalid, // error kind handed to the macro
+    "Failure in conversion with avro" // message literal handed to the macro
+);
+
 /// Helper macro to check arguments.
 ///
 ///
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 8e77b97..5ef9ad3 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -27,4 +27,5 @@ pub use error::Error;
 pub use error::ErrorKind;
 pub use error::Result;
 
+mod avro;
 pub mod spec;
diff --git a/crates/iceberg/src/spec/datatypes.rs 
b/crates/iceberg/src/spec/datatypes.rs
index eef24e1..d5fc3ea 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -18,6 +18,9 @@
 /*!
  * Data Types
 */
+use crate::ensure_data_valid;
+use crate::error::Result;
+use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH};
 use ::serde::de::{MapAccess, Visitor};
 use serde::de::{Error, IntoDeserializer};
 use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
@@ -34,6 +37,44 @@ pub(crate) const LIST_FILED_NAME: &str = "element";
 pub(crate) const MAP_KEY_FIELD_NAME: &str = "key";
 pub(crate) const MAP_VALUE_FIELD_NAME: &str = "value";
 
+pub(crate) const MAX_DECIMAL_BYTES: u32 = 24; // upper bound on `num_bytes` in `Type::decimal_max_precision`
+pub(crate) const MAX_DECIMAL_PRECISION: u32 = 38; // upper bound on `precision` in `Type::decimal`
+
+mod _decimal {
+    use lazy_static::lazy_static;
+
+    use crate::spec::{MAX_DECIMAL_BYTES, MAX_DECIMAL_PRECISION};
+
+    lazy_static! {
+        // `MAX_PRECISION[n - 1]` holds the max decimal precision for `n` bytes.
+        pub(super) static ref MAX_PRECISION: [u32; MAX_DECIMAL_BYTES as usize] = {
+            let mut table = [0u32; MAX_DECIMAL_BYTES as usize];
+            for (idx, slot) in table.iter_mut().enumerate() {
+                // floor(log10(2^(8 * bytes - 1))); one bit per width is left out,
+                // presumably the sign bit.
+                let exponent = (8 * (idx + 1) - 1) as i32;
+                *slot = 2f64.powi(exponent).log10().floor() as u32;
+            }
+
+            table
+        };
+
+        // `REQUIRED_LENGTH[p - 1]` holds the smallest byte count whose max precision
+        // is at least `p`; an entry stays 0 when no byte count qualifies.
+        pub(super) static ref REQUIRED_LENGTH: [u32; MAX_DECIMAL_PRECISION as usize] = {
+            let mut table = [0u32; MAX_DECIMAL_PRECISION as usize];
+
+            for (idx, slot) in table.iter_mut().enumerate() {
+                let precision = (idx + 1) as u32;
+                if let Some(pos) = MAX_PRECISION.iter().position(|&max| max >= precision) {
+                    *slot = (pos + 1) as u32;
+                }
+            }
+
+            table
+        };
+
+    }
+}
+
 #[derive(Debug, PartialEq, Eq, Clone)]
 /// All data types are either primitives or nested types, which are maps, 
lists, or structs.
 pub enum Type {
@@ -70,6 +111,54 @@ impl Type {
     pub fn is_struct(&self) -> bool {
         matches!(self, Type::Struct(_))
     }
+
+    /// Returns the maximum decimal precision that fits in `num_bytes` bytes.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error (raised through `ensure_data_valid!`) when `num_bytes` is `0`
+    /// or greater than `MAX_DECIMAL_BYTES`.
+    #[inline(always)]
+    pub fn decimal_max_precision(num_bytes: u32) -> Result<u32> {
+        ensure_data_valid!(
+            num_bytes > 0 && num_bytes <= MAX_DECIMAL_BYTES,
+            "Decimal length must be between 1 and {MAX_DECIMAL_BYTES} bytes: {num_bytes}",
+        );
+        // The table is indexed by `num_bytes - 1`; the check above keeps it in bounds.
+        Ok(MAX_PRECISION[num_bytes as usize - 1])
+    }
+
+    /// Returns the minimum number of bytes required for a decimal with `precision`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error (raised through `ensure_data_valid!`) when `precision` is `0`
+    /// or greater than `MAX_DECIMAL_PRECISION`.
+    #[inline(always)]
+    pub fn decimal_required_bytes(precision: u32) -> Result<u32> {
+        ensure_data_valid!(
+            precision > 0 && precision <= MAX_DECIMAL_PRECISION,
+            "Decimal precision must be between 1 and {MAX_DECIMAL_PRECISION}: {precision}",
+        );
+        // The table is indexed by `precision - 1`; the check above keeps it in bounds.
+        Ok(REQUIRED_LENGTH[precision as usize - 1])
+    }
+
+    /// Creates a decimal type with the given `precision` and `scale`.
+    ///
+    /// Only `precision` is validated here; `scale` is stored as given.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error (raised through `ensure_data_valid!`) when `precision` is `0`
+    /// or greater than `MAX_DECIMAL_PRECISION`.
+    #[inline(always)]
+    pub fn decimal(precision: u32, scale: u32) -> Result<Self> {
+        ensure_data_valid!(
+            precision > 0 && precision <= MAX_DECIMAL_PRECISION,
+            "Decimal precision must be between 1 and {MAX_DECIMAL_PRECISION}: {precision}",
+        );
+        Ok(Type::Primitive(PrimitiveType::Decimal { precision, scale }))
+    }
+}
+
+impl From<PrimitiveType> for Type {
+    /// Wraps a primitive type in `Type::Primitive`.
+    fn from(primitive: PrimitiveType) -> Self {
+        Type::Primitive(primitive)
+    }
+}
+
+impl From<StructType> for Type {
+    /// Wraps a struct type in `Type::Struct`.
+    fn from(struct_type: StructType) -> Self {
+        Self::Struct(struct_type)
+    }
+}
+
+impl From<ListType> for Type {
+    /// Wraps a list type in `Type::List`.
+    fn from(list_type: ListType) -> Self {
+        Self::List(list_type)
+    }
+}
+
+impl From<MapType> for Type {
+    /// Wraps a map type in `Type::Map`.
+    fn from(map_type: MapType) -> Self {
+        Self::Map(map_type)
+    }
 }
 
 /// Primitive data types
@@ -112,7 +201,7 @@ pub enum PrimitiveType {
 }
 
 impl Serialize for Type {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
     where
         S: Serializer,
     {
@@ -122,7 +211,7 @@ impl Serialize for Type {
 }
 
 impl<'de> Deserialize<'de> for Type {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
     where
         D: Deserializer<'de>,
     {
@@ -132,7 +221,7 @@ impl<'de> Deserialize<'de> for Type {
 }
 
 impl<'de> Deserialize<'de> for PrimitiveType {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
     where
         D: Deserializer<'de>,
     {
@@ -148,7 +237,7 @@ impl<'de> Deserialize<'de> for PrimitiveType {
 }
 
 impl Serialize for PrimitiveType {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
     where
         S: Serializer,
     {
@@ -162,7 +251,7 @@ impl Serialize for PrimitiveType {
     }
 }
 
-fn deserialize_decimal<'de, D>(deserializer: D) -> Result<PrimitiveType, 
D::Error>
+fn deserialize_decimal<'de, D>(deserializer: D) -> 
std::result::Result<PrimitiveType, D::Error>
 where
     D: Deserializer<'de>,
 {
@@ -179,14 +268,18 @@ where
     })
 }
 
-fn serialize_decimal<S>(precision: &u32, scale: &u32, serializer: S) -> 
Result<S::Ok, S::Error>
+fn serialize_decimal<S>(
+    precision: &u32,
+    scale: &u32,
+    serializer: S,
+) -> std::result::Result<S::Ok, S::Error>
 where
     S: Serializer,
 {
     serializer.serialize_str(&format!("decimal({precision},{scale})"))
 }
 
-fn deserialize_fixed<'de, D>(deserializer: D) -> Result<PrimitiveType, 
D::Error>
+fn deserialize_fixed<'de, D>(deserializer: D) -> 
std::result::Result<PrimitiveType, D::Error>
 where
     D: Deserializer<'de>,
 {
@@ -201,7 +294,7 @@ where
         .map_err(D::Error::custom)
 }
 
-fn serialize_fixed<S>(value: &u64, serializer: S) -> Result<S::Ok, S::Error>
+fn serialize_fixed<S>(value: &u64, serializer: S) -> 
std::result::Result<S::Ok, S::Error>
 where
     S: Serializer,
 {
@@ -243,7 +336,7 @@ pub struct StructType {
 }
 
 impl<'de> Deserialize<'de> for StructType {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
     where
         D: Deserializer<'de>,
     {
@@ -263,7 +356,7 @@ impl<'de> Deserialize<'de> for StructType {
                 formatter.write_str("struct")
             }
 
-            fn visit_map<V>(self, mut map: V) -> Result<StructType, V::Error>
+            fn visit_map<V>(self, mut map: V) -> 
std::result::Result<StructType, V::Error>
             where
                 V: MapAccess<'de>,
             {
@@ -633,6 +726,7 @@ pub struct MapType {
 
 #[cfg(test)]
 mod tests {
+    use pretty_assertions::assert_eq;
     use uuid::Uuid;
 
     use crate::spec::values::PrimitiveLiteral;
@@ -913,4 +1007,22 @@ mod tests {
             }),
         );
     }
+
+    /// Checks the byte-count/precision lookups: every supported byte width, two
+    /// sample precisions, and the out-of-range arguments on both sides.
+    #[test]
+    fn test_decimal_precision() {
+        // Expected max precision for 1, 2, ..., 24 bytes.
+        let expected_max_precision = [
+            2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55,
+            57,
+        ];
+        // Pair each expectation with its byte count so a failure names the real argument.
+        for (num_bytes, max_precision) in (1u32..).zip(expected_max_precision.iter()) {
+            assert_eq!(
+                *max_precision,
+                Type::decimal_max_precision(num_bytes).unwrap(),
+                "Failed to calculate max precision for {num_bytes} bytes"
+            );
+        }
+
+        assert_eq!(1, Type::decimal_required_bytes(1).unwrap());
+        assert_eq!(5, Type::decimal_required_bytes(10).unwrap());
+        assert_eq!(16, Type::decimal_required_bytes(38).unwrap());
+
+        // Arguments outside 1..=24 bytes / 1..=38 digits must be reported as errors.
+        assert!(Type::decimal_max_precision(0).is_err());
+        assert!(Type::decimal_max_precision(25).is_err());
+        assert!(Type::decimal_required_bytes(0).is_err());
+        assert!(Type::decimal_required_bytes(39).is_err());
+    }
 }
diff --git a/crates/iceberg/testdata/avro_schema_manifest_entry.json 
b/crates/iceberg/testdata/avro_schema_manifest_entry.json
new file mode 100644
index 0000000..876c5fa
--- /dev/null
+++ b/crates/iceberg/testdata/avro_schema_manifest_entry.json
@@ -0,0 +1,286 @@
+{
+  "type": "record",
+  "name": "manifest_entry",
+  "fields": [
+    {
+      "name": "status",
+      "type": "int",
+      "field-id": 0
+    },
+    {
+      "name": "snapshot_id",
+      "type": [
+        "null",
+        "long"
+      ],
+      "field-id": 1
+    },
+    {
+      "name": "data_file",
+      "type": {
+        "type": "record",
+        "name": "r2",
+        "fields": [
+          {
+            "name": "file_path",
+            "type": "string",
+            "doc": "Location URI with FS scheme",
+            "field-id": 100
+          },
+          {
+            "name": "file_format",
+            "type": "string",
+            "doc": "File format name: avro, orc, or parquet",
+            "field-id": 101
+          },
+          {
+            "name": "partition",
+            "type": {
+              "type": "record",
+              "name": "r102",
+              "fields": [
+                {
+                  "field-id": 1000,
+                  "name": "VendorID",
+                  "type": [
+                    "null",
+                    "int"
+                  ]
+                },
+                {
+                  "field-id": 1001,
+                  "name": "tpep_pickup_datetime",
+                  "type": [
+                    "null",
+                    {
+                      "type": "int",
+                      "logicalType": "date"
+                    }
+                  ]
+                }
+              ]
+            },
+            "field-id": 102
+          },
+          {
+            "name": "record_count",
+            "type": "long",
+            "doc": "Number of records in the file",
+            "field-id": 103
+          },
+          {
+            "name": "file_size_in_bytes",
+            "type": "long",
+            "doc": "Total file size in bytes",
+            "field-id": 104
+          },
+          {
+            "name": "block_size_in_bytes",
+            "type": "long",
+            "field-id": 105
+          },
+          {
+            "name": "column_sizes",
+            "type": [
+              "null",
+              {
+                "type": "array",
+                "items": {
+                  "type": "record",
+                  "name": "k117_v118",
+                  "fields": [
+                    {
+                      "name": "key",
+                      "type": "int",
+                      "field-id": 117
+                    },
+                    {
+                      "name": "value",
+                      "type": "long",
+                      "field-id": 118
+                    }
+                  ]
+                },
+                "logicalType": "map"
+              }
+            ],
+            "doc": "Map of column id to total size on disk",
+            "field-id": 108
+          },
+          {
+            "name": "value_counts",
+            "type": [
+              "null",
+              {
+                "type": "array",
+                "items": {
+                  "type": "record",
+                  "name": "k119_v120",
+                  "fields": [
+                    {
+                      "name": "key",
+                      "type": "int",
+                      "field-id": 119
+                    },
+                    {
+                      "name": "value",
+                      "type": "long",
+                      "field-id": 120
+                    }
+                  ]
+                },
+                "logicalType": "map"
+              }
+            ],
+            "doc": "Map of column id to total count, including null and NaN",
+            "field-id": 109
+          },
+          {
+            "name": "null_value_counts",
+            "type": [
+              "null",
+              {
+                "type": "array",
+                "items": {
+                  "type": "record",
+                  "name": "k121_v122",
+                  "fields": [
+                    {
+                      "name": "key",
+                      "type": "int",
+                      "field-id": 121
+                    },
+                    {
+                      "name": "value",
+                      "type": "long",
+                      "field-id": 122
+                    }
+                  ]
+                },
+                "logicalType": "map"
+              }
+            ],
+            "doc": "Map of column id to null value count",
+            "field-id": 110
+          },
+          {
+            "name": "nan_value_counts",
+            "type": [
+              "null",
+              {
+                "type": "array",
+                "items": {
+                  "type": "record",
+                  "name": "k138_v139",
+                  "fields": [
+                    {
+                      "name": "key",
+                      "type": "int",
+                      "field-id": 138
+                    },
+                    {
+                      "name": "value",
+                      "type": "long",
+                      "field-id": 139
+                    }
+                  ]
+                },
+                "logicalType": "map"
+              }
+            ],
+            "doc": "Map of column id to number of NaN values in the column",
+            "field-id": 137
+          },
+          {
+            "name": "lower_bounds",
+            "type": [
+              "null",
+              {
+                "type": "array",
+                "items": {
+                  "type": "record",
+                  "name": "k126_v127",
+                  "fields": [
+                    {
+                      "name": "key",
+                      "type": "int",
+                      "field-id": 126
+                    },
+                    {
+                      "name": "value",
+                      "type": "bytes",
+                      "field-id": 127
+                    }
+                  ]
+                },
+                "logicalType": "map"
+              }
+            ],
+            "doc": "Map of column id to lower bound",
+            "field-id": 125
+          },
+          {
+            "name": "upper_bounds",
+            "type": [
+              "null",
+              {
+                "type": "array",
+                "items": {
+                  "type": "record",
+                  "name": "k129_v130",
+                  "fields": [
+                    {
+                      "name": "key",
+                      "type": "int",
+                      "field-id": 129
+                    },
+                    {
+                      "name": "value",
+                      "type": "bytes",
+                      "field-id": 130
+                    }
+                  ]
+                },
+                "logicalType": "map"
+              }
+            ],
+            "doc": "Map of column id to upper bound",
+            "field-id": 128
+          },
+          {
+            "name": "key_metadata",
+            "type": [
+              "null",
+              "bytes"
+            ],
+            "doc": "Encryption key metadata blob",
+            "field-id": 131
+          },
+          {
+            "name": "split_offsets",
+            "type": [
+              "null",
+              {
+                "type": "array",
+                "items": "long",
+                "element-id": 133
+              }
+            ],
+            "doc": "Splittable offsets",
+            "field-id": 132
+          },
+          {
+            "name": "sort_order_id",
+            "type": [
+              "null",
+              "int"
+            ],
+            "doc": "Sort order ID",
+            "field-id": 140
+          }
+        ]
+      },
+      "field-id": 2
+    }
+  ]
+}
\ No newline at end of file
diff --git a/crates/iceberg/testdata/avro_schema_manifest_file_v1.json 
b/crates/iceberg/testdata/avro_schema_manifest_file_v1.json
new file mode 100644
index 0000000..b185094
--- /dev/null
+++ b/crates/iceberg/testdata/avro_schema_manifest_file_v1.json
@@ -0,0 +1,139 @@
+{
+  "type": "record",
+  "name": "manifest_file",
+  "fields": [
+    {
+      "name": "manifest_path",
+      "type": "string",
+      "doc": "Location URI with FS scheme",
+      "field-id": 500
+    },
+    {
+      "name": "manifest_length",
+      "type": "long",
+      "doc": "Total file size in bytes",
+      "field-id": 501
+    },
+    {
+      "name": "partition_spec_id",
+      "type": "int",
+      "doc": "Spec ID used to write",
+      "field-id": 502
+    },
+    {
+      "name": "added_snapshot_id",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Snapshot ID that added the manifest",
+      "default": null,
+      "field-id": 503
+    },
+    {
+      "name": "added_data_files_count",
+      "type": [
+        "null",
+        "int"
+      ],
+      "doc": "Added entry count",
+      "field-id": 504
+    },
+    {
+      "name": "existing_data_files_count",
+      "type": [
+        "null",
+        "int"
+      ],
+      "doc": "Existing entry count",
+      "field-id": 505
+    },
+    {
+      "name": "deleted_data_files_count",
+      "type": [
+        "null",
+        "int"
+      ],
+      "doc": "Deleted entry count",
+      "field-id": 506
+    },
+    {
+      "name": "partitions",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "r508",
+            "fields": [
+              {
+                "name": "contains_null",
+                "type": "boolean",
+                "doc": "True if any file has a null partition value",
+                "field-id": 509
+              },
+              {
+                "name": "contains_nan",
+                "type": [
+                  "null",
+                  "boolean"
+                ],
+                "doc": "True if any file has a nan partition value",
+                "field-id": 518
+              },
+              {
+                "name": "lower_bound",
+                "type": [
+                  "null",
+                  "bytes"
+                ],
+                "doc": "Partition lower bound for all files",
+                "field-id": 510
+              },
+              {
+                "name": "upper_bound",
+                "type": [
+                  "null",
+                  "bytes"
+                ],
+                "doc": "Partition upper bound for all files",
+                "field-id": 511
+              }
+            ]
+          },
+          "element-id": 508
+        }
+      ],
+      "doc": "Summary for each partition",
+      "field-id": 507
+    },
+    {
+      "name": "added_rows_count",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Added rows count",
+      "field-id": 512
+    },
+    {
+      "name": "existing_rows_count",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Existing rows count",
+      "field-id": 513
+    },
+    {
+      "name": "deleted_rows_count",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Deleted rows count",
+      "field-id": 514
+    }
+  ]
+}
diff --git a/crates/iceberg/testdata/avro_schema_manifest_file_v2.json 
b/crates/iceberg/testdata/avro_schema_manifest_file_v2.json
new file mode 100644
index 0000000..34b97b9
--- /dev/null
+++ b/crates/iceberg/testdata/avro_schema_manifest_file_v2.json
@@ -0,0 +1,141 @@
+{
+  "type": "record",
+  "name": "manifest_file",
+  "fields": [
+    {
+      "name": "manifest_path",
+      "type": "string",
+      "doc": "Location URI with FS scheme",
+      "field-id": 500
+    },
+    {
+      "name": "manifest_length",
+      "type": "long",
+      "doc": "Total file size in bytes",
+      "field-id": 501
+    },
+    {
+      "name": "partition_spec_id",
+      "type": "int",
+      "doc": "Spec ID used to write",
+      "field-id": 502
+    },
+    {
+      "name": "content",
+      "type": "int",
+      "doc": "Contents of the manifest: 0=data, 1=deletes",
+      "field-id": 517
+    },
+    {
+      "name": "sequence_number",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Sequence number when the manifest was added",
+      "field-id": 515
+    },
+    {
+      "name": "min_sequence_number",
+      "type": [
+        "null",
+        "long"
+      ],
+      "doc": "Lowest sequence number in the manifest",
+      "field-id": 516
+    },
+    {
+      "name": "added_snapshot_id",
+      "type": "long",
+      "doc": "Snapshot ID that added the manifest",
+      "field-id": 503
+    },
+    {
+      "name": "added_files_count",
+      "type": "int",
+      "doc": "Added entry count",
+      "field-id": 504
+    },
+    {
+      "name": "existing_files_count",
+      "type": "int",
+      "doc": "Existing entry count",
+      "field-id": 505
+    },
+    {
+      "name": "deleted_files_count",
+      "type": "int",
+      "doc": "Deleted entry count",
+      "field-id": 506
+    },
+    {
+      "name": "added_rows_count",
+      "type": "long",
+      "doc": "Added rows count",
+      "field-id": 512
+    },
+    {
+      "name": "existing_rows_count",
+      "type": "long",
+      "doc": "Existing rows count",
+      "field-id": 513
+    },
+    {
+      "name": "deleted_rows_count",
+      "type": "long",
+      "doc": "Deleted rows count",
+      "field-id": 514
+    },
+    {
+      "name": "partitions",
+      "type": [
+        "null",
+        {
+          "type": "array",
+          "items": {
+            "type": "record",
+            "name": "r508",
+            "fields": [
+              {
+                "name": "contains_null",
+                "type": "boolean",
+                "doc": "True if any file has a null partition value",
+                "field-id": 509
+              },
+              {
+                "name": "contains_nan",
+                "type": [
+                  "null",
+                  "boolean"
+                ],
+                "doc": "True if any file has a nan partition value",
+                "field-id": 518
+              },
+              {
+                "name": "lower_bound",
+                "type": [
+                  "null",
+                  "bytes"
+                ],
+                "doc": "Partition lower bound for all files",
+                "field-id": 510
+              },
+              {
+                "name": "upper_bound",
+                "type": [
+                  "null",
+                  "bytes"
+                ],
+                "doc": "Partition upper bound for all files",
+                "field-id": 511
+              }
+            ]
+          },
+          "element-id": 508
+        }
+      ],
+      "doc": "Summary for each partition",
+      "field-id": 507
+    }
+  ]
+}
\ No newline at end of file

Reply via email to