This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new a03000d feat: support transform function (#42)
a03000d is described below
commit a03000dca603107bfbf9166ad1227423314ede95
Author: ZENOTME <[email protected]>
AuthorDate: Fri Sep 22 18:15:24 2023 +0800
feat: support transform function (#42)
* support transform function and add related test:
1. identity
2. void
3. temporal
* 1. remove unsupported type in iceberg
2. refactor error handle
* add license
* clean code
* 1. use arrow-* instead of arrow
2. refine test
* support bucket and truncate transform function
* make code more clear
* fix check
* fix to truncate Unicode correctly and add related test
* fix cargo-sort
---------
Co-authored-by: ZENOTME <[email protected]>
---
crates/iceberg/Cargo.toml | 4 +
crates/iceberg/src/lib.rs | 2 +
crates/iceberg/src/transform/bucket.rs | 244 ++++++++++++
.../iceberg/src/{lib.rs => transform/identity.rs} | 29 +-
crates/iceberg/src/transform/mod.rs | 55 +++
crates/iceberg/src/transform/temporal.rs | 408 +++++++++++++++++++++
crates/iceberg/src/transform/truncate.rs | 217 +++++++++++
crates/iceberg/src/{lib.rs => transform/void.rs} | 28 +-
8 files changed, 949 insertions(+), 38 deletions(-)
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 4a4839d..5535ba7 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -29,6 +29,9 @@ keywords = ["iceberg"]
[dependencies]
anyhow = "1.0.72"
apache-avro = "0.15"
+arrow-arith = { version = ">=46" }
+arrow-array = { version = ">=46" }
+arrow-schema = { version = ">=46" }
async-trait = "0.1"
bimap = "0.6"
bitvec = "1.0.1"
@@ -38,6 +41,7 @@ either = "1"
futures = "0.3"
itertools = "0.11"
lazy_static = "1"
+murmur3 = "0.5.2"
once_cell = "1"
opendal = "0.39"
ordered-float = "3.7.0"
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index 573b58e..0f2a134 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -37,3 +37,5 @@ pub mod table;
mod avro;
pub mod io;
pub mod spec;
+
+pub mod transform;
diff --git a/crates/iceberg/src/transform/bucket.rs
b/crates/iceberg/src/transform/bucket.rs
new file mode 100644
index 0000000..c9fe9df
--- /dev/null
+++ b/crates/iceberg/src/transform/bucket.rs
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::ArrayRef;
+use arrow_schema::{DataType, TimeUnit};
+
+use super::TransformFunction;
+
+pub struct Bucket {
+ mod_n: u32,
+}
+
+impl Bucket {
+ pub fn new(mod_n: u32) -> Self {
+ Self { mod_n }
+ }
+}
+
+impl Bucket {
+ /// When switch the hash function, we only need to change this function.
+ fn hash_bytes(mut v: &[u8]) -> i32 {
+ murmur3::murmur3_32(&mut v, 0).unwrap() as i32
+ }
+
+ fn hash_int(v: i32) -> i32 {
+ Self::hash_long(v as i64)
+ }
+
+ fn hash_long(v: i64) -> i32 {
+ Self::hash_bytes(v.to_le_bytes().as_slice())
+ }
+
+ /// v is days from unix epoch
+ fn hash_date(v: i32) -> i32 {
+ Self::hash_int(v)
+ }
+
+ /// v is microseconds from midnight
+ fn hash_time(v: i64) -> i32 {
+ Self::hash_long(v)
+ }
+
+ /// v is microseconds from unix epoch
+ fn hash_timestamp(v: i64) -> i32 {
+ Self::hash_long(v)
+ }
+
+ fn hash_str(s: &str) -> i32 {
+ Self::hash_bytes(s.as_bytes())
+ }
+
+ /// Decimal values are hashed using the minimum number of bytes required
to hold the unscaled value as a two’s complement big-endian
+ /// ref:
https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
+ fn hash_decimal(v: i128) -> i32 {
+ let bytes = v.to_be_bytes();
+ if let Some(start) = bytes.iter().position(|&x| x != 0) {
+ Self::hash_bytes(&bytes[start..])
+ } else {
+ Self::hash_bytes(&[0])
+ }
+ }
+
+ /// def bucket_N(x) = (murmur3_x86_32_hash(x) & Integer.MAX_VALUE) % N
+ /// ref: https://iceberg.apache.org/spec/#partitioning
+ fn bucket_n(&self, v: i32) -> i32 {
+ (v & i32::MAX) % (self.mod_n as i32)
+ }
+}
+
+impl TransformFunction for Bucket {
+ fn transform(&self, input: ArrayRef) -> crate::Result<ArrayRef> {
+ let res: arrow_array::Int32Array = match input.data_type() {
+ DataType::Int32 => input
+ .as_any()
+ .downcast_ref::<arrow_array::Int32Array>()
+ .unwrap()
+ .unary(|v| self.bucket_n(Self::hash_int(v))),
+ DataType::Int64 => input
+ .as_any()
+ .downcast_ref::<arrow_array::Int64Array>()
+ .unwrap()
+ .unary(|v| self.bucket_n(Self::hash_long(v))),
+ DataType::Decimal128(_, _) => input
+ .as_any()
+ .downcast_ref::<arrow_array::Decimal128Array>()
+ .unwrap()
+ .unary(|v| self.bucket_n(Self::hash_decimal(v))),
+ DataType::Date32 => input
+ .as_any()
+ .downcast_ref::<arrow_array::Date32Array>()
+ .unwrap()
+ .unary(|v| self.bucket_n(Self::hash_date(v))),
+ DataType::Time64(TimeUnit::Microsecond) => input
+ .as_any()
+ .downcast_ref::<arrow_array::Time64MicrosecondArray>()
+ .unwrap()
+ .unary(|v| self.bucket_n(Self::hash_time(v))),
+ DataType::Timestamp(TimeUnit::Microsecond, _) => input
+ .as_any()
+ .downcast_ref::<arrow_array::TimestampMicrosecondArray>()
+ .unwrap()
+ .unary(|v| self.bucket_n(Self::hash_timestamp(v))),
+ DataType::Utf8 => arrow_array::Int32Array::from_iter(
+ input
+ .as_any()
+ .downcast_ref::<arrow_array::StringArray>()
+ .unwrap()
+ .iter()
+ .map(|v| self.bucket_n(Self::hash_str(v.unwrap()))),
+ ),
+ DataType::LargeUtf8 => arrow_array::Int32Array::from_iter(
+ input
+ .as_any()
+ .downcast_ref::<arrow_array::LargeStringArray>()
+ .unwrap()
+ .iter()
+ .map(|v| self.bucket_n(Self::hash_str(v.unwrap()))),
+ ),
+ DataType::Binary => arrow_array::Int32Array::from_iter(
+ input
+ .as_any()
+ .downcast_ref::<arrow_array::BinaryArray>()
+ .unwrap()
+ .iter()
+ .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
+ ),
+ DataType::LargeBinary => arrow_array::Int32Array::from_iter(
+ input
+ .as_any()
+ .downcast_ref::<arrow_array::LargeBinaryArray>()
+ .unwrap()
+ .iter()
+ .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
+ ),
+ DataType::FixedSizeBinary(_) => arrow_array::Int32Array::from_iter(
+ input
+ .as_any()
+ .downcast_ref::<arrow_array::FixedSizeBinaryArray>()
+ .unwrap()
+ .iter()
+ .map(|v| self.bucket_n(Self::hash_bytes(v.unwrap()))),
+ ),
+ _ => unreachable!("Unsupported data type: {:?}",
input.data_type()),
+ };
+ Ok(Arc::new(res))
+ }
+}
+
#[cfg(test)]
mod test {
    use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};

    use super::Bucket;

    /// Expected values are the reference hashes published in the Iceberg
    /// spec (Appendix B, 32-bit hash requirements), so this also pins the
    /// exact byte encoding each `hash_*` helper feeds to Murmur3.
    #[test]
    fn test_hash() {
        // test int
        assert_eq!(Bucket::hash_int(34), 2017239379);
        // test long — must equal the int hash for the same value (ints are
        // hashed as widened longs).
        assert_eq!(Bucket::hash_long(34), 2017239379);
        // test decimal — unscaled value, minimal two's-complement bytes
        assert_eq!(Bucket::hash_decimal(1420), -500754589);
        // test date — days since the unix epoch
        let date = NaiveDate::from_ymd_opt(2017, 11, 16).unwrap();
        assert_eq!(
            Bucket::hash_date(
                date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
                    .num_days() as i32
            ),
            -653330422
        );
        // test time — microseconds since midnight
        let time = NaiveTime::from_hms_opt(22, 31, 8).unwrap();
        assert_eq!(
            Bucket::hash_time(
                time.signed_duration_since(NaiveTime::from_hms_opt(0, 0, 0).unwrap())
                    .num_microseconds()
                    .unwrap()
            ),
            -662762989
        );
        // test timestamp — microseconds since the unix epoch
        let timestamp =
            NaiveDateTime::parse_from_str("2017-11-16 22:31:08", "%Y-%m-%d %H:%M:%S").unwrap();
        assert_eq!(
            Bucket::hash_timestamp(
                timestamp
                    .signed_duration_since(
                        NaiveDateTime::parse_from_str("1970-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
                            .unwrap()
                    )
                    .num_microseconds()
                    .unwrap()
            ),
            -2047944441
        );
        // test timestamp with tz — same instant as above, so same hash
        let timestamp = DateTime::parse_from_rfc3339("2017-11-16T14:31:08-08:00").unwrap();
        assert_eq!(
            Bucket::hash_timestamp(
                timestamp
                    .signed_duration_since(
                        DateTime::parse_from_rfc3339("1970-01-01T00:00:00-00:00").unwrap()
                    )
                    .num_microseconds()
                    .unwrap()
            ),
            -2047944441
        );
        // test str — UTF-8 bytes
        assert_eq!(Bucket::hash_str("iceberg"), 1210000089);
        // test uuid — hashed as its 16 raw bytes
        assert_eq!(
            Bucket::hash_bytes(
                [
                    0xF7, 0x9C, 0x3E, 0x09, 0x67, 0x7C, 0x4B, 0xBD, 0xA4, 0x79, 0x3F, 0x34, 0x9C,
                    0xB7, 0x85, 0xE7
                ]
                .as_ref()
            ),
            1488055340
        );
        // test fixed and binary
        assert_eq!(
            Bucket::hash_bytes([0x00, 0x01, 0x02, 0x03].as_ref()),
            -188683207
        );
    }
}
diff --git a/crates/iceberg/src/lib.rs
b/crates/iceberg/src/transform/identity.rs
similarity index 65%
copy from crates/iceberg/src/lib.rs
copy to crates/iceberg/src/transform/identity.rs
index 573b58e..2ea6a20 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/transform/identity.rs
@@ -15,25 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-//! Native Rust implementation of Apache Iceberg
+use crate::Result;
+use arrow_array::ArrayRef;
-#![deny(missing_docs)]
+use super::TransformFunction;
-#[macro_use]
-extern crate derive_builder;
+/// Return identity array.
+pub struct Identity {}
-mod error;
-pub use error::Error;
-pub use error::ErrorKind;
-pub use error::Result;
-
-/// There is no implementation for this trait, allow dead code for now, should
-/// be removed after we have one.
-#[allow(dead_code)]
-pub mod catalog;
-#[allow(dead_code)]
-pub mod table;
-
-mod avro;
-pub mod io;
-pub mod spec;
+impl TransformFunction for Identity {
+ fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+ Ok(input)
+ }
+}
diff --git a/crates/iceberg/src/transform/mod.rs
b/crates/iceberg/src/transform/mod.rs
new file mode 100644
index 0000000..dead9db
--- /dev/null
+++ b/crates/iceberg/src/transform/mod.rs
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Transform function used to compute partition values.
+use crate::{spec::Transform, Result};
+use arrow_array::ArrayRef;
+
+mod bucket;
+mod identity;
+mod temporal;
+mod truncate;
+mod void;
+
/// TransformFunction is a trait that defines the interface for all transform
/// functions.
///
/// Implementations are `Send` so boxed transforms can be moved across
/// threads.
pub trait TransformFunction: Send {
    /// transform will take an input array and transform it into a new array.
    /// The implementation of this function will need to check and downcast
    /// the input to specific type.
    fn transform(&self, input: ArrayRef) -> Result<ArrayRef>;
}

/// BoxedTransformFunction is a boxed trait object of TransformFunction.
pub type BoxedTransformFunction = Box<dyn TransformFunction>;
+
+/// create_transform_function creates a boxed trait object of
TransformFunction from a Transform.
+pub fn create_transform_function(transform: &Transform) ->
Result<BoxedTransformFunction> {
+ match transform {
+ Transform::Identity => Ok(Box::new(identity::Identity {})),
+ Transform::Void => Ok(Box::new(void::Void {})),
+ Transform::Year => Ok(Box::new(temporal::Year {})),
+ Transform::Month => Ok(Box::new(temporal::Month {})),
+ Transform::Day => Ok(Box::new(temporal::Day {})),
+ Transform::Hour => Ok(Box::new(temporal::Hour {})),
+ Transform::Bucket(mod_n) => Ok(Box::new(bucket::Bucket::new(*mod_n))),
+ Transform::Truncate(width) =>
Ok(Box::new(truncate::Truncate::new(*width))),
+ Transform::Unknown => Err(crate::error::Error::new(
+ crate::ErrorKind::FeatureUnsupported,
+ "Transform Unknown is not implemented",
+ )),
+ }
+}
diff --git a/crates/iceberg/src/transform/temporal.rs
b/crates/iceberg/src/transform/temporal.rs
new file mode 100644
index 0000000..f914d50
--- /dev/null
+++ b/crates/iceberg/src/transform/temporal.rs
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::TransformFunction;
+use crate::{Error, ErrorKind, Result};
+use arrow_arith::{
+ arity::binary,
+ temporal::{month_dyn, year_dyn},
+};
+use arrow_array::{
+ types::Date32Type, Array, ArrayRef, Date32Array, Int32Array,
TimestampMicrosecondArray,
+};
+use arrow_schema::{DataType, TimeUnit};
+use chrono::Datelike;
+use std::sync::Arc;
+
+/// The number of days since unix epoch.
+const DAY_SINCE_UNIX_EPOCH: i32 = 719163;
+/// Hour in one second.
+const HOUR_PER_SECOND: f64 = 1.0_f64 / 3600.0_f64;
+/// Day in one second.
+const DAY_PER_SECOND: f64 = 1.0_f64 / 24.0_f64 / 3600.0_f64;
+/// Year of unix epoch.
+const UNIX_EPOCH_YEAR: i32 = 1970;
+
+/// Extract a date or timestamp year, as years from 1970
+pub struct Year;
+
+impl TransformFunction for Year {
+ fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+ let array =
+ year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected,
format!("{err}")))?;
+ Ok(Arc::<Int32Array>::new(
+ array
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .unary(|v| v - UNIX_EPOCH_YEAR),
+ ))
+ }
+}
+
/// Extract a date or timestamp month, as months from 1970-01-01
pub struct Month;

