This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ee05b4  feat: Implement Iceberg values (#20)
4ee05b4 is described below

commit 4ee05b4d6a3101af424de7e44a8d3eeb63e688bd
Author: JanKaul <[email protected]>
AuthorDate: Wed Aug 9 12:52:36 2023 +0200

    feat: Implement Iceberg values (#20)
    
    * implement values
    
    * improve getters
    
    * fix clippy warnings
    
    * fix clippy warnings
    
    * change into bytebuf to from
    
    * add license header
    
    * Use Long instead of LongInt
    
    Co-authored-by: Renjie Liu <[email protected]>
    
    * use more general error kind
    
    * use Naivetime
    
    * use naivedate
    
    * use naivedatetime
    
    * fix clippy warnings
    
    * use uuid
    
    * use orderedfloat
    
    * fix clippy warnings
    
    * fix tests
    
    * use datatime utz
    
    * fix docs
    
    * rename value to literal
    
    * introduce primitive literal
    
    * remove length from fixed
    
    * serialize json via serde_json value
    
    * remove derive serialize/deserialize
    
    * implement From Literal for ByteBuf
    
    * implement From Literal for JsonValue
    
    * fix From Literal for JsonValue
    
    * implement struct
    
    * fix clippy warnings
    
    * add avro tests for some types
    
    * fix clippy warnings
    
    * fix nested field
    
    * fix nested field
    
    * implement list test
    
    * implement map test
    
    * fix error
    
    * fix clippy warnings
    
    * change timestamps to int/long
    
    * convert nulls to None
    
    * add tests for null
    
    * null test for struct
    
    * fix clippy warning
    
    * convert json null to option
    
    ---------
    
    Co-authored-by: Renjie Liu <[email protected]>
---
 crates/iceberg/Cargo.toml            |   5 +
 crates/iceberg/src/error.rs          |  24 +
 crates/iceberg/src/spec/datatypes.rs |   5 +
 crates/iceberg/src/spec/mod.rs       |   1 +
 crates/iceberg/src/spec/values.rs    | 964 +++++++++++++++++++++++++++++++++++
 5 files changed, 999 insertions(+)

diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 72c10c7..e1b5de2 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -34,6 +34,11 @@ serde_json = "^1.0"
 serde_derive = "^1.0"
 anyhow = "1.0.72"
 once_cell = "1"
+rust_decimal = "1.31.0"
+chrono = "0.4"
+uuid = "1.4.1"
+ordered-float = "3.7.0"
+bitvec = "1.0.1"
 
 [dev-dependencies]
 pretty_assertions = "1.4.0"
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index c64d351..6cb41db 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -259,6 +259,30 @@ define_from_err!(
     "handling invalid utf-8 characters"
 );
 
+define_from_err!(
+    std::array::TryFromSliceError,
+    ErrorKind::DataInvalid,
+    "failed to convert byte slive to array"
+);
+
+define_from_err!(
+    std::num::TryFromIntError,
+    ErrorKind::DataInvalid,
+    "failed to convert integer"
+);
+
+define_from_err!(
+    chrono::ParseError,
+    ErrorKind::DataInvalid,
+    "Failed to parse string to date or time"
+);
+
+define_from_err!(
+    uuid::Error,
+    ErrorKind::DataInvalid,
+    "Failed to convert between uuid und iceberg value"
+);
+
 #[cfg(test)]
 mod tests {
     use anyhow::anyhow;
diff --git a/crates/iceberg/src/spec/datatypes.rs 
b/crates/iceberg/src/spec/datatypes.rs
index 301415c..5750ab9 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -22,6 +22,7 @@ use ::serde::de::{MapAccess, Visitor};
 use serde::de::{Error, IntoDeserializer};
 use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
 use std::cell::OnceCell;
+use std::slice::Iter;
 use std::{collections::HashMap, fmt, ops::Index};
 
 /// Field name for list type.
@@ -293,6 +294,10 @@ impl StructType {
             .get(&field_id)
             .copied()
     }
+    /// Returns an iteratorr over the struct fields
+    pub fn iter(&self) -> Iter<NestedField> {
+        self.fields.iter()
+    }
 }
 
 impl PartialEq for StructType {
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 3b998a7..eb66abc 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -19,3 +19,4 @@
 
 pub mod datatypes;
 pub mod schema;
+pub mod values;
diff --git a/crates/iceberg/src/spec/values.rs 
b/crates/iceberg/src/spec/values.rs
new file mode 100644
index 0000000..771ffd8
--- /dev/null
+++ b/crates/iceberg/src/spec/values.rs
@@ -0,0 +1,964 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/*!
+ * Value in iceberg
+ */
+
+use std::{any::Any, collections::BTreeMap};
+
+use bitvec::vec::BitVec;
+use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Utc};
+use ordered_float::OrderedFloat;
+use rust_decimal::Decimal;
+use serde_bytes::ByteBuf;
+use serde_json::{Map as JsonMap, Number, Value as JsonValue};
+use uuid::Uuid;
+
+use crate::{Error, ErrorKind};
+
+use super::datatypes::{PrimitiveType, Type};
+
/// A single primitive (non-nested) Iceberg value.
///
/// The comment on each variant describes the value's binary encoding, as
/// produced by `From<Literal> for ByteBuf` and read by `Literal::try_from_bytes`.
#[derive(Clone, Debug, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub enum PrimitiveLiteral {
    /// One byte: 0x00 for false, non-zero byte for true
    Boolean(bool),
    /// 32-bit signed integer, stored as 4-byte little-endian
    Int(i32),
    /// 64-bit signed integer, stored as 8-byte little-endian
    Long(i64),
    /// 32-bit float, stored as 4-byte little-endian.
    /// Wrapped in `OrderedFloat` so the enum can derive `Eq`, `Ord` and `Hash`.
    Float(OrderedFloat<f32>),
    /// 64-bit float, stored as 8-byte little-endian.
    /// Wrapped in `OrderedFloat` so the enum can derive `Eq`, `Ord` and `Hash`.
    Double(OrderedFloat<f64>),
    /// Stores days from the 1970-01-01 in an 4-byte little-endian int
    Date(i32),
    /// Stores microseconds from midnight in an 8-byte little-endian long
    Time(i64),
    /// Timestamp without timezone: microseconds since 1970-01-01T00:00:00,
    /// stored as 8-byte little-endian
    Timestamp(i64),
    /// Timestamp with timezone: microseconds since 1970-01-01T00:00:00 UTC,
    /// stored as 8-byte little-endian
    TimestampTZ(i64),
    /// UTF-8 bytes (without length)
    String(String),
    /// 16-byte big-endian value
    UUID(Uuid),
    /// Fixed-length binary value; its length is the length of the vector
    Fixed(Vec<u8>),
    /// Binary value (without length)
    Binary(Vec<u8>),
    /// Stores unscaled value as two’s-complement big-endian binary,
    /// using the minimum number of bytes for the value
    /// (binary encoding not implemented yet)
    Decimal(Decimal),
}
+
/// A literal Iceberg value: either a primitive value or a nested
/// (struct, list or map) value.
#[derive(Clone, Debug, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub enum Literal {
    /// A primitive value
    Primitive(PrimitiveLiteral),
    /// A struct is a tuple of typed values. Each field in the tuple is named and has
    /// an integer id that is unique in the table schema.
    /// Each field can be either optional or required, meaning that values can (or
    /// cannot) be null. Fields may be any type.
    /// Fields may have an optional comment or doc string. Fields can have default values.
    Struct(Struct),
    /// A list is a collection of values with some element type.
    /// The element field has an integer id that is unique in the table schema.
    /// Elements can be either optional or required. Element types may be any type.
    /// A null element is represented as `None`.
    List(Vec<Option<Literal>>),
    /// A map is a collection of key-value pairs with a key type and a value type.
    /// Both the key field and value field each have an integer id that is unique in
    /// the table schema.
    /// Map keys are required and map values can be either optional or required. Both
    /// map keys and map values may be any type, including nested types.
    /// Entries are kept in a `BTreeMap`, so they are ordered by key; a null value is
    /// represented as `None`.
    Map(BTreeMap<Literal, Option<Literal>>),
}
+
+impl From<Literal> for ByteBuf {
+    fn from(value: Literal) -> Self {
+        match value {
+            Literal::Primitive(prim) => match prim {
+                PrimitiveLiteral::Boolean(val) => {
+                    if val {
+                        ByteBuf::from([0u8])
+                    } else {
+                        ByteBuf::from([1u8])
+                    }
+                }
+                PrimitiveLiteral::Int(val) => ByteBuf::from(val.to_le_bytes()),
+                PrimitiveLiteral::Long(val) => 
ByteBuf::from(val.to_le_bytes()),
+                PrimitiveLiteral::Float(val) => 
ByteBuf::from(val.to_le_bytes()),
+                PrimitiveLiteral::Double(val) => 
ByteBuf::from(val.to_le_bytes()),
+                PrimitiveLiteral::Date(val) => 
ByteBuf::from(val.to_le_bytes()),
+                PrimitiveLiteral::Time(val) => 
ByteBuf::from(val.to_le_bytes()),
+                PrimitiveLiteral::Timestamp(val) => 
ByteBuf::from(val.to_le_bytes()),
+                PrimitiveLiteral::TimestampTZ(val) => 
ByteBuf::from(val.to_le_bytes()),
+                PrimitiveLiteral::String(val) => ByteBuf::from(val.as_bytes()),
+                PrimitiveLiteral::UUID(val) => 
ByteBuf::from(val.as_u128().to_be_bytes()),
+                PrimitiveLiteral::Fixed(val) => ByteBuf::from(val),
+                PrimitiveLiteral::Binary(val) => ByteBuf::from(val),
+                PrimitiveLiteral::Decimal(_) => todo!(),
+            },
+            _ => unimplemented!(),
+        }
+    }
+}
+
+impl From<&Literal> for JsonValue {
+    fn from(value: &Literal) -> Self {
+        match value {
+            Literal::Primitive(prim) => match prim {
+                PrimitiveLiteral::Boolean(val) => JsonValue::Bool(*val),
+                PrimitiveLiteral::Int(val) => JsonValue::Number((*val).into()),
+                PrimitiveLiteral::Long(val) => 
JsonValue::Number((*val).into()),
+                PrimitiveLiteral::Float(val) => match Number::from_f64(val.0 
as f64) {
+                    Some(number) => JsonValue::Number(number),
+                    None => JsonValue::Null,
+                },
+                PrimitiveLiteral::Double(val) => match Number::from_f64(val.0) 
{
+                    Some(number) => JsonValue::Number(number),
+                    None => JsonValue::Null,
+                },
+                PrimitiveLiteral::Date(val) => {
+                    JsonValue::String(date::days_to_date(*val).to_string())
+                }
+                PrimitiveLiteral::Time(val) => {
+                    
JsonValue::String(time::microseconds_to_time(*val).to_string())
+                }
+                PrimitiveLiteral::Timestamp(val) => JsonValue::String(
+                    timestamp::microseconds_to_datetime(*val)
+                        .format("%Y-%m-%dT%H:%M:%S%.f")
+                        .to_string(),
+                ),
+                PrimitiveLiteral::TimestampTZ(val) => JsonValue::String(
+                    timestamptz::microseconds_to_datetimetz(*val)
+                        .format("%Y-%m-%dT%H:%M:%S%.f+00:00")
+                        .to_string(),
+                ),
+                PrimitiveLiteral::String(val) => 
JsonValue::String(val.clone()),
+                PrimitiveLiteral::UUID(val) => 
JsonValue::String(val.to_string()),
+                PrimitiveLiteral::Fixed(val) => {
+                    JsonValue::String(val.iter().fold(String::new(), |mut acc, 
x| {
+                        acc.push_str(&format!("{:x}", x));
+                        acc
+                    }))
+                }
+                PrimitiveLiteral::Binary(val) => {
+                    JsonValue::String(val.iter().fold(String::new(), |mut acc, 
x| {
+                        acc.push_str(&format!("{:x}", x));
+                        acc
+                    }))
+                }
+                PrimitiveLiteral::Decimal(_) => todo!(),
+            },
+            Literal::Struct(s) => {
+                JsonValue::Object(JsonMap::from_iter(s.iter().map(|(id, value, 
_)| {
+                    let json: JsonValue = match value {
+                        Some(val) => val.into(),
+                        None => JsonValue::Null,
+                    };
+                    (id.to_string(), json)
+                })))
+            }
+            Literal::List(list) => JsonValue::Array(
+                list.iter()
+                    .map(|opt| match opt {
+                        Some(literal) => literal.into(),
+                        None => JsonValue::Null,
+                    })
+                    .collect(),
+            ),
+            Literal::Map(map) => {
+                let mut object = JsonMap::with_capacity(2);
+                object.insert(
+                    "keys".to_string(),
+                    JsonValue::Array(map.keys().map(|literal| 
literal.into()).collect()),
+                );
+                object.insert(
+                    "values".to_string(),
+                    JsonValue::Array(
+                        map.values()
+                            .map(|literal| match literal {
+                                Some(literal) => literal.into(),
+                                None => JsonValue::Null,
+                            })
+                            .collect(),
+                    ),
+                );
+                JsonValue::Object(object)
+            }
+        }
+    }
+}
+
/// An ordered tuple of (possibly null) field values. Each value is tagged with
/// its field id and field name.
///
/// One use is the partition struct, which stores the tuple of partition values for
/// each file. Its type is derived from the partition fields of the partition spec
/// used to write the manifest file.
/// In v2, the partition struct’s field ids must match the ids from the partition spec.
#[derive(Debug, Clone, PartialEq, Hash, Eq, PartialOrd, Ord)]
pub struct Struct {
    /// Field values, in field order. A null field still occupies a slot here
    /// (holding a placeholder literal); `null_bitmap` tells whether the slot is null.
    fields: Vec<Literal>,
    /// Field ids, parallel to `fields`
    field_ids: Vec<i32>,
    /// Field names, parallel to `fields`
    field_names: Vec<String>,
    /// Null bitmap: bit `i` is set when field `i` is null
    null_bitmap: BitVec,
}
+
+impl Struct {
+    /// Create a iterator to read the field in order of (field_id, 
field_value, field_name).
+    pub fn iter(&self) -> impl Iterator<Item = (&i32, Option<&Literal>, &str)> 
{
+        self.null_bitmap
+            .iter()
+            .zip(self.fields.iter())
+            .zip(self.field_ids.iter())
+            .zip(self.field_names.iter())
+            .map(|(((null, value), id), name)| {
+                (id, if *null { None } else { Some(value) }, name.as_str())
+            })
+    }
+}
+
+impl FromIterator<(i32, Option<Literal>, String)> for Struct {
+    fn from_iter<I: IntoIterator<Item = (i32, Option<Literal>, String)>>(iter: 
I) -> Self {
+        let mut fields = Vec::new();
+        let mut field_ids = Vec::new();
+        let mut field_names = Vec::new();
+        let mut null_bitmap = BitVec::new();
+
+        for (id, value, name) in iter.into_iter() {
+            field_ids.push(id);
+            field_names.push(name);
+            match value {
+                Some(value) => {
+                    fields.push(value);
+                    null_bitmap.push(false)
+                }
+                None => {
+                    
fields.push(Literal::Primitive(PrimitiveLiteral::Boolean(false)));
+                    null_bitmap.push(true)
+                }
+            }
+        }
+        Struct {
+            fields,
+            field_ids,
+            field_names,
+            null_bitmap,
+        }
+    }
+}
+
+impl Literal {
+    /// Create iceberg value from bytes
+    pub fn try_from_bytes(bytes: &[u8], data_type: &Type) -> Result<Self, 
Error> {
+        match data_type {
+            Type::Primitive(primitive) => match primitive {
+                PrimitiveType::Boolean => {
+                    if bytes.len() == 1 && bytes[0] == 0u8 {
+                        
Ok(Literal::Primitive(PrimitiveLiteral::Boolean(false)))
+                    } else {
+                        Ok(Literal::Primitive(PrimitiveLiteral::Boolean(true)))
+                    }
+                }
+                PrimitiveType::Int => 
Ok(Literal::Primitive(PrimitiveLiteral::Int(
+                    i32::from_le_bytes(bytes.try_into()?),
+                ))),
+                PrimitiveType::Long => 
Ok(Literal::Primitive(PrimitiveLiteral::Long(
+                    i64::from_le_bytes(bytes.try_into()?),
+                ))),
+                PrimitiveType::Float => 
Ok(Literal::Primitive(PrimitiveLiteral::Float(
+                    OrderedFloat(f32::from_le_bytes(bytes.try_into()?)),
+                ))),
+                PrimitiveType::Double => 
Ok(Literal::Primitive(PrimitiveLiteral::Double(
+                    OrderedFloat(f64::from_le_bytes(bytes.try_into()?)),
+                ))),
+                PrimitiveType::Date => 
Ok(Literal::Primitive(PrimitiveLiteral::Date(
+                    i32::from_le_bytes(bytes.try_into()?),
+                ))),
+                PrimitiveType::Time => 
Ok(Literal::Primitive(PrimitiveLiteral::Time(
+                    i64::from_le_bytes(bytes.try_into()?),
+                ))),
+                PrimitiveType::Timestamp => 
Ok(Literal::Primitive(PrimitiveLiteral::Timestamp(
+                    i64::from_le_bytes(bytes.try_into()?),
+                ))),
+                PrimitiveType::Timestamptz => Ok(Literal::Primitive(
+                    
PrimitiveLiteral::TimestampTZ(i64::from_le_bytes(bytes.try_into()?)),
+                )),
+                PrimitiveType::String => 
Ok(Literal::Primitive(PrimitiveLiteral::String(
+                    std::str::from_utf8(bytes)?.to_string(),
+                ))),
+                PrimitiveType::Uuid => 
Ok(Literal::Primitive(PrimitiveLiteral::UUID(
+                    Uuid::from_u128(u128::from_be_bytes(bytes.try_into()?)),
+                ))),
+                PrimitiveType::Fixed(_) => 
Ok(Literal::Primitive(PrimitiveLiteral::Fixed(
+                    Vec::from(bytes),
+                ))),
+                PrimitiveType::Binary => 
Ok(Literal::Primitive(PrimitiveLiteral::Binary(
+                    Vec::from(bytes),
+                ))),
+                PrimitiveType::Decimal {
+                    precision: _,
+                    scale: _,
+                } => todo!(),
+            },
+            _ => Err(Error::new(
+                crate::ErrorKind::DataInvalid,
+                "Converting bytes to non-primitive types is not supported.",
+            )),
+        }
+    }
+
+    /// Create iceberg value from a json value
+    pub fn try_from_json(value: JsonValue, data_type: &Type) -> 
Result<Option<Self>, Error> {
+        match data_type {
+            Type::Primitive(primitive) => match (primitive, value) {
+                (PrimitiveType::Boolean, JsonValue::Bool(bool)) => {
+                    
Ok(Some(Literal::Primitive(PrimitiveLiteral::Boolean(bool))))
+                }
+                (PrimitiveType::Int, JsonValue::Number(number)) => {
+                    Ok(Some(Literal::Primitive(PrimitiveLiteral::Int(
+                        number
+                            .as_i64()
+                            .ok_or(Error::new(
+                                crate::ErrorKind::DataInvalid,
+                                "Failed to convert json number to int",
+                            ))?
+                            .try_into()?,
+                    ))))
+                }
+                (PrimitiveType::Long, JsonValue::Number(number)) => 
Ok(Some(Literal::Primitive(
+                    PrimitiveLiteral::Long(number.as_i64().ok_or(Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "Failed to convert json number to long",
+                    ))?),
+                ))),
+                (PrimitiveType::Float, JsonValue::Number(number)) => 
Ok(Some(Literal::Primitive(
+                    
PrimitiveLiteral::Float(OrderedFloat(number.as_f64().ok_or(Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "Failed to convert json number to float",
+                    ))? as f32)),
+                ))),
+                (PrimitiveType::Double, JsonValue::Number(number)) => 
Ok(Some(Literal::Primitive(
+                    
PrimitiveLiteral::Double(OrderedFloat(number.as_f64().ok_or(Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "Failed to convert json number to double",
+                    ))?)),
+                ))),
+                (PrimitiveType::Date, JsonValue::String(s)) => {
+                    Ok(Some(Literal::Primitive(PrimitiveLiteral::Date(
+                        date::date_to_days(&NaiveDate::parse_from_str(&s, 
"%Y-%m-%d")?),
+                    ))))
+                }
+                (PrimitiveType::Time, JsonValue::String(s)) => {
+                    Ok(Some(Literal::Primitive(PrimitiveLiteral::Time(
+                        
time::time_to_microseconds(&NaiveTime::parse_from_str(&s, "%H:%M:%S%.f")?),
+                    ))))
+                }
+                (PrimitiveType::Timestamp, JsonValue::String(s)) => 
Ok(Some(Literal::Primitive(
+                    
PrimitiveLiteral::Timestamp(timestamp::datetime_to_microseconds(
+                        &NaiveDateTime::parse_from_str(&s, 
"%Y-%m-%dT%H:%M:%S%.f")?,
+                    )),
+                ))),
+                (PrimitiveType::Timestamptz, JsonValue::String(s)) => {
+                    Ok(Some(Literal::Primitive(PrimitiveLiteral::TimestampTZ(
+                        
timestamptz::datetimetz_to_microseconds(&DateTime::from_utc(
+                            NaiveDateTime::parse_from_str(&s, 
"%Y-%m-%dT%H:%M:%S%.f+00:00")?,
+                            Utc,
+                        )),
+                    ))))
+                }
+                (PrimitiveType::String, JsonValue::String(s)) => {
+                    Ok(Some(Literal::Primitive(PrimitiveLiteral::String(s))))
+                }
+                (PrimitiveType::Uuid, JsonValue::String(s)) => 
Ok(Some(Literal::Primitive(
+                    PrimitiveLiteral::UUID(Uuid::parse_str(&s)?),
+                ))),
+                (PrimitiveType::Fixed(_), JsonValue::String(_)) => todo!(),
+                (PrimitiveType::Binary, JsonValue::String(_)) => todo!(),
+                (
+                    PrimitiveType::Decimal {
+                        precision: _,
+                        scale: _,
+                    },
+                    JsonValue::String(_),
+                ) => todo!(),
+                (_, JsonValue::Null) => Ok(None),
+                (i, j) => Err(Error::new(
+                    crate::ErrorKind::DataInvalid,
+                    format!(
+                        "The json value {} doesn't fit to the iceberg type 
{}.",
+                        j, i
+                    ),
+                )),
+            },
+            Type::Struct(schema) => {
+                if let JsonValue::Object(mut object) = value {
+                    
Ok(Some(Literal::Struct(Struct::from_iter(schema.iter().map(
+                        |field| {
+                            (
+                                field.id,
+                                
object.remove(&field.id.to_string()).and_then(|value| {
+                                    Literal::try_from_json(value, 
&field.field_type)
+                                        .and_then(|value| {
+                                            value.ok_or(Error::new(
+                                                ErrorKind::DataInvalid,
+                                                "Key of map cannot be null",
+                                            ))
+                                        })
+                                        .ok()
+                                }),
+                                field.name.clone(),
+                            )
+                        },
+                    )))))
+                } else {
+                    Err(Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "The json value for a struct type must be an object.",
+                    ))
+                }
+            }
+            Type::List(list) => {
+                if let JsonValue::Array(array) = value {
+                    Ok(Some(Literal::List(
+                        array
+                            .into_iter()
+                            .map(|value| {
+                                Literal::try_from_json(value, 
&list.element_field.field_type)
+                            })
+                            .collect::<Result<Vec<_>, Error>>()?,
+                    )))
+                } else {
+                    Err(Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "The json value for a list type must be an array.",
+                    ))
+                }
+            }
+            Type::Map(map) => {
+                if let JsonValue::Object(mut object) = value {
+                    if let (Some(JsonValue::Array(keys)), 
Some(JsonValue::Array(values))) =
+                        (object.remove("keys"), object.remove("values"))
+                    {
+                        Ok(Some(Literal::Map(BTreeMap::from_iter(
+                            keys.into_iter()
+                                .zip(values.into_iter())
+                                .map(|(key, value)| {
+                                    Ok((
+                                        Literal::try_from_json(key, 
&map.key_field.field_type)
+                                            .and_then(|value| {
+                                                value.ok_or(Error::new(
+                                                    ErrorKind::DataInvalid,
+                                                    "Key of map cannot be 
null",
+                                                ))
+                                            })?,
+                                        Literal::try_from_json(value, 
&map.value_field.field_type)?,
+                                    ))
+                                })
+                                .collect::<Result<Vec<_>, Error>>()?
+                                .into_iter(),
+                        ))))
+                    } else {
+                        Err(Error::new(
+                            crate::ErrorKind::DataInvalid,
+                            "The json value for a list type must be an array.",
+                        ))
+                    }
+                } else {
+                    Err(Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "The json value for a list type must be an array.",
+                    ))
+                }
+            }
+        }
+    }
+
+    /// Get datatype of value
+    pub fn datatype(&self) -> Type {
+        match self {
+            Literal::Primitive(prim) => match prim {
+                PrimitiveLiteral::Boolean(_) => 
Type::Primitive(PrimitiveType::Boolean),
+                PrimitiveLiteral::Int(_) => 
Type::Primitive(PrimitiveType::Int),
+                PrimitiveLiteral::Long(_) => 
Type::Primitive(PrimitiveType::Long),
+                PrimitiveLiteral::Float(_) => 
Type::Primitive(PrimitiveType::Float),
+                PrimitiveLiteral::Double(_) => 
Type::Primitive(PrimitiveType::Double),
+                PrimitiveLiteral::Date(_) => 
Type::Primitive(PrimitiveType::Date),
+                PrimitiveLiteral::Time(_) => 
Type::Primitive(PrimitiveType::Time),
+                PrimitiveLiteral::Timestamp(_) => 
Type::Primitive(PrimitiveType::Timestamp),
+                PrimitiveLiteral::TimestampTZ(_) => 
Type::Primitive(PrimitiveType::Timestamptz),
+                PrimitiveLiteral::Fixed(vec) => {
+                    Type::Primitive(PrimitiveType::Fixed(vec.len() as u64))
+                }
+                PrimitiveLiteral::Binary(_) => 
Type::Primitive(PrimitiveType::Binary),
+                PrimitiveLiteral::String(_) => 
Type::Primitive(PrimitiveType::String),
+                PrimitiveLiteral::UUID(_) => 
Type::Primitive(PrimitiveType::Uuid),
+                PrimitiveLiteral::Decimal(dec) => 
Type::Primitive(PrimitiveType::Decimal {
+                    precision: 38,
+                    scale: dec.scale(),
+                }),
+            },
+            _ => unimplemented!(),
+        }
+    }
+
+    /// Convert Value to the any type
+    pub fn into_any(self) -> Box<dyn Any> {
+        match self {
+            Literal::Primitive(prim) => match prim {
+                PrimitiveLiteral::Boolean(any) => Box::new(any),
+                PrimitiveLiteral::Int(any) => Box::new(any),
+                PrimitiveLiteral::Long(any) => Box::new(any),
+                PrimitiveLiteral::Float(any) => Box::new(any),
+                PrimitiveLiteral::Double(any) => Box::new(any),
+                PrimitiveLiteral::Date(any) => Box::new(any),
+                PrimitiveLiteral::Time(any) => Box::new(any),
+                PrimitiveLiteral::Timestamp(any) => Box::new(any),
+                PrimitiveLiteral::TimestampTZ(any) => Box::new(any),
+                PrimitiveLiteral::Fixed(any) => Box::new(any),
+                PrimitiveLiteral::Binary(any) => Box::new(any),
+                PrimitiveLiteral::String(any) => Box::new(any),
+                PrimitiveLiteral::UUID(any) => Box::new(any),
+                PrimitiveLiteral::Decimal(any) => Box::new(any),
+            },
+            _ => unimplemented!(),
+        }
+    }
+}
+
+mod date {
+    use chrono::{NaiveDate, NaiveDateTime};
+
+    pub(crate) fn date_to_days(date: &NaiveDate) -> i32 {
+        date.signed_duration_since(
+            // This is always the same and shouldn't fail
+            NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
+        )
+        .num_days() as i32
+    }
+
+    pub(crate) fn days_to_date(days: i32) -> NaiveDate {
+        // This shouldn't fail until the year 262000
+        NaiveDateTime::from_timestamp_opt(days as i64 * 86_400, 0)
+            .unwrap()
+            .date()
+    }
+}
+
mod time {
    use chrono::NaiveTime;

