liurenjie1024 commented on code in PR #40:
URL: https://github.com/apache/iceberg-rust/pull/40#discussion_r1311059717


##########
crates/iceberg/src/avro/schema.rs:
##########
@@ -0,0 +1,917 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Conversion between iceberg and avro schema.
+use crate::spec::{
+    visit_schema, ListType, MapType, NestedField, NestedFieldRef, 
PrimitiveType, Schema,
+    SchemaVisitor, StructType, Type, DECIMAL_LENGTH,
+};
+use crate::{ensure_data_valid, Error, ErrorKind, Result};
+use apache_avro::schema::{
+    DecimalSchema, FixedSchema, Name, RecordField as AvroRecordField, 
RecordFieldOrder,
+    RecordSchema, UnionSchema,
+};
+use apache_avro::Schema as AvroSchema;
+use itertools::{Either, Itertools};
+use serde_json::{Number, Value};
+
+const FILED_ID_PROP: &str = "field-id";
+
+struct SchemaToAvroSchema {
+    schema: String,
+}
+
+impl SchemaVisitor for SchemaToAvroSchema {
+    type T = Either<AvroSchema, AvroRecordField>;
+
+    fn schema(
+        &mut self,
+        _schema: &Schema,
+        value: Either<AvroSchema, AvroRecordField>,
+    ) -> Result<Either<AvroSchema, AvroRecordField>> {
+        let mut avro_schema = value.unwrap_left();
+
+        if let AvroSchema::Record(record) = &mut avro_schema {
+            record.name = Name::from(self.schema.as_str());
+        } else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "Schema result must be avro record!",
+            ));
+        }
+
+        Ok(Either::Left(avro_schema))
+    }
+
+    fn field(
+        &mut self,
+        field: &NestedFieldRef,
+        avro_schema: Either<AvroSchema, AvroRecordField>,
+    ) -> Result<Either<AvroSchema, AvroRecordField>> {
+        let mut field_schema = avro_schema.unwrap_left();
+        if let AvroSchema::Record(record) = &mut field_schema {
+            record.name = Name::from(format!("r{}", field.id).as_str());
+        }
+
+        if !field.required {
+            field_schema = avro_optional(field_schema)?;
+        }
+
+        let mut avro_record_field = AvroRecordField {
+            name: field.name.clone(),
+            schema: field_schema,
+            order: RecordFieldOrder::Ignore,
+            position: 0,
+            doc: field.doc.clone(),
+            aliases: None,
+            default: None,
+            custom_attributes: Default::default(),
+        };
+
+        if !field.required {
+            avro_record_field.default = Some(Value::Null);
+        }
+        avro_record_field.custom_attributes.insert(
+            FILED_ID_PROP.to_string(),
+            Value::Number(Number::from(field.id)),
+        );
+
+        Ok(Either::Right(avro_record_field))
+    }
+
+    fn r#struct(
+        &mut self,
+        _struct: &StructType,
+        results: Vec<Either<AvroSchema, AvroRecordField>>,
+    ) -> Result<Either<AvroSchema, AvroRecordField>> {
+        let avro_fields = results.into_iter().map(|r| 
r.unwrap_right()).collect();
+
+        Ok(Either::Left(AvroSchema::Record(RecordSchema {
+            // The name of this record schema should be determined later, by 
schema name or field
+            // name, here we use a temporary placeholder to do it.
+            name: Name::new("null")?,
+            aliases: None,
+            doc: None,
+            fields: avro_fields,
+            lookup: Default::default(),
+            attributes: Default::default(),
+        })))
+    }
+
+    fn list(
+        &mut self,
+        list: &ListType,
+        value: Either<AvroSchema, AvroRecordField>,
+    ) -> Result<Either<AvroSchema, AvroRecordField>> {
+        let mut field_schema = value.unwrap_left();
+
+        if let AvroSchema::Record(record) = &mut field_schema {
+            record.name = Name::from(format!("r{}", 
list.element_field.id).as_str());
+        }
+
+        if !list.element_field.required {
+            field_schema = avro_optional(field_schema)?;
+        }
+
+        // TODO: We need to add element id prop here, but rust's avro schema 
doesn't support property except record schema.
+        Ok(Either::Left(AvroSchema::Array(Box::new(field_schema))))
+    }
+
+    fn map(
+        &mut self,
+        map: &MapType,
+        key_value: Either<AvroSchema, AvroRecordField>,
+        value: Either<AvroSchema, AvroRecordField>,
+    ) -> Result<Either<AvroSchema, AvroRecordField>> {
+        let key_field_schema = key_value.unwrap_left();
+        let mut value_field_schema = value.unwrap_left();
+        if !map.value_field.required {
+            value_field_schema = avro_optional(value_field_schema)?;
+        }
+
+        if matches!(key_field_schema, AvroSchema::String) {
+            Ok(Either::Left(AvroSchema::Map(Box::new(value_field_schema))))
+        } else {
+            // Avro map requires that key must be string type. Here we convert 
it to array if key is
+            // not string type.
+            let key_field = {
+                let mut field = AvroRecordField {
+                    name: map.key_field.name.clone(),
+                    doc: None,
+                    aliases: None,
+                    default: None,
+                    schema: key_field_schema,
+                    order: RecordFieldOrder::Ascending,
+                    position: 0,
+                    custom_attributes: Default::default(),
+                };
+                field.custom_attributes.insert(
+                    FILED_ID_PROP.to_string(),
+                    Value::Number(Number::from(map.key_field.id)),
+                );
+                field
+            };
+
+            let value_field = {
+                let mut field = AvroRecordField {
+                    name: map.key_field.name.clone(),
+                    doc: None,
+                    aliases: None,
+                    default: None,
+                    schema: value_field_schema,
+                    order: RecordFieldOrder::Ignore,
+                    position: 0,
+                    custom_attributes: Default::default(),
+                };
+                field.custom_attributes.insert(
+                    FILED_ID_PROP.to_string(),
+                    Value::Number(Number::from(map.value_field.id)),
+                );
+                field
+            };
+
+            let item_avro_schema = AvroSchema::Record(RecordSchema {
+                name: Name::from(format!("k{}_v{}", map.key_field.id, 
map.value_field.id).as_str()),
+                aliases: None,
+                doc: None,
+                fields: vec![key_field, value_field],
+                lookup: Default::default(),
+                attributes: Default::default(),
+            });
+
+            Ok(Either::Left(item_avro_schema))
+        }
+    }
+
+    fn primitive(&mut self, p: &PrimitiveType) -> Result<Either<AvroSchema, 
AvroRecordField>> {
+        let avro_schema = match p {
+            PrimitiveType::Boolean => AvroSchema::Boolean,
+            PrimitiveType::Int => AvroSchema::Int,
+            PrimitiveType::Long => AvroSchema::Long,
+            PrimitiveType::Float => AvroSchema::Float,
+            PrimitiveType::Double => AvroSchema::Double,
+            PrimitiveType::Date => AvroSchema::Date,
+            PrimitiveType::Time => AvroSchema::TimeMicros,
+            PrimitiveType::Timestamp => AvroSchema::TimestampMicros,
+            PrimitiveType::Timestamptz => AvroSchema::TimestampMicros,
+            PrimitiveType::String => AvroSchema::String,
+            PrimitiveType::Uuid => AvroSchema::Uuid,
+            PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize)?,
+            PrimitiveType::Binary => AvroSchema::Bytes,
+            PrimitiveType::Decimal { precision, scale } => {
+                avro_decimal_schema(*precision as usize, *scale as usize)?
+            }
+        };
+        Ok(Either::Left(avro_schema))
+    }
+}
+
+/// Converting iceberg schema to avro schema.
+pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> 
Result<AvroSchema> {
+    let mut converter = SchemaToAvroSchema {
+        schema: name.to_string(),
+    };
+
+    visit_schema(schema, &mut converter).map(Either::unwrap_left)
+}
+
+pub(crate) fn avro_fixed_schema(len: usize) -> Result<AvroSchema> {
+    Ok(AvroSchema::Fixed(FixedSchema {
+        name: Name::new(format!("fixed_{len}").as_str())?,
+        aliases: None,
+        doc: None,
+        size: len,
+        attributes: Default::default(),
+    }))
+}
+
+pub(crate) fn avro_decimal_schema(precision: usize, scale: usize) -> 
Result<AvroSchema> {
+    Ok(AvroSchema::Decimal(DecimalSchema {
+        precision,
+        scale,
+        inner: Box::new(avro_fixed_schema(DECIMAL_LENGTH)?),
+    }))
+}
+
+fn avro_optional(avro_schema: AvroSchema) -> Result<AvroSchema> {
+    Ok(AvroSchema::Union(UnionSchema::new(vec![
+        AvroSchema::Null,
+        avro_schema,
+    ])?))
+}
+
+fn is_avro_optional(avro_schema: &AvroSchema) -> bool {
+    match avro_schema {
+        AvroSchema::Union(union) => union.is_nullable(),
+        _ => false,
+    }
+}
+
+/// Post order avro schema visitor.
+pub trait AvroSchemaVisitor {
+    type T;
+
+    fn record(&mut self, record: &RecordSchema, fields: Vec<Self::T>) -> 
Result<Self::T>;
+
+    fn union(&mut self, union: &UnionSchema, options: Vec<Self::T>) -> 
Result<Self::T>;
+
+    fn array(&mut self, array: &AvroSchema, item: Self::T) -> Result<Self::T>;
+    fn map(&mut self, map: &AvroSchema, value: Self::T) -> Result<Self::T>;
+
+    fn primitive(&mut self, schema: &AvroSchema) -> Result<Self::T>;
+}
+
+/// Visit avro schema in post order visitor.
+pub fn visit<V: AvroSchemaVisitor>(schema: &AvroSchema, visitor: &mut V) -> 
Result<V::T> {

Review Comment:
   > Converting the schema might be a bit simple for the visitor, but later on, 
when you start pruning schema (or for Avro, having a read schema that differs 
from the write schema), it also gives this concept of divide and conquer where 
you can easily isolate the different cases.
   
   This is also my concern. I prefer to keep visitor pattern to see if we have 
more features to implement on avro schema.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to