impl TransformFunction for Month {
    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
        // Whole years since 1970, scaled to months.
        let year_array =
            year_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
        let year_array: Int32Array = year_array
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap()
            .unary(|v| 12 * (v - UNIX_EPOCH_YEAR));
        // Calendar month within the year; presumably 1-based (January == 1),
        // which the `- 1` below rebases — TODO confirm against arrow docs.
        let month_array =
            month_dyn(&input).map_err(|err| Error::new(ErrorKind::Unexpected, format!("{err}")))?;
        Ok(Arc::<Int32Array>::new(
            binary(
                month_array.as_any().downcast_ref::<Int32Array>().unwrap(),
                year_array.as_any().downcast_ref::<Int32Array>().unwrap(),
                // Compute month from 1970-01-01, so minus 1 here.
                |a, b| a + b - 1,
            )
            // `binary` fails only on length mismatch; both arrays derive
            // from the same input, so lengths always match.
            .unwrap(),
        ))
    }
}
+
+/// Extract a date or timestamp day, as days from 1970-01-01
+pub struct Day;
+
+impl TransformFunction for Day {
+ fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+ let res: Int32Array = match input.data_type() {
+ DataType::Timestamp(TimeUnit::Microsecond, _) => input
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap()
+ .unary(|v| -> i32 { (v as f64 / 1000.0 / 1000.0 *
DAY_PER_SECOND) as i32 }),
+ DataType::Date32 => {
+ input
+ .as_any()
+ .downcast_ref::<Date32Array>()
+ .unwrap()
+ .unary(|v| -> i32 {
+ Date32Type::to_naive_date(v).num_days_from_ce() -
DAY_SINCE_UNIX_EPOCH
+ })
+ }
+ _ => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Should not call internally for unsupported data type
{:?}",
+ input.data_type()
+ ),
+ ))
+ }
+ };
+ Ok(Arc::new(res))
+ }
+}
+
+/// Extract a timestamp hour, as hours from 1970-01-01 00:00:00
+pub struct Hour;
+
+impl TransformFunction for Hour {
+ fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+ let res: Int32Array = match input.data_type() {
+ DataType::Timestamp(TimeUnit::Microsecond, _) => input
+ .as_any()
+ .downcast_ref::<TimestampMicrosecondArray>()
+ .unwrap()
+ .unary(|v| -> i32 { (v as f64 * HOUR_PER_SECOND / 1000.0 /
1000.0) as i32 }),
+ _ => {
+ return Err(Error::new(
+ ErrorKind::Unexpected,
+ format!(
+ "Should not call internally for unsupported data type
{:?}",
+ input.data_type()
+ ),
+ ))
+ }
+ };
+ Ok(Arc::new(res))
+ }
+}
+
#[cfg(test)]
mod test {
    use arrow_array::{ArrayRef, Date32Array, Int32Array, TimestampMicrosecondArray};
    use chrono::{NaiveDate, NaiveDateTime};
    use std::sync::Arc;