    /// Converts a time of day into microseconds since midnight.
    pub(crate) fn time_to_microseconds(time: &NaiveTime) -> i64 {
        time.signed_duration_since(
            // This is always the same and shouldn't fail
            NaiveTime::from_num_seconds_from_midnight_opt(0, 0).unwrap(),
        )
        // A span within a single day is far too small to overflow i64
        // microseconds, so this `unwrap` is not expected to fire.
        .num_microseconds()
        .unwrap()
    }

    /// Converts microseconds since midnight into a time of day.
    ///
    /// NOTE(review): only values in `0..86_400_000_000` are handled. Larger values
    /// make `from_num_seconds_from_midnight_opt` return `None`, so the `unwrap`
    /// panics. Negative values make the `as u32` casts wrap: either the
    /// `rem as u32 * 1_000` multiplication overflows (a panic in debug builds) or
    /// the wrapped seconds value makes the `unwrap` panic.
    pub(crate) fn microseconds_to_time(micros: i64) -> NaiveTime {
        // Split into whole seconds and the sub-second remainder in microseconds.
        let (secs, rem) = (micros / 1_000_000, micros % 1_000_000);

        NaiveTime::from_num_seconds_from_midnight_opt(secs as u32, rem as u32 * 1_000).unwrap()
    }
}
+
+mod timestamp {
+    use chrono::NaiveDateTime;
+
+    pub(crate) fn datetime_to_microseconds(time: &NaiveDateTime) -> i64 {
+        time.timestamp_micros()
+    }
+
+    pub(crate) fn microseconds_to_datetime(micros: i64) -> NaiveDateTime {
+        let (secs, rem) = (micros / 1_000_000, micros % 1_000_000);
+
+        // This shouldn't fail until the year 262000
+        NaiveDateTime::from_timestamp_opt(secs, rem as u32 * 1_000).unwrap()
+    }
+}
+
+mod timestamptz {
+    use chrono::{DateTime, NaiveDateTime, Utc};
+
+    pub(crate) fn datetimetz_to_microseconds(time: &DateTime<Utc>) -> i64 {
+        time.timestamp_micros()
+    }
+
+    pub(crate) fn microseconds_to_datetimetz(micros: i64) -> DateTime<Utc> {
+        let (secs, rem) = (micros / 1_000_000, micros % 1_000_000);
+
+        DateTime::<Utc>::from_utc(
+            // This shouldn't fail until the year 262000
+            NaiveDateTime::from_timestamp_opt(secs, rem as u32 * 
1_000).unwrap(),
+            Utc,
+        )
+    }
+}
+
#[cfg(test)]
mod tests {

