This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new a03000d  feat: support transform function (#42)
a03000d is described below

commit a03000dca603107bfbf9166ad1227423314ede95
Author: ZENOTME <[email protected]>
AuthorDate: Fri Sep 22 18:15:24 2023 +0800

    feat: support transform function (#42)
    
    * support transform function and add related test:
    1. identity
    2. void
    3. temporal
    
    * 1. remove unsupport type in iceberg
    2. refactor error handle
    
    * add license
    
    * clean code
    
    * 1. use arrow-* instead of arrow
    2. refine test
    
    * support bucket and truncate transform function
    
    * make code more clear
    
    * fix check
    
    * fix to truncate Unicode correctly and add related test
    
    * fix cargo-sort
    
    ---------
    
    Co-authored-by: ZENOTME <[email protected]>
---
 crates/iceberg/Cargo.toml                          |   4 +
 crates/iceberg/src/lib.rs                          |   2 +
 crates/iceberg/src/transform/bucket.rs             | 244 ++++++++++++
 .../iceberg/src/{lib.rs => transform/identity.rs}  |  29 +-
 crates/iceberg/src/transform/mod.rs                |  55 +++
 crates/iceberg/src/transform/temporal.rs           | 408 +++++++++++++++++++++
 crates/iceberg/src/transform/truncate.rs           | 217 +++++++++++
 crates/iceberg/src/{lib.rs => transform/void.rs}   |  28 +-
 8 files changed, 949 insertions(+), 38 deletions(-)

diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 4a4839d..5535ba7 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -29,6 +29,9 @@ keywords = ["iceberg"]
 [dependencies]
 anyhow = "1.0.72"
 apache-avro = "0.15"
+arrow-arith = { version = ">=46" }
+arrow-array = { version = ">=46" }
+arrow-schema = { version = ">=46" }
 async-trait = "0.1"
 bimap = "0.6"
 bitvec = "1.0.1"
@@ -38,6 +41,7 @@ either = "1"
 futures = "0.3"
 itertools = "0.11"
 lazy_static = "1"
+murmur3 = "0.5.2"
 once_cell = "1"
 opendal = "0.39"
 ordered-float = "3.7.0"
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 573b58e..0f2a134 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -37,3 +37,5 @@ pub mod table;
 mod avro;
 pub mod io;
 pub mod spec;
+
+pub mod transform;
diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs
new file mode 100644
index 0000000..c9fe9df
--- /dev/null
+++ b/crates/iceberg/src/transform/bucket.rs
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::ArrayRef;
+use arrow_schema::{DataType, TimeUnit};
+
+use super::TransformFunction;
+
+/// Bucket transform: hashes a value and reduces the hash modulo `mod_n`.
+pub struct Bucket {
+    /// Divisor applied to the masked hash in `bucket_n`.
+    mod_n: u32,
+}
+
+impl Bucket {
+    /// Creates a bucket transform that uses `mod_n` as the modulus.
+    pub fn new(mod_n: u32) -> Self {
+        Self { mod_n }
+    }
+}
+
+impl Bucket {
+    /// Hashes raw bytes with 32-bit murmur3 (seed 0) and reinterprets the result as `i32`.
+    ///
+    /// Every other `hash_*` helper funnels into this function, so switching the hash
+    /// function only requires changing it here.
+    fn hash_bytes(mut v: &[u8]) -> i32 {
+        murmur3::murmur3_32(&mut v, 0).unwrap() as i32
+    }
+
+    /// Hashes an `i32` by widening it to `i64` first, so equal int and long values
+    /// produce the same hash.
+    fn hash_int(v: i32) -> i32 {
+        Self::hash_long(v as i64)
+    }
+
+    /// Hashes the 8 little-endian bytes of `v`.
+    fn hash_long(v: i64) -> i32 {
+        Self::hash_bytes(v.to_le_bytes().as_slice())
+    }
+
+    /// v is days from unix epoch
+    fn hash_date(v: i32) -> i32 {
+        Self::hash_int(v)
+    }
+
+    /// v is microseconds from midnight
+    fn hash_time(v: i64) -> i32 {
+        Self::hash_long(v)
+    }
+
+    /// v is microseconds from unix epoch
+    fn hash_timestamp(v: i64) -> i32 {
+        Self::hash_long(v)
+    }
+
+    /// Hashes the UTF-8 bytes of `s`.
+    fn hash_str(s: &str) -> i32 {
+        Self::hash_bytes(s.as_bytes())
+    }
+
+    /// Decimal values are hashed using the minimum number of bytes required to hold the
+    /// unscaled value as a two's complement big-endian integer.
+    /// ref: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
+    ///
+    /// "Minimum" means redundant sign-extension bytes are dropped (`0x00` for
+    /// non-negative values, `0xFF` for negative ones) while the top bit of the first
+    /// remaining byte still encodes the sign.
+    fn hash_decimal(v: i128) -> i32 {
+        let bytes = v.to_be_bytes();
+        let negative = v < 0;
+        // Bytes equal to the sign extension carry no information and can be dropped.
+        let sign_fill: u8 = if negative { 0xFF } else { 0x00 };
+        // For 0 and -1 every byte is sign extension; keep the last byte in that case.
+        let mut start = bytes
+            .iter()
+            .position(|&b| b != sign_fill)
+            .unwrap_or(bytes.len() - 1);
+        // The first kept byte must still carry the sign in its top bit; otherwise keep
+        // one extra sign-extension byte (128 is `[0x00, 0x80]`, -129 is `[0xFF, 0x7F]`).
+        let top_bit_set = (bytes[start] & 0x80) != 0;
+        if start > 0 && top_bit_set != negative {
+            start -= 1;
+        }
+        Self::hash_bytes(&bytes[start..])
+    }
+
+    /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
+    /// ref: https://iceberg.apache.org/spec/#partitioning
+    ///
+    /// `v` is the already computed hash; masking with `i32::MAX` clears the sign bit so
+    /// the remainder is never negative.
+    fn bucket_n(&self, v: i32) -> i32 {
+        (v & i32::MAX) % (self.mod_n as i32)
+    }
+}
+
+impl TransformFunction for Bucket {
+    /// Maps every value of `input` to its bucket number and returns an `Int32Array`.
+    ///
+    /// The input is dispatched on its Arrow data type to the matching `hash_*` helper.
+    /// Null slots stay null: the primitive branches go through `unary`, and the
+    /// string/binary branches map over `Option` values instead of unwrapping them.
+    ///
+    /// # Errors
+    ///
+    /// Returns a `FeatureUnsupported` error when the input data type is not handled here.
+    fn transform(&self, input: ArrayRef) -> crate::Result<ArrayRef> {
+        let res: arrow_array::Int32Array = match input.data_type() {
+            DataType::Int32 => input
+                .as_any()
+                .downcast_ref::<arrow_array::Int32Array>()
+                .unwrap()
+                .unary(|v| self.bucket_n(Self::hash_int(v))),
+            DataType::Int64 => input
+                .as_any()
+                .downcast_ref::<arrow_array::Int64Array>()
+                .unwrap()
+                .unary(|v| self.bucket_n(Self::hash_long(v))),
+            DataType::Decimal128(_, _) => input
+                .as_any()
+                .downcast_ref::<arrow_array::Decimal128Array>()
+                .unwrap()
+                .unary(|v| self.bucket_n(Self::hash_decimal(v))),
+            DataType::Date32 => input
+                .as_any()
+                .downcast_ref::<arrow_array::Date32Array>()
+                .unwrap()
+                .unary(|v| self.bucket_n(Self::hash_date(v))),
+            DataType::Time64(TimeUnit::Microsecond) => input
+                .as_any()
+                .downcast_ref::<arrow_array::Time64MicrosecondArray>()
+                .unwrap()
+                .unary(|v| self.bucket_n(Self::hash_time(v))),
+            DataType::Timestamp(TimeUnit::Microsecond, _) => input
+                .as_any()
+                .downcast_ref::<arrow_array::TimestampMicrosecondArray>()
+                .unwrap()
+                .unary(|v| self.bucket_n(Self::hash_timestamp(v))),
+            DataType::Utf8 => arrow_array::Int32Array::from_iter(
+                input
+                    .as_any()
+                    .downcast_ref::<arrow_array::StringArray>()
+                    .unwrap()
+                    .iter()
+                    // A null slot yields `None`, which becomes a null in the output.
+                    .map(|v| v.map(|v| self.bucket_n(Self::hash_str(v)))),
+            ),
+            DataType::LargeUtf8 => arrow_array::Int32Array::from_iter(
+                input
+                    .as_any()
+                    .downcast_ref::<arrow_array::LargeStringArray>()
+                    .unwrap()
+                    .iter()
+                    .map(|v| v.map(|v| self.bucket_n(Self::hash_str(v)))),
+            ),
+            DataType::Binary => arrow_array::Int32Array::from_iter(
+                input
+                    .as_any()
+                    .downcast_ref::<arrow_array::BinaryArray>()
+                    .unwrap()
+                    .iter()
+                    .map(|v| v.map(|v| self.bucket_n(Self::hash_bytes(v)))),
+            ),
+            DataType::LargeBinary => arrow_array::Int32Array::from_iter(
+                input
+                    .as_any()
+                    .downcast_ref::<arrow_array::LargeBinaryArray>()
+                    .unwrap()
+                    .iter()
+                    .map(|v| v.map(|v| self.bucket_n(Self::hash_bytes(v)))),
+            ),
+            DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter(
+                input
+                    .as_any()
+                    .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
+                    .unwrap()
+                    .iter()
+                    .map(|v| v.map(|v| self.bucket_n(Self::hash_bytes(v)))),
+            ),
+            // Report an unsupported type to the caller instead of panicking.
+            other => {
+                return Err(crate::Error::new(
+                    crate::ErrorKind::FeatureUnsupported,
+                    format!("Unsupported data type for bucket transform: {other:?}"),
+                ))
+            }
+        };
+        Ok(Arc::new(res))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
+
+    use super::Bucket;
+    #[test]
+    fn test_hash() {
+        // test int
+        assert_eq!(Bucket::hash_int(34), 2017239379);
+        // test long
+        assert_eq!(Bucket::hash_long(34), 2017239379);
+        // test decimal
+        assert_eq!(Bucket::hash_decimal(1420), -500754589);
+        // test date
+        let date = NaiveDate::from_ymd_opt(2017, 11, 16).unwrap();
+        assert_eq!(
+            Bucket::hash_date(
+                date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 
1).unwrap())
+                    .num_days() as i32
+            ),
+            -653330422
+        );
+        // test time
+        let time = NaiveTime::from_hms_opt(22, 31, 8).unwrap();
+        assert_eq!(
+            Bucket::hash_time(
+                time.signed_duration_since(NaiveTime::from_hms_opt(0, 0, 
0).unwrap())
+                    .num_microseconds()
+                    .unwrap()
+            ),
+            -662762989
+        );
+        // test timestamp
+        let timestamp =
+            NaiveDateTime::parse_from_str("2017-11-16 22:31:08", "%Y-%m-%d 
%H:%M:%S").unwrap();
+        assert_eq!(
+            Bucket::hash_timestamp(
+                timestamp
+                    .signed_duration_since(
+                        NaiveDateTime::parse_from_str("1970-01-01 00:00:00", 
"%Y-%m-%d %H:%M:%S")
+                            .unwrap()
+                    )
+                    .num_microseconds()
+                    .unwrap()
+            ),
+            -2047944441
+        );
+        // test timestamp with tz
+        let timestamp = 
DateTime::parse_from_rfc3339("2017-11-16T14:31:08-08:00").unwrap();
+        assert_eq!(
+            Bucket::hash_timestamp(
+                timestamp
+                    .signed_duration_since(
+                        
DateTime::parse_from_rfc3339("1970-01-01T00:00:00-00:00").unwrap()
+                    )
+                    .num_microseconds()
+                    .unwrap()
+            ),
+            -2047944441
+        );
+        // test str
+        assert_eq!(Bucket::hash_str("iceberg"), 1210000089);
+        // test uuid
+        assert_eq!(
+            Bucket::hash_bytes(
+                [
+                    0xF7, 0x9C, 0x3E, 0x09, 0x67, 0x7C, 0x4B, 0xBD, 0xA4, 
0x79, 0x3F, 0x34, 0x9C,
+                    0xB7, 0x85, 0xE7
+                ]
+                .as_ref()
+            ),
+            1488055340
+        );
+        // test fixed and binary
+        assert_eq!(
+            Bucket::hash_bytes([0x00, 0x01, 0x02, 0x03].as_ref()),
+            -188683207
+        );
+    }
+}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/transform/identity.rs
similarity index 65%
copy from crates/iceberg/src/lib.rs
copy to crates/iceberg/src/transform/identity.rs
index 573b58e..2ea6a20 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/transform/identity.rs
@@ -15,25 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Native Rust implementation of Apache Iceberg
+use crate::Result;
+use arrow_array::ArrayRef;
 
-#![deny(missing_docs)]
+use super::TransformFunction;
 
-#[macro_use]
-extern crate derive_builder;
+/// Identity transform: hands the input array back unchanged.
+pub struct Identity {}
 
-mod error;
-pub use error::Error;
-pub use error::ErrorKind;
-pub use error::Result;
-
-/// There is no implementation for this trait, allow dead code for now, should
-/// be removed after we have one.
-#[allow(dead_code)]
-pub mod catalog;
-#[allow(dead_code)]
-pub mod table;
-
-mod avro;
-pub mod io;
-pub mod spec;
+impl TransformFunction for Identity {
+    /// Returns `input` itself, unmodified; this never fails.
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        Ok(input)
+    }
+}
diff --git a/crates/iceberg/src/transform/mod.rs b/crates/iceberg/src/transform/mod.rs
new file mode 100644
index 0000000..dead9db
--- /dev/null
+++ b/crates/iceberg/src/transform/mod.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Transform function used to compute partition values.
+use crate::{spec::Transform, Result};
+use arrow_array::ArrayRef;
+
+mod bucket;
+mod identity;
+mod temporal;
+mod truncate;
+mod void;
+
+/// TransformFunction is a trait that defines the interface for all transform functions.
+pub trait TransformFunction: Send {
+    /// Takes an input array and transforms it into a new array.
+    ///
+    /// Implementations are responsible for checking the data type of `input` and
+    /// downcasting it to the concrete array type they operate on.
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef>;
+}
+
+/// Boxed trait object of [`TransformFunction`], as returned by `create_transform_function`.
+pub type BoxedTransformFunction = Box<dyn TransformFunction>;
+
+/// Builds the boxed [`TransformFunction`] that implements the given [`Transform`].
+///
+/// `Transform::Unknown` has no implementation and yields a `FeatureUnsupported` error.
+pub fn create_transform_function(transform: &Transform) -> Result<BoxedTransformFunction> {
+    let function: BoxedTransformFunction = match transform {
+        Transform::Identity => Box::new(identity::Identity {}),
+        Transform::Void => Box::new(void::Void {}),
+        Transform::Year => Box::new(temporal::Year {}),
+        Transform::Month => Box::new(temporal::Month {}),
+        Transform::Day => Box::new(temporal::Day {}),
+        Transform::Hour => Box::new(temporal::Hour {}),
+        Transform::Bucket(mod_n) => Box::new(bucket::Bucket::new(*mod_n)),
+        Transform::Truncate(width) => Box::new(truncate::Truncate::new(*width)),
+        // The only variant without a function: bail out before the common `Ok`.
+        Transform::Unknown => {
+            return Err(crate::error::Error::new(
+                crate::ErrorKind::FeatureUnsupported,
+                "Transform Unknown is not implemented",
+            ))
+        }
+    };
+    Ok(function)
+}
diff --git a/crates/iceberg/src/transform/temporal.rs b/crates/iceberg/src/transform/temporal.rs
new file mode 100644
index 0000000..f914d50
--- /dev/null
+++ b/crates/iceberg/src/transform/temporal.rs
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::TransformFunction;
+use crate::{Error, ErrorKind, Result};
+use arrow_arith::{
+    arity::binary,
+    temporal::{month_dyn, year_dyn},
+};
+use arrow_array::{
+    types::Date32Type, Array, ArrayRef, Date32Array, Int32Array, 
TimestampMicrosecondArray,
+};
+use arrow_schema::{DataType, TimeUnit};
+use chrono::Datelike;
+use std::sync::Arc;
+
+/// Day number of 1970-01-01 counted from the Common Era, i.e. the value chrono's
+/// `num_days_from_ce()` returns for the unix epoch.
+const DAY_SINCE_UNIX_EPOCH: i32 = 719163;
+/// Microseconds in one hour.
+const MICROSECONDS_PER_HOUR: i64 = 3_600_000_000;
+/// Microseconds in one day.
+const MICROSECONDS_PER_DAY: i64 = 24 * MICROSECONDS_PER_HOUR;
+/// Year of unix epoch.
+const UNIX_EPOCH_YEAR: i32 = 1970;
+
+/// Extract a date or timestamp year, as years from 1970
+pub struct Year;
+
+impl TransformFunction for Year {
+    /// Returns an `Int32Array` holding `year - 1970` for every value of `input`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `Unexpected` error when `year_dyn` rejects the input.
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let array =
+            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+        Ok(Arc::<Int32Array>::new(
+            array
+                .as_any()
+                .downcast_ref::<Int32Array>()
+                .unwrap()
+                .unary(|v| v - UNIX_EPOCH_YEAR),
+        ))
+    }
+}
+
+/// Extract a date or timestamp month, as months from 1970-01-01
+pub struct Month;
+
+impl TransformFunction for Month {
+    /// Returns an `Int32Array` holding `12 * (year - 1970) + month - 1` for every value.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `Unexpected` error when `year_dyn` or `month_dyn` rejects the input.
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let year_array =
+            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+        // Months contributed by the whole years elapsed since 1970.
+        let year_array: Int32Array = year_array
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap()
+            .unary(|v| 12 * (v - UNIX_EPOCH_YEAR));
+        let month_array =
+            month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
+        Ok(Arc::<Int32Array>::new(
+            binary(
+                month_array.as_any().downcast_ref::<Int32Array>().unwrap(),
+                &year_array,
+                // Compute month from 1970-01-01, so minus 1 here.
+                |a, b| a + b - 1,
+            )
+            // Both operands are derived from `input`, so their lengths always match.
+            .unwrap(),
+        ))
+    }
+}
+
+/// Extract a date or timestamp day, as days from 1970-01-01
+pub struct Day;
+
+impl TransformFunction for Day {
+    /// Returns an `Int32Array` of days since 1970-01-01 for `Date32` or microsecond
+    /// timestamp input.
+    ///
+    /// Timestamps are converted with floor division in integer arithmetic, so instants
+    /// before the epoch map to negative days (one microsecond before the epoch is
+    /// day -1) and exact midnights are never affected by floating-point rounding.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `Unexpected` error for any other input data type.
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let res: Int32Array = match input.data_type() {
+            DataType::Timestamp(TimeUnit::Microsecond, _) => input
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                // Any i64 microsecond count divided by MICROSECONDS_PER_DAY fits in i32.
+                .unary(|v| -> i32 { v.div_euclid(MICROSECONDS_PER_DAY) as i32 }),
+            DataType::Date32 => {
+                input
+                    .as_any()
+                    .downcast_ref::<Date32Array>()
+                    .unwrap()
+                    .unary(|v| -> i32 {
+                        Date32Type::to_naive_date(v).num_days_from_ce() - DAY_SINCE_UNIX_EPOCH
+                    })
+            }
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!(
+                        "Should not call internally for unsupported data type {:?}",
+                        input.data_type()
+                    ),
+                ))
+            }
+        };
+        Ok(Arc::new(res))
+    }
+}
+
+/// Extract a timestamp hour, as hours from 1970-01-01 00:00:00
+pub struct Hour;
+
+impl TransformFunction for Hour {
+    /// Returns an `Int32Array` of hours since 1970-01-01 00:00:00 for microsecond
+    /// timestamp input.
+    ///
+    /// Uses floor division in integer arithmetic, so instants before the epoch map to
+    /// negative hours. Results outside the `i32` range saturate at the `i32` bounds.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `Unexpected` error for any other input data type.
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let res: Int32Array = match input.data_type() {
+            DataType::Timestamp(TimeUnit::Microsecond, _) => input
+                .as_any()
+                .downcast_ref::<TimestampMicrosecondArray>()
+                .unwrap()
+                .unary(|v| -> i32 {
+                    // The clamp makes the narrowing cast lossless.
+                    v.div_euclid(MICROSECONDS_PER_HOUR)
+                        .clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32
+                }),
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::Unexpected,
+                    format!(
+                        "Should not call internally for unsupported data type {:?}",
+                        input.data_type()
+                    ),
+                ))
+            }
+        };
+        Ok(Arc::new(res))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use arrow_array::{ArrayRef, Date32Array, Int32Array, 
TimestampMicrosecondArray};
+    use chrono::{NaiveDate, NaiveDateTime};
+    use std::sync::Arc;
+
+    use crate::transform::TransformFunction;
+
+    #[test]
+    fn test_transform_years() {
+        let year = super::Year;
+
+        // Test Date32
+        let ori_date = vec![
+            NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
+            NaiveDate::from_ymd_opt(2000, 1, 1).unwrap(),
+            NaiveDate::from_ymd_opt(2030, 1, 1).unwrap(),
+            NaiveDate::from_ymd_opt(2060, 1, 1).unwrap(),
+        ];
+        let date_array: ArrayRef = Arc::new(Date32Array::from(
+            ori_date
+                .into_iter()
+                .map(|date| {
+                    date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 
1, 1).unwrap())
+                        .num_days() as i32
+                })
+                .collect::<Vec<i32>>(),
+        ));
+        let res = year.transform(date_array).unwrap();
+        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(res.len(), 4);
+        assert_eq!(res.value(0), 0);
+        assert_eq!(res.value(1), 30);
+        assert_eq!(res.value(2), 60);
+        assert_eq!(res.value(3), 90);
+
+        // Test TimestampMicrosecond
+        let ori_timestamp = vec![
+            NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2000-01-01 19:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2030-01-01 10:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2060-01-01 11:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+        ];
+        let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
+            ori_timestamp
+                .into_iter()
+                .map(|timestamp| {
+                    timestamp
+                        .signed_duration_since(
+                            NaiveDateTime::parse_from_str(
+                                "1970-01-01 00:00:00.0",
+                                "%Y-%m-%d %H:%M:%S.%f",
+                            )
+                            .unwrap(),
+                        )
+                        .num_microseconds()
+                        .unwrap()
+                })
+                .collect::<Vec<i64>>(),
+        ));
+        let res = year.transform(date_array).unwrap();
+        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(res.len(), 4);
+        assert_eq!(res.value(0), 0);
+        assert_eq!(res.value(1), 30);
+        assert_eq!(res.value(2), 60);
+        assert_eq!(res.value(3), 90);
+    }
+
+    #[test]
+    fn test_transform_months() {
+        let month = super::Month;
+
+        // Test Date32
+        let ori_date = vec![
+            NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
+            NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(),
+            NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(),
+            NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(),
+        ];
+        let date_array: ArrayRef = Arc::new(Date32Array::from(
+            ori_date
+                .into_iter()
+                .map(|date| {
+                    date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 
1, 1).unwrap())
+                        .num_days() as i32
+                })
+                .collect::<Vec<i32>>(),
+        ));
+        let res = month.transform(date_array).unwrap();
+        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(res.len(), 4);
+        assert_eq!(res.value(0), 0);
+        assert_eq!(res.value(1), 30 * 12 + 3);
+        assert_eq!(res.value(2), 60 * 12 + 6);
+        assert_eq!(res.value(3), 90 * 12 + 9);
+
+        // Test TimestampMicrosecond
+        let ori_timestamp = vec![
+            NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2000-04-01 19:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2030-07-01 10:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+        ];
+        let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
+            ori_timestamp
+                .into_iter()
+                .map(|timestamp| {
+                    timestamp
+                        .signed_duration_since(
+                            NaiveDateTime::parse_from_str(
+                                "1970-01-01 00:00:00.0",
+                                "%Y-%m-%d %H:%M:%S.%f",
+                            )
+                            .unwrap(),
+                        )
+                        .num_microseconds()
+                        .unwrap()
+                })
+                .collect::<Vec<i64>>(),
+        ));
+        let res = month.transform(date_array).unwrap();
+        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(res.len(), 4);
+        assert_eq!(res.value(0), 0);
+        assert_eq!(res.value(1), 30 * 12 + 3);
+        assert_eq!(res.value(2), 60 * 12 + 6);
+        assert_eq!(res.value(3), 90 * 12 + 9);
+    }
+
+    #[test]
+    fn test_transform_days() {
+        let day = super::Day;
+        let ori_date = vec![
+            NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
+            NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(),
+            NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(),
+            NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(),
+        ];
+        let expect_day = ori_date
+            .clone()
+            .into_iter()
+            .map(|data| {
+                data.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 
1).unwrap())
+                    .num_days() as i32
+            })
+            .collect::<Vec<i32>>();
+
+        // Test Date32
+        let date_array: ArrayRef = Arc::new(Date32Array::from(
+            ori_date
+                .into_iter()
+                .map(|date| {
+                    date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 
1, 1).unwrap())
+                        .num_days() as i32
+                })
+                .collect::<Vec<i32>>(),
+        ));
+        let res = day.transform(date_array).unwrap();
+        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(res.len(), 4);
+        assert_eq!(res.value(0), expect_day[0]);
+        assert_eq!(res.value(1), expect_day[1]);
+        assert_eq!(res.value(2), expect_day[2]);
+        assert_eq!(res.value(3), expect_day[3]);
+
+        // Test TimestampMicrosecond
+        let ori_timestamp = vec![
+            NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2000-04-01 19:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2030-07-01 10:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+        ];
+        let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
+            ori_timestamp
+                .into_iter()
+                .map(|timestamp| {
+                    timestamp
+                        .signed_duration_since(
+                            NaiveDateTime::parse_from_str(
+                                "1970-01-01 00:00:00.0",
+                                "%Y-%m-%d %H:%M:%S.%f",
+                            )
+                            .unwrap(),
+                        )
+                        .num_microseconds()
+                        .unwrap()
+                })
+                .collect::<Vec<i64>>(),
+        ));
+        let res = day.transform(date_array).unwrap();
+        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(res.len(), 4);
+        assert_eq!(res.value(0), expect_day[0]);
+        assert_eq!(res.value(1), expect_day[1]);
+        assert_eq!(res.value(2), expect_day[2]);
+        assert_eq!(res.value(3), expect_day[3]);
+    }
+
+    #[test]
+    fn test_transform_hours() {
+        let hour = super::Hour;
+        let ori_timestamp = vec![
+            NaiveDateTime::parse_from_str("1970-01-01 19:01:23.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2000-03-01 12:01:23.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2030-10-02 10:01:23.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+            NaiveDateTime::parse_from_str("2060-09-01 05:03:23.123", "%Y-%m-%d 
%H:%M:%S.%f")
+                .unwrap(),
+        ];
+        let expect_hour = ori_timestamp
+            .clone()
+            .into_iter()
+            .map(|timestamp| {
+                timestamp
+                    .signed_duration_since(
+                        NaiveDateTime::parse_from_str(
+                            "1970-01-01 00:00:0.0",
+                            "%Y-%m-%d %H:%M:%S.%f",
+                        )
+                        .unwrap(),
+                    )
+                    .num_hours() as i32
+            })
+            .collect::<Vec<i32>>();
+
+        // Test TimestampMicrosecond
+        let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
+            ori_timestamp
+                .into_iter()
+                .map(|timestamp| {
+                    timestamp
+                        .signed_duration_since(
+                            NaiveDateTime::parse_from_str(
+                                "1970-01-01 00:00:0.0",
+                                "%Y-%m-%d %H:%M:%S.%f",
+                            )
+                            .unwrap(),
+                        )
+                        .num_microseconds()
+                        .unwrap()
+                })
+                .collect::<Vec<i64>>(),
+        ));
+        let res = hour.transform(date_array).unwrap();
+        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(res.len(), 4);
+        assert_eq!(res.value(0), expect_hour[0]);
+        assert_eq!(res.value(1), expect_hour[1]);
+        assert_eq!(res.value(2), expect_hour[2]);
+        assert_eq!(res.value(3), expect_hour[3]);
+    }
+}
diff --git a/crates/iceberg/src/transform/truncate.rs b/crates/iceberg/src/transform/truncate.rs
new file mode 100644
index 0000000..43e79e4
--- /dev/null
+++ b/crates/iceberg/src/transform/truncate.rs
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::ArrayRef;
+use arrow_schema::DataType;
+
+use crate::Error;
+
+use super::TransformFunction;
+
+/// Truncate transform configured with a fixed width.
+pub struct Truncate {
+    /// Truncation width: the modulus applied to numeric inputs and the
+    /// maximum number of characters kept for string inputs.
+    width: u32,
+}
+
+impl Truncate {
+    /// Creates a truncate transform with the given `width`.
+    pub fn new(width: u32) -> Self {
+        Truncate { width }
+    }
+
+    /// Returns the longest prefix of `s` that holds at most `max_chars`
+    /// characters.
+    ///
+    /// The cut is made at a `char` boundary rather than at a byte offset, so
+    /// a multi-byte UTF-8 sequence is never split. When `s` has `max_chars`
+    /// characters or fewer it is returned unchanged.
+    fn truncate_str_by_char(s: &str, max_chars: usize) -> &str {
+        // Byte offset at which the character with index `max_chars` starts;
+        // when there is no such character the whole string is kept.
+        let cut = s
+            .char_indices()
+            .map(|(byte_idx, _)| byte_idx)
+            .nth(max_chars)
+            .unwrap_or(s.len());
+        &s[..cut]
+    }
+}
+
+impl TransformFunction for Truncate {
+    /// Truncates every value of `input` according to this transform's width.
+    ///
+    /// * `Int32`, `Int64` and `Decimal128` values become
+    ///   `v - v.rem_euclid(width)`, i.e. they are rounded down (toward
+    ///   negative infinity) to a multiple of `width`. For decimals the
+    ///   arithmetic is applied to the stored `i128` value and the input's
+    ///   precision and scale are re-applied to the result.
+    /// * `Utf8` and `LargeUtf8` values are cut to at most `width` characters;
+    ///   null entries stay null.
+    ///
+    /// NOTE: `v - remainder` can overflow for values within `width` of the
+    /// integer type's minimum.
+    ///
+    /// # Errors
+    ///
+    /// * `DataInvalid` when the input is numeric and `width` is 0.
+    /// * `DataInvalid` when the input is `Int32` and `width` does not fit in
+    ///   an `i32`.
+    /// * `DataInvalid` when the input data type is not one of the supported
+    ///   types listed above.
+    /// * `Unexpected` when the precision and scale cannot be re-applied to a
+    ///   truncated decimal array.
+    fn transform(&self, input: ArrayRef) -> crate::Result<ArrayRef> {
+        // Numeric truncation takes a remainder by the width, so a zero width
+        // would panic with a division by zero; reject it up front.
+        let is_numeric = matches!(
+            input.data_type(),
+            DataType::Int32 | DataType::Int64 | DataType::Decimal128(_, _)
+        );
+        if is_numeric && self.width == 0 {
+            return Err(Error::new(
+                crate::ErrorKind::DataInvalid,
+                "width must be greater than 0 when truncate numeric array",
+            ));
+        }
+
+        // In every arm below the downcast targets the array type that
+        // corresponds to the data type just matched.
+        match input.data_type() {
+            DataType::Int32 => {
+                let width: i32 = self.width.try_into().map_err(|_| {
+                    Error::new(
+                        crate::ErrorKind::DataInvalid,
+                        "width is failed to convert to i32 when truncate Int32Array",
+                    )
+                })?;
+                let res: arrow_array::Int32Array = input
+                    .as_any()
+                    .downcast_ref::<arrow_array::Int32Array>()
+                    .unwrap()
+                    .unary(|v| v - v.rem_euclid(width));
+                Ok(Arc::new(res))
+            }
+            DataType::Int64 => {
+                let width = i64::from(self.width);
+                let res: arrow_array::Int64Array = input
+                    .as_any()
+                    .downcast_ref::<arrow_array::Int64Array>()
+                    .unwrap()
+                    .unary(|v| v - v.rem_euclid(width));
+                Ok(Arc::new(res))
+            }
+            DataType::Decimal128(precision, scale) => {
+                let width = i128::from(self.width);
+                let res: arrow_array::Decimal128Array = input
+                    .as_any()
+                    .downcast_ref::<arrow_array::Decimal128Array>()
+                    .unwrap()
+                    .unary(|v| v - v.rem_euclid(width))
+                    .with_precision_and_scale(*precision, *scale)
+                    .map_err(|err| Error::new(crate::ErrorKind::Unexpected, err.to_string()))?;
+                Ok(Arc::new(res))
+            }
+            DataType::Utf8 => {
+                let len = self.width as usize;
+                let res: arrow_array::StringArray = arrow_array::StringArray::from_iter(
+                    input
+                        .as_any()
+                        .downcast_ref::<arrow_array::StringArray>()
+                        .unwrap()
+                        .iter()
+                        .map(|v| v.map(|v| Self::truncate_str_by_char(v, len))),
+                );
+                Ok(Arc::new(res))
+            }
+            DataType::LargeUtf8 => {
+                let len = self.width as usize;
+                let res: arrow_array::LargeStringArray = arrow_array::LargeStringArray::from_iter(
+                    input
+                        .as_any()
+                        .downcast_ref::<arrow_array::LargeStringArray>()
+                        .unwrap()
+                        .iter()
+                        .map(|v| v.map(|v| Self::truncate_str_by_char(v, len))),
+                );
+                Ok(Arc::new(res))
+            }
+            // Callers can pass any array, so an unsupported type is reported
+            // as an error instead of panicking.
+            other => Err(Error::new(
+                crate::ErrorKind::DataInvalid,
+                format!(
+                    "Truncate transform only supports (int,long,decimal,string) types, got {other:?}"
+                ),
+            )),
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use arrow_array::{
+        builder::PrimitiveBuilder, types::Decimal128Type, Decimal128Array, 
Int32Array, Int64Array,
+    };
+
+    use crate::transform::TransformFunction;
+
+    // Expected values follow the Iceberg spec examples:
+    // https://iceberg.apache.org/spec/#truncate-transform-details
+    #[test]
+    fn test_truncate_simple() {
+        // int: 1 and -1 truncated with width 10
+        let ints = Arc::new(Int32Array::from(vec![1, -1]));
+        let out = super::Truncate::new(10).transform(ints).unwrap();
+        let out = out.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(out.value(0), 0);
+        assert_eq!(out.value(1), -10);
+
+        // long: same values and width as the int case
+        let longs = Arc::new(Int64Array::from(vec![1, -1]));
+        let out = super::Truncate::new(10).transform(longs).unwrap();
+        let out = out.as_any().downcast_ref::<Int64Array>().unwrap();
+        assert_eq!(out.value(0), 0);
+        assert_eq!(out.value(1), -10);
+
+        // decimal: stored value 1065 at precision 20, scale 2, width 50
+        let mut builder = PrimitiveBuilder::<Decimal128Type>::new()
+            .with_precision_and_scale(20, 2)
+            .unwrap();
+        builder.append_value(1065);
+        let decimals = Arc::new(builder.finish());
+        let out = super::Truncate::new(50).transform(decimals).unwrap();
+        let out = out.as_any().downcast_ref::<Decimal128Array>().unwrap();
+        assert_eq!(out.value(0), 1050);
+
+        // string: keep the first 3 characters
+        let strings = Arc::new(arrow_array::StringArray::from(vec!["iceberg"]));
+        let out = super::Truncate::new(3).transform(strings).unwrap();
+        let out = out
+            .as_any()
+            .downcast_ref::<arrow_array::StringArray>()
+            .unwrap();
+        assert_eq!(out.value(0), "ice");
+
+        // large string: keep the first 3 characters
+        let large_strings = Arc::new(arrow_array::LargeStringArray::from(vec!["iceberg"]));
+        let out = super::Truncate::new(3).transform(large_strings).unwrap();
+        let out = out
+            .as_any()
+            .downcast_ref::<arrow_array::LargeStringArray>()
+            .unwrap();
+        assert_eq!(out.value(0), "ice");
+    }
+
+    #[test]
+    fn test_string_truncate() {
+        // Each case is (input, max_chars, expected prefix). The inputs mix
+        // multi-byte characters, including code points at and above U+FFFF.
+        let cases = [
+            ("イロハニホヘト", 2, "イロ"),
+            ("イロハニホヘト", 3, "イロハ"),
+            ("щщаεはчωいにπάほхεろへσκζ", 7, "щщаεはчω"),
+            ("\u{FFFF}\u{FFFF}", 2, "\u{FFFF}\u{FFFF}"),
+            ("\u{10000}\u{10000}", 1, "\u{10000}"),
+        ];
+
+        for (input, max_chars, expected) in cases {
+            assert_eq!(
+                super::Truncate::truncate_str_by_char(input, max_chars),
+                expected
+            );
+        }
+    }
+}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/transform/void.rs
similarity index 65%
copy from crates/iceberg/src/lib.rs
copy to crates/iceberg/src/transform/void.rs
index 573b58e..56fc3c5 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/transform/void.rs
@@ -15,25 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Native Rust implementation of Apache Iceberg
+use crate::Result;
+use arrow_array::{new_null_array, ArrayRef};
 
-#![deny(missing_docs)]
+use super::TransformFunction;
 
-#[macro_use]
-extern crate derive_builder;
+/// Void transform: its `transform` ignores the input values and produces a
+/// null array of the same data type and length.
+pub struct Void {}
 
-mod error;
-pub use error::Error;
-pub use error::ErrorKind;
-pub use error::Result;
-
-/// There is no implementation for this trait, allow dead code for now, should
-/// be removed after we have one.
-#[allow(dead_code)]
-pub mod catalog;
-#[allow(dead_code)]
-pub mod table;
-
-mod avro;
-pub mod io;
-pub mod spec;
+impl TransformFunction for Void {
+    /// Builds a null array with the same data type and length as `input`;
+    /// the values held by `input` are not read.
+    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+        let (data_type, len) = (input.data_type(), input.len());
+        Ok(new_null_array(data_type, len))
+    }
+}


Reply via email to