    use crate::transform::TransformFunction;

    /// Year transform: both Date32 and microsecond-timestamp inputs must
    /// yield years relative to 1970.
    #[test]
    fn test_transform_years() {
        let year = super::Year;

        // Test Date32
        let ori_date = vec![
            NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
            NaiveDate::from_ymd_opt(2000, 1, 1).unwrap(),
            NaiveDate::from_ymd_opt(2030, 1, 1).unwrap(),
            NaiveDate::from_ymd_opt(2060, 1, 1).unwrap(),
        ];
        let date_array: ArrayRef = Arc::new(Date32Array::from(
            ori_date
                .into_iter()
                .map(|date| {
                    // Date32 stores days since the unix epoch.
                    date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
                        .num_days() as i32
                })
                .collect::<Vec<i32>>(),
        ));
        let res = year.transform(date_array).unwrap();
        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(res.len(), 4);
        assert_eq!(res.value(0), 0);
        assert_eq!(res.value(1), 30);
        assert_eq!(res.value(2), 60);
        assert_eq!(res.value(3), 90);

        // Test TimestampMicrosecond
        let ori_timestamp = vec![
            NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2000-01-01 19:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2030-01-01 10:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2060-01-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
        ];
        let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
            ori_timestamp
                .into_iter()
                .map(|timestamp| {
                    // Timestamps store microseconds since the unix epoch.
                    timestamp
                        .signed_duration_since(
                            NaiveDateTime::parse_from_str(
                                "1970-01-01 00:00:00.0",
                                "%Y-%m-%d %H:%M:%S.%f",
                            )
                            .unwrap(),
                        )
                        .num_microseconds()
                        .unwrap()
                })
                .collect::<Vec<i64>>(),
        ));
        let res = year.transform(date_array).unwrap();
        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(res.len(), 4);
        assert_eq!(res.value(0), 0);
        assert_eq!(res.value(1), 30);
        assert_eq!(res.value(2), 60);
        assert_eq!(res.value(3), 90);
    }

    /// Month transform: expected value is `years_since_1970 * 12 +
    /// zero_based_month`.
    #[test]
    fn test_transform_months() {
        let month = super::Month;

        // Test Date32
        let ori_date = vec![
            NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
            NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(),
            NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(),
            NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(),
        ];
        let date_array: ArrayRef = Arc::new(Date32Array::from(
            ori_date
                .into_iter()
                .map(|date| {
                    date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
                        .num_days() as i32
                })
                .collect::<Vec<i32>>(),
        ));
        let res = month.transform(date_array).unwrap();
        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(res.len(), 4);
        assert_eq!(res.value(0), 0);
        assert_eq!(res.value(1), 30 * 12 + 3);
        assert_eq!(res.value(2), 60 * 12 + 6);
        assert_eq!(res.value(3), 90 * 12 + 9);

        // Test TimestampMicrosecond
        let ori_timestamp = vec![
            NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2000-04-01 19:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2030-07-01 10:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
        ];
        let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
            ori_timestamp
                .into_iter()
                .map(|timestamp| {
                    timestamp
                        .signed_duration_since(
                            NaiveDateTime::parse_from_str(
                                "1970-01-01 00:00:00.0",
                                "%Y-%m-%d %H:%M:%S.%f",
                            )
                            .unwrap(),
                        )
                        .num_microseconds()
                        .unwrap()
                })
                .collect::<Vec<i64>>(),
        ));
        let res = month.transform(date_array).unwrap();
        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(res.len(), 4);
        assert_eq!(res.value(0), 0);
        assert_eq!(res.value(1), 30 * 12 + 3);
        assert_eq!(res.value(2), 60 * 12 + 6);
        assert_eq!(res.value(3), 90 * 12 + 9);
    }

    /// Day transform: expected values are computed independently with
    /// chrono and compared against both Date32 and timestamp inputs.
    #[test]
    fn test_transform_days() {
        let day = super::Day;
        let ori_date = vec![
            NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
            NaiveDate::from_ymd_opt(2000, 4, 1).unwrap(),
            NaiveDate::from_ymd_opt(2030, 7, 1).unwrap(),
            NaiveDate::from_ymd_opt(2060, 10, 1).unwrap(),
        ];
        // Reference day numbers computed with chrono.
        let expect_day = ori_date
            .clone()
            .into_iter()
            .map(|data| {
                data.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
                    .num_days() as i32
            })
            .collect::<Vec<i32>>();

        // Test Date32
        let date_array: ArrayRef = Arc::new(Date32Array::from(
            ori_date
                .into_iter()
                .map(|date| {
                    date.signed_duration_since(NaiveDate::from_ymd_opt(1970, 1, 1).unwrap())
                        .num_days() as i32
                })
                .collect::<Vec<i32>>(),
        ));
        let res = day.transform(date_array).unwrap();
        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(res.len(), 4);
        assert_eq!(res.value(0), expect_day[0]);
        assert_eq!(res.value(1), expect_day[1]);
        assert_eq!(res.value(2), expect_day[2]);
        assert_eq!(res.value(3), expect_day[3]);

        // Test TimestampMicrosecond
        let ori_timestamp = vec![
            NaiveDateTime::parse_from_str("1970-01-01 12:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2000-04-01 19:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2030-07-01 10:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2060-10-01 11:30:42.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
        ];
        let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
            ori_timestamp
                .into_iter()
                .map(|timestamp| {
                    timestamp
                        .signed_duration_since(
                            NaiveDateTime::parse_from_str(
                                "1970-01-01 00:00:00.0",
                                "%Y-%m-%d %H:%M:%S.%f",
                            )
                            .unwrap(),
                        )
                        .num_microseconds()
                        .unwrap()
                })
                .collect::<Vec<i64>>(),
        ));
        let res = day.transform(date_array).unwrap();
        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(res.len(), 4);
        assert_eq!(res.value(0), expect_day[0]);
        assert_eq!(res.value(1), expect_day[1]);
        assert_eq!(res.value(2), expect_day[2]);
        assert_eq!(res.value(3), expect_day[3]);
    }

    /// Hour transform: expected values are chrono's whole-hour durations
    /// since the epoch.
    #[test]
    fn test_transform_hours() {
        let hour = super::Hour;
        let ori_timestamp = vec![
            NaiveDateTime::parse_from_str("1970-01-01 19:01:23.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2000-03-01 12:01:23.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2030-10-02 10:01:23.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
            NaiveDateTime::parse_from_str("2060-09-01 05:03:23.123", "%Y-%m-%d %H:%M:%S.%f")
                .unwrap(),
        ];
        // Reference hour numbers computed with chrono.
        let expect_hour = ori_timestamp
            .clone()
            .into_iter()
            .map(|timestamp| {
                timestamp
                    .signed_duration_since(
                        NaiveDateTime::parse_from_str(
                            "1970-01-01 00:00:0.0",
                            "%Y-%m-%d %H:%M:%S.%f",
                        )
                        .unwrap(),
                    )
                    .num_hours() as i32
            })
            .collect::<Vec<i32>>();

        // Test TimestampMicrosecond
        let date_array: ArrayRef = Arc::new(TimestampMicrosecondArray::from(
            ori_timestamp
                .into_iter()
                .map(|timestamp| {
                    timestamp
                        .signed_duration_since(
                            NaiveDateTime::parse_from_str(
                                "1970-01-01 00:00:0.0",
                                "%Y-%m-%d %H:%M:%S.%f",
                            )
                            .unwrap(),
                        )
                        .num_microseconds()
                        .unwrap()
                })
                .collect::<Vec<i64>>(),
        ));
        let res = hour.transform(date_array).unwrap();
        let res = res.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(res.len(), 4);
        assert_eq!(res.value(0), expect_hour[0]);
        assert_eq!(res.value(1), expect_hour[1]);
        assert_eq!(res.value(2), expect_hour[2]);
        assert_eq!(res.value(3), expect_hour[3]);
    }
}
diff --git a/crates/iceberg/src/transform/truncate.rs
b/crates/iceberg/src/transform/truncate.rs
new file mode 100644
index 0000000..43e79e4
--- /dev/null
+++ b/crates/iceberg/src/transform/truncate.rs
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_array::ArrayRef;
+use arrow_schema::DataType;
+
+use crate::Error;
+
+use super::TransformFunction;
+
/// A transform that truncates values to a fixed width, implementing the
/// Iceberg `truncate[W]` partition transform.
pub struct Truncate {
    /// Truncation width `W`: divisor for numeric types, maximum character
    /// count for strings.
    width: u32,
}