    use crate::spec::datatypes::{ListType, MapType, NestedField, StructType};

    use super::*;

    /// Round-trips `json` through `Literal::try_from_json` and back into JSON.
    ///
    /// Asserts that parsing `json` as `expected_type` yields `expected_literal`. It also
    /// asserts that serializing `expected_literal` gives JSON equal to the input. The
    /// comparison is between parsed values, so formatting and whitespace are ignored.
    fn check_json_serde(json: &str, expected_literal: Literal, expected_type: &Type) {
        let raw_json_value = serde_json::from_str::<JsonValue>(json).unwrap();
        let desered_literal =
            Literal::try_from_json(raw_json_value.clone(), expected_type).unwrap();
        assert_eq!(desered_literal, Some(expected_literal.clone()));

        let expected_json_value: JsonValue = (&expected_literal).into();
        let sered_json = serde_json::to_string(&expected_json_value).unwrap();
        let parsed_json_value = serde_json::from_str::<JsonValue>(&sered_json).unwrap();

        assert_eq!(parsed_json_value, raw_json_value);
    }

    /// Checks the binary encoding of a literal, both directly and after a round trip
    /// through an Avro `"bytes"` value.
    ///
    /// `input` must decode to `expected_literal` when read as `expected_type`. The same
    /// bytes are then written to an Avro container with a `"bytes"` schema, read back,
    /// and decoded again.
    fn check_avro_bytes_serde(input: Vec<u8>, expected_literal: Literal, expected_type: &Type) {
        let raw_schema = r#""bytes""#;
        let schema = apache_avro::Schema::parse_str(raw_schema).unwrap();

        let bytes = ByteBuf::from(input);
        let literal = Literal::try_from_bytes(&bytes, expected_type).unwrap();
        assert_eq!(literal, expected_literal);

        let mut writer = apache_avro::Writer::new(&schema, Vec::new());
        writer.append_ser(bytes).unwrap();
        let encoded = writer.into_inner().unwrap();
        let reader = apache_avro::Reader::new(&*encoded).unwrap();

        // Count the decoded records. Without this, a reader that yields nothing would turn
        // the loop into a no-op and the round-trip check would pass vacuously.
        let mut decoded_records = 0;
        for record in reader {
            let result = apache_avro::from_value::<ByteBuf>(&record.unwrap()).unwrap();
            let desered_literal = Literal::try_from_bytes(&result, expected_type).unwrap();
            assert_eq!(desered_literal, expected_literal);
            decoded_records += 1;
        }
        assert_eq!(
            decoded_records, 1,
            "expected exactly one Avro record to be read back"
        );
    }

    #[test]
    fn json_boolean() {
        let record = r#"true"#;

        check_json_serde(
            record,
            Literal::Primitive(PrimitiveLiteral::Boolean(true)),
            &Type::Primitive(PrimitiveType::Boolean),
        );
    }

    #[test]
    fn json_int() {
        let record = r#"32"#;

        check_json_serde(
            record,
            Literal::Primitive(PrimitiveLiteral::Int(32)),
            &Type::Primitive(PrimitiveType::Int),
        );
    }

    #[test]
    fn json_long() {
        let record = r#"32"#;

        check_json_serde(
            record,
            Literal::Primitive(PrimitiveLiteral::Long(32)),
            &Type::Primitive(PrimitiveType::Long),
        );
    }

    #[test]
    fn json_float() {
        let record = r#"1.0"#;

        check_json_serde(
            record,
            Literal::Primitive(PrimitiveLiteral::Float(OrderedFloat(1.0))),
            &Type::Primitive(PrimitiveType::Float),
        );
    }

    #[test]
    fn json_double() {
        let record = r#"1.0"#;

        check_json_serde(
            record,
            Literal::Primitive(PrimitiveLiteral::Double(OrderedFloat(1.0))),
            &Type::Primitive(PrimitiveType::Double),
        );
    }

    #[test]
    fn json_date() {
        let record = r#""2017-11-16""#;

        check_json_serde(
            record,
            // 17486 days after 1970-01-01.
            Literal::Primitive(PrimitiveLiteral::Date(17486)),
            &Type::Primitive(PrimitiveType::Date),
        );
    }