impl Truncate {
    /// Create a truncate transform with the given width.
    pub fn new(width: u32) -> Self {
        Self { width }
    }

    /// Return the prefix of `s` containing at most `max_chars` characters.
    ///
    /// Walks character boundaries rather than byte offsets, so multi-byte
    /// UTF-8 sequences are never split.
    fn truncate_str_by_char(s: &str, max_chars: usize) -> &str {
        s.char_indices()
            .nth(max_chars)
            .map_or(s, |(idx, _)| &s[..idx])
    }
}
+
+impl TransformFunction for Truncate {
+ fn transform(&self, input: ArrayRef) -> crate::Result<ArrayRef> {
+ match input.data_type() {
+ DataType::Int32 => {
+ let width: i32 = self.width.try_into().map_err(|_| {
+ Error::new(
+ crate::ErrorKind::DataInvalid,
+ "width is failed to convert to i32 when truncate
Int32Array",
+ )
+ })?;
+ let res: arrow_array::Int32Array = input
+ .as_any()
+ .downcast_ref::<arrow_array::Int32Array>()
+ .unwrap()
+ .unary(|v| v - v.rem_euclid(width));
+ Ok(Arc::new(res))
+ }
+ DataType::Int64 => {
+ let width = self.width as i64;
+ let res: arrow_array::Int64Array = input
+ .as_any()
+ .downcast_ref::<arrow_array::Int64Array>()
+ .unwrap()
+ .unary(|v| v - (((v % width) + width) % width));
+ Ok(Arc::new(res))
+ }
+ DataType::Decimal128(precision, scale) => {
+ let width = self.width as i128;
+ let res: arrow_array::Decimal128Array = input
+ .as_any()
+ .downcast_ref::<arrow_array::Decimal128Array>()
+ .unwrap()
+ .unary(|v| v - (((v % width) + width) % width))
+ .with_precision_and_scale(*precision, *scale)
+ .map_err(|err| Error::new(crate::ErrorKind::Unexpected,
format!("{err}")))?;
+ Ok(Arc::new(res))
+ }
+ DataType::Utf8 => {
+ let len = self.width as usize;
+ let res: arrow_array::StringArray =
arrow_array::StringArray::from_iter(
+ input
+ .as_any()
+ .downcast_ref::<arrow_array::StringArray>()
+ .unwrap()
+ .iter()
+ .map(|v| v.map(|v| Self::truncate_str_by_char(v,
len))),
+ );
+ Ok(Arc::new(res))
+ }
+ DataType::LargeUtf8 => {
+ let len = self.width as usize;
+ let res: arrow_array::LargeStringArray =
arrow_array::LargeStringArray::from_iter(
+ input
+ .as_any()
+ .downcast_ref::<arrow_array::LargeStringArray>()
+ .unwrap()
+ .iter()
+ .map(|v| v.map(|v| Self::truncate_str_by_char(v,
len))),
+ );
+ Ok(Arc::new(res))
+ }
+ _ => unreachable!("Truncate transform only supports
(int,long,decimal,string) types"),
+ }
+ }
+}
+
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow_array::{
        builder::PrimitiveBuilder, types::Decimal128Type, Decimal128Array, Int32Array, Int64Array,
    };

    use crate::transform::TransformFunction;

    // Test case ref from: https://iceberg.apache.org/spec/#truncate-transform-details
    /// Exercises every supported arrow type through the full `transform`
    /// entry point, one value per expected case.
    #[test]
    fn test_truncate_simple() {
        // test truncate int: truncate(10, 1) == 0, truncate(10, -1) == -10
        let input = Arc::new(Int32Array::from(vec![1, -1]));
        let res = super::Truncate::new(10).transform(input).unwrap();
        assert_eq!(
            res.as_any().downcast_ref::<Int32Array>().unwrap().value(0),
            0
        );
        assert_eq!(
            res.as_any().downcast_ref::<Int32Array>().unwrap().value(1),
            -10
        );

        // test truncate long: same expectations as int
        let input = Arc::new(Int64Array::from(vec![1, -1]));
        let res = super::Truncate::new(10).transform(input).unwrap();
        assert_eq!(
            res.as_any().downcast_ref::<Int64Array>().unwrap().value(0),
            0
        );
        assert_eq!(
            res.as_any().downcast_ref::<Int64Array>().unwrap().value(1),
            -10
        );

        // test decimal: truncate(50, 10.65) == 10.50 on the unscaled value
        let mut buidler = PrimitiveBuilder::<Decimal128Type>::new()
            .with_precision_and_scale(20, 2)
            .unwrap();
        buidler.append_value(1065);
        let input = Arc::new(buidler.finish());
        let res = super::Truncate::new(50).transform(input).unwrap();
        assert_eq!(
            res.as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap()
                .value(0),
            1050
        );

        // test string: truncate(3, "iceberg") == "ice"
        let input = Arc::new(arrow_array::StringArray::from(vec!["iceberg"]));
        let res = super::Truncate::new(3).transform(input).unwrap();
        assert_eq!(
            res.as_any()
                .downcast_ref::<arrow_array::StringArray>()
                .unwrap()
                .value(0),
            "ice"
        );

        // test large string: same expectation as string
        let input = Arc::new(arrow_array::LargeStringArray::from(vec!["iceberg"]));
        let res = super::Truncate::new(3).transform(input).unwrap();
        assert_eq!(
            res.as_any()
                .downcast_ref::<arrow_array::LargeStringArray>()
                .unwrap()
                .value(0),
            "ice"
        );
    }

    /// Unicode truncation must count characters, not bytes, and never split
    /// a multi-byte sequence (including astral-plane code points).
    #[test]
    fn test_string_truncate() {
        let test1 = "イロハニホヘト";
        let test1_2_expected = "イロ";
        assert_eq!(
            super::Truncate::truncate_str_by_char(test1, 2),
            test1_2_expected
        );

        let test1_3_expected = "イロハ";
        assert_eq!(
            super::Truncate::truncate_str_by_char(test1, 3),
            test1_3_expected
        );

        let test2 = "щщаεはчωいにπάほхεろへσκζ";
        let test2_7_expected = "щщаεはчω";
        assert_eq!(
            super::Truncate::truncate_str_by_char(test2, 7),
            test2_7_expected
        );

        // Max chars >= length leaves the string untouched.
        let test3 = "\u{FFFF}\u{FFFF}";
        assert_eq!(super::Truncate::truncate_str_by_char(test3, 2), test3);

        // Code points outside the BMP (4 UTF-8 bytes) count as one char.
        let test4 = "\u{10000}\u{10000}";
        let test4_1_expected = "\u{10000}";
        assert_eq!(
            super::Truncate::truncate_str_by_char(test4, 1),
            test4_1_expected
        );
    }
}
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/transform/void.rs
similarity index 65%
copy from crates/iceberg/src/lib.rs
copy to crates/iceberg/src/transform/void.rs
index 573b58e..56fc3c5 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/transform/void.rs
@@ -15,25 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-//! Native Rust implementation of Apache Iceberg
+use crate::Result;
+use arrow_array::{new_null_array, ArrayRef};
-#![deny(missing_docs)]
+use super::TransformFunction;
-#[macro_use]
-extern crate derive_builder;
+pub struct Void {}
-mod error;
-pub use error::Error;
-pub use error::ErrorKind;
-pub use error::Result;
-
-/// There is no implementation for this trait, allow dead code for now, should
-/// be removed after we have one.
-#[allow(dead_code)]
-pub mod catalog;
-#[allow(dead_code)]
-pub mod table;
-
-mod avro;
-pub mod io;
-pub mod spec;
+impl TransformFunction for Void {
+ fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
+ Ok(new_null_array(input.data_type(), input.len()))
+ }
+}