    #[test]
    fn json_time() {
        let record = r#""22:31:08.123456""#;

        check_json_serde(
            record,
            // (22 * 3600 + 31 * 60 + 8) seconds plus 123456 microseconds, in microseconds.
            Literal::Primitive(PrimitiveLiteral::Time(81068123456)),
            &Type::Primitive(PrimitiveType::Time),
        );
    }

    #[test]
    fn json_timestamp() {
        let record = r#""2017-11-16T22:31:08.123456""#;

        check_json_serde(
            record,
            // Microseconds since 1970-01-01T00:00:00 (17486 days plus the time above).
            Literal::Primitive(PrimitiveLiteral::Timestamp(1510871468123456)),
            &Type::Primitive(PrimitiveType::Timestamp),
        );
    }

    #[test]
    fn json_timestamptz() {
        let record = r#""2017-11-16T22:31:08.123456+00:00""#;

        check_json_serde(
            record,
            Literal::Primitive(PrimitiveLiteral::TimestampTZ(1510871468123456)),
            &Type::Primitive(PrimitiveType::Timestamptz),
        );
    }

    #[test]
    fn json_string() {
        let record = r#""iceberg""#;

        check_json_serde(
            record,
            Literal::Primitive(PrimitiveLiteral::String("iceberg".to_string())),
            &Type::Primitive(PrimitiveType::String),
        );
    }

    #[test]
    fn json_uuid() {
        let record = r#""f79c3e09-677c-4bbd-a479-3f349cb785e7""#;

        check_json_serde(
            record,
            Literal::Primitive(PrimitiveLiteral::UUID(
                Uuid::parse_str("f79c3e09-677c-4bbd-a479-3f349cb785e7").unwrap(),
            )),
            &Type::Primitive(PrimitiveType::Uuid),
        );
    }

    #[test]
    fn json_struct() {
        // Struct fields are keyed by field id; field 3 is optional and set to null.
        let record = r#"{"1": 1, "2": "bar", "3": null}"#;

        check_json_serde(
            record,
            Literal::Struct(Struct::from_iter(vec![
                (
                    1,
                    Some(Literal::Primitive(PrimitiveLiteral::Int(1))),
                    "id".to_string(),
                ),
                (
                    2,
                    Some(Literal::Primitive(PrimitiveLiteral::String(
                        "bar".to_string(),
                    ))),
                    "name".to_string(),
                ),
                (3, None, "address".to_string()),
            ])),
            &Type::Struct(StructType::new(vec![
                NestedField {
                    id: 1,
                    name: "id".to_string(),
                    required: true,
                    field_type: Box::new(Type::Primitive(PrimitiveType::Int)),
                    doc: None,
                    initial_default: None,
                    write_default: None,
                },
                NestedField {
                    id: 2,
                    name: "name".to_string(),
                    required: false,
                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
                    doc: None,
                    initial_default: None,
                    write_default: None,
                },
                NestedField {
                    id: 3,
                    name: "address".to_string(),
                    required: false,
                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
                    doc: None,
                    initial_default: None,
                    write_default: None,
                },
            ])),
        );
    }

    #[test]
    fn json_list() {
        let record = r#"[1, 2, 3, null]"#;

        check_json_serde(
            record,
            Literal::List(vec![
                Some(Literal::Primitive(PrimitiveLiteral::Int(1))),
                Some(Literal::Primitive(PrimitiveLiteral::Int(2))),
                Some(Literal::Primitive(PrimitiveLiteral::Int(3))),
                None,
            ]),
            &Type::List(ListType {
                element_field: NestedField {
                    id: 0,
                    name: "".to_string(),
                    // The fixture contains a null element, so elements must be optional.
                    required: false,
                    field_type: Box::new(Type::Primitive(PrimitiveType::Int)),
                    doc: None,
                    initial_default: None,
                    write_default: None,
                },
            }),
        );
    }

    #[test]
    fn json_map() {
        // Maps are encoded as parallel "keys" and "values" arrays.
        let record = r#"{ "keys": ["a", "b", "c"], "values": [1, 2, null] }"#;

        check_json_serde(
            record,
            Literal::Map(BTreeMap::from([
                (
                    Literal::Primitive(PrimitiveLiteral::String("a".to_string())),
                    Some(Literal::Primitive(PrimitiveLiteral::Int(1))),
                ),
                (
                    Literal::Primitive(PrimitiveLiteral::String("b".to_string())),
                    Some(Literal::Primitive(PrimitiveLiteral::Int(2))),
                ),
                (
                    Literal::Primitive(PrimitiveLiteral::String("c".to_string())),
                    None,
                ),
            ])),
            &Type::Map(MapType {
                key_field: NestedField {
                    id: 0,
                    name: "key".to_string(),
                    required: true,
                    field_type: Box::new(Type::Primitive(PrimitiveType::String)),
                    doc: None,
                    initial_default: None,
                    write_default: None,
                },
                value_field: NestedField {
                    id: 1,
                    name: "value".to_string(),
                    // The fixture maps "c" to null, so values must be optional.
                    required: false,
                    field_type: Box::new(Type::Primitive(PrimitiveType::Int)),
                    doc: None,
                    initial_default: None,
                    write_default: None,
                },
            }),
        );
    }

    #[test]
    fn avro_bytes_boolean() {
        let bytes = vec![1u8];

        check_avro_bytes_serde(
            bytes,
            Literal::Primitive(PrimitiveLiteral::Boolean(true)),
            &Type::Primitive(PrimitiveType::Boolean),
        );
    }

    #[test]
    fn avro_bytes_int() {
        // 32 as a little-endian 4-byte integer.
        let bytes = vec![32u8, 0u8, 0u8, 0u8];

        check_avro_bytes_serde(
            bytes,
            Literal::Primitive(PrimitiveLiteral::Int(32)),
            &Type::Primitive(PrimitiveType::Int),
        );
    }

    #[test]
    fn avro_bytes_long() {
        // 32 as a little-endian 8-byte integer.
        let bytes = vec![32u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8];

        check_avro_bytes_serde(
            bytes,
            Literal::Primitive(PrimitiveLiteral::Long(32)),
            &Type::Primitive(PrimitiveType::Long),
        );
    }

    #[test]
    fn avro_bytes_float() {
        // 1.0f32 (0x3F800000) in little-endian byte order.
        let bytes = vec![0u8, 0u8, 128u8, 63u8];

        check_avro_bytes_serde(
            bytes,
            Literal::Primitive(PrimitiveLiteral::Float(OrderedFloat(1.0))),
            &Type::Primitive(PrimitiveType::Float),
        );
    }

    #[test]
    fn avro_bytes_double() {
        // 1.0f64 (0x3FF0000000000000) in little-endian byte order.
        let bytes = vec![0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 240u8, 63u8];

        check_avro_bytes_serde(
            bytes,
            Literal::Primitive(PrimitiveLiteral::Double(OrderedFloat(1.0))),
            &Type::Primitive(PrimitiveType::Double),
        );
    }

    #[test]
    fn avro_bytes_string() {
        // UTF-8 bytes of "iceberg".
        let bytes = vec![105u8, 99u8, 101u8, 98u8, 101u8, 114u8, 103u8];

        check_avro_bytes_serde(
            bytes,
            Literal::Primitive(PrimitiveLiteral::String("iceberg".to_string())),
            &Type::Primitive(PrimitiveType::String),
        );
    }
}


Reply via email to